From 733bf869bb27c5b576e0e7ddb2d3028750425d7c Mon Sep 17 00:00:00 2001 From: Lukas Rothenberger Date: Tue, 13 Feb 2024 15:07:37 +0100 Subject: [PATCH 1/3] feat(optimizer): check for update necessity on frame exit --- .../DataTransfers/NewDataTransfers.py | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py b/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py index 8f9e4dd91..33ac1f6f7 100644 --- a/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py +++ b/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py @@ -183,7 +183,24 @@ def cleanup_dataframe(self, node_id: int, memory: DeviceMemory, experiment: Expe queue += [p for p in get_predecessors(experiment.optimization_graph, current) if p not in queue] for device_id in self.entered_data_regions_by_device: for wda in self.entered_data_regions_by_device[device_id]: - # issue delete updates + # check if a delete or copy delete should be issued + issue_copy_delete = False + issue_delete = False + + if wda.memory_region not in memory.memory[experiment.get_system().get_host_device_id()]: + issue_copy_delete = True + else: + if ( + memory.memory[device_id][wda.memory_region].unique_id + > memory.memory[experiment.get_system().get_host_device_id()][wda.memory_region].unique_id + ): + # device has a more recent memory state than the host + issue_copy_delete = True + else: + # host has a more recent memory state than the device + issue_delete = True + + # issue delete update updates.append( Update( node_id, @@ -194,10 +211,11 @@ def cleanup_dataframe(self, node_id: int, memory: DeviceMemory, experiment: Expe False, last_cu_id, last_cu_id, - delete_data=True, + delete_data=issue_delete, + copy_delete_data=issue_copy_delete, ) ) - # updates += memory.perform_read(node_id, experiment.get_system().get_host_device_id(), cast(ReadDataAccess, wda)) + # cleanup memory del 
memory.memory[device_id][wda.memory_region] @@ -218,10 +236,14 @@ def cleanup_dataframe(self, node_id: int, memory: DeviceMemory, experiment: Expe copy_exit_update: Optional[Update] = None delete_exit_updates: List[Update] = [] for update in updates_by_mem_reg[mem_reg]: + if not update.copy_delete_data: + # data should only be deleted. + delete_exit_updates.append(update) + continue + # check for the most recent memory state if copy_exit_update is None: copy_exit_update = update continue - # check for the most recent memory state if update.write_data_access.unique_id > copy_exit_update.write_data_access.unique_id: # found a more recent memory state delete_exit_updates.append(copy_exit_update) @@ -229,12 +251,12 @@ def cleanup_dataframe(self, node_id: int, memory: DeviceMemory, experiment: Expe else: # update is older than copy_exit_update, add it to the list of deletions delete_exit_updates.append(update) - if copy_exit_update is None: - raise ValueError("copy_exit_update is NONE. Something went wrong here.") + # -> collect the updates and set the delete_data and copy_delete_data properties - copy_exit_update.copy_delete_data = True - copy_exit_update.delete_data = False - refined_updates.append(copy_exit_update) + if copy_exit_update is not None: + copy_exit_update.copy_delete_data = True + copy_exit_update.delete_data = False + refined_updates.append(copy_exit_update) for update in delete_exit_updates: update.copy_delete_data = False update.delete_data = True From 9132cf6d5681017a43bdfc14b71653f62e50ec50 Mon Sep 17 00:00:00 2001 From: Lukas Rothenberger Date: Tue, 13 Feb 2024 15:28:40 +0100 Subject: [PATCH 2/3] fix(optimizer)[interactive]: pattern inheritance --- .../interactive/interactive_optimizer.py | 25 +++++++++++++++---- .../result_classes/OptimizerOutputPattern.py | 7 +++--- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/discopop_library/discopop_optimizer/interactive/interactive_optimizer.py 
b/discopop_library/discopop_optimizer/interactive/interactive_optimizer.py index be3479a3b..f50a52336 100644 --- a/discopop_library/discopop_optimizer/interactive/interactive_optimizer.py +++ b/discopop_library/discopop_optimizer/interactive/interactive_optimizer.py @@ -143,17 +143,32 @@ def __create_optimizer_output_pattern( experiment.get_system().get_host_device_id(), experiment, ) - logger.debug("Initialized OptimizerOutputPattern based on " + str(suggestion_id)) + logger.info("Initialized OptimizerOutputPattern based on " + str(suggestion_id)) pattern_obj = experiment.detection_result.patterns.get_pattern_from_id(suggestion_id) if "device_id" in pattern_obj.__dict__: device_id = pattern_obj.__dict__["device_id"] else: device_id = experiment.get_system().get_host_device_id() - output_pattern.add_pattern( - pattern_obj.pattern_id, device_id, experiment.get_system().get_device(device_id).get_device_type() - ) - logger.debug("Added pattern : " + str(suggestion_id)) + + # unpack OptimizerOutputPattern if necessary + if type(pattern_obj) == OptimizerOutputPattern: + logger.info("OptimizerOutputPattern found!") + for contained_pattern in pattern_obj.applied_patterns: + logger.info("--> " + str(contained_pattern)) + output_pattern.add_pattern( + contained_pattern["pattern_id"], contained_pattern["device_id"], contained_pattern["device_type"] + ) + logger.info("Added sub-pattern : " + str(contained_pattern["pattern_id"])) + logger.info("Inherited decisions from pattern_obj: " + str(pattern_obj.decisions)) + output_pattern.decisions += pattern_obj.decisions + + else: + # regular pattern type + output_pattern.add_pattern( + pattern_obj.pattern_id, device_id, experiment.get_system().get_device(device_id).get_device_type() + ) + logger.info("Added pattern : " + str(suggestion_id)) if output_pattern is None: return None diff --git a/discopop_library/result_classes/OptimizerOutputPattern.py b/discopop_library/result_classes/OptimizerOutputPattern.py index 
351f3f9a0..df75e6edd 100644 --- a/discopop_library/result_classes/OptimizerOutputPattern.py +++ b/discopop_library/result_classes/OptimizerOutputPattern.py @@ -71,7 +71,8 @@ def get_contained_decisions(self, experiment: Experiment) -> List[int]: if d not in decision_list: decision_list.append(d) for tmp_dict in self.applied_patterns: - for d in experiment.pattern_id_to_decisions_dict[tmp_dict["pattern_id"]]: - if d not in decision_list: - decision_list.append(d) + if tmp_dict["pattern_id"] in experiment.pattern_id_to_decisions_dict: + for d in experiment.pattern_id_to_decisions_dict[tmp_dict["pattern_id"]]: + if d not in decision_list: + decision_list.append(d) return decision_list From bf9026e1ca20ed51197fb9677334ff6c6e4fe350 Mon Sep 17 00:00:00 2001 From: Lukas Rothenberger Date: Tue, 13 Feb 2024 17:07:11 +0100 Subject: [PATCH 3/3] feat(optimizer): force synchronization at function call --- .../DataTransfers/NewDataTransfers.py | 37 ++++++++++++++++++- .../discopop_optimizer/PETParser/PETParser.py | 36 ++++++++++++++++++ .../classes/types/DataAccessType.py | 21 ++++++++--- 3 files changed, 88 insertions(+), 6 deletions(-) diff --git a/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py b/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py index 33ac1f6f7..9ea909c4f 100644 --- a/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py +++ b/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py @@ -52,7 +52,13 @@ def perform_read(self, node_id: int, device_id: DeviceID, rda: ReadDataAccess) - if most_recent_write is None: most_recent_write = (dvid, self.memory[dvid][rda.memory_region]) else: - if self.memory[dvid][rda.memory_region].unique_id > most_recent_write[1].unique_id: + if self.memory[dvid][rda.memory_region].from_call: + most_recent_write = (dvid, self.memory[dvid][rda.memory_region]) + + elif ( + self.memory[dvid][rda.memory_region].unique_id > most_recent_write[1].unique_id + and not 
most_recent_write[1].from_call + ): most_recent_write = (dvid, self.memory[dvid][rda.memory_region]) # check for update necessity @@ -131,6 +137,35 @@ def perform_read(self, node_id: int, device_id: DeviceID, rda: ReadDataAccess) - ) ) + elif most_recent_write[1].from_call: + # force a synchronization + predecessors = get_predecessors(self.graph, node_id) + if len(predecessors) == 0: + predecessor = node_id + else: + predecessor = predecessors[0] + + # check if the update creates the memory on the device + if device_id in self.memory: + if rda.memory_region in self.memory[device_id]: + is_first_data_occurrence = False + else: + is_first_data_occurrence = True + else: + is_first_data_occurrence = True + + updates.append( + Update( + predecessor, + node_id, + most_recent_write[0], + device_id, + most_recent_write[1], + is_first_data_occurrence=is_first_data_occurrence, + source_cu_id=data_at(self.graph, predecessor).original_cu_id, + target_cu_id=data_at(self.graph, node_id).original_cu_id, + ) + ) else: # no update necessary, most recent data on the device pass diff --git a/discopop_library/discopop_optimizer/PETParser/PETParser.py b/discopop_library/discopop_optimizer/PETParser/PETParser.py index a1586549f..bb7a31412 100644 --- a/discopop_library/discopop_optimizer/PETParser/PETParser.py +++ b/discopop_library/discopop_optimizer/PETParser/PETParser.py @@ -40,6 +40,7 @@ from discopop_library.discopop_optimizer.classes.nodes.GenericNode import GenericNode from discopop_library.discopop_optimizer.classes.nodes.Loop import Loop from discopop_library.discopop_optimizer.classes.nodes.Workload import Workload +from discopop_library.discopop_optimizer.classes.types.DataAccessType import ReadDataAccess, WriteDataAccess from discopop_library.discopop_optimizer.utilities.MOGUtilities import ( add_call_edge, add_dataflow_edge, @@ -153,6 +154,12 @@ def parse(self) -> Tuple[nx.DiGraph, int]: # add calling edges self.__add_calling_edges() + logger.info("Inlining read and write 
information from function calls..") + # note: inlining at this position is a cheap but less reliable alternative to propagating everything on function calls. + # note: full propagation leads to issues with WriteDataAccess unique ids and potentially broken results + self.__inline_reads_and_writes_from_call() + logger.info("Inlining read and write information from function calls done.") + if self.experiment.arguments.pin_function_calls_to_host: self.__pin_function_calls_to_host() @@ -1434,3 +1441,32 @@ def __propagate_reads_and_writes(self): for p in parents: data_at(self.graph, p).written_memory_regions.update(current_data.written_memory_regions) data_at(self.graph, p).read_memory_regions.update(current_data.read_memory_regions) + + def __inline_reads_and_writes_from_call(self): + for node in self.graph.nodes: + called = get_out_call_edges(self.graph, node) + if len(called) == 0: + continue + logger.info("Node: " + str(node) + " called: " + str(called)) + # identify reads and writes in called function + node_data = data_at(self.graph, node) + for function_id in called: + called_function_data = data_at(self.graph, function_id) + logger.info("--> called function: " + cast(FunctionRoot, called_function_data).name) + function_read_memory_regions: Dict[MemoryRegion, ReadDataAccess] = dict() + function_written_memory_regions: Dict[MemoryRegion, WriteDataAccess] = dict() + for rda in called_function_data.read_memory_regions: + if rda.memory_region not in function_read_memory_regions: + function_read_memory_regions[rda.memory_region] = rda + for wda in called_function_data.written_memory_regions: + if wda.memory_region not in function_written_memory_regions: + function_written_memory_regions[wda.memory_region] = wda + # append the gathered reads and writes to the calling node + for read_mem_reg in function_read_memory_regions: + call_rda = copy.deepcopy(function_read_memory_regions[read_mem_reg]) + call_rda.from_call = True + node_data.read_memory_regions.add(call_rda) + for
written_mem_reg in function_written_memory_regions: + call_wda = copy.deepcopy(function_written_memory_regions[written_mem_reg]) + call_wda.from_call = True + node_data.written_memory_regions.add(call_wda) diff --git a/discopop_library/discopop_optimizer/classes/types/DataAccessType.py b/discopop_library/discopop_optimizer/classes/types/DataAccessType.py index 51eec2a3c..4b1a03dab 100644 --- a/discopop_library/discopop_optimizer/classes/types/DataAccessType.py +++ b/discopop_library/discopop_optimizer/classes/types/DataAccessType.py @@ -13,27 +13,37 @@ class ReadDataAccess(object): memory_region: MemoryRegion var_name: Optional[str] + from_call: bool - def __init__(self, memory_region: MemoryRegion, var_name: Optional[str]): + def __init__(self, memory_region: MemoryRegion, var_name: Optional[str], from_call: bool = False): self.memory_region = memory_region self.var_name = var_name + self.from_call = from_call def __str__(self): - return "R(" + self.memory_region + ")" + return_str = "" + return_str += "C" if self.from_call else "" + return_str += "R(" + self.memory_region + ")" + return return_str class WriteDataAccess(object): memory_region: MemoryRegion unique_id: int var_name: Optional[str] + from_call: bool - def __init__(self, memory_region: MemoryRegion, unique_id: int, var_name: Optional[str]): + def __init__(self, memory_region: MemoryRegion, unique_id: int, var_name: Optional[str], from_call: bool = False): self.memory_region = memory_region self.unique_id = unique_id self.var_name = var_name + self.from_call = from_call def __str__(self): - return "W(" + self.memory_region + "-" + str(self.unique_id) + ", --> " + self.var_name + ")" + return_str = "" + return_str += "C" if self.from_call else "" + return_str += "W(" + self.memory_region + "-" + str(self.unique_id) + ", --> " + self.var_name + ")" + return return_str def __hash__(self): return self.unique_id @@ -43,8 +53,9 @@ def toDict(self): result_dict["memory_region"] = self.memory_region 
result_dict["unique_id"] = self.unique_id result_dict["var_name"] = self.var_name + result_dict["from_call"] = self.from_call return result_dict def write_data_access_from_dict(values: Dict[str, Any]) -> WriteDataAccess: - return WriteDataAccess(values["memory_region"], values["unique_id"], values["var_name"]) + return WriteDataAccess(values["memory_region"], values["unique_id"], values["var_name"], values["from_call"])