From da00351ef1d679718e876c99c88c6f9436e87f25 Mon Sep 17 00:00:00 2001 From: Markus Binsteiner Date: Wed, 13 Mar 2024 15:17:18 +0100 Subject: [PATCH] feat: initial job filtering --- src/kiara/interfaces/python_api/__init__.py | 111 ++++++++++++------ src/kiara/models/module/jobs.py | 20 ++++ src/kiara/registries/data/__init__.py | 3 + src/kiara/registries/jobs/__init__.py | 44 +++++-- .../registries/jobs/job_store/__init__.py | 15 ++- .../registries/jobs/job_store/sqlite_store.py | 59 +++++++++- src/kiara/utils/cli/run.py | 2 +- tests/test_archives/test_archive_import.py | 8 +- 8 files changed, 203 insertions(+), 59 deletions(-) diff --git a/src/kiara/interfaces/python_api/__init__.py b/src/kiara/interfaces/python_api/__init__.py index 89574b2cf..08abf8ed6 100644 --- a/src/kiara/interfaces/python_api/__init__.py +++ b/src/kiara/interfaces/python_api/__init__.py @@ -1318,10 +1318,25 @@ def register_data( ) return value + def list_all_value_ids(self) -> List[uuid.UUID]: + """List all value ids in the current context. + + This returns everything, even internal values. It should be faster than using + `list_value_ids` with equivalent parameters, because no filtering has to happen. + + Returns: + all value_ids in the current context, using every registered store + """ + + _values = self.context.data_registry.retrieve_all_available_value_ids() + return sorted(_values) + def list_value_ids(self, **matcher_params) -> List[uuid.UUID]: """ List all available value ids for this kiara context. + By default, this also includes internal values. + This method exists mainly so frontends can retrieve a list of all value_ids that exists on the backend without having to look up the details of each value (like [list_values][kiara.interfaces.python_api.KiaraAPI.list_values] does). This method can also be used with a matcher, but in this case the [list_values][kiara.interfaces.python_api.KiaraAPI.list_values] @@ -1329,24 +1344,40 @@ def list_value_ids(self, **matcher_params) -> List[uuid.UUID]: having to look up value details is gone. Arguments: - matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters + matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters and defaults Returns: a list of value ids """ - if matcher_params: - values = self.list_values(**matcher_params) - return sorted((v.value_id for v in values.values())) - else: - _values = self.context.data_registry.retrieve_all_available_value_ids() - return sorted(_values) + + values = self.list_values(**matcher_params) + return sorted((v.value_id for v in values.values())) + + def list_all_values(self) -> ValueMapReadOnly: + """List all values in the current context, incl. internal ones. + + This should be faster than `list_values` with equivalent matcher params, because no + filtering has to happen. + """ + + # TODO: make that parallel? + values = { + k: self.context.data_registry.get_value(k) + for k in self.context.data_registry.retrieve_all_available_value_ids() + } + result = ValueMapReadOnly.create_from_values( + **{str(k): v for k, v in values.items()} + ) + return result def list_values(self, **matcher_params: Any) -> ValueMapReadOnly: """ - List all available values, optionally filter. + List all available (relevant) values, optionally filter. - Retrieve information about all values that are available in the current kiara context session (both stored - and non-stored). + Retrieve information about all values that are available in the current kiara context session (both stored and non-stored). + + Check the `ValueMatcher` class for available parameters and defaults, for example this excludes + internal values by default. Arguments: matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters @@ -1354,15 +1385,9 @@ def list_values(self, **matcher_params: Any) -> ValueMapReadOnly: Returns: a dictionary with value_id as key, and [kiara.models.values.value.Value] as value """ - if matcher_params: - matcher = ValueMatcher.create_matcher(**matcher_params) - values = self.context.data_registry.find_values(matcher=matcher) - else: - # TODO: make that parallel? - values = { - k: self.context.data_registry.get_value(k) - for k in self.context.data_registry.retrieve_all_available_value_ids() - } + + matcher = ValueMatcher.create_matcher(**matcher_params) + values = self.context.data_registry.find_values(matcher=matcher) result = ValueMapReadOnly.create_from_values( **{str(k): v for k, v in values.items()} @@ -2997,14 +3022,21 @@ def get_job_result(self, job_id: Union[str, uuid.UUID]) -> ValueMapReadOnly: result = self.context.job_registry.retrieve_result(job_id=job_id) return result + def list_all_job_record_ids(self) -> List[uuid.UUID]: + """List all available job ids in this kiara context, ordered from newest to oldest, including internal jobs. + + This should be faster than `list_job_record_ids` with equivalent parameters, because no filtering + needs to be done. + """ + + job_ids = self.context.job_registry.retrieve_all_job_record_ids() + return job_ids + def list_job_record_ids(self, **matcher_params) -> List[uuid.UUID]: """List all available job ids in this kiara context, ordered from newest to oldest. - This method exists mainly so frontends can retrieve a list of all job ids in order, without having - to retrieve all job details as well (in the case where no matcher_params exist. Otherwise, you could - also just use `list_jobs` and take the keys from the result. - - You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class. + You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class. By default, this method for example + does not return jobs marked as 'internal'. Arguments: matcher_params: additional parameters to pass to the job matcher @@ -3013,19 +3045,24 @@ def list_job_record_ids(self, **matcher_params) -> List[uuid.UUID]: a list of job ids, ordered from latest to earliest """ - if matcher_params: - job_ids = list(self.list_job_records(**matcher_params).keys()) - else: - job_ids = self.context.job_registry.retrieve_all_job_record_ids() - + job_ids = list(self.list_job_records(**matcher_params).keys()) return job_ids + def list_all_job_records(self) -> Mapping[uuid.UUID, "JobRecord"]: + """List all available job records in this kiara context, ordered from newest to oldest, including internal jobs. + + This should be faster than `list_job_records` with equivalent parameters, because no filtering + needs to be done. + """ + + job_records = self.context.job_registry.retrieve_all_job_records() + return job_records + def list_job_records(self, **matcher_params) -> Mapping[uuid.UUID, "JobRecord"]: """List all available job ids in this kiara context, ordered from newest to oldest. - This method exists mainly so frontends can retrieve a list of all job ids in order, without having - to retrieve all job details as well (in the case where no matcher_params exist. Otherwise, you could - also just use `list_jobs` and take the keys from the result. + You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class. By default, this method for example + does not return jobs marked as 'internal'. You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class. @@ -3037,14 +3074,10 @@ def list_job_records(self, **matcher_params) -> Mapping[uuid.UUID, "JobRecord"]: """ - if matcher_params: - raise NotImplementedError("Job matching is not implemented yet") - from kiara.models.module.jobs import JobMatcher + from kiara.models.module.jobs import JobMatcher - matcher = JobMatcher(**matcher_params) - job_records = self.context.job_registry.find_job_records(matcher=matcher) - else: - job_records = self.context.job_registry.retrieve_all_job_records() + matcher = JobMatcher(**matcher_params) + job_records = self.context.job_registry.find_job_records(matcher=matcher) return job_records diff --git a/src/kiara/models/module/jobs.py b/src/kiara/models/module/jobs.py index 8d20623f1..c5fb82812 100644 --- a/src/kiara/models/module/jobs.py +++ b/src/kiara/models/module/jobs.py @@ -12,6 +12,7 @@ from enum import Enum from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Mapping, Union +from pydantic import field_validator from pydantic.fields import Field, PrivateAttr from pydantic.main import BaseModel from rich import box @@ -250,9 +251,13 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob): ) inputs_data_hash = str(inputs_data_cid) + module = kiara.module_registry.create_module(active_job.job_config) + is_internal = module.characteristics.is_internal + job_record = JobRecord( job_id=active_job.job_id, job_submitted=active_job.submitted, + is_internal=is_internal, module_type=active_job.job_config.module_type, module_config=active_job.job_config.module_config, is_resolved=active_job.job_config.is_resolved, @@ -278,6 +283,7 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob): description="Information about the environments this value was created in.", default=None, ) + is_internal: bool = Field(description="Whether this job was created by the system.") # job_hash: str = Field(description="The hash of the job. Calculated from manifest & input_ids hashes.") # manifest_hash: str = Field(description="The hash of the manifest.") # input_ids_hash: str = Field(description="The hash of the field names and input ids (the value_ids/uuids).") @@ -353,6 +359,7 @@ def create_matcher(self, **match_options: Any): description="A list of job ids, if specified, only jobs with one of these ids will be included.", default_factory=list, ) + allow_internal: bool = Field(description="Allow internal jobs.", default=False) earliest: Union[None, datetime] = Field( description="The earliest time when the job was created.", default=None ) @@ -367,3 +374,16 @@ def create_matcher(self, **match_options: Any): description="A list of value ids, if specified, only jobs that produced one of them will be included.", default_factory=list, ) + + @field_validator("job_ids", mode="before") + @classmethod + def validate_job_ids(cls, v): + + if v is None: + return [] + elif isinstance(v, uuid.UUID): + return [v] + elif isinstance(v, str): + return [uuid.UUID(v)] + else: + return [x if isinstance(x, uuid.UUID) else uuid.UUID(x) for x in v] diff --git a/src/kiara/registries/data/__init__.py b/src/kiara/registries/data/__init__.py index 8fa67ee47..fff8cd988 100644 --- a/src/kiara/registries/data/__init__.py +++ b/src/kiara/registries/data/__init__.py @@ -543,6 +543,9 @@ def store_value( ) self._event_callback(store_event) + if _value.job_id: + self._kiara.job_registry.store_job_record(job_id=_value.job_id) + return persisted_value def lookup_aliases(self, value: Union[Value, uuid.UUID]) -> Set[str]: diff --git a/src/kiara/registries/jobs/__init__.py b/src/kiara/registries/jobs/__init__.py index db3bdb57b..be7942715 100644 --- a/src/kiara/registries/jobs/__init__.py +++ b/src/kiara/registries/jobs/__init__.py @@ -21,7 +21,13 @@ JobRecordPreStoreEvent, JobRecordStoredEvent, ) -from kiara.models.module.jobs import ActiveJob, JobConfig, JobRecord, JobStatus +from kiara.models.module.jobs import ( + ActiveJob, + JobConfig, + JobMatcher, + JobRecord, + JobStatus, +) from kiara.models.module.manifest import InputsManifest, Manifest from kiara.models.values.value import ValueMap, ValueMapReadOnly from kiara.processing import ModuleProcessor @@ -38,7 +44,7 @@ MANIFEST_SUB_PATH = "manifests" -class JobMatcher(abc.ABC): +class ExistingJobMatcher(abc.ABC): def __init__(self, kiara: "Kiara"): self._kiara: Kiara = kiara @@ -50,14 +56,14 @@ def find_existing_job( pass -class NoneJobMatcher(JobMatcher): +class NoneExistingJobMatcher(ExistingJobMatcher): def find_existing_job( self, inputs_manifest: InputsManifest ) -> Union[JobRecord, None]: return None -class ValueIdJobMatcher(JobMatcher): +class ValueIdExistingJobMatcher(ExistingJobMatcher): def find_existing_job( self, inputs_manifest: InputsManifest ) -> Union[JobRecord, None]: @@ -85,7 +91,7 @@ def find_existing_job( return job_record -class DataHashJobMatcher(JobMatcher): +class DataHashExistingJobMatcher(ExistingJobMatcher): def find_existing_job( self, inputs_manifest: InputsManifest ) -> Union[JobRecord, None]: @@ -151,7 +157,7 @@ def __init__(self, kiara: "Kiara"): self._kiara: Kiara = kiara - self._job_matcher_cache: Dict[JobCacheStrategy, JobMatcher] = {} + self._job_matcher_cache: Dict[JobCacheStrategy, ExistingJobMatcher] = {} self._active_jobs: bidict[str, uuid.UUID] = bidict() self._failed_jobs: Dict[str, uuid.UUID] = {} @@ -172,7 +178,7 @@ def __init__(self, kiara: "Kiara"): # self.register_job_archive(default_file_store, store_alias="default_data_store") # type: ignore @property - def job_matcher(self) -> JobMatcher: + def job_matcher(self) -> ExistingJobMatcher: from kiara.context.runtime_config import JobCacheStrategy @@ -189,11 +195,11 @@ def job_matcher(self) -> JobMatcher: job_matcher = self._job_matcher_cache.get(strategy, None) if job_matcher is None: if strategy == JobCacheStrategy.no_cache: - job_matcher = NoneJobMatcher(kiara=self._kiara) + job_matcher = NoneExistingJobMatcher(kiara=self._kiara) elif strategy == JobCacheStrategy.value_id: - job_matcher = ValueIdJobMatcher(kiara=self._kiara) + job_matcher = ValueIdExistingJobMatcher(kiara=self._kiara) elif strategy == JobCacheStrategy.data_hash: - job_matcher = DataHashJobMatcher(kiara=self._kiara) + job_matcher = DataHashExistingJobMatcher(kiara=self._kiara) else: raise Exception(f"Job cache strategy not implemented: {strategy}") self._job_matcher_cache[strategy] = job_matcher @@ -353,7 +359,23 @@ def get_job_record(self, job_id: uuid.UUID) -> Union[JobRecord, None]: def find_job_records(self, matcher: JobMatcher) -> Mapping[uuid.UUID, JobRecord]: - raise NotImplementedError("Job matching is Not implemented yet.") + all_records: List[JobRecord] = [] + for archive in self.job_archives.values(): + + _job_records = archive.retrieve_matching_job_records(matcher=matcher) + all_records.extend(_job_records) + + # TODO: check for duplicates and mismatching datetimes + all_jobs_sorted = { + job.job_id: job + for job in sorted( + all_records, + key=lambda item: item.job_submitted, + reverse=True, + ) + } + + return all_jobs_sorted def retrieve_all_job_record_ids(self) -> List[uuid.UUID]: """Retrieve a list of all available job record ids, sorted from latest to earliest.""" diff --git a/src/kiara/registries/jobs/job_store/__init__.py b/src/kiara/registries/jobs/job_store/__init__.py index 0527208a5..898c64474 100644 --- a/src/kiara/registries/jobs/job_store/__init__.py +++ b/src/kiara/registries/jobs/job_store/__init__.py @@ -2,9 +2,9 @@ import abc import uuid from datetime import datetime -from typing import Iterable, Mapping, Union +from typing import Generator, Iterable, Mapping, Union -from kiara.models.module.jobs import JobRecord +from kiara.models.module.jobs import JobMatcher, JobRecord from kiara.registries import BaseArchive @@ -61,6 +61,17 @@ def retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]: job_record = self._retrieve_record_for_job_hash(job_hash=job_hash) return job_record + def retrieve_matching_job_records( + self, matcher: JobMatcher + ) -> Generator[JobRecord, None, None]: + return self._retrieve_matching_job_records(matcher=matcher) + + @abc.abstractmethod + def _retrieve_matching_job_records( + self, matcher: JobMatcher + ) -> Generator[JobRecord, None, None]: + pass + class JobStore(JobArchive): @classmethod diff --git a/src/kiara/registries/jobs/job_store/sqlite_store.py b/src/kiara/registries/jobs/job_store/sqlite_store.py index 82e60307c..b6bff4e3f 100644 --- a/src/kiara/registries/jobs/job_store/sqlite_store.py +++ b/src/kiara/registries/jobs/job_store/sqlite_store.py @@ -2,13 +2,13 @@ import uuid from datetime import datetime from pathlib import Path -from typing import Any, Dict, Iterable, Mapping, Union +from typing import Any, Dict, Generator, Iterable, Mapping, Union import orjson from sqlalchemy import create_engine, text from sqlalchemy.engine import Engine -from kiara.models.module.jobs import JobRecord +from kiara.models.module.jobs import JobMatcher, JobRecord from kiara.registries import SqliteArchiveConfig from kiara.registries.jobs import JobArchive, JobStore @@ -179,6 +179,61 @@ def _retrieve_record_for_job_id(self, job_id: uuid.UUID) -> Union[JobRecord, Non job_record = JobRecord(**job_record_data) return job_record + def _retrieve_matching_job_records( + self, matcher: JobMatcher + ) -> Generator[JobRecord, None, None]: + + query_conditions = [] + params: Dict[str, Any] = {} + if matcher.job_ids: + query_conditions.append("job_id IN :job_ids") + params["job_ids"] = (str(x) for x in matcher.job_ids) + + if not matcher.allow_internal: + cond = "json_extract(job_metadata, '$.is_internal') = 0" + query_conditions.append(cond) + + if matcher.earliest: + cond = "job_submitted >= :earliest" + query_conditions.append(cond) + params["earliest"] = matcher.earliest.isoformat() + + if matcher.latest: + cond = "job_submitted <= :latest" + query_conditions.append(cond) + params["latest"] = matcher.latest.isoformat() + + if matcher.operation_inputs: + raise NotImplementedError( + "Job matcher 'operation_inputs' not implemented yet" + ) + + if matcher.produced_outputs: + raise NotImplementedError( + "Job matcher 'produced_outputs' not implemented yet" + ) + + sql_query = "SELECT job_id, job_metadata FROM job_records" + if query_conditions: + sql_query += " WHERE " + + for query_cond in query_conditions: + sql_query += "( " + query_cond + " ) AND " + + sql_query = sql_query[:-5] + ";" + + sql = text(sql_query) + + with self.sqlite_engine.connect() as connection: + result = connection.execute(sql, params) + for row in result: + # job_id = uuid.UUID(row[0]) + job_metadata = orjson.loads(row[1]) + job_record = JobRecord(**job_metadata) + yield job_record + + return + def retrieve_all_job_hashes( self, manifest_hash: Union[str, None] = None, diff --git a/src/kiara/utils/cli/run.py b/src/kiara/utils/cli/run.py index 099561e58..58b5fa9c0 100644 --- a/src/kiara/utils/cli/run.py +++ b/src/kiara/utils/cli/run.py @@ -426,7 +426,7 @@ def execute_job( if error: sys.exit(1) - api.context.job_registry.store_job_record(job_id=job_id) + # api.context.job_registry.store_job_record(job_id=job_id) if len(saved_results) == 1: title = "[b]Stored result value[/b]" diff --git a/tests/test_archives/test_archive_import.py b/tests/test_archives/test_archive_import.py index d591326fb..b358c7f42 100644 --- a/tests/test_archives/test_archive_import.py +++ b/tests/test_archives/test_archive_import.py @@ -15,14 +15,14 @@ def test_archive_import_values_no_alias(api: KiaraAPI): archive_file = resources_folder / "archives" / "export_test.kiarchive" - assert not api.list_value_ids() + assert not api.list_all_value_ids() result = api.import_archive(archive_file, no_aliases=True) assert len(result) == 6 assert "512af8ae-f85f-4629-83fe-3b37d3841a77" in result.keys() - assert uuid.UUID("512af8ae-f85f-4629-83fe-3b37d3841a77") in api.list_value_ids() + assert uuid.UUID("512af8ae-f85f-4629-83fe-3b37d3841a77") in api.list_all_value_ids() assert ["export_test#y"] == api.list_alias_names() @@ -33,13 +33,13 @@ def test_archive_import_values_with_alias(api: KiaraAPI): archive_file = resources_folder / "archives" / "export_test.kiarchive" - assert not api.list_value_ids() + assert not api.list_all_value_ids() result = api.import_archive(archive_file, no_aliases=False) assert len(result) == 6 assert "512af8ae-f85f-4629-83fe-3b37d3841a77" in result.keys() - assert uuid.UUID("512af8ae-f85f-4629-83fe-3b37d3841a77") in api.list_value_ids() + assert uuid.UUID("512af8ae-f85f-4629-83fe-3b37d3841a77") in api.list_all_value_ids() assert ["export_test#y", "y"] == api.list_alias_names()