From 7d2f1944b92a9e1a7a413499f6d30c9b7a32e01a Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Wed, 15 Jan 2025 10:06:24 -0800 Subject: [PATCH] fix queueing --- dfir_rs/src/scheduled/context.rs | 4 +- dfir_rs/src/scheduled/graph.rs | 16 +++--- dfir_rs/src/util/indexed_queue.rs | 90 +++++++++++++++++++++++++++++++ dfir_rs/src/util/mod.rs | 1 + precheck.bash | 2 +- 5 files changed, 102 insertions(+), 11 deletions(-) create mode 100644 dfir_rs/src/util/indexed_queue.rs diff --git a/dfir_rs/src/scheduled/context.rs b/dfir_rs/src/scheduled/context.rs index 83299ff0f96..58fac9d8b02 100644 --- a/dfir_rs/src/scheduled/context.rs +++ b/dfir_rs/src/scheduled/context.rs @@ -3,7 +3,6 @@ //! Provides APIs for state and scheduling. use std::any::Any; -use std::collections::BinaryHeap; use std::future::Future; use std::marker::PhantomData; use std::ops::DerefMut; @@ -16,6 +15,7 @@ use web_time::SystemTime; use super::state::StateHandle; use super::{StateId, SubgraphId}; use crate::scheduled::ticks::TickInstant; +use crate::util::indexed_queue::IndexedQueue; /// The main state and scheduler of the Hydroflow instance. Provided as the `context` API to each /// subgraph/operator as it is run. @@ -29,7 +29,7 @@ pub struct Context { /// TODO(mingwei): separate scheduler into its own struct/trait? /// Index is stratum, value is FIFO queue for that stratum. /// PriorityQueue, usize is depth. Larger/deeper is higher priority. - pub(super) stratum_queues: Vec>, + pub(super) stratum_queues: Vec>, /// Receive events, if second arg indicates if it is an external "important" event (true). pub(super) event_queue_recv: UnboundedReceiver<(SubgraphId, bool)>, /// If external events or data can justify starting the next tick. diff --git a/dfir_rs/src/scheduled/graph.rs b/dfir_rs/src/scheduled/graph.rs index 6fe4ce365c3..f9270a546d7 100644 --- a/dfir_rs/src/scheduled/graph.rs +++ b/dfir_rs/src/scheduled/graph.rs @@ -300,10 +300,10 @@ impl<'a> Dfir<'a> { .is_none_or(|last_tick| last_tick < self.context.current_tick); self.context.is_first_loop_iteration = self.context.is_first_run_this_tick || (sg_data.last_loop_counter < loop_counter); - sg_data.last_loop_counter = loop_counter; sg_data.subgraph.run(&mut self.context, &mut self.handoffs); sg_data.last_tick_run_in = Some(current_tick); + sg_data.last_loop_counter = loop_counter; } let sg_data = &self.subgraphs[sg_id]; @@ -319,7 +319,7 @@ impl<'a> Dfir<'a> { // Add subgraph to stratum queue if it is not already scheduled. if !succ_sg_data.is_scheduled.replace(true) { self.context.stratum_queues[succ_sg_data.stratum] - .push((succ_sg_data.loop_depth, succ_id)); + .push(succ_sg_data.loop_depth, succ_id); } } } @@ -472,7 +472,7 @@ impl<'a> Dfir<'a> { "Event received." ); if !sg_data.is_scheduled.replace(true) { - self.context.stratum_queues[sg_data.stratum].push((sg_data.loop_depth, sg_id)); + self.context.stratum_queues[sg_data.stratum].push(sg_data.loop_depth, sg_id); enqueued_count += 1; } if is_external { @@ -510,7 +510,7 @@ impl<'a> Dfir<'a> { "Event received." ); if !sg_data.is_scheduled.replace(true) { - self.context.stratum_queues[sg_data.stratum].push((sg_data.loop_depth, sg_id)); + self.context.stratum_queues[sg_data.stratum].push(sg_data.loop_depth, sg_id); count += 1; } if is_external { @@ -555,7 +555,7 @@ impl<'a> Dfir<'a> { "Event received." ); if !sg_data.is_scheduled.replace(true) { - self.context.stratum_queues[sg_data.stratum].push((sg_data.loop_depth, sg_id)); + self.context.stratum_queues[sg_data.stratum].push(sg_data.loop_depth, sg_id); count += 1; } if is_external { @@ -586,7 +586,7 @@ impl<'a> Dfir<'a> { let sg_data = &self.subgraphs[sg_id]; let already_scheduled = sg_data.is_scheduled.replace(true); if !already_scheduled { - self.context.stratum_queues[sg_data.stratum].push((sg_data.loop_depth, sg_id)); + self.context.stratum_queues[sg_data.stratum].push(sg_data.loop_depth, sg_id); true } else { false @@ -656,7 +656,7 @@ impl<'a> Dfir<'a> { ) }); self.context.init_stratum(stratum); - self.context.stratum_queues[stratum].push((loop_depth, sg_id)); + self.context.stratum_queues[stratum].push(loop_depth, sg_id); sg_id } @@ -752,7 +752,7 @@ impl<'a> Dfir<'a> { }); self.context.init_stratum(stratum); - self.context.stratum_queues[stratum].push((0, sg_id)); + self.context.stratum_queues[stratum].push(0, sg_id); sg_id } diff --git a/dfir_rs/src/util/indexed_queue.rs b/dfir_rs/src/util/indexed_queue.rs new file mode 100644 index 00000000000..3536a75bb57 --- /dev/null +++ b/dfir_rs/src/util/indexed_queue.rs @@ -0,0 +1,90 @@ +//! A priority queue where the priorities are indices (small non-negative integers). + +use std::collections::VecDeque; + +/// A priority queue where the priorities are indices (small non-negative integers). +/// +/// Larger indices are higher priority. +pub struct IndexedQueue { + /// Invariant: the last `VecDeque` is non-empty. + queues: Vec>, +} +impl Default for IndexedQueue { + fn default() -> Self { + Self { queues: Vec::new() } + } +} +impl IndexedQueue { + /// Creates a new `IndexedQueue`. + pub fn new() -> Self { + Self::default() + } + + /// Pushes an element to the queue with the given index. + pub fn push(&mut self, index: usize, element: T) { + if index >= self.queues.len() { + self.queues.resize_with(index + 1, Default::default); + } + self.queues[index].push_back(element); + } + + /// Pops an element from the queue with the largest index. + pub fn pop(&mut self) -> Option<(usize, T)> { + let item = self.queues.last_mut()?.pop_front().unwrap(); + let index = self.queues.len() - 1; + // Remove trailing empty queues to maintain the invariant. + while self.queues.last().is_some_and(VecDeque::is_empty) { + self.queues.pop(); + } + Some((index, item)) + } + + /// Returns the number of elements in the `IndexedQueue`. + pub fn len(&self) -> usize { + self.queues.iter().map(VecDeque::len).sum() + } + + /// Returns `true` if the `IndexedQueue` is empty. + pub fn is_empty(&self) -> bool { + self.queues.is_empty() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_indexed_queue() { + let mut queue = IndexedQueue::new(); + assert_eq!(queue.len(), 0); + assert!(queue.is_empty()); + + queue.push(0, 0); + queue.push(1, 1); + queue.push(0, 2); + queue.push(5, 3); + queue.push(1, 4); + + assert_eq!(queue.len(), 5); + assert!(!queue.is_empty()); + + assert_eq!(queue.pop(), Some((5, 3))); + + assert_eq!(queue.len(), 4); + assert!(!queue.is_empty()); + + assert_eq!(queue.pop(), Some((1, 1))); + assert_eq!(queue.pop(), Some((1, 4))); + assert_eq!(queue.pop(), Some((0, 0))); + + assert_eq!(queue.len(), 1); + assert!(!queue.is_empty()); + + assert_eq!(queue.pop(), Some((0, 2))); + assert_eq!(queue.pop(), None); + + assert_eq!(queue.len(), 0); + assert!(queue.is_empty()); + } +} diff --git a/dfir_rs/src/util/mod.rs b/dfir_rs/src/util/mod.rs index 0c6d61ed29e..6425c66c3a8 100644 --- a/dfir_rs/src/util/mod.rs +++ b/dfir_rs/src/util/mod.rs @@ -4,6 +4,7 @@ pub mod clear; #[cfg(feature = "dfir_macro")] pub mod demux_enum; +pub mod indexed_queue; pub mod monotonic_map; pub mod multiset; pub mod slot_vec; diff --git a/precheck.bash b/precheck.bash index f10f4d57395..fa4f85f9c4f 100755 --- a/precheck.bash +++ b/precheck.bash @@ -16,7 +16,7 @@ cargo clippy --all-targets --features python -- -D warnings [ "$FULL" = false ] || cargo check --all-targets --no-default-features INSTA_FORCE_PASS=1 INSTA_UPDATE=always TRYBUILD=overwrite cargo test --all-targets --no-fail-fast --workspace --exclude 'hydro_*' --features python -[ "$FULL" = false ] || INSTA_FORCE_PASS=1 INSTA_UPDATE=always TRYBUILD=overwrite cargo test --all-targets --no-fail-fast -p 'hydro_*' --features python +[ "$FULL" = false ] || INSTA_FORCE_PASS=1 INSTA_UPDATE=always TRYBUILD=overwrite cargo test --all-targets --no-fail-fast -p 'hydro_*' cargo test --doc [ "$FULL" = false ] || RUSTDOCFLAGS="-Dwarnings" cargo doc --no-deps