wip
fred3m committed Jul 8, 2024
1 parent c807bc5 commit 6a3c3f1
Showing 6 changed files with 163 additions and 23 deletions.
51 changes: 51 additions & 0 deletions doc/lsst.rubintv.analysis.service/design.rst
@@ -0,0 +1,51 @@
.. _rubintv_analysis_service-design:

=====================================
Design of rubintv_analysis_service
=====================================

.. contents:: Table of Contents
   :depth: 2

Overview
========

The ``rubintv_analysis_service`` is a backend Python service designed to support the Derived Data Visualization (DDV) tool within the Rubin Observatory's software ecosystem. It provides a set of libraries and scripts that facilitate the analysis and visualization of astronomical data.

Architecture
============

The service is structured around a series of commands and tasks, each responsible for a specific aspect of data processing and visualization. Key components include:

- **Worker Script**: A script that initializes and runs the service, handling configuration and database connections.

  - `rubintv_worker.py <rubintv_analysis_service/scripts/rubintv_worker.py>`_

  The script is designed to run on a worker pod in a Kubernetes cluster. It initializes the service, loads the configuration, and connects to the Butler and the consolidated database (consDB). It then listens for incoming commands from the web application, executes them, and returns the results.

  There is also a `mock server <rubintv_analysis_service/scripts/mock_server.py>`_ that can be used to test the service before it is built at either the USDF or the summit.

- **Commands**: Modular operations that perform specific tasks, such as loading columns, detector images, and detector information. These are implemented in Python modules within the ``commands`` directory:

  - `db.py <rubintv_analysis_service/python/lsst/rubintv/analysis/service/commands/db.py>`_ contains commands for loading information from the consolidated database (consDB).
  - `image.py <rubintv_analysis_service/python/lsst/rubintv/analysis/service/commands/image.py>`_ contains commands for loading detector images (not yet implemented).
  - `butler.py <rubintv_analysis_service/python/lsst/rubintv/analysis/service/commands/butler.py>`_ contains commands for loading data from a Butler repository.

All commands derive from the `BaseCommand` class, which provides a common interface for command execution. Every subclass must accept its parameters as keyword arguments and implement the `BaseCommand.build_contents` method. This separates the steps in processing a command:

1. Reading the JSON command and converting it into a Python dictionary.
2. Parsing the dictionary and converting it into a `BaseCommand` instance.
3. Executing the command.
4. Packaging the results of the command into a JSON response and sending it to the rubintv web application.

The `BaseCommand.build_contents` method is called during execution, and must return the result as a `dict` that will be converted into JSON and returned to the user.

Configuration
=============

Configuration for the service is managed through the following YAML files, allowing for flexible deployment and customization of the service's behavior:

- **config.yaml**: Main configuration file specifying service parameters.
- **joins.yaml**: Configuration for database joins.

Configuration options can be overridden using command-line arguments, which are parsed using the `argparse` module.
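
One way to layer command-line arguments over file-based configuration is sketched below. The ``--port`` and ``--location`` flags and the keys in ``file_config`` are hypothetical and are not taken from the service's real ``config.yaml``. The dictionary stands in for the result of parsing the YAML file.

```python
import argparse

# Stand-in for the dictionary loaded from config.yaml (hypothetical keys).
file_config = {"port": 8080, "location": "usdf"}


def merge_config(file_config: dict, argv: list[str]) -> dict:
    """Overlay command-line values on top of values from the YAML file."""
    parser = argparse.ArgumentParser(description="Config override sketch")
    # default=None lets us tell "not given" apart from an explicit value.
    parser.add_argument("--port", type=int, default=None)
    parser.add_argument("--location", type=str, default=None)
    args = parser.parse_args(argv)

    merged = dict(file_config)
    for key, value in vars(args).items():
        if value is not None:
            merged[key] = value
    return merged


config = merge_config(file_config, ["--port", "9000"])
```

Here only ``port`` is replaced; ``location`` keeps the value from the file because the flag was not given.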

Dart/Flutter Frontend
=====================

The frontend of the DDV tool is implemented using the Dart programming language and the Flutter framework. It provides a web-based interface for users to interact with the service, submit commands, and visualize the results. The code is located at https://github.com/lsst-ts/rubintv_visualization and is built on top of `rubin_chart <https://github.com/lsst-sitcom/rubin_chart>`_, an open source Flutter plotting library also written by the project.
5 changes: 4 additions & 1 deletion doc/lsst.rubintv.analysis.service/index.rst
@@ -6,7 +6,8 @@
lsst.rubintv.analysis.service
#############################

.. Paragraph that describes what this Python module does and links to related modules and frameworks.
This is the backend Python service that runs the Derived Data Visualization (DDV) tool.


.. _lsst.rubintv.analysis.service-using:

@@ -18,6 +19,8 @@ toctree linking to topics related to using the module's APIs.
.. toctree::
   :maxdepth: 2

   design

.. _lsst.rubintv.analysis.service-contributing:

Contributing
30 changes: 19 additions & 11 deletions python/lsst/rubintv/analysis/service/command.py
@@ -23,6 +23,7 @@

import json
import logging
import traceback
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING
@@ -34,7 +35,7 @@
logger = logging.getLogger("lsst.rubintv.analysis.service.command")


def construct_error_message(error_name: str, description: str) -> str:
def construct_error_message(error_name: str, description: str, traceback: str) -> str:
"""Use a standard format for all error messages.
Parameters
@@ -55,12 +56,13 @@ def construct_error_message(error_name: str, description: str) -> str:
"content": {
"error": error_name,
"description": description,
"traceback": traceback,
},
}
)
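
With this change every error response also carries the traceback text. The sketch below shows the resulting payload shape. The function name ``sketch_error_message`` is hypothetical, and the top-level ``"type": "error"`` key is an assumption, since the lines that would show it fall outside the visible hunk; the three keys under ``content`` are taken from the diff.

```python
import json


def sketch_error_message(error_name: str, description: str, traceback: str) -> str:
    """Build an error response in the shape shown in the hunk above."""
    return json.dumps(
        {
            "type": "error",  # assumed: this key is not visible in the diff
            "content": {
                "error": error_name,
                "description": description,
                "traceback": traceback,
            },
        }
    )


message = sketch_error_message(
    "parsing error", "missing 'name'", "Traceback (most recent call last): ..."
)
payload = json.loads(message)
```

A client can then show ``description`` to the user and keep ``traceback`` for a collapsible debug view.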


def error_msg(error: Exception) -> str:
def error_msg(error: Exception, traceback: str) -> str:
"""Handle errors received while parsing or executing a command.
Parameters
@@ -76,23 +78,23 @@ def error_msg(error: Exception) -> str:
"""
if isinstance(error, json.decoder.JSONDecodeError):
return construct_error_message("JSON decoder error", error.args[0])
return construct_error_message("JSON decoder error", error.args[0], traceback)

if isinstance(error, CommandParsingError):
return construct_error_message("parsing error", error.args[0])
return construct_error_message("parsing error", error.args[0], traceback)

if isinstance(error, CommandExecutionError):
return construct_error_message("execution error", error.args[0])
return construct_error_message("execution error", error.args[0], traceback)

if isinstance(error, CommandResponseError):
return construct_error_message("command response error", error.args[0])
return construct_error_message("command response error", error.args[0], traceback)

# We should always receive one of the above errors, so the code should
# never get to here. But we generate this response just in case something
# very unexpected happens, or (more likely) the code is altered in such a
# way that this line is hit.
msg = "An unknown error occurred, you should never reach this message."
return construct_error_message(error.__class__.__name__, msg)
return construct_error_message(error.__class__.__name__, msg, traceback)


class CommandParsingError(Exception):
@@ -201,7 +203,8 @@ def execute_command(command_str: str, data_center: DataCenter) -> str:
raise CommandParsingError(f"Could not generate a valid command from {command_str}")
except Exception as err:
logging.exception("Error converting command to JSON.")
return error_msg(err)
traceback_string = traceback.format_exc()
return error_msg(err, traceback_string)

try:
if "name" not in command_dict.keys():
@@ -215,13 +218,15 @@ def execute_command(command_str: str, data_center: DataCenter) -> str:

except Exception as err:
logging.exception(f"Error parsing command {command_dict}")
return error_msg(CommandParsingError(f"'{err}' error while parsing command"))
traceback_string = traceback.format_exc()
return error_msg(CommandParsingError(f"'{err}' error while parsing command"), traceback_string)

try:
command.execute(data_center)
except Exception as err:
logging.exception(f"Error executing command {command_dict}")
return error_msg(CommandExecutionError(f"{err} error executing command."))
traceback_string = traceback.format_exc()
return error_msg(CommandExecutionError(f"{err} error executing command."), traceback_string)

try:
if "requestId" in command_dict:
@@ -230,6 +235,9 @@ def execute_command(command_str: str, data_center: DataCenter) -> str:
result = command.to_json()
except Exception as err:
logging.exception("Error converting command response to JSON.")
return error_msg(CommandResponseError(f"{err} error converting command response to JSON."))
traceback_string = traceback.format_exc()
return error_msg(
CommandResponseError(f"{err} error converting command response to JSON."), traceback_string
)

return result
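
The pattern repeated in each stage of ``execute_command`` (catch, capture the traceback, return an error message) can be sketched in isolation as below. The ``run_stage`` helper is hypothetical and is not part of this commit, which inlines the same steps in each ``except`` block. The key detail is that ``traceback.format_exc()`` is called while the exception is still being handled.

```python
import json
import traceback


def run_stage(stage: str, func, *args):
    """Run one stage; return (value, None) or (None, error_json)."""
    try:
        return func(*args), None
    except Exception as err:
        # format_exc() must be called inside the except block, while the
        # exception is still being handled.
        tb = traceback.format_exc()
        error = {"error": f"{stage} error", "description": str(err), "traceback": tb}
        return None, json.dumps({"content": error})


value, error = run_stage("JSON decoder", json.loads, "{not valid json")
```

Note that when an exception is wrapped in a new one, as with ``CommandParsingError`` in the diff, the captured traceback still describes the original exception, which is usually the more useful information for debugging.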
8 changes: 7 additions & 1 deletion python/lsst/rubintv/analysis/service/commands/image.py
@@ -33,7 +33,13 @@

@dataclass(kw_only=True)
class LoadDetectorImageCommand(BaseCommand):
"""Load an image from a data center."""
"""Load an image from a data center.
This command is not yet implemented, but will use the
`viewer.py` module, adapted from `https://github.com/fred3m/toyz`
to load image tiles and send them to the client to display
detector images.
"""

database: str
detector: int
89 changes: 79 additions & 10 deletions python/lsst/rubintv/analysis/service/database.py
@@ -31,19 +31,24 @@
logger = logging.getLogger("lsst.rubintv.analysis.service.database")


# Exposure tables currently in the schema
exposure_tables = [
"exposure",
"ccdexposure",
"ccdexposure_camera",
]

# Tables in the schema for single visit exposures
visit1_tables = [
"visit1",
"visit1_quicklook",
"ccdvisit1",
"ccdvisit1_quicklook",
]

# Flex tables in the schema.
# These are currently not implemented and would take some thought to implement
# correctly, so we ignore them for now.
flex_tables = [
"exposure_flexdata",
"exposure_flexdata_schema",
@@ -80,13 +85,37 @@ def get_table_schema(schema: dict, table: str) -> dict:
raise UnrecognizedTableError(f"Could not find the table '{table}' in database")


class EnhancedJoinBuilder:
class JoinError(Exception):
"""An error that occurs when a join cannot be made between two tables"""

pass


class JoinBuilder:
"""Builds joins between tables in sqlalchemy.
Using a dictionary of joins, usually from the joins.yaml file,
this class builds a graph of joins between tables so that given a
list of tables it can create a join that connects all the tables.
Attributes
----------
tables :
A dictionary of tables in the schema.
joins :
A list of inner joins between tables. Each item in the list should
have a ``matches`` key with another dictionary as values.
The values will have the names of the tables being joined as keys
and a list of columns to join on as values.
"""

def __init__(self, tables: dict[str, sqlalchemy.Table], joins: list[dict]):
self.tables = tables
self.joins = joins
self.join_graph = self._build_join_graph()

def _build_join_graph(self) -> dict[str, dict[str, list[str]]]:
"""Create the graph of joins from the list of joins."""
graph = {table: {} for table in self.tables}
for join in self.joins:
tables = list(join["matches"].keys())
@@ -97,6 +126,24 @@ def _build_join_graph(self) -> dict[str, dict[str, list[str]]]:
return graph
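
The body of ``_build_join_graph`` is mostly hidden by the hunk boundary, so the following is only a sketch of how such a graph could be built from the ``matches`` format described in the class docstring. The assumptions are that each join connects exactly two tables and that ``graph[a][b]`` stores the columns of ``a`` used to join to ``b``. The column names ``visit_id`` and ``ccdvisit_id`` are hypothetical.

```python
def build_join_graph(tables: list[str], joins: list[dict]) -> dict[str, dict[str, list[str]]]:
    """Build an undirected graph: graph[a][b] lists the columns of ``a`` used to join to ``b``."""
    graph: dict[str, dict[str, list[str]]] = {table: {} for table in tables}
    for join in joins:
        # Assumes exactly two tables per join entry.
        (t1, cols1), (t2, cols2) = join["matches"].items()
        graph[t1][t2] = cols1
        graph[t2][t1] = cols2
    return graph


# Hypothetical entries in the shape described by the docstring.
joins = [
    {"matches": {"visit1": ["visit_id"], "ccdvisit1": ["visit_id"]}},
    {"matches": {"ccdvisit1": ["ccdvisit_id"], "ccdvisit1_quicklook": ["ccdvisit_id"]}},
]
tables = ["visit1", "ccdvisit1", "ccdvisit1_quicklook"]
graph = build_join_graph(tables, joins)
```

Storing each edge in both directions lets a path search start from either table.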

def _find_join_path(self, start: str, end: str) -> list[str]:
"""Find a path between two tables in the join graph.
In some cases, such as between visit1 and ccdvisit1_quicklook,
this might require intermediary joins.
Parameters
----------
start :
The name of the table to start the join from.
end :
The name of the table to join to.
Returns
-------
result :
A list of tables that can be joined to get from the
first table to the last table.
"""
queue = [(start, [start])]
visited = set()

@@ -109,28 +156,41 @@ def _find_join_path(self, start: str, end: str) -> list[str]:
for neighbor in self.join_graph[node]:
if neighbor not in visited:
queue.append((neighbor, path + [neighbor]))
return []
raise JoinError(f"No path found between {start} and {end}")
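
The path search is a breadth-first search over the join graph, which now raises ``JoinError`` instead of returning an empty list. A standalone sketch follows. It uses ``collections.deque`` for the queue, which is a substitution for illustration (the diff uses a plain list), and the example graph and its column names are hypothetical.

```python
from collections import deque


class JoinError(Exception):
    """Raised when two tables cannot be connected."""


def find_join_path(graph: dict[str, dict], start: str, end: str) -> list[str]:
    """Breadth-first search for the shortest chain of tables from start to end."""
    queue = deque([(start, [start])])
    visited = set()
    while queue:
        node, path = queue.popleft()
        if node == end:
            return path
        if node in visited:
            continue
        visited.add(node)
        for neighbor in graph[node]:
            if neighbor not in visited:
                queue.append((neighbor, path + [neighbor]))
    raise JoinError(f"No path found between {start} and {end}")


graph = {
    "visit1": {"ccdvisit1": ["visit_id"]},
    "ccdvisit1": {"visit1": ["visit_id"], "ccdvisit1_quicklook": ["ccdvisit_id"]},
    "ccdvisit1_quicklook": {"ccdvisit1": ["ccdvisit_id"]},
    "exposure": {},
}
path = find_join_path(graph, "visit1", "ccdvisit1_quicklook")
```

Because the search is breadth-first, the returned path uses the fewest intermediate tables, and raising on failure means callers no longer need to check for an empty list.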

def build_join(self, table_names: set[str]) -> sqlalchemy.Table | sqlalchemy.Join:
"""Build a join between all of the tables in a SQL statement.
Parameters
----------
table_names :
A set of table names to join.
Returns
-------
result :
The join between all of the tables.
"""
tables = list(table_names)
select_from = self.tables[tables[0]]
# Use the first table as the starting point
joined_tables = set([tables[0]])
logger.info(f"Starting join with table: {tables[0]}")
logger.info(f"all tables: {tables}")

for i in range(1, len(tables)):
# Move to the next table
current_table = tables[i]
if current_table in joined_tables:
logger.info(f"Skipping {current_table} as it's already joined")
continue

# find the join path from the first table to the current table
join_path = self._find_join_path(tables[0], current_table)
logger.info(f"Join path from {tables[0]} to {current_table}: {join_path}")

if not join_path:
raise ValueError(f"No join path found between {tables[0]} and {current_table}")

for j in range(1, len(join_path)):
# Join all of the tables in the join_path
t1, t2 = join_path[j - 1], join_path[j]
if t2 in joined_tables:
logger.info(f"Skipping {t2} as it's already joined")
@@ -152,6 +212,7 @@ def build_join(self, table_names: set[str]) -> sqlalchemy.Table | sqlalchemy.Joi
if not join_conditions:
raise ValueError(f"No valid join conditions found between {t1} and {t2}")

# Implement the join in sqlalchemy
select_from = sqlalchemy.join(select_from, self.tables[t2], *join_conditions)
joined_tables.add(t2)

@@ -170,14 +231,14 @@ class ConsDbSchema:
metadata :
The metadata for the database.
joins :
A dictionary of joins between tables.
A JoinBuilder object that builds joins between tables.
"""

engine: sqlalchemy.engine.Engine
schema: dict
metadata: sqlalchemy.MetaData
tables: dict[str, sqlalchemy.Table]
joins: EnhancedJoinBuilder
joins: JoinBuilder

def __init__(self, engine: sqlalchemy.engine.Engine, schema: dict, join_templates: list):
self.engine = engine
@@ -201,7 +262,7 @@ def __init__(self, engine: sqlalchemy.engine.Engine, schema: dict, join_template
schema=schema["name"],
)

self.joins = EnhancedJoinBuilder(self.tables, join_templates)
self.joins = JoinBuilder(self.tables, join_templates)

def get_table_names(self) -> tuple[str, ...]:
"""Given a schema, return a list of dataset names
@@ -262,8 +323,14 @@ def get_column(self, column: str) -> sqlalchemy.Column:
return self.tables[table].columns[column]

def fetch_data(self, query_model: sqlalchemy.Select) -> dict[str, list]:
logger.info(f"Query: {query_model}")
"""Load data from the database.
Parameters
----------
query_model :
The query to run on the database.
"""
logger.info(f"Query: {query_model}")
connection = self.engine.connect()
result = connection.execute(query_model)
data = result.fetchall()
@@ -333,11 +400,13 @@ def query(
A query used on the table.
If `query` is ``None`` then all the rows
in the query are returned.
data_ids :
The data IDs to query, in the format ``(day_obs, seq_num)``.
Returns
-------
result :
A list of the rows that were returned by the query.
A dictionary mapping each column name to the list of its values.
"""
# Get the models for the columns
table_columns, table_names, data_id_columns = self.get_column_models(columns)
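
The updated ``Returns`` section describes a column-oriented result: one list of values per column. The conversion code is not visible in this hunk, so the helper below is a hypothetical sketch of how row tuples from a database cursor could be transposed into that shape. ``day_obs`` and ``seq_num`` come from the docstring; ``psf_sigma`` and the values are made up for illustration.

```python
def rows_to_columns(column_names: list[str], rows: list[tuple]) -> dict[str, list]:
    """Transpose row tuples into a dictionary of column name -> list of values."""
    columns = {name: [] for name in column_names}
    for row in rows:
        for name, value in zip(column_names, row):
            columns[name].append(value)
    return columns


# Hypothetical rows, as a database cursor might return them.
rows = [(20240708, 1, 0.82), (20240708, 2, 0.79)]
result = rows_to_columns(["day_obs", "seq_num", "psf_sigma"], rows)
```

A column-oriented dictionary serializes directly to JSON and suits a plotting client that consumes whole columns at a time.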
3 changes: 3 additions & 0 deletions python/lsst/rubintv/analysis/service/viewer.py
@@ -19,6 +19,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# This module was adapted from https://github.com/fred3m/toyz and has not yet
# been tested.

from __future__ import annotations

import datetime