diff --git a/.gitignore b/.gitignore index a7a92a1e9..cd642c305 100644 --- a/.gitignore +++ b/.gitignore @@ -72,3 +72,5 @@ environment.yaml .pixi dev.py store.sqlite +**.kiarchive +**.kontext diff --git a/CHANGELOG.md b/CHANGELOG.md index 32c678933..a8eb7be6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Version 0.5.9 (upcoming) +- archive export & import feature +- expanded input options for 'store_values' API endpoint + + ## Version 0.5.8 - add 'mock' module type diff --git a/pyproject.toml b/pyproject.toml index bb4dc6f37..c902a89fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -125,6 +125,7 @@ sqlite_workflow_store = "kiara.registries.workflows.sqlite_store:SqliteWorkflowS run = "kiara.interfaces.cli.run:run" info = "kiara.interfaces.cli.info.commands:info" context = "kiara.interfaces.cli.context.commands:context" +archive = "kiara.interfaces.cli.archive.commands:context" data = "kiara.interfaces.cli.data.commands:data" module = "kiara.interfaces.cli.module.commands:module" operation = "kiara.interfaces.cli.operation.commands:operation" diff --git a/src/kiara/context/config.py b/src/kiara/context/config.py index 75d743fc0..7be36c34c 100644 --- a/src/kiara/context/config.py +++ b/src/kiara/context/config.py @@ -223,6 +223,82 @@ def archives(self) -> List["KiaraArchive"]: class KiaraContextConfig(BaseModel): + @classmethod + def create_from_sqlite_db(cls, db_path: Path) -> "KiaraContextConfig": + + import sqlite3 + + if not db_path.exists(): + context_id = str(uuid.uuid4()) + conn = sqlite3.connect(db_path) + c = conn.cursor() + c.execute( + """CREATE TABLE context_metadata + (key text PRIMARY KEY , value text NOT NULL)""" + ) + c.execute( + "INSERT INTO context_metadata VALUES ('context_id', ?)", (context_id,) + ) + c.execute( + """CREATE TABLE archive_metadata + (key text PRIMARY KEY , value text NOT NULL)""" + ) + c.execute( + "INSERT INTO archive_metadata VALUES ('archive_id', ?)", (context_id,) + ) + + conn.commit() + conn.close() + 
else: + try: + + with sqlite3.connect(db_path) as conn: + context_id = conn.execute( + "SELECT value FROM context_metadata WHERE key = 'context_id'" + ).fetchone()[0] + except Exception as e: + raise KiaraException( + f"Can't read context from sqlite db '{db_path}': {e}" + ) + + base_path = os.path.abspath(kiara_app_dirs.user_data_dir) + stores_base_path = os.path.join(base_path, "stores") + workflow_base_path = os.path.join( + stores_base_path, "filesystem_stores", "workflows" + ) + workflow_store_path = os.path.join(workflow_base_path, context_id) + + data_store_config = KiaraArchiveConfig( + archive_type="sqlite_data_store", + config={"sqlite_db_path": db_path.as_posix()}, + ) + alias_store_config = KiaraArchiveConfig( + archive_type="sqlite_alias_store", + config={"sqlite_db_path": db_path.as_posix()}, + ) + job_store_config = KiaraArchiveConfig( + archive_type="sqlite_job_store", + config={"sqlite_db_path": db_path.as_posix()}, + ) + workflow_store_config = KiaraArchiveConfig( + archive_type="filesystem_workflow_store", + config={"archive_path": workflow_store_path}, + ) + + archives = { + DEFAULT_DATA_STORE_MARKER: data_store_config, + DEFAULT_ALIAS_STORE_MARKER: alias_store_config, + DEFAULT_JOB_STORE_MARKER: job_store_config, + DEFAULT_WORKFLOW_STORE_MARKER: workflow_store_config, + } + + context_config = cls( + context_id=context_id, + archives=archives, + ) + + return context_config + model_config = ConfigDict(extra="forbid") context_id: str = Field(description="A globally unique id for this kiara context.") @@ -515,7 +591,7 @@ def _validate_context(self, context_config: KiaraContextConfig) -> bool: def create_default_sqlite_archive_config() -> Dict[str, Any]: store_id = str(uuid.uuid4()) - file_name = f"{store_id}.sqlite" + file_name = f"{store_id}.karchive" archive_path = Path( os.path.abspath(os.path.join(sqlite_base_path, file_name)) ) @@ -638,39 +714,51 @@ def create_context_config( if not context_alias: context_alias = DEFAULT_CONTEXT_NAME + if 
context_alias in self.available_context_names: raise Exception( f"Can't create kiara context '{context_alias}': context with that alias already registered." ) - if os.path.sep in context_alias: - raise Exception( - f"Can't create context with alias '{context_alias}': no special characters allowed." + if context_alias.endswith(".kontext"): + context_db_file = Path(context_alias) + context_config: KiaraContextConfig = ( + KiaraContextConfig.create_from_sqlite_db(db_path=context_db_file) ) + self._validate_context(context_config=context_config) + context_config._context_config_path = context_db_file + else: - context_file = ( - Path(os.path.join(self.context_search_paths[0])) / f"{context_alias}.yaml" - ) + if os.path.sep in context_alias: + raise Exception( + f"Can't create context with alias '{context_alias}': no special characters allowed." + ) - archives: Dict[str, KiaraArchiveConfig] = {} - # create_default_archives(kiara_config=self) - context_id = ID_REGISTRY.generate( - obj_type=KiaraContextConfig, comment=f"new kiara context '{context_alias}'" - ) + context_file = ( + Path(os.path.join(self.context_search_paths[0])) + / f"{context_alias}.yaml" + ) - context_config = KiaraContextConfig( - context_id=str(context_id), archives=archives, extra_pipelines=[] - ) + archives: Dict[str, KiaraArchiveConfig] = {} + # create_default_archives(kiara_config=self) + context_id = ID_REGISTRY.generate( + obj_type=KiaraContextConfig, + comment=f"new kiara context '{context_alias}'", + ) - self._validate_context(context_config=context_config) + context_config = KiaraContextConfig( + context_id=str(context_id), archives=archives, extra_pipelines=[] + ) + + self._validate_context(context_config=context_config) - context_file.parent.mkdir(parents=True, exist_ok=True) - with open(context_file, "wt") as f: - yaml.dump(context_config.model_dump(), f) + context_file.parent.mkdir(parents=True, exist_ok=True) + with open(context_file, "wt") as f: + yaml.dump(context_config.model_dump(), 
f) - context_config._context_config_path = context_file + context_config._context_config_path = context_file + self._available_context_files[context_alias] = context_file - self._available_context_files[context_alias] = context_file self._context_data[context_alias] = context_config return context_config @@ -687,13 +775,20 @@ def create_context( with contextlib.suppress(Exception): context = uuid.UUID(context) # type: ignore - if isinstance(context, str) and os.path.exists(context): + if isinstance(context, str) and ( + os.path.exists(context) or context.endswith(".kontext") + ): context = Path(context) if isinstance(context, Path): - with context.open("rt") as f: - data = yaml.load(f) - context_config = KiaraContextConfig(**data) + if context.name.endswith(".kontext"): + context_config = KiaraContextConfig.create_from_sqlite_db( + db_path=context + ) + else: + with context.open("rt") as f: + data = yaml.load(f) + context_config = KiaraContextConfig(**data) elif isinstance(context, str): context_config = self.get_context_config(context_name=context) elif isinstance(context, uuid.UUID): diff --git a/src/kiara/context/runtime_config.py b/src/kiara/context/runtime_config.py index a1bd20a16..806c19db7 100644 --- a/src/kiara/context/runtime_config.py +++ b/src/kiara/context/runtime_config.py @@ -13,6 +13,11 @@ class JobCacheStrategy(Enum): class KiaraRuntimeConfig(BaseSettings): + """The runtime configuration for a *kiara* backend. + + The most important option here is the 'job_cache' setting, which determines how the runtime will match a new job against the records of past ones, in order to find a matching one and not have to re-run the possibly expensive job again. By default, no matching is done, other options are matching based on exact input value ids, or (more expensive) matching based on the input data hashes. 
+ """ + model_config = SettingsConfigDict( extra="forbid", validate_assignment=True, env_prefix="kiara_runtime_" ) diff --git a/src/kiara/interfaces/__init__.py b/src/kiara/interfaces/__init__.py index b944c217f..68e71911b 100644 --- a/src/kiara/interfaces/__init__.py +++ b/src/kiara/interfaces/__init__.py @@ -230,6 +230,7 @@ def __init__( ensure_plugins: Union[str, Iterable[str], None] = None, exit_process: bool = True, ): + if not context: context = os.environ.get("KIARA_CONTEXT", None) diff --git a/src/kiara/interfaces/cli/archive/__init__.py b/src/kiara/interfaces/cli/archive/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/kiara/interfaces/cli/archive/commands.py b/src/kiara/interfaces/cli/archive/commands.py new file mode 100644 index 000000000..a685dc0a9 --- /dev/null +++ b/src/kiara/interfaces/cli/archive/commands.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- + +# Copyright (c) 2021, Markus Binsteiner +# +# Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/) +from typing import TYPE_CHECKING + +import rich_click as click + +from kiara.utils.cli import ( + OutputFormat, + output_format_option, + terminal_print_model, +) + +if TYPE_CHECKING: + pass + + +@click.group("archive") +@click.pass_context +def context(ctx): + """Kiara archive related sub-commands.""" + + +@context.command("explain") +@click.argument("archive", nargs=1, required=True) +@output_format_option() +@click.pass_context +def explain_archive( + ctx, + format: str, + archive: str, +): + """Print details of an archive file.""" + + from kiara.api import KiaraAPI + + kiara_api: KiaraAPI = ctx.obj.kiara_api + + infos = kiara_api.get_archive_info(archive) + + if not format or format == OutputFormat.TERMINAL: + for info in infos: + types = ", ".join(info.archive_type_info.supported_item_types) + terminal_print_model(info, in_panel=f"Archive type(s): {types}") + else: + terminal_print_model(*infos, format=format) diff --git 
a/src/kiara/interfaces/cli/context/commands.py b/src/kiara/interfaces/cli/context/commands.py index 48b1f1f36..ad6ae9470 100644 --- a/src/kiara/interfaces/cli/context/commands.py +++ b/src/kiara/interfaces/cli/context/commands.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Tuple, Union import rich_click as click +from rich import box from rich.panel import Panel from kiara.interfaces import KiaraAPIWrap, get_console @@ -42,10 +43,17 @@ def list_contexts(ctx) -> None: @context.command("explain") @click.argument("context_name", nargs=-1, required=False) @click.option("--value-ids", "-i", help="Show value ids.", is_flag=True, default=False) +@click.option( + "--show-config", "-c", help="Also show kiara config.", is_flag=True, default=False +) @output_format_option() @click.pass_context def explain_context( - ctx, format: str, value_ids: bool, context_name: Union[Tuple[str], None] = None + ctx, + format: str, + value_ids: bool, + context_name: Union[Tuple[str], None] = None, + show_config: bool = False, ): """Print details for one or several contexts.""" kiara_config: KiaraConfig = ctx.obj.kiara_config @@ -58,15 +66,36 @@ def explain_context( from kiara.models.context import ContextInfo + render_config = { + "show_lines": False, + "show_header": False, + "show_description": False, + } + + if show_config: + from rich.table import Table + + config = kiara_config.create_renderable(**render_config) + table = Table(show_header=False, show_lines=False, box=box.SIMPLE) + table.add_column("key", style="i") + table.add_column("value") + if kiara_config._config_path: + table.add_row("config file", f" {kiara_config._config_path}") + table.add_row("config", config) + terminal_print(table, in_panel="Kiara config") + if len(contexts) == 1: kcc = kiara_config.get_context_config(contexts[0]) - cs = ContextInfo.create_from_context_config( kcc, context_name=contexts[0], runtime_config=kiara_config.runtime_config ) terminal_print_model( - cs, format=format, full_details=True, 
show_value_ids=value_ids + cs, + format=format, + full_details=True, + show_value_ids=value_ids, + in_panel=f"Context '{contexts[0]}'", ) else: diff --git a/src/kiara/interfaces/cli/data/commands.py b/src/kiara/interfaces/cli/data/commands.py index 8e8d987a5..9c4e55be9 100644 --- a/src/kiara/interfaces/cli/data/commands.py +++ b/src/kiara/interfaces/cli/data/commands.py @@ -39,6 +39,7 @@ if TYPE_CHECKING: from kiara.api import Kiara, KiaraAPI from kiara.operations.included_core_operations.filter import FilterOperationType + from kiara.registries.aliases import AliasArchive from kiara.registries.data import DataArchive, DataStore logger = structlog.getLogger() @@ -520,7 +521,7 @@ def filter_value( @click.option( "--archive-alias", "-a", - help="The alias to use for the exported archive. If not provided, the first alias will be used.", + help="The alias to use for the exported archive. If not provided, the first alias will be used. This is used as default in the stored archive, if not overwritten by a user.", required=False, ) @click.option( @@ -536,18 +537,16 @@ def filter_value( type=click.Choice(["zstd", "lz4", "lzma", "none"]), default="zstd", ) -@click.option( - "--force", "-f", help="Force overwriting an existing archive.", is_flag=True -) +@click.option("--append", "-A", help="Append data to existing archive.", is_flag=True) @click.argument("aliases", nargs=-1, required=True) @click.pass_context -def export_data_store( +def export_data_archive( ctx, - aliases: str, + aliases: Tuple[str], archive_alias: Union[None, str], path: Union[str, None], compression: str, - force: bool, + append: bool, ): """Export one or several values into a new data data_store.""" @@ -579,7 +578,10 @@ def export_data_store( if not path: base_path = "." 
- file_name = f"{archive_alias}.kiarchive" + if archive_alias.endswith(".kiarchive"): + file_name = archive_alias + else: + file_name = f"{archive_alias}.kiarchive" terminal_print(f"Creating new data_store '{file_name}'...") else: base_path = os.path.dirname(path) @@ -590,10 +592,8 @@ def export_data_store( terminal_print(f"Creating new data_store '{path}'...") full_path = Path(base_path) / file_name - if full_path.is_file() and force: - full_path.unlink() - if full_path.exists(): + if full_path.exists() and not append: terminal_print(f"[red]Error[/red]: File '{full_path}' already exists.") sys.exit(1) @@ -617,7 +617,7 @@ def export_data_store( data_store_alias = kiara_api.context.data_registry.register_data_archive(data_store) # type: ignore alias_store_alias = kiara_api.context.alias_registry.register_archive(archive_store) # type: ignore - terminal_print("Exporting value into new data_store...") + terminal_print("Exporting value(s) into new data_store...") no_default_value = False @@ -638,7 +638,7 @@ def export_data_store( key = f"value_{idx}" values_to_store[key] = value if value_alias: - alias_map[key] = value_alias + alias_map[key] = [value_alias] try: @@ -678,9 +678,6 @@ def import_data_store(ctx, archive: str): terminal_print(f"[red]Error[/red]: No data archives found in '{archive}'") sys.exit(1) - terminal_print("Registering data archive...") - store_alias = kiara_api.context.data_registry.register_data_archive(data_archive) - values = data_archive.value_ids if values is None: terminal_print( @@ -688,7 +685,20 @@ def import_data_store(ctx, archive: str): ) sys.exit(1) - result = kiara_api.store_values(values=values, alias_store=store_alias) + terminal_print("Registering data archive...") + data_store_alias = kiara_api.context.data_registry.register_data_archive( + data_archive + ) + + alias_archive: "AliasArchive" = archives.get("alias", None) # type: ignore + alias_map = {} + if alias_archive: + # terminal_print("Registering alias archive...") + # 
alias_archive_alias = kiara_api.context.alias_registry.register_archive(alias_archive) + for alias, value_id in alias_archive.retrieve_all_aliases().items(): + alias_map.setdefault(str(value_id), []).append(alias) + + result = kiara_api.store_values(values=values, alias_map=alias_map) terminal_print(result) terminal_print("Done.") diff --git a/src/kiara/interfaces/python_api/__init__.py b/src/kiara/interfaces/python_api/__init__.py index 22d782388..705cfbc9d 100644 --- a/src/kiara/interfaces/python_api/__init__.py +++ b/src/kiara/interfaces/python_api/__init__.py @@ -18,6 +18,7 @@ List, Mapping, MutableMapping, + Set, Type, Union, ) @@ -94,10 +95,10 @@ WorkflowsMap, ) from kiara.interfaces.python_api.workflow import Workflow + from kiara.models.archives import ArchiveInfo from kiara.models.module.pipeline import PipelineConfig, PipelineStructure from kiara.models.module.pipeline.pipeline import PipelineGroupInfo, PipelineInfo - logger = structlog.getLogger() yaml = YAML(typ="safe") @@ -197,6 +198,9 @@ def retrieve_plugin_info(self, plugin_name: str) -> KiaraPluginInfo: """ Get information about a plugin. + This contains information about included data-types, modules, operations, pipelines, as well as metadata + about author(s), etc. + Arguments: plugin_name: the name of the plugin @@ -210,6 +214,10 @@ def retrieve_plugin_info(self, plugin_name: str) -> KiaraPluginInfo: return info def retrieve_plugin_infos(self, plugin_name_regex: str = "^kiara[-_]plugin\\..*"): + """Get information about multiple plugins. + + This is just a convenience method to get information about multiple plugins at once. + """ if not plugin_name_regex: plugin_name_regex = "^kiara[-_]plugin\\..*" @@ -235,11 +243,18 @@ def context(self) -> "Kiara": return self._current_context def get_runtime_config(self) -> "KiaraRuntimeConfig": - """Retrieve the current runtime configuration.""" + """Retrieve the current runtime configuration. 
+ + Check the 'KiaraRuntimeConfig' class for more information about the available options. + """ return self.context.runtime_config def get_context_info(self) -> ContextInfo: - """Retrieve information about the current kiara context.""" + """Retrieve information about the current kiara context. + + This contains information about the context, like its name/alias, the values & aliases it contains, and which archives are connected to it. + + """ context_config = self._kiara_config.get_context_config( self.get_current_context_name() ) @@ -257,7 +272,8 @@ def ensure_plugin_packages( """ Ensure that the specified packages are installed. - This functionality is provisional, don't rely on it being available long-term. Ideally, we'll have other, external ways to manage the environment. + + NOTE: this is not tested, and it might go away in the future, so don't rely on it being available long-term. Ideally, we'll have other, external ways to manage the environment. Arguments: package_names: The names of the packages to install. @@ -363,23 +379,35 @@ def __getattribute__(self, item): # ================================================================================================================== # context-management related functions def list_context_names(self) -> List[str]: - """list the names of all available/registered contexts.""" + """list the names of all available/registered contexts. + + NOTE: this functionality might be changed in the future, depending on requirements and feedback and + whether we want to support single-file contexts in the future. + """ return list(self._kiara_config.available_context_names) def retrieve_context_infos(self) -> ContextInfos: - """Retrieve information about the available/registered contexts.""" + """Retrieve information about the available/registered contexts. + + NOTE: this functionality might be changed in the future, depending on requirements and feedback and whether we want to support single-file contexts in the future. 
+ """ return ContextInfos.create_context_infos(self._kiara_config.context_configs) def get_current_context_name(self) -> str: - """Retrieve the name fo the current context.""" + """Retrieve the name of the current context. + + NOTE: this functionality might be changed in the future, depending on requirements and feedback and whether we want to support single-file contexts in the future. + """ if self._current_context_alias is None: self.context return self._current_context_alias # type: ignore - def create_new_context(self, context_name: str, set_active: bool) -> None: + def create_new_context(self, context_name: str, set_active: bool = True) -> None: """ Create a new context. + NOTE: this functionality might be changed in the future, depending on requirements and feedback and whether we want to support single-file contexts in the future. + Arguments: context_name: the name of the new context set_active: set the newly created context as the active one @@ -395,6 +423,10 @@ def create_new_context(self, context_name: str, set_active: bool) -> None: self._current_context_alias = context_name def set_active_context(self, context_name: str, create: bool = False) -> None: + """Set the currently active context for this KiarAPI instance. + + NOTE: this functionality might be changed in the future, depending on requirements and feedback and whether we want to support single-file contexts in the future. 
+ """ if not context_name: raise Exception("No context name provided.") @@ -416,6 +448,25 @@ def set_active_context(self, context_name: str, create: bool = False) -> None: ) self._current_context_alias = context_name + # ================================================================================================================== + # methods for archives + + def get_archive_info(self, archive_file: str) -> List["ArchiveInfo"]: + + from kiara.context.config import KiaraArchiveReference + from kiara.models.archives import ArchiveInfo + + archive_ref = KiaraArchiveReference.load_existing_archive(archive_file) + + result = [] + for archive in archive_ref.archives: + info = ArchiveInfo.create_from_instance( + kiara=self.context, instance=archive + ) + result.append(info) + + return result + # ================================================================================================================== # methods for data_types @@ -432,6 +483,7 @@ def list_data_type_names(self, include_profiles: bool = False) -> List[str]: def is_internal_data_type(self, data_type_name: str) -> bool: """Checks if the data type is prepdominantly used internally by kiara, or whether it should be exposed to the user.""" + return self.context.type_registry.is_internal_type( data_type_name=data_type_name ) @@ -664,7 +716,9 @@ def create_operation( """ Create an [Operation][kiara.models.module.operation.Operation] instance for the specified module type and (optional) config. - This can be used to get information about the operation itself, it's inputs & outputs schemas, documentation etc. + An operation is defined as a specific module type, and a specific configuration. + + This endpoint can be used to get information about the operation itself, it's inputs & outputs schemas, documentation etc. Arguments: module_type: the registered name of the module @@ -743,7 +797,9 @@ def get_operation( """ Return the operation instance with the specified id. 
- This can be used to get information about a specific operation, like inputs/outputs scheman, documentation, etc. + The difference to the 'create_operation' endpoint is slight, in most cases you could use either of them, but this one is a bit more convenient in most cases, as it tries to do the right thing with whatever 'operation' argument you use it. The 'create_opearation' endpoint will always create a new 'Operation' instance, while this may or may not return a re-used one. + + This endpoint can be used to get information about a specific operation, like inputs/outputs scheman, documentation, etc. The order in which the operation argument is resolved: - if it's a string, and an existing, registered operation_id, the associated operation is returned @@ -1221,6 +1277,8 @@ def register_pipeline( """ Register a pipelne as new operation into this context. + If 'operation_id' is not provided, the id will be auto-determined (in most cases using the pipeline name). + Arguments: data: a dict or a path to a json/yaml file containing the definition operation_id: the id to use for the operation (if not specified, the id will be auto-determined) @@ -1326,7 +1384,8 @@ def get_value(self, value: Union[str, Value, uuid.UUID, Path]) -> Value: """ Retrieve a value instance with the specified id or alias. - Raises an exception if no value could be found. + Basically a convenience method to convert any possible Python type into + a 'Value' instance. Raises an exception if no value could be found. Arguments: value: a value id, alias or object that has a 'value_id' attribute. @@ -1344,6 +1403,9 @@ def query_value( """ Retrieve a value attribute with the specified id or alias. + NOTE: This is a provisional endpoint, don't use for now, if you have a requirement that would + be covered by this, please let me know. + A query path is delimited by "::", and has the following format: ``` @@ -1432,7 +1494,7 @@ def retrieve_value_info( """ Retrieve an info object for a value. 
- 'ValueInfo' objects contains augmented information on top of what 'normal' [Value][kiara.models.values.value.Value] objects + Companion method to 'get_value', 'ValueInfo' objects contains augmented information on top of what 'normal' [Value][kiara.models.values.value.Value] objects hold (like resolved properties for example), but they can take longer to create/resolve. If you don't need any of the augmented information, just use the [get_value][kiara.interfaces.python_api.KiaraAPI.get_value] method instead. @@ -1564,7 +1626,11 @@ def assemble_value_map( reuse_existing_data: bool = False, ) -> ValueMapReadOnly: """ - Retrive a [ValueMap][TODO] object from the provided value ids or value links. + Retrive a [ValueMap][kiara.models.values.value.ValueMap] object from the provided value ids or value links. + + In most cases, this endpoint won't be used by front-ends, it's a fairly low-level method that is + mainly used for internal purposes. If you have a use-case, let me know and I'll improve the docs + if insufficient. By default, this method can only use values/datasets that are already registered in *kiara*. If you want to auto-register 'raw' data, you need to set the 'register_data' flag to 'True', and provide a schema for each of the fields that are not yet registered. @@ -1647,15 +1713,21 @@ def store_value( alias_store: Union[str, None] = None, ) -> StoreValueResult: """ - Store the specified value in the (default) value store. + Store the specified value in a value store. + + If you provide values for the 'data_store' and/or 'alias_store' other than 'default', you need + to make sure those stores are registered with the current context. In most cases, the 'export' endpoint (to be done) will probably be an easier way to export values, which I suspect will + be the main use-case for this endpoint if any of the 'store' arguments where needed. Otherwise, this endpoint is useful to persist values for use in later seperate sessions. 
This method does not raise an error if the storing of the value fails, so you have to investigate the 'StoreValueResult' instance that is returned to see if the storing was successful. Arguments: value: the value (or a reference to it) - alias: (Optional) aliases for the value + alias: (Optional) one or several aliases for the value allow_overwrite: whether to allow overwriting existing aliases + data_store: the alias (or archive id as string) of the store to write the data + alias_store: the alias (or archive id as string) of the store to persist the alias(es)/value_id mapping """ if isinstance(alias, str): alias = [alias] @@ -1706,7 +1778,11 @@ def store_values( """ Store multiple values into the (default) kiara value store. - If you provide a non-mapping interable as 'values', the 'alias_map' argument must be 'False', and using aliases is not possible. + Convenience method to store multiple values. In a lot of cases you can be more flexible if you + loop over the values on the frontend side, and call the 'store_value' method for each value. + + If you provide a non-mapping interable as 'values', the 'alias_map' argument must either be 'False', or a + map with a stringified uuid refering to the value in question as key, and a list of aliases as value. If you use a mapping iterable as 'values': @@ -1727,37 +1803,55 @@ def store_values( result = {} if not isinstance(values, Mapping): - if alias_map is not False: + if not alias_map: + use_aliases = False + elif alias_map and (alias_map is True or isinstance(alias_map, str)): raise KiaraException( - msg="Cannot use aliases with non-mapping iterable." + msg="Cannot use auto-aliases with non-mapping iterable." 
) + else: + use_aliases = True + + for value in values: + + aliases: Set[str] = set() - for idx, value in enumerate(values): value_obj = self.get_value(value) + if use_aliases: + alias_key = str(value_obj.value_id) + alias: Union[str, None] = alias_map.get(alias_key, None) + if alias: + aliases.update(alias) + store_result = self.store_value( value=value_obj, - alias=None, + alias=aliases, allow_overwrite=allow_overwrite, data_store=data_store, alias_store=alias_store, ) - result[f"value_{idx}"] = store_result + result[str(value_obj.value_id)] = store_result else: + for field_name, value in values.items(): if alias_map is False: - aliases: Union[None, Iterable[str]] = None + aliases_map: Union[None, Iterable[str]] = None elif alias_map is True: - aliases = [field_name] + aliases_map = [field_name] elif isinstance(alias_map, str): - aliases = [f"{alias_map}.{field_name}"] + aliases_map = [f"{alias_map}.{field_name}"] else: # means it's a mapping - aliases = alias_map.get(field_name) + _aliases = alias_map.get(field_name) + if _aliases: + aliases_map = list(_aliases) + else: + aliases_map = None value_obj = self.get_value(value) store_result = self.store_value( value=value_obj, - alias=aliases, + alias=aliases_map, allow_overwrite=allow_overwrite, data_store=data_store, alias_store=alias_store, @@ -1813,6 +1907,9 @@ def assemble_filter_pipeline_config( """ Assemble a (pipeline) module config to filter values of a specific data type. + NOTE: this is a preliminary endpoint, and might go away in the future. If you have a need for this + functionality, please let me know your requirements and we can work on fleshing this out. + Optionally, a module that uses the filtered dataset as input can be specified. # TODO: document filter names @@ -1854,6 +1951,8 @@ def retrieve_renderer_infos( ) -> RendererInfos: """Retrieve information about the available renderers. 
+ Note: this is preliminary and mainly used in the cli, if another use-case comes up let me know and I'll make this more generic, and an 'official' endpoint. + Arguments: source_type: the type of the item to render (optional filter) target_type: the type/profile of the rendered result (optional filter) @@ -1881,6 +1980,10 @@ def retrieve_renderer_infos( return infos # type: ignore def retrieve_renderers_for(self, source_type: str) -> List[KiaraRenderer]: + """Retrieve available renderer instances for a specific data type. + + Note: this is not preliminary, and, mainly used in the cli, if another use-case comes up let me know and I'll make this more generic, and an 'official' endpoint. + """ return self.context.render_registry.retrieve_renderers_for_source_type( source_type=source_type @@ -1895,6 +1998,8 @@ def render( ) -> Any: """Render an internal instance of a supported source type into one of the supported target types. + Note: this is not preliminary, and, mainly used in the cli, if another use-case comes up let me know and I'll make this more generic, and an 'official' endpoint. + To find out the supported source/target combinations, you can use the kiara cli: ``` @@ -2020,6 +2125,8 @@ def queue_manifest( """ Queue a job using the provided manifest to describe the module and config that should be executed. + You probably want to use 'queue_job' instead. + Arguments: manifest: the manifest inputs: the job inputs (can be either references to values, or raw inputs @@ -2045,6 +2152,8 @@ def run_manifest( """ Run a job using the provided manifest to describe the module and config that should be executed. + You probably want to use 'run_job' instead. 
+ Arguments: manifest: the manifest inputs: the job inputs (can be either references to values, or raw inputs diff --git a/src/kiara/models/archives.py b/src/kiara/models/archives.py index 14e24251a..12025ea32 100644 --- a/src/kiara/models/archives.py +++ b/src/kiara/models/archives.py @@ -27,12 +27,13 @@ TypeInfoItemGroup, ) from kiara.models.documentation import ( + AuthorModel, AuthorsMetadataModel, ContextMetadataModel, DocumentationMetadataModel, ) from kiara.models.python_class import PythonClass -from kiara.registries import ArchiveDetails, KiaraArchive +from kiara.registries import ArchiveDetails, ArchiveMetadata, KiaraArchive from kiara.utils.json import orjson_dumps if TYPE_CHECKING: @@ -96,7 +97,7 @@ def create_renderable(self, **config: Any) -> RenderableType: table.add_row("Python class", self.python_class.create_renderable()) - table.add_row("is_writeable", "yes" if self.is_writable else "no") + # table.add_row("is_writeable", "yes" if self.is_writable else "no") table.add_row( "supported_item_types", ", ".join(sorted(self.supported_item_types)) ) @@ -136,6 +137,26 @@ def create_from_archive( archive_aliases: Union[Iterable[str], None] = None, ): + doc_str = archive.archive_metadata.get("description", None) + doc = DocumentationMetadataModel.create(doc_str) + + authors_raw = archive.archive_metadata.get("authors", []) + _authors = [] + for author in authors_raw: + author = AuthorModel(**author) + _authors.append(author) + authors = AuthorsMetadataModel(authors=_authors) + + tags = archive.archive_metadata.get("tags", []) + labels = archive.archive_metadata.get("labels", {}) + + references = archive.archive_metadata.get("references", {}) + # TODO: add references model + + context = ContextMetadataModel(tags=tags, labels=labels, references=references) + + # archive_types = list(archive.supported_item_types()) + archive_type_info = ArchiveTypeInfo.create_from_type_class( archive.__class__, kiara=kiara ) @@ -145,12 +166,15 @@ def create_from_archive( 
archive_aliases = list(archive_aliases) return ArchiveInfo( archive_type_info=archive_type_info, - type_name=str(archive.archive_id), - documentation=archive_type_info.documentation, - authors=archive_type_info.authors, - context=archive_type_info.context, + archive_alias=archive.archive_alias, archive_id=archive.archive_id, + type_name=str(archive.archive_id), + documentation=doc, + authors=authors, + context=context, + # archive_types=archive_types, details=archive.get_archive_details(), + metadata=archive.archive_metadata, config=archive.config.model_dump(), aliases=archive_aliases, ) @@ -160,17 +184,47 @@ def category_name(cls) -> str: return "info.archive" archive_id: uuid.UUID = Field(description="The (globally unique) archive id.") + archive_alias: str = Field(description="The archive alias.") + archive_type_info: ArchiveTypeInfo = Field( description="Information about this archives' type." ) + # archive_types: List[Literal["data", "alias", "job_record", "workflow"]] = Field(description="The archive type.") + config: Mapping[str, Any] = Field(description="The configuration of this archive.") details: ArchiveDetails = Field( description="Type dependent (runtime) details for this archive." 
) + metadata: ArchiveMetadata = Field(description="Metadata for this archive.") aliases: List[str] = Field( description="Aliases for this archive.", default_factory=list ) + def create_renderable(self, **config: Any) -> RenderableType: + from kiara.utils.output import extract_renderable + + table = Table(show_header=False, box=box.SIMPLE) + table.add_column("property", style="i") + table.add_column("value") + + details = extract_renderable(self.details, render_config=config) + metadata = extract_renderable(self.metadata, render_config=config) + type_info = self.archive_type_info.create_renderable(**config) + # table.add_row("archive id", str(self.archive_id)) + # table.add_row("archive alias", self.archive_alias) + # table.add_row("archive type(s)", ", ".join(self.archive_types)) + table.add_row("details", details) + table.add_row("metadata", metadata) + table.add_row("archive type info", type_info) + if self.documentation.is_set: + table.add_row("doc", self.documentation.create_renderable(**config)) + if self.authors.authors: + table.add_row("author(s)", self.authors.create_renderable(**config)) + if self.context.labels or self.context.tags or self.context.references: + table.add_row("context", self.context.create_renderable(**config)) + + return table + class ArchiveGroupInfo(InfoItemGroup): @@ -211,7 +265,7 @@ def combined_size(self) -> int: if archive_info.archive_id in archive_ids: continue archive_ids.add(archive_info.archive_id) - size = archive_info.details.size + size = archive_info.details.root.get("size", 0) if size and size > 0: combined = combined + size diff --git a/src/kiara/models/module/jobs.py b/src/kiara/models/module/jobs.py index 8ab2fdada..25ebae3b5 100644 --- a/src/kiara/models/module/jobs.py +++ b/src/kiara/models/module/jobs.py @@ -228,9 +228,13 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob): runtime=active_job.runtime, # type: ignore ) - inputs_data_cid = active_job.job_config.calculate_inputs_data_cid( + ( + 
inputs_data_cid, + contains_invalid, + ) = active_job.job_config.calculate_inputs_data_cid( data_registry=kiara.data_registry ) + inputs_data_hash = str(inputs_data_cid) job_record = JobRecord( job_id=active_job.job_id, @@ -241,10 +245,13 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob): outputs=active_job.results, runtime_details=job_details, environment_hashes=kiara.environment_registry.environment_hashes, - inputs_data_hash=( - str(inputs_data_cid) if inputs_data_cid is not None else None - ), + # input_ids_hash=active_job.job_config.input_ids_hash, + inputs_data_hash=inputs_data_hash, ) + job_record._manifest_cid = active_job.job_config.manifest_cid + job_record._manifest_data = active_job.job_config.manifest_data + job_record._jobs_cid = active_job.job_config.job_cid + job_record._inputs_cid = active_job.job_config.inputs_cid return job_record job_id: uuid.UUID = Field(description="The globally unique id for this job.") @@ -255,8 +262,11 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob): description="Information about the environments this value was created in.", default=None, ) + # job_hash: str = Field(description="The hash of the job. Calculated from manifest & input_ids hashes.") + # manifest_hash: str = Field(description="The hash of the manifest.") + # input_ids_hash: str = Field(description="The hash of the field names and input ids (the value_ids/uuids).") inputs_data_hash: Union[str, None] = Field( - description="A map of the hashes of this jobs inputs." + description="A map of the hashes of this jobs inputs (the hashes of field names and the actual bytes)." 
) outputs: Dict[str, uuid.UUID] = Field(description="References to the job outputs.") @@ -301,7 +311,7 @@ def create_renderable(self, **config: Any) -> RenderableType: v = extract_renderable(attr) table.add_row(k, v) table.add_row("job hash", self.job_hash) - table.add_row("inputs hash", self.inputs_hash) + table.add_row("inputs hash", self.input_ids_hash) return table # @property diff --git a/src/kiara/models/module/manifest.py b/src/kiara/models/module/manifest.py index ab90ce2ce..4e1a5ea4b 100644 --- a/src/kiara/models/module/manifest.py +++ b/src/kiara/models/module/manifest.py @@ -6,7 +6,7 @@ # Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/) import uuid -from typing import TYPE_CHECKING, Any, ClassVar, Dict, Mapping, Union +from typing import TYPE_CHECKING, Any, ClassVar, Dict, Mapping, Tuple, Union import orjson from dag_cbor import IPLDKind @@ -147,8 +147,10 @@ class InputsManifest(Manifest): description="A map of all the input fields and value references." ) _inputs_cid: Union[CID, None] = PrivateAttr(default=None) + # _inputs_hash: Union[str, None] = PrivateAttr(default=None) _jobs_cid: Union[CID, None] = PrivateAttr(default=None) - _inputs_data_cid: Union[bool, CID, None] = PrivateAttr(default=None) + _inputs_data_cid: Union[CID, None] = PrivateAttr(default=None) + _inputs_data_contains_invalid: Union[bool, None] = PrivateAttr(default=None) @field_validator("inputs") @classmethod @@ -185,17 +187,23 @@ def inputs_cid(self) -> CID: return self._inputs_cid @property - def inputs_hash(self) -> str: + def input_ids_hash(self) -> str: + return str(self.inputs_cid) def calculate_inputs_data_cid( self, data_registry: "DataRegistry" - ) -> Union[CID, None]: + ) -> Tuple[CID, bool]: + """Calculates the cid of the data hashes contained in this inputs manifest.
+ + This returns two values in a tuple: the first value is the cid where 'invalid hash markers' (used when a value is not set) is set to 'None', the second one indicates whether such an + invalid hash marker was encountered. + + This might be important to know, because if the interface of the module in question changed (which is possible for those types of fields), the computed input might not be valid anymore and would need to be re-computed. + """ if self._inputs_data_cid is not None: - if self._inputs_data_cid is False: - return None - return self._inputs_data_cid # type: ignore + return (self._inputs_data_cid, self._inputs_data_contains_invalid) data_hashes: Dict[str, Any] = {} invalid = False @@ -204,13 +212,11 @@ def calculate_inputs_data_cid( value = data_registry.get_value(v) if value.value_hash == INVALID_HASH_MARKER: invalid = True - break - data_hashes[k] = CID.decode(value.value_hash) - - if invalid: - self._inputs_data_cid = False - return None + data_hashes[k] = None + else: + data_hashes[k] = CID.decode(value.value_hash) _, cid = compute_cid(data=data_hashes) self._inputs_data_cid = cid - return cid + self._inputs_data_contains_invalid = invalid + return (cid, invalid) diff --git a/src/kiara/modules/included_core_modules/filesystem.py b/src/kiara/modules/included_core_modules/filesystem.py index d2a7134e1..0b3f15dbf 100644 --- a/src/kiara/modules/included_core_modules/filesystem.py +++ b/src/kiara/modules/included_core_modules/filesystem.py @@ -111,10 +111,12 @@ def to__python_object(self, data: SerializedData, **config: Any): class ImportFileBundleConfig(KiaraModuleConfig): include_file_types: Union[None, List[str]] = Field( - description="File types to include.", default=None + description="File types to include. Type is list of strings, which will be matched using 'endswith' test.", + default=None, ) exclude_file_types: Union[None, List[str]] = Field( - description="File types to include.", default=None + description="File types to exclude.
Type is list of strings, which will be matched with the 'endswith' test.", + default=None, ) diff --git a/src/kiara/registries/__init__.py b/src/kiara/registries/__init__.py index 2bf62b312..4253ccf4e 100644 --- a/src/kiara/registries/__init__.py +++ b/src/kiara/registries/__init__.py @@ -63,11 +63,8 @@ def create_new_store_config(cls, store_base_path: str, **kwargs) -> Self: logger = structlog.getLogger() -class ArchiveDetails(BaseModel): - - size: Union[int, None] = Field( - description="The size of the stored archive.", default=None - ) +class ArchiveDetails(RootModel): + root: Dict[str, Any] class ArchiveMetadata(RootModel): @@ -93,7 +90,7 @@ def get(self, key, default=None): # ) -NON_ARCHIVE_DETAILS = ArchiveDetails() +NON_ARCHIVE_DETAILS = ArchiveDetails(root={}) class KiaraArchive(abc.ABC, Generic[ARCHIVE_CONFIG_CLS]): @@ -283,7 +280,7 @@ def delete_archive(self, archive_id: Union[uuid.UUID, None] = None): ) logger.info( - "deleteing.archive", + "deleting.archive", archive_id=self.archive_id, item_types=self.supported_item_types(), archive_type=self.__class__.__name__, diff --git a/src/kiara/registries/aliases/__init__.py b/src/kiara/registries/aliases/__init__.py index a65cb521b..b3ad85457 100644 --- a/src/kiara/registries/aliases/__init__.py +++ b/src/kiara/registries/aliases/__init__.py @@ -23,7 +23,7 @@ from kiara.defaults import INVALID_ALIAS_NAMES from kiara.exceptions import KiaraException from kiara.models.events.alias_registry import AliasArchiveAddedEvent -from kiara.registries import BaseArchive +from kiara.registries import ArchiveDetails, BaseArchive from kiara.registries.data import ValueLink if TYPE_CHECKING: @@ -58,6 +58,12 @@ def find_value_id_for_alias(self, alias: str) -> Union[uuid.UUID, None]: def find_aliases_for_value_id(self, value_id: uuid.UUID) -> Union[Set[str], None]: pass + def get_archive_details(self) -> ArchiveDetails: + all_aliases = self.retrieve_all_aliases() + return ArchiveDetails( + root={"no_aliases": len(all_aliases), 
"aliases": sorted(all_aliases.keys())} + ) + class AliasStore(AliasArchive): @abc.abstractmethod diff --git a/src/kiara/registries/aliases/archives.py b/src/kiara/registries/aliases/archives.py index eb9b8ed4f..c9eccc2d0 100644 --- a/src/kiara/registries/aliases/archives.py +++ b/src/kiara/registries/aliases/archives.py @@ -48,8 +48,10 @@ def _retrieve_archive_metadata(self) -> Mapping[str, Any]: archive_id = _archive_metadata.get("archive_id", None) if not archive_id: try: - _archive_id = uuid.UUID(self.alias_store_path.name) - _archive_metadata["archive_id"] = _archive_id + _archive_id = uuid.UUID( + self.alias_store_path.name + ) # just to check if it's a valid UUID + _archive_metadata["archive_id"] = str(_archive_id) except Exception: raise Exception( f"Could not retrieve archive id for alias archive '{self.archive_alias}'." diff --git a/src/kiara/registries/data/data_store/filesystem_store.py b/src/kiara/registries/data/data_store/filesystem_store.py index b4e2dd8a4..d8eaca86c 100644 --- a/src/kiara/registries/data/data_store/filesystem_store.py +++ b/src/kiara/registries/data/data_store/filesystem_store.py @@ -95,8 +95,10 @@ def _retrieve_archive_metadata(self) -> Mapping[str, Any]: archive_id = _archive_metadata.get("archive_id", None) if not archive_id: try: - _archive_id = uuid.UUID(self.data_store_path.name) - _archive_metadata["archive_id"] = _archive_id + _archive_id = uuid.UUID( + self.data_store_path.name + ) # just to test it's a valid uuid + _archive_metadata["archive_id"] = str(_archive_id) except Exception: raise Exception( f"Could not retrieve archive id for alias archive '{self.archive_alias}'." 
@@ -113,7 +115,15 @@ def get_archive_details(self) -> ArchiveDetails: size = sum( f.stat().st_size for f in self.data_store_path.glob("**/*") if f.is_file() ) - return ArchiveDetails(size=size) + all_values = self.value_ids + num_values = len(all_values) + return ArchiveDetails( + root={ + "size": size, + "no_values": num_values, + "value_ids": sorted((str(x) for x in all_values)), + } + ) @property def data_store_path(self) -> Path: diff --git a/src/kiara/registries/data/data_store/sqlite_store.py b/src/kiara/registries/data/data_store/sqlite_store.py index af8a783b7..fcdcbd6a2 100644 --- a/src/kiara/registries/data/data_store/sqlite_store.py +++ b/src/kiara/registries/data/data_store/sqlite_store.py @@ -34,7 +34,6 @@ from kiara.registries.data.data_store import BaseDataStore from kiara.utils.hashfs import shard from kiara.utils.json import orjson_dumps -from kiara.utils.windows import fix_windows_longpath if TYPE_CHECKING: from multiformats import CID @@ -460,7 +459,16 @@ def _delete_archive(self): def get_archive_details(self) -> ArchiveDetails: size = self.sqlite_path.stat().st_size - return ArchiveDetails(size=size) + all_values = self.value_ids + num_values = len(all_values) + + return ArchiveDetails( + root={ + "size": size, + "no_values": num_values, + "value_ids": sorted((str(x) for x in all_values)), + } + ) class SqliteDataStore(SqliteDataArchive[SqliteDataStoreConfig], BaseDataStore): diff --git a/src/kiara/registries/jobs/__init__.py b/src/kiara/registries/jobs/__init__.py index b229981b7..2e52bc7ed 100644 --- a/src/kiara/registries/jobs/__init__.py +++ b/src/kiara/registries/jobs/__init__.py @@ -11,6 +11,7 @@ import structlog from bidict import bidict +from rich.console import Group from kiara.exceptions import FailedJobException from kiara.models.events import KiaraEvent @@ -51,7 +52,8 @@ def supported_item_types(cls) -> Iterable[str]: def retrieve_all_job_hashes( self, manifest_hash: Union[str, None] = None, - inputs_hash: Union[str, None] = None, + 
inputs_id_hash: Union[str, None] = None, + inputs_data_hash: Union[str, None] = None, ) -> Iterable[str]: """ Retrieve a list of all job record hashes (cids) that match the given filter arguments. @@ -135,6 +137,13 @@ def find_existing_job( matches = [] + ignore_internal = True + if ignore_internal: + + module = self._kiara.module_registry.create_module(inputs_manifest) + if module.characteristics.is_internal: + return None + for store_id, archive in self._kiara.job_registry.job_archives.items(): match = archive.retrieve_record_for_job_hash( @@ -155,11 +164,9 @@ def find_existing_job( return job_record - inputs_data_cid = inputs_manifest.calculate_inputs_data_cid( + inputs_data_cid, contains_invalid = inputs_manifest.calculate_inputs_data_cid( data_registry=self._kiara.data_registry ) - if not inputs_data_cid: - return None inputs_data_hash = str(inputs_data_cid) @@ -437,6 +444,7 @@ def find_matching_job_record( return None job_record = self.job_matcher.find_existing_job(inputs_manifest=inputs_manifest) + if job_record is None: return None @@ -448,6 +456,7 @@ def find_matching_job_record( job_hash=inputs_manifest.job_hash, module_type=inputs_manifest.module_type, ) + return job_record.job_id def prepare_job_config( @@ -480,12 +489,21 @@ def execute_job( job_metadata: Union[None, Any] = None, ) -> uuid.UUID: - log = logger.bind( - module_type=job_config.module_type, - module_config=job_config.module_config, - inputs={k: str(v) for k, v in job_config.inputs.items()}, - job_hash=job_config.job_hash, - ) + if job_config.module_type != "pipeline": + log = logger.bind( + module_type=job_config.module_type, + module_config=job_config.module_config, + inputs={k: str(v) for k, v in job_config.inputs.items()}, + job_hash=job_config.job_hash, + ) + else: + pipeline_name = job_config.module_config.get("pipeline_name", "n/a") + log = logger.bind( + module_type=job_config.module_type, + pipeline_name=pipeline_name, + inputs={k: str(v) for k, v in job_config.inputs.items()}, + 
job_hash=job_config.job_hash, + ) stored_job = self.find_matching_job_record(inputs_manifest=job_config) if stored_job is not None: @@ -494,6 +512,31 @@ def execute_job( job_id=str(stored_job), module_type=job_config.module_type, ) + if is_develop(): + + module = self._kiara.module_registry.create_module(manifest=job_config) + if job_metadata and job_metadata.get("is_pipeline_step", True): + step_id = job_metadata.get("step_id", None) + title = f"Using cached pipeline step: {step_id}" + else: + title = f"Using cached job for: {module.module_type_name}" + + from kiara.utils.debug import create_module_preparation_table + from kiara.utils.develop import log_dev_message + + stored_job_record = self.get_job_record(stored_job) + + table = create_module_preparation_table( + kiara=self._kiara, + job_config=job_config, + job_id=stored_job_record.job_id, + module=module, + ) + include = ["job_hash", "inputs_id_hash", "input_ids_hash", "outputs"] + table_job_record = stored_job_record.create_renderable(include=include) + panel = Group(table, table_job_record) + log_dev_message(panel, title=title) + return stored_job if job_metadata is None: diff --git a/src/kiara/registries/jobs/job_store/filesystem_store.py b/src/kiara/registries/jobs/job_store/filesystem_store.py index fe3963081..5738de478 100644 --- a/src/kiara/registries/jobs/job_store/filesystem_store.py +++ b/src/kiara/registries/jobs/job_store/filesystem_store.py @@ -8,7 +8,7 @@ import shutil import uuid from pathlib import Path -from typing import Any, Dict, Iterable, Mapping, Union +from typing import Any, Iterable, Mapping, Union import orjson import structlog @@ -49,7 +49,7 @@ def get_archive_details(self) -> ArchiveDetails: size = sum( f.stat().st_size for f in self.job_store_path.glob("**/*") if f.is_file() ) - return ArchiveDetails(size=size) + return ArchiveDetails(root={"size": size}) def _retrieve_archive_metadata(self) -> Mapping[str, Any]: @@ -61,8 +61,10 @@ def _retrieve_archive_metadata(self) -> 
Mapping[str, Any]: archive_id = _archive_metadata.get("archive_id", None) if not archive_id: try: - _archive_id = uuid.UUID(self.job_store_path.name) - _archive_metadata["archive_id"] = _archive_id + _archive_id = uuid.UUID( + self.job_store_path.name + ) # just to check if it's a valid uuid + _archive_metadata["archive_id"] = str(_archive_id) except Exception: raise Exception( f"Could not retrieve archive id for alias archive '{self.archive_alias}'." @@ -92,20 +94,23 @@ def _delete_archive(self) -> None: def retrieve_all_job_hashes( self, manifest_hash: Union[str, None] = None, - inputs_hash: Union[str, None] = None, + inputs_id_hash: Union[str, None] = None, + inputs_data_hash: Union[str, None] = None, ) -> Iterable[str]: base_path = self.job_store_path / MANIFEST_SUB_PATH if not manifest_hash: - if not inputs_hash: + if not inputs_id_hash: records = base_path.glob("*/*/*.job_record") else: - records = base_path.glob(f"*/{inputs_hash}/*.job_record") + records = base_path.glob(f"*/{inputs_id_hash}/*.job_record") else: - if not inputs_hash: + if not inputs_id_hash: records = base_path.glob(f"{manifest_hash}/*/*.job_record") else: - records = base_path.glob(f"{manifest_hash}/{inputs_hash}/*.job_record") + records = base_path.glob( + f"{manifest_hash}/{inputs_id_hash}/*.job_record" + ) result = [] for record in records: @@ -175,7 +180,11 @@ class FileSystemJobStore(FileSystemJobArchive, JobStore): def store_job_record(self, job_record: JobRecord): manifest_cid = job_record.manifest_cid - inputs_hash = job_record.inputs_hash + # inputs_hash = job_record.inputs_data_hash + + manifest_hash = job_record.manifest_hash + input_ids_hash = job_record.input_ids_hash + inputs_hash = job_record.inputs_data_hash base_path = self.job_store_path / MANIFEST_SUB_PATH manifest_folder = base_path / str(manifest_cid) diff --git a/src/kiara/registries/jobs/job_store/sqlite_store.py b/src/kiara/registries/jobs/job_store/sqlite_store.py index da0ee6594..ddf86a886 100644 --- 
a/src/kiara/registries/jobs/job_store/sqlite_store.py +++ b/src/kiara/registries/jobs/job_store/sqlite_store.py @@ -9,7 +9,6 @@ from kiara.models.module.jobs import JobRecord from kiara.registries import SqliteArchiveConfig from kiara.registries.jobs import JobArchive, JobStore -from kiara.utils.windows import fix_windows_longpath class SqliteJobArchive(JobArchive): @@ -113,9 +112,11 @@ def sqlite_engine(self) -> "Engine": self._cached_engine = create_engine(self.db_url, future=True) create_table_sql = """ CREATE TABLE IF NOT EXISTS job_records ( - job_hash TEXT PRIMARY KEY, + job_id TEXT PRIMARY KEY, + job_hash TEXT NOT NULL, manifest_hash TEXT NOT NULL, - inputs_hash TEXT NOT NULL, + input_ids_hash TEXT NOT NULL, + inputs_data_hash TEXT NOT NULL, job_metadata TEXT NOT NULL ); """ @@ -148,20 +149,21 @@ def _retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None] def retrieve_all_job_hashes( self, manifest_hash: Union[str, None] = None, - inputs_hash: Union[str, None] = None, + inputs_id_hash: Union[str, None] = None, + inputs_data_hash: Union[str, None] = None, ) -> Iterable[str]: if not manifest_hash: - if not inputs_hash: + if not inputs_id_hash: sql = text("SELECT job_hash FROM job_records") params = {} else: sql = text( - "SELECT job_hash FROM job_records WHERE inputs_hash = :inputs_hash" + "SELECT job_hash FROM job_records WHERE input_ids_hash = :input_ids_hash" ) - params = {"inputs_hash": inputs_hash} + params = {"input_ids_hash": inputs_id_hash} else: - if not inputs_hash: + if not inputs_id_hash: sql = text( "SELECT job_hash FROM job_records WHERE manifest_hash = :manifest_hash" ) @@ -170,7 +172,7 @@ def retrieve_all_job_hashes( sql = text( - "SELECT job_hash FROM job_records WHERE manifest_hash = :manifest_hash AND inputs_hash = :inputs_hash" + "SELECT job_hash FROM job_records WHERE manifest_hash = :manifest_hash AND input_ids_hash = :input_ids_hash" ) - params = {"manifest_hash": manifest_hash, "inputs_hash": inputs_hash} + params = {"manifest_hash": manifest_hash, "input_ids_hash": inputs_id_hash} with self.sqlite_engine.connect() as connection: result = connection.execute(sql, params) @@ -213,19 +215,23 @@ def
_load_archive_config( def store_job_record(self, job_record: JobRecord): - manifest_hash = str(job_record.manifest_cid) - inputs_hash = job_record.inputs_hash - job_hash = job_record.job_hash + manifest_hash = job_record.manifest_hash + input_ids_hash = job_record.input_ids_hash + inputs_data_hash = job_record.inputs_data_hash + job_hash = job_record.job_hash + + job_record_json = job_record.model_dump_json() sql = text( - "INSERT INTO job_records (job_hash, manifest_hash, inputs_hash, job_metadata) VALUES (:job_hash, :manifest_hash, :inputs_hash, :job_metadata)" + "INSERT OR IGNORE INTO job_records(job_id, job_hash, manifest_hash, input_ids_hash, inputs_data_hash, job_metadata) VALUES (:job_id, :job_hash, :manifest_hash, :input_ids_hash, :inputs_data_hash, :job_metadata)" ) params = { + "job_id": str(job_record.job_id), "job_hash": job_hash, "manifest_hash": manifest_hash, - "inputs_hash": inputs_hash, + "input_ids_hash": input_ids_hash, + "inputs_data_hash": inputs_data_hash, "job_metadata": job_record_json, } diff --git a/src/kiara/utils/debug.py b/src/kiara/utils/debug.py index 9a89bcd9e..a620584c9 100644 --- a/src/kiara/utils/debug.py +++ b/src/kiara/utils/debug.py @@ -47,8 +47,11 @@ def create_module_preparation_table( module_details = dev_config.log.pre_run.module_info if module_details not in [DetailLevel.NONE.value, DetailLevel.NONE]: + pipeline_name = job_config.module_config.get("pipeline_name", None) if module_details in [DetailLevel.MINIMAL.value, DetailLevel.MINIMAL]: table.add_row("module", job_config.module_type) + if pipeline_name: + table.add_row("pipeline name", pipeline_name) doc = module.operation.doc table.add_row( "module desc", @@ -59,6 +62,8 @@ ) elif module_details in [DetailLevel.FULL.value, DetailLevel.FULL]: table.add_row("module", job_config.module_type) + if pipeline_name: + table.add_row("pipeline name", pipeline_name) doc = module.operation.doc table.add_row( "module doc", diff --git a/src/kiara/utils/output.py
b/src/kiara/utils/output.py index ef9b2d0bb..196b24090 100644 --- a/src/kiara/utils/output.py +++ b/src/kiara/utils/output.py @@ -892,6 +892,7 @@ def create_recursive_table_from_model_object( show_lines = render_config.get("show_lines", True) show_header = render_config.get("show_header", True) + show_description = render_config.get("show_description", False) model_cls = model.__class__ table = RichTable(box=box.SIMPLE, show_lines=show_lines, show_header=show_header) @@ -918,9 +919,12 @@ def create_recursive_table_from_model_object( data_renderable = extract_renderable(data, render_config=render_config) sub_model = None else: + updated_render_config = dict(render_config) + updated_render_config["show_header"] = False sub_model = create_recursive_table_from_model_object( - data, render_config={"show_lines": True, "show_header": False} + data, render_config=updated_render_config ) + data_renderable = None group = [] @@ -928,7 +932,7 @@ def create_recursive_table_from_model_object( if data_renderable: group.append(data_renderable) group.append("") - if desc: + if desc and show_description: group.append(f"[i]{desc}[/i]") if sub_model: