diff --git a/.flake8 b/.flake8
index 176fb50436..c2f6336190 100644
--- a/.flake8
+++ b/.flake8
@@ -8,7 +8,7 @@
 # W504: line break after binary operator
 # (Raised by flake8 even when it is followed)
 ignore = E126, E402, E129, W504
-max-line-length = 157
+max-line-length = 152
 exclude = test_import_fail.py,
     parsl/executors/workqueue/parsl_coprocess.py
 # E741 disallows ambiguous single letter names which look like numbers
diff --git a/docs/README.rst b/docs/README.rst
index 44f91eb16f..f50c03f3d1 100644
--- a/docs/README.rst
+++ b/docs/README.rst
@@ -21,7 +21,15 @@ Local builds
 
 To build the documentation locally, use::
 
-    $ make html
+    $ make clean html
+
+To view the freshly rebuilt docs, use::
+
+    $ cd _build/html
+    $ python3 -m http.server 8080
+
+Once the Python HTTP server is launched, point your browser to `http://localhost:8080 <http://localhost:8080>`_
+
 
 Regenerate module stubs
 --------------------------
diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst
index 1a61e40e73..4168367f9d 100644
--- a/docs/userguide/execution.rst
+++ b/docs/userguide/execution.rst
@@ -39,7 +39,7 @@ parameters include access keys, instance type, and spot bid price
 Parsl currently supports the following providers:
 
 1. `parsl.providers.LocalProvider`: The provider allows you to run locally on your laptop or workstation.
-2. `parsl.providers.CobaltProvider`: This provider allows you to schedule resources via the Cobalt scheduler.
+2. `parsl.providers.CobaltProvider`: This provider allows you to schedule resources via the Cobalt scheduler. **This provider is deprecated and will be removed by 2024.04**.
 3. `parsl.providers.SlurmProvider`: This provider allows you to schedule resources via the Slurm scheduler.
 4. `parsl.providers.CondorProvider`: This provider allows you to schedule resources via the Condor scheduler.
 5. `parsl.providers.GridEngineProvider`: This provider allows you to schedule resources via the GridEngine scheduler.
@@ -48,7 +48,8 @@ Parsl currently supports the following providers:
 8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud.
 9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster.
 10. `parsl.providers.AdHocProvider`: This provider allows you manage execution over a collection of nodes to form an ad-hoc cluster.
-11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler
+11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler.
+
 
 
 Executors
diff --git a/parsl/dataflow/taskrecord.py b/parsl/dataflow/taskrecord.py
index 34d5ef4ca5..091a1da1ab 100644
--- a/parsl/dataflow/taskrecord.py
+++ b/parsl/dataflow/taskrecord.py
@@ -70,7 +70,8 @@ class TaskRecord(TypedDict, total=False):
     # these three could be more strongly typed perhaps but I'm not thinking about that now
     func: Callable
     fn_hash: str
-    args: Sequence[Any]  # in some places we uses a Tuple[Any, ...] and in some places a List[Any]. This is an attempt to correctly type both of those.
+    # in some places we use a Tuple[Any, ...] and in some places a List[Any]. This is an attempt to correctly type both of those.
+    args: Sequence[Any]
     kwargs: Dict[str, Any]
 
     time_invoked: Optional[datetime.datetime]
diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index df427f61c7..9fda518ae4 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -6,6 +6,7 @@
 import queue
 import datetime
 import pickle
+from dataclasses import dataclass
 from multiprocessing import Process, Queue
 from typing import Dict, Sequence
 from typing import List, Optional, Tuple, Union, Callable
@@ -629,8 +630,8 @@ def submit(self, func, resource_specification, *args, **kwargs):
         The outgoing_q is an external process listens on this
         queue for new work. This method behaves like a
-        submit call as described here `Python docs: `_
-
+        submit call as described here
+        `Python docs: `_
 
         Args:
             - func (callable) : Callable function
             - resource_specification (dict): Dictionary containing relevant info about task that is needed by underlying executors.
@@ -694,7 +695,7 @@ def create_monitoring_info(self, status):
     def workers_per_node(self) -> Union[int, float]:
         return self._workers_per_node
 
-    def scale_in(self, blocks, max_idletime=None):
+    def scale_in(self, blocks: int, max_idletime: Optional[float] = None) -> List[str]:
         """Scale in the number of active blocks by specified amount.
 
         The scale in method here is very rude. It doesn't give the workers
@@ -721,25 +722,31 @@
         List of block IDs scaled in
         """
         logger.debug(f"Scale in called, blocks={blocks}")
+
+        @dataclass
+        class BlockInfo:
+            tasks: int  # sum of tasks in this block
+            idle: float  # shortest idle time of any manager in this block
+
         managers = self.connected_managers()
-        block_info = {}  # block id -> list( tasks, idle duration )
+        block_info: Dict[str, BlockInfo] = {}
         for manager in managers:
             if not manager['active']:
                 continue
             b_id = manager['block_id']
             if b_id not in block_info:
-                block_info[b_id] = [0, float('inf')]
-            block_info[b_id][0] += manager['tasks']
-            block_info[b_id][1] = min(block_info[b_id][1], manager['idle_duration'])
+                block_info[b_id] = BlockInfo(tasks=0, idle=float('inf'))
+            block_info[b_id].tasks += manager['tasks']
+            block_info[b_id].idle = min(block_info[b_id].idle, manager['idle_duration'])
 
-        sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1][1], item[1][0]))
+        sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1].idle, item[1].tasks))
         logger.debug(f"Scale in selecting from {len(sorted_blocks)} blocks")
         if max_idletime is None:
             block_ids_to_kill = [x[0] for x in sorted_blocks[:blocks]]
         else:
             block_ids_to_kill = []
             for x in sorted_blocks:
-                if x[1][1] > max_idletime and x[1][0] == 0:
+                if x[1].idle > max_idletime and x[1].tasks == 0:
                     block_ids_to_kill.append(x[0])
                     if len(block_ids_to_kill) == blocks:
                         break
diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py
index 13d271b402..bea85b9984 100644
--- a/parsl/executors/high_throughput/interchange.py
+++ b/parsl/executors/high_throughput/interchange.py
@@ -37,6 +37,7 @@ class ManagerLost(Exception):
     ''' Task lost due to manager loss. Manager is considered lost when multiple heartbeats
         have been missed.
     '''
+
     def __init__(self, manager_id: bytes, hostname: str) -> None:
         self.manager_id = manager_id
         self.tstamp = time.time()
@@ -49,6 +50,7 @@ def __str__(self) -> str:
 class VersionMismatch(Exception):
     ''' Manager and Interchange versions do not match
     '''
+
     def __init__(self, interchange_version: str, manager_version: str):
         self.interchange_version = interchange_version
         self.manager_version = manager_version
@@ -66,6 +68,7 @@ class Interchange:
     2. Allow for workers to join and leave the union
     3. Detect workers that have failed using heartbeats
     """
+
     def __init__(self,
                  client_address: str = "127.0.0.1",
                  interchange_address: Optional[str] = None,
@@ -392,7 +395,8 @@ def start(self) -> None:
         logger.info("Processed {} tasks in {} seconds".format(self.count, delta))
         logger.warning("Exiting")
 
-    def process_task_outgoing_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket], kill_event: threading.Event) -> None:
+    def process_task_outgoing_incoming(self, interesting_managers: Set[bytes],
+                                       hub_channel: Optional[zmq.Socket], kill_event: threading.Event) -> None:
         """Process one message from manager on the task_outgoing channel.
 
         Note that this message flow is in contradiction to the name of the channel - it is not an outgoing message and it is not a task.
@@ -620,8 +624,13 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string:
     -------
     None.
     """
+
     if format_string is None:
-        format_string = "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d %(processName)s(%(process)d) %(threadName)s %(funcName)s [%(levelname)s] %(message)s"
+        format_string = (
+            "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d "
+            "%(processName)s(%(process)d) %(threadName)s "
+            "%(funcName)s [%(levelname)s] %(message)s"
+        )
 
     global logger
     logger = logging.getLogger(LOGGER_NAME)
diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py
index 9ee2053984..336f1ef186 100755
--- a/parsl/executors/high_throughput/process_worker_pool.py
+++ b/parsl/executors/high_throughput/process_worker_pool.py
@@ -55,6 +55,7 @@ class Manager:
     |                        |        IPC-Qeueues
 
     """
+
     def __init__(self, *,
                  addresses,
                  address_probe_timeout,
@@ -413,7 +414,9 @@ def worker_watchdog(self, kill_event: threading.Event):
                     raise WorkerLost(worker_id, platform.node())
                 except Exception:
                     logger.info("Putting exception for executor task {} in the pending result queue".format(task['task_id']))
-                    result_package = {'type': 'result', 'task_id': task['task_id'], 'exception': serialize(RemoteExceptionWrapper(*sys.exc_info()))}
+                    result_package = {'type': 'result',
+                                      'task_id': task['task_id'],
+                                      'exception': serialize(RemoteExceptionWrapper(*sys.exc_info()))}
                     pkl_package = pickle.dumps(result_package)
                     self.pending_result_queue.put(pkl_package)
             except KeyError:
@@ -867,7 +870,10 @@ def strategyorlist(s: str):
                       block_id=args.block_id,
                       cores_per_worker=float(args.cores_per_worker),
                       mem_per_worker=None if args.mem_per_worker == 'None' else float(args.mem_per_worker),
-                      max_workers_per_node=args.max_workers_per_node if args.max_workers_per_node == float('inf') else int(args.max_workers_per_node),
+                      max_workers_per_node=(
+                          args.max_workers_per_node if args.max_workers_per_node == float('inf')
+                          else int(args.max_workers_per_node)
+                      ),
                       prefetch_capacity=int(args.prefetch_capacity),
                       heartbeat_threshold=int(args.hb_threshold),
                       heartbeat_period=int(args.hb_period),
diff --git a/parsl/jobs/strategy.py b/parsl/jobs/strategy.py
index b396d43e37..c6372b4ae9 100644
--- a/parsl/jobs/strategy.py
+++ b/parsl/jobs/strategy.py
@@ -245,7 +245,8 @@ def _general_strategy(self, status_list, *, strategy_type):
                             exec_status.scale_in(active_blocks - min_blocks)
 
                         else:
-                            logger.debug(f"Idle time {idle_duration}s is less than max_idletime {self.max_idletime}s for executor {label}; not scaling in")
+                            logger.debug(f"Idle time {idle_duration}s is less than max_idletime "
+                                         f"{self.max_idletime}s for executor {label}; not scaling in")
 
             # Case 2
             # More tasks than the available slots.
diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py
index ebc63b8291..c95a3f11ef 100644
--- a/parsl/monitoring/db_manager.py
+++ b/parsl/monitoring/db_manager.py
@@ -518,7 +518,9 @@ def start(self,
                                     reprocessable_first_resource_messages.append(msg)
                                 else:
                                     if task_try_id in deferred_resource_messages:
-                                        logger.error("Task {} already has a deferred resource message. Discarding previous message.".format(msg['task_id']))
+                                        logger.error(
+                                            "Task {} already has a deferred resource message. Discarding previous message."
+                                            .format(msg['task_id']))
                                     deferred_resource_messages[task_try_id] = msg
                             elif msg['last_msg']:
                                 # This assumes that the primary key has been added
@@ -544,7 +546,10 @@ def start(self,
                     if reprocessable_last_resource_messages:
                         self._insert(table=STATUS, messages=reprocessable_last_resource_messages)
             except Exception:
-                logger.exception("Exception in db loop: this might have been a malformed message, or some other error. monitoring data may have been lost")
+                logger.exception(
+                    "Exception in db loop: this might have been a malformed message, "
+                    "or some other error. monitoring data may have been lost"
+                )
                 exception_happened = True
         if exception_happened:
             raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log")
diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py
index 8acc40b70f..b6a168cff2 100644
--- a/parsl/monitoring/monitoring.py
+++ b/parsl/monitoring/monitoring.py
@@ -290,8 +290,12 @@ def close(self) -> None:
             self._dfk_channel.close()
         if exception_msgs:
            for exception_msg in exception_msgs:
-                self.logger.error("{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(exception_msg[0],
-                                                                                                                                    exception_msg[1]))
+                self.logger.error(
+                    "{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(
+                        exception_msg[0],
+                        exception_msg[1]
+                    )
+                )
                 self.router_proc.terminate()
                 self.dbm_proc.terminate()
                 self.filesystem_proc.terminate()
diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index 3f57076f50..89b2b4005e 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -280,11 +280,18 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
             else:
                 logger.error("Could not read job ID from submit command standard output.")
                 logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
-                raise SubmitException(job_name, "Could not read job ID from submit command standard output", stdout=stdout, stderr=stderr, retcode=retcode)
+                raise SubmitException(
+                    job_name,
+                    "Could not read job ID from submit command standard output",
+                    stdout=stdout,
+                    stderr=stderr,
+                    retcode=retcode
+                )
         else:
             logger.error("Submit command failed")
             logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
-            raise SubmitException(job_name, "Could not read job ID from submit command standard output", stdout=stdout, stderr=stderr, retcode=retcode)
+            raise SubmitException(job_name, "Could not read job ID from submit command standard output",
+                                  stdout=stdout, stderr=stderr, retcode=retcode)
 
     def cancel(self, job_ids):
         ''' Cancels the jobs specified by a list of job ids
diff --git a/test.memo.stdout.x b/test.memo.stdout.x
new file mode 100644
index 0000000000..62d8fe9f6d
--- /dev/null
+++ b/test.memo.stdout.x
@@ -0,0 +1 @@
+X
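As a sanity check on the scale_in() hunk above, here is a minimal standalone sketch of the new block-selection policy. It is not part of the patch: the select_blocks_to_kill helper and the literal manager dicts are illustrative stand-ins for what connected_managers() returns; only the 'active', 'block_id', 'tasks' and 'idle_duration' keys and the (idle, tasks) sort key are taken from the hunk itself.

from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class BlockInfo:
    tasks: int   # sum of tasks across managers in this block
    idle: float  # shortest idle time of any manager in this block


def select_blocks_to_kill(managers: List[dict], blocks: int,
                          max_idletime: Optional[float] = None) -> List[str]:
    # Aggregate per-block task counts and idle times, as the reworked scale_in() does.
    block_info: Dict[str, BlockInfo] = {}
    for manager in managers:
        if not manager['active']:
            continue
        b_id = manager['block_id']
        if b_id not in block_info:
            block_info[b_id] = BlockInfo(tasks=0, idle=float('inf'))
        block_info[b_id].tasks += manager['tasks']
        block_info[b_id].idle = min(block_info[b_id].idle, manager['idle_duration'])

    # Most idle, least loaded blocks come first.
    sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1].idle, item[1].tasks))
    if max_idletime is None:
        return [b_id for b_id, _ in sorted_blocks[:blocks]]

    # With a max_idletime, only completely idle blocks past the threshold are eligible.
    chosen: List[str] = []
    for b_id, info in sorted_blocks:
        if info.idle > max_idletime and info.tasks == 0:
            chosen.append(b_id)
            if len(chosen) == blocks:
                break
    return chosen


# Example: block "0" has one busy manager, block "1" is fully idle past the threshold.
managers = [
    {'active': True, 'block_id': '0', 'tasks': 3, 'idle_duration': 12.0},
    {'active': True, 'block_id': '0', 'tasks': 0, 'idle_duration': 90.0},
    {'active': True, 'block_id': '1', 'tasks': 0, 'idle_duration': 90.0},
]
print(select_blocks_to_kill(managers, blocks=1, max_idletime=60.0))  # -> ['1']

Sorting on (idle, tasks) means termination prefers blocks that have been idle longest and carry the fewest tasks, which is the behaviour the dataclass-based rewrite makes explicit.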