Skip to content

Commit

Permalink
add directory in dt
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorge Ejarque committed Oct 13, 2023
1 parent 7e8167f commit 9b1019f
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from pycompss.api.parameter import COLLECTION_IN
from pycompss.api.parameter import FILE_OUT
from pycompss.api.parameter import FILE_IN
from pycompss.api.parameter import DIRECTORY_IN
from pycompss.api.parameter import DIRECTORY_OUT


@task(returns=1)
Expand All @@ -47,7 +49,7 @@ def transform(target, function, **kwargs):


@task(returns=object, target=COLLECTION_IN)
def col_to_obj(target, function):
def col_to_obj(target, function, **kwargs):
"""Replace the user function with its @task equivalent.
NOTE: Used from @data_transformation.
Expand All @@ -57,11 +59,25 @@ def col_to_obj(target, function):
produces an object.
:return:
"""
return function(target)
return function(target, **kwargs)


@task(destination=FILE_OUT, target=COLLECTION_IN)
def col_to_file(target, destination, function):
def col_to_file(target, destination, function, **kwargs):
"""Replace the user function with its @task equivalent.
NOTE: Used from @data_transformation.
@param target: the parameter that DT will be applied to
@param destination: name of the file that will be produced by this task
@param function: DT function which accepts a collection as input and
produces a file.
:return:
"""
function(target, destination, **kwargs)

@task(destination=DIRECTORY_OUT, target=COLLECTION_IN)
def col_to_dir(target, destination, function, **kwargs):
"""Replace the user function with its @task equivalent.
NOTE: Used from @data_transformation.
Expand All @@ -72,11 +88,11 @@ def col_to_file(target, destination, function):
produces a file.
:return:
"""
function(target, destination)
function(target, destination, **kwargs)


@task(returns=object(), target=FILE_IN)
def file_to_object(target, function):
def file_to_object(target, function, **kwargs):
"""Replace the user function with its @task equivalent.
NOTE: Used from @data_transformation.
Expand All @@ -86,11 +102,24 @@ def file_to_object(target, function):
produces an object.
:return:
"""
return function(target)
return function(target, **kwargs)

@task(returns=object(), target=DIRECTORY_IN)
def dir_to_object(target, function, **kwargs):
"""Replace the user function with its @task equivalent.
NOTE: Used from @data_transformation.
@param target: the parameter that DT will be applied to
@param function: DT function which accepts a file as input and
produces an object.
:return:
"""
return function(target,**kwargs)


@task(destination=FILE_OUT)
def object_to_file(target, destination, function):
def object_to_file(target, destination, function, **kwargs):
"""Replace the user function with its @task equivalent.
NOTE: Used from @data_transformation.
Expand All @@ -101,11 +130,25 @@ def object_to_file(target, destination, function):
produces a file.
:return:
"""
function(target, destination)
function(target, destination, **kwargs)

@task(destination=DIRECTORY_OUT)
def object_to_dir(target, destination, function, **kwargs):
"""Replace the user function with its @task equivalent.
NOTE: Used from @data_transformation.
@param target: the parameter that DT will be applied to
@param destination: name of the file that will be produced by this task
@param function: DT function which accepts an object as input and
produces a file.
:return:
"""
function(target, destination, **kwargs)


@task(target=FILE_IN)
def file_to_col(target, function):
def file_to_col(target, function, **kwargs):
"""Replace the user function with its @task equivalent.
NOTE: Used from @data_transformation.
Expand All @@ -115,7 +158,7 @@ def file_to_col(target, function):
produces a collection.
:return:
"""
return function(target)
return function(target, **kwargs)


# @task(target=FILE_IN, destination=COLLECTION_OUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,20 @@
from pycompss.api.commons.private_tasks import transform as _transform
from pycompss.api.commons.private_tasks import col_to_obj as _col_to_obj
from pycompss.api.commons.private_tasks import col_to_file as _col_to_file
from pycompss.api.commons.private_tasks import col_to_dir as _col_to_dir
from pycompss.api.commons.private_tasks import (
object_to_file as _object_to_file,
)
from pycompss.api.commons.private_tasks import (
object_to_dir as _object_to_dir,
)
from pycompss.api.commons.private_tasks import file_to_col as _file_to_col
from pycompss.api.commons.private_tasks import (
file_to_object as _file_to_object,
)
from pycompss.api.commons.private_tasks import (
dir_to_object as _dir_to_object,
)
from pycompss.util.typing_helper import typing

# from pycompss.runtime.task.definitions.core_element import CE
Expand All @@ -58,7 +65,9 @@
FILE_TO_COLLECTION = 2
COLLECTION_TO_OBJECT = 3
COLLECTION_TO_FILE = 4

OBJECT_TO_DIRECTORY = 5
DIRECTORY_TO_OBJECT = 6
COLLECTION_TO_DIRECTORY = 7

class DataTransformation: # pylint: disable=R0902,R0903
# disable=too-many-instance-attributes, too-few-public-methods
Expand Down Expand Up @@ -124,9 +133,7 @@ def dt_f(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
if __debug__:
logger.debug("Executing DT wrapper.")
tmp = list(args)
if (
CONTEXT.in_master() or CONTEXT.is_nesting_enabled()
) and not self.core_element_configured:
if ( CONTEXT.in_master() or CONTEXT.is_nesting_enabled() ) and ( not self.core_element_configured ):
self.__call_dt__(user_function, tmp, kwargs)
with keep_arguments(tuple(tmp), kwargs, prepend_strings=True):
# no need to do anything on the worker side
Expand All @@ -146,6 +153,7 @@ def __call_dt__(self, user_function, args: list, kwargs: dict) -> None:
"""
dts = []
self.user_function = user_function
dt_kwargs=self.kwargs.copy()
if __debug__:
logger.debug("Configuring DT core element.")
if "dt" in kwargs:
Expand All @@ -155,27 +163,41 @@ def __call_dt__(self, user_function, args: list, kwargs: dict) -> None:
elif isinstance(tmp, list):
dts = [obj.extract() for obj in tmp]
elif len(self.args) == 2:
dts.append((self.args[0], self.args[1], self.kwargs))
dts.append((self.args[0], self.args[1], dt_kwargs))
elif self.type is OBJECT_TO_FILE:
dts.append(
(self.target, self.user_function, self.kwargs, args, kwargs)
(self.target, self.user_function, dt_kwargs, args, kwargs)
)
elif self.type is OBJECT_TO_DIRECTORY:
dts.append(
(self.target, self.user_function, dt_kwargs, args, kwargs)
)
elif self.type is FILE_TO_OBJECT:
dts.append(
(self.target, self.user_function, self.kwargs, args, kwargs)
(self.target, self.user_function, dt_kwargs, args, kwargs)
)
elif self.type is DIRECTORY_TO_OBJECT:
dts.append(
(self.target, self.user_function, dt_kwargs, args, kwargs)
)
elif self.type is FILE_TO_COLLECTION:
dts.append(
(self.target, self.user_function, self.kwargs, args, kwargs)
(self.target, self.user_function, dt_kwargs, args, kwargs)
)
elif self.type is COLLECTION_TO_OBJECT:
dts.append(
(self.target, self.user_function, self.kwargs, args, kwargs)
(self.target, self.user_function, dt_kwargs, args, kwargs)
)
elif self.type is COLLECTION_TO_FILE:
dts.append(
(self.target, self.user_function, self.kwargs, args, kwargs)
(self.target, self.user_function, dt_kwargs, args, kwargs)
)
elif self.type is COLLECTION_TO_DIRECTORY:
dts.append(
(self.target, self.user_function, dt_kwargs, args, kwargs)
)
if __debug__:
logger.debug("Applying " + str(len(dts)) +" DTs.")
for _dt in dts:
self._apply_dt(_dt[0], _dt[1], _dt[2], args, kwargs)

Expand Down Expand Up @@ -205,7 +227,7 @@ def _apply_dt(
all_params = inspect.signature(self.user_function) # type: ignore
keyz = all_params.parameters.keys()
if param_name not in keyz:
raise Exception("Wrong Param Name in DT")
raise Exception("Wrong Param " + param_name + " in data transformation")
i = list(keyz).index(param_name)
if i < len(args):
p_value = args[i]
Expand All @@ -215,25 +237,35 @@ def _apply_dt(
).default # type: ignore

new_value = None
func_kwargs = _replace_func_kwargs(func_kwargs, kwargs, args, self.user_function)
if is_workflow:
# no need to create a task if it's a workflow
new_value = func(p_value, **func_kwargs)
elif self.type is OBJECT_TO_FILE:
_object_to_file(p_value, self.destination, self.dt_function)
_object_to_file(p_value, self.destination, self.dt_function, **func_kwargs)
new_value = self.destination
elif self.type is OBJECT_TO_DIRECTORY:
_object_to_dir(p_value, self.destination, self.dt_function, **func_kwargs)
new_value = self.destination
elif self.type is FILE_TO_OBJECT:
new_value = _file_to_object(p_value, self.dt_function)
new_value = _file_to_object(p_value, self.dt_function, **func_kwargs)
elif self.type is DIRECTORY_TO_OBJECT:
new_value = _dir_to_object(p_value, self.dt_function, **func_kwargs)
elif self.type is FILE_TO_COLLECTION:
size = int(self.kwargs.pop("size"))
new_value = _file_to_col( # pylint: disable=unexpected-keyword-arg
p_value,
self.dt_function,
returns=size,
**func_kwargs
)
elif self.type is COLLECTION_TO_OBJECT:
new_value = _col_to_obj(p_value, self.dt_function)
new_value = _col_to_obj(p_value, self.dt_function, **func_kwargs)
elif self.type is COLLECTION_TO_FILE:
_col_to_file(p_value, self.destination, self.dt_function)
_col_to_file(p_value, self.destination, self.dt_function, **func_kwargs)
new_value = self.destination
elif self.type is COLLECTION_TO_DIRECTORY:
_col_to_dir(p_value, self.destination, self.dt_function, **func_kwargs)
new_value = self.destination
else:
new_value = _transform(p_value, func, **func_kwargs)
Expand All @@ -243,6 +275,28 @@ def _apply_dt(
else:
args[i] = new_value

def _replace_func_kwargs(dt_f_kwargs, f_kwargs, f_args, f):

for key, value in dt_f_kwargs.items():
if type(value) == str and value.startswith("{{") and value.endswith("}}"):
name = value[2:-2]
if name in f_kwargs:
dt_f_kwargs[key] = f_kwargs[name]
else:
dt_f_kwargs[key] = _get_param_from_signature(f, name, f_args)
return dt_f_kwargs

def _get_param_from_signature(function, param_name, args):
all_params = inspect.signature(function) # type: ignore
keyz = all_params.parameters.keys()
if param_name not in keyz:
raise Exception("Wrong Param " + param_name + " in data transformation")
i = list(keyz).index(param_name)
if i < len(args):
p_value = args[i]
else:
p_value = all_params.parameters.get(param_name).default # type: ignore
return p_value

class DTObject: # pylint: disable=R0903
# disable=too-few-public-methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ def __run_http__(
:param kwargs: Keyword arguments received from call.
:return: Execution return code.
"""
print("running http")
if __debug__:
logger.debug("Running HTTP task ...")
return 200

def __configure_core_element__(self, kwargs: dict) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,14 @@ def parse_config_file(self) -> None:
self.parameters = config.get(LABELS.parameters, {})
exec_type = execution.pop(LABELS.type, None)
if exec_type is None:
print("Execution type not provided for @software task")
print("WARN: Execution type not provided for @software task")
elif exec_type == LABELS.task:
self.task_type, self.decor = SUPPORTED_DECORATORS[exec_type]
print("Executing task function..")
if __debug__:
logger.debug("Executing Software as task function ...")
elif exec_type == "workflow":
print("Executing workflow..")
if __debug__:
logger.debug("Executing Software as Workflow ...")
self.is_workflow = True
return
elif exec_type.lower() not in SUPPORTED_DECORATORS:
Expand Down
13 changes: 7 additions & 6 deletions compss/runtime/scripts/system/runtime/compss_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -451,19 +451,20 @@ prepare_runtime_environment() {
prepare_coverage
fi

# Create JVM Options file
generate_jvm_opts_file

# Start streaming backend if required
start_stream_backends

if [ -n "${gen_core}" ]; then
prepare_coredump_generation
fi

if [ -n "${keepWD}" ]; then
prepare_keep_workingdir
fi

# Create JVM Options file
generate_jvm_opts_file

# Start streaming backend if required
start_stream_backends

}

prepare_coredump_generation() {
Expand Down

0 comments on commit 9b1019f

Please sign in to comment.