diff --git a/.github/workflows/gce_test.yaml b/.github/workflows/gce_test.yaml
new file mode 100644
index 0000000000..a7cca3cf38
--- /dev/null
+++ b/.github/workflows/gce_test.yaml
@@ -0,0 +1,108 @@
+name: GlobusComputeExecutor tests
+
+on:
+ pull_request:
+ types:
+ - opened
+ - synchronize
+
+env:
+ PYTHON_VERSION: 3.11
+
+jobs:
+ main-test-suite:
+ runs-on: ubuntu-20.04
+ timeout-minutes: 60
+
+ steps:
+ - uses: actions/checkout@master
+
+ - name: Set up Python Environment
+ uses: actions/setup-python@v4
+ with:
+ python-version: ${{ env.PYTHON_VERSION }}
+
+ - name: Collect Job Information
+ id: job-info
+ run: |
+          echo "Python Version: ${{ env.PYTHON_VERSION }}" >> ci_job_info.txt
+ echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt
+ echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt
+ echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt
+ echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt
+ echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt
+          as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-Za-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")"
+ echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT
+
+ - name: setup virtual env
+ run: |
+ make virtualenv
+ source .venv/bin/activate
+
+ - name: Non-requirements based install
+ run: |
+ # mpich: required by mpi4py which is in test-requirements for radical-pilot
+ sudo apt-get update -q
+ sudo apt-get install -qy mpich
+
+ - name: make deps clean_coverage
+ run: |
+ source .venv/bin/activate
+ make deps
+ make clean_coverage
+
+ # Temporary fix until fixes make it to a release
+ git clone -b main https://github.com/globus/globus-compute.git
+ pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint
+
+ - name: start globus_compute_endpoint
+ env:
+ GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
+ GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
+ run: |
+ source /home/runner/work/parsl/parsl/.venv/bin/activate
+ globus-compute-endpoint configure default
+ which globus-compute-endpoint
+ python3 -c "import globus_compute_sdk; print(globus_compute_sdk.__version__)"
+ python3 -c "import globus_compute_endpoint; print(globus_compute_endpoint.__version__)"
+ cat << EOF > /home/runner/.globus_compute/default/config.yaml
+ engine:
+ type: ThreadPoolEngine
+ max_workers: 4
+ EOF
+ cat /home/runner/.globus_compute/default/config.yaml
+ mkdir ~/.globus_compute/default/tasks_working_dir
+ globus-compute-endpoint start default
+ globus-compute-endpoint list
+
+      - name: make test
+ env:
+ GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
+ GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
+ run: |
+ source .venv/bin/activate
+ export GLOBUS_COMPUTE_ENDPOINT=$(jq -r .endpoint_id < ~/.globus_compute/default/endpoint.json)
+ echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT"
+
+ export PARSL_TEST_PRESERVE_NUM_RUNS=7
+
+ make gce_test
+ ln -s pytest-parsl/parsltest-current test_runinfo
+
+ - name: stop globus_compute_endpoint
+ env:
+ GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
+ GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
+ run: |
+ source /home/runner/work/parsl/parsl/.venv/bin/activate
+ globus-compute-endpoint stop default
+
+ - name: Archive runinfo logs
+ if: ${{ always() }}
+ uses: actions/upload-artifact@v4
+ with:
+ name: runinfo-${{ env.PYTHON_VERSION }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }}
+ path: |
+ runinfo/
+ pytest-parsl/
+ ci_job_info.txt
+ compression-level: 9
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 4d2f37f715..ad127f2c23 100644
--- a/Makefile
+++ b/Makefile
@@ -51,6 +51,10 @@ clean_coverage:
mypy: ## run mypy checks
MYPYPATH=$(CWD)/mypy-stubs mypy parsl/
+.PHONY: gce_test
+gce_test: ## Run tests with GlobusComputeExecutor
+ pytest -v -k "not shared_fs and not issue_3620 and not staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10
+
.PHONY: local_thread_test
local_thread_test: ## run all tests with local_thread config
pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10
diff --git a/docs/reference.rst b/docs/reference.rst
index 3bba8acba9..919f8ac664 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -79,6 +79,7 @@ Executors
parsl.executors.taskvine.TaskVineExecutor
parsl.executors.FluxExecutor
parsl.executors.radical.RadicalPilotExecutor
+ parsl.executors.GlobusComputeExecutor
Manager Selectors
=================
diff --git a/docs/userguide/configuration/execution.rst b/docs/userguide/configuration/execution.rst
index ac7217032a..6dab0a3df1 100644
--- a/docs/userguide/configuration/execution.rst
+++ b/docs/userguide/configuration/execution.rst
@@ -86,11 +86,19 @@ Parsl currently supports the following executors:
4. `parsl.executors.taskvine.TaskVineExecutor`: This executor uses `TaskVine `_ as the execution backend. TaskVine scales up to tens of thousands of cores and actively uses local storage on compute nodes to offer a diverse array of performance-oriented features, including: smart caching and sharing common large files between tasks and compute nodes, reliable execution of tasks, dynamic resource sizing, automatic Python environment detection and sharing.
These executors cover a broad range of execution requirements. As with other Parsl components, there is a standard interface (ParslExecutor) that can be implemented to add support for other executors.
+5. `parsl.executors.GlobusComputeExecutor`: This executor uses `Globus Compute `_
+as the execution backend. Globus Compute is a distributed Function as a Service (FaaS) platform that enables secure
+execution of functions on heterogeneous remote computers, from laptops to campus clusters, clouds, and supercomputers.
+Functions are executed on `Globus Compute Endpoints `_
+that can be `configured `_
+to scale execution automatically on most batch schedulers. Since Globus Compute Endpoints use `parsl.executors.HighThroughputExecutor`
+as their default execution system, this executor can be thought of as `parsl.executors.HighThroughputExecutor` wrapped in
+a secure and reliable remote execution layer.
+
.. note::
Refer to :ref:`configuration-section` for information on how to configure these executors.
-Launchers
---------
Many LRMs offer mechanisms for spawning applications across nodes
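To make the userguide passage above concrete, here is a minimal sketch of a `GlobusComputeExecutor` configuration, mirroring the test config this PR adds in ``parsl/tests/configs/globus_compute.py``; the endpoint UUID is a placeholder assumption, not a value taken from the PR::

    import parsl
    from globus_compute_sdk import Executor

    from parsl.config import Config
    from parsl.executors import GlobusComputeExecutor

    # Placeholder UUID: substitute the ID of a running Globus Compute
    # endpoint (e.g. as reported by `globus-compute-endpoint list`).
    ENDPOINT_ID = "00000000-0000-0000-0000-000000000000"

    config = Config(
        executors=[
            GlobusComputeExecutor(
                executor=Executor(endpoint_id=ENDPOINT_ID),
                label="globus_compute",
            )
        ]
    )

    parsl.load(config)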
diff --git a/mypy.ini b/mypy.ini
index 4b64a12de2..e46e11fd63 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -177,6 +177,9 @@ ignore_missing_imports = True
#[mypy-multiprocessing.synchronization.*]
#ignore_missing_imports = True
+[mypy-globus_compute_sdk.*]
+ignore_missing_imports = True
+
[mypy-pandas.*]
ignore_missing_imports = True
diff --git a/parsl/executors/__init__.py b/parsl/executors/__init__.py
index bc29204502..81955aab76 100644
--- a/parsl/executors/__init__.py
+++ b/parsl/executors/__init__.py
@@ -1,4 +1,5 @@
from parsl.executors.flux.executor import FluxExecutor
+from parsl.executors.globus_compute import GlobusComputeExecutor
from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_executor import MPIExecutor
from parsl.executors.threads import ThreadPoolExecutor
@@ -8,4 +9,5 @@
'HighThroughputExecutor',
'MPIExecutor',
'WorkQueueExecutor',
- 'FluxExecutor']
+ 'FluxExecutor',
+ 'GlobusComputeExecutor']
diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py
new file mode 100644
index 0000000000..687026979d
--- /dev/null
+++ b/parsl/executors/globus_compute.py
@@ -0,0 +1,125 @@
+from __future__ import annotations
+
+import copy
+from concurrent.futures import Future
+from typing import Any, Callable, Dict
+
+import typeguard
+
+from parsl.errors import OptionalModuleMissing
+from parsl.executors.base import ParslExecutor
+from parsl.utils import RepresentationMixin
+
+try:
+ from globus_compute_sdk import Executor
+ _globus_compute_enabled = True
+except ImportError:
+ _globus_compute_enabled = False
+
+
+class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
+ """ GlobusComputeExecutor enables remote execution on Globus Compute endpoints
+
+    GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor.
+ Refer to `globus-compute user documentation `_
+ and `reference documentation `_
+ for more details.
+
+ .. note::
+ As a remote execution system, Globus Compute relies on serialization to ship
+ tasks and results between the Parsl client side and the remote Globus Compute
+        Endpoint side. Serialization is unreliable across Python versions, and the
+        wrappers used by Parsl assume identical Parsl versions on both sides.
+        We recommend using matching Python, Parsl, and Globus Compute versions on
+        both the client side and the endpoint side for stable behavior.
+
+ """
+
+ @typeguard.typechecked
+ def __init__(
+ self,
+ executor: Executor,
+ label: str = 'GlobusComputeExecutor',
+ ):
+ """
+ Parameters
+ ----------
+
+ executor: globus_compute_sdk.Executor
+ Pass a globus_compute_sdk Executor that will be used to execute
+ tasks on a globus_compute endpoint. Refer to `globus-compute docs
+ `_
+
+ label:
+ a label to name the executor
+ """
+ if not _globus_compute_enabled:
+ raise OptionalModuleMissing(
+ ['globus-compute-sdk'],
+                "GlobusComputeExecutor requires the globus-compute-sdk package to be installed"
+ )
+
+ super().__init__()
+ self.executor: Executor = executor
+ self.resource_specification = self.executor.resource_specification
+ self.user_endpoint_config = self.executor.user_endpoint_config
+ self.label = label
+
+ def start(self) -> None:
+ """ Start the Globus Compute Executor """
+ pass
+
+ def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
+ """ Submit func to globus-compute
+
+ Parameters
+ ----------
+
+ func: Callable
+ Python function to execute remotely
+
+ resource_specification: Dict[str, Any]
+            Resource specification can be used to specify MPI resources required by MPI applications on
+            endpoints configured to use Globus Compute's MPIEngine. GCE also accepts *user_endpoint_config*
+            to configure the endpoint when it is a `Multi-User Endpoint
+ `_
+
+ args:
+ Args to pass to the function
+
+ kwargs:
+ kwargs to pass to the function
+
+ Returns
+ -------
+
+ Future
+ """
+ res_spec = copy.deepcopy(resource_specification or self.resource_specification)
+ # Pop user_endpoint_config since it is illegal in resource_spec for globus_compute
+ if res_spec:
+ user_endpoint_config = res_spec.pop('user_endpoint_config', self.user_endpoint_config)
+ else:
+ user_endpoint_config = self.user_endpoint_config
+
+ try:
+ self.executor.resource_specification = res_spec
+ self.executor.user_endpoint_config = user_endpoint_config
+ return self.executor.submit(func, *args, **kwargs)
+ finally:
+ # Reset executor state to defaults set at configuration time
+ self.executor.resource_specification = self.resource_specification
+ self.executor.user_endpoint_config = self.user_endpoint_config
+
+    def shutdown(self) -> None:
+        """Clean up the resources associated with the Executor.
+
+        GCE.shutdown will cancel all futures that have not yet been registered
+        with Globus Compute, and will not wait for launched futures to complete.
+        This method explicitly shuts down the result_watcher thread to avoid it
+        waiting for outstanding futures at thread exit.
+ """
+ self.executor.shutdown(wait=False, cancel_futures=True)
+ result_watcher = self.executor._get_result_watcher()
+ result_watcher.shutdown(wait=False, cancel_futures=True)
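As a hedged usage sketch of the submit/shutdown lifecycle implemented above; the endpoint ID, the ``worker_init`` value, and the function body are invented for illustration::

    from globus_compute_sdk import Executor

    from parsl.executors import GlobusComputeExecutor

    def double(x):
        return x * 2

    # Hypothetical endpoint UUID for illustration.
    gce = GlobusComputeExecutor(executor=Executor(endpoint_id="..."))
    gce.start()  # a no-op; the underlying Executor manages its own lifecycle

    # Per-task overrides: user_endpoint_config is popped from the resource
    # specification before it reaches Globus Compute, as in submit() above,
    # and both attributes are restored to their configured defaults afterwards.
    fut = gce.submit(
        double,
        {"user_endpoint_config": {"worker_init": "source ~/setup.sh"}},
        21,
    )
    print(fut.result())  # 42, once the endpoint runs the task

    # Cancels futures not yet registered with Globus Compute; does not wait.
    gce.shutdown()

The mutate-and-restore pattern in ``submit`` exists because ``globus_compute_sdk.Executor`` carries ``resource_specification`` and ``user_endpoint_config`` as instance state rather than per-submit arguments; the ``try``/``finally`` above guarantees one task's overrides never leak into the next submission.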
diff --git a/parsl/tests/configs/globus_compute.py b/parsl/tests/configs/globus_compute.py
new file mode 100644
index 0000000000..19f90b8c20
--- /dev/null
+++ b/parsl/tests/configs/globus_compute.py
@@ -0,0 +1,20 @@
+import os
+
+from globus_compute_sdk import Executor
+
+from parsl.config import Config
+from parsl.executors import GlobusComputeExecutor
+
+
+def fresh_config():
+
+ endpoint_id = os.environ["GLOBUS_COMPUTE_ENDPOINT"]
+
+ return Config(
+ executors=[
+ GlobusComputeExecutor(
+ executor=Executor(endpoint_id=endpoint_id),
+ label="globus_compute",
+ )
+ ]
+ )
diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py
index 4bcdde0b7a..4f1281025c 100644
--- a/parsl/tests/conftest.py
+++ b/parsl/tests/conftest.py
@@ -163,6 +163,10 @@ def pytest_configure(config):
'markers',
        'shared_fs: Marks tests that require a shared_fs between the workers and the test client'
)
+ config.addinivalue_line(
+ 'markers',
+ 'issue_3620: Marks tests that do not work correctly on GlobusComputeExecutor (ref: issue 3620)'
+ )
@pytest.fixture(autouse=True, scope='session')
diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py
index 4616219be2..7def2b736c 100644
--- a/parsl/tests/test_error_handling/test_resource_spec.py
+++ b/parsl/tests/test_error_handling/test_resource_spec.py
@@ -1,3 +1,5 @@
+import pytest
+
import parsl
from parsl.app.app import python_app
from parsl.executors import WorkQueueExecutor
@@ -11,6 +13,7 @@ def double(x, parsl_resource_specification={}):
return x * 2
+@pytest.mark.issue_3620
def test_resource(n=2):
executors = parsl.dfk().executors
executor = None
diff --git a/parsl/tests/unit/test_globus_compute_executor.py b/parsl/tests/unit/test_globus_compute_executor.py
new file mode 100644
index 0000000000..b65ff92047
--- /dev/null
+++ b/parsl/tests/unit/test_globus_compute_executor.py
@@ -0,0 +1,104 @@
+import random
+from unittest import mock
+
+import pytest
+from globus_compute_sdk import Executor
+
+from parsl.executors import GlobusComputeExecutor
+
+
+@pytest.fixture
+def mock_ex():
+ # Not Parsl's job to test GC's Executor
+ yield mock.Mock(spec=Executor)
+
+
+@pytest.mark.local
+def test_gc_executor_mock_spec(mock_ex):
+ # a test of tests -- make sure we're using spec= in the mock
+ with pytest.raises(AttributeError):
+ mock_ex.aasdf()
+
+
+@pytest.mark.local
+def test_gc_executor_label_default(mock_ex):
+ gce = GlobusComputeExecutor(mock_ex)
+ assert gce.label == type(gce).__name__, "Expect reasonable default label"
+
+
+@pytest.mark.local
+def test_gc_executor_label(mock_ex, randomstring):
+ exp_label = randomstring()
+ gce = GlobusComputeExecutor(mock_ex, label=exp_label)
+ assert gce.label == exp_label
+
+
+@pytest.mark.local
+def test_gc_executor_resets_spec_after_submit(mock_ex, randomstring):
+ submit_res = {randomstring(): "some submit res"}
+ res = {"some": randomstring(), "spec": randomstring()}
+ mock_ex.resource_specification = res
+ mock_ex.user_endpoint_config = None
+ gce = GlobusComputeExecutor(mock_ex)
+
+ fn = mock.Mock()
+ orig_res = mock_ex.resource_specification
+ orig_uep = mock_ex.user_endpoint_config
+
+ def mock_submit(*a, **k):
+ assert mock_ex.resource_specification == submit_res, "Expect set for submission"
+ assert mock_ex.user_endpoint_config is None
+ mock_ex.submit.side_effect = mock_submit
+
+ gce.submit(fn, resource_specification=submit_res)
+
+ assert mock_ex.resource_specification == orig_res
+ assert mock_ex.user_endpoint_config is orig_uep
+
+
+@pytest.mark.local
+def test_gc_executor_resets_uep_after_submit(mock_ex, randomstring):
+ uep_conf = randomstring()
+ res = {"some": randomstring()}
+ gce = GlobusComputeExecutor(mock_ex)
+
+ fn = mock.Mock()
+ orig_res = mock_ex.resource_specification
+ orig_uep = mock_ex.user_endpoint_config
+
+    def mock_submit(*a, **k):
+ assert mock_ex.resource_specification == res, "Expect set for submission"
+ assert mock_ex.user_endpoint_config == uep_conf, "Expect set for submission"
+ mock_ex.submit.side_effect = mock_submit
+
+ gce.submit(fn, resource_specification={"user_endpoint_config": uep_conf, **res})
+
+ assert mock_ex.resource_specification == orig_res
+ assert mock_ex.user_endpoint_config is orig_uep
+
+
+@pytest.mark.local
+def test_gc_executor_happy_path(mock_ex, randomstring):
+ mock_fn = mock.Mock()
+ args = tuple(randomstring() for _ in range(random.randint(0, 3)))
+ kwargs = {randomstring(): randomstring() for _ in range(random.randint(0, 3))}
+
+ gce = GlobusComputeExecutor(mock_ex)
+ gce.submit(mock_fn, {}, *args, **kwargs)
+
+ assert mock_ex.submit.called, "Expect proxying of args to underlying executor"
+ found_a, found_k = mock_ex.submit.call_args
+ assert found_a[0] is mock_fn
+ assert found_a[1:] == args
+ assert found_k == kwargs
+
+
+@pytest.mark.local
+def test_gc_executor_shuts_down_asynchronously(mock_ex):
+ gce = GlobusComputeExecutor(mock_ex)
+ gce.shutdown()
+ assert mock_ex.shutdown.called
+ a, k = mock_ex.shutdown.call_args
+ assert k["wait"] is False
+ assert k["cancel_futures"] is True
diff --git a/setup.py b/setup.py
index 94551e2ba3..3e9a7f73fb 100755
--- a/setup.py
+++ b/setup.py
@@ -45,6 +45,7 @@
'flux': ['pyyaml', 'cffi', 'jsonschema'],
'proxystore': ['proxystore'],
'radical-pilot': ['radical.pilot==1.90', 'radical.utils==1.90'],
+ 'globus_compute': ['globus_compute_sdk>=2.34.0'],
# Disabling psi-j since github direct links are not allowed by pypi
# 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl']
}
diff --git a/test-requirements.txt b/test-requirements.txt
index 2b4d81a6dd..ff9bf55db8 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -10,6 +10,7 @@ types-mock
types-python-dateutil
types-requests
mpi4py
+globus-compute-sdk>=2.34.0
# sqlalchemy is needed for typechecking, so it's here
# as well as at runtime for optional monitoring execution