diff --git a/Cargo.lock b/Cargo.lock index b010687a85f117..3431ab4de783eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,14 @@ dependencies = [ "solana-version", ] +[[package]] +name = "agave-banking-stage-ingress-types" +version = "2.2.0" +dependencies = [ + "crossbeam-channel", + "solana-perf", +] + [[package]] name = "agave-cargo-registry" version = "2.2.0" @@ -6053,6 +6061,8 @@ dependencies = [ name = "solana-banking-bench" version = "2.2.0" dependencies = [ + "agave-banking-stage-ingress-types", + "assert_matches", "clap 3.2.23", "crossbeam-channel", "log", @@ -6781,6 +6791,7 @@ dependencies = [ name = "solana-core" version = "2.2.0" dependencies = [ + "agave-banking-stage-ingress-types", "ahash 0.8.11", "anyhow", "arrayvec", diff --git a/Cargo.toml b/Cargo.toml index aef63e4807a863..9af98ac6710e1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "accounts-db/store-histogram", "accounts-db/store-tool", "banking-bench", + "banking-stage-ingress-types", "banks-client", "banks-interface", "banks-server", @@ -255,6 +256,7 @@ check-cfg = [ [workspace.dependencies] Inflector = "0.11.4" +agave-banking-stage-ingress-types = { path = "banking-stage-ingress-types", version = "=2.2.0" } agave-transaction-view = { path = "transaction-view", version = "=2.2.0" } aquamarine = "0.3.3" aes-gcm-siv = "0.11.1" diff --git a/banking-bench/Cargo.toml b/banking-bench/Cargo.toml index 67ca53f88324c9..a86629552ddf84 100644 --- a/banking-bench/Cargo.toml +++ b/banking-bench/Cargo.toml @@ -9,13 +9,15 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +agave-banking-stage-ingress-types = { workspace = true } +assert_matches = { workspace = true } clap = { version = "3.1.8", features = ["derive", "cargo"] } crossbeam-channel = { workspace = true } log = { workspace = true } rand = { workspace = true } rayon = { workspace = true } solana-client = { workspace = true } -solana-core = { workspace = true } +solana-core = { workspace = true, features = ["dev-context-only-utils"] } solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-logger = { workspace = true } diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 99722c7cf6082e..6196f713666e19 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -1,5 +1,7 @@ #![allow(clippy::arithmetic_side_effects)] use { + agave_banking_stage_ingress_types::BankingPacketBatch, + assert_matches::assert_matches, clap::{crate_description, crate_name, Arg, ArgEnum, Command}, crossbeam_channel::{unbounded, Receiver}, log::*, @@ -7,10 +9,8 @@ use { rayon::prelude::*, solana_client::connection_cache::ConnectionCache, solana_core::{ - banking_stage::BankingStage, - banking_trace::{ - BankingPacketBatch, BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, - }, + banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage}, + banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, validator::BlockProductionMethod, }, solana_gossip::cluster_info::{ClusterInfo, Node}, @@ -349,7 +349,7 @@ fn main() { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let bank0 = Bank::new_for_benches(&genesis_config); let bank_forks = BankForks::new_rw_arc(bank0); - let mut bank = bank_forks.read().unwrap().working_bank(); + let mut bank = bank_forks.read().unwrap().working_bank_with_scheduler(); // set cost tracker limits to MAX so it will not filter out TXs bank.write_cost_tracker() @@ -552,21 +552,24 @@ fn main() { poh_time.stop(); let mut new_bank_time = Measure::start("new_bank"); + if let Some((result, _timings)) = bank.wait_for_completed_scheduler() { + assert_matches!(result, Ok(_)); + } let new_slot = bank.slot() + 1; - let new_bank = Bank::new_from_parent(bank, &collector, new_slot); + let new_bank = Bank::new_from_parent(bank.clone(), &collector, new_slot); new_bank_time.stop(); let mut insert_time = Measure::start("insert_time"); - bank_forks.write().unwrap().insert(new_bank); - bank = bank_forks.read().unwrap().working_bank(); + assert_matches!(poh_recorder.read().unwrap().bank(), None); + update_bank_forks_and_poh_recorder_for_new_tpu_bank( + &bank_forks, + &poh_recorder, + new_bank, + false, + ); + bank = bank_forks.read().unwrap().working_bank_with_scheduler(); + assert_matches!(poh_recorder.read().unwrap().bank(), Some(_)); insert_time.stop(); - - assert!(poh_recorder.read().unwrap().bank().is_none()); - poh_recorder - .write() - .unwrap() - .set_bank_for_test(bank.clone()); - assert!(poh_recorder.read().unwrap().bank().is_some()); debug!( "new_bank_time: {}us insert_time: {}us poh_time: {}us", new_bank_time.as_us(), diff --git a/banking-stage-ingress-types/Cargo.toml b/banking-stage-ingress-types/Cargo.toml new file mode 100644 index 00000000000000..b27ab6d6b7e17a --- /dev/null +++ b/banking-stage-ingress-types/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "agave-banking-stage-ingress-types" +description = "Agave banking stage ingress types" +documentation = "https://docs.rs/agave-banking-stage-ingress-types" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[dependencies] +crossbeam-channel = { workspace = true } +solana-perf = { workspace = true } diff --git a/banking-stage-ingress-types/src/lib.rs b/banking-stage-ingress-types/src/lib.rs new file mode 100644 index 00000000000000..0fcfb6f31ed391 --- /dev/null +++ b/banking-stage-ingress-types/src/lib.rs @@ -0,0 +1,4 @@ +use {crossbeam_channel::Receiver, solana_perf::packet::PacketBatch, std::sync::Arc}; + +pub type BankingPacketBatch = Arc>; +pub type BankingPacketReceiver = Receiver; diff --git a/core/Cargo.toml b/core/Cargo.toml index 0f630d49d5c954..5f2eb1af2f22ae 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,9 +14,11 @@ edition = { workspace = true } codecov = { repository = "solana-labs/solana", branch = "master", service = "github" } [dependencies] +agave-banking-stage-ingress-types = { workspace = true } ahash = { workspace = true } anyhow = { workspace = true } arrayvec = { workspace = true } +assert_matches = { workspace = true } base64 = { workspace = true } bincode = { workspace = true } bs58 = { workspace = true } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 5d764f4b7c1d44..dfad8bc8c227cf 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -2,6 +2,7 @@ #![feature(test)] use { + agave_banking_stage_ingress_types::BankingPacketBatch, solana_core::{banking_trace::Channels, validator::BlockProductionMethod}, solana_vote_program::{vote_state::TowerSync, vote_transaction::new_tower_sync_transaction}, }; @@ -24,7 +25,7 @@ use { unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, BankingStage, BankingStageStats, }, - banking_trace::{BankingPacketBatch, BankingTracer}, + banking_trace::BankingTracer, }, solana_entry::entry::{next_hash, Entry}, solana_gossip::cluster_info::{ClusterInfo, Node}, diff --git a/core/benches/banking_trace.rs b/core/benches/banking_trace.rs index 66bfc84c630436..62b396354ce957 100644 --- a/core/benches/banking_trace.rs +++ b/core/benches/banking_trace.rs @@ -3,12 +3,13 @@ extern crate test; use { + agave_banking_stage_ingress_types::BankingPacketBatch, solana_core::banking_trace::{ for_test::{ drop_and_clean_temp_dir_unless_suppressed, sample_packet_batch, terminate_tracer, }, - receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer, Channels, - TraceError, TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, + receiving_loop_with_minimized_sender_overhead, BankingTracer, Channels, TraceError, + TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, }, std::{ path::PathBuf, diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index 82a4e5e94a3e76..e811c9c6df9bd8 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -1,14 +1,17 @@ #![cfg(feature = "dev-context-only-utils")] use { crate::{ - banking_stage::{BankingStage, LikeClusterInfo}, + banking_stage::{ + update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage, LikeClusterInfo, + }, banking_trace::{ - BankingPacketBatch, BankingTracer, ChannelLabel, Channels, TimedTracedEvent, - TracedEvent, TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, - BASENAME, + BankingTracer, ChannelLabel, Channels, TimedTracedEvent, TracedEvent, TracedSender, + TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME, }, validator::BlockProductionMethod, }, + agave_banking_stage_ingress_types::BankingPacketBatch, + assert_matches::assert_matches, bincode::deserialize_from, crossbeam_channel::{unbounded, Sender}, itertools::Itertools, @@ -450,6 +453,9 @@ impl SimulatorLoop { info!("Bank::new_from_parent()!"); logger.log_jitter(&bank); + if let Some((result, _execute_timings)) = bank.wait_for_completed_scheduler() { + assert_matches!(result, Ok(())); + } bank.freeze(); let new_slot = if bank.slot() == self.parent_slot { info!("initial leader block!"); @@ -484,16 +490,17 @@ impl SimulatorLoop { logger.log_frozen_bank_cost(&bank); } self.retransmit_slots_sender.send(bank.slot()).unwrap(); - self.bank_forks.write().unwrap().insert(new_bank); + update_bank_forks_and_poh_recorder_for_new_tpu_bank( + &self.bank_forks, + &self.poh_recorder, + new_bank, + false, + ); bank = self .bank_forks .read() .unwrap() .working_bank_with_scheduler(); - self.poh_recorder - .write() - .unwrap() - .set_bank(bank.clone_with_scheduler(), false); } else { logger.log_ongoing_bank_cost(&bank); } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 3d3e7b186c3597..845e2981d2fe6d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -2,6 +2,8 @@ //! to construct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. +#[cfg(feature = "dev-context-only-utils")] +use qualifier_attr::qualifiers; use { self::{ committer::Committer, @@ -23,9 +25,9 @@ use { scheduler_controller::SchedulerController, scheduler_error::SchedulerError, }, }, - banking_trace::BankingPacketReceiver, validator::BlockProductionMethod, }, + agave_banking_stage_ingress_types::BankingPacketReceiver, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, histogram::Histogram, solana_client::connection_cache::ConnectionCache, @@ -35,7 +37,7 @@ use { solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH}, solana_poh::poh_recorder::{PohRecorder, TransactionRecorder}, solana_runtime::{ - bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, + bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, }, solana_sdk::{pubkey::Pubkey, timing::AtomicInterval}, @@ -716,11 +718,26 @@ impl BankingStage { } } +#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] +pub(crate) fn update_bank_forks_and_poh_recorder_for_new_tpu_bank( + bank_forks: &RwLock, + poh_recorder: &RwLock, + tpu_bank: Bank, + track_transaction_indexes: bool, +) { + let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); + poh_recorder + .write() + .unwrap() + .set_bank(tpu_bank, track_transaction_indexes); +} + #[cfg(test)] mod tests { use { super::*, - crate::banking_trace::{BankingPacketBatch, BankingTracer, Channels}, + crate::banking_trace::{BankingTracer, Channels}, + agave_banking_stage_ingress_types::BankingPacketBatch, crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, solana_entry::entry::{self, Entry, EntrySlice}, diff --git a/core/src/banking_stage/packet_deserializer.rs b/core/src/banking_stage/packet_deserializer.rs index 33f41cf377cc48..3c1a56b43b01de 100644 --- a/core/src/banking_stage/packet_deserializer.rs +++ b/core/src/banking_stage/packet_deserializer.rs @@ -5,7 +5,7 @@ use { immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, packet_filter::PacketFilterFailure, }, - crate::banking_trace::{BankingPacketBatch, BankingPacketReceiver}, + agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver}, crossbeam_channel::RecvTimeoutError, solana_perf::packet::PacketBatch, solana_sdk::saturating_add_assign, diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index e95b20c3df4f1e..e9e760ede455fe 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -6,7 +6,7 @@ use { unprocessed_transaction_storage::UnprocessedTransactionStorage, BankingStageStats, }, - crate::banking_trace::BankingPacketReceiver, + agave_banking_stage_ingress_types::BankingPacketReceiver, crossbeam_channel::RecvTimeoutError, solana_measure::{measure::Measure, measure_us}, solana_sdk::{saturating_add_assign, timing::timestamp}, diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 0a7bcf34fc0a01..8cd38b81ab3e67 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -435,19 +435,17 @@ impl SchedulerController { mod tests { use { super::*, - crate::{ - banking_stage::{ - consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, - packet_deserializer::PacketDeserializer, - scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId}, - tests::create_slow_genesis_config, - transaction_scheduler::{ - prio_graph_scheduler::PrioGraphSchedulerConfig, - receive_and_buffer::SanitizedTransactionReceiveAndBuffer, - }, + crate::banking_stage::{ + consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, + packet_deserializer::PacketDeserializer, + scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId}, + tests::create_slow_genesis_config, + transaction_scheduler::{ + prio_graph_scheduler::PrioGraphSchedulerConfig, + receive_and_buffer::SanitizedTransactionReceiveAndBuffer, }, - banking_trace::BankingPacketBatch, }, + agave_banking_stage_ingress_types::BankingPacketBatch, crossbeam_channel::{unbounded, Receiver, Sender}, itertools::Itertools, solana_gossip::cluster_info::ClusterInfo, diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs index a997f02843ac2e..f7a97c28b4e43f 100644 --- a/core/src/banking_trace.rs +++ b/core/src/banking_trace.rs @@ -1,9 +1,9 @@ use { + agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver}, bincode::serialize_into, chrono::{DateTime, Local}, crossbeam_channel::{unbounded, Receiver, SendError, Sender, TryRecvError}, rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender}, - solana_perf::packet::PacketBatch, solana_sdk::{hash::Hash, slot_history::Slot}, std::{ fs::{create_dir_all, remove_dir_all}, @@ -19,9 +19,7 @@ use { thiserror::Error, }; -pub type BankingPacketBatch = Arc>; pub type BankingPacketSender = TracedSender; -pub type BankingPacketReceiver = Receiver; pub type TracerThreadResult = Result<(), TraceError>; pub type TracerThread = Option>; pub type DirByteLimit = u64; diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 40b6304a7455cc..1f82c765741cb7 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,12 +1,13 @@ use { crate::{ - banking_trace::{BankingPacketBatch, BankingPacketSender}, + banking_trace::BankingPacketSender, consensus::vote_stake_tracker::VoteStakeTracker, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, replay_stage::DUPLICATE_THRESHOLD, result::{Error, Result}, sigverify, }, + agave_banking_stage_ingress_types::BankingPacketBatch, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Select, Sender}, log::*, solana_gossip::{ diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 0224844839f4ab..1dc461c3cf5aaf 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -2,6 +2,7 @@ use { crate::{ + banking_stage::update_bank_forks_and_poh_recorder_for_new_tpu_bank, banking_trace::BankingTracer, cluster_info_vote_listener::{ DuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VoteTracker, @@ -2217,11 +2218,12 @@ impl ReplayStage { // new()-ing of its child bank banking_tracer.hash_event(parent.slot(), &parent.last_blockhash(), &parent.hash()); - let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); - poh_recorder - .write() - .unwrap() - .set_bank(tpu_bank, track_transaction_indexes); + update_bank_forks_and_poh_recorder_for_new_tpu_bank( + bank_forks, + poh_recorder, + tpu_bank, + track_transaction_indexes, + ); true } else { error!("{} No next leader found", my_pubkey); diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 61da8cf9ef70dd..4c638dcf19f711 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -9,9 +9,10 @@ pub use solana_perf::sigverify::{ }; use { crate::{ - banking_trace::{BankingPacketBatch, BankingPacketSender}, + banking_trace::BankingPacketSender, sigverify_stage::{SigVerifier, SigVerifyServiceError}, }, + agave_banking_stage_ingress_types::BankingPacketBatch, solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify}, }; diff --git a/core/src/validator.rs b/core/src/validator.rs index 3168ac597424f9..e765e0106e8992 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -879,6 +879,28 @@ impl Validator { // (by both replay stage and banking stage) let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default()); + let leader_schedule_cache = Arc::new(leader_schedule_cache); + let startup_verification_complete; + let (poh_recorder, entry_receiver, record_receiver) = { + let bank = &bank_forks.read().unwrap().working_bank(); + startup_verification_complete = Arc::clone(bank.get_startup_verification_complete()); + PohRecorder::new_with_clear_signal( + bank.tick_height(), + bank.last_blockhash(), + bank.clone(), + None, + bank.ticks_per_slot(), + config.delay_leader_block_for_pending_fork, + blockstore.clone(), + blockstore.get_new_shred_signal(0), + &leader_schedule_cache, + &genesis_config.poh_config, + Some(poh_timing_point_sender), + exit.clone(), + ) + }; + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); + match &config.block_verification_method { BlockVerificationMethod::BlockstoreProcessor => { info!("no scheduler pool is installed for block verification..."); @@ -904,7 +926,6 @@ impl Validator { } } - let leader_schedule_cache = Arc::new(leader_schedule_cache); let entry_notification_sender = entry_notifier_service .as_ref() .map(|service| service.sender()); @@ -979,27 +1000,6 @@ impl Validator { let max_slots = Arc::new(MaxSlots::default()); - let startup_verification_complete; - let (poh_recorder, entry_receiver, record_receiver) = { - let bank = &bank_forks.read().unwrap().working_bank(); - startup_verification_complete = Arc::clone(bank.get_startup_verification_complete()); - PohRecorder::new_with_clear_signal( - bank.tick_height(), - bank.last_blockhash(), - bank.clone(), - None, - bank.ticks_per_slot(), - config.delay_leader_block_for_pending_fork, - blockstore.clone(), - blockstore.get_new_shred_signal(0), - &leader_schedule_cache, - &genesis_config.poh_config, - Some(poh_timing_point_sender), - exit.clone(), - ) - }; - let poh_recorder = Arc::new(RwLock::new(poh_recorder)); - let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let connection_cache = if use_quic { diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index f5c3afa24b5960..5e638153a072aa 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -63,6 +63,14 @@ dependencies = [ "zeroize", ] +[[package]] +name = "agave-banking-stage-ingress-types" +version = "2.2.0" +dependencies = [ + "crossbeam-channel", + "solana-perf", +] + [[package]] name = "agave-geyser-plugin-interface" version = "2.2.0" @@ -5449,9 +5457,11 @@ dependencies = [ name = "solana-core" version = "2.2.0" dependencies = [ + "agave-banking-stage-ingress-types", "ahash 0.8.11", "anyhow", "arrayvec", + "assert_matches", "base64 0.22.1", "bincode", "bs58", diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 56c0ccc16b7a88..7ec26911ca6477 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -63,6 +63,14 @@ dependencies = [ "zeroize", ] +[[package]] +name = "agave-banking-stage-ingress-types" +version = "2.2.0" +dependencies = [ + "crossbeam-channel", + "solana-perf", +] + [[package]] name = "agave-geyser-plugin-interface" version = "2.2.0" @@ -5300,9 +5308,11 @@ dependencies = [ name = "solana-core" version = "2.2.0" dependencies = [ + "agave-banking-stage-ingress-types", "ahash 0.8.11", "anyhow", "arrayvec", + "assert_matches", "base64 0.22.1", "bincode", "bs58",