Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP): find worker by files #4045

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 58 additions & 29 deletions taskvine/src/manager/vine_schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ See the file COPYING for details.

#include "debug.h"
#include "hash_table.h"
#include "set.h"
#include "list.h"
#include "priority_queue.h"
#include "macros.h"
Expand Down Expand Up @@ -319,6 +320,32 @@ static int candidate_has_worse_fit(struct vine_worker_info *current_best, struct
return 0;
}

static struct vine_worker_info *find_worker_by_most_available_disk(struct vine_manager *q, struct vine_task *t)
{
struct vine_worker_info *best_worker = 0;

struct priority_queue *worker_queue = priority_queue_create(0);

char *key;
struct vine_worker_info *w;
HASH_TABLE_ITERATE(q->worker_table, key, w)
{
double disk_available = w->resources->disk.total - w->resources->disk.inuse;
priority_queue_push(worker_queue, w, disk_available);
}

while ((w = priority_queue_pop(worker_queue))) {
if (check_worker_against_task(q, w, t)) {
best_worker = w;
break;
}
}

priority_queue_delete(worker_queue);

return best_worker;
}

/*
Find the worker that has the largest quantity of cached data needed
by this task, so as to minimize transfer work that must be done
Expand All @@ -327,49 +354,51 @@ by the manager.

static struct vine_worker_info *find_worker_by_files(struct vine_manager *q, struct vine_task *t)
{
char *key;
struct vine_worker_info *w;
struct vine_worker_info *best_worker = 0;
int offset_bookkeep;
int64_t most_task_cached_bytes = 0;
int64_t task_cached_bytes;
uint8_t has_all_files;
struct vine_file_replica *replica;
struct vine_mount *m;

int ramp_down = vine_schedule_in_ramp_down(q);
struct priority_queue *worker_queue = priority_queue_create(0);

HASH_TABLE_ITERATE_RANDOM_START(q->worker_table, offset_bookkeep, key, w)
/* for each input file, find the workers that hold it, and enqueue those workers based on the input sizes they have */
LIST_ITERATE(t->input_mounts, m)
{
/* Careful: If check_worker_against task fails, then w may no longer be valid. */
if (check_worker_against_task(q, w, t)) {
task_cached_bytes = 0;
has_all_files = 1;
struct set *sources = hash_table_lookup(q->file_worker_table, m->file->cached_name);

LIST_ITERATE(t->input_mounts, m)
{
replica = hash_table_lookup(w->current_files, m->file->cached_name);
if (!sources) {
break;
}

if (replica && m->file->type == VINE_FILE) {
task_cached_bytes += replica->size;
} else if (m->file->cache_level > VINE_CACHE_LEVEL_TASK) {
has_all_files = 0;
}
}
/* enqueue each worker and update their priority accordingly */
struct vine_worker_info *s;
SET_ITERATE(sources, s)
{
double source_priority = m->file->size;

/* Return the worker if it was in possession of all cacheable files */
if (has_all_files && !ramp_down) {
return w;
int idx = priority_queue_find_idx(worker_queue, s);
if (idx >= 0) {
source_priority += priority_queue_get_priority(worker_queue, idx);
}

if (!best_worker || task_cached_bytes > most_task_cached_bytes ||
(ramp_down && task_cached_bytes == most_task_cached_bytes && candidate_has_worse_fit(best_worker, w))) {
best_worker = w;
most_task_cached_bytes = task_cached_bytes;
}
priority_queue_update_priority(worker_queue, s, source_priority);
}
}

/* find the worker that has the largest amount of inputs */
while ((w = priority_queue_pop(worker_queue))) {
if (check_worker_against_task(q, w, t)) {
best_worker = w;
break;
}
}

priority_queue_delete(worker_queue);

/* if not found, find a worker that has the most available disk */
if (!best_worker) {
best_worker = find_worker_by_most_available_disk(q, t);
}

return best_worker;
}

Expand Down