WIP - Per-input evaluator classes #74

Closed
wants to merge 8 commits
4 changes: 2 additions & 2 deletions src/rail/core/stage.py
@@ -355,9 +355,9 @@ def input_iterator(self, tag, **kwargs):
            # in a single chunk.
        else: #pragma: no cover
            if self.config.hdf5_groupname:
-               test_data = self.get_data('input')[self.config.hdf5_groupname]
+               test_data = self.get_data(tag)[self.config.hdf5_groupname]
            else:
-               test_data = self.get_data('input')
+               test_data = self.get_data(tag)
            max_l = 0
            for k, v in test_data.items():
                max_l = max(max_l, len(v))
79 changes: 79 additions & 0 deletions src/rail/evaluation/dist_to_dist_evaluator.py
@@ -0,0 +1,79 @@
import numpy as np

from ceci.config import StageParameter as Param
from qp.metrics.concrete_metric_classes import DistToDistMetric

from rail.core.data import Hdf5Handle, QPHandle
from rail.core.stage import RailStage
from rail.evaluation.evaluator import Evaluator

# dynamically build a dictionary of all available metrics of the appropriate type
METRIC_DICT = {}
for subcls in DistToDistMetric.__subclasses__():
    METRIC_DICT[subcls.metric_name] = subcls

class DistToDistEvaluator(Evaluator):
    """Evaluate the performance of a photo-z estimator against reference PDFs"""

    name = 'DistToDistEvaluator'
    config_options = RailStage.config_options.copy()
    config_options.update(
        metrics=Param(list, [], required=False,
                      msg="The metrics you want to evaluate."),
        chunk_size=Param(int, 1000, required=False,
                         msg="The default number of PDFs to evaluate per loop."),
        limits=Param(tuple, (0.0, 3.0), required=False,
                     msg="The default end points for calculating metrics on a grid."),
        dx=Param(float, 0.01, required=False,
                 msg="The default step size when calculating metrics on a grid."),
        num_samples=Param(int, 100, required=False,
                          msg="The number of random samples to select for certain metrics."),
        _random_state=Param(float, default=None, required=False,
                            msg="Random seed value to use for reproducible results."),
    )
    inputs = [('input', QPHandle),
              ('truth', QPHandle)]
    outputs = [('output', Hdf5Handle)]

    def __init__(self, args, comm=None):
        Evaluator.__init__(self, args, comm=comm)
        self._output_handle = None
        self._metric_dict = METRIC_DICT

    def run(self):
        print(f"Requested metrics: {self.config.metrics}")

        estimate_iterator = self.input_iterator('input')
        reference_iterator = self.input_iterator('truth')

        first = True
        for estimate_data_chunk, reference_data_chunk in zip(estimate_iterator, reference_iterator):
            chunk_start, chunk_end, estimate_data = estimate_data_chunk
            _, _, reference_data = reference_data_chunk

            print(f"Process {self.rank} running evaluator on chunk {chunk_start} - {chunk_end}.")
            self._process_chunk(chunk_start, chunk_end, estimate_data, reference_data, first)
            first = False

        self._output_handle.finalize_write()

    def _process_chunk(self, start, end, estimate_data, reference_data, first):
        out_table = {}
        for metric in self.config.metrics:
            if metric not in self._metric_dict:
                #! Make the following a logged error instead of bailing out of the stage.
                # raise ValueError(
                #     f"Unsupported metric requested: '{metric}'. "
                #     f"Available metrics are: {self._metric_dict.keys()}")
                continue

            this_metric = self._metric_dict[metric](**self.config.to_dict())
            out_table[metric] = this_metric.evaluate(estimate_data, reference_data)

        out_table_to_write = {key: np.array(val).astype(float) for key, val in out_table.items()}

        if first:
            self._output_handle = self.add_handle('output', data=out_table_to_write)
            self._output_handle.initialize_write(self._input_length, communicator=self.comm)
        self._output_handle.set_data(out_table_to_write, partial=True)
        self._output_handle.write_chunk(start, end)
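
For reference, a minimal usage sketch of the new stage, assuming RAIL's usual make_stage constructor and the evaluate(input, truth) convenience method inherited from the base Evaluator; the ensembles, the 'ks' metric key, and the stage name are illustrative assumptions, not part of this diff.

import numpy as np
import qp

from rail.core.stage import RailStage
from rail.evaluation.dist_to_dist_evaluator import DistToDistEvaluator

RailStage.data_store.__class__.allow_overwrite = True  # common RAIL notebook boilerplate

# Illustrative ensembles standing in for estimated and reference PDFs.
locs = np.linspace(0.5, 1.5, 100)[:, None]
scales = np.full_like(locs, 0.1)
estimate_ens = qp.Ensemble(qp.stats.norm, data=dict(loc=locs, scale=scales))
reference_ens = qp.Ensemble(qp.stats.norm, data=dict(loc=locs + 0.02, scale=scales))

evaluator = DistToDistEvaluator.make_stage(
    name='dist_to_dist',
    metrics=['ks'],      # assumed metric key; any entry in METRIC_DICT works
    chunk_size=100,
)
results = evaluator.evaluate(estimate_ens, reference_ens)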
100 changes: 100 additions & 0 deletions src/rail/evaluation/dist_to_point_evaluator.py
@@ -0,0 +1,100 @@
import numpy as np

from ceci.config import StageParameter as Param
from qp.metrics.base_metric_classes import MetricOutputType
from qp.metrics.concrete_metric_classes import DistToPointMetric

from rail.core.data import Hdf5Handle, QPHandle, TableHandle
from rail.core.stage import RailStage
from rail.evaluation.evaluator import Evaluator

# dynamically build a dictionary of all available metrics of the appropriate type
METRIC_DICT = {}
for subcls in DistToPointMetric.__subclasses__():
    METRIC_DICT[subcls.metric_name] = subcls

class DistToPointEvaluator(Evaluator):
    """Evaluate the performance of a photo-z estimator against reference point estimates"""

    name = 'DistToPointEvaluator'
    config_options = RailStage.config_options.copy()
    config_options.update(
        metrics=Param(list, [], required=False,
                      msg="The metrics you want to evaluate."),
        chunk_size=Param(int, 1000, required=False,
                         msg="The default number of PDFs to evaluate per loop."),
        limits=Param(tuple, (0.0, 3.0), required=False,
                     msg="The default end points for calculating metrics on a grid."),
        dx=Param(float, 0.01, required=False,
                 msg="The default step size when calculating metrics on a grid."),
        quantile_grid=Param(list, np.linspace(0, 1, 100), required=False,
                            msg="The quantile value grid on which to evaluate the CDF values. (0, 1)"),
        x_grid=Param(list, np.linspace(0, 2.5, 301), required=False,
                     msg="The x-value grid at which to evaluate the pdf values."),
        _random_state=Param(float, default=None, required=False,
                            msg="Random seed value to use for reproducible results."),
        hdf5_groupname=Param(str, "photometry", required=False,
                             msg="HDF5 groupname for the truth table."),
        reference_dictionary_key=Param(str, "redshift", required=False,
                                       msg="The key in the `truth` dictionary where the redshift data is stored."),
    )
    inputs = [('input', QPHandle),
              ('truth', TableHandle)]
    outputs = [('output', Hdf5Handle)]

    def __init__(self, args, comm=None):
        Evaluator.__init__(self, args, comm=comm)
        self._output_handle = None
        self._metric_dict = METRIC_DICT

    def run(self):
        print(f"Requested metrics: {self.config.metrics}")

        estimate_iterator = self.input_iterator('input')
        reference_iterator = self.input_iterator('truth')

        first = True
        for estimate_data_chunk, reference_data_chunk in zip(estimate_iterator, reference_iterator):
            chunk_start, chunk_end, estimate_data = estimate_data_chunk
            _, _, reference_data = reference_data_chunk

            print(f"Process {self.rank} running evaluator on chunk {chunk_start} - {chunk_end}.")
            self._process_chunk(chunk_start, chunk_end, estimate_data, reference_data, first)
            first = False

        self._output_handle.finalize_write()

    #pylint: disable=too-many-arguments
    def _process_chunk(self, start, end, estimate_data, reference_data, first):
        out_table = {}
        for metric in self.config.metrics:

            if metric not in self._metric_dict:
                #! Make the following a logged error instead of bailing out of the stage.
                # raise ValueError(
                #     f"Unsupported metric requested: '{metric}'. "
                #     f"Available metrics are: {self._metric_dict.keys()}")
                continue

            this_metric = self._metric_dict[metric](**self.config.to_dict())

            if this_metric.metric_output_type == MetricOutputType.single_value:
                print(f"We can't calculate a value for {this_metric.metric_name} right now.")
                continue

            if this_metric.metric_output_type == MetricOutputType.single_distribution:
                print(f"We can't calculate a value for {this_metric.metric_name} right now.")
                continue

            out_table[metric] = this_metric.evaluate(
                estimate_data,
                reference_data[self.config.reference_dictionary_key]
            )

        out_table_to_write = {key: np.array(val).astype(float) for key, val in out_table.items()}

        if first:
            self._output_handle = self.add_handle('output', data=out_table_to_write)
            self._output_handle.initialize_write(self._input_length, communicator=self.comm)
        self._output_handle.set_data(out_table_to_write, partial=True)
        self._output_handle.write_chunk(start, end)
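
And a similar sketch for the dist-to-point case, again assuming the make_stage / evaluate pattern; the flat in-memory truth table, the empty hdf5_groupname, and the 'cdeloss' key are assumptions for illustration.

import numpy as np
import qp

from rail.evaluation.dist_to_point_evaluator import DistToPointEvaluator

# Illustrative estimated PDFs and matching true redshifts.
locs = np.linspace(0.5, 1.5, 100)[:, None]
estimate_ens = qp.Ensemble(qp.stats.norm, data=dict(loc=locs, scale=np.full_like(locs, 0.1)))
truth_table = {'redshift': np.linspace(0.5, 1.5, 100)}  # keyed by the default reference_dictionary_key

evaluator = DistToPointEvaluator.make_stage(
    name='dist_to_point',
    metrics=['cdeloss'],   # assumed metric key; any entry in METRIC_DICT works
    hdf5_groupname='',     # truth here is a flat in-memory table rather than an HDF5 group
)
results = evaluator.evaluate(estimate_ens, truth_table)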
73 changes: 73 additions & 0 deletions src/rail/evaluation/point_to_point_evaluator.py
@@ -0,0 +1,73 @@
import numpy as np

from ceci.config import StageParameter as Param
from qp.metrics.base_metric_classes import PointToPointMetric

from rail.core.data import Hdf5Handle, TableHandle
from rail.core.stage import RailStage
from rail.evaluation.evaluator import Evaluator

# dynamically build a dictionary of all available metrics of the appropriate type
METRIC_DICT = {}
for subcls in PointToPointMetric.__subclasses__():
    METRIC_DICT[subcls.metric_name] = subcls

class PointToPointEvaluator(Evaluator):
    """Evaluate the performance of a photo-z point estimator against reference point estimates"""

    name = 'PointToPointEvaluator'
    config_options = RailStage.config_options.copy()
    config_options.update(
        metrics=Param(list, [], required=False,
                      msg="The metrics you want to evaluate."),
        chunk_size=Param(int, 1000, required=False,
                         msg="The default number of PDFs to evaluate per loop."),
        _random_state=Param(float, default=None, required=False,
                            msg="Random seed value to use for reproducible results."),
    )
    inputs = [('input', TableHandle),
              ('truth', TableHandle)]
    outputs = [('output', Hdf5Handle)]

    def __init__(self, args, comm=None):
        Evaluator.__init__(self, args, comm=comm)
        self._output_handle = None
        self._metric_dict = METRIC_DICT

    def run(self):
        print(f"Requested metrics: {self.config.metrics}")

        estimate_iterator = self.input_iterator('input')
        reference_iterator = self.input_iterator('truth')

        first = True
        for estimate_data_chunk, reference_data_chunk in zip(estimate_iterator, reference_iterator):
            chunk_start, chunk_end, estimate_data = estimate_data_chunk
            _, _, reference_data = reference_data_chunk

            print(f"Process {self.rank} running evaluator on chunk {chunk_start} - {chunk_end}.")
            self._process_chunk(chunk_start, chunk_end, estimate_data, reference_data, first)
            first = False

        self._output_handle.finalize_write()

    def _process_chunk(self, start, end, estimate_data, reference_data, first):
        out_table = {}
        for metric in self.config.metrics:
            if metric not in self._metric_dict:
                #! Make the following a logged error instead of bailing out of the stage.
                # raise ValueError(
                #     f"Unsupported metric requested: '{metric}'. "
                #     f"Available metrics are: {self._metric_dict.keys()}")
                continue

            this_metric = self._metric_dict[metric](**self.config.to_dict())
            out_table[metric] = this_metric.evaluate(estimate_data, reference_data)

        out_table_to_write = {key: np.array(val).astype(float) for key, val in out_table.items()}

        if first:
            self._output_handle = self.add_handle('output', data=out_table_to_write)
            self._output_handle.initialize_write(self._input_length, communicator=self.comm)
        self._output_handle.set_data(out_table_to_write, partial=True)
        self._output_handle.write_chunk(start, end)
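
Finally, a sketch of how the point-to-point stage would be wired up; since the table layout this WIP version expects is still settling, the column names and the 'point_stats_ez' key are assumptions.

import numpy as np

from rail.evaluation.point_to_point_evaluator import PointToPointEvaluator

rng = np.random.default_rng(0)
estimates = {'zmode': rng.normal(1.0, 0.1, 100)}   # assumed point-estimate column
truth = {'redshift': np.full(100, 1.0)}            # assumed truth column

evaluator = PointToPointEvaluator.make_stage(
    name='point_to_point',
    metrics=['point_stats_ez'],   # assumed metric key; any entry in METRIC_DICT works
)
results = evaluator.evaluate(estimates, truth)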