From a565b61a0bcf5a7e86018e7fcdaa147de2d14a86 Mon Sep 17 00:00:00 2001
From: Markus Binsteiner
Date: Tue, 12 Mar 2024 11:15:10 +0100
Subject: [PATCH] chore: list job ids in stores

---
Review notes (ignored by `git am`):
 - line structure of the patch restored; hunk line counts are unchanged
 - KiaraAPI.list_job_record_ids: the matcher branch bound `records` but the
   method returned `job_ids` (UnboundLocalError); it now binds `job_ids`
 - JobMatcher.create_matcher: classmethod now takes `cls` and builds `cls(...)`
 - retrieve_all_job_record_ids: comprehension variable no longer shadows `uuid`
 - docstring fixes ("where" -> "were", unclosed parenthesis, copy-pasted text)

 src/kiara/interfaces/python_api/__init__.py   | 86 ++++++++++++++++++-
 src/kiara/interfaces/python_api/workflow.py   |  2 +-
 src/kiara/models/module/jobs.py               | 33 ++++++-
 src/kiara/registries/jobs/__init__.py         | 54 +++++++++---
 .../registries/jobs/job_store/__init__.py     | 22 ++++-
 .../registries/jobs/job_store/sqlite_store.py | 38 +++++++-
 src/kiara/registries/metadata/__init__.py     | 16 ++++
 7 files changed, 234 insertions(+), 17 deletions(-)

diff --git a/src/kiara/interfaces/python_api/__init__.py b/src/kiara/interfaces/python_api/__init__.py
index 69103311c..f3f48d268 100644
--- a/src/kiara/interfaces/python_api/__init__.py
+++ b/src/kiara/interfaces/python_api/__init__.py
@@ -59,7 +59,6 @@
 from kiara.interfaces.python_api.models.job import JobDesc
 from kiara.interfaces.python_api.value import StoreValueResult, StoreValuesResult
 from kiara.models.context import ContextInfo, ContextInfos
-from kiara.models.module.jobs import ActiveJob
 from kiara.models.module.manifest import Manifest
 from kiara.models.module.operation import Operation
 from kiara.models.rendering import RenderValueResult
@@ -101,6 +100,7 @@
     )
     from kiara.interfaces.python_api.workflow import Workflow
     from kiara.models.archives import KiArchiveInfo
+    from kiara.models.module.jobs import ActiveJob, JobRecord
     from kiara.models.module.pipeline import PipelineConfig, PipelineStructure
     from kiara.models.module.pipeline.pipeline import PipelineGroupInfo, PipelineInfo
     from kiara.registries import KiaraArchive
@@ -1322,7 +1322,7 @@ def list_value_ids(self, **matcher_params) -> List[uuid.UUID]:
         """
         List all available value ids for this kiara context.
 
-        This method exists mainly so frontend can retrieve a list of all value_ids that exists on the backend without
+        This method exists mainly so frontends can retrieve a list of all value_ids that exists on the backend without
         having to look up the details of each value (like [list_values][kiara.interfaces.python_api.KiaraAPI.list_values]
         does). This method can also be used with a matcher, but in this case the [list_values][kiara.interfaces.python_api.KiaraAPI.list_values]
         would be preferable in most cases, because it is called under the hood, and the performance advantage of not
@@ -1385,6 +1385,16 @@ def get_value(self, value: Union[str, Value, uuid.UUID, Path]) -> Value:
         return self.context.data_registry.get_value(value=value)
 
     def get_values(self, **values: Union[str, Value, uuid.UUID]) -> ValueMapReadOnly:
+        """Retrieve Value instances for the specified value ids or aliases.
+
+        This is a convenience method to get fully 'hydrated' `Value` objects from references to them.
+
+        Arguments:
+            values: a dictionary with value ids or aliases as keys, and value instances as values
+
+        Returns:
+            a mapping with value_id as key, and [kiara.models.values.value.Value] as value
+        """
         return self.context.data_registry.load_values(values=values)
 
 
@@ -2962,7 +2972,7 @@ def run_job(
         )
         return self.context.job_registry.retrieve_result(job_id=job_id)
 
-    def get_job(self, job_id: Union[str, uuid.UUID]) -> ActiveJob:
+    def get_job(self, job_id: Union[str, uuid.UUID]) -> "ActiveJob":
         """Retrieve the status of the job with the provided id."""
         if isinstance(job_id, str):
             job_id = uuid.UUID(job_id)
@@ -2978,6 +2988,76 @@ def get_job_result(self, job_id: Union[str, uuid.UUID]) -> ValueMapReadOnly:
         result = self.context.job_registry.retrieve_result(job_id=job_id)
         return result
 
+    def list_job_record_ids(self, **matcher_params) -> List[uuid.UUID]:
+        """List all available job ids in this kiara context, ordered from newest to oldest.
+
+        This method exists mainly so frontends can retrieve a list of all job ids in order, without having
+        to retrieve all job details as well (in the case where no matcher_params exist). Otherwise, you could
+        also just use `list_job_records` and take the keys from the result.
+
+        You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class.
+
+        Arguments:
+            matcher_params: additional parameters to pass to the job matcher
+
+        Returns:
+            a list of job ids, ordered from latest to earliest
+        """
+
+        if matcher_params:
+            job_ids = list(self.list_job_records(**matcher_params))
+        else:
+            job_ids = self.context.job_registry.retrieve_all_job_record_ids()
+
+        return job_ids
+
+    def list_job_records(self, **matcher_params) -> Mapping[uuid.UUID, "JobRecord"]:
+        """List all available job records in this kiara context, ordered from newest to oldest.
+
+        Unlike `list_job_record_ids`, this method retrieves the full details of every job. If you only
+        need the job ids (and don't use any matcher parameters), `list_job_record_ids` is the cheaper
+        option, since it does not have to load the job records themselves.
+
+        You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class.
+
+        Arguments:
+            matcher_params: additional parameters to pass to the job matcher (not implemented yet)
+
+        Returns:
+            a mapping of job id to job record, ordered from latest to earliest
+
+        """
+
+        if matcher_params:
+            raise NotImplementedError("Job matching is not implemented yet")
+            # TODO: unreachable until job matching is implemented, kept as a sketch of the intended flow
+            from kiara.models.module.jobs import JobMatcher
+            matcher = JobMatcher(**matcher_params)
+            job_records = self.context.job_registry.find_job_records(matcher=matcher)
+        else:
+            job_records = self.context.job_registry.retrieve_all_job_records()
+
+        return job_records
+
+    def get_job_record(self, job_id: Union[str, uuid.UUID]) -> "JobRecord":
+        """Retrieve the job record for the job with the provided id."""
+        if isinstance(job_id, str):
+            job_id = uuid.UUID(job_id)
+
+        job_record = self.context.job_registry.get_job_record(job_id=job_id)
+        return job_record
+
+    def get_job_metadata(self, job_id: Union[str, uuid.UUID]) -> Mapping[str, Any]:
+        """Retrieve the metadata for the specified job."""
+
+        if isinstance(job_id, str):
+            job_id = uuid.UUID(job_id)
+
+        metadata = self.context.metadata_registry.retrieve_job_metadata_items(
+            job_id=job_id
+        )
+        return metadata
+
     def render_value(
         self,
         value: Union[str, uuid.UUID, Value],
diff --git a/src/kiara/interfaces/python_api/workflow.py b/src/kiara/interfaces/python_api/workflow.py
index 69355c0ee..971d6a7a2 100644
--- a/src/kiara/interfaces/python_api/workflow.py
+++ b/src/kiara/interfaces/python_api/workflow.py
@@ -659,7 +659,7 @@ def _apply_inputs(self) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]
                     is_resolved=step_details.step.module.manifest.is_resolved,
                     inputs=step_details.inputs,
                 )
-                match = self._kiara.job_registry.find_matching_job_record(
+                match = self._kiara.job_registry.find_job_record_for_manifest(
                     inputs_manifest=job_config
                 )
                 if match:
diff --git a/src/kiara/models/module/jobs.py b/src/kiara/models/module/jobs.py
index b066d6b43..8d20623f1 100644
--- a/src/kiara/models/module/jobs.py
+++ b/src/kiara/models/module/jobs.py
@@ -21,6 +21,7 @@
 from kiara.exceptions import InvalidValuesException, KiaraException
 from kiara.models import KiaraModel
 from kiara.models.module.manifest import InputsManifest
+from kiara.utils.dates import get_current_time_incl_timezone
 
 if TYPE_CHECKING:
     from kiara.context import DataRegistry, Kiara
@@ -138,7 +139,8 @@ class ActiveJob(KiaraModel):
     )
     job_log: JobLog = Field(description="The lob jog.")
     submitted: datetime = Field(
-        description="When the job was submitted.", default_factory=datetime.now
+        description="When the job was submitted.",
+        default_factory=get_current_time_incl_timezone,
     )
     started: Union[datetime, None] = Field(
         description="When the job was started.", default=None
@@ -250,6 +252,7 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob):
 
         job_record = JobRecord(
             job_id=active_job.job_id,
+            job_submitted=active_job.submitted,
             module_type=active_job.job_config.module_type,
             module_config=active_job.job_config.module_config,
             is_resolved=active_job.job_config.is_resolved,
@@ -267,6 +270,7 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob):
         return job_record
 
     job_id: uuid.UUID = Field(description="The globally unique id for this job.")
+    job_submitted: datetime = Field(description="When the job was submitted.")
     environment_hashes: Mapping[str, Mapping[str, str]] = Field(
         description="Hashes for the environments this value was created in."
     )
@@ -336,3 +340,30 @@ def create_renderable(self, **config: Any) -> RenderableType:
     #         h = DeepHash(obj, hasher=KIARA_HASH_FUNCTION)
     #         self._outputs_hash = h[obj]
     #     return self._outputs_hash
+
+
+class JobMatcher(KiaraModel):
+    @classmethod
+    def create_matcher(cls, **match_options: Any):
+        """Create a matcher instance from the provided match options."""
+        m = cls(**match_options)
+        return m
+
+    job_ids: List[uuid.UUID] = Field(
+        description="A list of job ids, if specified, only jobs with one of these ids will be included.",
+        default_factory=list,
+    )
+    earliest: Union[None, datetime] = Field(
+        description="The earliest time when the job was created.", default=None
+    )
+    latest: Union[None, datetime] = Field(
+        description="The latest time when the job was created.", default=None
+    )
+    operation_inputs: List[uuid.UUID] = Field(
+        description="A list of value ids, if specified, only jobs that use one of them will be included.",
+        default_factory=list,
+    )
+    produced_outputs: List[uuid.UUID] = Field(
+        description="A list of value ids, if specified, only jobs that produced one of them will be included.",
+        default_factory=list,
+    )
diff --git a/src/kiara/registries/jobs/__init__.py b/src/kiara/registries/jobs/__init__.py
index aea889685..d40655c6f 100644
--- a/src/kiara/registries/jobs/__init__.py
+++ b/src/kiara/registries/jobs/__init__.py
@@ -7,7 +7,8 @@
 
 import abc
 import uuid
-from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Type, Union
+from datetime import datetime
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Type, Union
 
 import structlog
 from bidict import bidict
@@ -350,22 +351,55 @@ def get_job_record(self, job_id: uuid.UUID) -> Union[JobRecord, None]:
 
         raise NotImplementedError()
 
-    def retrieve_all_job_records(self) -> Mapping[str, JobRecord]:
+    def find_job_records(self, matcher: JobMatcher) -> Mapping[uuid.UUID, JobRecord]:
 
-        all_records: Dict[str, JobRecord] = {}
+        pass
+
+    def retrieve_all_job_record_ids(self) -> List[uuid.UUID]:
+        """Retrieve a list of all available job record ids, sorted from latest to earliest."""
+
+        all_records: Dict[uuid.UUID, datetime] = {}
         for archive in self.job_archives.values():
-            all_record_ids = archive.retrieve_all_job_hashes()
-            if all_record_ids is None:
-                return {}
+            all_record_ids = archive.retrieve_all_job_ids()
+            # TODO: check for duplicates and mismatching datetimes
+            all_records.update(all_record_ids)
+
+        all_ids_sorted = [
+            job_id
+            for job_id, _ in sorted(
+                all_records.items(), key=lambda item: item[1], reverse=True
+            )
+        ]
+
+        return all_ids_sorted
+
+    def retrieve_all_job_records(self) -> Mapping[uuid.UUID, JobRecord]:
+        """Retrieves all job records from all job archives.
+
+        Returns:
+            a map of job-id/job-record pairs, sorted by job submission time, from latest to earliest
+        """
+
+        all_records: Dict[uuid.UUID, JobRecord] = {}
+        for archive in self.job_archives.values():
+            all_record_ids = archive.retrieve_all_job_ids().keys()
             for r in all_record_ids:
                 assert r not in all_records.keys()
-                job_record = archive.retrieve_record_for_job_hash(r)
+                job_record = archive.retrieve_record_for_job_id(r)
                 assert job_record is not None
                 all_records[r] = job_record
 
-        return all_records
+        all_records_sorted = {
+            job_id: job
+            for job_id, job in sorted(
+                all_records.items(),
+                key=lambda item: item[1].job_submitted,
+                reverse=True,
+            )
+        }
+        return all_records_sorted
 
-    def find_matching_job_record(
+    def find_job_record_for_manifest(
         self, inputs_manifest: InputsManifest
     ) -> Union[uuid.UUID, None]:
         """
@@ -481,7 +515,7 @@ def execute_job(
                 job_hash=job_config.job_hash,
             )
 
-        stored_job = self.find_matching_job_record(inputs_manifest=job_config)
+        stored_job = self.find_job_record_for_manifest(inputs_manifest=job_config)
 
         is_pipeline_step = False if job_config.pipeline_metadata is None else True
         if is_pipeline_step:
diff --git a/src/kiara/registries/jobs/job_store/__init__.py b/src/kiara/registries/jobs/job_store/__init__.py
index 449590dbd..0527208a5 100644
--- a/src/kiara/registries/jobs/job_store/__init__.py
+++ b/src/kiara/registries/jobs/job_store/__init__.py
@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 import abc
-from typing import Iterable, Union
+import uuid
+from datetime import datetime
+from typing import Iterable, Mapping, Union
 
 from kiara.models.module.jobs import JobRecord
 from kiara.registries import BaseArchive
@@ -32,6 +34,24 @@ def retrieve_all_job_hashes(
         If the job archive retrieves its jobs in a dynamic way, this will return 'None'.
         """
 
+    @abc.abstractmethod
+    def _retrieve_all_job_ids(self) -> Mapping[uuid.UUID, datetime]:
+        """
+        Retrieve a list of all job record ids in the archive, along with when they were submitted.
+        """
+
+    def retrieve_all_job_ids(self) -> Mapping[uuid.UUID, datetime]:
+        """Retrieve a list of all job ids in the archive, along with when they were submitted."""
+        return self._retrieve_all_job_ids()
+
+    @abc.abstractmethod
+    def _retrieve_record_for_job_id(self, job_id: uuid.UUID) -> Union[JobRecord, None]:
+        pass
+
+    def retrieve_record_for_job_id(self, job_id: uuid.UUID) -> Union[JobRecord, None]:
+        job_record = self._retrieve_record_for_job_id(job_id=job_id)
+        return job_record
+
     @abc.abstractmethod
     def _retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]:
         pass
diff --git a/src/kiara/registries/jobs/job_store/sqlite_store.py b/src/kiara/registries/jobs/job_store/sqlite_store.py
index 2b8b39697..82e60307c 100644
--- a/src/kiara/registries/jobs/job_store/sqlite_store.py
+++ b/src/kiara/registries/jobs/job_store/sqlite_store.py
@@ -1,4 +1,6 @@
 # -*- coding: utf-8 -*-
+import uuid
+from datetime import datetime
 from pathlib import Path
 from typing import Any, Dict, Iterable, Mapping, Union
 
@@ -114,6 +116,7 @@ def sqlite_engine(self) -> "Engine":
 CREATE TABLE IF NOT EXISTS job_records (
     job_id TEXT PRIMARY KEY,
     job_hash TEXT TEXT NOT NULL,
+    job_submitted TEXT NOT NULL,
     manifest_hash TEXT NOT NULL,
     input_ids_hash TEXT NOT NULL,
     inputs_data_hash TEXT NOT NULL,
@@ -146,6 +149,36 @@ def _retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]
             job_record = JobRecord(**job_record_data)
             return job_record
 
+    def _retrieve_all_job_ids(self) -> Mapping[uuid.UUID, datetime]:
+        """
+        Retrieve all job record ids in the archive, along with when they were submitted.
+        """
+
+        sql = text(
+            "SELECT job_id, job_submitted FROM job_records ORDER BY job_submitted DESC;"
+        )
+
+        with self.sqlite_engine.connect() as connection:
+            result = connection.execute(sql)
+            return {uuid.UUID(row[0]): datetime.fromisoformat(row[1]) for row in result}
+
+    def _retrieve_record_for_job_id(self, job_id: uuid.UUID) -> Union[JobRecord, None]:
+        """Load the job record stored under the provided job id, or 'None' if there is none."""
+        sql = text("SELECT job_metadata FROM job_records WHERE job_id = :job_id")
+
+        params = {"job_id": str(job_id)}
+
+        with self.sqlite_engine.connect() as connection:
+            result = connection.execute(sql, params)
+            row = result.fetchone()
+            if not row:
+                return None
+
+            job_record_json = row[0]
+            job_record_data = orjson.loads(job_record_json)
+            job_record = JobRecord(**job_record_data)
+            return job_record
+
     def retrieve_all_job_hashes(
         self,
         manifest_hash: Union[str, None] = None,
@@ -222,11 +255,14 @@ def store_job_record(self, job_record: JobRecord):
 
         job_record_json = job_record.model_dump_json()
 
+        job_submitted = job_record.job_submitted.isoformat()
+
         sql = text(
-            "INSERT OR IGNORE INTO job_records(job_id, job_hash, manifest_hash, input_ids_hash, inputs_data_hash, job_metadata) VALUES (:job_id, :job_hash, :manifest_hash, :input_ids_hash, :inputs_data_hash, :job_metadata)"
+            "INSERT OR IGNORE INTO job_records(job_id, job_submitted, job_hash, manifest_hash, input_ids_hash, inputs_data_hash, job_metadata) VALUES (:job_id, :job_submitted, :job_hash, :manifest_hash, :input_ids_hash, :inputs_data_hash, :job_metadata)"
         )
         params = {
             "job_id": str(job_record.job_id),
+            "job_submitted": job_submitted,
             "job_hash": job_hash,
             "manifest_hash": manifest_hash,
             "input_ids_hash": input_ids_hash,
diff --git a/src/kiara/registries/metadata/__init__.py b/src/kiara/registries/metadata/__init__.py
index 1728d6467..ce1c8f7ec 100644
--- a/src/kiara/registries/metadata/__init__.py
+++ b/src/kiara/registries/metadata/__init__.py
@@ -142,6 +142,18 @@ def get_archive(
                 f"Can't retrieve archive with id '{archive_id_or_alias}': no archive with that id registered."
             )
 
+    def retrieve_metadata_item(
+        self,
+        key: str,
+        reference_item_type: Union[str, None] = None,
+        reference_item_id: Union[str, None] = None,
+        store: Union[str, uuid.UUID, None] = None,
+    ) -> Union[KiaraMetadata, None]:
+
+        mounted_store: MetadataStore = self.get_archive(archive_id_or_alias=store)
+
+        pass
+
     def register_metadata_item(
         self,
         key: str,
@@ -181,3 +193,7 @@ def register_job_metadata_items(
                 reference_item_id=str(job_id),
                 store=store,
             )
+
+    def retrieve_job_metadata_items(self, job_id: uuid.UUID):
+
+        pass