diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index cc3860921c..3026b367f6 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -8,11 +8,6 @@ from artiq.master.scheduler import Scheduler from sipyco.sync_struct import process_mod -basic_flow = ["pending", "preparing", "prepare_done", "running", - "run_done", "analyzing", "deleting"] - -flush_flow = ["pending", "flushing", "preparing", "prepare_done", - "running", "run_done", "analyzing", "deleting"] class EmptyExperiment(EnvExperiment): def build(self): @@ -66,27 +61,32 @@ def get(self): return rid class SchedulerMonitor: - def __init__(self): + def __init__(self, test): + self.test = test self.experiments = {} self.last_status = {} - self.exp_flow = {} self.flags = {"arrive": {}, "leave": {}} + self.flow_map = { # current status -> possible move + "": {"pending"}, + "pending": {"preparing", "flushing", "deleting"}, + "preparing": {"prepare_done", "deleting"}, + "prepare_done": {"running", "deleting"}, + "running": {"run_done", "paused", "deleting"}, + "run_done": {"analyzing", "deleting"}, + "analyzing": {"deleting"}, + "deleting": {}, + "paused" : {"running"}, + "flushing" : {"preparing"} + } def record(self, mod): process_mod(self.experiments, mod) for key, value in self.experiments.items(): if key not in self.last_status.keys(): self.last_status[key] = "" - self.exp_flow[key] = [] current_status = self.experiments[key]["status"] if current_status != self.last_status[key]: - if self.exp_flow[key]: - self.exp_flow[key][-1]["out_time"] = time() - self.exp_flow[key].append({ - "status": current_status, - "in_time": time(), - "out_time": "never" - }) + self.test.assertIn(current_status, self.flow_map[self.last_status[key]]) if key in self.flags["arrive"].keys(): if current_status in self.flags["arrive"][key].keys(): @@ -98,9 +98,6 @@ def record(self, mod): self.last_status[key] = current_status return - def get_status_order(self, rid): - return [step["status"] for step in self.exp_flow[rid]] - async def wait_until(self, rid, condition, status): # condition : "arrive", "leave" if self.last_status[rid] == status and condition == "arrive": @@ -123,6 +120,7 @@ def remove_flag(self, rid, condition, status): if not self.flags[condition][rid]: del self.flags[condition][rid] + class SchedulerCase(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() @@ -132,7 +130,7 @@ def test_steps(self): loop = self.loop scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") - monitor = SchedulerMonitor() + monitor = SchedulerMonitor(self) scheduler.notifier.publish = monitor.record scheduler.start() @@ -145,9 +143,7 @@ def test_steps(self): scheduler.submit("main", expid, 0, None, False) loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) - self.assertEqual(monitor.get_status_order(1), basic_flow) self.assertEqual(monitor.last_status[0], "pending") - scheduler.notifier.publish = None loop.run_until_complete(scheduler.stop()) def test_pending_priority(self): @@ -170,7 +166,7 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - monitor = SchedulerMonitor() + monitor = SchedulerMonitor(self) scheduler.notifier.publish = monitor.record scheduler.start() @@ -190,8 +186,6 @@ def test_pending_priority(self): task.cancel() loop.run_until_complete(monitor.wait_until(2, "arrive", "deleting")) - self.assertEqual(monitor.get_status_order(2), basic_flow) - scheduler.notifier.publish = None loop.run_until_complete(scheduler.stop()) def test_pause(self): @@ -213,7 +207,7 @@ def check_termination(mod): expid_bg = _get_expid("BackgroundExperiment") expid = _get_expid("EmptyExperiment") - monitor = SchedulerMonitor() + monitor = SchedulerMonitor(self) scheduler.notifier.publish = monitor.record scheduler.start() # check_pause is True when rid with higher priority is prepare_done @@ -226,7 +220,6 @@ def check_termination(mod): self.assertTrue(scheduler.check_pause(0)) loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) self.assertFalse(scheduler.check_pause(0)) - self.assertEqual(monitor.get_status_order(1), basic_flow) # check_pause is True when request_termination is called self.assertFalse(termination_ok) @@ -247,7 +240,7 @@ def test_close_with_active_runs(self): expid_bg = _get_expid("BackgroundExperiment") # Suppress the SystemExit backtrace when worker process is killed. expid_bg["log_level"] = logging.CRITICAL - monitor = SchedulerMonitor() + monitor = SchedulerMonitor(self) expid = _get_expid("EmptyExperiment") scheduler.notifier.publish = monitor.record @@ -270,7 +263,7 @@ def test_flush(self): expid = _get_expid("EmptyExperiment") expid_bg = _get_expid("CheckPauseBackgroundExperiment") expid_bg["log_level"] = logging.CRITICAL - monitor = SchedulerMonitor() + monitor = SchedulerMonitor(self) scheduler.notifier.publish = monitor.record scheduler.start() @@ -280,8 +273,6 @@ def test_flush(self): loop.run_until_complete(monitor.wait_until(1, "arrive", "preparing")) self.assertEqual(monitor.last_status[0], "deleting") loop.run_until_complete(monitor.wait_until(1, "arrive", "deleting")) - self.assertEqual(monitor.get_status_order(0), basic_flow) - self.assertEqual(monitor.get_status_order(1), flush_flow) # Flush with higher priority scheduler.submit("main", expid_bg, 0, None, False) @@ -290,7 +281,6 @@ def test_flush(self): scheduler.submit("main", expid, 1, None, True) loop.run_until_complete(monitor.wait_until(3, "arrive", "deleting")) self.assertEqual(monitor.last_status[2], "running") - self.assertEqual(monitor.get_status_order(3), flush_flow) loop.run_until_complete(scheduler.stop())