diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__aggregations_and_comments@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__aggregations_and_comments@datalog_program.snap index 970f27ea57a..d9acccbb460 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__aggregations_and_comments@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__aggregations_and_comments@datalog_program.snap @@ -108,6 +108,7 @@ fn main() { var_expr!(), var_expr!(hoff_1v3_send, hoff_9v3_send, hoff_23v1_send), false, + None, move | context, var_args!(), @@ -371,6 +372,7 @@ fn main() { var_expr!(hoff_9v3_recv, hoff_25v1_recv), var_expr!(), false, + None, move |context, var_args!(hoff_9v3_recv, hoff_25v1_recv), var_args!()| { let mut hoff_9v3_recv = hoff_9v3_recv.borrow_mut_swap(); let hoff_9v3_recv = hoff_9v3_recv.drain(..); @@ -688,6 +690,7 @@ fn main() { var_expr!(hoff_23v1_recv), var_expr!(), false, + None, move |context, var_args!(hoff_23v1_recv), var_args!()| { let mut hoff_23v1_recv = hoff_23v1_recv.borrow_mut_swap(); let hoff_23v1_recv = hoff_23v1_recv.drain(..); @@ -905,6 +908,7 @@ fn main() { var_expr!(hoff_1v3_recv), var_expr!(hoff_7v3_send), false, + None, move |context, var_args!(hoff_1v3_recv), var_args!(hoff_7v3_send)| { let mut hoff_1v3_recv = hoff_1v3_recv.borrow_mut_swap(); let hoff_1v3_recv = hoff_1v3_recv.drain(..); @@ -1008,6 +1012,7 @@ fn main() { var_expr!(hoff_7v3_recv), var_expr!(hoff_6v3_send), false, + None, move |context, var_args!(hoff_7v3_recv), var_args!(hoff_6v3_send)| { let mut hoff_7v3_recv = hoff_7v3_recv.borrow_mut_swap(); let hoff_7v3_recv = hoff_7v3_recv.drain(..); @@ -1071,6 +1076,7 @@ fn main() { var_expr!(hoff_6v3_recv), var_expr!(hoff_25v1_send), false, + None, move |context, var_args!(hoff_6v3_recv), var_args!(hoff_25v1_send)| { let mut hoff_6v3_recv = hoff_6v3_recv.borrow_mut_swap(); let hoff_6v3_recv = hoff_6v3_recv.drain(..); diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__aggregations_fold_keyed_expr@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__aggregations_fold_keyed_expr@datalog_program.snap index 115f312fb08..b0b95d37df0 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__aggregations_fold_keyed_expr@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__aggregations_fold_keyed_expr@datalog_program.snap @@ -62,6 +62,7 @@ fn main() { var_expr!(), var_expr!(hoff_6v3_send), false, + None, move |context, var_args!(), var_args!(hoff_6v3_send)| { let hoff_6v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -212,6 +213,7 @@ fn main() { var_expr!(hoff_6v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_6v3_recv), var_args!()| { let mut hoff_6v3_recv = hoff_6v3_recv.borrow_mut_swap(); let hoff_6v3_recv = hoff_6v3_recv.drain(..); diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__anti_join@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__anti_join@datalog_program.snap index e7d451dc5db..35fb2b5937f 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__anti_join@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__anti_join@datalog_program.snap @@ -152,6 +152,7 @@ fn main() { var_expr!(), var_expr!(hoff_12v3_send), false, + None, move |context, var_args!(), var_args!(hoff_12v3_send)| { let hoff_12v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -302,6 +303,7 @@ fn main() { var_expr!(), var_expr!(hoff_9v3_send), false, + None, move |context, var_args!(), var_args!(hoff_9v3_send)| { let hoff_9v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -366,6 +368,7 @@ fn main() { var_expr!(), var_expr!(hoff_6v3_send), false, + None, move |context, var_args!(), var_args!(hoff_6v3_send)| { let hoff_6v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -430,6 +433,7 @@ fn main() { var_expr!(hoff_6v3_recv, hoff_9v3_recv, hoff_12v3_recv), var_expr!(), false, + None, move | context, var_args!(hoff_6v3_recv, hoff_9v3_recv, hoff_12v3_recv), diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__collect_vec@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__collect_vec@datalog_program.snap index 2746532b102..8e0eaccbe1c 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__collect_vec@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__collect_vec@datalog_program.snap @@ -106,6 +106,7 @@ fn main() { var_expr!(), var_expr!(hoff_9v3_send), false, + None, move |context, var_args!(), var_args!(hoff_9v3_send)| { let hoff_9v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -536,6 +537,7 @@ fn main() { var_expr!(hoff_9v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_9v3_recv), var_args!()| { let mut hoff_9v3_recv = hoff_9v3_recv.borrow_mut_swap(); let hoff_9v3_recv = hoff_9v3_recv.drain(..); diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__detuple@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__detuple@datalog_program.snap index 0559b25104f..812cfd43aef 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__detuple@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__detuple@datalog_program.snap @@ -51,6 +51,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__detuple_then_flat@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__detuple_then_flat@datalog_program.snap index 77652992c93..14397f40880 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__detuple_then_flat@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__detuple_then_flat@datalog_program.snap @@ -51,6 +51,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__expr_lhs@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__expr_lhs@datalog_program.snap index ac8f39e24ca..2e893339de7 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__expr_lhs@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__expr_lhs@datalog_program.snap @@ -84,6 +84,7 @@ fn main() { hoff_23v1_send, hoff_24v1_send ), false, + None, move | context, var_args!(), @@ -278,6 +279,7 @@ fn main() { ), var_expr!(), false, + None, move | context, var_args!( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__expr_predicate@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__expr_predicate@datalog_program.snap index 332537cff23..630b00a7d94 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__expr_predicate@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__expr_predicate@datalog_program.snap @@ -71,6 +71,7 @@ fn main() { var_expr!(), var_expr!(hoff_1v3_send, hoff_6v3_send, hoff_21v1_send, hoff_22v1_send), false, + None, move | context, var_args!(), @@ -378,6 +379,7 @@ fn main() { var_expr!(hoff_1v3_recv, hoff_6v3_recv, hoff_21v1_recv, hoff_22v1_recv), var_expr!(), false, + None, move | context, var_args!( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__flat_then_detuple@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__flat_then_detuple@datalog_program.snap index 24271d1ac77..9a6e8a96b15 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__flat_then_detuple@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__flat_then_detuple@datalog_program.snap @@ -51,6 +51,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__flatten@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__flatten@datalog_program.snap index bdff7ff96c9..63d77237853 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__flatten@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__flatten@datalog_program.snap @@ -51,6 +51,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__index@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__index@datalog_program.snap index e08e994e7fc..1c86612b714 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__index@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__index@datalog_program.snap @@ -206,6 +206,7 @@ fn main() { var_expr!(), var_expr!(hoff_16v3_send, hoff_21v3_send), false, + None, move |context, var_args!(), var_args!(hoff_16v3_send, hoff_21v3_send)| { let hoff_16v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -619,6 +620,7 @@ fn main() { var_expr!(hoff_12v3_recv), var_expr!(hoff_15v3_send), false, + None, move |context, var_args!(hoff_12v3_recv), var_args!(hoff_15v3_send)| { let mut hoff_12v3_recv = hoff_12v3_recv.borrow_mut_swap(); let hoff_12v3_recv = hoff_12v3_recv.drain(..); @@ -684,6 +686,7 @@ fn main() { var_expr!(hoff_6v3_recv), var_expr!(hoff_9v3_send), false, + None, move |context, var_args!(hoff_6v3_recv), var_args!(hoff_9v3_send)| { let mut hoff_6v3_recv = hoff_6v3_recv.borrow_mut_swap(); let hoff_6v3_recv = hoff_6v3_recv.drain(..); @@ -749,6 +752,7 @@ fn main() { var_expr!(hoff_21v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_21v3_recv), var_args!()| { let mut hoff_21v3_recv = hoff_21v3_recv.borrow_mut_swap(); let hoff_21v3_recv = hoff_21v3_recv.drain(..); @@ -1026,6 +1030,7 @@ fn main() { var_expr!(hoff_9v3_recv, hoff_16v3_recv), var_expr!(hoff_6v3_send, hoff_10v3_send, hoff_13v3_send), false, + None, move | context, var_args!(hoff_9v3_recv, hoff_16v3_recv), @@ -1589,6 +1594,7 @@ fn main() { var_expr!(hoff_13v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_13v3_recv), var_args!()| { let mut hoff_13v3_recv = hoff_13v3_recv.borrow_mut_swap(); let hoff_13v3_recv = hoff_13v3_recv.drain(..); @@ -1882,6 +1888,7 @@ fn main() { var_expr!(hoff_10v3_recv, hoff_15v3_recv), var_expr!(hoff_12v3_send), false, + None, move | context, var_args!(hoff_10v3_recv, hoff_15v3_recv), diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__join_with_other@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__join_with_other@datalog_program.snap index 07146036595..b057674ca42 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__join_with_other@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__join_with_other@datalog_program.snap @@ -95,6 +95,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_10v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__join_with_self@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__join_with_self@datalog_program.snap index 5c117735069..153cf9ad756 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__join_with_self@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__join_with_self@datalog_program.snap @@ -81,6 +81,7 @@ fn main() { var_expr!(), var_expr!(hoff_4v3_send, hoff_6v3_send), false, + None, move |context, var_args!(), var_args!(hoff_4v3_send, hoff_6v3_send)| { let hoff_4v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -233,6 +234,7 @@ fn main() { var_expr!(hoff_4v3_recv, hoff_6v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_4v3_recv, hoff_6v3_recv), var_args!()| { let mut hoff_4v3_recv = hoff_4v3_recv.borrow_mut_swap(); let hoff_4v3_recv = hoff_4v3_recv.drain(..); diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__local_constraints@datalog_program-2.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__local_constraints@datalog_program-2.snap index c83997d98b2..d3d4c0bbe98 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__local_constraints@datalog_program-2.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__local_constraints@datalog_program-2.snap @@ -51,6 +51,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__local_constraints@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__local_constraints@datalog_program.snap index f4074971246..eabc44fd250 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__local_constraints@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__local_constraints@datalog_program.snap @@ -51,6 +51,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__max@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__max@datalog_program.snap index 94e324f4db2..4ea3050aa18 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__max@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__max@datalog_program.snap @@ -62,6 +62,7 @@ fn main() { var_expr!(), var_expr!(hoff_6v3_send), false, + None, move |context, var_args!(), var_args!(hoff_6v3_send)| { let hoff_6v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -212,6 +213,7 @@ fn main() { var_expr!(hoff_6v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_6v3_recv), var_args!()| { let mut hoff_6v3_recv = hoff_6v3_recv.borrow_mut_swap(); let hoff_6v3_recv = hoff_6v3_recv.drain(..); diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__max_all@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__max_all@datalog_program.snap index 291d1b48c3a..6e77c9d5674 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__max_all@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__max_all@datalog_program.snap @@ -65,6 +65,7 @@ fn main() { var_expr!(), var_expr!(hoff_6v3_send), false, + None, move |context, var_args!(), var_args!(hoff_6v3_send)| { let hoff_6v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -215,6 +216,7 @@ fn main() { var_expr!(hoff_6v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_6v3_recv), var_args!()| { let mut hoff_6v3_recv = hoff_6v3_recv.borrow_mut_swap(); let hoff_6v3_recv = hoff_6v3_recv.drain(..); diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__minimal_program@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__minimal_program@datalog_program.snap index 5c00c8626c5..22414ac34a6 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__minimal_program@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__minimal_program@datalog_program.snap @@ -51,6 +51,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__multi_detuple@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__multi_detuple@datalog_program.snap index 6d91a3b8570..6510cc3d863 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__multi_detuple@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__multi_detuple@datalog_program.snap @@ -51,6 +51,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__multiple_contributors@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__multiple_contributors@datalog_program.snap index 91a476436ba..b88f05e8589 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__multiple_contributors@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__multiple_contributors@datalog_program.snap @@ -75,6 +75,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_10v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__non_copy_but_clone@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__non_copy_but_clone@datalog_program.snap index d2cb16f0aab..dc911a853ca 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__non_copy_but_clone@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__non_copy_but_clone@datalog_program.snap @@ -51,6 +51,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__persist@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__persist@datalog_program.snap index 5dbfaafe8bc..84b2f65769b 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__persist@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__persist@datalog_program.snap @@ -307,6 +307,7 @@ fn main() { var_expr!(), var_expr!(hoff_14v3_send), false, + None, move |context, var_args!(), var_args!(hoff_14v3_send)| { let hoff_14v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -371,6 +372,7 @@ fn main() { var_expr!(), var_expr!(hoff_11v3_send), false, + None, move |context, var_args!(), var_args!(hoff_11v3_send)| { let hoff_11v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -435,6 +437,7 @@ fn main() { var_expr!(), var_expr!(hoff_6v3_send), false, + None, move |context, var_args!(), var_args!(hoff_6v3_send)| { let hoff_6v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -499,6 +502,7 @@ fn main() { var_expr!(hoff_25v3_recv), var_expr!(hoff_28v3_send), false, + None, move |context, var_args!(hoff_25v3_recv), var_args!(hoff_28v3_send)| { let mut hoff_25v3_recv = hoff_25v3_recv.borrow_mut_swap(); let hoff_25v3_recv = hoff_25v3_recv.drain(..); @@ -564,6 +568,7 @@ fn main() { var_expr!(hoff_19v3_recv), var_expr!(hoff_22v3_send), false, + None, move |context, var_args!(hoff_19v3_recv), var_args!(hoff_22v3_send)| { let mut hoff_19v3_recv = hoff_19v3_recv.borrow_mut_swap(); let hoff_19v3_recv = hoff_19v3_recv.drain(..); @@ -629,6 +634,7 @@ fn main() { var_expr!(hoff_13v3_recv), var_expr!(hoff_16v3_send), false, + None, move |context, var_args!(hoff_13v3_recv), var_args!(hoff_16v3_send)| { let mut hoff_13v3_recv = hoff_13v3_recv.borrow_mut_swap(); let hoff_13v3_recv = hoff_13v3_recv.drain(..); @@ -694,6 +700,7 @@ fn main() { var_expr!(hoff_6v3_recv, hoff_26v3_recv, hoff_29v3_recv), var_expr!(), false, + None, move | context, var_args!(hoff_6v3_recv, hoff_26v3_recv, hoff_29v3_recv), @@ -1310,6 +1317,7 @@ fn main() { var_expr!(hoff_20v3_recv, hoff_23v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_20v3_recv, hoff_23v3_recv), var_args!()| { let mut hoff_20v3_recv = hoff_20v3_recv.borrow_mut_swap(); let hoff_20v3_recv = hoff_20v3_recv.drain(..); @@ -1682,6 +1690,7 @@ fn main() { hoff_17v3_send, hoff_23v3_send, hoff_25v3_send, hoff_29v3_send ), false, + None, move | context, var_args!(hoff_14v3_recv, hoff_28v3_recv), @@ -2261,6 +2270,7 @@ fn main() { var_expr!(hoff_16v3_recv, hoff_17v3_recv), var_expr!(hoff_13v3_send), false, + None, move | context, var_args!(hoff_16v3_recv, hoff_17v3_recv), @@ -2700,6 +2710,7 @@ fn main() { var_expr!(hoff_11v3_recv, hoff_22v3_recv), var_expr!(hoff_19v3_send, hoff_20v3_send, hoff_26v3_send), false, + None, move | context, var_args!(hoff_11v3_recv, hoff_22v3_recv), diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__persist_uniqueness@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__persist_uniqueness@datalog_program.snap index d8f3ed23268..a5821502c6b 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__persist_uniqueness@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__persist_uniqueness@datalog_program.snap @@ -99,6 +99,7 @@ fn main() { var_expr!(), var_expr!(hoff_6v3_send), false, + None, move |context, var_args!(), var_args!(hoff_6v3_send)| { let hoff_6v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -163,6 +164,7 @@ fn main() { var_expr!(hoff_8v3_recv), var_expr!(hoff_11v3_send), false, + None, move |context, var_args!(hoff_8v3_recv), var_args!(hoff_11v3_send)| { let mut hoff_8v3_recv = hoff_8v3_recv.borrow_mut_swap(); let hoff_8v3_recv = hoff_8v3_recv.drain(..); @@ -228,6 +230,7 @@ fn main() { var_expr!(hoff_9v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_9v3_recv), var_args!()| { let mut hoff_9v3_recv = hoff_9v3_recv.borrow_mut_swap(); let hoff_9v3_recv = hoff_9v3_recv.drain(..); @@ -475,6 +478,7 @@ fn main() { var_expr!(hoff_6v3_recv, hoff_11v3_recv), var_expr!(hoff_8v3_send, hoff_9v3_send), false, + None, move | context, var_args!(hoff_6v3_recv, hoff_11v3_recv), diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__send_to_node@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__send_to_node@datalog_program.snap index bc5d98cf1a2..78ebb7384cd 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__send_to_node@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__send_to_node@datalog_program.snap @@ -75,6 +75,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_12v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( @@ -216,6 +217,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__simple_filter@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__simple_filter@datalog_program.snap index 2a3e9b6c7df..b584189de49 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__simple_filter@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__simple_filter@datalog_program.snap @@ -51,6 +51,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_7v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__single_column_program@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__single_column_program@datalog_program.snap index fd468dae3c7..b1c082d89e6 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__single_column_program@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__single_column_program@datalog_program.snap @@ -95,6 +95,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_10v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__transitive_closure@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__transitive_closure@datalog_program.snap index 865c463c36f..bbcdc121f47 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__transitive_closure@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__transitive_closure@datalog_program.snap @@ -100,6 +100,7 @@ fn main() { var_expr!(hoff_6v3_recv), var_expr!(hoff_6v3_send), false, + None, move |context, var_args!(hoff_6v3_recv), var_args!(hoff_6v3_send)| { let mut hoff_6v3_recv = hoff_6v3_recv.borrow_mut_swap(); let hoff_6v3_recv = hoff_6v3_recv.drain(..); diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__triple_relation_join@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__triple_relation_join@datalog_program.snap index 2c940efb462..6e8ef6d44de 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__triple_relation_join@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__triple_relation_join@datalog_program.snap @@ -139,6 +139,7 @@ fn main() { var_expr!(), var_expr!(), false, + None, move |context, var_args!(), var_args!()| { let op_13v1 = std::iter::from_fn(|| { match hydroflow::futures::stream::Stream::poll_next( diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__wildcard_fields@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__wildcard_fields@datalog_program.snap index 52e622554c2..33efc204bb9 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__wildcard_fields@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__wildcard_fields@datalog_program.snap @@ -81,6 +81,7 @@ fn main() { var_expr!(), var_expr!(hoff_4v3_send, hoff_6v3_send), false, + None, move |context, var_args!(), var_args!(hoff_4v3_send, hoff_6v3_send)| { let hoff_4v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -233,6 +234,7 @@ fn main() { var_expr!(hoff_4v3_recv, hoff_6v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_4v3_recv, hoff_6v3_recv), var_args!()| { let mut hoff_4v3_recv = hoff_4v3_recv.borrow_mut_swap(); let hoff_4v3_recv = hoff_4v3_recv.drain(..); diff --git a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__wildcard_join_count@datalog_program.snap b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__wildcard_join_count@datalog_program.snap index 59672f4d953..a2c8caaa762 100644 --- a/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__wildcard_join_count@datalog_program.snap +++ b/dfir_datalog_core/src/snapshots/dfir_datalog_core__tests__wildcard_join_count@datalog_program.snap @@ -166,6 +166,7 @@ fn main() { var_expr!(), var_expr!(hoff_7v3_send, hoff_12v3_send), false, + None, move |context, var_args!(), var_args!(hoff_7v3_send, hoff_12v3_send)| { let hoff_7v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -318,6 +319,7 @@ fn main() { var_expr!(), var_expr!(hoff_4v3_send, hoff_9v3_send), false, + None, move |context, var_args!(), var_args!(hoff_4v3_send, hoff_9v3_send)| { let hoff_4v3_send = hydroflow::pusherator::for_each::ForEach::new(| v| @@ -470,6 +472,7 @@ fn main() { var_expr!(hoff_10v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_10v3_recv), var_args!()| { let mut hoff_10v3_recv = hoff_10v3_recv.borrow_mut_swap(); let hoff_10v3_recv = hoff_10v3_recv.drain(..); @@ -687,6 +690,7 @@ fn main() { var_expr!(hoff_1v3_recv), var_expr!(), false, + None, move |context, var_args!(hoff_1v3_recv), var_args!()| { let mut hoff_1v3_recv = hoff_1v3_recv.borrow_mut_swap(); let hoff_1v3_recv = hoff_1v3_recv.drain(..); @@ -918,6 +922,7 @@ fn main() { var_expr!(hoff_9v3_recv, hoff_12v3_recv), var_expr!(hoff_10v3_send), false, + None, move | context, var_args!(hoff_9v3_recv, hoff_12v3_recv), @@ -1176,6 +1181,7 @@ fn main() { var_expr!(hoff_4v3_recv, hoff_7v3_recv), var_expr!(hoff_1v3_send), false, + None, move | context, var_args!(hoff_4v3_recv, hoff_7v3_recv), diff --git a/dfir_lang/src/graph/hydroflow_graph.rs b/dfir_lang/src/graph/hydroflow_graph.rs index c955f40e470..f61ab3e6a9c 100644 --- a/dfir_lang/src/graph/hydroflow_graph.rs +++ b/dfir_lang/src/graph/hydroflow_graph.rs @@ -1168,6 +1168,7 @@ impl DfirGraph { var_expr!( #( #recv_ports ),* ), var_expr!( #( #send_ports ),* ), #laziness, + None, // `LoopId` move |#context, var_args!( #( #recv_ports ),* ), var_args!( #( #send_ports ),* )| { #( #recv_port_code )* #( #send_port_code )* diff --git a/dfir_rs/src/scheduled/graph.rs b/dfir_rs/src/scheduled/graph.rs index 12d8a789be2..2152710bbb6 100644 --- a/dfir_rs/src/scheduled/graph.rs +++ b/dfir_rs/src/scheduled/graph.rs @@ -21,17 +21,18 @@ use super::port::{RecvCtx, RecvPort, SendCtx, SendPort, RECV, SEND}; use super::reactor::Reactor; use super::state::StateHandle; use super::subgraph::Subgraph; -use super::{HandoffId, HandoffTag, SubgraphId, SubgraphTag}; +use super::{HandoffId, HandoffTag, LoopId, LoopTag, SubgraphId, SubgraphTag}; use crate::scheduled::ticks::{TickDuration, TickInstant}; -use crate::util::slot_vec::{SecondarySlotVec, SlotVec}; +use crate::util::slot_vec::SlotVec; use crate::Never; /// A DFIR graph. Owns, schedules, and runs the compiled subgraphs. #[derive(Default)] pub struct Dfir<'a> { pub(super) subgraphs: SlotVec>, - pub(super) subgraph_loop: SecondarySlotVec, pub(super) context: Context, + // Map from `LoopId` to parent `LoopId` (or `None` for top-level). + pub(super) loop_parent: SlotVec>, handoffs: SlotVec, @@ -587,12 +588,13 @@ impl<'a> Dfir<'a> { W: 'static + PortList, F: 'static + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>), { - self.add_subgraph_stratified(name, 0, recv_ports, send_ports, false, subgraph) + self.add_subgraph_stratified(name, 0, recv_ports, send_ports, false, None, subgraph) } /// Adds a new compiled subgraph with the specified inputs, outputs, and stratum number. /// /// TODO(mingwei): add example in doc. + #[expect(clippy::too_many_arguments, reason = "TODO(mingwei)")] pub fn add_subgraph_stratified( &mut self, name: Name, @@ -600,6 +602,7 @@ impl<'a> Dfir<'a> { recv_ports: R, send_ports: W, laziness: bool, + loop_id: Option, mut subgraph: F, ) -> SubgraphId where @@ -627,6 +630,7 @@ impl<'a> Dfir<'a> { subgraph_succs, true, laziness, + loop_id, ) }); self.context.init_stratum(stratum); @@ -720,6 +724,7 @@ impl<'a> Dfir<'a> { subgraph_succs, true, false, + None, ) }); @@ -782,6 +787,12 @@ impl<'a> Dfir<'a> { self.context.subgraph_id = sg_id; &mut self.context } + + /// Adds a new loop with the given parent (or `None` for top-level). Returns a loop ID which + /// is used in [`Self::add_subgraph_stratified`] or for nested loops. + pub fn add_loop(&mut self, parent: Option) -> LoopId { + self.loop_parent.insert(parent) + } } impl Dfir<'_> { @@ -893,9 +904,14 @@ pub(super) struct SubgraphData<'a> { /// If this subgraph is marked as lazy, then sending data back to a lower stratum does not trigger a new tick to be run. is_lazy: bool, + + /// The subgraph's loop ID, or `None` for the top level. + #[expect(dead_code, reason = "TODO(mingwei): WIP")] + loop_id: Option, } impl<'a> SubgraphData<'a> { - pub fn new( + #[expect(clippy::too_many_arguments, reason = "internal use")] + fn new( name: Cow<'static, str>, stratum: usize, subgraph: impl Subgraph + 'a, @@ -903,6 +919,7 @@ impl<'a> SubgraphData<'a> { succs: Vec, is_scheduled: bool, laziness: bool, + loop_id: Option, ) -> Self { Self { name, @@ -910,6 +927,7 @@ impl<'a> SubgraphData<'a> { subgraph: Box::new(subgraph), preds, succs, + loop_id, is_scheduled: Cell::new(is_scheduled), last_tick_run_in: None, is_lazy: laziness, diff --git a/dfir_rs/src/scheduled/mod.rs b/dfir_rs/src/scheduled/mod.rs index 0bf8003ea71..0c109b18fe7 100644 --- a/dfir_rs/src/scheduled/mod.rs +++ b/dfir_rs/src/scheduled/mod.rs @@ -36,3 +36,8 @@ pub type HandoffId = Key; #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[repr(transparent)] pub struct StateId(pub(crate) usize); + +/// Tag for [`LoopId`]. +pub enum LoopTag {} +/// A loop's ID. +pub type LoopId = Key; diff --git a/precheck.bash b/precheck.bash index 4c16cd43958..f10f4d57395 100755 --- a/precheck.bash +++ b/precheck.bash @@ -15,7 +15,8 @@ cargo +nightly fmt --all cargo clippy --all-targets --features python -- -D warnings [ "$FULL" = false ] || cargo check --all-targets --no-default-features -INSTA_FORCE_PASS=1 INSTA_UPDATE=always TRYBUILD=overwrite cargo test --all-targets --no-fail-fast --features python +INSTA_FORCE_PASS=1 INSTA_UPDATE=always TRYBUILD=overwrite cargo test --all-targets --no-fail-fast --workspace --exclude 'hydro_*' --features python +[ "$FULL" = false ] || INSTA_FORCE_PASS=1 INSTA_UPDATE=always TRYBUILD=overwrite cargo test --all-targets --no-fail-fast -p 'hydro_*' --features python cargo test --doc [ "$FULL" = false ] || RUSTDOCFLAGS="-Dwarnings" cargo doc --no-deps