From 973f672e2cfcd733dcd454fb767dbacf2db37446 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Mon, 16 Dec 2024 15:26:02 +0100 Subject: [PATCH 1/2] Adapt message arguments passing to process controller --- src/aiida/cmdline/commands/cmd_process.py | 18 ++++++++++++++---- src/aiida/engine/processes/control.py | 11 +++++++---- src/aiida/engine/processes/functions.py | 4 ++-- src/aiida/engine/processes/process.py | 6 +++--- src/aiida/engine/runners.py | 2 +- tests/engine/test_rmq.py | 10 ++++------ 6 files changed, 31 insertions(+), 20 deletions(-) diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index e203bdddfc..5ad7c5d53c 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -340,8 +340,13 @@ def process_kill(processes, all_entries, timeout, wait): with capture_logging() as stream: try: - message = 'Killed through `verdi process kill`' - control.kill_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message) + control.kill_processes( + processes, + msg_text='Killed through `verdi process kill`', + all_entries=all_entries, + timeout=timeout, + wait=wait, + ) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') @@ -371,8 +376,13 @@ def process_pause(processes, all_entries, timeout, wait): with capture_logging() as stream: try: - message = 'Paused through `verdi process pause`' - control.pause_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message) + control.pause_processes( + processes, + msg_text='Paused through `verdi process pause`', + all_entries=all_entries, + timeout=timeout, + wait=wait, + ) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py index 7cc214c76c..2ecc8477df 100644 --- a/src/aiida/engine/processes/control.py +++ b/src/aiida/engine/processes/control.py @@ -4,6 +4,7 @@ import collections import concurrent +import functools import typing as t import kiwipy @@ -135,7 +136,7 @@ def play_processes( def pause_processes( processes: list[ProcessNode] | None = None, *, - message: str = 'Paused through `aiida.engine.processes.control.pause_processes`', + msg_text: str = 'Paused through `aiida.engine.processes.control.pause_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False, @@ -164,13 +165,14 @@ def pause_processes( return controller = get_manager().get_process_controller() - _perform_actions(processes, controller.pause_process, 'pause', 'pausing', timeout, wait, msg=message) + action = functools.partial(controller.pause_process, msg_text=msg_text) + _perform_actions(processes, action, 'pause', 'pausing', timeout, wait) def kill_processes( processes: list[ProcessNode] | None = None, *, - message: str = 'Killed through `aiida.engine.processes.control.kill_processes`', + msg_text: str = 'Killed through `aiida.engine.processes.control.kill_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False, @@ -199,7 +201,8 @@ def kill_processes( return controller = get_manager().get_process_controller() - _perform_actions(processes, controller.kill_process, 'kill', 'killing', timeout, wait, msg=message) + action = functools.partial(controller.kill_process, msg_text=msg_text) + _perform_actions(processes, action, 'kill', 'killing', timeout, wait) def _perform_actions( diff --git a/src/aiida/engine/processes/functions.py b/src/aiida/engine/processes/functions.py index 8bca68f55c..7936979531 100644 --- a/src/aiida/engine/processes/functions.py +++ b/src/aiida/engine/processes/functions.py @@ -222,7 +222,7 @@ def run_get_node(*args, **kwargs) -> tuple[dict[str, t.Any] | None, 'ProcessNode if kwargs and not process_class.spec().inputs.dynamic: raise ValueError(f'{function.__name__} does not support these kwargs: {kwargs.keys()}') - process = process_class(inputs=inputs, runner=runner) + process: Process = process_class(inputs=inputs, runner=runner) # Only add handlers for interrupt signal to kill the process if we are in a local and not a daemon runner. # Without this check, running process functions in a daemon worker would be killed if the daemon is shutdown @@ -235,7 +235,7 @@ def run_get_node(*args, **kwargs) -> tuple[dict[str, t.Any] | None, 'ProcessNode def kill_process(_num, _frame): """Send the kill signal to the process in the current scope.""" LOGGER.critical('runner received interrupt, killing process %s', process.pid) - result = process.kill(msg='Process was killed because the runner received an interrupt') + result = process.kill(msg_text='Process was killed because the runner received an interrupt') return result # Store the current handler on the signal such that it can be restored after process has terminated diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index e25d1b7c23..f29d426770 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -329,7 +329,7 @@ def load_instance_state( self.node.logger.info(f'Loaded process<{self.node.pk}> from saved state') - def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Future]: + def kill(self, msg_text: str | None = None) -> Union[bool, plumpy.futures.Future]: """Kill the process and all the children calculations it called :param msg: message @@ -338,7 +338,7 @@ def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Futur had_been_terminated = self.has_terminated() - result = super().kill(msg) + result = super().kill(msg_text) # Only kill children if we could be killed ourselves if result is not False and not had_been_terminated: @@ -348,7 +348,7 @@ def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Futur self.logger.info('no controller available to kill child<%s>', child.pk) continue try: - result = self.runner.controller.kill_process(child.pk, f'Killed by parent<{self.node.pk}>') + result = self.runner.controller.kill_process(child.pk, msg_text=f'Killed by parent<{self.node.pk}>') result = asyncio.wrap_future(result) # type: ignore[arg-type] if asyncio.isfuture(result): killing.append(result) diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 42cb76244c..b19821b2e7 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -250,7 +250,7 @@ def kill_process(_num, _frame): LOGGER.warning('runner received interrupt, process %s already being killed', process_inited.pid) return LOGGER.critical('runner received interrupt, killing process %s', process_inited.pid) - process_inited.kill(msg='Process was killed because the runner received an interrupt') + process_inited.kill(msg_text='Process was killed because the runner received an interrupt') original_handler_int = signal.getsignal(signal.SIGINT) original_handler_term = signal.getsignal(signal.SIGTERM) diff --git a/tests/engine/test_rmq.py b/tests/engine/test_rmq.py index a2edc2fa41..d0a3e461fb 100644 --- a/tests/engine/test_rmq.py +++ b/tests/engine/test_rmq.py @@ -93,8 +93,7 @@ async def do_pause(): assert result assert calc_node.paused - kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text='Sorry, you have to go mate') future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result @@ -112,7 +111,7 @@ async def do_pause_play(): await asyncio.sleep(0.1) pause_message = 'Take a seat' - pause_future = controller.pause_process(calc_node.pk, msg=pause_message) + pause_future = controller.pause_process(calc_node.pk, msg_text=pause_message) future = await with_timeout(asyncio.wrap_future(pause_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert calc_node.paused @@ -126,8 +125,7 @@ async def do_pause_play(): assert not calc_node.paused assert calc_node.process_status is None - kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text='Sorry, you have to go mate') future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result @@ -145,7 +143,7 @@ async def do_kill(): await asyncio.sleep(0.1) kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text=kill_message) future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result From 35a02047e1e25f3999ecb129fdd0b69cc539cdc8 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sat, 11 Jan 2025 18:50:28 +0100 Subject: [PATCH 2/2] Bump plumpy --- environment.yml | 2 +- pyproject.toml | 2 +- uv.lock | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/environment.yml b/environment.yml index ad80dd3416..fbce5ffe8c 100644 --- a/environment.yml +++ b/environment.yml @@ -22,7 +22,7 @@ dependencies: - importlib-metadata~=6.0 - numpy~=1.21 - paramiko~=3.0 -- plumpy~=0.22.3 +- plumpy~=0.24.0 - pgsu~=0.3.0 - psutil~=5.6 - psycopg[binary]~=3.0 diff --git a/pyproject.toml b/pyproject.toml index 32894eb4ac..6cca60defd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ 'importlib-metadata~=6.0', 'numpy~=1.21', 'paramiko~=3.0', - 'plumpy~=0.22.3', + 'plumpy~=0.24.0', 'pgsu~=0.3.0', 'psutil~=5.6', 'psycopg[binary]~=3.0', diff --git a/uv.lock b/uv.lock index 9aa6014660..c503a4264b 100644 --- a/uv.lock +++ b/uv.lock @@ -200,7 +200,7 @@ requires-dist = [ { name = "pg8000", marker = "extra == 'tests'", specifier = "~=1.13" }, { name = "pgsu", specifier = "~=0.3.0" }, { name = "pgtest", marker = "extra == 'tests'", specifier = "~=1.3,>=1.3.1" }, - { name = "plumpy", specifier = "~=0.22.3" }, + { name = "plumpy", specifier = "~=0.24.0" }, { name = "pre-commit", marker = "extra == 'pre-commit'", specifier = "~=3.5" }, { name = "psutil", specifier = "~=5.6" }, { name = "psycopg", extras = ["binary"], specifier = "~=3.0" }, @@ -3085,7 +3085,7 @@ name = "pexpect" version = "4.9.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "ptyprocess" }, + { name = "ptyprocess", marker = "python_full_version < '3.10' or sys_platform != 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/42/92/cc564bf6381ff43ce1f4d06852fc19a2f11d180f23dc32d9588bee2f149d/pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f", size = 166450 } wheels = [ @@ -3241,16 +3241,16 @@ wheels = [ [[package]] name = "plumpy" -version = "0.22.3" +version = "0.24.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "kiwipy", extra = ["rmq"] }, { name = "nest-asyncio" }, { name = "pyyaml" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ab/99/6c931d3f4697acd34cf18eb3fbfe96ed55cd0408d9be7c0f316349117a8e/plumpy-0.22.3.tar.gz", hash = "sha256:e58f45e6360f173babf04e2a4abacae9867622768ce2a126c8260db3b46372c4", size = 73582 } +sdist = { url = "https://files.pythonhosted.org/packages/d9/0c/0bb568982e461f5e428606ccbdfe6d43c11dab0e3f5a8090298feb321172/plumpy-0.24.0.tar.gz", hash = "sha256:c17c8efbd124d7f5ec2f27cb1f2c3de7901143e61551ce81f3ee22bf7e2ed42d", size = 75634 } wheels = [ - { url = "https://files.pythonhosted.org/packages/95/d9/12fd8281f494ca79d6a7a9d40099616d16415be5807959e5b024dffe8aed/plumpy-0.22.3-py3-none-any.whl", hash = "sha256:63ae6c90713f52483836a3b2b3e1941eab7ada920c303092facc27e78229bdc3", size = 74244 }, + { url = "https://files.pythonhosted.org/packages/99/d3/68c83d4774f7a4f8e8dd4e30ce34e46071706a4b4dc40d3a1ad77de793fc/plumpy-0.24.0-py3-none-any.whl", hash = "sha256:09efafe97c88c8928e73f1dc08cf02a2c4737fa767920bff23dfa26226252cc6", size = 74955 }, ] [[package]]