Skip to content

Commit

Permalink
chore: prepare job history storage
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 7, 2024
1 parent 0a1d2ee commit 06a9e02
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 18 deletions.
23 changes: 22 additions & 1 deletion src/kiara/interfaces/python_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2808,6 +2808,7 @@ def queue_job(
operation: Union[str, Path, Manifest, OperationInfo, JobDesc],
inputs: Union[Mapping[str, Any], None],
operation_config: Union[None, Mapping[str, Any]] = None,
**job_metadata,
) -> uuid.UUID:
"""
Queue a job from a operation id, module_name (and config), or pipeline file, wait for the job to finish and retrieve the result.
Expand All @@ -2821,11 +2822,19 @@ def queue_job(
operation: a module name, operation id, or a path to a pipeline file (resolved in this order, until a match is found)..
inputs: the operation inputs
operation_config: the (optional) module config in case 'operation' is a module name
**job_metadata: additional metadata to store with the job
Returns:
the queued job id
"""

if "comment" not in job_metadata.keys():
raise KiaraException("You need to provide a 'comment' for the job.")

comment = job_metadata.get("comment")
if not isinstance(comment, str):
raise KiaraException("The 'comment' must be a string.")

if inputs is None:
inputs = {}

Expand Down Expand Up @@ -2895,13 +2904,21 @@ def queue_job(
manifest = _operation

job_id = self.queue_manifest(manifest=manifest, inputs=inputs)

from kiara.models.metadata import CommentMetadata

comment_metadata = CommentMetadata(comment=comment)
self.context.metadata_registry.register_metadata_item(
key="comment", item=comment_metadata, force=False, store=None
)
return job_id

def run_job(
self,
operation: Union[str, Path, Manifest, OperationInfo, JobDesc],
inputs: Union[None, Mapping[str, Any]] = None,
operation_config: Union[None, Mapping[str, Any]] = None,
**job_metadata,
) -> ValueMapReadOnly:
"""
Run a job from a operation id, module_name (and config), or pipeline file, wait for the job to finish and retrieve the result.
Expand All @@ -2918,6 +2935,7 @@ def run_job(
operation: a module name, operation id, or a path to a pipeline file (resolved in this order, until a match is found)..
inputs: the operation inputs
operation_config: the (optional) module config in case 'operation' is a module name
**job_metadata: additional metadata to store with the job
Returns:
the job result value map
Expand All @@ -2927,7 +2945,10 @@ def run_job(
inputs = {}

job_id = self.queue_job(
operation=operation, inputs=inputs, operation_config=operation_config
operation=operation,
inputs=inputs,
operation_config=operation_config,
**job_metadata,
)
return self.context.job_registry.retrieve_result(job_id=job_id)

Expand Down
34 changes: 20 additions & 14 deletions src/kiara/registries/metadata/metadata_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,12 @@ def store_metadata_item(
self,
key: str,
item: KiaraMetadata,
reference_item: Any = None,
reference_item_type: Union[str, None] = None,
reference_item_id: Union[str, None] = None,
force: bool = False,
store: Union[str, uuid.UUID, None] = None,
) -> uuid.UUID:

if reference_item:
raise NotImplementedError(
"Cannot store metadata item with reference item, not implemented yet."
)

GLOBAL_REFERENCE_TYPE = "global"
DEFAULT_GLOBAL_REFERENCE_ID = "default"

reference_item_type = GLOBAL_REFERENCE_TYPE
reference_item_id = DEFAULT_GLOBAL_REFERENCE_ID

if store:
raise NotImplementedError(
"Cannot store metadata item with store, not implemented yet."
Expand Down Expand Up @@ -112,13 +102,29 @@ def store_metadata_item(
value_hash=data_hash,
model_type_id=model_type,
model_schema_hash=model_schema_hash,
reference_item_type=reference_item_type,
reference_item_id=reference_item_id,
force=force,
)

if (reference_item_id and not reference_item_type) or (
reference_item_type and not reference_item_id
):
raise ValueError(
"If reference_item_id is set, reference_item_type must be set as well."
)

if reference_item_type:
self._store_metadata_reference(
reference_item_type, reference_item_id, str(metadata_item_id)
)

return metadata_item_id

@abc.abstractmethod
def _store_metadata_reference(
self, reference_item_type: str, reference_item_id: str, metadata_item_id: str
) -> None:
pass

@abc.abstractmethod
def _store_metadata_item(
self,
Expand Down
9 changes: 6 additions & 3 deletions src/kiara/registries/metadata/metadata_store/sqlite_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,14 @@ def sqlite_engine(self) -> "Engine":
metadata_item_hash TEXT NOT NULL,
model_type_id TEXT NOT NULL,
model_schema_hash TEXT NOT NULL,
metadata_value TEXT NOT NULL,
FOREIGN KEY (model_schema_hash) REFERENCES metadata_schemas (model_schema_hash)
);
CREATE TABLE IF NOT EXISTS metadata_references (
reference_item_type TEXT NOT NULL,
reference_item_id TEXT NOT NULL,
metadata_value TEXT NOT NULL,
FOREIGN KEY (model_schema_hash) REFERENCES metadata_schemas (model_schema_hash),
UNIQUE (metadata_item_key, reference_item_type, reference_item_id)
metadata_item_id TEXT NOT NULL,
FOREIGN KEY (metadata_item_id) REFERENCES metadata (metadata_item_id)
);
"""

Expand Down

0 comments on commit 06a9e02

Please sign in to comment.