Skip to content

Commit

Permalink
cleans up window-service metrics (#4800)
Browse files Browse the repository at this point in the history
prune_shreds_elapsed_us and num_shreds_pruned_invalid_repair are
leftovers of: #4641
and are no longer measuring anything.

Separately, num_repairs can be atomically updated inside the thread-pool
during the first iteration, and does not need an additional iteration
over the packets.
  • Loading branch information
behzadnouri authored Feb 5, 2025
1 parent 76cea65 commit 3257ffa
Showing 1 changed file with 6 additions and 23 deletions.
29 changes: 6 additions & 23 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use {
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
Expand All @@ -56,12 +56,10 @@ pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;
struct WindowServiceMetrics {
run_insert_count: u64,
num_packets: usize,
num_repairs: usize,
num_repairs: AtomicUsize,
num_shreds_received: usize,
handle_packets_elapsed_us: u64,
shred_receiver_elapsed_us: u64,
prune_shreds_elapsed_us: u64,
num_shreds_pruned_invalid_repair: usize,
num_errors: u64,
num_errors_blockstore: u64,
num_errors_cross_beam_recv_timeout: u64,
Expand All @@ -82,23 +80,13 @@ impl WindowServiceMetrics {
),
("run_insert_count", self.run_insert_count as i64, i64),
("num_packets", self.num_packets, i64),
("num_repairs", self.num_repairs, i64),
("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
("num_shreds_received", self.num_shreds_received, i64),
(
"shred_receiver_elapsed_us",
self.shred_receiver_elapsed_us as i64,
i64
),
(
"prune_shreds_elapsed_us",
self.prune_shreds_elapsed_us as i64,
i64
),
(
"num_shreds_pruned_invalid_repair",
self.num_shreds_pruned_invalid_repair,
i64
),
("num_errors", self.num_errors, i64),
("num_errors_blockstore", self.num_errors_blockstore, i64),
("num_errors_other", self.num_errors_other, i64),
Expand Down Expand Up @@ -246,6 +234,9 @@ where
return None;
}
let repair = packet.meta().repair();
if repair {
ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
}
if accept_repairs_only && !repair {
return None;
}
Expand All @@ -263,19 +254,11 @@ where
});
ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
ws_metrics.num_repairs += shreds.iter().filter(|&&(_, repair)| repair).count();
ws_metrics.num_shreds_received += shreds.len();
for packet in packets.iter().flat_map(PacketBatch::iter) {
let addr = packet.meta().socket_addr();
*ws_metrics.addrs.entry(addr).or_default() += 1;
}

let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
let num_shreds = shreds.len();
ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len();
prune_shreds_elapsed.stop();
ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us();

let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
shreds,
Some(leader_schedule_cache),
Expand Down

0 comments on commit 3257ffa

Please sign in to comment.