Skip to content

Commit

Permalink
refactoring: work in progress - mixin for InstantaneousAction, Event,…
Browse files Browse the repository at this point in the history
… Process
  • Loading branch information
Samuel Gobbi committed Dec 3, 2024
1 parent 11be2f1 commit 5583f04
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 130 deletions.
48 changes: 3 additions & 45 deletions unified_planning/model/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from typing import Any, Dict, List, Set, Union, Optional, Iterable
from collections import OrderedDict

from unified_planning.model.transition import Transition
from unified_planning.model.transition import SingleTimePointTransitionMixin, Transition


class Action(Transition):
Expand All @@ -50,7 +50,7 @@ def __call__(
)


class InstantaneousAction(Action):
class InstantaneousAction(Action, SingleTimePointTransitionMixin):
"""Represents an instantaneous action."""

def __init__(
Expand All @@ -61,7 +61,7 @@ def __init__(
**kwargs: "up.model.types.Type",
):
Action.__init__(self, _name, _parameters, _env, **kwargs)
self._preconditions: List["up.model.fnode.FNode"] = []
SingleTimePointTransitionMixin.__init__(self, _env)
self._effects: List[up.model.effect.Effect] = []
self._simulated_effect: Optional[up.model.effect.SimulatedEffect] = None
# fluent assigned is the mapping of the fluent to it's value if it is an unconditional assignment
Expand Down Expand Up @@ -139,15 +139,6 @@ def clone(self):
new_instantaneous_action._simulated_effect = self._simulated_effect
return new_instantaneous_action

@property
def preconditions(self) -> List["up.model.fnode.FNode"]:
"""Returns the `list` of the `Action` `preconditions`."""
return self._preconditions

def clear_preconditions(self):
"""Removes all the `Action preconditions`"""
self._preconditions = []

@property
def effects(self) -> List["up.model.effect.Effect"]:
"""Returns the `list` of the `Action effects`."""
Expand Down Expand Up @@ -180,36 +171,6 @@ def unconditional_effects(self) -> List["up.model.effect.Effect"]:
seldom as possible."""
return [e for e in self._effects if not e.is_conditional()]

def add_precondition(
self,
precondition: Union[
"up.model.fnode.FNode",
"up.model.fluent.Fluent",
"up.model.parameter.Parameter",
bool,
],
):
"""
Adds the given expression to `action's preconditions`.
:param precondition: The expression that must be added to the `action's preconditions`.
"""
(precondition_exp,) = self._environment.expression_manager.auto_promote(
precondition
)
assert self._environment.type_checker.get_type(precondition_exp).is_bool_type()
if precondition_exp == self._environment.expression_manager.TRUE():
return
free_vars = self._environment.free_vars_oracle.get_free_variables(
precondition_exp
)
if len(free_vars) != 0:
raise UPUnboundedVariablesError(
f"The precondition {str(precondition_exp)} has unbounded variables:\n{str(free_vars)}"
)
if precondition_exp not in self._preconditions:
self._preconditions.append(precondition_exp)

def add_effect(
self,
fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"],
Expand Down Expand Up @@ -378,9 +339,6 @@ def set_simulated_effect(self, simulated_effect: "up.model.effect.SimulatedEffec
)
self._simulated_effect = simulated_effect

def _set_preconditions(self, preconditions: List["up.model.fnode.FNode"]):
self._preconditions = preconditions


class DurativeAction(Action, TimedCondsEffs):
"""Represents a durative action."""
Expand Down
89 changes: 4 additions & 85 deletions unified_planning/model/natural_transition.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from typing import Any, Dict, List, Set, Union, Optional, Iterable
from collections import OrderedDict

from unified_planning.model.transition import Transition
from unified_planning.model.transition import SingleTimePointTransitionMixin, Transition


"""
Expand All @@ -38,7 +38,7 @@
"""


class NaturalTransition(Transition):
class NaturalTransition(Transition, SingleTimePointTransitionMixin):
"""This is the `NaturalTransition` interface"""


Expand All @@ -53,7 +53,7 @@ def __init__(
**kwargs: "up.model.types.Type",
):
Transition.__init__(self, _name, _parameters, _env, **kwargs)
self._preconditions: List["up.model.fnode.FNode"] = []
SingleTimePointTransitionMixin.__init__(self, _env)
self._effects: List[up.model.effect.Effect] = []
# fluent assigned is the mapping of the fluent to it's value if it is an unconditional assignment
self._fluents_assigned: Dict[
Expand Down Expand Up @@ -114,15 +114,6 @@ def clone(self):
new_process._fluents_inc_dec = self._fluents_inc_dec.copy()
return new_process

@property
def preconditions(self) -> List["up.model.fnode.FNode"]:
"""Returns the `list` of the `Process` `preconditions`."""
return self._preconditions

def clear_preconditions(self):
"""Removes all the `Process preconditions`"""
self._preconditions = []

@property
def effects(self) -> List["up.model.effect.Effect"]:
"""Returns the `list` of the `Process effects`."""
Expand All @@ -141,36 +132,6 @@ def _add_effect_instance(self, effect: "up.model.effect.Effect"):

self._effects.append(effect)

def add_precondition(
self,
precondition: Union[
"up.model.fnode.FNode",
"up.model.fluent.Fluent",
"up.model.parameter.Parameter",
bool,
],
):
"""
Adds the given expression to `Process's preconditions`.
:param precondition: The expression that must be added to the `Process's preconditions`.
"""
(precondition_exp,) = self._environment.expression_manager.auto_promote(
precondition
)
assert self._environment.type_checker.get_type(precondition_exp).is_bool_type()
if precondition_exp == self._environment.expression_manager.TRUE():
return
free_vars = self._environment.free_vars_oracle.get_free_variables(
precondition_exp
)
if len(free_vars) != 0:
raise UPUnboundedVariablesError(
f"The precondition {str(precondition_exp)} has unbounded variables:\n{str(free_vars)}"
)
if precondition_exp not in self._preconditions:
self._preconditions.append(precondition_exp)

def add_derivative(
self,
fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"],
Expand Down Expand Up @@ -223,7 +184,7 @@ def __init__(
**kwargs: "up.model.types.Type",
):
Transition.__init__(self, _name, _parameters, _env, **kwargs)
self._preconditions: List["up.model.fnode.FNode"] = []
SingleTimePointTransitionMixin.__init__(self, _env)
self._effects: List[up.model.effect.Effect] = []
# fluent assigned is the mapping of the fluent to it's value if it is an unconditional assignment
self._fluents_assigned: Dict[
Expand Down Expand Up @@ -268,15 +229,6 @@ def clone(self):
new_event._fluents_inc_dec = self._fluents_inc_dec.copy()
return new_event

@property
def preconditions(self) -> List["up.model.fnode.FNode"]:
"""Returns the `list` of the `Event` `preconditions`."""
return self._preconditions

def clear_preconditions(self):
"""Removes all the `Event preconditions`"""
self._preconditions = []

@property
def effects(self) -> List["up.model.effect.Effect"]:
"""Returns the `list` of the `Event effects`."""
Expand Down Expand Up @@ -308,36 +260,6 @@ def unconditional_effects(self) -> List["up.model.effect.Effect"]:
seldom as possible."""
return [e for e in self._effects if not e.is_conditional()]

Check warning on line 261 in unified_planning/model/natural_transition.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/model/natural_transition.py#L261

Added line #L261 was not covered by tests

def add_precondition(
self,
precondition: Union[
"up.model.fnode.FNode",
"up.model.fluent.Fluent",
"up.model.parameter.Parameter",
bool,
],
):
"""
Adds the given expression to `event's preconditions`.
:param precondition: The expression that must be added to the `event's preconditions`.
"""
(precondition_exp,) = self._environment.expression_manager.auto_promote(
precondition
)
assert self._environment.type_checker.get_type(precondition_exp).is_bool_type()
if precondition_exp == self._environment.expression_manager.TRUE():
return
free_vars = self._environment.free_vars_oracle.get_free_variables(
precondition_exp
)
if len(free_vars) != 0:
raise UPUnboundedVariablesError(
f"The precondition {str(precondition_exp)} has unbounded variables:\n{str(free_vars)}"
)
if precondition_exp not in self._preconditions:
self._preconditions.append(precondition_exp)

def add_effect(
self,
fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"],
Expand Down Expand Up @@ -481,9 +403,6 @@ def _add_effect_instance(self, effect: "up.model.effect.Effect"):
)
self._effects.append(effect)

def _set_preconditions(self, preconditions: List["up.model.fnode.FNode"]):
self._preconditions = preconditions

def __repr__(self) -> str:
s = []
s.append(f"event {self.name}")
Expand Down
48 changes: 48 additions & 0 deletions unified_planning/model/transition.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,51 @@ def __getattr__(self, parameter_name: str) -> "up.model.parameter.Parameter":
def is_conditional(self) -> bool:
"""Returns `True` if the `Transition` has `conditional effects`, `False` otherwise."""
raise NotImplementedError

Check warning on line 154 in unified_planning/model/transition.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/model/transition.py#L154

Added line #L154 was not covered by tests


class SingleTimePointTransitionMixin:
def __init__(self, _env):
self._preconditions: List["up.model.fnode.FNode"] = []
self._environment = get_environment(_env)

@property
def preconditions(self) -> List["up.model.fnode.FNode"]:
"""Returns the `list` of the `Action` `preconditions`."""
return self._preconditions

def clear_preconditions(self):
"""Removes all the `Action preconditions`"""
self._preconditions = []

def add_precondition(
self,
precondition: Union[
"up.model.fnode.FNode",
"up.model.fluent.Fluent",
"up.model.parameter.Parameter",
bool,
],
):
"""
Adds the given expression to `action's preconditions`.
:param precondition: The expression that must be added to the `action's preconditions`.
"""
(precondition_exp,) = self._environment.expression_manager.auto_promote(
precondition
)
assert self._environment.type_checker.get_type(precondition_exp).is_bool_type()
if precondition_exp == self._environment.expression_manager.TRUE():
return
free_vars = self._environment.free_vars_oracle.get_free_variables(
precondition_exp
)
if len(free_vars) != 0:
raise UPUnboundedVariablesError(

Check warning on line 195 in unified_planning/model/transition.py

View check run for this annotation

Codecov / codecov/patch

unified_planning/model/transition.py#L195

Added line #L195 was not covered by tests
f"The precondition {str(precondition_exp)} has unbounded variables:\n{str(free_vars)}"
)
if precondition_exp not in self._preconditions:
self._preconditions.append(precondition_exp)

def _set_preconditions(self, preconditions: List["up.model.fnode.FNode"]):
self._preconditions = preconditions

0 comments on commit 5583f04

Please sign in to comment.