diff --git a/dfir_rs/src/scheduled/context.rs b/dfir_rs/src/scheduled/context.rs index 5540dc94c7d..83299ff0f96 100644 --- a/dfir_rs/src/scheduled/context.rs +++ b/dfir_rs/src/scheduled/context.rs @@ -45,7 +45,8 @@ pub struct Context { pub(super) current_stratum: usize, pub(super) current_tick_start: SystemTime, - pub(super) subgraph_last_tick_run_in: Option, + pub(super) is_first_run_this_tick: bool, + pub(super) is_first_loop_iteration: bool, /// The SubgraphId of the currently running operator. When this context is /// not being forwarded to a running operator, this field is meaningless. @@ -70,10 +71,15 @@ impl Context { /// Gets whether this is the first time this subgraph is being scheduled for this tick pub fn is_first_run_this_tick(&self) -> bool { - self.subgraph_last_tick_run_in - .map_or(true, |tick_last_run_in| { - self.current_tick > tick_last_run_in - }) + self.is_first_run_this_tick + } + + /// Gets whether this run is the first iteration of a loop. + /// + /// This is only meaningful if the subgraph is in a loop, otherwise this will always return + /// `false`. + pub fn is_first_loop_iteration(&self) -> bool { + self.is_first_loop_iteration } /// Gets the current stratum nubmer. @@ -237,7 +243,8 @@ impl Default for Context { current_tick: TickInstant::default(), current_tick_start: SystemTime::now(), - subgraph_last_tick_run_in: None, + is_first_run_this_tick: true, + is_first_loop_iteration: true, // Will be re-set before use. subgraph_id: SubgraphId::from_raw(0), diff --git a/dfir_rs/src/scheduled/graph.rs b/dfir_rs/src/scheduled/graph.rs index a71cfcab9ab..6fe4ce365c3 100644 --- a/dfir_rs/src/scheduled/graph.rs +++ b/dfir_rs/src/scheduled/graph.rs @@ -270,11 +270,20 @@ impl<'a> Dfir<'a> { let current_tick = self.context.current_tick; let mut work_done = false; - - while let Some((_depth, sg_id)) = + let mut prev_depth = 0; + // How many times we have entered a loop. + let mut loop_counter = 1; + while let Some((curr_depth, sg_id)) = self.context.stratum_queues[self.context.current_stratum].pop() { work_done = true; + + let entered_loop = curr_depth > prev_depth; + if entered_loop { + loop_counter += 1; + prev_depth = curr_depth; + } + { let sg_data = &mut self.subgraphs[sg_id]; // This must be true for the subgraph to be enqueued. @@ -286,7 +295,13 @@ impl<'a> Dfir<'a> { ); self.context.subgraph_id = sg_id; - self.context.subgraph_last_tick_run_in = sg_data.last_tick_run_in; + self.context.is_first_run_this_tick = sg_data + .last_tick_run_in + .is_none_or(|last_tick| last_tick < self.context.current_tick); + self.context.is_first_loop_iteration = self.context.is_first_run_this_tick + || (sg_data.last_loop_counter < loop_counter); + sg_data.last_loop_counter = loop_counter; + sg_data.subgraph.run(&mut self.context, &mut self.handoffs); sg_data.last_tick_run_in = Some(current_tick); } @@ -914,6 +929,8 @@ pub(super) struct SubgraphData<'a> { /// Keep track of the last tick that this subgraph was run in last_tick_run_in: Option, + /// Used to track the first iteration in each loop execution. + last_loop_counter: usize, /// If this subgraph is marked as lazy, then sending data back to a lower stratum does not trigger a new tick to be run. is_lazy: bool, @@ -945,6 +962,7 @@ impl<'a> SubgraphData<'a> { succs, is_scheduled: Cell::new(is_scheduled), last_tick_run_in: None, + last_loop_counter: 0, is_lazy, loop_id, loop_depth,