Close the test ioloops
This closes the internal file descriptors -- the same concept as the commit
from earlier today.  Do so, now, for the tests.

Similarly, reduce the overhead by moving to a thread, rather than a
no-longer-necessary multiprocessing setup.
khk-globus committed Jan 15, 2025
1 parent c29c0c0 commit 8218488
Showing 8 changed files with 96 additions and 168 deletions.
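For context on the first point: pika's SelectConnection drives all I/O through an ioloop whose selector and waker hold open file descriptors. Stopping the loop ends processing but does not release those descriptors; only ioloop.close() does. A minimal sketch of that lifecycle (assuming a locally reachable RabbitMQ; this is not the endpoint's code):

    import pika

    params = pika.ConnectionParameters("localhost")  # assumed broker location
    conn = pika.SelectConnection(parameters=params)
    try:
        conn.ioloop.start()  # blocks until something stops the loop
    finally:
        if conn.ioloop:
            conn.ioloop.close()  # releases the loop's internal fds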
@@ -153,6 +153,7 @@ def run(self) -> None:
         finally:
             if self._mq_conn and self._mq_conn.ioloop:
                 self._mq_conn.ioloop.close()
+                self._mq_conn = None

         self._stop_event.set()

@@ -378,7 +379,6 @@ def _stop_ioloop(self):
                 self._mq_conn.close()
             elif self._mq_conn.is_closed:
                 self._mq_conn.ioloop.stop()
-                self._mq_conn = None

     def publish(self, message: bytes) -> Future[None]:
         """
@@ -151,6 +151,7 @@ def run(self):
         finally:
             if self._connection and self._connection.ioloop:
                 self._connection.ioloop.close()
+                self._connection = None

         self._stop_event.set()
         logger.debug("%s Shutdown complete", self)

@@ -346,7 +347,6 @@ def _stop_ioloop(self):
                 self._connection.close()
             elif self._connection.is_closed:
                 self._connection.ioloop.stop()
-                self._connection = None

     def _event_watcher(self):
         """Polls the stop_event periodically to trigger a shutdown"""
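Both files above encode the same ordering rule: _stop_ioloop may still need the connection object to stop the loop, so the reference (self._mq_conn / self._connection) is now cleared only in run()'s finally block, after ioloop.close() has released the descriptors. A condensed sketch of the pattern, with hypothetical names standing in for the real classes (which carry far more state and error handling):

    import threading

    import pika

    class MQRunner(threading.Thread):
        def __init__(self, params: pika.ConnectionParameters):
            super().__init__()
            self._params = params
            self._conn: pika.SelectConnection | None = None
            self._stop_event = threading.Event()

        def run(self) -> None:
            try:
                self._conn = pika.SelectConnection(parameters=self._params)
                self._conn.ioloop.start()  # returns once the loop stops
            finally:
                if self._conn and self._conn.ioloop:
                    self._conn.ioloop.close()  # free the internal fds first ...
                self._conn = None  # ... then drop the reference
                self._stop_event.set()

        def _stop_ioloop(self):
            # Invoked via the ioloop; it must NOT clear self._conn, or the
            # finally block in run() could no longer close the loop.
            if self._conn.is_open:
                self._conn.close()
            elif self._conn.is_closed:
                self._conn.ioloop.stop()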
compute_endpoint/tests/integration/conftest.py (129 changes: 53 additions & 76 deletions)

@@ -1,16 +1,15 @@
 from __future__ import annotations

-import multiprocessing
 import os
 import queue
 import random
 import string
+import threading

 import pika
 import pika.exceptions
 import pytest
 from globus_compute_endpoint.endpoint.rabbit_mq import (
-    RabbitPublisherStatus,
     ResultPublisher,
     TaskQueueSubscriber,
 )
Expand Down Expand Up @@ -115,29 +114,6 @@ def task_queue_info(rabbitmq_conn_url, tod_session_num, request) -> dict:
}


@pytest.fixture
def running_subscribers(request):
run_list = []

def cleanup():
for x in run_list:
try: # cannot check is_alive on closed proc
is_alive = x.is_alive()
except ValueError:
is_alive = False
if is_alive:
try:
x.stop()
except Exception as e:
x.terminate()
raise Exception(
f"{x.__class__.__name__} did not shutdown correctly"
) from e

request.addfinalizer(cleanup)
return run_list


@pytest.fixture(scope="session")
def ensure_result_queue(pika_conn_params):
queues_created = []
@@ -180,7 +156,7 @@ def start_task_q_subscriber(
     task_queue_info,
     ensure_task_queue,
 ):
-    running_subscribers: list[TaskQueueSubscriber] = []
+    qs_list: list[TaskQueueSubscriber] = []

     def func(
         *,
@@ -192,66 +168,60 @@ def func(
         q_info = task_queue_info if override_params is None else override_params
         ensure_task_queue(queue_opts={"queue": q_info["queue"]})

-        tqs = TaskQueueSubscriber(queue_info=q_info, pending_task_queue=task_queue)
-        tqs.start()
-        running_subscribers.append(tqs)
-        return tqs
+        qs = TaskQueueSubscriber(queue_info=q_info, pending_task_queue=task_queue)
+        qs.start()
+        qs_list.append(qs)
+        return qs

     yield func

-    for sub in running_subscribers:
-        sub._stop_event.set()
-        sub.join()
+    while qs_list:
+        qs = qs_list.pop()
+        qs.stop()
+        qs.join()


 @pytest.fixture
-def start_result_q_subscriber(running_subscribers, pika_conn_params):
+def start_result_q_subscriber(pika_conn_params):
+    qs_list: list[ResultQueueSubscriber] = []
+
     def func(
         *,
-        queue: multiprocessing.Queue | None = None,
-        kill_event: multiprocessing.Event | None = None,
+        rqueue: queue.SimpleQueue | None = None,
+        kill_event: threading.Event | None = None,
         override_params: pika.connection.Parameters | None = None,
     ):
         if kill_event is None:
-            kill_event = multiprocessing.Event()
-        if queue is None:
-            queue = multiprocessing.Queue()
-        result_q = ResultQueueSubscriber(
+            kill_event = threading.Event()
+        if rqueue is None:
+            rqueue = queue.SimpleQueue()
+        qs = ResultQueueSubscriber(
             conn_params=pika_conn_params if not override_params else override_params,
-            external_queue=queue,
+            external_queue=rqueue,
             kill_event=kill_event,
         )
-        result_q.start()
-        running_subscribers.append(result_q)
-        if not result_q.test_class_ready.wait(10):
+        qs.start()
+        qs_list.append(qs)
+        if not qs.test_class_ready.wait(10):
             raise AssertionError("Result Queue subscriber failed to initialize")
-        return result_q
-
-    return func
-
-
-@pytest.fixture
-def running_publishers(request):
-    run_list = []
+        return qs

-    def cleanup():
-        for x in run_list:
-            if x.status is RabbitPublisherStatus.connected:
-                if hasattr(x, "stop"):
-                    x.stop()  # ResultPublisher
-                else:
-                    x.close()  # TaskQueuePublisher (from tests)
+    yield func

-    request.addfinalizer(cleanup)
-    return run_list
+    while qs_list:
+        qs = qs_list.pop()
+        qs.stop()
+        qs.join()


 @pytest.fixture
 def start_result_q_publisher(
-    running_publishers,
     result_queue_info,
     ensure_result_queue,
 ):
+
+    qp_list: list[ResultPublisher] = []
+
     def func(
         *,
         override_params: dict | None = None,
Expand All @@ -268,24 +238,28 @@ def func(
queue_opts = {"queue": queue_name, "durable": True}
ensure_result_queue(exchange_opts=exchange_opts, queue_opts=queue_opts)

result_pub = ResultPublisher(queue_info=q_info)
result_pub.start()
qp = ResultPublisher(queue_info=q_info)
qp.start()
qp_list.append(qp)
if queue_purge: # Make sure queue is empty
try_assert(lambda: result_pub._mq_chan is not None)
result_pub._mq_chan.queue_purge(q_info["queue"])
running_publishers.append(result_pub)
return result_pub
try_assert(lambda: qp._mq_chan is not None)
qp._mq_chan.queue_purge(q_info["queue"])
return qp

yield func

return func
while qp_list:
qp_list.pop().stop(timeout=None)


@pytest.fixture
def start_task_q_publisher(
running_publishers,
task_queue_info,
ensure_task_queue,
default_endpoint_id,
):
qp_list: list[TaskQueuePublisher] = []

def func(
*,
override_params: pika.connection.Parameters | None = None,
@@ -301,14 +275,17 @@ def func(
         queue_opts = {"queue": queue_name, "arguments": {"x-expires": 30 * 1000}}
         ensure_task_queue(exchange_opts=exchange_opts, queue_opts=queue_opts)

-        task_pub = TaskQueuePublisher(queue_info=q_info)
-        task_pub.connect()
+        qp = TaskQueuePublisher(queue_info=q_info)
+        qp.connect()
+        qp_list.append(qp)
         if queue_purge:  # Make sure queue is empty
-            task_pub._channel.queue_purge(q_info["queue"])
-        running_publishers.append(task_pub)
-        return task_pub
+            qp._channel.queue_purge(q_info["queue"])
+        return qp

-    return func
+    yield func
+
+    while qp_list:
+        qp_list.pop().close()


 @pytest.fixture(scope="session")
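The conftest changes replace the shared running_subscribers/running_publishers finalizer fixtures with per-fixture yield cleanup, and swap multiprocessing primitives (Queue, Event, one process per worker) for their much cheaper threading equivalents (queue.SimpleQueue, threading.Event, threads). A minimal sketch of the resulting fixture shape, with a hypothetical Worker standing in for the subscriber and publisher classes:

    import threading

    import pytest

    class Worker(threading.Thread):
        # Hypothetical stand-in for TaskQueueSubscriber et al.
        def __init__(self):
            super().__init__()
            self._stop = threading.Event()

        def run(self):
            self._stop.wait()

        def stop(self):
            self._stop.set()

    @pytest.fixture
    def start_worker():
        workers: list[Worker] = []

        def func() -> Worker:
            w = Worker()
            w.start()
            workers.append(w)
            return w

        yield func  # hand the factory to the test

        while workers:  # teardown: stop and join everything the test started
            w = workers.pop()
            w.stop()
            w.join()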