From 1109e241c86ac5d459c4bdaa69595c6fd03e5f58 Mon Sep 17 00:00:00 2001 From: ltenorio Date: Wed, 15 Jan 2025 23:54:14 +0100 Subject: [PATCH] update details background task --- autosubmit_api/bgtasks/bgtask.py | 12 --- autosubmit_api/bgtasks/scheduler.py | 2 +- .../bgtasks/tasks/details_updater.py | 88 +++++++++++++++++++ .../repositories/experiment_details.py | 39 +++++++- .../repositories/experiment_status.py | 7 +- .../workers/populate_details/__init__.py | 0 .../workers/populate_details/populate.py | 86 ------------------ autosubmit_api/workers/populate_details_db.py | 2 +- tests/test_bg_tasks.py | 4 +- 9 files changed, 133 insertions(+), 107 deletions(-) create mode 100644 autosubmit_api/bgtasks/tasks/details_updater.py delete mode 100644 autosubmit_api/workers/populate_details/__init__.py delete mode 100644 autosubmit_api/workers/populate_details/populate.py diff --git a/autosubmit_api/bgtasks/bgtask.py b/autosubmit_api/bgtasks/bgtask.py index a23aa27e..2b220452 100644 --- a/autosubmit_api/bgtasks/bgtask.py +++ b/autosubmit_api/bgtasks/bgtask.py @@ -1,9 +1,7 @@ from abc import ABC, abstractmethod import traceback from autosubmit_api.logger import logger -from autosubmit_api.config.basicConfig import APIBasicConfig from autosubmit_api.workers.business import process_graph_drawings -from autosubmit_api.workers.populate_details.populate import DetailsProcessor class BackgroundTaskTemplate(ABC): @@ -41,16 +39,6 @@ def trigger_options(self) -> dict: raise NotImplementedError -class PopulateDetailsDB(BackgroundTaskTemplate): - id = "TASK_POPDET" - trigger_options = {"trigger": "interval", "hours": 4} - - @classmethod - def procedure(cls): - APIBasicConfig.read() - return DetailsProcessor(APIBasicConfig).process() - - class PopulateGraph(BackgroundTaskTemplate): id = "TASK_POPGRPH" trigger_options = {"trigger": "interval", "hours": 24} diff --git a/autosubmit_api/bgtasks/scheduler.py b/autosubmit_api/bgtasks/scheduler.py index d7fa3752..af182b3b 100644 --- a/autosubmit_api/bgtasks/scheduler.py +++ b/autosubmit_api/bgtasks/scheduler.py @@ -4,9 +4,9 @@ from apscheduler.triggers.cron import CronTrigger from autosubmit_api.bgtasks.bgtask import ( BackgroundTaskTemplate, - PopulateDetailsDB, PopulateGraph, ) +from autosubmit_api.bgtasks.tasks.details_updater import PopulateDetailsDB from autosubmit_api.bgtasks.tasks.status_updater import StatusUpdater from autosubmit_api.config import ( get_disable_background_tasks, diff --git a/autosubmit_api/bgtasks/tasks/details_updater.py b/autosubmit_api/bgtasks/tasks/details_updater.py new file mode 100644 index 00000000..c75d6d35 --- /dev/null +++ b/autosubmit_api/bgtasks/tasks/details_updater.py @@ -0,0 +1,88 @@ +from typing import List +from autosubmit_api.bgtasks.bgtask import BackgroundTaskTemplate +from autosubmit_api.builders.configuration_facade_builder import ( + AutosubmitConfigurationFacadeBuilder, + ConfigurationFacadeDirector, +) +from autosubmit_api.repositories.experiment import ( + ExperimentModel, + create_experiment_repository, +) +from autosubmit_api.repositories.experiment_details import ( + create_experiment_details_repository, + ExperimentDetailsModel, +) + + +class PopulateDetailsDB(BackgroundTaskTemplate): + id = "TASK_POPDET" + trigger_options = {"trigger": "interval", "hours": 4} + + @classmethod + def procedure(cls): + """ + Procedure to populate the details table + """ + + # Get experiments + experiments = cls._get_experiments() + + rows_added = 0 + + for experiment in experiments: + try: + # Get details data from the experiment + details_data = cls._build_details_data_from_experiment( + experiment.id, experiment.name + ) + + # Insert details data into the details table + rows_added += cls._upsert_details_data(details_data) + except Exception as exc: + cls.logger.error( + f"[{cls.id}] Error while processing experiment {experiment.name}: {exc}" + ) + + return rows_added + + @classmethod + def _get_experiments(cls) -> List[ExperimentModel]: + """ + Get the experiments list + """ + experiment_repository = create_experiment_repository() + return experiment_repository.get_all() + + @classmethod + def _build_details_data_from_experiment( + self, exp_id: int, expid: str + ) -> ExperimentDetailsModel: + """ + Build the details data from the experiment + """ + autosubmit_config = ConfigurationFacadeDirector( + AutosubmitConfigurationFacadeBuilder(expid) + ).build_autosubmit_configuration_facade() + return ExperimentDetailsModel( + exp_id=exp_id, + user=autosubmit_config.get_owner_name(), + created=autosubmit_config.get_experiment_created_time_as_datetime(), + model=autosubmit_config.get_model(), + branch=autosubmit_config.get_branch(), + hpc=autosubmit_config.get_main_platform(), + ) + + @classmethod + def _upsert_details_data(cls, details_data: ExperimentDetailsModel): + """ + Insert or update details data into the details table + """ + details_repository = create_experiment_details_repository() + return details_repository.upsert_details( + details_data.exp_id, + details_data.user, + details_data.created, + details_data.model, + details_data.branch, + details_data.hpc, + ) diff --git a/autosubmit_api/repositories/experiment_details.py b/autosubmit_api/repositories/experiment_details.py index 22efadc7..14792feb 100644 --- a/autosubmit_api/repositories/experiment_details.py +++ b/autosubmit_api/repositories/experiment_details.py @@ -38,6 +38,22 @@ def get_by_exp_id(self, exp_id: int) -> ExperimentDetailsModel: :raises ValueError: If the experiment detail is not found """ + @abstractmethod + def upsert_details( + self, exp_id: int, user: str, created: str, model: str, branch: str, hpc: str + ) -> int: + """ + Upsert the experiment details + + :param exp_id: The numerical experiment id + :param user: The user who created the experiment + :param created: The creation date of the experiment + :param model: The model used in the experiment + :param branch: The branch of the model + :param hpc: The HPC used in the experiment + :return: The number of rows affected + """ + class ExperimentDetailsSQLRepository(ExperimentDetailsRepository): def __init__(self, engine: Engine, table: Table): @@ -58,7 +74,7 @@ def delete_all(self) -> int: conn.commit() return result.rowcount - def get_by_exp_id(self, exp_id): + def get_by_exp_id(self, exp_id: int): with self.engine.connect() as conn: statement = self.table.select().where(self.table.c.exp_id == exp_id) result = conn.execute(statement).first() @@ -73,6 +89,27 @@ def get_by_exp_id(self, exp_id): hpc=result.hpc, ) + def upsert_details(self, exp_id, user, created, model, branch, hpc): + with self.engine.connect() as conn: + with conn.begin(): + try: + del_stmnt = self.table.delete().where(self.table.c.exp_id == exp_id) + ins_stmnt = self.table.insert().values( + exp_id=exp_id, + user=user, + created=created, + model=model, + branch=branch, + hpc=hpc, + ) + conn.execute(del_stmnt) + result = conn.execute(ins_stmnt) + conn.commit() + return result.rowcount + except Exception as exc: + conn.rollback() + raise exc + def create_experiment_details_repository() -> ExperimentDetailsRepository: engine = create_autosubmit_db_engine() diff --git a/autosubmit_api/repositories/experiment_status.py b/autosubmit_api/repositories/experiment_status.py index 4eb545ab..85e735d1 100644 --- a/autosubmit_api/repositories/experiment_status.py +++ b/autosubmit_api/repositories/experiment_status.py @@ -44,7 +44,7 @@ def get_only_running_expids(self) -> List[str]: """ Gets list of running experiments expids """ - + @abstractmethod def delete_all(self) -> int: """ @@ -99,18 +99,17 @@ def upsert_status(self, exp_id: int, expid: str, status: str): conn.execute(del_stmnt) result = conn.execute(ins_stmnt) conn.commit() + return result.rowcount except Exception as exc: conn.rollback() raise exc - return result.rowcount - def get_only_running_expids(self): with self.engine.connect() as conn: statement = self.table.select().where(self.table.c.status == "RUNNING") result = conn.execute(statement).all() return [row.name for row in result] - + def delete_all(self): with self.engine.connect() as conn: statement = delete(self.table) diff --git a/autosubmit_api/workers/populate_details/__init__.py b/autosubmit_api/workers/populate_details/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/autosubmit_api/workers/populate_details/populate.py b/autosubmit_api/workers/populate_details/populate.py deleted file mode 100644 index 0fed6100..00000000 --- a/autosubmit_api/workers/populate_details/populate.py +++ /dev/null @@ -1,86 +0,0 @@ -from autosubmit_api.logger import logger -from autosubmit_api.builders.configuration_facade_builder import ( - ConfigurationFacadeDirector, - AutosubmitConfigurationFacadeBuilder, -) -from autosubmit_api.config.basicConfig import APIBasicConfig -from collections import namedtuple -from typing import List - -from autosubmit_api.repositories.experiment import create_experiment_repository -from autosubmit_api.repositories.experiment_details import ( - create_experiment_details_repository, -) - - -ExperimentDetails = namedtuple( - "ExperimentDetails", ["owner", "created", "model", "branch", "hpc"] -) -Experiment = namedtuple("Experiment", ["id", "name"]) - - -class DetailsProcessor: - def __init__(self, basic_config: APIBasicConfig): - self.basic_config = basic_config - # self.main_db_engine = create_autosubmit_db_engine() - self.experiment_db = create_experiment_repository() - self.details_db = create_experiment_details_repository() - - def process(self): - new_details = self._get_all_details() - self._clean_table() - return self._insert_many_into_details_table(new_details) - - def _get_experiments(self) -> List[Experiment]: - experiments = [] - query_result = self.experiment_db.get_all() - - for exp in query_result: - experiments.append(Experiment(exp.id, exp.name)) - - return experiments - - def _get_details_data_from_experiment(self, expid: str) -> ExperimentDetails: - autosubmit_config = ConfigurationFacadeDirector( - AutosubmitConfigurationFacadeBuilder(expid) - ).build_autosubmit_configuration_facade(self.basic_config) - return ExperimentDetails( - autosubmit_config.get_owner_name(), - autosubmit_config.get_experiment_created_time_as_datetime(), - autosubmit_config.get_model(), - autosubmit_config.get_branch(), - autosubmit_config.get_main_platform(), - ) - - def _get_all_details(self) -> List[dict]: - experiments = self._get_experiments() - result = [] - exp_ids = set() - for experiment in experiments: - try: - detail = self._get_details_data_from_experiment(experiment.name) - if experiment.id not in exp_ids: - result.append( - { - "exp_id": experiment.id, - "user": detail.owner, - "created": detail.created, - "model": detail.model, - "branch": detail.branch, - "hpc": detail.hpc, - } - ) - exp_ids.add(experiment.id) - except Exception as exc: - logger.warning( - ("Error on experiment {}: {}".format(experiment.name, str(exc))) - ) - return result - - def _insert_many_into_details_table(self, values: List[dict]) -> int: - rowcount = self.details_db.insert_many(values) - return rowcount - - def _clean_table(self) -> int: - rowcount = self.details_db.delete_all() - return rowcount diff --git a/autosubmit_api/workers/populate_details_db.py b/autosubmit_api/workers/populate_details_db.py index 16b06e02..45922e5a 100644 --- a/autosubmit_api/workers/populate_details_db.py +++ b/autosubmit_api/workers/populate_details_db.py @@ -1,4 +1,4 @@ -from autosubmit_api.bgtasks.bgtask import PopulateDetailsDB +from autosubmit_api.bgtasks.tasks.details_updater import PopulateDetailsDB def main(): PopulateDetailsDB.run() diff --git a/tests/test_bg_tasks.py b/tests/test_bg_tasks.py index 5afa17c1..9ab9f838 100644 --- a/tests/test_bg_tasks.py +++ b/tests/test_bg_tasks.py @@ -1,10 +1,10 @@ +from autosubmit_api.bgtasks.tasks.details_updater import PopulateDetailsDB from autosubmit_api.config.basicConfig import APIBasicConfig from autosubmit_api.database import tables from autosubmit_api.database.common import create_autosubmit_db_engine from autosubmit_api.repositories.experiment_details import ( create_experiment_details_repository, ) -from autosubmit_api.workers.populate_details.populate import DetailsProcessor class TestDetailsPopulate: @@ -16,7 +16,7 @@ def test_process(self, fixture_mock_basic_config: APIBasicConfig): rows = conn.execute(tables.details_table.select()).all() assert len(rows) == 0 - count = DetailsProcessor(fixture_mock_basic_config).process() + count = PopulateDetailsDB.procedure() rows = conn.execute(tables.details_table.select()).all()