Skip to content

Commit

Permalink
chore: initial version of metadata query abstract function
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 29, 2024
1 parent c5781d3 commit 31b8df0
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 11 deletions.
19 changes: 17 additions & 2 deletions src/kiara/interfaces/python_api/base_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2214,6 +2214,9 @@ def export_values(
)

if export_related_metadata:

print(values)

raise NotImplementedError("Exporting related metadata not implemented yet.")

if additional_archive_metadata:
Expand Down Expand Up @@ -2662,10 +2665,10 @@ def assemble_filter_pipeline_config(
# ------------------------------------------------------------------------------------------------------------------
# metadata-related methods

def register_metadata(
def register_metadata_item(
self, key: str, value: str, store: Union[str, None] = None
) -> uuid.UUID:
"""Register a comment into the specified metadata store.
"""Register a metadata item into the specified metadata store.
Currently, this allows you to store comments within the default kiara context. You can use any string
as key, for example a stringified `job_id`, or `value_id`, or any other string that makes sense in
Expand Down Expand Up @@ -2698,6 +2701,18 @@ def register_metadata(
key=key, item=item, store=store
)

def find_metadata_items(self, **matcher_params: Any) -> "Generator[Tuple[Any, ...], None, None]":
    """Find metadata items in the current context that match the given criteria.

    All keyword arguments are forwarded to `MetadataMatcher.create_matcher`;
    see the `MetadataMatcher` model for the supported filter fields
    (e.g. `metadata_item_keys`, `reference_item_types`, `reference_item_keys`,
    `reference_item_ids`).

    Arguments:
        **matcher_params: filter options used to assemble a `MetadataMatcher`

    Returns:
        the matching metadata items, as produced by the metadata registry
        (a lazy generator of result rows)
    """
    # imported lazily to avoid a potential import cycle with the registry module
    from kiara.registries.metadata import MetadataMatcher

    matcher = MetadataMatcher.create_matcher(**matcher_params)
    return self.context.metadata_registry.find_metadata_items(matcher=matcher)

# ------------------------------------------------------------------------------------------------------------------
# render-related methods

Expand Down
20 changes: 16 additions & 4 deletions src/kiara/interfaces/python_api/models/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,14 @@ def create_kiarchive(
_kiara: Union["Kiara", None] = PrivateAttr(default=None)

@property
def metadata_archive(self) -> "MetadataArchive":
def metadata_archive(self) -> Union["MetadataArchive", None]:

if self._metadata_archive:
return self._metadata_archive

if self.metadata_archive_config is None:
return None

from kiara.utils.stores import create_new_archive

metadata_archive: MetadataArchive = create_new_archive( # type: ignore
Expand All @@ -257,11 +260,14 @@ def metadata_archive(self) -> "MetadataArchive":
return self._metadata_archive

@property
def data_archive(self) -> "DataArchive":
def data_archive(self) -> Union["DataArchive", None]:

if self._data_archive:
return self._data_archive

if self.data_archive_config is None:
return None

from kiara.utils.stores import create_new_archive

data_archive: DataArchive = create_new_archive( # type: ignore
Expand All @@ -276,11 +282,14 @@ def data_archive(self) -> "DataArchive":
return self._data_archive

@property
def alias_archive(self) -> "AliasArchive":
def alias_archive(self) -> Union["AliasArchive", None]:

if self._alias_archive is not None:
return self._alias_archive

if self.alias_archive_config is None:
return None

from kiara.utils.stores import create_new_archive

alias_archive: AliasStore = create_new_archive( # type: ignore
Expand All @@ -294,11 +303,14 @@ def alias_archive(self) -> "AliasArchive":
return self._alias_archive

@property
def job_archive(self) -> "JobArchive":
def job_archive(self) -> Union["JobArchive", None]:

if self._jobs_archive is not None:
return self._jobs_archive

if self.job_archive_config is None:
return None

from kiara.utils.stores import create_new_archive

jobs_archive: JobStore = create_new_archive( # type: ignore
Expand Down
46 changes: 44 additions & 2 deletions src/kiara/registries/metadata/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
import uuid
from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Mapping, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Mapping, Union, List

from pydantic import Field
from pydantic import Field, field_validator

from kiara.defaults import DEFAULT_METADATA_STORE_MARKER, DEFAULT_STORE_MARKER
from kiara.models import KiaraModel
from kiara.models.events import RegistryEvent
from kiara.models.metadata import CommentMetadata, KiaraMetadata
from kiara.registries.metadata.metadata_store import MetadataArchive, MetadataStore
Expand All @@ -13,6 +14,40 @@
from kiara.context import Kiara


class MetadataMatcher(KiaraModel):
    """An object describing the requirements metadata items must satisfy in order
    to be included in a query result.

    All filter fields are optional; a field left as `None` does not constrain
    the query. Each field accepts either a single string or a list of strings.
    """

    @classmethod
    def create_matcher(cls, **match_options: Any) -> "MetadataMatcher":
        """Create a matcher instance from the provided filter options."""
        # use 'cls' (not the class name) so subclasses create instances of themselves
        return cls(**match_options)

    metadata_item_keys: Union[None, List[str]] = Field(
        description="The metadata item keys to match (if provided).", default=None
    )
    reference_item_types: Union[None, List[str]] = Field(
        description="A 'reference_item_type' a metadata item is referenced from.",
        default=None,
    )
    reference_item_keys: Union[None, List[str]] = Field(
        description="A 'reference_item_key' a metadata item is referenced from.",
        default=None,
    )
    reference_item_ids: Union[None, List[str]] = Field(
        description="A list of ids that a metadata item is referenced from.",
        default=None,
    )

    @field_validator(
        "reference_item_types",
        "reference_item_keys",
        "reference_item_ids",
        "metadata_item_keys",
        mode="before",
    )
    @classmethod
    def validate_reference_item_ids(cls, v):
        """Normalize filter values: wrap a single string in a list, de-duplicate lists.

        NOTE: de-duplication via 'set' does not preserve the input order.
        """
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        return list(set(v))




class MetadataArchiveAddedEvent(RegistryEvent):

event_type: Literal["metadata_archive_added"] = "metadata_archive_added"
Expand Down Expand Up @@ -141,6 +176,13 @@ def get_archive(
f"Can't retrieve archive with id '{archive_id_or_alias}': no archive with that id registered."
)

def find_metadata_items(self, matcher: MetadataMatcher) -> "Generator[Tuple[Any, ...], None, None]":
    """Find metadata items that satisfy the provided matcher.

    Arguments:
        matcher: the filter criteria metadata items must satisfy

    Returns:
        a lazy generator of matching result rows, as produced by the
        archive's `find_matching_metadata_items` (the previous
        `List[Dict[str, Any]]` annotation did not match the delegate's
        generator return type)
    """
    # NOTE(review): only the default archive is queried here; other mounted
    # archives are not searched -- confirm this is intended.
    mounted_store: MetadataArchive = self.get_archive()

    return mounted_store.find_matching_metadata_items(matcher=matcher)


def retrieve_metadata_item(
self,
key: str,
Expand Down
14 changes: 13 additions & 1 deletion src/kiara/registries/metadata/metadata_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
import abc
import json
import uuid
from typing import Any, Dict, Generic, Iterable, Mapping, Tuple, Union
from typing import Any, Dict, Generic, Iterable, Mapping, Tuple, Union, TYPE_CHECKING, Generator

from kiara.exceptions import KiaraException
from kiara.models.metadata import KiaraMetadata
from kiara.registries import ARCHIVE_CONFIG_CLS, BaseArchive

if TYPE_CHECKING:
from kiara.registries.metadata import MetadataMatcher


class MetadataArchive(BaseArchive[ARCHIVE_CONFIG_CLS], Generic[ARCHIVE_CONFIG_CLS]):
"""Base class for data archiv implementationss."""
Expand All @@ -33,6 +36,15 @@ def __init__(
self._schema_stored_cache: Dict[str, Any] = {}
self._schema_stored_item: Dict[str, uuid.UUID] = {}

def find_matching_metadata_items(
    self,
    matcher: "MetadataMatcher",
    metadata_item_result_fields: Union[Iterable[str], None] = None,
    reference_item_result_fields: Union[Iterable[str], None] = None,
) -> Generator[Tuple[Any, ...], None, None]:
    """Find all metadata items in this archive that satisfy the provided matcher.

    Arguments:
        matcher: the filter criteria metadata items must satisfy
        metadata_item_result_fields: optional column names of the metadata
            item table to include in each result row (all fields if `None`)
        reference_item_result_fields: optional column names of the reference
            table to include in each result row (all fields if `None`)

    Returns:
        a lazy generator of matching result rows
    """
    # delegate to the backend-specific implementation
    return self._find_matching_metadata_items(
        matcher=matcher,
        metadata_item_result_fields=metadata_item_result_fields,
        reference_item_result_fields=reference_item_result_fields,
    )

@abc.abstractmethod
def _find_matching_metadata_items(
    self,
    matcher: "MetadataMatcher",
    metadata_item_result_fields: Union[Iterable[str], None] = None,
    reference_item_result_fields: Union[Iterable[str], None] = None,
) -> Generator[Tuple[Any, ...], None, None]:
    """Backend-specific implementation of the metadata item query.

    Implementations must yield one row per matching metadata item, restricted
    to the requested result fields (or all fields if a fields argument is
    `None`).
    """


def retrieve_metadata_item(
self,
metadata_item_key: str,
Expand Down
73 changes: 71 additions & 2 deletions src/kiara/registries/metadata/metadata_store/sqlite_store.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# -*- coding: utf-8 -*-
import uuid
from pathlib import Path
from typing import Any, Dict, Mapping, Tuple, Union
from typing import Any, Dict, Mapping, Tuple, Union, Iterable, Generator

import orjson
from sqlalchemy import text
from sqlalchemy.engine import Engine

from kiara.exceptions import KiaraException
from kiara.registries import SqliteArchiveConfig
from kiara.registries.metadata import MetadataArchive, MetadataStore
from kiara.registries.metadata import MetadataArchive, MetadataStore, MetadataMatcher
from kiara.utils.dates import get_current_time_incl_timezone
from kiara.utils.db import create_archive_engine, delete_archive_db

Expand Down Expand Up @@ -153,6 +153,75 @@ def sqlite_engine(self) -> "Engine":
# event.listen(self._cached_engine, "connect", _pragma_on_connect)
return self._cached_engine

def _find_matching_metadata_items(
    self,
    matcher: "MetadataMatcher",
    metadata_item_result_fields: Union[Iterable[str], None] = None,
    reference_item_result_fields: Union[Iterable[str], None] = None,
) -> Generator[Tuple[Any, ...], None, None]:
    """Yield metadata rows from the sqlite backend that satisfy 'matcher'.

    Builds a single SELECT over the 'metadata' table, joined against
    'metadata_references' only when a reference-based filter is requested.

    Fixes over the previous version:
    - conditions no longer embed their own 'WHERE'/'AND' keywords (the old
      concatenation produced invalid SQL like '... WHERE x AND WHERE y')
    - the metadata-key filter now binds the parameter name it references
      (previously ':metadata_item_keys' was used but 'metadata_item_key'
      was bound, raising at execution time)
    - IN-clauses bind one numbered parameter per value; sqlalchemy's
      `text()` does not expand a list bound to a single placeholder
    """
    if not metadata_item_result_fields:
        metadata_fields_str = "m.*"
    else:
        metadata_fields_str = ", ".join(
            f"m.{x}" for x in metadata_item_result_fields
        )
    # tag every row with a literal marker so callers can distinguish result kinds
    metadata_fields_str += ", :result_type as result_type"

    sql_string = f"SELECT {metadata_fields_str} FROM metadata m"
    params: Dict[str, Any] = {"result_type": "metadata_item"}

    ref_query = bool(
        matcher.reference_item_types
        or matcher.reference_item_keys
        or matcher.reference_item_ids
    )
    if ref_query:
        sql_string += (
            " JOIN metadata_references r"
            " ON m.metadata_item_id = r.metadata_item_id"
        )

    conditions: List[str] = []

    def _add_in_condition(column: str, values: Iterable[Any], prefix: str) -> None:
        # bind each value under a numbered parameter name and emit
        # 'column IN (:p_0, :p_1, ...)'
        placeholders = []
        for idx, value in enumerate(values):
            param_name = f"{prefix}_{idx}"
            params[param_name] = value
            placeholders.append(f":{param_name}")
        conditions.append(f"{column} IN ({', '.join(placeholders)})")

    if matcher.metadata_item_keys:
        _add_in_condition("m.metadata_item_key", matcher.metadata_item_keys, "mi_key")
    if matcher.reference_item_ids:
        _add_in_condition("r.reference_item_id", matcher.reference_item_ids, "ri_id")
    if matcher.reference_item_types:
        _add_in_condition("r.reference_item_type", matcher.reference_item_types, "ri_type")
    if matcher.reference_item_keys:
        _add_in_condition("r.reference_item_key", matcher.reference_item_keys, "ri_key")

    if conditions:
        sql_string += " WHERE " + " AND ".join(conditions)

    sql = text(sql_string)

    # NOTE(review): 'reference_item_result_fields' is accepted but not used
    # yet -- the follow-up query for the referenced items is not implemented.

    with self.sqlite_engine.connect() as connection:
        result = connection.execute(sql, params)
        for row in result:
            yield row


def _retrieve_referenced_metadata_item_data(
self, key: str, reference_type: str, reference_key: str, reference_id: str
) -> Union[Tuple[str, Mapping[str, Any]], None]:
Expand Down
Binary file modified tests/resources/archives/export_test.kiarchive
Binary file not shown.

0 comments on commit 31b8df0

Please sign in to comment.