Skip to content

Commit

Permalink
Merge pull request #85 from LSSTDESC/issue/84/parallel-classifiers
Browse files Browse the repository at this point in the history
Parallelize uniform_binning.py
  • Loading branch information
OliviaLynn authored Feb 12, 2024
2 parents c0e32bb + 40802f3 commit 99b6695
Showing 1 changed file with 38 additions and 11 deletions.
49 changes: 38 additions & 11 deletions src/rail/estimation/algos/uniform_binning.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
tomographic bins with uniform binning.
"""

import gc
import numpy as np
from ceci.config import StageParameter as Param
from rail.estimation.classifier import PZClassifier
from rail.core.data import TableHandle
from rail.core.data import TableHandle, Hdf5Handle

class UniformBinningClassifier(PZClassifier):
"""Classifier that simply assign tomographic
Expand All @@ -23,15 +24,18 @@ class UniformBinningClassifier(PZClassifier):
nbins=Param(int, 5, msg="Number of tomographic bins"),
no_assign=Param(int, -99, msg="Value for no assignment flag"),
)
outputs = [('output', TableHandle)]

outputs = [('output', Hdf5Handle)]


def __init__(self, args, comm=None):
    """Constructor; all setup is delegated to the PZClassifier base class."""
    super().__init__(args, comm=comm)


def _finalize_run(self):
    """Close out the chunked output write once all chunks are processed.

    Flushes and finalizes the write that ``_do_chunk_output`` opened via
    ``initialize_write`` on the first chunk.  Called once, at the end of
    ``run``.
    """
    self._output_handle.finalize_write()

def _flagged_digitize(self, zb, edges):
    """Digitize point estimates into bins, flagging out-of-range objects.

    Parameters
    ----------
    zb : array-like
        Point-estimate redshifts for one chunk.
    edges : array-like
        Monotonic bin edges of length nbins+1.

    Returns
    -------
    numpy.ndarray
        1-based bin indices; objects below the first edge or at/above the
        last edge are set to ``self.config.no_assign``.
    """
    bin_index = np.digitize(zb, edges)
    # np.digitize returns 0 below the first edge and len(edges) at/above
    # the last edge; both mean "not in any tomographic bin".
    bin_index[bin_index == 0] = self.config.no_assign
    bin_index[bin_index == len(edges)] = self.config.no_assign
    return bin_index

def _process_chunk(self, s, e, test_data, first):
    """Assign tomographic bin indices for one chunk and write them out.

    Parameters
    ----------
    s, e : int
        Global start/end row indices of this chunk in the full input.
    test_data : qp-like ensemble
        Chunk of input PDFs; point estimates are read from its ``ancil``.
    first : bool
        True for the first chunk, so the output write can be initialized.

    Raises
    ------
    KeyError
        If ``config.point_estimate`` is not present in the ancillary data.
    """
    try:
        zb = test_data.ancil[self.config.point_estimate]
    except KeyError as err:
        raise KeyError(
            f"{self.config.point_estimate} is not contained in the data ancil; "
            "run a point estimator that stores it before this stage"
        ) from err

    # Use explicit user-supplied edges when given, otherwise uniform
    # linear binning between zmin and zmax with nbins bins.  Both cases
    # share the same digitize + out-of-range flagging logic (previously
    # duplicated in each branch).
    if len(self.config.zbin_edges) > 0:
        edges = self.config.zbin_edges
    else:
        edges = np.linspace(self.config.zmin, self.config.zmax, self.config.nbins + 1)
    bin_index = self._flagged_digitize(zb, edges)

    if self.config.id_name == "":
        # No ID column configured: label the column as the row index.
        self.config.id_name = "row_index"
    # TODO: use test_data[self.config.id_name] once the data carries real
    # object IDs.  Until then use the GLOBAL row range [s, e) so ids are
    # unique across chunks — the previous np.arange(test_data.npdf)
    # restarted at 0 for every chunk, duplicating ids in the output.
    obj_id = np.arange(s, e)

    class_id = {self.config.id_name: obj_id, "class_id": bin_index}
    self._do_chunk_output(class_id, s, e, first)


def _do_chunk_output(self, class_id, start, end, first):
    """Write one chunk of classification results to the output handle.

    Parameters
    ----------
    class_id : dict
        Columns for this chunk (object ids and ``class_id`` bin indices).
    start, end : int
        Global row range [start, end) this chunk occupies in the output.
    first : bool
        True on the first chunk only; triggers creation of the handle and
        initialization of the (possibly MPI-parallel) chunked write.
    """
    if first:
        self._output_handle = self.add_handle('output', data=class_id)
        # self._input_length is the total row count, so the output
        # datasets can be pre-allocated before any chunk is written.
        self._output_handle.initialize_write(self._input_length, communicator=self.comm)
    self._output_handle.set_data(class_id, partial=True)
    self._output_handle.write_chunk(start, end)


def run(self):
    """Classify the input in chunks, writing each chunk as it is produced."""
    # NOTE(review): this eager full read looks redundant given the chunked
    # iteration below — confirm whether get_data() is needed for handle
    # registration before removing it.
    test_data = self.get_data('input')

    self._output_handle = None
    is_first_chunk = True
    # input_iterator comes from RailStage and yields (start, end, chunk).
    for chunk_start, chunk_end, chunk_data in self.input_iterator('input'):
        self._process_chunk(chunk_start, chunk_end, chunk_data, is_first_chunk)
        is_first_chunk = False
        # Manual collection keeps per-chunk allocations from accumulating
        # (needed to avoid memory growth for some estimators).
        gc.collect()
    self._finalize_run()

0 comments on commit 99b6695

Please sign in to comment.