diff --git a/autosubmit_api/autosubmit_legacy/job/job_list.py b/autosubmit_api/autosubmit_legacy/job/job_list.py
index 168df24..1c7701d 100644
--- a/autosubmit_api/autosubmit_legacy/job/job_list.py
+++ b/autosubmit_api/autosubmit_legacy/job/job_list.py
@@ -586,7 +586,7 @@ def get_job_times_collection(basic_config: APIBasicConfig, allJobs, expid, job_t
job_data = None
try:
experiment_history = ExperimentHistoryDirector(ExperimentHistoryBuilder(expid)).build_reader_experiment_history()
- job_data = experiment_history.manager.get_all_last_job_data_dcs() if experiment_history.is_header_ready() else None
+ job_data = experiment_history.manager.get_all_last_job_data_dcs()
except Exception:
print(traceback.print_exc())
# Result variables
diff --git a/autosubmit_api/builders/experiment_history_builder.py b/autosubmit_api/builders/experiment_history_builder.py
index eebf502..8275cd1 100644
--- a/autosubmit_api/builders/experiment_history_builder.py
+++ b/autosubmit_api/builders/experiment_history_builder.py
@@ -15,10 +15,6 @@ def __init__(self, expid: str):
def generate_experiment_history_db_manager(self):
pass
- @abstractmethod
- def initialize_experiment_history_db_manager(self):
- pass
-
@abstractmethod
def generate_logger(self):
pass
@@ -35,11 +31,6 @@ def generate_experiment_history_db_manager(self):
self._validate_basic_config()
self.experiment_history_db_manager = ExperimentHistoryDbManager(self.expid, self.basic_config)
- def initialize_experiment_history_db_manager(self):
- if not self.experiment_history_db_manager:
- raise Exception("Experiment Database Manager is missing")
- self.experiment_history_db_manager.initialize()
-
def generate_logger(self):
self._validate_basic_config()
self.logger = Logging(self.expid, self.basic_config)
@@ -48,9 +39,6 @@ def make_experiment_history(self) -> ExperimentHistory:
self._validate_basic_config()
if not self.experiment_history_db_manager:
raise Exception("Experiment Database Manager is missing")
- else:
- if not self.experiment_history_db_manager.my_database_exists():
- raise Exception("Job/Runs database does not exist")
if not self.logger:
raise Exception("Logging is missing.")
return ExperimentHistory(self.expid, self.basic_config, self.experiment_history_db_manager, self.logger)
@@ -59,17 +47,6 @@ class ExperimentHistoryDirector(object):
def __init__(self, builder: Builder):
self.builder = builder
- def build_current_experiment_history(self, basic_config: Optional[APIBasicConfig] = None) -> ExperimentHistory:
- """ Builds ExperimentHistory updated to current version. """
- if basic_config:
- self.builder.set_basic_config(basic_config)
- else:
- self.builder.generate_basic_config()
- self.builder.generate_experiment_history_db_manager()
- self.builder.initialize_experiment_history_db_manager()
- self.builder.generate_logger()
- return self.builder.make_experiment_history()
-
def build_reader_experiment_history(self, basic_config: Optional[APIBasicConfig] = None) -> ExperimentHistory:
""" Buids ExperimentHistory that doesn't update to current version automatically. """
if basic_config:
diff --git a/autosubmit_api/experiment/common_requests.py b/autosubmit_api/experiment/common_requests.py
index a42e5fe..d7ea7f5 100644
--- a/autosubmit_api/experiment/common_requests.py
+++ b/autosubmit_api/experiment/common_requests.py
@@ -235,7 +235,6 @@ def get_experiment_data(expid: str) -> Dict[str, Any]:
if experiment_run and experiment_run.total > 0:
result["total_jobs"] = experiment_run.total
result["completed_jobs"] = experiment_run.completed
- result["db_historic_version"] = experiment_history.manager.db_version
except Exception as exc:
logger.warning((traceback.format_exc()))
logger.warning(
diff --git a/autosubmit_api/history/database_managers/database_manager.py b/autosubmit_api/history/database_managers/database_manager.py
deleted file mode 100644
index b8755a8..0000000
--- a/autosubmit_api/history/database_managers/database_manager.py
+++ /dev/null
@@ -1,97 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2015-2020 Earth Sciences Department, BSC-CNS
-# This file is part of Autosubmit.
-
-# Autosubmit is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# Autosubmit is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with Autosubmit. If not, see .
-
-import sqlite3
-import os
-from typing import Tuple, List
-from autosubmit_api.history import utils as HUtils
-from autosubmit_api.history.database_managers import database_models as Models
-from autosubmit_api.config.basicConfig import APIBasicConfig
-from abc import ABCMeta
-
-class DatabaseManager(metaclass=ABCMeta):
- """ Simple database manager. Needs expid. """
- AS_TIMES_DB_NAME = "as_times.db" # default AS_TIMES location
- ECEARTH_DB_NAME = "ecearth.db" # default EC_EARTH_DB_NAME location
- def __init__(self, expid: str, basic_config: APIBasicConfig):
- self.expid = expid
- self.JOBDATA_DIR = basic_config.JOBDATA_DIR
- self.LOCAL_ROOT_DIR = basic_config.LOCAL_ROOT_DIR
- self.db_version = Models.DatabaseVersion.NO_DATABASE.value
-
- def get_connection(self, path: str) -> sqlite3.Connection:
- """
- Create a database connection to the SQLite database specified by path.
- :param path: database file name
- :return: Connection object or None
- """
- if not os.path.exists(path):
- self._create_database_file(path)
- return sqlite3.connect(path)
-
- def _create_database_file(self, path: str):
- """ creates a database files with full permissions """
- os.umask(0)
- os.open(path, os.O_WRONLY | os.O_CREAT, 0o776)
-
- def execute_statement_on_dbfile(self, path: str, statement: str):
- """ Executes a statement on a database file specified by path. """
- conn = self.get_connection(path)
- cursor = conn.cursor()
- cursor.execute(statement)
- conn.commit()
- conn.close()
-
- def execute_many_statements_on_dbfile(self, path: str, statements: List[str]) -> None:
- """
- Updates the table schema using a **small** list of statements. No Exception raised.
- Should be used to execute a list of schema updates that might have been already applied.
- """
- for statement in statements:
- try:
- self.execute_statement_on_dbfile(path, statement)
- except Exception:
- pass
-
- def get_from_statement(self, path: str, statement: str) -> List[Tuple]:
- """ Get the rows from a statement with no arguments """
- conn = self.get_connection(path)
- conn.text_factory = str
- cursor = conn.cursor()
- cursor.execute(statement)
- statement_rows = cursor.fetchall()
- conn.close()
- return statement_rows
-
- def get_from_statement_with_arguments(self, path: str, statement: str, arguments: Tuple) -> List[Tuple]:
- """ Get the rows from a statement with arguments """
- conn = self.get_connection(path)
- conn.text_factory = str
- cursor = conn.cursor()
- cursor.execute(statement, arguments)
- statement_rows = cursor.fetchall()
- conn.close()
- return statement_rows
-
- def get_built_select_statement(self, table_name: str, conditions: str = None) -> str:
- """ Build and return a SELECT statement with the same fields as the model. Requires that the table is associated with a model (namedtuple). """
- model = Models.get_correct_model_for_table_and_version(table_name, self.db_version) # Models.table_name_to_model[table_name]
- if conditions:
- return "SELECT {0} FROM {1} WHERE {2}".format(HUtils.get_fields_as_comma_str(model), table_name, conditions)
- else:
- return "SELECT {0} FROM {1}".format(HUtils.get_fields_as_comma_str(model), table_name)
diff --git a/autosubmit_api/history/database_managers/experiment_history_db_manager.py b/autosubmit_api/history/database_managers/experiment_history_db_manager.py
index efc40ce..840ef01 100644
--- a/autosubmit_api/history/database_managers/experiment_history_db_manager.py
+++ b/autosubmit_api/history/database_managers/experiment_history_db_manager.py
@@ -15,160 +15,32 @@
# You should have received a copy of the GNU General Public License
# along with Autosubmit. If not, see .
-import os
-import textwrap
-from autosubmit_api.persistance.experiment import ExperimentPaths
from autosubmit_api.history.database_managers import database_models as Models
from autosubmit_api.history.data_classes.job_data import JobData
from autosubmit_api.history.data_classes.experiment_run import ExperimentRun
from autosubmit_api.config.basicConfig import APIBasicConfig
-from autosubmit_api.history.database_managers.database_manager import DatabaseManager
from typing import List, Optional
from collections import namedtuple
-class ExperimentHistoryDbManager(DatabaseManager):
+from autosubmit_api.repositories.experiment_run import create_experiment_run_repository
+from autosubmit_api.repositories.job_data import create_experiment_job_data_repository
+
+class ExperimentHistoryDbManager:
""" Manages actions directly on the database.
"""
def __init__(self, expid: str, basic_config: APIBasicConfig):
- """ Requires expid and jobdata_dir_path. """
- super(ExperimentHistoryDbManager, self).__init__(expid, basic_config)
- self._set_schema_changes()
- self._set_table_queries()
- exp_paths = ExperimentPaths(expid)
- self.historicaldb_file_path = exp_paths.job_data_db
- if self.my_database_exists():
- self.set_db_version_models()
-
- def initialize(self):
- """ Check if database exists. Updates to current version if necessary. """
- if self.my_database_exists():
- if not self.is_current_version():
- self.update_historical_database()
- else:
- self.create_historical_database()
+ """ Requires expid """
+ self.expid = expid
self.set_db_version_models()
- def set_db_version_models(self):
- self.db_version = self._get_pragma_version()
- self.experiment_run_row_model = Models.get_experiment_row_model(self.db_version)
- self.job_data_row_model = Models.get_job_data_row_model(self.db_version)
-
- def my_database_exists(self):
- return os.path.exists(self.historicaldb_file_path)
-
- def is_header_ready_db_version(self):
- if self.my_database_exists():
- return self._get_pragma_version() >= Models.DatabaseVersion.EXPERIMENT_HEADER_SCHEMA_CHANGES.value
- return False
-
- def is_current_version(self):
- if self.my_database_exists():
- return self._get_pragma_version() == Models.DatabaseVersion.CURRENT_DB_VERSION.value
- return False
-
- def _set_table_queries(self):
- """ Sets basic table queries. """
- self.create_table_header_query = textwrap.dedent(
- '''CREATE TABLE
- IF NOT EXISTS experiment_run (
- run_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- created TEXT NOT NULL,
- modified TEXT NOT NULL,
- start INTEGER NOT NULL,
- finish INTEGER,
- chunk_unit TEXT NOT NULL,
- chunk_size INTEGER NOT NULL,
- completed INTEGER NOT NULL,
- total INTEGER NOT NULL,
- failed INTEGER NOT NULL,
- queuing INTEGER NOT NULL,
- running INTEGER NOT NULL,
- submitted INTEGER NOT NULL,
- suspended INTEGER NOT NULL DEFAULT 0,
- metadata TEXT
- );
- ''')
- self.create_table_query = textwrap.dedent(
- '''CREATE TABLE
- IF NOT EXISTS job_data (
- id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
- counter INTEGER NOT NULL,
- job_name TEXT NOT NULL,
- created TEXT NOT NULL,
- modified TEXT NOT NULL,
- submit INTEGER NOT NULL,
- start INTEGER NOT NULL,
- finish INTEGER NOT NULL,
- status TEXT NOT NULL,
- rowtype INTEGER NOT NULL,
- ncpus INTEGER NOT NULL,
- wallclock TEXT NOT NULL,
- qos TEXT NOT NULL,
- energy INTEGER NOT NULL,
- date TEXT NOT NULL,
- section TEXT NOT NULL,
- member TEXT NOT NULL,
- chunk INTEGER NOT NULL,
- last INTEGER NOT NULL,
- platform TEXT NOT NULL,
- job_id INTEGER NOT NULL,
- extra_data TEXT NOT NULL,
- nnodes INTEGER NOT NULL DEFAULT 0,
- run_id INTEGER,
- MaxRSS REAL NOT NULL DEFAULT 0.0,
- AveRSS REAL NOT NULL DEFAULT 0.0,
- out TEXT NOT NULL,
- err TEXT NOT NULL,
- rowstatus INTEGER NOT NULL DEFAULT 0,
- children TEXT,
- platform_output TEXT,
- UNIQUE(counter,job_name)
- );
- ''')
- self.create_index_query = textwrap.dedent('''
- CREATE INDEX IF NOT EXISTS ID_JOB_NAME ON job_data(job_name);
- ''')
-
- def _set_schema_changes(self):
- """ Creates the list of schema changes"""
- self.version_schema_changes = [
- "ALTER TABLE job_data ADD COLUMN nnodes INTEGER NOT NULL DEFAULT 0",
- "ALTER TABLE job_data ADD COLUMN run_id INTEGER"
- ]
- # Version 15
- self.version_schema_changes.extend([
- "ALTER TABLE job_data ADD COLUMN MaxRSS REAL NOT NULL DEFAULT 0.0",
- "ALTER TABLE job_data ADD COLUMN AveRSS REAL NOT NULL DEFAULT 0.0",
- "ALTER TABLE job_data ADD COLUMN out TEXT NOT NULL DEFAULT ''",
- "ALTER TABLE job_data ADD COLUMN err TEXT NOT NULL DEFAULT ''",
- "ALTER TABLE job_data ADD COLUMN rowstatus INTEGER NOT NULL DEFAULT 0",
- "ALTER TABLE experiment_run ADD COLUMN suspended INTEGER NOT NULL DEFAULT 0",
- "ALTER TABLE experiment_run ADD COLUMN metadata TEXT"
- ])
- # Version 16
- self.version_schema_changes.extend([
- "ALTER TABLE experiment_run ADD COLUMN modified TEXT"
- ])
- # Version 17
- self.version_schema_changes.extend([
- "ALTER TABLE job_data ADD COLUMN children TEXT",
- "ALTER TABLE job_data ADD COLUMN platform_output TEXT"
- ])
-
- def create_historical_database(self):
- """ Creates the historical database with the latest changes. """
- self.execute_statement_on_dbfile(self.historicaldb_file_path, self.create_table_header_query)
- self.execute_statement_on_dbfile(self.historicaldb_file_path, self.create_table_query)
- self.execute_statement_on_dbfile(self.historicaldb_file_path, self.create_index_query)
- self._set_historical_pragma_version(Models.DatabaseVersion.CURRENT_DB_VERSION.value)
+ self.runs_repo = create_experiment_run_repository(expid)
+ self.jobs_repo = create_experiment_job_data_repository(expid)
- def update_historical_database(self):
- """ Updates the historical database with the latest changes IF necessary. """
- self.execute_many_statements_on_dbfile(self.historicaldb_file_path, self.version_schema_changes)
- self.execute_statement_on_dbfile(self.historicaldb_file_path, self.create_index_query)
- self.execute_statement_on_dbfile(self.historicaldb_file_path, self.create_table_header_query)
- self._set_historical_pragma_version(Models.DatabaseVersion.CURRENT_DB_VERSION.value)
+ def set_db_version_models(self):
+        # From Autosubmit version 3.13.0 onwards, the latest model is always used.
+ self.experiment_run_row_model = Models.ExperimentRunRow
+ self.job_data_row_model = Models.JobDataRow
def get_experiment_run_dc_with_max_id(self):
""" Get Current (latest) ExperimentRun data class. """
@@ -176,11 +48,8 @@ def get_experiment_run_dc_with_max_id(self):
def _get_experiment_run_with_max_id(self):
""" Get Models.ExperimentRunRow for the maximum id run. """
- statement = self.get_built_select_statement("experiment_run", "run_id > 0 ORDER BY run_id DESC LIMIT 0, 1")
- max_experiment_run = self.get_from_statement(self.historicaldb_file_path, statement)
- if len(max_experiment_run) == 0:
- raise Exception("No Experiment Runs registered.")
- return self.experiment_run_row_model(*max_experiment_run[0])
+ max_experiment_run = self.runs_repo.get_last_run()
+ return self.experiment_run_row_model(**(max_experiment_run.model_dump()))
def get_experiment_run_by_id(self, run_id: int) -> Optional[ExperimentRun]:
if run_id:
@@ -188,31 +57,31 @@ def get_experiment_run_by_id(self, run_id: int) -> Optional[ExperimentRun]:
return None
def _get_experiment_run_by_id(self, run_id: int) -> namedtuple:
- statement = self.get_built_select_statement("experiment_run", "run_id=?")
- arguments = (run_id,)
- experiment_run = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
- if len(experiment_run) == 0:
- raise Exception("Experiment run {0} for experiment {1} does not exists.".format(run_id, self.expid))
- return self.experiment_run_row_model(*experiment_run[0])
+ experiment_run = self.runs_repo.get_run_by_id(run_id)
+ return self.experiment_run_row_model(**(experiment_run.model_dump()))
def get_experiment_runs_dcs(self) -> List[ExperimentRun]:
experiment_run_rows = self._get_experiment_runs()
return [ExperimentRun.from_model(row) for row in experiment_run_rows]
def _get_experiment_runs(self) -> List[namedtuple]:
- statement = self.get_built_select_statement("experiment_run")
- experiment_runs = self.get_from_statement(self.historicaldb_file_path, statement)
- return [self.experiment_run_row_model(*row) for row in experiment_runs]
+ experiment_runs = self.runs_repo.get_all()
+ return [
+ self.experiment_run_row_model(**(run.model_dump()))
+ for run in experiment_runs
+ ]
def get_job_data_dcs_all(self) -> List[JobData]:
""" Gets all content from job_data ordered by id (from table). """
- return [JobData.from_model(row) for row in self.get_job_data_all()]
+ return [JobData.from_model(row) for row in self._get_job_data_all()]
- def get_job_data_all(self):
+ def _get_job_data_all(self):
""" Gets all content from job_data as list of Models.JobDataRow from database. """
- statement = self.get_built_select_statement("job_data", "id > 0 ORDER BY id")
- job_data_rows = self.get_from_statement(self.historicaldb_file_path, statement)
- return [self.job_data_row_model(*row) for row in job_data_rows]
+ job_data_rows = self.jobs_repo.get_all()
+ return [
+ self.job_data_row_model(**(job_data.model_dump()))
+ for job_data in job_data_rows
+ ]
def get_job_data_dc_COMPLETED_by_wrapper_run_id(self, package_code: int, run_id: int) -> List[JobData]:
if not run_id or package_code <= Models.RowType.NORMAL:
@@ -223,10 +92,11 @@ def get_job_data_dc_COMPLETED_by_wrapper_run_id(self, package_code: int, run_id:
return [JobData.from_model(row) for row in job_data_rows]
def _get_job_data_dc_COMPLETED_by_wrapper_run_id(self, package_code: int, run_id: int) -> List[namedtuple]:
- statement = self.get_built_select_statement("job_data", "run_id=? and rowtype=? and status=? ORDER BY id")
- arguments = (run_id, package_code, "COMPLETED")
- job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
- return [self.job_data_row_model(*row) for row in job_data_rows]
+ job_data_rows = self.jobs_repo.get_job_data_COMPLETED_by_rowtype_run_id(package_code, run_id)
+ return [
+ self.job_data_row_model(**(job_data.model_dump()))
+ for job_data in job_data_rows
+ ]
def get_job_data_dcs_COMPLETED_by_section(self, section: str) -> List[JobData]:
# arguments = {"status": "COMPLETED", "section": section}
@@ -234,10 +104,11 @@ def get_job_data_dcs_COMPLETED_by_section(self, section: str) -> List[JobData]:
return [JobData.from_model(row) for row in job_data_rows]
def _get_job_data_COMPLETD_by_section(self, section):
- statement = self.get_built_select_statement("job_data", "status=? and (section=? or member=?) ORDER BY id")
- arguments = ("COMPLETED", section, section)
- job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
- return [self.job_data_row_model(*row) for row in job_data_rows]
+ job_data_rows = self.jobs_repo.get_job_data_COMPLETD_by_section(section)
+ return [
+ self.job_data_row_model(**(job_data.model_dump()))
+ for job_data in job_data_rows
+ ]
def get_all_last_job_data_dcs(self):
""" Gets JobData data classes in job_data for last=1. """
@@ -246,9 +117,11 @@ def get_all_last_job_data_dcs(self):
def _get_all_last_job_data_rows(self):
""" Get List of Models.JobDataRow for last=1. """
- statement = self.get_built_select_statement("job_data", "last=1 and rowtype >= 2")
- job_data_rows = self.get_from_statement(self.historicaldb_file_path, statement)
- return [self.job_data_row_model(*row) for row in job_data_rows]
+ job_data_rows = self.jobs_repo.get_last_job_data()
+ return [
+ self.job_data_row_model(**(job_data.model_dump()))
+ for job_data in job_data_rows
+ ]
def get_job_data_dcs_by_name(self, job_name: str) -> List[JobData]:
job_data_rows = self._get_job_data_by_name(job_name)
@@ -256,20 +129,8 @@ def get_job_data_dcs_by_name(self, job_name: str) -> List[JobData]:
def _get_job_data_by_name(self, job_name: str) -> List[namedtuple]:
""" Get List of Models.JobDataRow for job_name """
- statement = self.get_built_select_statement("job_data", "job_name=? ORDER BY counter DESC")
- arguments = (job_name,)
- job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
- return [self.job_data_row_model(*row) for row in job_data_rows]
-
- def _set_historical_pragma_version(self, version=10):
- """ Sets the pragma version. """
- statement = "pragma user_version={v:d};".format(v=version)
- self.execute_statement_on_dbfile(self.historicaldb_file_path, statement)
-
- def _get_pragma_version(self) -> int:
- """ Gets current pragma version as int. """
- statement = "pragma user_version;"
- pragma_result = self.get_from_statement(self.historicaldb_file_path, statement)
- if len(pragma_result) <= 0:
- raise Exception("Error while getting the pragma version. This might be a signal of a deeper problem. Review previous errors.")
- return int(Models.PragmaVersion(*pragma_result[0]).version)
+ job_data_rows = self.jobs_repo.get_jobs_by_name(job_name)
+ return [
+ self.job_data_row_model(**(job_data.model_dump()))
+ for job_data in job_data_rows
+ ]
diff --git a/autosubmit_api/history/experiment_history.py b/autosubmit_api/history/experiment_history.py
index 635be98..4b77a81 100644
--- a/autosubmit_api/history/experiment_history.py
+++ b/autosubmit_api/history/experiment_history.py
@@ -40,11 +40,6 @@ def __init__(self, expid: str, basic_config: APIBasicConfig, experiment_history_
self._log.log(str(exp), traceback.format_exc())
self.manager = None
- def is_header_ready(self):
- if self.manager:
- return self.manager.is_header_ready_db_version()
- return False
-
def get_historic_job_data(self, job_name: str) -> List[Dict[str, Any]]:
result = []
all_job_data_dcs = self.manager.get_job_data_dcs_by_name(job_name)
diff --git a/autosubmit_api/repositories/job_data.py b/autosubmit_api/repositories/job_data.py
index 72cac36..1efe086 100644
--- a/autosubmit_api/repositories/job_data.py
+++ b/autosubmit_api/repositories/job_data.py
@@ -39,6 +39,7 @@ class ExperimentJobDataModel(BaseModel):
err: Any
rowstatus: Any
children: Any
+ platform_output: Any
class ExperimentJobDataRepository(ABC):
diff --git a/tests/test_history.py b/tests/test_history.py
new file mode 100644
index 0000000..987fd4f
--- /dev/null
+++ b/tests/test_history.py
@@ -0,0 +1,231 @@
+import pytest
+from typing import Any, Dict, Tuple
+from autosubmit_api.config.basicConfig import APIBasicConfig
+from autosubmit_api.history.database_managers.experiment_history_db_manager import (
+ ExperimentHistoryDbManager,
+)
+
+
+@pytest.mark.parametrize(
+ "expid, expected_last_run, expected_number_of_runs",
+ [
+ (
+ "a003",
+ {
+ "run_id": 3,
+ "created": "2024-01-12-16:37:15",
+ "modified": "2024-01-12-16:39:06",
+ "start": 1705073835,
+ "finish": 1705073946,
+ "chunk_unit": "month",
+ "chunk_size": 4,
+ "submitted": 0,
+ "queuing": 0,
+ "running": 0,
+ "completed": 8,
+ "failed": 0,
+ "total": 8,
+ "suspended": 0,
+ },
+ 3,
+ ),
+ (
+ "a1ve",
+ {
+ "run_id": 1,
+ "created": "2024-12-12-15:14:44",
+ "modified": "2024-12-12-15:16:29",
+ "start": 1734012884,
+ "finish": 1734012989,
+ "chunk_unit": "month",
+ "chunk_size": 4,
+ "submitted": 0,
+ "queuing": 0,
+ "running": 0,
+ "completed": 8,
+ "failed": 0,
+ "total": 8,
+ "suspended": 0,
+ },
+ 1,
+ ),
+ (
+ "a3tb",
+ {
+ "run_id": 51,
+ "created": "2022-03-15-13:33:39",
+ "modified": "2022-03-15-16:12:50",
+ "start": 1647347619,
+ "finish": 1647352313,
+ "chunk_unit": "month",
+ "chunk_size": 1,
+ "submitted": 0,
+ "queuing": 1,
+ "running": 0,
+ "completed": 28,
+ "failed": 0,
+ "total": 55,
+ "suspended": 2,
+ },
+ 51,
+ ),
+ (
+ "a6zj",
+ {
+ "run_id": 1,
+ "created": "2024-04-11-16:53:56",
+ "modified": "2024-04-11-16:53:56",
+ "start": 1712847236,
+ "finish": 0,
+ "chunk_unit": "month",
+ "chunk_size": 4,
+ "submitted": 0,
+ "queuing": 0,
+ "running": 0,
+ "completed": 0,
+ "failed": 0,
+ "total": 10,
+ "suspended": 0,
+ },
+ 1,
+ ),
+ ],
+)
+def test_runs_retrievals(
+ fixture_mock_basic_config: APIBasicConfig,
+ expid: str,
+ expected_last_run: Dict[str, Any],
+ expected_number_of_runs: int,
+):
+ history_db_manager = ExperimentHistoryDbManager(expid, fixture_mock_basic_config)
+
+ # Get the last run and the selected run
+ last_run = history_db_manager.get_experiment_run_dc_with_max_id()
+ selected_run = history_db_manager.get_experiment_run_by_id(last_run.run_id)
+
+ # Check if the last run and the selected run are the same as the expected run
+ assert last_run.run_id == expected_last_run["run_id"] == selected_run.run_id
+ assert last_run.created == expected_last_run["created"] == selected_run.created
+ assert last_run.modified == expected_last_run["modified"] == selected_run.modified
+ assert last_run.start == expected_last_run["start"] == selected_run.start
+ assert last_run.finish == expected_last_run["finish"] == selected_run.finish
+ assert (
+ last_run.chunk_unit
+ == expected_last_run["chunk_unit"]
+ == selected_run.chunk_unit
+ )
+ assert (
+ last_run.chunk_size
+ == expected_last_run["chunk_size"]
+ == selected_run.chunk_size
+ )
+ assert (
+ last_run.submitted == expected_last_run["submitted"] == selected_run.submitted
+ )
+ assert last_run.queuing == expected_last_run["queuing"] == selected_run.queuing
+ assert last_run.running == expected_last_run["running"] == selected_run.running
+ assert (
+ last_run.completed == expected_last_run["completed"] == selected_run.completed
+ )
+ assert last_run.failed == expected_last_run["failed"] == selected_run.failed
+ assert last_run.total == expected_last_run["total"] == selected_run.total
+ assert (
+ last_run.suspended == expected_last_run["suspended"] == selected_run.suspended
+ )
+
+ # Get all runs
+ runs = history_db_manager.get_experiment_runs_dcs()
+
+ # Check if the last run is in the list of runs
+ assert last_run.run_id in [run.run_id for run in runs]
+
+ # Check size of the list of runs
+ assert len(runs) == expected_number_of_runs
+
+
+@pytest.mark.parametrize(
+ "expid, expected_number_job_rows, expected_number_last_job_rows, expected_completed_per_wrapper, expected_completed_per_section, expected_number_job_rows_by_job_name",
+ [
+ (
+ "a003",
+ 16,
+ 8,
+ {},
+ {"LOCAL_SETUP": 2, "SIM": 4},
+ {"a003_REMOTE_SETUP": 2, "a003_20220401_fc0_1_SIM": 2},
+ ),
+ (
+ "a1ve",
+ 16,
+ 8,
+ {},
+ {"INI": 1, "SIM": 1},
+ {"a1ve_SIM": 2, "a1ve_1_ASIM": 2, "a1ve_2_ASIM": 2},
+ ),
+ (
+ "a3tb",
+ 284,
+ 162,
+ {(3, 3): 0},
+ {"LOCAL_SETUP": 20, "SIM": 30},
+ {"a3tb_LOCAL_SETUP": 20, "a3tb_20000101_fc01_1_SIM": 1},
+ ),
+ ("a6zj", 0, 0, {}, {}, {}),
+ ],
+)
+def test_jobs_retrievals(
+    fixture_mock_basic_config: APIBasicConfig,
+    expid: str,
+    expected_number_job_rows: int,
+    expected_number_last_job_rows: int,
+    expected_completed_per_wrapper: Dict[Tuple[int, int], int],
+    expected_completed_per_section: Dict[str, int],
+    expected_number_job_rows_by_job_name: Dict[str, int],
+):
+ history_db_manager = ExperimentHistoryDbManager(expid, fixture_mock_basic_config)
+
+ # Get all jobs
+ all_jobs = history_db_manager.get_job_data_dcs_all()
+
+ # Check size of the list of jobs
+ assert len(all_jobs) == expected_number_job_rows
+
+ # Get all completed jobs per wrapper run id
+ for (
+ package_code,
+ run_id,
+ ), expected_completed in expected_completed_per_wrapper.items():
+ assert package_code > 2
+ completed_jobs_per_wrapper = (
+ history_db_manager.get_job_data_dc_COMPLETED_by_wrapper_run_id(
+ package_code, run_id
+ )
+ )
+
+ # Check size of the list of completed jobs per wrapper run id
+ assert len(completed_jobs_per_wrapper) == expected_completed
+
+ # Get all completed jobs per section
+ for section, expected_completed in expected_completed_per_section.items():
+ completed_jobs_per_section = (
+ history_db_manager.get_job_data_dcs_COMPLETED_by_section(section)
+ )
+
+ # Check size of the list of completed jobs per section
+ assert len(completed_jobs_per_section) == expected_completed
+
+ # Get all last jobs
+ last_jobs = history_db_manager.get_all_last_job_data_dcs()
+
+ # Check size of the list of last jobs
+ assert len(last_jobs) == expected_number_last_job_rows
+
+ # Get all job rows by job name
+    for (
+        job_name,
+        expected_rows_for_name,  # distinct name: avoid shadowing the parameter
+    ) in expected_number_job_rows_by_job_name.items():
+        job_rows = history_db_manager.get_job_data_dcs_by_name(job_name)
+
+        # Check size of the list of job rows by job name
+        assert len(job_rows) == expected_rows_for_name