From 27c183f061d215fc9f901eb2fe983fa9945d0e71 Mon Sep 17 00:00:00 2001
From: Josue De-Santiago
Date: Fri, 10 Nov 2023 18:48:24 -0600
Subject: [PATCH 1/4] Parallel point_est_hist

---
 src/rail/estimation/algos/naive_stack.py    | 24 ++---------
 src/rail/estimation/algos/point_est_hist.py | 47 ++++++++++++++-------
 src/rail/estimation/summarizer.py           | 19 +++++++++
 3 files changed, 54 insertions(+), 36 deletions(-)

diff --git a/src/rail/estimation/algos/naive_stack.py b/src/rail/estimation/algos/naive_stack.py
index 9d3c29d4..4d52c568 100644
--- a/src/rail/estimation/algos/naive_stack.py
+++ b/src/rail/estimation/algos/naive_stack.py
@@ -25,7 +25,7 @@ def run(self):
         self.add_data('model', np.array([None]))
 
 class NaiveStackSummarizer(PZSummarizer):
-    """Summarizer which simply histograms a point estimate
+    """Summarizer which stacks individual P(z)
     """
 
     name = 'NaiveStackSummarizer'
@@ -49,7 +49,7 @@ def run(self):
         # Initiallizing the stacking pdf's
         yvals = np.zeros((1, len(self.zgrid)))
         bvals = np.zeros((self.config.nsamples, len(self.zgrid)))
-        bootstrap_matrix = self.broadcast_bootstrap_matrix()
+        bootstrap_matrix = self._broadcast_bootstrap_matrix()
 
         first = True
         for s, e, test_data in iterator:
@@ -57,7 +57,7 @@ def run(self):
             self._process_chunk(s, e, test_data, first, bootstrap_matrix, yvals, bvals)
             first = False
         if self.comm is not None: # pragma: no cover
-            bvals, yvals = self.join_histograms(bvals, yvals)
+            bvals, yvals = self._join_histograms(bvals, yvals)
 
         if self.rank == 0:
             sample_ens = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=bvals))
@@ -77,24 +77,6 @@ def _process_chunk(self, start, end, data, first, bootstrap_matrix, yvals, bval
             bootstrap_draws = bootstrap_draws[mask] - start
             bvals[i] += np.sum(pdf_vals[bootstrap_draws], axis=0)
 
-    def broadcast_bootstrap_matrix(self):
-        rng = np.random.default_rng(seed=self.config.seed)
-        # Only one of the nodes needs to produce the bootstrap indices
-        ngal = self._input_length
-        print('i am the rank with number of galaxies',self.rank,ngal)
-        if self.rank == 0:
-            bootstrap_matrix = rng.integers(low=0, high=ngal, size=(ngal,self.config.nsamples))
-        else: # pragma: no cover
-            bootstrap_matrix = None
-        if self.comm is not None: # pragma: no cover
-            self.comm.Barrier()
-            bootstrap_matrix = self.comm.bcast(bootstrap_matrix, root = 0)
-        return bootstrap_matrix
-
-    def join_histograms(self, bvals, yvals):
-        bvals_r = self.comm.reduce(bvals)
-        yvals_r = self.comm.reduce(yvals)
-        return(bvals_r, yvals_r)

diff --git a/src/rail/estimation/algos/point_est_hist.py b/src/rail/estimation/algos/point_est_hist.py
index c20270f7..43ecae3a 100644
--- a/src/rail/estimation/algos/point_est_hist.py
+++ b/src/rail/estimation/algos/point_est_hist.py
@@ -46,23 +46,40 @@ def __init__(self, args, comm=None):
         self.zgrid = None
         self.bincents = None
 
     def run(self):
-        rng = np.random.default_rng(seed=self.config.seed)
-        test_data = self.get_data('input')
-        npdf = test_data.npdf
-        zb = test_data.ancil['zmode']
-        nsamp = self.config.nsamples
+        iterator = self.input_iterator('input')
         self.zgrid = np.linspace(self.config.zmin, self.config.zmax, self.config.nzbins + 1)
         self.bincents = 0.5 * (self.zgrid[1:] + self.zgrid[:-1])
-        single_hist = np.histogram(test_data.ancil[self.config.point_estimate], bins=self.zgrid)[0]
-        qp_d = qp.Ensemble(qp.hist,
+        bootstrap_matrix = self._broadcast_bootstrap_matrix()
+        # Initializing the histograms
+        single_hist = np.zeros(self.config.nzbins)
+        hist_vals = np.zeros((self.config.nsamples, self.config.nzbins))
+
+        first = True
+        for s, e, test_data in iterator:
+            print(f"Process {self.rank} running estimator on chunk {s} - {e}")
+            self._process_chunk(s, e, test_data, first, bootstrap_matrix, single_hist, hist_vals)
+            first = False
+        if self.comm is not None: # pragma: no cover
+            hist_vals, single_hist = self._join_histograms(hist_vals, single_hist)
+
+        if self.rank == 0:
+            sample_ens = qp.Ensemble(qp.hist,
+                                     data=dict(bins=self.zgrid, pdfs=np.atleast_2d(hist_vals)))
+            qp_d = qp.Ensemble(qp.hist,
                               data=dict(bins=self.zgrid, pdfs=np.atleast_2d(single_hist)))
-        hist_vals = np.empty((nsamp, self.config.nzbins))
-        for i in range(nsamp):
-            bootstrap_indeces = rng.integers(low=0, high=npdf, size=npdf)
+            self.add_data('output', sample_ens)
+            self.add_data('single_NZ', qp_d)
+
+
+    def _process_chunk(self, start, end, test_data, first, bootstrap_matrix, single_hist, hist_vals):
+        zb = test_data.ancil[self.config.point_estimate]
+        single_hist += np.histogram(zb, bins=self.zgrid)[0]
+        for i in range(self.config.nsamples):
+            bootstrap_indeces = bootstrap_matrix[:, i]
+            # Only some of the bootstrap draws fall in this chunk, and their global indices are offset by "start"
+            mask = (bootstrap_indeces >= start) & (bootstrap_indeces < end)
+            bootstrap_indeces = bootstrap_indeces[mask] - start
+            zarr = zb[bootstrap_indeces]
+            hist_vals[i] += np.histogram(zarr, bins=self.zgrid)[0]
-            zarr = zb[bootstrap_indeces]
-            hist_vals[i] = np.histogram(zarr, bins=self.zgrid)[0]
-        sample_ens = qp.Ensemble(qp.hist, data=dict(bins=self.zgrid, pdfs=np.atleast_2d(hist_vals)))
-        self.add_data('output', sample_ens)
-        self.add_data('single_NZ', qp_d)
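The pattern behind the parallel `point_est_hist`: `_broadcast_bootstrap_matrix` fixes a single matrix of global galaxy indices that every rank shares, and `_process_chunk` keeps only the draws that land in its own `[start, end)` slice, so the per-chunk partial histograms sum to exactly the serial bootstrap. A minimal NumPy sketch of that idea, with toy sizes and a plain loop standing in for the MPI ranks (all names and values here are illustrative, not from the patch):

```python
import numpy as np

rng = np.random.default_rng(87)
ngal, nsamples, nzbins = 10, 3, 4
zgrid = np.linspace(0.0, 3.0, nzbins + 1)
zb = rng.uniform(0.0, 3.0, size=ngal)                         # point estimates for all galaxies
boot = rng.integers(low=0, high=ngal, size=(ngal, nsamples))  # global indices, identical on every rank

# serial reference: histogram each bootstrap resample directly
serial = np.array([np.histogram(zb[boot[:, i]], bins=zgrid)[0] for i in range(nsamples)])

# chunked version: each "rank" histograms only the draws inside its [start, end) slice
chunked = np.zeros_like(serial)
for start, end in [(0, 5), (5, 10)]:
    zb_chunk = zb[start:end]
    for i in range(nsamples):
        idx = boot[:, i]
        mask = (idx >= start) & (idx < end)
        chunked[i] += np.histogram(zb_chunk[idx[mask] - start], bins=zgrid)[0]

assert np.array_equal(serial, chunked)  # partial histograms add up to the serial answer
```

The same additivity is what makes the final `comm.reduce` in `_join_histograms` correct: summing the per-rank histograms elementwise reproduces the single-process result.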
From: Josue De-Santiago
Date: Fri, 10 Nov 2023 20:41:45 -0600
Subject: [PATCH 2/4] Var inf working in parallel, still need to fix the error
 message when nprocess is not large enough

---
 src/rail/core/stage.py               | 17 ++++++---
 src/rail/estimation/algos/var_inf.py | 57 +++++++++++++++++++++-------
 2 files changed, 55 insertions(+), 19 deletions(-)

diff --git a/src/rail/core/stage.py b/src/rail/core/stage.py
index 5e6d5653..721bb820 100644
--- a/src/rail/core/stage.py
+++ b/src/rail/core/stage.py
@@ -337,7 +337,7 @@ def input_iterator(self, tag, **kwargs):
             if handle.path and handle.path!='None':
                 self._input_length = handle.size(groupname=self.config.hdf5_groupname)
                 total_chunks_needed = ceil(self._input_length/self.config.chunk_size)
-                # If the number of process is larger than we need, we wemove some of them
+                # If the number of processes is larger than we need, we remove some of them
                 if total_chunks_needed < self.size: #pragma: no cover
                     color = self.rank+1 <= total_chunks_needed
                     newcomm = self.comm.Split(color=color,key=self.rank)
@@ -358,10 +358,17 @@ def input_iterator(self, tag, **kwargs):
                 test_data = self.get_data('input')[self.config.hdf5_groupname]
             else:
                 test_data = self.get_data('input')
-            max_l = 0
-            for k, v in test_data.items():
-                max_l = max(max_l, len(v))
-            self._input_length = max_l
+            # max_l = 0
+            # for k, v in test_data.items():
+            #     max_l = max(max_l, len(v))
+            # self._input_length = max_l
+            try:
+                self._input_length = test_data.npdf
+            except:
+                max_l = 0
+                for k, v in test_data.items():
+                    max_l = max(max_l, len(v))
+                self._input_length = max_l
             s = 0
             iterator=[[s, self._input_length, test_data]]
             return iterator
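When there are more MPI ranks than chunks of input, the hunk above splits the communicator so that only the ranks with a chunk keep going. A standalone mpi4py sketch of that `Comm.Split` pattern (toy chunk count; what the excluded ranks do afterwards is not shown in the hunk, so the `else` branch here is an assumption):

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
total_chunks_needed = 2  # toy value; in the stage this is ceil(input_length / chunk_size)

if total_chunks_needed < size:
    # Ranks that still have a chunk get color 1, surplus ranks get color 0;
    # Split builds one new communicator per color group
    color = int(rank + 1 <= total_chunks_needed)
    newcomm = comm.Split(color=color, key=rank)
    if color:
        print(f"global rank {rank} -> rank {newcomm.Get_rank()} of {newcomm.Get_size()} workers")
    else:
        print(f"global rank {rank} has no chunk to read")
```

Run under e.g. `mpirun -n 4 python split_sketch.py` to see two workers continue and two ranks drop out.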
{s} - {e}") + alpha_trace = self._process_chunk(s, e, test_data, first) + first = False + + if self.rank == 0: + # old way of just spitting out a single distribution + # qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=alpha_trace)) + # instead, sample and save the samples + rng = np.random.default_rng(seed=self.config.seed) + sample_pz = dirichlet.rvs(alpha_trace, size=self.config.nsamples, random_state=rng) + qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=alpha_trace)) + + sample_ens = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=sample_pz)) + self.add_data('output', sample_ens) + self.add_data('single_NZ', qp_d) + + + def _process_chunk(self, start, end, test_data, first): + if not first: + raise ValueError(f"This algorithm needs all data in memory at once, increase nprocess or chunk size.") + + # Initiallizing arrays alpha_trace = np.ones(len(self.zgrid)) init_trace = np.ones(len(self.zgrid)) - + pdf_vals = test_data.pdf(self.zgrid) + log_pdf_vals = np.log(np.array(pdf_vals) + TEENY) for _ in range(self.config.niter): dig = np.array([digamma(kk) - digamma(np.sum(alpha_trace)) for kk in alpha_trace]) matrix_grid = np.exp(dig + log_pdf_vals) gamma_matrix = np.array([kk / np.sum(kk) for kk in matrix_grid]) - nk = np.sum(gamma_matrix, axis=0) + for kk in matrix_grid: + break + nk_partial = np.sum(gamma_matrix, axis=0) + if self.comm is not None: # pragma: no cover + nk = self.comm.allreduce(nk_partial) + else: + nk = nk_partial alpha_trace = nk + init_trace + return(alpha_trace) + - # old way of just spitting out a single distribution - # qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=alpha_trace)) - # instead, sample and save the samples - sample_pz = dirichlet.rvs(alpha_trace, size=self.config.nsamples, random_state=rng) - qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=alpha_trace)) + def check_number_of_iterations(self,e,s): + if self.comm is not None and self.rank == 0: + total_chunks_needed = self._input_length/(e-s) + if total_chunks_needed > self.size: + iteration_number = np.ceil(total_chunks_needed/self.size) + raise ValueError(f"This algorithm needs all data in memory at once, currentlly it would need {iteration_number} steps. 
Increase nprocess or chunk size.") - sample_ens = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=sample_pz)) - self.add_data('output', sample_ens) - self.add_data('single_NZ', qp_d) From bb38a6c6eaad1d8305431d4ac3096d59cdd936df Mon Sep 17 00:00:00 2001 From: Josue De-Santiago Date: Wed, 15 Nov 2023 19:31:50 -0600 Subject: [PATCH 3/4] The chunks are redefined to distribute all data into memory at once --- src/rail/core/stage.py | 17 +++++------------ src/rail/estimation/algos/var_inf.py | 23 +++++++++++++---------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/rail/core/stage.py b/src/rail/core/stage.py index 721bb820..5e6d5653 100644 --- a/src/rail/core/stage.py +++ b/src/rail/core/stage.py @@ -337,7 +337,7 @@ def input_iterator(self, tag, **kwargs): if handle.path and handle.path!='None': self._input_length = handle.size(groupname=self.config.hdf5_groupname) total_chunks_needed = ceil(self._input_length/self.config.chunk_size) - # If the number of process is larger than we need, we remove some of them + # If the number of process is larger than we need, we wemove some of them if total_chunks_needed < self.size: #pragma: no cover color = self.rank+1 <= total_chunks_needed newcomm = self.comm.Split(color=color,key=self.rank) @@ -358,17 +358,10 @@ def input_iterator(self, tag, **kwargs): test_data = self.get_data('input')[self.config.hdf5_groupname] else: test_data = self.get_data('input') - # max_l = 0 - # for k, v in test_data.items(): - # max_l = max(max_l, len(v)) - # self._input_length = max_l - try: - self._input_length = test_data.npdf - except: - max_l = 0 - for k, v in test_data.items(): - max_l = max(max_l, len(v)) - self._input_length = max_l + max_l = 0 + for k, v in test_data.items(): + max_l = max(max_l, len(v)) + self._input_length = max_l s = 0 iterator=[[s, self._input_length, test_data]] return iterator diff --git a/src/rail/estimation/algos/var_inf.py b/src/rail/estimation/algos/var_inf.py index 99d91fe4..9e8b9ff1 100644 --- a/src/rail/estimation/algos/var_inf.py +++ b/src/rail/estimation/algos/var_inf.py @@ -64,11 +64,22 @@ def __init__(self, args, comm=None): self.zgrid = None def run(self): + # Redefining the chunk size so that all of the data is distributed at once in the + # nodes. 
From 382217e9afd22b995c440ca18e407ec1ad188391 Mon Sep 17 00:00:00 2001
From: Josue De-Santiago
Date: Fri, 17 Nov 2023 12:44:23 -0600
Subject: [PATCH 4/4] add pragma no cover

---
 src/rail/estimation/summarizer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/rail/estimation/summarizer.py b/src/rail/estimation/summarizer.py
index a78b8f5f..c3e01b12 100644
--- a/src/rail/estimation/summarizer.py
+++ b/src/rail/estimation/summarizer.py
@@ -122,7 +122,7 @@ def _broadcast_bootstrap_matrix(self):
             bootstrap_matrix = self.comm.bcast(bootstrap_matrix, root = 0)
         return bootstrap_matrix
 
-    def _join_histograms(self, bvals, yvals):
+    def _join_histograms(self, bvals, yvals):#pragma: no cover
         bvals_r = self.comm.reduce(bvals)
         yvals_r = self.comm.reduce(yvals)
         return(bvals_r, yvals_r)
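`_join_histograms` only ever executes under MPI (the serial path never takes the `if self.comm is not None` branch), so single-process test runs can never cover it; that is what the `pragma: no cover` marker records. A standalone mpi4py sketch of the same reduce-based join (toy histogram contents):

```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
# each rank holds a partial histogram accumulated from its own chunk of galaxies
local_hist = np.full(5, comm.Get_rank() + 1)

# comm.reduce sums the arrays elementwise onto rank 0; every other rank receives None,
# which is why the stages guard the Ensemble construction with "if self.rank == 0:"
total = comm.reduce(local_hist)  # defaults: op=MPI.SUM, root=0
if comm.Get_rank() == 0:
    print(total)
```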