Skip to content

Commit

Permalink
fix: use atomic to check if leader bank changed (#4596)
Browse files Browse the repository at this point in the history
* fix: use atomic to check if leader bank changed

* fix default value and tests

* ordering update

* feedback

* add comment
  • Loading branch information
jstarry authored Jan 24, 2025
1 parent cb9cc49 commit 4cc49ac
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
10 changes: 9 additions & 1 deletion core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
.fetch_add(get_bank_us, Ordering::Relaxed);

for work in try_drain_iter(work, &self.consume_receiver) {
if bank.is_complete() {
if bank.is_complete() || {
// check if the bank got interrupted before completion
self.get_consume_bank_id() != Some(bank.bank_id())
} {
let (maybe_new_bank, get_bank_us) = measure_us!(self.get_consume_bank());
if let Some(new_bank) = maybe_new_bank {
self.metrics
Expand Down Expand Up @@ -134,6 +137,11 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
.upgrade()
}

/// Returns the id of the current leader bank as reported by the leader bank
/// notifier, or `None` when the notifier has no bank id to report.
fn get_consume_bank_id(&self) -> Option<u64> {
    let notifier = &self.leader_bank_notifier;
    notifier.get_current_bank_id()
}

/// Retry current batch and all outstanding batches.
fn retry_drain(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
for work in try_drain_iter(work, &self.consume_receiver) {
Expand Down
44 changes: 42 additions & 2 deletions poh/src/leader_bank_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,38 @@ use {
solana_clock::Slot,
solana_runtime::bank::Bank,
std::{
sync::{Arc, Condvar, Mutex, MutexGuard, Weak},
sync::{
atomic::{AtomicU64, Ordering},
Arc, Condvar, Mutex, MutexGuard, Weak,
},
time::{Duration, Instant},
},
};

/// Sentinel stored in `LeaderBankNotifier::current_bank_id` while no leader
/// bank is in progress; `get_current_bank_id` reports it as `None`.
const STAND_BY_SENTINEL_ID: u64 = u64::MAX;

/// Tracks leader status of the validator node and notifies when:
/// 1. A leader bank initiates (=PoH-initiated)
/// 2. A leader slot completes (=PoH-completed)
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct LeaderBankNotifier {
/// Current state (slot, bank, and status) of the system, guarded by a mutex
state: Mutex<SlotAndBankWithStatus>,
/// CondVar used to wait for, and to notify of, status changes
condvar: Condvar,
/// Lightweight atomic variable that can be read without locking `state` to
/// check the id of the latest leader bank. Holds `STAND_BY_SENTINEL_ID`
/// while no leader bank is in progress.
current_bank_id: AtomicU64,
}

impl Default for LeaderBankNotifier {
    /// Builds a notifier with default state, a fresh condition variable, and
    /// no current bank id.
    fn default() -> Self {
        // The atomic starts at the stand-by sentinel so that
        // `get_current_bank_id` reports `None` until a bank id is stored.
        let current_bank_id = AtomicU64::new(STAND_BY_SENTINEL_ID);
        let condvar = Condvar::new();
        let state = Mutex::default();

        Self {
            state,
            condvar,
            current_bank_id,
        }
    }
}

/// Leader status state machine for the validator.
Expand Down Expand Up @@ -43,6 +61,8 @@ impl LeaderBankNotifier {
let mut state = self.state.lock().unwrap();
assert_eq!(state.status, Status::StandBy);

self.current_bank_id
.store(bank.bank_id(), Ordering::Relaxed);
*state = SlotAndBankWithStatus {
status: Status::InProgress,
slot: Some(bank.slot()),
Expand All @@ -61,12 +81,26 @@ impl LeaderBankNotifier {
assert_eq!(state.status, Status::InProgress);
assert_eq!(state.slot, Some(slot));

self.current_bank_id
.store(STAND_BY_SENTINEL_ID, Ordering::Relaxed);
state.status = Status::StandBy;
drop(state);

self.condvar.notify_all();
}

/// Fetch the bank id of the bank inside the mutex wrapped state field. Due
/// to the usage of relaxed ordering, this is not a guarantee that the
/// caller thread will see the updated bank in the mutex wrapped state yet.
pub fn get_current_bank_id(&self) -> Option<u64> {
let current_bank_id = self.current_bank_id.load(Ordering::Relaxed);
if current_bank_id == STAND_BY_SENTINEL_ID {
None
} else {
Some(current_bank_id)
}
}

/// If the status is `InProgress`, immediately return a weak reference to the bank.
/// Otherwise, wait up to the `timeout` for the status to become `InProgress`.
/// If the timeout is reached, the weak reference is unupgradable.
Expand Down Expand Up @@ -124,6 +158,7 @@ mod tests {
fn test_leader_bank_notifier_default() {
let leader_bank_notifier = LeaderBankNotifier::default();
let state = leader_bank_notifier.state.lock().unwrap();
assert_eq!(leader_bank_notifier.get_current_bank_id(), None);
assert_eq!(state.status, Status::StandBy);
assert_eq!(state.slot, None);
assert!(state.bank.upgrade().is_none());
Expand All @@ -145,6 +180,10 @@ mod tests {
leader_bank_notifier.set_in_progress(&bank);

let state = leader_bank_notifier.state.lock().unwrap();
assert_eq!(
leader_bank_notifier.get_current_bank_id(),
Some(bank.bank_id())
);
assert_eq!(state.status, Status::InProgress);
assert_eq!(state.slot, Some(bank.slot()));
assert_eq!(state.bank.upgrade(), Some(bank));
Expand Down Expand Up @@ -184,6 +223,7 @@ mod tests {
leader_bank_notifier.set_completed(bank.slot());

let state = leader_bank_notifier.state.lock().unwrap();
assert_eq!(leader_bank_notifier.get_current_bank_id(), None);
assert_eq!(state.status, Status::StandBy);
assert_eq!(state.slot, Some(bank.slot()));
assert_eq!(state.bank.upgrade(), Some(bank));
Expand Down

0 comments on commit 4cc49ac

Please sign in to comment.