Add more basic examples to the README.md #23

Merged 2 commits on Nov 5, 2023

2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
```diff
@@ -4,6 +4,6 @@ repos:
     hooks:
       - id: black
   - repo: https://github.com/PyCQA/flake8.git
-    rev: 4.0.1
+    rev: 6.1.0
     hooks:
       - id: flake8
```
94 changes: 75 additions & 19 deletions README.md
@@ -10,25 +10,81 @@ stores from completely independent processes easier.

## Usage

The primary use-case is something like this. Say you have a lot of netCDF files
output from a simulation or observational dataset that you would like to stitch
together into a zarr store. If you have a way of combining those files lazily
into a single dataset (i.e. opening them into dask-backed arrays, perhaps with
some additional light computations), then you can write contiguous "partitions"
of that dataset out via independent processes. A "partition" corresponds to a
contiguous group of dask "chunks," i.e. one or more chunks across any number of
dimensions. A key detail is that no partition straddles any dask chunk; this
makes writing from independent processes completely safe.
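
For concreteness, a minimal sketch (illustrative only, not from this README) of
a dask-backed dataset whose chunking admits such partitions:

```python
import dask.array as da
import xarray as xr

# 16 time steps stored in 8 dask chunks of 2 steps each: partitioning "time"
# into 4 partitions gives each partition 2 whole chunks, so no partition
# splits a chunk.
arr = da.zeros((16, 90, 180), chunks=(2, 90, 180))
ds = xr.Dataset({"temperature": (["time", "lat", "lon"], arr)})
```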

`xpartition` provides an accessor called `partition` that implements
`initialize_store` and `write` methods. The pattern is to have some code that
constructs the dataset lazily, then call `initialize_store`, and finally in a
set of separate processes, call `write`.

### Simple serial example

Before illustrating a use-case of `xpartition` on a cluster, we can start with a
simple serial example. From this example it should be straightforward to
imagine how to extend this to various distributed computing platforms, whether
HPC or cloud-based, to do the same thing in parallel.

Assume through some external package we have a function that can construct a
dataset lazily. To incrementally write it to zarr using `xpartition` we would
only need to do the following:

```python
import xpartition

from external import construct_lazy_dataset

store = "store.zarr"
partitions = 16
partition_dims = ["tile", "time"]

ds = construct_lazy_dataset()
ds.partition.initialize_store(store)
for partition in range(partitions):
    ds.partition.write(store, partitions, partition_dims, partition)
```
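
Here `external` and `construct_lazy_dataset` are stand-ins for user code. As a
rough sketch (everything in it is an assumption, not part of `xpartition`),
such a function might look like:

```python
# external.py -- hypothetical helper module, not part of xpartition.
import xarray as xr


def construct_lazy_dataset() -> xr.Dataset:
    """Lazily combine many netCDF files into one dask-backed dataset."""
    ds = xr.open_mfdataset("outputs/*.nc", combine="by_coords")
    # Rechunk if desired; partitions are built from whole dask chunks.
    return ds.chunk({"time": 10})
```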

The `partition_dims` describe the dimensions over which to partition the
dataset; chunks along any dimensions not among the partition dimensions will
all be grouped into the same partition. If you are not particular about this,
simply using `ds.dims` also works out of the box, as in the variant below.
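
For instance, the serial loop above could partition across every dimension
(names reused from that example):

```python
# Same loop as above, but grouping chunks across all of the dataset's dims.
for partition in range(partitions):
    ds.partition.write(store, partitions, list(ds.dims), partition)
```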

### Parallelization using `multiprocessing`

A parallel example can easily be illustrated using the built-in
`multiprocessing` library; something similar could be done with `dask.bag`, as
sketched after this example:

```python
import multiprocessing

import xpartition

from external import construct_lazy_dataset

store = "store.zarr"
partitions = 16
partition_dims = ["tile", "time"]

ds = construct_lazy_dataset()
ds.partition.initialize_store(store)
with multiprocessing.get_context("spawn").Pool(partitions) as pool:
    pool.map(
        ds.partition.mappable_write(store, partitions, partition_dims),
        range(partitions),
    )
```
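
The `dask.bag` variant mentioned above might look roughly like this (a sketch
under the same assumptions, not code from this README):

```python
import dask.bag as db
import xpartition  # noqa: F401 (registers the .partition accessor)

from external import construct_lazy_dataset

store = "store.zarr"
partitions = 16
partition_dims = ["tile", "time"]

ds = construct_lazy_dataset()
ds.partition.initialize_store(store)

# One bag element per partition index; the multiprocessing scheduler writes
# each partition in a separate process.
bag = db.from_sequence(range(partitions), npartitions=partitions)
bag.map(ds.partition.mappable_write(store, partitions, partition_dims)).compute(
    scheduler="processes"
)
```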

### Parallelization using a SLURM array job

Finally, the example below describes how one might use `xpartition` on an HPC
cluster using a SLURM array job. We start by writing a couple of command-line
interfaces that initialize the store and write a partition. We'll start with
one called `initialize_store.py`:

```python
import argparse
# ... (script body collapsed in the diff view) ...
ds = construct_lazy_dataset()
ds.partition.write(args.store, args.ranks, dims, args.rank)
```
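
The bodies of these scripts are collapsed in the diff view above; only the
final lines of the second script are visible. A minimal sketch of what that
second script might contain (the name `write_partition.py` and everything
beyond the visible tail are assumptions):

```python
# write_partition.py -- hypothetical reconstruction; only the last two lines
# are visible in the diff, the rest is assumed.
import argparse

import xpartition  # noqa: F401 (registers the .partition accessor)

from external import construct_lazy_dataset  # hypothetical helper

parser = argparse.ArgumentParser()
parser.add_argument("store", help="path to the output zarr store")
parser.add_argument("ranks", type=int, help="total number of partitions")
parser.add_argument("rank", type=int, help="index of the partition to write")
args = parser.parse_args()

dims = ["tile", "time"]  # assumed partition dimensions
ds = construct_lazy_dataset()
ds.partition.write(args.store, args.ranks, dims, args.rank)
```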

Now we can write a couple bash scripts. The first will be a SLURM array job
that writes all the partitions. The second will be a "main" script that
controls the whole workflow.

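Both scripts are likewise collapsed in the diff view. Purely as an
illustration, a SLURM array job along the lines described might look like this
(every name, size, and flag here is an assumption):

```bash
#!/bin/bash
# write_partitions.sh -- hypothetical SLURM array job; one task per partition.
#SBATCH --job-name=write-partitions
#SBATCH --array=0-15

STORE=store.zarr
RANKS=16

python write_partition.py "$STORE" "$RANKS" "$SLURM_ARRAY_TASK_ID"
```
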
@@ -130,8 +186,8 @@ out of memory errors ([this issue](https://github.com/dask/distributed/issues/63
is perhaps a good summary of the state of things currently in dask). Breaking the
problem down in the way that `xpartition` does allows you to gain the benefits of
dask's laziness on each independent process, while working in a distributed
environment. *In an ideal world we wouldn't need a package like this (we would
let dask and dask distributed handle everything), but in practice that does not
work perfectly yet.*

## Installation
17 changes: 11 additions & 6 deletions setup.cfg
```diff
@@ -43,12 +43,17 @@ setup_requires =
     setuptools_scm
 
 [flake8]
-ignore =
-    E203  # whitespace before ':' - doesn't work well with black
-    E402  # module level import not at top of file
-    E501  # line too long - let black worry about that
-    E731  # do not assign a lambda expression, use a def
-    W503  # line break before binary operator
+ignore =
+    # whitespace before ':' - doesn't work well with black
+    E203
+    # module level import not at top of file
+    E402
+    # line too long - let black worry about that
+    E501
+    # do not assign a lambda expression, use a def
+    E731
+    # line break before binary operator
+    W503
 exclude =
     .eggs
     doc
```