Skip to content

Commit

Permalink
dubious code for Context::is_first_loop_iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Jan 15, 2025
1 parent a47a92a commit 806671e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
19 changes: 13 additions & 6 deletions dfir_rs/src/scheduled/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ pub struct Context {
pub(super) current_stratum: usize,

pub(super) current_tick_start: SystemTime,
pub(super) subgraph_last_tick_run_in: Option<TickInstant>,
pub(super) is_first_run_this_tick: bool,
pub(super) is_first_loop_iteration: bool,

/// The SubgraphId of the currently running operator. When this context is
/// not being forwarded to a running operator, this field is meaningless.
Expand All @@ -70,10 +71,15 @@ impl Context {

/// Gets whether this is the first time this subgraph is being scheduled for this tick
pub fn is_first_run_this_tick(&self) -> bool {
self.subgraph_last_tick_run_in
.map_or(true, |tick_last_run_in| {
self.current_tick > tick_last_run_in
})
self.is_first_run_this_tick
}

/// Gets whether this run is the first iteration of a loop.
///
/// This is only meaningful if the subgraph is in a loop, otherwise this will always return
/// `false`.
pub fn is_first_loop_iteration(&self) -> bool {
self.is_first_loop_iteration
}

/// Gets the current stratum nubmer.
Expand Down Expand Up @@ -237,7 +243,8 @@ impl Default for Context {
current_tick: TickInstant::default(),

current_tick_start: SystemTime::now(),
subgraph_last_tick_run_in: None,
is_first_run_this_tick: true,
is_first_loop_iteration: true,

// Will be re-set before use.
subgraph_id: SubgraphId::from_raw(0),
Expand Down
24 changes: 21 additions & 3 deletions dfir_rs/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,20 @@ impl<'a> Dfir<'a> {
let current_tick = self.context.current_tick;

let mut work_done = false;

while let Some((_depth, sg_id)) =
let mut prev_depth = 0;
// How many times we have entered a loop.
let mut loop_counter = 1;
while let Some((curr_depth, sg_id)) =
self.context.stratum_queues[self.context.current_stratum].pop()
{
work_done = true;

let entered_loop = curr_depth > prev_depth;
if entered_loop {
loop_counter += 1;
prev_depth = curr_depth;
}

{
let sg_data = &mut self.subgraphs[sg_id];
// This must be true for the subgraph to be enqueued.
Expand All @@ -286,7 +295,13 @@ impl<'a> Dfir<'a> {
);

self.context.subgraph_id = sg_id;
self.context.subgraph_last_tick_run_in = sg_data.last_tick_run_in;
self.context.is_first_run_this_tick = sg_data
.last_tick_run_in
.is_none_or(|last_tick| last_tick < self.context.current_tick);
self.context.is_first_loop_iteration = self.context.is_first_run_this_tick
|| (sg_data.last_loop_counter < loop_counter);
sg_data.last_loop_counter = loop_counter;

sg_data.subgraph.run(&mut self.context, &mut self.handoffs);
sg_data.last_tick_run_in = Some(current_tick);
}
Expand Down Expand Up @@ -914,6 +929,8 @@ pub(super) struct SubgraphData<'a> {

/// Keep track of the last tick that this subgraph was run in
last_tick_run_in: Option<TickInstant>,
/// Used to track the first iteration in each loop execution.
last_loop_counter: usize,

/// If this subgraph is marked as lazy, then sending data back to a lower stratum does not trigger a new tick to be run.
is_lazy: bool,
Expand Down Expand Up @@ -945,6 +962,7 @@ impl<'a> SubgraphData<'a> {
succs,
is_scheduled: Cell::new(is_scheduled),
last_tick_run_in: None,
last_loop_counter: 0,
is_lazy,
loop_id,
loop_depth,
Expand Down

0 comments on commit 806671e

Please sign in to comment.