Skip to content

Commit

Permalink
chore: start work on mock/pipeline related modules
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Dec 18, 2023
1 parent 5a62b15 commit 43cc6a5
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 76 deletions.
19 changes: 19 additions & 0 deletions examples/jobs/simple_1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Example job for the 'mock' module: a fake boolean "and" operation.
# NOTE(review): nesting reconstructed from the MockModuleConfig field layout
# (title/desc/inputs_schema/outputs_schema) -- confirm against the repository file.
operation: mock
module_config:
  title: dummy_and
  desc: dummy_desc
  inputs_schema:
    a:
      type: boolean
      doc: The first input.
    b:
      type: boolean
      doc: The second input.
  outputs_schema:
    y:
      type: boolean
      doc: The output.
  # The canned value the mock module returns instead of computing anything.
  result_value: true
inputs:
  a: true
  b: true
143 changes: 67 additions & 76 deletions src/kiara_plugin/develop/modules/__init__.py
Original file line number Diff line number Diff line change
@@ -1,104 +1,95 @@
# -*- coding: utf-8 -*-
from typing import TYPE_CHECKING, Any, Mapping, Type

import orjson
from typing import Any, Dict, Mapping

from kiara.models.module.jobs import JobLog
from kiara.models.module.pipeline import PipelineConfig
from kiara.models.values.value import SerializedData, Value, ValueMap
from kiara.modules import KiaraModule, ValueMapSchema
from kiara.modules.included_core_modules.create_from import CreateFromModule
from kiara.modules.included_core_modules.serialization import DeserializeValueModule

if TYPE_CHECKING:
from kiara.models.filesystem import KiaraFile, KiaraFileBundle


class CreatePipelineModule(CreateFromModule):

_module_type_name = "create.pipeline"

def create__pipeline__from__file(self, source_value: Value) -> Any:
from boltons.strutils import slugify
from pydantic import Field

from kiara.models.module.pipeline import PipelineConfig

file: KiaraFile = source_value.data
pipeline_config = PipelineConfig.from_file(path=file.path)

return pipeline_config


class LoadPipelineConfig(DeserializeValueModule):
from kiara.api import KiaraModule, KiaraModuleConfig, ValueMap, ValueMapSchema
from kiara.models.module.pipeline import PipelineConfig

_module_type_name = "load.pipeline"

class MockModuleConfig(KiaraModuleConfig):
    """Configuration for the 'mock' module.

    Declares fake input/output schemas plus title/description metadata, so a
    mock step can masquerade as a real module inside a pipeline.
    """

    @classmethod
    def create_pipeline_config(
        cls, title: str, description: str, author: str, *steps: "MockModuleConfig"
    ) -> PipelineConfig:
        """Assemble a PipelineConfig in which every step is a mock module.

        Args:
            title: the pipeline title; slugified into the pipeline name
            description: used as the pipeline 'doc'
            author: recorded as the single author in the pipeline context
            *steps: one mock module config per pipeline step

        Returns:
            the assembled pipeline configuration
        """
        data: Dict[str, Any] = {
            "pipeline_name": slugify(title),
            "doc": description,
            "context": {"authors": [author]},
            "steps": [],
        }
        for step in steps:
            # NOTE(review): steps use module_type 'dummy' while this module
            # registers as 'mock' -- confirm a 'dummy' module type exists.
            step_data = {
                "step_id": slugify(step.title),
                "module_type": "dummy",
                "module_config": {
                    "title": step.title,
                    "inputs_schema": step.inputs_schema,
                    "outputs_schema": step.outputs_schema,
                    "desc": step.desc,
                },
            }
            data["steps"].append(step_data)

        pipeline_config = PipelineConfig.from_config(data)
        return pipeline_config

    # Mock input fields: field name -> schema dict (keys like 'type', 'doc').
    inputs_schema: Dict[str, Dict[str, Any]] = Field(
        description="The input fields and their types.",
    )
    # Mock output fields: field name -> schema dict (keys like 'type', 'doc').
    outputs_schema: Dict[str, Dict[str, Any]] = Field(
        description="The outputs fields and their types.",
    )
    title: str = Field(description="The title of the step.")
    desc: str = Field(description="A description of what this step does.")

class MockKiaraModule(KiaraModule):
    """A module that mimics configurable input/output schemas.

    Instead of computing anything, it returns the pre-configured result value
    for each output field -- useful for testing pipeline plumbing.
    """

    _module_type_name = "mock"
    _config_cls = MockModuleConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:
        """Build the inputs schema from the configured 'inputs_schema' mapping."""
        result = {}
        v: Mapping[str, Any]
        for k, v in self.get_config_value("inputs_schema").items():
            data = {
                "type": v["type"],
                "doc": v.get("doc", "-- n/a --"),
                # inputs default to optional so mock jobs run without all fields
                "optional": v.get("optional", True),
            }
            result[k] = data

        return result

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:
        """Build the outputs schema from the configured 'outputs_schema' mapping."""
        result = {}
        for k, v in self.get_config_value("outputs_schema").items():
            data = {
                "type": v["type"],
                "doc": v.get("doc", "-- n/a --"),
                # outputs default to required, unlike inputs above
                "optional": v.get("optional", False),
            }
            result[k] = data

        return result

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
        """Set every output field to its pre-configured result; no real work is done."""
        outputs_schema = self.get_config_value("outputs_schema")
        for field_name, field_config in outputs_schema.items():
            # NOTE(review): expects a per-field 'result_value' key, but the
            # example job file places 'result_value' at the module_config
            # level -- confirm which location is intended.
            outputs.set_value(field_name, field_config["result_value"])
104 changes: 104 additions & 0 deletions src/kiara_plugin/develop/modules/pipelines/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
from typing import TYPE_CHECKING, Any, Mapping, Type

import orjson

from kiara.models.module.jobs import JobLog
from kiara.models.module.pipeline import PipelineConfig
from kiara.models.values.value import SerializedData, Value, ValueMap
from kiara.modules import KiaraModule, ValueMapSchema
from kiara.modules.included_core_modules.create_from import CreateFromModule
from kiara.modules.included_core_modules.serialization import DeserializeValueModule

if TYPE_CHECKING:
from kiara.models.filesystem import KiaraFile, KiaraFileBundle


class CreatePipelineModule(CreateFromModule):
    """Create 'pipeline' values from other value types (currently: files)."""

    _module_type_name = "create.pipeline"

    def create__pipeline__from__file(self, source_value: Value) -> Any:
        """Parse a pipeline description file (json/yaml) into a pipeline config.

        Args:
            source_value: a value of type 'file' pointing at the description file

        Returns:
            the parsed PipelineConfig instance
        """
        file: KiaraFile = source_value.data
        # PipelineConfig is already imported at module level; the original
        # redundant function-local re-import has been removed.
        pipeline_config = PipelineConfig.from_file(path=file.path)

        return pipeline_config


class LoadPipelineConfig(DeserializeValueModule):
    """Deserialize a serialized 'pipeline' value back into a PipelineConfig object."""

    _module_type_name = "load.pipeline"

    @classmethod
    def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
        """The only supported target is the PipelineConfig python object."""
        return {"python_object": PipelineConfig}

    @classmethod
    def retrieve_supported_serialization_profile(cls) -> str:
        """Serialized pipeline data is stored as json."""
        return "json"

    @classmethod
    def retrieve_serialized_value_type(cls) -> str:
        """This module deserializes values of type 'pipeline'."""
        return "pipeline"

    def to__python_object(self, data: SerializedData, **config: Any) -> PipelineConfig:
        """Decode the single json chunk of *data* into a PipelineConfig."""
        serialized = data.get_serialized_data("data")
        # the serialized pipeline is expected to live in exactly one chunk
        assert serialized.get_number_of_chunks() == 1
        chunk_list = list(serialized.get_chunks(as_files=False))
        assert len(chunk_list) == 1

        raw_bytes: bytes = chunk_list[0]  # type: ignore
        parsed = orjson.loads(raw_bytes)

        return PipelineConfig.from_config(parsed)

class CollectPipelines(KiaraModule):
    """Parse every pipeline file in a bundle and collect the resulting configs."""

    _module_type_name = "collect.pipelines"

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:
        """Single input: the bundle of candidate pipeline files."""
        schema = {
            "pipeline_files": {
                "type": "file_bundle",
                "doc": "A file bundle containing pipeline files in json or yaml format.",
            }
        }
        return schema

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:
        """Single output: a table summarizing the parsed pipelines."""
        schema = {
            "pipelines": {
                "type": "table",
                "doc": "A table with the pipeline data.",
            }
        }
        return schema

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):
        """Try to parse each json/yaml file as a pipeline; log successes and failures."""
        bundle: KiaraFileBundle = inputs["pipeline_files"].data

        parsed = {}
        for path, file_item in bundle.included_files.items():
            if file_item.file_extension not in ["json", "yaml", "yml"]:
                continue
            try:
                cfg = PipelineConfig.from_file(path=file_item.path)
                parsed[path] = cfg
                job_log.add_log(f"parsed pipeline for: {path}")
            except Exception as e:
                # invalid files are skipped, not fatal -- this is a best-effort collect
                job_log.add_log(f"ignoring invalid pipeline file '{path}': {e}")

        from kiara.utils.cli import terminal_print
        for path, cfg in parsed.items():
            terminal_print(cfg.create_renderable())
        # NOTE(review): still WIP -- no table is assembled yet, the output is
        # deliberately set to None.
        pipelines_table = None
        outputs.set_value("pipelines", pipelines_table)

0 comments on commit 43cc6a5

Please sign in to comment.