Skip to content

Commit

Permalink
Use unblocking mechanism instead of nested locks
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Dec 19, 2024
1 parent dbf4db4 commit d804e35
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
3 changes: 2 additions & 1 deletion core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,8 @@ pub(crate) fn update_bank_forks_and_poh_recorder_for_new_tpu_bank(
poh_recorder
.write()
.unwrap()
.set_bank(tpu_bank, track_transaction_indexes);
.set_bank(tpu_bank.clone_with_scheduler(), track_transaction_indexes);
tpu_bank.unblock_block_production();
}

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions runtime/src/installed_scheduler_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static {
index: usize,
) -> ScheduleResult;

/// Unblock scheduling on this scheduler.
///
/// `BankWithScheduler::unblock_block_production()` invokes this on the active scheduler, after
/// asserting that the scheduler's mode is `SchedulingMode::BlockProduction`.
fn unblock_scheduling(&self);

/// Return the error which caused the scheduler to abort.
///
/// Note that this must not be called until it's observed that `schedule_execution()` has
Expand Down Expand Up @@ -501,6 +503,16 @@ impl BankWithScheduler {
Ok(())
}

/// Unblock the active scheduler of this bank by calling its `unblock_scheduling()`.
///
/// # Panics
///
/// Panics if the active scheduler's mode is not `SchedulingMode::BlockProduction`, or if
/// `with_active_scheduler()` returns an error.
pub fn unblock_block_production(&self) {
    let outcome = self.inner.with_active_scheduler(|scheduler| {
        // This entry point is asserted to be used with block-production schedulers only.
        assert_matches!(scheduler.context().mode(), SchedulingMode::BlockProduction);
        scheduler.unblock_scheduling();
        Ok(())
    });
    outcome.unwrap();
}

#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
self.inner.do_create_timeout_listener()
Expand Down
24 changes: 23 additions & 1 deletion unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,10 +763,12 @@ impl ExecutedTask {
// Note that the above properties can be upheld only when this is used inside MPSC or SPSC channels
// (i.e. the consumer side needs to be single threaded). For the multiple consumer cases,
// ChainedChannel can be used instead.
/// Message type multiplexing regular payloads (`P1`) with control messages over one channel.
#[derive(Debug)]
enum SubchanneledPayload<P1, P2> {
/// A regular payload of type `P1`.
Payload(P1),
/// Carries a `P2` value; presumably opens a new subchannel (inferred from the name) — TODO
/// confirm against the receiving loop.
OpenSubchannel(P2),
/// Presumably closes the currently open subchannel (inferred from the name) — TODO confirm.
CloseSubchannel,
/// Sent by `ThreadManager::unblock_session()` (via `NewTaskPayload`, presumably an alias of
/// this enum). The scheduler loop blocks on `new_task_receiver.recv()` until it receives
/// exactly this message, so that transactions aren't processed until unblocked.
Unblock,
/// Handled by the scheduler loop in the same match arm as a channel receive error.
Disconnect,
/// NOTE(review): presumably requests a session reset; the handling arm isn't visible in this
/// excerpt — confirm against the scheduler loop.
Reset,
}
Expand Down Expand Up @@ -1424,7 +1426,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
session_pausing = true;
session_resetting = true;
}
Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) =>
Ok(NewTaskPayload::OpenSubchannel(_) | NewTaskPayload::Unblock) =>
unreachable!(),
Ok(NewTaskPayload::Disconnect) | Err(RecvError) => {
// Most likely, this scheduler is dropped for pruned blocks of
Expand Down Expand Up @@ -1492,6 +1494,12 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
session_ending = false;
} else {
session_pausing = false;
// Prevent processing transactions and thus touching poh until
// unblocked, which signals successful poh `set_bank()`-ing.
assert_matches!(
new_task_receiver.recv().unwrap(),
NewTaskPayload::Unblock
);
}

runnable_task_sender
Expand Down Expand Up @@ -1780,6 +1788,12 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
.expect("no new session after aborted");
}

/// Send a `NewTaskPayload::Unblock` message through `new_task_sender`.
///
/// # Panics
///
/// Panics with "no new session after aborted" if the send fails.
fn unblock_session(&self) {
    let unblock = NewTaskPayload::Unblock;
    self.new_task_sender
        .send(unblock)
        .expect("no new session after aborted");
}

/// Replace `new_task_sender` with the sending half of a brand-new unbounded channel.
///
/// The receiving half of the new channel is not kept, and the previously held sender handle
/// is released by the assignment.
fn disconnect_new_task_sender(&mut self) {
    let (detached_sender, _unused_receiver) = crossbeam_channel::unbounded();
    self.new_task_sender = Arc::new(detached_sender);
}
Expand Down Expand Up @@ -1936,6 +1950,10 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
self.inner.thread_manager.send_task(task)
}

/// Forward the unblock request to this scheduler's thread manager via `unblock_session()`.
fn unblock_scheduling(&self) {
    let thread_manager = &self.inner.thread_manager;
    thread_manager.unblock_session();
}

fn recover_error_after_abort(&mut self) -> TransactionError {
self.inner
.thread_manager
Expand Down Expand Up @@ -3384,6 +3402,10 @@ mod tests {
Ok(())
}

/// Not supported by this scheduler defined in `mod tests`; always panics via
/// `unimplemented!()`.
fn unblock_scheduling(&self) {
    unimplemented!()
}

/// Not supported by this scheduler defined in `mod tests`; always panics via
/// `unimplemented!()` instead of returning a `TransactionError`.
fn recover_error_after_abort(&mut self) -> TransactionError {
    unimplemented!()
}
Expand Down

0 comments on commit d804e35

Please sign in to comment.