From efebfb930ce324ccacf5713abd5668a721b83308 Mon Sep 17 00:00:00 2001 From: ltenorio Date: Tue, 17 Dec 2024 10:32:43 +0100 Subject: [PATCH 1/4] remove unused code from builder --- .../builders/experiment_history_builder.py | 20 --- .../database_managers/database_manager.py | 19 --- .../experiment_history_db_manager.py | 125 ------------------ 3 files changed, 164 deletions(-) diff --git a/autosubmit_api/builders/experiment_history_builder.py b/autosubmit_api/builders/experiment_history_builder.py index eebf5024..def7b02f 100644 --- a/autosubmit_api/builders/experiment_history_builder.py +++ b/autosubmit_api/builders/experiment_history_builder.py @@ -15,10 +15,6 @@ def __init__(self, expid: str): def generate_experiment_history_db_manager(self): pass - @abstractmethod - def initialize_experiment_history_db_manager(self): - pass - @abstractmethod def generate_logger(self): pass @@ -35,11 +31,6 @@ def generate_experiment_history_db_manager(self): self._validate_basic_config() self.experiment_history_db_manager = ExperimentHistoryDbManager(self.expid, self.basic_config) - def initialize_experiment_history_db_manager(self): - if not self.experiment_history_db_manager: - raise Exception("Experiment Database Manager is missing") - self.experiment_history_db_manager.initialize() - def generate_logger(self): self._validate_basic_config() self.logger = Logging(self.expid, self.basic_config) @@ -59,17 +50,6 @@ class ExperimentHistoryDirector(object): def __init__(self, builder: Builder): self.builder = builder - def build_current_experiment_history(self, basic_config: Optional[APIBasicConfig] = None) -> ExperimentHistory: - """ Builds ExperimentHistory updated to current version. 
""" - if basic_config: - self.builder.set_basic_config(basic_config) - else: - self.builder.generate_basic_config() - self.builder.generate_experiment_history_db_manager() - self.builder.initialize_experiment_history_db_manager() - self.builder.generate_logger() - return self.builder.make_experiment_history() - def build_reader_experiment_history(self, basic_config: Optional[APIBasicConfig] = None) -> ExperimentHistory: """ Buids ExperimentHistory that doesn't update to current version automatically. """ if basic_config: diff --git a/autosubmit_api/history/database_managers/database_manager.py b/autosubmit_api/history/database_managers/database_manager.py index b8755a8f..28c75827 100644 --- a/autosubmit_api/history/database_managers/database_manager.py +++ b/autosubmit_api/history/database_managers/database_manager.py @@ -49,25 +49,6 @@ def _create_database_file(self, path: str): os.umask(0) os.open(path, os.O_WRONLY | os.O_CREAT, 0o776) - def execute_statement_on_dbfile(self, path: str, statement: str): - """ Executes a statement on a database file specified by path. """ - conn = self.get_connection(path) - cursor = conn.cursor() - cursor.execute(statement) - conn.commit() - conn.close() - - def execute_many_statements_on_dbfile(self, path: str, statements: List[str]) -> None: - """ - Updates the table schema using a **small** list of statements. No Exception raised. - Should be used to execute a list of schema updates that might have been already applied. 
- """ - for statement in statements: - try: - self.execute_statement_on_dbfile(path, statement) - except Exception: - pass - def get_from_statement(self, path: str, statement: str) -> List[Tuple]: """ Get the rows from a statement with no arguments """ conn = self.get_connection(path) diff --git a/autosubmit_api/history/database_managers/experiment_history_db_manager.py b/autosubmit_api/history/database_managers/experiment_history_db_manager.py index efc40ce5..10b4e50d 100644 --- a/autosubmit_api/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit_api/history/database_managers/experiment_history_db_manager.py @@ -16,7 +16,6 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . import os -import textwrap from autosubmit_api.persistance.experiment import ExperimentPaths from autosubmit_api.history.database_managers import database_models as Models @@ -33,22 +32,11 @@ class ExperimentHistoryDbManager(DatabaseManager): def __init__(self, expid: str, basic_config: APIBasicConfig): """ Requires expid and jobdata_dir_path. """ super(ExperimentHistoryDbManager, self).__init__(expid, basic_config) - self._set_schema_changes() - self._set_table_queries() exp_paths = ExperimentPaths(expid) self.historicaldb_file_path = exp_paths.job_data_db if self.my_database_exists(): self.set_db_version_models() - def initialize(self): - """ Check if database exists. Updates to current version if necessary. 
""" - if self.my_database_exists(): - if not self.is_current_version(): - self.update_historical_database() - else: - self.create_historical_database() - self.set_db_version_models() - def set_db_version_models(self): self.db_version = self._get_pragma_version() self.experiment_run_row_model = Models.get_experiment_row_model(self.db_version) @@ -62,114 +50,6 @@ def is_header_ready_db_version(self): return self._get_pragma_version() >= Models.DatabaseVersion.EXPERIMENT_HEADER_SCHEMA_CHANGES.value return False - def is_current_version(self): - if self.my_database_exists(): - return self._get_pragma_version() == Models.DatabaseVersion.CURRENT_DB_VERSION.value - return False - - def _set_table_queries(self): - """ Sets basic table queries. """ - self.create_table_header_query = textwrap.dedent( - '''CREATE TABLE - IF NOT EXISTS experiment_run ( - run_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - created TEXT NOT NULL, - modified TEXT NOT NULL, - start INTEGER NOT NULL, - finish INTEGER, - chunk_unit TEXT NOT NULL, - chunk_size INTEGER NOT NULL, - completed INTEGER NOT NULL, - total INTEGER NOT NULL, - failed INTEGER NOT NULL, - queuing INTEGER NOT NULL, - running INTEGER NOT NULL, - submitted INTEGER NOT NULL, - suspended INTEGER NOT NULL DEFAULT 0, - metadata TEXT - ); - ''') - self.create_table_query = textwrap.dedent( - '''CREATE TABLE - IF NOT EXISTS job_data ( - id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, - counter INTEGER NOT NULL, - job_name TEXT NOT NULL, - created TEXT NOT NULL, - modified TEXT NOT NULL, - submit INTEGER NOT NULL, - start INTEGER NOT NULL, - finish INTEGER NOT NULL, - status TEXT NOT NULL, - rowtype INTEGER NOT NULL, - ncpus INTEGER NOT NULL, - wallclock TEXT NOT NULL, - qos TEXT NOT NULL, - energy INTEGER NOT NULL, - date TEXT NOT NULL, - section TEXT NOT NULL, - member TEXT NOT NULL, - chunk INTEGER NOT NULL, - last INTEGER NOT NULL, - platform TEXT NOT NULL, - job_id INTEGER NOT NULL, - extra_data TEXT NOT NULL, - nnodes INTEGER NOT 
NULL DEFAULT 0, - run_id INTEGER, - MaxRSS REAL NOT NULL DEFAULT 0.0, - AveRSS REAL NOT NULL DEFAULT 0.0, - out TEXT NOT NULL, - err TEXT NOT NULL, - rowstatus INTEGER NOT NULL DEFAULT 0, - children TEXT, - platform_output TEXT, - UNIQUE(counter,job_name) - ); - ''') - self.create_index_query = textwrap.dedent(''' - CREATE INDEX IF NOT EXISTS ID_JOB_NAME ON job_data(job_name); - ''') - - def _set_schema_changes(self): - """ Creates the list of schema changes""" - self.version_schema_changes = [ - "ALTER TABLE job_data ADD COLUMN nnodes INTEGER NOT NULL DEFAULT 0", - "ALTER TABLE job_data ADD COLUMN run_id INTEGER" - ] - # Version 15 - self.version_schema_changes.extend([ - "ALTER TABLE job_data ADD COLUMN MaxRSS REAL NOT NULL DEFAULT 0.0", - "ALTER TABLE job_data ADD COLUMN AveRSS REAL NOT NULL DEFAULT 0.0", - "ALTER TABLE job_data ADD COLUMN out TEXT NOT NULL DEFAULT ''", - "ALTER TABLE job_data ADD COLUMN err TEXT NOT NULL DEFAULT ''", - "ALTER TABLE job_data ADD COLUMN rowstatus INTEGER NOT NULL DEFAULT 0", - "ALTER TABLE experiment_run ADD COLUMN suspended INTEGER NOT NULL DEFAULT 0", - "ALTER TABLE experiment_run ADD COLUMN metadata TEXT" - ]) - # Version 16 - self.version_schema_changes.extend([ - "ALTER TABLE experiment_run ADD COLUMN modified TEXT" - ]) - # Version 17 - self.version_schema_changes.extend([ - "ALTER TABLE job_data ADD COLUMN children TEXT", - "ALTER TABLE job_data ADD COLUMN platform_output TEXT" - ]) - - def create_historical_database(self): - """ Creates the historical database with the latest changes. 
""" - self.execute_statement_on_dbfile(self.historicaldb_file_path, self.create_table_header_query) - self.execute_statement_on_dbfile(self.historicaldb_file_path, self.create_table_query) - self.execute_statement_on_dbfile(self.historicaldb_file_path, self.create_index_query) - self._set_historical_pragma_version(Models.DatabaseVersion.CURRENT_DB_VERSION.value) - - def update_historical_database(self): - """ Updates the historical database with the latest changes IF necessary. """ - self.execute_many_statements_on_dbfile(self.historicaldb_file_path, self.version_schema_changes) - self.execute_statement_on_dbfile(self.historicaldb_file_path, self.create_index_query) - self.execute_statement_on_dbfile(self.historicaldb_file_path, self.create_table_header_query) - self._set_historical_pragma_version(Models.DatabaseVersion.CURRENT_DB_VERSION.value) - def get_experiment_run_dc_with_max_id(self): """ Get Current (latest) ExperimentRun data class. """ return ExperimentRun.from_model(self._get_experiment_run_with_max_id()) @@ -261,11 +141,6 @@ def _get_job_data_by_name(self, job_name: str) -> List[namedtuple]: job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) return [self.job_data_row_model(*row) for row in job_data_rows] - def _set_historical_pragma_version(self, version=10): - """ Sets the pragma version. """ - statement = "pragma user_version={v:d};".format(v=version) - self.execute_statement_on_dbfile(self.historicaldb_file_path, statement) - def _get_pragma_version(self) -> int: """ Gets current pragma version as int. 
""" statement = "pragma user_version;" From d22643396d526d50a2a7914febe61d1f10b53fd6 Mon Sep 17 00:00:00 2001 From: ltenorio Date: Wed, 18 Dec 2024 21:40:29 +0100 Subject: [PATCH 2/4] add tests before refactor --- .../experiment_history_db_manager.py | 4 +- tests/test_history.py | 231 ++++++++++++++++++ 2 files changed, 233 insertions(+), 2 deletions(-) create mode 100644 tests/test_history.py diff --git a/autosubmit_api/history/database_managers/experiment_history_db_manager.py b/autosubmit_api/history/database_managers/experiment_history_db_manager.py index 10b4e50d..98286222 100644 --- a/autosubmit_api/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit_api/history/database_managers/experiment_history_db_manager.py @@ -86,9 +86,9 @@ def _get_experiment_runs(self) -> List[namedtuple]: def get_job_data_dcs_all(self) -> List[JobData]: """ Gets all content from job_data ordered by id (from table). """ - return [JobData.from_model(row) for row in self.get_job_data_all()] + return [JobData.from_model(row) for row in self._get_job_data_all()] - def get_job_data_all(self): + def _get_job_data_all(self): """ Gets all content from job_data as list of Models.JobDataRow from database. 
""" statement = self.get_built_select_statement("job_data", "id > 0 ORDER BY id") job_data_rows = self.get_from_statement(self.historicaldb_file_path, statement) diff --git a/tests/test_history.py b/tests/test_history.py new file mode 100644 index 00000000..987fd4fb --- /dev/null +++ b/tests/test_history.py @@ -0,0 +1,231 @@ +import pytest +from typing import Any, Dict, Tuple +from autosubmit_api.config.basicConfig import APIBasicConfig +from autosubmit_api.history.database_managers.experiment_history_db_manager import ( + ExperimentHistoryDbManager, +) + + +@pytest.mark.parametrize( + "expid, expected_last_run, expected_number_of_runs", + [ + ( + "a003", + { + "run_id": 3, + "created": "2024-01-12-16:37:15", + "modified": "2024-01-12-16:39:06", + "start": 1705073835, + "finish": 1705073946, + "chunk_unit": "month", + "chunk_size": 4, + "submitted": 0, + "queuing": 0, + "running": 0, + "completed": 8, + "failed": 0, + "total": 8, + "suspended": 0, + }, + 3, + ), + ( + "a1ve", + { + "run_id": 1, + "created": "2024-12-12-15:14:44", + "modified": "2024-12-12-15:16:29", + "start": 1734012884, + "finish": 1734012989, + "chunk_unit": "month", + "chunk_size": 4, + "submitted": 0, + "queuing": 0, + "running": 0, + "completed": 8, + "failed": 0, + "total": 8, + "suspended": 0, + }, + 1, + ), + ( + "a3tb", + { + "run_id": 51, + "created": "2022-03-15-13:33:39", + "modified": "2022-03-15-16:12:50", + "start": 1647347619, + "finish": 1647352313, + "chunk_unit": "month", + "chunk_size": 1, + "submitted": 0, + "queuing": 1, + "running": 0, + "completed": 28, + "failed": 0, + "total": 55, + "suspended": 2, + }, + 51, + ), + ( + "a6zj", + { + "run_id": 1, + "created": "2024-04-11-16:53:56", + "modified": "2024-04-11-16:53:56", + "start": 1712847236, + "finish": 0, + "chunk_unit": "month", + "chunk_size": 4, + "submitted": 0, + "queuing": 0, + "running": 0, + "completed": 0, + "failed": 0, + "total": 10, + "suspended": 0, + }, + 1, + ), + ], +) +def test_runs_retrievals( + 
fixture_mock_basic_config: APIBasicConfig, + expid: str, + expected_last_run: Dict[str, Any], + expected_number_of_runs: int, +): + history_db_manager = ExperimentHistoryDbManager(expid, fixture_mock_basic_config) + + # Get the last run and the selected run + last_run = history_db_manager.get_experiment_run_dc_with_max_id() + selected_run = history_db_manager.get_experiment_run_by_id(last_run.run_id) + + # Check if the last run and the selected run are the same as the expected run + assert last_run.run_id == expected_last_run["run_id"] == selected_run.run_id + assert last_run.created == expected_last_run["created"] == selected_run.created + assert last_run.modified == expected_last_run["modified"] == selected_run.modified + assert last_run.start == expected_last_run["start"] == selected_run.start + assert last_run.finish == expected_last_run["finish"] == selected_run.finish + assert ( + last_run.chunk_unit + == expected_last_run["chunk_unit"] + == selected_run.chunk_unit + ) + assert ( + last_run.chunk_size + == expected_last_run["chunk_size"] + == selected_run.chunk_size + ) + assert ( + last_run.submitted == expected_last_run["submitted"] == selected_run.submitted + ) + assert last_run.queuing == expected_last_run["queuing"] == selected_run.queuing + assert last_run.running == expected_last_run["running"] == selected_run.running + assert ( + last_run.completed == expected_last_run["completed"] == selected_run.completed + ) + assert last_run.failed == expected_last_run["failed"] == selected_run.failed + assert last_run.total == expected_last_run["total"] == selected_run.total + assert ( + last_run.suspended == expected_last_run["suspended"] == selected_run.suspended + ) + + # Get all runs + runs = history_db_manager.get_experiment_runs_dcs() + + # Check if the last run is in the list of runs + assert last_run.run_id in [run.run_id for run in runs] + + # Check size of the list of runs + assert len(runs) == expected_number_of_runs + + +@pytest.mark.parametrize( + 
"expid, expected_number_job_rows, expected_number_last_job_rows, expected_completed_per_wrapper, expected_completed_per_section, expected_number_job_rows_by_job_name", + [ + ( + "a003", + 16, + 8, + {}, + {"LOCAL_SETUP": 2, "SIM": 4}, + {"a003_REMOTE_SETUP": 2, "a003_20220401_fc0_1_SIM": 2}, + ), + ( + "a1ve", + 16, + 8, + {}, + {"INI": 1, "SIM": 1}, + {"a1ve_SIM": 2, "a1ve_1_ASIM": 2, "a1ve_2_ASIM": 2}, + ), + ( + "a3tb", + 284, + 162, + {(3, 3): 0}, + {"LOCAL_SETUP": 20, "SIM": 30}, + {"a3tb_LOCAL_SETUP": 20, "a3tb_20000101_fc01_1_SIM": 1}, + ), + ("a6zj", 0, 0, {}, {}, {}), + ], +) +def test_jobs_retrievals( + fixture_mock_basic_config: APIBasicConfig, + expid: str, + expected_number_job_rows: int, + expected_completed_per_wrapper: Dict[Tuple[int, int], int], + expected_completed_per_section: Dict[str, int], + expected_number_last_job_rows: int, + expected_number_job_rows_by_job_name: Dict[str, int], +): + history_db_manager = ExperimentHistoryDbManager(expid, fixture_mock_basic_config) + + # Get all jobs + all_jobs = history_db_manager.get_job_data_dcs_all() + + # Check size of the list of jobs + assert len(all_jobs) == expected_number_job_rows + + # Get all completed jobs per wrapper run id + for ( + package_code, + run_id, + ), expected_completed in expected_completed_per_wrapper.items(): + assert package_code > 2 + completed_jobs_per_wrapper = ( + history_db_manager.get_job_data_dc_COMPLETED_by_wrapper_run_id( + package_code, run_id + ) + ) + + # Check size of the list of completed jobs per wrapper run id + assert len(completed_jobs_per_wrapper) == expected_completed + + # Get all completed jobs per section + for section, expected_completed in expected_completed_per_section.items(): + completed_jobs_per_section = ( + history_db_manager.get_job_data_dcs_COMPLETED_by_section(section) + ) + + # Check size of the list of completed jobs per section + assert len(completed_jobs_per_section) == expected_completed + + # Get all last jobs + last_jobs = 
history_db_manager.get_all_last_job_data_dcs() + + # Check size of the list of last jobs + assert len(last_jobs) == expected_number_last_job_rows + + # Get all job rows by job name + for ( + job_name, + expected_number_job_rows, + ) in expected_number_job_rows_by_job_name.items(): + job_rows = history_db_manager.get_job_data_dcs_by_name(job_name) + + # Check size of the list of job rows by job name + assert len(job_rows) == expected_number_job_rows From d6fb8af100c6390008acc93a52d024ab7a95e4f9 Mon Sep 17 00:00:00 2001 From: ltenorio Date: Thu, 19 Dec 2024 13:26:54 +0100 Subject: [PATCH 3/4] replace calls to repository ones --- .../experiment_history_db_manager.py | 72 ++++++++++--------- autosubmit_api/repositories/job_data.py | 1 + 2 files changed, 41 insertions(+), 32 deletions(-) diff --git a/autosubmit_api/history/database_managers/experiment_history_db_manager.py b/autosubmit_api/history/database_managers/experiment_history_db_manager.py index 98286222..534c108c 100644 --- a/autosubmit_api/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit_api/history/database_managers/experiment_history_db_manager.py @@ -26,6 +26,9 @@ from typing import List, Optional from collections import namedtuple +from autosubmit_api.repositories.experiment_run import create_experiment_run_repository +from autosubmit_api.repositories.job_data import create_experiment_job_data_repository + class ExperimentHistoryDbManager(DatabaseManager): """ Manages actions directly on the database. 
""" @@ -37,6 +40,9 @@ def __init__(self, expid: str, basic_config: APIBasicConfig): if self.my_database_exists(): self.set_db_version_models() + self.runs_repo = create_experiment_run_repository(expid) + self.jobs_repo = create_experiment_job_data_repository(expid) + def set_db_version_models(self): self.db_version = self._get_pragma_version() self.experiment_run_row_model = Models.get_experiment_row_model(self.db_version) @@ -56,11 +62,8 @@ def get_experiment_run_dc_with_max_id(self): def _get_experiment_run_with_max_id(self): """ Get Models.ExperimentRunRow for the maximum id run. """ - statement = self.get_built_select_statement("experiment_run", "run_id > 0 ORDER BY run_id DESC LIMIT 0, 1") - max_experiment_run = self.get_from_statement(self.historicaldb_file_path, statement) - if len(max_experiment_run) == 0: - raise Exception("No Experiment Runs registered.") - return self.experiment_run_row_model(*max_experiment_run[0]) + max_experiment_run = self.runs_repo.get_last_run() + return self.experiment_run_row_model(**(max_experiment_run.model_dump())) def get_experiment_run_by_id(self, run_id: int) -> Optional[ExperimentRun]: if run_id: @@ -68,21 +71,19 @@ def get_experiment_run_by_id(self, run_id: int) -> Optional[ExperimentRun]: return None def _get_experiment_run_by_id(self, run_id: int) -> namedtuple: - statement = self.get_built_select_statement("experiment_run", "run_id=?") - arguments = (run_id,) - experiment_run = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) - if len(experiment_run) == 0: - raise Exception("Experiment run {0} for experiment {1} does not exists.".format(run_id, self.expid)) - return self.experiment_run_row_model(*experiment_run[0]) + experiment_run = self.runs_repo.get_run_by_id(run_id) + return self.experiment_run_row_model(**(experiment_run.model_dump())) def get_experiment_runs_dcs(self) -> List[ExperimentRun]: experiment_run_rows = self._get_experiment_runs() return 
[ExperimentRun.from_model(row) for row in experiment_run_rows] def _get_experiment_runs(self) -> List[namedtuple]: - statement = self.get_built_select_statement("experiment_run") - experiment_runs = self.get_from_statement(self.historicaldb_file_path, statement) - return [self.experiment_run_row_model(*row) for row in experiment_runs] + experiment_runs = self.runs_repo.get_all() + return [ + self.experiment_run_row_model(**(run.model_dump())) + for run in experiment_runs + ] def get_job_data_dcs_all(self) -> List[JobData]: """ Gets all content from job_data ordered by id (from table). """ @@ -90,9 +91,11 @@ def get_job_data_dcs_all(self) -> List[JobData]: def _get_job_data_all(self): """ Gets all content from job_data as list of Models.JobDataRow from database. """ - statement = self.get_built_select_statement("job_data", "id > 0 ORDER BY id") - job_data_rows = self.get_from_statement(self.historicaldb_file_path, statement) - return [self.job_data_row_model(*row) for row in job_data_rows] + job_data_rows = self.jobs_repo.get_all() + return [ + self.job_data_row_model(**(job_data.model_dump())) + for job_data in job_data_rows + ] def get_job_data_dc_COMPLETED_by_wrapper_run_id(self, package_code: int, run_id: int) -> List[JobData]: if not run_id or package_code <= Models.RowType.NORMAL: @@ -103,10 +106,11 @@ def get_job_data_dc_COMPLETED_by_wrapper_run_id(self, package_code: int, run_id: return [JobData.from_model(row) for row in job_data_rows] def _get_job_data_dc_COMPLETED_by_wrapper_run_id(self, package_code: int, run_id: int) -> List[namedtuple]: - statement = self.get_built_select_statement("job_data", "run_id=? and rowtype=? and status=? 
ORDER BY id") - arguments = (run_id, package_code, "COMPLETED") - job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) - return [self.job_data_row_model(*row) for row in job_data_rows] + job_data_rows = self.jobs_repo.get_job_data_COMPLETED_by_rowtype_run_id(package_code, run_id) + return [ + self.job_data_row_model(**(job_data.model_dump())) + for job_data in job_data_rows + ] def get_job_data_dcs_COMPLETED_by_section(self, section: str) -> List[JobData]: # arguments = {"status": "COMPLETED", "section": section} @@ -114,10 +118,11 @@ def get_job_data_dcs_COMPLETED_by_section(self, section: str) -> List[JobData]: return [JobData.from_model(row) for row in job_data_rows] def _get_job_data_COMPLETD_by_section(self, section): - statement = self.get_built_select_statement("job_data", "status=? and (section=? or member=?) ORDER BY id") - arguments = ("COMPLETED", section, section) - job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) - return [self.job_data_row_model(*row) for row in job_data_rows] + job_data_rows = self.jobs_repo.get_job_data_COMPLETD_by_section(section) + return [ + self.job_data_row_model(**(job_data.model_dump())) + for job_data in job_data_rows + ] def get_all_last_job_data_dcs(self): """ Gets JobData data classes in job_data for last=1. """ @@ -126,9 +131,11 @@ def get_all_last_job_data_dcs(self): def _get_all_last_job_data_rows(self): """ Get List of Models.JobDataRow for last=1. 
""" - statement = self.get_built_select_statement("job_data", "last=1 and rowtype >= 2") - job_data_rows = self.get_from_statement(self.historicaldb_file_path, statement) - return [self.job_data_row_model(*row) for row in job_data_rows] + job_data_rows = self.jobs_repo.get_last_job_data() + return [ + self.job_data_row_model(**(job_data.model_dump())) + for job_data in job_data_rows + ] def get_job_data_dcs_by_name(self, job_name: str) -> List[JobData]: job_data_rows = self._get_job_data_by_name(job_name) @@ -136,10 +143,11 @@ def get_job_data_dcs_by_name(self, job_name: str) -> List[JobData]: def _get_job_data_by_name(self, job_name: str) -> List[namedtuple]: """ Get List of Models.JobDataRow for job_name """ - statement = self.get_built_select_statement("job_data", "job_name=? ORDER BY counter DESC") - arguments = (job_name,) - job_data_rows = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments) - return [self.job_data_row_model(*row) for row in job_data_rows] + job_data_rows = self.jobs_repo.get_jobs_by_name(job_name) + return [ + self.job_data_row_model(**(job_data.model_dump())) + for job_data in job_data_rows + ] def _get_pragma_version(self) -> int: """ Gets current pragma version as int. 
""" diff --git a/autosubmit_api/repositories/job_data.py b/autosubmit_api/repositories/job_data.py index 72cac369..1efe0863 100644 --- a/autosubmit_api/repositories/job_data.py +++ b/autosubmit_api/repositories/job_data.py @@ -39,6 +39,7 @@ class ExperimentJobDataModel(BaseModel): err: Any rowstatus: Any children: Any + platform_output: Any class ExperimentJobDataRepository(ABC): From 663a6fe3859c7117281c0fe19f4c083c2ae10811 Mon Sep 17 00:00:00 2001 From: ltenorio Date: Thu, 19 Dec 2024 14:16:48 +0100 Subject: [PATCH 4/4] deprecate old schema change checks --- .../autosubmit_legacy/job/job_list.py | 2 +- .../builders/experiment_history_builder.py | 3 - autosubmit_api/experiment/common_requests.py | 1 - .../database_managers/database_manager.py | 78 ------------------- .../experiment_history_db_manager.py | 36 ++------- autosubmit_api/history/experiment_history.py | 5 -- 6 files changed, 8 insertions(+), 117 deletions(-) delete mode 100644 autosubmit_api/history/database_managers/database_manager.py diff --git a/autosubmit_api/autosubmit_legacy/job/job_list.py b/autosubmit_api/autosubmit_legacy/job/job_list.py index 168df24d..1c7701d5 100644 --- a/autosubmit_api/autosubmit_legacy/job/job_list.py +++ b/autosubmit_api/autosubmit_legacy/job/job_list.py @@ -586,7 +586,7 @@ def get_job_times_collection(basic_config: APIBasicConfig, allJobs, expid, job_t job_data = None try: experiment_history = ExperimentHistoryDirector(ExperimentHistoryBuilder(expid)).build_reader_experiment_history() - job_data = experiment_history.manager.get_all_last_job_data_dcs() if experiment_history.is_header_ready() else None + job_data = experiment_history.manager.get_all_last_job_data_dcs() except Exception: print(traceback.print_exc()) # Result variables diff --git a/autosubmit_api/builders/experiment_history_builder.py b/autosubmit_api/builders/experiment_history_builder.py index def7b02f..8275cd10 100644 --- a/autosubmit_api/builders/experiment_history_builder.py +++ 
b/autosubmit_api/builders/experiment_history_builder.py @@ -39,9 +39,6 @@ def make_experiment_history(self) -> ExperimentHistory: self._validate_basic_config() if not self.experiment_history_db_manager: raise Exception("Experiment Database Manager is missing") - else: - if not self.experiment_history_db_manager.my_database_exists(): - raise Exception("Job/Runs database does not exist") if not self.logger: raise Exception("Logging is missing.") return ExperimentHistory(self.expid, self.basic_config, self.experiment_history_db_manager, self.logger) diff --git a/autosubmit_api/experiment/common_requests.py b/autosubmit_api/experiment/common_requests.py index a42e5fe1..d7ea7f53 100644 --- a/autosubmit_api/experiment/common_requests.py +++ b/autosubmit_api/experiment/common_requests.py @@ -235,7 +235,6 @@ def get_experiment_data(expid: str) -> Dict[str, Any]: if experiment_run and experiment_run.total > 0: result["total_jobs"] = experiment_run.total result["completed_jobs"] = experiment_run.completed - result["db_historic_version"] = experiment_history.manager.db_version except Exception as exc: logger.warning((traceback.format_exc())) logger.warning( diff --git a/autosubmit_api/history/database_managers/database_manager.py b/autosubmit_api/history/database_managers/database_manager.py deleted file mode 100644 index 28c75827..00000000 --- a/autosubmit_api/history/database_managers/database_manager.py +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2015-2020 Earth Sciences Department, BSC-CNS -# This file is part of Autosubmit. - -# Autosubmit is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. 
- -# Autosubmit is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with Autosubmit. If not, see . - -import sqlite3 -import os -from typing import Tuple, List -from autosubmit_api.history import utils as HUtils -from autosubmit_api.history.database_managers import database_models as Models -from autosubmit_api.config.basicConfig import APIBasicConfig -from abc import ABCMeta - -class DatabaseManager(metaclass=ABCMeta): - """ Simple database manager. Needs expid. """ - AS_TIMES_DB_NAME = "as_times.db" # default AS_TIMES location - ECEARTH_DB_NAME = "ecearth.db" # default EC_EARTH_DB_NAME location - def __init__(self, expid: str, basic_config: APIBasicConfig): - self.expid = expid - self.JOBDATA_DIR = basic_config.JOBDATA_DIR - self.LOCAL_ROOT_DIR = basic_config.LOCAL_ROOT_DIR - self.db_version = Models.DatabaseVersion.NO_DATABASE.value - - def get_connection(self, path: str) -> sqlite3.Connection: - """ - Create a database connection to the SQLite database specified by path. 
- :param path: database file name - :return: Connection object or None - """ - if not os.path.exists(path): - self._create_database_file(path) - return sqlite3.connect(path) - - def _create_database_file(self, path: str): - """ creates a database files with full permissions """ - os.umask(0) - os.open(path, os.O_WRONLY | os.O_CREAT, 0o776) - - def get_from_statement(self, path: str, statement: str) -> List[Tuple]: - """ Get the rows from a statement with no arguments """ - conn = self.get_connection(path) - conn.text_factory = str - cursor = conn.cursor() - cursor.execute(statement) - statement_rows = cursor.fetchall() - conn.close() - return statement_rows - - def get_from_statement_with_arguments(self, path: str, statement: str, arguments: Tuple) -> List[Tuple]: - """ Get the rows from a statement with arguments """ - conn = self.get_connection(path) - conn.text_factory = str - cursor = conn.cursor() - cursor.execute(statement, arguments) - statement_rows = cursor.fetchall() - conn.close() - return statement_rows - - def get_built_select_statement(self, table_name: str, conditions: str = None) -> str: - """ Build and return a SELECT statement with the same fields as the model. Requires that the table is associated with a model (namedtuple). 
""" - model = Models.get_correct_model_for_table_and_version(table_name, self.db_version) # Models.table_name_to_model[table_name] - if conditions: - return "SELECT {0} FROM {1} WHERE {2}".format(HUtils.get_fields_as_comma_str(model), table_name, conditions) - else: - return "SELECT {0} FROM {1}".format(HUtils.get_fields_as_comma_str(model), table_name) diff --git a/autosubmit_api/history/database_managers/experiment_history_db_manager.py b/autosubmit_api/history/database_managers/experiment_history_db_manager.py index 534c108c..840ef017 100644 --- a/autosubmit_api/history/database_managers/experiment_history_db_manager.py +++ b/autosubmit_api/history/database_managers/experiment_history_db_manager.py @@ -15,46 +15,32 @@ # You should have received a copy of the GNU General Public License # along with Autosubmit. If not, see . -import os -from autosubmit_api.persistance.experiment import ExperimentPaths from autosubmit_api.history.database_managers import database_models as Models from autosubmit_api.history.data_classes.job_data import JobData from autosubmit_api.history.data_classes.experiment_run import ExperimentRun from autosubmit_api.config.basicConfig import APIBasicConfig -from autosubmit_api.history.database_managers.database_manager import DatabaseManager from typing import List, Optional from collections import namedtuple from autosubmit_api.repositories.experiment_run import create_experiment_run_repository from autosubmit_api.repositories.job_data import create_experiment_job_data_repository -class ExperimentHistoryDbManager(DatabaseManager): +class ExperimentHistoryDbManager: """ Manages actions directly on the database. """ def __init__(self, expid: str, basic_config: APIBasicConfig): - """ Requires expid and jobdata_dir_path. 
""" - super(ExperimentHistoryDbManager, self).__init__(expid, basic_config) - exp_paths = ExperimentPaths(expid) - self.historicaldb_file_path = exp_paths.job_data_db - if self.my_database_exists(): - self.set_db_version_models() + """ Requires expid """ + self.expid = expid + self.set_db_version_models() self.runs_repo = create_experiment_run_repository(expid) self.jobs_repo = create_experiment_job_data_repository(expid) def set_db_version_models(self): - self.db_version = self._get_pragma_version() - self.experiment_run_row_model = Models.get_experiment_row_model(self.db_version) - self.job_data_row_model = Models.get_job_data_row_model(self.db_version) - - def my_database_exists(self): - return os.path.exists(self.historicaldb_file_path) - - def is_header_ready_db_version(self): - if self.my_database_exists(): - return self._get_pragma_version() >= Models.DatabaseVersion.EXPERIMENT_HEADER_SCHEMA_CHANGES.value - return False + # From version 3.13.0 of Autosubmit, latest model is used. + self.experiment_run_row_model = Models.ExperimentRunRow + self.job_data_row_model = Models.JobDataRow def get_experiment_run_dc_with_max_id(self): """ Get Current (latest) ExperimentRun data class. """ @@ -148,11 +134,3 @@ def _get_job_data_by_name(self, job_name: str) -> List[namedtuple]: self.job_data_row_model(**(job_data.model_dump())) for job_data in job_data_rows ] - - def _get_pragma_version(self) -> int: - """ Gets current pragma version as int. """ - statement = "pragma user_version;" - pragma_result = self.get_from_statement(self.historicaldb_file_path, statement) - if len(pragma_result) <= 0: - raise Exception("Error while getting the pragma version. This might be a signal of a deeper problem. 
Review previous errors.") - return int(Models.PragmaVersion(*pragma_result[0]).version) diff --git a/autosubmit_api/history/experiment_history.py b/autosubmit_api/history/experiment_history.py index 635be988..4b77a813 100644 --- a/autosubmit_api/history/experiment_history.py +++ b/autosubmit_api/history/experiment_history.py @@ -40,11 +40,6 @@ def __init__(self, expid: str, basic_config: APIBasicConfig, experiment_history_ self._log.log(str(exp), traceback.format_exc()) self.manager = None - def is_header_ready(self): - if self.manager: - return self.manager.is_header_ready_db_version() - return False - def get_historic_job_data(self, job_name: str) -> List[Dict[str, Any]]: result = [] all_job_data_dcs = self.manager.get_job_data_dcs_by_name(job_name)