Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize uniform_binning.py #85

Merged
merged 2 commits into from
Feb 12, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions src/rail/estimation/algos/uniform_binning.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
tomographic bins with uniform binning.
"""

import gc
import numpy as np
from ceci.config import StageParameter as Param
from rail.estimation.classifier import PZClassifier
from rail.core.data import TableHandle
from rail.core.data import TableHandle, Hdf5Handle

class UniformBinningClassifier(PZClassifier):
"""Classifier that simply assign tomographic
Expand All @@ -23,15 +24,22 @@ class UniformBinningClassifier(PZClassifier):
nbins=Param(int, 5, msg="Number of tomographic bins"),
no_assign=Param(int, -99, msg="Value for no assignment flag"),
)
outputs = [('output', TableHandle)]

outputs = [('output', Hdf5Handle)]


def __init__(self, args, comm=None):
PZClassifier.__init__(self, args, comm=comm)


def _initialize_run(self):
self._output_handle = None


def _finalize_run(self):
self._output_handle.finalize_write()

def run(self):
test_data = self.get_data('input')
npdf = test_data.npdf

def _process_chunk(self, s, e, test_data, first):
try:
zb = test_data.ancil[self.config.point_estimate]
except KeyError:
Expand All @@ -45,15 +53,14 @@ def run(self):
# assign -99 to objects not in any bin:
bin_index[bin_index==0]=self.config.no_assign
bin_index[bin_index==len(self.config.zbin_edges)]=self.config.no_assign

else:
# linear binning defined by zmin, zmax, and nbins
bin_index = np.digitize(zb, np.linspace(self.config.zmin, self.config.zmax, self.config.nbins+1))
# assign -99 to objects not in any bin:
bin_index[bin_index==0]=self.config.no_assign
bin_index[bin_index==(self.config.nbins+1)]=self.config.no_assign


npdf = test_data.npdf
OliviaLynn marked this conversation as resolved.
Show resolved Hide resolved
if self.config.id_name != "":
# below is commented out and replaced by a redundant line
# because the data doesn't have ID yet
Expand All @@ -65,4 +72,30 @@ def run(self):
self.config.id_name="row_index"

class_id = {self.config.id_name: obj_id, "class_id": bin_index}
self.add_data('output', class_id)
self._do_chunk_output(class_id, s, e, first)


def _do_chunk_output(self, class_id, start, end, first):
if first:
self._output_handle = self.add_handle('output', data=class_id)
self._output_handle.initialize_write(self._input_length, communicator=self.comm)
self._output_handle.set_data(class_id, partial=True)
self._output_handle.write_chunk(start, end)


def run(self):
test_data = self.get_data('input')

iterator = self.input_iterator('input') # calling RailStage's input_iterator here
first = True
OliviaLynn marked this conversation as resolved.
Show resolved Hide resolved
self._initialize_run()
OliviaLynn marked this conversation as resolved.
Show resolved Hide resolved
self._output_handle = None

for s, e, test_data in iterator:
#print(f"Process {self.rank} running estimator on chunk {s} - {e}")
self._process_chunk(s, e, test_data, first)
first = False
# Running garbage collection manually seems to be needed
# to avoid memory growth for some estimators
OliviaLynn marked this conversation as resolved.
Show resolved Hide resolved
gc.collect()
self._finalize_run()
Loading