diff --git a/pyphare/pyphare/pharesee/particles.py b/pyphare/pyphare/pharesee/particles.py index 5d7413aed..0df62f028 100644 --- a/pyphare/pyphare/pharesee/particles.py +++ b/pyphare/pyphare/pharesee/particles.py @@ -194,11 +194,11 @@ def all_assert_sorted(part1, part2): deltol = 1e-6 if any([part.deltas.dtype == np.float32 for part in [part1, part2]] ) else 1e-12 np.testing.assert_array_equal(part1.iCells[idx1], part2.iCells[idx2]) - np.testing.assert_allclose(part1.deltas[idx1], part2.deltas[idx2], atol=deltol) + np.testing.assert_allclose(part1.deltas[idx1], part2.deltas[idx2], rtol=0, atol=deltol) - np.testing.assert_allclose(part1.v[idx1,0], part2.v[idx2,0], atol=1e-12) - np.testing.assert_allclose(part1.v[idx1,1], part2.v[idx2,1], atol=1e-12) - np.testing.assert_allclose(part1.v[idx1,2], part2.v[idx2,2], atol=1e-12) + np.testing.assert_allclose(part1.v[idx1,0], part2.v[idx2,0], rtol=0, atol=1e-12) + np.testing.assert_allclose(part1.v[idx1,1], part2.v[idx2,1], rtol=0, atol=1e-12) + np.testing.assert_allclose(part1.v[idx1,2], part2.v[idx2,2], rtol=0, atol=1e-12) def any_assert(part1, part2): diff --git a/pyphare/pyphare/simulator/simulators.py b/pyphare/pyphare/simulator/simulators.py index ab81271b8..ebdb2d023 100644 --- a/pyphare/pyphare/simulator/simulators.py +++ b/pyphare/pyphare/simulator/simulators.py @@ -4,12 +4,15 @@ """ import os + import sys import json import time import inspect import importlib import concurrent + +from enum import IntEnum from pathlib import Path from datetime import datetime from multiprocessing import Process @@ -31,6 +34,13 @@ shared_size = (200, 200) # kinda arbitrary but over allocated +class SharedSimulationStateEnum(IntEnum): + CURRENT_TIME = 0 + IS_BUSY = 7 + CAN_ADVANCE = 8 + IS_FINISHED = 9 + + def create_shared_block(n_sims): a = np.zeros(shared_size, dtype=np.int64) @@ -39,25 +49,25 @@ def create_shared_block(n_sims): return shm, np_array -def atomic_set(port, shr_name, pos, val): +def atomic_set(sim_id, shr_name, pos, val): existing_shm = shared_memory.SharedMemory(name=shr_name) np_array = np.ndarray(shared_size, dtype=np.int64, buffer=existing_shm.buf) lock.acquire() - np_array[port][pos] = val + np_array[sim_id][pos] = val lock.release() existing_shm.close() -def poll(port, shr_name): +def poll(sim_id, shr_name): while True: - # print("polling", port) - wait_time = state_machine(port, shr_name) + # print("polling", sim_id) + wait_time = state_machine(sim_id, shr_name) time.sleep(wait_time) -def state_machine(port, shr_name): +def state_machine(sim_id, shr_name): result = 2 finished = is_finished() @@ -65,10 +75,10 @@ def state_machine(port, shr_name): np_array = np.ndarray(shared_size, dtype=np.int64, buffer=existing_shm.buf) lock.acquire() # atomic operations below - should_advance = np_array[port][8] == 1 + should_advance = np_array[sim_id][SharedSimulationStateEnum.CAN_ADVANCE] == 1 if should_advance: - np_array[port][7] = 1 - np_array[port][8] = 0 + np_array[sim_id][SharedSimulationStateEnum.IS_BUSY] = 1 + np_array[sim_id][SharedSimulationStateEnum.CAN_ADVANCE] = 0 lock.release() # atomic operations above if not finished and should_advance: @@ -76,15 +86,15 @@ def state_machine(port, shr_name): lock.acquire() # atomic operations below if should_advance: - np_array[port][7] = 0 - np_array[port][9] = is_finished() - np_array[port][0] = 1e9 * current_time() + np_array[sim_id][SharedSimulationStateEnum.IS_BUSY] = 0 + np_array[sim_id][SharedSimulationStateEnum.IS_FINISHED] = is_finished() + np_array[sim_id][SharedSimulationStateEnum.CURRENT_TIME] = 1e9 * current_time() lock.release() # atomic operations above existing_shm.close() if should_advance and finished: - print("FINISHED", port) + print("FINISHED", sim_id) exit(0) return result @@ -99,8 +109,8 @@ def prepend_python_path(val): sys.path = [val] + sys.path -def build_diag_dir(port): - return str(phare_runs_dir / f"diags_{os.environ['PHARE_LOG_TIME']}-ID={port}") +def build_diag_dir(sim_id): + return str(phare_runs_dir / f"diags_{os.environ['PHARE_LOG_TIME']}-ID={sim_id}") def init_sim(sim): @@ -126,31 +136,31 @@ def current_time(): return _globals["simulator"].currentTime() -def init_simulation(port, sim, shr_name, dic): +def init_simulation(sim_id, sim, shr_name, dic): set_env(dic) init_sim(sim) - poll(port, shr_name) + poll(sim_id, shr_name) -def start_server_process(port, sim, shr_name, dic): +def start_server_process(sim_id, sim, shr_name, dic): servers = _globals["servers"] - assert port not in servers + assert sim_id not in servers if "build_dir" in dic: prepend_python_path(dic["build_dir"]) - _globals["servers"][port] = Process( - target=init_simulation, args=(port, sim, shr_name, dic) + _globals["servers"][sim_id] = Process( + target=init_simulation, args=(sim_id, sim, shr_name, dic) ) - _globals["servers"][port].start() + _globals["servers"][sim_id].start() try_count = 5 for i in range(1, try_count + 1): time.sleep(0.5) try: - assert servers[port].exitcode is None # or it crashed/exited early + assert servers[sim_id].exitcode is None # or it crashed/exited early return except Exception as e: if i == try_count: @@ -171,11 +181,11 @@ def build_dir_path(path): class Simulators: - def __init__(self, starting_port=10): + def __init__(self, starting_sim_id=10): self.simulations = [] self.simulation_configs = [] self.states = dict(init=False) - self.starting_port = starting_port + self.starting_sim_id = starting_sim_id self.log_time = datetime.now().strftime("%m_%d_%Y_%H_%M_%S") os.environ["PHARE_LOG_TIME"] = self.log_time @@ -193,22 +203,22 @@ def init(self): shr, np_array = create_shared_block(len(self.simulations)) self.shr = shr self.shared_np_array = np_array - self._state_machine_set_per_simulation(7, 1) + self._state_machine_set_per_simulation(SharedSimulationStateEnum.IS_BUSY, 1) self.thread_pool = concurrent.futures.ThreadPoolExecutor( max_workers=len(self.simulations) ) for i, simulation in enumerate(self.simulations): - port = self.starting_port + i + sim_id = self.starting_sim_id + i simulation_config = self.simulation_configs[i] - init_dict = dict(SIM_ID=str(port), PHARE_LOG_TIME=self.log_time) + init_dict = dict(SIM_ID=str(sim_id), PHARE_LOG_TIME=self.log_time) if simulation_config["build_dir"]: init_dict["build_dir"] = build_dir_path(simulation_config["build_dir"]) start_server_process( - port, + sim_id, simulation, shr.name, init_dict, @@ -222,8 +232,8 @@ def _state_machine_list_for_value(self, offset): results = np.ndarray(len(self.simulations)) for i, simulation in enumerate(self.simulations): - port = self.starting_port + i - results[i] = np_array[port][offset] + sim_id = self.starting_sim_id + i + results[i] = np_array[sim_id][offset] lock.release() existing_shm.close() @@ -235,8 +245,8 @@ def _state_machine_set_per_simulation(self, offset, val): lock.acquire() for i, simulation in enumerate(self.simulations): - port = self.starting_port + i - np_array[port][offset] = val + sim_id = self.starting_sim_id + i + np_array[sim_id][offset] = val lock.release() existing_shm.close() @@ -244,7 +254,7 @@ def _state_machine_set_per_simulation(self, offset, val): def wait_for_simulations(self): # we have to wait for all simulations to finish the current timestep while True: - if np.sum(self._state_machine_list_for_value(7)) == 0: + if np.sum(self._state_machine_list_for_value(SharedSimulationStateEnum.IS_BUSY)) == 0: return time.sleep(1) @@ -253,10 +263,10 @@ def advance(self, compare=False): self.init() atomic_set(0, self.shr.name, 0, int(compare)) - self._state_machine_set_per_simulation(8, 1) + self._state_machine_set_per_simulation(SharedSimulationStateEnum.CAN_ADVANCE, 1) if compare: - self._state_machine_set_per_simulation(7, 1) + self._state_machine_set_per_simulation(SharedSimulationStateEnum.IS_BUSY, 1) # we have to wait for all simulations to finish the current timestep self.wait_for_simulations() self.compare() @@ -264,13 +274,13 @@ def advance(self, compare=False): def compare(self): for i, simulation in enumerate(self.simulations): assert ( - _globals["servers"][self.starting_port + i].exitcode is None + _globals["servers"][self.starting_sim_id + i].exitcode is None ), "or it crashed early" - sim_times = self._state_machine_list_for_value(0) + sim_times = self._state_machine_list_for_value(SharedSimulationStateEnum.CURRENT_TIME) times = { - i: (0.0 + sim_times[i]) / 1e9 # :( + i: (0.0 + sim_times[i]) / 1e9 # :( it's an int so after decimal is dropped for i, simulation in enumerate(self.simulations) } @@ -281,16 +291,16 @@ def compare(self): for j in range(i + 1, len(times)): if times[i] == times[j]: run0 = Run( - build_diag_dir(self.starting_port + i), + build_diag_dir(self.starting_sim_id + i), single_hier_for_all_quantities=True, ) run1 = Run( - build_diag_dir(self.starting_port + j), + build_diag_dir(self.starting_sim_id + j), single_hier_for_all_quantities=True, ) print( - f"comparing {self.starting_port + i} and {self.starting_port + j} at time {times[i]}" + f"comparing {self.starting_sim_id + i} and {self.starting_sim_id + j} at time {times[i]}" ) assert hierarchy_compare( run0.GetAllAvailableQties(times[i], []), @@ -305,8 +315,8 @@ def run(self, compare=False): while self.simulations: self.advance(compare=compare) - is_busy = self._state_machine_list_for_value(7) - is_finished = self._state_machine_list_for_value(9) + is_busy = self._state_machine_list_for_value(SharedSimulationStateEnum.IS_BUSY) + is_finished = self._state_machine_list_for_value(SharedSimulationStateEnum.IS_FINISHED) # trim finished simulations self.simulations = [