Skip to content

Commit

Permalink
Improve transaction inclusion when block is nearly full
Browse files Browse the repository at this point in the history
  • Loading branch information
jstarry committed Mar 7, 2024
1 parent adefcbb commit 5891408
Showing 1 changed file with 151 additions and 13 deletions.
164 changes: 151 additions & 13 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,33 +470,40 @@ impl Consumer {
chunk_offset: usize,
pre_results: impl Iterator<Item = Result<(), TransactionError>>,
) -> ProcessTransactionBatchOutput {
// Lock accounts so that other threads cannot encode transactions that
// will modify the same account state
let (mut batch, lock_us) =
measure_us!(bank.prepare_sanitized_batch_with_results(txs, pre_results));

// After locking, select the transactions that can fit into the block
let (
(transaction_qos_cost_results, cost_model_throttled_transactions_count),
cost_model_us,
) = measure_us!(self.qos_service.select_and_accumulate_transaction_costs(
bank,
txs,
pre_results
batch.sanitized_transactions(),
batch.lock_results().iter().cloned(),
));

// Only lock accounts for those transactions are selected for the block;
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let (batch, lock_us) = measure_us!(bank.prepare_sanitized_batch_with_results(
txs,
transaction_qos_cost_results.iter().map(|r| match r {
Ok(_cost) => Ok(()),
Err(err) => Err(err.clone()),
})
));
// Unlock accounts for any transactions that were not selected for the block;
batch.unlock_failures(
transaction_qos_cost_results
.iter()
.map(|r| match r {
Ok(_cost) => Ok(()),
Err(err) => Err(err.clone()),
})
.collect(),
);

// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
// WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
// and WouldExceedMaxAccountDataCostLimit
let mut execute_and_commit_transactions_output =
self.execute_and_commit_transactions_locked(bank, &batch);

// Once the accounts are new transactions can enter the pipeline to process them
// Drop the batch to unlock transaction accounts so that new
// transactions can enter the pipeline to process them
let (_, unlock_us) = measure_us!(drop(batch));

let ExecuteAndCommitTransactionsOutput {
Expand Down Expand Up @@ -1656,6 +1663,137 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_bank_process_and_record_transactions_unlock_cost_tracker_failures() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_slow_genesis_config(10_000);
let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config).0;
let pubkey = solana_sdk::pubkey::new_rand();
let pubkey1 = solana_sdk::pubkey::new_rand();

let transactions = sanitize_transactions(vec![
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()),
]);

let transactions2 = sanitize_transactions(vec![
system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey1, 2, genesis_config.hash()),
]);

let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let blockstore = Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.clone(),
Some((4, 4)),
bank.ticks_per_slot(),
&pubkey,
Arc::new(blockstore),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&PohConfig::default(),
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.new_recorder();
let poh_recorder = Arc::new(RwLock::new(poh_recorder));

poh_recorder
.write()
.unwrap()
.set_bank_for_test(bank.clone());

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let poh_lock = poh_recorder.write().unwrap();
let worker1 = {
let bank = bank.clone();
let committer = Committer::new(
None,
replay_vote_sender.clone(),
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1), None);

std::thread::spawn(move || {
// Set strict cost tracker limits so that transactions will be rejected
bank.write_cost_tracker().unwrap().set_limits(0, 0, 0);

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);

let ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
retryable_transaction_indexes,
commit_transactions_result,
..
} = process_transactions_batch_output.execute_and_commit_transactions_output;

assert_eq!(transactions_attempted_execution_count, 2);
assert_eq!(executed_transactions_count, 0);
assert_eq!(retryable_transaction_indexes, vec![0, 1]);
assert!(commit_transactions_result.is_ok());
})
};

// Sleep so that processing in the other thread hits the poh recorder lock
std::thread::sleep(Duration::from_millis(10));

let worker2 = std::thread::spawn(move || {
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(2), None);

// Relax cost tracker limits so that transactions can be processed again
bank.write_cost_tracker()
.unwrap()
.set_limits(u64::MAX, u64::MAX, u64::MAX);

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions2, 0);

let ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
retryable_transaction_indexes,
commit_transactions_result,
..
} = process_transactions_batch_output.execute_and_commit_transactions_output;

assert_eq!(transactions_attempted_execution_count, 2);
assert_eq!(executed_transactions_count, 1);
assert_eq!(retryable_transaction_indexes, vec![1]);
assert!(commit_transactions_result.is_ok());
});

// Allow both workers to advance
drop(poh_lock);

// Wait for both workers to finish
assert!(worker1.join().is_ok());
assert!(worker2.join().is_ok());

poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_process_transactions_instruction_error() {
solana_logger::setup();
Expand Down

0 comments on commit 5891408

Please sign in to comment.