Merge pull request #77 from LSSTDESC/46-parallelize-var_inf

46 parallelize var inf

joselotl authored Nov 20, 2023
2 parents 263b2a6 + 382217e commit 499c244

Showing 4 changed files with 101 additions and 51 deletions.
24 changes: 3 additions & 21 deletions src/rail/estimation/algos/naive_stack.py
@@ -25,7 +25,7 @@ def run(self):
         self.add_data('model', np.array([None]))
 
 class NaiveStackSummarizer(PZSummarizer):
-    """Summarizer which simply histograms a point estimate
+    """Summarizer which stacks individual P(z)
     """
 
     name = 'NaiveStackSummarizer'
@@ -49,15 +49,15 @@ def run(self):
         # Initializing the stacking PDFs
         yvals = np.zeros((1, len(self.zgrid)))
         bvals = np.zeros((self.config.nsamples, len(self.zgrid)))
-        bootstrap_matrix = self.broadcast_bootstrap_matrix()
+        bootstrap_matrix = self._broadcast_bootstrap_matrix()
 
         first = True
         for s, e, test_data in iterator:
             print(f"Process {self.rank} running estimator on chunk {s} - {e}")
             self._process_chunk(s, e, test_data, first, bootstrap_matrix, yvals, bvals)
             first = False
         if self.comm is not None:  # pragma: no cover
-            bvals, yvals = self.join_histograms(bvals, yvals)
+            bvals, yvals = self._join_histograms(bvals, yvals)
 
         if self.rank == 0:
             sample_ens = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=bvals))
@@ -77,24 +77,6 @@ def _process_chunk(self, start, end, data, first, bootstrap_matrix, yvals, bvals):
             bootstrap_draws = bootstrap_draws[mask] - start
             bvals[i] += np.sum(pdf_vals[bootstrap_draws], axis=0)
 
-    def broadcast_bootstrap_matrix(self):
-        rng = np.random.default_rng(seed=self.config.seed)
-        # Only one of the nodes needs to produce the bootstrap indices
-        ngal = self._input_length
-        print('i am the rank with number of galaxies', self.rank, ngal)
-        if self.rank == 0:
-            bootstrap_matrix = rng.integers(low=0, high=ngal, size=(ngal, self.config.nsamples))
-        else:  # pragma: no cover
-            bootstrap_matrix = None
-        if self.comm is not None:  # pragma: no cover
-            self.comm.Barrier()
-            bootstrap_matrix = self.comm.bcast(bootstrap_matrix, root=0)
-        return bootstrap_matrix
-
-    def join_histograms(self, bvals, yvals):
-        bvals_r = self.comm.reduce(bvals)
-        yvals_r = self.comm.reduce(yvals)
-        return (bvals_r, yvals_r)



47 changes: 32 additions & 15 deletions src/rail/estimation/algos/point_est_hist.py
@@ -46,23 +46,40 @@ def __init__(self, args, comm=None):
         self.zgrid = None
         self.bincents = None
 
     def run(self):
-        rng = np.random.default_rng(seed=self.config.seed)
-        test_data = self.get_data('input')
-        npdf = test_data.npdf
-        zb = test_data.ancil['zmode']
-        nsamp = self.config.nsamples
+        iterator = self.input_iterator('input')
         self.zgrid = np.linspace(self.config.zmin, self.config.zmax, self.config.nzbins + 1)
         self.bincents = 0.5 * (self.zgrid[1:] + self.zgrid[:-1])
-        single_hist = np.histogram(test_data.ancil[self.config.point_estimate], bins=self.zgrid)[0]
-        qp_d = qp.Ensemble(qp.hist,
-                           data=dict(bins=self.zgrid, pdfs=np.atleast_2d(single_hist)))
-        hist_vals = np.empty((nsamp, self.config.nzbins))
-        for i in range(nsamp):
-            bootstrap_indeces = rng.integers(low=0, high=npdf, size=npdf)
-            zarr = zb[bootstrap_indeces]
-            hist_vals[i] = np.histogram(zarr, bins=self.zgrid)[0]
-        sample_ens = qp.Ensemble(qp.hist,
-                                 data=dict(bins=self.zgrid, pdfs=np.atleast_2d(hist_vals)))
-        self.add_data('output', sample_ens)
-        self.add_data('single_NZ', qp_d)
+        bootstrap_matrix = self._broadcast_bootstrap_matrix()
+        # Initializing the histograms
+        single_hist = np.zeros(self.config.nzbins)
+        hist_vals = np.zeros((self.config.nsamples, self.config.nzbins))
+
+        first = True
+        for s, e, test_data in iterator:
+            print(f"Process {self.rank} running estimator on chunk {s} - {e}")
+            self._process_chunk(s, e, test_data, first, bootstrap_matrix, single_hist, hist_vals)
+            first = False
+        if self.comm is not None:  # pragma: no cover
+            hist_vals, single_hist = self._join_histograms(hist_vals, single_hist)
+
+        if self.rank == 0:
+            sample_ens = qp.Ensemble(qp.hist,
+                                     data=dict(bins=self.zgrid, pdfs=np.atleast_2d(hist_vals)))
+            qp_d = qp.Ensemble(qp.hist,
+                               data=dict(bins=self.zgrid, pdfs=np.atleast_2d(single_hist)))
+            self.add_data('output', sample_ens)
+            self.add_data('single_NZ', qp_d)
+
+    def _process_chunk(self, start, end, test_data, first, bootstrap_matrix, single_hist, hist_vals):
+        zb = test_data.ancil[self.config.point_estimate]
+        single_hist += np.histogram(zb, bins=self.zgrid)[0]
+        for i in range(self.config.nsamples):
+            bootstrap_indeces = bootstrap_matrix[:, i]
+            # Only some of the bootstrap draws fall in this chunk, and their
+            # indices are global, so shift them to be chunk-relative
+            mask = (bootstrap_indeces >= start) & (bootstrap_indeces < end)
+            bootstrap_indeces = bootstrap_indeces[mask] - start
+            zarr = zb[bootstrap_indeces]
+            hist_vals[i] += np.histogram(zarr, bins=self.zgrid)[0]

62 changes: 47 additions & 15 deletions src/rail/estimation/algos/var_inf.py
@@ -64,28 +64,60 @@ def __init__(self, args, comm=None):
         self.zgrid = None
 
     def run(self):
-        rng = np.random.default_rng(seed=self.config.seed)
-        test_data = self.get_data('input')
+        # Redefine the chunk size so that all of the data is distributed across
+        # the nodes at once. This can exhaust memory if too few nodes are allocated.
+        input_data = self.get_handle('input', allow_missing=True)
+        try:
+            self.config.hdf5_groupname
+        except:
+            self.config.hdf5_groupname = None
+        input_length = input_data.size(groupname=self.config.hdf5_groupname)
+        self.config.chunk_size = np.ceil(input_length / self.size)
+
+        iterator = self.input_iterator('input')
         self.zgrid = np.linspace(self.config.zmin, self.config.zmax, self.config.nzbins)
-        pdf_vals = test_data.pdf(self.zgrid)
-        log_pdf_vals = np.log(np.array(pdf_vals) + TEENY)
+        first = True
+        for s, e, test_data in iterator:
+            print(f"Process {self.rank} running estimator on chunk {s} - {e}")
+            alpha_trace = self._process_chunk(s, e, test_data, first)
+            first = False
+
+        if self.rank == 0:
+            # old way of just spitting out a single distribution
+            # qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=alpha_trace))
+            # instead, sample and save the samples
+            rng = np.random.default_rng(seed=self.config.seed)
+            sample_pz = dirichlet.rvs(alpha_trace, size=self.config.nsamples, random_state=rng)
+            qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=alpha_trace))
+            sample_ens = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=sample_pz))
+            self.add_data('output', sample_ens)
+            self.add_data('single_NZ', qp_d)
+
+    def _process_chunk(self, start, end, test_data, first):
+        if not first:  # pragma: no cover
+            raise ValueError("This algorithm needs all data in memory at once; increase nprocess or chunk size.")
+
+        # Initializing arrays
         alpha_trace = np.ones(len(self.zgrid))
         init_trace = np.ones(len(self.zgrid))
+
+        pdf_vals = test_data.pdf(self.zgrid)
+        log_pdf_vals = np.log(np.array(pdf_vals) + TEENY)
         for _ in range(self.config.niter):
             dig = np.array([digamma(kk) - digamma(np.sum(alpha_trace)) for kk in alpha_trace])
             matrix_grid = np.exp(dig + log_pdf_vals)
             gamma_matrix = np.array([kk / np.sum(kk) for kk in matrix_grid])
-            nk = np.sum(gamma_matrix, axis=0)
+            nk_partial = np.sum(gamma_matrix, axis=0)
+            if self.comm is not None:  # pragma: no cover
+                nk = self.comm.allreduce(nk_partial)
+            else:
+                nk = nk_partial
             alpha_trace = nk + init_trace
-
-        # old way of just spitting out a single distribution
-        # qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=alpha_trace))
-        # instead, sample and save the samples
-        sample_pz = dirichlet.rvs(alpha_trace, size=self.config.nsamples, random_state=rng)
-
-        qp_d = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=alpha_trace))
-
-        sample_ens = qp.Ensemble(qp.interp, data=dict(xvals=self.zgrid, yvals=sample_pz))
-        self.add_data('output', sample_ens)
-        self.add_data('single_NZ', qp_d)
+        return alpha_trace
19 changes: 19 additions & 0 deletions src/rail/estimation/summarizer.py
@@ -108,6 +108,25 @@ def summarize(self, input_data):
         return self.get_handle('output')
 
 
+    def _broadcast_bootstrap_matrix(self):
+        import numpy as np
+        rng = np.random.default_rng(seed=self.config.seed)
+        # Only one of the nodes needs to produce the bootstrap indices
+        ngal = self._input_length
+        if self.rank == 0:
+            bootstrap_matrix = rng.integers(low=0, high=ngal, size=(ngal, self.config.nsamples))
+        else:  # pragma: no cover
+            bootstrap_matrix = None
+        if self.comm is not None:  # pragma: no cover
+            self.comm.Barrier()
+            bootstrap_matrix = self.comm.bcast(bootstrap_matrix, root=0)
+        return bootstrap_matrix
+
+    def _join_histograms(self, bvals, yvals):  # pragma: no cover
+        bvals_r = self.comm.reduce(bvals)
+        yvals_r = self.comm.reduce(yvals)
+        return (bvals_r, yvals_r)
+
 class SZPZSummarizer(RailStage):
     """The base class for classes that use two sets of data: a photometry sample with
     spec-z values, and a photometry sample with unknown redshifts, e.g. minisom_som and
