Skip to content

Commit

Permalink
Rename PING to HEARTBEAT
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Jan 20, 2025
1 parent bd3bc1f commit 8ff5bd0
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 22 deletions.
18 changes: 9 additions & 9 deletions src/_ert/forward_model_runner/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class ClientConnectionError(Exception):
CONNECT_MSG = b"CONNECT"
DISCONNECT_MSG = b"DISCONNECT"
ACK_MSG = b"ACK"
PING_MSG = b"PING"
PING_TIMEOUT = 5.0
HEARTBEAT_MSG = b"BEAT"
HEARTBEAT_TIMEOUT = 5.0


class Client:
Expand Down Expand Up @@ -95,23 +95,23 @@ async def process_message(self, msg: str) -> None:
raise NotImplementedError("Only monitor can receive messages!")

async def _receiver(self) -> None:
last_ping_time: float | None = None
last_beat_time: float | None = None
while True:
try:
_, raw_msg = await self.socket.recv_multipart()
if raw_msg == ACK_MSG:
self._ack_event.set()
elif raw_msg == PING_MSG:
elif raw_msg == HEARTBEAT_MSG:
if (
last_ping_time
and (asyncio.get_running_loop().time() - last_ping_time)
> 2 * PING_TIMEOUT
last_beat_time
and (asyncio.get_running_loop().time() - last_beat_time)
> 2 * HEARTBEAT_TIMEOUT
):
await self.socket.send_multipart([b"", CONNECT_MSG])
logger.warning(
f"{self.dealer_id} pinging failed - reconnecting."
f"{self.dealer_id} heartbeat failed - reconnecting."
)
last_ping_time = asyncio.get_running_loop().time()
last_beat_time = asyncio.get_running_loop().time()
else:
await self.process_message(raw_msg.decode("utf-8"))
except zmq.ZMQError as exc:
Expand Down
12 changes: 6 additions & 6 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
ACK_MSG,
CONNECT_MSG,
DISCONNECT_MSG,
PING_MSG,
PING_TIMEOUT,
HEARTBEAT_MSG,
HEARTBEAT_TIMEOUT,
)
from ert.ensemble_evaluator import identifiers as ids

Expand Down Expand Up @@ -78,12 +78,12 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig):
self._dispatchers_empty: asyncio.Event = asyncio.Event()
self._dispatchers_empty.set()

async def _ping_clients(self) -> None:
async def _do_heartbeat_clients(self) -> None:
await self._server_started.wait()
while True:
if self._clients_connected:
await self._events_to_send.put(PING_MSG)
await asyncio.sleep(PING_TIMEOUT)
await self._events_to_send.put(HEARTBEAT_MSG)
await asyncio.sleep(HEARTBEAT_TIMEOUT)

async def _publisher(self) -> None:
await self._server_started.wait()
Expand Down Expand Up @@ -360,7 +360,7 @@ async def _start_running(self) -> None:
raise ValueError("no config for evaluator")
self._ee_tasks = [
asyncio.create_task(self._server(), name="server_task"),
asyncio.create_task(self._ping_clients(), name="ping_task"),
asyncio.create_task(self._do_heartbeat_clients(), name="beat_task"),
asyncio.create_task(
self._batch_events_into_buffer(), name="dispatcher_task"
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@ async def test_retry(unused_tcp_port):
assert mock_server.messages.count("test_3") == 1


async def test_reconnect_when_missing_ping(unused_tcp_port, monkeypatch):
async def test_reconnect_when_missing_heartbeat(unused_tcp_port, monkeypatch):
host = "localhost"
url = f"tcp://{host}:{unused_tcp_port}"
monkeypatch.setattr(_ert.forward_model_runner.client, "PING_TIMEOUT", 0.1)
monkeypatch.setattr(_ert.forward_model_runner.client, "HEARTBEAT_TIMEOUT", 0.1)

async with (
MockZMQServer(unused_tcp_port, signal=3) as mock_server,
Client(url) as client,
):
await client.send("start", retries=1)

await mock_server.do_ping()
await mock_server.do_heartbeat()
await asyncio.sleep(1)
await mock_server.do_ping()
await mock_server.do_heartbeat()
await client.send("stop", retries=1)

# when reconnection happens CONNECT message is sent again
Expand Down
6 changes: 3 additions & 3 deletions tests/ert/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
ACK_MSG,
CONNECT_MSG,
DISCONNECT_MSG,
PING_MSG,
HEARTBEAT_MSG,
)
from _ert.threading import ErtThread
from ert.scheduler.event import FinishedEvent, StartedEvent
Expand Down Expand Up @@ -123,9 +123,9 @@ async def mock_zmq_server(self):
def signal(self, value):
self.value = value

async def do_ping(self):
async def do_heartbeat(self):
for dealer in self.dealers:
await self.router_socket.send_multipart([dealer, b"", PING_MSG])
await self.router_socket.send_multipart([dealer, b"", HEARTBEAT_MSG])

async def _handler(self):
while True:
Expand Down

0 comments on commit 8ff5bd0

Please sign in to comment.