Skip to content

Commit

Permalink
counts received gossip packets inside the thread-pool
Browse files Browse the repository at this point in the history
We already traverse packets inside the thread-pool and know the variant,
so might as well count packets there instead of a separate traversal.
  • Loading branch information
behzadnouri committed Jan 13, 2025
1 parent 60612b8 commit 38aa0fd
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 42 deletions.
71 changes: 30 additions & 41 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2272,87 +2272,76 @@ impl ClusterInfo {
thread_pool: &ThreadPool,
) -> Result<(), GossipError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
fn count_packets_received(packets: &PacketBatch, counts: &mut [u64; 7]) {
fn count_dropped_packets(packets: &PacketBatch, dropped_packets_counts: &mut [u64; 7]) {
for packet in packets {
let k = packet
.data(..4)
.and_then(|data| <[u8; 4]>::try_from(data).ok())
.map(u32::from_le_bytes)
.filter(|&k| k < 6)
.unwrap_or(/*invalid:*/ 6) as usize;
counts[k] += 1;
dropped_packets_counts[k] += 1;
}
}
let mut counts = [0u64; 7];
let mut dropped_packets_counts = [0u64; 7];
let mut num_packets = 0;
let mut packets = VecDeque::with_capacity(2);
for packet_batch in receiver
.recv_timeout(RECV_TIMEOUT)
.map(std::iter::once)?
.chain(receiver.try_iter())
{
count_packets_received(&packet_batch, &mut counts);
num_packets += packet_batch.len();
packets.push_back(packet_batch);
while num_packets > MAX_GOSSIP_TRAFFIC {
// Discard older packets in favor of more recent ones.
let Some(packet_batch) = packets.pop_front() else {
break;
};
count_dropped_packets(&packet_batch, &mut dropped_packets_counts);
num_packets -= packet_batch.len();
self.stats
.gossip_packets_dropped_count
.add_relaxed(packet_batch.len() as u64);
}
}
fn verify_packet(packet: &Packet) -> Option<(SocketAddr, Protocol)> {
let protocol: Protocol = packet.deserialize_slice(..).ok()?;
let num_packets_dropped = dropped_packets_counts.iter().sum::<u64>();
if num_packets_dropped > 0 {
self.stats
.gossip_packets_dropped_count
.add_relaxed(num_packets_dropped);
self.stats
.packets_received_count
.add_relaxed(num_packets_dropped);
self.stats
.update_received_packets_counts(&dropped_packets_counts);
}
fn verify_packet(packet: &Packet, stats: &GossipStats) -> Option<(SocketAddr, Protocol)> {
let Ok(protocol) = packet.deserialize_slice::<Protocol, _>(..) else {
stats.packets_received_unknown_count.add_relaxed(1);
return None;
};
stats.count_received_packet(&protocol);
protocol.sanitize().ok()?;
protocol
.par_verify()
.then(|| (packet.meta().socket_addr(), protocol))
protocol.par_verify().then(|| {
stats.packets_received_verified_count.add_relaxed(1);
(packet.meta().socket_addr(), protocol)
})
}
let packets: Vec<_> = {
let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time);
thread_pool.install(|| {
if packets.len() == 1 {
packets[0].par_iter().filter_map(verify_packet).collect()
packets[0]
.par_iter()
.filter_map(|packet| verify_packet(packet, &self.stats))
.collect()
} else {
packets
.par_iter()
.flatten()
.filter_map(verify_packet)
.filter_map(|packet| verify_packet(packet, &self.stats))
.collect()
}
})
};
self.stats
.packets_received_count
.add_relaxed(counts.iter().sum::<u64>());
self.stats
.packets_received_pull_requests_count
.add_relaxed(counts[0]);
self.stats
.packets_received_pull_responses_count
.add_relaxed(counts[1]);
self.stats
.packets_received_push_messages_count
.add_relaxed(counts[2]);
self.stats
.packets_received_prune_messages_count
.add_relaxed(counts[3]);
self.stats
.packets_received_ping_messages_count
.add_relaxed(counts[4]);
self.stats
.packets_received_pong_messages_count
.add_relaxed(counts[5]);
self.stats
.packets_received_unknown_count
.add_relaxed(counts[6]);
self.stats
.packets_received_verified_count
.add_relaxed(packets.len() as u64);
Ok(sender.send(packets)?)
}

Expand Down
35 changes: 34 additions & 1 deletion gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::crds_gossip::CrdsGossip,
crate::{crds_gossip::CrdsGossip, protocol::Protocol},
itertools::Itertools,
solana_measure::measure::Measure,
solana_sdk::{clock::Slot, pubkey::Pubkey},
Expand Down Expand Up @@ -177,6 +177,39 @@ pub struct GossipStats {
pub(crate) window_request_loopback: Counter,
}

impl GossipStats {
pub(crate) fn count_received_packet(&self, protocol: &Protocol) {
self.packets_received_count.add_relaxed(1);
match protocol {
Protocol::PullRequest(..) => &self.packets_received_pull_requests_count,
Protocol::PullResponse(..) => &self.packets_received_pull_responses_count,
Protocol::PushMessage(..) => &self.packets_received_push_messages_count,
Protocol::PruneMessage(..) => &self.packets_received_prune_messages_count,
Protocol::PingMessage(_) => &self.packets_received_ping_messages_count,
Protocol::PongMessage(_) => &self.packets_received_pong_messages_count,
}
.add_relaxed(1)
}

pub(crate) fn update_received_packets_counts(&self, counts: &[u64; 7]) {
self.packets_received_count
.add_relaxed(counts.iter().sum::<u64>());
self.packets_received_pull_requests_count
.add_relaxed(counts[0]);
self.packets_received_pull_responses_count
.add_relaxed(counts[1]);
self.packets_received_push_messages_count
.add_relaxed(counts[2]);
self.packets_received_prune_messages_count
.add_relaxed(counts[3]);
self.packets_received_ping_messages_count
.add_relaxed(counts[4]);
self.packets_received_pong_messages_count
.add_relaxed(counts[5]);
self.packets_received_unknown_count.add_relaxed(counts[6]);
}
}

pub(crate) fn submit_gossip_stats(
stats: &GossipStats,
gossip: &CrdsGossip,
Expand Down

0 comments on commit 38aa0fd

Please sign in to comment.