diff --git a/src/rail/estimation/algos/uniform_binning.py b/src/rail/estimation/algos/uniform_binning.py index c3eb7962..4f601e5c 100644 --- a/src/rail/estimation/algos/uniform_binning.py +++ b/src/rail/estimation/algos/uniform_binning.py @@ -3,10 +3,11 @@ tomographic bins with uniform binning. """ +import gc import numpy as np from ceci.config import StageParameter as Param from rail.estimation.classifier import PZClassifier -from rail.core.data import TableHandle +from rail.core.data import TableHandle, Hdf5Handle class UniformBinningClassifier(PZClassifier): """Classifier that simply assign tomographic @@ -23,15 +24,18 @@ class UniformBinningClassifier(PZClassifier): nbins=Param(int, 5, msg="Number of tomographic bins"), no_assign=Param(int, -99, msg="Value for no assignment flag"), ) - outputs = [('output', TableHandle)] - + outputs = [('output', Hdf5Handle)] + + def __init__(self, args, comm=None): PZClassifier.__init__(self, args, comm=comm) + + + def _finalize_run(self): + self._output_handle.finalize_write() - def run(self): - test_data = self.get_data('input') - npdf = test_data.npdf + def _process_chunk(self, s, e, test_data, first): try: zb = test_data.ancil[self.config.point_estimate] except KeyError: @@ -45,7 +49,6 @@ def run(self): # assign -99 to objects not in any bin: bin_index[bin_index==0]=self.config.no_assign bin_index[bin_index==len(self.config.zbin_edges)]=self.config.no_assign - else: # linear binning defined by zmin, zmax, and nbins bin_index = np.digitize(zb, np.linspace(self.config.zmin, self.config.zmax, self.config.nbins+1)) @@ -53,16 +56,40 @@ def run(self): bin_index[bin_index==0]=self.config.no_assign bin_index[bin_index==(self.config.nbins+1)]=self.config.no_assign - if self.config.id_name != "": # below is commented out and replaced by a redundant line # because the data doesn't have ID yet # obj_id = test_data[self.config.id_name] - obj_id = np.arange(npdf) + obj_id = np.arange(test_data.npdf) elif self.config.id_name == "": # ID set to row index - obj_id = np.arange(npdf) + obj_id = np.arange(test_data.npdf) self.config.id_name="row_index" class_id = {self.config.id_name: obj_id, "class_id": bin_index} - self.add_data('output', class_id) \ No newline at end of file + self._do_chunk_output(class_id, s, e, first) + + + def _do_chunk_output(self, class_id, start, end, first): + if first: + self._output_handle = self.add_handle('output', data=class_id) + self._output_handle.initialize_write(self._input_length, communicator=self.comm) + self._output_handle.set_data(class_id, partial=True) + self._output_handle.write_chunk(start, end) + + + def run(self): + test_data = self.get_data('input') + + iterator = self.input_iterator('input') # calling RailStage's input_iterator here + first = True + self._output_handle = None + + for s, e, test_data in iterator: + #print(f"Process {self.rank} running estimator on chunk {s} - {e}") + self._process_chunk(s, e, test_data, first) + first = False + # Running garbage collection manually seems to be needed + # to avoid memory growth for some estimators + gc.collect() + self._finalize_run()