Swap to Hamilton DAG #34

Merged
26 commits, merged Oct 15, 2024

Commits
0a1a9d0
docs: add placeholders for workflow conversions
kreczko Mar 20, 2024
bfc69da
docs: add orchestration and rephrase existing docs
kreczko Mar 22, 2024
03e189e
docs: add Snakemake reference
kreczko Jun 17, 2024
a371bd0
pyproject: replace prefect with hamilton and explicit dask dependency
kreczko Oct 8, 2024
b34cbfa
rename stage -> task
kreczko Oct 8, 2024
325195d
pyproject: add pydantic as new dependency
kreczko Oct 8, 2024
da6a653
feat(cli): reduce verbosity of `lint` subcommand
kreczko Oct 8, 2024
ef24d45
docs: change stages -> tasks
kreczko Oct 8, 2024
89421f8
feat: add metadata to FlowConfig
kreczko Oct 8, 2024
dfaeecf
fix(LocalBashOperator): repr should put bash_command in quotes for pr…
kreczko Oct 14, 2024
f435991
CI: add python 3.12 for testing
kreczko Oct 14, 2024
eba062a
FlowConfig: add optional metadata
kreczko Oct 14, 2024
a4b436e
build: add python 3.13 to list
kreczko Oct 14, 2024
b3f1fa2
build: add dill dependency
kreczko Oct 14, 2024
24508da
build: drop python 3.10 due to missing hashlib.file_digest
kreczko Oct 14, 2024
51a2ffc
feat(workflow): add disk persistence
kreczko Oct 14, 2024
930d2a4
tests: stages -> tasks
kreczko Oct 14, 2024
1b9038f
tests: BashOperator -> LocalBashOperator
kreczko Oct 14, 2024
698a8fa
test: add workflow.task_names
kreczko Oct 15, 2024
27e4b45
feat(workflow): add disk persistance
kreczko Oct 15, 2024
bb85246
refactor: replace prefect with hamilton
kreczko Oct 15, 2024
0afb1e9
fix(cli): add version_base and metadata for config, add hamilton DAG
kreczko Oct 15, 2024
5d35ca2
feat: align Hamilton cache path with workflow save path
kreczko Oct 15, 2024
b8a9931
feat(orch): add adapters for dask and local
kreczko Oct 15, 2024
715fd3d
fix: pylint errors
kreczko Oct 15, 2024
980dea8
refactor(pylint): move pylint config to one location
kreczko Oct 15, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -40,7 +40,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11"]
python-version: ["3.11", "3.12"]
runs-on: [ubuntu-latest, macos-latest, windows-latest]

steps:
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -57,6 +57,7 @@ repos:
files: src|tests
args: []
additional_dependencies:
- dill
- plumbum
- pytest
- pydantic
12 changes: 8 additions & 4 deletions .pylintrc
@@ -52,7 +52,7 @@ persistent=yes

# Minimum Python version to use for version dependent checks. Will default to
# the version used to run pylint.
py-version=3.10
py-version=3.11

# When enabled, pylint would attempt to guess common misconfiguration and emit
# user-friendly hints instead of false-positive error messages.
@@ -88,7 +88,11 @@ disable=raw-checker-failed,
use-symbolic-message-instead,
import-outside-toplevel,
fixme,
super-init-not-called
super-init-not-called,
design,
line-too-long,
missing-module-docstring,
wrong-import-position

# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
@@ -113,7 +117,7 @@ evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / stateme
# Set the output format. Available formats are text, parseable, colorized, json
# and msvs (visual studio). You can also give a reporter class, e.g.
# mypackage.mymodule.MyReporterClass.
output-format=text
output-format=colorized

# Tells whether to display a full report or only the messages.
reports=no
@@ -143,7 +147,7 @@ ignore-comments=yes
ignore-docstrings=yes

# Imports are removed from the similarity computation
ignore-imports=no
ignore-imports=yes

# Signatures are removed from the similarity computation
ignore-signatures=no
6 changes: 3 additions & 3 deletions docs/advanced_examples/cms_full_analysis_example.md
@@ -22,8 +22,8 @@ perspective, they are effectively independent branches. To split a workflow, you
will need to use the `needs` keyword:

```yaml
stages:
- name: Data Input Stage
tasks:
- name: Data Input task
...
- name: Common selection
...
@@ -48,7 +48,7 @@ This will create a DAG like this:

```{mermaid}
flowchart TD
A[Data Input Stage] --> B(Common selection)
A[Data Input task] --> B(Common selection)
B --> C[signal selection]
B --> D[control selection]
C --> E[Create histograms for signal region]
6 changes: 3 additions & 3 deletions docs/command_line_interface.md
@@ -41,16 +41,16 @@ fasthep flow run \
```

You can also specify the number of events to process, the executor to use, and
the stage to run:
the tasks to run:

```bash
fasthep flow run \
examples/cms_pub_example.yaml \
--output examples/output/cms_pub_example \
--n-events 1000 \
--executor dask-local \
--stages "Select events", "Histograms after selection"
--tasks "Select events", "Histograms after selection"
```

This will run the workflow for 1000 events, using the `dask-local` executor, and
only run the `Select events` and `Histograms after selection` stages.
only run the `Select events` and `Histograms after selection` tasks.
90 changes: 44 additions & 46 deletions docs/concepts.md
@@ -2,59 +2,58 @@

`fasthep-flow` does not implement any processing itself, but rather delegates
between user workflow description (the
[YAML configuration file](./configuration.md)), the workflow stages (e.g.
**Python Callables**), the **workflow DAG** and the **Executor** engine. Unless
excplicitely stated, every workflow has to start with a **Data Stage**, has one
or more **Processing stages**, and end with an **Output stage**.
[YAML configuration file](./configuration/index.md)), the workflow tasks (e.g.
**Python Callables**), the **workflow DAG** and the **Orchestration** layer.
Unless explicitly stated otherwise, every workflow has to start with a **Data task**,
contain one or more **Processing tasks**, and end with an **Output task**.

## Stages
## Tasks

**Data Stage**: The data stage is any callable that returns data. This can be a
**Data task**: The data task is any callable that returns data. This can be a
function that reads data from a file, or a function that generates data. The
data stage is the first stage in a workflow, and is executed only once. The
output of the data stage is passed to the first processing stage.
data task is the first task in a workflow, and is executed only once. The output
of the data task is passed to the first processing task.

**Processing Stage**: A processing stage is any callable that takes data as
input and returns data as output. The output of a processing stage is passed to
the next processing stage. The processing stages are executed in order, and can
be parallelised. In `fasthep-flow` these stages are represented by
**Operators**. Details on how this works and what is required to write one, can
be found in [Operators](./operators.md).
**Processing task**: A processing task is any callable that takes data as input
and returns data as output. The output of a processing task is passed to the
next processing task. The processing tasks are executed in order, and can be
parallelised. In `fasthep-flow` these tasks are represented by **Operators**.
Details on how this works and what is required to write one, can be found in
[Operators](./operators.md).

**Output Stage**: The output stage is any callable that takes data as input and
returns data as output. The output of the last processing stage is passed to the
output stage. The output stage is executed only once, and is the last stage in a
workflow. The output of the output stage is saved to disk.
**Output task**: The output task is any callable that takes data as input and
returns data as output. The output of the last processing task is passed to the
output task. The output task is executed only once, and is the last task in a
workflow. The output of the output task is saved to disk.
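
To make the three roles concrete, here is a minimal, self-contained sketch of what such callables could look like in plain Python. The function names, the JSON input, and the `n_muons` selection are invented for illustration and are not part of the fasthep-flow API.

```python
# Illustrative sketch only: plain Python callables playing the three task
# roles described above. Names, the JSON format and the selection criterion
# are invented; this is not the fasthep-flow API.
import json
from typing import Any


def load_data(path: str) -> list[dict[str, Any]]:
    """Data task: runs once and returns the data to be processed."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


def select_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Processing task: data in, data out; instances can run in parallel."""
    return [event for event in events if event.get("n_muons", 0) >= 2]


def write_output(events: list[dict[str, Any]], output_path: str) -> str:
    """Output task: runs once at the end and persists the result to disk."""
    with open(output_path, "w", encoding="utf-8") as handle:
        json.dump(events, handle)
    return output_path
```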

### Exceptional stages
### Exceptional tasks

Of course, not all workflows are as simple as the above. In some cases, you may
want to checkpoint the process, write out intermediate results, or do some other
special processing. For this, `fasthep-flow` has the following special stages:

**Provenance Stage**: The provenance stage is a special stagen that typically
runs outside the workflow. It is used to collect information about the workflow,
such as the software versions used, the input data, and the output data. The
provenance stage is executed only once, and is the last stage in a workflow. The
output of the provenance stage is saved to disk.

**Caching stage**: The caching stage is a special stage that can be used to
cache the output of a processing stage. The caching stage can be added to any
processing stage, and will save the output of the processing stage to disk or
remote storage.

**Monitoring stage**: The monitoring stage is a special stage that can be used
to monitor the progress of the workflow. The monitoring stage can be added to
any processing stage, and can either store the information locally or send it in
special processing. For this, `fasthep-flow` has the following special tasks:

**Provenance task**: The provenance task is a special task that typically runs
outside the workflow. It is used to collect information about the workflow, such
as the software versions used, the input data, and the output data. The
provenance task is executed only once, and is the last task in a workflow. The
output of the provenance task is saved to disk.

**Caching task**: The caching task is a special task that can be used to cache
the output of a processing task. The caching task can be added to any processing
task, and will save the output of the processing task to disk or remote storage.
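
This PR adds disk persistence alongside the new `dill` dependency and the switch to `hashlib.file_digest` (hence the Python 3.11 floor). One way such a caching task could work is sketched below; the cache layout and helper names are assumptions for illustration, not fasthep-flow's actual implementation.

```python
# Hedged sketch of a disk-persistence/caching step, combining the dill and
# hashlib.file_digest dependencies added in this PR. The cache layout and
# function names are assumptions, not fasthep-flow's implementation.
import hashlib
from pathlib import Path
from typing import Any, Callable

import dill  # serialises objects that the standard pickle cannot handle


def file_cache_key(path: str) -> str:
    """Digest of the input file; hashlib.file_digest needs Python 3.11+."""
    with open(path, "rb") as handle:
        return hashlib.file_digest(handle, "sha256").hexdigest()


def cached_call(func: Callable[[str], Any], input_file: str, cache_dir: str) -> Any:
    """Run func(input_file) once per distinct input file, caching to disk."""
    cache_file = Path(cache_dir) / f"{func.__name__}-{file_cache_key(input_file)}.dill"
    if cache_file.exists():
        with open(cache_file, "rb") as handle:
            return dill.load(handle)
    result = func(input_file)
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    with open(cache_file, "wb") as handle:
        dill.dump(result, handle)
    return result
```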

**Monitoring task**: The monitoring task is a special task that can be used to
monitor the progress of the workflow. The monitoring task can be added to any
processing task, and can either store the information locally or send it in
intervals to a specified endpoint.

## Anatomy of an analysis workflow

In the most general terms, an analysis workflow consists of the following parts:

- **Data stage**: the data to be analysed
- **Processing stages**: the analysis steps
- **Output stage**: the output of the analysis
- **Data task**: the data to be analysed
- **Processing tasks**: the analysis steps
- **Output task**: the output of the analysis

The following diagram shows the different parts of an analysis workflow,
including the data flow between them:
@@ -63,25 +62,24 @@ including the data flow between them:
:align: center
:class: with-border

Analysis workflow: starting from input data, the workflow is split into stages, which are then executed in order. Tasks in each stage can be parallelised, and the output of each stage is passed to the next. In the end, the output of the last stage is saved to disk.
Analysis workflow: starting from input data, the workflow is split into tasks, which are then executed in order. Tasks can be parallelised, and the output of each task is passed to the next. In the end, the output of the last task is saved to disk.
```

In `fasthep-flow` we attempt to map each part of an analysis workflow onto a
**stage** in the YAML file. By default each consecutive stage will be executed
in order, but this can be changed by specifying dependencies between stages.
**task** in the YAML file. By default each consecutive task will be executed in
order, but this can be changed by specifying dependencies between tasks.
Currently only one parallelisation strategy is supported, `split-by-file`, but
more will be added in the future. `fasthep-flow` will create **Tasks** for each
stage based on the parallelisation strategy. E.g. if the parallelisation
strategy is `split-by-file`, then each file will be processed in a separate
task.
task based on the parallelisation strategy. E.g. if the parallelisation strategy
is `split-by-file`, then each file will be processed in a separate task.
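
Since this PR swaps the execution layer to a Hamilton DAG, each task ultimately becomes a node in a function-based graph. The sketch below shows the underlying Hamilton pattern, assuming a recent sf-hamilton release; the node names, stub data, and input file are invented for illustration, and the actual fasthep-flow node generation is not shown.

```python
# Illustrative sketch of the Hamilton idea used by this PR (assuming a recent
# sf-hamilton release): each function becomes a DAG node, and a parameter that
# shares its name with another function declares a dependency on that node.
import sys

from hamilton import driver


def raw_events(input_file: str) -> list[dict]:
    """Data node: read events from input_file (stubbed for illustration)."""
    return [{"n_muons": 2}, {"n_muons": 0}]


def selected_events(raw_events: list[dict]) -> list[dict]:
    """Processing node: depends on raw_events through its parameter name."""
    return [event for event in raw_events if event["n_muons"] >= 2]


if __name__ == "__main__":
    # Build the DAG from the functions in this module and request one output.
    dr = driver.Builder().with_modules(sys.modules[__name__]).build()
    result = dr.execute(["selected_events"], inputs={"input_file": "events.json"})
    print(result["selected_events"])
```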

The following diagram shows the different stages:
The following diagram shows the different tasks:

```{figure} /images/workflow_stages.png
---
class: with-border
---
Stages of a workflow: a workflow starts with a data stage, has one or more processing stages, and ends with an output stage.
Tasks of a workflow: a workflow starts with a data task, has one or more processing tasks, and ends with an output task.
```

Of course, this is a very simplified picture, a more realistic example is shown
50 changes: 30 additions & 20 deletions docs/configuration/environments.md
@@ -1,29 +1,29 @@
# Environments

By default, all of the stages in a workflow are executed in the same
environment. However, in some scenarios, it may be necessary to execute
different stages in different environments. For example, you may want to run a
stage that uses a framework that needs its own specific environment, such as
TensorFlow, PyTorch, or ROOT. Or, you may want to run a stage on a different
machine that has more memory or more CPUs.
By default, all of the tasks in a workflow are executed in the same environment.
However, in some scenarios, it may be necessary to execute different tasks in
different environments. For example, you may want to run a task that uses a
framework that needs its own specific environment, such as TensorFlow, PyTorch,
or ROOT. Or, you may want to run a task on a different machine that has more
memory or more CPUs.

To support this, `fasthep-flow` allows you to specify the environment for each
stage in a workflow. The environment is specified using the `environment` key in
the stage definition. The `environment` has many settings, which we will discuss
task in a workflow. The environment is specified using the `environment` key in
the task definition. The `environment` has many settings, which we will discuss
here.

Let's start with a simple example:

```yaml
stages:
tasks:
- name: runROOTMacro
type: "fasthep_flow.operators.BashOperator"
kwargs:
bash_command: root -bxq <path to ROOT macro>
environment:
image: docker.io/rootproject/root:6.28.04-ubuntu22.04
variables: <path to .env>
executor: LocalExecutor
flow: prefect::SequentialTaskRunner
- name: runStatsCode
type: "fasthep_flow.operators.BashOperator"
kwargs:
@@ -37,32 +37,42 @@ stages:
process_on: gpu_if_available
```

There is a lot to unpack here, so let's start bit by bit. The first stage uses
`environment::image`, `environment::variables`, and `environment::executor`. The
There is a lot to unpack here, so let's start bit by bit. The first task uses
`environment::image`, `environment::variables`, and `environment::flow`. The
`image` is a container image, here Docker, while `variables` defines the
environment variables. The values for `variables` can either be a path to an
`.env` file or a dictionary of key-value pairs (see 2nd example). The `executor`
defines the executor to use for this stage. The default is `DaskExecutor`, but
here we are using `LocalExecutor` to run the stage locally.
`.env` file or a dictionary of key-value pairs (see 2nd example).

```{note}
A `.env` file is a file specifying variables in the format `VARIABLE=VALUE` - one per line. For example, `STATS_METHOD=CLs` is a valid `.env` file.
```

In the second stage, we use `environment::image`, `environment::variables` and
The `flow` setting defines the orchestration to use for this task. The
default orchestration is defined in the global settings, usually set to
`prefect::DaskTaskRunner`. In this case, we are using the
`prefect::SequentialTaskRunner` to run the task locally.

```{note}
The `flow` setting has to use the same prefix as the global setting and has to match a defined orchestration.
```

In the second task, we use `environment::image`, `environment::variables` and
`environment::resources`. We've already discussed the first two, but we use the
dictionary variable definition here, instead of the `.env` file. The new
addition, `resources`, is the same as for the global setting. Here you can define
memory, CPU, and GPU resources for the stage. These will be passed to the
executor.
memory, CPU, and GPU resources for the task. These will be passed to the
orchestration layer.

The full set of options for `environment` is:

```yaml
environment:
variables: <path to .env> | { <key>: <value>, ... }
image: <image name>
executor: LocalExecutor | DaskExecutor | any other supported executor
workflow:
transform: prefect
kwargs:
runner: SequentialTaskRunner | DaskTaskRunner | any other supported value
resources: # see details in global settings
extra_data: TBC
```
19 changes: 10 additions & 9 deletions docs/configuration/global_settings.md
@@ -1,7 +1,7 @@
# Global Settings

In addition to the stages, the YAML file can also contain global settings. These
settings apply to all stages, unless overridden by the stage.
In addition to the tasks, the YAML file can also contain global settings. These
settings apply to all tasks, unless overridden by the task.

```yaml
global:
@@ -12,28 +12,29 @@ global:
prefix: h_
folder_rule: from_name | fixed | None
folder: None
executor: DaskExecutor
output_dir: /path/to/output/dir
flow: prefect::DaskTaskRunner
output:
directory: /path/to/output/dir
variables: <path to .env> | { <key>: <value>, ... }
```

## Resources

The `resources` key defines the resources to use for the workflow. How these are
interpreted depends on the executor and stages used.
interpreted depends on the flow and tasks used.

- `memory`: the amount of memory to use for the workflow. This is passed to the
executor.
- `process_on`: the type of resource to use for the workflow. This is passed to
the stages.
the tasks.

For `process_on`, the following values are supported:

- `cpu`: run on a CPU. This is the default.
- `gpu`: run on a GPU. This will fail if no GPU is available or the stage cannot
- `gpu`: run on a GPU. This will fail if no GPU is available or the task cannot
run on GPU.
- `gpu_if_available`: run on a GPU if available, otherwise run on a CPU.

Stages that can support GPUs will need to register their CPU and GPU versions
with `fasthep-flow` (see [here](register.md)). If a stage is not registered with
Tasks that can support GPUs will need to register their CPU and GPU versions
with `fasthep-flow` (see [here](register.md)). If a task is not registered with
a GPU version, then `gpu_if_available` will run the CPU version.
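
The `gpu_if_available` fallback described above can be pictured as a small dispatch step: look up the registered implementations for a task and fall back to the CPU version when no GPU, or no GPU implementation, is available. The sketch below is purely illustrative; the registry layout and function names are invented and are not fasthep-flow's registration API.

```python
# Hypothetical sketch of the gpu_if_available fallback described above.
# The registry structure and names are illustrative, not fasthep-flow's API.
from typing import Callable

# Registered implementations per task; a GPU entry is optional.
_REGISTRY: dict[str, dict[str, Callable[..., object]]] = {
    "select_events": {"cpu": lambda data: data},  # no GPU version registered
}


def gpu_available() -> bool:
    """Placeholder check; a real implementation would probe the hardware."""
    return False


def resolve(task_name: str, process_on: str) -> Callable[..., object]:
    """Pick the implementation to run for a task, honouring process_on."""
    implementations = _REGISTRY[task_name]
    if process_on == "gpu":
        # Hard requirement: fail if no GPU or no GPU version is available.
        if "gpu" not in implementations or not gpu_available():
            raise RuntimeError(f"{task_name} cannot run on GPU")
        return implementations["gpu"]
    if process_on == "gpu_if_available" and gpu_available() and "gpu" in implementations:
        return implementations["gpu"]
    return implementations["cpu"]  # default: run the CPU version
```
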
16 changes: 8 additions & 8 deletions docs/configuration/index.md
@@ -21,7 +21,7 @@ variations.md
Here's a simplified example of a YAML file:

```yaml
stages:
tasks:
- name: printEcho
type: "fasthep_flow.operators.BashOperator"
kwargs:
@@ -33,13 +33,13 @@ stages:
op_args: ["Hello World!"]
```

This YAML file defines two stages, `printEcho` and `printPython`. The
`printEcho` stage uses the `BashOperator`, and the `printPython` stage uses the
`PythonOperator`. The `printEcho` stage passes the argument
`echo "Hello World!"` to the `bash_command` argument of the `BashOperator`. To
make it easier to use Python callables, `fasthep-flow` provides the
`PythonOperator`. This operator takes a Python callable and its arguments, and
then calls the callable with the arguments.
This YAML file defines two tasks, `printEcho` and `printPython`. The `printEcho`
task uses the `BashOperator`, and the `printPython` task uses the
`PythonOperator`. The `printEcho` task passes the argument `echo "Hello World!"`
to the `bash_command` argument of the `BashOperator`. To make it easier to use
Python callables, `fasthep-flow` provides the `PythonOperator`. This operator
takes a Python callable and its arguments, and then calls the callable with the
arguments.
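
Conceptually, this pattern amounts to deferring a call: store a callable together with its arguments and invoke it when the task runs. The sketch below is a stand-in to illustrate that idea only, using Python's built-in `print` as the callable; it is not the actual `fasthep_flow.operators.PythonOperator` implementation.

```python
# Minimal stand-in to illustrate the PythonOperator idea described above.
# This is not the actual fasthep_flow.operators.PythonOperator implementation.
from typing import Any, Callable


class DeferredCall:
    """Wrap a callable and its arguments; call it later when the task runs."""

    def __init__(self, python_callable: Callable[..., Any], op_args: list[Any] | None = None):
        self.python_callable = python_callable
        self.op_args = op_args or []

    def __call__(self) -> Any:
        return self.python_callable(*self.op_args)


# Usage mirroring the printPython task from the YAML above.
task = DeferredCall(print, op_args=["Hello World!"])
task()  # prints "Hello World!"
```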

```{note}
- you can test the validity of a config via `fasthep-flow lint <config.yaml>`