Skip to content

Commit

Permalink
unpack config only in process_transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Jan 26, 2025
1 parent 0992aca commit 8afccb2
Showing 1 changed file with 34 additions and 86 deletions.
120 changes: 34 additions & 86 deletions send-transaction-service/src/send_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,8 @@ impl SendTransactionService {
receiver,
client.clone(),
retry_transactions.clone(),
stats_report.clone(),
config.clone(),
stats_report.clone(),
exit.clone(),
);

Expand All @@ -406,13 +406,13 @@ impl SendTransactionService {
receiver: Receiver<TransactionInfo>,
client: ConnectionCacheClient<T>,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
stats_report: Arc<SendTransactionServiceStatsReport>,
Config {
batch_send_rate_ms,
batch_size,
retry_pool_max_size,
..
}: Config,
stats_report: Arc<SendTransactionServiceStatsReport>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let mut last_batch_sent = Instant::now();
Expand All @@ -422,9 +422,8 @@ impl SendTransactionService {
Builder::new()
.name("solStxReceive".to_string())
.spawn(move || loop {
let recv_timeout_ms = batch_send_rate_ms;
let stats = &stats_report.stats;
let recv_result = receiver.recv_timeout(Duration::from_millis(recv_timeout_ms));
let recv_result = receiver.recv_timeout(Duration::from_millis(batch_send_rate_ms));
if exit.load(Ordering::Relaxed) {
break;
}
Expand Down Expand Up @@ -509,21 +508,15 @@ impl SendTransactionService {
bank_forks: Arc<RwLock<BankForks>>,
client: ConnectionCacheClient<T>,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
Config {
retry_rate_ms,
service_max_retries,
default_max_retries,
batch_size,
..
}: Config,
config: Config,
stats_report: Arc<SendTransactionServiceStatsReport>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
debug!("Starting send-transaction-service::retry_thread.");
Builder::new()
.name("solStxRetry".to_string())
.spawn(move || loop {
let retry_interval_ms = retry_rate_ms;
let retry_interval_ms = config.retry_rate_ms;
let stats = &stats_report.stats;
sleep(Duration::from_millis(
MAX_RETRY_SLEEP_MS.min(retry_interval_ms),
Expand All @@ -546,10 +539,7 @@ impl SendTransactionService {
&root_bank,
&mut transactions,
&client,
retry_rate_ms,
service_max_retries,
default_max_retries,
batch_size,
&config,
stats,
);
stats_report.report();
Expand All @@ -564,16 +554,19 @@ impl SendTransactionService {
root_bank: &Bank,
transactions: &mut HashMap<Signature, TransactionInfo>,
client: &ConnectionCacheClient<T>,
retry_rate_ms: u64,
service_max_retries: usize,
default_max_retries: Option<usize>,
batch_size: usize,
Config {
retry_rate_ms,
service_max_retries,
default_max_retries,
batch_size,
..
}: &Config,
stats: &SendTransactionServiceStats,
) -> ProcessTransactionsResult {
let mut result = ProcessTransactionsResult::default();

let mut batched_transactions = HashSet::new();
let retry_rate = Duration::from_millis(retry_rate_ms);
let retry_rate = Duration::from_millis(*retry_rate_ms);

transactions.retain(|signature, transaction_info| {
if transaction_info.durable_nonce_info.is_some() {
Expand Down Expand Up @@ -611,8 +604,8 @@ impl SendTransactionService {

let max_retries = transaction_info
.max_retries
.or(default_max_retries)
.map(|max_retries| max_retries.min(service_max_retries));
.or(*default_max_retries)
.map(|max_retries| max_retries.min(*service_max_retries));

if let Some(max_retries) = max_retries {
if transaction_info.retries >= max_retries {
Expand Down Expand Up @@ -669,7 +662,7 @@ impl SendTransactionService {
.filter(|(signature, _)| batched_transactions.contains(signature))
.map(|(_, transaction_info)| transaction_info.wire_transaction.clone());

let iter = wire_transactions.chunks(batch_size);
let iter = wire_transactions.chunks(*batch_size);
for chunk in &iter {
let chunk = chunk.collect();
client.send_transactions_in_batch(chunk, stats);
Expand Down Expand Up @@ -962,16 +955,13 @@ mod test {
),
);

let client = create_client(config.tpu_peers, config.leader_forward_count);
let client = create_client(config.tpu_peers.clone(), config.leader_forward_count);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank,
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert!(transactions.is_empty());
Expand Down Expand Up @@ -1000,10 +990,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert!(transactions.is_empty());
Expand Down Expand Up @@ -1032,10 +1019,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert!(transactions.is_empty());
Expand Down Expand Up @@ -1064,10 +1048,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert_eq!(transactions.len(), 1);
Expand Down Expand Up @@ -1098,10 +1079,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert_eq!(transactions.len(), 1);
Expand Down Expand Up @@ -1142,10 +1120,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert_eq!(transactions.len(), 1);
Expand All @@ -1162,10 +1137,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert!(transactions.is_empty());
Expand Down Expand Up @@ -1253,16 +1225,13 @@ mod test {
),
);
let stats = SendTransactionServiceStats::default();
let client = create_client(config.tpu_peers, config.leader_forward_count);
let client = create_client(config.tpu_peers.clone(), config.leader_forward_count);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank,
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert!(transactions.is_empty());
Expand Down Expand Up @@ -1290,10 +1259,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert!(transactions.is_empty());
Expand Down Expand Up @@ -1323,10 +1289,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert!(transactions.is_empty());
Expand Down Expand Up @@ -1354,10 +1317,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert!(transactions.is_empty());
Expand Down Expand Up @@ -1386,10 +1346,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert!(transactions.is_empty());
Expand Down Expand Up @@ -1418,10 +1375,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert_eq!(transactions.len(), 1);
Expand Down Expand Up @@ -1452,10 +1406,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert_eq!(transactions.len(), 1);
Expand Down Expand Up @@ -1483,10 +1434,7 @@ mod test {
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&config,
&stats,
);
assert_eq!(transactions.len(), 0);
Expand Down

0 comments on commit 8afccb2

Please sign in to comment.