Example of metric running in parallel in notebook.
drewoldag committed Feb 14, 2024
1 parent 92d35cd commit 8e395ca
Showing 3 changed files with 117 additions and 31 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -44,6 +44,7 @@ dev = [
"pylint",
"mpi4py",
"coverage",
"ipyparallel",
]
full = [
"tables-io[full]",
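Note: `ipyparallel` lands in the `dev` extra rather than in the runtime dependencies, since only the example notebook needs it; installing the package with the dev extras (e.g. `pip install -e ".[dev]"`) pulls it in alongside `mpi4py`.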
125 changes: 108 additions & 17 deletions src/qp/metrics/parallel_metrics.ipynb
@@ -2,65 +2,156 @@
"cells": [
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"from qp.metrics.point_estimate_metric_classes import PointSigmaIQR\n"
"from qp.metrics.point_estimate_metric_classes import PointSigmaIQR"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Generate the random numbers \n",
"SEED = 1002330\n",
"rng = np.random.default_rng(SEED)\n",
"\n",
"estimate = rng.lognormal(mean=1.0, sigma=2, size=1000)\n",
"reference = rng.lognormal(mean=1.3, sigma=1.9, size=1000)"
"chunk_size = 10_000\n",
"n_chunk = 10\n",
"total_size = n_chunk*chunk_size\n",
"\n",
"estimate = rng.lognormal(mean=1.0, sigma=2, size=total_size)\n",
"reference = rng.lognormal(mean=1.3, sigma=1.9, size=total_size)"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 3,
"metadata": {},
"outputs": [],
"outputs": [
{
"data": {
"text/plain": [
"1.844492171486455"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Do the explicit full calculation\n",
"PointSigmaIQR().evaluate(estimate, reference)"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"#generator that yields chunks from estimate and reference\n",
"def chunker(seq, size):\n",
" return (seq[pos:pos + size] for pos in range(0, len(seq), size))\n",
"\n",
"# create an iterator that yields chunks of 100 elements\n",
"estimate_chunks = chunker(estimate, 100)\n",
"reference_chunks = chunker(reference, 100)"
"# create an iterator that yields chunks of chunk_size elements\n",
"estimate_chunks = chunker(estimate, chunk_size)\n",
"reference_chunks = chunker(reference, chunk_size)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# A function to pass to MPI\n",
"def mpi_example(chunk):\n",
" centroids = chunk[0].accumulate(chunk[1], chunk[2])\n",
" return centroids"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# Set up the data for ipyparallel\n",
"estimator = PointSigmaIQR()\n",
"estimator_list = [estimator]*n_chunk\n",
"data_chunks = [chunk for chunk in zip(estimator_list, estimate_chunks, reference_chunks)]"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"PointSigmaIQR().eval_from_iterator(estimate_chunks, reference_chunks)\n",
"# fails as expected when we get to the `comm` and `centroid` bits in `finalize`."
"# request an MPI cluster with 5 engines\n",
"import ipyparallel as ipp\n",
"import time"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>\n",
"100%|██████████| 4/4 [00:05<00:00, 1.48s/engine]\n",
"mpi_example: 100%|██████████| 10/10 [00:00<00:00, 13.42tasks/s]\n",
"0 : (53, 2)\n",
"1 : (53, 2)\n",
"2 : (54, 2)\n",
"3 : (53, 2)\n",
"4 : (55, 2)\n",
"5 : (53, 2)\n",
"6 : (52, 2)\n",
"7 : (55, 2)\n",
"8 : (51, 2)\n",
"9 : (54, 2)\n",
"1.98106779379963\n",
"Stopping engine(s): 1707869025\n",
"engine set stopped 1707869025: {'exit_code': 0, 'pid': 27802, 'identifier': 'ipengine-1707869024-vf70-1707869025-27783'}\n",
"Stopping controller\n",
"Controller stopped: {'exit_code': 0, 'pid': 27789, 'identifier': 'ipcontroller-1707869024-vf70-27783'}\n"
]
}
],
"source": [
"with ipp.Cluster(controller_ip=\"*\", engines=\"mpi\", n=4) as rc:\n",
" # get a broadcast_view on the cluster which is best\n",
" # suited for MPI style computation\n",
" view = rc.load_balanced_view()\n",
" # run the mpi_example function on all engines in parallel\n",
" asyncresult = view.map_async(mpi_example, data_chunks)\n",
" # Retrieve and print the result from the engines\n",
" asyncresult.wait_interactive()\n",
" # retrieve actual results\n",
" result = asyncresult.get()\n",
" # get and print the results\n",
" for i, res in enumerate(result):\n",
" np.array(res)\n",
" print(f\"{i} : {res.shape}\")\n",
" final = estimator.finalize(centroids=result)\n",
" print(final)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "qp",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
@@ -74,9 +74,9 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.9"
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
"nbformat_minor": 4
}
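For readers who want the notebook's flow in one place, the sketch below condenses the same chunk → parallel accumulate → finalize pattern into a standalone script. It is a minimal illustration, not part of the commit: the helpers `accumulate_chunk` and `run_parallel_iqr` are invented names, and it assumes the dev extras (ipyparallel, mpi4py) are installed so a local MPI cluster can start.

import ipyparallel as ipp

from qp.metrics.point_estimate_metric_classes import PointSigmaIQR


def chunker(seq, size):
    # yield successive size-element slices of seq (same helper as the notebook)
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))


def accumulate_chunk(task):
    # unpack (estimator, estimate_chunk, reference_chunk) and return
    # the t-digest centroids accumulated from that chunk
    estimator, estimate, reference = task
    return estimator.accumulate(estimate, reference)


def run_parallel_iqr(estimate, reference, chunk_size=10_000, n_engines=4):
    estimator = PointSigmaIQR()
    n_chunk = len(estimate) // chunk_size
    tasks = list(zip([estimator] * n_chunk,
                     chunker(estimate, chunk_size),
                     chunker(reference, chunk_size)))
    # start a local cluster, farm the chunks out, and merge the results
    with ipp.Cluster(engines="mpi", n=n_engines) as rc:
        view = rc.load_balanced_view()
        centroids = view.map_sync(accumulate_chunk, tasks)
    return estimator.finalize(centroids=centroids)

Here `map_sync` is the blocking equivalent of the notebook's `map_async` / `wait_interactive()` / `get()` sequence; either style works.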
22 changes: 8 additions & 14 deletions src/qp/metrics/point_estimate_metric_classes.py
@@ -2,9 +2,10 @@
from qp.metrics.base_metric_classes import (
MetricOutputType,
PointToPointMetric,
PointToPointMetricDigester
)
from pytdigest import TDigest
from functools import reduce
from operator import add


class PointStatsEz(PointToPointMetric):
Expand Down Expand Up @@ -76,19 +77,12 @@ def accumulate(self, estimate, reference):
        centroids = digest.get_centroids()
        return centroids

-    def finalize(self, comm=None, centroids=None):
-        # ents = comm.gather()
-        # meta_digest = TDigest.from_centroid(cents) # Or something like this
-        # return self.compute_from_digest(meta_digest)
-        # centroids = comm.gather(centroids, root=0) # ???
-
-        #? Does this need to be the more complex version from the example? i.e.
-        # digests = (
-        #     TDigest.of_centroids(centroid, compression=COMPRESSION)
-        #     for centroid in chain.from_iterable(centroids)
-        # )
-        # digest = reduce(add, digests)
-        digest = TDigest.of_centroids(centroids, compression=100)
+    def finalize(self, centroids=None):
+        digests = (
+            TDigest.of_centroids(np.array(centroid), compression=100)
+            for centroid in centroids
+        )
+        digest = reduce(add, digests)

        return self.compute_from_digest(digest)

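The reduction that the new `finalize` performs is worth seeing in isolation: each worker compresses its chunk into a small (n, 2) array of t-digest centroids, and the driver rebuilds a digest from each array and sums them. Below is a self-contained sketch of that idea using pytdigest directly; the `inverse_cdf` calls and the 1.349 IQR-to-sigma factor are assumptions standing in for qp's `compute_from_digest`, not a copy of it.

import numpy as np
from functools import reduce
from operator import add
from pytdigest import TDigest

rng = np.random.default_rng(42)
chunks = [rng.normal(size=10_000) for _ in range(10)]

# accumulate: compress each chunk into an (n, 2) centroid array
centroid_arrays = [
    TDigest.compute(chunk, compression=100).get_centroids()
    for chunk in chunks
]

# finalize: rehydrate each centroid array into a digest, then merge with `+`
digests = (TDigest.of_centroids(c, compression=100) for c in centroid_arrays)
merged = reduce(add, digests)

# read the metric off the merged digest: IQR scaled to an equivalent Gaussian sigma
iqr = merged.inverse_cdf(0.75) - merged.inverse_cdf(0.25)
print(iqr / 1.349)  # ~1.0 for standard-normal draws

Because centroids are tiny compared to the raw samples, this is also what makes the MPI/ipyparallel gather cheap: only the (n, 2) arrays travel between processes.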
