Skip to content

Commit

Permalink
More work on modularization of SystemInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
eisDNV committed Jan 14, 2025
1 parent e171a89 commit 4c63691
Show file tree
Hide file tree
Showing 21 changed files with 551 additions and 586 deletions.
12 changes: 5 additions & 7 deletions src/sim_explorer/assertion.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# type: ignore

import ast
from typing import Any, Callable, Iterable, Iterator

Expand Down Expand Up @@ -69,7 +67,7 @@ def info(self, sym: str, typ: str = "instance") -> str | int:
elif typ == "variable": # get the generic variable name
return var
elif typ == "length": # get the number of elements
return len(self._cases_variables[var]["variables"])
return len(self._cases_variables[var]["refs"])
elif typ == "model": # get the basic (FMU) model
return self._cases_variables[var]["model"]
else:
Expand Down Expand Up @@ -250,8 +248,8 @@ def register_vars(self, variables: dict):
for key, info in variables.items():
for inst in info["instances"]:
if len(info["instances"]) == 1: # the instance is unique
self.symbol(key, len(info["variables"])) # we allow to use the 'short name' if unique
self.symbol(inst + "_" + key, len(info["variables"])) # fully qualified name can always be used
self.symbol(key, len(info["names"])) # we allow to use the 'short name' if unique
self.symbol(inst + "_" + key, len(info["names"])) # fully qualified name can always be used

def make_locals(self, loc: dict):
"""Adapt the locals with 'allowed' functions."""
Expand Down Expand Up @@ -303,7 +301,7 @@ def eval_single(self, key: str, kvargs: dict | list | tuple):
# print("kvargs", kvargs, self._syms[key], self.expr_get_symbols_functions(key))
return self._eval(locals()["_" + key], kvargs)

def eval_series(self, key: str, data: list[Any], ret: float | str | Callable | None = None):
def eval_series(self, key: str, data: list, ret: float | str | Callable | None = None):
"""Perform assertion on a (time) series.
Args:
Expand Down Expand Up @@ -336,7 +334,7 @@ def eval_series(self, key: str, data: list[Any], ret: float | str | Callable | N
_temp = self._temporal[key]["type"] if ret is None else Temporal.UNDEFINED

for row in data:
if not isinstance(row, Iterable): # can happen if the time itself is evaluated
if not isinstance(row, (tuple, list)): # can happen if the time itself is evaluated
time = row
row = [row]
elif "t" not in argnames: # the independent variable is not explicitly used in the expression
Expand Down
392 changes: 131 additions & 261 deletions src/sim_explorer/case.py

Large diffs are not rendered by default.

180 changes: 161 additions & 19 deletions src/sim_explorer/system_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from pathlib import Path
from typing import Any, TypeAlias

import numpy as np

from sim_explorer.json5 import Json5
from sim_explorer.utils.misc import from_xml, match_with_wildcard
from sim_explorer.utils.osp import read_system_structure_xml
Expand Down Expand Up @@ -50,6 +52,7 @@ def __init__(
self._models = self._get_models()
# self.simulator=None # derived classes override this to instantiate the system simulator
self.message = "" # possibility to save additional message for (optional) retrieval by client
self.log_level = log_level
self.full_simulator_available = False # only system and components specification available. No simulation!

@property
Expand Down Expand Up @@ -196,7 +199,7 @@ def variable_iter(self, variables: dict, flt: int | str | tuple | list):
if info["reference"] in ids:
yield (v, info)

def match_variables(self, component: str, varname: str) -> tuple[int]:
def match_variables(self, component: str, varname: str) -> tuple:
"""Based on an example component (instance), identify unique variables starting with 'varname'.
The returned information applies to all instances of the same model.
The variables shall all be of the same type, causality and variability.
Expand All @@ -208,7 +211,7 @@ def match_variables(self, component: str, varname: str) -> tuple[int]:
Returns
-------
Tuple of value references
Tuple of tuples (name,value reference)
"""

def accept_as_alias(org: str) -> bool:
Expand All @@ -231,7 +234,7 @@ def accept_as_alias(org: str) -> bool:
assert all(
v[e] == accepted[e] for e in ("type", "causality", "variability")
), f"Variable {k} matches {varname}, but properties do not match"
var.append(v["reference"])
var.append((k, v["reference"]))
return tuple(var)

def variable_name_from_ref(self, comp: int | str, ref: int) -> str:
Expand Down Expand Up @@ -349,7 +352,7 @@ def _check(cond, msg):
_variability = var_info["variability"]
_initial = var_info.get("initial", SystemInterface.default_initial(_causality, _variability))

if action == "get": # no restrictions on get
if action in ("get", "step"): # no restrictions on get
pass
elif action == "set":
if (
Expand Down Expand Up @@ -393,26 +396,165 @@ def _check(cond, msg):
return False
return True

def run_until(self, time: float):
"""Instruct the simulator to simulate until the given time."""
raise NotImplementedError("The method 'run_until()' cannot be used in SystemInterface") from None
return False
@classmethod
def update_refs_values(
cls, allrefs: tuple[int, ...], baserefs: tuple[int, ...], basevals: tuple, refs: tuple[int, ...], values: tuple
):
"""Update baserefs and basevals with refs and values according to all possible refs."""
allvals = [None] * len(allrefs)
for i, r in enumerate(baserefs):
allvals[allrefs.index(r)] = basevals[i]
for i, r in enumerate(refs):
allvals[allrefs.index(r)] = values[i]
_refs: list = []
_vals: list = []
for i, v in enumerate(allvals):
if v is not None:
_refs.append(allrefs[i])
_vals.append(allvals[i])
return (tuple(_refs), tuple(_vals))

def comp_model_var(self, cref: int, vref: int | tuple[int]):
"""Find the component name and the variable names from the provided reference(s)."""
model = None
for i, (_comp, m) in enumerate(self.components):
if i == cref:
model = m["model"]
comp = _comp
break
assert model is not None, f"Model for component id {cref} not found"
refs = (vref,) if isinstance(vref, int) else vref
var_names = []
for vr in refs:
var = None
for v, info in self.models[model]["variables"].items():
if info["reference"] == vr:
var = v
break
assert var is not None, f"Reference {vr} not found in model {model}"
var_names.append(var)
return (comp, model, var_names)

def _add_set(
self, actions: dict, time: float, cvar: str, comp: str, cvar_info: dict, values: tuple, rng: tuple | None = None
):
"""Perform final processing and add the set action to the list (if appropriate).
Properties of set actions:
def set_variable_value(self, instance: int, typ: type, var_refs: tuple[int, ...], var_vals: tuple) -> bool:
"""Provide a function which sets the 'variable' (of the given 'instance' model) to 'value'.
* both full case variable settings and partial settings are allowed and must be considered
* actions are recorded as tuples of (case-variable, component-name, value-references, values)
Args:
instance (int): identifier of the instance model for which the variable is to be set
var_refs (tuple): Tuple of variable references for which the values shall be set
var_vals (tuple): Tuple of values (of the correct type), used to set model variables
actions (dict): dict of get actions. The time slot is beforehand ensured.
time (float): the time at which the action is issued
cvar (str): the case variable name for which the action is performed
comp (str): the component name
cvar_info (dict): info about the case variable
values (tuple): tuple of values (correct type made sure)
rng (tuple)=None: Optional sub-range among the variables of cvar. None: whole variable
"""
raise NotImplementedError("The method 'set_variable_value()' cannot be used in SystemInterface") from None
refs = cvar_info["refs"] if rng is None else tuple([cvar_info["refs"][i] for i in rng])
assert len(refs) == len(values), f"Number of variable refs {refs} != values {values} in {cvar}, {comp}"
for i, (_cvar, _comp, _refs, _values) in enumerate(actions[time]): # go through existing actions for time
if cvar == _cvar and comp == _comp: # the case variable and the component name match
refs, values = self.update_refs_values(cvar_info["refs"], _refs, _values, refs, values)
actions[time][i] = (cvar, comp, refs, values) # replace action
return
# new set action
actions[time].append((cvar, comp, refs, values))

def get_variable_value(self, instance: int, typ: type, var_refs: tuple[int, ...]):
"""Provide an observer function which gets the 'variable' value (of the given 'instance' model) at the time when called.
def _add_get(self, actions: dict, time: float, cvar: str, comp: str, cvar_info: dict):
"""Perform final processing and add the get action to the list (if appropriate).
Properties of get actions:
* concern always the whole case-variable (all elements. rng not used)
* are tuples of (case-variable, component-name, variable-references)
* are never overridden for same time (no duplicate get actions for same component and cvar)
Args:
actions (dict): dict of get actions. The time slot is beforehand ensured.
time (float): the time at which the action is issued
cvar (str): the case variable name for which the action is performed
component (str): the component name for which the action is performed
cvar_info (dict): info about the case variable
"""
for _cvar, _comp, _vars in actions[time]: # go through existing actions for same time
if cvar == _cvar and comp == _comp: # match on case variable and component
return # the get action is already registered
actions[time].append((cvar, comp, cvar_info["refs"]))

def add_actions(
self,
actions: dict,
act_type: str,
cvar: str,
cvar_info: dict,
values: tuple | None,
at_time: float,
stoptime: float,
rng: tuple[int, ...] | None = None,
):
"""Add specified actions to the provided action dict.
The action list is simulator-agnostic and need 'compilation' before they are used in a simulation.
Args:
instance (int): identifier of the instance model for which the variable is to be set
var_refs (tuple): Tuple of variable references for which the values shall be retrieved
actions (dict): actions ('get' or 'set') registered so far
act_type (str): action type 'get', 'step' or 'set'
cvar (str): name of the case variable
cvar_info (dict): dict of variable info: {model, instances, names, refs, type, causality, variability}
see Cases.get_case_variables() for details.
values (PyVal) = None: Optional values (mandatory for 'set' actions)
at_time (float): time at which actions shall be triggered (may be scaled)
stoptime (float): simulation stop time (needed to handle 'step' actions)
rng (Iterable)=None: Optional range specification for compound variables (indices to address)
Returns
-------
Updated actions dict, where the whole dict is specific for get/set and new actions are added as
{at_time : [ (cvar, component-name, (variable-name-list)[, value-list, rng])},
where value-list and rng are only present for set actions
at-time=-1 for get actions denote step actions
"""
assert isinstance(at_time, (float, int)), f"Actions require a defined time as float. Found {at_time}"
if at_time not in actions:
actions.update({at_time: []}) # make sure that there is a suitable slot
for comp in cvar_info["instances"]:
if act_type == "get" or (act_type == "step" and at_time == -1): # normal get or step without time spec
self._add_get(actions, at_time, cvar, comp, cvar_info)
elif act_type == "step" and at_time >= 0: # step actions with specified interval
for time in np.arange(start=at_time, stop=stoptime, step=at_time):
self._add_get(actions, time, cvar, comp, cvar_info)

elif act_type == "set":
assert values is not None, f"Variable {cvar}: Value needed for 'set' actions."
self._add_set(
actions, at_time, cvar, comp, cvar_info, tuple([cvar_info["type"](x) for x in values]), rng
)
else:
raise KeyError(f"Unknown action type {act_type} at time {at_time}")

def do_action(self, time: int | float, act_info: tuple, typ: type):
"""Do the action described by the tuple using OSP functions."""
raise NotImplementedError("The method 'do_action()' cannot be used in SystemInterface") from None
return False

def action_step(self, act_info: tuple, typ: type):
"""Pre-compile the step action and return the partial function
so that it can be called at communication points.
"""
raise NotImplementedError("The method 'action_step()' cannot be used in SystemInterface") from None
return None

def init_simulator(self):
"""Instantiate and initialize the simulator, so that simulations can be run.
Perforemd separately from __init__ so that it can be repeated before simulation runs.
"""
raise NotImplementedError("The method 'get_variable_value()' cannot be used in SystemInterface") from None
raise NotImplementedError("The method 'init_simulator()' cannot be used in SystemInterface") from None
return False

def run_until(self, time: int | float):
"""Instruct the simulator to simulate until the given time."""
raise NotImplementedError("The method 'run_until()' cannot be used in SystemInterface") from None
return False
Loading

0 comments on commit 4c63691

Please sign in to comment.