Skip to content

Commit

Permalink
Pass heartbeat_period to Parsl HTEX
Browse files Browse the repository at this point in the history
  • Loading branch information
rjmello committed Oct 24, 2023
1 parent 9ad4812 commit 34b250f
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bug Fixes
^^^^^^^^^

- The ``GlobusComputeEngine`` has been updated to fully support the
``heartbeat_period`` parameter.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(
self.max_workers_per_node = 1
if executor is None:
executor = HighThroughputExecutor( # type: ignore
*args, address=address, **kwargs
*args, address=address, heartbeat_period=heartbeat_period, **kwargs
)
self.executor = executor

Expand Down
4 changes: 2 additions & 2 deletions compute_endpoint/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ def func():


@pytest.fixture
def engine_heartbeat() -> float:
return 0.1
def engine_heartbeat() -> int:
return 1


@pytest.fixture
Expand Down
14 changes: 14 additions & 0 deletions compute_endpoint/tests/unit/test_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import uuid

import parsl
import pytest
from globus_compute_common import messagepack
from globus_compute_common.messagepack.message_types import TaskTransition
Expand All @@ -19,6 +20,7 @@
from globus_compute_endpoint.engines.base import GlobusComputeEngineBase
from globus_compute_sdk.serialize import ComputeSerializer
from parsl.executors.high_throughput.interchange import ManagerLost
from pytest_mock import MockFixture
from tests.utils import double, ez_pack_function, slow_double

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -185,3 +187,15 @@ def test_serialized_engine_config_has_provider(engine_type: GlobusComputeEngineB
executor = res["executors"][0].get("executor") or res["executors"][0]

assert executor.get("provider")


def test_gcengine_pass_through_to_executor(mocker: MockFixture):
mock_executor = mocker.patch.object(parsl.HighThroughputExecutor, "__new__")

args = (1, "blah")
kwargs = {"address": "127.0.0.1", "heartbeat_period": 10, "foo": "bar"}
GlobusComputeEngine(*args, **kwargs)

a, k = mock_executor.call_args
assert a[1:] == args
assert kwargs == k
4 changes: 2 additions & 2 deletions compute_endpoint/tests/unit/test_status_reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"engine_type",
(engines.ProcessPoolEngine, engines.ThreadPoolEngine, engines.GlobusComputeEngine),
)
def test_status_reporting(engine_type, engine_runner, engine_heartbeat: float):
def test_status_reporting(engine_type, engine_runner, engine_heartbeat: int):
engine = engine_runner(engine_type)

report = engine.get_status_report()
Expand All @@ -28,7 +28,7 @@ def test_status_reporting(engine_type, engine_runner, engine_heartbeat: float):

# Confirm heartbeats in regular intervals
for _i in range(3):
q_msg = results_q.get(timeout=1)
q_msg = results_q.get(timeout=1.1)
assert isinstance(q_msg, dict)

message = q_msg["message"]
Expand Down

0 comments on commit 34b250f

Please sign in to comment.