# Pipelines, job descriptions, & 'code-view' #39
## Pipelines

Most of the basics of how to create a kiara pipeline have already been covered by: https://dharpa.org/kiara.documentation/latest/extending_kiara/pipelines/assemble_pipelines/

So this is a deeper dive into some of the important things around this topic, and it explains how describing workflows declaratively makes a difference compared to the traditional, procedural way of doing data science.

As an example, for now let's use the 'logic.nand' pipeline. It's as simple as it gets: it contains 2 steps, an 'and' step and a 'not' step. Both are connected via the output of the 'and' step, which feeds into the input of the 'not' step.

kiara can figure out the order in which steps have to be run, and you can let it show you what it thinks.
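For example, via the Python API -- a sketch that uses the same calls as the full example at the end of this issue:

```python
from kiara.api import KiaraAPI

kiara = KiaraAPI.instance()

# retrieve the static structure of the 'logic.nand' pipeline
ps = kiara.get_pipeline_structure("logic.nand")

# a list of lists: each inner list is one processing stage, containing
# the step ids assigned to that stage
print(ps.processing_stages)
```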
As you can see from that output, it recognizes that it needs two separate 'stages', and which step (or steps -- for pipelines with more steps than this simple example) it needs to assign to each stage. This can be arbitrarily complex, and in some cases it's not clear in which stage a step should run (imagine a 5-stage pipeline where one step only has a direct user input, but needs to run before the last stage: technically, it could run in stage 1, 2, 3, or 4).

### Sidenote: job descriptions

Job descriptions are implemented in the [JobDesc][kiara.interfaces.python_api.models.job.JobDesc] class. They can be used to hold a reference to an operation, as well as the inputs for that specific operation, in a single Python object (or json/yaml file).
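A simple example job description using our 'logic.nand' pipeline could look like the following sketch (the field names are assumed from the `JobDesc` description above):

```yaml
# my_namd.yaml -- a minimal job description
operation: logic.nand
inputs:
  a: true
  b: true
```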
kiara can run this easily, via:
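```
# sketch -- run the job description file from above with the `kiara run` subcommand
kiara run my_namd.yaml
```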
Or via the API:

```python
from kiara.api import KiaraAPI

kiara = KiaraAPI.instance()
result = kiara.run_job('my_namd.yaml')
dbg(result)
```

Or, more verbose (of course you can also assemble the `JobDesc` object in code, instead of loading it from a file):

```python
from kiara.api import KiaraAPI
from kiara.interfaces.python_api.models.job import JobDesc
kiara = KiaraAPI.instance()
job = JobDesc.create_from_file('my_namd.yaml')
result = kiara.run_job(job)
dbg(result)
```

For more complex examples that use a pipeline file as the operation, have a look at the kiara_plugin.tabular example jobs. I'm happy to answer questions that go further into detail, but I don't want to write too much here, so as always, just ping me and ask. The main thing to understand is that a job description contains the operation as well as its inputs.

## The 'code-view' feature

The 'code-view' feature that was requested is built on top of a small renderer framework, so first a quick sidenote on that.

### Sidenote: kiara renderers

This is a minor feature that was implemented mainly for the code-view feature, but I also use it internally for some debug/dev work. It basically provides a small, modular mini-framework where you can write a renderer for an internal kiara model (a pipeline, job description, value, data-type, you get the idea). Except for the renderer(s) I describe below, none of them are in any way 'production' (or even more than a stub, in many cases), but feel free to try them out if you are interested. You can see the available renderers via:
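```
# list all registered renderers (the exact subcommand is a best guess --
# check `kiara render --help` for the real one)
kiara render list-renderers
```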
And since we are only interested in renderers that take a 'pipeline' as input, we can filter like:
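```
# same as above, but filter for renderers whose source type is 'pipeline'
# (flag name is a best guess)
kiara render list-renderers --source-type pipeline
```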
That still shows quite a few renderers; for now we are interested in the one that has a Python script as its render target, so:
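```
# sketch -- render the 'logic.nand' pipeline into a Python script
# (argument names are best guesses, check `kiara render --help`)
kiara render --source-type pipeline --target-type python_script logic.nand
```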
Checking the output of that command, we can see it is a Python script, only missing some inputs. We can copy that text and edit the inputs, like:
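The result should end up looking roughly like the following sketch (the `JobDesc` field names are assumed, and the actual rendered script will differ in detail):

```python
from kiara.api import KiaraAPI
from kiara.interfaces.python_api.models.job import JobDesc

kiara = KiaraAPI.instance()

# the rendered script leaves the inputs empty -- this is the part we edit by hand
job = JobDesc(operation="logic.nand", inputs={"a": True, "b": True})

result = kiara.run_job(job)
print(result)
```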
Save it in a file and run it with Python.
Not super interesting, but we should get a result value printed (for two `True` inputs, the nand output is `False`).

### Code view

The code view feature was requested in order to let users who work in Jupyter get at the code behind a pipeline. If you install the kiara_plugin.jupyter plugin into your environment, you should also see a renderer that targets Jupyter notebooks:
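```
# hypothetical -- render the pipeline into a Jupyter notebook instead of a plain script
kiara render --source-type pipeline --target-type jupyter_notebook logic.nand
```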
Edit the inputs again, and you should again be able to run this pipeline as Python code within Jupyter, using the kiara Python API. Play around a bit more with this, and use more complex pipelines to see how this works in those cases.

## Declarative pipeline structure

The main advantage of using a declarative pipeline structure is that it makes it easier to reason about the workflow: it's a static data structure that can be probed, visualized, etc. You can see at a glance what the pipeline does and what its inputs are. You can also see the order in which steps are executed, and what the dependencies between steps are. With code, you don't really have that information as readily available, and you can't do much (meta-)investigation computationally (unless you consider jumping into AST parsing, but that would be a different level of complexity again).

kiara contains two main Python classes that relate to pipelines and their structure: `PipelineConfig` and `PipelineStructure`.
All of those Python classes start with a `Pipeline` prefix. The usual entry point is a `PipelineConfig`, which you can create from a pipeline file:

```python
from kiara.models.module.pipeline import PipelineConfig
pc = PipelineConfig.from_file("/home/markus/projects/kiara/kiara_plugin.tabular/examples/pipelines/init.yaml")
dbg(pc)
```

But also check out the other classmethods of `PipelineConfig`. The other way to get such a config would be from the operation's module directly:

```python
op = kiara.get_operation("logic.nand")
pc = op.module.config
print(type(pc))
dbg(pc)
```

Once we have such a `PipelineConfig`, we can get the `PipelineStructure` from it:

```python
ps = pc.structure
print(type(ps))
```

Check the class's source code for all the methods you have access to; some of the interesting ones are:

```python
# all step ids
print(f"pipeline steps: {list(ps.step_ids)}")
# a list of lists, each 'root' list representing a stage, and each element in that list
# the steps contained therein
print(f"pipeline stages: {ps.processing_stages}")
for step in ps.steps:
    print(f"Step: {step.step_id}")
    dbg(step)
```

In the streamlit demo I showed, this is used to automatically render the input forms, one page per stage, for example.

Tangential to what is explained here, there are also other pipeline-related API endpoints, for example to register external pipeline files into the current kiara context. If you want to do more with pipelines, make sure to read through the source code there to get an idea of what you can currently do. And tell me if there is more you'd like to do.

Now comes the interesting part, the `Pipeline` class. This is easiest created using either a `PipelineConfig` or a `PipelineStructure`:

```python
from kiara.models.module.pipeline.pipeline import Pipeline
pipeline = Pipeline.create_pipeline(kiara=kiara, pipeline=pc)
print(type(pipeline))
# or:
pipeline = Pipeline.create_pipeline(kiara=kiara, pipeline=ps)
print(type(pipeline))
dbg(pipeline)
```

This class method also accepts a string or a Python mapping, same as what we used above to create the `PipelineConfig`. As you can see from the output of the `dbg` call, a `Pipeline` instance combines the static structure with the current execution state (inputs, outputs, and the status of each step).

All of this is fairly involved, because it gives developers a high level of control. There are different implementations of pipeline controllers that drive the actual processing; the example below uses a `SinglePipelineBatchController`. A simple end-to-end example of how this would be used in code is attached to this issue. Of course, the 'logic.nand' pipeline is trivial bordering on boring, but that should make it easier to follow along. Try the same thing with 'logic.xor', or create your own pipeline, to see this whole thing become much more interesting and (IMHO) useful.
Hm, zip file is stupid, for reference, here's the source code again directly:

```python
from kiara.api import KiaraAPI
from kiara.models.module.pipeline.controller import SinglePipelineBatchController
from kiara.models.module.pipeline.pipeline import Pipeline
from kiara.utils.cli import terminal_print
kiara = KiaraAPI.instance()
ps = kiara.get_pipeline_structure("logic.nand")
pipeline = Pipeline.create_pipeline(kiara=kiara, pipeline=ps)
terminal_print(pipeline, in_panel="Pipeline details after creation")
# the `job_registry` argument is a bit of a leaky abstraction, but I'm not sure
# it's worth cleaning that one up. Just be aware that this part of the code
# could change at some point (with notice of course)
controller = SinglePipelineBatchController(job_registry=kiara.context.job_registry, pipeline=pipeline)
changed = pipeline.set_pipeline_input(pipeline_input_field="a", input_value=True)
# the result of this is some information about how the internal state of the pipeline
# changed because of your input; check that out at your leisure, but in most cases
# it's not important
# now lets check the pipeline state
terminal_print(pipeline, in_panel="Status after first input")
# as you can see, the 'a' field now shows a valid input status, but overall
# the pipeline state is still invalid, since we need one more input:
changed = pipeline.set_pipeline_input(pipeline_input_field="b", input_value=True)
# again, look at the internal state
terminal_print(pipeline, in_panel="Status after second input")
# now we see that step 'and' has a new status: `inputs ready`
# this means we can now kick off processing
# the callback is optional, but it lets us see what is happening,
# which is sometimes useful
callback_output = []
callback = lambda x: callback_output.append(x)
job_ids = controller.process_pipeline(event_callback=callback)
terminal_print(callback_output, in_panel="Pipeline processing log")
# now lets look at the result
terminal_print(job_ids, in_panel="Processed job ids")
# this is a map of the jobs that were run; we could have a look at the job records if we wanted:
for step_id, job_id in job_ids.items():
    job = kiara.get_job(job_id)
    # here we could also see if there were any errors while processing, for example
    terminal_print(job, in_panel=f"Processing details for step: {step_id}")
# but much more interestingly, let's look at the state of our pipeline:
terminal_print(pipeline, in_panel="Pipeline state after processing")
# as you can see, this seems to have processed everything (which makes sense, because
# we supplied every pipeline input with a value, and each of those values was valid
# in the context of the pipeline's modules)
# we can look at every aspect of this pipeline, for example what is the internal state of
# the `and` step:
details = pipeline.get_step_details("and")
terminal_print(details, in_panel="Step details: and")
# and what is the intermediate result (its 'y' output field):
intermediate_outputs = pipeline.get_current_step_outputs("and")
values = kiara.get_values(**intermediate_outputs)
terminal_print(values, in_panel="Current step output(s) for step: and")
# which would be the same as the current input of the 2nd stage step 'not':
current_step_inputs = pipeline.get_current_step_inputs("not")
values = kiara.get_values(**current_step_inputs)
terminal_print(values, in_panel="Current step input(s) for step: not")
```
This issue contains an overview of the central data structure in kiara, the pipeline, and associated features for declaratively describing a data workflow, the central concept around which kiara is built.