Skip to content

Commit

Permalink
Merge pull request #8434 from gem/rup_mutex
Browse files Browse the repository at this point in the history
Storing rup_mutex and removing ctxt.weight
  • Loading branch information
micheles authored Feb 17, 2023
2 parents 0bf99e0 + 96ee7e8 commit 1f78b55
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 25 deletions.
41 changes: 22 additions & 19 deletions openquake/hazardlib/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ def __init__(self, trt, gsims, oq, monitor=Monitor(), extraparams=()):
dic['rup_id'] = U32(0)
dic['sids'] = U32(0)
dic['rrup'] = numpy.float64(0)
dic['weight'] = numpy.float64(0)
dic['occurrence_rate'] = numpy.float64(0)
self.defaultdict = dic
self.shift_hypo = param.get('shift_hypo')
Expand Down Expand Up @@ -581,8 +580,6 @@ def recarray(self, ctxs):
for par in dd:
if par == 'rup_id':
val = getattr(ctx, par)
elif par == 'weight':
val = getattr(ctx, par, 0.)
else:
val = getattr(ctx, par, numpy.nan)
getattr(ra, par)[slc] = val
Expand Down Expand Up @@ -959,7 +956,7 @@ def max_intensity(self, sitecol1, mags, dists):
return gmv

# not used by the engine, is is meant for notebooks
def get_poes(self, srcs, sitecol, rup_indep=True, collapse_level=-1):
def get_poes(self, srcs, sitecol, rup_mutex={}, collapse_level=-1):
"""
:param srcs: a list of sources with the same TRT
:param sitecol: a SiteCollection instance with N sites
Expand All @@ -968,7 +965,7 @@ def get_poes(self, srcs, sitecol, rup_indep=True, collapse_level=-1):
self.collapser.cfactor = numpy.zeros(3)
ctxs = self.from_srcs(srcs, sitecol)
with patch.object(self.collapser, 'collapse_level', collapse_level):
return self.get_pmap(ctxs, rup_indep).array
return self.get_pmap(ctxs, rup_mutex).array

def _gen_poes(self, ctx):
from openquake.hazardlib.site_amplification import get_poes_site
Expand Down Expand Up @@ -1011,24 +1008,28 @@ def gen_poes(self, ctx, rup_indep=True):
poes = numpy.concatenate(list(self._gen_poes(kctx)))
yield poes, ctxt, invs

def get_pmap(self, ctxs, rup_indep=True):
def get_pmap(self, ctxs, rup_mutex={}):
"""
:param ctxs: a list of context arrays (only one for poissonian ctxs)
:param rup_indep: default True
:param rup_mutex: dictionary of weights (default empty)
:returns: a ProbabilityMap
"""
rup_indep = not rup_mutex
sids = numpy.unique(ctxs[0].sids)
pmap = ProbabilityMap(sids, size(self.imtls), len(self.gsims))
pmap.fill(rup_indep)
self.update(pmap, ctxs, rup_indep)
self.update(pmap, ctxs, rup_mutex)
return ~pmap if rup_indep else pmap

def update(self, pmap, ctxs, rup_indep=True):
def update(self, pmap, ctxs, rup_mutex={}):
"""
:param pmap: probability map to update
:param ctxs: a list of context arrays (only one for parametric ctxs)
:param rup_indep: False for mutex ruptures, default True
:param rup_mutex: dictionary (src_id, rup_id) -> weight
The rup_mutex dictionary is read-only and normally empty
"""
rup_indep = len(rup_mutex) == 0
if self.tom is None:
itime = -1. # test_hazard_curve_X
elif isinstance(self.tom, FatedTOM):
Expand All @@ -1046,7 +1047,7 @@ def update(self, pmap, ctxs, rup_indep=True):
for ctx in ctxs:
for poes, ctxt, invs in cm.gen_poes(ctx, rup_indep):
with self.pne_mon:
pmap.update_(poes, invs, ctxt, itime, rup_indep, idx)
pmap.update_(poes, invs, ctxt, itime, rup_mutex, idx)

# called by gen_poes and by the GmfComputer
def get_mean_stds(self, ctxs, split_by_mag=False):
Expand Down Expand Up @@ -1238,6 +1239,13 @@ def __init__(self, cmaker, srcfilter, group):
self.sources = group
self.src_mutex = getattr(group, 'src_interdep', None) == 'mutex'
self.rup_indep = getattr(group, 'rup_interdep', None) != 'mutex'
if self.rup_indep:
self.rup_mutex = {}
else:
self.rup_mutex = {} # src_id, rup_id -> rup_weight
for src in group:
for i, (rup, _) in enumerate(src.data):
self.rup_mutex[src.id, i] = rup.weight
self.fewsites = self.N <= cmaker.max_sites_disagg
if hasattr(group, 'grp_probability'):
self.grp_probability = group.grp_probability
Expand Down Expand Up @@ -1265,11 +1273,6 @@ def gen_ctxs(self, src):
with self.cmaker.delta_mon:
delta = self.cmaker.deltagetter(src.id)
ctx.occurrence_rate += delta[ctx.rup_id]
if hasattr(src, 'mutex_weight'):
if ctx.weight.any():
ctx['weight'] *= src.mutex_weight
else:
ctx['weight'] = src.mutex_weight
if self.fewsites: # keep rupdata in memory (before collapse)
self.rupdata.append(ctx)
yield ctx
Expand All @@ -1291,11 +1294,11 @@ def _make_src_indep(self, pmap):
totlen += len(ctx)
allctxs.append(ctx)
if ctxlen > maxsize:
cm.update(pmap, concat(allctxs), self.rup_indep)
cm.update(pmap, concat(allctxs), self.rup_mutex)
allctxs.clear()
ctxlen = 0
if allctxs:
cm.update(pmap, concat(allctxs), self.rup_indep)
cm.update(pmap, concat(allctxs), self.rup_mutex)
allctxs.clear()
dt = time.time() - t0
nsrcs = len(self.sources)
Expand Down Expand Up @@ -1323,7 +1326,7 @@ def _make_src_mutex(self, pmap):
nctxs = len(ctxs)
nsites = sum(len(ctx) for ctx in ctxs)
if nsites:
cm.update(pm, ctxs, self.rup_indep)
cm.update(pm, ctxs, self.rup_mutex)
if hasattr(src, 'mutex_weight'):
arr = 1. - pm.array if self.rup_indep else pm.array
p = pm.new(arr * src.mutex_weight)
Expand Down
13 changes: 7 additions & 6 deletions openquake/hazardlib/probability_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,22 +293,23 @@ def update(self, other, i):
other.array[:, :, i])
return self

def update_(self, poes, invs, ctxt, itime, rup_indep, idx):
def update_(self, poes, invs, ctxt, itime, mutex_weight, idx):
"""
Update probabilities
"""
rates = ctxt.occurrence_rate
probs_occur = getattr(ctxt, 'probs_occur',
numpy.zeros((len(ctxt), 0)))
probs_occur = getattr(ctxt, 'probs_occur', numpy.zeros((len(ctxt), 0)))
idxs = self.sidx[ctxt.sids]
for i, x in enumerate(idx):
if rup_indep:
if len(mutex_weight) == 0: # indep
update_pmap_i(self.array[:, :, x], poes[:, :, i], invs, rates,
probs_occur, idxs, itime)
else: # mutex
weights = [mutex_weight[src_id, rup_id]
for src_id, rup_id in zip(ctxt.src_id, ctxt.rup_id)]
update_pmap_m(self.array[:, :, x], poes[:, :, i],
invs, rates, probs_occur, ctxt.weight,
idxs, itime)
invs, rates, probs_occur,
numpy.array(weights), idxs, itime)

def __invert__(self):
return self.new(1. - self.array)
Expand Down
16 changes: 16 additions & 0 deletions openquake/hazardlib/source_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@ def mutex_by_grp(src_groups):
return numpy.array(lst, [('src_mutex', bool), ('rup_mutex', bool)])


def build_rup_mutex(src_groups):
"""
:returns: a composite array with fields (grp_id, src_id, rup_id, weight)
"""
lst = []
dtlist = [('grp_id', numpy.uint16), ('src_id', numpy.uint32),
('rup_id', numpy.uint32), ('weight', numpy.float64)]
for sg in src_groups:
if sg.rup_interdep == 'mutex':
for src in sg:
for i, (rup, _) in enumerate(src.data):
lst.append((src.grp_id, src.id, i, rup.weight))
return numpy.array(lst, dtlist)


def create_source_info(csm, h5):
"""
Creates source_info, source_wkt, trt_smrs, toms
Expand Down Expand Up @@ -108,6 +123,7 @@ def create_source_info(csm, h5):
# avoid hdf5 damned bug by creating source_info in advance
h5.create_dataset('source_info', (num_srcs,), source_info_dt)
h5['mutex_by_grp'] = mutex_by_grp(csm.src_groups)
h5['rup_mutex'] = build_rup_mutex(csm.src_groups)
h5['source_wkt'] = numpy.array(wkts, hdf5.vstr)


Expand Down

0 comments on commit 1f78b55

Please sign in to comment.