Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

LeaderBankNotifier #30395

Merged
merged 36 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0938fe2
LeaderBankStatus
apfitzge Feb 16, 2023
a802d31
in progress timeout
apfitzge Mar 7, 2023
da3a6af
fix some flakiness
apfitzge Mar 13, 2023
cb5524c
LeaderBankStatus: Additional tests, bugfix, and comment clarification
apfitzge Mar 15, 2023
fb9dbc6
Restructuring of working_bank taking and setting complete
apfitzge Mar 21, 2023
df7d433
Use Arc::default
apfitzge Mar 21, 2023
913afe5
Remove Uninitialized variant, clarify comments
apfitzge Mar 21, 2023
011b508
Rename LeaderBankStatus -> LeaderBankNotifier
apfitzge Mar 21, 2023
b985c57
Clarifying comment
apfitzge Mar 21, 2023
a50ae28
timeout -> remaining_timeout
apfitzge Mar 21, 2023
03ec4e6
Single mutex for internal state
apfitzge Mar 21, 2023
891f3eb
Additional renames
apfitzge Mar 21, 2023
91b4e8f
new_leader_bank_notifier
apfitzge Mar 21, 2023
48a747b
Move leader_bank_notifier into poh
apfitzge Mar 22, 2023
22454f6
waiters named with qualifying verbs instead of noun
apfitzge Mar 22, 2023
30c1488
return Weak::new on timeout
apfitzge Mar 22, 2023
8261ab6
Use else-if
apfitzge Mar 22, 2023
80fd964
set_in_progress panics if not in StandBy
apfitzge Mar 22, 2023
bb1791a
set_completed panics on a slot mismatch
apfitzge Mar 22, 2023
8b924cb
optional slot
apfitzge Mar 22, 2023
785b3b2
Fix tests
apfitzge Mar 22, 2023
0f12583
additional testing - actual immediate return
apfitzge Mar 22, 2023
f350c0a
Internal get_or_wait_for_in_progress_state
apfitzge Mar 22, 2023
4a7125a
wait_for_completed explicit checks for InProgress -> StandBy
apfitzge Mar 22, 2023
90fab19
revert some changes. only allow poh_recorder to set leader_bank state
apfitzge Mar 23, 2023
547b808
Use reset instead of set since the bank is already set
apfitzge Mar 23, 2023
9ba7d92
pub(crate) state setting - now only intended to be used in solana-poh
apfitzge Mar 24, 2023
aa673da
unwrap_or_default
apfitzge Mar 24, 2023
9117ddd
reduce complexity
apfitzge Mar 24, 2023
de788b1
state option using ?
apfitzge Mar 24, 2023
b893a6b
timeout sub using checked_sub and ?
apfitzge Mar 24, 2023
06542a1
get_or_wait_for_in_progress_state takes in Condvar ref instead of self
apfitzge Mar 24, 2023
29cee36
get_or_wait_for_in_progress_state argument order
apfitzge Mar 24, 2023
42d778b
set_completed is now only called by PohRecorder. Added assertions app…
apfitzge Mar 24, 2023
b679dee
manually revert previous changes...
apfitzge Mar 25, 2023
974cf7e
Use then and expect for returning optional slot - verifies assumption
apfitzge Mar 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ fn main() {
bank_forks.clone(),
&Arc::new(PrioritizationFeeCache::new(0u64)),
);
poh_recorder.write().unwrap().set_bank(&bank, false);

// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
Expand Down Expand Up @@ -538,6 +537,7 @@ fn main() {
std::u64::MAX,
);

assert!(poh_recorder.read().unwrap().bank().is_none());
poh_recorder.write().unwrap().set_bank(&bank, false);
assert!(poh_recorder.read().unwrap().bank().is_some());
debug!(
Expand Down
1 change: 0 additions & 1 deletion core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
);
poh_recorder.write().unwrap().set_bank(&bank, false);

let chunk_len = verified.len() / CHUNKS;
let mut start = 0;
Expand Down
275 changes: 275 additions & 0 deletions poh/src/leader_bank_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
use {
solana_runtime::bank::Bank,
solana_sdk::slot_history::Slot,
std::{
sync::{Arc, Condvar, Mutex, MutexGuard, Weak},
time::{Duration, Instant},
},
};

/// Tracks leader status of the validator node and notifies when:
/// 1. A leader bank initiates (=PoH-initiated)
/// 2. A leader slot completes (=PoH-completed)
///
/// The status is changed through `set_in_progress` and `set_completed`, each of
/// which updates `state` and then wakes every thread blocked on `condvar`.
/// Readers use `get_or_wait_for_in_progress` and `wait_for_completed`, which wait
/// on `condvar` (with a timeout) while holding the `state` lock.
#[derive(Debug, Default)]
pub struct LeaderBankNotifier {
    /// Current state (slot, bank, and status) of the system
    state: Mutex<SlotAndBankWithStatus>,
    /// `Condvar` used both to wait for and to signal status changes
    condvar: Condvar,
}

/// Leader status state machine for the validator.
///
/// The two variants alternate: `set_in_progress` asserts the status is `StandBy`
/// and moves it to `InProgress`; `set_completed` asserts the status is
/// `InProgress` and moves it back to `StandBy`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
enum Status {
    /// The leader bank is not currently available. Either not initialized, or PoH-completed bank.
    /// This is the `Default` status.
    #[default]
    StandBy,
    /// PoH-initiated bank is available.
    InProgress,
}

/// The mutex-protected state of a `LeaderBankNotifier`.
#[derive(Debug, Default)]
struct SlotAndBankWithStatus {
    /// Whether a leader bank is currently in progress.
    status: Status,
    /// Slot of the bank most recently passed to `set_in_progress`; `None` by default.
    /// `set_completed` only changes `status`, so the slot stays readable afterwards.
    slot: Option<Slot>,
    /// Weak reference to the bank most recently passed to `set_in_progress`.
    /// Being weak, it does not keep the bank alive; the default is an
    /// unupgradable reference.
    bank: Weak<Bank>,
}

impl LeaderBankNotifier {
    /// Record `bank` as the in-progress leader bank and wake all waiters.
    ///
    /// The stored slot is taken from `bank.slot()` and the bank itself is kept only
    /// as a weak reference.
    ///
    /// # Panics
    ///
    /// Panics if the status is not `StandBy` - cannot have multiple leader banks
    /// in progress.
    pub(crate) fn set_in_progress(&self, bank: &Arc<Bank>) {
        {
            let mut guard = self.state.lock().unwrap();
            assert_eq!(guard.status, Status::StandBy);

            let in_progress = SlotAndBankWithStatus {
                status: Status::InProgress,
                slot: Some(bank.slot()),
                bank: Arc::downgrade(bank),
            };
            *guard = in_progress;
            // The guard is released at the end of this scope, before notifying.
        }

        self.condvar.notify_all();
    }

    /// Mark the in-progress leader bank as completed (status back to `StandBy`)
    /// and wake all waiters. The stored slot and bank are left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the current status is not `InProgress`, or if the stored slot
    /// does not match `slot`.
    pub(crate) fn set_completed(&self, slot: Slot) {
        {
            let mut guard = self.state.lock().unwrap();
            assert_eq!(guard.status, Status::InProgress);
            assert_eq!(guard.slot, Some(slot));

            guard.status = Status::StandBy;
            // The guard is released at the end of this scope, before notifying.
        }

        self.condvar.notify_all();
    }

    /// Return a weak reference to the in-progress leader bank.
    ///
    /// If the status is already `InProgress` this returns immediately. Otherwise
    /// it waits up to `timeout` for the status to become `InProgress`. If the
    /// timeout is reached, the returned weak reference is unupgradable.
    pub fn get_or_wait_for_in_progress(&self, timeout: Duration) -> Weak<Bank> {
        let guard = self.state.lock().unwrap();
        let in_progress = Self::get_or_wait_for_in_progress_state(&self.condvar, guard, timeout);
        match in_progress {
            Some(state) => state.bank.clone(),
            None => Weak::default(),
        }
    }

    /// Wait for the next notification of a completed leader slot.
    ///
    /// This first waits for an `InProgress` status (if not already there), then
    /// spends whatever is left of `remaining_timeout` waiting for the status to
    /// return to `StandBy`. Returns the completed slot, or `None` if the timeout
    /// is reached in either phase.
    pub fn wait_for_completed(&self, mut remaining_timeout: Duration) -> Option<Slot> {
        let guard = self.state.lock().unwrap();

        // Phase 1: if currently `StandBy`, a leader bank has to start first.
        let phase_start = Instant::now();
        let in_progress =
            Self::get_or_wait_for_in_progress_state(&self.condvar, guard, remaining_timeout)?;
        // Charge the time spent in phase 1 against the overall budget.
        remaining_timeout = remaining_timeout.checked_sub(phase_start.elapsed())?;

        // Phase 2: wait for `StandBy` to be set again.
        let (completed, outcome) = self
            .condvar
            .wait_timeout_while(in_progress, remaining_timeout, |current| {
                current.status == Status::InProgress
            })
            .unwrap();

        if outcome.timed_out() {
            None
        } else {
            Some(completed.slot.expect("some slot when completed"))
        }
    }

    /// Helper that gets, or waits for, the `InProgress` status using an already
    /// acquired `MutexGuard`.
    ///
    /// Returns the guard once the status is `InProgress`; returns `None` (dropping
    /// the guard) if `timeout` elapses while the status is still `StandBy`.
    fn get_or_wait_for_in_progress_state<'a>(
        condvar: &'a Condvar,
        state: MutexGuard<'a, SlotAndBankWithStatus>,
        timeout: Duration,
    ) -> Option<MutexGuard<'a, SlotAndBankWithStatus>> {
        let (guard, outcome) = condvar
            .wait_timeout_while(state, timeout, |current| {
                current.status == Status::StandBy
            })
            .unwrap();

        if outcome.timed_out() {
            None
        } else {
            Some(guard)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A default notifier starts in `StandBy` with no slot and no bank.
    #[test]
    fn test_leader_bank_notifier_default() {
        let leader_bank_notifier = LeaderBankNotifier::default();
        let state = leader_bank_notifier.state.lock().unwrap();
        assert_eq!(state.status, Status::StandBy);
        assert_eq!(state.slot, None);
        assert!(state.bank.upgrade().is_none());
    }

    /// A second `set_in_progress` without an intervening `set_completed` panics.
    #[test]
    #[should_panic]
    fn test_leader_bank_notifier_set_in_progress_already_in_progress() {
        let leader_bank_notifier = LeaderBankNotifier::default();
        let bank = Arc::new(Bank::default_for_tests());
        leader_bank_notifier.set_in_progress(&bank);
        leader_bank_notifier.set_in_progress(&bank);
    }

    /// `set_in_progress` stores the status, the bank's slot, and a weak handle to the bank.
    #[test]
    fn test_leader_bank_notifier_set_in_progress() {
        let leader_bank_notifier = LeaderBankNotifier::default();
        let bank = Arc::new(Bank::default_for_tests());
        leader_bank_notifier.set_in_progress(&bank);

        let state = leader_bank_notifier.state.lock().unwrap();
        assert_eq!(state.status, Status::InProgress);
        assert_eq!(state.slot, Some(bank.slot()));
        assert_eq!(state.bank.upgrade(), Some(bank));
    }

    /// `set_completed` on a notifier that never went `InProgress` panics.
    #[test]
    #[should_panic]
    fn test_leader_bank_notifier_set_completed_uninitialized() {
        let leader_bank_notifier = LeaderBankNotifier::default();
        leader_bank_notifier.set_completed(0);
    }

    /// `set_completed` with a slot other than the in-progress slot panics.
    #[test]
    #[should_panic]
    fn test_leader_bank_notifier_set_completed_mismatched_in_progress_slot() {
        let leader_bank_notifier = LeaderBankNotifier::default();
        let bank = Arc::new(Bank::default_for_tests());
        leader_bank_notifier.set_in_progress(&bank);
        leader_bank_notifier.set_completed(bank.slot() + 1);
    }

    /// A second `set_completed` after a successful one panics.
    /// Note: the status is already `StandBy` at that point, so the status
    /// assertion in `set_completed` fires before the slot comparison is reached.
    #[test]
    #[should_panic]
    fn test_leader_bank_notifier_set_completed_mismatched_completed_slot() {
        let leader_bank_notifier = LeaderBankNotifier::default();
        let bank = Arc::new(Bank::default_for_tests());
        leader_bank_notifier.set_in_progress(&bank);
        leader_bank_notifier.set_completed(bank.slot());
        leader_bank_notifier.set_completed(bank.slot() + 1);
    }

    /// `set_completed` returns the status to `StandBy` while keeping the slot and bank.
    #[test]
    fn test_leader_bank_notifier_set_completed() {
        let leader_bank_notifier = LeaderBankNotifier::default();
        let bank = Arc::new(Bank::default_for_tests());
        leader_bank_notifier.set_in_progress(&bank);
        leader_bank_notifier.set_completed(bank.slot());

        let state = leader_bank_notifier.state.lock().unwrap();
        assert_eq!(state.status, Status::StandBy);
        assert_eq!(state.slot, Some(bank.slot()));
        assert_eq!(state.bank.upgrade(), Some(bank));
    }

    /// While the status is `StandBy`, `get_or_wait_for_in_progress` times out and
    /// yields an unupgradable weak reference.
    #[test]
    fn test_leader_bank_notifier_get_or_wait_for_in_progress_timeout() {
        let leader_bank_notifier = LeaderBankNotifier::default();

        // Uninitialized
        assert!(leader_bank_notifier
            .get_or_wait_for_in_progress(Duration::from_millis(1))
            .upgrade()
            .is_none());

        let bank = Arc::new(Bank::default_for_tests());
        leader_bank_notifier.set_in_progress(&bank);
        leader_bank_notifier.set_completed(bank.slot());

        // Completed: `bank` is still alive here, but the status is `StandBy`,
        // so the stored weak reference is not handed out.
        assert!(leader_bank_notifier
            .get_or_wait_for_in_progress(Duration::from_millis(1))
            .upgrade()
            .is_none());
    }

    /// With a bank already in progress, even a zero timeout returns the bank.
    #[test]
    fn test_leader_bank_notifier_get_in_progress() {
        let leader_bank_notifier = LeaderBankNotifier::default();

        let bank = Arc::new(Bank::default_for_tests());
        leader_bank_notifier.set_in_progress(&bank);
        let weak_bank = leader_bank_notifier.get_or_wait_for_in_progress(Duration::ZERO);
        assert!(weak_bank.upgrade().is_some());
    }

    /// A waiter blocked in `get_or_wait_for_in_progress` is woken when another
    /// thread calls `set_in_progress`.
    #[test]
    fn test_leader_bank_notifier_wait_for_in_progress() {
        let leader_bank_notifier = Arc::new(LeaderBankNotifier::default());
        let bank = Arc::new(Bank::default_for_tests());

        // Need to spawn a separate thread so we wait for the condvar in `get_or_wait_for_in_progress`
        let jh = std::thread::spawn({
            let leader_bank_notifier = leader_bank_notifier.clone();
            let bank = bank.clone();
            move || {
                // Delay the transition so the main thread is likely already waiting.
                std::thread::sleep(Duration::from_millis(10));
                leader_bank_notifier.set_in_progress(&bank);
            }
        });

        let weak_bank = leader_bank_notifier.get_or_wait_for_in_progress(Duration::from_secs(1));
        let upgraded_bank = weak_bank.upgrade().unwrap();
        assert_eq!(upgraded_bank.slot(), bank.slot());

        jh.join().unwrap();
    }

    /// `wait_for_completed` returns the slot once another thread starts and then
    /// completes a leader bank.
    #[test]
    fn test_leader_bank_notifier_wait_for_completed() {
        let leader_bank_notifier = Arc::new(LeaderBankNotifier::default());
        let bank = Arc::new(Bank::default_for_tests());

        let jh = std::thread::spawn({
            let leader_bank_notifier = leader_bank_notifier.clone();
            let bank = bank.clone();
            move || {
                leader_bank_notifier.set_in_progress(&bank);
                // Keep the bank in progress briefly before completing it.
                std::thread::sleep(Duration::from_millis(10));
                leader_bank_notifier.set_completed(bank.slot());
            }
        });

        let slot = leader_bank_notifier.wait_for_completed(Duration::from_secs(1));
        assert_eq!(slot, Some(bank.slot()));

        jh.join().unwrap();
    }

    /// `wait_for_completed` returns `None` when the bank stays in progress past the timeout.
    #[test]
    fn test_leader_bank_notifier_wait_for_completed_timeout() {
        let leader_bank_notifier = LeaderBankNotifier::default();
        let bank = Arc::new(Bank::default_for_tests());
        leader_bank_notifier.set_in_progress(&bank);
        assert!(leader_bank_notifier
            .wait_for_completed(Duration::from_millis(1))
            .is_none());
    }
}
1 change: 1 addition & 0 deletions poh/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![allow(clippy::integer_arithmetic)]
pub mod leader_bank_notifier;
pub mod poh_recorder;
pub mod poh_service;

Expand Down
16 changes: 12 additions & 4 deletions poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//!
pub use solana_sdk::clock::Slot;
use {
crate::poh_service::PohService,
crate::{leader_bank_notifier::LeaderBankNotifier, poh_service::PohService},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TrySendError},
log::*,
solana_entry::{
Expand Down Expand Up @@ -298,13 +298,14 @@ pub struct PohRecorder {
ticks_from_record: u64,
last_metric: Instant,
record_sender: Sender<Record>,
leader_bank_notifier: Arc<LeaderBankNotifier>,
pub is_exited: Arc<AtomicBool>,
}

impl PohRecorder {
fn clear_bank(&mut self) {
if let Some(working_bank) = self.working_bank.take() {
let bank = working_bank.bank;
if let Some(WorkingBank { bank, start, .. }) = self.working_bank.take() {
self.leader_bank_notifier.set_completed(bank.slot());
let next_leader_slot = self.leader_schedule_cache.next_leader_slot(
&self.id,
bank.slot(),
Expand All @@ -326,7 +327,7 @@ impl PohRecorder {
datapoint_info!(
"leader-slot-start-to-cleared-elapsed-ms",
("slot", bank.slot(), i64),
("elapsed", working_bank.start.elapsed().as_millis(), i64),
("elapsed", start.elapsed().as_millis(), i64),
);
}

Expand Down Expand Up @@ -415,6 +416,10 @@ impl PohRecorder {
TransactionRecorder::new(self.record_sender.clone(), self.is_exited.clone())
}

/// Returns a handle to this recorder's `LeaderBankNotifier`.
///
/// Despite the name, no new notifier is constructed: the returned `Arc` is a
/// clone of the recorder's own handle, so it refers to the same notifier.
pub fn new_leader_bank_notifier(&self) -> Arc<LeaderBankNotifier> {
    Arc::clone(&self.leader_bank_notifier)
}

fn is_same_fork_as_previous_leader(&self, slot: Slot) -> bool {
(slot.saturating_sub(NUM_CONSECUTIVE_LEADER_SLOTS)..slot).any(|slot| {
// Check if the last slot Poh reset to was any of the
Expand Down Expand Up @@ -569,6 +574,8 @@ impl PohRecorder {
}

pub fn set_bank(&mut self, bank: &Arc<Bank>, track_transaction_indexes: bool) {
assert!(self.working_bank.is_none());
ryoqun marked this conversation as resolved.
Show resolved Hide resolved
self.leader_bank_notifier.set_in_progress(bank);
let working_bank = WorkingBank {
bank: bank.clone(),
start: Arc::new(Instant::now()),
Expand Down Expand Up @@ -954,6 +961,7 @@ impl PohRecorder {
ticks_from_record: 0,
last_metric: Instant::now(),
record_sender,
leader_bank_notifier: Arc::default(),
is_exited,
},
receiver,
Expand Down