From fcbf8946df9fadeefedfbc51d4a783ae1d29bfbd Mon Sep 17 00:00:00 2001 From: Samuel Date: Fri, 29 Nov 2024 11:34:01 +0100 Subject: [PATCH] separating events and processes from actions - work in progress --- unified_planning/model/__init__.py | 5 +- unified_planning/model/action.py | 445 ++----------------- unified_planning/model/mixins/actions_set.py | 8 +- unified_planning/model/problem.py | 24 +- unified_planning/test/examples/processes.py | 2 +- unified_planning/test/test_model.py | 8 +- unified_planning/test/test_pddl_io.py | 2 +- 7 files changed, 69 insertions(+), 425 deletions(-) diff --git a/unified_planning/model/__init__.py b/unified_planning/model/__init__.py index d0f61f013..abc20cd1d 100644 --- a/unified_planning/model/__init__.py +++ b/unified_planning/model/__init__.py @@ -19,8 +19,11 @@ InstantaneousAction, DurativeAction, SensingAction, +) + +from unified_planning.model.transition import( Process, - Event, + Event, ) from unified_planning.model.effect import Effect, SimulatedEffect, EffectKind from unified_planning.model.expression import ( diff --git a/unified_planning/model/action.py b/unified_planning/model/action.py index 47b25bce1..1366cf60d 100644 --- a/unified_planning/model/action.py +++ b/unified_planning/model/action.py @@ -32,142 +32,14 @@ from typing import Any, Dict, List, Set, Union, Optional, Iterable from collections import OrderedDict +from unified_planning.model.transition import Transition -class Action(ABC): - """This is the `Action` interface.""" - - def __init__( - self, - _name: str, - _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, - _env: Optional[Environment] = None, - **kwargs: "up.model.types.Type", - ): - self._environment = get_environment(_env) - self._name = _name - self._parameters: "OrderedDict[str, up.model.parameter.Parameter]" = ( - OrderedDict() - ) - if _parameters is not None: - assert len(kwargs) == 0 - for n, t in _parameters.items(): - assert self._environment.type_manager.has_type( - t - ), "type of parameter does not belong to the same environment of the action" - self._parameters[n] = up.model.parameter.Parameter( - n, t, self._environment - ) - else: - for n, t in kwargs.items(): - assert self._environment.type_manager.has_type( - t - ), "type of parameter does not belong to the same environment of the action" - self._parameters[n] = up.model.parameter.Parameter( - n, t, self._environment - ) - - @abstractmethod - def __eq__(self, oth: object) -> bool: - raise NotImplementedError - - def _print_parameters(self, s): - first = True - for p in self.parameters: - if first: - s.append("(") - first = False - else: - s.append(", ") - s.append(str(p)) - if not first: - s.append(")") - - @abstractmethod - def __hash__(self) -> int: - raise NotImplementedError - - def __call__( - self, - *args: "up.model.Expression", - agent: Optional["up.model.multi_agent.Agent"] = None, - motion_paths: Optional[ - Dict["up.model.tamp.MotionConstraint", "up.model.tamp.Path"] - ] = None, - ) -> "up.plans.plan.ActionInstance": - params = tuple(args) - return up.plans.plan.ActionInstance( - self, params, agent=agent, motion_paths=motion_paths - ) - - @abstractmethod - def clone(self): - raise NotImplementedError - - @property - def environment(self) -> Environment: - """Returns this `Action` `Environment`.""" - return self._environment - - @property - def name(self) -> str: - """Returns the `Action` `name`.""" - return self._name - - @name.setter - def name(self, new_name: str): - """Sets the `Action` `name`.""" - self._name = new_name - - @property - def parameters(self) -> List["up.model.parameter.Parameter"]: - """Returns the `list` of the `Action parameters`.""" - return list(self._parameters.values()) - - def parameter(self, name: str) -> "up.model.parameter.Parameter": - """ - Returns the `parameter` of the `Action` with the given `name`. - - Example - ------- - >>> from unified_planning.shortcuts import * - >>> location_type = UserType("Location") - >>> move = InstantaneousAction("move", source=location_type, target=location_type) - >>> move.parameter("source") # return the "source" parameter of the action, with type "Location" - Location source - >>> move.parameter("target") - Location target - - If a parameter's name (1) does not conflict with an existing attribute of `Action` and (2) does not start with '_' - it can also be accessed as if it was an attribute of the action. For instance: - - >>> move.source - Location source - - :param name: The `name` of the target `parameter`. - :return: The `parameter` of the `Action` with the given `name`. - """ - if name not in self._parameters: - raise ValueError(f"Action '{self.name}' has no parameter '{name}'") - return self._parameters[name] - - def __getattr__(self, parameter_name: str) -> "up.model.parameter.Parameter": - if parameter_name.startswith("_"): - # guard access as pickling relies on attribute error to be thrown even when - # no attributes of the object have been set. - # In this case accessing `self._name` or `self._parameters`, would re-invoke __getattr__ - raise AttributeError(f"Action has no attribute '{parameter_name}'") - if parameter_name not in self._parameters: - raise AttributeError( - f"Action '{self.name}' has no attribute or parameter '{parameter_name}'" - ) - return self._parameters[parameter_name] - def is_conditional(self) -> bool: - """Returns `True` if the `Action` has `conditional effects`, `False` otherwise.""" - raise NotImplementedError +class Action(Transition): + """This is the `Action` interface.""" -class InstantaneousTransitionMixin(Action): +class InstantaneousAction(Action): """Represents an instantaneous action.""" def __init__( @@ -188,8 +60,35 @@ def __init__( # fluent_inc_dec is the set of the fluents that have an unconditional increase or decrease self._fluents_inc_dec: Set["up.model.fnode.FNode"] = set() + def __repr__(self) -> str: + s = [] + s.append(f"action {self.name}") + first = True + for p in self.parameters: + if first: + s.append("(") + first = False + else: + s.append(", ") + s.append(str(p)) + if not first: + s.append(")") + s.append(" {\n") + s.append(" preconditions = [\n") + for c in self.preconditions: + s.append(f" {str(c)}\n") + s.append(" ]\n") + s.append(" effects = [\n") + for e in self.effects: + s.append(f" {str(e)}\n") + s.append(" ]\n") + if self._simulated_effect is not None: + s.append(f" simulated effect = {self._simulated_effect}\n") + s.append(" }") + return "".join(s) + def __eq__(self, oth: object) -> bool: - if isinstance(oth, InstantaneousTransitionMixin): + if isinstance(oth, InstantaneousAction): cond = ( self._environment == oth._environment and self._name == oth._name @@ -219,7 +118,7 @@ def clone(self): new_params = OrderedDict( (param_name, param.type) for param_name, param in self._parameters.items() ) - new_instantaneous_action = InstantaneousTransitionMixin( + new_instantaneous_action = InstantaneousAction( self._name, new_params, self._environment ) new_instantaneous_action._preconditions = self._preconditions[:] @@ -472,40 +371,6 @@ def _set_preconditions(self, preconditions: List["up.model.fnode.FNode"]): self._preconditions = preconditions -class InstantaneousAction(InstantaneousTransitionMixin): - def __repr__(self) -> str: - s = [] - s.append(f"action {self.name}") - self._print_parameters(s) - s.append(" {\n") - s.append(" preconditions = [\n") - for c in self.preconditions: - s.append(f" {str(c)}\n") - s.append(" ]\n") - s.append(" effects = [\n") - for e in self.effects: - s.append(f" {str(e)}\n") - s.append(" ]\n") - if self._simulated_effect is not None: - s.append(f" simulated effect = {self._simulated_effect}\n") - s.append(" }") - return "".join(s) - - def clone(self): - new_params = OrderedDict( - (param_name, param.type) for param_name, param in self._parameters.items() - ) - new_instantaneous_action = InstantaneousAction( - self._name, new_params, self._environment - ) - new_instantaneous_action._preconditions = self._preconditions[:] - new_instantaneous_action._effects = [e.clone() for e in self._effects] - new_instantaneous_action._fluents_assigned = self._fluents_assigned.copy() - new_instantaneous_action._fluents_inc_dec = self._fluents_inc_dec.copy() - new_instantaneous_action._simulated_effect = self._simulated_effect - return new_instantaneous_action - - class DurativeAction(Action, TimedCondsEffs): """Represents a durative action.""" @@ -782,246 +647,4 @@ def __repr__(self) -> str: s.append(f" {str(e)}\n") s.append(" ]\n") s.append(" }") - return "".join(s) - - -""" -Below we have natural transitions. These are not controlled by the agent and would probably need a proper subclass. Natural transitions can be of two kinds: -Processes or Events. -Processes dictate how numeric variables evolve over time through the use of time-derivative functions -Events dictate the analogous of urgent transitions in timed automata theory -""" - - -class Process(Action): - """This is the `Process` class, which implements the abstract `Action` class.""" - - def __init__( - self, - _name: str, - _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, - _env: Optional[Environment] = None, - **kwargs: "up.model.types.Type", - ): - Action.__init__(self, _name, _parameters, _env, **kwargs) - self._preconditions: List["up.model.fnode.FNode"] = [] - self._effects: List[up.model.effect.Effect] = [] - self._simulated_effect: Optional[up.model.effect.SimulatedEffect] = None - # fluent assigned is the mapping of the fluent to it's value if it is an unconditional assignment - self._fluents_assigned: Dict[ - "up.model.fnode.FNode", "up.model.fnode.FNode" - ] = {} - # fluent_inc_dec is the set of the fluents that have an unconditional increase or decrease - self._fluents_inc_dec: Set["up.model.fnode.FNode"] = set() - - def __repr__(self) -> str: - s = [] - s.append(f"process {self.name}") - self._print_parameters(s) - s.append(" {\n") - s.append(" preconditions = [\n") - for c in self.preconditions: - s.append(f" {str(c)}\n") - s.append(" ]\n") - s.append(" effects = [\n") - for e in self.effects: - s.append(f" {str(e)}\n") - s.append(" ]\n") - s.append(" }") - return "".join(s) - - def __eq__(self, oth: object) -> bool: - if isinstance(oth, Process): - cond = ( - self._environment == oth._environment - and self._name == oth._name - and self._parameters == oth._parameters - ) - return ( - cond - and set(self._preconditions) == set(oth._preconditions) - and set(self._effects) == set(oth._effects) - and self._simulated_effect == oth._simulated_effect - ) - else: - return False - - def __hash__(self) -> int: - res = hash(self._name) - for ap in self._parameters.items(): - res += hash(ap) - for p in self._preconditions: - res += hash(p) - for e in self._effects: - res += hash(e) - res += hash(self._simulated_effect) - return res - - def clone(self): - new_params = OrderedDict( - (param_name, param.type) for param_name, param in self._parameters.items() - ) - new_process = Process(self._name, new_params, self._environment) - new_process._preconditions = self._preconditions[:] - new_process._effects = [e.clone() for e in self._effects] - new_process._fluents_assigned = self._fluents_assigned.copy() - new_process._fluents_inc_dec = self._fluents_inc_dec.copy() - new_process._simulated_effect = self._simulated_effect - return new_process - - @property - def preconditions(self) -> List["up.model.fnode.FNode"]: - """Returns the `list` of the `Process` `preconditions`.""" - return self._preconditions - - def clear_preconditions(self): - """Removes all the `Process preconditions`""" - self._preconditions = [] - - @property - def effects(self) -> List["up.model.effect.Effect"]: - """Returns the `list` of the `Process effects`.""" - return self._effects - - def clear_effects(self): - """Removes all the `Process's effects`.""" - self._effects = [] - self._fluents_assigned = {} - self._fluents_inc_dec = set() - - def _add_effect_instance(self, effect: "up.model.effect.Effect"): - assert ( - effect.environment == self._environment - ), "effect does not have the same environment of the Process" - - self._effects.append(effect) - - def add_precondition( - self, - precondition: Union[ - "up.model.fnode.FNode", - "up.model.fluent.Fluent", - "up.model.parameter.Parameter", - bool, - ], - ): - """ - Adds the given expression to `Process's preconditions`. - - :param precondition: The expression that must be added to the `Process's preconditions`. - """ - (precondition_exp,) = self._environment.expression_manager.auto_promote( - precondition - ) - assert self._environment.type_checker.get_type(precondition_exp).is_bool_type() - if precondition_exp == self._environment.expression_manager.TRUE(): - return - free_vars = self._environment.free_vars_oracle.get_free_variables( - precondition_exp - ) - if len(free_vars) != 0: - raise UPUnboundedVariablesError( - f"The precondition {str(precondition_exp)} has unbounded variables:\n{str(free_vars)}" - ) - if precondition_exp not in self._preconditions: - self._preconditions.append(precondition_exp) - - def add_derivative( - self, - fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], - value: "up.model.expression.Expression", - ): - """ - Adds the given `time derivative effect` to the `process's effects`. - - :param fluent: The `fluent` is the numeric state variable of which this process expresses its time derivative, which in Newton's notation would be over-dot(fluent). - :param value: This is the actual time derivative function. For instance, `fluent = 4` expresses that the time derivative of `fluent` is 4. - """ - ( - fluent_exp, - value_exp, - condition_exp, - ) = self._environment.expression_manager.auto_promote( - fluent, - value, - True, - ) - if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot(): - raise UPUsageError( - "fluent field of add_increase_effect must be a Fluent or a FluentExp or a Dot." - ) - if not fluent_exp.type.is_compatible(value_exp.type): - raise UPTypeError( - f"Process effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}" - ) - if not fluent_exp.type.is_int_type() and not fluent_exp.type.is_real_type(): - raise UPTypeError("Derivative can be created only on numeric types!") - self._add_effect_instance( - up.model.effect.Effect( - fluent_exp, - value_exp, - condition_exp, - kind=up.model.effect.EffectKind.DERIVATIVE, - forall=tuple(), - ) - ) - - -class Event(InstantaneousTransitionMixin): - """This class represents an event.""" - - def __init__( - self, - _name: str, - _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, - _env: Optional[Environment] = None, - **kwargs: "up.model.types.Type", - ): - InstantaneousTransitionMixin.__init__(self, _name, _parameters, _env, **kwargs) - - def __eq__(self, oth: object) -> bool: - if isinstance(oth, Event): - return super().__eq__(oth) - else: - return False - - def __hash__(self) -> int: - res = hash(self._name) - for ap in self._parameters.items(): - res += hash(ap) - for p in self._preconditions: - res += hash(p) - for e in self._effects: - res += hash(e) - res += hash(self._simulated_effect) - return res - - def __repr__(self) -> str: - s = [] - s.append(f"event {self.name}") - self._print_parameters(s) - s.append(" {\n") - s.append(" preconditions = [\n") - for c in self.preconditions: - s.append(f" {str(c)}\n") - s.append(" ]\n") - s.append(" effects = [\n") - for e in self.effects: - s.append(f" {str(e)}\n") - s.append(" ]\n") - if self._simulated_effect is not None: - s.append(f" simulated effect = {self._simulated_effect}\n") - s.append(" }") - return "".join(s) - - def clone(self): - new_params = OrderedDict( - (param_name, param.type) for param_name, param in self._parameters.items() - ) - new_instantaneous_action = Event(self._name, new_params, self._environment) - new_instantaneous_action._preconditions = self._preconditions[:] - new_instantaneous_action._effects = [e.clone() for e in self._effects] - new_instantaneous_action._fluents_assigned = self._fluents_assigned.copy() - new_instantaneous_action._fluents_inc_dec = self._fluents_inc_dec.copy() - new_instantaneous_action._simulated_effect = self._simulated_effect - return new_instantaneous_action + return "".join(s) \ No newline at end of file diff --git a/unified_planning/model/mixins/actions_set.py b/unified_planning/model/mixins/actions_set.py index a2aeaabfb..80d7fdcb9 100644 --- a/unified_planning/model/mixins/actions_set.py +++ b/unified_planning/model/mixins/actions_set.py @@ -83,23 +83,23 @@ def durative_actions(self) -> Iterator["up.model.action.DurativeAction"]: yield a @property - def processes(self) -> Iterator["up.model.action.Process"]: + def processes(self) -> Iterator["up.model.transition.Process"]: """Returs all the sensing actions of the problem. IMPORTANT NOTE: this property does some computation, so it should be called as seldom as possible.""" for a in self._actions: - if isinstance(a, up.model.action.Process): + if isinstance(a, up.model.transition.Process): yield a @property - def events(self) -> Iterator["up.model.action.Event"]: + def events(self) -> Iterator["up.model.transition.Event"]: """Returs all the sensing actions of the problem. IMPORTANT NOTE: this property does some computation, so it should be called as seldom as possible.""" for a in self._actions: - if isinstance(a, up.model.action.Event): + if isinstance(a, up.model.transition.Event): yield a @property diff --git a/unified_planning/model/problem.py b/unified_planning/model/problem.py index 4a1934ece..dc57c2102 100644 --- a/unified_planning/model/problem.py +++ b/unified_planning/model/problem.py @@ -311,7 +311,19 @@ def _get_static_and_unused_fluents( (f.fluent() for e in exps for f in fve.get(e)) ) for a in self._actions: - if isinstance(a, up.model.action.InstantaneousTransitionMixin): + if isinstance(a, up.model.action.InstantaneousAction): + remove_used_fluents(*a.preconditions) + for e in a.effects: + remove_used_fluents(e.fluent, e.value, e.condition) + static_fluents.discard(e.fluent.fluent()) + if a.simulated_effect is not None: + # empty the set because a simulated effect reads all the fluents + unused_fluents.clear() + for f in a.simulated_effect.fluents: + static_fluents.discard(f.fluent()) + + elif isinstance(a, up.model.transition.Event): + # NOTE copypaste of above, with mixin should become one single block remove_used_fluents(*a.preconditions) for e in a.effects: remove_used_fluents(e.fluent, e.value, e.condition) @@ -332,7 +344,7 @@ def _get_static_and_unused_fluents( unused_fluents.clear() for f in se.fluents: static_fluents.discard(f.fluent()) - elif isinstance(a, up.model.action.Process): + elif isinstance(a, up.model.transition.Process): for e in a.effects: remove_used_fluents(e.fluent, e.value, e.condition) static_fluents.discard(e.fluent.fluent()) @@ -992,7 +1004,7 @@ def update_problem_kind_action( if isinstance(action, up.model.tamp.InstantaneousMotionAction): if len(action.motion_constraints) > 0: self.kind.set_problem_class("TAMP") - if isinstance(action, up.model.action.InstantaneousTransitionMixin): + if isinstance(action, up.model.action.InstantaneousAction): for c in action.preconditions: self.update_problem_kind_expression(c) for e in action.effects: @@ -1010,8 +1022,10 @@ def update_problem_kind_action( if len(action.simulated_effects) > 0: self.kind.set_simulated_entities("SIMULATED_EFFECTS") self.kind.set_time("CONTINUOUS_TIME") - elif isinstance(action, up.model.action.Process): - pass + elif isinstance(action, up.model.transition.Process): + pass # TODO add Process kind + elif isinstance(action, up.model.transition.Event): + pass # TODO add Event kind else: raise NotImplementedError diff --git a/unified_planning/test/examples/processes.py b/unified_planning/test/examples/processes.py index 999e45eb1..eaee2ffae 100644 --- a/unified_planning/test/examples/processes.py +++ b/unified_planning/test/examples/processes.py @@ -1,5 +1,5 @@ from unified_planning.shortcuts import * -from unified_planning.model.action import Process +from unified_planning.model.transition import Process from unified_planning.test import TestCase diff --git a/unified_planning/test/test_model.py b/unified_planning/test/test_model.py index dca8aa511..14ac7d60f 100644 --- a/unified_planning/test/test_model.py +++ b/unified_planning/test/test_model.py @@ -22,7 +22,7 @@ ) from unified_planning.test.examples import get_example_problems from unified_planning.test import unittest_TestCase, main -from unified_planning.model.action import InstantaneousTransitionMixin +from unified_planning.model.action import InstantaneousAction class TestModel(unittest_TestCase): @@ -71,7 +71,7 @@ def test_clone_problem_and_action(self): for action_1, action_2 in zip( problem_clone_1.actions, problem_clone_2.actions ): - if isinstance(action_2, InstantaneousTransitionMixin): + if isinstance(action_2, InstantaneousAction): action_2._effects = [] action_1_clone = action_1.clone() action_1_clone._effects = [] @@ -83,6 +83,10 @@ def test_clone_problem_and_action(self): action_2._effects = [] action_1_clone = action_1.clone() action_1_clone._effects = [] + elif isinstance(action_2, Event): + action_2._effects = [] + action_1_clone = action_1.clone() + action_1_clone._effects = [] else: raise NotImplementedError self.assertEqual(action_2, action_1_clone) diff --git a/unified_planning/test/test_pddl_io.py b/unified_planning/test/test_pddl_io.py index b39984fcd..a85bcdfe0 100644 --- a/unified_planning/test/test_pddl_io.py +++ b/unified_planning/test/test_pddl_io.py @@ -14,8 +14,8 @@ import os import tempfile +import pytest # type: ignore from typing import cast -import pytest import unified_planning from unified_planning.shortcuts import * from unified_planning.test import (