Skip to content

Commit

Permalink
feat: initial job filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 13, 2024
1 parent 99b2f85 commit da00351
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 59 deletions.
111 changes: 72 additions & 39 deletions src/kiara/interfaces/python_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1318,51 +1318,76 @@ def register_data(
)
return value

def list_all_value_ids(self) -> List[uuid.UUID]:
"""List all value ids in the current context.
This returns everything, even internal values. It should be faster than using
`list_value_ids` with equivalent parameters, because no filtering has to happen.
Returns:
all value_ids in the current context, using every registered store
"""

_values = self.context.data_registry.retrieve_all_available_value_ids()
return sorted(_values)

def list_value_ids(self, **matcher_params) -> List[uuid.UUID]:
"""
List all available value ids for this kiara context.
By default, this also includes internal values.
This method exists mainly so frontends can retrieve a list of all value_ids that exists on the backend without
having to look up the details of each value (like [list_values][kiara.interfaces.python_api.KiaraAPI.list_values]
does). This method can also be used with a matcher, but in this case the [list_values][kiara.interfaces.python_api.KiaraAPI.list_values]
would be preferable in most cases, because it is called under the hood, and the performance advantage of not
having to look up value details is gone.
Arguments:
matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters
matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters and defaults
Returns:
a list of value ids
"""
if matcher_params:
values = self.list_values(**matcher_params)
return sorted((v.value_id for v in values.values()))
else:
_values = self.context.data_registry.retrieve_all_available_value_ids()
return sorted(_values)

values = self.list_values(**matcher_params)
return sorted((v.value_id for v in values.values()))

def list_all_values(self) -> ValueMapReadOnly:
"""List all values in the current context, incl. internal ones.
This should be faster than `list_values` with equivalent matcher params, because no
filtering has to happen.
"""

# TODO: make that parallel?
values = {
k: self.context.data_registry.get_value(k)
for k in self.context.data_registry.retrieve_all_available_value_ids()
}
result = ValueMapReadOnly.create_from_values(
**{str(k): v for k, v in values.items()}
)
return result

def list_values(self, **matcher_params: Any) -> ValueMapReadOnly:
"""
List all available values, optionally filter.
List all available (relevant) values, optionally filter.
Retrieve information about all values that are available in the current kiara context session (both stored
and non-stored).
Retrieve information about all values that are available in the current kiara context session (both stored and non-stored).
Check the `ValueMatcher` class for available parameters and defaults, for example this excludes
internal values by default.
Arguments:
matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters
Returns:
a dictionary with value_id as key, and [kiara.models.values.value.Value] as value
"""
if matcher_params:
matcher = ValueMatcher.create_matcher(**matcher_params)
values = self.context.data_registry.find_values(matcher=matcher)
else:
# TODO: make that parallel?
values = {
k: self.context.data_registry.get_value(k)
for k in self.context.data_registry.retrieve_all_available_value_ids()
}

matcher = ValueMatcher.create_matcher(**matcher_params)
values = self.context.data_registry.find_values(matcher=matcher)

result = ValueMapReadOnly.create_from_values(
**{str(k): v for k, v in values.items()}
Expand Down Expand Up @@ -2997,14 +3022,21 @@ def get_job_result(self, job_id: Union[str, uuid.UUID]) -> ValueMapReadOnly:
result = self.context.job_registry.retrieve_result(job_id=job_id)
return result

def list_all_job_record_ids(self) -> List[uuid.UUID]:
"""List all available job ids in this kiara context, ordered from newest to oldest, including internal jobs.
This should be faster than `list_job_record_ids` with equivalent parameters, because no filtering
needs to be done.
"""

job_ids = self.context.job_registry.retrieve_all_job_record_ids()
return job_ids

def list_job_record_ids(self, **matcher_params) -> List[uuid.UUID]:
"""List all available job ids in this kiara context, ordered from newest to oldest.
This method exists mainly so frontends can retrieve a list of all job ids in order, without having
to retrieve all job details as well (in the case where no matcher_params exist. Otherwise, you could
also just use `list_jobs` and take the keys from the result.
You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class.
You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class. By default, this method for example
does not return jobs marked as 'internal'.
Arguments:
matcher_params: additional parameters to pass to the job matcher
Expand All @@ -3013,19 +3045,24 @@ def list_job_record_ids(self, **matcher_params) -> List[uuid.UUID]:
a list of job ids, ordered from latest to earliest
"""

if matcher_params:
job_ids = list(self.list_job_records(**matcher_params).keys())
else:
job_ids = self.context.job_registry.retrieve_all_job_record_ids()

job_ids = list(self.list_job_records(**matcher_params).keys())
return job_ids

def list_all_job_records(self) -> Mapping[uuid.UUID, "JobRecord"]:
"""List all available job records in this kiara context, ordered from newest to oldest, including internal jobs.
This should be faster than `list_job_records` with equivalent parameters, because no filtering
needs to be done.
"""

job_records = self.context.job_registry.retrieve_all_job_records()
return job_records

def list_job_records(self, **matcher_params) -> Mapping[uuid.UUID, "JobRecord"]:
"""List all available job ids in this kiara context, ordered from newest to oldest.
This method exists mainly so frontends can retrieve a list of all job ids in order, without having
to retrieve all job details as well (in the case where no matcher_params exist. Otherwise, you could
also just use `list_jobs` and take the keys from the result.
You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class. By default, this method for example
does not return jobs marked as 'internal'.
You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class.
Expand All @@ -3037,14 +3074,10 @@ def list_job_records(self, **matcher_params) -> Mapping[uuid.UUID, "JobRecord"]:
"""

if matcher_params:
raise NotImplementedError("Job matching is not implemented yet")
from kiara.models.module.jobs import JobMatcher
from kiara.models.module.jobs import JobMatcher

matcher = JobMatcher(**matcher_params)
job_records = self.context.job_registry.find_job_records(matcher=matcher)
else:
job_records = self.context.job_registry.retrieve_all_job_records()
matcher = JobMatcher(**matcher_params)
job_records = self.context.job_registry.find_job_records(matcher=matcher)

return job_records

Expand Down
20 changes: 20 additions & 0 deletions src/kiara/models/module/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Mapping, Union

from pydantic import field_validator
from pydantic.fields import Field, PrivateAttr
from pydantic.main import BaseModel
from rich import box
Expand Down Expand Up @@ -250,9 +251,13 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob):
)
inputs_data_hash = str(inputs_data_cid)

module = kiara.module_registry.create_module(active_job.job_config)
is_internal = module.characteristics.is_internal

job_record = JobRecord(
job_id=active_job.job_id,
job_submitted=active_job.submitted,
is_internal=is_internal,
module_type=active_job.job_config.module_type,
module_config=active_job.job_config.module_config,
is_resolved=active_job.job_config.is_resolved,
Expand All @@ -278,6 +283,7 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob):
description="Information about the environments this value was created in.",
default=None,
)
is_internal: bool = Field(description="Whether this job was created by the system.")
# job_hash: str = Field(description="The hash of the job. Calculated from manifest & input_ids hashes.")
# manifest_hash: str = Field(description="The hash of the manifest.")
# input_ids_hash: str = Field(description="The hash of the field names and input ids (the value_ids/uuids).")
Expand Down Expand Up @@ -353,6 +359,7 @@ def create_matcher(self, **match_options: Any):
description="A list of job ids, if specified, only jobs with one of these ids will be included.",
default_factory=list,
)
allow_internal: bool = Field(description="Allow internal jobs.", default=False)
earliest: Union[None, datetime] = Field(
description="The earliest time when the job was created.", default=None
)
Expand All @@ -367,3 +374,16 @@ def create_matcher(self, **match_options: Any):
description="A list of value ids, if specified, only jobs that produced one of them will be included.",
default_factory=list,
)

@field_validator("job_ids", mode="before")
@classmethod
def validate_job_ids(cls, v):

if v is None:
return []
elif isinstance(v, uuid.UUID):
return [v]
elif isinstance(v, str):
return [uuid.UUID(v)]
else:
return [x if isinstance(x, uuid.UUID) else uuid.UUID(x) for x in v]
3 changes: 3 additions & 0 deletions src/kiara/registries/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ def store_value(
)
self._event_callback(store_event)

if _value.job_id:
self._kiara.job_registry.store_job_record(job_id=_value.job_id)

return persisted_value

def lookup_aliases(self, value: Union[Value, uuid.UUID]) -> Set[str]:
Expand Down
44 changes: 33 additions & 11 deletions src/kiara/registries/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
JobRecordPreStoreEvent,
JobRecordStoredEvent,
)
from kiara.models.module.jobs import ActiveJob, JobConfig, JobRecord, JobStatus
from kiara.models.module.jobs import (
ActiveJob,
JobConfig,
JobMatcher,
JobRecord,
JobStatus,
)
from kiara.models.module.manifest import InputsManifest, Manifest
from kiara.models.values.value import ValueMap, ValueMapReadOnly
from kiara.processing import ModuleProcessor
Expand All @@ -38,7 +44,7 @@
MANIFEST_SUB_PATH = "manifests"


class JobMatcher(abc.ABC):
class ExistingJobMatcher(abc.ABC):
def __init__(self, kiara: "Kiara"):

self._kiara: Kiara = kiara
Expand All @@ -50,14 +56,14 @@ def find_existing_job(
pass


class NoneJobMatcher(JobMatcher):
class NoneExistingJobMatcher(ExistingJobMatcher):
def find_existing_job(
self, inputs_manifest: InputsManifest
) -> Union[JobRecord, None]:
return None


class ValueIdJobMatcher(JobMatcher):
class ValueIdExistingJobMatcher(ExistingJobMatcher):
def find_existing_job(
self, inputs_manifest: InputsManifest
) -> Union[JobRecord, None]:
Expand Down Expand Up @@ -85,7 +91,7 @@ def find_existing_job(
return job_record


class DataHashJobMatcher(JobMatcher):
class DataHashExistingJobMatcher(ExistingJobMatcher):
def find_existing_job(
self, inputs_manifest: InputsManifest
) -> Union[JobRecord, None]:
Expand Down Expand Up @@ -151,7 +157,7 @@ def __init__(self, kiara: "Kiara"):

self._kiara: Kiara = kiara

self._job_matcher_cache: Dict[JobCacheStrategy, JobMatcher] = {}
self._job_matcher_cache: Dict[JobCacheStrategy, ExistingJobMatcher] = {}

self._active_jobs: bidict[str, uuid.UUID] = bidict()
self._failed_jobs: Dict[str, uuid.UUID] = {}
Expand All @@ -172,7 +178,7 @@ def __init__(self, kiara: "Kiara"):
# self.register_job_archive(default_file_store, store_alias="default_data_store") # type: ignore

@property
def job_matcher(self) -> JobMatcher:
def job_matcher(self) -> ExistingJobMatcher:

from kiara.context.runtime_config import JobCacheStrategy

Expand All @@ -189,11 +195,11 @@ def job_matcher(self) -> JobMatcher:
job_matcher = self._job_matcher_cache.get(strategy, None)
if job_matcher is None:
if strategy == JobCacheStrategy.no_cache:
job_matcher = NoneJobMatcher(kiara=self._kiara)
job_matcher = NoneExistingJobMatcher(kiara=self._kiara)
elif strategy == JobCacheStrategy.value_id:
job_matcher = ValueIdJobMatcher(kiara=self._kiara)
job_matcher = ValueIdExistingJobMatcher(kiara=self._kiara)
elif strategy == JobCacheStrategy.data_hash:
job_matcher = DataHashJobMatcher(kiara=self._kiara)
job_matcher = DataHashExistingJobMatcher(kiara=self._kiara)
else:
raise Exception(f"Job cache strategy not implemented: {strategy}")
self._job_matcher_cache[strategy] = job_matcher
Expand Down Expand Up @@ -353,7 +359,23 @@ def get_job_record(self, job_id: uuid.UUID) -> Union[JobRecord, None]:

def find_job_records(self, matcher: JobMatcher) -> Mapping[uuid.UUID, JobRecord]:

raise NotImplementedError("Job matching is Not implemented yet.")
all_records: List[JobRecord] = []
for archive in self.job_archives.values():

_job_records = archive.retrieve_matching_job_records(matcher=matcher)
all_records.extend(_job_records)

# TODO: check for duplicates and mismatching datetimes
all_jobs_sorted = {
job.job_id: job
for job in sorted(
all_records,
key=lambda item: item.job_submitted,
reverse=True,
)
}

return all_jobs_sorted

def retrieve_all_job_record_ids(self) -> List[uuid.UUID]:
"""Retrieve a list of all available job record ids, sorted from latest to earliest."""
Expand Down
15 changes: 13 additions & 2 deletions src/kiara/registries/jobs/job_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import abc
import uuid
from datetime import datetime
from typing import Iterable, Mapping, Union
from typing import Generator, Iterable, Mapping, Union

from kiara.models.module.jobs import JobRecord
from kiara.models.module.jobs import JobMatcher, JobRecord
from kiara.registries import BaseArchive


Expand Down Expand Up @@ -61,6 +61,17 @@ def retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]:
job_record = self._retrieve_record_for_job_hash(job_hash=job_hash)
return job_record

def retrieve_matching_job_records(
self, matcher: JobMatcher
) -> Generator[JobRecord, None, None]:
return self._retrieve_matching_job_records(matcher=matcher)

@abc.abstractmethod
def _retrieve_matching_job_records(
self, matcher: JobMatcher
) -> Generator[JobRecord, None, None]:
pass


class JobStore(JobArchive):
@classmethod
Expand Down
Loading

0 comments on commit da00351

Please sign in to comment.