Skip to content

Commit

Permalink
Share ensemble smry datasets in memory between plugins (#569)
Browse files Browse the repository at this point in the history
Share ensemble smry datasets in memory between plugins
  • Loading branch information
sigurdp authored Feb 23, 2021
1 parent 58e6e6b commit 7e4a1c7
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [UNRELEASED] - YYYY-MM-DD
### Fixed
- [#569](https://github.com/equinor/webviz-subsurface/pull/569) - Allow sharing of ensemble smry datasets in memory between plugins instances. Note that currently sharing can only be accomplished between plugin instances that use the same ensembles, column_keys and time_index.
- [#552](https://github.com/equinor/webviz-subsurface/pull/552) - Fixed an issue where webvizstore was not properly initialized in ParameterAnalysis plugin
- [#549](https://github.com/equinor/webviz-subsurface/pull/549) - Fixed issue in WellCrossSectionFMU that prevented use of user provided colors.
- [#561](https://github.com/equinor/webviz-subsurface/pull/561) - Fixed issue in ParameterAnalysis for non-numeric parameters (dropping them).
Expand Down
7 changes: 3 additions & 4 deletions tests/unit_tests/model_tests/test_ensemble_set_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ def test_single_ensemble(testdata_folder):
assert emodel.ens_folders == {
"iter-0": Path(testdata_folder) / "reek_history_match"
}
assert len(emodel.ensembles) == 1
smry = emodel.load_smry()
smry = emodel.get_or_load_smry_cached()
assert len(smry.columns) == 476
assert len(smry["DATE"].unique()) == 480
assert smry["ENSEMBLE"].unique() == ["iter-0"]
Expand Down Expand Up @@ -66,7 +65,7 @@ def test_smry_load_multiple_ensembles(testdata_folder):
),
}
)
smry = emodel.load_smry()
smry = emodel.get_or_load_smry_cached()
assert len(smry.columns) == 476
assert len(smry["DATE"].unique()) == 1141
assert set(smry["ENSEMBLE"].unique()) == set(
Expand Down Expand Up @@ -134,7 +133,7 @@ def test_webvizstore(testdata_folder):
)
emodel.load_parameters()
assert len(emodel.webvizstore) == 4
emodel.load_smry()
emodel.get_or_load_smry_cached()
assert len(emodel.webvizstore) == 8
emodel.load_smry_meta()
assert len(emodel.webvizstore) == 12
Expand Down
41 changes: 41 additions & 0 deletions webviz_subsurface/_models/caching_ensemble_set_model_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import json
from typing import Union, Optional, Dict
import threading

from .ensemble_set_model import EnsembleSetModel


# Module level globals
# Do we actually need to consider locking here or will this access always be single threaded?
_cache_lock = threading.Lock()
_ensemble_set_model_cache: Dict[str, EnsembleSetModel] = {}


def get_or_create_model(
ensemble_paths: dict,
time_index: Optional[Union[list, str]] = None,
column_keys: Optional[list] = None,
) -> EnsembleSetModel:

modelkey = json.dumps(
{
"ensemble_paths": ensemble_paths,
"time_index": time_index,
"column_keys": column_keys,
}
)

with _cache_lock:
if modelkey in _ensemble_set_model_cache:
# Just return existing model from cache
return _ensemble_set_model_cache[modelkey]

# No matching model in cache -> create a new ensemble set model and insert in cache
new_model = EnsembleSetModel(
ensemble_paths=ensemble_paths,
smry_time_index=time_index,
smry_column_keys=column_keys,
)
_ensemble_set_model_cache[modelkey] = new_model

return new_model
6 changes: 6 additions & 0 deletions webviz_subsurface/_models/ensemble_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ def load_csv(self, csv_file: pathlib.Path) -> pd.DataFrame:
def _load_parameters(self) -> pd.DataFrame:
return self.load_ensemble().parameters

# What should we do with the memoize decorator here?
# If we leave it in place, we will spend memory storing the pickled version of the
# return value wich is a waste when we're running a portable app.
# On the other hand, if we remove it we will save the memory, but during build of
# a portable app we will end up loading the ensemble's smry data twice. Once during
# normal init of the plugins and once when saving to the webviz store.
@CACHE.memoize(timeout=CACHE.TIMEOUT)
@webvizstore
def _load_smry(
Expand Down
76 changes: 48 additions & 28 deletions webviz_subsurface/_models/ensemble_set_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,39 @@ class EnsembleSetModel:
def __init__(
self,
ensemble_paths: dict,
ensemble_set_name: str = "EnsembleSet",
filter_file: Union[str, None] = "OK",
smry_time_index: Optional[Union[list, str]] = None,
smry_column_keys: Optional[list] = None,
) -> None:
self.ensemble_paths = ensemble_paths
self.ensemble_set_name = ensemble_set_name
self.filter_file = filter_file
self._ensemble_paths = ensemble_paths
self._webvizstore: List = []
self.ensembles = [
EnsembleModel(ens_name, ens_path, filter_file=self.filter_file)
for ens_name, ens_path in self.ensemble_paths.items()
self._ensembles = [
EnsembleModel(ens_name, ens_path, filter_file="OK")
for ens_name, ens_path in self._ensemble_paths.items()
]

self._smry_time_index = smry_time_index
self._smry_column_keys = smry_column_keys
self._cached_smry_df: Optional[pd.DataFrame] = None
self._hash_for_cached_smry_df: Optional[pd.Series] = None

def __repr__(self) -> str:
return f"EnsembleSetModel: {self.ensemble_paths}"
return f"EnsembleSetModel: {self._ensemble_paths}"

@property
def ens_folders(self) -> dict:
"""Get root folders for ensemble set"""
return {
ens: pathlib.Path(ens_path.split("realization")[0])
for ens, ens_path in self.ensemble_paths.items()
for ens, ens_path in self._ensemble_paths.items()
}

def _get_ensembles_data(self, func: str, **kwargs: Any) -> pd.DataFrame:
@staticmethod
def _get_ensembles_data(
ensemble_models: List[EnsembleModel], func: str, **kwargs: Any
) -> pd.DataFrame:
"""Runs the provided function for each ensemble and concats dataframes"""
dfs = []
for ensemble in self.ensembles:
for ensemble in ensemble_models:
try:
dframe = getattr(ensemble, func)(**kwargs)
dframe.insert(0, "ENSEMBLE", ensemble.ensemble_name)
Expand All @@ -53,39 +59,53 @@ def _get_ensembles_data(self, func: str, **kwargs: Any) -> pd.DataFrame:
raise KeyError(f"No data found for {func} with arguments: {kwargs}")

def load_parameters(self) -> pd.DataFrame:
return self._get_ensembles_data("load_parameters")
return EnsembleSetModel._get_ensembles_data(self._ensembles, "load_parameters")

def load_smry(
self,
time_index: Optional[Union[list, str]] = None,
column_keys: Optional[list] = None,
) -> pd.DataFrame:
return self._get_ensembles_data(
"load_smry", time_index=time_index, column_keys=column_keys
def get_or_load_smry_cached(self) -> pd.DataFrame:
"""Either loads smry data from file or retrieves a cached copy of DataFrame.
Note that it is imperative that the returned DataFrame be treated as read-only
since it will probably be shared by multiple clients.
"""

if self._cached_smry_df is not None:
# If we're returning cached data frame, try and verify that it hasn't been tampered with
curr_hash: pd.Series = pd.util.hash_pandas_object(self._cached_smry_df)
if not curr_hash.equals(self._hash_for_cached_smry_df):
raise KeyError("The cached SMRY DataFrame has been tampered with")

return self._cached_smry_df

self._cached_smry_df = EnsembleSetModel._get_ensembles_data(
self._ensembles,
"load_smry",
time_index=self._smry_time_index,
column_keys=self._smry_column_keys,
)
self._hash_for_cached_smry_df = pd.util.hash_pandas_object(self._cached_smry_df)

def load_smry_meta(
self,
column_keys: Optional[list] = None,
) -> pd.DataFrame:
return self._cached_smry_df

def load_smry_meta(self) -> pd.DataFrame:
"""Finds metadata for the summary vectors in the ensemble set.
Note that we assume the same units for all ensembles.
(meaning that we update/overwrite when checking the next ensemble)
"""

smry_meta: dict = {}
for ensemble in self.ensembles:
for ensemble in self._ensembles:
smry_meta.update(
ensemble.load_smry_meta(column_keys=column_keys).T.to_dict()
ensemble.load_smry_meta(column_keys=self._smry_column_keys).T.to_dict()
)
return pd.DataFrame(smry_meta).transpose()

def load_csv(self, csv_file: pathlib.Path) -> pd.DataFrame:
return self._get_ensembles_data("load_csv", csv_file=csv_file)
return EnsembleSetModel._get_ensembles_data(
self._ensembles, "load_csv", csv_file=csv_file
)

@property
def webvizstore(self) -> List[Tuple[Callable, List[Dict]]]:
store_functions = []
for ensemble in self.ensembles:
for ensemble in self._ensembles:
store_functions.extend(ensemble.webviz_store)
return store_functions
19 changes: 11 additions & 8 deletions webviz_subsurface/plugins/_bhp_qc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from webviz_config import WebvizSettings

from webviz_subsurface._models import EnsembleSetModel
from webviz_subsurface._models import caching_ensemble_set_model_factory
from .._utils.unique_theming import unique_colors
from .._utils.colors import hex_to_rgba

Expand Down Expand Up @@ -48,15 +49,17 @@ def __init__(
else:
self.column_keys = [f"WBHP:{well}" for well in wells]

self.emodel = EnsembleSetModel(
ensemble_paths={
ens: webviz_settings.shared_settings["scratch_ensembles"][ens]
for ens in ensembles
}
)
self.smry = self.emodel.load_smry(
time_index="raw", column_keys=self.column_keys
self.emodel: EnsembleSetModel = (
caching_ensemble_set_model_factory.get_or_create_model(
ensemble_paths={
ens: webviz_settings.shared_settings["scratch_ensembles"][ens]
for ens in ensembles
},
time_index="raw",
column_keys=self.column_keys,
)
)
self.smry = self.emodel.get_or_load_smry_cached()
self.theme = webviz_settings.theme
self.set_callbacks(app)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import webviz_subsurface
from webviz_subsurface._models import EnsembleSetModel
from webviz_subsurface._models import caching_ensemble_set_model_factory
from .views import main_view
from .models import ParametersModel, SimulationTimeSeriesModel
from .controllers import (
Expand Down Expand Up @@ -97,21 +98,23 @@ def __init__(
self.csvfile_smry = csvfile_smry

if ensembles is not None:
self.emodel = EnsembleSetModel(
ensemble_paths={
ens: webviz_settings.shared_settings["scratch_ensembles"][ens]
for ens in ensembles
}
self.emodel: EnsembleSetModel = (
caching_ensemble_set_model_factory.get_or_create_model(
ensemble_paths={
ens: webviz_settings.shared_settings["scratch_ensembles"][ens]
for ens in ensembles
},
time_index=self.time_index,
column_keys=self.column_keys,
)
)
self.pmodel = ParametersModel(
dataframe=self.emodel.load_parameters(),
theme=self.theme,
drop_constants=drop_constants,
)
self.vmodel = SimulationTimeSeriesModel(
dataframe=self.emodel.load_smry(
time_index=self.time_index, column_keys=self.column_keys
),
dataframe=self.emodel.get_or_load_smry_cached()
)

elif self.csvfile_parameters is None:
Expand Down
19 changes: 11 additions & 8 deletions webviz_subsurface/plugins/_parameter_parallel_coordinates.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from webviz_config.webviz_store import webvizstore

from webviz_subsurface._models import EnsembleSetModel
from webviz_subsurface._models import caching_ensemble_set_model_factory
import webviz_subsurface._utils.parameter_response as parresp


Expand Down Expand Up @@ -184,20 +185,22 @@ def __init__(
'Incorrect arguments. Either provide "response_csv" or '
'"ensembles and/or response_file".'
)
self.emodel = EnsembleSetModel(
ensemble_paths={
ens: webviz_settings.shared_settings["scratch_ensembles"][ens]
for ens in ensembles
}
self.emodel: EnsembleSetModel = (
caching_ensemble_set_model_factory.get_or_create_model(
ensemble_paths={
ens: webviz_settings.shared_settings["scratch_ensembles"][ens]
for ens in ensembles
},
time_index=self.time_index,
column_keys=self.column_keys,
)
)
self.parameterdf = self.emodel.load_parameters()
if not self.no_responses:
if self.response_file:
self.responsedf = self.emodel.load_csv(csv_file=response_file)
else:
self.responsedf = self.emodel.load_smry(
time_index=self.time_index, column_keys=self.column_keys
)
self.responsedf = self.emodel.get_or_load_smry_cached()
self.response_filters["DATE"] = "single"
else:
raise ValueError(
Expand Down
12 changes: 8 additions & 4 deletions webviz_subsurface/plugins/_parameter_response_correlation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from webviz_config.utils import calculate_slider_step

from webviz_subsurface._models import EnsembleSetModel
from webviz_subsurface._models import caching_ensemble_set_model_factory
from webviz_subsurface._datainput.fmu_input import load_parameters, load_csv
import webviz_subsurface._utils.parameter_response as parresp

Expand Down Expand Up @@ -173,11 +174,14 @@ def __init__(
ensemble_set_name="EnsembleSet",
)
else:
self.emodel = EnsembleSetModel(ensemble_paths=self.ens_paths)
self.responsedf = self.emodel.load_smry(
column_keys=self.column_keys,
time_index=self.time_index,
self.emodel: EnsembleSetModel = (
caching_ensemble_set_model_factory.get_or_create_model(
ensemble_paths=self.ens_paths,
column_keys=self.column_keys,
time_index=self.time_index,
)
)
self.responsedf = self.emodel.get_or_load_smry_cached()
self.response_filters["DATE"] = "single"
else:
raise ValueError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import webviz_subsurface
from webviz_subsurface._models import EnsembleSetModel
from webviz_subsurface._models import caching_ensemble_set_model_factory
from .views.main_view import main_view
from .models import PropertyStatisticsModel, SimulationTimeSeriesModel
from .controllers.property_qc_controller import property_qc_controller
Expand Down Expand Up @@ -109,11 +110,15 @@ def __init__(
self.surface_folders: Union[dict, None]

if ensembles is not None:
self.emodel = EnsembleSetModel(
ensemble_paths={
ens: webviz_settings.shared_settings["scratch_ensembles"][ens]
for ens in ensembles
}
self.emodel: EnsembleSetModel = (
caching_ensemble_set_model_factory.get_or_create_model(
ensemble_paths={
ens: webviz_settings.shared_settings["scratch_ensembles"][ens]
for ens in ensembles
},
time_index=self.time_index,
column_keys=self.column_keys,
)
)
self.pmodel = PropertyStatisticsModel(
dataframe=self.emodel.load_csv(
Expand All @@ -122,9 +127,7 @@ def __init__(
theme=self.theme,
)
self.vmodel = SimulationTimeSeriesModel(
dataframe=self.emodel.load_smry(
time_index=self.time_index, column_keys=self.column_keys
),
dataframe=self.emodel.get_or_load_smry_cached(),
theme=self.theme,
)
self.surface_folders = {
Expand Down
Loading

0 comments on commit 7e4a1c7

Please sign in to comment.