Skip to content

Commit

Permalink
vine: serverless import issue (#3567)
Browse files Browse the repository at this point in the history
* vine: serverless import issue

* modified test example

* modified test examples: inside-function imports

* modified imports format, test examples and doc accordingly

* update doc

* vine: new format of import_modules and update of doc

* add import json

* modify seconds of timeout in taskvine

* change format of import_modules

* modified add_env argument

* a set of requested changes

* type fix
  • Loading branch information
JinZhou5042 authored Nov 27, 2023
1 parent 871aa94 commit 2c5068c
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 90 deletions.
43 changes: 41 additions & 2 deletions doc/manuals/taskvine/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1507,8 +1507,47 @@ function definitions into a library task `libtask`
libtask = m.create_library_from_functions("my_library", my_sum, my_mul)
```

You can optionally specify the number of functions the library can
run concurrently by setting the number of function slots (default to 1):
We strongly recommend to specify the modules the function needs inside the function itself. This ensures that the correct modules and their aliases will be available when the functions are executed in isolation at the worker:

You can certainly embed `import` statements within the function and install any necessary packages:

=== Python
```python
def divide(dividend, divisor):
import math
return dividend / math.sqrt(divisor)

libtask = m.create_library_from_functions("my_library", divide)
```

If the overhead of importing modules per function is noticeable, modules can be optionally imported as a common preamble to the function executions. Common modules can be specified with the `import_modules` argument to `create_library_from_functions`. This reduces the overhead by eliminating redundant imports:


=== Python
```python
import numpy
import math

import_modules = [numpy, math]
```

`import_modules` only accepts modules as arguments (e.g. it can't be used to import functions, or select particular names with `from ... import ...` statements. Such statements should be made inside functions after specifying the modules with `import_modules`.

=== Python
```python
def cube(x):
# whenever using FromImport statments, put them inside of functions
from random import uniform
from time import sleep as time_sleep

random_delay = uniform(0.00001, 0.0001)
time_sleep(random_delay)

return math.pow(x, 3)
```


After installing the packages and functions, you can optionally specify the number of functions the library can run concurrently by setting the number of function slots (default to 1):

=== "Python"
```python
Expand Down
2 changes: 1 addition & 1 deletion dttools/test/test_runner_common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ run_taskvine_worker()
exit 1
fi
echo "Running worker."
if ! "$TASKVINE_WORKER" --single-shot --timeout=10s --cores ${cores:-1} --memory ${memory:-250} --disk ${disk:-250} --gpus ${gpus:-0} --debug=all --debug-file="$log" $* localhost $(cat "$port_file"); then
if ! "$TASKVINE_WORKER" --single-shot --timeout=10 --cores ${cores:-1} --memory ${memory:-250} --disk ${disk:-250} --gpus ${gpus:-0} --debug=all --debug-file="$log" $* localhost $(cat "$port_file"); then
echo "ERROR: could not start worker"
exit 1
fi
Expand Down
143 changes: 84 additions & 59 deletions poncho/src/poncho/package_serverize.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
from ndcctools.poncho.wq_network_code import wq_network_code
from ndcctools.poncho.library_network_code import library_network_code

import argparse
import json
import os
import stat
import ast
import types
import tarfile
import hashlib
import inspect
Expand All @@ -23,69 +23,93 @@

default_name_func = \
'''def name():
return "my_coprocess"
return "my_coprocess"
'''
init_function = \
'''if __name__ == "__main__":
main()
main()
'''


# Generates a list of import statements based on the given argument.
# @param import_modules A list of modules imported at the preamble of library
def generate_import_statements(import_modules):
if not import_modules:
return

if not isinstance(import_modules, list):
raise ValueError("Expected 'import_modules' to be a list.")

import_statements = []
for module in import_modules:
if not isinstance(module, types.ModuleType):
raise ValueError("Expected ModuleType in 'import_modules'.")

import_statements.append(f"import {module.__name__}")

return import_statements


# Create the library driver code that will be run as a normal task
# on workers and execute function invocations upon workers' instructions.
# @param path Path to the temporary Python script containing functions.
# @param funcs A list of relevant function names.
# @param dest Path to the final library script.
# @param version Whether this is for workqueue or taskvine serverless code.
def create_library_code(path, funcs, dest, version):
import_modules = []
function_source_code = []
name_source_code = ""
absolute_path = os.path.abspath(path)
# open the source file, parse the code into an ast, and then unparse the ast import statements and functions back into python code
with open(absolute_path, 'r') as source:
code = ast.parse(source.read(), filename=absolute_path)
for stmt in ast.walk(code):
if isinstance(stmt, ast.Import) or isinstance(stmt, ast.ImportFrom):
import_modules.append(ast.unparse(stmt))
if isinstance(stmt, ast.FunctionDef):
if stmt.name == "name":
name_source_code = ast.unparse(stmt)
elif stmt.name in funcs:
function_source_code.append(ast.unparse(stmt))
funcs.remove(stmt.name)
if name_source_code == "":
print("No name function found, defaulting to my_coprocess")
name_source_code = default_name_func
for func in funcs:
print(f"No function found named {func}, skipping")
# create output file
output_file = open(dest, "w")
# write shebang to file
output_file.write(shebang)
# write imports to file
for import_module in import_modules:
output_file.write(f"{import_module}\n")
# write network code into it
if version == "work_queue":
raw_source_fnc = wq_network_code
elif version == "taskvine":
raw_source_fnc = library_network_code
raw_source_code = inspect.getsource(raw_source_fnc)
network_code = "\n".join([line[4:] for line in raw_source_code.split("\n")[1:]])
output_file.write(network_code)
# write name function code into it
output_file.write(f"{name_source_code}\n")
# iterate over every function the user requested and attempt to put it into the library code
for function_code in function_source_code:
output_file.write("@remote_execute\n")
output_file.write(function_code)
output_file.write("\n")
output_file.write(init_function)
output_file.close()
st = os.stat(dest)
os.chmod(dest, st.st_mode | stat.S_IEXEC)
# @param path Path to the temporary Python script containing functions.
# @param funcs A list of relevant function names.
# @param dest Path to the final library script.
# @param version Whether this is for workqueue or taskvine serverless code.
# @param import_modules A list of modules to be imported at the preamble of library
def create_library_code(path, funcs, dest, version, import_modules=None):

# create output file
with open(dest, "w") as output_file:
# write shebang to file
output_file.write(shebang)
# write imports to file
import_statements = generate_import_statements(import_modules)
if import_statements:
for import_statement in import_statements:
output_file.write(f"{import_statement}\n")

function_source_code = []
name_source_code = ""
absolute_path = os.path.abspath(path)
# open the source file, parse the code into an ast, and then unparse functions back into python code
with open(absolute_path, 'r') as source:
code = ast.parse(source.read(), filename=absolute_path)
for stmt in ast.walk(code):
if isinstance(stmt, ast.FunctionDef):
if stmt.name == "name":
name_source_code = ast.unparse(stmt)
elif stmt.name in funcs:
function_source_code.append(ast.unparse(stmt))
funcs.remove(stmt.name)
if name_source_code == "":
print("No name function found, defaulting to my_coprocess")
name_source_code = default_name_func
for func in funcs:
print(f"No function found named {func}, skipping")

# write network code into it
if version == "work_queue":
raw_source_fnc = wq_network_code
elif version == "taskvine":
raw_source_fnc = library_network_code
raw_source_code = inspect.getsource(raw_source_fnc)
network_code = "\n".join([line[4:] for line in raw_source_code.split("\n")[1:]])
output_file.write(network_code)

# write name function code into it
output_file.write(f"{name_source_code}\n")
# iterate over every function the user requested and attempt to put it into the library code
for function_code in function_source_code:
output_file.write("@remote_execute\n")
output_file.write(function_code)
output_file.write("\n")
output_file.write(init_function)

st = os.stat(dest)
os.chmod(dest, st.st_mode | stat.S_IEXEC)

def sort_spec(spec):
sorted_spec = json.load(spec)
Expand Down Expand Up @@ -150,9 +174,10 @@ def generate_functions_hash(functions: list) -> str:

# Create a library file and a poncho environment tarball from a list of functions as needed.
# The functions in the list must have source code for this code to work.
# @param path path to directory to create the library python file and the environment tarball.
# @param functions list of functions to include in the
def serverize_library_from_code(path, functions, name, need_pack=True):
# @param path path to directory to create the library python file and the environment tarball.
# @param functions list of functions to include in the
# @param import_modules a list of modules to be imported at the preamble of library
def serverize_library_from_code(path, functions, name, need_pack=True, import_modules=None):
tmp_library_path = f"{path}/tmp_library.py"

# Write out functions into a temporary python file.
Expand All @@ -162,7 +187,7 @@ def serverize_library_from_code(path, functions, name, need_pack=True):
temp_source_file.write(f"def name():\n\treturn '{name}'")

# create the final library code from that temporary file
create_library_code(tmp_library_path, [fnc.__name__ for fnc in functions], path + "/library_code.py", "taskvine")
create_library_code(tmp_library_path, [fnc.__name__ for fnc in functions], path + "/library_code.py", "taskvine", import_modules=import_modules)

# remove the temp library file
os.remove(tmp_library_path)
Expand Down
7 changes: 4 additions & 3 deletions taskvine/src/bindings/python3/ndcctools/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,8 @@ def remove_library(self, name):
# @param init_command A string describing a shell command to execute before the library task is run
# @param add_env Whether to automatically create and/or add environment to the library
# @returns A task to be used with @ref ndcctools.taskvine.manager.Manager.install_library.
def create_library_from_functions(self, name, *function_list, poncho_env=None, init_command=None, add_env=True):
# @param import_modules A list of modules to be imported at the preamble of library
def create_library_from_functions(self, name, *function_list, poncho_env=None, init_command=None, add_env=True, import_modules=None):
# Delay loading of poncho until here, to avoid bringing in poncho dependencies unless needed.
# Ensure poncho python library is available.
try:
Expand All @@ -872,7 +873,7 @@ def create_library_from_functions(self, name, *function_list, poncho_env=None, i
# Positional arguments are the list of functions to include in the library.
# Create a unique hash of a combination of function names and bodies.
functions_hash = package_serverize.generate_functions_hash(function_list)

# Create path for caching library code and environment based on function hash.
library_cache_path = f"{self.cache_directory}/vine-library-cache/{functions_hash}"
library_code_path = f"{library_cache_path}/library_code.py"
Expand All @@ -896,7 +897,7 @@ def create_library_from_functions(self, name, *function_list, poncho_env=None, i
need_pack=False

# create library code and environment, if appropriate
package_serverize.serverize_library_from_code(library_cache_path, function_list, name, need_pack=need_pack)
package_serverize.serverize_library_from_code(library_cache_path, function_list, name, need_pack=need_pack, import_modules=import_modules)

# enable correct permissions for library code
os.chmod(library_code_path, 0o775)
Expand Down
48 changes: 35 additions & 13 deletions taskvine/src/examples/vine_example_function_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,31 @@
# using FunctionCall tasks.

import ndcctools.taskvine as vine
import json
import argparse
import math
import json

# The library will consist of the following three functions:

def cube(x):
# whenever using FromImport statments, put them inside of functions
from random import uniform
from time import sleep as time_sleep

# The library will consist of the following two functions:
random_delay = uniform(0.00001, 0.0001)
time_sleep(random_delay)

return math.pow(x, 3)

def divide(dividend, divisor):
import math
return dividend/math.sqrt(divisor)
# straightfoward usage of preamble import statements
return dividend / math.sqrt(divisor)

def double(x):
return x*2
import math as m
# use alias inside of functions
return m.prod([x, 2])


def main():
parser = argparse.ArgumentParser(
Expand All @@ -30,7 +44,7 @@ def main():
default=False,
)

q = vine.Manager(9123)
q = vine.Manager(port=9123)

print(f"TaskVine manager listening on port {q.port}")

Expand All @@ -41,35 +55,41 @@ def main():
else:
q.enable_peer_transfers()

print("Creating library from functions...")
print("Creating library from packages and functions...")

# This format shows how tocd create package import statements for the library
import_modules = [math]
libtask = q.create_library_from_functions('test-library', divide, double, cube, import_modules=import_modules, add_env=False)

libtask = q.create_library_from_functions('test-library', divide, double)
q.install_library(libtask)

print("Submitting function call tasks...")

tasks = 100

for i in range(0,tasks):
for _ in range(0, tasks):
s_task = vine.FunctionCall('test-library', 'divide', 2, 2**2)
q.submit(s_task)

s_task = vine.FunctionCall('test-library', 'double', 3)
q.submit(s_task)

s_task = vine.FunctionCall('test-library', 'cube', 4)
q.submit(s_task)

print("Waiting for results...")

total_sum = 0
x = 0

while not q.empty():
t = q.wait(5)
if t:
x = t.output
x = t.output
total_sum += x
print(f"task {t.id} completed with result {x}")

# Check that we got the right result.
expected = tasks * ( divide(2, 2**2) + double(3) )
expected = tasks * (divide(2, 2**2) + double(3) + cube(4))

print(f"Total: {total_sum}")
print(f"Expected: {expected}")
Expand All @@ -78,4 +98,6 @@ def main():

if __name__ == '__main__':
main()


# vim: set sts=4 sw=4 ts=4 expandtab ft=python:
2 changes: 1 addition & 1 deletion taskvine/src/examples/vine_example_future_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def load_matrices(levels):

def write_matrices(levels, n):
for x in range(2**(levels+1)):
matrix = generateRandomMatrix(n)
matrix = generate_random_matrix(n)
with open('matrices/matrix-{}'.format(x), 'wb') as f:
cloudpickle.dump(matrix, f)

Expand Down
Loading

0 comments on commit 2c5068c

Please sign in to comment.