From 1cd2f0dd54262fa1c972fe36ec8458697f56027e Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Tue, 15 Oct 2024 13:45:33 -0500 Subject: [PATCH] Add working_dir kwarg to GlobusComputeEngineBase `ThreadPoolEngine` and `ProcessPoolEngine` now pass the `working_dir` default `"tasks_working_dir"` to the `GlobusComputeEngineBase`, which makes the path absolute. When `execute_task` runs the cwd is changed to `working_dir`, ensuring that tasks run in the same directory. * Added a new method `GlobusComputeEngineBase.set_working_dir()` to ensure `working_dir` is absolute. * Enforce absolute paths for `run_dir` in `execute_task()` * Removed an obsolete test --- ...adudoc1729_configure_tasks_working_dir.rst | 20 +++++ .../globus_compute_endpoint/engines/base.py | 30 ++++++- .../engines/globus_compute.py | 13 +-- .../globus_compute_endpoint/engines/helper.py | 21 +++-- .../engines/high_throughput/engine.py | 6 +- .../engines/process_pool.py | 3 + .../engines/thread_pool.py | 4 +- compute_endpoint/tests/unit/test_engines.py | 33 +++++++- .../tests/unit/test_execute_task.py | 17 +++- compute_endpoint/tests/unit/test_worker.py | 16 ++-- .../tests/unit/test_working_dir.py | 81 +++++++++++-------- compute_endpoint/tests/utils.py | 6 ++ 12 files changed, 182 insertions(+), 68 deletions(-) create mode 100644 changelog.d/20241023_095610_yadudoc1729_configure_tasks_working_dir.rst diff --git a/changelog.d/20241023_095610_yadudoc1729_configure_tasks_working_dir.rst b/changelog.d/20241023_095610_yadudoc1729_configure_tasks_working_dir.rst new file mode 100644 index 000000000..c593e8c41 --- /dev/null +++ b/changelog.d/20241023_095610_yadudoc1729_configure_tasks_working_dir.rst @@ -0,0 +1,20 @@ +New Functionality +^^^^^^^^^^^^^^^^^ + +- ``GlobusComputeEngine``, ``ThreadPoolEngine``, and ``ProcessPoolEngine`` can + now be configured with ``working_dir`` to specify the tasks working directory. 
+ If a relative path is specified, it is set in relation to the endpoint + run directory (usually ``~/.globus_compute/<endpoint_name>``). Here's an example + config file: + + .. code-block:: yaml + + engine: + type: GlobusComputeEngine + working_dir: /absolute/path/to/tasks_working_dir + +Bug Fixes +^^^^^^^^^ + + - Fixed a bug where functions run with ``ThreadPoolEngine`` and ``ProcessPoolEngine`` + create and switch into the ``tasks_working_dir`` creating endless nesting. diff --git a/compute_endpoint/globus_compute_endpoint/engines/base.py b/compute_endpoint/globus_compute_endpoint/engines/base.py index acddf22a4..90368825e 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/base.py +++ b/compute_endpoint/globus_compute_endpoint/engines/base.py @@ -81,8 +81,31 @@ def __init__( *args: object, endpoint_id: uuid.UUID | None = None, max_retries_on_system_failure: int = 0, + working_dir: str | os.PathLike = "tasks_working_dir", **kwargs: object, ): + """ + Parameters + ---------- + + endpoint_id: uuid | None + ID of the endpoint that the engine serves as execution backend + + max_retries_on_system_failure: int + Set the number of retries for functions that fail due to system + failures such as node failure/loss. Since functions can fail + after partial runs, consider additional cleanup logic before + enabling this functionality. default=0 + + working_dir: str | os.PathLike + Directory within which functions should execute, defaults to + (~/.globus_compute/<endpoint_name>/tasks_working_dir) + If a relative path is supplied, the working dir is set relative + to the endpoint.run_dir. If an absolute path is supplied, it is + used as is.
+ default="tasks_working_dir" + + kwargs + """ self._shutdown_event = threading.Event() self.endpoint_id = endpoint_id self.max_retries_on_system_failure = max_retries_on_system_failure @@ -91,7 +114,7 @@ def __init__( # endpoint interchange happy self.container_type: str | None = None self.run_dir: str | None = None - self.working_dir: str | os.PathLike = "tasks_working_dir" + self.working_dir: str | os.PathLike = working_dir self.run_in_sandbox: bool = False # This attribute could be set by the subclasses in their # start method if another component insists on owning the queue. @@ -111,6 +134,11 @@ def start( def get_status_report(self) -> EPStatusReport: raise NotImplementedError + def set_working_dir(self, run_dir: str | None = None): + if not os.path.isabs(self.working_dir): + run_dir = os.path.abspath(run_dir or os.getcwd()) + self.working_dir = os.path.join(run_dir, self.working_dir) + def report_status(self) -> None: status_report = self.get_status_report() packed: bytes = messagepack.pack(status_report) diff --git a/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py b/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py index ed5605e38..d1cd1685e 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py +++ b/compute_endpoint/globus_compute_endpoint/engines/globus_compute.py @@ -50,7 +50,6 @@ def __init__( encrypted: bool = True, strategy: str | None = None, job_status_kwargs: t.Optional[JobStatusPollerKwargs] = None, - working_dir: str | os.PathLike = "tasks_working_dir", run_in_sandbox: bool = False, **kwargs, ): @@ -90,13 +89,6 @@ def __init__( encrypted: bool Flag to enable/disable encryption (CurveZMQ). Default is True. - working_dir: str | os.PathLike - Directory within which functions should execute, defaults to - (~/.globus_compute/<endpoint_name>/tasks_working_dir) - If a relative path is supplied, the working dir is set relative - to the endpoint.run_dir. If an absolute path is supplied, it is - used as is.
- run_in_sandbox: bool Functions will run in a sandbox directory under the working_dir if this option is enabled. Default: False @@ -146,7 +138,6 @@ def __init__( self.executor.interchange_launch_cmd = self._get_compute_ix_launch_cmd() self.executor.launch_cmd = self._get_compute_launch_cmd() - self.working_dir = working_dir self.run_in_sandbox = run_in_sandbox if strategy is None: strategy = "simple" @@ -249,9 +240,7 @@ def start( assert endpoint_id, "GCExecutor requires kwarg:endpoint_id at start" assert run_dir, "GCExecutor requires kwarg:run_dir at start" - if not os.path.isabs(self.working_dir): - # set relative to run_dir - self.working_dir = os.path.join(run_dir, self.working_dir) + self.set_working_dir(run_dir=run_dir) self.endpoint_id = endpoint_id self.run_dir = run_dir diff --git a/compute_endpoint/globus_compute_endpoint/engines/helper.py b/compute_endpoint/globus_compute_endpoint/engines/helper.py index 9038e6a1d..67238d03f 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/helper.py +++ b/compute_endpoint/globus_compute_endpoint/engines/helper.py @@ -58,14 +58,19 @@ def execute_task( os.environ.pop("GC_TASK_SANDBOX_DIR", None) os.environ["GC_TASK_UUID"] = str(task_id) - if run_dir: - os.makedirs(run_dir, exist_ok=True) - os.chdir(run_dir) - if run_in_sandbox: - os.makedirs(str(task_id)) # task_id is expected to be unique - os.chdir(str(task_id)) - # Set sandbox dir so that apps can use it - os.environ["GC_TASK_SANDBOX_DIR"] = os.getcwd() + + if not run_dir or not os.path.isabs(run_dir): + raise RuntimeError( + f"execute_task requires an absolute path for run_dir, got {run_dir=}" + ) + + os.makedirs(run_dir, exist_ok=True) + os.chdir(run_dir) + if run_in_sandbox: + os.makedirs(str(task_id)) # task_id is expected to be unique + os.chdir(str(task_id)) + # Set sandbox dir so that apps can use it + os.environ["GC_TASK_SANDBOX_DIR"] = os.getcwd() env_details = get_env_details() try: diff --git 
a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/engine.py b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/engine.py index 6fc8afa8c..4bb281831 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/high_throughput/engine.py +++ b/compute_endpoint/globus_compute_endpoint/engines/high_throughput/engine.py @@ -779,7 +779,11 @@ def submit( container_loc = self._get_container_location(packed_task) ser = serializer.serialize( - (execute_task, [task_id, packed_task, self.endpoint_id], {}) + ( + execute_task, + [task_id, packed_task, self.endpoint_id], + {"run_dir": self.run_dir}, + ) ) payload = Task(task_id, container_loc, ser).pack() assert self.outgoing_q # Placate mypy diff --git a/compute_endpoint/globus_compute_endpoint/engines/process_pool.py b/compute_endpoint/globus_compute_endpoint/engines/process_pool.py index 3bb0cd0ac..48292f704 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/process_pool.py +++ b/compute_endpoint/globus_compute_endpoint/engines/process_pool.py @@ -34,6 +34,7 @@ def start( self, *args, endpoint_id: t.Optional[uuid.UUID] = None, + run_dir: t.Optional[str] = None, results_passthrough: t.Optional[queue.Queue] = None, **kwargs, ) -> None: @@ -41,6 +42,7 @@ def start( Parameters ---------- endpoint_id: Endpoint UUID + run_dir: endpoint run directory results_passthrough: Queue to which packed results will be posted Returns ------- @@ -59,6 +61,7 @@ def start( if results_passthrough: self.results_passthrough = results_passthrough assert self.results_passthrough + self.set_working_dir(run_dir=run_dir) self._status_report_thread.start() diff --git a/compute_endpoint/globus_compute_endpoint/engines/thread_pool.py b/compute_endpoint/globus_compute_endpoint/engines/thread_pool.py index 49e04f198..0a6341125 100644 --- a/compute_endpoint/globus_compute_endpoint/engines/thread_pool.py +++ b/compute_endpoint/globus_compute_endpoint/engines/thread_pool.py @@ -32,6 +32,7 @@ def start( self, *args, 
endpoint_id: t.Optional[uuid.UUID] = None, + run_dir: t.Optional[str] = None, results_passthrough: t.Optional[queue.Queue] = None, **kwargs, ) -> None: @@ -39,8 +40,8 @@ def start( Parameters ---------- endpoint_id: Endpoint UUID + run_dir: endpoint run directory results_passthrough: Queue to which packed results will be posted - run_dir Not used Returns ------- """ @@ -50,6 +51,7 @@ def start( self.results_passthrough = results_passthrough assert self.results_passthrough + self.set_working_dir(run_dir=run_dir) # mypy think the thread can be none self._status_report_thread.start() diff --git a/compute_endpoint/tests/unit/test_engines.py b/compute_endpoint/tests/unit/test_engines.py index 94329bd7e..5289275c4 100644 --- a/compute_endpoint/tests/unit/test_engines.py +++ b/compute_endpoint/tests/unit/test_engines.py @@ -24,7 +24,7 @@ from parsl import HighThroughputExecutor from parsl.executors.high_throughput.interchange import ManagerLost from parsl.providers import KubernetesProvider -from tests.utils import double, kill_manager +from tests.utils import double, get_cwd, kill_manager logger = logging.getLogger(__name__) @@ -73,6 +73,37 @@ def test_engine_submit(engine_type: GlobusComputeEngineBase, engine_runner): assert future.result(timeout=5) == param * 2 +@pytest.mark.parametrize( + "engine_type", (ProcessPoolEngine, ThreadPoolEngine, GlobusComputeEngine) +) +def test_engine_working_dir( + engine_type: GlobusComputeEngineBase, + engine_runner, + ez_pack_task, + serde, + task_uuid, +): + """working dir remains constant across multiple fn invocations + This test requires submitting the task payload so that the execute_task + wrapper is used which switches into the working_dir, which created + working_dir nesting when relative paths were used. 
+ """ + engine = engine_runner(engine_type) + + task_args = (str(task_uuid), ez_pack_task(get_cwd), {}) + + future1 = engine.submit(*task_args) + unpacked1 = messagepack.unpack(future1.result()) # blocks; avoid race condition + + future2 = engine.submit(*task_args) # exact same task + unpacked2 = messagepack.unpack(future2.result()) + + # data is enough for test, but in error case, be kind to dev + cwd1 = serde.deserialize(unpacked1.data) + cwd2 = serde.deserialize(unpacked2.data) + assert cwd1 == cwd2, "working dir should be idempotent" + + @pytest.mark.parametrize( "engine_type", (ProcessPoolEngine, ThreadPoolEngine, GlobusComputeEngine) ) diff --git a/compute_endpoint/tests/unit/test_execute_task.py b/compute_endpoint/tests/unit/test_execute_task.py index 8eb944268..45194cee6 100644 --- a/compute_endpoint/tests/unit/test_execute_task.py +++ b/compute_endpoint/tests/unit/test_execute_task.py @@ -1,6 +1,7 @@ import logging from unittest import mock +import pytest from globus_compute_common import messagepack from globus_compute_endpoint.engines.helper import execute_task @@ -13,12 +14,18 @@ def divide(x, y): return x / y -def test_execute_task(endpoint_uuid, serde, task_uuid, ez_pack_task): +@pytest.mark.parametrize("run_dir", ("tmp", None, "$HOME")) +def test_bad_run_dir(endpoint_uuid, task_uuid, run_dir): + with pytest.raises(RuntimeError): + execute_task(task_uuid, b"", endpoint_uuid, run_dir=run_dir) + + +def test_execute_task(endpoint_uuid, serde, task_uuid, ez_pack_task, tmp_path): inp, outp = (10, 2), 5 task_bytes = ez_pack_task(divide, *inp) - packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid) + packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid, run_dir=tmp_path) assert isinstance(packed_result, bytes) result = messagepack.unpack(packed_result) @@ -31,11 +38,13 @@ def test_execute_task(endpoint_uuid, serde, task_uuid, ez_pack_task): assert serde.deserialize(result.data) == outp -def 
test_execute_task_with_exception(endpoint_uuid, task_uuid, ez_pack_task): +def test_execute_task_with_exception(endpoint_uuid, task_uuid, ez_pack_task, tmp_path): task_bytes = ez_pack_task(divide, 10, 0) with mock.patch(f"{_MOCK_BASE}log") as mock_log: - packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid) + packed_result = execute_task( + task_uuid, task_bytes, endpoint_uuid, run_dir=tmp_path + ) assert mock_log.exception.called a, _k = mock_log.exception.call_args diff --git a/compute_endpoint/tests/unit/test_worker.py b/compute_endpoint/tests/unit/test_worker.py index 74a0e950e..2bd99b73c 100644 --- a/compute_endpoint/tests/unit/test_worker.py +++ b/compute_endpoint/tests/unit/test_worker.py @@ -79,7 +79,7 @@ def test_register_and_kill(test_worker): assert messages[1][0] == b"WRKR_DIE", messages -def test_execute_hello_world(test_worker): +def test_execute_hello_world(test_worker, tmp_path): task_id = uuid.uuid1() task_body = test_worker.serializer.serialize((hello_world, (), {})) internal_task = Task(task_id, "RAW", task_body) @@ -125,7 +125,7 @@ def test_execute_failing_function(test_worker): def test_execute_function_exceeding_result_size_limit( - test_worker, endpoint_uuid, task_uuid, ez_pack_task + test_worker, endpoint_uuid, task_uuid, ez_pack_task, tmp_path ): return_size = 10 @@ -133,7 +133,11 @@ def test_execute_function_exceeding_result_size_limit( with mock.patch("globus_compute_endpoint.engines.helper.log") as mock_log: s_result = execute_task( - task_uuid, task_bytes, endpoint_uuid, result_size_limit=return_size - 2 + task_uuid, + task_bytes, + endpoint_uuid, + result_size_limit=return_size - 2, + run_dir=tmp_path, ) result = messagepack.unpack(s_result) @@ -145,12 +149,14 @@ def test_execute_function_exceeding_result_size_limit( assert mock_log.exception.called -def test_app_timeout(test_worker, endpoint_uuid, task_uuid, ez_pack_task): +def test_app_timeout(test_worker, endpoint_uuid, task_uuid, ez_pack_task, tmp_path): task_bytes = 
ez_pack_task(sleeper, 1) with mock.patch("globus_compute_endpoint.engines.helper.log") as mock_log: with mock.patch.dict(os.environ, {"GC_TASK_TIMEOUT": "0.01"}): - packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid) + packed_result = execute_task( + task_uuid, task_bytes, endpoint_uuid, run_dir=tmp_path + ) result = messagepack.unpack(packed_result) assert isinstance(result, messagepack.message_types.Result) diff --git a/compute_endpoint/tests/unit/test_working_dir.py b/compute_endpoint/tests/unit/test_working_dir.py index b67f8a8da..f345dd190 100644 --- a/compute_endpoint/tests/unit/test_working_dir.py +++ b/compute_endpoint/tests/unit/test_working_dir.py @@ -4,9 +4,14 @@ import pytest from globus_compute_common import messagepack -from globus_compute_endpoint.engines.globus_compute import GlobusComputeEngine +from globus_compute_endpoint.engines import ( + GlobusComputeEngine, + ProcessPoolEngine, + ThreadPoolEngine, +) from globus_compute_endpoint.engines.helper import execute_task from parsl.executors import HighThroughputExecutor +from tests.utils import get_cwd @pytest.fixture() @@ -16,20 +21,54 @@ def reset_cwd(): os.chdir(cwd) -def get_cwd(): - import os - - return os.getcwd() +@pytest.mark.parametrize( + "engine", + (GlobusComputeEngine(address="127.0.0.1"), ThreadPoolEngine(), ProcessPoolEngine()), +) +def test_set_working_dir_default(engine, tmp_path): + """Verify that working dir is set to tasks dir in the run_dir by default for all + engines + """ + engine.set_working_dir(tmp_path) + assert engine.working_dir == os.path.join(tmp_path, "tasks_working_dir") + assert os.path.isabs(engine.working_dir) is True + + +@pytest.mark.parametrize( + "engine", + (GlobusComputeEngine(address="127.0.0.1"), ThreadPoolEngine(), ProcessPoolEngine()), +) +def test_set_working_dir_called(engine, tmp_path, endpoint_uuid): + """Verify that set_working_dir is called when engine.start() is called""" + engine.executor = mock.Mock() + 
engine.executor.status_polling_interval = 0 + engine.set_working_dir = mock.Mock(spec=engine.set_working_dir) + + engine.start(endpoint_id=endpoint_uuid, run_dir=tmp_path) + assert engine.set_working_dir.called + + +@pytest.mark.parametrize( + "engine", + (GlobusComputeEngine(address="127.0.0.1"), ThreadPoolEngine(), ProcessPoolEngine()), +) +def test_set_working_dir_relative(engine, tmp_path): + """Working_dir should be absolute and set relative to the endpoint run_dir""" + os.chdir(tmp_path) + engine.working_dir = "tasks_working_dir" + engine.set_working_dir(run_dir="foo") + assert engine.working_dir == os.path.join(tmp_path, "foo", "tasks_working_dir") + assert os.path.isabs(engine.working_dir) is True def test_default_working_dir(tmp_path): - """Verify that working dir is set to tasks dir in the run_dir by default""" + """Test working_dir relative to run_dir""" gce = GlobusComputeEngine( address="127.0.0.1", ) gce.executor.start = mock.MagicMock(spec=HighThroughputExecutor.start) gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path) - assert gce.executor.run_dir == tmp_path + assert gce.run_dir == gce.executor.run_dir assert gce.working_dir == os.path.join(tmp_path, "tasks_working_dir") @@ -98,34 +137,6 @@ def test_execute_task_working_dir( assert os.getcwd() == os.fspath(tmp_path) -def test_non_existent_relative_working_dir( - tmp_path, reset_cwd, serde, endpoint_uuid, task_uuid, ez_pack_task -): - """This tests for execute_task creating a non-existent working dir - when a relative path is specified to the CWD""" - - os.chdir(tmp_path) - target_dir = f"{uuid.uuid4()}" - assert os.getcwd() != target_dir - - abs_target_dir = os.path.abspath(target_dir) - - task_bytes = ez_pack_task(get_cwd) - packed_result = execute_task( - task_uuid, - task_bytes, - endpoint_uuid, - run_dir=target_dir, - ) - - message = messagepack.unpack(packed_result) - assert message.task_id == task_uuid - assert message.data - result = serde.deserialize(message.data) - - assert result == 
abs_target_dir - - def test_sandbox(tmp_path, reset_cwd, serde, endpoint_uuid, task_uuid, ez_pack_task): os.chdir(tmp_path) diff --git a/compute_endpoint/tests/utils.py b/compute_endpoint/tests/utils.py index 797c5de19..65551e206 100644 --- a/compute_endpoint/tests/utils.py +++ b/compute_endpoint/tests/utils.py @@ -131,3 +131,9 @@ def get_env_vars(): import os return os.environ + + +def get_cwd(): + import os + + return os.getcwd()