diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py index da6a6db4b..1427f089a 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py @@ -81,7 +81,7 @@ def __init__( self._thread_id = 0 # 0 == not yet started self._mq_chan: Channel | None = None self._awaiting_confirmation: dict[int, Future] = {} - self._outstanding: queue.Queue[tuple[Future, bytes]] = queue.Queue() + self._outstanding: queue.SimpleQueue[tuple[Future, bytes]] = queue.SimpleQueue() self._total_published = 0 self._delivery_tag_index = 0 self._stop_event = threading.Event() @@ -202,7 +202,8 @@ def _on_connection_closed(self, mq_conn: pika.BaseConnection, exc: Exception): if self._connection_tries == 1: # if 1, then we've not been stable for more than 60s (see _event_watcher) log.info(msg_fmt, self, exc) - log.warning(f"{self!r} Unable to sustain connection; retrying ...") + if not self._stop_event.is_set(): + log.warning(f"{self!r} Unable to sustain connection; retrying ...") self.status = RabbitPublisherStatus.closed mq_conn.ioloop.stop() diff --git a/compute_endpoint/tests/integration/test_rabbit_mq/test_result_q.py b/compute_endpoint/tests/integration/test_rabbit_mq/test_result_q.py index 08a46524b..64323459a 100644 --- a/compute_endpoint/tests/integration/test_rabbit_mq/test_result_q.py +++ b/compute_endpoint/tests/integration/test_rabbit_mq/test_result_q.py @@ -125,9 +125,7 @@ def test_broken_connection( rp.stop() -def test_disconnect_from_client_side( - start_result_q_publisher, start_result_q_subscriber -): +def test_disconnect_from_client_side(start_result_q_publisher): """Confirm that an exception is raised when the connection is closed Ideally we use rabbitmqadmin to close the connection, but that is less reliable here since the test env may not be have the util, and