diff --git a/changelog.d/20231031_230517_yadudoc1729_task_retries_1.rst b/changelog.d/20231031_230517_yadudoc1729_task_retries_1.rst index 568d938a4..e8ad5f115 100644 --- a/changelog.d/20231031_230517_yadudoc1729_task_retries_1.rst +++ b/changelog.d/20231031_230517_yadudoc1729_task_retries_1.rst @@ -5,8 +5,8 @@ New Functionality node failures occur. Traceback history from all prior attempts is supplied if the last retry attempt fails. Here's a snippet from config.yaml: - ``` - engine: - type: GlobusComputeEngine - max_retries_on_system_failure: 2 - ``` \ No newline at end of file +.. code-block:: yaml + + engine: + type: GlobusComputeEngine + max_retries_on_system_failure: 2 \ No newline at end of file diff --git a/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py b/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py index 15f4b2c2e..0fae45e4a 100644 --- a/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py +++ b/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py @@ -1,6 +1,4 @@ import logging - -# import tempfile import uuid from queue import Queue @@ -16,29 +14,6 @@ @pytest.fixture def gc_engine_with_retries(tmp_path): - ep_id = uuid.uuid4() - engine = GlobusComputeEngine( - address="127.0.0.1", - max_workers=1, - heartbeat_period=1, - heartbeat_threshold=1, - max_retries_on_system_failure=2, - provider=LocalProvider( - init_blocks=0, - min_blocks=0, - max_blocks=1, - ), - strategy=SimpleStrategy(interval=0.1, max_idletime=0), - ) - queue = Queue() - engine.start(endpoint_id=ep_id, run_dir=str(tmp_path), results_passthrough=queue) - - yield engine - engine.shutdown() - - -@pytest.fixture -def gc_engine_no_retries(tmp_path): ep_id = uuid.uuid4() engine = GlobusComputeEngine( address="127.0.0.1", @@ -55,14 +30,14 @@ def gc_engine_no_retries(tmp_path): ) engine._status_report_thread.reporting_period = 1 queue = Queue() - engine.start(endpoint_id=ep_id, run_dir=str(tmp_path), results_passthrough=queue) - + engine.start(endpoint_id=ep_id, run_dir=tmp_path, results_passthrough=queue) yield engine engine.shutdown() -def test_gce_kill_manager(gc_engine_no_retries): - engine = gc_engine_no_retries +def test_gce_kill_manager(gc_engine_with_retries): + engine = gc_engine_with_retries + engine.max_retries_on_system_failure = 0 queue = engine.results_passthrough task_id = uuid.uuid1() serializer = ComputeSerializer() @@ -84,7 +59,6 @@ def test_gce_kill_manager(gc_engine_no_retries): flag = False for _i in range(4): q_msg = queue.get(timeout=2) - logging.warning(f"GOT message: {q_msg=}") assert isinstance(q_msg, dict) packed_result_q = q_msg["message"] @@ -99,11 +73,11 @@ def test_gce_kill_manager(gc_engine_no_retries): break assert flag, "Result message missing" - engine.shutdown() def test_success_after_1_fail(gc_engine_with_retries, tmp_path): engine = gc_engine_with_retries + engine.max_retries_on_system_failure = 2 fail_count = 1 queue = engine.results_passthrough task_id = uuid.uuid1() @@ -140,6 +114,7 @@ def test_success_after_1_fail(gc_engine_with_retries, tmp_path): def test_repeated_fail(gc_engine_with_retries, tmp_path): engine = gc_engine_with_retries + engine.max_retries_on_system_failure = 2 fail_count = 3 queue = engine.results_passthrough task_id = uuid.uuid1()