From 5a4a92864df91dca399713bed0a20f09c406de9f Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 13 Jan 2025 22:43:06 +0900 Subject: [PATCH] Clean up block producing unified scheduler setup --- banking-bench/src/main.rs | 32 +++++++------ core/src/banking_simulation.rs | 18 ++++++-- core/src/banking_stage.rs | 6 +++ core/src/banking_stage/unified_scheduler.rs | 51 +++++++++++++++------ core/src/banking_trace.rs | 21 +++++---- core/src/validator.rs | 50 +++++--------------- 6 files changed, 97 insertions(+), 81 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 946dc118c6ba85..33ae65bdfdf26a 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -9,7 +9,10 @@ use { rayon::prelude::*, solana_client::connection_cache::ConnectionCache, solana_core::{ - banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage}, + banking_stage::{ + unified_scheduler::ensure_banking_stage_setup, + update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage, + }, banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, validator::BlockProductionMethod, }, @@ -445,7 +448,13 @@ fn main() { ))) .unwrap(); let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let scheduler_pool = if matches!( + let cluster_info = { + let keypair = Arc::new(Keypair::new()); + let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); + ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified) + }; + let cluster_info = Arc::new(cluster_info); + let banking_tracer_channels = if matches!( block_production_method, BlockProductionMethod::UnifiedScheduler ) { @@ -458,13 +467,12 @@ fn main() { prioritization_fee_cache.clone(), poh_recorder.read().unwrap().new_recorder(), ); - bank_forks - .write() - .unwrap() - .install_scheduler_pool(pool.clone()); - Some(pool) + let channels = banking_tracer.create_channels_for_scheduler_pool(&pool); + ensure_banking_stage_setup(&pool, &bank_forks, &channels, &cluster_info, &poh_recorder); + bank_forks.write().unwrap().install_scheduler_pool(pool); + channels } else { - None + banking_tracer.create_channels(false) }; let Channels { non_vote_sender, @@ -473,13 +481,7 @@ fn main() { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = banking_tracer.create_channels_for_scheduler_pool(scheduler_pool.as_ref()); - let cluster_info = { - let keypair = Arc::new(Keypair::new()); - let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); - ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified) - }; - let cluster_info = Arc::new(cluster_info); + } = banking_tracer_channels; let tpu_disable_quic = matches.is_present("tpu_disable_quic"); let connection_cache = if tpu_disable_quic { ConnectionCache::with_udp( diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index abffb4c303c563..fbe584be00ec5b 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -2,6 +2,7 @@ use { crate::{ banking_stage::{ + unified_scheduler::ensure_banking_stage_setup, update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage, LikeClusterInfo, }, banking_trace::{ @@ -770,6 +771,17 @@ impl BankingSimulator { BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, ); + // Create a partially-dummy ClusterInfo for the banking stage. + let cluster_info = Arc::new(DummyClusterInfo { + id: simulated_leader.into(), + }); + let banking_tracer_channels = if let Some(pool) = unified_scheduler_pool { + let channels = retracer.create_channels_for_scheduler_pool(&pool); + ensure_banking_stage_setup(&pool, &bank_forks, &channels, &cluster_info, &poh_recorder); + channels + } else { + retracer.create_channels(false) + }; let Channels { non_vote_sender, non_vote_receiver, @@ -777,7 +789,7 @@ impl BankingSimulator { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = retracer.create_channels_for_scheduler_pool(unified_scheduler_pool.as_ref()); + } = banking_tracer_channels; let connection_cache = Arc::new(ConnectionCache::new("connection_cache_sim")); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); @@ -812,10 +824,6 @@ impl BankingSimulator { ); info!("Start banking stage!..."); - // Create a partially-dummy ClusterInfo for the banking stage. - let cluster_info = Arc::new(DummyClusterInfo { - id: simulated_leader.into(), - }); let prioritization_fee_cache = &Arc::new(PrioritizationFeeCache::new(0u64)); let banking_stage = BankingStage::new_num_threads( block_production_method.clone(), diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 48f60db869955f..baff7fc834db73 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -81,6 +81,12 @@ mod packet_receiver; mod read_write_account_set; mod scheduler_messages; mod transaction_scheduler; + +// proc_macro_hygiene needs to be stabilzied to use qualifier_attr... +// error[E0658]: non-inline modules in proc macro input are unstable +#[cfg(not(feature = "dev-context-only-utils"))] +pub(crate) mod unified_scheduler; +#[cfg(feature = "dev-context-only-utils")] pub mod unified_scheduler; // Fixed thread size seems to be fastest on GCP setup diff --git a/core/src/banking_stage/unified_scheduler.rs b/core/src/banking_stage/unified_scheduler.rs index b4f639a24e39dc..f4ecdcdd38ff21 100644 --- a/core/src/banking_stage/unified_scheduler.rs +++ b/core/src/banking_stage/unified_scheduler.rs @@ -1,24 +1,19 @@ +#[cfg(feature = "dev-context-only-utils")] +use qualifier_attr::qualifiers; use { super::decision_maker::{BufferedPacketsDecision, DecisionMaker}, - crate::banking_stage::packet_deserializer::PacketDeserializer, - agave_banking_stage_ingress_types::BankingPacketReceiver, + crate::{ + banking_stage::{packet_deserializer::PacketDeserializer, BankingStage, LikeClusterInfo}, + banking_trace::Channels, + }, + solana_poh::poh_recorder::PohRecorder, solana_runtime::bank_forks::BankForks, - solana_unified_scheduler_pool::{BankingStageAdapter, BatchConverterCreator}, + solana_unified_scheduler_pool::{ + BankingStageAdapter, BatchConverterCreator, DefaultSchedulerPool, + }, std::sync::{Arc, RwLock}, }; -pub(crate) fn unified_receiver( - non_vote_receiver: BankingPacketReceiver, - tpu_vote_receiver: BankingPacketReceiver, - gossip_vote_receiver: BankingPacketReceiver, -) -> BankingPacketReceiver { - assert!(non_vote_receiver.same_channel(&tpu_vote_receiver)); - assert!(non_vote_receiver.same_channel(&gossip_vote_receiver)); - drop((tpu_vote_receiver, gossip_vote_receiver)); - - non_vote_receiver -} - pub(crate) fn batch_converter_creator( decision_maker: DecisionMaker, bank_forks: Arc>, @@ -58,3 +53,29 @@ pub(crate) fn batch_converter_creator( }) }) } + +#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] +pub(crate) fn ensure_banking_stage_setup( + pool: &DefaultSchedulerPool, + bank_forks: &Arc>, + channels: &Channels, + cluster_info: &impl LikeClusterInfo, + poh_recorder: &Arc>, +) { + if !pool.block_production_supported() { + return; + } + + let unified_receiver = channels.unified_receiver().clone(); + let block_producing_scheduler_handler_threads = BankingStage::num_threads() as usize; + let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); + let banking_stage_monitor = Box::new(decision_maker.clone()); + let converter = batch_converter_creator(decision_maker, bank_forks.clone()); + + pool.register_banking_stage( + unified_receiver, + block_producing_scheduler_handler_threads, + banking_stage_monitor, + converter, + ); +} diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs index ee47d5827cc972..27602b57732206 100644 --- a/core/src/banking_trace.rs +++ b/core/src/banking_trace.rs @@ -185,6 +185,16 @@ pub struct Channels { pub gossip_vote_receiver: BankingPacketReceiver, } +impl Channels { + pub(crate) fn unified_receiver(&self) -> &BankingPacketReceiver { + assert!(self.non_vote_receiver.same_channel(&self.tpu_vote_receiver)); + assert!(self + .non_vote_receiver + .same_channel(&self.gossip_vote_receiver)); + &self.non_vote_receiver + } +} + impl BankingTracer { pub fn new( maybe_config: Option<(&PathBuf, Arc, DirByteLimit)>, @@ -264,14 +274,9 @@ impl BankingTracer { } } - pub fn create_channels_for_scheduler_pool( - &self, - pool: Option<&Arc>, - ) -> Channels { - self.create_channels( - pool.map(|pool| pool.block_production_supported()) - .unwrap_or_default(), - ) + pub fn create_channels_for_scheduler_pool(&self, pool: &DefaultSchedulerPool) -> Channels { + let should_unify = pool.block_production_supported(); + self.create_channels(should_unify) } fn create_channel(&self, label: ChannelLabel) -> (BankingPacketSender, BankingPacketReceiver) { diff --git a/core/src/validator.rs b/core/src/validator.rs index b4e012fce85e32..f57e5254a32927 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -5,6 +5,7 @@ use { crate::{ accounts_hash_verifier::AccountsHashVerifier, admin_rpc_post_init::AdminRpcRequestMetadataPostInit, + banking_stage::unified_scheduler::ensure_banking_stage_setup, banking_trace::{self, BankingTracer, TraceError}, cluster_info_vote_listener::VoteTracker, completed_data_sets_service::CompletedDataSetsService, @@ -943,31 +944,7 @@ impl Validator { ) { methods @ (BlockVerificationMethod::UnifiedScheduler, _) | methods @ (_, BlockProductionMethod::UnifiedScheduler) => { - let banking_tracer_channels = banking_tracer.create_channels(true); - use crate::{ - banking_stage::{ - decision_maker::DecisionMaker, unified_scheduler, BankingStage, - }, - banking_trace::Channels, - }; - - let Channels { - non_vote_sender: _, - non_vote_receiver, - tpu_vote_sender: _, - tpu_vote_receiver, - gossip_vote_sender: _, - gossip_vote_receiver, - } = &banking_tracer_channels; - let unified_receiver = unified_scheduler::unified_receiver( - non_vote_receiver.clone(), - tpu_vote_receiver.clone(), - gossip_vote_receiver.clone(), - ); - let block_producing_scheduler_handler_threads = - BankingStage::num_threads() as usize; - - let pool = DefaultSchedulerPool::new( + let scheduler_pool = DefaultSchedulerPool::new( supported_scheduling_mode(methods), config.unified_scheduler_handler_threads, config.runtime_config.log_messages_bytes_limit, @@ -976,23 +953,20 @@ impl Validator { prioritization_fee_cache.clone(), poh_recorder.read().unwrap().new_recorder(), ); - let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); - let banking_stage_monitor = Box::new(decision_maker.clone()); - let converter = - unified_scheduler::batch_converter_creator(decision_maker, bank_forks.clone()); - pool.register_banking_stage( - unified_receiver, - block_producing_scheduler_handler_threads, - banking_stage_monitor, - converter, - ); + let channels = banking_tracer.create_channels_for_scheduler_pool(&scheduler_pool); + ensure_banking_stage_setup( + &scheduler_pool, + &bank_forks, + &channels, + &cluster_info, + &poh_recorder, + ); bank_forks .write() .unwrap() - .install_scheduler_pool(pool.clone()); - // this actually won't be used and but return this for type safety - banking_tracer_channels + .install_scheduler_pool(scheduler_pool); + channels } _ => { info!("no scheduler pool is installed for block verification/production...");