From 7a28c84317025faa82f8f18fbaee6e35a108f213 Mon Sep 17 00:00:00 2001 From: "Olivia R. Lynn" Date: Wed, 7 Feb 2024 14:36:24 -0500 Subject: [PATCH 1/2] Parallelize uniform_binning.py --- src/rail/estimation/algos/uniform_binning.py | 53 ++++++++++++++++---- 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/src/rail/estimation/algos/uniform_binning.py b/src/rail/estimation/algos/uniform_binning.py index c3eb7962..a332a60b 100644 --- a/src/rail/estimation/algos/uniform_binning.py +++ b/src/rail/estimation/algos/uniform_binning.py @@ -3,10 +3,11 @@ tomographic bins with uniform binning. """ +import gc import numpy as np from ceci.config import StageParameter as Param from rail.estimation.classifier import PZClassifier -from rail.core.data import TableHandle +from rail.core.data import TableHandle, Hdf5Handle class UniformBinningClassifier(PZClassifier): """Classifier that simply assign tomographic @@ -23,15 +24,22 @@ class UniformBinningClassifier(PZClassifier): nbins=Param(int, 5, msg="Number of tomographic bins"), no_assign=Param(int, -99, msg="Value for no assignment flag"), ) - outputs = [('output', TableHandle)] - + outputs = [('output', Hdf5Handle)] + + def __init__(self, args, comm=None): PZClassifier.__init__(self, args, comm=comm) + + + def _initialize_run(self): + self._output_handle = None + + + def _finalize_run(self): + self._output_handle.finalize_write() - def run(self): - test_data = self.get_data('input') - npdf = test_data.npdf + def _process_chunk(self, s, e, test_data, first): try: zb = test_data.ancil[self.config.point_estimate] except KeyError: @@ -45,15 +53,14 @@ def run(self): # assign -99 to objects not in any bin: bin_index[bin_index==0]=self.config.no_assign bin_index[bin_index==len(self.config.zbin_edges)]=self.config.no_assign - else: # linear binning defined by zmin, zmax, and nbins bin_index = np.digitize(zb, np.linspace(self.config.zmin, self.config.zmax, self.config.nbins+1)) # assign -99 to objects not in any bin: bin_index[bin_index==0]=self.config.no_assign bin_index[bin_index==(self.config.nbins+1)]=self.config.no_assign - - + + npdf = test_data.npdf if self.config.id_name != "": # below is commented out and replaced by a redundant line # because the data doesn't have ID yet @@ -65,4 +72,30 @@ def run(self): self.config.id_name="row_index" class_id = {self.config.id_name: obj_id, "class_id": bin_index} - self.add_data('output', class_id) \ No newline at end of file + self._do_chunk_output(class_id, s, e, first) + + + def _do_chunk_output(self, class_id, start, end, first): + if first: + self._output_handle = self.add_handle('output', data=class_id) + self._output_handle.initialize_write(self._input_length, communicator=self.comm) + self._output_handle.set_data(class_id, partial=True) + self._output_handle.write_chunk(start, end) + + + def run(self): + test_data = self.get_data('input') + + iterator = self.input_iterator('input') # calling RailStage's input_iterator here + first = True + self._initialize_run() + self._output_handle = None + + for s, e, test_data in iterator: + #print(f"Process {self.rank} running estimator on chunk {s} - {e}") + self._process_chunk(s, e, test_data, first) + first = False + # Running garbage collection manually seems to be needed + # to avoid memory growth for some estimators + gc.collect() + self._finalize_run() From 40802f3c723c010dfeb059d014009fff075db11f Mon Sep 17 00:00:00 2001 From: "Olivia R. Lynn" Date: Wed, 7 Feb 2024 16:01:43 -0500 Subject: [PATCH 2/2] Update uniform_binning.py - remove initialize_run and npdf var --- src/rail/estimation/algos/uniform_binning.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/rail/estimation/algos/uniform_binning.py b/src/rail/estimation/algos/uniform_binning.py index a332a60b..4f601e5c 100644 --- a/src/rail/estimation/algos/uniform_binning.py +++ b/src/rail/estimation/algos/uniform_binning.py @@ -30,11 +30,7 @@ class UniformBinningClassifier(PZClassifier): def __init__(self, args, comm=None): PZClassifier.__init__(self, args, comm=comm) - - def _initialize_run(self): - self._output_handle = None - def _finalize_run(self): self._output_handle.finalize_write() @@ -59,16 +55,15 @@ def _process_chunk(self, s, e, test_data, first): # assign -99 to objects not in any bin: bin_index[bin_index==0]=self.config.no_assign bin_index[bin_index==(self.config.nbins+1)]=self.config.no_assign - - npdf = test_data.npdf + if self.config.id_name != "": # below is commented out and replaced by a redundant line # because the data doesn't have ID yet # obj_id = test_data[self.config.id_name] - obj_id = np.arange(npdf) + obj_id = np.arange(test_data.npdf) elif self.config.id_name == "": # ID set to row index - obj_id = np.arange(npdf) + obj_id = np.arange(test_data.npdf) self.config.id_name="row_index" class_id = {self.config.id_name: obj_id, "class_id": bin_index} @@ -88,7 +83,6 @@ def run(self): iterator = self.input_iterator('input') # calling RailStage's input_iterator here first = True - self._initialize_run() self._output_handle = None for s, e, test_data in iterator: