-
Notifications
You must be signed in to change notification settings - Fork 319
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
txview: run status and age checks on incoming transactions #4506
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,11 @@ | ||
use { | ||
super::{ | ||
scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics}, | ||
transaction_priority_id::TransactionPriorityId, | ||
transaction_state::TransactionState, | ||
transaction_state_container::{ | ||
SharedBytes, StateContainer, TransactionViewState, TransactionViewStateContainer, | ||
EXTRA_CAPACITY, | ||
}, | ||
}, | ||
crate::banking_stage::{ | ||
|
@@ -405,8 +407,57 @@ impl TransactionViewReceiveAndBuffer { | |
|
||
let mut num_received = 0usize; | ||
let mut num_buffered = 0usize; | ||
let mut num_dropped_on_status_age_checks = 0usize; | ||
let mut num_dropped_on_capacity = 0usize; | ||
let mut num_dropped_on_receive = 0usize; | ||
|
||
// Create temporary batches of transactions to be age-checked. | ||
let mut transaction_ids = ArrayVec::<_, EXTRA_CAPACITY>::new(); | ||
let lock_results: [_; EXTRA_CAPACITY] = core::array::from_fn(|_| Ok(())); | ||
let mut error_counters = TransactionErrorMetrics::default(); | ||
|
||
let mut run_status_age_checks = | ||
|container: &mut TransactionViewStateContainer, | ||
transaction_ids: &mut ArrayVec<usize, 64>| { | ||
// Temporary scope so that transaction references are immediately | ||
// dropped and transactions not passing | ||
let check_results = { | ||
let mut transactions = ArrayVec::<_, EXTRA_CAPACITY>::new(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's an uninitialized array on the stack underlying this |
||
transactions.extend(transaction_ids.iter().map(|id| { | ||
&container | ||
.get_transaction_ttl(*id) | ||
.expect("transaction must exist") | ||
.transaction | ||
})); | ||
working_bank.check_transactions::<RuntimeTransaction<_>>( | ||
&transactions, | ||
&lock_results, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's currently fine to call |
||
MAX_PROCESSING_AGE, | ||
&mut error_counters, | ||
) | ||
}; | ||
|
||
// Remove all invalid transactions from the map; insert passing | ||
// ids into the priority queue. | ||
for (transaction_id, check_result) in transaction_ids.drain(..).zip(check_results) { | ||
if check_result.is_ok() { | ||
let priority = container | ||
.get_mut_transaction_state(transaction_id) | ||
.expect("transaction must exist") | ||
.priority(); | ||
if container.push_id_into_queue(TransactionPriorityId::new( | ||
priority, | ||
transaction_id, | ||
)) { | ||
num_dropped_on_capacity += 1; | ||
} | ||
} else { | ||
num_dropped_on_status_age_checks += 1; | ||
container.remove_by_id(transaction_id); | ||
} | ||
} | ||
}; | ||
|
||
for packet_batch in packet_batch_message.iter() { | ||
for packet in packet_batch.iter() { | ||
let Some(packet_data) = packet.data(..) else { | ||
|
@@ -416,9 +467,8 @@ impl TransactionViewReceiveAndBuffer { | |
num_received += 1; | ||
|
||
// Reserve free-space to copy packet into, run sanitization checks, and insert. | ||
if container.try_insert_with_data( | ||
packet_data, | ||
|bytes| match Self::try_handle_packet( | ||
if let Some(transaction_id) = container.try_insert_with_data(packet_data, |bytes| { | ||
match Self::try_handle_packet( | ||
bytes, | ||
root_bank, | ||
working_bank, | ||
|
@@ -434,20 +484,32 @@ impl TransactionViewReceiveAndBuffer { | |
num_dropped_on_receive += 1; | ||
Err(()) | ||
} | ||
}, | ||
) { | ||
num_dropped_on_capacity += 1; | ||
}; | ||
} | ||
}) { | ||
transaction_ids.push(transaction_id); | ||
|
||
// If at capacity, run checks and remove invalid transactions. | ||
if transaction_ids.len() == EXTRA_CAPACITY { | ||
run_status_age_checks(container, &mut transaction_ids); | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Any remaining packets undergo status/age checks | ||
run_status_age_checks(container, &mut transaction_ids); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's check if |
||
|
||
let buffer_time_us = start.elapsed().as_micros() as u64; | ||
timing_metrics.update(|timing_metrics| { | ||
saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us); | ||
}); | ||
count_metrics.update(|count_metrics| { | ||
saturating_add_assign!(count_metrics.num_received, num_received); | ||
saturating_add_assign!(count_metrics.num_buffered, num_buffered); | ||
saturating_add_assign!( | ||
count_metrics.num_dropped_on_age_and_status, | ||
num_dropped_on_status_age_checks | ||
); | ||
saturating_add_assign!( | ||
count_metrics.num_dropped_on_capacity, | ||
num_dropped_on_capacity | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,11 +95,12 @@ pub(crate) trait StateContainer<Tx: TransactionWithMeta> { | |
fn get_min_max_priority(&self) -> MinMaxResult<u64>; | ||
} | ||
|
||
// Extra capacity is added because some additional space is needed when | ||
// pushing a new transaction into the container to avoid reallocation. | ||
pub(crate) const EXTRA_CAPACITY: usize = 64; | ||
|
||
impl<Tx: TransactionWithMeta> StateContainer<Tx> for TransactionStateContainer<Tx> { | ||
fn with_capacity(capacity: usize) -> Self { | ||
// Extra capacity is added because some additional space is needed when | ||
// pushing a new transaction into the container to avoid reallocation. | ||
const EXTRA_CAPACITY: usize = 64; | ||
Self { | ||
priority_queue: MinMaxHeap::with_capacity(capacity), | ||
id_to_transaction_state: Slab::with_capacity(capacity + EXTRA_CAPACITY), | ||
|
@@ -214,15 +215,13 @@ pub struct TransactionViewStateContainer { | |
} | ||
|
||
impl TransactionViewStateContainer { | ||
/// Returns true if packet was dropped due to capacity limits. | ||
// Insert into the map, but NOT into the priority queue. | ||
// Returns the id of the transaction if it was inserted. | ||
pub(crate) fn try_insert_with_data( | ||
&mut self, | ||
data: &[u8], | ||
f: impl FnOnce(SharedBytes) -> Result<TransactionState<RuntimeTransactionView>, ()>, | ||
) -> bool { | ||
// Get remaining capacity before inserting. | ||
let remaining_capacity = self.remaining_capacity(); | ||
|
||
) -> Option<usize> { | ||
// Get a vacant entry in the slab. | ||
let vacant_entry = self.inner.get_vacant_map_entry(); | ||
let transaction_id = vacant_entry.key(); | ||
|
@@ -248,16 +247,11 @@ impl TransactionViewStateContainer { | |
} | ||
|
||
// Attempt to insert the transaction. | ||
match f(Arc::clone(bytes_entry)) { | ||
Ok(state) => { | ||
let priority_id = TransactionPriorityId::new(state.priority(), transaction_id); | ||
vacant_entry.insert(state); | ||
|
||
// Push the transaction into the queue. | ||
self.inner | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No longer push into the queue here. We just let up to 64 (see EXTRA_CAPACITY const) additional transactions to live in the map. Once we reach end of incoming tx stream or hit 64 packets we run age checks and only THEN do we insert into the priority queue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think If the priority capacity was 100 and we only had 36 elements in the transaction slab, then a new batch of 64 transactions would get added to the slab and If I'm reading this change correctly, we used to get the remaining capacity before inserting but no longer do that, causing this new issue to emerge. |
||
.push_id_into_queue_with_remaining_capacity(priority_id, remaining_capacity) | ||
} | ||
Err(_) => false, | ||
if let Ok(state) = f(Arc::clone(bytes_entry)) { | ||
vacant_entry.insert(state); | ||
Some(transaction_id) | ||
} else { | ||
None | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
complexity here can go away once we have
Bytes
backed transactions coming from upstream, since we do not need to do the weird "insert to map only" pattern.