diff --git a/src/_ert/forward_model_runner/client.py b/src/_ert/forward_model_runner/client.py index 91d0fc22df3..bee558fcb39 100644 --- a/src/_ert/forward_model_runner/client.py +++ b/src/_ert/forward_model_runner/client.py @@ -18,8 +18,8 @@ class ClientConnectionError(Exception): CONNECT_MSG = b"CONNECT" DISCONNECT_MSG = b"DISCONNECT" ACK_MSG = b"ACK" -PING_MSG = b"PING" -PING_TIMEOUT = 5.0 +HEARTBEAT_MSG = b"BEAT" +HEARTBEAT_TIMEOUT = 5.0 class Client: @@ -95,23 +95,23 @@ async def process_message(self, msg: str) -> None: raise NotImplementedError("Only monitor can receive messages!") async def _receiver(self) -> None: - last_ping_time: float | None = None + last_beat_time: float | None = None while True: try: _, raw_msg = await self.socket.recv_multipart() if raw_msg == ACK_MSG: self._ack_event.set() - elif raw_msg == PING_MSG: + elif raw_msg == HEARTBEAT_MSG: if ( - last_ping_time - and (asyncio.get_running_loop().time() - last_ping_time) - > 2 * PING_TIMEOUT + last_beat_time + and (asyncio.get_running_loop().time() - last_beat_time) + > 2 * HEARTBEAT_TIMEOUT ): await self.socket.send_multipart([b"", CONNECT_MSG]) logger.warning( - f"{self.dealer_id} pinging failed - reconnecting." + f"{self.dealer_id} heartbeat failed - reconnecting." ) - last_ping_time = asyncio.get_running_loop().time() + last_beat_time = asyncio.get_running_loop().time() else: await self.process_message(raw_msg.decode("utf-8")) except zmq.ZMQError as exc: diff --git a/src/ert/ensemble_evaluator/evaluator.py b/src/ert/ensemble_evaluator/evaluator.py index 1684bb660c0..bad05358b44 100644 --- a/src/ert/ensemble_evaluator/evaluator.py +++ b/src/ert/ensemble_evaluator/evaluator.py @@ -31,8 +31,8 @@ ACK_MSG, CONNECT_MSG, DISCONNECT_MSG, - PING_MSG, - PING_TIMEOUT, + HEARTBEAT_MSG, + HEARTBEAT_TIMEOUT, ) from ert.ensemble_evaluator import identifiers as ids @@ -78,12 +78,12 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig): self._dispatchers_empty: asyncio.Event = asyncio.Event() self._dispatchers_empty.set() - async def _ping_clients(self) -> None: + async def _do_heartbeat_clients(self) -> None: await self._server_started.wait() while True: if self._clients_connected: - await self._events_to_send.put(PING_MSG) - await asyncio.sleep(PING_TIMEOUT) + await self._events_to_send.put(HEARTBEAT_MSG) + await asyncio.sleep(HEARTBEAT_TIMEOUT) async def _publisher(self) -> None: await self._server_started.wait() @@ -360,7 +360,7 @@ async def _start_running(self) -> None: raise ValueError("no config for evaluator") self._ee_tasks = [ asyncio.create_task(self._server(), name="server_task"), - asyncio.create_task(self._ping_clients(), name="ping_task"), + asyncio.create_task(self._do_heartbeat_clients(), name="beat_task"), asyncio.create_task( self._batch_events_into_buffer(), name="dispatcher_task" ), diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_client.py b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_client.py index b62a37a1f22..af8148a10ff 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_client.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_client.py @@ -51,10 +51,10 @@ async def test_retry(unused_tcp_port): assert mock_server.messages.count("test_3") == 1 -async def test_reconnect_when_missing_ping(unused_tcp_port, monkeypatch): +async def test_reconnect_when_missing_heartbeat(unused_tcp_port, monkeypatch): host = "localhost" url = f"tcp://{host}:{unused_tcp_port}" - monkeypatch.setattr(_ert.forward_model_runner.client, "PING_TIMEOUT", 0.1) + monkeypatch.setattr(_ert.forward_model_runner.client, "HEARTBEAT_TIMEOUT", 0.1) async with ( MockZMQServer(unused_tcp_port, signal=3) as mock_server, @@ -62,9 +62,9 @@ async def test_reconnect_when_missing_ping(unused_tcp_port, monkeypatch): ): await client.send("start", retries=1) - await mock_server.do_ping() + await mock_server.do_heartbeat() await asyncio.sleep(1) - await mock_server.do_ping() + await mock_server.do_heartbeat() await client.send("stop", retries=1) # when reconnection happens CONNECT message is sent again diff --git a/tests/ert/utils.py b/tests/ert/utils.py index e80ff3957e4..7a1ecff1452 100644 --- a/tests/ert/utils.py +++ b/tests/ert/utils.py @@ -13,7 +13,7 @@ ACK_MSG, CONNECT_MSG, DISCONNECT_MSG, - PING_MSG, + HEARTBEAT_MSG, ) from _ert.threading import ErtThread from ert.scheduler.event import FinishedEvent, StartedEvent @@ -123,9 +123,9 @@ async def mock_zmq_server(self): def signal(self, value): self.value = value - async def do_ping(self): + async def do_heartbeat(self): for dealer in self.dealers: - await self.router_socket.send_multipart([dealer, b"", PING_MSG]) + await self.router_socket.send_multipart([dealer, b"", HEARTBEAT_MSG]) async def _handler(self): while True: