Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
output txn signature for debug purpose
Browse files Browse the repository at this point in the history
trying txn mask matching

output txn to figure out why txn is not exactly matched

Use 62 and 61 portion

track fetch performance using random txn mask

track sigverify performance using random txn mask

track banking stage performance using random txn mask

adding missing cargo lock file

add debug messages

Revert "add debug messages"

This reverts commit 96aead5.

fixed some clippy issues

check-crate issue

Fix a clippy issue

Fix a clippy issue

debug why txns in banking stage shows fewer performance tracking points

debug why txns in banking stage shows fewer performance tracking points

debug why txns in banking stage shows fewer performance tracking points

debug why txns in banking stage shows fewer performance tracking points

get higher PPS for testing purpose

more debug messages on why txn is skipped

display if tracer packet in log

add debug before calling processing_function

debug at the initial of banking stage

track if a txn is forwarded

dependency order

missing cargo file

clean up debug messages

Do not use TRACER_PACKET, use its own bit

rename some functions

addressed some comments from Trent

Update core/src/banking_stage/immutable_deserialized_packet.rs

Co-authored-by: Trent Nelson <[email protected]>

addressed some comments from Trent

Do not use binary_search, do simple compare in one loop
  • Loading branch information
lijunwangs committed Feb 29, 2024
1 parent c9c2fbb commit a8868ab
Show file tree
Hide file tree
Showing 14 changed files with 240 additions and 4 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ members = [
"tokens",
"tpu-client",
"transaction-dos",
"transaction-metrics-tracker",
"transaction-status",
"turbine",
"udp-client",
Expand Down Expand Up @@ -378,6 +379,7 @@ solana-system-program = { path = "programs/system", version = "=1.19.0" }
solana-test-validator = { path = "test-validator", version = "=1.19.0" }
solana-thin-client = { path = "thin-client", version = "=1.19.0" }
solana-tpu-client = { path = "tpu-client", version = "=1.19.0", default-features = false }
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=1.19.0" }
solana-transaction-status = { path = "transaction-status", version = "=1.19.0" }
solana-turbine = { path = "turbine", version = "=1.19.0" }
solana-udp-client = { path = "udp-client", version = "=1.19.0" }
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ solana-send-transaction-service = { workspace = true }
solana-streamer = { workspace = true }
solana-svm = { workspace = true }
solana-tpu-client = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
solana-transaction-status = { workspace = true }
solana-turbine = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
Expand Down
27 changes: 27 additions & 0 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,33 @@ impl Consumer {
.slot_metrics_tracker
.increment_retryable_packets_count(retryable_transaction_indexes.len() as u64);

// Now we track the performance for the interested transactions which is not in the retryable_transaction_indexes
// We assume the retryable_transaction_indexes is already sorted.
let mut retryable_idx = 0;
for (index, packet) in packets_to_process.iter().enumerate() {
if packet.original_packet().meta().is_perf_track_packet() {
if let Some(start_time) = packet.start_time() {
if retryable_idx >= retryable_transaction_indexes.len()
|| retryable_transaction_indexes[retryable_idx] != index
{
let duration = Instant::now().duration_since(*start_time);

debug!(
"Banking stage processing took {duration:?} for transaction {:?}",
packet.transaction().get_signatures().first()
);
inc_new_counter_info!(
"txn-metrics-banking-stage-process-us",
duration.as_micros() as usize
);
} else {
// This packet is retried, advance the retry index to the next, as the next packet's index will
// certainly be > than this.
retryable_idx += 1;
}
}
}
}
Some(retryable_transaction_indexes)
}

Expand Down
13 changes: 12 additions & 1 deletion core/src/banking_stage/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
VersionedTransaction,
},
},
std::{cmp::Ordering, mem::size_of, sync::Arc},
std::{cmp::Ordering, mem::size_of, sync::Arc, time::Instant},
thiserror::Error,
};

Expand Down Expand Up @@ -41,10 +41,16 @@ pub struct ImmutableDeserializedPacket {
message_hash: Hash,
is_simple_vote: bool,
compute_budget_details: ComputeBudgetDetails,
banking_stage_start_time: Option<Instant>,
}

impl ImmutableDeserializedPacket {
pub fn new(packet: Packet) -> Result<Self, DeserializedPacketError> {
let banking_stage_start_time = packet
.meta()
.is_perf_track_packet()
.then_some(Instant::now());

let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
Expand All @@ -67,6 +73,7 @@ impl ImmutableDeserializedPacket {
message_hash,
is_simple_vote,
compute_budget_details,
banking_stage_start_time,
})
}

Expand Down Expand Up @@ -98,6 +105,10 @@ impl ImmutableDeserializedPacket {
self.compute_budget_details.clone()
}

pub fn start_time(&self) -> &Option<Instant> {
&self.banking_stage_start_time
}

// This function deserializes packets into transactions, computes the blake3 hash of transaction
// messages, and verifies secp256k1 instructions.
pub fn build_sanitized_transaction(
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,7 @@ impl ThreadLocalUnprocessedPackets {
.iter()
.map(|p| (*p).clone())
.collect_vec();

let retryable_packets = if let Some(retryable_transaction_indexes) =
processing_function(&packets_to_process, payload)
{
Expand Down
29 changes: 27 additions & 2 deletions core/src/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ use {
count_discarded_packets, count_packets_in_batches, count_valid_packets, shrink_batches,
},
},
solana_sdk::timing,
solana_sdk::{signature::Signature, timing},
solana_streamer::streamer::{self, StreamerError},
solana_transaction_metrics_tracker::get_signature_from_packet,
std::{
collections::HashMap,
thread::{self, Builder, JoinHandle},
time::Instant,
},
Expand Down Expand Up @@ -296,8 +298,20 @@ impl SigVerifyStage {
verifier: &mut T,
stats: &mut SigVerifierStats,
) -> Result<(), T::SendType> {
let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?;
let mut packet_perf_measure: HashMap<[u8; 64], std::time::Instant> = HashMap::default();

let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?;
// track sigverify start time for interested packets
for batch in &batches {
for packet in batch.iter() {
if packet.meta().is_perf_track_packet() {
let signature = get_signature_from_packet(packet);
if let Ok(signature) = signature {
packet_perf_measure.insert(*signature, Instant::now());
}
}
}
}
let batches_len = batches.len();
debug!(
"@{:?} verifier: verifying: {}",
Expand Down Expand Up @@ -370,6 +384,17 @@ impl SigVerifyStage {
(num_packets as f32 / verify_time.as_s())
);

for (signature, start_time) in packet_perf_measure.drain() {
let duration = Instant::now().duration_since(start_time);
debug!(
"Sigverify took {duration:?} for transaction {:?}",
Signature::from(signature)
);
inc_new_counter_info!(
"txn-metrics-sigverify-packet-verify-us",
duration.as_micros() as usize
);
}
stats
.recv_batches_us_hist
.increment(recv_duration.as_micros() as u64)
Expand Down
16 changes: 16 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions sdk/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ bitflags! {
/// the packet is built.
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
/// For tracking performance
const PERF_TRACK_PACKET = 0b0100_0000;
}
}

Expand Down Expand Up @@ -228,6 +230,12 @@ impl Meta {
self.flags.set(PacketFlags::TRACER_PACKET, is_tracer);
}

#[inline]
pub fn set_track_performance(&mut self, is_performance_track: bool) {
self.flags
.set(PacketFlags::PERF_TRACK_PACKET, is_performance_track);
}

#[inline]
pub fn set_simple_vote(&mut self, is_simple_vote: bool) {
self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote);
Expand Down Expand Up @@ -261,6 +269,11 @@ impl Meta {
self.flags.contains(PacketFlags::TRACER_PACKET)
}

#[inline]
pub fn is_perf_track_packet(&self) -> bool {
self.flags.contains(PacketFlags::PERF_TRACK_PACKET)
}

#[inline]
pub fn round_compute_unit_price(&self) -> bool {
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
Expand Down
4 changes: 4 additions & 0 deletions sdk/src/transaction/versioned/sanitized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl SanitizedVersionedTransaction {
&self.message
}

pub fn get_signatures(&self) -> &Vec<Signature> {
&self.signatures
}

/// Consumes the SanitizedVersionedTransaction, returning the fields individually.
pub fn destruct(self) -> (Vec<Signature>, SanitizedVersionedMessage) {
(self.signatures, self.message)
Expand Down
1 change: 1 addition & 0 deletions streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rustls = { workspace = true, features = ["dangerous_configuration"] }
solana-metrics = { workspace = true }
solana-perf = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
x509-parser = { workspace = true }
Expand Down
42 changes: 41 additions & 1 deletion streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ use {
QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO,
QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
},
signature::Keypair,
signature::{Keypair, Signature},
timing,
},
solana_transaction_metrics_tracker::{
get_signature_from_packet, signature_if_should_track_packet,
},
std::{
collections::HashMap,
iter::repeat_with,
net::{IpAddr, SocketAddr, UdpSocket},
sync::{
Expand Down Expand Up @@ -81,6 +85,7 @@ struct PacketChunk {
struct PacketAccumulator {
pub meta: Meta,
pub chunks: Vec<PacketChunk>,
pub start_time: Instant,
}

#[derive(Copy, Clone, Debug)]
Expand Down Expand Up @@ -628,6 +633,7 @@ async fn packet_batch_sender(
trace!("enter packet_batch_sender");
let mut batch_start_time = Instant::now();
loop {
let mut packet_perf_measure: HashMap<[u8; 64], std::time::Instant> = HashMap::default();
let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
let mut total_bytes: usize = 0;

Expand All @@ -647,6 +653,7 @@ async fn packet_batch_sender(
|| (!packet_batch.is_empty() && elapsed >= coalesce)
{
let len = packet_batch.len();
track_streamer_fetch_packet_performance(&packet_batch, &mut packet_perf_measure);
if let Err(e) = packet_sender.send(packet_batch) {
stats
.total_packet_batch_send_err
Expand Down Expand Up @@ -692,6 +699,13 @@ async fn packet_batch_sender(

total_bytes += packet_batch[i].meta().size;

if let Some(signature) =
signature_if_should_track_packet(&packet_batch[i]).unwrap_or(None)
{
packet_perf_measure.insert(*signature, packet_accumulator.start_time);
// we set the PERF_TRACK_PACKET on
packet_batch[i].meta_mut().set_track_performance(true);
}
stats
.total_chunks_processed_by_batcher
.fetch_add(num_chunks, Ordering::Relaxed);
Expand All @@ -700,6 +714,30 @@ async fn packet_batch_sender(
}
}

fn track_streamer_fetch_packet_performance(
packet_batch: &PacketBatch,
packet_perf_measure: &mut HashMap<[u8; 64], Instant>,
) {
for packet in packet_batch.iter() {
if packet.meta().is_perf_track_packet() {
let signature = get_signature_from_packet(packet);
if let Ok(signature) = signature {
if let Some(start_time) = packet_perf_measure.remove(signature) {
let duration = Instant::now().duration_since(start_time);
debug!(
"QUIC streamer fetch stage took {duration:?} for transaction {:?}",
Signature::from(*signature)
);
inc_new_counter_info!(
"txn-metrics-quic-streamer-packet-fetch-us",
duration.as_micros() as usize
);
}
}
}
}
}

async fn handle_connection(
connection: Connection,
remote_addr: SocketAddr,
Expand Down Expand Up @@ -854,6 +892,7 @@ async fn handle_chunk(
*packet_accum = Some(PacketAccumulator {
meta,
chunks: Vec::new(),
start_time: Instant::now(),
});
}

Expand Down Expand Up @@ -1453,6 +1492,7 @@ pub mod test {
offset,
end_of_chunk: size,
}],
start_time: Instant::now(),
};
ptk_sender.send(packet_accum).await.unwrap();
}
Expand Down
Loading

0 comments on commit a8868ab

Please sign in to comment.