From 503604784c579170c25c95fa74c8361ac150cc70 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 14 Feb 2024 14:57:00 +0900 Subject: [PATCH 1/4] Add --unified-scheduler-handler-threads --- Cargo.lock | 1 + core/src/validator.rs | 3 + ledger-tool/src/ledger_utils.rs | 3 + ledger-tool/src/main.rs | 13 +++- local-cluster/src/validator_configs.rs | 1 + programs/sbf/Cargo.lock | 1 + unified-scheduler-pool/src/lib.rs | 94 ++++++++++++++++++++------ validator/Cargo.toml | 1 + validator/src/cli.rs | 10 +++ validator/src/main.rs | 2 + 10 files changed, 108 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17f821a640e500..6d1ffe6c46579d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7547,6 +7547,7 @@ dependencies = [ "solana-svm", "solana-test-validator", "solana-tpu-client", + "solana-unified-scheduler-pool", "solana-version", "solana-vote-program", "spl-token-2022", diff --git a/core/src/validator.rs b/core/src/validator.rs index f1432d67f397dc..e008bffff7602e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -262,6 +262,7 @@ pub struct ValidatorConfig { pub generator_config: Option, pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup, pub wen_restart_proto_path: Option, + pub unified_scheduler_handler_threads: Option, } impl Default for ValidatorConfig { @@ -329,6 +330,7 @@ impl Default for ValidatorConfig { generator_config: None, use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(), wen_restart_proto_path: None, + unified_scheduler_handler_threads: None, } } } @@ -816,6 +818,7 @@ impl Validator { } BlockVerificationMethod::UnifiedScheduler => { let scheduler_pool = DefaultSchedulerPool::new_dyn( + config.unified_scheduler_handler_threads, config.runtime_config.log_messages_bytes_limit, transaction_status_sender.clone(), Some(replay_vote_sender.clone()), diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 2663a205fb5f37..e170fae640e09d 100644 --- 
a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -296,6 +296,8 @@ pub fn load_and_process_ledger( info!("no scheduler pool is installed for block verification..."); } BlockVerificationMethod::UnifiedScheduler => { + let unified_scheduler_handler_threads = + value_t!(arg_matches, "unified_scheduler_handler_threads", usize).ok(); let no_transaction_status_sender = None; let no_replay_vote_sender = None; let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); @@ -303,6 +305,7 @@ pub fn load_and_process_ledger( .write() .unwrap() .install_scheduler_pool(DefaultSchedulerPool::new_dyn( + unified_scheduler_handler_threads, process_options.runtime_config.log_messages_bytes_limit, no_transaction_status_sender, no_replay_vote_sender, diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 500a64173a25c4..c1859b8131688f 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -28,7 +28,7 @@ use { input_parsers::{cluster_type_of, pubkey_of, pubkeys_of}, input_validators::{ is_parsable, is_pow2, is_pubkey, is_pubkey_or_keypair, is_slot, is_valid_percentage, - validate_maximum_full_snapshot_archives_to_retain, + is_within_range, validate_maximum_full_snapshot_archives_to_retain, validate_maximum_incremental_snapshot_archives_to_retain, }, }, @@ -72,6 +72,7 @@ use { transaction::{MessageHash, SanitizedTransaction, SimpleAddressLoader}, }, solana_stake_program::stake_state::{self, PointValue}, + solana_unified_scheduler_pool::DefaultSchedulerPool, solana_vote_program::{ self, vote_state::{self, VoteState}, @@ -847,6 +848,16 @@ fn main() { .hidden(hidden_unless_forced()) .help(BlockVerificationMethod::cli_message()), ) + .arg( + Arg::with_name("unified_scheduler_handler_threads") + .long("unified-scheduler-handler-threads") + .value_name("THREADS") + .takes_value(true) + .validator(|s| is_within_range(s, 1..)) + .global(true) + .hidden(hidden_unless_forced()) + 
.help(DefaultSchedulerPool::cli_message()), + ) .arg( Arg::with_name("output_format") .long("output") diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 537dd6495f32e1..33883bb02c1d77 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { generator_config: config.generator_config.clone(), use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup, wen_restart_proto_path: config.wen_restart_proto_path.clone(), + unified_scheduler_handler_threads: config.unified_scheduler_handler_threads, } } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 76f44ab8c97949..4465a041313c12 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6545,6 +6545,7 @@ dependencies = [ "solana-svm", "solana-test-validator", "solana-tpu-client", + "solana-unified-scheduler-pool", "solana-version", "solana-vote-program", "symlink", diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index deae3697807705..09ded82ee88e7d 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -34,7 +34,7 @@ use { marker::PhantomData, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, - Arc, Mutex, Weak, + Arc, Mutex, OnceLock, Weak, }, thread::{self, JoinHandle}, }, @@ -48,6 +48,7 @@ type AtomicSchedulerId = AtomicU64; #[derive(Debug)] pub struct SchedulerPool, TH: TaskHandler> { scheduler_inners: Mutex>, + handler_count: usize, handler_context: HandlerContext, // weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to // Arc from &Self, because SchedulerPool is used as in the form of Arc @@ -83,13 +84,20 @@ where // Some internal impl and test code want an actual concrete type, NOT the // `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`. 
fn new( + handler_count: Option, log_messages_bytes_limit: Option, transaction_status_sender: Option, replay_vote_sender: Option, prioritization_fee_cache: Arc, ) -> Arc { + let handler_count = handler_count.unwrap_or(1); + // we're hard-coding the number of handler threads to 1, meaning this impl is currently + // single-threaded still. + assert_eq!(handler_count, 1); // replace this with assert!(handler_count >= 1) later + Arc::new_cyclic(|weak_self| Self { scheduler_inners: Mutex::default(), + handler_count, handler_context: HandlerContext { log_messages_bytes_limit, transaction_status_sender, @@ -105,12 +113,14 @@ where // This apparently-meaningless wrapper is handy, because some callers explicitly want // `dyn InstalledSchedulerPool` to be returned for type inference convenience. pub fn new_dyn( + handler_count: Option, log_messages_bytes_limit: Option, transaction_status_sender: Option, replay_vote_sender: Option, prioritization_fee_cache: Arc, ) -> InstalledSchedulerPoolArc { Self::new( + handler_count, log_messages_bytes_limit, transaction_status_sender, replay_vote_sender, @@ -145,6 +155,37 @@ where S::spawn(self.self_arc(), context) } } + + pub fn default_handler_count() -> usize { + Self::calculate_default_handler_count( + thread::available_parallelism() + .ok() + .map(|non_zero| non_zero.get()), + ) + } + + pub fn calculate_default_handler_count(detected_cpu_core_count: Option) -> usize { + // Divide by 4 so as not to consume all available CPUs with just handler threads, sparing for + // other active forks and other subsystems. + // Also, if available_parallelism fails (which should be very rare), use 4 threads, + // as a relatively conservative assumption of modern multi-core systems ranging from + // engineers' laptops to production servers. 
+ detected_cpu_core_count + .map(|core_count| (core_count / 4).max(1)) + .unwrap_or(4) + } + + pub fn cli_message() -> &'static str { + static MESSAGE: OnceLock = OnceLock::new(); + + MESSAGE.get_or_init(|| { + format!( + "Change the number of the unified scheduler's transaction execution threads \ + dedicated to each block, otherwise calculated as cpu_cores/4 [default: {}]", + Self::default_handler_count() + ) + }) + } } impl InstalledSchedulerPool for SchedulerPool @@ -372,7 +413,6 @@ pub struct PooledSchedulerInner, TH: TaskHandler> { struct ThreadManager, TH: TaskHandler> { scheduler_id: SchedulerId, pool: Arc>, - handler_count: usize, new_task_sender: Sender, new_task_receiver: Receiver, session_result_sender: Sender>, @@ -384,13 +424,9 @@ struct ThreadManager, TH: TaskHandler> { impl PooledScheduler { fn do_spawn(pool: Arc>, initial_context: SchedulingContext) -> Self { - // we're hard-coding the number of handler thread to 1, meaning this impl is currently - // single-threaded still. - let handler_count = 1; - Self::from_inner( PooledSchedulerInner:: { - thread_manager: ThreadManager::new(pool, handler_count), + thread_manager: ThreadManager::new(pool), }, initial_context, ) @@ -398,14 +434,14 @@ impl PooledScheduler { } impl, TH: TaskHandler> ThreadManager { - fn new(pool: Arc>, handler_count: usize) -> Self { + fn new(pool: Arc>) -> Self { let (new_task_sender, new_task_receiver) = unbounded(); let (session_result_sender, session_result_receiver) = unbounded(); + let handler_count = pool.handler_count; Self { scheduler_id: pool.new_scheduler_id(), pool, - handler_count, new_task_sender, new_task_receiver, session_result_sender, @@ -477,7 +513,7 @@ impl, TH: TaskHandler> ThreadManager { // 5. the handler thread reply back to the scheduler thread as an executed task. // 6. the scheduler thread post-processes the executed task. 
let scheduler_main_loop = || { - let handler_count = self.handler_count; + let handler_count = self.pool.handler_count; let session_result_sender = self.session_result_sender.clone(); let new_task_receiver = self.new_task_receiver.clone(); @@ -613,7 +649,7 @@ impl, TH: TaskHandler> ThreadManager { .unwrap(), ); - self.handler_threads = (0..self.handler_count) + self.handler_threads = (0..self.pool.handler_count) .map({ |thx| { thread::Builder::new() @@ -760,7 +796,7 @@ mod tests { let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); // this indirectly proves that there should be circular link because there's only one Arc // at this moment now @@ -775,7 +811,7 @@ mod tests { let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); let bank = Arc::new(Bank::default_for_tests()); let context = SchedulingContext::new(bank); let scheduler = pool.take_scheduler(context); @@ -789,7 +825,8 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); + let pool = + DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); let bank = Arc::new(Bank::default_for_tests()); let context = &SchedulingContext::new(bank); @@ -817,7 +854,8 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); + let pool = + 
DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); let bank = Arc::new(Bank::default_for_tests()); let context = &SchedulingContext::new(bank); let mut scheduler = pool.do_take_scheduler(context.clone()); @@ -835,7 +873,8 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = DefaultSchedulerPool::new(None, None, None, ignored_prioritization_fee_cache); + let pool = + DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); let old_bank = &Arc::new(Bank::default_for_tests()); let new_bank = &Arc::new(Bank::default_for_tests()); assert!(!Arc::ptr_eq(old_bank, new_bank)); @@ -861,7 +900,7 @@ mod tests { let mut bank_forks = bank_forks.write().unwrap(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); bank_forks.install_scheduler_pool(pool); } @@ -875,7 +914,7 @@ mod tests { let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); @@ -928,7 +967,7 @@ mod tests { let bank = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); let context = SchedulingContext::new(bank.clone()); assert_eq!(bank.transaction_count(), 0); @@ -953,7 +992,7 @@ mod tests { let 
ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - DefaultSchedulerPool::new_dyn(None, None, None, ignored_prioritization_fee_cache); + DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); let context = SchedulingContext::new(bank.clone()); let mut scheduler = pool.take_scheduler(context); @@ -1159,6 +1198,7 @@ mod tests { None, None, None, + None, ignored_prioritization_fee_cache, ); let scheduler = pool.take_scheduler(context); @@ -1193,4 +1233,18 @@ mod tests { fn test_scheduler_schedule_execution_recent_blockhash_edge_case_without_race() { do_test_scheduler_schedule_execution_recent_blockhash_edge_case::(); } + + #[test] + fn test_default_handler_count() { + for (detected, expected) in [(32, 8), (4, 1), (2, 1)] { + assert_eq!( + DefaultSchedulerPool::calculate_default_handler_count(Some(detected)), + expected + ); + } + assert_eq!( + DefaultSchedulerPool::calculate_default_handler_count(None), + 4 + ); + } } diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 4028221cd7ce68..5cc76a810116b3 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -61,6 +61,7 @@ solana-streamer = { workspace = true } solana-svm = { workspace = true } solana-test-validator = { workspace = true } solana-tpu-client = { workspace = true } +solana-unified-scheduler-pool = { workspace = true } solana-version = { workspace = true } solana-vote-program = { workspace = true } symlink = { workspace = true } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 958cdc4ec947de..06df62d7c74038 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -47,6 +47,7 @@ use { self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE, }, solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, + solana_unified_scheduler_pool::DefaultSchedulerPool, std::{path::PathBuf, str::FromStr}, }; @@ -1389,6 +1390,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 
'a> { .possible_values(BlockProductionMethod::cli_names()) .help(BlockProductionMethod::cli_message()) ) + .arg( + Arg::with_name("unified_scheduler_handler_threads") + .long("unified-scheduler-handler-threads") + .hidden(hidden_unless_forced()) + .value_name("THREADS") + .takes_value(true) + .validator(|s| is_within_range(s, 1..)) + .help(DefaultSchedulerPool::cli_message()), + ) .arg( Arg::with_name("wen_restart") .long("wen-restart") diff --git a/validator/src/main.rs b/validator/src/main.rs index 56b17e5d29c32e..ffd4850204bba5 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1652,6 +1652,8 @@ pub fn main() { BlockProductionMethod ) .unwrap_or_default(); + validator_config.unified_scheduler_handler_threads = + value_t!(matches, "unified_scheduler_handler_threads", usize).ok(); validator_config.ledger_column_options = LedgerColumnOptions { compression_type: match matches.value_of("rocksdb_ledger_compression") { From b7ea2e0479b6299301ab0183ad63553cb69ab97f Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 14 Feb 2024 21:52:55 +0900 Subject: [PATCH 2/4] Adjust value name --- ledger-tool/src/main.rs | 2 +- validator/src/cli.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index c1859b8131688f..175ea5d26e8730 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -851,7 +851,7 @@ fn main() { .arg( Arg::with_name("unified_scheduler_handler_threads") .long("unified-scheduler-handler-threads") - .value_name("THREADS") + .value_name("COUNT") .takes_value(true) .validator(|s| is_within_range(s, 1..)) .global(true) diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 06df62d7c74038..b8b434cd8f2200 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1394,7 +1394,7 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { Arg::with_name("unified_scheduler_handler_threads") .long("unified-scheduler-handler-threads") 
.hidden(hidden_unless_forced()) - .value_name("THREADS") + .value_name("COUNT") .takes_value(true) .validator(|s| is_within_range(s, 1..)) .help(DefaultSchedulerPool::cli_message()), From 61dce275557eb47f64eb0210b6faee01ed42ae46 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 14 Feb 2024 22:34:06 +0900 Subject: [PATCH 3/4] Warn if the flag was ignored --- core/src/validator.rs | 6 ++++++ ledger-tool/src/ledger_utils.rs | 10 ++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index e008bffff7602e..243c3abf1fc526 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -815,6 +815,12 @@ impl Validator { match &config.block_verification_method { BlockVerificationMethod::BlockstoreProcessor => { info!("no scheduler pool is installed for block verification..."); + if let Some(count) = config.unified_scheduler_handler_threads { + warn!( + "--unified-scheduler-handler-threads={count} is ignored because unified \ + scheduler is disabled" + ); + } } BlockVerificationMethod::UnifiedScheduler => { let scheduler_pool = DefaultSchedulerPool::new_dyn( diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index e170fae640e09d..12e3d5cba9a7ca 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -291,13 +291,19 @@ pub fn load_and_process_ledger( "Using: block-verification-method: {}", block_verification_method, ); + let unified_scheduler_handler_threads = + value_t!(arg_matches, "unified_scheduler_handler_threads", usize).ok(); match block_verification_method { BlockVerificationMethod::BlockstoreProcessor => { info!("no scheduler pool is installed for block verification..."); + if let Some(count) = unified_scheduler_handler_threads { + warn!( + "--unified-scheduler-handler-threads={count} is ignored because unified \ + scheduler is disabled" + ); + } } BlockVerificationMethod::UnifiedScheduler => { - let unified_scheduler_handler_threads = - 
value_t!(arg_matches, "unified_scheduler_handler_threads", usize).ok(); let no_transaction_status_sender = None; let no_replay_vote_sender = None; let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); From 857e952626aacd369ba1d69ca50b451fc26cd104 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 14 Feb 2024 22:56:04 +0900 Subject: [PATCH 4/4] Tweak message a bit --- core/src/validator.rs | 2 +- ledger-tool/src/ledger_utils.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 243c3abf1fc526..d15460a890568e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -818,7 +818,7 @@ impl Validator { if let Some(count) = config.unified_scheduler_handler_threads { warn!( "--unified-scheduler-handler-threads={count} is ignored because unified \ - scheduler is disabled" + scheduler isn't enabled" ); } } diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 12e3d5cba9a7ca..116b21527ae4d8 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -299,7 +299,7 @@ pub fn load_and_process_ledger( if let Some(count) = unified_scheduler_handler_threads { warn!( "--unified-scheduler-handler-threads={count} is ignored because unified \ - scheduler is disabled" + scheduler isn't enabled" ); } }