Skip to content

Commit

Permalink
When performing GPU assignment, keep track of split corpora
Browse files Browse the repository at this point in the history
  • Loading branch information
Waino committed May 27, 2024
1 parent 20bdc2f commit 844f2d2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 26 deletions.
12 changes: 7 additions & 5 deletions tools/config_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,12 +495,13 @@ def allocate_devices(opts):
lp_to_key = defaultdict(list)
for key, tasks_config in opts.in_config[0]['tasks'].items():
src_lang, tgt_lang = tasks_config['src_tgt'].split('-')
offset = tasks_config.get('offset', 0)
ready_to_start = tasks_config.get('introduce_at_training_step', 0) == 0

lang_pairs.append((src_lang, tgt_lang))
lang_pairs.append((src_lang, tgt_lang, offset))
if ready_to_start:
lps_ready_to_start.append((src_lang, tgt_lang))
lp_to_key[(src_lang, tgt_lang)].append(key)
lps_ready_to_start.append((src_lang, tgt_lang, offset))
lp_to_key[(src_lang, tgt_lang, offset)].append(key)

if n_nodes is None and n_slots_per_gpu is None:
raise Exception('You must specify either n_nodes or n_slots_per_gpu')
Expand All @@ -527,13 +528,14 @@ def allocate_devices(opts):
lps_ready_to_start = []
for cname, corpus in opts.in_config[0]['tasks'].items():
src_lang, tgt_lang = corpus['src_tgt'].split('-')
offset = corpus.get('offset', 0)
if 'introduce_at_training_step' not in corpus:
lps_ready_to_start.append((src_lang, tgt_lang))
lps_ready_to_start.append((src_lang, tgt_lang, offset))
continue
adjusted = max(0, corpus.get('introduce_at_training_step', 0) - iats_at_last_gpu)
corpus['introduce_at_training_step'] = adjusted
if adjusted == 0:
lps_ready_to_start.append((src_lang, tgt_lang))
lps_ready_to_start.append((src_lang, tgt_lang, offset))

if n_gpus_tot < 2:
print('Assigning all tasks to 0:0')
Expand Down
46 changes: 25 additions & 21 deletions tools/gpu_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def swap(self, slot_a, slot_b, ao):
a_ready_to_start = None
b_ready_to_start = None
if lp_a is not None:
src_lang_a, tgt_lang_a = lp_a
src_lang_a, tgt_lang_a, offset_a = lp_a
components_a = ao.get_components(src_lang_a, tgt_lang_a)
for component in components_a:
# remove from a
Expand All @@ -81,7 +81,7 @@ def swap(self, slot_a, slot_b, ao):
component_to_gpus[component][gpu_b] += 1
a_ready_to_start = ao._is_ready_to_start(lp_a)
if lp_b is not None:
src_lang_b, tgt_lang_b = lp_b
src_lang_b, tgt_lang_b, offset_b = lp_b
components_b = ao.get_components(src_lang_b, tgt_lang_b)
for component in components_b:
# remove from b
Expand All @@ -102,7 +102,7 @@ def _compute_component_to_gpus(assignment, ao):
for gpu_slot, lp in assignment.items():
if lp is None:
continue
src_lang, tgt_lang = lp
src_lang, tgt_lang, offset = lp
gpu = (gpu_slot.node, gpu_slot.gpu)
components = ao.get_components(src_lang, tgt_lang)
for component in components:
Expand Down Expand Up @@ -152,7 +152,7 @@ def _least_favorite_slot_single(self, gpu_slots, ao):
lp = self.assignment[gpu_slot]
if lp is None:
continue
src_lang, tgt_lang = lp
src_lang, tgt_lang, offset = lp
components = ao.get_components(src_lang, tgt_lang)
for component in components:
component_counts[component] += 1
Expand All @@ -166,7 +166,7 @@ def _least_favorite_slot_single(self, gpu_slots, ao):
weighted_slots.append((0, gpu_slot))
continue
cost = 0
src_lang, tgt_lang = lp
src_lang, tgt_lang, offset = lp
components = ao.get_components(src_lang, tgt_lang)
for component in components:
if component_counts[component] == 1:
Expand All @@ -193,7 +193,7 @@ def __init__(
n_gpus_per_node: int,
n_slots_per_gpu: int,
get_components: Callable[[str, str], Set[str]],
ready_to_start: Optional[Set[Tuple[str, str]]] = None,
ready_to_start: Optional[Set[Tuple[str, str, int]]] = None,
):
self.n_nodes = n_nodes
self.n_gpus_per_node = n_gpus_per_node
Expand All @@ -209,8 +209,10 @@ def make_slots(n_nodes, n_gpus_per_node, n_slots_per_gpu):
for slot in range(n_slots_per_gpu):
yield GpuSlot(node, gpu, slot)

def initial_assignment(self, lang_pairs: List[Tuple[str, str]]):
def initial_assignment(self, lang_pairs: List[Tuple[str, str, int]]):
lang_pairs = list(lang_pairs)
if self.ready_to_start:
assert all(lp in lang_pairs for lp in self.ready_to_start), 'ready_to_Start must be subset of lang_pairs'
if len(lang_pairs) > len(self.gpu_slots):
raise Exception(f'More lang pairs {len(lang_pairs)} than gpu slots {len(self.gpu_slots)}')
if len(self.gpu_slots) > len(lang_pairs):
Expand Down Expand Up @@ -335,7 +337,7 @@ def _split_lps_cost(self, assignment: Assignment) -> float:
for gpu_slot, lp in assignment.items():
if lp is None:
continue
src_lang, tgt_lang = lp
src_lang, tgt_lang, offset = lp
lps[(gpu_slot.node, gpu_slot.gpu, src_lang, tgt_lang)] += 1
result = 0
for count in lps.values():
Expand Down Expand Up @@ -428,20 +430,20 @@ def print_assignment(assignment, group_mapping, ready_to_start=None):
f'{slot_str}: UNASSIGNED', flush=True
)
continue
src_lang, tgt_lang = lp
src_lang, tgt_lang, offset = lp
src_group = group_mapping[src_lang]
tgt_group = group_mapping[tgt_lang]
ready = 'ready to start' if lp in ready_to_start else ''
print(
f'{slot_str}: {src_lang}-{tgt_lang}\t({src_group}, {tgt_group})\t{ready}', flush=True
f'{slot_str}: {src_lang}-{tgt_lang}\tsplit{offset}\t({src_group}, {tgt_group})\t{ready}', flush=True
)


def optimize_gpu_assignment(
n_nodes: int,
n_gpus_per_node: int,
n_slots_per_gpu: int,
lang_pairs: List[Tuple[str, str]],
lang_pairs: List[Tuple[str, str, int]],
lang_to_group_mapping: Dict[str, str],
lps_ready_to_start: Optional[Set[Tuple[str, str]]],
log_name: Optional[str] = None,
Expand Down Expand Up @@ -498,23 +500,25 @@ def example():

# LPs ready to start at timestep 0
READY_TO_START = {
('en', 'de'),
('de', 'en'),
('en', 'en'),
('de', 'de'),
('sv', 'no'),
('no', 'sv'),
('sv', 'de'),
('de', 'sv'),
('fi', 'fi'),
('en', 'de', 0),
('de', 'en', 0),
('en', 'en', 0),
('de', 'de', 0),
('sv', 'no', 0),
('no', 'sv', 0),
('sv', 'de', 0),
('de', 'sv', 0),
('fi', 'fi', 0),
}

lang_pairs = []
for src_lang in LANGS.keys():
for tgt_lang in LANGS.keys():
if LANGS[src_lang] != LANGS[tgt_lang]:
continue
lang_pairs.append((src_lang, tgt_lang))
lang_pairs.append((src_lang, tgt_lang, 0))
# One split LP
lang_pairs.append(('en', 'de', 1))

optimize_gpu_assignment(
n_nodes=2,
Expand Down

0 comments on commit 844f2d2

Please sign in to comment.