diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index f4ac6c6040eda8..75c4d1b8386e55 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -470,25 +470,31 @@ impl Consumer { chunk_offset: usize, pre_results: impl Iterator>, ) -> ProcessTransactionBatchOutput { + // Lock accounts so that other threads cannot encode transactions that + // will modify the same account state + let (mut batch, lock_us) = + measure_us!(bank.prepare_sanitized_batch_with_results(txs, pre_results)); + + // After locking, select the transactions that can fit into the block let ( (transaction_qos_cost_results, cost_model_throttled_transactions_count), cost_model_us, ) = measure_us!(self.qos_service.select_and_accumulate_transaction_costs( bank, - txs, - pre_results + batch.sanitized_transactions(), + batch.lock_results().iter().cloned(), )); - // Only lock accounts for those transactions are selected for the block; - // Once accounts are locked, other threads cannot encode transactions that will modify the - // same account state - let (batch, lock_us) = measure_us!(bank.prepare_sanitized_batch_with_results( - txs, - transaction_qos_cost_results.iter().map(|r| match r { - Ok(_cost) => Ok(()), - Err(err) => Err(err.clone()), - }) - )); + // Unlock accounts for any transactions that were not selected for the block; + batch.unlock_failures( + transaction_qos_cost_results + .iter() + .map(|r| match r { + Ok(_cost) => Ok(()), + Err(err) => Err(err.clone()), + }) + .collect(), + ); // retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit // WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit @@ -496,7 +502,8 @@ impl Consumer { let mut execute_and_commit_transactions_output = self.execute_and_commit_transactions_locked(bank, &batch); - // Once the accounts are new transactions can enter the pipeline to process them + // Drop the batch to unlock transaction accounts so that new + // transactions can enter the pipeline to process them let (_, unlock_us) = measure_us!(drop(batch)); let ExecuteAndCommitTransactionsOutput { @@ -1656,6 +1663,137 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } + #[test] + fn test_bank_process_and_record_transactions_unlock_cost_tracker_failures() { + solana_logger::setup(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_slow_genesis_config(10_000); + let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config).0; + let pubkey = solana_sdk::pubkey::new_rand(); + let pubkey1 = solana_sdk::pubkey::new_rand(); + + let transactions = sanitize_transactions(vec![ + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), + system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()), + ]); + + let transactions2 = sanitize_transactions(vec![ + system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()), + system_transaction::transfer(&mint_keypair, &pubkey1, 2, genesis_config.hash()), + ]); + + let ledger_path = get_tmp_ledger_path_auto_delete!(); + { + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.clone(), + Some((4, 4)), + bank.ticks_per_slot(), + &pubkey, + Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &PohConfig::default(), + Arc::new(AtomicBool::default()), + ); + let recorder = poh_recorder.new_recorder(); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); + + poh_recorder + .write() + .unwrap() + .set_bank_for_test(bank.clone()); + + let poh_simulator = simulate_poh(record_receiver, &poh_recorder); + + let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let poh_lock = poh_recorder.write().unwrap(); + let worker1 = { + let bank = bank.clone(); + let committer = Committer::new( + None, + replay_vote_sender.clone(), + Arc::new(PrioritizationFeeCache::new(0u64)), + ); + let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1), None); + + std::thread::spawn(move || { + // Set strict cost tracker limits so that transactions will be rejected + bank.write_cost_tracker().unwrap().set_limits(0, 0, 0); + + let process_transactions_batch_output = + consumer.process_and_record_transactions(&bank, &transactions, 0); + + let ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + retryable_transaction_indexes, + commit_transactions_result, + .. + } = process_transactions_batch_output.execute_and_commit_transactions_output; + + assert_eq!(transactions_attempted_execution_count, 2); + assert_eq!(executed_transactions_count, 0); + assert_eq!(retryable_transaction_indexes, vec![0, 1]); + assert!(commit_transactions_result.is_ok()); + }) + }; + + // Sleep so that processing in the other thread hits the poh recorder lock + std::thread::sleep(Duration::from_millis(10)); + + let worker2 = std::thread::spawn(move || { + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); + let consumer = Consumer::new(committer, recorder, QosService::new(2), None); + + // Relax cost tracker limits so that transactions can be processed again + bank.write_cost_tracker() + .unwrap() + .set_limits(u64::MAX, u64::MAX, u64::MAX); + + let process_transactions_batch_output = + consumer.process_and_record_transactions(&bank, &transactions2, 0); + + let ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + retryable_transaction_indexes, + commit_transactions_result, + .. + } = process_transactions_batch_output.execute_and_commit_transactions_output; + + assert_eq!(transactions_attempted_execution_count, 2); + assert_eq!(executed_transactions_count, 1); + assert_eq!(retryable_transaction_indexes, vec![1]); + assert!(commit_transactions_result.is_ok()); + }); + + // Allow both workers to advance + drop(poh_lock); + + // Wait for both workers to finish + assert!(worker1.join().is_ok()); + assert!(worker2.join().is_ok()); + + poh_recorder + .read() + .unwrap() + .is_exited + .store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); + } + Blockstore::destroy(ledger_path.path()).unwrap(); + } + #[test] fn test_process_transactions_instruction_error() { solana_logger::setup();