Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Optimizer: Update calculation improved #549

Merged
merged 14 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions discopop_library/CodeGenerator/classes/UnpackedSuggestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __get_device_update_pragmas(self):
openmp_source_device_id = self.values["openmp_source_device_id"]
openmp_target_device_id = self.values["openmp_target_device_id"]
range: Optional[Tuple[int, int]] = self.values["range"]
print("IS FIRST DATA OCCURRENCE?: ", is_first_data_occurrence)
delete_data: bool = self.values["delete_data"]

def get_range_str(r: Optional[Tuple[int, int]]) -> str:
    """Format an optional two-element range as a bracketed ``[first:second]`` string.

    Args:
        r: ``None`` or an indexable pair; only ``r[0]`` and ``r[1]`` are read.

    Returns:
        The empty string when ``r`` is ``None``, otherwise ``"[" + r[0] + ":" + r[1] + "]"``
        with both elements converted to text.
    """
    # No range given: nothing is appended.
    if r is None:
        return ""
    return f"[{r[0]}:{r[1]}]"
Expand All @@ -91,7 +91,7 @@ def get_range_str(r):

elif source_device_id != self.host_device_id and target_device_id == self.host_device_id:
# update type from
if is_first_data_occurrence:
if delete_data:
pragma.pragma_str = "#pragma omp target exit data map(from:"
else:
pragma.pragma_str = "#pragma omp target update from("
Expand Down
398 changes: 398 additions & 0 deletions discopop_library/discopop_optimizer/DataTransfers/NewDataTransfers.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from discopop_library.discopop_optimizer.CostModels.DataTransfer.DataTransferCosts import add_data_transfer_costs
from discopop_library.discopop_optimizer.CostModels.utilities import get_performance_models_for_functions
from discopop_library.discopop_optimizer.DataTransfers.DataTransfers import calculate_data_transfers
from discopop_library.discopop_optimizer.DataTransfers.NewDataTransfers import new_calculate_data_transfers
from discopop_library.discopop_optimizer.UpdateOptimization.main import optimize_updates
from discopop_library.discopop_optimizer.Variables.Experiment import Experiment
from discopop_library.result_classes.OptimizerOutputPattern import OptimizerOutputPattern
Expand All @@ -31,19 +32,23 @@ def calculate_data_movement(experiment: Experiment):
len(experiment.detection_result.patterns.optimizer_output),
)
# calculate necessary updates
function_performance_models_without_context = get_performance_models_for_functions(
experiment, experiment.optimization_graph, restrict_to_decisions=set(oo_suggestion.decisions)
)
# function_performance_models_without_context = get_performance_models_for_functions(
# experiment, experiment.optimization_graph, restrict_to_decisions=set(oo_suggestion.decisions)
# )

function_performance_models = calculate_data_transfers(
experiment.optimization_graph, function_performance_models_without_context, experiment
)
# function_performance_models = calculate_data_transfers(
# experiment.optimization_graph, function_performance_models_without_context, experiment
# )

updates = new_calculate_data_transfers(experiment.optimization_graph, oo_suggestion.decisions, experiment)
for update in updates:
oo_suggestion.add_data_movement(update)

# collect necessary updates
for function in function_performance_models:
for cost_model, context in function_performance_models[function]:
for update in context.necessary_updates:
oo_suggestion.add_data_movement(update)
# # collect necessary updates
# for function in function_performance_models:
# for cost_model, context in function_performance_models[function]:
# for update in context.necessary_updates:
# oo_suggestion.add_data_movement(update)

# optimize updates
oo_suggestion = optimize_updates(experiment, oo_suggestion, experiment.arguments)
1 change: 1 addition & 0 deletions discopop_library/discopop_optimizer/OptimizerArguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class OptimizerArguments(GeneralArguments):
optimization_level: int
optimization_level_2_parameters: str
single_suggestions: bool
pin_function_calls_to_host: bool

def __post_init__(self):
self.__validate()
Expand Down
57 changes: 49 additions & 8 deletions discopop_library/discopop_optimizer/PETParser/PETParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# the 3-Clause BSD License. See the LICENSE file in the package base
# directory for details.
import copy
import logging
import os
import pstats
from typing import Dict, List, Optional, Tuple, Set, cast
Expand Down Expand Up @@ -45,6 +46,7 @@
get_all_nodes_in_function,
get_all_parents,
get_nodes_by_functions,
get_out_call_edges,
get_parent_function,
get_parents,
get_path_entry,
Expand All @@ -65,6 +67,8 @@
from discopop_library.discopop_optimizer.utilities.simple_utilities import data_at
from time import time

logger = logging.getLogger("Optimizer")


class PETParser(object):
pet: PEGraphX
Expand Down Expand Up @@ -123,18 +127,23 @@ def parse(self) -> Tuple[nx.DiGraph, int]:
if self.experiment.arguments.verbose:
print("converted temporary edges")

# self.__mark_branch_affiliation()
# print("marked branch affiliations")
self.__calculate_data_flow()
if self.experiment.arguments.verbose:
print("calculated data flow")

if self.experiment.arguments.verbose:
print("Propagating read/write information...")
self.__propagate_reads_and_writes()
if self.experiment.arguments.verbose:
print("Propagated read/write information")

# self.__mark_branch_affiliation()
# print("marked branch affiliations")
# show(self.graph)
# self.__calculate_data_flow()
# self.__new_calculate_data_flow()
# if self.experiment.arguments.verbose:
# print("calculated data flow")
# show(self.graph)
# import sys
# sys.exit(0)

# remove invalid functions
self.__remove_invalid_functions()

Expand All @@ -144,6 +153,9 @@ def parse(self) -> Tuple[nx.DiGraph, int]:
# add calling edges
self.__add_calling_edges()

if self.experiment.arguments.pin_function_calls_to_host:
self.__pin_function_calls_to_host()

return self.graph, self.next_free_node_id

def get_new_node_id(self) -> int:
Expand All @@ -152,6 +164,21 @@ def get_new_node_id(self) -> int:
self.next_free_node_id += 1
return buffer

def __pin_function_calls_to_host(self):
    """Assign function nodes and calling nodes to the host device.

    Two passes are made over ``self.graph``:

    1. Every node returned by ``get_all_function_nodes`` whose ``device_id``
       differs from the host device id is reassigned to the host.
    2. Every node of the graph that is still not on the host and has at least
       one outgoing call edge (per ``get_out_call_edges``) is reassigned too.

    Each reassignment is reported on the "Optimizer" logger at INFO level.
    The node data objects are modified in place; nothing is returned.
    """
    host_device_id = self.experiment.get_system().get_host_device_id()
    logger.info("Pinning functions and function calls to host device: " + str(host_device_id))

    # Pass 1: function nodes themselves.
    for node in get_all_function_nodes(self.graph):
        node_data = data_at(self.graph, node)
        if node_data.device_id != host_device_id:
            logger.info("\tPinning function node: " + str(node))
            # NOTE(review): reuses the object fetched above instead of a second
            # data_at() lookup; assumes data_at returns the stored node data
            # object rather than a copy. Confirm against simple_utilities.data_at.
            node_data.device_id = host_device_id

    # Pass 2: any node that is still off-host and calls a function.
    for node in self.graph.nodes:
        node_data = data_at(self.graph, node)
        if node_data.device_id == host_device_id:
            # Already on the host: no need to inspect its call edges.
            continue
        if len(get_out_call_edges(self.graph, node)) > 0:
            logger.info("\tPinning calling node: " + str(node))
            node_data.device_id = host_device_id

def __add_calling_edges(self):
all_function_nodes = get_all_function_nodes(self.graph)

Expand Down Expand Up @@ -1293,11 +1320,25 @@ def mark_branched_section(node, branch_stack):
current_node = get_children(self.graph, function_node)[0]
mark_branched_section(current_node, [])

# def __new_calculate_data_flow(self):
# """calculate dataflow in such a way, that no data is left on any device but the host and every relevant change is synchronized."""
# self.in_data_flow = dict()
# self.out_data_flow = dict()
#
# data_transactions= dict() # stores created and removed data for unrolling when leaving a "frame"
# # Dict[device_id, List[("enter/exit", memreg)]]
#
# data_frame_stack: List[int] = [] # stores node ids which opened a data frame
#
# for function in get_all_function_nodes(self.graph):
# logger.info("calculate data flow for function: " + data_at(self.graph, function).name)

def __calculate_data_flow(self):
self.in_data_flow = dict()
self.out_data_flow = dict()

def inlined_data_flow_calculation(current_node, current_last_writes):
# TODO add entering and exiting data frames to support resetting at end of a child section
while current_node is not None:
# check if current_node uses written data
reads, writes = get_read_and_written_data_from_subgraph(
Expand All @@ -1320,9 +1361,9 @@ def inlined_data_flow_calculation(current_node, current_last_writes):
for mem_reg in writes:
current_last_writes[mem_reg] = current_node

# inline children
## start data_flow calculation for children
for child in get_children(self.graph, current_node):
current_last_writes = inlined_data_flow_calculation(child, current_last_writes)
_current_last_writes = inlined_data_flow_calculation(child, current_last_writes)

# continue to successor
successors = get_successors(self.graph, current_node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@
# the 3-Clause BSD License. See the LICENSE file in the package base
# directory for details.

from typing import List, cast
from discopop_explorer.PEGraphX import LoopNode
import logging
from typing import List, Set, cast
from discopop_explorer.PEGraphX import LoopNode, MemoryRegion
from discopop_library.discopop_optimizer.OptimizerArguments import OptimizerArguments
from discopop_library.discopop_optimizer.Variables.Experiment import Experiment
from discopop_library.discopop_optimizer.classes.context.Update import Update
from discopop_library.discopop_optimizer.classes.nodes.Loop import Loop
from discopop_library.discopop_optimizer.utilities.simple_utilities import data_at
from discopop_library.result_classes.OptimizerOutputPattern import OptimizerOutputPattern

logger = logging.getLogger("Optimizer")


def remove_loop_index_updates(
experiment: Experiment, best_configuration: OptimizerOutputPattern, arguments: OptimizerArguments
) -> OptimizerOutputPattern:
mem_reg_blacklist: Set[MemoryRegion] = set()
to_be_removed: List[Update] = []
for update in best_configuration.data_movement:
# check for loop nodes as update targets
Expand All @@ -35,10 +39,19 @@ def remove_loop_index_updates(
# check for loop indices as targeted variables
if update.write_data_access.var_name in loop_indices:
# found update to loop index
if arguments.verbose:
print("ignoring update: ", str(update), " , since it targets a loop index.")
to_be_removed.append(update)
logger.info(
"# ignoring updates to mem_reg: "
+ str(update.write_data_access.memory_region)
+ " name: "
+ update.write_data_access.var_name
+ " since it targets a loop index."
)
mem_reg_blacklist.add(update.write_data_access.memory_region)
# remove identified loop index updates
for update in best_configuration.data_movement:
if update.write_data_access.memory_region in mem_reg_blacklist:
to_be_removed.append(update)

for tbr in to_be_removed:
if tbr in best_configuration.data_movement:
best_configuration.data_movement.remove(tbr)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This file is part of the DiscoPoP software (http://www.discopop.tu-darmstadt.de)
#
# Copyright (c) 2020, Technische Universitaet Darmstadt, Germany
#
# This software may be modified and distributed under the terms of
# the 3-Clause BSD License. See the LICENSE file in the package base
# directory for details.

from typing import List
from discopop_library.discopop_optimizer.OptimizerArguments import OptimizerArguments
from discopop_library.discopop_optimizer.Variables.Experiment import Experiment
from discopop_library.discopop_optimizer.classes.context.Update import Update
from discopop_library.result_classes.OptimizerOutputPattern import OptimizerOutputPattern


def remove_same_device_updates(
    experiment: Experiment, configuration: OptimizerOutputPattern, arguments: OptimizerArguments
) -> OptimizerOutputPattern:
    """Remove updates whose source and target device ids are equal.

    These are artifacts from initializing device memory states during the
    data movement calculation.

    Args:
        experiment: Not read by this function; kept for a signature parallel
            to the other update-optimization steps.
        configuration: Pattern whose ``data_movement`` list is filtered. The
            attribute is rebound to a new list; the object itself is reused.
        arguments: Only ``arguments.verbose`` is read.

    Returns:
        The same ``configuration`` object, with same-device updates removed.
    """
    # Keep only updates that move data between two different devices.
    configuration.data_movement = [
        update for update in configuration.data_movement if update.source_device_id != update.target_device_id
    ]

    if arguments.verbose:
        print("Removed same device updates")
        for update in configuration.data_movement:
            print("# ", update)
        print()
    return configuration
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from discopop_library.discopop_optimizer.UpdateOptimization.RemoveDuplicatedUpdates import remove_duplicated_updates
from discopop_library.discopop_optimizer.UpdateOptimization.RemoveLoopIndexUpdates import remove_loop_index_updates
from discopop_library.discopop_optimizer.UpdateOptimization.RemoveSameDeviceUpdates import remove_same_device_updates
from discopop_library.discopop_optimizer.Variables.Experiment import Experiment
from discopop_library.discopop_optimizer.utilities.visualization.update_graph import show_update_graph
from discopop_library.result_classes.OptimizerOutputPattern import OptimizerOutputPattern
Expand All @@ -33,14 +34,17 @@ def optimize_updates(
print("# ", update)
print()

# remove same-device updates
configuration = remove_same_device_updates(experiment, configuration, arguments)

# optimize updates
configuration = fix_loop_initialization_updates(experiment, configuration, arguments)

# remove duplicated updates
configuration = remove_duplicated_updates(configuration, arguments)

# remove loop index updates
configuration = remove_loop_index_updates(experiment, configuration, arguments)
# configuration = remove_loop_index_updates(experiment, configuration, arguments)

# add ranges to be transferred to the updates
configuration = add_ranges_to_updates(experiment, configuration, arguments)
Expand Down
2 changes: 2 additions & 0 deletions discopop_library/discopop_optimizer/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def parse_args() -> OptimizerArguments:
)
parser.add_argument("--profiling", action="store_true",
help="Enable profiling.")
parser.add_argument("--pin-function-calls-to-host", action="store_true", help="Force functions calls on the host system. Prevent offloading of entire functions.")
parser.add_argument("--log", type=str, default="WARNING", help="Specify log level: DEBUG, INFO, WARNING, ERROR, CRITICAL")
parser.add_argument("--write-log", action="store_true", help="Create Logfile.")
# EXPERIMENTAL FLAGS:
Expand Down Expand Up @@ -79,6 +80,7 @@ def parse_args() -> OptimizerArguments:
single_suggestions=arguments.single_suggestions,
log_level=arguments.log.upper(),
write_log=arguments.write_log,
pin_function_calls_to_host=arguments.pin_function_calls_to_host,
)


Expand Down
11 changes: 10 additions & 1 deletion discopop_library/discopop_optimizer/classes/context/Update.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from discopop_explorer.PEGraphX import NodeID, PEGraphX
from discopop_library.discopop_optimizer.classes.types.Aliases import DeviceID
from discopop_library.discopop_optimizer.classes.types.DataAccessType import (
ReadDataAccess,
WriteDataAccess,
write_data_access_from_dict,
)
Expand All @@ -26,6 +27,7 @@ class Update(object):
source_cu_id: Optional[NodeID]
target_cu_id: Optional[NodeID]
range: Optional[Tuple[int, int]]
delete_data: bool

def __init__(
self,
Expand All @@ -39,6 +41,7 @@ def __init__(
target_cu_id: Optional[NodeID],
originated_from_node: Optional[int] = None,
range: Optional[Tuple[int, int]] = None,
delete_data: bool = False,
):
self.source_node_id = source_node_id
self.target_node_id = target_node_id
Expand All @@ -50,9 +53,12 @@ def __init__(
self.target_cu_id = target_cu_id
self.originated_from_node = originated_from_node
self.range = range
self.delete_data = delete_data

def __str__(self):
result_str = "First" if self.is_first_data_occurrence else ""
result_str = ""
result_str += "IssueDelete " if self.delete_data else ""
result_str += "First" if self.is_first_data_occurrence else ""
return (
result_str
+ "Update("
Expand Down Expand Up @@ -82,6 +88,7 @@ def get_pattern_string(self, pet: PEGraphX) -> str:
result_dict["start_line"] = pet.node_at(cast(NodeID, self.source_cu_id)).start_position()
result_dict["end_line"] = pet.node_at(cast(NodeID, self.target_cu_id)).start_position()
result_dict["range"] = self.range
result_dict["delete_data"] = self.delete_data
return json.dumps(result_dict)

def toDict(self) -> Dict[str, Any]:
Expand All @@ -95,6 +102,7 @@ def toDict(self) -> Dict[str, Any]:
result_dict["source_cu_id"] = self.source_cu_id
result_dict["target_cu_id"] = self.target_cu_id
result_dict["range"] = self.range
result_dict["delete_data"] = self.delete_data
return result_dict


Expand All @@ -109,5 +117,6 @@ def construct_update_from_dict(values: Dict[str, Any]) -> Update:
values["source_cu_id"],
values["target_cu_id"],
range=values["range"],
delete_data=values["delete_data"],
)
return update
4 changes: 4 additions & 0 deletions discopop_library/discopop_optimizer/classes/system/Network.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def add_connection(self, source: Device, target: Device, transfer_speed: Expr, i
self.__transfer_initialization_costs[(source, target)] = initialization_delay

def get_transfer_speed(self, source: Device, target: Device):
if source == target:
return Integer(1000000) # 1000 GB/s
if (source, target) not in self.__transfer_speeds:
if self.__host_device is None:
raise ValueError("Host device of network unspecified!")
Expand All @@ -39,6 +41,8 @@ def get_transfer_speed(self, source: Device, target: Device):
return self.__transfer_speeds[(source, target)]

def get_transfer_initialization_costs(self, source: Device, target: Device):
if source == target:
return Integer(0)
if (source, target) not in self.__transfer_speeds:
if self.__host_device is None:
raise ValueError("Host device of network unspecified!")
Expand Down
Loading
Loading