Skip to content

Commit

Permalink
Update pinging reconnect test
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Jan 18, 2025
1 parent ae76e35 commit 1261e9a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 23 deletions.
32 changes: 18 additions & 14 deletions tests/ert/unit_tests/ensemble_evaluator/test_ensemble_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,27 @@ async def test_invalid_server():
async def test_successful_sending(unused_tcp_port):
host = "localhost"
url = f"tcp://{host}:{unused_tcp_port}"
messages_c1 = ["test_1", "test_2", "test_3"]
async with MockZMQServer(unused_tcp_port) as mock_server, Client(url) as c1:
for message in messages_c1:
await c1.send(message)
messages = ["test_1", "test_2", "test_3"]
async with MockZMQServer(unused_tcp_port) as mock_server, Client(url) as client:
for message in messages:
await client.send(message)

for msg in messages_c1:
for msg in messages:
assert msg in mock_server.messages


async def test_retry(unused_tcp_port):
host = "localhost"
url = f"tcp://{host}:{unused_tcp_port}"
client_connection_error_set = False
messages_c1 = ["test_1", "test_2", "test_3"]
messages = ["test_1", "test_2", "test_3"]
async with (
MockZMQServer(unused_tcp_port, signal=2) as mock_server,
Client(url, ack_timeout=0.5) as c1,
Client(url, ack_timeout=0.5) as client,
):
for message in messages_c1:
for message in messages:
try:
await c1.send(message, retries=1)
await client.send(message, retries=1)
except ClientConnectionError:
client_connection_error_set = True
mock_server.signal(0)
Expand All @@ -55,14 +55,18 @@ async def test_reconnect_when_missing_ping(unused_tcp_port):
url = f"tcp://{host}:{unused_tcp_port}"

async with (
MockZMQServer(unused_tcp_port, signal=3) as mock_server,
Client(url, ack_timeout=0.5) as c1,
MockZMQServer(unused_tcp_port, signal=0) as mock_server,
Client(url, ack_timeout=0.5) as client,
):
await c1.send("start", retries=1)
await client.send("start", retries=1)

await mock_server.do_ping()
await asyncio.sleep(2)
await mock_server.do_ping()
await c1.send("stop", retries=1)
await client.send("stop", retries=1)

print(mock_server.messages)
# when reconnection happens CONNECT message is sent again
assert mock_server.messages.count("CONNECT") == 2
assert mock_server.messages.count("DISCONNECT") == 1
assert "start" in mock_server.messages
assert "stop" in mock_server.messages
12 changes: 3 additions & 9 deletions tests/ert/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def __init__(self, port, signal=0):
signal = 0: normal operation
signal = 1: don't send ACK and don't receive messages
signal = 2: don't send ACK, but receive messages
signal = 3: normal operation and store all messages
"""
self.port = port
self.messages = []
Expand Down Expand Up @@ -134,18 +133,13 @@ async def _handler(self):
if frame == CONNECT_MSG:
await self.router_socket.send_multipart([dealer, b"", ACK_MSG])
self.dealers.add(dealer)
if self.value == 3:
self.messages.append(frame.decode("utf-8"))
elif frame == DISCONNECT_MSG:
await self.router_socket.send_multipart([dealer, b"", ACK_MSG])
self.dealers.discard(dealer)
if self.value == 3:
self.messages.append(frame.decode("utf-8"))
elif self.value != 1:
elif self.value == 0:
await self.router_socket.send_multipart([dealer, b"", ACK_MSG])
if self.value != 2:
self.messages.append(frame.decode("utf-8"))

if self.value != 1:
self.messages.append(frame.decode("utf-8"))
except asyncio.CancelledError:
break

Expand Down

0 comments on commit 1261e9a

Please sign in to comment.