Skip to content

Commit

Permalink
Merge branch 'cooperative-computing-lab:master' into find_worker_by_f…
Browse files Browse the repository at this point in the history
…iles
  • Loading branch information
JinZhou5042 authored Feb 5, 2025
2 parents 9aa8086 + 1875bd2 commit 39c50e5
Show file tree
Hide file tree
Showing 19 changed files with 442 additions and 69 deletions.
36 changes: 28 additions & 8 deletions dttools/src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ extern int debug_file_reopen(void);
extern int debug_file_close(void);

static void (*debug_write)(int64_t flags, const char *str) = debug_stderr_write;
static pid_t (*debug_getpid)(void) = getpid;
static pid_t (*debug_child_getpid)(void) = 0;
static char debug_program_name[PATH_MAX];
static int64_t debug_flags = D_NOTICE | D_ERROR | D_FATAL;
static pid_t debug_cached_pid = 0;
static int debug_time_zone_cached = 0;

struct flag_info {
const char *name;
Expand Down Expand Up @@ -182,20 +184,37 @@ static void do_debug(int64_t flags, const char *fmt, va_list args)
gettimeofday(&tv, 0);
tm = localtime(&tv.tv_sec);

/*
If the TZ environment variable is not set, then every single call
to localtime() results in a stat("/etc/localtime") which impacts
the minimum latency of a debug event.
*/

if (!debug_time_zone_cached) {
if (!getenv("TZ")) {
setenv("TZ", tm->tm_zone, 0);
}
debug_time_zone_cached = 1;
}

/* Fetch the pid just once and use it multiple times. */
pid_t pid = getpid();

buffer_putfstring(&B,
"%04d/%02d/%02d %02d:%02d:%02d.%02ld ",
"%04d/%02d/%02d %02d:%02d:%02d.%02ld %s[%d]",
tm->tm_year + 1900,
tm->tm_mon + 1,
tm->tm_mday,
tm->tm_hour,
tm->tm_min,
tm->tm_sec,
(long)tv.tv_usec / 10000);
buffer_putfstring(&B, "%s[%d] ", debug_program_name, getpid());
(long)tv.tv_usec / 10000,
debug_program_name,
pid);
}
/* Parrot prints debug messages for children: */
if (getpid() != debug_getpid()) {
buffer_putfstring(&B, "<child:%d> ", (int)debug_getpid());
if (debug_child_getpid) {
buffer_putfstring(&B, "<child:%d> ", (int)debug_child_getpid());
}
buffer_putfstring(&B, "%s: ", debug_flags_to_name(flags));

Expand Down Expand Up @@ -309,16 +328,17 @@ void debug_config_file(const char *path)
void debug_config(const char *name)
{
strncpy(debug_program_name, path_basename(name), sizeof(debug_program_name) - 1);
debug_cached_pid = getpid();
}

void debug_config_file_size(off_t size)
{
debug_file_size(size);
}

void debug_config_getpid(pid_t (*getpidf)(void))
void debug_config_child_getpid(pid_t (*getpidf)(void))
{
debug_getpid = getpidf;
debug_child_getpid = getpidf;
}

int64_t debug_flags_clear()
Expand Down
4 changes: 2 additions & 2 deletions dttools/src/debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ modify the linker namespace we are using.
#define debug_config_file cctools_debug_config_file
#define debug_config_file_size cctools_debug_config_file_size
#define debug_config_fatal cctools_debug_config_fatal
#define debug_config_getpid cctools_debug_config_getpid
#define debug_config_child_getpid cctools_debug_config_child_getpid
#define debug_flags_set cctools_debug_flags_set
#define debug_flags_print cctools_debug_flags_print
#define debug_flags_clear cctools_debug_flags_clear
Expand Down Expand Up @@ -206,7 +206,7 @@ void debug_config_file_size(off_t size);

void debug_config_fatal(void (*callback) (void));

void debug_config_getpid (pid_t (*getpidf)(void));
void debug_config_child_getpid (pid_t (*getpidf)(void));

/** Set debugging flags to enable output.
Accepts a debug flag in ASCII form, and enables that subsystem. For example: <tt>debug_flags_set("chirp");</tt>
Expand Down
2 changes: 1 addition & 1 deletion parrot/src/pfs_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ int main( int argc, char *argv[] )
debug_config(argv[0]);
debug_config_file_size(0); /* do not rotate debug file by default */
debug_config_fatal(pfs_process_killall);
debug_config_getpid(pfs_process_getpid);
debug_config_child_getpid(pfs_process_getpid);

/* Special file descriptors (currently the channel and the Parrot
* directory) are allocated from the top of our file descriptor pool. After
Expand Down
9 changes: 9 additions & 0 deletions taskvine/src/bindings/python3/ndcctools/taskvine/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ def type(self):
if self._file:
return cvine.vine_file_type(self._file)

##
# Set the Unix mode permission bits for the remote file.
#
# @param self A file object.
# @param mode Unix mode bits.
def set_mode(self, mode):
if self._file:
return cvine.vine_file_set_mode(self._file, mode)

##
# Return the contents of a file object as a string.
# Typically used to return the contents of an output buffer.
Expand Down
157 changes: 135 additions & 22 deletions taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@

from . import cvine
import hashlib
from collections import deque
from concurrent.futures import Executor
from concurrent.futures import Future
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import FIRST_EXCEPTION
from concurrent.futures import ALL_COMPLETED
from concurrent.futures._base import PENDING
from concurrent.futures._base import CANCELLED
from concurrent.futures._base import FINISHED
from collections import deque, namedtuple
from concurrent.futures import (
Executor,
Future,
FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
)
from concurrent.futures._base import PENDING, CANCELLED, FINISHED
from concurrent.futures import TimeoutError
from collections import namedtuple

from .task import (
PythonTask,
FunctionCall,
FunctionCallNoResult,
)

from .manager import (
Factory,
Manager,
)

import math
import os
import time
import textwrap
from functools import partial
from collections.abc import Sequence

RESULT_PENDING = 'result_pending'

Expand Down Expand Up @@ -109,7 +112,7 @@ def as_completed(fs, timeout=None):
f.module_manager.submit(f._task)

start = time.perf_counter()
result_timeout = min(timeout, 5) if timeout is not None else 5
result_timeout = max(1, min(timeout, 5)) if timeout else 5

def _iterator():
# iterate of queue of futures, yeilding completed futures and
Expand All @@ -133,22 +136,39 @@ def _iterator():
assert result != RESULT_PENDING
yield f

if (
fs and timeout is not None
and time.perf_counter() - start > timeout
):
if fs and timeout and time.perf_counter() - start > timeout:
raise TimeoutError()

return _iterator()


def run_iterable(fn, *args):
return list(map(fn, args))


def reduction_tree(fn, *args, n=2):
# n is the arity of the reduction function fn
# if less than 2, we have an infinite loop
assert n > 1
entries = [f.result() if isinstance(f, VineFuture) else f for f in args]
if len(entries) < 2:
return entries[0]

len_multiple = int(math.ceil(len(entries) / n) * n)
new_args = map(fn, [entries[i:i + n] for i in range(0, len_multiple, n)])

return reduction_tree(fn, *new_args, n=n)

##
# \class FuturesExecutor
#
# TaskVine FuturesExecutor object
#
# This class acts as an interface for the creation of Futures


class FuturesExecutor(Executor):

def __init__(self, port=9123, batch_type="local", manager=None, manager_host_port=None, manager_name=None, factory_binary=None, worker_binary=None, log_file=os.devnull, factory=True, opts={}):
self.manager = Manager(port=port)
self.port = self.manager.port
Expand All @@ -173,6 +193,100 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por
else:
self.factory = None

def map(self, fn, iterable, library_name=None, chunk_size=1):
assert chunk_size > 0
assert isinstance(iterable, Sequence)

def wait_for_map_resolution(*futures_batch):
result = []
for f in futures_batch:
result.extend(f.result() if isinstance(f, VineFuture) else f)
return result

tasks = []
fn_wrapped = partial(run_iterable, fn)
while iterable:
heads, iterable = iterable[:chunk_size], iterable[chunk_size:]

if library_name:
raise NotImplementedError("Using a library not currently supported.")
future_batch_task = self.submit(self.future_funcall(library_name, fn_wrapped, *heads))
else:
future_batch_task = self.submit(self.future_task(fn_wrapped, *heads))

tasks.append(future_batch_task)

return self.submit(self.future_task(wait_for_map_resolution, *tasks))

# Reduce performs a reduction tree on the iterable and currently returns a single value
#
# parameters:
# - Function
# - a function that receives fn_arity arguments
# - A sequence of parameters that function will take
# - a chunk_size to group elements in sequence to dispatch to a single task
# - arity of the function, elements of a chunk are reduce arity-wise.
# - an optional library_name for a library function call
def reduce(self, fn, iterable, library_name=None, chunk_size=2, fn_arity=2):
assert chunk_size > 1
assert fn_arity > 1
assert isinstance(iterable, Sequence)
chunk_size = max(fn_arity, chunk_size)

new_iterable = []
while iterable:
heads, iterable = iterable[:chunk_size], iterable[chunk_size:]
heads = [f.result() if isinstance(f, VineFuture) else f for f in heads]
if library_name:
raise NotImplementedError("Using a library not currently supported.")
future_batch_task = self.submit(
self.future_funcall(
library_name, reduction_tree, fn, *heads, n=fn_arity
)
)
else:
future_batch_task = self.submit(self.future_task(reduction_tree, fn, *heads, n=fn_arity))

new_iterable.append(future_batch_task)

if len(new_iterable) > 1:
return self.reduce(fn, new_iterable, library_name, chunk_size, fn_arity)
else:
return new_iterable[0]

def allpairs(self, fn, iterable_rows, iterable_cols, library_name=None, chunk_size=1):
assert chunk_size > 0
assert isinstance(iterable_rows, Sequence)
assert isinstance(iterable_cols, Sequence)

def wait_for_allpairs_resolution(row_size, col_size, mapped):
result = []
for _ in range(row_size):
result.append([0] * col_size)

mapped = mapped.result() if isinstance(mapped, VineFuture) else mapped
for p in mapped:
(i, j, r) = p.result() if isinstance(p, VineFuture) else p
result[i][j] = r

return result

def wrap_idx(args):
i, j, a, b = args
return (i, j, fn(a, b))

iterable = [(i, j, a, b) for (i, a) in enumerate(iterable_rows) for (j, b) in enumerate(iterable_cols)]
mapped = self.map(wrap_idx, iterable, library_name, chunk_size)

return self.submit(
self.future_task(
wait_for_allpairs_resolution,
len(iterable_rows),
len(iterable_cols),
mapped,
)
)

def submit(self, fn, *args, **kwargs):
if isinstance(fn, (FuturePythonTask, FutureFunctionCall)):
self.manager.submit(fn)
Expand Down Expand Up @@ -240,15 +354,15 @@ def cancelled(self):
return False

def running(self):
state = self._task.state
if state == "RUNNING":
state = self._task._module_manager.task_state(self._task.id)
if state == cvine.VINE_TASK_RUNNING:
return True
else:
return False

def done(self):
state = self._task.state
if state == "DONE" or state == "RETRIEVED":
state = self._task._module_manager.task_state(self._task.id)
if state == cvine.VINE_TASK_DONE:
return True
else:
return False
Expand Down Expand Up @@ -301,7 +415,6 @@ def __init__(self, manager, library_name, fn, *args, **kwargs):
self.manager = manager
self.library_name = library_name
self._envs = []

self._future = VineFuture(self)
self._has_retrieved = False

Expand All @@ -326,7 +439,6 @@ def output(self, timeout="wait_forever"):
self._saved_output = output['Result']
else:
self._saved_output = FunctionCallNoResult(output['Reason'])

except Exception as e:
self._saved_output = e
else:
Expand Down Expand Up @@ -400,6 +512,7 @@ def output(self, timeout="wait_forever"):
# task or the exception object of a failed task.
self._output = cloudpickle.loads(self._output_file.contents())
except Exception as e:
print(self._output_file.contents())
# handle output file fetch/deserialization failures
self._output = e
self._output_loaded = True
Expand Down
10 changes: 10 additions & 0 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,16 @@ be delete at the manager's site after it is not needed by the workflow (@ref vin
struct vine_file *vine_declare_starch(
struct vine_manager *m, struct vine_file *f, vine_cache_level_t cache, vine_file_flags_t flags);

/** Set the Unix mode permission bits of a declared file.
Sets (or overrides) the Unix mode permissions of any file object,
as the application will see it. This applies to any file type,
but is particularly useful for controlling buffers, urls, and mini-tasks
that do not inherently contain mode bits.
@param f A file object of any kind.
@param mode The Unix mode bits to be applied to the file.
*/
void vine_file_set_mode( struct vine_file *f, int mode );

/** Fetch the contents of a file.
The contents of the given file will be loaded from disk or pulled back from the cluster
and loaded into manager memory. This is particularly useful for temporary files and mini-tasks
Expand Down
8 changes: 8 additions & 0 deletions taskvine/src/manager/vine_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ struct vine_file *vine_file_create(const char *source, const char *cached_name,
f->source_worker = 0;
f->type = type;
f->size = size;
f->mode = 0;
f->mini_task = mini_task;
f->recovery_task = 0;
f->state = VINE_FILE_STATE_CREATED; /* Assume state created until told otherwise */
Expand Down Expand Up @@ -371,4 +372,11 @@ const char *vine_file_source(struct vine_file *f)
return f->source;
}

void vine_file_set_mode(struct vine_file *f, int mode)
{
/* The mode must contain, at a minimum, owner-rw (0600) (so that we can delete it) */
/* And it should not contain anything beyond the standard 0777. */
f->mode = (mode | 0600) & 0777;
}

/* vim: set noexpandtab tabstop=8: */
Loading

0 comments on commit 39c50e5

Please sign in to comment.