Add more basic examples to the README.md (#23)
* Add more basic examples to the README.md

* Bump flake8
spencerkclark authored Nov 5, 2023
1 parent 1a3a4fa commit 895bc5e
Showing 3 changed files with 87 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -4,6 +4,6 @@ repos:
    hooks:
      - id: black
  - repo: https://github.com/PyCQA/flake8.git
    rev: 4.0.1
    rev: 6.1.0
    hooks:
      - id: flake8
94 changes: 75 additions & 19 deletions README.md
@@ -10,25 +10,81 @@ stores from completely independent processes easier.

## Usage

The primary use-case for this is something like this. Say you have a lot of
netCDF files output from a simulation or observational dataset that you would
like to stitch together into a zarr store. If you have a way of combining
those files lazily -- i.e. opening them into dask-backed arrays -- into a
single dataset, then you can write contiguous "partitions" of that dataset out
via independent processes. A "partition" corresponds to a contiguous group of
dask "chunks." I.e. it can correspond to one or more chunks across any number
of dimensions. A key detail is no partition straddles any dask chunks; this
makes writing from independent processes completely safe.
The primary use-case is something like this. Say you have a lot of netCDF files
output from a simulation or observational dataset that you would like to stitch
together into a zarr store. If you have a way of combining those files lazily
into a single dataset -- i.e. opening them into dask-backed arrays, with maybe
some additional light computations -- then you can write contiguous
"partitions" of that dataset out via independent processes. A "partition"
corresponds to a contiguous group of dask "chunks"; that is, it can correspond
to one or more chunks across any number of dimensions. A key detail is that no
partition straddles any dask chunks, which makes writing from independent
processes completely safe.
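
To make the relationship between chunks and partitions concrete, here is a
minimal sketch; the toy dataset below is hypothetical and only meant to show
how partitions line up with whole dask chunks:

```python
import dask.array as da
import xarray as xr

# Hypothetical dataset with 8 dask chunks along "time" (10 steps each).
data = da.zeros((80, 90, 180), chunks=(10, 90, 180))
ds = xr.Dataset({"foo": (("time", "lat", "lon"), data)})

# Partitioning along "time" into 4 partitions groups whole chunks together
# (e.g. two 10-step chunks per partition), so no partition straddles a chunk
# and each one can be written by a separate process.
```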

`xpartition` provides an accessor called `partition` that implements
`initialize_store` and `write` methods. The pattern is to have some function
that constructs the dataset lazily, then call `initialize_store`, and finally
in a set of separate processes, call `write`.
`initialize_store` and `write` methods. The pattern is to have some code that
constructs the dataset lazily, then call `initialize_store`, and finally in a
set of separate processes, call `write`.

The process might look something like this. Assume through some external
package we have a function that can construct a dataset lazily. Let's write a
couple command-line interfaces that initialize the store and write a
partition. We'll start with one called `initialize_store.py`:
### Simple serial example

Before illustrating a use-case of `xpartition` on a cluster, we can start with a
simple serial example. From this example it should be straightforward to
imagine how to extend this to various distributed computing platforms, whether
HPC or cloud-based, to do the same thing in parallel.

Assume that through some external package we have a function that can construct
a dataset lazily. To incrementally write it to zarr using `xpartition`, we only
need to do the following:

```python
import xpartition  # registers the "partition" accessor used below

from external import construct_lazy_dataset

store = "store.zarr"
partitions = 16
partition_dims = ["tile", "time"]

ds = construct_lazy_dataset()
ds.partition.initialize_store(store)
for partition in range(partitions):
    ds.partition.write(store, partitions, partition_dims, partition)
```

The `partition_dims` describe the dimensions over which to partition the
dataset; if chunks exist along dimensions that are not among the partition
dimensions, then they will all be grouped together. If you are not particular
about this, simply using `ds.dims` will also work out of the box.
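
For instance, a variant of the serial loop above that does not single out any
particular dimensions might look like this (same `write` call as before, just
with every dimension of the dataset used as a partition dimension):

```python
# Partition over every dimension of the dataset instead of just
# "tile" and "time".
for partition in range(partitions):
    ds.partition.write(store, partitions, list(ds.dims), partition)
```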

### Parallelization using `multiprocessing`

A parallel example can easily be illustrated using the built-in
`multiprocessing` library; something similar could be done with `dask.bag`:

```python
import multiprocessing

import xpartition

from external import construct_lazy_dataset

store = "store.zarr"
partitions = 16
partition_dims = ["tile", "time"]

ds = construct_lazy_dataset()
ds.partition.initialize_store(store)
with multiprocessing.get_context("spawn").Pool(partitions) as pool:
    pool.map(
        ds.partition.mappable_write(store, partitions, partition_dims),
        range(partitions),
    )
```
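
For reference, a sketch of the `dask.bag` alternative mentioned above might
look like the following; the process-based scheduler here is an assumption, and
any scheduler that runs tasks in separate processes should work similarly:

```python
import dask.bag as db

# One task per partition index, mapped over the writer returned by
# `mappable_write`.
bag = db.from_sequence(range(partitions), npartitions=partitions)
bag.map(
    ds.partition.mappable_write(store, partitions, partition_dims)
).compute(scheduler="processes")
```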

### Parallelization using a SLURM array job

Finally, the example below describes how one might use `xpartition` on an HPC
cluster using a SLURM array job. We start by writing a couple of command-line
interfaces that initialize the store and write a partition, beginning with one
called `initialize_store.py`:

```python
import argparse
@@ -72,7 +72,7 @@ ds = construct_lazy_dataset()
ds.partition.write(args.store, args.ranks, dims, args.rank)
```

Now let's write a couple bash scripts. The first will be a SLURM array job
Now we can write a couple bash scripts. The first will be a SLURM array job
that writes all the partitions. The second will be a "main" script that
controls the whole workflow.

@@ -130,8 +130,8 @@ out of memory errors ([this issue](https://github.com/dask/distributed/issues/63
is perhaps a good summary of the state of things currently in dask). Breaking the
problem down in the way that `xpartition` does allows you to gain the benefits of
dask's laziness on each independent process, while working in a distributed
environment. *In an ideal world we wouldn't need a package like this -- we would
let dask and dask distributed handle everything -- but in practice that does not
environment. *In an ideal world we wouldn't need a package like this -- we would
let dask and dask distributed handle everything -- but in practice that does not
work perfectly yet.*

## Installation
17 changes: 11 additions & 6 deletions setup.cfg
@@ -43,12 +43,17 @@ setup_requires =
    setuptools_scm

[flake8]
ignore =
    E203 # whitespace before ':' - doesn't work well with black
    E402 # module level import not at top of file
    E501 # line too long - let black worry about that
    E731 # do not assign a lambda expression, use a def
    W503 # line break before binary operator
ignore =
    # whitespace before ':' - doesn't work well with black
    E203
    # module level import not at top of file
    E402
    # line too long - let black worry about that
    E501
    # do not assign a lambda expression, use a def
    E731
    # line break before binary operator
    W503
exclude =
    .eggs
    doc
