refactor(dfir_rs): use SlotMap-esque for Subgraph & Handoff datastructure (#1637)

Previously we used wrapper types around `usize` indexes, but any access into the
`Vec` required unwrapping the `usize` value. This adds a proper wrapper type
called `SlotVec`, which has an API based on
[`SlotMap`](https://docs.rs/slotmap/latest/slotmap/struct.SlotMap.html);
the main difference is that `SlotVec` does not support removals and therefore
doesn't need the (very slight) overhead of generational indices.
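
A minimal sketch of what a `SlotVec` along these lines can look like — this is not the actual `dfir_rs` implementation (which lives in `crate::util::slot_vec`, imported in the diff below). `SlotVec`, `insert_with_key`, `get`, and `from_raw` are names that appear in the diff; `Key` and everything else here is illustrative:

```rust
use std::marker::PhantomData;
use std::ops::{Index, IndexMut};

/// Typed index into a `SlotVec<Tag, _>`; the `Tag` keeps key spaces apart, so a
/// subgraph key cannot accidentally index the handoff storage.
pub struct Key<Tag>(usize, PhantomData<Tag>);

impl<Tag> Key<Tag> {
    /// Construct a key from a raw index (mirrors `SubgraphId::from_raw` in the diff).
    pub fn from_raw(raw: usize) -> Self {
        Self(raw, PhantomData)
    }
}
impl<Tag> Clone for Key<Tag> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<Tag> Copy for Key<Tag> {}
impl<Tag> std::fmt::Debug for Key<Tag> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Key({})", self.0)
    }
}

/// Append-only slot storage: a `SlotMap`-like API, but with no removals and
/// therefore no generational indices.
pub struct SlotVec<Tag, Val> {
    vec: Vec<Val>,
    _tag: PhantomData<Tag>,
}

impl<Tag, Val> Default for SlotVec<Tag, Val> {
    fn default() -> Self {
        Self { vec: Vec::new(), _tag: PhantomData }
    }
}

impl<Tag, Val> SlotVec<Tag, Val> {
    /// Insert a value built by a closure that receives the value's own key.
    pub fn insert_with_key(&mut self, func: impl FnOnce(Key<Tag>) -> Val) -> Key<Tag> {
        let key = Key::from_raw(self.vec.len());
        self.vec.push(func(key));
        key
    }

    /// Fallible lookup by typed key.
    pub fn get(&self, key: Key<Tag>) -> Option<&Val> {
        self.vec.get(key.0)
    }
}

impl<Tag, Val> Index<Key<Tag>> for SlotVec<Tag, Val> {
    type Output = Val;
    fn index(&self, key: Key<Tag>) -> &Val {
        &self.vec[key.0]
    }
}
impl<Tag, Val> IndexMut<Key<Tag>> for SlotVec<Tag, Val> {
    fn index_mut(&mut self, key: Key<Tag>) -> &mut Val {
        &mut self.vec[key.0]
    }
}
```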
MingweiSamuel authored Jan 14, 2025
1 parent 9fdb770 commit efb31e1
Showing 8 changed files with 265 additions and 146 deletions.
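
The pattern the diff below adopts throughout: instead of precomputing the next index as `Vec::len()` and then pushing, `insert_with_key` hands the not-yet-occupied key to a closure that builds the value, so data that embeds its own ID (such as `HandoffData`) is constructed in one step. A hedged usage sketch against the illustrative `SlotVec`/`Key` above; `HandoffTag` is the tag name from the diff, while `DemoHandoffData` is made up:

```rust
// Usage sketch only; `DemoHandoffData` is illustrative, not dfir_rs's `HandoffData`.
struct HandoffTag;

struct DemoHandoffData {
    name: String,
    id: Key<HandoffTag>,
}

fn main() {
    let mut handoffs: SlotVec<HandoffTag, DemoHandoffData> = SlotVec::default();

    // The closure receives the key the value will occupy, so the value can
    // store its own ID -- no separate `let id = handoffs.len()` step, and no
    // chance of pushing under a different index than the one computed.
    let hoff_id = handoffs.insert_with_key(|id| DemoHandoffData {
        name: format!("demo hoff {:?}", id),
        id,
    });

    // Typed indexing: the key is used directly, no `.0` unwrapping.
    println!("{} -> {:?}", handoffs[hoff_id].name, handoffs[hoff_id].id);
}
```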
3 changes: 2 additions & 1 deletion dfir_rs/src/scheduled/context.rs
@@ -238,7 +238,8 @@ impl Default for Context {
current_tick_start: SystemTime::now(),
subgraph_last_tick_run_in: None,

subgraph_id: SubgraphId(0),
// Will be re-set before use.
subgraph_id: SubgraphId::from_raw(0),

tasks_to_spawn: Vec::new(),
task_join_handles: Vec::new(),
227 changes: 117 additions & 110 deletions dfir_rs/src/scheduled/graph.rs
@@ -21,17 +21,18 @@ use super::port::{RecvCtx, RecvPort, SendCtx, SendPort, RECV, SEND};
use super::reactor::Reactor;
use super::state::StateHandle;
use super::subgraph::Subgraph;
use super::{HandoffId, SubgraphId};
use super::{HandoffId, HandoffTag, SubgraphId, SubgraphTag};
use crate::scheduled::ticks::{TickDuration, TickInstant};
use crate::util::slot_vec::SlotVec;
use crate::Never;

/// A DFIR graph. Owns, schedules, and runs the compiled subgraphs.
#[derive(Default)]
pub struct Dfir<'a> {
pub(super) subgraphs: Vec<SubgraphData<'a>>,
pub(super) subgraphs: SlotVec<SubgraphTag, SubgraphData<'a>>,
pub(super) context: Context,

handoffs: Vec<HandoffData>,
handoffs: SlotVec<HandoffTag, HandoffData>,

#[cfg(feature = "meta")]
/// See [`Self::meta_graph()`].
@@ -52,15 +53,32 @@ impl Dfir<'_> {
where
T: Clone,
{
// Handoff ID of new tee output.
let new_hoff_id = HandoffId(self.handoffs.len());

// If we're teeing from a child make sure to find root.
let tee_root = self.handoffs[tee_parent_port.handoff_id.0].pred_handoffs[0];
let tee_root = self.handoffs[tee_parent_port.handoff_id].pred_handoffs[0];

// Set up teeing metadata.
let tee_root_data = &mut self.handoffs[tee_root];
let tee_root_data_name = tee_root_data.name.clone();

// Insert new handoff output.
let teeing_handoff = tee_root_data
.handoff
.any_ref()
.downcast_ref::<TeeingHandoff<T>>()
.unwrap();
let new_handoff = teeing_handoff.tee();

// Handoff ID of new tee output.
let new_hoff_id = self.handoffs.insert_with_key(|new_hoff_id| {
let new_name = Cow::Owned(format!("{} tee {:?}", tee_root_data_name, new_hoff_id));
let mut new_handoff_data = HandoffData::new(new_name, new_handoff, new_hoff_id);
// Set self's predecessor as `tee_root`.
new_handoff_data.pred_handoffs = vec![tee_root];
new_handoff_data
});

// Go to `tee_root`'s successors and insert self (the new tee output).
let tee_root_data = &mut self.handoffs[tee_root.0];
let tee_root_data = &mut self.handoffs[tee_root];
tee_root_data.succ_handoffs.push(new_hoff_id);

// Add our new handoff id into the subgraph data if the send `tee_root` has already been
@@ -70,22 +88,9 @@ impl Dfir<'_> {
"Tee send side should only have one sender (or none set yet)."
);
if let Some(&pred_sg_id) = tee_root_data.preds.first() {
self.subgraphs[pred_sg_id.0].succs.push(new_hoff_id);
self.subgraphs[pred_sg_id].succs.push(new_hoff_id);
}

// Insert new handoff output.
let teeing_handoff = tee_root_data
.handoff
.any_ref()
.downcast_ref::<TeeingHandoff<T>>()
.unwrap();
let new_handoff = teeing_handoff.tee();
let new_name = Cow::Owned(format!("{} tee {:?}", tee_root_data.name, new_hoff_id));
let mut new_handoff_data = HandoffData::new(new_name, new_handoff, new_hoff_id);
// Set self's predecessor as `tee_root`.
new_handoff_data.pred_handoffs = vec![tee_root];
self.handoffs.push(new_handoff_data);

let output_port = RecvPort {
handoff_id: new_hoff_id,
_marker: PhantomData,
@@ -101,7 +106,7 @@ impl Dfir<'_> {
where
T: Clone,
{
let data = &self.handoffs[tee_port.handoff_id.0];
let data = &self.handoffs[tee_port.handoff_id];
let teeing_handoff = data
.handoff
.any_ref()
@@ -110,7 +115,7 @@ impl Dfir<'_> {
teeing_handoff.drop();

let tee_root = data.pred_handoffs[0];
let tee_root_data = &mut self.handoffs[tee_root.0];
let tee_root_data = &mut self.handoffs[tee_root];
// Remove this output from the send succ handoff list.
tee_root_data
.succ_handoffs
@@ -121,7 +126,7 @@ impl Dfir<'_> {
"Tee send side should only have one sender (or none set yet)."
);
if let Some(&pred_sg_id) = tee_root_data.preds.first() {
self.subgraphs[pred_sg_id.0]
self.subgraphs[pred_sg_id]
.succs
.retain(|&succ_hoff| succ_hoff != tee_port.handoff_id);
}
@@ -266,11 +271,11 @@ impl<'a> Dfir<'a> {
{
work_done = true;
{
let sg_data = &mut self.subgraphs[sg_id.0];
let sg_data = &mut self.subgraphs[sg_id];
// This must be true for the subgraph to be enqueued.
assert!(sg_data.is_scheduled.take());
tracing::trace!(
sg_id = sg_id.0,
sg_id = sg_id.to_string(),
sg_name = &*sg_data.name,
"Running subgraph."
);
@@ -281,12 +286,12 @@ impl<'a> Dfir<'a> {
sg_data.last_tick_run_in = Some(current_tick);
}

let sg_data = &self.subgraphs[sg_id.0];
let sg_data = &self.subgraphs[sg_id];
for &handoff_id in sg_data.succs.iter() {
let handoff = &self.handoffs[handoff_id.0];
let handoff = &self.handoffs[handoff_id];
if !handoff.handoff.is_bottom() {
for &succ_id in handoff.succs.iter() {
let succ_sg_data = &self.subgraphs[succ_id.0];
let succ_sg_data = &self.subgraphs[succ_id];
// If we have sent data to the next tick, then we can start the next tick.
if succ_sg_data.stratum < self.context.current_stratum && !sg_data.is_lazy {
self.context.can_start_tick = true;
@@ -438,9 +443,9 @@ impl<'a> Dfir<'a> {
pub fn try_recv_events(&mut self) -> usize {
let mut enqueued_count = 0;
while let Ok((sg_id, is_external)) = self.context.event_queue_recv.try_recv() {
let sg_data = &self.subgraphs[sg_id.0];
let sg_data = &self.subgraphs[sg_id];
tracing::trace!(
sg_id = sg_id.0,
sg_id = sg_id.to_string(),
is_external = is_external,
sg_stratum = sg_data.stratum,
"Event received."
@@ -476,9 +481,9 @@ impl<'a> Dfir<'a> {
let mut count = 0;
loop {
let (sg_id, is_external) = self.context.event_queue_recv.blocking_recv()?;
let sg_data = &self.subgraphs[sg_id.0];
let sg_data = &self.subgraphs[sg_id];
tracing::trace!(
sg_id = sg_id.0,
sg_id = sg_id.to_string(),
is_external = is_external,
sg_stratum = sg_data.stratum,
"Event received."
@@ -521,9 +526,9 @@ impl<'a> Dfir<'a> {
loop {
tracing::trace!("Awaiting events (`event_queue_recv`).");
let (sg_id, is_external) = self.context.event_queue_recv.recv().await?;
let sg_data = &self.subgraphs[sg_id.0];
let sg_data = &self.subgraphs[sg_id];
tracing::trace!(
sg_id = sg_id.0,
sg_id = sg_id.to_string(),
is_external = is_external,
sg_stratum = sg_data.stratum,
"Event received."
@@ -557,7 +562,7 @@ impl<'a> Dfir<'a> {

/// Schedules a subgraph to be run. See also: [`Context::schedule_subgraph`].
pub fn schedule_subgraph(&mut self, sg_id: SubgraphId) -> bool {
let sg_data = &self.subgraphs[sg_id.0];
let sg_data = &self.subgraphs[sg_id];
let already_scheduled = sg_data.is_scheduled.replace(true);
if !already_scheduled {
self.context.stratum_queues[sg_data.stratum].push_back(sg_id);
@@ -602,26 +607,27 @@ impl<'a> Dfir<'a> {
W: 'static + PortList<SEND>,
F: 'a + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>),
{
let sg_id = SubgraphId(self.subgraphs.len());

let (mut subgraph_preds, mut subgraph_succs) = Default::default();
recv_ports.set_graph_meta(&mut self.handoffs, &mut subgraph_preds, sg_id, true);
send_ports.set_graph_meta(&mut self.handoffs, &mut subgraph_succs, sg_id, false);

let subgraph = move |context: &mut Context, handoffs: &mut Vec<HandoffData>| {
let recv = recv_ports.make_ctx(&*handoffs);
let send = send_ports.make_ctx(&*handoffs);
(subgraph)(context, recv, send);
};
self.subgraphs.push(SubgraphData::new(
name.into(),
stratum,
subgraph,
subgraph_preds,
subgraph_succs,
true,
laziness,
));
let sg_id = self.subgraphs.insert_with_key(|sg_id| {
let (mut subgraph_preds, mut subgraph_succs) = Default::default();
recv_ports.set_graph_meta(&mut self.handoffs, &mut subgraph_preds, sg_id, true);
send_ports.set_graph_meta(&mut self.handoffs, &mut subgraph_succs, sg_id, false);

let subgraph =
move |context: &mut Context, handoffs: &mut SlotVec<HandoffTag, HandoffData>| {
let recv = recv_ports.make_ctx(&*handoffs);
let send = send_ports.make_ctx(&*handoffs);
(subgraph)(context, recv, send);
};
SubgraphData::new(
name.into(),
stratum,
subgraph,
subgraph_preds,
subgraph_succs,
true,
laziness,
)
});
self.context.init_stratum(stratum);
self.context.stratum_queues[stratum].push_back(sg_id);

@@ -662,58 +668,60 @@ impl<'a> Dfir<'a> {
F: 'static
+ for<'ctx> FnMut(&'ctx mut Context, &'ctx [&'ctx RecvCtx<R>], &'ctx [&'ctx SendCtx<W>]),
{
let sg_id = SubgraphId(self.subgraphs.len());
let sg_id = self.subgraphs.insert_with_key(|sg_id| {
let subgraph_preds = recv_ports.iter().map(|port| port.handoff_id).collect();
let subgraph_succs = send_ports.iter().map(|port| port.handoff_id).collect();

let subgraph_preds = recv_ports.iter().map(|port| port.handoff_id).collect();
let subgraph_succs = send_ports.iter().map(|port| port.handoff_id).collect();
for recv_port in recv_ports.iter() {
self.handoffs[recv_port.handoff_id].succs.push(sg_id);
}
for send_port in send_ports.iter() {
self.handoffs[send_port.handoff_id].preds.push(sg_id);
}

for recv_port in recv_ports.iter() {
self.handoffs[recv_port.handoff_id.0].succs.push(sg_id);
}
for send_port in send_ports.iter() {
self.handoffs[send_port.handoff_id.0].preds.push(sg_id);
}
let subgraph =
move |context: &mut Context, handoffs: &mut SlotVec<HandoffTag, HandoffData>| {
let recvs: Vec<&RecvCtx<R>> = recv_ports
.iter()
.map(|hid| hid.handoff_id)
.map(|hid| handoffs.get(hid).unwrap())
.map(|h_data| {
h_data
.handoff
.any_ref()
.downcast_ref()
.expect("Attempted to cast handoff to wrong type.")
})
.map(RefCast::ref_cast)
.collect();

let sends: Vec<&SendCtx<W>> = send_ports
.iter()
.map(|hid| hid.handoff_id)
.map(|hid| handoffs.get(hid).unwrap())
.map(|h_data| {
h_data
.handoff
.any_ref()
.downcast_ref()
.expect("Attempted to cast handoff to wrong type.")
})
.map(RefCast::ref_cast)
.collect();

(subgraph)(context, &recvs, &sends)
};
SubgraphData::new(
name.into(),
stratum,
subgraph,
subgraph_preds,
subgraph_succs,
true,
false,
)
});

let subgraph = move |context: &mut Context, handoffs: &mut Vec<HandoffData>| {
let recvs: Vec<&RecvCtx<R>> = recv_ports
.iter()
.map(|hid| hid.handoff_id)
.map(|hid| handoffs.get(hid.0).unwrap())
.map(|h_data| {
h_data
.handoff
.any_ref()
.downcast_ref()
.expect("Attempted to cast handoff to wrong type.")
})
.map(RefCast::ref_cast)
.collect();

let sends: Vec<&SendCtx<W>> = send_ports
.iter()
.map(|hid| hid.handoff_id)
.map(|hid| handoffs.get(hid.0).unwrap())
.map(|h_data| {
h_data
.handoff
.any_ref()
.downcast_ref()
.expect("Attempted to cast handoff to wrong type.")
})
.map(RefCast::ref_cast)
.collect();

(subgraph)(context, &recvs, &sends)
};
self.subgraphs.push(SubgraphData::new(
name.into(),
stratum,
subgraph,
subgraph_preds,
subgraph_succs,
true,
false,
));
self.context.init_stratum(stratum);
self.context.stratum_queues[stratum].push_back(sg_id);

@@ -726,12 +734,11 @@ impl<'a> Dfir<'a> {
Name: Into<Cow<'static, str>>,
H: 'static + Handoff,
{
let handoff_id = HandoffId(self.handoffs.len());

// Create and insert handoff.
let handoff = H::default();
self.handoffs
.push(HandoffData::new(name.into(), handoff, handoff_id));
let handoff_id = self
.handoffs
.insert_with_key(|hoff_id| HandoffData::new(name.into(), handoff, hoff_id));

// Make ports.
let input_port = SendPort {
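
A hedged aside on why the diff introduces per-collection tag types (`SubgraphTag`, `HandoffTag`): with typed keys like the illustrative `Key<Tag>` above, using a subgraph key to index the handoff storage no longer type-checks, whereas bare `usize` indexes allowed it silently. `DemoSubgraphData` and `mixed_up` are made up for the example; `DemoHandoffData` is the illustrative type from the earlier sketch:

```rust
// Hypothetical, using the sketch types above. The commented-out line does not
// compile, which is the point of tagging keys per collection.
struct SubgraphTag;
struct DemoSubgraphData;

fn mixed_up(
    subgraphs: &SlotVec<SubgraphTag, DemoSubgraphData>,
    handoffs: &SlotVec<HandoffTag, DemoHandoffData>,
    sg_id: Key<SubgraphTag>,
) {
    let _ok = &subgraphs[sg_id];
    let _ = handoffs; // kept only so the parameter is used
    // let _err = &handoffs[sg_id]; // error: expected `Key<HandoffTag>`, found `Key<SubgraphTag>`
}
```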