Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
PhilipDeegan committed Dec 18, 2022
1 parent f11619b commit 017173f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 48 deletions.
8 changes: 4 additions & 4 deletions pyphare/pyphare/pharesee/particles.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ def all_assert_sorted(part1, part2):
deltol = 1e-6 if any([part.deltas.dtype == np.float32 for part in [part1, part2]] ) else 1e-12

np.testing.assert_array_equal(part1.iCells[idx1], part2.iCells[idx2])
np.testing.assert_allclose(part1.deltas[idx1], part2.deltas[idx2], atol=deltol)
np.testing.assert_allclose(part1.deltas[idx1], part2.deltas[idx2], rtol=0, atol=deltol)

np.testing.assert_allclose(part1.v[idx1,0], part2.v[idx2,0], atol=1e-12)
np.testing.assert_allclose(part1.v[idx1,1], part2.v[idx2,1], atol=1e-12)
np.testing.assert_allclose(part1.v[idx1,2], part2.v[idx2,2], atol=1e-12)
np.testing.assert_allclose(part1.v[idx1,0], part2.v[idx2,0], rtol=0, atol=1e-12)
np.testing.assert_allclose(part1.v[idx1,1], part2.v[idx2,1], rtol=0, atol=1e-12)
np.testing.assert_allclose(part1.v[idx1,2], part2.v[idx2,2], rtol=0, atol=1e-12)


def any_assert(part1, part2):
Expand Down
98 changes: 54 additions & 44 deletions pyphare/pyphare/simulator/simulators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
"""
import os

import sys
import json
import time
import inspect
import importlib
import concurrent

from enum import IntEnum
from pathlib import Path
from datetime import datetime
from multiprocessing import Process
Expand All @@ -31,6 +34,13 @@

shared_size = (200, 200) # kinda arbitrary but over allocated

class SharedSimulationStateEnum(IntEnum):
CURRENT_TIME = 0
IS_BUSY = 7
CAN_ADVANCE = 8
IS_FINISHED = 9



def create_shared_block(n_sims):
a = np.zeros(shared_size, dtype=np.int64)
Expand All @@ -39,52 +49,52 @@ def create_shared_block(n_sims):
return shm, np_array


def atomic_set(port, shr_name, pos, val):
def atomic_set(sim_id, shr_name, pos, val):
existing_shm = shared_memory.SharedMemory(name=shr_name)
np_array = np.ndarray(shared_size, dtype=np.int64, buffer=existing_shm.buf)
lock.acquire()

np_array[port][pos] = val
np_array[sim_id][pos] = val

lock.release()
existing_shm.close()


def poll(port, shr_name):
def poll(sim_id, shr_name):
while True:
# print("polling", port)
wait_time = state_machine(port, shr_name)
# print("polling", sim_id)
wait_time = state_machine(sim_id, shr_name)
time.sleep(wait_time)


def state_machine(port, shr_name):
def state_machine(sim_id, shr_name):
result = 2
finished = is_finished()

existing_shm = shared_memory.SharedMemory(name=shr_name)
np_array = np.ndarray(shared_size, dtype=np.int64, buffer=existing_shm.buf)

lock.acquire() # atomic operations below
should_advance = np_array[port][8] == 1
should_advance = np_array[sim_id][SharedSimulationStateEnum.CAN_ADVANCE] == 1
if should_advance:
np_array[port][7] = 1
np_array[port][8] = 0
np_array[sim_id][SharedSimulationStateEnum.IS_BUSY] = 1
np_array[sim_id][SharedSimulationStateEnum.CAN_ADVANCE] = 0
lock.release() # atomic operations above

if not finished and should_advance:
advance_sim()

lock.acquire() # atomic operations below
if should_advance:
np_array[port][7] = 0
np_array[port][9] = is_finished()
np_array[port][0] = 1e9 * current_time()
np_array[sim_id][SharedSimulationStateEnum.IS_BUSY] = 0
np_array[sim_id][SharedSimulationStateEnum.IS_FINISHED] = is_finished()
np_array[sim_id][SharedSimulationStateEnum.CURRENT_TIME] = 1e9 * current_time()
lock.release() # atomic operations above

existing_shm.close()

if should_advance and finished:
print("FINISHED", port)
print("FINISHED", sim_id)
exit(0)

return result
Expand All @@ -99,8 +109,8 @@ def prepend_python_path(val):
sys.path = [val] + sys.path


def build_diag_dir(port):
return str(phare_runs_dir / f"diags_{os.environ['PHARE_LOG_TIME']}-ID={port}")
def build_diag_dir(sim_id):
return str(phare_runs_dir / f"diags_{os.environ['PHARE_LOG_TIME']}-ID={sim_id}")


def init_sim(sim):
Expand All @@ -126,31 +136,31 @@ def current_time():
return _globals["simulator"].currentTime()


def init_simulation(port, sim, shr_name, dic):
def init_simulation(sim_id, sim, shr_name, dic):
set_env(dic)

init_sim(sim)

poll(port, shr_name)
poll(sim_id, shr_name)


def start_server_process(port, sim, shr_name, dic):
def start_server_process(sim_id, sim, shr_name, dic):
servers = _globals["servers"]
assert port not in servers
assert sim_id not in servers

if "build_dir" in dic:
prepend_python_path(dic["build_dir"])

_globals["servers"][port] = Process(
target=init_simulation, args=(port, sim, shr_name, dic)
_globals["servers"][sim_id] = Process(
target=init_simulation, args=(sim_id, sim, shr_name, dic)
)
_globals["servers"][port].start()
_globals["servers"][sim_id].start()

try_count = 5
for i in range(1, try_count + 1):
time.sleep(0.5)
try:
assert servers[port].exitcode is None # or it crashed/exited early
assert servers[sim_id].exitcode is None # or it crashed/exited early
return
except Exception as e:
if i == try_count:
Expand All @@ -171,11 +181,11 @@ def build_dir_path(path):


class Simulators:
def __init__(self, starting_port=10):
def __init__(self, starting_sim_id=10):
self.simulations = []
self.simulation_configs = []
self.states = dict(init=False)
self.starting_port = starting_port
self.starting_sim_id = starting_sim_id
self.log_time = datetime.now().strftime("%m_%d_%Y_%H_%M_%S")
os.environ["PHARE_LOG_TIME"] = self.log_time

Expand All @@ -193,22 +203,22 @@ def init(self):
shr, np_array = create_shared_block(len(self.simulations))
self.shr = shr
self.shared_np_array = np_array
self._state_machine_set_per_simulation(7, 1)
self._state_machine_set_per_simulation(SharedSimulationStateEnum.IS_BUSY, 1)

self.thread_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=len(self.simulations)
)

for i, simulation in enumerate(self.simulations):
port = self.starting_port + i
sim_id = self.starting_sim_id + i
simulation_config = self.simulation_configs[i]

init_dict = dict(SIM_ID=str(port), PHARE_LOG_TIME=self.log_time)
init_dict = dict(SIM_ID=str(sim_id), PHARE_LOG_TIME=self.log_time)
if simulation_config["build_dir"]:
init_dict["build_dir"] = build_dir_path(simulation_config["build_dir"])

start_server_process(
port,
sim_id,
simulation,
shr.name,
init_dict,
Expand All @@ -222,8 +232,8 @@ def _state_machine_list_for_value(self, offset):

results = np.ndarray(len(self.simulations))
for i, simulation in enumerate(self.simulations):
port = self.starting_port + i
results[i] = np_array[port][offset]
sim_id = self.starting_sim_id + i
results[i] = np_array[sim_id][offset]

lock.release()
existing_shm.close()
Expand All @@ -235,16 +245,16 @@ def _state_machine_set_per_simulation(self, offset, val):
lock.acquire()

for i, simulation in enumerate(self.simulations):
port = self.starting_port + i
np_array[port][offset] = val
sim_id = self.starting_sim_id + i
np_array[sim_id][offset] = val

lock.release()
existing_shm.close()

def wait_for_simulations(self):
# we have to wait for all simulations to finish the current timestep
while True:
if np.sum(self._state_machine_list_for_value(7)) == 0:
if np.sum(self._state_machine_list_for_value(SharedSimulationStateEnum.IS_BUSY)) == 0:
return
time.sleep(1)

Expand All @@ -253,24 +263,24 @@ def advance(self, compare=False):
self.init()

atomic_set(0, self.shr.name, 0, int(compare))
self._state_machine_set_per_simulation(8, 1)
self._state_machine_set_per_simulation(SharedSimulationStateEnum.CAN_ADVANCE, 1)

if compare:
self._state_machine_set_per_simulation(7, 1)
self._state_machine_set_per_simulation(SharedSimulationStateEnum.IS_BUSY, 1)
# we have to wait for all simulations to finish the current timestep
self.wait_for_simulations()
self.compare()

def compare(self):
for i, simulation in enumerate(self.simulations):
assert (
_globals["servers"][self.starting_port + i].exitcode is None
_globals["servers"][self.starting_sim_id + i].exitcode is None
), "or it crashed early"

sim_times = self._state_machine_list_for_value(0)
sim_times = self._state_machine_list_for_value(SharedSimulationStateEnum.CURRENT_TIME)

times = {
i: (0.0 + sim_times[i]) / 1e9 # :(
i: (0.0 + sim_times[i]) / 1e9 # :( it's an int so after decimal is dropped
for i, simulation in enumerate(self.simulations)
}

Expand All @@ -281,16 +291,16 @@ def compare(self):
for j in range(i + 1, len(times)):
if times[i] == times[j]:
run0 = Run(
build_diag_dir(self.starting_port + i),
build_diag_dir(self.starting_sim_id + i),
single_hier_for_all_quantities=True,
)
run1 = Run(
build_diag_dir(self.starting_port + j),
build_diag_dir(self.starting_sim_id + j),
single_hier_for_all_quantities=True,
)

print(
f"comparing {self.starting_port + i} and {self.starting_port + j} at time {times[i]}"
f"comparing {self.starting_sim_id + i} and {self.starting_sim_id + j} at time {times[i]}"
)
assert hierarchy_compare(
run0.GetAllAvailableQties(times[i], []),
Expand All @@ -305,8 +315,8 @@ def run(self, compare=False):
while self.simulations:
self.advance(compare=compare)

is_busy = self._state_machine_list_for_value(7)
is_finished = self._state_machine_list_for_value(9)
is_busy = self._state_machine_list_for_value(SharedSimulationStateEnum.IS_BUSY)
is_finished = self._state_machine_list_for_value(SharedSimulationStateEnum.IS_FINISHED)

# trim finished simulations
self.simulations = [
Expand Down

0 comments on commit 017173f

Please sign in to comment.