diff --git a/discopop_library/CodeGenerator/classes/UnpackedSuggestion.py b/discopop_library/CodeGenerator/classes/UnpackedSuggestion.py
index 14ace1dbb..b30797d9d 100644
--- a/discopop_library/CodeGenerator/classes/UnpackedSuggestion.py
+++ b/discopop_library/CodeGenerator/classes/UnpackedSuggestion.py
@@ -65,7 +65,7 @@ def __get_device_update_pragmas(self):
         openmp_source_device_id = self.values["openmp_source_device_id"]
         openmp_target_device_id = self.values["openmp_target_device_id"]
         range: Optional[Tuple[int, int]] = self.values["range"]
-        print("IS FIRST DATA OCCURRENCE?: ", is_first_data_occurrence)
+        delete_data: bool = self.values["delete_data"]

         def get_range_str(r):
             return "" if r is None else "[" + str(r[0]) + ":" + str(r[1]) + "]"
@@ -91,7 +91,7 @@ def get_range_str(r):
         elif source_device_id != self.host_device_id and target_device_id == self.host_device_id:
             # update type from
-            if is_first_data_occurrence:
+            if delete_data:
                 pragma.pragma_str = "#pragma omp target exit data map(from:"
             else:
                 pragma.pragma_str = "#pragma omp target update from("
diff --git a/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py b/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py
new file mode 100644
index 000000000..9345be78c
--- /dev/null
+++ b/discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py
@@ -0,0 +1,398 @@
+# This file is part of the DiscoPoP software (http://www.discopop.tu-darmstadt.de)
+#
+# Copyright (c) 2020, Technische Universitaet Darmstadt, Germany
+#
+# This software may be modified and distributed under the terms of
+# the 3-Clause BSD License. See the LICENSE file in the package base
+# directory for details.
+
+import logging
+from typing import Dict, List, Optional, Set, Tuple, cast
+import networkx as nx  # type: ignore
+from discopop_explorer.PEGraphX import MemoryRegion
+
+from discopop_library.discopop_optimizer.CostModels.CostModel import CostModel
+from discopop_library.discopop_optimizer.Variables.Experiment import Experiment
+from discopop_library.discopop_optimizer.classes.context.Update import Update
+from discopop_library.discopop_optimizer.classes.context.ContextObject import ContextObject
+
+from discopop_library.discopop_optimizer.classes.nodes.FunctionRoot import FunctionRoot
+from discopop_library.discopop_optimizer.classes.types.Aliases import DeviceID
+from discopop_library.discopop_optimizer.classes.types.DataAccessType import ReadDataAccess, WriteDataAccess
+from discopop_library.discopop_optimizer.utilities.MOGUtilities import (
+    get_all_function_nodes,
+    get_children,
+    get_predecessors,
+    get_requirements,
+    get_successors,
+    show_decision_graph,
+)
+from discopop_library.discopop_optimizer.utilities.simple_utilities import data_at
+from discopop_library.discopop_optimizer.classes.nodes.ContextRestore import ContextRestore
+from discopop_library.discopop_optimizer.classes.nodes.ContextSave import ContextSave
+
+logger = logging.getLogger("Optimizer")
+
+
+class DeviceMemory(object):
+    # note: initialized per instance in __init__; a class-level "= dict()" default would be
+    # shared between all DeviceMemory instances and leak memory state across functions
+    memory: Dict[DeviceID, Dict[MemoryRegion, WriteDataAccess]]
+    graph: nx.DiGraph
+    experiment: Experiment
+
+    def __init__(self, graph: nx.DiGraph, experiment: Experiment):
+        self.memory = dict()
+        self.graph = graph
+        self.experiment = experiment
+
+    def perform_read(self, node_id: int, device_id: DeviceID, rda: ReadDataAccess) -> List[Update]:
+        updates: List[Update] = []
+        # get most recent write to the memory region
+        most_recent_write: Optional[Tuple[DeviceID, WriteDataAccess]] = None
+        for dvid in self.memory:
+            if rda.memory_region in self.memory[dvid]:
+                if most_recent_write is None:
+                    most_recent_write = (dvid, self.memory[dvid][rda.memory_region])
+                else:
+                    if self.memory[dvid][rda.memory_region].unique_id > most_recent_write[1].unique_id:
+                        most_recent_write = (dvid, self.memory[dvid][rda.memory_region])
+
+        # check for update necessity
+        if most_recent_write is None:
+            # data written outside the current function. Issue update from host device to device_id
+            predecessors = get_predecessors(self.graph, node_id)
+            if len(predecessors) == 0:
+                predecessor = node_id
+            else:
+                predecessor = predecessors[0]
+            dummy_wda = WriteDataAccess(rda.memory_region, 0, rda.var_name)
+            updates.append(
+                Update(
+                    predecessor,
+                    node_id,
+                    self.experiment.get_system().get_host_device_id(),
+                    device_id,
+                    dummy_wda,
+                    is_first_data_occurrence=True,
+                    source_cu_id=data_at(self.graph, predecessor).original_cu_id,
+                    target_cu_id=data_at(self.graph, node_id).original_cu_id,
+                )
+            )
+
+        elif most_recent_write[0] != device_id:
+            # update necessary
+            predecessors = get_predecessors(self.graph, node_id)
+            if len(predecessors) == 0:
+                predecessor = node_id
+            else:
+                predecessor = predecessors[0]
+
+            # check if the update creates the memory on the device
+            if device_id in self.memory:
+                if rda.memory_region in self.memory[device_id]:
+                    is_first_data_occurrence = False
+                else:
+                    is_first_data_occurrence = True
+            else:
+                is_first_data_occurrence = True
+
+            # check for differing unique ids to prevent updates in an already synchronized state, if the data is already present on the device
+            already_synchronized = False
+            if not is_first_data_occurrence:
+                if device_id in self.memory:
+                    if rda.memory_region in self.memory[device_id]:
+                        if self.memory[device_id][rda.memory_region].unique_id >= most_recent_write[1].unique_id:
+                            already_synchronized = True
+                            logger.debug(
+                                "Already synchronized: "
+                                + str(
+                                    Update(
+                                        predecessor,
+                                        node_id,
+                                        most_recent_write[0],
+                                        device_id,
+                                        most_recent_write[1],
+                                        is_first_data_occurrence=is_first_data_occurrence,
+                                        source_cu_id=data_at(self.graph, predecessor).original_cu_id,
+                                        target_cu_id=data_at(self.graph, node_id).original_cu_id,
+                                    )
+                                )
+                            )
+
+            if not already_synchronized:
+                updates.append(
+                    Update(
+                        predecessor,
+                        node_id,
+                        most_recent_write[0],
+                        device_id,
+                        most_recent_write[1],
+                        is_first_data_occurrence=is_first_data_occurrence,
+                        source_cu_id=data_at(self.graph, predecessor).original_cu_id,
+                        target_cu_id=data_at(self.graph, node_id).original_cu_id,
+                    )
+                )
+
+        else:
+            # no update necessary, most recent data on the device
+            pass
+
+        if len(updates) > 0:
+            # update the memory state according to the updates
+            for update in updates:
+                self.perform_write(update.target_node_id, update.target_device_id, update.write_data_access)
+
+        return updates
+
+    def perform_write(self, node_id: int, device_id: DeviceID, wda: WriteDataAccess):
+        if device_id not in self.memory:
+            self.memory[device_id] = dict()
+        self.memory[device_id][wda.memory_region] = wda
+
+    def log_state(self):
+        logger.debug("Memory state:")
+        for device_id in self.memory:
+            logger.debug("-> Device: " + str(device_id))
+            for mem_reg in self.memory[device_id]:
+                logger.debug("\t-> " + str(self.memory[device_id][mem_reg]))
+        logger.debug("")
+
+
+class DataFrame(object):
+    entered_data_regions_by_device: Dict[DeviceID, List[WriteDataAccess]]
+
+    def __init__(self):
+        self.entered_data_regions_by_device = dict()
+
+    def parse_update(self, update: Update):
+        if update.is_first_data_occurrence:
+            if update.target_device_id not in self.entered_data_regions_by_device:
+                self.entered_data_regions_by_device[update.target_device_id] = []
+            self.entered_data_regions_by_device[update.target_device_id].append(update.write_data_access)
+
+    def cleanup_dataframe(self, node_id: int, memory: DeviceMemory, experiment: Experiment) -> List[Update]:
+        """synchronize data to the host device and issue a delete update"""
+        updates: List[Update] = []
+        # identify last cu_id for update positioning
+        queue = [node_id]
+        last_cu_id = None
+        while queue:
+            current = queue.pop(0)
+            current_data = data_at(experiment.optimization_graph, current)
+            if current_data.original_cu_id is not None:
+                last_cu_id = current_data.original_cu_id
+                break
+            queue += [p for p in get_predecessors(experiment.optimization_graph, current) if p not in queue]
+
+        for device_id in self.entered_data_regions_by_device:
+            for wda in self.entered_data_regions_by_device[device_id]:
+                # issue delete updates
+                updates.append(
+                    Update(
+                        node_id,
+                        node_id,
+                        device_id,
+                        experiment.get_system().get_host_device_id(),
+                        wda,
+                        False,
+                        last_cu_id,
+                        last_cu_id,
+                        delete_data=True,
+                    )
+                )
+                # updates += memory.perform_read(node_id, experiment.get_system().get_host_device_id(), cast(ReadDataAccess, wda))
+                # cleanup memory
+                del memory.memory[device_id][wda.memory_region]
+
+        logger.debug("CDF Updates: ")
+        for u in updates:
+            logger.debug("\t" + str(u))
+        return updates
+
+    def log_state(self):
+        logger.debug("DataFrame:")
+        for device_id in self.entered_data_regions_by_device:
+            logger.debug("-> Device: " + str(device_id))
+            for wda in self.entered_data_regions_by_device[device_id]:
+                logger.debug("\t-> " + str(wda.memory_region) + " : " + wda.var_name)
+        logger.debug("")
+
+
+def new_calculate_data_transfers(
+    graph: nx.DiGraph, decisions: List[int], experiment, targeted_functions: Optional[List[int]] = None
+) -> List[Update]:
+    updates: List[Update] = []
+    logger.debug("Calculating updates for configuration: " + str(decisions))
+
+    if targeted_functions is None:
+        targeted_functions = get_all_function_nodes(graph)
+
+    for idx, function in enumerate(targeted_functions):
+        logger.debug(
+            "\t-> Function: "
+            + cast(FunctionRoot, data_at(graph, function)).name
+            + " "
+            + str(idx)
+            + " / "
+            + str(len(targeted_functions))
+        )
+        function_children = get_children(graph, function)
+        if len(function_children) == 0:
+            continue
+        # initialize "walker"
+        device_id_stack: List[int] = [cast(int, data_at(graph, function).device_id)]
+        return_node_stack: List[int] = []
+        current_node: Optional[int] = None
+        next_node: Optional[int] = function_children[0]
+
+        # initialize device memory
+        memory = DeviceMemory(graph, experiment)
+
+        # initialize data frame stack, used to cleanup data from the devices when leaving a branch or function
+        dataframe_stack: List[DataFrame] = [DataFrame()]
+
+        # traverse function graph and calculate necessary data updates
+        while next_node is not None or len(return_node_stack) != 0:
+            # 0: check if next_node exists
+            if next_node is not None:
+                current_node = next_node
+                next_node = None
+            else:
+                # if not, get next node from the return stack
+                current_node = return_node_stack.pop()
+
+            # identify device id of the current node
+            current_node_data = data_at(graph, current_node)
+            current_device_id = (
+                device_id_stack[-1] if current_node_data.device_id is None else current_node_data.device_id
+            )
+            logger.debug("Current node: " + str(current_node) + " @ device " + str(current_device_id))
+
+            # identify necessary updates
+            tmp_updates: List[Update] = []
+            for rda in current_node_data.read_memory_regions:
+                tmp_updates += memory.perform_read(current_node, current_device_id, rda)
+            # assumption (potential improvements for later): each written memory region is also read
+            # todo: replace this assumption with allocations on the device, if no prior data is read
+            for rda in cast(Set[ReadDataAccess], current_node_data.written_memory_regions):
+                tmp_updates += memory.perform_read(current_node, current_device_id, rda)
+            for wda in current_node_data.written_memory_regions:
+                memory.perform_write(current_node, current_device_id, wda)
+            memory.log_state()
+
+            # register newly created data on a device in the current data frame
+            for update in tmp_updates:
+                dataframe_stack[-1].parse_update(update)
+            logger.debug("Data Frame Stack:")
+            for df in dataframe_stack:
+                df.log_state()
+            logger.debug("")
+
+            # add a new data frame, if a branch is entered
+            if type(current_node_data) == ContextRestore:
+                dataframe_stack.append(DataFrame())
+
+            # close the current data frame if a branch is exited
+            if type(current_node_data) == ContextSave:
+                tmp_updates += dataframe_stack[-1].cleanup_dataframe(current_node, memory, experiment)
+                dataframe_stack.pop()
+
+            # 1: prepare graph traversal
+            successors = get_successors(graph, current_node)
+            children = get_children(graph, current_node)
+
+            # 1.1: select successor according to decisions
+            logger.debug("Successors: " + str(successors))
+            logger.debug("Decisions: " + str(decisions))
+
+            if len(successors) == 1:
+                successor = successors[0]
+            else:
+                valid_successors = [s for s in successors if s in decisions]
+                logger.debug("valid successors: " + str(valid_successors))
+                if len(valid_successors) == 1:
+                    # check if one successor exists after filtering
+                    successor = valid_successors[0]
+                else:
+                    # check if any of the successors is a requirement
+                    if len(valid_successors) == 0:
+                        requirements = []
+                        for dec in decisions:
+                            requirements += get_requirements(graph, dec)
+                        valid_successors = [s for s in successors if s in requirements]
+                        logger.debug("valid after requirements: " + str(valid_successors))
+
+                    # if no successor is a requirement, select a successor which represents a sequential execution at device NONE
+                    if len(valid_successors) == 0:
+                        valid_successors = [
+                            s
+                            for s in successors
+                            if data_at(graph, s).represents_sequential_version() and data_at(graph, s).device_id is None
+                        ]
+                        logger.debug("valid successors after strict sequential check: " + str(valid_successors))
+
+                    # if no valid successor with the strict sequential version check could be found, relax the condition and try again
+                    if len(valid_successors) == 0:
+                        valid_successors = [s for s in successors if data_at(graph, s).represents_sequential_version()]
+                        logger.debug("valid successors after loose sequential check: " + str(valid_successors))
+
+                    if len(valid_successors) > 0:
+                        logger.debug("valid successors: " + str(valid_successors))
+                        successor = valid_successors[0]
+                    else:
+                        # still no valid successor identified, end of path or invalid path reached
+                        successor = None
+                        # pop element from device_stack
+                        logger.debug("Path end reached!")
+                        device_id_stack.pop()
+                        tmp_updates += dataframe_stack[-1].cleanup_dataframe(current_node, memory, experiment)
+                        dataframe_stack.pop()
+
+            # 1.2: save successor to return_node_stack if required
+            if len(children) == 0:
+                next_node = successor
+            else:
+                return_node_stack.append(cast(int, successor))
+                return_node_stack += children
+                for c in children:
+                    device_id_stack.append(current_device_id)
+                    dataframe_stack.append(DataFrame())
+                # add device id of the parent to the device_id_stack for each child, as the last element will be popped when the path end is reached
+                # add a new dataframe for each child to enforce cleaning up before leaving a loop body, if necessary
+                # children will be visited before the successor is visited
+
+            # 1.3: log state
+            logger.debug("next_node: " + str(next_node))
+            logger.debug("return_stack: " + str(return_node_stack))
+            logger.debug("device_id_stack: " + str(device_id_stack))
+            logger.debug("")
+
+            # todo remove
+            memory.log_state()
+
+            # register identified updates
+            updates += tmp_updates
+
+    logger.debug("Calculated updates: ")
+    for u in updates:
+        logger.debug("--> " + str(u))
+    logger.debug("")
+
+    return updates
+
+
+def calculate_data_transfers_by_models(
+    graph: nx.DiGraph, function_performance_models: Dict[FunctionRoot, List[CostModel]], experiment
+) -> Dict[FunctionRoot, List[Tuple[CostModel, ContextObject]]]:
+    """Calculate data transfers for each performance model and append the respective ContextObject to the result."""
+    result_dict: Dict[FunctionRoot, List[Tuple[CostModel, ContextObject]]] = dict()
+    for function in function_performance_models:
+        result_dict[function] = []
+        for model in function_performance_models[function]:
+            # create a ContextObject for the current path
+            context = ContextObject(function.node_id, [function.device_id])
+
+            context.necessary_updates = set(
+                new_calculate_data_transfers(graph, model.path_decisions, experiment, [function.node_id])
+            )
+
+            result_dict[function].append((model, context))
+    return result_dict
diff --git a/discopop_library/discopop_optimizer/DataTransfers/calculate_configuration_data_movement.py b/discopop_library/discopop_optimizer/DataTransfers/calculate_configuration_data_movement.py
index cf60a38f5..84052a8c5 100644
--- a/discopop_library/discopop_optimizer/DataTransfers/calculate_configuration_data_movement.py
+++ b/discopop_library/discopop_optimizer/DataTransfers/calculate_configuration_data_movement.py
@@ -10,6 +10,7 @@
 from discopop_library.discopop_optimizer.CostModels.DataTransfer.DataTransferCosts import add_data_transfer_costs
 from discopop_library.discopop_optimizer.CostModels.utilities import get_performance_models_for_functions
 from discopop_library.discopop_optimizer.DataTransfers.DataTransfers import calculate_data_transfers
+from discopop_library.discopop_optimizer.DataTransfers.NewDataTransfers import new_calculate_data_transfers
 from discopop_library.discopop_optimizer.UpdateOptimization.main import optimize_updates
 from discopop_library.discopop_optimizer.Variables.Experiment import Experiment
 from discopop_library.result_classes.OptimizerOutputPattern import OptimizerOutputPattern
@@ -31,19 +32,23 @@ def calculate_data_movement(experiment: Experiment):
             len(experiment.detection_result.patterns.optimizer_output),
         )
         # calculate necessary updates
-        function_performance_models_without_context = get_performance_models_for_functions(
-            experiment, experiment.optimization_graph, restrict_to_decisions=set(oo_suggestion.decisions)
-        )
+        # function_performance_models_without_context = get_performance_models_for_functions(
+        #     experiment, experiment.optimization_graph, restrict_to_decisions=set(oo_suggestion.decisions)
+        # )

-        function_performance_models = calculate_data_transfers(
-            experiment.optimization_graph, function_performance_models_without_context, experiment
-        )
+        # function_performance_models = calculate_data_transfers(
+        #     experiment.optimization_graph, function_performance_models_without_context, experiment
+        # )
+
+        updates = new_calculate_data_transfers(experiment.optimization_graph, oo_suggestion.decisions, experiment)
+        for update in updates:
+            oo_suggestion.add_data_movement(update)

-        # collect necessary updates
-        for function in function_performance_models:
-            for cost_model, context in function_performance_models[function]:
-                for update in context.necessary_updates:
-                    oo_suggestion.add_data_movement(update)
+        # # collect necessary updates
+        # for function in function_performance_models:
+        #     for cost_model, context in function_performance_models[function]:
+        #         for update in context.necessary_updates:
+        #             oo_suggestion.add_data_movement(update)

         # optimize updates
         oo_suggestion = optimize_updates(experiment, oo_suggestion, experiment.arguments)
diff --git a/discopop_library/discopop_optimizer/OptimizerArguments.py b/discopop_library/discopop_optimizer/OptimizerArguments.py
index dc677bd06..0896b8a53 100644
--- a/discopop_library/discopop_optimizer/OptimizerArguments.py
+++ b/discopop_library/discopop_optimizer/OptimizerArguments.py
@@ -29,6 +29,7 @@ class OptimizerArguments(GeneralArguments):
     optimization_level: int
     optimization_level_2_parameters: str
     single_suggestions: bool
+    pin_function_calls_to_host: bool

     def __post_init__(self):
         self.__validate()
diff --git a/discopop_library/discopop_optimizer/PETParser/PETParser.py b/discopop_library/discopop_optimizer/PETParser/PETParser.py
index c6985d9a6..a1586549f 100644
--- a/discopop_library/discopop_optimizer/PETParser/PETParser.py
+++ b/discopop_library/discopop_optimizer/PETParser/PETParser.py
@@ -6,6 +6,7 @@
 # the 3-Clause BSD License. See the LICENSE file in the package base
 # directory for details.
 import copy
+import logging
 import os
 import pstats
 from typing import Dict, List, Optional, Tuple, Set, cast
@@ -45,6 +46,7 @@
     get_all_nodes_in_function,
     get_all_parents,
     get_nodes_by_functions,
+    get_out_call_edges,
     get_parent_function,
     get_parents,
     get_path_entry,
@@ -65,6 +67,8 @@
 from discopop_library.discopop_optimizer.utilities.simple_utilities import data_at
 from time import time

+logger = logging.getLogger("Optimizer")
+

 class PETParser(object):
     pet: PEGraphX
@@ -123,18 +127,23 @@ def parse(self) -> Tuple[nx.DiGraph, int]:
         if self.experiment.arguments.verbose:
             print("converted temporary edges")

-        # self.__mark_branch_affiliation()
-        # print("marked branch affiliations")
-        self.__calculate_data_flow()
-        if self.experiment.arguments.verbose:
-            print("calculated data flow")
-
         if self.experiment.arguments.verbose:
             print("Propagating read/write information...")
         self.__propagate_reads_and_writes()
         if self.experiment.arguments.verbose:
             print("Propagated read/write information")

+        # self.__mark_branch_affiliation()
+        # print("marked branch affiliations")
+        # show(self.graph)
+        # self.__calculate_data_flow()
+        # self.__new_calculate_data_flow()
+        # if self.experiment.arguments.verbose:
+        #     print("calculated data flow")
+        # show(self.graph)
+        # import sys
+        # sys.exit(0)
+
         # remove invalid functions
         self.__remove_invalid_functions()

@@ -144,6 +153,9 @@
         # add calling edges
         self.__add_calling_edges()

+        if self.experiment.arguments.pin_function_calls_to_host:
+            self.__pin_function_calls_to_host()
+
         return self.graph, self.next_free_node_id

     def get_new_node_id(self) -> int:
@@ -152,6 +164,21 @@
         self.next_free_node_id += 1
         return buffer

+    def __pin_function_calls_to_host(self):
+        host_device_id = self.experiment.get_system().get_host_device_id()
+        logger.info("Pinning functions and function calls to host device: " + str(host_device_id))
+        for node in get_all_function_nodes(self.graph):
+            node_data = data_at(self.graph, node)
+            if node_data.device_id != host_device_id:
+                logger.info("\tPinning function node: " + str(node))
+                data_at(self.graph, node).device_id = host_device_id
+        for node in self.graph.nodes:
+            node_data = data_at(self.graph, node)
+            if node_data.device_id != host_device_id:
+                if len(get_out_call_edges(self.graph, node)) > 0:
+                    logger.info("\tPinning calling node: " + str(node))
+                    data_at(self.graph, node).device_id = host_device_id
+
     def __add_calling_edges(self):
         all_function_nodes = get_all_function_nodes(self.graph)
@@ -1293,11 +1320,25 @@ def mark_branched_section(node, branch_stack):
         current_node = get_children(self.graph, function_node)[0]
         mark_branched_section(current_node, [])

+    # def __new_calculate_data_flow(self):
+    #     """calculate dataflow in such a way, that no data is left on any device but the host and every relevant change is synchronized."""
+    #     self.in_data_flow = dict()
+    #     self.out_data_flow = dict()
+    #
+    #     data_transactions = dict()  # stores created and removed data for unrolling when leaving a "frame"
+    #     # Dict[device_id, List[("enter/exit", memreg)]]
+    #
+    #     data_frame_stack: List[int] = []  # stores node ids which opened a data frame
+    #
+    #     for function in get_all_function_nodes(self.graph):
+    #         logger.info("calculate data flow for function: " + data_at(self.graph, function).name)
+
     def __calculate_data_flow(self):
         self.in_data_flow = dict()
         self.out_data_flow = dict()

         def inlined_data_flow_calculation(current_node, current_last_writes):
+            # TODO add entering and exiting data frames to support resetting at end of a child section
             while current_node is not None:
                 # check if current_node uses written data
                 reads, writes = get_read_and_written_data_from_subgraph(
@@ -1320,9 +1361,9 @@
                 for mem_reg in writes:
                     current_last_writes[mem_reg] = current_node

-                # inline children
+                ## start data_flow calculation for children
                 for child in get_children(self.graph, current_node):
-                    current_last_writes = inlined_data_flow_calculation(child, current_last_writes)
+                    _current_last_writes = inlined_data_flow_calculation(child, current_last_writes)

                 # continue to successor
                 successors = get_successors(self.graph, current_node)
diff --git a/discopop_library/discopop_optimizer/UpdateOptimization/RemoveLoopIndexUpdates.py b/discopop_library/discopop_optimizer/UpdateOptimization/RemoveLoopIndexUpdates.py
index e72879220..872194084 100644
--- a/discopop_library/discopop_optimizer/UpdateOptimization/RemoveLoopIndexUpdates.py
+++ b/discopop_library/discopop_optimizer/UpdateOptimization/RemoveLoopIndexUpdates.py
@@ -6,8 +6,9 @@
 # the 3-Clause BSD License. See the LICENSE file in the package base
 # directory for details.

-from typing import List, cast
-from discopop_explorer.PEGraphX import LoopNode
+import logging
+from typing import List, Set, cast
+from discopop_explorer.PEGraphX import LoopNode, MemoryRegion
 from discopop_library.discopop_optimizer.OptimizerArguments import OptimizerArguments
 from discopop_library.discopop_optimizer.Variables.Experiment import Experiment
 from discopop_library.discopop_optimizer.classes.context.Update import Update
@@ -15,10 +16,13 @@
 from discopop_library.discopop_optimizer.utilities.simple_utilities import data_at
 from discopop_library.result_classes.OptimizerOutputPattern import OptimizerOutputPattern

+logger = logging.getLogger("Optimizer")
+

 def remove_loop_index_updates(
     experiment: Experiment, best_configuration: OptimizerOutputPattern, arguments: OptimizerArguments
 ) -> OptimizerOutputPattern:
+    mem_reg_blacklist: Set[MemoryRegion] = set()
     to_be_removed: List[Update] = []
     for update in best_configuration.data_movement:
         # check for loop nodes as update targets
@@ -35,10 +39,19 @@
             # check for loop indices as targeted varbiables
             if update.write_data_access.var_name in loop_indices:
                 # found update to loop index
-                if arguments.verbose:
-                    print("ignoring update: ", str(update), " , since it targets a loop index.")
-                to_be_removed.append(update)
+                logger.info(
+                    "# ignoring updates to mem_reg: "
+                    + str(update.write_data_access.memory_region)
+                    + " name: "
+                    + update.write_data_access.var_name
+                    + " since it targets a loop index."
+                )
+                mem_reg_blacklist.add(update.write_data_access.memory_region)

     # remove identified loop index updates
+    for update in best_configuration.data_movement:
+        if update.write_data_access.memory_region in mem_reg_blacklist:
+            to_be_removed.append(update)
+
     for tbr in to_be_removed:
         if tbr in best_configuration.data_movement:
             best_configuration.data_movement.remove(tbr)
diff --git a/discopop_library/discopop_optimizer/UpdateOptimization/RemoveSameDeviceUpdates.py b/discopop_library/discopop_optimizer/UpdateOptimization/RemoveSameDeviceUpdates.py
new file mode 100644
index 000000000..7df0f8fd1
--- /dev/null
+++ b/discopop_library/discopop_optimizer/UpdateOptimization/RemoveSameDeviceUpdates.py
@@ -0,0 +1,34 @@
+# This file is part of the DiscoPoP software (http://www.discopop.tu-darmstadt.de)
+#
+# Copyright (c) 2020, Technische Universitaet Darmstadt, Germany
+#
+# This software may be modified and distributed under the terms of
+# the 3-Clause BSD License. See the LICENSE file in the package base
+# directory for details.
+
+from typing import List
+from discopop_library.discopop_optimizer.OptimizerArguments import OptimizerArguments
+from discopop_library.discopop_optimizer.Variables.Experiment import Experiment
+from discopop_library.discopop_optimizer.classes.context.Update import Update
+from discopop_library.result_classes.OptimizerOutputPattern import OptimizerOutputPattern
+
+
+def remove_same_device_updates(
+    experiment: Experiment, configuration: OptimizerOutputPattern, arguments: OptimizerArguments
+) -> OptimizerOutputPattern:
+    """Remove updates where source and target device id are equal.
+    These are artifacts from initializing device memory states during the data movement calculation."""
+    cleaned_updates: List[Update] = []
+    for update in configuration.data_movement:
+        if update.source_device_id == update.target_device_id:
+            # ignore this update
+            continue
+        else:
+            cleaned_updates.append(update)
+    configuration.data_movement = cleaned_updates
+
+    if arguments.verbose:
+        print("Removed same device updates")
+        for update in configuration.data_movement:
+            print("# ", update)
+        print()
+    return configuration
diff --git a/discopop_library/discopop_optimizer/UpdateOptimization/main.py b/discopop_library/discopop_optimizer/UpdateOptimization/main.py
index b17256d52..67f041005 100644
--- a/discopop_library/discopop_optimizer/UpdateOptimization/main.py
+++ b/discopop_library/discopop_optimizer/UpdateOptimization/main.py
@@ -15,6 +15,7 @@
 )
 from discopop_library.discopop_optimizer.UpdateOptimization.RemoveDuplicatedUpdates import remove_duplicated_updates
 from discopop_library.discopop_optimizer.UpdateOptimization.RemoveLoopIndexUpdates import remove_loop_index_updates
+from discopop_library.discopop_optimizer.UpdateOptimization.RemoveSameDeviceUpdates import remove_same_device_updates
 from discopop_library.discopop_optimizer.Variables.Experiment import Experiment
 from discopop_library.discopop_optimizer.utilities.visualization.update_graph import show_update_graph
 from discopop_library.result_classes.OptimizerOutputPattern import OptimizerOutputPattern
@@ -33,6 +34,9 @@ def optimize_updates(
             print("# ", update)
         print()

+    # remove same-device updates
+    configuration = remove_same_device_updates(experiment, configuration, arguments)
+
     # optimize updates
     configuration = fix_loop_initialization_updates(experiment, configuration, arguments)

@@ -40,7 +44,7 @@
     configuration = remove_duplicated_updates(configuration, arguments)

     # remove loop index updates
-    configuration = remove_loop_index_updates(experiment, configuration, arguments)
+    # configuration = remove_loop_index_updates(experiment, configuration, arguments)

     # add ranges to be transferred to the updates
     configuration = add_ranges_to_updates(experiment, configuration, arguments)
diff --git a/discopop_library/discopop_optimizer/__main__.py b/discopop_library/discopop_optimizer/__main__.py
index 532f54189..c9068f50d 100644
--- a/discopop_library/discopop_optimizer/__main__.py
+++ b/discopop_library/discopop_optimizer/__main__.py
@@ -46,6 +46,7 @@ def parse_args() -> OptimizerArguments:
     )

     parser.add_argument("--profiling", action="store_true", help="Enable profiling.")
+    parser.add_argument("--pin-function-calls-to-host", action="store_true", help="Force function calls on the host system. Prevents offloading of entire functions.")
Prevent offloading of entire functions.") parser.add_argument("--log", type=str, default="WARNING", help="Specify log level: DEBUG, INFO, WARNING, ERROR, CRITICAL") parser.add_argument("--write-log", action="store_true", help="Create Logfile.") # EXPERIMENTAL FLAGS: @@ -79,6 +80,7 @@ def parse_args() -> OptimizerArguments: single_suggestions=arguments.single_suggestions, log_level=arguments.log.upper(), write_log=arguments.write_log, + pin_function_calls_to_host=arguments.pin_function_calls_to_host, ) diff --git a/discopop_library/discopop_optimizer/classes/context/Update.py b/discopop_library/discopop_optimizer/classes/context/Update.py index b1d73531a..9d22e6126 100644 --- a/discopop_library/discopop_optimizer/classes/context/Update.py +++ b/discopop_library/discopop_optimizer/classes/context/Update.py @@ -10,6 +10,7 @@ from discopop_explorer.PEGraphX import NodeID, PEGraphX from discopop_library.discopop_optimizer.classes.types.Aliases import DeviceID from discopop_library.discopop_optimizer.classes.types.DataAccessType import ( + ReadDataAccess, WriteDataAccess, write_data_access_from_dict, ) @@ -26,6 +27,7 @@ class Update(object): source_cu_id: Optional[NodeID] target_cu_id: Optional[NodeID] range: Optional[Tuple[int, int]] + delete_data: bool def __init__( self, @@ -39,6 +41,7 @@ def __init__( target_cu_id: Optional[NodeID], originated_from_node: Optional[int] = None, range: Optional[Tuple[int, int]] = None, + delete_data: bool = False, ): self.source_node_id = source_node_id self.target_node_id = target_node_id @@ -50,9 +53,12 @@ def __init__( self.target_cu_id = target_cu_id self.originated_from_node = originated_from_node self.range = range + self.delete_data = delete_data def __str__(self): - result_str = "First" if self.is_first_data_occurrence else "" + result_str = "" + result_str += "IssueDelete " if self.delete_data else "" + result_str += "First" if self.is_first_data_occurrence else "" return ( result_str + "Update(" @@ -82,6 +88,7 @@ def get_pattern_string(self, pet: PEGraphX) -> str: result_dict["start_line"] = pet.node_at(cast(NodeID, self.source_cu_id)).start_position() result_dict["end_line"] = pet.node_at(cast(NodeID, self.target_cu_id)).start_position() result_dict["range"] = self.range + result_dict["delete_data"] = self.delete_data return json.dumps(result_dict) def toDict(self) -> Dict[str, Any]: @@ -95,6 +102,7 @@ def toDict(self) -> Dict[str, Any]: result_dict["source_cu_id"] = self.source_cu_id result_dict["target_cu_id"] = self.target_cu_id result_dict["range"] = self.range + result_dict["delete_data"] = self.delete_data return result_dict @@ -109,5 +117,6 @@ def construct_update_from_dict(values: Dict[str, Any]) -> Update: values["source_cu_id"], values["target_cu_id"], range=values["range"], + delete_data=values["delete_data"], ) return update diff --git a/discopop_library/discopop_optimizer/classes/system/Network.py b/discopop_library/discopop_optimizer/classes/system/Network.py index 43e2ad4eb..a32423c93 100644 --- a/discopop_library/discopop_optimizer/classes/system/Network.py +++ b/discopop_library/discopop_optimizer/classes/system/Network.py @@ -30,6 +30,8 @@ def add_connection(self, source: Device, target: Device, transfer_speed: Expr, i self.__transfer_initialization_costs[(source, target)] = initialization_delay def get_transfer_speed(self, source: Device, target: Device): + if source == target: + return Integer(1000000) # 1000 GB/s if (source, target) not in self.__transfer_speeds: if self.__host_device is None: raise ValueError("Host device of 
network unspecified!") @@ -39,6 +41,8 @@ def get_transfer_speed(self, source: Device, target: Device): return self.__transfer_speeds[(source, target)] def get_transfer_initialization_costs(self, source: Device, target: Device): + if source == target: + return Integer(0) if (source, target) not in self.__transfer_speeds: if self.__host_device is None: raise ValueError("Host device of network unspecified!") diff --git a/discopop_library/discopop_optimizer/classes/system/System.py b/discopop_library/discopop_optimizer/classes/system/System.py index 584115534..ae87aed1d 100644 --- a/discopop_library/discopop_optimizer/classes/system/System.py +++ b/discopop_library/discopop_optimizer/classes/system/System.py @@ -27,6 +27,7 @@ class System(object): __host_device_id: int __network: Network __device_do_all_overhead_models: Dict[Device, Expr] + __device_do_all_shared_overhead_models: Dict[Device, Expr] __device_reduction_overhead_models: Dict[Device, Expr] __symbol_substitutions: List[ Tuple[ @@ -43,6 +44,7 @@ def __init__(self, arguments: OptimizerArguments): self.__host_device_id = -1 self.__network = Network() self.__device_do_all_overhead_models = dict() + self.__device_do_all_shared_overhead_models = dict() self.__device_reduction_overhead_models = dict() self.__symbol_substitutions = [] @@ -92,6 +94,7 @@ def __build_CPU(self, device_configuration: Dict[str, Any]): openmp_device_id=device_configuration["device_id"], device_specific_compiler_flags="", speedup=device_configuration["speedup"], + compute_init_delays=device_configuration["compute_init_delays[us]"], ) self.add_device(cpu, device_configuration["device_id"]) @@ -102,6 +105,7 @@ def __build_GPU(self, device_configuration: Dict[str, Any]): openmp_device_id=device_configuration["device_id"], device_specific_compiler_flags="", speedup=device_configuration["speedup"], + compute_init_delays=device_configuration["compute_init_delays[us]"], ) self.add_device(gpu, device_configuration["device_id"]) @@ -110,6 +114,11 @@ def set_device_doall_overhead_model(self, device: Device, model: Expr, arguments print("System: Set DOALL overhead model: ", model) self.__device_do_all_overhead_models[device] = model + def set_device_doall_shared_overhead_model(self, device: Device, model: Expr, arguments: OptimizerArguments): + if arguments.verbose: + print("System: Set DOALL SHARED overhead model: ", model) + self.__device_do_all_shared_overhead_models[device] = model + def set_reduction_overhead_model(self, device: Device, model: Expr, arguments: OptimizerArguments): if arguments.verbose: print("System: Set REDUCTION overhead model: ", model) @@ -122,6 +131,13 @@ def get_device_doall_overhead_model(self, device: Device, arguments: OptimizerAr return Expr(Integer(0)) return self.__device_do_all_overhead_models[device] + def get_device_doall_shared_overhead_model(self, device: Device, arguments: OptimizerArguments) -> Expr: + if device not in self.__device_do_all_shared_overhead_models: + if arguments.verbose: + warnings.warn("No DOALL SHARED overhead model, assuming 0 for device: " + str(device)) + return Expr(Integer(0)) + return self.__device_do_all_shared_overhead_models[device] + def get_device_reduction_overhead_model(self, device: Device, arguments: OptimizerArguments) -> Expr: if device not in self.__device_reduction_overhead_models: if arguments.verbose: diff --git a/discopop_library/discopop_optimizer/classes/system/devices/CPU.py b/discopop_library/discopop_optimizer/classes/system/devices/CPU.py index b18fc3804..f11efedcc 100644 --- 
+++ b/discopop_library/discopop_optimizer/classes/system/devices/CPU.py
@@ -5,15 +5,24 @@
 # This software may be modified and distributed under the terms of
 # the 3-Clause BSD License. See the LICENSE file in the package base
 # directory for details.
+from typing import Dict

 from discopop_library.discopop_optimizer.classes.system.devices.Device import Device
 from discopop_library.discopop_optimizer.classes.system.devices.DeviceTypeEnum import DeviceTypeEnum


 class CPU(Device):
     def __init__(
-        self, frequency, thread_count, openmp_device_id: int, device_specific_compiler_flags: str, speedup: float
+        self,
+        frequency,
+        thread_count,
+        openmp_device_id: int,
+        device_specific_compiler_flags: str,
+        speedup: float,
+        compute_init_delays: Dict[str, float],
     ):
-        super().__init__(frequency, thread_count, openmp_device_id, device_specific_compiler_flags, speedup)
+        super().__init__(
+            frequency, thread_count, openmp_device_id, device_specific_compiler_flags, speedup, compute_init_delays
+        )

     def get_device_type(self) -> DeviceTypeEnum:
         return DeviceTypeEnum.CPU
diff --git a/discopop_library/discopop_optimizer/classes/system/devices/Device.py b/discopop_library/discopop_optimizer/classes/system/devices/Device.py
index 8cfc60491..9be7ec5af 100644
--- a/discopop_library/discopop_optimizer/classes/system/devices/Device.py
+++ b/discopop_library/discopop_optimizer/classes/system/devices/Device.py
@@ -5,7 +5,7 @@
 # This software may be modified and distributed under the terms of
 # the 3-Clause BSD License. See the LICENSE file in the package base
 # directory for details.
-from typing import Tuple, List, Optional, cast
+from typing import Dict, Tuple, List, Optional, cast

 from sympy import Expr, Float, Symbol
 from sympy import Integer
@@ -19,6 +19,7 @@ class Device(object):
     __thread_count: Expr
     openmp_device_id: int
     __speedup: float  # comapred to sequential execution
+    __compute_init_delays: Dict[str, float]

     def __init__(
         self,
@@ -27,18 +28,23 @@
         openmp_device_id: int,
         device_specific_compiler_flags: str,
         speedup: float,
+        compute_init_delays: Dict[str, float],
     ):
         self.__frequency = frequency
         self.__thread_count = thread_count
         self.openmp_device_id = openmp_device_id
         self.device_specific_compiler_flags: str = device_specific_compiler_flags
         self.__speedup = speedup
+        self.__compute_init_delays = compute_init_delays

     def get_device_specific_pattern_info(
         self, suggestion: PatternInfo, suggestion_type: str
     ) -> Tuple[PatternInfo, str]:
         return suggestion, suggestion_type

+    def get_compute_init_delays(self) -> Dict[str, float]:
+        return self.__compute_init_delays
+
     def get_compute_capability(self) -> Expr:
         return self.__frequency
diff --git a/discopop_library/discopop_optimizer/classes/system/devices/GPU.py b/discopop_library/discopop_optimizer/classes/system/devices/GPU.py
index 08dc0d4ba..f515b2ed8 100644
--- a/discopop_library/discopop_optimizer/classes/system/devices/GPU.py
+++ b/discopop_library/discopop_optimizer/classes/system/devices/GPU.py
@@ -5,7 +5,7 @@
 # This software may be modified and distributed under the terms of
 # the 3-Clause BSD License. See the LICENSE file in the package base
 # directory for details.
-from typing import Tuple
+from typing import Dict, Tuple

 from discopop_explorer.pattern_detectors.PatternInfo import PatternInfo
 from discopop_library.discopop_optimizer.classes.system.devices.Device import Device
@@ -13,8 +13,18 @@

 class GPU(Device):
-    def __init__(self, frequency, thread_count, openmp_device_id, device_specific_compiler_flags, speedup: float):
-        super().__init__(frequency, thread_count, openmp_device_id, device_specific_compiler_flags, speedup)
+    def __init__(
+        self,
+        frequency,
+        thread_count,
+        openmp_device_id,
+        device_specific_compiler_flags,
+        speedup: float,
+        compute_init_delays: Dict[str, float],
+    ):
+        super().__init__(
+            frequency, thread_count, openmp_device_id, device_specific_compiler_flags, speedup, compute_init_delays
+        )

     def get_device_specific_pattern_info(
         self, suggestion: PatternInfo, suggestion_type: str
diff --git a/discopop_library/discopop_optimizer/classes/system/system_utils.py b/discopop_library/discopop_optimizer/classes/system/system_utils.py
index 56bdc5a16..fc76c9899 100644
--- a/discopop_library/discopop_optimizer/classes/system/system_utils.py
+++ b/discopop_library/discopop_optimizer/classes/system/system_utils.py
@@ -32,10 +32,11 @@ def generate_default_system_configuration(file_path: str):
         "processors": 16,
         "threads": 16,
         "speedup": 9.5,
+        "compute_init_delays[us]": {"doall": 300},
     }
     # configure gpu_1
     gpu_1: Dict[str, Union[float, int, Dict[str, float]]] = {
-        "compute_init_delays": {"target_teams_distribute_parallel_for": 0.01},
+        "compute_init_delays[us]": {"target_teams_distribute_parallel_for": 1000},
         "device_id": 1,
         "device_type": DeviceTypeEnum.GPU,
         "frequency": 256000000,
@@ -53,7 +54,7 @@
     }
     # configure gpu_2
     gpu_2: Dict[str, Union[float, int, Dict[str, float]]] = {
-        "compute_init_delays": {"target_teams_distribute_parallel_for": 0.005},
+        "compute_init_delays[us]": {"target_teams_distribute_parallel_for": 5000},
         "device_id": 2,
         "device_type": DeviceTypeEnum.GPU,
         "frequency": 128000000,
diff --git a/discopop_library/discopop_optimizer/optimization/evaluate.py b/discopop_library/discopop_optimizer/optimization/evaluate.py
index 6833006e6..fde49c703 100644
--- a/discopop_library/discopop_optimizer/optimization/evaluate.py
+++ b/discopop_library/discopop_optimizer/optimization/evaluate.py
@@ -19,6 +19,7 @@
     get_performance_models_for_functions,
 )
 from discopop_library.discopop_optimizer.DataTransfers.DataTransfers import calculate_data_transfers
+from discopop_library.discopop_optimizer.DataTransfers.NewDataTransfers import calculate_data_transfers_by_models
 from discopop_library.discopop_optimizer.Variables.Experiment import Experiment
 from discopop_library.discopop_optimizer.classes.context.ContextObject import ContextObject

@@ -51,7 +52,7 @@ def evaluate_configuration(
     function_performance_models_without_context = get_performance_models_for_functions(
         experiment, experiment.optimization_graph, restrict_to_decisions=set(decisions)
     )
-    function_performance_models = calculate_data_transfers(
+    function_performance_models = calculate_data_transfers_by_models(
         experiment.optimization_graph, function_performance_models_without_context, experiment
     )
     function_performance_models = add_data_transfer_costs(
diff --git a/discopop_library/discopop_optimizer/optimization/greedy.py b/discopop_library/discopop_optimizer/optimization/greedy.py
index 62a351307..74bdb4284 100644
--- a/discopop_library/discopop_optimizer/optimization/greedy.py
+++ b/discopop_library/discopop_optimizer/optimization/greedy.py
@@ -7,6 +7,7 @@
 # directory for details.
 import copy
 import json
+import logging
 from multiprocessing import Pool
 import os
 from typing import Dict, List, Optional, Set, Tuple, cast
@@ -27,6 +28,7 @@
 from discopop_library.discopop_optimizer.utilities.simple_utilities import data_at
 from discopop_library.result_classes.OptimizerOutputPattern import OptimizerOutputPattern

+logger = logging.getLogger("Optimizer")

 global_experiment = None
 global_arguments = None
@@ -108,13 +110,31 @@ def greedy_search(

         for local_result in tmp_result:
             # remove invalid elements
-            if local_result[1] == -1:
+            if local_result[1] < 0:
                 continue
             local_results.append(local_result)

         # identify best option and update made_decisions
         best_option: Optional[Tuple[Dict[int, List[List[int]]], int, ContextObject]] = None
         for k, e, c in local_results:
+            dbg_decisions_string = ""
+            for key in k:
+                dbg_decisions_string += str(key) + "("
+                for l in k[key]:
+                    dbg_decisions_string += "[ "
+                    for entry2 in l:
+                        dbg_decisions_string += str(entry2)
+                        entry_data = data_at(global_experiment.optimization_graph, entry2)
+
+                        if entry_data.device_id != global_experiment.get_system().get_host_device_id():
+                            dbg_decisions_string += "@" + str(entry_data.device_id)
+                        if entry_data.represents_sequential_version():
+                            dbg_decisions_string += "(seq)"
+                        dbg_decisions_string += ", "
+                    dbg_decisions_string += "] "
+                dbg_decisions_string += ") "
+
+            logger.info("LocalResult: " + dbg_decisions_string + " -> " + str(e))
             if best_option is None:
                 best_option = (k, e, c)
                 continue
@@ -203,8 +223,16 @@ def __get_optimizer_output_pattern(
             best_configuration.add_pattern(
                 pattern_id, device_id, experiment.get_system().get_device(device_id).get_device_type()
             )
+            best_configuration.decisions.append(node_id)
     if best_configuration is None:
-        return None
+        # best configuration is sequential
+        return OptimizerOutputPattern(
+            experiment.detection_result.pet.node_at(
+                cast(NodeID, data_at(experiment.optimization_graph, selection[0]).original_cu_id)
+            ),
+            [],
+            experiment.get_system().get_host_device_id(),
+        )
     # collect data movement information
     for update in context.necessary_updates:
         best_configuration.add_data_movement(update)
diff --git a/discopop_library/discopop_optimizer/optimizer.py b/discopop_library/discopop_optimizer/optimizer.py
index a26041d08..26bd05a56 100644
--- a/discopop_library/discopop_optimizer/optimizer.py
+++ b/discopop_library/discopop_optimizer/optimizer.py
@@ -25,6 +25,7 @@
 from discopop_library.PathManagement.PathManagement import load_file_mapping
 from discopop_library.discopop_optimizer.CostModels.DataTransfer.DataTransferCosts import add_data_transfer_costs
 from discopop_library.discopop_optimizer.CostModels.utilities import get_performance_models_for_functions
+from discopop_library.discopop_optimizer.DataTransfers.NewDataTransfers import new_calculate_data_transfers
 from discopop_library.discopop_optimizer.DataTransfers.calculate_configuration_data_movement import (
     calculate_data_movement,
 )
@@ -169,9 +170,20 @@
         # construct and set overhead model for doall suggestions
         system.set_device_doall_overhead_model(
             system.get_device(system.get_host_device_id()),
-            ExtrapInterpolatedMicrobench(arguments.doall_microbench_file).getFunctionSympy(),
+            ExtrapInterpolatedMicrobench(arguments.doall_microbench_file).getFunctionSympy(
+                benchType=MicrobenchType.DOALL
+            ),
+            arguments,
+        )
+        # construct and set overhead model for doall suggestions with shared clause
+        system.set_device_doall_shared_overhead_model(
+            system.get_device(system.get_host_device_id()),
+            ExtrapInterpolatedMicrobench(arguments.doall_microbench_file).getFunctionSympy(
+                benchType=MicrobenchType.SHARED
+            ),
             arguments,
         )
+
     if arguments.reduction_microbench_file != "None":
         # construct and set overhead model for reduction suggestions
         system.set_reduction_overhead_model(
@@ -205,10 +217,10 @@
     experiment.optimization_graph = optimize_suggestions(experiment)

     # insert device switch nodes
-    experiment.optimization_graph = insert_device_switch_nodes(experiment)
+    # experiment.optimization_graph = insert_device_switch_nodes(experiment)

     if arguments.plot:
-        show(experiment.optimization_graph, show_dataflow=False, show_mutex_edges=False)
+        show(experiment.optimization_graph, show_dataflow=True, show_mutex_edges=False)

     if arguments.verbose:
         print("# SUGGESTION ID -> NODE ID MAPPING")
@@ -260,7 +272,16 @@
     else:
         raise ValueError("No valid optimization method specified: " + str(arguments.optimization_level))

+    print("BEST CONFIGURATION: ", best_configuration)
+
     if best_configuration is not None:
+        # calculate updates for best_configuration
+        updates = new_calculate_data_transfers(experiment.optimization_graph, best_configuration.decisions, experiment)
+        # register updates
+        best_configuration.data_movement = []
+        for u in updates:
+            best_configuration.add_data_movement(u)
+
         best_configuration = optimize_updates(experiment, best_configuration, arguments)
         # append the configuration to the list of patterns
         experiment.detection_result.patterns.optimizer_output.append(best_configuration)
diff --git a/discopop_library/discopop_optimizer/suggestions/importers/do_all.py b/discopop_library/discopop_optimizer/suggestions/importers/do_all.py
index acedbd266..8a61d23ef 100644
--- a/discopop_library/discopop_optimizer/suggestions/importers/do_all.py
+++ b/discopop_library/discopop_optimizer/suggestions/importers/do_all.py
@@ -6,6 +6,7 @@
 # the 3-Clause BSD License. See the LICENSE file in the package base
 # directory for details.
 import copy
+import logging
 from typing import Set, cast, Tuple, List, Dict

 import networkx as nx  # type: ignore
@@ -24,11 +25,13 @@
 from discopop_library.discopop_optimizer.classes.nodes.Loop import Loop
 from discopop_library.discopop_optimizer.classes.nodes.Workload import Workload
 from discopop_library.discopop_optimizer.classes.system.devices.CPU import CPU
+from discopop_library.discopop_optimizer.classes.system.devices.DeviceTypeEnum import DeviceTypeEnum
 from discopop_library.discopop_optimizer.classes.system.devices.GPU import GPU
 from discopop_library.discopop_optimizer.utilities.simple_utilities import data_at
 from discopop_library.result_classes.OptimizerOutputPattern import OptimizerOutputPattern

 suggestion_device_types = [CPU, GPU]
+logger = logging.getLogger("Optimizer")


 def import_suggestion(
@@ -165,10 +168,46 @@ def get_overhead_term(node_data: Loop, environment: Experiment, device_id: int)
     For testing purposes, the following function is used to represent the overhead incurred by a do-all loop.
     The function has been created using Extra-P. unit of the overhead term are micro seconds."""
-    # retrieve DoAll overhead model
-    overhead_model = environment.get_system().get_device_doall_overhead_model(
-        environment.get_system().get_device(device_id), environment.arguments
-    )
+    ci_costs = environment.get_system().get_device(device_id).get_compute_init_delays()
+
+    # get overhead model
+    if environment.get_system().get_device(node_data.device_id).get_device_type() == DeviceTypeEnum.CPU:
+        # device is CPU
+        if len(cast(DoAllInfo, node_data.suggestion).shared) == 0:
+            # retrieve DoAll overhead model
+            overhead_model = environment.get_system().get_device_doall_overhead_model(
+                environment.get_system().get_device(device_id), environment.arguments
+            )
+            logger.info("Loop: " + str(node_data.node_id) + " is DOALL")
+        else:
+            # retrieve DoAll shared overhead model
+            overhead_model = environment.get_system().get_device_doall_shared_overhead_model(
+                environment.get_system().get_device(device_id), environment.arguments
+            )
+            logger.info("Loop: " + str(node_data.node_id) + " is DOALL SHARED")
+        # add computation initialization costs (technically duplicated, but required for low workloads)
+        if "doall" in ci_costs:
+            overhead_model += Float(ci_costs["doall"])
+            logger.debug(
+                "Added doall compute init delay: " + str(ci_costs["doall"]) + " to node: " + str(node_data.node_id)
+            )
+        else:
+            logger.debug("Could not find compute init delays for node: " + str(node_data.node_id))
+    else:
+        # device is GPU
+        overhead_model = Integer(0)
+        # add computation initialization costs (technically duplicated, but required for low workloads)
+        if "target_teams_distribute_parallel_for" in ci_costs:
+            overhead_model += Float(ci_costs["target_teams_distribute_parallel_for"])
+            logger.debug(
+                "Added ttdpf compute init delay: "
+                + str(ci_costs["target_teams_distribute_parallel_for"])
+                + " to node: "
+                + str(node_data.node_id)
+            )
+        else:
+            logger.debug("Could not find compute init delays for node: " + str(node_data.node_id))
+
     # substitute workload, iterations and threads
     thread_count = environment.get_system().get_device(device_id).get_thread_count()
     iterations = node_data.iterations_symbol
diff --git a/discopop_library/discopop_optimizer/utilities/MOGUtilities.py b/discopop_library/discopop_optimizer/utilities/MOGUtilities.py
index 8bcf8dba5..802315464 100644
--- a/discopop_library/discopop_optimizer/utilities/MOGUtilities.py
+++ b/discopop_library/discopop_optimizer/utilities/MOGUtilities.py
@@ -127,6 +127,46 @@ def get_function_return_node(graph: nx.DiGraph, function: int) -> int:
     )


+def show_decision_graph(
+    graph: nx.DiGraph, decisions: List[int], show_dataflow: bool = True, show_mutex_edges: bool = False
+):
+    print("Decisions: ", decisions)
+    contained_nodes: Set[int] = set()
+    for function in get_all_function_nodes(graph):
+        # get nodes in subtree of function, if they are reachable by the set of decisions
+        queue: List[int] = [data_at(graph, function).node_id]
+        while len(queue) > 0:
+            current = queue.pop()
+            contained_nodes.add(current)
+            queue += [c for c in get_children(graph, current) if c not in queue and c not in contained_nodes]
+            successors = get_successors(graph, current)
+
+            if len(successors) <= 1:
+                # no decision required
+                queue += [s for s in successors if s not in queue and s not in contained_nodes]
+            else:
+                # filter for successors in decisions
+                print("SUCCESSORS: ", successors)
+                valid_successors = [s for s in successors if s in decisions]
+                # check for requirements, if no valid successor was found
+                if len(valid_successors) == 0:
+                    requirements = []
+                    for dec in decisions:
+                        requirements += get_requirements(graph, dec)
+                    valid_successors = [s for s in successors if s in requirements]
+                    print("REQUIREMENT SUCCESSORS: ", valid_successors)
+                print("VALID SUCCESSORS: ", valid_successors)
+                # if no decision could be made, keep all sequential successors
+                if len(valid_successors) > 0:
+                    queue += [s for s in valid_successors if s not in queue and s not in contained_nodes]
+                else:
+                    # fallback
+                    queue += [s for s in successors if data_at(graph, s).represents_sequential_version()]
+
+    # show the subgraph
+    show(graph.subgraph(contained_nodes), show_dataflow=show_dataflow, show_mutex_edges=show_mutex_edges)
+
+
 def show_function(graph: nx.DiGraph, function: FunctionRoot, show_dataflow: bool = True, show_mutex_edges: bool = True):
     # get nodes in subtree of function
     contained_nodes: Set[int] = set()
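
Note for reviewers: the snippet below is an illustrative sketch, not part of the patch. It condenses the synchronization rule that DeviceMemory.perform_read implements in NewDataTransfers.py into a self-contained toy model: the most recent write (largest unique_id) across all devices wins, reads on a device holding stale or no data trigger a transfer, and reads on an already synchronized device trigger none. The HOST constant, the string-typed memory regions, and the simplified Write record are hypothetical stand-ins; the real implementation additionally tracks graph nodes, CU ids, first-occurrence handling, and delete_data updates.

from dataclasses import dataclass
from typing import Dict, Optional, Tuple

HOST = -1  # assumed host device id for this sketch


@dataclass
class Write:
    memory_region: str
    unique_id: int  # monotonically increasing; a larger id means a more recent write


class ToyDeviceMemory:
    def __init__(self) -> None:
        # per-device view of the last known write to each memory region
        self.memory: Dict[int, Dict[str, Write]] = {}

    def perform_write(self, device_id: int, wda: Write) -> None:
        self.memory.setdefault(device_id, {})[wda.memory_region] = wda

    def perform_read(self, device_id: int, memory_region: str) -> Optional[Tuple[int, int]]:
        """Return (source_device, target_device) if a transfer is required, else None."""
        # find the most recent write to the region across all devices
        most_recent: Optional[Tuple[int, Write]] = None
        for dvid, regions in self.memory.items():
            if memory_region in regions:
                if most_recent is None or regions[memory_region].unique_id > most_recent[1].unique_id:
                    most_recent = (dvid, regions[memory_region])
        if most_recent is None:
            # data originates outside the modeled scope -> copy in from the host
            return (HOST, device_id)
        src_device, wda = most_recent
        if src_device == device_id:
            return None  # most recent data already resides on the reading device
        # skip the update if the reader already holds an equally recent copy
        local = self.memory.get(device_id, {}).get(memory_region)
        if local is not None and local.unique_id >= wda.unique_id:
            return None
        self.perform_write(device_id, wda)  # model the effect of the issued update
        return (src_device, device_id)


mem = ToyDeviceMemory()
mem.perform_write(HOST, Write("mem_reg_1", unique_id=1))
print(mem.perform_read(1, "mem_reg_1"))  # (-1, 1): transfer host -> device 1
print(mem.perform_read(1, "mem_reg_1"))  # None: already synchronized, no update issued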