From c85e16fcd0474551b51146aee3345eb6e6dac075 Mon Sep 17 00:00:00 2001 From: Eivind Jahren Date: Mon, 7 Oct 2024 08:12:10 +0200 Subject: [PATCH] Handle inaccesible runpath gracefully --- .github/workflows/test_ert_with_slurm.yml | 1 + src/ert/gui/simulation/run_dialog.py | 4 +- src/ert/scheduler/driver.py | 4 + src/ert/scheduler/job.py | 35 ++++--- src/ert/scheduler/lsf_driver.py | 34 ++++--- src/ert/scheduler/openpbs_driver.py | 4 +- src/ert/scheduler/scheduler.py | 50 +++++++--- src/ert/scheduler/slurm_driver.py | 33 ++++--- tests/ui_tests/cli/test_missing_runpath.py | 98 +++++++++++++++++++ tests/ui_tests/gui/conftest.py | 38 ++++--- tests/ui_tests/gui/test_main_window.py | 8 +- tests/ui_tests/gui/test_missing_runpath.py | 95 ++++++++++++++++++ .../gui/simulation/test_run_dialog.py | 4 + 13 files changed, 332 insertions(+), 76 deletions(-) create mode 100644 tests/ui_tests/cli/test_missing_runpath.py create mode 100644 tests/ui_tests/gui/test_missing_runpath.py diff --git a/.github/workflows/test_ert_with_slurm.yml b/.github/workflows/test_ert_with_slurm.yml index b0bc29d1607..5a3bb804daf 100644 --- a/.github/workflows/test_ert_with_slurm.yml +++ b/.github/workflows/test_ert_with_slurm.yml @@ -70,6 +70,7 @@ jobs: set -e export _ERT_TESTS_ALTERNATIVE_QUEUE=AlternativeQ pytest tests/unit_tests/scheduler --slurm + pytest tests/ui_tests/cli/test_missing_runpath.py --slurm - name: Test poly-example on slurm run: | diff --git a/src/ert/gui/simulation/run_dialog.py b/src/ert/gui/simulation/run_dialog.py index 825da56045c..5f960065529 100644 --- a/src/ert/gui/simulation/run_dialog.py +++ b/src/ert/gui/simulation/run_dialog.py @@ -193,6 +193,7 @@ def __init__( self._run_model = run_model self._event_queue = event_queue self._notifier = notifier + self.fail_msg_box: Optional[ErtMessageBox] = None self._minimum_width = 1200 self._minimum_height = 800 @@ -407,7 +408,8 @@ def _on_simulation_done(self, failed: bool, msg: str) -> None: if failed: self.update_total_progress(1.0, "Failed") self.fail_msg_box = ErtMessageBox("ERT experiment failed!", msg, self) - self.fail_msg_box.exec_() + self.fail_msg_box.setModal(True) + self.fail_msg_box.show() else: self.update_total_progress(1.0, "Experiment completed.") diff --git a/src/ert/scheduler/driver.py b/src/ert/scheduler/driver.py index f83071ed6c7..f14bc25fe1e 100644 --- a/src/ert/scheduler/driver.py +++ b/src/ert/scheduler/driver.py @@ -13,6 +13,10 @@ """Bash and other shells add an offset of 128 to the signal value when a process exited due to a signal""" +class FailedSubmit(RuntimeError): + pass + + class Driver(ABC): """Adapter for the HPC cluster.""" diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index 831de54e85f..2b12edc7975 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -18,7 +18,7 @@ from ert.load_status import LoadStatus from ert.storage.realization_storage_state import RealizationStorageState -from .driver import Driver +from .driver import Driver, FailedSubmit if TYPE_CHECKING: from ert.ensemble_evaluator import Realization @@ -63,13 +63,20 @@ def __init__(self, scheduler: Scheduler, real: Realization) -> None: self.state = JobState.WAITING self.started = asyncio.Event() self.returncode: asyncio.Future[int] = asyncio.Future() - self._aborted = False self._scheduler: Scheduler = scheduler self._callback_status_msg: str = "" self._requested_max_submit: Optional[int] = None self._start_time: Optional[float] = None self._end_time: Optional[float] = None + def unschedule(self, msg: str) -> None: + self.state = JobState.ABORTED + self.real.run_arg.ensemble_storage.set_failure( + self.real.run_arg.iens, + RealizationStorageState.LOAD_FAILURE, + f"Job not scheduled due to {msg}", + ) + @property def iens(self) -> int: return self.real.iens @@ -95,15 +102,21 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: if self._scheduler.submit_sleep_state: await self._scheduler.submit_sleep_state.sleep_until_we_can_submit() await self._send(JobState.SUBMITTING) - await self.driver.submit( - self.real.iens, - self.real.job_script, - self.real.run_arg.runpath, - num_cpu=self.real.num_cpu, - realization_memory=self.real.realization_memory, - name=self.real.run_arg.job_name, - runpath=Path(self.real.run_arg.runpath), - ) + try: + await self.driver.submit( + self.real.iens, + self.real.job_script, + self.real.run_arg.runpath, + num_cpu=self.real.num_cpu, + realization_memory=self.real.realization_memory, + name=self.real.run_arg.job_name, + runpath=Path(self.real.run_arg.runpath), + ) + except FailedSubmit as err: + await self._send(JobState.FAILED) + logger.error(f"Failed to submit: {err}") + self.returncode.cancel() + return await self._send(JobState.PENDING) await self.started.wait() diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index 7c7e364a2dc..4562cd823f6 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -8,10 +8,10 @@ import shlex import shutil import stat -import tempfile import time from dataclasses import dataclass from pathlib import Path +from tempfile import NamedTemporaryFile from typing import ( Dict, Iterable, @@ -27,7 +27,7 @@ get_args, ) -from .driver import SIGNAL_OFFSET, Driver +from .driver import SIGNAL_OFFSET, Driver, FailedSubmit from .event import Event, FinishedEvent, StartedEvent _POLL_PERIOD = 2.0 # seconds @@ -298,16 +298,22 @@ async def submit( f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n" ) script_path: Optional[Path] = None - with tempfile.NamedTemporaryFile( - dir=runpath, - prefix=".lsf_submit_", - suffix=".sh", - mode="w", - encoding="utf-8", - delete=False, - ) as script_handle: - script_handle.write(script) - script_path = Path(script_handle.name) + try: + with NamedTemporaryFile( + dir=runpath, + prefix=".lsf_submit_", + suffix=".sh", + mode="w", + encoding="utf-8", + delete=False, + ) as script_handle: + script_handle.write(script) + script_path = Path(script_handle.name) + except OSError as err: + error_message = f"Could not create submit script: {err}" + self._job_error_message_by_iens[iens] = error_message + raise FailedSubmit(error_message) from err + assert script_path is not None script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC) @@ -345,11 +351,11 @@ async def submit( ) if not process_success: self._job_error_message_by_iens[iens] = process_message - raise RuntimeError(process_message) + raise FailedSubmit(process_message) match = re.search("Job <([0-9]+)> is submitted to .*queue", process_message) if match is None: - raise RuntimeError( + raise FailedSubmit( f"Could not understand '{process_message}' from bsub" ) job_id = match[1] diff --git a/src/ert/scheduler/openpbs_driver.py b/src/ert/scheduler/openpbs_driver.py index 84785ba57ec..dab0b8dad83 100644 --- a/src/ert/scheduler/openpbs_driver.py +++ b/src/ert/scheduler/openpbs_driver.py @@ -23,7 +23,7 @@ get_type_hints, ) -from .driver import Driver +from .driver import Driver, FailedSubmit from .event import Event, FinishedEvent, StartedEvent logger = logging.getLogger(__name__) @@ -274,7 +274,7 @@ async def submit( ) if not process_success: self._job_error_message_by_iens[iens] = process_message - raise RuntimeError(process_message) + raise FailedSubmit(process_message) job_id_ = process_message logger.debug(f"Realization {iens} accepted by PBS, got id {job_id_}") diff --git a/src/ert/scheduler/scheduler.py b/src/ert/scheduler/scheduler.py index ae96556421e..4b689d297b7 100644 --- a/src/ert/scheduler/scheduler.py +++ b/src/ert/scheduler/scheduler.py @@ -24,7 +24,7 @@ from pydantic.dataclasses import dataclass from _ert.async_utils import get_running_loop -from _ert.events import Event, ForwardModelStepChecksum, Id +from _ert.events import Event, ForwardModelStepChecksum, Id, event_from_dict from ert.constant_filenames import CERT_FILE from .driver import Driver @@ -276,10 +276,24 @@ async def execute( # does internalization at a time forward_model_ok_lock = asyncio.Lock() for iens, job in self._jobs.items(): - self._job_tasks[iens] = asyncio.create_task( - job.run(sem, forward_model_ok_lock, self._max_submit), - name=f"job-{iens}_task", - ) + if job.state != JobState.ABORTED: + self._job_tasks[iens] = asyncio.create_task( + job.run(sem, forward_model_ok_lock, self._max_submit), + name=f"job-{iens}_task", + ) + else: + failure = job.real.run_arg.ensemble_storage.get_failure(iens) + await self._events.put( + event_from_dict( + { + "ensemble": self._ens_id, + "event_type": Id.REALIZATION_FAILURE, + "queue_event_type": JobState.FAILED, + "message": failure.message if failure else None, + "real": str(iens), + } + ) + ) logger.info("All tasks started") self._running.set() try: @@ -317,8 +331,14 @@ async def _process_event_queue(self) -> None: def _update_jobs_json(self, iens: int, runpath: str) -> None: cert_path = f"{runpath}/{CERT_FILE}" - if self._ee_cert is not None: - Path(cert_path).write_text(self._ee_cert, encoding="utf-8") + try: + if self._ee_cert is not None: + Path(cert_path).write_text(self._ee_cert, encoding="utf-8") + except OSError as err: + error_msg = f"Could not write ensemble certificate: {err}" + self._jobs[iens].unschedule(error_msg) + logger.error(error_msg) + return jobs = _JobsJson( experiment_id=None, ens_id=self._ens_id, @@ -328,8 +348,14 @@ def _update_jobs_json(self, iens: int, runpath: str) -> None: ee_cert_path=cert_path if self._ee_cert is not None else None, ) jobs_path = os.path.join(runpath, "jobs.json") - with open(jobs_path, "rb") as fp: - data = orjson.loads(fp.read()) - with open(jobs_path, "wb") as fp: - data.update(asdict(jobs)) - fp.write(orjson.dumps(data, option=orjson.OPT_INDENT_2)) + try: + with open(jobs_path, "rb") as fp: + data = orjson.loads(fp.read()) + with open(jobs_path, "wb") as fp: + data.update(asdict(jobs)) + fp.write(orjson.dumps(data, option=orjson.OPT_INDENT_2)) + except OSError as err: + error_msg = f"Could not update jobs.json: {err}" + self._jobs[iens].unschedule(error_msg) + logger.error(error_msg) + return diff --git a/src/ert/scheduler/slurm_driver.py b/src/ert/scheduler/slurm_driver.py index 9d213ba13e5..76aaf2ba7f2 100644 --- a/src/ert/scheduler/slurm_driver.py +++ b/src/ert/scheduler/slurm_driver.py @@ -5,18 +5,18 @@ import logging import shlex import stat -import tempfile import time from dataclasses import dataclass from enum import Enum, auto from pathlib import Path +from tempfile import NamedTemporaryFile from typing import ( Iterator, Optional, Tuple, ) -from .driver import SIGNAL_OFFSET, Driver +from .driver import SIGNAL_OFFSET, Driver, FailedSubmit from .event import Event, FinishedEvent, StartedEvent SLURM_FAILED_EXIT_CODE_FETCH = SIGNAL_OFFSET + 66 @@ -184,16 +184,21 @@ async def submit( f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n" ) script_path: Optional[Path] = None - with tempfile.NamedTemporaryFile( - dir=runpath, - prefix=".slurm_submit_", - suffix=".sh", - mode="w", - encoding="utf-8", - delete=False, - ) as script_handle: - script_handle.write(script) - script_path = Path(script_handle.name) + try: + with NamedTemporaryFile( + dir=runpath, + prefix=".slurm_submit_", + suffix=".sh", + mode="w", + encoding="utf-8", + delete=False, + ) as script_handle: + script_handle.write(script) + script_path = Path(script_handle.name) + except OSError as err: + error_message = f"Could not create submit script: {err}" + self._job_error_message_by_iens[iens] = error_message + raise FailedSubmit(error_message) from err assert script_path is not None script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC) sbatch_with_args = [*self._submit_cmd(name, runpath, num_cpu), str(script_path)] @@ -214,10 +219,10 @@ async def submit( ) if not process_success: self._job_error_message_by_iens[iens] = process_message - raise RuntimeError(process_message) + raise FailedSubmit(process_message) if not process_message: - raise RuntimeError("sbatch returned empty jobid") + raise FailedSubmit("sbatch returned empty jobid") job_id = process_message logger.info(f"Realization {iens} accepted by SLURM, got id {job_id}") diff --git a/tests/ui_tests/cli/test_missing_runpath.py b/tests/ui_tests/cli/test_missing_runpath.py new file mode 100644 index 00000000000..615454ed6dd --- /dev/null +++ b/tests/ui_tests/cli/test_missing_runpath.py @@ -0,0 +1,98 @@ +import stat +from contextlib import suppress +from tempfile import NamedTemporaryFile +from unittest.mock import patch + +import pytest + +from ert.cli.main import ErtCliError + +from .run_cli import run_cli + +config_contents = """\ +QUEUE_SYSTEM {queue_system} +NUM_REALIZATIONS 10 +LOAD_WORKFLOW_JOB CHMOD_JOB CHMOD +LOAD_WORKFLOW CHMOD.wf CHMOD.wf +HOOK_WORKFLOW CHMOD.wf PRE_SIMULATION + +""" + +workflow_contents = """\ +CHMOD +""" + +workflow_job_contents = """\ +EXECUTABLE chmod.sh +""" + +chmod_sh_contents = """\ +#!/bin/bash +chmod 000 {tmp_path}/simulations/realization-0/iter-0 +""" + + +def write_config(tmp_path, queue_system): + (tmp_path / "config.ert").write_text( + config_contents.format(queue_system=queue_system) + ) + (tmp_path / "CHMOD_JOB").write_text(workflow_job_contents) + (tmp_path / "CHMOD.wf").write_text(workflow_contents) + (tmp_path / "chmod.sh").write_text(chmod_sh_contents.format(tmp_path=tmp_path)) + (tmp_path / "chmod.sh").chmod( + (tmp_path / "chmod.sh").stat().st_mode + | stat.S_IXUSR + | stat.S_IXGRP + | stat.S_IXOTH + ) + + +def test_missing_runpath_has_isolated_failures(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + write_config(tmp_path, "LOCAL") + try: + with pytest.raises( + ErtCliError, + match=r"active realizations \(9\) is less than .* MIN_REALIZATIONS\(10\)", + ): + run_cli("ensemble_experiment", "config.ert", "--disable-monitoring") + finally: + with suppress(FileNotFoundError): + (tmp_path / "simulations/realization-0/iter-0").chmod(0x777) + + +def raising_named_temporary_file(*args, **kwargs): + if "realization-1" in str(kwargs["dir"]): + raise OSError("Don't like realization-1") + return NamedTemporaryFile(*args, **kwargs) # noqa + + +def patch_raising_named_temporary_file(queue_system): + return patch( + f"ert.scheduler.{queue_system}_driver.NamedTemporaryFile", + raising_named_temporary_file, + ) + + +@pytest.mark.usefixtures() +def test_failing_writes_lead_to_isolated_failures(tmp_path, monkeypatch, pytestconfig): + monkeypatch.chdir(tmp_path) + queue_system = None + if pytestconfig.getoption("lsf"): + queue_system = "LSF" + elif pytestconfig.getoption("slurm"): + queue_system = "SLURM" + else: + pytest.skip() + (tmp_path / "config.ert").write_text( + f""" + QUEUE_SYSTEM {queue_system} + NUM_REALIZATIONS 10 + """ + ) + with pytest.raises( + ErtCliError, + match=r"(?s)active realizations \(9\) is less than .* MIN_REALIZATIONS\(10\).*" + r"Driver reported: Could not create submit script: Don't like realization-1", + ), patch_raising_named_temporary_file(queue_system.lower()): + run_cli("ensemble_experiment", "config.ert", "--disable-monitoring") diff --git a/tests/ui_tests/gui/conftest.py b/tests/ui_tests/gui/conftest.py index 22eabae1b42..72678a02e7e 100644 --- a/tests/ui_tests/gui/conftest.py +++ b/tests/ui_tests/gui/conftest.py @@ -213,7 +213,7 @@ def ensemble_experiment_has_run_no_failure( @pytest.fixture(name="run_experiment", scope="module") def run_experiment_fixture(request): - def func(experiment_mode, gui): + def func(experiment_mode, gui, click_done=True): qtbot = QtBot(request) with contextlib.suppress(FileNotFoundError): shutil.rmtree("poly_out") @@ -245,25 +245,23 @@ def handle_dialog(): QTimer.singleShot(500, handle_dialog) qtbot.mouseClick(run_experiment, Qt.LeftButton) - # The Run dialog opens, click show details and wait until done appears - # then click it - qtbot.waitUntil(lambda: gui.findChild(RunDialog) is not None) - run_dialog = gui.findChild(RunDialog) - - qtbot.waitUntil(run_dialog.done_button.isVisible, timeout=200000) - qtbot.waitUntil(lambda: run_dialog._tab_widget.currentWidget() is not None) - - # Assert that the number of boxes in the detailed view is - # equal to the number of realizations - realization_widget = run_dialog._tab_widget.currentWidget() - assert isinstance(realization_widget, RealizationWidget) - list_model = realization_widget._real_view.model() - assert ( - list_model.rowCount() - == experiment_panel.config.model_config.num_realizations - ) - - qtbot.mouseClick(run_dialog.done_button, Qt.LeftButton) + if click_done: + # The Run dialog opens, click show details and wait until done appears + # then click it + run_dialog = wait_for_child(gui, qtbot, RunDialog, timeout=10000) + qtbot.waitUntil(run_dialog.done_button.isVisible, timeout=200000) + qtbot.waitUntil(lambda: run_dialog._tab_widget.currentWidget() is not None) + + # Assert that the number of boxes in the detailed view is + # equal to the number of realizations + realization_widget = run_dialog._tab_widget.currentWidget() + assert isinstance(realization_widget, RealizationWidget) + list_model = realization_widget._real_view.model() + assert ( + list_model.rowCount() + == experiment_panel.config.model_config.num_realizations + ) + qtbot.mouseClick(run_dialog.done_button, Qt.LeftButton) return func diff --git a/tests/ui_tests/gui/test_main_window.py b/tests/ui_tests/gui/test_main_window.py index 269d0dff40a..9c727e42c4b 100644 --- a/tests/ui_tests/gui/test_main_window.py +++ b/tests/ui_tests/gui/test_main_window.py @@ -692,6 +692,10 @@ def test_that_a_failing_job_shows_error_message_with_context( run_experiment = get_child(experiment_panel, QWidget, name="run_experiment") def handle_error_dialog(run_dialog): + qtbot.waitUntil( + lambda: run_dialog.fail_msg_box is not None, + timeout=20000, + ) error_dialog = run_dialog.fail_msg_box assert error_dialog text = error_dialog.details_text.toPlainText() @@ -707,13 +711,13 @@ def handle_error_dialog(run_dialog): ] for substring in expected_substrings: assert substring in text - qtbot.mouseClick(error_dialog.box.buttons()[0], Qt.LeftButton) + error_dialog.accept() qtbot.mouseClick(run_experiment, Qt.LeftButton) run_dialog = wait_for_child(gui, qtbot, RunDialog) - QTimer.singleShot(20000, lambda: handle_error_dialog(run_dialog)) + QTimer.singleShot(200, lambda: handle_error_dialog(run_dialog)) qtbot.waitUntil(run_dialog.done_button.isVisible, timeout=100000) diff --git a/tests/ui_tests/gui/test_missing_runpath.py b/tests/ui_tests/gui/test_missing_runpath.py new file mode 100644 index 00000000000..4b7409cdb6a --- /dev/null +++ b/tests/ui_tests/gui/test_missing_runpath.py @@ -0,0 +1,95 @@ +import stat +from contextlib import suppress + +from qtpy.QtCore import Qt, QTimer +from qtpy.QtWidgets import ( + QLabel, +) + +from ert.gui.simulation.run_dialog import RunDialog +from ert.run_models import EnsembleExperiment + +from .conftest import open_gui_with_config, wait_for_child + +config_contents = """\ +QUEUE_SYSTEM {queue_system} +NUM_REALIZATIONS 10 +LOAD_WORKFLOW_JOB CHMOD_JOB CHMOD +LOAD_WORKFLOW CHMOD.wf CHMOD.wf +HOOK_WORKFLOW CHMOD.wf PRE_SIMULATION + +""" + +workflow_contents = """\ +CHMOD +""" + +workflow_job_contents = """\ +EXECUTABLE chmod.sh +""" + +chmod_sh_contents = """\ +#!/bin/bash +chmod 000 {tmp_path}/simulations/realization-0/iter-0 +""" + + +def write_config(tmp_path, queue_system): + (tmp_path / "config.ert").write_text( + config_contents.format(queue_system=queue_system) + ) + (tmp_path / "CHMOD_JOB").write_text(workflow_job_contents) + (tmp_path / "CHMOD.wf").write_text(workflow_contents) + (tmp_path / "chmod.sh").write_text(chmod_sh_contents.format(tmp_path=tmp_path)) + (tmp_path / "chmod.sh").chmod( + (tmp_path / "chmod.sh").stat().st_mode + | stat.S_IXUSR + | stat.S_IXGRP + | stat.S_IXOTH + ) + + +def test_missing_runpath_has_isolated_failures( + tmp_path, run_experiment, qtbot, monkeypatch +): + monkeypatch.chdir(tmp_path) + write_config(tmp_path, "LOCAL") + + def handle_message_box(run_dialog): + def inner(): + qtbot.waitUntil( + lambda: run_dialog.fail_msg_box is not None, + timeout=20000, + ) + + message_box = run_dialog.fail_msg_box + assert message_box is not None + assert message_box.label_text.text() == "ERT experiment failed!" + message_box.accept() + + return inner + + try: + for gui in open_gui_with_config(tmp_path / "config.ert"): + qtbot.addWidget(gui) + run_experiment(EnsembleExperiment, gui, click_done=False) + run_dialog = wait_for_child(gui, qtbot, RunDialog, timeout=10000) + + QTimer.singleShot(100, handle_message_box(run_dialog)) + qtbot.waitUntil(run_dialog.done_button.isVisible, timeout=200000) + assert ( + "9/10" + in run_dialog._progress_widget.findChild( + QLabel, name="progress_label_text_Finished" + ).text() + ) + assert ( + "1/10" + in run_dialog._progress_widget.findChild( + QLabel, name="progress_label_text_Failed" + ).text() + ) + qtbot.mouseClick(run_dialog.done_button, Qt.LeftButton) + finally: + with suppress(FileNotFoundError): + (tmp_path / "simulations/realization-0/iter-0").chmod(0x777) diff --git a/tests/unit_tests/gui/simulation/test_run_dialog.py b/tests/unit_tests/gui/simulation/test_run_dialog.py index e1cd4bd4a43..4ad9ee69f73 100644 --- a/tests/unit_tests/gui/simulation/test_run_dialog.py +++ b/tests/unit_tests/gui/simulation/test_run_dialog.py @@ -481,6 +481,10 @@ def test_that_exception_in_base_run_model_is_handled(qtbot: QtBot, storage): run_experiment = gui.findChild(QToolButton, name="run_experiment") def handle_error_dialog(run_dialog): + qtbot.waitUntil( + lambda: run_dialog.fail_msg_box is not None, + timeout=20000, + ) error_dialog = run_dialog.fail_msg_box assert error_dialog text = error_dialog.details_text.toPlainText()