diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index f05ac95ff8c..8c631d3360a 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -31,7 +31,7 @@ from ert.ensemble_evaluator import EnsembleSnapshot, EvaluatorServerConfig from ert.runpaths import Runpaths from ert.storage import open_storage -from everest.config import EverestConfig +from everest.config import ControlConfig, ControlVariableGuessListConfig, EverestConfig from everest.optimizer.everest2ropt import everest2ropt from everest.simulator.everest_to_ert import everest_to_ert_config from everest.strings import EVEREST @@ -433,39 +433,36 @@ def _init_batch_data( evaluator_context: EvaluatorContext, cached_results: dict[int, Any], ) -> dict[int, dict[str, Any]]: - def add_control( - controls: dict[str, Any], - control_name: tuple[Any, ...], - control_value: float, - ) -> None: - group_name = control_name[0] - variable_name = control_name[1] - group = controls.get(group_name, {}) - if len(control_name) > 2: - index_name = str(control_name[2]) - if variable_name in group: - group[variable_name][index_name] = control_value - else: - group[variable_name] = {index_name: control_value} - else: - group[variable_name] = control_value - controls[group_name] = group - - batch_data = {} - for control_idx in range(control_values.shape[0]): - if control_idx not in cached_results and ( - evaluator_context.active is None - or evaluator_context.active[evaluator_context.realizations[control_idx]] - ): - controls: dict[str, Any] = {} - for control_name, control_value in zip( - self._everest_config.control_name_tuples, - control_values[control_idx, :], - strict=False, - ): - add_control(controls, control_name, control_value) - batch_data[control_idx] = controls - return batch_data + def _add_controls( + controls_config: list[ControlConfig], values: NDArray[np.float64] + ) -> dict[str, Any]: + batch_data_item: dict[str, Any] = {} + value_list = values.tolist() + for control in controls_config: + control_dict: dict[str, Any] = batch_data_item.get(control.name, {}) + for variable in control.variables: + variable_value = control_dict.get(variable.name, {}) + if isinstance(variable, ControlVariableGuessListConfig): + for idx in range(1, len(variable.initial_guess) + 1): + variable_value[idx] = value_list.pop(0) + elif variable.index is not None: + variable_value[variable.index] = value_list.pop(0) + else: + variable_value = value_list.pop(0) + control_dict[variable.name] = variable_value + batch_data_item[control.name] = control_dict + return batch_data_item + + active = evaluator_context.active + realizations = evaluator_context.realizations + return { + idx: _add_controls(self._everest_config.controls, control_values[idx, :]) + for idx in range(control_values.shape[0]) + if ( + idx not in cached_results + and (active is None or active[realizations[idx]]) + ) + } def _setup_sim( self, diff --git a/src/everest/config/everest_config.py b/src/everest/config/everest_config.py index d9dc43c9657..13f43065366 100644 --- a/src/everest/config/everest_config.py +++ b/src/everest/config/everest_config.py @@ -2,7 +2,6 @@ import os from argparse import ArgumentParser from copy import copy -from functools import cached_property from io import StringIO from itertools import chain from pathlib import Path @@ -647,25 +646,11 @@ def control_names(self): controls = self.controls or [] return [control.name for control in controls] - @cached_property - def control_name_tuples(self) -> list[tuple[str, str, int | tuple[str, str]]]: - tuples = [] - for control in self.controls: - for variable in control.variables: - if isinstance(variable, ControlVariableGuessListConfig): - for index in range(1, len(variable.initial_guess) + 1): - tuples.append((control.name, variable.name, index)) - elif variable.index is not None: - tuples.append((control.name, variable.name, variable.index)) - else: - tuples.append((control.name, variable.name)) - return tuples - @property def objective_names(self) -> list[str]: return [objective.name for objective in self.objective_functions] - @cached_property + @property def constraint_names(self) -> list[str]: names: list[str] = [] @@ -675,16 +660,12 @@ def _add_output_constraint(rhs_value: float | None, suffix=None): names.append(name if suffix is None else f"{name}:{suffix}") for constr in self.output_constraints or []: + _add_output_constraint(constr.target) _add_output_constraint( - constr.target, - ) - _add_output_constraint( - constr.upper_bound, - None if constr.lower_bound is None else "upper", + constr.upper_bound, None if constr.lower_bound is None else "upper" ) _add_output_constraint( - constr.lower_bound, - None if constr.upper_bound is None else "lower", + constr.lower_bound, None if constr.upper_bound is None else "lower" ) return names diff --git a/src/everest/config/utils.py b/src/everest/config/utils.py new file mode 100644 index 00000000000..6123949d21d --- /dev/null +++ b/src/everest/config/utils.py @@ -0,0 +1,117 @@ +from collections.abc import Iterator + +from .control_config import ControlConfig +from .control_variable_config import ( + ControlVariableConfig, + ControlVariableGuessListConfig, +) +from .sampler_config import SamplerConfig + + +class FlattenedControls: + def __init__(self, controls: list[ControlConfig]) -> None: + self._controls = [] + self._samplers: list[SamplerConfig] = [] + + for control in controls: + control_sampler_idx = -1 + variables = [] + for variable in control.variables: + if isinstance(variable, ControlVariableConfig): + var_dict = { + key: getattr(variable, key) + for key in [ + "control_type", + "enabled", + "auto_scale", + "scaled_range", + "min", + "max", + "perturbation_magnitude", + "initial_guess", + ] + } + + var_dict["name"] = ( + (control.name, variable.name) + if variable.index is None + else (control.name, variable.name, variable.index) + ) + + if variable.sampler is not None: + self._samplers.append(variable.sampler) + var_dict["sampler_idx"] = len(self._samplers) - 1 + else: + if control.sampler is not None and control_sampler_idx < 0: + self._samplers.append(control.sampler) + control_sampler_idx = len(self._samplers) - 1 + var_dict["sampler_idx"] = control_sampler_idx + + variables.append(var_dict) + elif isinstance(variable, ControlVariableGuessListConfig): + if control.sampler is not None and control_sampler_idx < 0: + self._samplers.append(control.sampler) + control_sampler_idx = len(self._samplers) - 1 + variables.extend( + { + "name": (control.name, variable.name, index + 1), + "initial_guess": guess, + "sampler_idx": control_sampler_idx, + } + for index, guess in enumerate(variable.initial_guess) + ) + + for var_dict in variables: + for key in [ + "type", + "initial_guess", + "control_type", + "enabled", + "auto_scale", + "min", + "max", + "perturbation_type", + "perturbation_magnitude", + "scaled_range", + ]: + if var_dict.get(key) is None: + var_dict[key] = getattr(control, key) + + self._controls.extend(variables) + + self.names = [control["name"] for control in self._controls] + self.types = [ + None if control["control_type"] is None else control["control_type"] + for control in self._controls + ] + self.initial_guesses = [control["initial_guess"] for control in self._controls] + self.lower_bounds = [control["min"] for control in self._controls] + self.upper_bounds = [control["max"] for control in self._controls] + self.auto_scales = [control["auto_scale"] for control in self._controls] + self.scaled_ranges = [ + (0.0, 1.0) if control["scaled_range"] is None else control["scaled_range"] + for control in self._controls + ] + self.enabled = [control["enabled"] for control in self._controls] + self.perturbation_magnitudes = [ + control["perturbation_magnitude"] for control in self._controls + ] + self.perturbation_types = [ + control["perturbation_type"] for control in self._controls + ] + self.sampler_indices = [control["sampler_idx"] for control in self._controls] + self.samplers = self._samplers + + +def control_tuples( + controls: list[ControlConfig], +) -> Iterator[tuple[str, str, int] | tuple[str, str]]: + for control in controls: + for variable in control.variables: + if isinstance(variable, ControlVariableGuessListConfig): + for index in range(1, len(variable.initial_guess) + 1): + yield (control.name, variable.name, index) + elif variable.index is not None: + yield (control.name, variable.name, variable.index) + else: + yield (control.name, variable.name) diff --git a/src/everest/optimizer/everest2ropt.py b/src/everest/optimizer/everest2ropt.py index b8aaafc4b60..24aa40aa7c9 100644 --- a/src/everest/optimizer/everest2ropt.py +++ b/src/everest/optimizer/everest2ropt.py @@ -1,12 +1,5 @@ import os -from collections import defaultdict -from collections.abc import Sequence -from dataclasses import asdict, dataclass -from typing import ( - Any, - Final, - TypeAlias, -) +from typing import Any from ropt.config.enopt import EnOptConfig from ropt.enums import ConstraintType, PerturbationType, VariableType @@ -14,226 +7,87 @@ from everest.config import ( ControlConfig, EverestConfig, - InputConstraintConfig, ModelConfig, ObjectiveFunctionConfig, OptimizationConfig, OutputConstraintConfig, - SamplerConfig, -) -from everest.config.control_variable_config import ( - ControlVariableConfig, - ControlVariableGuessListConfig, -) - -VariableName: TypeAlias = tuple[str, str, int] -ControlName: TypeAlias = tuple[str, str] | VariableName | list[VariableName] -StrListDict: TypeAlias = defaultdict[str, list] -IGNORE_KEYS: Final[tuple[str, ...]] = ( - "enabled", - "scaled_range", - "auto_scale", - "index", - "name", - "perturbation_magnitudes", ) +from everest.config.utils import FlattenedControls, control_tuples -def _collect_sampler( - sampler: SamplerConfig | None, - storage: dict[str, Any], - control_name: list[ControlName] | ControlName | None = None, -) -> dict[str, Any] | None: - if sampler is None: - return None - map = sampler.model_dump(exclude_none=True, exclude={"backend", "method"}) - map["method"] = sampler.ropt_method - control_names = map.setdefault("control_names", []) - if control_name: - control_names.extend( - control_name if isinstance(control_name, list) else [control_name] - ) - storage.setdefault("samplers", []).append(map) - return map - - -def _scale_translations( - is_scale: bool, - min_: float, - max_: float, - lower_bound: float, - upper_bound: float, - perturbation_type: PerturbationType, -) -> tuple[float, float, int]: - if not is_scale: - return 1.0, 0.0, perturbation_type.value - scale = (max_ - min_) / (upper_bound - lower_bound) - return scale, min_ - lower_bound * scale, PerturbationType.SCALED.value - - -@dataclass -class Control: - name: tuple[str, str] - enabled: bool - lower_bounds: float - upper_bounds: float - perturbation_magnitudes: float | None - initial_values: list[float] - types: VariableType - scaled_range: tuple[float, float] - auto_scale: bool - index: int | None - scales: float - offsets: float - perturbation_types: int - - -def _resolve_everest_control( - variable: ControlVariableConfig | ControlVariableGuessListConfig, - group: ControlConfig, -) -> Control: - scaled_range = variable.scaled_range or group.scaled_range or (0, 1.0) - auto_scale = variable.auto_scale or group.auto_scale - lower_bound = group.min if variable.min is None else variable.min - upper_bound = group.max if variable.max is None else variable.max - - scale, offset, perturbation_type = _scale_translations( - auto_scale, - lower_bound, # type: ignore - upper_bound, # type: ignore - *scaled_range, - group.ropt_perturbation_type, - ) - return Control( - name=(group.name, variable.name), - enabled=group.enabled if variable.enabled is None else variable.enabled, # type: ignore - lower_bounds=lower_bound, # type: ignore - upper_bounds=upper_bound, # type: ignore - perturbation_magnitudes=group.perturbation_magnitude - if variable.perturbation_magnitude is None - else variable.perturbation_magnitude, - initial_values=group.initial_guess - if variable.initial_guess is None - else variable.initial_guess, # type: ignore - types=group.ropt_control_type - if variable.ropt_control_type is None - else variable.ropt_control_type, - scaled_range=scaled_range, - auto_scale=auto_scale, - index=getattr(variable, "index", None), - scales=scale, - offsets=offset, - perturbation_types=perturbation_type, - ) - - -def _variable_initial_guess_list_injection( - control: Control, - *, - variables: StrListDict, - gradients: StrListDict, -) -> list[VariableName]: - guesses = len(control.initial_values) - ropt_names = [(*control.name, index + 1) for index in range(guesses)] - variables["names"].extend(ropt_names) - variables["initial_values"].extend(control.initial_values) - for key, value in asdict(control).items(): - if key not in {*IGNORE_KEYS, "initial_values"}: - (gradients if "perturbation" in key else variables)[key].extend( - [value] * guesses +def _parse_controls(ever_controls: list[ControlConfig], ropt_config): + controls = FlattenedControls(ever_controls) + control_types = [ + None if type_ is None else VariableType[type_.upper()] + for type_ in controls.types + ] + if all(item is None for item in controls.auto_scales): + offsets = None + scales = None + else: + scales = [ + (ub - lb) / (sr[1] - sr[0]) if au else 1.0 + for au, lb, ub, sr in zip( + controls.auto_scales, + controls.lower_bounds, + controls.upper_bounds, + controls.scaled_ranges, + strict=True, ) - gradients["perturbation_magnitudes"].extend( - [ - ( - (max(variables["upper_bounds"]) - min(variables["lower_bounds"])) / 10.0 - if control.perturbation_magnitudes is None - else control.perturbation_magnitudes + ] + offsets = [ + lb - sr[0] * sc if au else 0.0 + for au, lb, sc, sr in zip( + controls.auto_scales, + controls.lower_bounds, + scales, + controls.scaled_ranges, + strict=True, ) ] - * guesses - ) - return ropt_names + indices = [idx for idx, is_enabled in enumerate(controls.enabled) if is_enabled] + ropt_config["variables"] = { + "names": controls.names, + "types": None if all(item is None for item in control_types) else control_types, + "initial_values": controls.initial_guesses, + "lower_bounds": controls.lower_bounds, + "upper_bounds": controls.upper_bounds, + "offsets": offsets, + "scales": scales, + "indices": indices if indices else None, + "delimiters": "_-", + } + if "gradients" not in ropt_config: + ropt_config["gradient"] = {} -def _variable_initial_guess_injection( - control: Control, - *, - variables: StrListDict, - gradients: StrListDict, -) -> ControlName: - ropt_names: ControlName = ( - control.name if control.index is None else (*control.name, control.index) - ) - variables["names"].append(ropt_names) - for key, value in asdict(control).items(): - if key not in IGNORE_KEYS: - (gradients if "perturbation" in key else variables)[key].append(value) - gradients["perturbation_magnitudes"].append( - (max(variables["upper_bounds"]) - min(variables["lower_bounds"])) / 10.0 - if control.perturbation_magnitudes is None - else control.perturbation_magnitudes - ) - return ropt_names - - -def _parse_controls(controls: Sequence[ControlConfig], ropt_config): - """Extract info from ever_config['controls']""" - enabled = [] - variables: StrListDict = defaultdict(list) - gradients: StrListDict = defaultdict(list) - auto_scale = False - - for group in controls: - sampler = _collect_sampler(group.sampler, ropt_config) - - for variable in group.variables: - control = _resolve_everest_control(variable, group) - enabled.append(control.enabled) - control_injector = ( - _variable_initial_guess_list_injection - if isinstance(variable.initial_guess, list) - else _variable_initial_guess_injection - ) - ropt_names = control_injector( - control, - variables=variables, - gradients=gradients, - ) + if any(item >= 0 for item in controls.sampler_indices): + ropt_config["samplers"] = [ + { + "method": f"{sampler.backend}/{sampler.method}", + "options": {} if sampler.options is None else sampler.options, + "shared": False if sampler.shared is None else sampler.shared, + } + for sampler in controls.samplers + ] + ropt_config["gradient"]["samplers"] = [ + max(0, idx) for idx in controls.sampler_indices + ] - if ( - _collect_sampler(variable.sampler, ropt_config, ropt_names) is None - and sampler - ): - control_names = sampler["control_names"] - ( - control_names.extend - if isinstance(ropt_names, list) - else control_names.append - )(ropt_names) - - if control.auto_scale: - auto_scale = True - - if not auto_scale: - del variables["scales"] - del variables["offsets"] - - ropt_config["variables"] = dict(variables) - ropt_config["variables"]["indices"] = ( - None if all(enabled) else [idx for idx, item in enumerate(enabled) if item] - ) - ropt_config["variables"]["delimiters"] = "_-" - ropt_config["gradient"] = dict(gradients) + default_magnitude = (max(controls.upper_bounds) - min(controls.lower_bounds)) / 10.0 + ropt_config["gradient"]["perturbation_magnitudes"] = [ + default_magnitude if perturbation_magnitude is None else perturbation_magnitude + for perturbation_magnitude in controls.perturbation_magnitudes + ] - # The samplers in the list constructed above contain the names of the - # variables they should apply to, but ropt expects a array of indices that - # map variables to the samplers that should apply to them: - if samplers := ropt_config.get("samplers"): - sampler_indices = [0] * len(variables["names"]) - for idx, sampler in enumerate(samplers): - for name in sampler.pop("control_names"): # type: ignore - sampler_indices[variables["names"].index(name)] = idx - ropt_config["gradient"]["samplers"] = sampler_indices + ropt_config["gradient"]["perturbation_types"] = [ + PerturbationType.SCALED.value + if auto_scale + else PerturbationType[perturbation_type.upper()] + for perturbation_type, auto_scale in zip( + controls.perturbation_types, controls.auto_scales, strict=True + ) + ] def _parse_objectives(objective_functions: list[ObjectiveFunctionConfig], ropt_config): @@ -285,14 +139,19 @@ def _parse_objectives(objective_functions: list[ObjectiveFunctionConfig], ropt_c ropt_config["function_transforms"] = transforms -def _parse_input_constraints( - input_constraints: list[InputConstraintConfig] | None, - ropt_config, - formatted_names, -): - if not input_constraints: +def _parse_input_constraints(ever_config: EverestConfig, ropt_config): + if not ever_config.input_constraints: return + formatted_names = [ + ( + f"{control_name[0]}.{control_name[1]}-{control_name[2]}" + if len(control_name) > 2 + else f"{control_name[0]}.{control_name[1]}" + ) + for control_name in control_tuples(ever_config.controls) + ] + coefficients_matrix = [] rhs_values = [] types = [] @@ -303,7 +162,7 @@ def _add_input_constraint(rhs_value, coefficients, constraint_type): rhs_values.append(rhs_value) types.append(constraint_type) - for constr in input_constraints: + for constr in ever_config.input_constraints: coefficients = [0.0] * len(formatted_names) for name, value in constr.weights.items(): coefficients[formatted_names.index(name)] = value @@ -519,18 +378,8 @@ def everest2ropt(ever_config: EverestConfig) -> EnOptConfig: ropt_config: dict[str, Any] = {} _parse_controls(ever_config.controls, ropt_config) - - control_names = [ - ( - f"{control_name[0]}.{control_name[1]}-{control_name[2]}" - if len(control_name) > 2 - else f"{control_name[0]}.{control_name[1]}" - ) - for control_name in ropt_config["variables"]["names"] - ] - _parse_objectives(ever_config.objective_functions, ropt_config) - _parse_input_constraints(ever_config.input_constraints, ropt_config, control_names) + _parse_input_constraints(ever_config, ropt_config) _parse_output_constraints(ever_config.output_constraints, ropt_config) _parse_optimization( ever_opt=ever_config.optimization,