Skip to content

Commit

Permalink
Poll first for exit status before sending running status in fm_model_…
Browse files Browse the repository at this point in the history
…step

- This avoids sending the running message directly after sending the
 start message. For short running fm_steps this will avoid sending 3
 events in short succesion. Downside is that this will not provide the
 metadata from the running message such as memory consumption, but this
 is observed as unreliable for short running jobs anyway.
  • Loading branch information
larsevj committed Jan 29, 2025
1 parent 1e8cb1c commit c429eb4
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 27 deletions.
28 changes: 15 additions & 13 deletions src/_ert/forward_model_runner/forward_model_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,21 @@ def _run(self) -> Generator[Start | Exited | Running | None]:
max_memory_usage = 0
max_cpu_seconds = 0
fm_step_pids = {int(process.pid)}
while exit_code is None:
while True:
try:
exit_code = process.wait(timeout=self.MEMORY_POLL_PERIOD)
if exit_code is not None:
break
except TimeoutExpired:
potential_exited_msg = (
self.handle_process_timeout_and_create_exited_msg(
exit_code, proc, run_start_time
)
)
if isinstance(potential_exited_msg, Exited):
yield potential_exited_msg
return

(memory_rss, cpu_seconds, oom_score, pids) = _get_processtree_data(process)
max_cpu_seconds = max(max_cpu_seconds, cpu_seconds or 0)
fm_step_pids |= pids
Expand All @@ -205,18 +219,6 @@ def _run(self) -> Generator[Start | Exited | Running | None]:
),
)

try:
exit_code = process.wait(timeout=self.MEMORY_POLL_PERIOD)
except TimeoutExpired:
potential_exited_msg = (
self.handle_process_timeout_and_create_exited_msg(
exit_code, proc, run_start_time
)
)
if isinstance(potential_exited_msg, Exited):
yield potential_exited_msg
return

ensure_file_handles_closed([stdin, stdout, stderr])
exited_message = self._create_exited_message_based_on_exit_code(
max_memory_usage, target_file_mtime, exit_code, fm_step_pids
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def test_run_with_process_failing(mock_process, mock_popen, mock_check_executabl
run = fmstep.run()

assert isinstance(next(run), Start), "run did not yield Start message"
assert isinstance(next(run), Running), "run did not yield Running message"
exited = next(run)
assert isinstance(exited, Exited), "run did not yield Exited message"
assert exited.exit_code == 9, "Exited message had unexpected exit code"
Expand Down Expand Up @@ -213,9 +212,9 @@ def test_run_fails_using_exit_bash_builtin():

statuses = list(fmstep.run())

assert len(statuses) == 3, "Wrong statuses count"
assert statuses[2].exit_code == 1, "Exited status wrong exit_code"
assert statuses[2].error_message == "Process exited with status code 1", (
assert len(statuses) == 2, "Wrong statuses count"
assert statuses[1].exit_code == 1, "Exited status wrong exit_code"
assert statuses[1].error_message == "Process exited with status code 1", (
"Exited status wrong error_message"
)

Expand Down
17 changes: 7 additions & 10 deletions tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import os
import signal
import stat
import subprocess
import sys
from subprocess import Popen
from textwrap import dedent
Expand Down Expand Up @@ -116,8 +115,9 @@ def test_terminate_steps():
os.wait() # allow os to clean up zombie processes


@pytest.mark.integration_test
@pytest.mark.usefixtures("use_tmpdir")
def test_memory_profile_is_logged_as_csv():
def test_memory_profile_is_logged_as_csv(monkeypatch):
"""This tests that a csv is produced and has basic validity.
It does not try to verify the validity of the logged RSS values."""
fm_stepname = "do_nothing"
Expand All @@ -126,6 +126,7 @@ def test_memory_profile_is_logged_as_csv():
with open(scriptname, "w", encoding="utf-8") as script:
script.write(
"""#!/bin/sh
sleep 0.5
exit 0
"""
)
Expand All @@ -144,18 +145,14 @@ def test_memory_profile_is_logged_as_csv():
with open(JOBS_FILE, "w", encoding="utf-8") as f:
f.write(json.dumps(forward_model_steps))

subprocess.run(
[
sys.executable,
importlib.util.find_spec("_ert.forward_model_runner.fm_dispatch").origin,
os.getcwd(),
],
check=False,
monkeypatch.setattr(
_ert.forward_model_runner.runner.ForwardModelStep, "MEMORY_POLL_PERIOD", 0.1
)
main(["fm_dispatch", os.getcwd()])
csv_files = glob.glob("logs/memory-profile*csv")
mem_df = pd.read_csv(csv_files[0], parse_dates=True)
assert mem_df["timestamp"].is_monotonic_increasing
assert (mem_df["fm_step_id"].values == [0, 1, 2]).all()
assert (mem_df["fm_step_id"].unique() == [0, 1, 2]).all()
assert mem_df["fm_step_name"].unique() == [fm_stepname]
assert (mem_df["rss"] >= 0).all() # 0 has been observed

Expand Down

0 comments on commit c429eb4

Please sign in to comment.