Skip to content

Commit

Permalink
fix queueing
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Jan 15, 2025
1 parent 01cf2da commit 7d2f194
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 11 deletions.
4 changes: 2 additions & 2 deletions dfir_rs/src/scheduled/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//! Provides APIs for state and scheduling.
use std::any::Any;
use std::collections::BinaryHeap;
use std::future::Future;
use std::marker::PhantomData;
use std::ops::DerefMut;
Expand All @@ -16,6 +15,7 @@ use web_time::SystemTime;
use super::state::StateHandle;
use super::{StateId, SubgraphId};
use crate::scheduled::ticks::TickInstant;
use crate::util::indexed_queue::IndexedQueue;

/// The main state and scheduler of the Hydroflow instance. Provided as the `context` API to each
/// subgraph/operator as it is run.
Expand All @@ -29,7 +29,7 @@ pub struct Context {
/// TODO(mingwei): separate scheduler into its own struct/trait?
/// Index is stratum, value is FIFO queue for that stratum.
/// PriorityQueue, usize is depth. Larger/deeper is higher priority.
pub(super) stratum_queues: Vec<BinaryHeap<(usize, SubgraphId)>>,
pub(super) stratum_queues: Vec<IndexedQueue<SubgraphId>>,
/// Receive events, if second arg indicates if it is an external "important" event (true).
pub(super) event_queue_recv: UnboundedReceiver<(SubgraphId, bool)>,
/// If external events or data can justify starting the next tick.
Expand Down
16 changes: 8 additions & 8 deletions dfir_rs/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,10 @@ impl<'a> Dfir<'a> {
.is_none_or(|last_tick| last_tick < self.context.current_tick);
self.context.is_first_loop_iteration = self.context.is_first_run_this_tick
|| (sg_data.last_loop_counter < loop_counter);
sg_data.last_loop_counter = loop_counter;

sg_data.subgraph.run(&mut self.context, &mut self.handoffs);
sg_data.last_tick_run_in = Some(current_tick);
sg_data.last_loop_counter = loop_counter;
}

let sg_data = &self.subgraphs[sg_id];
Expand All @@ -319,7 +319,7 @@ impl<'a> Dfir<'a> {
// Add subgraph to stratum queue if it is not already scheduled.
if !succ_sg_data.is_scheduled.replace(true) {
self.context.stratum_queues[succ_sg_data.stratum]
.push((succ_sg_data.loop_depth, succ_id));
.push(succ_sg_data.loop_depth, succ_id);
}
}
}
Expand Down Expand Up @@ -472,7 +472,7 @@ impl<'a> Dfir<'a> {
"Event received."
);
if !sg_data.is_scheduled.replace(true) {
self.context.stratum_queues[sg_data.stratum].push((sg_data.loop_depth, sg_id));
self.context.stratum_queues[sg_data.stratum].push(sg_data.loop_depth, sg_id);
enqueued_count += 1;
}
if is_external {
Expand Down Expand Up @@ -510,7 +510,7 @@ impl<'a> Dfir<'a> {
"Event received."
);
if !sg_data.is_scheduled.replace(true) {
self.context.stratum_queues[sg_data.stratum].push((sg_data.loop_depth, sg_id));
self.context.stratum_queues[sg_data.stratum].push(sg_data.loop_depth, sg_id);
count += 1;
}
if is_external {
Expand Down Expand Up @@ -555,7 +555,7 @@ impl<'a> Dfir<'a> {
"Event received."
);
if !sg_data.is_scheduled.replace(true) {
self.context.stratum_queues[sg_data.stratum].push((sg_data.loop_depth, sg_id));
self.context.stratum_queues[sg_data.stratum].push(sg_data.loop_depth, sg_id);
count += 1;
}
if is_external {
Expand Down Expand Up @@ -586,7 +586,7 @@ impl<'a> Dfir<'a> {
let sg_data = &self.subgraphs[sg_id];
let already_scheduled = sg_data.is_scheduled.replace(true);
if !already_scheduled {
self.context.stratum_queues[sg_data.stratum].push((sg_data.loop_depth, sg_id));
self.context.stratum_queues[sg_data.stratum].push(sg_data.loop_depth, sg_id);
true
} else {
false
Expand Down Expand Up @@ -656,7 +656,7 @@ impl<'a> Dfir<'a> {
)
});
self.context.init_stratum(stratum);
self.context.stratum_queues[stratum].push((loop_depth, sg_id));
self.context.stratum_queues[stratum].push(loop_depth, sg_id);

sg_id
}
Expand Down Expand Up @@ -752,7 +752,7 @@ impl<'a> Dfir<'a> {
});

self.context.init_stratum(stratum);
self.context.stratum_queues[stratum].push((0, sg_id));
self.context.stratum_queues[stratum].push(0, sg_id);

sg_id
}
Expand Down
90 changes: 90 additions & 0 deletions dfir_rs/src/util/indexed_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//! A priority queue where the priorities are indices (small non-negative integers).
use std::collections::VecDeque;

/// A priority queue where the priorities are indices (small non-negative integers).
///
/// Larger indices are higher priority.
pub struct IndexedQueue<T> {
/// Invariant: the last `VecDeque` is non-empty.
queues: Vec<VecDeque<T>>,
}
impl<T> Default for IndexedQueue<T> {
fn default() -> Self {
Self { queues: Vec::new() }
}
}
impl<T> IndexedQueue<T> {
/// Creates a new `IndexedQueue`.
pub fn new() -> Self {
Self::default()
}

/// Pushes an element to the queue with the given index.
pub fn push(&mut self, index: usize, element: T) {
if index >= self.queues.len() {
self.queues.resize_with(index + 1, Default::default);
}
self.queues[index].push_back(element);
}

/// Pops an element from the queue with the largest index.
pub fn pop(&mut self) -> Option<(usize, T)> {
let item = self.queues.last_mut()?.pop_front().unwrap();
let index = self.queues.len() - 1;
// Remove trailing empty queues to maintain the invariant.
while self.queues.last().is_some_and(VecDeque::is_empty) {
self.queues.pop();
}
Some((index, item))
}

/// Returns the number of elements in the `IndexedQueue`.
pub fn len(&self) -> usize {
self.queues.iter().map(VecDeque::len).sum()
}

/// Returns `true` if the `IndexedQueue` is empty.
pub fn is_empty(&self) -> bool {
self.queues.is_empty()
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_indexed_queue() {
let mut queue = IndexedQueue::new();
assert_eq!(queue.len(), 0);
assert!(queue.is_empty());

queue.push(0, 0);
queue.push(1, 1);
queue.push(0, 2);
queue.push(5, 3);
queue.push(1, 4);

assert_eq!(queue.len(), 5);
assert!(!queue.is_empty());

assert_eq!(queue.pop(), Some((5, 3)));

assert_eq!(queue.len(), 4);
assert!(!queue.is_empty());

assert_eq!(queue.pop(), Some((1, 1)));
assert_eq!(queue.pop(), Some((1, 4)));
assert_eq!(queue.pop(), Some((0, 0)));

assert_eq!(queue.len(), 1);
assert!(!queue.is_empty());

assert_eq!(queue.pop(), Some((0, 2)));
assert_eq!(queue.pop(), None);

assert_eq!(queue.len(), 0);
assert!(queue.is_empty());
}
}
1 change: 1 addition & 0 deletions dfir_rs/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub mod clear;
#[cfg(feature = "dfir_macro")]
pub mod demux_enum;
pub mod indexed_queue;
pub mod monotonic_map;
pub mod multiset;
pub mod slot_vec;
Expand Down
2 changes: 1 addition & 1 deletion precheck.bash
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ cargo clippy --all-targets --features python -- -D warnings
[ "$FULL" = false ] || cargo check --all-targets --no-default-features

INSTA_FORCE_PASS=1 INSTA_UPDATE=always TRYBUILD=overwrite cargo test --all-targets --no-fail-fast --workspace --exclude 'hydro_*' --features python
[ "$FULL" = false ] || INSTA_FORCE_PASS=1 INSTA_UPDATE=always TRYBUILD=overwrite cargo test --all-targets --no-fail-fast -p 'hydro_*' --features python
[ "$FULL" = false ] || INSTA_FORCE_PASS=1 INSTA_UPDATE=always TRYBUILD=overwrite cargo test --all-targets --no-fail-fast -p 'hydro_*'
cargo test --doc
[ "$FULL" = false ] || RUSTDOCFLAGS="-Dwarnings" cargo doc --no-deps

Expand Down

0 comments on commit 7d2f194

Please sign in to comment.