From 090322fe3ddececc10b3c57ff89c84c3ea94166f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 28 Oct 2024 18:07:11 +0000 Subject: [PATCH 1/4] Make ZMQ, UDP and filesystem monitoring routers send via radios This PR is intended to consolidate monitoring message sending in the monitoring radio code. This is a step towards removing Python multiprocessing from the monitoring code base (see issue #2343) by making it clearer how to change to a different message send implementation (by swapping out the radio implementation and configuration) Compare to how the interchange forwards HTEXRadio messages onwards via some other radio (which right now is always the ZMQRadioSender) -- rather than having its own ZMQ code. --- parsl/monitoring/monitoring.py | 4 +++- parsl/monitoring/router.py | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index a1b20f2705..08d771036a 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -270,6 +270,8 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir: new_dir = f"{base_path}/new/" logger.debug("Creating new and tmp paths under %s", base_path) + target_radio = MultiprocessingQueueRadioSender(q) + os.makedirs(tmp_dir, exist_ok=True) os.makedirs(new_dir, exist_ok=True) @@ -285,7 +287,7 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir: message = pickle.load(f) logger.debug("Message received is: %s", message) assert isinstance(message, tuple) - q.put(cast(TaggedMonitoringMessage, message)) + target_radio.send(cast(TaggedMonitoringMessage, message)) os.remove(full_path_filename) except Exception: logger.exception("Exception processing %s - probably will be retried next iteration", filename) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 1d4b522e82..a45500fc23 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,6 +14,7 @@ import zmq from parsl.log_utils import set_file_logger +from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.types import TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle @@ -55,7 +56,6 @@ def __init__(self, The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. resource_msgs : multiprocessing.Queue A multiprocessing queue to receive messages to be routed onwards to the database process - exit_event : Event An event that the main Parsl process will set to signal that the monitoring router should shut down. """ @@ -98,7 +98,7 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - self.resource_msgs = resource_msgs + self.target_radio = MultiprocessingQueueRadioSender(resource_msgs) self.exit_event = exit_event @wrap_with_logs(target="monitoring_router") @@ -125,7 +125,7 @@ def start_udp_listener(self) -> None: data, addr = self.udp_sock.recvfrom(2048) resource_msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - self.resource_msgs.put(resource_msg) + self.target_radio.send(resource_msg) except socket.timeout: pass @@ -136,7 +136,7 @@ def start_udp_listener(self) -> None: data, addr = self.udp_sock.recvfrom(2048) msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.resource_msgs.put(msg) + self.target_radio.send(msg) last_msg_received_time = time.time() except socket.timeout: pass @@ -160,7 +160,7 @@ def start_zmq_listener(self) -> None: assert len(msg) >= 1, "ZMQ Receiver expects tuples of length at least 1, got {}".format(msg) assert len(msg) == 2, "ZMQ Receiver expects message tuples of exactly length 2, got {}".format(msg) - self.resource_msgs.put(msg) + self.target_radio.send(msg) except zmq.Again: pass except Exception: From def2bda940251025d00844bfceefa59b815eb4eb Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 31 Oct 2024 10:33:08 +0000 Subject: [PATCH 2/4] Remove ability to set monitoring log directory separate from rundir Prior to this PR, MonitoringHub had a logdir parameter which let the log directory be set separately from the DFK-level run directory. Other Parsl components generally don't let the user set this unless there is a specific reason. So this PR removes that feature, reducing the amount of state to be threaded around. When reading this patch, note that what the DFK calls the rundir is a different directory vs what a Config object calls the rundir. --- parsl/dataflow/dflow.py | 2 -- parsl/monitoring/db_manager.py | 12 ++++++------ parsl/monitoring/monitoring.py | 19 ++++++------------- parsl/monitoring/router.py | 12 ++++++------ 4 files changed, 18 insertions(+), 27 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 83ea2e31cf..6cec168b5d 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -111,8 +111,6 @@ def __init__(self, config: Config) -> None: self.monitoring = config.monitoring if self.monitoring: - if self.monitoring.logdir is None: - self.monitoring.logdir = self.run_dir self.monitoring.start(self.run_dir, self.config.run_dir) self.time_began = datetime.datetime.now() diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index abdb038e79..d396c6bbb9 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -279,7 +279,7 @@ class Resource(Base): class DatabaseManager: def __init__(self, db_url: str = 'sqlite:///runinfo/monitoring.db', - logdir: str = '.', + run_dir: str = '.', logging_level: int = logging.INFO, batching_interval: float = 1, batching_threshold: float = 99999, @@ -287,12 +287,12 @@ def __init__(self, self.workflow_end = False self.workflow_start_message: Optional[MonitoringMessage] = None - self.logdir = logdir - os.makedirs(self.logdir, exist_ok=True) + self.run_dir = run_dir + os.makedirs(self.run_dir, exist_ok=True) logger.propagate = False - set_file_logger("{}/database_manager.log".format(self.logdir), level=logging_level, + set_file_logger("{}/database_manager.log".format(self.run_dir), level=logging_level, format_string="%(asctime)s.%(msecs)03d %(name)s:%(lineno)d [%(levelname)s] [%(threadName)s %(thread)d] %(message)s", name="database_manager") @@ -681,7 +681,7 @@ def close(self) -> None: def dbm_starter(exception_q: mpq.Queue, resource_msgs: mpq.Queue, db_url: str, - logdir: str, + run_dir: str, logging_level: int) -> None: """Start the database manager process @@ -692,7 +692,7 @@ def dbm_starter(exception_q: mpq.Queue, try: dbm = DatabaseManager(db_url=db_url, - logdir=logdir, + run_dir=run_dir, logging_level=logging_level) logger.info("Starting dbm in dbm starter") dbm.start(resource_msgs) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 08d771036a..e82c8fb688 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -44,7 +44,6 @@ def __init__(self, workflow_name: Optional[str] = None, workflow_version: Optional[str] = None, logging_endpoint: Optional[str] = None, - logdir: Optional[str] = None, monitoring_debug: bool = False, resource_monitoring_enabled: bool = True, resource_monitoring_interval: float = 30): # in seconds @@ -73,8 +72,6 @@ def __init__(self, The database connection url for monitoring to log the information. These URLs follow RFC-1738, and can include username, password, hostname, database name. Default: sqlite, in the configured run_dir. - logdir : str - Parsl log directory paths. Logs and temp files go here. Default: '.' monitoring_debug : Bool Enable monitoring debug logging. Default: False resource_monitoring_enabled : boolean @@ -96,7 +93,6 @@ def __init__(self, self.hub_port_range = hub_port_range self.logging_endpoint = logging_endpoint - self.logdir = logdir self.monitoring_debug = monitoring_debug self.workflow_name = workflow_name @@ -109,13 +105,10 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No logger.debug("Starting MonitoringHub") - if self.logdir is None: - self.logdir = "." - if self.logging_endpoint is None: self.logging_endpoint = f"sqlite:///{os.fspath(config_run_dir)}/monitoring.db" - os.makedirs(self.logdir, exist_ok=True) + os.makedirs(dfk_run_dir, exist_ok=True) self.monitoring_hub_active = True @@ -151,7 +144,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No "hub_address": self.hub_address, "udp_port": self.hub_port, "zmq_port_range": self.hub_port_range, - "logdir": self.logdir, + "run_dir": dfk_run_dir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, }, name="Monitoring-Router-Process", @@ -161,7 +154,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.dbm_proc = ForkProcess(target=dbm_starter, args=(self.exception_q, self.resource_msgs,), - kwargs={"logdir": self.logdir, + kwargs={"run_dir": dfk_run_dir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "db_url": self.logging_endpoint, }, @@ -172,7 +165,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No logger.info("Started the router process %s and DBM process %s", self.router_proc.pid, self.dbm_proc.pid) self.filesystem_proc = ForkProcess(target=filesystem_receiver, - args=(self.logdir, self.resource_msgs, dfk_run_dir), + args=(self.resource_msgs, dfk_run_dir), name="Monitoring-Filesystem-Process", daemon=True ) @@ -258,8 +251,8 @@ def close(self) -> None: @wrap_with_logs -def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir: str) -> None: - logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir), +def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None: + logger = set_file_logger(f"{run_dir}/monitoring_filesystem_radio.log", name="monitoring_filesystem_radio", level=logging.INFO) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index a45500fc23..82a78e677f 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -31,7 +31,7 @@ def __init__(self, zmq_port_range: Tuple[int, int] = (55050, 56000), monitoring_hub_address: str = "127.0.0.1", - logdir: str = ".", + run_dir: str = ".", logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds resource_msgs: mpq.Queue, @@ -48,7 +48,7 @@ def __init__(self, zmq_port_range : tuple(int, int) The MonitoringHub picks ports at random from the range which will be used by Hub. Default: (55050, 56000) - logdir : str + run_dir : str Parsl log directory paths. Logs and temp files go here. Default: '.' logging_level : int Logging level as defined in the logging module. Default: logging.INFO @@ -59,8 +59,8 @@ def __init__(self, exit_event : Event An event that the main Parsl process will set to signal that the monitoring router should shut down. """ - os.makedirs(logdir, exist_ok=True) - self.logger = set_file_logger("{}/monitoring_router.log".format(logdir), + os.makedirs(run_dir, exist_ok=True) + self.logger = set_file_logger("{}/monitoring_router.log".format(run_dir), name="monitoring_router", level=logging_level) self.logger.debug("Monitoring router starting") @@ -187,14 +187,14 @@ def router_starter(*, udp_port: Optional[int], zmq_port_range: Tuple[int, int], - logdir: str, + run_dir: str, logging_level: int) -> None: setproctitle("parsl: monitoring router") try: router = MonitoringRouter(hub_address=hub_address, udp_port=udp_port, zmq_port_range=zmq_port_range, - logdir=logdir, + run_dir=run_dir, logging_level=logging_level, resource_msgs=resource_msgs, exit_event=exit_event) From 1d55ca3d8f806c121430575deab3b136635b928b Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 30 Oct 2024 14:50:04 +0000 Subject: [PATCH 3/4] put each radio type in its own module in preparation for moving radio-specific radio code into these modules too --- parsl/executors/base.py | 2 +- .../executors/high_throughput/interchange.py | 3 +- parsl/monitoring/monitoring.py | 2 +- parsl/monitoring/radios.py | 191 ------------------ parsl/monitoring/radios/__init__.py | 0 parsl/monitoring/radios/base.py | 13 ++ parsl/monitoring/radios/filesystem.py | 52 +++++ parsl/monitoring/radios/htex.py | 57 ++++++ parsl/monitoring/radios/multiprocessing.py | 17 ++ parsl/monitoring/radios/udp.py | 56 +++++ parsl/monitoring/radios/zmq.py | 17 ++ parsl/monitoring/remote.py | 10 +- parsl/monitoring/router.py | 2 +- 13 files changed, 221 insertions(+), 201 deletions(-) delete mode 100644 parsl/monitoring/radios.py create mode 100644 parsl/monitoring/radios/__init__.py create mode 100644 parsl/monitoring/radios/base.py create mode 100644 parsl/monitoring/radios/filesystem.py create mode 100644 parsl/monitoring/radios/htex.py create mode 100644 parsl/monitoring/radios/multiprocessing.py create mode 100644 parsl/monitoring/radios/udp.py create mode 100644 parsl/monitoring/radios/zmq.py diff --git a/parsl/executors/base.py b/parsl/executors/base.py index a112b9eb00..fc97db89d3 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -5,7 +5,7 @@ from typing_extensions import Literal, Self -from parsl.monitoring.radios import MonitoringRadioSender +from parsl.monitoring.radios.base import MonitoringRadioSender class ParslExecutor(metaclass=ABCMeta): diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index be38ccf168..ffc767f45f 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -19,7 +19,8 @@ from parsl.executors.high_throughput.manager_record import ManagerRecord from parsl.executors.high_throughput.manager_selector import ManagerSelector from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender +from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.radios.zmq import ZMQRadioSender from parsl.process_loggers import wrap_with_logs from parsl.serialize import serialize as serialize_object from parsl.utils import setproctitle diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index e82c8fb688..3fbe5736ba 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -14,7 +14,7 @@ from parsl.log_utils import set_file_logger from parsl.monitoring.errors import MonitoringHubStartError -from parsl.monitoring.radios import MultiprocessingQueueRadioSender +from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter from parsl.monitoring.types import TaggedMonitoringMessage from parsl.multiprocessing import ForkProcess, SizedQueue diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py deleted file mode 100644 index 14dc046557..0000000000 --- a/parsl/monitoring/radios.py +++ /dev/null @@ -1,191 +0,0 @@ -import logging -import os -import pickle -import socket -import uuid -from abc import ABCMeta, abstractmethod -from multiprocessing.queues import Queue - -import zmq - -logger = logging.getLogger(__name__) - - -class MonitoringRadioSender(metaclass=ABCMeta): - @abstractmethod - def send(self, message: object) -> None: - pass - - -class FilesystemRadioSender(MonitoringRadioSender): - """A MonitoringRadioSender that sends messages over a shared filesystem. - - The messsage directory structure is based on maildir, - https://en.wikipedia.org/wiki/Maildir - - The writer creates a message in tmp/ and then when it is fully - written, moves it atomically into new/ - - The reader ignores tmp/ and only reads and deletes messages from - new/ - - This avoids a race condition of reading partially written messages. - - This radio is likely to give higher shared filesystem load compared to - the UDP radio, but should be much more reliable. - """ - - def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str): - logger.info("filesystem based monitoring channel initializing") - self.base_path = f"{run_dir}/monitor-fs-radio/" - self.tmp_path = f"{self.base_path}/tmp" - self.new_path = f"{self.base_path}/new" - - os.makedirs(self.tmp_path, exist_ok=True) - os.makedirs(self.new_path, exist_ok=True) - - def send(self, message: object) -> None: - logger.info("Sending a monitoring message via filesystem") - - unique_id = str(uuid.uuid4()) - - tmp_filename = f"{self.tmp_path}/{unique_id}" - new_filename = f"{self.new_path}/{unique_id}" - buffer = message - - # this will write the message out then atomically - # move it into new/, so that a partially written - # file will never be observed in new/ - with open(tmp_filename, "wb") as f: - pickle.dump(buffer, f) - os.rename(tmp_filename, new_filename) - - -class HTEXRadioSender(MonitoringRadioSender): - - def __init__(self, monitoring_url: str, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - timeout : int - timeout, default=10s - """ - logger.info("htex-based monitoring channel initialising") - - def send(self, message: object) -> None: - """ Sends a message to the UDP receiver - - Parameter - --------- - - message: object - Arbitrary pickle-able object that is to be sent - - Returns: - None - """ - - import parsl.executors.high_throughput.monitoring_info - - result_queue = parsl.executors.high_throughput.monitoring_info.result_queue - - # this message needs to go in the result queue tagged so that it is treated - # i) as a monitoring message by the interchange, and then further more treated - # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO - # which is the implicit default for messages from the interchange) - - # for the interchange, the outer wrapper, this needs to be a dict: - - interchange_msg = { - 'type': 'monitoring', - 'payload': message - } - - if result_queue: - result_queue.put(pickle.dumps(interchange_msg)) - else: - logger.error("result_queue is uninitialized - cannot put monitoring message") - - return - - -class UDPRadioSender(MonitoringRadioSender): - - def __init__(self, monitoring_url: str, timeout: int = 10): - """ - Parameters - ---------- - - monitoring_url : str - URL of the form ://: - timeout : int - timeout, default=10s - """ - self.monitoring_url = monitoring_url - self.sock_timeout = timeout - try: - self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) - self.port = int(port) - except Exception: - raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) - - self.sock = socket.socket(socket.AF_INET, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP) # UDP - self.sock.settimeout(self.sock_timeout) - - def send(self, message: object) -> None: - """ Sends a message to the UDP receiver - - Parameter - --------- - - message: object - Arbitrary pickle-able object that is to be sent - - Returns: - None - """ - try: - buffer = pickle.dumps(message) - except Exception: - logging.exception("Exception during pickling", exc_info=True) - return - - try: - self.sock.sendto(buffer, (self.ip, self.port)) - except socket.timeout: - logging.error("Could not send message within timeout limit") - return - return - - -class MultiprocessingQueueRadioSender(MonitoringRadioSender): - """A monitoring radio which connects over a multiprocessing Queue. - This radio is intended to be used on the submit side, where components - in the submit process, or processes launched by multiprocessing, will have - access to a Queue shared with the monitoring database code (bypassing the - monitoring router). - """ - def __init__(self, queue: Queue) -> None: - self.queue = queue - - def send(self, message: object) -> None: - self.queue.put(message) - - -class ZMQRadioSender(MonitoringRadioSender): - """A monitoring radio which connects over ZMQ. This radio is not - thread-safe, because its use of ZMQ is not thread-safe. - """ - - def __init__(self, hub_address: str, hub_zmq_port: int) -> None: - self._hub_channel = zmq.Context().socket(zmq.DEALER) - self._hub_channel.set_hwm(0) - self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") - - def send(self, message: object) -> None: - self._hub_channel.send_pyobj(message) diff --git a/parsl/monitoring/radios/__init__.py b/parsl/monitoring/radios/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/parsl/monitoring/radios/base.py b/parsl/monitoring/radios/base.py new file mode 100644 index 0000000000..2bb799f256 --- /dev/null +++ b/parsl/monitoring/radios/base.py @@ -0,0 +1,13 @@ +import logging +from abc import ABCMeta, abstractmethod +from typing import Optional + +_db_manager_excepts: Optional[Exception] + +logger = logging.getLogger(__name__) + + +class MonitoringRadioSender(metaclass=ABCMeta): + @abstractmethod + def send(self, message: object) -> None: + pass diff --git a/parsl/monitoring/radios/filesystem.py b/parsl/monitoring/radios/filesystem.py new file mode 100644 index 0000000000..accff87d36 --- /dev/null +++ b/parsl/monitoring/radios/filesystem.py @@ -0,0 +1,52 @@ +import logging +import os +import pickle +import uuid + +from parsl.monitoring.radios.base import MonitoringRadioSender + +logger = logging.getLogger(__name__) + + +class FilesystemRadioSender(MonitoringRadioSender): + """A MonitoringRadioSender that sends messages over a shared filesystem. + + The messsage directory structure is based on maildir, + https://en.wikipedia.org/wiki/Maildir + + The writer creates a message in tmp/ and then when it is fully + written, moves it atomically into new/ + + The reader ignores tmp/ and only reads and deletes messages from + new/ + + This avoids a race condition of reading partially written messages. + + This radio is likely to give higher shared filesystem load compared to + the UDP radio, but should be much more reliable. + """ + + def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str): + logger.info("filesystem based monitoring channel initializing") + self.base_path = f"{run_dir}/monitor-fs-radio/" + self.tmp_path = f"{self.base_path}/tmp" + self.new_path = f"{self.base_path}/new" + + os.makedirs(self.tmp_path, exist_ok=True) + os.makedirs(self.new_path, exist_ok=True) + + def send(self, message: object) -> None: + logger.info("Sending a monitoring message via filesystem") + + unique_id = str(uuid.uuid4()) + + tmp_filename = f"{self.tmp_path}/{unique_id}" + new_filename = f"{self.new_path}/{unique_id}" + buffer = message + + # this will write the message out then atomically + # move it into new/, so that a partially written + # file will never be observed in new/ + with open(tmp_filename, "wb") as f: + pickle.dump(buffer, f) + os.rename(tmp_filename, new_filename) diff --git a/parsl/monitoring/radios/htex.py b/parsl/monitoring/radios/htex.py new file mode 100644 index 0000000000..bdb893b303 --- /dev/null +++ b/parsl/monitoring/radios/htex.py @@ -0,0 +1,57 @@ +import logging +import pickle + +from parsl.monitoring.radios.base import MonitoringRadioSender + +logger = logging.getLogger(__name__) + + +class HTEXRadioSender(MonitoringRadioSender): + + def __init__(self, monitoring_url: str, timeout: int = 10): + """ + Parameters + ---------- + + monitoring_url : str + URL of the form ://: + timeout : int + timeout, default=10s + """ + logger.info("htex-based monitoring channel initialising") + + def send(self, message: object) -> None: + """ Sends a message to the UDP receiver + + Parameter + --------- + + message: object + Arbitrary pickle-able object that is to be sent + + Returns: + None + """ + + import parsl.executors.high_throughput.monitoring_info + + result_queue = parsl.executors.high_throughput.monitoring_info.result_queue + + # this message needs to go in the result queue tagged so that it is treated + # i) as a monitoring message by the interchange, and then further more treated + # as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO + # which is the implicit default for messages from the interchange) + + # for the interchange, the outer wrapper, this needs to be a dict: + + interchange_msg = { + 'type': 'monitoring', + 'payload': message + } + + if result_queue: + result_queue.put(pickle.dumps(interchange_msg)) + else: + logger.error("result_queue is uninitialized - cannot put monitoring message") + + return diff --git a/parsl/monitoring/radios/multiprocessing.py b/parsl/monitoring/radios/multiprocessing.py new file mode 100644 index 0000000000..6274bbfca8 --- /dev/null +++ b/parsl/monitoring/radios/multiprocessing.py @@ -0,0 +1,17 @@ +from multiprocessing.queues import Queue + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class MultiprocessingQueueRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over a multiprocessing Queue. + This radio is intended to be used on the submit side, where components + in the submit process, or processes launched by multiprocessing, will have + access to a Queue shared with the monitoring database code (bypassing the + monitoring router). + """ + def __init__(self, queue: Queue) -> None: + self.queue = queue + + def send(self, message: object) -> None: + self.queue.put(message) diff --git a/parsl/monitoring/radios/udp.py b/parsl/monitoring/radios/udp.py new file mode 100644 index 0000000000..f2a652e9ac --- /dev/null +++ b/parsl/monitoring/radios/udp.py @@ -0,0 +1,56 @@ +import logging +import pickle +import socket + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class UDPRadioSender(MonitoringRadioSender): + + def __init__(self, monitoring_url: str, timeout: int = 10): + """ + Parameters + ---------- + + monitoring_url : str + URL of the form ://: + timeout : int + timeout, default=10s + """ + self.monitoring_url = monitoring_url + self.sock_timeout = timeout + try: + self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) + self.port = int(port) + except Exception: + raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) + + self.sock = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP) # UDP + self.sock.settimeout(self.sock_timeout) + + def send(self, message: object) -> None: + """ Sends a message to the UDP receiver + + Parameter + --------- + + message: object + Arbitrary pickle-able object that is to be sent + + Returns: + None + """ + try: + buffer = pickle.dumps(message) + except Exception: + logging.exception("Exception during pickling", exc_info=True) + return + + try: + self.sock.sendto(buffer, (self.ip, self.port)) + except socket.timeout: + logging.error("Could not send message within timeout limit") + return + return diff --git a/parsl/monitoring/radios/zmq.py b/parsl/monitoring/radios/zmq.py new file mode 100644 index 0000000000..397c943568 --- /dev/null +++ b/parsl/monitoring/radios/zmq.py @@ -0,0 +1,17 @@ +import zmq + +from parsl.monitoring.radios.base import MonitoringRadioSender + + +class ZMQRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over ZMQ. This radio is not + thread-safe, because its use of ZMQ is not thread-safe. + """ + + def __init__(self, hub_address: str, hub_zmq_port: int) -> None: + self._hub_channel = zmq.Context().socket(zmq.DEALER) + self._hub_channel.set_hwm(0) + self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") + + def send(self, message: object) -> None: + self._hub_channel.send_pyobj(message) diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index d72b54dc3c..530b39f935 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -7,12 +7,10 @@ from typing import Any, Callable, Dict, List, Sequence, Tuple from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import ( - FilesystemRadioSender, - HTEXRadioSender, - MonitoringRadioSender, - UDPRadioSender, -) +from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.radios.filesystem import FilesystemRadioSender +from parsl.monitoring.radios.htex import HTEXRadioSender +from parsl.monitoring.radios.udp import UDPRadioSender from parsl.multiprocessing import ForkProcess from parsl.process_loggers import wrap_with_logs diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 82a78e677f..a3ef46b8eb 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,7 +14,7 @@ import zmq from parsl.log_utils import set_file_logger -from parsl.monitoring.radios import MultiprocessingQueueRadioSender +from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender from parsl.monitoring.types import TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle From c66230b3181a71cfaf8461eabd80d592b5c08450 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 28 Mar 2024 13:34:44 +0000 Subject: [PATCH 4/4] work on pluggable API for worker-side monitoring radio patch so far is overloading self.monitoring_radio for two different uses that should be clarified: submit side radio and worker side radio its a bit complicated for having a default radio (UDPRadio or HTEXRadio) even when monitoring is turned off: I should check that the radio receiver does not get activated in this case: for example: * test that broken radio activation causes a startup error * test that configuration with broken radio doesn't cause a startup error when monitoringhub is not configured zmq radio should always listen, and be the place where all radio receivers send their data. udp radio and filesystem radio should turn into separate... something... processes? for prototyping i guess it doesn't matter where I make them live, but the most behaviour preserving would keep them somehow separated? --- docs/userguide/monitoring.rst | 1 - parsl/configs/ASPIRE1.py | 1 - parsl/dataflow/dflow.py | 16 +- parsl/executors/base.py | 42 +++++- parsl/executors/high_throughput/executor.py | 16 +- .../executors/high_throughput/mpi_executor.py | 14 +- parsl/executors/taskvine/executor.py | 2 + parsl/executors/threads.py | 1 + parsl/executors/workqueue/executor.py | 2 + parsl/monitoring/monitoring.py | 60 ++++---- parsl/monitoring/radios/base.py | 28 +++- parsl/monitoring/radios/filesystem.py | 36 ++++- parsl/monitoring/radios/htex.py | 25 +++- parsl/monitoring/radios/udp.py | 138 +++++++++++++++++- parsl/monitoring/remote.py | 61 ++++---- parsl/monitoring/router.py | 65 +-------- parsl/tests/configs/htex_local_alternate.py | 1 - .../tests/configs/local_threads_monitoring.py | 1 - parsl/tests/conftest.py | 2 + parsl/tests/test_monitoring/test_basic.py | 13 +- .../test_htex_init_blocks_vs_monitoring.py | 1 - parsl/tests/test_monitoring/test_stdouterr.py | 1 - 22 files changed, 356 insertions(+), 171 deletions(-) diff --git a/docs/userguide/monitoring.rst b/docs/userguide/monitoring.rst index 3404862450..02b3177ca7 100644 --- a/docs/userguide/monitoring.rst +++ b/docs/userguide/monitoring.rst @@ -42,7 +42,6 @@ configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use por ], monitoring=MonitoringHub( hub_address=address_by_hostname(), - hub_port=55055, monitoring_debug=False, resource_monitoring_interval=10, ), diff --git a/parsl/configs/ASPIRE1.py b/parsl/configs/ASPIRE1.py index 7792f15dba..017e1061d7 100644 --- a/parsl/configs/ASPIRE1.py +++ b/parsl/configs/ASPIRE1.py @@ -34,7 +34,6 @@ ], monitoring=MonitoringHub( hub_address=address_by_interface('ib0'), - hub_port=55055, resource_monitoring_interval=10, ), strategy='simple', diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 6cec168b5d..2739a99bce 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -744,11 +744,10 @@ def launch_task(self, task_record: TaskRecord) -> Future: kwargs=kwargs, x_try_id=try_id, x_task_id=task_id, - monitoring_hub_url=self.monitoring.monitoring_hub_url, + radio_config=executor.remote_monitoring_radio_config, run_id=self.run_id, logging_level=wrapper_logging_level, sleep_dur=self.monitoring.resource_monitoring_interval, - radio_mode=executor.radio_mode, monitor_resources=executor.monitor_resources(), run_dir=self.run_dir) @@ -1146,6 +1145,19 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: executor.hub_address = self.monitoring.hub_address executor.hub_zmq_port = self.monitoring.hub_zmq_port executor.submit_monitoring_radio = self.monitoring.radio + # this will modify the radio config object: it will add relevant parameters needed + # for the particular remote radio sender to communicate back + logger.info("starting monitoring receiver " + f"for executor {executor} " + f"with remote monitoring radio config {executor.remote_monitoring_radio_config}") + executor.monitoring_receiver = self.monitoring.start_receiver(executor.remote_monitoring_radio_config, + ip=self.monitoring.hub_address, + run_dir=self.run_dir) + # TODO: this is a weird way to start the receiver. + # Rather than in executor.start, but there's a tangle here + # trying to make the executors usable in a non-pure-parsl + # context where there is no DFK to grab config out of? + # (and no monitoring...) if hasattr(executor, 'provider'): if hasattr(executor.provider, 'script_dir'): executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts') diff --git a/parsl/executors/base.py b/parsl/executors/base.py index fc97db89d3..94c485d4be 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -1,3 +1,4 @@ +import logging import os from abc import ABCMeta, abstractmethod from concurrent.futures import Future @@ -5,7 +6,14 @@ from typing_extensions import Literal, Self -from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.radios.base import ( + MonitoringRadioReceiver, + MonitoringRadioSender, + RadioConfig, +) +from parsl.monitoring.radios.udp import UDPRadio + +logger = logging.getLogger(__name__) class ParslExecutor(metaclass=ABCMeta): @@ -19,15 +27,13 @@ class ParslExecutor(metaclass=ABCMeta): no arguments and re-raises any thrown exception. In addition to the listed methods, a ParslExecutor instance must always - have a member field: + have these member fields: label: str - a human readable label for the executor, unique with respect to other executors. - Per-executor monitoring behaviour can be influenced by exposing: - - radio_mode: str - a string describing which radio mode should be used to - send task resource data back to the submit side. + remote_monitoring_radio_config: RadioConfig describing how tasks on this executor + should report task resource status An executor may optionally expose: @@ -45,11 +51,16 @@ class ParslExecutor(metaclass=ABCMeta): """ label: str = "undefined" - radio_mode: str = "udp" def __init__( self, *, + + # TODO: I'd like these two to go away but they're needed right now + # to configure the interchange monitoring radio, that is + # in addition to the submit and worker monitoring radios (!). They + # are effectivley a third monitoring radio config, though, so what + # should that look like for the interchange? hub_address: Optional[str] = None, hub_zmq_port: Optional[int] = None, submit_monitoring_radio: Optional[MonitoringRadioSender] = None, @@ -58,10 +69,19 @@ def __init__( ): self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port + + # these are parameters for the monitoring radio to be used on the remote side + # eg. in workers - to send results back, and they should end up encapsulated + # inside a RadioConfig. self.submit_monitoring_radio = submit_monitoring_radio + self.remote_monitoring_radio_config: RadioConfig = UDPRadio() + self.run_dir = os.path.abspath(run_dir) self.run_id = run_id + # will be set externally later, which is pretty ugly + self.monitoring_receiver: Optional[MonitoringRadioReceiver] = None + def __enter__(self) -> Self: return self @@ -94,7 +114,13 @@ def shutdown(self) -> None: This includes all attached resources such as workers and controllers. """ - pass + logger.debug("Starting base executor shutdown") + # logger.error(f"BENC: monitoring receiver on {self} is {self.monitoring_receiver}") + if self.monitoring_receiver is not None: + logger.debug("Starting monitoring receiver shutdown") + self.monitoring_receiver.shutdown() + logger.debug("Done with monitoring receiver shutdown") + logger.debug("Done with base executor shutdown") def monitor_resources(self) -> bool: """Should resource monitoring happen for tasks on running on this executor? diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index a1def0466a..1d6c474d3e 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -29,6 +29,8 @@ ) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus +from parsl.monitoring.radios.base import RadioConfig +from parsl.monitoring.radios.htex import HTEXRadio from parsl.process_loggers import wrap_with_logs from parsl.providers import LocalProvider from parsl.providers.base import ExecutionProvider @@ -253,11 +255,13 @@ def __init__(self, worker_logdir_root: Optional[str] = None, manager_selector: ManagerSelector = RandomManagerSelector(), block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, - encrypted: bool = False): + encrypted: bool = False, + remote_monitoring_radio_config: Optional[RadioConfig] = None): logger.debug("Initializing HighThroughputExecutor") BlockProviderExecutor.__init__(self, provider=provider, block_error_handler=block_error_handler) + self.label = label self.worker_debug = worker_debug self.storage_access = storage_access @@ -300,6 +304,12 @@ def __init__(self, self._workers_per_node = 1 # our best guess-- we do not have any provider hints self._task_counter = 0 + + if remote_monitoring_radio_config is not None: + self.remote_monitoring_radio_config = remote_monitoring_radio_config + else: + self.remote_monitoring_radio_config = HTEXRadio() + self.worker_ports = worker_ports self.worker_port_range = worker_port_range self.interchange_proc: Optional[subprocess.Popen] = None @@ -322,7 +332,6 @@ def __init__(self, interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD self.interchange_launch_cmd = interchange_launch_cmd - radio_mode = "htex" enable_mpi_mode: bool = False mpi_launcher: str = "mpiexec" @@ -832,6 +841,9 @@ def shutdown(self, timeout: float = 10.0): logger.info("Closing command client") self.command_client.close() + # TODO: implement this across all executors + super().shutdown() + logger.info("Finished HighThroughputExecutor shutdown attempt") def get_usage_information(self): diff --git a/parsl/executors/high_throughput/mpi_executor.py b/parsl/executors/high_throughput/mpi_executor.py index 04b8cf5197..919fcc066f 100644 --- a/parsl/executors/high_throughput/mpi_executor.py +++ b/parsl/executors/high_throughput/mpi_executor.py @@ -15,6 +15,7 @@ from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import JobStatus from parsl.launchers import SimpleLauncher +from parsl.monitoring.radios.base import RadioConfig from parsl.providers import LocalProvider from parsl.providers.base import ExecutionProvider @@ -66,7 +67,8 @@ def __init__(self, worker_logdir_root: Optional[str] = None, mpi_launcher: str = "mpiexec", block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, - encrypted: bool = False): + encrypted: bool = False, + remote_monitoring_radio_config: Optional[RadioConfig] = None): super().__init__( # Hard-coded settings cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers @@ -92,7 +94,15 @@ def __init__(self, address_probe_timeout=address_probe_timeout, worker_logdir_root=worker_logdir_root, block_error_handler=block_error_handler, - encrypted=encrypted + encrypted=encrypted, + + # TODO: + # worker-side monitoring in MPI-style code is probably going to be + # broken - resource monitoring won't see any worker processes + # most likely, as so perhaps it should have worker resource + # monitoring disabled like the thread pool executor has? + # (for related but different reasons...) + remote_monitoring_radio_config=remote_monitoring_radio_config ) self.enable_mpi_mode = True self.mpi_launcher = mpi_launcher diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index a15a444d2c..efde5dfa15 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -605,6 +605,8 @@ def shutdown(self, *args, **kwargs): self._finished_task_queue.close() self._finished_task_queue.join_thread() + super().shutdown() + logger.debug("TaskVine shutdown completed") @wrap_with_logs diff --git a/parsl/executors/threads.py b/parsl/executors/threads.py index 9b3b0df5ce..a532478f58 100644 --- a/parsl/executors/threads.py +++ b/parsl/executors/threads.py @@ -73,6 +73,7 @@ def shutdown(self, block=True): """ logger.debug("Shutting down executor, which involves waiting for running tasks to complete") self.executor.shutdown(wait=block) + super().shutdown() logger.debug("Done with executor shutdown") def monitor_resources(self): diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 155c990ab5..e1fcd42deb 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -716,6 +716,8 @@ def shutdown(self, *args, **kwargs): self.collector_queue.close() self.collector_queue.join_thread() + super().shutdown() + logger.debug("Work Queue shutdown completed") @wrap_with_logs diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 3fbe5736ba..f9734df033 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -8,12 +8,13 @@ import time from multiprocessing import Event from multiprocessing.queues import Queue -from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast import typeguard from parsl.log_utils import set_file_logger from parsl.monitoring.errors import MonitoringHubStartError +from parsl.monitoring.radios.base import RadioConfig from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter from parsl.monitoring.types import TaggedMonitoringMessage @@ -38,7 +39,6 @@ class MonitoringHub(RepresentationMixin): def __init__(self, hub_address: str, - hub_port: Optional[int] = None, hub_port_range: Tuple[int, int] = (55050, 56000), workflow_name: Optional[str] = None, @@ -52,17 +52,10 @@ def __init__(self, ---------- hub_address : str The ip address at which the workers will be able to reach the Hub. - hub_port : int - The UDP port to which workers will be able to deliver messages to - the monitoring router. - Note that despite the similar name, this is not related to - hub_port_range. - Default: None hub_port_range : tuple(int, int) The port range for a ZMQ channel from an executor process (for example, the interchange in the High Throughput Executor) to deliver monitoring messages to the monitoring router. - Note that despite the similar name, this is not related to hub_port. Default: (55050, 56000) workflow_name : str The name for the workflow. Default to the name of the parsl script @@ -89,7 +82,6 @@ def __init__(self, raise _db_manager_excepts self.hub_address = hub_address - self.hub_port = hub_port self.hub_port_range = hub_port_range self.logging_endpoint = logging_endpoint @@ -121,7 +113,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No # in the future, Queue will allow runtime subscripts. if TYPE_CHECKING: - comm_q: Queue[Union[Tuple[int, int], str]] + comm_q: Queue[Union[int, str]] else: comm_q: Queue @@ -142,7 +134,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No "resource_msgs": self.resource_msgs, "exit_event": self.router_exit_event, "hub_address": self.hub_address, - "udp_port": self.hub_port, "zmq_port_range": self.hub_port_range, "run_dir": dfk_run_dir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, @@ -164,14 +155,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.dbm_proc.start() logger.info("Started the router process %s and DBM process %s", self.router_proc.pid, self.dbm_proc.pid) - self.filesystem_proc = ForkProcess(target=filesystem_receiver, - args=(self.resource_msgs, dfk_run_dir), - name="Monitoring-Filesystem-Process", - daemon=True - ) - self.filesystem_proc.start() - logger.info("Started filesystem radio receiver process %s", self.filesystem_proc.pid) - self.radio = MultiprocessingQueueRadioSender(self.resource_msgs) try: @@ -186,9 +169,23 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No logger.error("MonitoringRouter sent an error message: %s", comm_q_result) raise RuntimeError(f"MonitoringRouter failed to start: {comm_q_result}") - udp_port, zmq_port = comm_q_result + zmq_port = comm_q_result + + self.zmq_port = zmq_port - self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port) + # need to initialize radio configs, perhaps first time a radio config is used + # in each executor? (can't do that at startup because executor list is dynamic, + # don't know all the executors till later) + # self.radio_config.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port) + # How can this config be populated properly? + # There's a UDP port chosen right now by the monitoring router and + # sent back a line above... + # What does that look like for other radios? htexradio has no specific config at all, + # filesystem radio has a path (that should have been created?) for config, and a loop + # that needs to be running, started in this start method. + # so something like... radio_config.receive() generates the appropriate receiver object? + # which has a shutdown method on it for later. and also updates radio_config itself so + # it has the right info to send across the wire? or some state driving like that? logger.info("Monitoring Hub initialized") @@ -218,7 +215,6 @@ def close(self) -> None: ) self.router_proc.terminate() self.dbm_proc.terminate() - self.filesystem_proc.terminate() logger.info("Setting router termination event") self.router_exit_event.set() logger.info("Waiting for router to terminate") @@ -235,13 +231,6 @@ def close(self) -> None: self.dbm_proc.close() logger.debug("Finished waiting for DBM termination") - # should this be message based? it probably doesn't need to be if - # we believe we've received all messages - logger.info("Terminating filesystem radio receiver process") - self.filesystem_proc.terminate() - self.filesystem_proc.join() - self.filesystem_proc.close() - logger.info("Closing monitoring multiprocessing queues") self.exception_q.close() self.exception_q.join_thread() @@ -249,6 +238,17 @@ def close(self) -> None: self.resource_msgs.join_thread() logger.info("Closed monitoring multiprocessing queues") + def start_receiver(self, radio_config: RadioConfig, ip: str, run_dir: str) -> Any: + """somehow start a radio receiver here and update radioconfig to be sent over the wire, without + losing the info we need to shut down that receiver later... + """ + r = radio_config.create_receiver(ip=ip, run_dir=run_dir, resource_msgs=self.resource_msgs) + logger.info(f"BENC: created receiver {r}") + # assert r is not None + return r + # ... that is, a thing we need to do a shutdown call on at shutdown, a "shutdownable"? without + # expecting any more structure on it? + @wrap_with_logs def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None: diff --git a/parsl/monitoring/radios/base.py b/parsl/monitoring/radios/base.py index 2bb799f256..6ed85820f3 100644 --- a/parsl/monitoring/radios/base.py +++ b/parsl/monitoring/radios/base.py @@ -1,13 +1,33 @@ -import logging from abc import ABCMeta, abstractmethod -from typing import Optional +from multiprocessing.queues import Queue -_db_manager_excepts: Optional[Exception] -logger = logging.getLogger(__name__) +class MonitoringRadioReceiver(metaclass=ABCMeta): + @abstractmethod + def shutdown(self) -> None: + pass class MonitoringRadioSender(metaclass=ABCMeta): @abstractmethod def send(self, message: object) -> None: pass + + +class RadioConfig(metaclass=ABCMeta): + """Base class for radio plugin configuration. + """ + @abstractmethod + def create_sender(self) -> MonitoringRadioSender: + pass + + @abstractmethod + def create_receiver(self, *, ip: str, run_dir: str, resource_msgs: Queue) -> MonitoringRadioReceiver: + # TODO: return a shutdownable, and probably take some context to help in + # creation of the radio config? esp. the ZMQ endpoint to send messages to + # from the receiving process that might be created? + """create a receiver for this config, and update this config as + appropriate so that create_sender will be able to connect back to that + receiver in whichever way is relevant. create_sender can assume + that create_receiver has been called.""" + pass diff --git a/parsl/monitoring/radios/filesystem.py b/parsl/monitoring/radios/filesystem.py index accff87d36..8eecedfcd8 100644 --- a/parsl/monitoring/radios/filesystem.py +++ b/parsl/monitoring/radios/filesystem.py @@ -2,12 +2,28 @@ import os import pickle import uuid +from multiprocessing import Queue -from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.monitoring import filesystem_receiver +from parsl.monitoring.radios.base import ( + MonitoringRadioReceiver, + MonitoringRadioSender, + RadioConfig, +) +from parsl.multiprocessing import ForkProcess logger = logging.getLogger(__name__) +class FilesystemRadio(RadioConfig): + def create_sender(self) -> MonitoringRadioSender: + return FilesystemRadioSender(run_dir=self.run_dir) + + def create_receiver(self, *, ip: str, run_dir: str, resource_msgs: Queue) -> MonitoringRadioReceiver: + self.run_dir = run_dir + return FilesystemRadioReceiver(resource_msgs, run_dir) + + class FilesystemRadioSender(MonitoringRadioSender): """A MonitoringRadioSender that sends messages over a shared filesystem. @@ -26,7 +42,7 @@ class FilesystemRadioSender(MonitoringRadioSender): the UDP radio, but should be much more reliable. """ - def __init__(self, *, monitoring_url: str, timeout: int = 10, run_dir: str): + def __init__(self, *, run_dir: str): logger.info("filesystem based monitoring channel initializing") self.base_path = f"{run_dir}/monitor-fs-radio/" self.tmp_path = f"{self.base_path}/tmp" @@ -50,3 +66,19 @@ def send(self, message: object) -> None: with open(tmp_filename, "wb") as f: pickle.dump(buffer, f) os.rename(tmp_filename, new_filename) + + +class FilesystemRadioReceiver(MonitoringRadioReceiver): + def __init__(self, resource_msgs: Queue, run_dir: str) -> None: + self.filesystem_proc = ForkProcess(target=filesystem_receiver, + args=(resource_msgs, run_dir), + name="Monitoring-Filesystem-Process", + daemon=True + ) + self.filesystem_proc.start() + logger.info("Started filesystem radio receiver process %s", self.filesystem_proc.pid) + + def shutdown(self) -> None: + self.filesystem_proc.terminate() + self.filesystem_proc.join() + self.filesystem_proc.close() diff --git a/parsl/monitoring/radios/htex.py b/parsl/monitoring/radios/htex.py index bdb893b303..7b0c890d79 100644 --- a/parsl/monitoring/radios/htex.py +++ b/parsl/monitoring/radios/htex.py @@ -1,22 +1,33 @@ import logging import pickle +from multiprocessing.queues import Queue -from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.radios.base import ( + MonitoringRadioReceiver, + MonitoringRadioSender, + RadioConfig, +) logger = logging.getLogger(__name__) +class HTEXRadio(RadioConfig): + def create_sender(self) -> MonitoringRadioSender: + return HTEXRadioSender() + + def create_receiver(self, *, ip: str, run_dir: str, resource_msgs: Queue) -> MonitoringRadioReceiver: + return HTEXRadioReceiver() + + class HTEXRadioSender(MonitoringRadioSender): - def __init__(self, monitoring_url: str, timeout: int = 10): + def __init__(self) -> None: """ Parameters ---------- monitoring_url : str URL of the form ://: - timeout : int - timeout, default=10s """ logger.info("htex-based monitoring channel initialising") @@ -54,4 +65,8 @@ def send(self, message: object) -> None: else: logger.error("result_queue is uninitialized - cannot put monitoring message") - return + +class HTEXRadioReceiver(MonitoringRadioReceiver): + def shutdown(self) -> None: + # there is nothing to shut down + pass diff --git a/parsl/monitoring/radios/udp.py b/parsl/monitoring/radios/udp.py index f2a652e9ac..36e91d062e 100644 --- a/parsl/monitoring/radios/udp.py +++ b/parsl/monitoring/radios/udp.py @@ -1,29 +1,105 @@ import logging +import multiprocessing as mp import pickle import socket +import threading +import time +from multiprocessing.queues import Queue +from typing import Any, Optional, Union -from parsl.monitoring.radios.base import MonitoringRadioSender +from parsl.monitoring.radios.base import ( + MonitoringRadioReceiver, + MonitoringRadioSender, + RadioConfig, +) +from parsl.process_loggers import wrap_with_logs + +logger = logging.getLogger(__name__) + + +class UDPRadio(RadioConfig): + ip: str + + # these two values need to be initialized by a create_receiver step... + # which is why an earlier patch needs to turn the UDP and zmq receivers + # into separate threads that can exist separately + + # TODO: this atexit_timeout is now user exposed - in the prior UDP-in-router impl, I think maybe it wasn't (but i should check) + def __init__(self, *, port: Optional[int] = None, atexit_timeout: Union[int, float] = 3): + # TODO awkward: when a user creates this it can be none, + # but after create_receiver initalization it is always an int. + # perhaps leads to motivation of serializable config being its + # own class distinct from the user-specified RadioConfig object? + # Right now, there would be a type-error in create_sender except + # for an assert that asserts this reasoning to mypy. + self.port = port + self.atexit_timeout = atexit_timeout + + def create_sender(self) -> MonitoringRadioSender: + assert self.port is not None, "self.port should have been initialized by create_receiver" + return UDPRadioSender(self.ip, self.port) + + def create_receiver(self, ip: str, run_dir: str, resource_msgs: Queue) -> Any: + """TODO: backwards compatibility would require a single one of these to + exist for all executors that want one, shut down when the last of its + users asks for shut down... in the case that udp_port is specified. + + But maybe the right thing to do here is lose that configuration parameter + in that form? especially as I'd like UDPRadio to go away entirely because + UDP isn't reliable or secure and other code requires reliability of messaging? + """ + + # we could bind to this instead of 0.0.0.0 but that would change behaviour, + # possibly breaking if the IP address isn't bindable (for example, if its + # a port forward). Otherwise, it isn't needed for creation of the listening + # port - only for creation of the sender. + self.ip = ip + + udp_sock = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP) + + # We are trying to bind to all interfaces with 0.0.0.0 + if self.port is None: + udp_sock.bind(('0.0.0.0', 0)) + self.port = udp_sock.getsockname()[1] + else: + try: + udp_sock.bind(('0.0.0.0', self.port)) + except Exception as e: + # TODO: this can be its own patch to use 'from' notation? + raise RuntimeError(f"Could not bind to UDP port {self.port}") from e + udp_sock.settimeout(0.001) # TODO: configurable loop_freq? it's hard-coded though... + logger.info(f"Initialized the UDP socket on port {self.port}") + + # this is now in the submitting process, not the router process. + # I don't think this matters for UDP so much because it's on the + # way out - but how should this work for other things? compare with + # filesystem radio impl? + logger.info("Starting UDP listener thread") + udp_radio_receiver_thread = UDPRadioReceiverThread(udp_sock=udp_sock, resource_msgs=resource_msgs, atexit_timeout=self.atexit_timeout) + udp_radio_receiver_thread.start() + + return udp_radio_receiver_thread + # TODO: wrap this with proper shutdown logic involving events etc? class UDPRadioSender(MonitoringRadioSender): - def __init__(self, monitoring_url: str, timeout: int = 10): + def __init__(self, ip: str, port: int, timeout: int = 10) -> None: """ Parameters ---------- + XXX TODO monitoring_url : str URL of the form ://: timeout : int timeout, default=10s """ - self.monitoring_url = monitoring_url self.sock_timeout = timeout - try: - self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':')) - self.port = int(port) - except Exception: - raise Exception("Failed to parse monitoring url: {}".format(monitoring_url)) + self.ip = ip + self.port = port self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, @@ -42,6 +118,7 @@ def send(self, message: object) -> None: Returns: None """ + logger.info("Starting UDP radio message send") try: buffer = pickle.dumps(message) except Exception: @@ -53,4 +130,49 @@ def send(self, message: object) -> None: except socket.timeout: logging.error("Could not send message within timeout limit") return + logger.info("Normal ending for UDP radio message send") return + + +class UDPRadioReceiverThread(threading.Thread, MonitoringRadioReceiver): + def __init__(self, udp_sock: socket.socket, resource_msgs: Queue, atexit_timeout: Union[int, float]): + self.exit_event = mp.Event() + self.udp_sock = udp_sock + self.resource_msgs = resource_msgs + self.atexit_timeout = atexit_timeout + super().__init__() + + @wrap_with_logs + def run(self) -> None: + try: + while not self.exit_event.is_set(): + try: + data, addr = self.udp_sock.recvfrom(2048) + resource_msg = pickle.loads(data) + logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) + self.resource_msgs.put(resource_msg) + except socket.timeout: + pass + + logger.info("UDP listener draining") + last_msg_received_time = time.time() + while time.time() - last_msg_received_time < self.atexit_timeout: + try: + data, addr = self.udp_sock.recvfrom(2048) + msg = pickle.loads(data) + logger.debug("Got UDP Message from {}: {}".format(addr, msg)) + self.resource_msgs.put((msg, addr)) + last_msg_received_time = time.time() + except socket.timeout: + pass + + logger.info("UDP listener finishing normally") + finally: + logger.info("UDP listener finished") + + def shutdown(self) -> None: + logger.debug("Set exit event") + self.exit_event.set() + logger.debug("Joining") + self.join() + logger.debug("done") diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 530b39f935..fe1e62b153 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -7,10 +7,7 @@ from typing import Any, Callable, Dict, List, Sequence, Tuple from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios.base import MonitoringRadioSender -from parsl.monitoring.radios.filesystem import FilesystemRadioSender -from parsl.monitoring.radios.htex import HTEXRadioSender -from parsl.monitoring.radios.udp import UDPRadioSender +from parsl.monitoring.radios.base import MonitoringRadioSender, RadioConfig from parsl.multiprocessing import ForkProcess from parsl.process_loggers import wrap_with_logs @@ -23,11 +20,10 @@ def monitor_wrapper(*, kwargs: Dict, # per invocation x_try_id: int, # per invocation x_task_id: int, # per invocation - monitoring_hub_url: str, # per workflow + radio_config: RadioConfig, # per executor run_id: str, # per workflow logging_level: int, # per workflow sleep_dur: float, # per workflow - radio_mode: str, # per executor monitor_resources: bool, # per workflow run_dir: str) -> Tuple[Callable, Sequence, Dict]: """Wrap the Parsl app with a function that will call the monitor function and point it at the correct pid when the task begins. @@ -41,9 +37,8 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: # Send first message to monitoring router send_first_message(try_id, task_id, - monitoring_hub_url, + radio_config, run_id, - radio_mode, run_dir) if monitor_resources and sleep_dur > 0: @@ -52,9 +47,8 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: args=(os.getpid(), try_id, task_id, - monitoring_hub_url, + radio_config, run_id, - radio_mode, logging_level, sleep_dur, run_dir, @@ -87,9 +81,9 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: send_last_message(try_id, task_id, - monitoring_hub_url, + radio_config, run_id, - radio_mode, run_dir) + run_dir) new_kwargs = kwargs.copy() new_kwargs['_parsl_monitoring_task_id'] = x_task_id @@ -98,47 +92,43 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: return (wrapped, args, new_kwargs) -def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender: +def get_radio(radio_config: RadioConfig, task_id: int) -> MonitoringRadioSender: + + # TODO: maybe this function will end up simple enough to eliminate + radio: MonitoringRadioSender - if radio_mode == "udp": - radio = UDPRadioSender(monitoring_hub_url) - elif radio_mode == "htex": - radio = HTEXRadioSender(monitoring_hub_url) - elif radio_mode == "filesystem": - radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url, - run_dir=run_dir) - else: - raise RuntimeError(f"Unknown radio mode: {radio_mode}") + radio = radio_config.create_sender() + return radio @wrap_with_logs def send_first_message(try_id: int, task_id: int, - monitoring_hub_url: str, - run_id: str, radio_mode: str, run_dir: str) -> None: - send_first_last_message(try_id, task_id, monitoring_hub_url, run_id, - radio_mode, run_dir, False) + radio_config: RadioConfig, + run_id: str, run_dir: str) -> None: + send_first_last_message(try_id, task_id, radio_config, run_id, + run_dir, False) @wrap_with_logs def send_last_message(try_id: int, task_id: int, - monitoring_hub_url: str, - run_id: str, radio_mode: str, run_dir: str) -> None: - send_first_last_message(try_id, task_id, monitoring_hub_url, run_id, - radio_mode, run_dir, True) + radio_config: RadioConfig, + run_id: str, run_dir: str) -> None: + send_first_last_message(try_id, task_id, radio_config, run_id, + run_dir, True) def send_first_last_message(try_id: int, task_id: int, - monitoring_hub_url: str, - run_id: str, radio_mode: str, run_dir: str, + radio_config: RadioConfig, + run_id: str, run_dir: str, is_last: bool) -> None: import os import platform - radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir) + radio = get_radio(radio_config, task_id) msg = (MessageType.RESOURCE_INFO, {'run_id': run_id, @@ -158,9 +148,8 @@ def send_first_last_message(try_id: int, def monitor(pid: int, try_id: int, task_id: int, - monitoring_hub_url: str, + radio_config: RadioConfig, run_id: str, - radio_mode: str, logging_level: int, sleep_dur: float, run_dir: str, @@ -184,7 +173,7 @@ def monitor(pid: int, setproctitle("parsl: task resource monitor") - radio = get_radio(radio_mode, monitoring_hub_url, task_id, run_dir) + radio = get_radio(radio_config, task_id) logging.debug("start of monitor") diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index a3ef46b8eb..49be8c0d3c 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -3,12 +3,10 @@ import logging import multiprocessing.queues as mpq import os -import pickle -import socket import threading import time from multiprocessing.synchronize import Event -from typing import Optional, Tuple +from typing import Tuple import typeguard import zmq @@ -27,7 +25,6 @@ class MonitoringRouter: def __init__(self, *, hub_address: str, - udp_port: Optional[int] = None, zmq_port_range: Tuple[int, int] = (55050, 56000), monitoring_hub_address: str = "127.0.0.1", @@ -43,8 +40,6 @@ def __init__(self, ---------- hub_address : str The ip address at which the workers will be able to reach the Hub. - udp_port : int - The specific port at which workers will be able to reach the Hub via UDP. Default: None zmq_port_range : tuple(int, int) The MonitoringHub picks ports at random from the range which will be used by Hub. Default: (55050, 56000) @@ -70,24 +65,6 @@ def __init__(self, self.loop_freq = 10.0 # milliseconds - # Initialize the UDP socket - self.udp_sock = socket.socket(socket.AF_INET, - socket.SOCK_DGRAM, - socket.IPPROTO_UDP) - - # We are trying to bind to all interfaces with 0.0.0.0 - if not udp_port: - self.udp_sock.bind(('0.0.0.0', 0)) - self.udp_port = self.udp_sock.getsockname()[1] - else: - self.udp_port = udp_port - try: - self.udp_sock.bind(('0.0.0.0', self.udp_port)) - except Exception as e: - raise RuntimeError(f"Could not bind to udp_port {udp_port} because: {e}") - self.udp_sock.settimeout(self.loop_freq / 1000) - self.logger.info("Initialized the UDP socket on 0.0.0.0:{}".format(self.udp_port)) - self._context = zmq.Context() self.zmq_receiver_channel = self._context.socket(zmq.DEALER) self.zmq_receiver_channel.setsockopt(zmq.LINGER, 0) @@ -103,47 +80,13 @@ def __init__(self, @wrap_with_logs(target="monitoring_router") def start(self) -> None: - self.logger.info("Starting UDP listener thread") - udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener, daemon=True) - udp_radio_receiver_thread.start() - self.logger.info("Starting ZMQ listener thread") zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener, daemon=True) zmq_radio_receiver_thread.start() self.logger.info("Joining on ZMQ listener thread") zmq_radio_receiver_thread.join() - self.logger.info("Joining on UDP listener thread") - udp_radio_receiver_thread.join() - self.logger.info("Joined on both ZMQ and UDP listener threads") - - @wrap_with_logs(target="monitoring_router") - def start_udp_listener(self) -> None: - try: - while not self.exit_event.is_set(): - try: - data, addr = self.udp_sock.recvfrom(2048) - resource_msg = pickle.loads(data) - self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - self.target_radio.send(resource_msg) - except socket.timeout: - pass - - self.logger.info("UDP listener draining") - last_msg_received_time = time.time() - while time.time() - last_msg_received_time < self.atexit_timeout: - try: - data, addr = self.udp_sock.recvfrom(2048) - msg = pickle.loads(data) - self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.target_radio.send(msg) - last_msg_received_time = time.time() - except socket.timeout: - pass - - self.logger.info("UDP listener finishing normally") - finally: - self.logger.info("UDP listener finished") + self.logger.info("Joined on ZMQ listener thread") @wrap_with_logs(target="monitoring_router") def start_zmq_listener(self) -> None: @@ -184,7 +127,6 @@ def router_starter(*, exit_event: Event, hub_address: str, - udp_port: Optional[int], zmq_port_range: Tuple[int, int], run_dir: str, @@ -192,7 +134,6 @@ def router_starter(*, setproctitle("parsl: monitoring router") try: router = MonitoringRouter(hub_address=hub_address, - udp_port=udp_port, zmq_port_range=zmq_port_range, run_dir=run_dir, logging_level=logging_level, @@ -202,7 +143,7 @@ def router_starter(*, logger.error("MonitoringRouter construction failed.", exc_info=True) comm_q.put(f"Monitoring router construction failed: {e}") else: - comm_q.put((router.udp_port, router.zmq_receiver_port)) + comm_q.put(router.zmq_receiver_port) router.logger.info("Starting MonitoringRouter in router_starter") try: diff --git a/parsl/tests/configs/htex_local_alternate.py b/parsl/tests/configs/htex_local_alternate.py index d84a07ad84..5667f3ff8c 100644 --- a/parsl/tests/configs/htex_local_alternate.py +++ b/parsl/tests/configs/htex_local_alternate.py @@ -62,7 +62,6 @@ def fresh_config(): retries=2, monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, monitoring_debug=False, resource_monitoring_interval=1, ), diff --git a/parsl/tests/configs/local_threads_monitoring.py b/parsl/tests/configs/local_threads_monitoring.py index 81b9095285..9f105af25d 100644 --- a/parsl/tests/configs/local_threads_monitoring.py +++ b/parsl/tests/configs/local_threads_monitoring.py @@ -5,7 +5,6 @@ config = Config(executors=[ThreadPoolExecutor(label='threads', max_threads=4)], monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, resource_monitoring_interval=3, ) ) diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index 4bcdde0b7a..c0d5a52baf 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -259,6 +259,8 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session): dfk.cleanup() assert DataFlowKernelLoader._dfk is None + # assert [t for t in threading.enumerate() if "MainThread" not in t.name and "HTEX-Queue-Management-Thread" not in t.name] == [] + else: yield diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py index 9ffa21df01..8af899fb05 100644 --- a/parsl/tests/test_monitoring/test_basic.py +++ b/parsl/tests/test_monitoring/test_basic.py @@ -8,6 +8,9 @@ from parsl.config import Config from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig from parsl.monitoring import MonitoringHub +from parsl.monitoring.radios.filesystem import FilesystemRadio +from parsl.monitoring.radios.htex import HTEXRadio +from parsl.monitoring.radios.udp import UDPRadio @parsl.python_app @@ -35,9 +38,10 @@ def htex_udp_config(): from parsl.tests.configs.htex_local_alternate import fresh_config c = fresh_config() assert len(c.executors) == 1 + ex = c.executors[0] - assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio" - c.executors[0].radio_mode = "udp" + assert isinstance(ex.remote_monitoring_radio_config, HTEXRadio), "precondition: htex is configured for the HTEXRadio" + ex.remote_monitoring_radio_config = UDPRadio() return c @@ -47,9 +51,10 @@ def htex_filesystem_config(): from parsl.tests.configs.htex_local_alternate import fresh_config c = fresh_config() assert len(c.executors) == 1 + ex = c.executors[0] - assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio" - c.executors[0].radio_mode = "filesystem" + assert isinstance(ex.remote_monitoring_radio_config, HTEXRadio), "precondition: htex is configured for the HTEXRadio" + ex.remote_monitoring_radio_config = FilesystemRadio() return c diff --git a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py index ada972e747..c54486f011 100644 --- a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py +++ b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py @@ -37,7 +37,6 @@ def fresh_config(run_dir, strategy, db_url): strategy_period=0.1, monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, logging_endpoint=db_url ) ) diff --git a/parsl/tests/test_monitoring/test_stdouterr.py b/parsl/tests/test_monitoring/test_stdouterr.py index d1817164c0..8e1935045f 100644 --- a/parsl/tests/test_monitoring/test_stdouterr.py +++ b/parsl/tests/test_monitoring/test_stdouterr.py @@ -37,7 +37,6 @@ def fresh_config(run_dir): strategy_period=0.1, monitoring=MonitoringHub( hub_address="localhost", - hub_port=55055, ) )