Skip to content

Commit

Permalink
Remove TracerPacketStats
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge committed Dec 11, 2024
1 parent 9e53a7a commit 33c71ac
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 265 deletions.
35 changes: 8 additions & 27 deletions core/benches/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ extern crate test;
use {
itertools::Itertools,
solana_client::connection_cache::ConnectionCache,
solana_core::{
banking_stage::{
forwarder::Forwarder,
leader_slot_metrics::LeaderSlotMetricsTracker,
unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches},
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
BankingStageStats,
},
tracer_packet_stats::TracerPacketStats,
solana_core::banking_stage::{
forwarder::Forwarder,
leader_slot_metrics::LeaderSlotMetricsTracker,
unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches},
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
BankingStageStats,
},
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
Expand All @@ -38,7 +35,6 @@ struct BenchSetup {
unprocessed_packet_batches: UnprocessedTransactionStorage,
tracker: LeaderSlotMetricsTracker,
stats: BankingStageStats,
tracer_stats: TracerPacketStats,
}

fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup {
Expand Down Expand Up @@ -118,7 +114,6 @@ fn setup(num_packets: usize, contentious_transaction: bool) -> BenchSetup {
unprocessed_packet_batches,
tracker: LeaderSlotMetricsTracker::new(0),
stats: BankingStageStats::default(),
tracer_stats: TracerPacketStats::new(0),
}
}

Expand All @@ -132,19 +127,12 @@ fn bench_forwarder_handle_forwading_contentious_transaction(bencher: &mut Benche
mut unprocessed_packet_batches,
mut tracker,
stats,
mut tracer_stats,
} = setup(num_packets, true);

// hold packets so they can be reused for benching
let hold = true;
bencher.iter(|| {
forwarder.handle_forwarding(
&mut unprocessed_packet_batches,
hold,
&mut tracker,
&stats,
&mut tracer_stats,
);
forwarder.handle_forwarding(&mut unprocessed_packet_batches, hold, &mut tracker, &stats);
// reset packet.forwarded flag to reuse `unprocessed_packet_batches`
if let UnprocessedTransactionStorage::LocalTransactionStorage(unprocessed_packets) =
&mut unprocessed_packet_batches
Expand All @@ -169,19 +157,12 @@ fn bench_forwarder_handle_forwading_parallel_transactions(bencher: &mut Bencher)
mut unprocessed_packet_batches,
mut tracker,
stats,
mut tracer_stats,
} = setup(num_packets, false);

// hold packets so they can be reused for benching
let hold = true;
bencher.iter(|| {
forwarder.handle_forwarding(
&mut unprocessed_packet_batches,
hold,
&mut tracker,
&stats,
&mut tracer_stats,
);
forwarder.handle_forwarding(&mut unprocessed_packet_batches, hold, &mut tracker, &stats);
// reset packet.forwarded flag to reuse `unprocessed_packet_batches`
if let UnprocessedTransactionStorage::LocalTransactionStorage(unprocessed_packets) =
&mut unprocessed_packet_batches
Expand Down
11 changes: 0 additions & 11 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use {
},
},
banking_trace::BankingPacketReceiver,
tracer_packet_stats::TracerPacketStats,
validator::BlockProductionMethod,
},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
Expand Down Expand Up @@ -319,8 +318,6 @@ pub enum ForwardOption {
#[derive(Debug, Default)]
pub struct FilterForwardingResults {
pub(crate) total_forwardable_packets: usize,
pub(crate) total_tracer_packets_in_buffer: usize,
pub(crate) total_forwardable_tracer_packets: usize,
pub(crate) total_dropped_packets: usize,
pub(crate) total_packet_conversion_us: u64,
pub(crate) total_filter_packets_us: u64,
Expand Down Expand Up @@ -686,7 +683,6 @@ impl BankingStage {
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
tracer_packet_stats: &mut TracerPacketStats,
) {
if unprocessed_transaction_storage.should_not_process() {
return;
Expand Down Expand Up @@ -722,7 +718,6 @@ impl BankingStage {
false,
slot_metrics_tracker,
banking_stage_stats,
tracer_packet_stats,
));
slot_metrics_tracker.increment_forward_us(forward_us);
// Take metrics action after forwarding packets to include forwarded
Expand All @@ -735,7 +730,6 @@ impl BankingStage {
true,
slot_metrics_tracker,
banking_stage_stats,
tracer_packet_stats,
));
slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_us);
// Take metrics action after forwarding packets
Expand All @@ -754,7 +748,6 @@ impl BankingStage {
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
) {
let mut banking_stage_stats = BankingStageStats::new(id);
let mut tracer_packet_stats = TracerPacketStats::new(id);

let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
let mut last_metrics_update = Instant::now();
Expand All @@ -770,19 +763,15 @@ impl BankingStage {
&mut unprocessed_transaction_storage,
&banking_stage_stats,
&mut slot_metrics_tracker,
&mut tracer_packet_stats,
));
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_us);
last_metrics_update = Instant::now();
}

tracer_packet_stats.report(1000);

match packet_receiver.receive_and_buffer_packets(
&mut unprocessed_transaction_storage,
&mut banking_stage_stats,
&mut tracer_packet_stats,
&mut slot_metrics_tracker,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Expand Down
15 changes: 1 addition & 14 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use {
immutable_deserialized_packet::ImmutableDeserializedPacket, LikeClusterInfo,
},
next_leader::{next_leader, next_leader_tpu_vote},
tracer_packet_stats::TracerPacketStats,
},
solana_client::connection_cache::ConnectionCache,
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
Expand Down Expand Up @@ -96,7 +95,6 @@ impl<T: LikeClusterInfo> Forwarder<T> {
hold: bool,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
banking_stage_stats: &BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats,
) {
let forward_option = unprocessed_transaction_storage.forward_option();

Expand Down Expand Up @@ -139,19 +137,13 @@ impl<T: LikeClusterInfo> Forwarder<T> {
slot_metrics_tracker.increment_forwardable_batches_count(1);

let batched_forwardable_packets_count = forward_batch.len();
let (_forward_result, successful_forwarded_packets_count, leader_pubkey) = self
let (_forward_result, successful_forwarded_packets_count, _leader_pubkey) = self
.forward_buffered_packets(
&forward_option,
forward_batch.get_forwardable_packets(),
banking_stage_stats,
);

if let Some(leader_pubkey) = leader_pubkey {
tracer_packet_stats.increment_total_forwardable_tracer_packets(
filter_forwarding_result.total_forwardable_tracer_packets,
leader_pubkey,
);
}
let failed_forwarded_packets_count = batched_forwardable_packets_count
.saturating_sub(successful_forwarded_packets_count);

Expand All @@ -174,9 +166,6 @@ impl<T: LikeClusterInfo> Forwarder<T> {
slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count(
filter_forwarding_result.total_forwardable_packets as u64,
);
tracer_packet_stats.increment_total_cleared_from_buffer_after_forward(
filter_forwarding_result.total_tracer_packets_in_buffer,
);
unprocessed_transaction_storage.clear_forwarded_packets();
}
}
Expand Down Expand Up @@ -485,7 +474,6 @@ mod tests {
true,
&mut LeaderSlotMetricsTracker::new(0),
&stats,
&mut TracerPacketStats::new(0),
);

let recv_socket = &local_node.sockets.tpu_forwards_quic[0];
Expand Down Expand Up @@ -584,7 +572,6 @@ mod tests {
hold,
&mut LeaderSlotMetricsTracker::new(0),
&stats,
&mut TracerPacketStats::new(0),
);

let recv_socket = &local_node.sockets.tpu_forwards_quic[0];
Expand Down
10 changes: 1 addition & 9 deletions core/src/banking_stage/packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
unprocessed_transaction_storage::UnprocessedTransactionStorage,
BankingStageStats,
},
crate::{banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats},
crate::banking_trace::BankingPacketReceiver,
crossbeam_channel::RecvTimeoutError,
solana_measure::{measure::Measure, measure_us},
solana_sdk::{saturating_add_assign, timing::timestamp},
Expand All @@ -31,7 +31,6 @@ impl PacketReceiver {
&mut self,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &mut BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> Result<(), RecvTimeoutError> {
let (result, recv_time_us) = measure_us!({
Expand All @@ -53,7 +52,6 @@ impl PacketReceiver {
receive_packet_results,
unprocessed_transaction_storage,
banking_stage_stats,
tracer_packet_stats,
slot_metrics_tracker,
);
recv_and_buffer_measure.stop();
Expand Down Expand Up @@ -97,7 +95,6 @@ impl PacketReceiver {
}: ReceivePacketResults,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &mut BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) {
let packet_count = deserialized_packets.len();
Expand All @@ -116,7 +113,6 @@ impl PacketReceiver {
&mut newly_buffered_forwarded_packets_count,
banking_stage_stats,
slot_metrics_tracker,
tracer_packet_stats,
);

banking_stage_stats
Expand All @@ -141,7 +137,6 @@ impl PacketReceiver {
newly_buffered_forwarded_packets_count: &mut usize,
banking_stage_stats: &mut BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
tracer_packet_stats: &mut TracerPacketStats,
) {
if !deserialized_packets.is_empty() {
let _ = banking_stage_stats
Expand All @@ -164,9 +159,6 @@ impl PacketReceiver {
*dropped_packets_count,
insert_packet_batches_summary.total_dropped_packets()
);
tracer_packet_stats.increment_total_exceeded_banking_stage_buffer(
insert_packet_batches_summary.dropped_tracer_packets(),
);
}
}
}
13 changes: 1 addition & 12 deletions core/src/banking_stage/unprocessed_packet_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ impl Ord for DeserializedPacket {
#[derive(Debug)]
pub struct PacketBatchInsertionMetrics {
pub(crate) num_dropped_packets: usize,
pub(crate) num_dropped_tracer_packets: usize,
}

/// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store
Expand Down Expand Up @@ -103,23 +102,13 @@ impl UnprocessedPacketBatches {
deserialized_packets: impl Iterator<Item = DeserializedPacket>,
) -> PacketBatchInsertionMetrics {
let mut num_dropped_packets = 0;
let mut num_dropped_tracer_packets = 0;
for deserialized_packet in deserialized_packets {
if let Some(dropped_packet) = self.push(deserialized_packet) {
if let Some(_dropped_packet) = self.push(deserialized_packet) {
num_dropped_packets += 1;
if dropped_packet
.immutable_section()
.original_packet()
.meta()
.is_tracer_packet()
{
num_dropped_tracer_packets += 1;
}
}
}
PacketBatchInsertionMetrics {
num_dropped_packets,
num_dropped_tracer_packets,
}
}

Expand Down
27 changes: 0 additions & 27 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,6 @@ impl InsertPacketBatchSummary {
_ => 0,
}
}

pub fn dropped_tracer_packets(&self) -> usize {
match self {
Self::PacketBatchInsertionMetrics(metrics) => metrics.num_dropped_tracer_packets,
_ => 0,
}
}
}

impl From<VoteBatchInsertionMetrics> for InsertPacketBatchSummary {
Expand Down Expand Up @@ -750,8 +743,6 @@ impl ThreadLocalUnprocessedPackets {

FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
total_dropped_packets,
total_packet_conversion_us,
total_filter_packets_us,
Expand Down Expand Up @@ -1134,16 +1125,12 @@ mod tests {

let FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
..
} = transaction_storage.filter_forwardable_packets_and_add_batches(
current_bank.clone(),
&mut forward_packet_batches_by_accounts,
);
assert_eq!(total_forwardable_packets, 256);
assert_eq!(total_tracer_packets_in_buffer, 256);
assert_eq!(total_forwardable_tracer_packets, 256);

// packets in a batch are forwarded in arbitrary order; verify the ports match after
// sorting
Expand Down Expand Up @@ -1172,8 +1159,6 @@ mod tests {
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
let FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
..
} = transaction_storage.filter_forwardable_packets_and_add_batches(
current_bank.clone(),
Expand All @@ -1183,11 +1168,6 @@ mod tests {
total_forwardable_packets,
packets.len() - num_already_forwarded
);
assert_eq!(total_tracer_packets_in_buffer, packets.len());
assert_eq!(
total_forwardable_tracer_packets,
packets.len() - num_already_forwarded
);
}

// some packets are invalid (already processed)
Expand All @@ -1206,8 +1186,6 @@ mod tests {
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
let FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
..
} = transaction_storage.filter_forwardable_packets_and_add_batches(
current_bank,
Expand All @@ -1217,11 +1195,6 @@ mod tests {
total_forwardable_packets,
packets.len() - num_already_processed
);
assert_eq!(total_tracer_packets_in_buffer, packets.len());
assert_eq!(
total_forwardable_tracer_packets,
packets.len() - num_already_processed
);
}
}

Expand Down
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub mod stats_reporter_service;
pub mod system_monitor_service;
pub mod tpu;
mod tpu_entry_notifier;
pub mod tracer_packet_stats;
pub mod tvu;
pub mod unfrozen_gossip_verified_vote_hashes;
pub mod validator;
Expand Down
Loading

0 comments on commit 33c71ac

Please sign in to comment.