From 0938fe281e4e34b52469f30ff4c985177579ac11 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 16 Feb 2023 23:18:59 +0000 Subject: [PATCH 01/36] LeaderBankStatus --- core/src/replay_stage.rs | 5 ++ poh/src/poh_recorder.rs | 20 ++++- runtime/src/leader_bank_status.rs | 139 ++++++++++++++++++++++++++++++ runtime/src/lib.rs | 1 + 4 files changed, 162 insertions(+), 3 deletions(-) create mode 100644 runtime/src/leader_bank_status.rs diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index e17772321c62da..8e1e9f53f5efa1 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -58,6 +58,7 @@ use { bank::{Bank, NewBankOptions}, bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, commitment::BlockCommitmentCache, + leader_bank_status::LeaderBankStatus, prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, }, @@ -481,6 +482,7 @@ impl ReplayStage { r_bank_forks.get_vote_only_mode_signal(), ) }; + let leader_bank_status = poh_recorder.read().unwrap().leader_bank_status.clone(); Self::reset_poh_recorder( &my_pubkey, @@ -948,6 +950,7 @@ impl ReplayStage { &retransmit_slots_sender, &mut skipped_slots_info, &banking_tracer, + &leader_bank_status, has_new_vote_been_rooted, transaction_status_sender.is_some(), ); @@ -1677,6 +1680,7 @@ impl ReplayStage { retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, banking_tracer: &Arc, + leader_bank_status: &LeaderBankStatus, has_new_vote_been_rooted: bool, track_transaction_indexes: bool, ) { @@ -1802,6 +1806,7 @@ impl ReplayStage { banking_tracer.hash_event(parent.slot(), &parent.last_blockhash(), &parent.hash()); let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); + leader_bank_status.set_in_progress(&tpu_bank); poh_recorder .write() .unwrap() diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index c0777a932e9674..0442fbda628d9b 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ 
-26,7 +26,7 @@ use { }, solana_measure::{measure, measure_us}, solana_metrics::poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo}, - solana_runtime::bank::Bank, + solana_runtime::{bank::Bank, leader_bank_status::LeaderBankStatus}, solana_sdk::{ clock::NUM_CONSECUTIVE_LEADER_SLOTS, hash::Hash, poh_config::PohConfig, pubkey::Pubkey, saturating_add_assign, transaction::VersionedTransaction, @@ -142,13 +142,19 @@ pub struct TransactionRecorder { // shared by all users of PohRecorder pub record_sender: Sender, pub is_exited: Arc, + leader_bank_status: Arc, } impl TransactionRecorder { - pub fn new(record_sender: Sender, is_exited: Arc) -> Self { + pub fn new( + record_sender: Sender, + is_exited: Arc, + leader_bank_status: Arc, + ) -> Self { Self { record_sender, is_exited, + leader_bank_status, } } @@ -174,6 +180,7 @@ impl TransactionRecorder { starting_transaction_index = starting_index; } Err(PohRecorderError::MaxHeightReached) => { + self.leader_bank_status.set_completed(bank_slot); return RecordTransactionsSummary { record_transactions_timings, result: Err(PohRecorderError::MaxHeightReached), @@ -298,6 +305,7 @@ pub struct PohRecorder { ticks_from_record: u64, last_metric: Instant, record_sender: Sender, + pub leader_bank_status: Arc, pub is_exited: Arc, } @@ -305,6 +313,7 @@ impl PohRecorder { fn clear_bank(&mut self) { if let Some(working_bank) = self.working_bank.take() { let bank = working_bank.bank; + self.leader_bank_status.set_completed(bank.slot()); let next_leader_slot = self.leader_schedule_cache.next_leader_slot( &self.id, bank.slot(), @@ -412,7 +421,11 @@ impl PohRecorder { } pub fn new_recorder(&self) -> TransactionRecorder { - TransactionRecorder::new(self.record_sender.clone(), self.is_exited.clone()) + TransactionRecorder::new( + self.record_sender.clone(), + self.is_exited.clone(), + self.leader_bank_status.clone(), + ) } fn is_same_fork_as_previous_leader(&self, slot: Slot) -> bool { @@ -954,6 +967,7 @@ impl PohRecorder 
{ ticks_from_record: 0, last_metric: Instant::now(), record_sender, + leader_bank_status: Arc::new(LeaderBankStatus::default()), is_exited, }, receiver, diff --git a/runtime/src/leader_bank_status.rs b/runtime/src/leader_bank_status.rs new file mode 100644 index 00000000000000..8c1c2f52f3e6b1 --- /dev/null +++ b/runtime/src/leader_bank_status.rs @@ -0,0 +1,139 @@ +use { + crate::bank::Bank, + solana_sdk::slot_history::Slot, + std::{ + sync::{Arc, Condvar, Mutex, RwLock, Weak}, + time::{Duration, Instant}, + }, +}; + +/// Tracks leader status of the validator node and notifies when: +/// 1. A leader slot begins +/// 2. A leader slot completes +#[derive(Debug, Default)] +pub struct LeaderBankStatus { + /// Current state of the system + status: Mutex, + /// Weak reference to the current bank + bank: RwLock)>>, + /// CondVar to notify status changes and waiting + condvar: Condvar, +} + +/// Leader status state machine for the validator: +/// [Unininitialized] -> [InProgress] -> [Completed] --| +/// ^-------------------------| +#[derive(Debug, Default)] +enum Status { + /// Initial state, no bank, but also not completed yet + #[default] + Uninitialized, + /// Slot is in progress as leader + InProgress, + /// PoH has reached the end of the slot, and the next bank as leader is not available yet + Completed, +} + +impl LeaderBankStatus { + /// Set the status to `InProgress` and notify any waiting threads + /// if the status was not already `InProgress`. + pub fn set_in_progress(&self, bank: &Arc) { + let mut status = self.status.lock().unwrap(); + if matches!(*status, Status::InProgress) { + return; + } + + *status = Status::InProgress; + *self.bank.write().unwrap() = Some((bank.slot(), Arc::downgrade(bank))); + self.condvar.notify_all(); + } + + /// Set the status to `Completed` and notify any waiting threads + /// if the status was not already `Completed` + /// and the slot is higher than the current slot (sanity check). 
+ pub fn set_completed(&self, slot: Slot) { + let mut status = self.status.lock().unwrap(); + if matches!(*status, Status::Completed) { + return; + } + + if let Some((current_slot, _)) = *self.bank.read().unwrap() { + if slot < current_slot { + return; + } + } + + *status = Status::Completed; + self.condvar.notify_all(); + } + + /// Return weak bank reference or wait for notification for an in progress bank. + pub fn wait_for_in_progress(&self) -> Weak { + let status = self.status.lock().unwrap(); + + // Hold status lock until after the weak bank reference is cloned. + let status = self + .condvar + .wait_while(status, |status| { + matches!(*status, Status::Uninitialized | Status::Completed) + }) + .unwrap(); + let bank = self.bank.read().unwrap().as_ref().unwrap().1.clone(); + drop(status); + + bank + } + + /// Wait for next notification for a completed slot. + /// Returns None if the timeout is reached + pub fn wait_for_next_completed_timeout(&self, mut timeout: Duration) -> Option { + loop { + let start = Instant::now(); + let status = self.status.lock().unwrap(); + let (status, result) = self.condvar.wait_timeout(status, timeout).unwrap(); + if result.timed_out() { + return None; + } + + if matches!(*status, Status::Completed) { + let slot = self.bank.read().unwrap().as_ref().unwrap().0; + return Some(slot); + } + + timeout = timeout.saturating_sub(start.elapsed()); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_leader_bank_status_wait_for_in_progress() { + let leader_bank_status = Arc::new(LeaderBankStatus::default()); + let leader_bank_status2 = leader_bank_status.clone(); + + let jh = std::thread::spawn(move || { + let _weak_bank = leader_bank_status2.wait_for_in_progress(); + }); + leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); + leader_bank_status.set_completed(1); + + jh.join().unwrap(); + } + + #[test] + fn test_leader_bank_status_wait_for_completed() { + let leader_bank_status = 
Arc::new(LeaderBankStatus::default()); + let leader_bank_status2 = leader_bank_status.clone(); + + let jh = std::thread::spawn(move || { + let _weak_bank = + leader_bank_status2.wait_for_next_completed_timeout(Duration::from_secs(1)); + }); + leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); + + jh.join().unwrap(); + } +} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 8a5bf2b0511af6..dfc1ccd94a1264 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -45,6 +45,7 @@ pub mod in_mem_accounts_index; pub mod inline_spl_associated_token_account; pub mod inline_spl_token; pub mod inline_spl_token_2022; +pub mod leader_bank_status; pub mod loader_utils; pub mod message_processor; pub mod non_circulating_supply; From a802d313f5b013229a747215d6432c889e521862 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 7 Mar 2023 19:58:05 +0000 Subject: [PATCH 02/36] in progress timeout --- runtime/src/leader_bank_status.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/runtime/src/leader_bank_status.rs b/runtime/src/leader_bank_status.rs index 8c1c2f52f3e6b1..02befe0e6c71e0 100644 --- a/runtime/src/leader_bank_status.rs +++ b/runtime/src/leader_bank_status.rs @@ -23,7 +23,7 @@ pub struct LeaderBankStatus { /// Leader status state machine for the validator: /// [Unininitialized] -> [InProgress] -> [Completed] --| /// ^-------------------------| -#[derive(Debug, Default)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] enum Status { /// Initial state, no bank, but also not completed yet #[default] @@ -68,25 +68,25 @@ impl LeaderBankStatus { } /// Return weak bank reference or wait for notification for an in progress bank. - pub fn wait_for_in_progress(&self) -> Weak { + pub fn wait_for_in_progress(&self, timeout: Duration) -> Option> { let status = self.status.lock().unwrap(); // Hold status lock until after the weak bank reference is cloned. 
let status = self .condvar - .wait_while(status, |status| { + .wait_timeout_while(status, timeout, |status| { matches!(*status, Status::Uninitialized | Status::Completed) }) .unwrap(); let bank = self.bank.read().unwrap().as_ref().unwrap().1.clone(); drop(status); - bank + Some(bank) } /// Wait for next notification for a completed slot. /// Returns None if the timeout is reached - pub fn wait_for_next_completed_timeout(&self, mut timeout: Duration) -> Option { + pub fn wait_for_next_completed(&self, mut timeout: Duration) -> Option { loop { let start = Instant::now(); let status = self.status.lock().unwrap(); @@ -115,7 +115,7 @@ mod tests { let leader_bank_status2 = leader_bank_status.clone(); let jh = std::thread::spawn(move || { - let _weak_bank = leader_bank_status2.wait_for_in_progress(); + let _weak_bank = leader_bank_status2.wait_for_in_progress(Duration::from_secs(1)); }); leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); leader_bank_status.set_completed(1); @@ -129,8 +129,7 @@ mod tests { let leader_bank_status2 = leader_bank_status.clone(); let jh = std::thread::spawn(move || { - let _weak_bank = - leader_bank_status2.wait_for_next_completed_timeout(Duration::from_secs(1)); + let _weak_bank = leader_bank_status2.wait_for_next_completed(Duration::from_secs(1)); }); leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); From da3a6af504427301720824218acfb9ca4d6c7660 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 13 Mar 2023 19:07:03 +0000 Subject: [PATCH 03/36] fix some flakiness --- runtime/src/leader_bank_status.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/runtime/src/leader_bank_status.rs b/runtime/src/leader_bank_status.rs index 02befe0e6c71e0..5fd2af1d3dd319 100644 --- a/runtime/src/leader_bank_status.rs +++ b/runtime/src/leader_bank_status.rs @@ -45,6 +45,8 @@ impl LeaderBankStatus { *status = Status::InProgress; *self.bank.write().unwrap() = Some((bank.slot(), 
Arc::downgrade(bank))); + drop(status); + self.condvar.notify_all(); } @@ -64,6 +66,8 @@ impl LeaderBankStatus { } *status = Status::Completed; + drop(status); + self.condvar.notify_all(); } @@ -117,8 +121,8 @@ mod tests { let jh = std::thread::spawn(move || { let _weak_bank = leader_bank_status2.wait_for_in_progress(Duration::from_secs(1)); }); + std::thread::sleep(Duration::from_millis(10)); leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); - leader_bank_status.set_completed(1); jh.join().unwrap(); } @@ -129,9 +133,11 @@ mod tests { let leader_bank_status2 = leader_bank_status.clone(); let jh = std::thread::spawn(move || { - let _weak_bank = leader_bank_status2.wait_for_next_completed(Duration::from_secs(1)); + let _slot = leader_bank_status2.wait_for_next_completed(Duration::from_secs(1)); }); leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); + std::thread::sleep(Duration::from_millis(10)); + leader_bank_status.set_completed(1); jh.join().unwrap(); } From cb5524c3b1998cdd5e213659111e8efe4c89de4f Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 15 Mar 2023 17:56:28 +0000 Subject: [PATCH 04/36] LeaderBankStatus: Additional tests, bugfix, and comment clarification --- runtime/src/leader_bank_status.rs | 39 ++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/runtime/src/leader_bank_status.rs b/runtime/src/leader_bank_status.rs index 5fd2af1d3dd319..1802d643d9af5f 100644 --- a/runtime/src/leader_bank_status.rs +++ b/runtime/src/leader_bank_status.rs @@ -71,25 +71,26 @@ impl LeaderBankStatus { self.condvar.notify_all(); } - /// Return weak bank reference or wait for notification for an in progress bank. + /// If the status is `InProgress`, immediately return a weak reference to the bank. + /// Otherwise, wait up to the `timeout` for the status to become `InProgress`. + /// Returns `None` if the timeout is reached. 
pub fn wait_for_in_progress(&self, timeout: Duration) -> Option> { let status = self.status.lock().unwrap(); // Hold status lock until after the weak bank reference is cloned. - let status = self + let (_status, wait_timeout_result) = self .condvar .wait_timeout_while(status, timeout, |status| { matches!(*status, Status::Uninitialized | Status::Completed) }) .unwrap(); - let bank = self.bank.read().unwrap().as_ref().unwrap().1.clone(); - drop(status); - Some(bank) + (!wait_timeout_result.timed_out()) + .then(|| self.bank.read().unwrap().as_ref().unwrap().1.clone()) } - /// Wait for next notification for a completed slot. - /// Returns None if the timeout is reached + /// Wait for next notification for a completed leader slot. + /// Returns `None` if the timeout is reached pub fn wait_for_next_completed(&self, mut timeout: Duration) -> Option { loop { let start = Instant::now(); @@ -128,7 +129,18 @@ mod tests { } #[test] - fn test_leader_bank_status_wait_for_completed() { + fn test_leader_bank_status_wait_for_in_progress_timeout() { + let leader_bank_status = Arc::new(LeaderBankStatus::default()); + leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); + leader_bank_status.set_completed(1); + + assert!(leader_bank_status + .wait_for_in_progress(Duration::from_millis(1)) + .is_none()); + } + + #[test] + fn test_leader_bank_status_wait_for_next_completed() { let leader_bank_status = Arc::new(LeaderBankStatus::default()); let leader_bank_status2 = leader_bank_status.clone(); @@ -141,4 +153,15 @@ mod tests { jh.join().unwrap(); } + + #[test] + fn test_leader_bank_status_wait_for_next_completed_timeout() { + let leader_bank_status = Arc::new(LeaderBankStatus::default()); + + leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); + assert_eq!( + leader_bank_status.wait_for_next_completed(Duration::from_millis(1)), + None + ); + } } From fb9dbc6ccb307621a19d92dbac634b405d651346 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: 
Tue, 21 Mar 2023 20:24:35 +0000 Subject: [PATCH 05/36] Restructuring of working_bank taking and setting complete --- poh/src/poh_recorder.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 0442fbda628d9b..059b42d3c23169 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -311,8 +311,7 @@ pub struct PohRecorder { impl PohRecorder { fn clear_bank(&mut self) { - if let Some(working_bank) = self.working_bank.take() { - let bank = working_bank.bank; + if let Some(WorkingBank { bank, start, .. }) = self.working_bank.take() { self.leader_bank_status.set_completed(bank.slot()); let next_leader_slot = self.leader_schedule_cache.next_leader_slot( &self.id, @@ -335,7 +334,7 @@ impl PohRecorder { datapoint_info!( "leader-slot-start-to-cleared-elapsed-ms", ("slot", bank.slot(), i64), - ("elapsed", working_bank.start.elapsed().as_millis(), i64), + ("elapsed", start.elapsed().as_millis(), i64), ); } From df7d433846ca981b7e06a40dd12c4d7a9dfb2afd Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 21 Mar 2023 20:30:32 +0000 Subject: [PATCH 06/36] Use Arc::default --- poh/src/poh_recorder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 059b42d3c23169..c77b72f36a1bc3 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -966,7 +966,7 @@ impl PohRecorder { ticks_from_record: 0, last_metric: Instant::now(), record_sender, - leader_bank_status: Arc::new(LeaderBankStatus::default()), + leader_bank_status: Arc::default(), is_exited, }, receiver, From 913afe511415582b10ac0f3d965832d012b0751c Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 21 Mar 2023 20:36:39 +0000 Subject: [PATCH 07/36] Remove Uninitialized variant, clarify comments --- runtime/src/leader_bank_status.rs | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git 
a/runtime/src/leader_bank_status.rs b/runtime/src/leader_bank_status.rs index 1802d643d9af5f..3b60d897021e9f 100644 --- a/runtime/src/leader_bank_status.rs +++ b/runtime/src/leader_bank_status.rs @@ -8,8 +8,8 @@ use { }; /// Tracks leader status of the validator node and notifies when: -/// 1. A leader slot begins -/// 2. A leader slot completes +/// 1. A leader bank initiates (=PoH-initiated) +/// 2. A leader slot completes (=PoH-completed) #[derive(Debug, Default)] pub struct LeaderBankStatus { /// Current state of the system @@ -20,18 +20,14 @@ pub struct LeaderBankStatus { condvar: Condvar, } -/// Leader status state machine for the validator: -/// [Unininitialized] -> [InProgress] -> [Completed] --| -/// ^-------------------------| +/// Leader status state machine for the validator. #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] enum Status { - /// Initial state, no bank, but also not completed yet + /// The leader bank is not currently available. Either not initialized, or PoH-completed bank. #[default] - Uninitialized, - /// Slot is in progress as leader + StandBy, + /// PoH-initiated bank is available. InProgress, - /// PoH has reached the end of the slot, and the next bank as leader is not available yet - Completed, } impl LeaderBankStatus { @@ -50,12 +46,12 @@ impl LeaderBankStatus { self.condvar.notify_all(); } - /// Set the status to `Completed` and notify any waiting threads - /// if the status was not already `Completed` + /// Set the status to `StandBy` and notify any waiting threads + /// if the status was not already `StandBy` /// and the slot is higher than the current slot (sanity check). 
pub fn set_completed(&self, slot: Slot) { let mut status = self.status.lock().unwrap(); - if matches!(*status, Status::Completed) { + if matches!(*status, Status::StandBy) { return; } @@ -65,7 +61,7 @@ impl LeaderBankStatus { } } - *status = Status::Completed; + *status = Status::StandBy; drop(status); self.condvar.notify_all(); @@ -80,9 +76,7 @@ impl LeaderBankStatus { // Hold status lock until after the weak bank reference is cloned. let (_status, wait_timeout_result) = self .condvar - .wait_timeout_while(status, timeout, |status| { - matches!(*status, Status::Uninitialized | Status::Completed) - }) + .wait_timeout_while(status, timeout, |status| matches!(*status, Status::StandBy)) .unwrap(); (!wait_timeout_result.timed_out()) @@ -100,7 +94,7 @@ impl LeaderBankStatus { return None; } - if matches!(*status, Status::Completed) { + if matches!(*status, Status::StandBy) { let slot = self.bank.read().unwrap().as_ref().unwrap().0; return Some(slot); } From 011b508c92665fc6656d60e4a1e0efdb43d06cb1 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 21 Mar 2023 20:38:08 +0000 Subject: [PATCH 08/36] Rename LeaderBankStatus -> LeaderBankNotifier --- core/src/replay_stage.rs | 4 ++-- poh/src/poh_recorder.rs | 8 ++++---- ...leader_bank_status.rs => leader_bank_notifier.rs} | 12 ++++++------ runtime/src/lib.rs | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) rename runtime/src/{leader_bank_status.rs => leader_bank_notifier.rs} (93%) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 8e1e9f53f5efa1..1c272bfe0566ac 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -58,7 +58,7 @@ use { bank::{Bank, NewBankOptions}, bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, commitment::BlockCommitmentCache, - leader_bank_status::LeaderBankStatus, + leader_bank_notifier::LeaderBankNotifier, prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, }, @@ -1680,7 +1680,7 @@ impl ReplayStage { 
retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, banking_tracer: &Arc, - leader_bank_status: &LeaderBankStatus, + leader_bank_status: &LeaderBankNotifier, has_new_vote_been_rooted: bool, track_transaction_indexes: bool, ) { diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index c77b72f36a1bc3..781025818b30d2 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -26,7 +26,7 @@ use { }, solana_measure::{measure, measure_us}, solana_metrics::poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo}, - solana_runtime::{bank::Bank, leader_bank_status::LeaderBankStatus}, + solana_runtime::{bank::Bank, leader_bank_notifier::LeaderBankNotifier}, solana_sdk::{ clock::NUM_CONSECUTIVE_LEADER_SLOTS, hash::Hash, poh_config::PohConfig, pubkey::Pubkey, saturating_add_assign, transaction::VersionedTransaction, @@ -142,14 +142,14 @@ pub struct TransactionRecorder { // shared by all users of PohRecorder pub record_sender: Sender, pub is_exited: Arc, - leader_bank_status: Arc, + leader_bank_status: Arc, } impl TransactionRecorder { pub fn new( record_sender: Sender, is_exited: Arc, - leader_bank_status: Arc, + leader_bank_status: Arc, ) -> Self { Self { record_sender, @@ -305,7 +305,7 @@ pub struct PohRecorder { ticks_from_record: u64, last_metric: Instant, record_sender: Sender, - pub leader_bank_status: Arc, + pub leader_bank_status: Arc, pub is_exited: Arc, } diff --git a/runtime/src/leader_bank_status.rs b/runtime/src/leader_bank_notifier.rs similarity index 93% rename from runtime/src/leader_bank_status.rs rename to runtime/src/leader_bank_notifier.rs index 3b60d897021e9f..13cf273a9f3ab4 100644 --- a/runtime/src/leader_bank_status.rs +++ b/runtime/src/leader_bank_notifier.rs @@ -11,7 +11,7 @@ use { /// 1. A leader bank initiates (=PoH-initiated) /// 2. 
A leader slot completes (=PoH-completed) #[derive(Debug, Default)] -pub struct LeaderBankStatus { +pub struct LeaderBankNotifier { /// Current state of the system status: Mutex, /// Weak reference to the current bank @@ -30,7 +30,7 @@ enum Status { InProgress, } -impl LeaderBankStatus { +impl LeaderBankNotifier { /// Set the status to `InProgress` and notify any waiting threads /// if the status was not already `InProgress`. pub fn set_in_progress(&self, bank: &Arc) { @@ -110,7 +110,7 @@ mod tests { #[test] fn test_leader_bank_status_wait_for_in_progress() { - let leader_bank_status = Arc::new(LeaderBankStatus::default()); + let leader_bank_status = Arc::new(LeaderBankNotifier::default()); let leader_bank_status2 = leader_bank_status.clone(); let jh = std::thread::spawn(move || { @@ -124,7 +124,7 @@ mod tests { #[test] fn test_leader_bank_status_wait_for_in_progress_timeout() { - let leader_bank_status = Arc::new(LeaderBankStatus::default()); + let leader_bank_status = Arc::new(LeaderBankNotifier::default()); leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); leader_bank_status.set_completed(1); @@ -135,7 +135,7 @@ mod tests { #[test] fn test_leader_bank_status_wait_for_next_completed() { - let leader_bank_status = Arc::new(LeaderBankStatus::default()); + let leader_bank_status = Arc::new(LeaderBankNotifier::default()); let leader_bank_status2 = leader_bank_status.clone(); let jh = std::thread::spawn(move || { @@ -150,7 +150,7 @@ mod tests { #[test] fn test_leader_bank_status_wait_for_next_completed_timeout() { - let leader_bank_status = Arc::new(LeaderBankStatus::default()); + let leader_bank_status = Arc::new(LeaderBankNotifier::default()); leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); assert_eq!( diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index dfc1ccd94a1264..fbffea25a2afb1 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -45,7 +45,7 @@ pub mod in_mem_accounts_index; pub mod 
inline_spl_associated_token_account; pub mod inline_spl_token; pub mod inline_spl_token_2022; -pub mod leader_bank_status; +pub mod leader_bank_notifier; pub mod loader_utils; pub mod message_processor; pub mod non_circulating_supply; From b985c5781489bb8828dbb347eb79e75252281c57 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 21 Mar 2023 20:39:10 +0000 Subject: [PATCH 09/36] Clarifying comment --- runtime/src/leader_bank_notifier.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/src/leader_bank_notifier.rs b/runtime/src/leader_bank_notifier.rs index 13cf273a9f3ab4..1a232543618b25 100644 --- a/runtime/src/leader_bank_notifier.rs +++ b/runtime/src/leader_bank_notifier.rs @@ -46,9 +46,9 @@ impl LeaderBankNotifier { self.condvar.notify_all(); } - /// Set the status to `StandBy` and notify any waiting threads - /// if the status was not already `StandBy` - /// and the slot is higher than the current slot (sanity check). + /// Set the status to `StandBy` and notify any waiting threads if + /// 1. the status was not already `StandBy` and + /// 2. the slot is higher than the current slot (sanity check). pub fn set_completed(&self, slot: Slot) { let mut status = self.status.lock().unwrap(); if matches!(*status, Status::StandBy) { return; } From a50ae28221d44a4a31e543caedc775ef4253d796 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 21 Mar 2023 20:45:27 +0000 Subject: [PATCH 10/36] timeout -> remaining_timeout --- runtime/src/leader_bank_notifier.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/runtime/src/leader_bank_notifier.rs b/runtime/src/leader_bank_notifier.rs index 1a232543618b25..655d45f3b89eb6 100644 --- a/runtime/src/leader_bank_notifier.rs +++ b/runtime/src/leader_bank_notifier.rs @@ -85,11 +85,14 @@ impl LeaderBankNotifier { /// Wait for next notification for a completed leader slot.
/// Returns `None` if the timeout is reached - pub fn wait_for_next_completed(&self, mut timeout: Duration) -> Option { + pub fn wait_for_next_completed(&self, mut remaining_timeout: Duration) -> Option { loop { let start = Instant::now(); let status = self.status.lock().unwrap(); - let (status, result) = self.condvar.wait_timeout(status, timeout).unwrap(); + let (status, result) = self + .condvar + .wait_timeout(status, remaining_timeout) + .unwrap(); if result.timed_out() { return None; } @@ -99,7 +102,7 @@ impl LeaderBankNotifier { return Some(slot); } - timeout = timeout.saturating_sub(start.elapsed()); + remaining_timeout = remaining_timeout.saturating_sub(start.elapsed()); } } } From 03ec4e6f9cc3b1c317db6e842200fe3bfbf5d1be Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 21 Mar 2023 21:14:22 +0000 Subject: [PATCH 11/36] Single mutex for internal state --- runtime/src/leader_bank_notifier.rs | 67 ++++++++++++++--------------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/runtime/src/leader_bank_notifier.rs b/runtime/src/leader_bank_notifier.rs index 655d45f3b89eb6..37dd562d6644ae 100644 --- a/runtime/src/leader_bank_notifier.rs +++ b/runtime/src/leader_bank_notifier.rs @@ -2,7 +2,7 @@ use { crate::bank::Bank, solana_sdk::slot_history::Slot, std::{ - sync::{Arc, Condvar, Mutex, RwLock, Weak}, + sync::{Arc, Condvar, Mutex, Weak}, time::{Duration, Instant}, }, }; @@ -12,10 +12,8 @@ use { /// 2. 
A leader slot completes (=PoH-completed) #[derive(Debug, Default)] pub struct LeaderBankNotifier { - /// Current state of the system - status: Mutex, - /// Weak reference to the current bank - bank: RwLock)>>, + /// Current state (slot, bank, and status) of the system + state: Mutex, /// CondVar to notify status changes and waiting condvar: Condvar, } @@ -30,18 +28,28 @@ enum Status { InProgress, } +#[derive(Debug, Default)] +struct SlotAndBankWithStatus { + status: Status, + slot: Slot, + bank: Weak, +} + impl LeaderBankNotifier { /// Set the status to `InProgress` and notify any waiting threads /// if the status was not already `InProgress`. pub fn set_in_progress(&self, bank: &Arc) { - let mut status = self.status.lock().unwrap(); - if matches!(*status, Status::InProgress) { + let mut state = self.state.lock().unwrap(); + if matches!(state.status, Status::InProgress) { return; } - *status = Status::InProgress; - *self.bank.write().unwrap() = Some((bank.slot(), Arc::downgrade(bank))); - drop(status); + *state = SlotAndBankWithStatus { + status: Status::InProgress, + slot: bank.slot(), + bank: Arc::downgrade(bank), + }; + drop(state); self.condvar.notify_all(); } @@ -50,19 +58,13 @@ impl LeaderBankNotifier { /// 1. the status was not already `StandBy` and /// 2. the slot is higher than the current slot (sanity check). pub fn set_completed(&self, slot: Slot) { - let mut status = self.status.lock().unwrap(); - if matches!(*status, Status::StandBy) { + let mut state = self.state.lock().unwrap(); + if matches!(state.status, Status::StandBy) || slot < state.slot { return; } - if let Some((current_slot, _)) = *self.bank.read().unwrap() { - if slot < current_slot { - return; - } - } - - *status = Status::StandBy; - drop(status); + state.status = Status::StandBy; + drop(state); self.condvar.notify_all(); } @@ -71,16 +73,15 @@ impl LeaderBankNotifier { /// Otherwise, wait up to the `timeout` for the status to become `InProgress`. 
/// Returns `None` if the timeout is reached. pub fn wait_for_in_progress(&self, timeout: Duration) -> Option> { - let status = self.status.lock().unwrap(); - - // Hold status lock until after the weak bank reference is cloned. - let (_status, wait_timeout_result) = self + let state = self.state.lock().unwrap(); + let (state, wait_timeout_result) = self .condvar - .wait_timeout_while(status, timeout, |status| matches!(*status, Status::StandBy)) + .wait_timeout_while(state, timeout, |state| { + matches!(state.status, Status::StandBy) + }) .unwrap(); - (!wait_timeout_result.timed_out()) - .then(|| self.bank.read().unwrap().as_ref().unwrap().1.clone()) + (!wait_timeout_result.timed_out()).then(|| state.bank.clone()) } /// Wait for next notification for a completed leader slot. @@ -88,18 +89,14 @@ impl LeaderBankNotifier { pub fn wait_for_next_completed(&self, mut remaining_timeout: Duration) -> Option { loop { let start = Instant::now(); - let status = self.status.lock().unwrap(); - let (status, result) = self - .condvar - .wait_timeout(status, remaining_timeout) - .unwrap(); + let state = self.state.lock().unwrap(); + let (state, result) = self.condvar.wait_timeout(state, remaining_timeout).unwrap(); if result.timed_out() { return None; } - if matches!(*status, Status::StandBy) { - let slot = self.bank.read().unwrap().as_ref().unwrap().0; - return Some(slot); + if matches!(state.status, Status::StandBy) { + return Some(state.slot); } remaining_timeout = remaining_timeout.saturating_sub(start.elapsed()); From 891f3ebd717d335d7ca88df38d34eb8e9199713d Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 21 Mar 2023 21:21:00 +0000 Subject: [PATCH 12/36] Additional renames --- core/src/replay_stage.rs | 8 +++--- poh/src/poh_recorder.rs | 16 ++++++------ runtime/src/leader_bank_notifier.rs | 40 ++++++++++++++--------------- 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 
1c272bfe0566ac..c5fa1e09895ff4 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -482,7 +482,7 @@ impl ReplayStage { r_bank_forks.get_vote_only_mode_signal(), ) }; - let leader_bank_status = poh_recorder.read().unwrap().leader_bank_status.clone(); + let leader_bank_notifier = poh_recorder.read().unwrap().leader_bank_notifier.clone(); Self::reset_poh_recorder( &my_pubkey, @@ -950,7 +950,7 @@ impl ReplayStage { &retransmit_slots_sender, &mut skipped_slots_info, &banking_tracer, - &leader_bank_status, + &leader_bank_notifier, has_new_vote_been_rooted, transaction_status_sender.is_some(), ); @@ -1680,7 +1680,7 @@ impl ReplayStage { retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, banking_tracer: &Arc, - leader_bank_status: &LeaderBankNotifier, + leader_bank_notifier: &LeaderBankNotifier, has_new_vote_been_rooted: bool, track_transaction_indexes: bool, ) { @@ -1806,7 +1806,7 @@ impl ReplayStage { banking_tracer.hash_event(parent.slot(), &parent.last_blockhash(), &parent.hash()); let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); - leader_bank_status.set_in_progress(&tpu_bank); + leader_bank_notifier.set_in_progress(&tpu_bank); poh_recorder .write() .unwrap() diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 781025818b30d2..2b7273c3e9c431 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -142,19 +142,19 @@ pub struct TransactionRecorder { // shared by all users of PohRecorder pub record_sender: Sender, pub is_exited: Arc, - leader_bank_status: Arc, + leader_bank_notifier: Arc, } impl TransactionRecorder { pub fn new( record_sender: Sender, is_exited: Arc, - leader_bank_status: Arc, + leader_bank_notifier: Arc, ) -> Self { Self { record_sender, is_exited, - leader_bank_status, + leader_bank_notifier, } } @@ -180,7 +180,7 @@ impl TransactionRecorder { starting_transaction_index = starting_index; } Err(PohRecorderError::MaxHeightReached) => { - 
self.leader_bank_status.set_completed(bank_slot); + self.leader_bank_notifier.set_completed(bank_slot); return RecordTransactionsSummary { record_transactions_timings, result: Err(PohRecorderError::MaxHeightReached), @@ -305,14 +305,14 @@ pub struct PohRecorder { ticks_from_record: u64, last_metric: Instant, record_sender: Sender, - pub leader_bank_status: Arc, + pub leader_bank_notifier: Arc, pub is_exited: Arc, } impl PohRecorder { fn clear_bank(&mut self) { if let Some(WorkingBank { bank, start, .. }) = self.working_bank.take() { - self.leader_bank_status.set_completed(bank.slot()); + self.leader_bank_notifier.set_completed(bank.slot()); let next_leader_slot = self.leader_schedule_cache.next_leader_slot( &self.id, bank.slot(), @@ -423,7 +423,7 @@ impl PohRecorder { TransactionRecorder::new( self.record_sender.clone(), self.is_exited.clone(), - self.leader_bank_status.clone(), + self.leader_bank_notifier.clone(), ) } @@ -966,7 +966,7 @@ impl PohRecorder { ticks_from_record: 0, last_metric: Instant::now(), record_sender, - leader_bank_status: Arc::default(), + leader_bank_notifier: Arc::default(), is_exited, }, receiver, diff --git a/runtime/src/leader_bank_notifier.rs b/runtime/src/leader_bank_notifier.rs index 37dd562d6644ae..950b5312f3b886 100644 --- a/runtime/src/leader_bank_notifier.rs +++ b/runtime/src/leader_bank_notifier.rs @@ -109,52 +109,52 @@ mod tests { use super::*; #[test] - fn test_leader_bank_status_wait_for_in_progress() { - let leader_bank_status = Arc::new(LeaderBankNotifier::default()); - let leader_bank_status2 = leader_bank_status.clone(); + fn test_leader_bank_notifier_wait_for_in_progress() { + let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); + let leader_bank_notifier2 = leader_bank_notifier.clone(); let jh = std::thread::spawn(move || { - let _weak_bank = leader_bank_status2.wait_for_in_progress(Duration::from_secs(1)); + let _weak_bank = leader_bank_notifier2.wait_for_in_progress(Duration::from_secs(1)); }); 
std::thread::sleep(Duration::from_millis(10)); - leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); + leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); jh.join().unwrap(); } #[test] - fn test_leader_bank_status_wait_for_in_progress_timeout() { - let leader_bank_status = Arc::new(LeaderBankNotifier::default()); - leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); - leader_bank_status.set_completed(1); + fn test_leader_bank_notifier_wait_for_in_progress_timeout() { + let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); + leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); + leader_bank_notifier.set_completed(1); - assert!(leader_bank_status + assert!(leader_bank_notifier .wait_for_in_progress(Duration::from_millis(1)) .is_none()); } #[test] - fn test_leader_bank_status_wait_for_next_completed() { - let leader_bank_status = Arc::new(LeaderBankNotifier::default()); - let leader_bank_status2 = leader_bank_status.clone(); + fn test_leader_bank_notifier_wait_for_next_completed() { + let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); + let leader_bank_notifier2 = leader_bank_notifier.clone(); let jh = std::thread::spawn(move || { - let _slot = leader_bank_status2.wait_for_next_completed(Duration::from_secs(1)); + let _slot = leader_bank_notifier2.wait_for_next_completed(Duration::from_secs(1)); }); - leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); + leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); std::thread::sleep(Duration::from_millis(10)); - leader_bank_status.set_completed(1); + leader_bank_notifier.set_completed(1); jh.join().unwrap(); } #[test] - fn test_leader_bank_status_wait_for_next_completed_timeout() { - let leader_bank_status = Arc::new(LeaderBankNotifier::default()); + fn test_leader_bank_notifier_wait_for_next_completed_timeout() { + let leader_bank_notifier = 
Arc::new(LeaderBankNotifier::default()); - leader_bank_status.set_in_progress(&Arc::new(Bank::default_for_tests())); + leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); assert_eq!( - leader_bank_status.wait_for_next_completed(Duration::from_millis(1)), + leader_bank_notifier.wait_for_next_completed(Duration::from_millis(1)), None ); } From 91b4e8f448e800ce6f20aca553405d30cc2d2a8a Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 21 Mar 2023 21:25:27 +0000 Subject: [PATCH 13/36] new_leader_bank_notifier --- core/src/replay_stage.rs | 2 +- poh/src/poh_recorder.rs | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index c5fa1e09895ff4..d887790dd7c2e7 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -482,7 +482,7 @@ impl ReplayStage { r_bank_forks.get_vote_only_mode_signal(), ) }; - let leader_bank_notifier = poh_recorder.read().unwrap().leader_bank_notifier.clone(); + let leader_bank_notifier = poh_recorder.read().unwrap().new_leader_bank_notifier(); Self::reset_poh_recorder( &my_pubkey, diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 2b7273c3e9c431..bae0007c80ef4f 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -305,7 +305,7 @@ pub struct PohRecorder { ticks_from_record: u64, last_metric: Instant, record_sender: Sender, - pub leader_bank_notifier: Arc, + leader_bank_notifier: Arc, pub is_exited: Arc, } @@ -427,6 +427,10 @@ impl PohRecorder { ) } + pub fn new_leader_bank_notifier(&self) -> Arc { + self.leader_bank_notifier.clone() + } + fn is_same_fork_as_previous_leader(&self, slot: Slot) -> bool { (slot.saturating_sub(NUM_CONSECUTIVE_LEADER_SLOTS)..slot).any(|slot| { // Check if the last slot Poh reset to was any of the From 48a747ba2f2ccb97c537a8495598d52d91a4661f Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Mar 2023 17:10:08 +0000 Subject: [PATCH 14/36] Move 
leader_bank_notifier into poh --- core/src/replay_stage.rs | 6 ++++-- {runtime => poh}/src/leader_bank_notifier.rs | 2 +- poh/src/lib.rs | 1 + poh/src/poh_recorder.rs | 4 ++-- runtime/src/lib.rs | 1 - 5 files changed, 8 insertions(+), 6 deletions(-) rename {runtime => poh}/src/leader_bank_notifier.rs (99%) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index d887790dd7c2e7..d058c4783942e6 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -46,7 +46,10 @@ use { }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_info, - solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, + solana_poh::{ + leader_bank_notifier::LeaderBankNotifier, + poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, + }, solana_program_runtime::timings::ExecuteTimings, solana_rpc::{ optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, @@ -58,7 +61,6 @@ use { bank::{Bank, NewBankOptions}, bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, commitment::BlockCommitmentCache, - leader_bank_notifier::LeaderBankNotifier, prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, }, diff --git a/runtime/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs similarity index 99% rename from runtime/src/leader_bank_notifier.rs rename to poh/src/leader_bank_notifier.rs index 950b5312f3b886..5c5c3fc570a147 100644 --- a/runtime/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -1,5 +1,5 @@ use { - crate::bank::Bank, + solana_runtime::bank::Bank, solana_sdk::slot_history::Slot, std::{ sync::{Arc, Condvar, Mutex, Weak}, diff --git a/poh/src/lib.rs b/poh/src/lib.rs index b3980690cbe8df..017d9889c0e901 100644 --- a/poh/src/lib.rs +++ b/poh/src/lib.rs @@ -1,4 +1,5 @@ #![allow(clippy::integer_arithmetic)] +pub mod leader_bank_notifier; pub mod poh_recorder; pub mod poh_service; diff --git 
a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index bae0007c80ef4f..b07313a63de23b 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -12,7 +12,7 @@ //! pub use solana_sdk::clock::Slot; use { - crate::poh_service::PohService, + crate::{leader_bank_notifier::LeaderBankNotifier, poh_service::PohService}, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TrySendError}, log::*, solana_entry::{ @@ -26,7 +26,7 @@ use { }, solana_measure::{measure, measure_us}, solana_metrics::poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo}, - solana_runtime::{bank::Bank, leader_bank_notifier::LeaderBankNotifier}, + solana_runtime::bank::Bank, solana_sdk::{ clock::NUM_CONSECUTIVE_LEADER_SLOTS, hash::Hash, poh_config::PohConfig, pubkey::Pubkey, saturating_add_assign, transaction::VersionedTransaction, diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index fbffea25a2afb1..8a5bf2b0511af6 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -45,7 +45,6 @@ pub mod in_mem_accounts_index; pub mod inline_spl_associated_token_account; pub mod inline_spl_token; pub mod inline_spl_token_2022; -pub mod leader_bank_notifier; pub mod loader_utils; pub mod message_processor; pub mod non_circulating_supply; From 22454f6e5e8796b9ddfbf6b9581e6c2f102c1ef7 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Mar 2023 17:13:27 +0000 Subject: [PATCH 15/36] waiters named with qualifying verbs instead of noun --- poh/src/leader_bank_notifier.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 5c5c3fc570a147..65cc793818a6d6 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -72,7 +72,7 @@ impl LeaderBankNotifier { /// If the status is `InProgress`, immediately return a weak reference to the bank. 
/// Otherwise, wait up to the `timeout` for the status to become `InProgress`. /// Returns `None` if the timeout is reached. - pub fn wait_for_in_progress(&self, timeout: Duration) -> Option> { + pub fn get_or_wait_for_in_progress(&self, timeout: Duration) -> Option> { let state = self.state.lock().unwrap(); let (state, wait_timeout_result) = self .condvar @@ -86,7 +86,7 @@ impl LeaderBankNotifier { /// Wait for next notification for a completed leader slot. /// Returns `None` if the timeout is reached - pub fn wait_for_next_completed(&self, mut remaining_timeout: Duration) -> Option { + pub fn wait_for_completed(&self, mut remaining_timeout: Duration) -> Option { loop { let start = Instant::now(); let state = self.state.lock().unwrap(); @@ -109,12 +109,13 @@ mod tests { use super::*; #[test] - fn test_leader_bank_notifier_wait_for_in_progress() { + fn test_leader_bank_notifier_get_or_wait_for_in_progress() { let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); let leader_bank_notifier2 = leader_bank_notifier.clone(); let jh = std::thread::spawn(move || { - let _weak_bank = leader_bank_notifier2.wait_for_in_progress(Duration::from_secs(1)); + let _weak_bank = + leader_bank_notifier2.get_or_wait_for_in_progress(Duration::from_secs(1)); }); std::thread::sleep(Duration::from_millis(10)); leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); @@ -123,23 +124,23 @@ mod tests { } #[test] - fn test_leader_bank_notifier_wait_for_in_progress_timeout() { + fn test_leader_bank_notifier_get_or_wait_for_in_progress_timeout() { let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); leader_bank_notifier.set_completed(1); assert!(leader_bank_notifier - .wait_for_in_progress(Duration::from_millis(1)) + .get_or_wait_for_in_progress(Duration::from_millis(1)) .is_none()); } #[test] - fn test_leader_bank_notifier_wait_for_next_completed() { + fn 
test_leader_bank_notifier_wait_for_completed() { let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); let leader_bank_notifier2 = leader_bank_notifier.clone(); let jh = std::thread::spawn(move || { - let _slot = leader_bank_notifier2.wait_for_next_completed(Duration::from_secs(1)); + let _slot = leader_bank_notifier2.wait_for_completed(Duration::from_secs(1)); }); leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); std::thread::sleep(Duration::from_millis(10)); @@ -149,12 +150,12 @@ mod tests { } #[test] - fn test_leader_bank_notifier_wait_for_next_completed_timeout() { + fn test_leader_bank_notifier_wait_for_completed_timeout() { let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); assert_eq!( - leader_bank_notifier.wait_for_next_completed(Duration::from_millis(1)), + leader_bank_notifier.wait_for_completed(Duration::from_millis(1)), None ); } From 30c1488d9d52eed822323d39ff121002933de9be Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Mar 2023 17:18:59 +0000 Subject: [PATCH 16/36] return Weak::new on timeout --- poh/src/leader_bank_notifier.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 65cc793818a6d6..95fd4fdcac8d27 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -71,8 +71,8 @@ impl LeaderBankNotifier { /// If the status is `InProgress`, immediately return a weak reference to the bank. /// Otherwise, wait up to the `timeout` for the status to become `InProgress`. - /// Returns `None` if the timeout is reached. - pub fn get_or_wait_for_in_progress(&self, timeout: Duration) -> Option> { + /// If the timeout is reached, the weak reference is unupgradable. 
+ pub fn get_or_wait_for_in_progress(&self, timeout: Duration) -> Weak { let state = self.state.lock().unwrap(); let (state, wait_timeout_result) = self .condvar @@ -81,7 +81,9 @@ impl LeaderBankNotifier { }) .unwrap(); - (!wait_timeout_result.timed_out()).then(|| state.bank.clone()) + (!wait_timeout_result.timed_out()) + .then(|| state.bank.clone()) + .unwrap_or_else(Weak::new) } /// Wait for next notification for a completed leader slot. @@ -131,6 +133,7 @@ mod tests { assert!(leader_bank_notifier .get_or_wait_for_in_progress(Duration::from_millis(1)) + .upgrade() .is_none()); } From 8261ab6de586b3889c50898ba3100f71cae4747e Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Mar 2023 17:21:20 +0000 Subject: [PATCH 17/36] Use else-if --- poh/src/leader_bank_notifier.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 95fd4fdcac8d27..e082c395f24cbe 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -95,9 +95,7 @@ impl LeaderBankNotifier { let (state, result) = self.condvar.wait_timeout(state, remaining_timeout).unwrap(); if result.timed_out() { return None; - } - - if matches!(state.status, Status::StandBy) { + } else if matches!(state.status, Status::StandBy) { return Some(state.slot); } From 80fd964b49649c919113d6c67e9823f2d4c7b175 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Mar 2023 17:27:25 +0000 Subject: [PATCH 18/36] set_in_progress panics if not in StandBy --- poh/src/leader_bank_notifier.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index e082c395f24cbe..650ca3521e420a 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -36,13 +36,12 @@ struct SlotAndBankWithStatus { } impl LeaderBankNotifier { - /// Set the status to `InProgress` and notify any waiting threads - /// if the 
status was not already `InProgress`. + /// Set the status to `InProgress` and notify any waiting threads. + /// Panics if the status is not `StandBy` - cannot have multiple + /// leader banks in progress. pub fn set_in_progress(&self, bank: &Arc) { let mut state = self.state.lock().unwrap(); - if matches!(state.status, Status::InProgress) { - return; - } + assert_eq!(state.status, Status::StandBy); *state = SlotAndBankWithStatus { status: Status::InProgress, From bb1791ad0569903f731a4a7bfba8872f686f3d78 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Mar 2023 17:37:21 +0000 Subject: [PATCH 19/36] set_completed panics on a slot mismatch --- poh/src/leader_bank_notifier.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 650ca3521e420a..dbd1809d6ea097 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -53,12 +53,15 @@ impl LeaderBankNotifier { self.condvar.notify_all(); } - /// Set the status to `StandBy` and notify any waiting threads if - /// 1. the status was not already `StandBy` and - /// 2. the slot is higher than the current slot (sanity check). + /// Set the status to `StandBy` and notify any waiting threads if the status was changed. + /// Panics if the stored slot does not match the given slot. pub fn set_completed(&self, slot: Slot) { let mut state = self.state.lock().unwrap(); - if matches!(state.status, Status::StandBy) || slot < state.slot { + assert_eq!(state.slot, slot); + + // `set_completed` can be called multiple times for the same slot because it + // may be called from multiple threads. 
+ if matches!(state.status, Status::StandBy) { return; } From 8b924cbf5a021e6f9e8005ee96ca71510db0cde1 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Mar 2023 19:42:42 +0000 Subject: [PATCH 20/36] optional slot --- poh/src/leader_bank_notifier.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index dbd1809d6ea097..db52d767299fcf 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -31,7 +31,7 @@ enum Status { #[derive(Debug, Default)] struct SlotAndBankWithStatus { status: Status, - slot: Slot, + slot: Option, bank: Weak, } @@ -45,7 +45,7 @@ impl LeaderBankNotifier { *state = SlotAndBankWithStatus { status: Status::InProgress, - slot: bank.slot(), + slot: Some(bank.slot()), bank: Arc::downgrade(bank), }; drop(state); @@ -57,7 +57,7 @@ impl LeaderBankNotifier { /// Panics if the stored slot does not match the given slot. pub fn set_completed(&self, slot: Slot) { let mut state = self.state.lock().unwrap(); - assert_eq!(state.slot, slot); + assert_eq!(state.slot, Some(slot)); // `set_completed` can be called multiple times for the same slot because it // may be called from multiple threads. 
@@ -98,7 +98,7 @@ impl LeaderBankNotifier { if result.timed_out() { return None; } else if matches!(state.status, Status::StandBy) { - return Some(state.slot); + return state.slot; } remaining_timeout = remaining_timeout.saturating_sub(start.elapsed()); From 785b3b2f3662d2c0dfad6e7c25e7b86353906934 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Mar 2023 21:31:50 +0000 Subject: [PATCH 21/36] Fix tests --- poh/src/leader_bank_notifier.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index db52d767299fcf..ce68330193e2da 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -128,9 +128,18 @@ mod tests { #[test] fn test_leader_bank_notifier_get_or_wait_for_in_progress_timeout() { let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); - leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); - leader_bank_notifier.set_completed(1); + // Uninitialized + assert!(leader_bank_notifier + .get_or_wait_for_in_progress(Duration::from_millis(1)) + .upgrade() + .is_none()); + + let bank = Arc::new(Bank::default_for_tests()); + leader_bank_notifier.set_in_progress(&bank); + leader_bank_notifier.set_completed(bank.slot()); + + // Completed assert!(leader_bank_notifier .get_or_wait_for_in_progress(Duration::from_millis(1)) .upgrade() @@ -145,9 +154,10 @@ mod tests { let jh = std::thread::spawn(move || { let _slot = leader_bank_notifier2.wait_for_completed(Duration::from_secs(1)); }); - leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); + let bank = Arc::new(Bank::default_for_tests()); + leader_bank_notifier.set_in_progress(&bank); std::thread::sleep(Duration::from_millis(10)); - leader_bank_notifier.set_completed(1); + leader_bank_notifier.set_completed(bank.slot()); jh.join().unwrap(); } From 0f1258371c4a70e5d2724be679b8000783c8656f Mon Sep 17 00:00:00 2001 From: Andrew 
Fitzgerald Date: Wed, 22 Mar 2023 22:17:32 +0000 Subject: [PATCH 22/36] additional testing - actual immediate return --- poh/src/leader_bank_notifier.rs | 146 ++++++++++++++++++++++++++------ 1 file changed, 121 insertions(+), 25 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index ce68330193e2da..2d6ef327110bcf 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -76,6 +76,12 @@ impl LeaderBankNotifier { /// If the timeout is reached, the weak reference is unupgradable. pub fn get_or_wait_for_in_progress(&self, timeout: Duration) -> Weak { let state = self.state.lock().unwrap(); + + // Immediately return if the status is `InProgress` + if matches!(state.status, Status::InProgress) { + return state.bank.clone(); + } + let (state, wait_timeout_result) = self .condvar .wait_timeout_while(state, timeout, |state| { @@ -111,23 +117,77 @@ mod tests { use super::*; #[test] - fn test_leader_bank_notifier_get_or_wait_for_in_progress() { - let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); - let leader_bank_notifier2 = leader_bank_notifier.clone(); + fn test_leader_bank_notifier_default() { + let leader_bank_notifier = LeaderBankNotifier::default(); + let state = leader_bank_notifier.state.lock().unwrap(); + assert_eq!(state.status, Status::StandBy); + assert_eq!(state.slot, None); + assert!(state.bank.upgrade().is_none()); + } - let jh = std::thread::spawn(move || { - let _weak_bank = - leader_bank_notifier2.get_or_wait_for_in_progress(Duration::from_secs(1)); - }); - std::thread::sleep(Duration::from_millis(10)); - leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); + #[test] + #[should_panic] + fn test_leader_bank_notifier_set_in_progress_already_in_progress() { + let leader_bank_notifier = LeaderBankNotifier::default(); + let bank = Arc::new(Bank::default_for_tests()); + leader_bank_notifier.set_in_progress(&bank); + 
leader_bank_notifier.set_in_progress(&bank); + } - jh.join().unwrap(); + #[test] + fn test_leader_bank_notifier_set_in_progress() { + let leader_bank_notifier = LeaderBankNotifier::default(); + let bank = Arc::new(Bank::default_for_tests()); + leader_bank_notifier.set_in_progress(&bank); + + let state = leader_bank_notifier.state.lock().unwrap(); + assert_eq!(state.status, Status::InProgress); + assert_eq!(state.slot, Some(bank.slot())); + assert_eq!(state.bank.upgrade(), Some(bank)); + } + + #[test] + #[should_panic] + fn test_leader_bank_notifier_set_completed_uninitialized() { + let leader_bank_notifier = LeaderBankNotifier::default(); + leader_bank_notifier.set_completed(0); + } + + #[test] + #[should_panic] + fn test_leader_bank_notifier_set_completed_mismatched_in_progress_slot() { + let leader_bank_notifier = LeaderBankNotifier::default(); + let bank = Arc::new(Bank::default_for_tests()); + leader_bank_notifier.set_in_progress(&bank); + leader_bank_notifier.set_completed(bank.slot() + 1); + } + + #[test] + #[should_panic] + fn test_leader_bank_notifier_set_completed_mismatched_completed_slot() { + let leader_bank_notifier = LeaderBankNotifier::default(); + let bank = Arc::new(Bank::default_for_tests()); + leader_bank_notifier.set_in_progress(&bank); + leader_bank_notifier.set_completed(bank.slot()); + leader_bank_notifier.set_completed(bank.slot() + 1); + } + + #[test] + fn test_leader_bank_notifier_set_completed() { + let leader_bank_notifier = LeaderBankNotifier::default(); + let bank = Arc::new(Bank::default_for_tests()); + leader_bank_notifier.set_in_progress(&bank); + leader_bank_notifier.set_completed(bank.slot()); + + let state = leader_bank_notifier.state.lock().unwrap(); + assert_eq!(state.status, Status::StandBy); + assert_eq!(state.slot, Some(bank.slot())); + assert_eq!(state.bank.upgrade(), Some(bank)); } #[test] fn test_leader_bank_notifier_get_or_wait_for_in_progress_timeout() { - let leader_bank_notifier = 
Arc::new(LeaderBankNotifier::default()); + let leader_bank_notifier = LeaderBankNotifier::default(); // Uninitialized assert!(leader_bank_notifier @@ -147,29 +207,65 @@ mod tests { } #[test] - fn test_leader_bank_notifier_wait_for_completed() { - let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); - let leader_bank_notifier2 = leader_bank_notifier.clone(); + fn test_leader_bank_notifier_get_in_progress() { + let leader_bank_notifier = LeaderBankNotifier::default(); - let jh = std::thread::spawn(move || { - let _slot = leader_bank_notifier2.wait_for_completed(Duration::from_secs(1)); - }); let bank = Arc::new(Bank::default_for_tests()); leader_bank_notifier.set_in_progress(&bank); - std::thread::sleep(Duration::from_millis(10)); - leader_bank_notifier.set_completed(bank.slot()); + let weak_bank = leader_bank_notifier.get_or_wait_for_in_progress(Duration::ZERO); + assert!(weak_bank.upgrade().is_some()); + } + + #[test] + fn test_leader_bank_notifier_wait_for_in_progress() { + let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); + let bank = Arc::new(Bank::default_for_tests()); + + // Need to spawn a separate thread so we wait for the condvar in `get_or_wait_for_in_progress` + let jh = std::thread::spawn({ + let leader_bank_notifier = leader_bank_notifier.clone(); + let bank = bank.clone(); + move || { + std::thread::sleep(Duration::from_millis(10)); + leader_bank_notifier.set_in_progress(&bank); + } + }); + + let weak_bank = leader_bank_notifier.get_or_wait_for_in_progress(Duration::from_secs(1)); + let upgraded_bank = weak_bank.upgrade().unwrap(); + assert_eq!(upgraded_bank.slot(), bank.slot()); jh.join().unwrap(); } #[test] - fn test_leader_bank_notifier_wait_for_completed_timeout() { + fn test_leader_bank_notifier_wait_for_completed() { let leader_bank_notifier = Arc::new(LeaderBankNotifier::default()); + let bank = Arc::new(Bank::default_for_tests()); - leader_bank_notifier.set_in_progress(&Arc::new(Bank::default_for_tests())); - 
assert_eq!( - leader_bank_notifier.wait_for_completed(Duration::from_millis(1)), - None - ); + let jh = std::thread::spawn({ + let leader_bank_notifier = leader_bank_notifier.clone(); + let bank = bank.clone(); + move || { + leader_bank_notifier.set_in_progress(&bank); + std::thread::sleep(Duration::from_millis(10)); + leader_bank_notifier.set_completed(bank.slot()); + } + }); + + let slot = leader_bank_notifier.wait_for_completed(Duration::from_secs(1)); + assert_eq!(slot, Some(bank.slot())); + + jh.join().unwrap(); + } + + #[test] + fn test_leader_bank_notifier_wait_for_completed_timeout() { + let leader_bank_notifier = LeaderBankNotifier::default(); + let bank = Arc::new(Bank::default_for_tests()); + leader_bank_notifier.set_in_progress(&bank); + assert!(leader_bank_notifier + .wait_for_completed(Duration::from_millis(1)) + .is_none()); } } From f350c0acc41522b436ebe6973fa868735da96544 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Mar 2023 22:23:37 +0000 Subject: [PATCH 23/36] Internal get_or_wait_for_in_progress_state --- poh/src/leader_bank_notifier.rs | 40 ++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 2d6ef327110bcf..981633a23d7104 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -2,7 +2,7 @@ use { solana_runtime::bank::Bank, solana_sdk::slot_history::Slot, std::{ - sync::{Arc, Condvar, Mutex, Weak}, + sync::{Arc, Condvar, Mutex, MutexGuard, Weak}, time::{Duration, Instant}, }, }; @@ -76,21 +76,8 @@ impl LeaderBankNotifier { /// If the timeout is reached, the weak reference is unupgradable. 
pub fn get_or_wait_for_in_progress(&self, timeout: Duration) -> Weak { let state = self.state.lock().unwrap(); - - // Immediately return if the status is `InProgress` - if matches!(state.status, Status::InProgress) { - return state.bank.clone(); - } - - let (state, wait_timeout_result) = self - .condvar - .wait_timeout_while(state, timeout, |state| { - matches!(state.status, Status::StandBy) - }) - .unwrap(); - - (!wait_timeout_result.timed_out()) - .then(|| state.bank.clone()) + self.get_or_wait_for_in_progress_state(timeout, state) + .map(|state| state.bank.clone()) .unwrap_or_else(Weak::new) } @@ -110,6 +97,27 @@ impl LeaderBankNotifier { remaining_timeout = remaining_timeout.saturating_sub(start.elapsed()); } } + + // Helper function to get or wait for the `InProgress` status with a given `MutexGuard`. + fn get_or_wait_for_in_progress_state<'a>( + &self, + timeout: Duration, + state: MutexGuard<'a, SlotAndBankWithStatus>, + ) -> Option> { + // Immediately return if the status is `InProgress` + if matches!(state.status, Status::InProgress) { + return Some(state); + } + + let (state, wait_timeout_result) = self + .condvar + .wait_timeout_while(state, timeout, |state| { + matches!(state.status, Status::StandBy) + }) + .unwrap(); + + (!wait_timeout_result.timed_out()).then_some(state) + } } #[cfg(test)] From 4a7125a4edffd8bf2058a697ace183fee1a239cc Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 22 Mar 2023 22:33:19 +0000 Subject: [PATCH 24/36] wait_for_completed explicit checks for InProgress -> StandBy --- poh/src/leader_bank_notifier.rs | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 981633a23d7104..f0dcda5728d013 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -84,17 +84,27 @@ impl LeaderBankNotifier { /// Wait for next notification for a completed leader slot. 
/// Returns `None` if the timeout is reached pub fn wait_for_completed(&self, mut remaining_timeout: Duration) -> Option { - loop { - let start = Instant::now(); - let state = self.state.lock().unwrap(); - let (state, result) = self.condvar.wait_timeout(state, remaining_timeout).unwrap(); - if result.timed_out() { - return None; - } else if matches!(state.status, Status::StandBy) { - return state.slot; - } + let state = self.state.lock().unwrap(); + + // If currently `StandBy`, need to wait for `InProgress` to begin. + let now = Instant::now(); + let state = self.get_or_wait_for_in_progress_state(remaining_timeout, state); + + let Some(state) = state else { return None; }; + remaining_timeout = remaining_timeout.saturating_sub(now.elapsed()); + + // Wait for `StandBy` to be set. + let (state, wait_timeout_result) = self + .condvar + .wait_timeout_while(state, remaining_timeout, |state| { + matches!(state.status, Status::InProgress) + }) + .unwrap(); - remaining_timeout = remaining_timeout.saturating_sub(start.elapsed()); + if !wait_timeout_result.timed_out() { + state.slot + } else { + None } } From 90fab191b061c828dc841d7c729eb9202b0a7e6c Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 23 Mar 2023 20:06:25 +0000 Subject: [PATCH 25/36] revert some changes. 
only allow poh_recorder to set leader_bank state --- core/src/replay_stage.rs | 9 +-------- poh/src/poh_recorder.rs | 26 ++++++++++++-------------- 2 files changed, 13 insertions(+), 22 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index d058c4783942e6..e17772321c62da 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -46,10 +46,7 @@ use { }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_info, - solana_poh::{ - leader_bank_notifier::LeaderBankNotifier, - poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, - }, + solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, solana_program_runtime::timings::ExecuteTimings, solana_rpc::{ optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, @@ -484,7 +481,6 @@ impl ReplayStage { r_bank_forks.get_vote_only_mode_signal(), ) }; - let leader_bank_notifier = poh_recorder.read().unwrap().new_leader_bank_notifier(); Self::reset_poh_recorder( &my_pubkey, @@ -952,7 +948,6 @@ impl ReplayStage { &retransmit_slots_sender, &mut skipped_slots_info, &banking_tracer, - &leader_bank_notifier, has_new_vote_been_rooted, transaction_status_sender.is_some(), ); @@ -1682,7 +1677,6 @@ impl ReplayStage { retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, banking_tracer: &Arc, - leader_bank_notifier: &LeaderBankNotifier, has_new_vote_been_rooted: bool, track_transaction_indexes: bool, ) { @@ -1808,7 +1802,6 @@ impl ReplayStage { banking_tracer.hash_event(parent.slot(), &parent.last_blockhash(), &parent.hash()); let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); - leader_bank_notifier.set_in_progress(&tpu_bank); poh_recorder .write() .unwrap() diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index b07313a63de23b..3a65f7be38589a 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -137,24 +137,25 @@ 
pub struct RecordTransactionsSummary { pub starting_transaction_index: Option, } -#[derive(Clone)] pub struct TransactionRecorder { // shared by all users of PohRecorder pub record_sender: Sender, pub is_exited: Arc, - leader_bank_notifier: Arc, +} + +impl Clone for TransactionRecorder { + fn clone(&self) -> Self { + TransactionRecorder::new(self.record_sender.clone(), self.is_exited.clone()) + } } impl TransactionRecorder { - pub fn new( - record_sender: Sender, - is_exited: Arc, - leader_bank_notifier: Arc, - ) -> Self { + pub fn new(record_sender: Sender, is_exited: Arc) -> Self { Self { + // shared record_sender, + // shared is_exited, - leader_bank_notifier, } } @@ -180,7 +181,6 @@ impl TransactionRecorder { starting_transaction_index = starting_index; } Err(PohRecorderError::MaxHeightReached) => { - self.leader_bank_notifier.set_completed(bank_slot); return RecordTransactionsSummary { record_transactions_timings, result: Err(PohRecorderError::MaxHeightReached), @@ -420,11 +420,7 @@ impl PohRecorder { } pub fn new_recorder(&self) -> TransactionRecorder { - TransactionRecorder::new( - self.record_sender.clone(), - self.is_exited.clone(), - self.leader_bank_notifier.clone(), - ) + TransactionRecorder::new(self.record_sender.clone(), self.is_exited.clone()) } pub fn new_leader_bank_notifier(&self) -> Arc { @@ -585,6 +581,8 @@ impl PohRecorder { } pub fn set_bank(&mut self, bank: &Arc, track_transaction_indexes: bool) { + assert!(self.working_bank.is_none()); + self.leader_bank_notifier.set_in_progress(bank); let working_bank = WorkingBank { bank: bank.clone(), start: Arc::new(Instant::now()), From 547b808e310f8afe8cb1c136710a2a2fcf7ddece Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 23 Mar 2023 22:43:44 +0000 Subject: [PATCH 26/36] Use reset instead of set since the bank is already set --- banking-bench/src/main.rs | 2 +- core/benches/banking_stage.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/banking-bench/src/main.rs 
b/banking-bench/src/main.rs index cdfb4989c7d5c9..5cdad2a043f9af 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -454,7 +454,6 @@ fn main() { bank_forks.clone(), &Arc::new(PrioritizationFeeCache::new(0u64)), ); - poh_recorder.write().unwrap().set_bank(&bank, false); // This is so that the signal_receiver does not go out of scope after the closure. // If it is dropped before poh_service, then poh_service will error when @@ -538,6 +537,7 @@ fn main() { std::u64::MAX, ); + assert!(poh_recorder.read().unwrap().bank().is_none()); poh_recorder.write().unwrap().set_bank(&bank, false); assert!(poh_recorder.read().unwrap().bank().is_some()); debug!( diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index a35bd723a9764c..bcb887e70e09b1 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -292,7 +292,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { bank_forks, &Arc::new(PrioritizationFeeCache::new(0u64)), ); - poh_recorder.write().unwrap().set_bank(&bank, false); let chunk_len = verified.len() / CHUNKS; let mut start = 0; From 9ba7d928e14362b6e7e25c9cf278733cad71ff13 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 24 Mar 2023 15:13:23 +0000 Subject: [PATCH 27/36] pub(crate) state setting - now only intended to be used in solana-poh --- poh/src/leader_bank_notifier.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index f0dcda5728d013..5844205194932f 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -39,7 +39,7 @@ impl LeaderBankNotifier { /// Set the status to `InProgress` and notify any waiting threads. /// Panics if the status is not `StandBy` - cannot have multiple /// leader banks in progress. 
- pub fn set_in_progress(&self, bank: &Arc) { + pub(crate) fn set_in_progress(&self, bank: &Arc) { let mut state = self.state.lock().unwrap(); assert_eq!(state.status, Status::StandBy); @@ -55,7 +55,7 @@ impl LeaderBankNotifier { /// Set the status to `StandBy` and notify any waiting threads if the status was changed. /// Panics if the stored slot does not match the given slot. - pub fn set_completed(&self, slot: Slot) { + pub(crate) fn set_completed(&self, slot: Slot) { let mut state = self.state.lock().unwrap(); assert_eq!(state.slot, Some(slot)); From aa673da0aaaaa6b9148fbc87165b46ed981c4d95 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 24 Mar 2023 15:14:39 +0000 Subject: [PATCH 28/36] unwrap_or_default --- poh/src/leader_bank_notifier.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 5844205194932f..c5089af57dd2da 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -78,7 +78,7 @@ impl LeaderBankNotifier { let state = self.state.lock().unwrap(); self.get_or_wait_for_in_progress_state(timeout, state) .map(|state| state.bank.clone()) - .unwrap_or_else(Weak::new) + .unwrap_or_default() } /// Wait for next notification for a completed leader slot. 
From 9117ddd210901e24a028887ab545a2820048e5b3 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 24 Mar 2023 15:24:25 +0000 Subject: [PATCH 29/36] reduce complexity --- poh/src/leader_bank_notifier.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index c5089af57dd2da..28234db64e90e5 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -114,11 +114,6 @@ impl LeaderBankNotifier { timeout: Duration, state: MutexGuard<'a, SlotAndBankWithStatus>, ) -> Option> { - // Immediately return if the status is `InProgress` - if matches!(state.status, Status::InProgress) { - return Some(state); - } - let (state, wait_timeout_result) = self .condvar .wait_timeout_while(state, timeout, |state| { From de788b1e39e9129f69fece73ca5d805843253c53 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 24 Mar 2023 15:34:47 +0000 Subject: [PATCH 30/36] state option using ? --- poh/src/leader_bank_notifier.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 28234db64e90e5..2e311cf55a4c7c 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -88,9 +88,7 @@ impl LeaderBankNotifier { // If currently `StandBy`, need to wait for `InProgress` to begin. let now = Instant::now(); - let state = self.get_or_wait_for_in_progress_state(remaining_timeout, state); - - let Some(state) = state else { return None; }; + let state = self.get_or_wait_for_in_progress_state(remaining_timeout, state)?; remaining_timeout = remaining_timeout.saturating_sub(now.elapsed()); // Wait for `StandBy` to be set. From b893a6bc7489a84226feceaf9b8069ded0340c50 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 24 Mar 2023 15:40:32 +0000 Subject: [PATCH 31/36] timeout sub using checked_sub and ? 
--- poh/src/leader_bank_notifier.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 2e311cf55a4c7c..852278836ace32 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -89,7 +89,7 @@ impl LeaderBankNotifier { // If currently `StandBy`, need to wait for `InProgress` to begin. let now = Instant::now(); let state = self.get_or_wait_for_in_progress_state(remaining_timeout, state)?; - remaining_timeout = remaining_timeout.saturating_sub(now.elapsed()); + remaining_timeout = remaining_timeout.checked_sub(now.elapsed())?; // Wait for `StandBy` to be set. let (state, wait_timeout_result) = self From 06542a18b7d26830011c0153040c371d675f55dd Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 24 Mar 2023 15:45:42 +0000 Subject: [PATCH 32/36] get_or_wait_for_in_progress_state takes in Condvar ref instead of self --- poh/src/leader_bank_notifier.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 852278836ace32..75cdac92384484 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -76,7 +76,7 @@ impl LeaderBankNotifier { /// If the timeout is reached, the weak reference is unupgradable. pub fn get_or_wait_for_in_progress(&self, timeout: Duration) -> Weak { let state = self.state.lock().unwrap(); - self.get_or_wait_for_in_progress_state(timeout, state) + Self::get_or_wait_for_in_progress_state(&self.condvar, timeout, state) .map(|state| state.bank.clone()) .unwrap_or_default() } @@ -88,7 +88,8 @@ impl LeaderBankNotifier { // If currently `StandBy`, need to wait for `InProgress` to begin. 
let now = Instant::now(); - let state = self.get_or_wait_for_in_progress_state(remaining_timeout, state)?; + let state = + Self::get_or_wait_for_in_progress_state(&self.condvar, remaining_timeout, state)?; remaining_timeout = remaining_timeout.checked_sub(now.elapsed())?; // Wait for `StandBy` to be set. @@ -106,14 +107,14 @@ impl LeaderBankNotifier { } } - // Helper function to get or wait for the `InProgress` status with a given `MutexGuard`. + /// Helper function to get or wait for the `InProgress` status with a given `MutexGuard`. + /// If `InProgress` status is reached, the state `MutexGuard` is returned, otherwise None. fn get_or_wait_for_in_progress_state<'a>( - &self, + condvar: &'a Condvar, timeout: Duration, state: MutexGuard<'a, SlotAndBankWithStatus>, ) -> Option> { - let (state, wait_timeout_result) = self - .condvar + let (state, wait_timeout_result) = condvar .wait_timeout_while(state, timeout, |state| { matches!(state.status, Status::StandBy) }) From 29cee36f73c45a6bfe59efe94fcb07c3acf533c6 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 24 Mar 2023 15:46:53 +0000 Subject: [PATCH 33/36] get_or_wait_for_in_progress_state argument order --- poh/src/leader_bank_notifier.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 75cdac92384484..fbf1b6ba2bd181 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -76,7 +76,7 @@ impl LeaderBankNotifier { /// If the timeout is reached, the weak reference is unupgradable. 
pub fn get_or_wait_for_in_progress(&self, timeout: Duration) -> Weak { let state = self.state.lock().unwrap(); - Self::get_or_wait_for_in_progress_state(&self.condvar, timeout, state) + Self::get_or_wait_for_in_progress_state(&self.condvar, state, timeout) .map(|state| state.bank.clone()) .unwrap_or_default() } @@ -89,7 +89,7 @@ impl LeaderBankNotifier { // If currently `StandBy`, need to wait for `InProgress` to begin. let now = Instant::now(); let state = - Self::get_or_wait_for_in_progress_state(&self.condvar, remaining_timeout, state)?; + Self::get_or_wait_for_in_progress_state(&self.condvar, state, remaining_timeout)?; remaining_timeout = remaining_timeout.checked_sub(now.elapsed())?; // Wait for `StandBy` to be set. @@ -111,8 +111,8 @@ impl LeaderBankNotifier { /// If `InProgress` status is reached, the state `MutexGuard` is returned, otherwise None. fn get_or_wait_for_in_progress_state<'a>( condvar: &'a Condvar, - timeout: Duration, state: MutexGuard<'a, SlotAndBankWithStatus>, + timeout: Duration, ) -> Option> { let (state, wait_timeout_result) = condvar .wait_timeout_while(state, timeout, |state| { From 42d778b77b87c21a189138894d7ed49237eb1851 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 24 Mar 2023 16:16:29 +0000 Subject: [PATCH 34/36] set_completed is now only called by PohRecorder. Added assertions appropriately --- poh/src/leader_bank_notifier.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index fbf1b6ba2bd181..0114c2f37ee9fe 100644 --- a/poh/src/leader_bank_notifier.rs +++ b/poh/src/leader_bank_notifier.rs @@ -53,18 +53,14 @@ impl LeaderBankNotifier { self.condvar.notify_all(); } - /// Set the status to `StandBy` and notify any waiting threads if the status was changed. - /// Panics if the stored slot does not match the given slot. + /// Set the status to `StandBy` and notify any waiting threads. 
+ /// Panics if the current status is not `InProgress` or the stored slot does not match + /// the given slot. pub(crate) fn set_completed(&self, slot: Slot) { let mut state = self.state.lock().unwrap(); + assert_eq!(state.status, Status::InProgress); assert_eq!(state.slot, Some(slot)); - // `set_completed` can be called multiple times for the same slot because it - // may be called from multiple threads. - if matches!(state.status, Status::StandBy) { - return; - } - state.status = Status::StandBy; drop(state); From b679dee9c2bc06a1a8a98c1c4af54c5b9d6b9b3f Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Sat, 25 Mar 2023 23:28:49 +0000 Subject: [PATCH 35/36] manually revert previous changes... --- poh/src/poh_recorder.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 3a65f7be38589a..3abd9ae57f64ff 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -137,24 +137,17 @@ pub struct RecordTransactionsSummary { pub starting_transaction_index: Option, } +#[derive(Clone)] pub struct TransactionRecorder { // shared by all users of PohRecorder pub record_sender: Sender, pub is_exited: Arc, } -impl Clone for TransactionRecorder { - fn clone(&self) -> Self { - TransactionRecorder::new(self.record_sender.clone(), self.is_exited.clone()) - } -} - impl TransactionRecorder { pub fn new(record_sender: Sender, is_exited: Arc) -> Self { Self { - // shared record_sender, - // shared is_exited, } } From 974cf7ee59d910de56ec44ff3a8cbe93de26ac8a Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Sat, 25 Mar 2023 23:42:45 +0000 Subject: [PATCH 36/36] Use then and expect for returning optional slot - verifies assumption --- poh/src/leader_bank_notifier.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/poh/src/leader_bank_notifier.rs b/poh/src/leader_bank_notifier.rs index 0114c2f37ee9fe..8d950d487b41fe 100644 --- a/poh/src/leader_bank_notifier.rs +++ 
b/poh/src/leader_bank_notifier.rs @@ -96,11 +96,7 @@ impl LeaderBankNotifier { }) .unwrap(); - if !wait_timeout_result.timed_out() { - state.slot - } else { - None - } + (!wait_timeout_result.timed_out()).then(|| state.slot.expect("some slot when completed")) } /// Helper function to get or wait for the `InProgress` status with a given `MutexGuard`.