Skip to content

Commit

Permalink
WIP - Parallelization of DistToPoint metrics (#217)
Browse files Browse the repository at this point in the history
  • Loading branch information
drewoldag authored Mar 7, 2024
1 parent 96b3e13 commit 02ceed8
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 69 deletions.
8 changes: 8 additions & 0 deletions src/qp/metrics/base_metric_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ class DistToPointMetric(BaseMetric):
def evaluate(self, estimate, reference):
raise NotImplementedError()

def initialize(self): #pragma: no cover
pass

def accumulate(self, estimate, reference): #pragma: no cover
raise NotImplementedError()

def finalize(self): #pragma: no cover
raise NotImplementedError()

class PointToPointMetric(BaseMetric):
"""A base class for metrics that require a point estimate as input for both
Expand Down
10 changes: 10 additions & 0 deletions src/qp/metrics/brier.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ def evaluate(self):
self._validate_data()
return self._calculate_metric()

def accumulate(self):
self._manipulate_data()
self._validate_data()
return self._calculate_metric_for_accumulation()

def _manipulate_data(self):
"""
Placeholder for data manipulation as required. i.e. converting from
Expand Down Expand Up @@ -95,3 +100,8 @@ def _calculate_metric(self):
return np.mean(
np.sum((self._prediction - self._truth) ** 2, axis=self._axis_for_summation)
)

def _calculate_metric_for_accumulation(self):
return np.sum(
np.sum((self._prediction - self._truth) ** 2, axis=self._axis_for_summation)
)
114 changes: 105 additions & 9 deletions src/qp/metrics/concrete_metric_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import numpy as np

from qp.ensemble import Ensemble
from qp.metrics.base_metric_classes import (
MetricOutputType,
DistToDistMetric,
Expand All @@ -11,6 +10,7 @@
)
from qp.metrics.metrics import (
calculate_brier,
calculate_brier_for_accumulation,
calculate_goodness_of_fit,
calculate_kld,
calculate_moment,
Expand All @@ -20,11 +20,56 @@
)
from qp.metrics.pit import PIT

from pytdigest import TDigest
from functools import reduce
from operator import add


class DistToPointMetricDigester(DistToPointMetric):

def __init__(self, tdigest_compression: int = 1000, **kwargs) -> None:
super().__init__(**kwargs)
self._tdigest_compression = tdigest_compression

def initialize(self):
pass

def accumulate(self, estimate, reference): #pragma: no cover
raise NotImplementedError()

def finalize(self, centroids: np.ndarray = []):
"""This function combines all the centroids that were calculated for the
input estimate and reference subsets and returns the resulting TDigest
object.
Parameters
----------
centroids : Numpy 2d array, optional
The output collected from prior calls to `accumulate`, by default []
Returns
-------
float
The result of the specific metric calculation defined in the subclasses
`compute_from_digest` method.
"""
digests = (
TDigest.of_centroids(np.array(centroid), compression=self._tdigest_compression)
for centroid in centroids
)
digest = reduce(add, digests)

return self.compute_from_digest(digest)

def compute_from_digest(self, digest): #pragma: no cover
raise NotImplementedError()


class MomentMetric(SingleEnsembleMetric):
"""Class wrapper around the `calculate_moment` function."""

metric_name = "moment"
metric_output_type = MetricOutputType.one_value_per_distribution

def __init__(
self,
Expand Down Expand Up @@ -80,7 +125,7 @@ def evaluate(self, estimate) -> list:


#! Should this be implemented as `DistToPointMetric` or `DistToDistMetric` ???
class BrierMetric(DistToPointMetric):
class BrierMetric(DistToPointMetricDigester):
"""Class wrapper around the calculate_brier function. (Which itself is a
wrapper around the `Brier` metric evaluator class).
"""
Expand All @@ -89,11 +134,23 @@ class BrierMetric(DistToPointMetric):
metric_output_type = MetricOutputType.one_value_per_distribution

def __init__(self, limits: tuple = (0.0, 3.0), dx: float = 0.01, **kwargs) -> None:
super().__init__(limits, dx)
kwargs.update({"limits": limits, "dx": dx})
super().__init__(**kwargs)

def evaluate(self, estimate, reference) -> list:
return calculate_brier(estimate, reference, self._limits, self._dx)

def accumulate(self, estimate, reference):
brier_sum_npdf_tuple = calculate_brier_for_accumulation(estimate, reference, self._limits, self._dx)
return brier_sum_npdf_tuple

def finalize(self, tuples):
# tuples is a list of tuples. The first value in the tuple is the Brier sum
# The second value is the number of PDFs
summed_terms = np.sum(np.atleast_2d(tuples), axis=0)

# calculate the mean from the summed terms
return summed_terms[0] / summed_terms[1]

class OutlierMetric(SingleEnsembleMetric):
"""Class wrapper around the outlier calculation metric."""
Expand Down Expand Up @@ -204,24 +261,47 @@ def evaluate(self, estimate, reference) -> list:
)


#! Confirm metric output type - perhaps a new type is appropriate ???
class PITMetric(DistToPointMetric):
class PITMetric(DistToPointMetricDigester):
"""Class wrapper for the PIT Metric class."""

metric_name = "pit"
metric_output_type = MetricOutputType.single_distribution
default_eval_grid = np.linspace(0, 1, 100)

def __init__(self, eval_grid: list = default_eval_grid, **kwargs) -> None:
super().__init__()
super().__init__(**kwargs)
self._eval_grid = eval_grid

def evaluate(self, estimate, reference) -> Ensemble:
def evaluate(self, estimate, reference):
pit_object = PIT(estimate, reference, self._eval_grid)
return pit_object.pit


class CDELossMetric(DistToPointMetric):
def accumulate(self, estimate, reference):
pit_samples = PIT(estimate, reference, self._eval_grid)._gather_pit_samples(estimate, reference)
digest = TDigest.compute(pit_samples, compression=self._tdigest_compression)
centroids = digest.get_centroids()
return centroids

def compute_from_digest(self, digest):
# Since we use equal weights for all the values in the digest
# digest.weight is the total number of values, it is stored as a float,
# so we cast to int.
eval_grid = self._eval_grid
total_samples = int(digest.weight)
n_pit = np.min([total_samples, len(eval_grid)])
if n_pit < len(eval_grid): # pragma: no cover
#! TODO: Determine what the appropriate style of logging is going to be for metrics.
print(
"Number of pit samples is smaller than the evaluation grid size. "
"Will create a new evaluation grid with size = number of pit samples"
)
eval_grid = np.linspace(0, 1, n_pit)

data_quants = digest.inverse_cdf(eval_grid)
return PIT._produce_output_ensemble(data_quants, eval_grid)


class CDELossMetric(DistToPointMetricDigester):
"""Conditional density loss"""

metric_name = "cdeloss"
Expand All @@ -248,3 +328,19 @@ def evaluate(self, estimate, reference):
term2 = np.mean(pdfs[range(npdf), nns])
cdeloss = term1 - 2 * term2
return cdeloss

def accumulate(self, estimate, reference):
pdfs = estimate.pdf(self._xvals)
npdf = estimate.npdf
term1_sum = np.sum(np.trapz(pdfs**2, x=self._xvals))

nns = [np.argmin(np.abs(self._xvals - z)) for z in reference]
term2_sum = np.sum(pdfs[range(npdf), nns])

return (term1_sum, term2_sum, npdf)

def finalize(self, tuples):
summed_terms = np.sum(np.atleast_2d(tuples), axis=0)
term1 = summed_terms[0] / summed_terms[2]
term2 = summed_terms[1] / summed_terms[2]
return term1 - 2 * term2
63 changes: 36 additions & 27 deletions src/qp/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,32 +217,7 @@ def evaluate_pdf_at_z(z, dist):
return np.array(rbpes)


def calculate_brier(p, truth, limits, dx=0.01):
"""This function will do the following:
1) Generate a Mx1 sized grid based on `limits` and `dx`.
2) Produce an NxM array by evaluating the pdf for each of the N distribution objects
in the Ensemble p on the grid.
3) Produce an NxM truth_array using the input truth and the generated grid. All values will be 0 or 1.
4) Create a Brier metric evaluation object
5) Return the result of the Brier metric calculation.
Parameters
----------
p: qp.Ensemble object
of N distributions probability distribution functions that will be gridded and compared against truth.
truth: Nx1 sequence
the list of true values, 1 per distribution in p.
limits: 2-tuple of floats
endpoints grid to evaluate the PDFs for the distributions in p
dx: float
resolution of the grid Defaults to 0.01.
Returns
-------
Brier_metric: float
"""

def _prepare_for_brier(p, truth, limits, dx=0.01):
# Ensure that the number of distributions objects in the Ensemble
# is equal to the length of the truth array
if p.npdf != len(truth):
Expand Down Expand Up @@ -270,11 +245,45 @@ def calculate_brier(p, truth, limits, dx=0.01):
truth_array = np.squeeze([np.histogram(t, grid.hist_bin_edges)[0] for t in truth])

# instantiate the Brier metric object
brier_metric_evaluation = Brier(pdf_values, truth_array)
return Brier(pdf_values, truth_array)

def calculate_brier(p, truth, limits, dx=0.01):
"""This function will do the following:
1) Generate a Mx1 sized grid based on `limits` and `dx`.
2) Produce an NxM array by evaluating the pdf for each of the N distribution objects
in the Ensemble p on the grid.
3) Produce an NxM truth_array using the input truth and the generated grid. All values will be 0 or 1.
4) Create a Brier metric evaluation object
5) Return the result of the Brier metric calculation.
Parameters
----------
p: qp.Ensemble object
of N distributions probability distribution functions that will be gridded and compared against truth.
truth: Nx1 sequence
the list of true values, 1 per distribution in p.
limits: 2-tuple of floats
endpoints grid to evaluate the PDFs for the distributions in p
dx: float
resolution of the grid Defaults to 0.01.
Returns
-------
Brier_metric: float
"""

brier_metric_evaluation = _prepare_for_brier(p, truth, limits, dx)

# return the results of evaluating the Brier metric
return brier_metric_evaluation.evaluate()

def calculate_brier_for_accumulation(p, truth, limits, dx=0.01):
brier_metric_evaluation = _prepare_for_brier(p, truth, limits, dx)

brier_sum = brier_metric_evaluation.accumulate()

return (brier_sum, p.npdf)

@deprecated(
reason="""
Expand Down
65 changes: 34 additions & 31 deletions src/qp/metrics/pit.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,8 @@ def __init__(self, qp_ens, true_vals, eval_grid=DEFAULT_QUANTS):
eval_grid : [float], optional
A strictly increasing array-like sequence in the range [0,1], by default DEFAULT_QUANTS
"""
self._true_vals = true_vals

# For each distribution in the Ensemble, calculate the CDF where x = known_true_value
self._pit_samps = np.array(
[
qp_ens[i].cdf(self._true_vals[i])[0][0]
for i in range(len(self._true_vals))
]
)

# These two lines set all `NaN` values to 0. This may or may not make sense
# Alternatively if it's better to simply remove the `NaN`, this can be done
# efficiently on line 61 with `data_quants = np.nanquantile(...)`.`
samp_mask = np.isfinite(self._pit_samps)
self._pit_samps[~samp_mask] = 0
if not np.all(samp_mask): #pragma: no cover
logging.warning(
"Some PIT samples were `NaN`. They have been replacd with 0."
)
self._pit_samps = self._gather_pit_samples(qp_ens, true_vals)

n_pit = np.min([len(self._pit_samps), len(eval_grid)])
if n_pit < len(eval_grid):
Expand All @@ -72,19 +55,7 @@ def __init__(self, qp_ens, true_vals, eval_grid=DEFAULT_QUANTS):

data_quants = np.quantile(self._pit_samps, eval_grid)

# Remove duplicates values as well as values outside the range (0,1)
_, unique_indices = np.unique(data_quants, return_index=True)
unique_data_quants = data_quants[unique_indices]
unique_eval_grid = eval_grid[unique_indices]
quant_mask = self._create_quant_mask(unique_data_quants)

self._pit = qp.Ensemble(
qp.quant,
data=dict(
quants=unique_eval_grid[quant_mask],
locs=np.atleast_2d(unique_data_quants[quant_mask]),
),
)
self._pit = self._produce_output_ensemble(data_quants, eval_grid)

@property
def pit_samps(self):
Expand Down Expand Up @@ -201,6 +172,38 @@ def evaluate_PIT_outlier_rate(self, pit_min=0.0001, pit_max=0.9999):
"""
return calculate_outlier_rate(self._pit, pit_min, pit_max)[0]

@classmethod
def _gather_pit_samples(cls, qp_ens, true_vals):
pit_samples = np.squeeze(qp_ens.cdf(np.vstack(true_vals)))

# These two lines set all `NaN` values to 0. This may or may not make sense
# Alternatively if it's better to simply remove the `NaN`, this can be done
# efficiently on line 61 with `data_quants = np.nanquantile(...)`.`
sample_mask = np.isfinite(pit_samples)
pit_samples[~sample_mask] = 0
if not np.all(sample_mask): #pragma: no cover
logging.warning(
"Some PIT samples were `NaN`. They have been replacd with 0."
)

return pit_samples

@classmethod
def _produce_output_ensemble(cls, data_quants, eval_grid):
# Remove duplicates values as well as values outside the range (0,1)
_, unique_indices = np.unique(data_quants, return_index=True)
unique_data_quants = data_quants[unique_indices]
unique_eval_grid = eval_grid[unique_indices]
quant_mask = cls._create_quant_mask(unique_data_quants)

return qp.Ensemble(
qp.quant,
data=dict(
quants=unique_eval_grid[quant_mask],
locs=np.atleast_2d(unique_data_quants[quant_mask]),
),
)

@classmethod
def _create_quant_mask(cls, data_quants):
"""Create a numpy mask such that, when applied only values greater than
Expand Down
Loading

0 comments on commit 02ceed8

Please sign in to comment.