From 0ba181cb28d0db83b5c939541d7341590b27093a Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 5 Jan 2024 23:27:38 +0900 Subject: [PATCH 01/14] Introduce primitive threading in unified scheduler --- Cargo.lock | 5 + programs/sbf/Cargo.lock | 6 + unified-scheduler-logic/Cargo.toml | 3 + unified-scheduler-logic/src/lib.rs | 21 +- unified-scheduler-pool/Cargo.toml | 3 + unified-scheduler-pool/src/lib.rs | 508 +++++++++++++++++++++++++---- 6 files changed, 488 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46a3d4c6c97b83..83f7369c50c982 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7544,12 +7544,17 @@ dependencies = [ [[package]] name = "solana-unified-scheduler-logic" version = "1.18.0" +dependencies = [ + "solana-sdk", +] [[package]] name = "solana-unified-scheduler-pool" version = "1.18.0" dependencies = [ "assert_matches", + "crossbeam-channel", + "log", "solana-ledger", "solana-logger", "solana-program-runtime", diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 4072b6c73f9dc9..1d5c58a8959054 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6544,11 +6544,17 @@ dependencies = [ [[package]] name = "solana-unified-scheduler-logic" version = "1.18.0" +dependencies = [ + "solana-sdk", +] [[package]] name = "solana-unified-scheduler-pool" version = "1.18.0" dependencies = [ + "assert_matches", + "crossbeam-channel", + "log", "solana-ledger", "solana-program-runtime", "solana-runtime", diff --git a/unified-scheduler-logic/Cargo.toml b/unified-scheduler-logic/Cargo.toml index 764bb0192f5632..b2e80c79c7a08f 100644 --- a/unified-scheduler-logic/Cargo.toml +++ b/unified-scheduler-logic/Cargo.toml @@ -8,3 +8,6 @@ repository = { workspace = true } homepage = { workspace = true } license = { workspace = true } edition = { workspace = true } + +[dependencies] +solana-sdk = { workspace = true } diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 73a5a82f6d3a7b..997c6c1745a7c9 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -1 +1,20 @@ -// This file will be populated with actual implementation later. +use solana_sdk::transaction::SanitizedTransaction; + +pub struct Task { + transaction: SanitizedTransaction, + index: usize, +} + +impl Task { + pub fn create_task(transaction: SanitizedTransaction, index: usize) -> Self { + Task { transaction, index } + } + + pub fn task_index(&self) -> usize { + self.index + } + + pub fn transaction(&self) -> &SanitizedTransaction { + &self.transaction + } +} diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 213bc5bb86c0ef..c8848e098827b2 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -10,6 +10,9 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +assert_matches = { workspace = true } +crossbeam-channel = { workspace = true } +log = { workspace = true } solana-ledger = { workspace = true } solana-program-runtime = { workspace = true } solana-runtime = { workspace = true } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 10cb5309e5e01d..8ec460d087f2e4 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -9,6 +9,9 @@ //! `solana-ledger`'s helper function called `execute_batch()`. 
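Before the pool-side changes below, note that the `unified-scheduler-logic` half of this patch is only the small `Task` wrapper shown above. Here is a sketch of its usage shape, with `String` standing in for `SanitizedTransaction` purely so the snippet runs without solana-sdk plumbing:

    // Stand-in mirror of the Task type added in unified-scheduler-logic; the
    // payload type is a placeholder, since constructing a real
    // SanitizedTransaction requires full solana-sdk setup.
    pub struct Task {
        transaction: String, // placeholder for SanitizedTransaction
        index: usize,        // the transaction's index within its block
    }

    impl Task {
        pub fn create_task(transaction: String, index: usize) -> Self {
            Task { transaction, index }
        }

        pub fn task_index(&self) -> usize {
            self.index
        }

        pub fn transaction(&self) -> &String {
            &self.transaction
        }
    }

    fn main() {
        // mirrors schedule_execution() below: the transaction is cloned into
        // a task along with its index, then handed to the scheduler thread
        let task = Task::create_task("transfer".to_string(), 7);
        assert_eq!(task.task_index(), 7);
        assert_eq!(task.transaction(), "transfer");
    }
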
use { + assert_matches::assert_matches, + crossbeam_channel::{select, unbounded, Receiver, Sender}, + log::*, solana_ledger::blockstore_processor::{ execute_batch, TransactionBatchWithIndexes, TransactionStatusSender, }, @@ -23,6 +26,7 @@ use { prioritization_fee_cache::PrioritizationFeeCache, }, solana_sdk::transaction::{Result, SanitizedTransaction}, + solana_unified_scheduler_logic::Task, solana_vote::vote_sender_types::ReplayVoteSender, std::{ fmt::Debug, @@ -31,6 +35,7 @@ use { atomic::{AtomicU64, Ordering::Relaxed}, Arc, Mutex, Weak, }, + thread::{self, JoinHandle}, }, }; @@ -194,6 +199,91 @@ impl TaskHandler for DefaultTaskHandler { } } +pub struct ExecutedTask { + task: Task, + result_with_timings: ResultWithTimings, +} + +impl ExecutedTask { + fn new_boxed(task: Task) -> Box { + Box::new(Self { + task, + result_with_timings: initialized_result_with_timings(), + }) + } +} + +// A very tiny generic message type to signal about opening and closing of subchannels, which is +// logically segmented series of Payloads (P1) over a signle continuous time-span, potentially +// carrying some subchannel metadata (P2) upon opening a new subchannel. +// Note that the above properties can be upheld only when this is used inside MPSC or SPSC channels +// (i.e. the consumer side needs to be single threaded). For the multiple consumer cases, +// ChainedChannel can be used instead. +enum SubchanneledPayload { + Payload(P1), + OpenSubchannel(P2), + CloseSubchannel, +} + +type NewTaskPayload = SubchanneledPayload; +type ExecutedTaskPayload = SubchanneledPayload, ()>; + +// A tiny generic message type to synchronize multiple threads everytime some contextual data needs +// to be switched (ie. SchedulingContext), just using a single communication channel. +// +// Usually, there's no way to prevent one of those threads from mixing current and next contexts +// while processing messages with a multiple-consumer channel. A condvar or other +// out-of-bound mechanism is needed to notify about switching of contextual data. That's because +// there's no way to block those threads reliably on such an switching event just with a channel. +// +// However, if the number of consumer can be determined, this can be accomplished just over a +// single channel, which even carries an in-bound control meta-message with the contexts. The trick +// is that identical meta-messages as many as the number of threads are sent over the channel, +// along with new channel receivers to be used (hence the name of _chained_). Then, the receiving +// thread drops the old channel and is now blocked on receiving from the new channel. In this way, +// this switching can happen exactly once for each thread. +// +// Overall, this greatly simplifies the code, reduces CAS/syscall overhead per messaging to the +// minimum at the cost of a single heap allocation per switching for the sake of Box-ing the Self +// type to avoid infinite mem::size_of() due to the recursive type structure. Needless to say, such +// an allocation can be amortized to be negligible. 
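To make the chained-channel trick concrete before the real implementation below, here is a self-contained sketch under stated assumptions: `u64` payloads, `String` contexts, and no trait-object indirection (the production code at this point in the series Box-es the channel-carrying variant; a later patch in this series drops that). Four consumers share one channel; a switch sends four migration messages, and each consumer migrates exactly once:

    // Minimal chained-channel demo: consumers share a channel; to switch
    // context, send one (context, new_receiver) message per consumer, then
    // replace the sender so later payloads land on the new channel only.
    use crossbeam_channel::{unbounded, Receiver, Sender};
    use std::thread;

    enum Msg {
        Payload(u64),
        ContextAndChannel(String, Receiver<Msg>),
    }

    fn switch_context(sender: &mut Sender<Msg>, context: String, consumers: usize) {
        let (new_sender, new_receiver) = unbounded();
        for _ in 0..consumers {
            // each consumer takes exactly one of these and leaves the old channel
            sender
                .send(Msg::ContextAndChannel(context.clone(), new_receiver.clone()))
                .unwrap();
        }
        *sender = new_sender; // dropping the old sender disconnects the old channel
    }

    fn main() {
        const CONSUMERS: usize = 4;
        let (mut sender, receiver) = unbounded();
        let handles: Vec<_> = (0..CONSUMERS)
            .map(|_| {
                let mut receiver: Receiver<Msg> = receiver.clone();
                thread::spawn(move || {
                    let mut context = "ctx0".to_string();
                    while let Ok(msg) = receiver.recv() {
                        match msg {
                            Msg::Payload(task) => println!("{context}: task {task}"),
                            Msg::ContextAndChannel(new_context, new_receiver) => {
                                // block on the new channel from now on; this
                                // happens exactly once per consumer per switch
                                context = new_context;
                                receiver = new_receiver;
                            }
                        }
                    }
                })
            })
            .collect();

        for i in 0..10 {
            sender.send(Msg::Payload(i)).unwrap();
        }
        switch_context(&mut sender, "ctx1".to_string(), CONSUMERS);
        for i in 10..20 {
            sender.send(Msg::Payload(i)).unwrap();
        }
        drop(sender); // disconnect so consumers exit after draining
        handles.into_iter().for_each(|h| h.join().unwrap());
    }

Note that payloads from before and after a switch can interleave across consumers; the real scheduler thread below only triggers a switch at session boundaries, after all in-flight tasks have drained.
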
+enum ChainedChannel { + Payload(P1), + PayloadAndChannel(Box>), +} + +trait WithChannelAndPayload: Send + Sync { + fn payload_and_channel(self: Box) -> PayloadAndChannelInner; +} + +type PayloadAndChannelInner = (P2, Receiver>); + +struct PayloadAndChannelWrapper(PayloadAndChannelInner); + +impl WithChannelAndPayload for PayloadAndChannelWrapper +where + P1: Send + Sync, + P2: Send + Sync, +{ + fn payload_and_channel(self: Box) -> PayloadAndChannelInner { + self.0 + } +} + +impl ChainedChannel +where + P1: Send + Sync + 'static, + P2: Send + Sync + 'static, +{ + fn chain_to_new_channel(payload: P2, receiver: Receiver) -> Self { + Self::PayloadAndChannel(Box::new(PayloadAndChannelWrapper((payload, receiver)))) + } +} + +fn initialized_result_with_timings() -> ResultWithTimings { + (Ok(()), ExecuteTimings::default()) +} + // Currently, simplest possible implementation (i.e. single-threaded) // this will be replaced with more proper implementation... // not usable at all, especially for mainnet-beta @@ -201,27 +291,330 @@ impl TaskHandler for DefaultTaskHandler { pub struct PooledScheduler { inner: PooledSchedulerInner, context: SchedulingContext, - result_with_timings: Mutex, } #[derive(Debug)] pub struct PooledSchedulerInner, TH: TaskHandler> { - id: SchedulerId, + thread_manager: ThreadManager, +} + +// This type manages the OS threads for scheduling and executing transactions. The term +// `session` is consistently used to mean a group of Tasks scoped under a single SchedulingContext. +// This is equivalent to a particular bank for block verification. However, new terms is introduced +// here to mean some continuous time over multiple continuous banks/slots for the block production, +// which is planned to be implemented in the future. +#[derive(Debug)] +struct ThreadManager, TH: TaskHandler> { + scheduler_id: SchedulerId, pool: Arc>, + handler_count: usize, + new_task_sender: Sender, + new_task_receiver: Receiver, + session_result_sender: Sender>, + session_result_receiver: Receiver>, + session_result_with_timings: Option, + scheduler_thread: Option>, + handler_threads: Vec>, + accumulator_thread: Option>, } impl PooledScheduler { fn do_spawn(pool: Arc>, initial_context: SchedulingContext) -> Self { + // we're hard-coding the number of handler thread to 1, meaning this impl is currently + // single-threaded still. 
+ let handler_count = 1; + Self::from_inner( PooledSchedulerInner:: { - id: pool.new_scheduler_id(), - pool, + thread_manager: ThreadManager::new(pool, handler_count), }, initial_context, ) } } +impl, TH: TaskHandler> ThreadManager { + fn new(pool: Arc>, handler_count: usize) -> Self { + let (new_task_sender, new_task_receiver) = unbounded(); + let (session_result_sender, session_result_receiver) = unbounded(); + + Self { + scheduler_id: pool.new_scheduler_id(), + pool, + handler_count, + new_task_sender, + new_task_receiver, + session_result_sender, + session_result_receiver, + session_result_with_timings: None, + scheduler_thread: None, + handler_threads: Vec::with_capacity(handler_count), + accumulator_thread: None, + } + } + + fn execute_task_with_handler( + bank: &Arc, + executed_task: &mut Box, + handler_context: &HandlerContext, + ) { + debug!("handling task at {:?}", thread::current()); + TH::handle( + &mut executed_task.result_with_timings.0, + &mut executed_task.result_with_timings.1, + bank, + executed_task.task.transaction(), + executed_task.task.task_index(), + handler_context, + ); + } + + fn propagate_context_to_handler_threads( + runnable_task_sender: &mut Sender>, + context: SchedulingContext, + handler_count: usize, + ) { + let (next_sessioned_task_sender, runnable_task_receiver) = unbounded(); + for _ in 0..handler_count { + runnable_task_sender + .send(ChainedChannel::chain_to_new_channel( + context.clone(), + runnable_task_receiver.clone(), + )) + .unwrap(); + } + *runnable_task_sender = next_sessioned_task_sender; + } + + fn take_session_result_with_timings(&mut self) -> ResultWithTimings { + self.session_result_with_timings.take().unwrap() + } + + fn put_session_result_with_timings(&mut self, result_with_timings: ResultWithTimings) { + assert_matches!( + self.session_result_with_timings + .replace(result_with_timings), + None + ); + } + + fn start_threads(&mut self, context: &SchedulingContext) { + let (runnable_task_sender, runnable_task_receiver) = + unbounded::>(); + let (executed_task_sender, executed_task_receiver) = unbounded::(); + let (finished_task_sender, finished_task_receiver) = unbounded::>(); + let (accumulated_result_sender, accumulated_result_receiver) = + unbounded::>(); + + let mut result_with_timings = self.session_result_with_timings.take(); + + // High-level flow of new tasks: + // 1. the replay stage thread send a new task. + // 2. the scheduler thread accepts the task. + // 3. the scheduler thread dispatches the task after proper locking. + // 4. the handler thread processes the dispatched task. + // 5. the handler thread reply back to the scheduler thread as an executed task. + // 6. the scheduler thread post-processes the executed task. + // 7. the scheduler thread send the executed task to the accumulator thread. + // 8. the accumulator thread examines the executed task's result and accumulate its timing, + // finally dropping the transaction inside the executed task. + let scheduler_main_loop = || { + let handler_count = self.handler_count; + let session_result_sender = self.session_result_sender.clone(); + let new_task_receiver = self.new_task_receiver.clone(); + let mut runnable_task_sender = runnable_task_sender.clone(); + + let mut session_ending = false; + let mut active_task_count: usize = 0; + move || loop { + let mut is_finished = false; + while !is_finished { + select! 
{ + recv(finished_task_receiver) -> executed_task => { + let executed_task = executed_task.unwrap(); + + active_task_count = active_task_count.checked_sub(1).unwrap(); + executed_task_sender + .send(ExecutedTaskPayload::Payload(executed_task)) + .unwrap(); + }, + recv(new_task_receiver) -> message => { + match message.unwrap() { + NewTaskPayload::Payload(task) => { + assert!(!session_ending); + + // so, we're NOT scheduling at all here; rather, just execute + // tx straight off. the inter-tx locking deps aren't needed to + // be resolved in the case of single-threaded FIFO like this. + active_task_count = active_task_count.checked_add(1).unwrap(); + runnable_task_sender + .send(ChainedChannel::Payload(task)) + .unwrap(); + } + NewTaskPayload::OpenSubchannel(context) => { + // signal about new SchedulingContext to both handler and + // accumulator threads + Self::propagate_context_to_handler_threads( + &mut runnable_task_sender, + context, + handler_count + ); + executed_task_sender + .send(ExecutedTaskPayload::OpenSubchannel(())) + .unwrap(); + } + NewTaskPayload::CloseSubchannel => { + assert!(!session_ending); + session_ending = true; + } + } + }, + }; + + // a really simplistic termination condition, which only works under the + // assumption of single handler thread... + is_finished = session_ending && active_task_count == 0; + } + + if session_ending { + executed_task_sender + .send(ExecutedTaskPayload::CloseSubchannel) + .unwrap(); + session_result_sender + .send(Some( + accumulated_result_receiver + .recv() + .unwrap() + .unwrap_or_else(initialized_result_with_timings), + )) + .unwrap(); + session_ending = false; + } + } + }; + + let handler_main_loop = || { + let pool = self.pool.clone(); + let mut bank = context.bank().clone(); + let mut runnable_task_receiver = runnable_task_receiver.clone(); + let finished_task_sender = finished_task_sender.clone(); + + move || loop { + let (task, sender) = select! { + recv(runnable_task_receiver) -> message => { + match message.unwrap() { + ChainedChannel::Payload(task) => { + (task, &finished_task_sender) + } + ChainedChannel::PayloadAndChannel(new_channel) => { + let new_context; + (new_context, runnable_task_receiver) = new_channel.payload_and_channel(); + bank = new_context.bank().clone(); + continue; + } + } + }, + }; + let mut task = ExecutedTask::new_boxed(task); + Self::execute_task_with_handler(&bank, &mut task, &pool.handler_context); + sender.send(task).unwrap(); + } + }; + + // This thread is needed because .accumualte() and drop::() both are + // relatively heavy operations to be put inside the scheduler thread. + let accumulator_main_loop = || { + move || loop { + match executed_task_receiver.recv().unwrap() { + ExecutedTaskPayload::Payload(executed_task) => { + let result_with_timings = result_with_timings.as_mut().unwrap(); + match executed_task.result_with_timings.0 { + Ok(()) => {} + Err(error) => { + error!("error is detected while accumulating....: {error:?}"); + // Override errors intentionally for simplicity, not retaining the + // first error unlike the block verification in the + // blockstore_processor. This will be addressed with more + // full-fledged impl later. 
+ result_with_timings.0 = Err(error); + } + } + result_with_timings + .1 + .accumulate(&executed_task.result_with_timings.1); + } + ExecutedTaskPayload::OpenSubchannel(()) => { + assert_matches!( + result_with_timings.replace(initialized_result_with_timings()), + None + ); + } + ExecutedTaskPayload::CloseSubchannel => { + accumulated_result_sender + .send(result_with_timings.take()) + .unwrap(); + } + } + } + }; + + self.scheduler_thread = Some( + thread::Builder::new() + .name("solScheduler".to_owned()) + .spawn(scheduler_main_loop()) + .unwrap(), + ); + + self.accumulator_thread = Some( + thread::Builder::new() + .name("solScAccmltr".to_owned()) + .spawn(accumulator_main_loop()) + .unwrap(), + ); + + self.handler_threads = (0..self.handler_count) + .map({ + |thx| { + thread::Builder::new() + .name(format!("solScHandler{:02}", thx)) + .spawn(handler_main_loop()) + .unwrap() + } + }) + .collect(); + } + + fn send_task(&self, task: Task) { + debug!("send_task()"); + self.new_task_sender + .send(NewTaskPayload::Payload(task)) + .unwrap() + } + + fn end_session(&mut self) { + if self.session_result_with_timings.is_some() { + debug!("end_session(): already result resides within thread manager.."); + return; + } + debug!("end_session(): will end session..."); + + self.new_task_sender + .send(NewTaskPayload::CloseSubchannel) + .unwrap(); + + if let Some(result_with_timings) = self.session_result_receiver.recv().unwrap() { + self.put_session_result_with_timings(result_with_timings); + } + } + + fn start_session(&mut self, context: &SchedulingContext) { + assert_matches!(self.session_result_with_timings, None); + self.new_task_sender + .send(NewTaskPayload::OpenSubchannel(context.clone())) + .unwrap(); + } +} + pub trait SpawnableScheduler: InstalledScheduler { type Inner: Debug + Send + Sync; @@ -237,29 +630,33 @@ pub trait SpawnableScheduler: InstalledScheduler { impl SpawnableScheduler for PooledScheduler { type Inner = PooledSchedulerInner; - fn into_inner(self) -> (ResultWithTimings, Self::Inner) { - ( - self.result_with_timings.into_inner().expect("not poisoned"), - self.inner, - ) + fn into_inner(mut self) -> (ResultWithTimings, Self::Inner) { + let result_with_timings = { + let manager = &mut self.inner.thread_manager; + manager.end_session(); + manager.take_session_result_with_timings() + }; + (result_with_timings, self.inner) } - fn from_inner(inner: Self::Inner, context: SchedulingContext) -> Self { - Self { - inner, - context, - result_with_timings: Mutex::new((Ok(()), ExecuteTimings::default())), - } + fn from_inner(mut inner: Self::Inner, context: SchedulingContext) -> Self { + inner.thread_manager.start_session(&context); + Self { inner, context } } fn spawn(pool: Arc>, initial_context: SchedulingContext) -> Self { - Self::do_spawn(pool, initial_context) + let mut scheduler = Self::do_spawn(pool, initial_context); + scheduler + .inner + .thread_manager + .start_threads(&scheduler.context); + scheduler } } impl InstalledScheduler for PooledScheduler { fn id(&self) -> SchedulerId { - self.inner.id + self.inner.thread_manager.scheduler_id } fn context(&self) -> &SchedulingContext { @@ -267,23 +664,8 @@ impl InstalledScheduler for PooledScheduler { } fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { - let (result, timings) = &mut *self.result_with_timings.lock().expect("not poisoned"); - if result.is_err() { - // just bail out early to short-circuit the processing altogether - return; - } - - // ... 
so, we're NOT scheduling at all here; rather, just execute tx straight off. the - // inter-tx locking deps aren't needed to be resolved in the case of single-threaded FIFO - // like this. - TH::handle( - result, - timings, - self.context().bank(), - transaction, - index, - &self.inner.pool.handler_context, - ); + let task = Task::create_task(transaction.clone(), index); + self.inner.thread_manager.send_task(task); } fn wait_for_termination( @@ -295,7 +677,7 @@ impl InstalledScheduler for PooledScheduler { } fn pause_for_recent_blockhash(&mut self) { - // not surprisingly, there's nothing to do for this min impl! + self.inner.thread_manager.end_session(); } } @@ -305,7 +687,7 @@ where TH: TaskHandler, { fn return_to_pool(self: Box) { - self.pool.clone().return_scheduler(*self) + self.thread_manager.pool.clone().return_scheduler(*self) } } @@ -544,7 +926,8 @@ mod tests { )); assert_eq!(bank.transaction_count(), 0); scheduler.schedule_execution(&(bad_tx, 0)); - scheduler.pause_for_recent_blockhash(); + // simulate the task-sending thread is stalled for some reason. + std::thread::sleep(std::time::Duration::from_secs(1)); assert_eq!(bank.transaction_count(), 0); let good_tx_after_bad_tx = @@ -563,7 +946,13 @@ mod tests { scheduler.schedule_execution(&(good_tx_after_bad_tx, 0)); scheduler.pause_for_recent_blockhash(); // transaction_count should remain same as scheduler should be bailing out. - assert_eq!(bank.transaction_count(), 0); + // That's because we're testing the serialized failing execution case in this test. + // However, currently threaded impl can't properly abort in this situtation.. + // so, 1 should be observed, intead of 0. + // Also note that bank.transaction_count() is generally racy because blockstore_processor + // and unified_scheduler both tend to process non-conflicting batches in parallel as normal + // operation. 
+ assert_eq!(bank.transaction_count(), 1); let bank = BankWithScheduler::new(bank, Some(scheduler)); assert_matches!( @@ -577,8 +966,10 @@ mod tests { #[derive(Debug)] struct AsyncScheduler( - PooledScheduler, + Mutex, Mutex>>, + SchedulingContext, + Arc>, ); impl AsyncScheduler { @@ -593,7 +984,7 @@ mod tests { } overall_timings.accumulate(&timings); } - *self.0.result_with_timings.lock().unwrap() = (overall_result, overall_timings); + *self.0.lock().unwrap() = (overall_result, overall_timings); } } @@ -601,17 +992,17 @@ mod tests { for AsyncScheduler { fn id(&self) -> SchedulerId { - self.0.id() + todo!(); } fn context(&self) -> &SchedulingContext { - self.0.context() + &self.2 } fn schedule_execution(&self, &(transaction, index): &(&SanitizedTransaction, usize)) { let transaction_and_index = (transaction.clone(), index); let context = self.context().clone(); - let pool = self.0.inner.pool.clone(); + let pool = self.3.clone(); self.1.lock().unwrap().push(std::thread::spawn(move || { // intentionally sleep to simulate race condition where register_recent_blockhash @@ -635,10 +1026,14 @@ mod tests { fn wait_for_termination( self: Box, - is_dropped: bool, + _is_dropped: bool, ) -> (ResultWithTimings, UninstalledSchedulerBox) { self.do_wait(); - Box::new(self.0).wait_for_termination(is_dropped) + let result_with_timings = std::mem::replace( + &mut *self.0.lock().unwrap(), + initialized_result_with_timings(), + ); + (result_with_timings, self) } fn pause_for_recent_blockhash(&mut self) { @@ -651,6 +1046,14 @@ mod tests { } } + impl UninstalledScheduler + for AsyncScheduler + { + fn return_to_pool(self: Box) { + self.3.clone().return_scheduler(*self) + } + } + impl SpawnableScheduler for AsyncScheduler { @@ -670,19 +1073,10 @@ mod tests { initial_context: SchedulingContext, ) -> Self { AsyncScheduler::( - PooledScheduler::::from_inner( - PooledSchedulerInner { - id: pool.new_scheduler_id(), - pool: SchedulerPool::new( - pool.handler_context.log_messages_bytes_limit, - pool.handler_context.transaction_status_sender.clone(), - pool.handler_context.replay_vote_sender.clone(), - pool.handler_context.prioritization_fee_cache.clone(), - ), - }, - initial_context, - ), + Mutex::new(initialized_result_with_timings()), Mutex::new(vec![]), + initial_context, + pool, ) } } From 2dd63440eed7f3a64d8f9cb679515cc39445554e Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 9 Jan 2024 12:31:37 +0900 Subject: [PATCH 02/14] Make the internal struct ExecutedTask not pub --- unified-scheduler-pool/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 8ec460d087f2e4..e9b68e41e5d5fa 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -199,7 +199,7 @@ impl TaskHandler for DefaultTaskHandler { } } -pub struct ExecutedTask { +struct ExecutedTask { task: Task, result_with_timings: ResultWithTimings, } From 7550bb9dea42457afd75e08d1aae9d187e2c57c4 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 9 Jan 2024 14:40:58 +0900 Subject: [PATCH 03/14] Improve wording a bit --- unified-scheduler-pool/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index e9b68e41e5d5fa..096d978c4af666 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -213,7 +213,7 @@ impl ExecutedTask { } } -// A very tiny generic message type to signal about opening and 
closing of subchannels, which is +// A very tiny generic message type to signal about opening and closing of subchannels, which are // logically segmented series of Payloads (P1) over a signle continuous time-span, potentially // carrying some subchannel metadata (P2) upon opening a new subchannel. // Note that the above properties can be upheld only when this is used inside MPSC or SPSC channels @@ -234,7 +234,7 @@ type ExecutedTaskPayload = SubchanneledPayload, ()>; // Usually, there's no way to prevent one of those threads from mixing current and next contexts // while processing messages with a multiple-consumer channel. A condvar or other // out-of-bound mechanism is needed to notify about switching of contextual data. That's because -// there's no way to block those threads reliably on such an switching event just with a channel. +// there's no way to block those threads reliably on such a switching event just with a channel. // // However, if the number of consumer can be determined, this can be accomplished just over a // single channel, which even carries an in-bound control meta-message with the contexts. The trick @@ -949,9 +949,9 @@ mod tests { // That's because we're testing the serialized failing execution case in this test. // However, currently threaded impl can't properly abort in this situtation.. // so, 1 should be observed, intead of 0. - // Also note that bank.transaction_count() is generally racy because blockstore_processor - // and unified_scheduler both tend to process non-conflicting batches in parallel as normal - // operation. + // Also note that bank.transaction_count() is generally racy by nature, because + // blockstore_processor and unified_scheduler both tend to process non-conflicting batches + // in parallel as part of the normal operation. assert_eq!(bank.transaction_count(), 1); let bank = BankWithScheduler::new(bank, Some(scheduler)); From 30f6abe5f42b056552c2c390651d2644fc2d45f2 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 9 Jan 2024 22:26:54 +0900 Subject: [PATCH 04/14] Explain scheduler main loop's overhead sensitivity --- unified-scheduler-pool/src/lib.rs | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 096d978c4af666..cdf9f76e28bf8f 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -426,6 +426,39 @@ impl, TH: TaskHandler> ThreadManager { let mut session_ending = false; let mut active_task_count: usize = 0; + + // Now, this is the main loop for the scheduler thread, which is a special beast. + // + // That's because it's the most notable bottleneck of throughput. Unified scheduler's + // overall throughput is largely dependant on its ultra-low latency characteristic, + // which is the most important design goal of the scheduler in order to reduce the + // transaction confirmation latency for end users. + // + // Firstly, the scheduler thread must handle incoming messages from thread(s) owned by + // the replay stage or the banking stage. It also must handle incoming messages from + // the multi-threaded handlers. This heavily-multi-threaded whole processing load must + // be coped just with the single-threaded scheduler, to attain ideal cpu cache + // friendliness and main memory bandwidth saturation with its shared-nothing + // single-threaded account locking implementation. 
In other words, the per-task + // processing efficiency of the main loop codifies the upper bound of horizontal + // scalability of the unified scheduler. + // + // Moreover, the scheduler is designed to handle tasks without batching at all in the + // pursuit of saturating all of the handler threads with maximally-fine-grained + // concurrency density for throughput as the second design goal. This design goal + // relies on the assumption that there's no considerable penalty arising from the + // unbatched manner of processing. + // + // These two key elements of the design philosophy lead to the rather unforgiving + // implementation burden: Degraded performance would acutely manifest from an even tiny + // amount of individual cpu-bound processing delay in the scheduler thread, like when + // dispatching the next conflicting task after receiving the previous finished one from + // the handler. + // + // Thus, it's fatal for unified scheduler's advertised superiority to squeeze every cpu + // cycles out of the scheduler thread. Thus, any kinds of overhead sources like + // syscalls and VDSO, and even memory (de)allocation should be avoided at all costs by + // design or by means of offloading at the last resort. move || loop { let mut is_finished = false; while !is_finished { From 3623a346576069781fe03c38331250324c9158ae Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 9 Jan 2024 22:55:08 +0900 Subject: [PATCH 05/14] Improve wording a bit --- unified-scheduler-pool/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index cdf9f76e28bf8f..ad8d127b8d6e5b 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -456,9 +456,9 @@ impl, TH: TaskHandler> ThreadManager { // the handler. // // Thus, it's fatal for unified scheduler's advertised superiority to squeeze every cpu - // cycles out of the scheduler thread. Thus, any kinds of overhead sources like - // syscalls and VDSO, and even memory (de)allocation should be avoided at all costs by - // design or by means of offloading at the last resort. + // cycles out of the scheduler thread. Thus, any kinds of unessential overhead sources + // like syscalls, VDSO, and even memory (de)allocation should be avoided at all costs + // by design or by means of offloading at the last resort. 
move || loop { let mut is_finished = false; while !is_finished { From 1334811444fa871dc2e83946bb81b30cd047d447 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 9 Jan 2024 23:04:24 +0900 Subject: [PATCH 06/14] Define ChainedChannel{Sender, Receiver} wrappers --- unified-scheduler-pool/src/lib.rs | 204 +++++++++++++++++++++--------- 1 file changed, 142 insertions(+), 62 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index ad8d127b8d6e5b..e28973d86f76ef 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -10,7 +10,7 @@ use { assert_matches::assert_matches, - crossbeam_channel::{select, unbounded, Receiver, Sender}, + crossbeam_channel::{select, unbounded, Receiver, SendError, Sender}, log::*, solana_ledger::blockstore_processor::{ execute_batch, TransactionBatchWithIndexes, TransactionStatusSender, @@ -247,36 +247,138 @@ type ExecutedTaskPayload = SubchanneledPayload, ()>; // minimum at the cost of a single heap allocation per switching for the sake of Box-ing the Self // type to avoid infinite mem::size_of() due to the recursive type structure. Needless to say, such // an allocation can be amortized to be negligible. -enum ChainedChannel { - Payload(P1), - PayloadAndChannel(Box>), -} +mod chained_channel { + use super::*; -trait WithChannelAndPayload: Send + Sync { - fn payload_and_channel(self: Box) -> PayloadAndChannelInner; -} + // hide variants by putting this inside newtype + enum ChainedChannelPrivate { + Payload(P), + ContextAndChannel(Box>), + } -type PayloadAndChannelInner = (P2, Receiver>); + pub(super) struct ChainedChannel(ChainedChannelPrivate); -struct PayloadAndChannelWrapper(PayloadAndChannelInner); + trait WithContextAndPayload: Send + Sync { + fn context_and_channel(self: Box) -> ContextAndChannelInner; + } -impl WithChannelAndPayload for PayloadAndChannelWrapper -where - P1: Send + Sync, - P2: Send + Sync, -{ - fn payload_and_channel(self: Box) -> PayloadAndChannelInner { - self.0 + type ContextAndChannelInner = (C, Receiver>); + + struct ContextAndChannelWrapper(ContextAndChannelInner); + + impl WithContextAndPayload for ContextAndChannelWrapper + where + P: Send + Sync, + C: Send + Sync, + { + fn context_and_channel(self: Box) -> ContextAndChannelInner { + self.0 + } } -} -impl ChainedChannel -where - P1: Send + Sync + 'static, - P2: Send + Sync + 'static, -{ - fn chain_to_new_channel(payload: P2, receiver: Receiver) -> Self { - Self::PayloadAndChannel(Box::new(PayloadAndChannelWrapper((payload, receiver)))) + impl ChainedChannel + where + P: Send + Sync + 'static, + C: Send + Sync + 'static, + { + fn chain_to_new_channel(context: C, receiver: Receiver) -> Self { + Self(ChainedChannelPrivate::ContextAndChannel(Box::new( + ContextAndChannelWrapper((context, receiver)), + ))) + } + } + + pub(super) struct ChainedChannelSender { + sender: Sender>, + } + + impl ChainedChannelSender + where + P: Send + Sync + 'static, + C: Send + Sync + 'static + Clone, + { + fn new(sender: Sender>) -> Self { + Self { sender } + } + + pub(super) fn send_payload( + &self, + payload: P, + ) -> std::result::Result<(), SendError>> { + self.sender + .send(ChainedChannel(ChainedChannelPrivate::Payload(payload))) + } + + pub(super) fn send_chained_channel( + &mut self, + context: C, + count: usize, + ) -> std::result::Result<(), SendError>> { + let (chained_sender, chained_receiver) = crossbeam_channel::unbounded(); + for _ in 0..count { + self.sender.send(ChainedChannel::chain_to_new_channel( + 
context.clone(), + chained_receiver.clone(), + ))? + } + self.sender = chained_sender; + Ok(()) + } + } + + pub(super) struct ChainedChannelReceiver { + receiver: Receiver>, + context: C, + } + + impl Clone for ChainedChannelReceiver { + fn clone(&self) -> Self { + Self { + receiver: self.receiver.clone(), + context: self.context.clone(), + } + } + } + + impl ChainedChannelReceiver { + fn new(receiver: Receiver>, initial_context: C) -> Self { + Self { + receiver, + context: initial_context, + } + } + + pub(super) fn context(&self) -> &C { + &self.context + } + + pub(super) fn for_select(&self) -> &Receiver> { + &self.receiver + } + + pub(super) fn after_select(&mut self, message: ChainedChannel) -> Option
<P>
{ + match message.0 { + ChainedChannelPrivate::Payload(payload) => Some(payload), + ChainedChannelPrivate::ContextAndChannel(new_context_and_channel) => { + (self.context, self.receiver) = new_context_and_channel.context_and_channel(); + None + } + } + } + } + + pub(super) fn unbounded( + initial_context: C, + ) -> (ChainedChannelSender, ChainedChannelReceiver) + where + P: Send + Sync + 'static, + C: Send + Sync + 'static + Clone, + { + let (sender, receiver) = crossbeam_channel::unbounded(); + ( + ChainedChannelSender::new(sender), + ChainedChannelReceiver::new(receiver, initial_context), + ) } } @@ -369,23 +471,6 @@ impl, TH: TaskHandler> ThreadManager { ); } - fn propagate_context_to_handler_threads( - runnable_task_sender: &mut Sender>, - context: SchedulingContext, - handler_count: usize, - ) { - let (next_sessioned_task_sender, runnable_task_receiver) = unbounded(); - for _ in 0..handler_count { - runnable_task_sender - .send(ChainedChannel::chain_to_new_channel( - context.clone(), - runnable_task_receiver.clone(), - )) - .unwrap(); - } - *runnable_task_sender = next_sessioned_task_sender; - } - fn take_session_result_with_timings(&mut self) -> ResultWithTimings { self.session_result_with_timings.take().unwrap() } @@ -399,8 +484,8 @@ impl, TH: TaskHandler> ThreadManager { } fn start_threads(&mut self, context: &SchedulingContext) { - let (runnable_task_sender, runnable_task_receiver) = - unbounded::>(); + let (mut runnable_task_sender, runnable_task_receiver) = + chained_channel::unbounded::(context.clone()); let (executed_task_sender, executed_task_receiver) = unbounded::(); let (finished_task_sender, finished_task_receiver) = unbounded::>(); let (accumulated_result_sender, accumulated_result_receiver) = @@ -422,7 +507,6 @@ impl, TH: TaskHandler> ThreadManager { let handler_count = self.handler_count; let session_result_sender = self.session_result_sender.clone(); let new_task_receiver = self.new_task_receiver.clone(); - let mut runnable_task_sender = runnable_task_sender.clone(); let mut session_ending = false; let mut active_task_count: usize = 0; @@ -481,17 +565,16 @@ impl, TH: TaskHandler> ThreadManager { // be resolved in the case of single-threaded FIFO like this. active_task_count = active_task_count.checked_add(1).unwrap(); runnable_task_sender - .send(ChainedChannel::Payload(task)) + .send_payload(task) .unwrap(); } NewTaskPayload::OpenSubchannel(context) => { // signal about new SchedulingContext to both handler and // accumulator threads - Self::propagate_context_to_handler_threads( - &mut runnable_task_sender, + runnable_task_sender.send_chained_channel( context, handler_count - ); + ).unwrap(); executed_task_sender .send(ExecutedTaskPayload::OpenSubchannel(())) .unwrap(); @@ -528,28 +611,25 @@ impl, TH: TaskHandler> ThreadManager { let handler_main_loop = || { let pool = self.pool.clone(); - let mut bank = context.bank().clone(); let mut runnable_task_receiver = runnable_task_receiver.clone(); let finished_task_sender = finished_task_sender.clone(); move || loop { let (task, sender) = select! 
{ - recv(runnable_task_receiver) -> message => { - match message.unwrap() { - ChainedChannel::Payload(task) => { - (task, &finished_task_sender) - } - ChainedChannel::PayloadAndChannel(new_channel) => { - let new_context; - (new_context, runnable_task_receiver) = new_channel.payload_and_channel(); - bank = new_context.bank().clone(); - continue; - } + recv(runnable_task_receiver.for_select()) -> message => { + if let Some(task) = runnable_task_receiver.after_select(message.unwrap()) { + (task, &finished_task_sender) + } else { + continue; } }, }; let mut task = ExecutedTask::new_boxed(task); - Self::execute_task_with_handler(&bank, &mut task, &pool.handler_context); + Self::execute_task_with_handler( + runnable_task_receiver.context().bank(), + &mut task, + &pool.handler_context, + ); sender.send(task).unwrap(); } }; From f9fbbfbc85165ed6c4825a0e0b722ea2bf7822cd Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 10 Jan 2024 01:03:02 +0900 Subject: [PATCH 07/14] Clean up a bit --- unified-scheduler-pool/src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index e28973d86f76ef..b3944da1c47497 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -571,10 +571,9 @@ impl, TH: TaskHandler> ThreadManager { NewTaskPayload::OpenSubchannel(context) => { // signal about new SchedulingContext to both handler and // accumulator threads - runnable_task_sender.send_chained_channel( - context, - handler_count - ).unwrap(); + runnable_task_sender + .send_chained_channel(context, handler_count) + .unwrap(); executed_task_sender .send(ExecutedTaskPayload::OpenSubchannel(())) .unwrap(); From 2a046e4ead38e7eb2b0cd00eceb564064f6a1601 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 10 Jan 2024 14:27:57 +0900 Subject: [PATCH 08/14] Use derivative to avoid manual Clone impl --- Cargo.lock | 1 + Cargo.toml | 1 + programs/sbf/Cargo.lock | 1 + unified-scheduler-pool/Cargo.toml | 1 + unified-scheduler-pool/src/lib.rs | 14 +++++--------- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83f7369c50c982..9f4886b0125b15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7554,6 +7554,7 @@ version = "1.18.0" dependencies = [ "assert_matches", "crossbeam-channel", + "derivative", "log", "solana-ledger", "solana-logger", diff --git a/Cargo.toml b/Cargo.toml index 4661dfa9e5fe4c..2ace7298f69cea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -185,6 +185,7 @@ ctrlc = "3.4.2" curve25519-dalek = "3.2.1" dashmap = "5.5.3" derivation-path = { version = "0.2.0", default-features = false } +derivative = "2.2.0" dialoguer = "0.10.4" digest = "0.10.7" dir-diff = "0.3.3" diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 1d5c58a8959054..bcdc8de57fd24e 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6554,6 +6554,7 @@ version = "1.18.0" dependencies = [ "assert_matches", "crossbeam-channel", + "derivative", "log", "solana-ledger", "solana-program-runtime", diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index c8848e098827b2..7626215b1e1126 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -12,6 +12,7 @@ edition = { workspace = true } [dependencies] assert_matches = { workspace = true } crossbeam-channel = { workspace = true } +derivative = { workspace = true } log = { workspace = true } solana-ledger = { workspace = true } solana-program-runtime = { 
workspace = true } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index b3944da1c47497..c7fa38cd610ee1 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -11,6 +11,7 @@ use { assert_matches::assert_matches, crossbeam_channel::{select, unbounded, Receiver, SendError, Sender}, + derivative::Derivative, log::*, solana_ledger::blockstore_processor::{ execute_batch, TransactionBatchWithIndexes, TransactionStatusSender, @@ -326,20 +327,15 @@ mod chained_channel { } } + // P doesn't need to be `: Clone`, yet rustc derive can't handle it. + // see https://github.com/rust-lang/rust/issues/26925 + #[derive(Derivative)] + #[derivative(Clone(bound = "C: Clone"))] pub(super) struct ChainedChannelReceiver { receiver: Receiver>, context: C, } - impl Clone for ChainedChannelReceiver { - fn clone(&self) -> Self { - Self { - receiver: self.receiver.clone(), - context: self.context.clone(), - } - } - } - impl ChainedChannelReceiver { fn new(receiver: Receiver>, initial_context: C) -> Self { Self { From ebab9a7e4789ceee4af01be414d059c02a7864b9 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 13 Jan 2024 00:47:16 +0900 Subject: [PATCH 09/14] Clarify comment --- unified-scheduler-pool/src/lib.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index c7fa38cd610ee1..e865f3af054f95 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -509,10 +509,11 @@ impl, TH: TaskHandler> ThreadManager { // Now, this is the main loop for the scheduler thread, which is a special beast. // - // That's because it's the most notable bottleneck of throughput. Unified scheduler's - // overall throughput is largely dependant on its ultra-low latency characteristic, - // which is the most important design goal of the scheduler in order to reduce the - // transaction confirmation latency for end users. + // That's because it could be the most notable bottleneck of throughput in the future + // when there are ~100 handler threads. Unified scheduler's overall throughput is + // largely dependant on its ultra-low latency characteristic, which is the most + // important design goal of the scheduler in order to reduce the transaction + // confirmation latency for end users. // // Firstly, the scheduler thread must handle incoming messages from thread(s) owned by // the replay stage or the banking stage. It also must handle incoming messages from @@ -529,6 +530,11 @@ impl, TH: TaskHandler> ThreadManager { // relies on the assumption that there's no considerable penalty arising from the // unbatched manner of processing. // + // Note that this assumption isn't true as of writing. The current code path + // underneath execute_batch() isn't optimized for unified scheduler's load pattern (ie. + // batches just with a single transaction) at all. This will be addressed in the + // future. 
+ // // These two key elements of the design philosophy lead to the rather unforgiving // implementation burden: Degraded performance would acutely manifest from an even tiny // amount of individual cpu-bound processing delay in the scheduler thread, like when From 4225cba1d046bc57098a985fa84901cd6090b538 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 13 Jan 2024 21:38:49 +0900 Subject: [PATCH 10/14] Remove extra whitespace in comment --- unified-scheduler-pool/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index e865f3af054f95..45c1d45e59685a 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -530,7 +530,7 @@ impl, TH: TaskHandler> ThreadManager { // relies on the assumption that there's no considerable penalty arising from the // unbatched manner of processing. // - // Note that this assumption isn't true as of writing. The current code path + // Note that this assumption isn't true as of writing. The current code path // underneath execute_batch() isn't optimized for unified scheduler's load pattern (ie. // batches just with a single transaction) at all. This will be addressed in the // future. From cc5a1f071fd95bdf624308936b509856ce8aee4b Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 18 Jan 2024 13:48:35 +0900 Subject: [PATCH 11/14] Remove unneeded dyn trait for ChainedChannel --- unified-scheduler-pool/src/lib.rs | 54 +++++++------------------------ 1 file changed, 11 insertions(+), 43 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 45c1d45e59685a..f28c6ca49d6291 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -245,47 +245,22 @@ type ExecutedTaskPayload = SubchanneledPayload, ()>; // this switching can happen exactly once for each thread. // // Overall, this greatly simplifies the code, reduces CAS/syscall overhead per messaging to the -// minimum at the cost of a single heap allocation per switching for the sake of Box-ing the Self -// type to avoid infinite mem::size_of() due to the recursive type structure. Needless to say, such -// an allocation can be amortized to be negligible. +// minimum at the cost of a single channel recreation per switching. Needless to say, such an +// allocation can be amortized to be negligible. 
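The un-Box-ing in this patch relies on the recursion `ChainedChannel` -> `Receiver<ChainedChannel>` passing through a channel handle of fixed size, so `mem::size_of()` stays finite without a heap indirection. A tiny compilable check of that premise, with stand-in type parameters (`u64`, `String`):

    // Verifies the premise behind removing the Box/dyn indirection: the
    // inline (context, receiver) variant is a well-formed, finitely-sized
    // type because Receiver's size is independent of the message it carries.
    use crossbeam_channel::Receiver;

    #[allow(dead_code)]
    enum ChainedChannel<P, C> {
        Payload(P),
        ContextAndChannel(C, Receiver<ChainedChannel<P, C>>),
    }

    fn main() {
        // compiles and prints a finite size; no per-switch Box needed
        println!("{}", std::mem::size_of::<ChainedChannel<u64, String>>());
    }
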
mod chained_channel { use super::*; // hide variants by putting this inside newtype enum ChainedChannelPrivate { Payload(P), - ContextAndChannel(Box>), + ContextAndChannel(C, Receiver>), } pub(super) struct ChainedChannel(ChainedChannelPrivate); - trait WithContextAndPayload: Send + Sync { - fn context_and_channel(self: Box) -> ContextAndChannelInner; - } - - type ContextAndChannelInner = (C, Receiver>); - - struct ContextAndChannelWrapper(ContextAndChannelInner); - - impl WithContextAndPayload for ContextAndChannelWrapper - where - P: Send + Sync, - C: Send + Sync, - { - fn context_and_channel(self: Box) -> ContextAndChannelInner { - self.0 - } - } - - impl ChainedChannel - where - P: Send + Sync + 'static, - C: Send + Sync + 'static, - { + impl ChainedChannel { fn chain_to_new_channel(context: C, receiver: Receiver) -> Self { - Self(ChainedChannelPrivate::ContextAndChannel(Box::new( - ContextAndChannelWrapper((context, receiver)), - ))) + Self(ChainedChannelPrivate::ContextAndChannel(context, receiver)) } } @@ -293,11 +268,7 @@ mod chained_channel { sender: Sender>, } - impl ChainedChannelSender - where - P: Send + Sync + 'static, - C: Send + Sync + 'static + Clone, - { + impl ChainedChannelSender { fn new(sender: Sender>) -> Self { Self { sender } } @@ -355,21 +326,18 @@ mod chained_channel { pub(super) fn after_select(&mut self, message: ChainedChannel) -> Option
<P>
{ match message.0 { ChainedChannelPrivate::Payload(payload) => Some(payload), - ChainedChannelPrivate::ContextAndChannel(new_context_and_channel) => { - (self.context, self.receiver) = new_context_and_channel.context_and_channel(); + ChainedChannelPrivate::ContextAndChannel(context, channel) => { + self.context = context; + self.receiver = channel; None } } } } - pub(super) fn unbounded( + pub(super) fn unbounded( initial_context: C, - ) -> (ChainedChannelSender, ChainedChannelReceiver) - where - P: Send + Sync + 'static, - C: Send + Sync + 'static + Clone, - { + ) -> (ChainedChannelSender, ChainedChannelReceiver) { let (sender, receiver) = crossbeam_channel::unbounded(); ( ChainedChannelSender::new(sender), From 2f592f5a84dba4fa61f92e1c23d1923c7bfea238 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 18 Jan 2024 16:33:10 +0900 Subject: [PATCH 12/14] Remove the accumulator thread for now --- unified-scheduler-pool/src/lib.rs | 101 +++++++++--------------------- 1 file changed, 30 insertions(+), 71 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index f28c6ca49d6291..4e073461eb0729 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -227,7 +227,6 @@ enum SubchanneledPayload { } type NewTaskPayload = SubchanneledPayload; -type ExecutedTaskPayload = SubchanneledPayload, ()>; // A tiny generic message type to synchronize multiple threads everytime some contextual data needs // to be switched (ie. SchedulingContext), just using a single communication channel. @@ -381,7 +380,6 @@ struct ThreadManager, TH: TaskHandler> { session_result_with_timings: Option, scheduler_thread: Option>, handler_threads: Vec>, - accumulator_thread: Option>, } impl PooledScheduler { @@ -415,7 +413,6 @@ impl, TH: TaskHandler> ThreadManager { session_result_with_timings: None, scheduler_thread: None, handler_threads: Vec::with_capacity(handler_count), - accumulator_thread: None, } } @@ -435,6 +432,24 @@ impl, TH: TaskHandler> ThreadManager { ); } + fn accumulate_result_with_timings( + (result, timings): &mut ResultWithTimings, + executed_task: Box, + ) { + match executed_task.result_with_timings.0 { + Ok(()) => {} + Err(error) => { + error!("error is detected while accumulating....: {error:?}"); + // Override errors intentionally for simplicity, not retaining the + // first error unlike the block verification in the + // blockstore_processor. This will be addressed with more + // full-fledged impl later. + *result = Err(error); + } + } + timings.accumulate(&executed_task.result_with_timings.1); + } + fn take_session_result_with_timings(&mut self) -> ResultWithTimings { self.session_result_with_timings.take().unwrap() } @@ -450,10 +465,7 @@ impl, TH: TaskHandler> ThreadManager { fn start_threads(&mut self, context: &SchedulingContext) { let (mut runnable_task_sender, runnable_task_receiver) = chained_channel::unbounded::(context.clone()); - let (executed_task_sender, executed_task_receiver) = unbounded::(); let (finished_task_sender, finished_task_receiver) = unbounded::>(); - let (accumulated_result_sender, accumulated_result_receiver) = - unbounded::>(); let mut result_with_timings = self.session_result_with_timings.take(); @@ -464,9 +476,6 @@ impl, TH: TaskHandler> ThreadManager { // 4. the handler thread processes the dispatched task. // 5. the handler thread reply back to the scheduler thread as an executed task. // 6. the scheduler thread post-processes the executed task. - // 7. 
the scheduler thread send the executed task to the accumulator thread. - // 8. the accumulator thread examines the executed task's result and accumulate its timing, - // finally dropping the transaction inside the executed task. let scheduler_main_loop = || { let handler_count = self.handler_count; let session_result_sender = self.session_result_sender.clone(); @@ -521,35 +530,33 @@ impl, TH: TaskHandler> ThreadManager { let executed_task = executed_task.unwrap(); active_task_count = active_task_count.checked_sub(1).unwrap(); - executed_task_sender - .send(ExecutedTaskPayload::Payload(executed_task)) - .unwrap(); + let result_with_timings = result_with_timings.as_mut().unwrap(); + Self::accumulate_result_with_timings(result_with_timings, executed_task); }, recv(new_task_receiver) -> message => { + assert!(!session_ending); + match message.unwrap() { NewTaskPayload::Payload(task) => { - assert!(!session_ending); - // so, we're NOT scheduling at all here; rather, just execute // tx straight off. the inter-tx locking deps aren't needed to // be resolved in the case of single-threaded FIFO like this. - active_task_count = active_task_count.checked_add(1).unwrap(); runnable_task_sender .send_payload(task) .unwrap(); + active_task_count = active_task_count.checked_add(1).unwrap(); } NewTaskPayload::OpenSubchannel(context) => { - // signal about new SchedulingContext to both handler and - // accumulator threads + // signal about new SchedulingContext to handler threads runnable_task_sender .send_chained_channel(context, handler_count) .unwrap(); - executed_task_sender - .send(ExecutedTaskPayload::OpenSubchannel(())) - .unwrap(); + assert_matches!( + result_with_timings.replace(initialized_result_with_timings()), + None + ); } NewTaskPayload::CloseSubchannel => { - assert!(!session_ending); session_ending = true; } } @@ -562,14 +569,10 @@ impl, TH: TaskHandler> ThreadManager { } if session_ending { - executed_task_sender - .send(ExecutedTaskPayload::CloseSubchannel) - .unwrap(); session_result_sender .send(Some( - accumulated_result_receiver - .recv() - .unwrap() + result_with_timings + .take() .unwrap_or_else(initialized_result_with_timings), )) .unwrap(); @@ -603,43 +606,6 @@ impl, TH: TaskHandler> ThreadManager { } }; - // This thread is needed because .accumualte() and drop::() both are - // relatively heavy operations to be put inside the scheduler thread. - let accumulator_main_loop = || { - move || loop { - match executed_task_receiver.recv().unwrap() { - ExecutedTaskPayload::Payload(executed_task) => { - let result_with_timings = result_with_timings.as_mut().unwrap(); - match executed_task.result_with_timings.0 { - Ok(()) => {} - Err(error) => { - error!("error is detected while accumulating....: {error:?}"); - // Override errors intentionally for simplicity, not retaining the - // first error unlike the block verification in the - // blockstore_processor. This will be addressed with more - // full-fledged impl later. 
- result_with_timings.0 = Err(error); - } - } - result_with_timings - .1 - .accumulate(&executed_task.result_with_timings.1); - } - ExecutedTaskPayload::OpenSubchannel(()) => { - assert_matches!( - result_with_timings.replace(initialized_result_with_timings()), - None - ); - } - ExecutedTaskPayload::CloseSubchannel => { - accumulated_result_sender - .send(result_with_timings.take()) - .unwrap(); - } - } - } - }; - self.scheduler_thread = Some( thread::Builder::new() .name("solScheduler".to_owned()) @@ -647,13 +613,6 @@ impl, TH: TaskHandler> ThreadManager { .unwrap(), ); - self.accumulator_thread = Some( - thread::Builder::new() - .name("solScAccmltr".to_owned()) - .spawn(accumulator_main_loop()) - .unwrap(), - ); - self.handler_threads = (0..self.handler_count) .map({ |thx| { From 07de5b4736eb811a7943aa97a43fd4ae4fa892a2 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 19 Jan 2024 23:47:03 +0900 Subject: [PATCH 13/14] Fix typo --- unified-scheduler-pool/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 4e073461eb0729..cc0826375637ac 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -215,7 +215,7 @@ impl ExecutedTask { } // A very tiny generic message type to signal about opening and closing of subchannels, which are -// logically segmented series of Payloads (P1) over a signle continuous time-span, potentially +// logically segmented series of Payloads (P1) over a single continuous time-span, potentially // carrying some subchannel metadata (P2) upon opening a new subchannel. // Note that the above properties can be upheld only when this is used inside MPSC or SPSC channels // (i.e. the consumer side needs to be single threaded). For the multiple consumer cases, From afd580e8f86634447deaa86282e24bd257fbdee5 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 19 Jan 2024 23:49:33 +0900 Subject: [PATCH 14/14] Use unimplemented!() to convey intention better --- unified-scheduler-pool/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index cc0826375637ac..deae3697807705 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -1033,7 +1033,7 @@ mod tests { for AsyncScheduler { fn id(&self) -> SchedulerId { - todo!(); + unimplemented!(); } fn context(&self) -> &SchedulingContext { @@ -1102,11 +1102,11 @@ mod tests { type Inner = Self; fn into_inner(self) -> (ResultWithTimings, Self::Inner) { - todo!(); + unimplemented!(); } fn from_inner(_inner: Self::Inner, _context: SchedulingContext) -> Self { - todo!(); + unimplemented!(); } fn spawn(
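
Taken together, the series converges on a simple session protocol driven through a single channel: start_session() sends OpenSubchannel(context), send_task() sends Payloads, and end_session() sends CloseSubchannel and waits for the accumulated result. A self-contained simulation of that lifecycle, with `String`/`u64` standing in for SchedulingContext/Task and the handler threads elided:

    // Session lifecycle sketch mirroring ThreadManager's message protocol;
    // the "result" is reduced to a running sum instead of ResultWithTimings.
    use crossbeam_channel::unbounded;
    use std::thread;

    enum SubchanneledPayload<P1, P2> {
        Payload(P1),
        OpenSubchannel(P2),
        CloseSubchannel,
    }

    fn main() {
        let (task_tx, task_rx) = unbounded::<SubchanneledPayload<u64, String>>();
        let (result_tx, result_rx) = unbounded::<(String, u64)>();

        // stands in for the scheduler thread's main loop
        let scheduler = thread::spawn(move || {
            let mut session: Option<(String, u64)> = None;
            while let Ok(message) = task_rx.recv() {
                match message {
                    SubchanneledPayload::OpenSubchannel(context) => {
                        // mirrors the assert_matches!(..., None) invariant
                        assert!(session.is_none());
                        session = Some((context, 0));
                    }
                    SubchanneledPayload::Payload(task) => {
                        // the real loop dispatches to handler threads here
                        session.as_mut().unwrap().1 += task;
                    }
                    SubchanneledPayload::CloseSubchannel => {
                        result_tx.send(session.take().unwrap()).unwrap();
                    }
                }
            }
        });

        // two back-to-back sessions, like two consecutive banks
        for context in ["slot-100", "slot-101"] {
            task_tx
                .send(SubchanneledPayload::OpenSubchannel(context.to_string()))
                .unwrap();
            for task in 1..=3 {
                task_tx.send(SubchanneledPayload::Payload(task)).unwrap();
            }
            task_tx.send(SubchanneledPayload::CloseSubchannel).unwrap();
            println!("{:?}", result_rx.recv().unwrap()); // ("slot-100", 6), then ("slot-101", 6)
        }
        drop(task_tx);
        scheduler.join().unwrap();
    }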