Skip to content

Commit

Permalink
Apply misc small cleanups to unified scheduler (#4080)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun authored Dec 12, 2024
1 parent bd46d7f commit 6abf4f2
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 56 deletions.
9 changes: 2 additions & 7 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,7 @@ impl SimulatorLoop {
.bank_forks
.read()
.unwrap()
.working_bank_with_scheduler()
.clone_with_scheduler();
.working_bank_with_scheduler();
self.poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -676,11 +675,7 @@ impl BankingSimulator {
let parent_slot = self.parent_slot().unwrap();
let mut packet_batches_by_time = self.banking_trace_events.packet_batches_by_time;
let freeze_time_by_slot = self.banking_trace_events.freeze_time_by_slot;
let bank = bank_forks
.read()
.unwrap()
.working_bank_with_scheduler()
.clone_with_scheduler();
let bank = bank_forks.read().unwrap().working_bank_with_scheduler();

let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
assert_eq!(parent_slot, bank.slot());
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ impl BankForks {
self[self.highest_slot()].clone()
}

pub fn working_bank_with_scheduler(&self) -> &BankWithScheduler {
&self.banks[&self.highest_slot()]
pub fn working_bank_with_scheduler(&self) -> BankWithScheduler {
self.banks[&self.highest_slot()].clone_with_scheduler()
}

/// Register to be notified when a bank has been dumped (due to duplicate block handling)
Expand Down
95 changes: 48 additions & 47 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ mod chained_channel {

pub(super) fn send_chained_channel(
&mut self,
context: C,
context: &C,
count: usize,
) -> std::result::Result<(), SendError<ChainedChannel<P, C>>> {
let (chained_sender, chained_receiver) = crossbeam_channel::unbounded();
Expand Down Expand Up @@ -771,7 +771,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
let (new_task_sender, new_task_receiver) = crossbeam_channel::unbounded();
let (session_result_sender, session_result_receiver) = crossbeam_channel::unbounded();
let handler_count = pool.handler_count;

Self {
scheduler_id: pool.new_scheduler_id(),
Expand All @@ -782,7 +781,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
session_result_receiver,
session_result_with_timings: None,
scheduler_thread: None,
handler_threads: Vec::with_capacity(handler_count),
handler_threads: vec![],
}
}

Expand Down Expand Up @@ -1101,7 +1100,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// enter into the preceding `while(!is_finished) {...}` loop again.
// Before that, propagate new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(new_context, handler_count)
.send_chained_channel(&new_context, handler_count)
.unwrap();
result_with_timings = new_result_with_timings;
}
Expand Down Expand Up @@ -1152,54 +1151,56 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// 2. Subsequent contexts are propagated explicitly inside `.after_select()` as part of
// `select_biased!`, which are sent from `.send_chained_channel()` in the scheduler
// thread for all-but-initial sessions.
move || loop {
let (task, sender) = select_biased! {
recv(runnable_task_receiver.for_select()) -> message => {
let Ok(message) = message else {
break;
};
if let Some(task) = runnable_task_receiver.after_select(message) {
(task, &finished_blocked_task_sender)
} else {
continue;
move || {
loop {
let (task, sender) = select_biased! {
recv(runnable_task_receiver.for_select()) -> message => {
let Ok(message) = message else {
break;
};
if let Some(task) = runnable_task_receiver.after_select(message) {
(task, &finished_blocked_task_sender)
} else {
continue;
}
},
recv(runnable_task_receiver.aux_for_select()) -> task => {
if let Ok(task) = task {
(task, &finished_idle_task_sender)
} else {
runnable_task_receiver.never_receive_from_aux();
continue;
}
},
};
defer! {
if !thread::panicking() {
return;
}
},
recv(runnable_task_receiver.aux_for_select()) -> task => {
if let Ok(task) = task {
(task, &finished_idle_task_sender)

// The scheduler thread can't detect panics in handler threads with
// disconnected channel errors, unless all of them have died. So, send an
// explicit Err promptly.
let current_thread = thread::current();
error!("handler thread is panicking: {:?}", current_thread);
if sender.send(Err(HandlerPanicked)).is_ok() {
info!("notified a panic from {:?}", current_thread);
} else {
runnable_task_receiver.never_receive_from_aux();
continue;
// It seems that the scheduler thread has been aborted already...
warn!("failed to notify a panic from {:?}", current_thread);
}
},
};
defer! {
if !thread::panicking() {
return;
}

// The scheduler thread can't detect panics in handler threads with
// disconnected channel errors, unless all of them have died. So, send an
// explicit Err promptly.
let current_thread = thread::current();
error!("handler thread is panicking: {:?}", current_thread);
if sender.send(Err(HandlerPanicked)).is_ok() {
info!("notified a panic from {:?}", current_thread);
} else {
// It seems that the scheduler thread has been aborted already...
warn!("failed to notify a panic from {:?}", current_thread);
let mut task = ExecutedTask::new_boxed(task);
Self::execute_task_with_handler(
runnable_task_receiver.context(),
&mut task,
&pool.handler_context,
);
if sender.send(Ok(task)).is_err() {
warn!("handler_thread: scheduler thread aborted...");
break;
}
}
let mut task = ExecutedTask::new_boxed(task);
Self::execute_task_with_handler(
runnable_task_receiver.context(),
&mut task,
&pool.handler_context,
);
if sender.send(Ok(task)).is_err() {
warn!("handler_thread: scheduler thread aborted...");
break;
}
}
};

Expand Down Expand Up @@ -1441,7 +1442,7 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {

impl<S, TH> UninstalledScheduler for PooledSchedulerInner<S, TH>
where
S: SpawnableScheduler<TH, Inner = PooledSchedulerInner<S, TH>>,
S: SpawnableScheduler<TH, Inner = Self>,
TH: TaskHandler,
{
fn return_to_pool(self: Box<Self>) {
Expand Down

0 comments on commit 6abf4f2

Please sign in to comment.