diff --git a/prdoc/pr_7102.prdoc b/prdoc/pr_7102.prdoc new file mode 100644 index 000000000000..b1923aafc3db --- /dev/null +++ b/prdoc/pr_7102.prdoc @@ -0,0 +1,8 @@ +title: '`fatxpool`: rotator cache size now depends on pool''s limits' +doc: +- audience: Node Dev + description: |- + This PR modifies the hard-coded size of extrinsics cache within `PoolRotator` to be inline with pool limits. It only applies to fork-aware transaction pool. For the legacy (single-state) transaction pool the logic remains untouched. +crates: +- name: sc-transaction-pool + bump: minor diff --git a/substrate/client/transaction-pool/benches/basics.rs b/substrate/client/transaction-pool/benches/basics.rs index 5e40b0fb72d6..5ba9dd40c156 100644 --- a/substrate/client/transaction-pool/benches/basics.rs +++ b/substrate/client/transaction-pool/benches/basics.rs @@ -197,14 +197,22 @@ fn benchmark_main(c: &mut Criterion) { c.bench_function("sequential 50 tx", |b| { b.iter(|| { let api = Arc::from(TestApi::new_dependant()); - bench_configured(Pool::new(Default::default(), true.into(), api.clone()), 50, api); + bench_configured( + Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()), + 50, + api, + ); }); }); c.bench_function("random 100 tx", |b| { b.iter(|| { let api = Arc::from(TestApi::default()); - bench_configured(Pool::new(Default::default(), true.into(), api.clone()), 100, api); + bench_configured( + Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()), + 100, + api, + ); }); }); } diff --git a/substrate/client/transaction-pool/src/common/tests.rs b/substrate/client/transaction-pool/src/common/tests.rs index b00cf5fbfede..7f2cbe24d8ef 100644 --- a/substrate/client/transaction-pool/src/common/tests.rs +++ b/substrate/client/transaction-pool/src/common/tests.rs @@ -222,5 +222,5 @@ pub(crate) fn uxt(transfer: Transfer) -> Extrinsic { pub(crate) fn pool() -> (Pool, Arc) { let api = Arc::new(TestApi::default()); - (Pool::new(Default::default(), true.into(), api.clone()), api) + (Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()), api) } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs index 7679e3b169d2..d69aa37c94a1 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs @@ -329,14 +329,14 @@ where let stream_map = futures::stream::unfold(ctx, |mut ctx| async move { loop { if let Some(dropped) = ctx.get_pending_dropped_transaction() { - debug!("dropped_watcher: sending out (pending): {dropped:?}"); + trace!("dropped_watcher: sending out (pending): {dropped:?}"); return Some((dropped, ctx)); } tokio::select! { biased; Some(event) = next_event(&mut ctx.stream_map) => { if let Some(dropped) = ctx.handle_event(event.0, event.1) { - debug!("dropped_watcher: sending out: {dropped:?}"); + trace!("dropped_watcher: sending out: {dropped:?}"); return Some((dropped, ctx)); } }, diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 4ec87f1fefa4..e57256943ccf 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -318,7 +318,7 @@ where pool_api.clone(), listener.clone(), metrics.clone(), - TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * (options.ready.count + options.future.count), + TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * options.total_count(), options.ready.total_bytes + options.future.total_bytes, )); diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs index ff9cc1541af4..4c0ace0b1c73 100644 --- a/substrate/client/transaction-pool/src/graph/pool.rs +++ b/substrate/client/transaction-pool/src/graph/pool.rs @@ -158,6 +158,13 @@ impl Default for Options { } } +impl Options { + /// Total (ready+future) maximal number of transactions in the pool. + pub fn total_count(&self) -> usize { + self.ready.count + self.future.count + } +} + /// Should we check that the transaction is banned /// in the pool, before we verify it? #[derive(Copy, Clone)] @@ -172,6 +179,21 @@ pub struct Pool { } impl Pool { + /// Create a new transaction pool with statically sized rotator. + pub fn new_with_staticly_sized_rotator( + options: Options, + is_validator: IsValidator, + api: Arc, + ) -> Self { + Self { + validated_pool: Arc::new(ValidatedPool::new_with_staticly_sized_rotator( + options, + is_validator, + api, + )), + } + } + /// Create a new transaction pool. pub fn new(options: Options, is_validator: IsValidator, api: Arc) -> Self { Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) } @@ -284,6 +306,7 @@ impl Pool { let mut validated_counter: usize = 0; let mut future_tags = Vec::new(); + let now = Instant::now(); for (extrinsic, in_pool_tags) in all { match in_pool_tags { // reuse the tags for extrinsics that were found in the pool @@ -319,7 +342,7 @@ impl Pool { } } - log::trace!(target: LOG_TARGET,"prune: validated_counter:{validated_counter}"); + log::debug!(target: LOG_TARGET,"prune: validated_counter:{validated_counter}, took:{:?}", now.elapsed()); self.prune_tags(at, future_tags, in_pool_hashes).await } @@ -351,6 +374,7 @@ impl Pool { tags: impl IntoIterator, known_imported_hashes: impl IntoIterator> + Clone, ) { + let now = Instant::now(); log::trace!(target: LOG_TARGET, "Pruning at {:?}", at); // Prune all transactions that provide given tags let prune_status = self.validated_pool.prune_tags(tags); @@ -369,9 +393,8 @@ impl Pool { let reverified_transactions = self.verify(at, pruned_transactions, CheckBannedBeforeVerify::Yes).await; - let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect(); - - log::trace!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions: {}", &at, reverified_transactions.len()); + let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect::>(); + log::debug!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions: {}, reverification took: {:?}", &at, reverified_transactions.len(), now.elapsed()); log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "[{:?}] Resubmitting transaction: {:?}"); // And finally - submit reverified transactions back to the pool @@ -580,7 +603,7 @@ mod tests { fn should_reject_unactionable_transactions() { // given let api = Arc::new(TestApi::default()); - let pool = Pool::new( + let pool = Pool::new_with_staticly_sized_rotator( Default::default(), // the node does not author blocks false.into(), @@ -767,7 +790,7 @@ mod tests { let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() }; let api = Arc::new(TestApi::default()); - let pool = Pool::new(options, true.into(), api.clone()); + let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone()); let hash1 = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into())).unwrap(); @@ -803,7 +826,7 @@ mod tests { let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() }; let api = Arc::new(TestApi::default()); - let pool = Pool::new(options, true.into(), api.clone()); + let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone()); // when block_on( @@ -1036,7 +1059,7 @@ mod tests { Options { ready: limit.clone(), future: limit.clone(), ..Default::default() }; let api = Arc::new(TestApi::default()); - let pool = Pool::new(options, true.into(), api.clone()); + let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone()); let xt = uxt(Transfer { from: Alice.into(), @@ -1074,7 +1097,7 @@ mod tests { Options { ready: limit.clone(), future: limit.clone(), ..Default::default() }; let api = Arc::new(TestApi::default()); - let pool = Pool::new(options, true.into(), api.clone()); + let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone()); // after validation `IncludeData` will have priority set to 9001 // (validate_transaction mock) @@ -1106,7 +1129,7 @@ mod tests { Options { ready: limit.clone(), future: limit.clone(), ..Default::default() }; let api = Arc::new(TestApi::default()); - let pool = Pool::new(options, true.into(), api.clone()); + let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone()); let han_of_block0 = api.expect_hash_and_number(0); @@ -1151,7 +1174,11 @@ mod tests { let mut api = TestApi::default(); api.delay = Arc::new(Mutex::new(rx.into())); let api = Arc::new(api); - let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone())); + let pool = Arc::new(Pool::new_with_staticly_sized_rotator( + Default::default(), + true.into(), + api.clone(), + )); let han_of_block0 = api.expect_hash_and_number(0); diff --git a/substrate/client/transaction-pool/src/graph/rotator.rs b/substrate/client/transaction-pool/src/graph/rotator.rs index 9a2e269b5eed..80d8f24144c8 100644 --- a/substrate/client/transaction-pool/src/graph/rotator.rs +++ b/substrate/client/transaction-pool/src/graph/rotator.rs @@ -31,7 +31,10 @@ use std::{ use super::base_pool::Transaction; /// Expected size of the banned extrinsics cache. -const EXPECTED_SIZE: usize = 2048; +const DEFAULT_EXPECTED_SIZE: usize = 2048; + +/// The default duration, in seconds, for which an extrinsic is banned. +const DEFAULT_BAN_TIME_SECS: u64 = 30 * 60; /// Pool rotator is responsible to only keep fresh extrinsics in the pool. /// @@ -42,18 +45,39 @@ pub struct PoolRotator { ban_time: Duration, /// Currently banned extrinsics. banned_until: RwLock>, + /// Expected size of the banned extrinsics cache. + expected_size: usize, +} + +impl Clone for PoolRotator { + fn clone(&self) -> Self { + Self { + ban_time: self.ban_time, + banned_until: RwLock::new(self.banned_until.read().clone()), + expected_size: self.expected_size, + } + } } impl Default for PoolRotator { fn default() -> Self { - Self { ban_time: Duration::from_secs(60 * 30), banned_until: Default::default() } + Self { + ban_time: Duration::from_secs(DEFAULT_BAN_TIME_SECS), + banned_until: Default::default(), + expected_size: DEFAULT_EXPECTED_SIZE, + } } } impl PoolRotator { /// New rotator instance with specified ban time. pub fn new(ban_time: Duration) -> Self { - Self { ban_time, banned_until: Default::default() } + Self { ban_time, ..Self::default() } + } + + /// New rotator instance with specified ban time and expected cache size. + pub fn new_with_expected_size(ban_time: Duration, expected_size: usize) -> Self { + Self { expected_size, ..Self::new(ban_time) } } /// Returns `true` if extrinsic hash is currently banned. @@ -69,8 +93,8 @@ impl PoolRotator { banned.insert(hash, *now + self.ban_time); } - if banned.len() > 2 * EXPECTED_SIZE { - while banned.len() > EXPECTED_SIZE { + if banned.len() > 2 * self.expected_size { + while banned.len() > self.expected_size { if let Some(key) = banned.keys().next().cloned() { banned.remove(&key); } @@ -201,16 +225,16 @@ mod tests { let past_block = 0; // when - for i in 0..2 * EXPECTED_SIZE { + for i in 0..2 * DEFAULT_EXPECTED_SIZE { let tx = tx_with(i as u64, past_block); assert!(rotator.ban_if_stale(&now, past_block, &tx)); } - assert_eq!(rotator.banned_until.read().len(), 2 * EXPECTED_SIZE); + assert_eq!(rotator.banned_until.read().len(), 2 * DEFAULT_EXPECTED_SIZE); // then - let tx = tx_with(2 * EXPECTED_SIZE as u64, past_block); + let tx = tx_with(2 * DEFAULT_EXPECTED_SIZE as u64, past_block); // trigger a garbage collection assert!(rotator.ban_if_stale(&now, past_block, &tx)); - assert_eq!(rotator.banned_until.read().len(), EXPECTED_SIZE); + assert_eq!(rotator.banned_until.read().len(), DEFAULT_EXPECTED_SIZE); } } diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index 14df63d9673e..3f7bf4773de7 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -121,16 +121,41 @@ impl Clone for ValidatedPool { listener: Default::default(), pool: RwLock::from(self.pool.read().clone()), import_notification_sinks: Default::default(), - rotator: PoolRotator::default(), + rotator: self.rotator.clone(), } } } impl ValidatedPool { + /// Create a new transaction pool with statically sized rotator. + pub fn new_with_staticly_sized_rotator( + options: Options, + is_validator: IsValidator, + api: Arc, + ) -> Self { + let ban_time = options.ban_time; + Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time)) + } + /// Create a new transaction pool. pub fn new(options: Options, is_validator: IsValidator, api: Arc) -> Self { - let base_pool = base::BasePool::new(options.reject_future_transactions); let ban_time = options.ban_time; + let total_count = options.total_count(); + Self::new_with_rotator( + options, + is_validator, + api, + PoolRotator::new_with_expected_size(ban_time, total_count), + ) + } + + fn new_with_rotator( + options: Options, + is_validator: IsValidator, + api: Arc, + rotator: PoolRotator>, + ) -> Self { + let base_pool = base::BasePool::new(options.reject_future_transactions); Self { is_validator, options, @@ -138,7 +163,7 @@ impl ValidatedPool { api, pool: RwLock::new(base_pool), import_notification_sinks: Default::default(), - rotator: PoolRotator::new(ban_time), + rotator, } } diff --git a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs index f22fa2ddabde..caa09585b28b 100644 --- a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs +++ b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs @@ -384,7 +384,11 @@ mod tests { #[test] fn revalidation_queue_works() { let api = Arc::new(TestApi::default()); - let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone())); + let pool = Arc::new(Pool::new_with_staticly_sized_rotator( + Default::default(), + true.into(), + api.clone(), + )); let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone())); let uxt = uxt(Transfer { @@ -414,7 +418,11 @@ mod tests { #[test] fn revalidation_queue_skips_revalidation_for_unknown_block_hash() { let api = Arc::new(TestApi::default()); - let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone())); + let pool = Arc::new(Pool::new_with_staticly_sized_rotator( + Default::default(), + true.into(), + api.clone(), + )); let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone())); let uxt0 = uxt(Transfer { diff --git a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs index e7504012ca67..2b32704945c7 100644 --- a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs +++ b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs @@ -141,7 +141,11 @@ where finalized_hash: Block::Hash, options: graph::Options, ) -> (Self, Pin + Send>>) { - let pool = Arc::new(graph::Pool::new(options, true.into(), pool_api.clone())); + let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator( + options, + true.into(), + pool_api.clone(), + )); let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background( pool_api.clone(), pool.clone(), @@ -177,7 +181,11 @@ where best_block_hash: Block::Hash, finalized_hash: Block::Hash, ) -> Self { - let pool = Arc::new(graph::Pool::new(options, is_validator, pool_api.clone())); + let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator( + options, + is_validator, + pool_api.clone(), + )); let (revalidation_queue, background_task) = match revalidation_type { RevalidationType::Light => (revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None), diff --git a/substrate/client/transaction-pool/tests/fatp.rs b/substrate/client/transaction-pool/tests/fatp.rs index 8bf08122995c..dd82c52a6047 100644 --- a/substrate/client/transaction-pool/tests/fatp.rs +++ b/substrate/client/transaction-pool/tests/fatp.rs @@ -2199,7 +2199,7 @@ fn import_sink_works3() { pool.submit_one(genesis, SOURCE, xt1.clone()), ]; - let x = block_on(futures::future::join_all(submissions)); + block_on(futures::future::join_all(submissions)); let header01a = api.push_block(1, vec![], true); let header01b = api.push_block(1, vec![], true); @@ -2213,8 +2213,6 @@ fn import_sink_works3() { assert_pool_status!(header01a.hash(), &pool, 1, 1); assert_pool_status!(header01b.hash(), &pool, 1, 1); - log::debug!("xxx {x:#?}"); - let import_events = futures::executor::block_on_stream(import_stream).take(1).collect::>(); diff --git a/substrate/client/transaction-pool/tests/pool.rs b/substrate/client/transaction-pool/tests/pool.rs index 20997606c607..de35726435f0 100644 --- a/substrate/client/transaction-pool/tests/pool.rs +++ b/substrate/client/transaction-pool/tests/pool.rs @@ -49,7 +49,7 @@ const LOG_TARGET: &str = "txpool"; fn pool() -> (Pool, Arc) { let api = Arc::new(TestApi::with_alice_nonce(209)); - (Pool::new(Default::default(), true.into(), api.clone()), api) + (Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()), api) } fn maintained_pool() -> (BasicPool, Arc, futures::executor::ThreadPool) { @@ -224,7 +224,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { api.set_valid_modifier(Box::new(|v: &mut ValidTransaction| { v.provides.push(vec![155]); })); - let pool = Pool::new(Default::default(), true.into(), api.clone()); + let pool = Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()); let xt0 = Arc::from(uxt(Alice, 209)); block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, xt0.clone())) .expect("1. Imported");