diff --git a/src/ert/ensemble_evaluator/evaluator.py b/src/ert/ensemble_evaluator/evaluator.py index e17beb95923..9737f26e56c 100644 --- a/src/ert/ensemble_evaluator/evaluator.py +++ b/src/ert/ensemble_evaluator/evaluator.py @@ -64,7 +64,7 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig): self._max_batch_size: int = 500 self._batching_interval: float = 2.0 self._complete_batch: asyncio.Event = asyncio.Event() - self._server_started: asyncio.Event = asyncio.Event() + self._server_started: asyncio.Future[None] = asyncio.Future() self._clients_connected: set[bytes] = set() self._clients_empty: asyncio.Event = asyncio.Event() self._clients_empty.set() @@ -73,7 +73,7 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig): self._dispatchers_empty.set() async def _publisher(self) -> None: - await self._server_started.wait() + await self._server_started while True: event = await self._events_to_send.get() for identity in self._clients_connected: @@ -243,7 +243,7 @@ async def handle_dispatch(self, dealer: bytes, frame: bytes) -> None: await self._events.put(event) async def listen_for_messages(self) -> None: - await self._server_started.wait() + await self._server_started while True: try: dealer, _, frame = await self._router_socket.recv_multipart() @@ -285,9 +285,10 @@ async def _server(self) -> None: self._router_socket.bind(f"tcp://*:{self._config.router_port}") else: self._router_socket.bind(self._config.url) - self._server_started.set() + self._server_started.set_result(None) except zmq.error.ZMQError as e: logger.error(f"ZMQ error encountered {e} during evaluator initialization") + self._server_started.set_exception(e) raise try: await self._server_done.wait() @@ -350,7 +351,7 @@ async def _start_running(self) -> None: asyncio.create_task(self.listen_for_messages(), name="listener_task"), ] - await self._server_started.wait() + await self._server_started self._ee_tasks.append( asyncio.create_task( self._ensemble.evaluate( diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index 040c18d978b..f4fd22c9844 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -583,7 +583,7 @@ async def run_ensemble_evaluator_async( evaluator_task = asyncio.create_task( evaluator.run_and_get_successful_realizations() ) - await evaluator._server_started.wait() + await evaluator._server_started if not (await self.run_monitor(ee_config, ensemble.iteration)): return [] diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py index 3959beaa273..e3f3dd7ba51 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py @@ -144,7 +144,7 @@ async def evaluator_to_use_fixture(make_ee_config): evaluator = EnsembleEvaluator(ensemble, make_ee_config(use_token=False)) evaluator._batching_interval = 0.5 # batching can be faster for tests run_task = asyncio.create_task(evaluator.run_and_get_successful_realizations()) - await evaluator._server_started.wait() + await evaluator._server_started yield evaluator evaluator.stop() await run_task diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py index 59fc4da5405..7a2ee7291cf 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py @@ -18,7 +18,7 @@ def evaluator_to_use(): async def run_evaluator(ensemble, ee_config): evaluator = EnsembleEvaluator(ensemble, ee_config) run_task = asyncio.create_task(evaluator.run_and_get_successful_realizations()) - await evaluator._server_started.wait() + await evaluator._server_started try: yield evaluator finally: diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_scheduler.py b/tests/ert/unit_tests/ensemble_evaluator/test_scheduler.py index 43abdad5d3a..83887bf3c04 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_scheduler.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_scheduler.py @@ -66,7 +66,7 @@ def create_manifest_file(): run_task = asyncio.create_task( evaluator.run_and_get_successful_realizations() ) - await evaluator._server_started.wait() + await evaluator._server_started await _run_monitor() await run_task assert "Waiting for disk synchronization" in caplog.messages