From 38aa0fdb9012c232f5d365d93f8adb1791ca596e Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 13 Jan 2025 14:00:29 -0600 Subject: [PATCH] counts received gossip packets inside the thread-pool We already traverse packets inside the thread-pool and know the variant, so might as well count packets there instead of a separate traversal. --- gossip/src/cluster_info.rs | 71 +++++++++++++----------------- gossip/src/cluster_info_metrics.rs | 35 ++++++++++++++- 2 files changed, 64 insertions(+), 42 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index b2c38340891dc7..9e7642c0165b11 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2272,7 +2272,7 @@ impl ClusterInfo { thread_pool: &ThreadPool, ) -> Result<(), GossipError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); - fn count_packets_received(packets: &PacketBatch, counts: &mut [u64; 7]) { + fn count_dropped_packets(packets: &PacketBatch, dropped_packets_counts: &mut [u64; 7]) { for packet in packets { let k = packet .data(..4) @@ -2280,10 +2280,10 @@ impl ClusterInfo { .map(u32::from_le_bytes) .filter(|&k| k < 6) .unwrap_or(/*invalid:*/ 6) as usize; - counts[k] += 1; + dropped_packets_counts[k] += 1; } } - let mut counts = [0u64; 7]; + let mut dropped_packets_counts = [0u64; 7]; let mut num_packets = 0; let mut packets = VecDeque::with_capacity(2); for packet_batch in receiver @@ -2291,7 +2291,6 @@ impl ClusterInfo { .map(std::iter::once)? 
.chain(receiver.try_iter()) { - count_packets_received(&packet_batch, &mut counts); num_packets += packet_batch.len(); packets.push_back(packet_batch); while num_packets > MAX_GOSSIP_TRAFFIC { @@ -2299,60 +2298,50 @@ impl ClusterInfo { let Some(packet_batch) = packets.pop_front() else { break; }; + count_dropped_packets(&packet_batch, &mut dropped_packets_counts); num_packets -= packet_batch.len(); - self.stats - .gossip_packets_dropped_count - .add_relaxed(packet_batch.len() as u64); } } - fn verify_packet(packet: &Packet) -> Option<(SocketAddr, Protocol)> { - let protocol: Protocol = packet.deserialize_slice(..).ok()?; + let num_packets_dropped = dropped_packets_counts.iter().sum::<u64>(); + if num_packets_dropped > 0 { + self.stats + .gossip_packets_dropped_count + .add_relaxed(num_packets_dropped); + self.stats + .packets_received_count + .add_relaxed(num_packets_dropped); + self.stats + .update_received_packets_counts(&dropped_packets_counts); + } + fn verify_packet(packet: &Packet, stats: &GossipStats) -> Option<(SocketAddr, Protocol)> { + let Ok(protocol) = packet.deserialize_slice::<Protocol, _>(..) 
else { + stats.packets_received_unknown_count.add_relaxed(1); + return None; + }; + stats.count_received_packet(&protocol); protocol.sanitize().ok()?; - protocol - .par_verify() - .then(|| (packet.meta().socket_addr(), protocol)) + protocol.par_verify().then(|| { + stats.packets_received_verified_count.add_relaxed(1); + (packet.meta().socket_addr(), protocol) + }) } let packets: Vec<_> = { let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time); thread_pool.install(|| { if packets.len() == 1 { - packets[0].par_iter().filter_map(verify_packet).collect() + packets[0] + .par_iter() + .filter_map(|packet| verify_packet(packet, &self.stats)) + .collect() } else { packets .par_iter() .flatten() - .filter_map(verify_packet) + .filter_map(|packet| verify_packet(packet, &self.stats)) .collect() } }) }; - self.stats - .packets_received_count - .add_relaxed(counts.iter().sum::<u64>()); - self.stats - .packets_received_pull_requests_count - .add_relaxed(counts[0]); - self.stats - .packets_received_pull_responses_count - .add_relaxed(counts[1]); - self.stats - .packets_received_push_messages_count - .add_relaxed(counts[2]); - self.stats - .packets_received_prune_messages_count - .add_relaxed(counts[3]); - self.stats - .packets_received_ping_messages_count - .add_relaxed(counts[4]); - self.stats - .packets_received_pong_messages_count - .add_relaxed(counts[5]); - self.stats - .packets_received_unknown_count - .add_relaxed(counts[6]); - self.stats - .packets_received_verified_count - .add_relaxed(packets.len() as u64); Ok(sender.send(packets)?) 
} diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 19b5f1c5790b3a..3280f75c0398b7 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -1,5 +1,5 @@ use { - crate::crds_gossip::CrdsGossip, + crate::{crds_gossip::CrdsGossip, protocol::Protocol}, itertools::Itertools, solana_measure::measure::Measure, solana_sdk::{clock::Slot, pubkey::Pubkey}, @@ -177,6 +177,39 @@ pub struct GossipStats { pub(crate) window_request_loopback: Counter, } +impl GossipStats { + pub(crate) fn count_received_packet(&self, protocol: &Protocol) { + self.packets_received_count.add_relaxed(1); + match protocol { + Protocol::PullRequest(..) => &self.packets_received_pull_requests_count, + Protocol::PullResponse(..) => &self.packets_received_pull_responses_count, + Protocol::PushMessage(..) => &self.packets_received_push_messages_count, + Protocol::PruneMessage(..) => &self.packets_received_prune_messages_count, + Protocol::PingMessage(_) => &self.packets_received_ping_messages_count, + Protocol::PongMessage(_) => &self.packets_received_pong_messages_count, + } + .add_relaxed(1) + } + + pub(crate) fn update_received_packets_counts(&self, counts: &[u64; 7]) { + self.packets_received_count + .add_relaxed(counts.iter().sum::<u64>()); + self.packets_received_pull_requests_count + .add_relaxed(counts[0]); + self.packets_received_pull_responses_count + .add_relaxed(counts[1]); + self.packets_received_push_messages_count + .add_relaxed(counts[2]); + self.packets_received_prune_messages_count + .add_relaxed(counts[3]); + self.packets_received_ping_messages_count + .add_relaxed(counts[4]); + self.packets_received_pong_messages_count + .add_relaxed(counts[5]); + self.packets_received_unknown_count.add_relaxed(counts[6]); + } +} + pub(crate) fn submit_gossip_stats( stats: &GossipStats, gossip: &CrdsGossip,