Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimizer: check for update necessity at data frame exits #552

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ def perform_read(self, node_id: int, device_id: DeviceID, rda: ReadDataAccess) -
if most_recent_write is None:
most_recent_write = (dvid, self.memory[dvid][rda.memory_region])
else:
if self.memory[dvid][rda.memory_region].unique_id > most_recent_write[1].unique_id:
if self.memory[dvid][rda.memory_region].from_call:
most_recent_write = (dvid, self.memory[dvid][rda.memory_region])

elif (
self.memory[dvid][rda.memory_region].unique_id > most_recent_write[1].unique_id
and not most_recent_write[1].from_call
):
most_recent_write = (dvid, self.memory[dvid][rda.memory_region])

# check for update necessity
Expand Down Expand Up @@ -131,6 +137,35 @@ def perform_read(self, node_id: int, device_id: DeviceID, rda: ReadDataAccess) -
)
)

elif most_recent_write[1].from_call:
# force a synchronization
predecessors = get_predecessors(self.graph, node_id)
if len(predecessors) == 0:
predecessor = node_id
else:
predecessor = predecessors[0]

# check if the update creates the memory on the device
if device_id in self.memory:
if rda.memory_region in self.memory[device_id]:
is_first_data_occurrence = False
else:
is_first_data_occurrence = True
else:
is_first_data_occurrence = True

updates.append(
Update(
predecessor,
node_id,
most_recent_write[0],
device_id,
most_recent_write[1],
is_first_data_occurrence=is_first_data_occurrence,
source_cu_id=data_at(self.graph, predecessor).original_cu_id,
target_cu_id=data_at(self.graph, node_id).original_cu_id,
)
)
else:
# no update necessary, most recent data on the device
pass
Expand Down Expand Up @@ -183,7 +218,24 @@ def cleanup_dataframe(self, node_id: int, memory: DeviceMemory, experiment: Expe
queue += [p for p in get_predecessors(experiment.optimization_graph, current) if p not in queue]
for device_id in self.entered_data_regions_by_device:
for wda in self.entered_data_regions_by_device[device_id]:
# issue delete updates
# check if a delete or copy delete should be issued
issue_copy_delete = False
issue_delete = False

if wda.memory_region not in memory.memory[experiment.get_system().get_host_device_id()]:
issue_copy_delete = True
else:
if (
memory.memory[device_id][wda.memory_region].unique_id
> memory.memory[experiment.get_system().get_host_device_id()][wda.memory_region].unique_id
):
# device has a more recent memory state than the host
issue_copy_delete = True
else:
# host has a more recent memory state than the device
issue_delete = True

# issue delete update
updates.append(
Update(
node_id,
Expand All @@ -194,10 +246,11 @@ def cleanup_dataframe(self, node_id: int, memory: DeviceMemory, experiment: Expe
False,
last_cu_id,
last_cu_id,
delete_data=True,
delete_data=issue_delete,
copy_delete_data=issue_copy_delete,
)
)
# updates += memory.perform_read(node_id, experiment.get_system().get_host_device_id(), cast(ReadDataAccess, wda))

# cleanup memory
del memory.memory[device_id][wda.memory_region]

Expand All @@ -218,23 +271,27 @@ def cleanup_dataframe(self, node_id: int, memory: DeviceMemory, experiment: Expe
copy_exit_update: Optional[Update] = None
delete_exit_updates: List[Update] = []
for update in updates_by_mem_reg[mem_reg]:
if not update.copy_delete_data:
# data should only be deleted.
delete_exit_updates.append(update)
continue
# check for the most recent memory state
if copy_exit_update is None:
copy_exit_update = update
continue
# check for the most recent memory state
if update.write_data_access.unique_id > copy_exit_update.write_data_access.unique_id:
# found a more recent memory state
delete_exit_updates.append(copy_exit_update)
copy_exit_update = update
else:
# update is older than copy_exit_update, add it to the list of deletions
delete_exit_updates.append(update)
if copy_exit_update is None:
raise ValueError("copy_exit_update is NONE. Something went wrong here.")

# -> collect the updates and set the delete_data and copy_delete_data properties
copy_exit_update.copy_delete_data = True
copy_exit_update.delete_data = False
refined_updates.append(copy_exit_update)
if copy_exit_update is not None:
copy_exit_update.copy_delete_data = True
copy_exit_update.delete_data = False
refined_updates.append(copy_exit_update)
for update in delete_exit_updates:
update.copy_delete_data = False
update.delete_data = True
Expand Down
36 changes: 36 additions & 0 deletions discopop_library/discopop_optimizer/PETParser/PETParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from discopop_library.discopop_optimizer.classes.nodes.GenericNode import GenericNode
from discopop_library.discopop_optimizer.classes.nodes.Loop import Loop
from discopop_library.discopop_optimizer.classes.nodes.Workload import Workload
from discopop_library.discopop_optimizer.classes.types.DataAccessType import ReadDataAccess, WriteDataAccess
from discopop_library.discopop_optimizer.utilities.MOGUtilities import (
add_call_edge,
add_dataflow_edge,
Expand Down Expand Up @@ -153,6 +154,12 @@ def parse(self) -> Tuple[nx.DiGraph, int]:
# add calling edges
self.__add_calling_edges()

logger.info("Inlining read and write information from function calls..")
# note: inlining at this position is a cheap but less reliable alternative to propagating everything on function calls.
# note: full propagation leads to issues with WriteDataAccess unique ids and potentially broken results
self.__inline_reads_and_writes_from_call()
logger.info("Inlining read and write information from function calls done.")

if self.experiment.arguments.pin_function_calls_to_host:
self.__pin_function_calls_to_host()

Expand Down Expand Up @@ -1434,3 +1441,32 @@ def __propagate_reads_and_writes(self):
for p in parents:
data_at(self.graph, p).written_memory_regions.update(current_data.written_memory_regions)
data_at(self.graph, p).read_memory_regions.update(current_data.read_memory_regions)

def __inline_reads_and_writes_from_call(self):
    """Copy the read / write information of called functions into each calling node.

    For every node with outgoing call edges, the reads and writes recorded on the
    called function are deduplicated per memory region, deep-copied, marked with
    ``from_call = True`` and added to the calling node's access sets, so later
    optimization steps can distinguish inlined accesses from direct ones.
    """
    for node in self.graph.nodes:
        called = get_out_call_edges(self.graph, node)
        if not called:
            continue
        logger.info("Node: " + str(node) + " called: " + str(called))
        # identify reads and writes in called function
        node_data = data_at(self.graph, node)
        for callee_id in called:
            callee_data = data_at(self.graph, callee_id)
            logger.info("--> called function: " + cast(FunctionRoot, callee_data).name)
            # deduplicate accesses by memory region; the first access
            # encountered per region is kept
            reads_by_region: Dict[MemoryRegion, ReadDataAccess] = dict()
            writes_by_region: Dict[MemoryRegion, WriteDataAccess] = dict()
            for read_access in callee_data.read_memory_regions:
                reads_by_region.setdefault(read_access.memory_region, read_access)
            for write_access in callee_data.written_memory_regions:
                writes_by_region.setdefault(write_access.memory_region, write_access)
            # append the gathered reads and writes to the calling node,
            # marked as originating from a function call
            for read_access in reads_by_region.values():
                inlined_read = copy.deepcopy(read_access)
                inlined_read.from_call = True
                node_data.read_memory_regions.add(inlined_read)
            for write_access in writes_by_region.values():
                inlined_write = copy.deepcopy(write_access)
                inlined_write.from_call = True
                node_data.written_memory_regions.add(inlined_write)
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,37 @@
class ReadDataAccess(object):
    """Represents a read access to a memory region.

    Reconstructed from a garbled diff: the scrape kept both the pre- and
    post-change versions of ``__init__`` and ``__str__``; only the
    post-change version (with ``from_call`` support) is kept here.
    """

    memory_region: MemoryRegion
    var_name: Optional[str]
    # True if this access was inlined from a called function rather than
    # observed directly in the node itself
    from_call: bool

    def __init__(self, memory_region: MemoryRegion, var_name: Optional[str], from_call: bool = False):
        self.memory_region = memory_region
        self.var_name = var_name
        self.from_call = from_call

    def __str__(self):
        # prefix call-inlined accesses with "C" for easier debug output
        return_str = ""
        return_str += "C" if self.from_call else ""
        return_str += "R(" + self.memory_region + ")"
        return return_str


class WriteDataAccess(object):
    """Represents a write access to a memory region.

    ``unique_id`` identifies the write; callers compare ids with ``>`` to
    decide which of two writes to the same region is more recent.

    Reconstructed from a garbled diff: the scrape kept both the pre- and
    post-change versions of ``__init__`` and ``__str__``; only the
    post-change version (with ``from_call`` support) is kept here.
    """

    memory_region: MemoryRegion
    unique_id: int
    var_name: Optional[str]
    # True if this access was inlined from a called function rather than
    # observed directly in the node itself
    from_call: bool

    def __init__(self, memory_region: MemoryRegion, unique_id: int, var_name: Optional[str], from_call: bool = False):
        self.memory_region = memory_region
        self.unique_id = unique_id
        self.var_name = var_name
        self.from_call = from_call

    def __str__(self):
        return_str = ""
        return_str += "C" if self.from_call else ""
        # str(...) guards against var_name being None (it is Optional);
        # plain concatenation would raise a TypeError in that case
        return_str += "W(" + self.memory_region + "-" + str(self.unique_id) + ", --> " + str(self.var_name) + ")"
        return return_str

    def __hash__(self):
        # NOTE(review): __hash__ is based on unique_id while __eq__ stays the
        # default identity comparison — presumably intentional (set members are
        # distinct objects), but confirm before relying on hash-based lookup
        return self.unique_id
Expand All @@ -43,8 +53,9 @@ def toDict(self):
result_dict["memory_region"] = self.memory_region
result_dict["unique_id"] = self.unique_id
result_dict["var_name"] = self.var_name
result_dict["from_call"] = self.from_call
return result_dict


def write_data_access_from_dict(values: Dict[str, Any]) -> WriteDataAccess:
    """Deserialize a WriteDataAccess from its toDict() representation.

    ``.get`` with a ``False`` default keeps backward compatibility with
    dictionaries serialized before the ``from_call`` field existed.
    """
    return WriteDataAccess(
        values["memory_region"],
        values["unique_id"],
        values["var_name"],
        values.get("from_call", False),
    )
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,32 @@ def __create_optimizer_output_pattern(
experiment.get_system().get_host_device_id(),
experiment,
)
logger.debug("Initialized OptimizerOutputPattern based on " + str(suggestion_id))
logger.info("Initialized OptimizerOutputPattern based on " + str(suggestion_id))

pattern_obj = experiment.detection_result.patterns.get_pattern_from_id(suggestion_id)
if "device_id" in pattern_obj.__dict__:
device_id = pattern_obj.__dict__["device_id"]
else:
device_id = experiment.get_system().get_host_device_id()
output_pattern.add_pattern(
pattern_obj.pattern_id, device_id, experiment.get_system().get_device(device_id).get_device_type()
)
logger.debug("Added pattern : " + str(suggestion_id))

# unpack OptimizerOutputPattern if necessary
if type(pattern_obj) == OptimizerOutputPattern:
logger.info("OptimizerOutputPattern found!")
for contained_pattern in pattern_obj.applied_patterns:
logger.info("--> " + str(contained_pattern))
output_pattern.add_pattern(
contained_pattern["pattern_id"], contained_pattern["device_id"], contained_pattern["device_type"]
)
logger.info("Added sub-pattern : " + str(contained_pattern["pattern_id"]))
logger.info("Inherited decisions from pattern_obj: " + str(pattern_obj.decisions))
output_pattern.decisions += pattern_obj.decisions

else:
# regular pattern type
output_pattern.add_pattern(
pattern_obj.pattern_id, device_id, experiment.get_system().get_device(device_id).get_device_type()
)
logger.info("Added pattern : " + str(suggestion_id))

if output_pattern is None:
return None
Expand Down
7 changes: 4 additions & 3 deletions discopop_library/result_classes/OptimizerOutputPattern.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ def get_contained_decisions(self, experiment: Experiment) -> List[int]:
if d not in decision_list:
decision_list.append(d)
for tmp_dict in self.applied_patterns:
for d in experiment.pattern_id_to_decisions_dict[tmp_dict["pattern_id"]]:
if d not in decision_list:
decision_list.append(d)
if tmp_dict["pattern_id"] in experiment.pattern_id_to_decisions_dict:
for d in experiment.pattern_id_to_decisions_dict[tmp_dict["pattern_id"]]:
if d not in decision_list:
decision_list.append(d)
return decision_list
Loading