class SchedulerMonitor:
    """Track per-RID experiment status from scheduler notifier mods.

    Every mod handed to :meth:`record` is applied to ``experiments``;
    each status change is checked against ``flow_map`` and wakes any
    coroutine blocked in :meth:`wait_until` on that transition.

    ``test`` must provide ``assertIn`` (e.g. a ``unittest.TestCase``).
    """

    # current status -> set of statuses it is allowed to move to next
    flow_map = {
        "": {"pending"},
        "pending": {"preparing", "flushing", "deleting"},
        "preparing": {"prepare_done", "deleting"},
        "prepare_done": {"running", "deleting"},
        "running": {"run_done", "paused", "deleting"},
        "run_done": {"analyzing", "deleting"},
        "analyzing": {"deleting"},
        "deleting": set(),
        "paused": {"running"},
        "flushing": {"preparing"},
    }

    def __init__(self, test):
        self.test = test
        # Mirror of the published structure: rid -> experiment info dict.
        self.experiments = {}
        # rid -> last status seen by record(); "" means "not seen yet".
        self.last_status = {}
        # condition -> rid -> status -> asyncio.Event
        self.flags = {"arrive": {}, "leave": {}}

    def record(self, mod):
        """Apply ``mod`` to ``experiments`` and process status changes.

        For every RID whose status differs from the last recorded one,
        assert (via ``self.test.assertIn``) that the move is allowed by
        ``flow_map``, set the matching "arrive" flag for the new status
        and the matching "leave" flag for the old one, then remember the
        new status.
        """
        process_mod(self.experiments, mod)
        for rid, exp_info in self.experiments.items():
            status = exp_info["status"]
            previous = self.last_status.setdefault(rid, "")
            if status == previous:
                continue
            self.test.assertIn(status, self.flow_map[previous])

            arrived = self.flags["arrive"].get(rid, {}).get(status)
            if arrived is not None:
                arrived.set()
            left = self.flags["leave"].get(rid, {}).get(previous)
            if left is not None:
                left.set()

            self.last_status[rid] = status

    async def wait_until(self, rid, condition, status):
        """Block until ``rid`` arrives at or leaves ``status``.

        ``condition`` is ``"arrive"`` or ``"leave"``.  An "arrive" wait
        returns immediately when ``rid`` is already in ``status``.  A RID
        that has not been recorded yet is simply waited for.

        Returns ``rid`` so callers racing several waiters can tell which
        one finished.
        """
        # .get(): an unknown rid must not raise, it just has no status yet.
        if condition == "arrive" and self.last_status.get(rid) == status:
            return rid
        if status not in self.flags[condition].get(rid, {}):
            self.add_flag(rid, condition, status)
        await self.flags[condition][rid][status].wait()
        self.remove_flag(rid, condition, status)
        return rid

    def add_flag(self, rid, condition, status):
        """Install a fresh, unset event for ``(condition, rid, status)``."""
        self.flags[condition].setdefault(rid, {})[status] = asyncio.Event()

    def remove_flag(self, rid, condition, status):
        """Drop the event for ``(condition, rid, status)`` if present.

        Also prunes the per-RID mapping once it becomes empty.  Calling
        this for a flag that does not exist is a no-op.
        """
        rid_flags = self.flags[condition].get(rid)
        if rid_flags is None:
            return
        rid_flags.pop(status, None)
        if not rid_flags:
            del self.flags[condition][rid]


class AssertScheduler:
    """Assertion helpers for scheduler tests.

    Mixin: the host object must provide ``self.loop`` (an asyncio event
    loop that is not currently running) and ``self.monitor`` (a
    ``SchedulerMonitor``).
    """

    def assertStatusEqual(self, rid, status):
        """Fail unless the last recorded status of ``rid`` is ``status``.

        A RID that was never recorded is reported as status ``None``.
        """
        rid_status = self.monitor.last_status.get(rid)
        if rid_status != status:
            raise AssertionError(f"Status of rid {rid} should be "
                                 f"{status}, instead of {rid_status}")

    def assertArriveStatus(self, rid, status, time_out=10):
        """Run the loop until ``rid`` arrives at ``status``.

        Fails if that does not happen within ``time_out`` seconds.
        """
        try:
            self.loop.run_until_complete(asyncio.wait_for(
                self.monitor.wait_until(rid, "arrive", status),
                time_out))
        except asyncio.TimeoutError:
            raise AssertionError(f"rid {rid} did not arrive "
                                 f"{status} within {time_out}s") from None

    def assertStopped(self, task, time_out=10):
        """Run ``task`` to completion, failing after ``time_out`` seconds."""
        try:
            self.loop.run_until_complete(asyncio.wait_for(task, time_out))
        except asyncio.TimeoutError:
            raise AssertionError(
                f"{task} did not complete within {time_out}s") from None

    def assertFirstLeave(self, first_rid, rids, status):
        """Fail unless ``first_rid`` is the first of ``rids`` to leave
        ``status``.

        One waiter per RID is raced; the losers are cancelled.  ``done``
        is a set, so if several waiters finish in the same loop iteration
        an arbitrary one of them is inspected.
        """
        # asyncio.wait() rejects bare coroutines on Python >= 3.11, so
        # wrap each waiter in a task bound to our loop first.
        waiters = [
            self.loop.create_task(
                self.monitor.wait_until(rid, "leave", status))
            for rid in rids
        ]
        done, pending = self.loop.run_until_complete(asyncio.wait(
            waiters, return_when=asyncio.FIRST_COMPLETED))
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancellations actually run so no task is left
            # pending when the loop is closed.
            self.loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True))
        if done.pop().result() != first_rid:
            raise AssertionError(f"rid {first_rid} did not leave"
                                 f" {status} first")
- scheduler.submit("main", expid, 0, None, False) + self.scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(done.wait()) - scheduler.notifier.publish = None - loop.run_until_complete(scheduler.stop()) + self.assertArriveStatus(1, "deleting") + self.assertStatusEqual(0, "pending") def test_pending_priority(self): """Check due dates take precedence over priorities when waiting to prepare.""" - loop = self.loop - handlers = {} - scheduler = Scheduler(_RIDCounter(0), handlers, None, None) - handlers["scheduler_check_pause"] = scheduler.check_pause + self.handlers["scheduler_check_pause"] = self.scheduler.check_pause expid_empty = _get_expid("EmptyExperiment") @@ -144,145 +198,16 @@ def test_pending_priority(self): late = time() + 100000 early = time() + 1 - expect = [ - { - "path": [], - "action": "setitem", - "value": { - "repo_msg": None, - "priority": low_priority, - "pipeline": "main", - "due_date": None, - "status": "pending", - "expid": expid_bg, - "flush": False - }, - "key": 0 - }, - { - "path": [], - "action": "setitem", - "value": { - "repo_msg": None, - "priority": high_priority, - "pipeline": "main", - "due_date": late, - "status": "pending", - "expid": expid_empty, - "flush": False - }, - "key": 1 - }, - { - "path": [], - "action": "setitem", - "value": { - "repo_msg": None, - "priority": middle_priority, - "pipeline": "main", - "due_date": early, - "status": "pending", - "expid": expid_empty, - "flush": False - }, - "key": 2 - }, - { - "path": [0], - "action": "setitem", - "value": "preparing", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "prepare_done", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "preparing", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "prepare_done", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "paused", 
- "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "run_done", - "key": "status" - }, - { - "path": [0], - "action": "setitem", - "value": "running", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "analyzing", - "key": "status" - }, - { - "path": [2], - "action": "setitem", - "value": "deleting", - "key": "status" - }, - { - "path": [], - "action": "delitem", - "key": 2 - }, - ] - done = asyncio.Event() - expect_idx = 0 - def notify(mod): - nonlocal expect_idx - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): - done.set() - scheduler.notifier.publish = notify - - scheduler.start() - - scheduler.submit("main", expid_bg, low_priority) - scheduler.submit("main", expid_empty, high_priority, late) - scheduler.submit("main", expid_empty, middle_priority, early) - - loop.run_until_complete(done.wait()) - scheduler.notifier.publish = None - loop.run_until_complete(scheduler.stop()) + self.scheduler.submit("main", expid_bg, low_priority) + self.scheduler.submit("main", expid_empty, high_priority, late) + self.scheduler.submit("main", expid_empty, middle_priority, early) - def test_pause(self): - loop = self.loop + self.assertFirstLeave(2, [1, 2], "pending") + self.assertArriveStatus(2, "deleting") + def test_pause(self): termination_ok = False + def check_termination(mod): nonlocal termination_ok self.assertEqual( @@ -290,139 +215,67 @@ def check_termination(mod): {"action": "setitem", "key": "termination_ok", "value": (False, True), "path": []}) termination_ok = True - handlers = { - "update_dataset": check_termination - } - scheduler = Scheduler(_RIDCounter(0), handlers, None, None) + self.handlers["update_dataset"] = check_termination expid_bg = _get_expid("BackgroundExperiment") expid = _get_expid("EmptyExperiment") - expect = _get_basic_steps(1, expid) - background_running = 
asyncio.Event() - empty_ready = asyncio.Event() - empty_completed = asyncio.Event() - background_completed = asyncio.Event() - expect_idx = 0 - def notify(mod): - nonlocal expect_idx - if mod == {"path": [0], - "value": "running", - "key": "status", - "action": "setitem"}: - background_running.set() - if mod == {"path": [0], - "value": "deleting", - "key": "status", - "action": "setitem"}: - background_completed.set() - if mod == {"path": [1], - "value": "prepare_done", - "key": "status", - "action": "setitem"}: - empty_ready.set() - if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1): - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): - empty_completed.set() - scheduler.notifier.publish = notify - - scheduler.start() - scheduler.submit("main", expid_bg, -99, None, False) - loop.run_until_complete(background_running.wait()) - self.assertFalse(scheduler.check_pause(0)) - scheduler.submit("main", expid, 0, None, False) - self.assertFalse(scheduler.check_pause(0)) - loop.run_until_complete(empty_ready.wait()) - self.assertTrue(scheduler.check_pause(0)) - loop.run_until_complete(empty_completed.wait()) - self.assertFalse(scheduler.check_pause(0)) - + # check_pause is True when rid with higher priority is prepare_done + self.scheduler.submit("main", expid_bg, -99, None, False) + self.assertArriveStatus(0, "running") + self.assertFalse(self.scheduler.check_pause(0)) + self.scheduler.submit("main", expid, 0, None, False) + self.assertFalse(self.scheduler.check_pause(0)) + self.assertArriveStatus(1, "prepare_done") + self.assertTrue(self.scheduler.check_pause(0)) + self.assertArriveStatus(1, "deleting") + self.assertFalse(self.scheduler.check_pause(0)) + + # check_pause is True when request_termination is called self.assertFalse(termination_ok) - scheduler.request_termination(0) - self.assertTrue(scheduler.check_pause(0)) - loop.run_until_complete(background_completed.wait()) + 
self.assertFalse(self.scheduler.check_pause(0)) + self.scheduler.request_termination(0) + self.assertTrue(self.scheduler.check_pause(0)) + self.assertArriveStatus(0, "deleting") self.assertTrue(termination_ok) - loop.run_until_complete(scheduler.stop()) - def test_close_with_active_runs(self): """Check scheduler exits with experiments still running""" - loop = self.loop - - scheduler = Scheduler(_RIDCounter(0), {}, None, None) - expid_bg = _get_expid("BackgroundExperiment") # Suppress the SystemExit backtrace when worker process is killed. expid_bg["log_level"] = logging.CRITICAL expid = _get_expid("EmptyExperiment") - background_running = asyncio.Event() - empty_ready = asyncio.Event() - background_completed = asyncio.Event() - def notify(mod): - if mod == {"path": [0], - "value": "running", - "key": "status", - "action": "setitem"}: - background_running.set() - if mod == {"path": [0], - "value": "deleting", - "key": "status", - "action": "setitem"}: - background_completed.set() - if mod == {"path": [1], - "value": "prepare_done", - "key": "status", - "action": "setitem"}: - empty_ready.set() - scheduler.notifier.publish = notify - - scheduler.start() - scheduler.submit("main", expid_bg, -99, None, False) - loop.run_until_complete(background_running.wait()) - - scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(empty_ready.wait()) + self.scheduler.submit("main", expid_bg, -99, None, False) + self.assertArriveStatus(0, "running") + + self.scheduler.submit("main", expid, 0, None, False) + self.assertArriveStatus(1, "prepare_done") # At this point, (at least) BackgroundExperiment is still running; make # sure we can stop the scheduler without hanging. 
- loop.run_until_complete(scheduler.stop()) def test_flush(self): - loop = self.loop - scheduler = Scheduler(_RIDCounter(0), dict(), None, None) + self.handlers["scheduler_check_pause"] = self.scheduler.check_pause expid = _get_expid("EmptyExperiment") + expid_bg = _get_expid("CheckPauseBackgroundExperiment") + expid_bg["log_level"] = logging.CRITICAL - expect = _get_basic_steps(1, expid, 1, True) - expect.insert(1, {"key": "status", - "path": [1], - "value": "flushing", - "action": "setitem"}) - first_preparing = asyncio.Event() - done = asyncio.Event() - expect_idx = 0 - def notify(mod): - nonlocal expect_idx - if mod == {"path": [0], - "value": "preparing", - "key": "status", - "action": "setitem"}: - first_preparing.set() - if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1): - self.assertEqual(mod, expect[expect_idx]) - expect_idx += 1 - if expect_idx >= len(expect): - done.set() - scheduler.notifier.publish = notify - - scheduler.start() - scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(first_preparing.wait()) - scheduler.submit("main", expid, 1, None, True) - loop.run_until_complete(done.wait()) - loop.run_until_complete(scheduler.stop()) + # Flush with same priority + self.scheduler.submit("main", expid, 0, None, False) + self.scheduler.submit("main", expid, 0, None, True) + self.assertArriveStatus(1, "preparing") + self.assertStatusEqual(0, "deleting") + self.assertArriveStatus(1, "deleting") + + # Flush with higher priority + self.scheduler.submit("main", expid_bg, 0, None, False) + # Make sure RID 2 go into preparing stage first + self.assertArriveStatus(2, "preparing") + self.scheduler.submit("main", expid, 1, None, True) + self.assertArriveStatus(3, "deleting") + self.assertStatusEqual(2, "running") def tearDown(self): + self.assertStopped(self.scheduler.stop()) self.loop.close()