Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fatxpool: rotator cache size now depends on pool's limits #7102

Merged
merged 10 commits into from
Jan 13, 2025
8 changes: 8 additions & 0 deletions prdoc/pr_7102.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
title: '`fatxpool`: rotator cache size now depends on pool''s limits'
doc:
- audience: Node Dev
description: |-
This PR modifies the hard-coded size of extrinsics cache within `PoolRotator` to be inline with pool limits. It only applies to fork-aware transaction pool. For the legacy (single-state) transaction pool the logic remains untouched.
crates:
- name: sc-transaction-pool
bump: minor
12 changes: 10 additions & 2 deletions substrate/client/transaction-pool/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,22 @@ fn benchmark_main(c: &mut Criterion) {
c.bench_function("sequential 50 tx", |b| {
b.iter(|| {
let api = Arc::from(TestApi::new_dependant());
bench_configured(Pool::new(Default::default(), true.into(), api.clone()), 50, api);
bench_configured(
Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()),
50,
api,
);
});
});

c.bench_function("random 100 tx", |b| {
b.iter(|| {
let api = Arc::from(TestApi::default());
bench_configured(Pool::new(Default::default(), true.into(), api.clone()), 100, api);
bench_configured(
Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()),
100,
api,
);
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/transaction-pool/src/common/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,5 +222,5 @@ pub(crate) fn uxt(transfer: Transfer) -> Extrinsic {

pub(crate) fn pool() -> (Pool<TestApi>, Arc<TestApi>) {
let api = Arc::new(TestApi::default());
(Pool::new(Default::default(), true.into(), api.clone()), api)
(Pool::new_with_staticly_sized_rotator(Default::default(), true.into(), api.clone()), api)
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,14 @@ where
let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
loop {
if let Some(dropped) = ctx.get_pending_dropped_transaction() {
debug!("dropped_watcher: sending out (pending): {dropped:?}");
trace!("dropped_watcher: sending out (pending): {dropped:?}");
return Some((dropped, ctx));
}
tokio::select! {
biased;
Some(event) = next_event(&mut ctx.stream_map) => {
if let Some(dropped) = ctx.handle_event(event.0, event.1) {
debug!("dropped_watcher: sending out: {dropped:?}");
trace!("dropped_watcher: sending out: {dropped:?}");
return Some((dropped, ctx));
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ where
pool_api.clone(),
listener.clone(),
metrics.clone(),
TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * (options.ready.count + options.future.count),
TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * options.total_count(),
options.ready.total_bytes + options.future.total_bytes,
));

Expand Down
49 changes: 38 additions & 11 deletions substrate/client/transaction-pool/src/graph/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ impl Default for Options {
}
}

impl Options {
/// Total (ready+future) maximal number of transactions in the pool.
pub fn total_count(&self) -> usize {
self.ready.count + self.future.count
}
}

/// Should we check that the transaction is banned
/// in the pool, before we verify it?
#[derive(Copy, Clone)]
Expand All @@ -172,6 +179,21 @@ pub struct Pool<B: ChainApi> {
}

impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool with statically sized rotator.
pub fn new_with_staticly_sized_rotator(
options: Options,
is_validator: IsValidator,
api: Arc<B>,
) -> Self {
Self {
validated_pool: Arc::new(ValidatedPool::new_with_staticly_sized_rotator(
options,
is_validator,
api,
)),
}
}

/// Create a new transaction pool.
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
Expand Down Expand Up @@ -284,6 +306,7 @@ impl<B: ChainApi> Pool<B> {
let mut validated_counter: usize = 0;

let mut future_tags = Vec::new();
let now = Instant::now();
for (extrinsic, in_pool_tags) in all {
match in_pool_tags {
// reuse the tags for extrinsics that were found in the pool
Expand Down Expand Up @@ -319,7 +342,7 @@ impl<B: ChainApi> Pool<B> {
}
}

log::trace!(target: LOG_TARGET,"prune: validated_counter:{validated_counter}");
log::debug!(target: LOG_TARGET,"prune: validated_counter:{validated_counter}, took:{:?}", now.elapsed());

self.prune_tags(at, future_tags, in_pool_hashes).await
}
Expand Down Expand Up @@ -351,6 +374,7 @@ impl<B: ChainApi> Pool<B> {
tags: impl IntoIterator<Item = Tag>,
known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
) {
let now = Instant::now();
log::trace!(target: LOG_TARGET, "Pruning at {:?}", at);
// Prune all transactions that provide given tags
let prune_status = self.validated_pool.prune_tags(tags);
Expand All @@ -369,9 +393,8 @@ impl<B: ChainApi> Pool<B> {
let reverified_transactions =
self.verify(at, pruned_transactions, CheckBannedBeforeVerify::Yes).await;

let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect();

log::trace!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions: {}", &at, reverified_transactions.len());
let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect::<Vec<_>>();
log::debug!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions: {}, reverification took: {:?}", &at, reverified_transactions.len(), now.elapsed());
log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "[{:?}] Resubmitting transaction: {:?}");

// And finally - submit reverified transactions back to the pool
Expand Down Expand Up @@ -580,7 +603,7 @@ mod tests {
fn should_reject_unactionable_transactions() {
// given
let api = Arc::new(TestApi::default());
let pool = Pool::new(
let pool = Pool::new_with_staticly_sized_rotator(
Default::default(),
// the node does not author blocks
false.into(),
Expand Down Expand Up @@ -767,7 +790,7 @@ mod tests {
let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

let hash1 =
block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into())).unwrap();
Expand Down Expand Up @@ -803,7 +826,7 @@ mod tests {
let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

// when
block_on(
Expand Down Expand Up @@ -1036,7 +1059,7 @@ mod tests {
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

let xt = uxt(Transfer {
from: Alice.into(),
Expand Down Expand Up @@ -1074,7 +1097,7 @@ mod tests {
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

// after validation `IncludeData` will have priority set to 9001
// (validate_transaction mock)
Expand Down Expand Up @@ -1106,7 +1129,7 @@ mod tests {
Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };

let api = Arc::new(TestApi::default());
let pool = Pool::new(options, true.into(), api.clone());
let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());

let han_of_block0 = api.expect_hash_and_number(0);

Expand Down Expand Up @@ -1151,7 +1174,11 @@ mod tests {
let mut api = TestApi::default();
api.delay = Arc::new(Mutex::new(rx.into()));
let api = Arc::new(api);
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
Default::default(),
true.into(),
api.clone(),
));

let han_of_block0 = api.expect_hash_and_number(0);

Expand Down
42 changes: 33 additions & 9 deletions substrate/client/transaction-pool/src/graph/rotator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ use std::{
use super::base_pool::Transaction;

/// Expected size of the banned extrinsics cache.
const EXPECTED_SIZE: usize = 2048;
const DEFAULT_EXPECTED_SIZE: usize = 2048;

/// The default duration, in seconds, for which an extrinsic is banned.
const DEFAULT_BAN_TIME_SECS: u64 = 30 * 60;

/// Pool rotator is responsible to only keep fresh extrinsics in the pool.
///
Expand All @@ -42,18 +45,39 @@ pub struct PoolRotator<Hash> {
ban_time: Duration,
/// Currently banned extrinsics.
banned_until: RwLock<HashMap<Hash, Instant>>,
/// Expected size of the banned extrinsics cache.
expected_size: usize,
}

impl<Hash: Clone> Clone for PoolRotator<Hash> {
fn clone(&self) -> Self {
Self {
ban_time: self.ban_time,
banned_until: RwLock::new(self.banned_until.read().clone()),
expected_size: self.expected_size,
}
}
}

impl<Hash: hash::Hash + Eq> Default for PoolRotator<Hash> {
fn default() -> Self {
Self { ban_time: Duration::from_secs(60 * 30), banned_until: Default::default() }
Self {
ban_time: Duration::from_secs(DEFAULT_BAN_TIME_SECS),
banned_until: Default::default(),
expected_size: DEFAULT_EXPECTED_SIZE,
}
}
}

impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
/// New rotator instance with specified ban time.
pub fn new(ban_time: Duration) -> Self {
Self { ban_time, banned_until: Default::default() }
Self { ban_time, ..Self::default() }
}

/// New rotator instance with specified ban time and expected cache size.
pub fn new_with_expected_size(ban_time: Duration, expected_size: usize) -> Self {
Self { expected_size, ..Self::new(ban_time) }
}

/// Returns `true` if extrinsic hash is currently banned.
Expand All @@ -69,8 +93,8 @@ impl<Hash: hash::Hash + Eq + Clone> PoolRotator<Hash> {
banned.insert(hash, *now + self.ban_time);
}

if banned.len() > 2 * EXPECTED_SIZE {
while banned.len() > EXPECTED_SIZE {
if banned.len() > 2 * self.expected_size {
while banned.len() > self.expected_size {
if let Some(key) = banned.keys().next().cloned() {
banned.remove(&key);
}
Expand Down Expand Up @@ -201,16 +225,16 @@ mod tests {
let past_block = 0;

// when
for i in 0..2 * EXPECTED_SIZE {
for i in 0..2 * DEFAULT_EXPECTED_SIZE {
let tx = tx_with(i as u64, past_block);
assert!(rotator.ban_if_stale(&now, past_block, &tx));
}
assert_eq!(rotator.banned_until.read().len(), 2 * EXPECTED_SIZE);
assert_eq!(rotator.banned_until.read().len(), 2 * DEFAULT_EXPECTED_SIZE);

// then
let tx = tx_with(2 * EXPECTED_SIZE as u64, past_block);
let tx = tx_with(2 * DEFAULT_EXPECTED_SIZE as u64, past_block);
// trigger a garbage collection
assert!(rotator.ban_if_stale(&now, past_block, &tx));
assert_eq!(rotator.banned_until.read().len(), EXPECTED_SIZE);
assert_eq!(rotator.banned_until.read().len(), DEFAULT_EXPECTED_SIZE);
}
}
31 changes: 28 additions & 3 deletions substrate/client/transaction-pool/src/graph/validated_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,24 +121,49 @@ impl<B: ChainApi> Clone for ValidatedPool<B> {
listener: Default::default(),
pool: RwLock::from(self.pool.read().clone()),
import_notification_sinks: Default::default(),
rotator: PoolRotator::default(),
rotator: self.rotator.clone(),
}
}
}

impl<B: ChainApi> ValidatedPool<B> {
/// Create a new transaction pool with statically sized rotator.
pub fn new_with_staticly_sized_rotator(
options: Options,
is_validator: IsValidator,
api: Arc<B>,
) -> Self {
let ban_time = options.ban_time;
Self::new_with_rotator(options, is_validator, api, PoolRotator::new(ban_time))
}

/// Create a new transaction pool.
pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
let ban_time = options.ban_time;
let total_count = options.total_count();
Self::new_with_rotator(
options,
is_validator,
api,
PoolRotator::new_with_expected_size(ban_time, total_count),
)
}

fn new_with_rotator(
options: Options,
is_validator: IsValidator,
api: Arc<B>,
rotator: PoolRotator<ExtrinsicHash<B>>,
) -> Self {
let base_pool = base::BasePool::new(options.reject_future_transactions);
Self {
is_validator,
options,
listener: Default::default(),
api,
pool: RwLock::new(base_pool),
import_notification_sinks: Default::default(),
rotator: PoolRotator::new(ban_time),
rotator,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,11 @@ mod tests {
#[test]
fn revalidation_queue_works() {
let api = Arc::new(TestApi::default());
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
Default::default(),
true.into(),
api.clone(),
));
let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));

let uxt = uxt(Transfer {
Expand Down Expand Up @@ -414,7 +418,11 @@ mod tests {
#[test]
fn revalidation_queue_skips_revalidation_for_unknown_block_hash() {
let api = Arc::new(TestApi::default());
let pool = Arc::new(Pool::new(Default::default(), true.into(), api.clone()));
let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
Default::default(),
true.into(),
api.clone(),
));
let queue = Arc::new(RevalidationQueue::new(api.clone(), pool.clone()));

let uxt0 = uxt(Transfer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ where
finalized_hash: Block::Hash,
options: graph::Options,
) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
let pool = Arc::new(graph::Pool::new(options, true.into(), pool_api.clone()));
let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
options,
true.into(),
pool_api.clone(),
));
let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
pool_api.clone(),
pool.clone(),
Expand Down Expand Up @@ -177,7 +181,11 @@ where
best_block_hash: Block::Hash,
finalized_hash: Block::Hash,
) -> Self {
let pool = Arc::new(graph::Pool::new(options, is_validator, pool_api.clone()));
let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
options,
is_validator,
pool_api.clone(),
));
let (revalidation_queue, background_task) = match revalidation_type {
RevalidationType::Light =>
(revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None),
Expand Down
Loading
Loading