diff --git a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs index e9d7a9bbe9d71a..bb36052e5c4b50 100644 --- a/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs +++ b/core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs @@ -51,15 +51,15 @@ pub(crate) trait ReceiveAndBuffer { type Transaction: TransactionWithMeta + Send + Sync; type Container: StateContainer + Send + Sync; - /// Returns false only if no packets were received - /// AND the receiver is disconnected. + /// Return Err if the receiver is disconnected AND no packets were + /// received. Otherwise return Ok(num_received). fn receive_and_buffer_packets( &mut self, container: &mut Self::Container, timing_metrics: &mut SchedulerTimingMetrics, count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, - ) -> bool; + ) -> Result; } pub(crate) struct SanitizedTransactionReceiveAndBuffer { @@ -81,7 +81,7 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer { timing_metrics: &mut SchedulerTimingMetrics, count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, - ) -> bool { + ) -> Result { let remaining_queue_capacity = container.remaining_capacity(); const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10); @@ -111,7 +111,7 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer { saturating_add_assign!(timing_metrics.receive_time_us, receive_time_us); }); - match received_packet_results { + let num_received = match received_packet_results { Ok(receive_packet_results) => { let num_received_packets = receive_packet_results.deserialized_packets.len(); @@ -137,12 +137,13 @@ impl ReceiveAndBuffer for SanitizedTransactionReceiveAndBuffer { ); }); } + num_received_packets } - Err(RecvTimeoutError::Timeout) => {} - Err(RecvTimeoutError::Disconnected) => return false, - } + Err(RecvTimeoutError::Timeout) => 0, + Err(RecvTimeoutError::Disconnected) => return Err(()), + }; - true + Ok(num_received) } } @@ -309,7 +310,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { timing_metrics: &mut SchedulerTimingMetrics, count_metrics: &mut SchedulerCountMetrics, decision: &BufferedPacketsDecision, - ) -> bool { + ) -> Result { let (root_bank, working_bank) = { let bank_forks = self.bank_forks.read().unwrap(); let root_bank = bank_forks.root_bank(); @@ -320,6 +321,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { // Receive packet batches. const TIMEOUT: Duration = Duration::from_millis(10); let start = Instant::now(); + let mut num_received = 0; let mut received_message = false; // If not leader/unknown, do a blocking-receive initially. This lets @@ -338,7 +340,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { match self.receiver.recv_timeout(TIMEOUT) { Ok(packet_batch_message) => { received_message = true; - self.handle_packet_batch_message( + num_received += self.handle_packet_batch_message( container, timing_metrics, count_metrics, @@ -348,9 +350,9 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { packet_batch_message, ); } - Err(RecvTimeoutError::Timeout) => return true, + Err(RecvTimeoutError::Timeout) => return Ok(num_received), Err(RecvTimeoutError::Disconnected) => { - return received_message; + return received_message.then_some(num_received).ok_or(()); } } } @@ -359,7 +361,7 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { match self.receiver.try_recv() { Ok(packet_batch_message) => { received_message = true; - self.handle_packet_batch_message( + num_received += self.handle_packet_batch_message( container, timing_metrics, count_metrics, @@ -369,18 +371,19 @@ impl ReceiveAndBuffer for TransactionViewReceiveAndBuffer { packet_batch_message, ); } - Err(TryRecvError::Empty) => return true, + Err(TryRecvError::Empty) => return Ok(num_received), Err(TryRecvError::Disconnected) => { - return received_message; + return received_message.then_some(num_received).ok_or(()); } } } - true + Ok(num_received) } } impl TransactionViewReceiveAndBuffer { + /// Return number of received packets. fn handle_packet_batch_message( &mut self, container: &mut TransactionViewStateContainer, @@ -390,10 +393,10 @@ impl TransactionViewReceiveAndBuffer { root_bank: &Bank, working_bank: &Bank, packet_batch_message: BankingPacketBatch, - ) { + ) -> usize { // Do not support forwarding - only add support for this if we really need it. if matches!(decision, BufferedPacketsDecision::Forward) { - return; + return 0; } let start = Instant::now(); @@ -453,6 +456,8 @@ impl TransactionViewReceiveAndBuffer { ); saturating_add_assign!(count_metrics.num_dropped_on_receive, num_dropped_on_receive); }); + + num_received } fn try_handle_packet( diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 71331456d0eb73..054434df882194 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -107,7 +107,7 @@ impl SchedulerController { self.process_transactions(&decision)?; self.receive_completed()?; - if !self.receive_and_buffer_packets(&decision) { + if self.receive_and_buffer_packets(&decision).is_err() { break; } // Report metrics only if there is data. @@ -421,7 +421,10 @@ impl SchedulerController { } /// Returns whether the packet receiver is still connected. - fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool { + fn receive_and_buffer_packets( + &mut self, + decision: &BufferedPacketsDecision, + ) -> Result { self.receive_and_buffer.receive_and_buffer_packets( &mut self.container, &mut self.timing_metrics, @@ -618,7 +621,16 @@ mod tests { .make_consume_or_forward_decision(); assert!(matches!(decision, BufferedPacketsDecision::Consume(_))); assert!(scheduler_controller.receive_completed().is_ok()); - assert!(scheduler_controller.receive_and_buffer_packets(&decision)); + + // Time is not a reliable way for deterministic testing. + // Loop here until no more packets are received, this avoids parallel + // tests from inconsistently timing out and not receiving + // from the channel. + while scheduler_controller + .receive_and_buffer_packets(&decision) + .map(|n| n > 0) + .unwrap_or_default() + {} assert!(scheduler_controller.process_transactions(&decision).is_ok()); }