From 8e395caaec5539f3c989705a9a482869836b870c Mon Sep 17 00:00:00 2001
From: drewoldag <47493171+drewoldag@users.noreply.github.com>
Date: Tue, 13 Feb 2024 16:06:48 -0800
Subject: [PATCH] Example of metric running in parallel in notebook.

---
 pyproject.toml                                |   1 +
 src/qp/metrics/parallel_metrics.ipynb         | 125 +++++++++++++++---
 .../metrics/point_estimate_metric_classes.py  |  22 ++-
 3 files changed, 117 insertions(+), 31 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index cb5c0cc..f8bba55 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -44,6 +44,7 @@ dev = [
     "pylint",
     "mpi4py",
     "coverage",
+    "ipyparallel",
 ]
 full = [
     "tables-io[full]",
diff --git a/src/qp/metrics/parallel_metrics.ipynb b/src/qp/metrics/parallel_metrics.ipynb
index a8b75c3..5316aa3 100644
--- a/src/qp/metrics/parallel_metrics.ipynb
+++ b/src/qp/metrics/parallel_metrics.ipynb
@@ -2,39 +2,56 @@
  "cells": [
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 1,
    "metadata": {},
    "outputs": [],
    "source": [
     "import numpy as np\n",
-    "from qp.metrics.point_estimate_metric_classes import PointSigmaIQR\n"
+    "from qp.metrics.point_estimate_metric_classes import PointSigmaIQR"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 2,
    "metadata": {},
    "outputs": [],
    "source": [
+    "# Generate the random numbers\n",
     "SEED = 1002330\n",
     "rng = np.random.default_rng(SEED)\n",
     "\n",
-    "estimate = rng.lognormal(mean=1.0, sigma=2, size=1000)\n",
-    "reference = rng.lognormal(mean=1.3, sigma=1.9, size=1000)"
+    "chunk_size = 10_000\n",
+    "n_chunk = 10\n",
+    "total_size = n_chunk * chunk_size\n",
+    "\n",
+    "estimate = rng.lognormal(mean=1.0, sigma=2, size=total_size)\n",
+    "reference = rng.lognormal(mean=1.3, sigma=1.9, size=total_size)"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 3,
    "metadata": {},
-   "outputs": [],
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "1.844492171486455"
+      ]
+     },
+     "execution_count": 3,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
    "source": [
+    "# Do the explicit calculation on the full arrays for comparison\n",
     "PointSigmaIQR().evaluate(estimate, reference)"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 4,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -42,25 +59,99 @@
     "def chunker(seq, size):\n",
     "    return (seq[pos:pos + size] for pos in range(0, len(seq), size))\n",
     "\n",
-    "# create an iterator that yields chunks of 100 elements\n",
-    "estimate_chunks = chunker(estimate, 100)\n",
-    "reference_chunks = chunker(reference, 100)"
+    "# create a generator that yields chunks of chunk_size elements\n",
+    "estimate_chunks = chunker(estimate, chunk_size)\n",
+    "reference_chunks = chunker(reference, chunk_size)"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# A function to run on each (estimator, estimate, reference) chunk\n",
+    "def mpi_example(chunk):\n",
+    "    centroids = chunk[0].accumulate(chunk[1], chunk[2])\n",
+    "    return centroids"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Set up the data for ipyparallel\n",
+    "estimator = PointSigmaIQR()\n",
+    "estimator_list = [estimator] * n_chunk\n",
+    "data_chunks = list(zip(estimator_list, estimate_chunks, reference_chunks))"
+   ]
+  },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 7,
    "metadata": {},
    "outputs": [],
    "source": [
-    "PointSigmaIQR().eval_from_iterator(estimate_chunks, reference_chunks)\n",
-    "# fails as expected when we get to the `comm` and `centroid` bits in `finalize`."
+    "# imports for the ipyparallel MPI cluster\n",
+    "import ipyparallel as ipp\n",
+    "import time"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": 8,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Starting 4 engines with \n",
+      "100%|██████████| 4/4 [00:05<00:00,  1.48s/engine]\n",
+      "mpi_example: 100%|██████████| 10/10 [00:00<00:00, 13.42tasks/s]\n",
+      "0 : (53, 2)\n",
+      "1 : (53, 2)\n",
+      "2 : (54, 2)\n",
+      "3 : (53, 2)\n",
+      "4 : (55, 2)\n",
+      "5 : (53, 2)\n",
+      "6 : (52, 2)\n",
+      "7 : (55, 2)\n",
+      "8 : (51, 2)\n",
+      "9 : (54, 2)\n",
+      "1.98106779379963\n",
+      "Stopping engine(s): 1707869025\n",
+      "engine set stopped 1707869025: {'exit_code': 0, 'pid': 27802, 'identifier': 'ipengine-1707869024-vf70-1707869025-27783'}\n",
+      "Stopping controller\n",
+      "Controller stopped: {'exit_code': 0, 'pid': 27789, 'identifier': 'ipcontroller-1707869024-vf70-27783'}\n"
+     ]
+    }
+   ],
+   "source": [
+    "with ipp.Cluster(controller_ip=\"*\", engines=\"mpi\", n=4) as rc:\n",
+    "    # get a load_balanced_view on the cluster to\n",
+    "    # distribute the chunks across the engines\n",
+    "    view = rc.load_balanced_view()\n",
+    "    # map the mpi_example function over the data chunks in parallel\n",
+    "    asyncresult = view.map_async(mpi_example, data_chunks)\n",
+    "    # wait for all tasks to finish, with a progress bar\n",
+    "    asyncresult.wait_interactive()\n",
+    "    # retrieve the actual results\n",
+    "    result = asyncresult.get()\n",
+    "    # print the shape of each chunk's centroid array\n",
+    "    for i, res in enumerate(result):\n",
+    "        res = np.array(res)\n",
+    "        print(f\"{i} : {res.shape}\")\n",
+    "    final = estimator.finalize(centroids=result)\n",
+    "    print(final)"
+   ]
+  }
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "qp",
+   "display_name": "Python 3 (ipykernel)",
    "language": "python",
    "name": "python3"
   },
@@ -74,9 +165,9 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.10.9"
+   "version": "3.10.12"
   }
  },
  "nbformat": 4,
- "nbformat_minor": 2
+ "nbformat_minor": 4
 }
diff --git a/src/qp/metrics/point_estimate_metric_classes.py b/src/qp/metrics/point_estimate_metric_classes.py
index e14d903..5076930 100644
--- a/src/qp/metrics/point_estimate_metric_classes.py
+++ b/src/qp/metrics/point_estimate_metric_classes.py
@@ -2,9 +2,10 @@
 from qp.metrics.base_metric_classes import (
     MetricOutputType,
     PointToPointMetric,
-    PointToPointMetricDigester
 )
 from pytdigest import TDigest
+from functools import reduce
+from operator import add
 
 
 class PointStatsEz(PointToPointMetric):
@@ -76,19 +77,12 @@ def accumulate(self, estimate, reference):
         centroids = digest.get_centroids()
         return centroids
 
-    def finalize(self, comm=None, centroids=None):
-        # ents = comm.gather()
-        # meta_digest = TDigest.from_centroid(cents) # Or something like this
-        # return self.compute_from_digest(meta_digest)
-        # centroids = comm.gather(centroids, root=0) # ???
-
-        #? Does this need to be the more complex version from the example? i.e.
-        # digests = (
-        #     TDigest.of_centroids(centroid, compression=COMPRESSION)
-        #     for centroid in chain.from_iterable(centroids)
-        # )
-        # digest = reduce(add, digests)
-        digest = TDigest.of_centroids(centroids, compression=100)
+    def finalize(self, centroids=None):
+        digests = (
+            TDigest.of_centroids(np.array(centroid), compression=100)
+            for centroid in centroids
+        )
+        digest = reduce(add, digests)
         return self.compute_from_digest(digest)
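
For anyone reviewing this patch: the accumulate/finalize pattern introduced
above can also be exercised serially, with no ipyparallel cluster. Below is a
minimal sketch of the same flow, assuming only the PointSigmaIQR
accumulate/finalize API shown in this diff; the array size and chunk bounds
are illustrative, not part of the patch:

    import numpy as np
    from qp.metrics.point_estimate_metric_classes import PointSigmaIQR

    rng = np.random.default_rng(1002330)
    estimate = rng.lognormal(mean=1.0, sigma=2, size=100_000)
    reference = rng.lognormal(mean=1.3, sigma=1.9, size=100_000)

    estimator = PointSigmaIQR()
    # accumulate() compresses each chunk into a small array of t-digest centroids
    centroids = [
        estimator.accumulate(estimate[i:i + 10_000], reference[i:i + 10_000])
        for i in range(0, len(estimate), 10_000)
    ]
    # finalize() rebuilds a digest from each centroid array, merges them with
    # reduce(add, ...), and computes the metric from the merged digest
    print(estimator.finalize(centroids=centroids))

Because accumulate() reduces each chunk to a small centroid array, only those
arrays cross process boundaries in the parallel version, which keeps the
engine-to-client traffic small regardless of the input size.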