Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for GlobusComputeExecutor #3607

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions .github/workflows/gce_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
name: GlobusComputeExecutor tests

on:
pull_request:
types:
- opened
- synchronize

env:
PYTHON_VERSION: 3.11

jobs:
main-test-suite:
runs-on: ubuntu-20.04
timeout-minutes: 60

steps:
- uses: actions/checkout@master

- name: Set up Python Environment
uses: actions/setup-python@v4
with:
python-version: ${{ env.PYTHON_VERSION }}

- name: Collect Job Information
id: job-info
run: |
echo "Python Version: ${{ env.PYTHON_VERSION }} " >> ci_job_info.txt
echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt
echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt
echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt
echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt
echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt
as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")"
echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT

- name: setup virtual env
run: |
make virtualenv
source .venv/bin/activate

- name: Non-requirements based install
run: |
# mpich: required by mpi4py which is in test-requirements for radical-pilot
sudo apt-get update -q
sudo apt-get install -qy mpich

- name: make deps clean_coverage
run: |
source .venv/bin/activate
make deps
make clean_coverage

# Temporary fix until fixes make it to a release
git clone -b main https://github.com/globus/globus-compute.git
pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint
khk-globus marked this conversation as resolved.
Show resolved Hide resolved

- name: start globus_compute_endpoint
env:
GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
run: |
source /home/runner/work/parsl/parsl/.venv/bin/activate
globus-compute-endpoint configure default
which globus-compute-endpoint
python3 -c "import globus_compute_sdk; print(globus_compute_sdk.__version__)"
python3 -c "import globus_compute_endpoint; print(globus_compute_endpoint.__version__)"
cat << EOF > /home/runner/.globus_compute/default/config.yaml
engine:
type: ThreadPoolEngine
benclifford marked this conversation as resolved.
Show resolved Hide resolved
max_workers: 4
EOF
cat /home/runner/.globus_compute/default/config.yaml
mkdir ~/.globus_compute/default/tasks_working_dir
globus-compute-endpoint start default
globus-compute-endpoint list
khk-globus marked this conversation as resolved.
Show resolved Hide resolved
- name: make test
env:
GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
run: |
source .venv/bin/activate
export GLOBUS_COMPUTE_ENDPOINT=$(jq -r .endpoint_id < ~/.globus_compute/default/endpoint.json)
echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT"

export PARSL_TEST_PRESERVE_NUM_RUNS=7

make gce_test
ln -s pytest-parsl/parsltest-current test_runinfo

khk-globus marked this conversation as resolved.
Show resolved Hide resolved
- name: stop globus_compute_endpoint
env:
GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
run: |
source /home/runner/work/parsl/parsl/.venv/bin/activate
globus-compute-endpoint stop default

- name: Archive runinfo logs
if: ${{ always() }}
uses: actions/upload-artifact@v4
with:
name: runinfo-${{ env.PYTHON_VERSION }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }}
path: |
runinfo/
pytest-parsl/
ci_job_info.txt
compression-level: 9
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ clean_coverage:
mypy: ## run mypy checks
MYPYPATH=$(CWD)/mypy-stubs mypy parsl/

.PHONY: gce_test
gce_test: ## Run tests with GlobusComputeExecutor
pytest -v -k "not shared_fs and not issue_3620 and not staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10

.PHONY: local_thread_test
local_thread_test: ## run all tests with local_thread config
pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10
Expand Down
1 change: 1 addition & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Executors
parsl.executors.taskvine.TaskVineExecutor
parsl.executors.FluxExecutor
parsl.executors.radical.RadicalPilotExecutor
parsl.executors.GlobusComputeExecutor

Manager Selectors
=================
Expand Down
24 changes: 24 additions & 0 deletions docs/userguide/configuration/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,30 @@ running on a login node and uses the `parsl.providers.SlurmProvider` to interfac

.. literalinclude:: ../../../parsl/configs/frontera.py

Globus Compute (Multisite)
--------------------------

Globus Compute is a distributed Function as a Service (FaaS) platform that enables secure
execution of functions on heterogeneous remote computers, from laptops to campus clusters, clouds, and supercomputers.
Functions are executed on `Globus Compute Endpoints <https://globus-compute.readthedocs.io/en/latest/endpoints/endpoints.html>`_
that can be `configured <https://globus-compute.readthedocs.io/en/latest/endpoints/endpoint_examples.html>`_
for most clusters/HPC systems. The example configuration below allows task submission
to Globus Compute's hosted tutorial endpoint.

.. literalinclude:: ../../../parsl/configs/gc_tutorial.py

.. note:: The Globus Compute tutorial endpoint runs Python 3.11. We recommend
using the same Python environment to avoid potential serialization errors
caused by environment mismatches. Globus Compute will raise a warning if any
environment version mismatches are detected although minor version differences
may not cause faults (eg, Python3.11.7 vs Python3.11.8)

The configuration below specifies two remote endpoints, one at `SDSC's Expanse Supercomputer <https://www.sdsc.edu/services/hpc/expanse/>`_
and the other at `NERSC's Perlmutter Supercomputer <https://docs.nersc.gov/systems/perlmutter/architecture/>`_.

.. literalinclude:: ../../../parsl/configs/gc_multisite.py



Kubernetes Clusters
-------------------
Expand Down
10 changes: 9 additions & 1 deletion docs/userguide/configuration/execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,19 @@ Parsl currently supports the following executors:
4. `parsl.executors.taskvine.TaskVineExecutor`: This executor uses `TaskVine <https://ccl.cse.nd.edu/software/taskvine/>`_ as the execution backend. TaskVine scales up to tens of thousands of cores and actively uses local storage on compute nodes to offer a diverse array of performance-oriented features, including: smart caching and sharing common large files between tasks and compute nodes, reliable execution of tasks, dynamic resource sizing, automatic Python environment detection and sharing.
These executors cover a broad range of execution requirements. As with other Parsl components, there is a standard interface (ParslExecutor) that can be implemented to add support for other executors.

5. `parsl.executors.GlobusComputeExecutor`: This executor uses `Globus Compute <https://www.globus.org/compute>`_
as the execution backend. Globus Compute is a distributed Function as a Service (FaaS) platform that enables secure
execution of functions on heterogeneous remote computers, from laptops to campus clusters, clouds, and supercomputers.
Functions are executed on `Globus Compute Endpoints <https://globus-compute.readthedocs.io/en/latest/endpoints/endpoints.html>`_
that can be `configured <https://globus-compute.readthedocs.io/en/latest/endpoints/endpoint_examples.html>`_
to scale execution on most batch schedulers automatically. Since Globus Compute Endpoints use `parsl.executors.HighThroughputExecutor`
as the default execution system, this executor can be thought of as an extension of the `parsl.executors.HighThroughputExecutor` with
a secure and reliable remote execution wrapper.

.. note::
Refer to :ref:`configuration-section` for information on how to configure these executors.


Launchers
---------

Many LRMs offer mechanisms for spawning applications across nodes
Expand Down
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ ignore_missing_imports = True
#[mypy-multiprocessing.synchronization.*]
#ignore_missing_imports = True

[mypy-globus_compute_sdk.*]
ignore_missing_imports = True

[mypy-pandas.*]
ignore_missing_imports = True

Expand Down
27 changes: 27 additions & 0 deletions parsl/configs/gc_multisite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from globus_compute_sdk import Executor

from parsl.config import Config
from parsl.executors import GlobusComputeExecutor
from parsl.usage_tracking.levels import LEVEL_1

# Please start your own endpoint on perlmutter following instructions below to use this config:
# https://globus-compute.readthedocs.io/en/stable/endpoints/endpoint_examples.html#perlmutter-nersc
perlmutter_endpoint = 'YOUR_PERLMUTTER_ENDPOINT_UUID'

# Please start your own endpoint on expanse following instructions below to use this config:
# https://globus-compute.readthedocs.io/en/stable/endpoints/endpoint_examples.html#expanse-sdsc
expanse_endpoint = 'YOUR_EXPANSE_ENDPOINT_UUID'

config = Config(
executors=[
GlobusComputeExecutor(
executor=Executor(endpoint_id=perlmutter_endpoint),
label="Perlmutter",
),
GlobusComputeExecutor(
executor=Executor(endpoint_id=expanse_endpoint),
label="Expanse",
),
],
usage_tracking=LEVEL_1,
)
18 changes: 18 additions & 0 deletions parsl/configs/gc_tutorial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from globus_compute_sdk import Executor

from parsl.config import Config
from parsl.executors import GlobusComputeExecutor
from parsl.usage_tracking.levels import LEVEL_1

# Public tutorial endpoint
tutorial_endpoint = '4b116d3c-1703-4f8f-9f6f-39921e5864df'

config = Config(
executors=[
GlobusComputeExecutor(
executor=Executor(endpoint_id=tutorial_endpoint),
label="Tutorial_Endpoint_py3.11",
)
],
usage_tracking=LEVEL_1,
)
4 changes: 3 additions & 1 deletion parsl/executors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from parsl.executors.flux.executor import FluxExecutor
from parsl.executors.globus_compute import GlobusComputeExecutor
from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_executor import MPIExecutor
from parsl.executors.threads import ThreadPoolExecutor
Expand All @@ -8,4 +9,5 @@
'HighThroughputExecutor',
'MPIExecutor',
'WorkQueueExecutor',
'FluxExecutor']
'FluxExecutor',
'GlobusComputeExecutor']
125 changes: 125 additions & 0 deletions parsl/executors/globus_compute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from __future__ import annotations

import copy
from concurrent.futures import Future
from typing import Any, Callable, Dict

import typeguard

from parsl.errors import OptionalModuleMissing
from parsl.executors.base import ParslExecutor
from parsl.utils import RepresentationMixin

try:
from globus_compute_sdk import Executor
_globus_compute_enabled = True
except ImportError:
_globus_compute_enabled = False


class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
""" GlobusComputeExecutor enables remote execution on Globus Compute endpoints

GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor
Refer to `globus-compute user documentation <https://globus-compute.readthedocs.io/en/latest/executor.html>`_
and `reference documentation <https://globus-compute.readthedocs.io/en/latest/reference/executor.html>`_
for more details.

.. note::
As a remote execution system, Globus Compute relies on serialization to ship
tasks and results between the Parsl client side and the remote Globus Compute
Endpoint side. Serialization is unreliable across python versions, and
wrappers used by Parsl assume identical Parsl versions across on both sides.
We recommend using matching Python, Parsl and Globus Compute version on both
the client side and the endpoint side for stable behavior.

"""

@typeguard.typechecked
def __init__(
self,
executor: Executor,
label: str = 'GlobusComputeExecutor',
):
"""
Parameters
----------

executor: globus_compute_sdk.Executor
Pass a globus_compute_sdk Executor that will be used to execute
tasks on a globus_compute endpoint. Refer to `globus-compute docs
<https://globus-compute.readthedocs.io/en/latest/reference/executor.html#globus-compute-executor>`_

label:
a label to name the executor
"""
if not _globus_compute_enabled:
raise OptionalModuleMissing(
['globus-compute-sdk'],
"GlobusComputeExecutor requires globus-compute-sdk installed"
)
yadudoc marked this conversation as resolved.
Show resolved Hide resolved

super().__init__()
self.executor: Executor = executor
self.resource_specification = self.executor.resource_specification
self.user_endpoint_config = self.executor.user_endpoint_config
self.label = label

def start(self) -> None:
""" Start the Globus Compute Executor """
pass
yadudoc marked this conversation as resolved.
Show resolved Hide resolved

def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
""" Submit func to globus-compute


Parameters
----------

func: Callable
Python function to execute remotely

resource_specification: Dict[str, Any]
Resource specification can be used specify MPI resources required by MPI applications on
Endpoints configured to use globus compute's MPIEngine. GCE also accepts *user_endpoint_config*
to configure endpoints when the endpoint is a `Multi-User Endpoint
<https://globus-compute.readthedocs.io/en/latest/endpoints/endpoints.html#templating-endpoint-configuration>`_

args:
Args to pass to the function

kwargs:
kwargs to pass to the function

Returns
-------

Future
"""
res_spec = copy.deepcopy(resource_specification or self.resource_specification)
# Pop user_endpoint_config since it is illegal in resource_spec for globus_compute
if res_spec:
user_endpoint_config = res_spec.pop('user_endpoint_config', self.user_endpoint_config)
else:
user_endpoint_config = self.user_endpoint_config

try:
self.executor.resource_specification = res_spec
self.executor.user_endpoint_config = user_endpoint_config
return self.executor.submit(func, *args, **kwargs)
finally:
# Reset executor state to defaults set at configuration time
self.executor.resource_specification = self.resource_specification
self.executor.user_endpoint_config = self.user_endpoint_config

def shutdown(self):
"""Clean-up the resources associated with the Executor.

GCE.shutdown will cancel all futures that have not yet registered with
Globus Compute and will not wait for the launched futures to complete.
This method explicitly shutsdown the result_watcher thread to avoid
it waiting for outstanding futures at thread exit.
"""
self.executor.shutdown(wait=False, cancel_futures=True)
result_watcher = self.executor._get_result_watcher()
result_watcher.shutdown(wait=False, cancel_futures=True)
20 changes: 20 additions & 0 deletions parsl/tests/configs/globus_compute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os

from globus_compute_sdk import Executor

from parsl.config import Config
from parsl.executors import GlobusComputeExecutor


def fresh_config():

endpoint_id = os.environ["GLOBUS_COMPUTE_ENDPOINT"]

return Config(
executors=[
GlobusComputeExecutor(
executor=Executor(endpoint_id=endpoint_id),
label="globus_compute",
)
]
)
Loading