Skip to content

Commit

Permalink
feature(interpreted functions): work in progress - new implementation…
Browse files Browse the repository at this point in the history
… for compiler for instantaneous actions
  • Loading branch information
Samuel Gobbi committed Oct 23, 2024
1 parent 9fac559 commit 8003ff2
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 43 deletions.
179 changes: 140 additions & 39 deletions unified_planning/engines/compilers/interpreted_functions_remover.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
check_and_simplify_conditions,
replace_action,
)
from unified_planning.shortcuts import UserType
from unified_planning.shortcuts import BoolType, UserType
from unified_planning.utils import powerset
from typing import List, Dict, Tuple, Optional, Iterator
from functools import partial
Expand Down Expand Up @@ -224,6 +224,12 @@ def _compile(
"kNum_for_interpreted_functions"
) # does this need to have a unique name?

fluent_function_dict: dict = dict()
fluent_function_dict.clear()
object_dict: dict = dict()
object_dict.clear()
fluents_assignments_tuple_list: list = list() # (fluent, input, output)
fluents_assignments_tuple_list.clear()
for a in problem.actions:
print(
"---------------------------------------------------------------------"
Expand All @@ -234,10 +240,10 @@ def _compile(
"vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"
)
if isinstance(a, InstantaneousAction):
interpreted_functions_queue = deque()
interpreted_functions_queue: deque = deque()
no_IF_action_base = a.clone()
no_IF_action_base.clear_preconditions()
no_IF_action_preconditions_list = list()
no_IF_action_preconditions_list: list = list()
fixed_preconditions = []
for p in a.preconditions:
templist = self._fix_precondition(p)
Expand All @@ -247,7 +253,7 @@ def _compile(
if len(IFs) == 0:
no_IF_action_base.add_precondition(p)

all_combinations_for_this_action = OrderedDict()
all_combinations_for_this_action: OrderedDict = OrderedDict()

all_ifs_in_instantaneous_action: list = list()
all_ifs_in_instantaneous_action.clear()
Expand All @@ -263,6 +269,10 @@ def _compile(
interpreted_functions_queue.append(f)
# alternative implementation -----------------------------------------------------------------
temp_a = a.clone()
temp_a.clear_preconditions()
for p in fixed_preconditions:
temp_a.add_precondition(p)

print(len(all_ifs_in_instantaneous_action))
number_of_combinations = 2 ** len(all_ifs_in_instantaneous_action)
print("there are")
Expand All @@ -272,9 +282,7 @@ def _compile(
actions_list: list = list()
actions_list.clear()
actions_list.append(temp_a)
# -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO --
# make lists of fluents, types and objects to add at the end?
# -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO --
# moved lists/dict assignments from here
while len(interpreted_functions_queue) > 0:
IF_to_handle_now = interpreted_functions_queue.pop()
print(
Expand All @@ -288,7 +296,7 @@ def _compile(
print(IF_to_handle_now)
print("is an instance of")
print(IF_to_handle_now.interpreted_function())
temp_action_list = list()
temp_action_list: list = list()
temp_action_list.clear()
for partially_elaborated_action in actions_list:
# print (partially_elaborated_action)
Expand All @@ -297,22 +305,42 @@ def _compile(
temp_problem,
("f_" + IF_to_handle_now.interpreted_function().name),
)
placeholder_function_fluent = None
if placeholder_function_name in fluent_function_dict.keys():
placeholder_function_fluent = fluent_function_dict[
placeholder_function_name
]

else:
placeholder_function_fluent = Fluent(
placeholder_function_name,
IF_to_handle_now.interpreted_function().return_type,
input_object=kNum_for_interpreted_functions,
)
fluent_function_dict[
placeholder_function_name
] = placeholder_function_fluent
argnamesstring = ""
for arg in IF_to_handle_now.args:
argnamesstring = argnamesstring + "_" + str(arg)
print(argnamesstring)
action_parameter_name = (
"P_" + placeholder_function_name + argnamesstring
)

action_for_known_values = (
self._clone_instantaneous_add_parameter(
partially_elaborated_action,
action_parameter_name,
kNum_for_interpreted_functions,
)
)
action_parameter = action_for_known_values.parameter(
action_parameter_name
)
print("just cloned action")
print(action_for_known_values)

# action_for_known_values = (partially_elaborated_action.clone())

# maybe can use new_problem but not sure
Expand All @@ -324,9 +352,9 @@ def _compile(
# ? -> can I add this without reconstructing the action from scratch?
# ---------------------------------------

known_values_precondition_list = list()
known_values_precondition_list: list = list()
known_values_precondition_list.clear()
known_values_imply_list = list()
known_values_imply_list: list = list()
known_values_imply_list.clear()
for known_value in better_knowledge[
IF_to_handle_now.interpreted_function()
Expand All @@ -351,9 +379,30 @@ def _compile(

# print("placeholder name before double checking") # not sure double checking is necessary - the fluents have to be added only at the end I think
# print(compressed_input_name)
known_value_object = Object(
compressed_input_name,
kNum_for_interpreted_functions,
known_value_object = None
if compressed_input_name in object_dict.keys():
known_value_object = object_dict[
compressed_input_name
]
else:
known_value_object = Object(
compressed_input_name,
kNum_for_interpreted_functions,
)
object_dict[
compressed_input_name
] = known_value_object
# here add fluent assignment list
# placeholder_function_fluent(known_value_object) = VALUE
known_output_value = self._interpreted_functions_values[
known_value
]
fluents_assignments_tuple_list.append(
(
placeholder_function_fluent,
known_value_object,
known_output_value,
)
)
argcounter = 0
known_values_precondition = None
Expand All @@ -378,9 +427,7 @@ def _compile(

argcounter = argcounter + 1
# known_values_precondition implies that the value of the new action parameter is O_*knownvalue*
action_parameter = action_for_known_values.parameter(
action_parameter_name
)

imply_precondition = action_for_known_values.environment.expression_manager.Implies(
known_values_precondition,
action_for_known_values.environment.expression_manager.Equals(
Expand All @@ -392,27 +439,31 @@ def _compile(
known_values_precondition
)
# end of for known_value in better knowledge
# -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO --
# -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO --
# -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO --
# for each known value we still have to add the special fluent assignment
# e.g. f_funx (O_11_1) = funx(11, 1)

# after having the special fluent assignment things done,
# we can substitute the instance of the IF
# e.g. funx (ione, itwo)
# with the new special fluent with the action parameter
# e.g. f_funx (P_f_funx_ione_itwo)
# -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO --
# -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO --
# -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO -- TODO --

# print("preconditions for known values")
# for printp in known_values_precondition_list:
# print(printp)
# print("implications for known values")
# for printi in known_values_imply_list:
# print(printi)

substitution_dict_instantaneous_action = dict()
placeholder_function_expression = (
placeholder_function_fluent(action_parameter)
)
substitution_dict_instantaneous_action[
IF_to_handle_now
] = placeholder_function_expression
substituter_instantaneous_action: up.model.walkers.Substituter = up.model.walkers.Substituter(
action_for_known_values.environment
)
temp_preconditions_list = (
action_for_known_values.preconditions
)
action_for_known_values.clear_preconditions()
for p in temp_preconditions_list:
temp_precondition = (
substituter_instantaneous_action.substitute(
p, substitution_dict_instantaneous_action
)
)
action_for_known_values.add_precondition(
temp_precondition
)

for p in known_values_imply_list:
action_for_known_values.add_precondition(p)
Or_condition = None
Expand Down Expand Up @@ -445,6 +496,10 @@ def _compile(
action_for_known_values
) # might not be necessary - just for naming purposes
# unknown values ----------------------------------------------------------------
# unknown values ----------------------------------------------------------------
# unknown values ----------------------------------------------------------------
# unknown values ----------------------------------------------------------------
# unknown values ----------------------------------------------------------------
# print ("now making action for unknown values ---")
# now add precondition for unknown values and remove the if
action_for_unknown_values = (
Expand Down Expand Up @@ -494,6 +549,22 @@ def _compile(
else:
print("we have no info about this IF")
print(IF_to_handle_now)
temp_action_list = list()
temp_action_list.clear()
for partially_elaborated_action in actions_list:
preconditions_no_info = (
partially_elaborated_action.preconditions
)
partially_elaborated_action.clear_preconditions()

for p in preconditions_no_info:
IFs = self.interpreted_functions_extractor.get(p)
if IF_to_handle_now not in IFs:
partially_elaborated_action.add_precondition(p)
temp_action_list.append(partially_elaborated_action)
actions_list.clear()
actions_list.extend(temp_action_list)

print(
"/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\"
)
Expand All @@ -515,6 +586,12 @@ def _compile(
print("done!")
print("final list of actions:")
print(actions_list)
print("\ndict of fluents to add to the problem")
print(fluent_function_dict)
print("\ndict of objects to add to the problem")
print(object_dict)
print("\nlist of tuples of fluent values to add to the problem")
print(fluents_assignments_tuple_list)
print(
"/\\ /\\"
)
Expand All @@ -533,6 +610,12 @@ def _compile(
print(
"\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/\\/"
)
for final_action in actions_list:
final_action.name = get_fresh_name(new_problem, a.name)
new_to_old[final_action] = a
new_problem.add_action(final_action)

continue
# old implementation will be removed
# vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
if len(all_ifs_in_instantaneous_action) != 0:
Expand All @@ -555,7 +638,7 @@ def _compile(

for kf in kfc:

substituter_instantaneous_action: up.model.walkers.Substituter = up.model.walkers.Substituter(
substituter_instantaneous_action_old: up.model.walkers.Substituter = up.model.walkers.Substituter(
new_action.environment
)

Expand All @@ -580,7 +663,7 @@ def _compile(
for pre in preconditions_to_substitute_list:

new_precondition = (
substituter_instantaneous_action.substitute(
substituter_instantaneous_action_old.substitute(
pre, subdict
)
)
Expand Down Expand Up @@ -1099,6 +1182,24 @@ def _compile(

else:
raise NotImplementedError
# TODO
# add fluents / objects / fluent tuples
print("final part, content of fluent dict")
for fkey in fluent_function_dict:
# change here if we need a specific default value
if fluent_function_dict[fkey].type.is_bool_type():
new_problem.add_fluent(fluent_function_dict[fkey])
else:
new_problem.add_fluent(fluent_function_dict[fkey])
for okey in object_dict:
new_problem.add_object(object_dict[okey])
for ftuple in fluents_assignments_tuple_list:
print(ftuple)
print(ftuple[0])
print(ftuple[1])
print(ftuple[2])
problem.set_initial_value(ftuple[0](ftuple[1]), ftuple[2])

print("compilation complete!")
print(new_problem)
return CompilerResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def test_interpreted_functions_in_preconditions_planner_refine(self):
self.assertEqual(result.plan.actions[1].action, problem.actions[1])
self.assertTrue(result.status in up.engines.results.POSITIVE_OUTCOMES)

# @pytest.mark.skip(reason="current implementation does not work")
@skipIfEngineNotAvailable("opt-pddl-planner")
def test_interpreted_functions_in_preconditions_planner_complex(self):
testproblem = self.problems["IF_in_conditions_complex_1"]
Expand Down
Loading

0 comments on commit 8003ff2

Please sign in to comment.