diff --git a/pyphare/pyphare/core/phare_utilities.py b/pyphare/pyphare/core/phare_utilities.py index 27a092129..5a322c417 100644 --- a/pyphare/pyphare/core/phare_utilities.py +++ b/pyphare/pyphare/core/phare_utilities.py @@ -1,3 +1,4 @@ +import os import math import numpy as np @@ -174,3 +175,22 @@ def deep_copy(item, memo, excludes=[]): else: setattr(that, key, deepcopy(value, memo)) return that + + +def write_system_state_stats(file_path): + import json, psutil, pathlib, datetime + + file_path = pathlib.Path(file_path) if isinstance(file_path, str) else file_path + proc = psutil.Process(pid=os.getpid()) + file_path.parent.mkdir(exist_ok=True, parents=True) + + with open(file_path, "w") as file: + json.dump( + dict( + time=datetime.datetime.utcnow().timestamp(), + cpu_use=proc.cpu_percent(interval=0.1), + open_files=len(proc.open_files()), + mem_use=proc.memory_info().rss / 1024**2, + ), + file, + ) diff --git a/pyphare/pyphare/pharein/simulation.py b/pyphare/pyphare/pharein/simulation.py index 57116d45a..aab7070ea 100644 --- a/pyphare/pyphare/pharein/simulation.py +++ b/pyphare/pyphare/pharein/simulation.py @@ -581,14 +581,16 @@ def check_clustering(**kwargs): return clustering -def check_loadbalancing(**kwargs): - valid_keys = ["nppc", "homogeneous"] - loadbalancing = kwargs.get("loadbalancing", "nppc") - if loadbalancing not in valid_keys: - raise ValueError( - f"Error: loadbalancing type is not supported, supported types are {valid_keys}" - ) - return loadbalancing +def check_system_state_writes(**kwargs): + write_time = kwargs.get("system_state_write_every_t", 0) + if write_time > 0: + try: + import psutil + except ImportError as e: + raise ValueError( + "psutil is not available for writing system usage information" + ) + return write_time # ------------------------------------------------------------------------------ @@ -626,6 +628,7 @@ def wrapper(simulation_object, **kwargs): "description", "dry_run", "write_reports", + "system_state_write_every_t", ] accepted_keywords += check_optional_keywords(**kwargs) @@ -699,6 +702,8 @@ def wrapper(simulation_object, **kwargs): "write_reports", os.environ.get("PHARE_TESTING", "0") != "1" ) + kwargs["system_state_write_every_t"] = check_system_state_writes(**kwargs) + return func(simulation_object, **kwargs) return wrapper diff --git a/pyphare/pyphare/simulator/simulator.py b/pyphare/pyphare/simulator/simulator.py index 3e6229924..16c5743ee 100644 --- a/pyphare/pyphare/simulator/simulator.py +++ b/pyphare/pyphare/simulator/simulator.py @@ -71,7 +71,7 @@ def __init__(self, simulation, auto_dump=True, **kwargs): self.print_eol = "\r" self.print_eol = kwargs.get("print_eol", self.print_eol) self.log_to_file = kwargs.get("log_to_file", True) - + self.write_system_state_time = 0 self.auto_dump = auto_dump import pyphare.simulator._simulator as _simulator @@ -161,6 +161,7 @@ def advance(self, dt=None): if self._auto_dump() and self.post_advance != None: self.post_advance(self.cpp_sim.currentTime()) + self.write_system_state_stats() return self def times(self): @@ -255,6 +256,23 @@ def _check_init(self): if self.cpp_sim is None: self.initialize() + def write_system_state_stats(self): + import pyphare.core.phare_utilities as pu + from pyphare.cpp import cpp_lib + + if self.simulation.system_state_write_every_t == 0: + return + diff_time = self.cpp_sim.currentTime() - self.write_system_state_time + if ( + pu.fp_equal(diff_time, self.simulation.system_state_write_every_t) + or diff_time > self.simulation.system_state_write_every_t + ): + self.write_system_state_time = self.cpp_sim.currentTime() + time = "{:.10f}".format(self.cpp_sim.currentTime()) + pu.write_system_state_stats( + f".phare/sys/stats/{cpp_lib().mpi_rank()}/{time}" + ) + def _log_to_file(self): """ send C++ std::cout logs to files with env var PHARE_LOG diff --git a/requirements.txt b/requirements.txt index 2ecd76178..21f01139a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ scipy matplotlib seaborn sphinx_rtd_theme +psutil