diff --git a/core/benches/forwarder.rs b/core/benches/forwarder.rs index 10a050f3d97d4b..74365f85e7d607 100644 --- a/core/benches/forwarder.rs +++ b/core/benches/forwarder.rs @@ -3,15 +3,12 @@ extern crate test; use { itertools::Itertools, solana_client::connection_cache::ConnectionCache, - solana_core::{ - banking_stage::{ - forwarder::Forwarder, - leader_slot_metrics::LeaderSlotMetricsTracker, - unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches}, - unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, - BankingStageStats, - }, - tracer_packet_stats::TracerPacketStats, + solana_core::banking_stage::{ + forwarder::Forwarder, + leader_slot_metrics::LeaderSlotMetricsTracker, + unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches}, + unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, + BankingStageStats, }, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ @@ -38,7 +35,6 @@ struct BenchSetup { unprocessed_packet_batches: UnprocessedTransactionStorage, tracker: LeaderSlotMetricsTracker, stats: BankingStageStats, - tracer_stats: TracerPacketStats, } fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup { @@ -118,7 +114,6 @@ fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup { unprocessed_packet_batches, tracker: LeaderSlotMetricsTracker::new(0), stats: BankingStageStats::default(), - tracer_stats: TracerPacketStats::new(0), } } @@ -132,19 +127,12 @@ fn bench_forwarder_handle_forwading_contentious_transaction(bencher: &mut Benche mut unprocessed_packet_batches, mut tracker, stats, - mut tracer_stats, } = setup(num_packets, true); // hold packets so they can be reused for benching let hold = true; bencher.iter(|| { - forwarder.handle_forwarding( - &mut unprocessed_packet_batches, - hold, - &mut tracker, - &stats, - &mut tracer_stats, - ); + forwarder.handle_forwarding(&mut unprocessed_packet_batches, hold, &mut tracker, &stats); // reset packet.forwarded flag to reuse `unprocessed_packet_batches` if let UnprocessedTransactionStorage::LocalTransactionStorage(unprocessed_packets) = &mut unprocessed_packet_batches @@ -169,19 +157,12 @@ fn bench_forwarder_handle_forwading_parallel_transactions(bencher: &mut Bencher) mut unprocessed_packet_batches, mut tracker, stats, - mut tracer_stats, } = setup(num_packets, false); // hold packets so they can be reused for benching let hold = true; bencher.iter(|| { - forwarder.handle_forwarding( - &mut unprocessed_packet_batches, - hold, - &mut tracker, - &stats, - &mut tracer_stats, - ); + forwarder.handle_forwarding(&mut unprocessed_packet_batches, hold, &mut tracker, &stats); // reset packet.forwarded flag to reuse `unprocessed_packet_batches` if let UnprocessedTransactionStorage::LocalTransactionStorage(unprocessed_packets) = &mut unprocessed_packet_batches diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 43b88cb26cae8c..2bc7148743657c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -25,7 +25,6 @@ use { }, }, banking_trace::BankingPacketReceiver, - tracer_packet_stats::TracerPacketStats, validator::BlockProductionMethod, }, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, @@ -319,8 +318,6 @@ pub enum ForwardOption { #[derive(Debug, Default)] pub struct FilterForwardingResults { pub(crate) total_forwardable_packets: usize, - pub(crate) total_tracer_packets_in_buffer: usize, - pub(crate) total_forwardable_tracer_packets: usize, pub(crate) total_dropped_packets: usize, pub(crate) total_packet_conversion_us: u64, pub(crate) total_filter_packets_us: u64, @@ -686,7 +683,6 @@ impl BankingStage { unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - tracer_packet_stats: &mut TracerPacketStats, ) { if unprocessed_transaction_storage.should_not_process() { return; @@ -722,7 +718,6 @@ impl BankingStage { false, slot_metrics_tracker, banking_stage_stats, - tracer_packet_stats, )); slot_metrics_tracker.increment_forward_us(forward_us); // Take metrics action after forwarding packets to include forwarded @@ -735,7 +730,6 @@ impl BankingStage { true, slot_metrics_tracker, banking_stage_stats, - tracer_packet_stats, )); slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_us); // Take metrics action after forwarding packets @@ -754,7 +748,6 @@ impl BankingStage { mut unprocessed_transaction_storage: UnprocessedTransactionStorage, ) { let mut banking_stage_stats = BankingStageStats::new(id); - let mut tracer_packet_stats = TracerPacketStats::new(id); let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); let mut last_metrics_update = Instant::now(); @@ -770,19 +763,15 @@ impl BankingStage { &mut unprocessed_transaction_storage, &banking_stage_stats, &mut slot_metrics_tracker, - &mut tracer_packet_stats, )); slot_metrics_tracker .increment_process_buffered_packets_us(process_buffered_packets_us); last_metrics_update = Instant::now(); } - tracer_packet_stats.report(1000); - match packet_receiver.receive_and_buffer_packets( &mut unprocessed_transaction_storage, &mut banking_stage_stats, - &mut tracer_packet_stats, &mut slot_metrics_tracker, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index de4a5d913b6f25..0a3cfd35b2a9e5 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -10,7 +10,6 @@ use { immutable_deserialized_packet::ImmutableDeserializedPacket, LikeClusterInfo, }, next_leader::{next_leader, next_leader_tpu_vote}, - tracer_packet_stats::TracerPacketStats, }, solana_client::connection_cache::ConnectionCache, solana_connection_cache::client_connection::ClientConnection as TpuConnection, @@ -96,7 +95,6 @@ impl Forwarder { hold: bool, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, banking_stage_stats: &BankingStageStats, - tracer_packet_stats: &mut TracerPacketStats, ) { let forward_option = unprocessed_transaction_storage.forward_option(); @@ -139,19 +137,13 @@ impl Forwarder { slot_metrics_tracker.increment_forwardable_batches_count(1); let batched_forwardable_packets_count = forward_batch.len(); - let (_forward_result, successful_forwarded_packets_count, leader_pubkey) = self + let (_forward_result, successful_forwarded_packets_count, _leader_pubkey) = self .forward_buffered_packets( &forward_option, forward_batch.get_forwardable_packets(), banking_stage_stats, ); - if let Some(leader_pubkey) = leader_pubkey { - tracer_packet_stats.increment_total_forwardable_tracer_packets( - filter_forwarding_result.total_forwardable_tracer_packets, - leader_pubkey, - ); - } let failed_forwarded_packets_count = batched_forwardable_packets_count .saturating_sub(successful_forwarded_packets_count); @@ -174,9 +166,6 @@ impl Forwarder { slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count( filter_forwarding_result.total_forwardable_packets as u64, ); - tracer_packet_stats.increment_total_cleared_from_buffer_after_forward( - filter_forwarding_result.total_tracer_packets_in_buffer, - ); unprocessed_transaction_storage.clear_forwarded_packets(); } } @@ -485,7 +474,6 @@ mod tests { true, &mut LeaderSlotMetricsTracker::new(0), &stats, - &mut TracerPacketStats::new(0), ); let recv_socket = &local_node.sockets.tpu_forwards_quic[0]; @@ -584,7 +572,6 @@ mod tests { hold, &mut LeaderSlotMetricsTracker::new(0), &stats, - &mut TracerPacketStats::new(0), ); let recv_socket = &local_node.sockets.tpu_forwards_quic[0]; diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index 4cbe7b8686ce95..e95b20c3df4f1e 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -6,7 +6,7 @@ use { unprocessed_transaction_storage::UnprocessedTransactionStorage, BankingStageStats, }, - crate::{banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats}, + crate::banking_trace::BankingPacketReceiver, crossbeam_channel::RecvTimeoutError, solana_measure::{measure::Measure, measure_us}, solana_sdk::{saturating_add_assign, timing::timestamp}, @@ -31,7 +31,6 @@ impl PacketReceiver { &mut self, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, banking_stage_stats: &mut BankingStageStats, - tracer_packet_stats: &mut TracerPacketStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> Result<(), RecvTimeoutError> { let (result, recv_time_us) = measure_us!({ @@ -53,7 +52,6 @@ impl PacketReceiver { receive_packet_results, unprocessed_transaction_storage, banking_stage_stats, - tracer_packet_stats, slot_metrics_tracker, ); recv_and_buffer_measure.stop(); @@ -97,7 +95,6 @@ impl PacketReceiver { }: ReceivePacketResults, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, banking_stage_stats: &mut BankingStageStats, - tracer_packet_stats: &mut TracerPacketStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { let packet_count = deserialized_packets.len(); @@ -116,7 +113,6 @@ impl PacketReceiver { &mut newly_buffered_forwarded_packets_count, banking_stage_stats, slot_metrics_tracker, - tracer_packet_stats, ); banking_stage_stats @@ -141,7 +137,6 @@ impl PacketReceiver { newly_buffered_forwarded_packets_count: &mut usize, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - tracer_packet_stats: &mut TracerPacketStats, ) { if !deserialized_packets.is_empty() { let _ = banking_stage_stats @@ -164,9 +159,6 @@ impl PacketReceiver { *dropped_packets_count, insert_packet_batches_summary.total_dropped_packets() ); - tracer_packet_stats.increment_total_exceeded_banking_stage_buffer( - insert_packet_batches_summary.dropped_tracer_packets(), - ); } } } diff --git a/core/src/banking_stage/unprocessed_packet_batches.rs b/core/src/banking_stage/unprocessed_packet_batches.rs index 3c4e0f66664dd2..493025e9cd635e 100644 --- a/core/src/banking_stage/unprocessed_packet_batches.rs +++ b/core/src/banking_stage/unprocessed_packet_batches.rs @@ -57,7 +57,6 @@ impl Ord for DeserializedPacket { #[derive(Debug)] pub struct PacketBatchInsertionMetrics { pub(crate) num_dropped_packets: usize, - pub(crate) num_dropped_tracer_packets: usize, } /// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store @@ -103,23 +102,13 @@ impl UnprocessedPacketBatches { deserialized_packets: impl Iterator, ) -> PacketBatchInsertionMetrics { let mut num_dropped_packets = 0; - let mut num_dropped_tracer_packets = 0; for deserialized_packet in deserialized_packets { - if let Some(dropped_packet) = self.push(deserialized_packet) { + if let Some(_dropped_packet) = self.push(deserialized_packet) { num_dropped_packets += 1; - if dropped_packet - .immutable_section() - .original_packet() - .meta() - .is_tracer_packet() - { - num_dropped_tracer_packets += 1; - } } } PacketBatchInsertionMetrics { num_dropped_packets, - num_dropped_tracer_packets, } } diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index e86780002ea694..012185b53124d1 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -93,13 +93,6 @@ impl InsertPacketBatchSummary { _ => 0, } } - - pub fn dropped_tracer_packets(&self) -> usize { - match self { - Self::PacketBatchInsertionMetrics(metrics) => metrics.num_dropped_tracer_packets, - _ => 0, - } - } } impl From for InsertPacketBatchSummary { @@ -750,8 +743,6 @@ impl ThreadLocalUnprocessedPackets { FilterForwardingResults { total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, total_dropped_packets, total_packet_conversion_us, total_filter_packets_us, @@ -1134,16 +1125,12 @@ mod tests { let FilterForwardingResults { total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, .. } = transaction_storage.filter_forwardable_packets_and_add_batches( current_bank.clone(), &mut forward_packet_batches_by_accounts, ); assert_eq!(total_forwardable_packets, 256); - assert_eq!(total_tracer_packets_in_buffer, 256); - assert_eq!(total_forwardable_tracer_packets, 256); // packets in a batch are forwarded in arbitrary order; verify the ports match after // sorting @@ -1172,8 +1159,6 @@ mod tests { ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); let FilterForwardingResults { total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, .. } = transaction_storage.filter_forwardable_packets_and_add_batches( current_bank.clone(), @@ -1183,11 +1168,6 @@ mod tests { total_forwardable_packets, packets.len() - num_already_forwarded ); - assert_eq!(total_tracer_packets_in_buffer, packets.len()); - assert_eq!( - total_forwardable_tracer_packets, - packets.len() - num_already_forwarded - ); } // some packets are invalid (already processed) @@ -1206,8 +1186,6 @@ mod tests { ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); let FilterForwardingResults { total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, .. } = transaction_storage.filter_forwardable_packets_and_add_batches( current_bank, @@ -1217,11 +1195,6 @@ mod tests { total_forwardable_packets, packets.len() - num_already_processed ); - assert_eq!(total_tracer_packets_in_buffer, packets.len()); - assert_eq!( - total_forwardable_tracer_packets, - packets.len() - num_already_processed - ); } } diff --git a/core/src/lib.rs b/core/src/lib.rs index a7639993871fcb..c40eebdc09c4db 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -39,7 +39,6 @@ pub mod stats_reporter_service; pub mod system_monitor_service; pub mod tpu; mod tpu_entry_notifier; -pub mod tracer_packet_stats; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; pub mod validator; diff --git a/core/src/tracer_packet_stats.rs b/core/src/tracer_packet_stats.rs deleted file mode 100644 index 036d95c5a5329c..00000000000000 --- a/core/src/tracer_packet_stats.rs +++ /dev/null @@ -1,164 +0,0 @@ -use { - solana_sdk::{pubkey::Pubkey, saturating_add_assign, timing::timestamp}, - std::collections::HashSet, -}; - -#[derive(Debug, Default)] -pub struct BankingStageTracerPacketStats { - total_exceeded_banking_stage_buffer: usize, - // This is the total number of tracer packets removed from the buffer - // after a leader's set of slots. Of these, only a subset that were in - // the buffer were actually forwardable (didn't arrive on forward port and haven't been - // forwarded before) - total_cleared_from_buffer_after_forward: usize, - total_forwardable_tracer_packets: usize, - forward_target_leaders: HashSet, -} - -#[derive(Debug, Default)] -pub struct ModifiableTracerPacketStats { - banking_stage_tracer_packet_stats: BankingStageTracerPacketStats, -} - -#[derive(Debug, Default)] -pub struct TracerPacketStats { - id: String, - last_report: u64, - modifiable_tracer_packet_stats: Option, -} - -impl TracerPacketStats { - pub fn new(id: u32) -> Self { - Self { - id: id.to_string(), - ..Self::default() - } - } - - fn reset(id: String) -> Self { - Self { - id, - ..Self::default() - } - } - - pub fn get_mutable_stats(&mut self) -> &mut ModifiableTracerPacketStats { - if self.modifiable_tracer_packet_stats.is_none() { - self.modifiable_tracer_packet_stats = Some(ModifiableTracerPacketStats::default()); - } - self.modifiable_tracer_packet_stats.as_mut().unwrap() - } - - pub fn increment_total_exceeded_banking_stage_buffer( - &mut self, - total_exceeded_banking_stage_buffer: usize, - ) { - if total_exceeded_banking_stage_buffer != 0 { - let stats = self.get_mutable_stats(); - saturating_add_assign!( - stats - .banking_stage_tracer_packet_stats - .total_exceeded_banking_stage_buffer, - total_exceeded_banking_stage_buffer - ); - } - } - - pub fn increment_total_cleared_from_buffer_after_forward( - &mut self, - total_cleared_from_buffer_after_forward: usize, - ) { - if total_cleared_from_buffer_after_forward != 0 { - let stats = self.get_mutable_stats(); - saturating_add_assign!( - stats - .banking_stage_tracer_packet_stats - .total_cleared_from_buffer_after_forward, - total_cleared_from_buffer_after_forward - ); - } - } - - pub fn increment_total_forwardable_tracer_packets( - &mut self, - total_forwardable_tracer_packets: usize, - forward_target_leader: Pubkey, - ) { - if total_forwardable_tracer_packets != 0 { - let stats = self.get_mutable_stats(); - stats - .banking_stage_tracer_packet_stats - .forward_target_leaders - .insert(forward_target_leader); - saturating_add_assign!( - stats - .banking_stage_tracer_packet_stats - .total_forwardable_tracer_packets, - total_forwardable_tracer_packets - ); - } - } - - pub fn report(&mut self, report_interval_ms: u64) { - let now = timestamp(); - const LEADER_REPORT_LIMIT: usize = 4; - if now.saturating_sub(self.last_report) > report_interval_ms { - // We don't want to report unless we actually saw/forwarded a tracer packet - // to prevent noisy metrics - if let Some(modifiable_tracer_packet_stats) = self.modifiable_tracer_packet_stats.take() - { - datapoint_info!( - "tracer-packet-stats", - "id" => &self.id, - ( - "total_exceeded_banking_stage_buffer", - modifiable_tracer_packet_stats - .banking_stage_tracer_packet_stats - .total_exceeded_banking_stage_buffer as i64, - i64 - ), - ( - "total_cleared_from_buffer_after_forward", - modifiable_tracer_packet_stats - .banking_stage_tracer_packet_stats - .total_cleared_from_buffer_after_forward as i64, - i64 - ), - ( - "total_forwardable_tracer_packets", - modifiable_tracer_packet_stats - .banking_stage_tracer_packet_stats - .total_forwardable_tracer_packets as i64, - i64 - ), - ( - "exceeded_expected_forward_leader_count", - modifiable_tracer_packet_stats - .banking_stage_tracer_packet_stats - .forward_target_leaders - .len() - > LEADER_REPORT_LIMIT, - bool - ), - ( - "forward_target_leaders", - itertools::Itertools::intersperse( - modifiable_tracer_packet_stats - .banking_stage_tracer_packet_stats - .forward_target_leaders - .iter() - .take(LEADER_REPORT_LIMIT) - .map(|leader_pubkey| leader_pubkey.to_string()), - ", ".to_string() - ) - .collect::(), - String - ) - ); - - *self = Self::reset(self.id.clone()); - self.last_report = timestamp(); - } - } - } -}