Replaces SPS's channel with pending packages that has one slot per snapshot kind
brooksprumo committed Aug 5, 2024
1 parent 4ee8920 commit aec72bd
Showing 7 changed files with 346 additions and 228 deletions.
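Context for the change: AccountsHashVerifier (AHV) previously handed finished SnapshotPackages to SnapshotPackagerService (SPS) over a crossbeam channel, and SPS drained that channel each iteration to pick the highest-priority package, re-enqueueing the rest. This commit replaces the channel with a shared Arc<Mutex<PendingSnapshotPackages>> that holds at most one pending package per snapshot kind, so a newer package of a kind replaces an older, not-yet-handled one. The new pending_snapshot_packages module is among the seven changed files but is not shown in this excerpt; the sketch below is only a rough illustration of the idea, with simplified stand-in types and an assumed overwrite-on-push policy, not the actual implementation.

// Hypothetical, simplified stand-ins for illustration only; the real
// PendingSnapshotPackages lives in the new (unshown) pending_snapshot_packages
// module, and the real SnapshotPackage carries much more state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SnapshotKind {
    FullSnapshot,
    IncrementalSnapshot(/* full snapshot base slot */ u64),
}

#[derive(Debug, Clone)]
struct SnapshotPackage {
    snapshot_kind: SnapshotKind,
    slot: u64,
}

/// One pending slot per snapshot kind: at most one full and one incremental
/// package are ever queued; pushing a newer package of a kind replaces the
/// older, not-yet-handled package of that same kind.
#[derive(Default)]
struct PendingSnapshotPackages {
    full: Option<SnapshotPackage>,
    incremental: Option<SnapshotPackage>,
}

impl PendingSnapshotPackages {
    fn push(&mut self, package: SnapshotPackage) {
        // NOTE: the real implementation may compare slots or priority before
        // replacing; this sketch just overwrites.
        match package.snapshot_kind {
            SnapshotKind::FullSnapshot => self.full = Some(package),
            SnapshotKind::IncrementalSnapshot(_) => self.incremental = Some(package),
        }
    }

    /// Full snapshots are handled before incremental ones.
    fn pop(&mut self) -> Option<SnapshotPackage> {
        self.full.take().or_else(|| self.incremental.take())
    }
}

fn main() {
    let mut pending = PendingSnapshotPackages::default();
    pending.push(SnapshotPackage { snapshot_kind: SnapshotKind::IncrementalSnapshot(300), slot: 310 });
    pending.push(SnapshotPackage { snapshot_kind: SnapshotKind::FullSnapshot, slot: 300 });
    pending.push(SnapshotPackage { snapshot_kind: SnapshotKind::FullSnapshot, slot: 400 }); // replaces the full at 300
    assert_eq!(pending.pop().unwrap().slot, 400); // full first
    assert_eq!(pending.pop().unwrap().slot, 310); // then incremental
    assert!(pending.pop().is_none()); // nothing left pending
}

Keeping one slot per kind bounds the amount of queued work no matter how far the packager falls behind, and it makes the old re-enqueue bookkeeping unnecessary.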
33 changes: 11 additions & 22 deletions core/src/accounts_hash_verifier.rs
@@ -1,6 +1,7 @@
//! Service to calculate accounts hashes
use {
crate::snapshot_packager_service::PendingSnapshotPackages,
crossbeam_channel::{Receiver, Sender},
solana_accounts_db::{
accounts_db::CalcAccountsHashKind,
@@ -24,7 +25,7 @@ use {
io::Result as IoResult,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Arc, Mutex,
},
thread::{self, Builder, JoinHandle},
time::Duration,
@@ -39,7 +40,7 @@ impl AccountsHashVerifier {
pub fn new(
accounts_package_sender: Sender<AccountsPackage>,
accounts_package_receiver: Receiver<AccountsPackage>,
snapshot_package_sender: Option<Sender<SnapshotPackage>>,
pending_snapshot_packages: Arc<Mutex<PendingSnapshotPackages>>,
exit: Arc<AtomicBool>,
snapshot_config: SnapshotConfig,
) -> Self {
@@ -71,9 +72,8 @@ impl AccountsHashVerifier {

let (result, handling_time_us) = measure_us!(Self::process_accounts_package(
accounts_package,
snapshot_package_sender.as_ref(),
&pending_snapshot_packages,
&snapshot_config,
&exit,
));
if let Err(err) = result {
error!("Stopping AccountsHashVerifier! Fatal error while processing accounts package: {err}");
@@ -208,9 +208,8 @@ impl AccountsHashVerifier {
#[allow(clippy::too_many_arguments)]
fn process_accounts_package(
accounts_package: AccountsPackage,
snapshot_package_sender: Option<&Sender<SnapshotPackage>>,
pending_snapshot_packages: &Mutex<PendingSnapshotPackages>,
snapshot_config: &SnapshotConfig,
exit: &AtomicBool,
) -> IoResult<()> {
let (accounts_hash_kind, bank_incremental_snapshot_persistence) =
Self::calculate_and_verify_accounts_hash(&accounts_package, snapshot_config)?;
@@ -221,11 +220,10 @@ impl AccountsHashVerifier {

Self::submit_for_packaging(
accounts_package,
snapshot_package_sender,
pending_snapshot_packages,
snapshot_config,
accounts_hash_kind,
bank_incremental_snapshot_persistence,
exit,
);

Ok(())
@@ -462,11 +460,10 @@ impl AccountsHashVerifier {

fn submit_for_packaging(
accounts_package: AccountsPackage,
snapshot_package_sender: Option<&Sender<SnapshotPackage>>,
pending_snapshot_packages: &Mutex<PendingSnapshotPackages>,
snapshot_config: &SnapshotConfig,
accounts_hash_kind: AccountsHashKind,
bank_incremental_snapshot_persistence: Option<BankIncrementalSnapshotPersistence>,
exit: &AtomicBool,
) {
if !snapshot_config.should_generate_snapshots()
|| !matches!(
@@ -476,24 +473,16 @@
{
return;
}
let Some(snapshot_package_sender) = snapshot_package_sender else {
return;
};

let snapshot_package = SnapshotPackage::new(
accounts_package,
accounts_hash_kind,
bank_incremental_snapshot_persistence,
);
let send_result = snapshot_package_sender.send(snapshot_package);
if let Err(err) = send_result {
// Sending the snapshot package should never fail *unless* we're shutting down.
let snapshot_package = &err.0;
assert!(
exit.load(Ordering::Relaxed),
"Failed to send snapshot package: {err}, {snapshot_package:?}"
);
}
pending_snapshot_packages
.lock()
.unwrap()
.push(snapshot_package);
}

pub fn join(self) -> thread::Result<()> {
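With the shared state in place, submit_for_packaging above no longer needs an optional sender, a send-error path, or the exit flag: taking the lock and pushing cannot fail, so the old assertion that a failed send may only happen during shutdown goes away. Below is a minimal, self-contained sketch of the handoff pattern the two services now use; the names are illustrative, and a bare Option<u64> stands in for the real Arc<Mutex<PendingSnapshotPackages>>.

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
    thread,
    time::Duration,
};

fn main() {
    // Shared "latest pending work" slot shared by both threads.
    let pending: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
    let exit = Arc::new(AtomicBool::new(false));

    // Producer side (the role AccountsHashVerifier::submit_for_packaging now plays):
    // overwriting the slot is infallible, so there is no send error to assert on.
    let producer = {
        let pending = Arc::clone(&pending);
        thread::spawn(move || {
            for slot in [100_u64, 200, 300] {
                *pending.lock().unwrap() = Some(slot); // newer work replaces older, unhandled work
                thread::sleep(Duration::from_millis(20));
            }
        })
    };

    // Consumer side (the role of SnapshotPackagerService's main loop):
    // pop pending work if there is any, otherwise sleep for the loop limiter.
    let consumer = {
        let pending = Arc::clone(&pending);
        let exit = Arc::clone(&exit);
        thread::spawn(move || loop {
            if exit.load(Ordering::Relaxed) {
                break;
            }
            // Take the value in its own statement so the mutex guard is dropped
            // before any sleeping or long-running handling.
            let next = pending.lock().unwrap().take();
            match next {
                Some(slot) => println!("handling snapshot package for slot {slot}"),
                None => thread::sleep(Duration::from_millis(100)),
            }
        })
    };

    producer.join().unwrap();
    thread::sleep(Duration::from_millis(200)); // give the consumer a chance to drain
    exit.store(true, Ordering::Relaxed);
    consumer.join().unwrap();
}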
179 changes: 12 additions & 167 deletions core/src/snapshot_packager_service.rs
@@ -1,20 +1,19 @@
mod pending_snapshot_packages;
mod snapshot_gossip_manager;
pub use pending_snapshot_packages::PendingSnapshotPackages;
use {
crossbeam_channel::{Receiver, Sender},
snapshot_gossip_manager::SnapshotGossipManager,
solana_gossip::cluster_info::ClusterInfo,
solana_measure::{measure::Measure, measure_us},
solana_perf::thread::renice_this_thread,
solana_runtime::{
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_package::{self, SnapshotPackage},
snapshot_utils,
snapshot_config::SnapshotConfig, snapshot_hash::StartingSnapshotHashes,
snapshot_package::SnapshotPackage, snapshot_utils,
},
std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Arc, Mutex,
},
thread::{self, Builder, JoinHandle},
time::Duration,
@@ -30,8 +29,7 @@ impl SnapshotPackagerService {
const LOOP_LIMITER: Duration = Duration::from_millis(100);

pub fn new(
snapshot_package_sender: Sender<SnapshotPackage>,
snapshot_package_receiver: Receiver<SnapshotPackage>,
pending_snapshot_packages: Arc<Mutex<PendingSnapshotPackages>>,
starting_snapshot_hashes: Option<StartingSnapshotHashes>,
exit: Arc<AtomicBool>,
cluster_info: Arc<ClusterInfo>,
@@ -51,13 +49,8 @@ impl SnapshotPackagerService {
break;
}

let Some((
snapshot_package,
num_outstanding_snapshot_packages,
num_re_enqueued_snapshot_packages,
)) = Self::get_next_snapshot_package(
&snapshot_package_sender,
&snapshot_package_receiver,
let Some(snapshot_package) = Self::get_next_snapshot_package(
&pending_snapshot_packages,
)
else {
std::thread::sleep(Self::LOOP_LIMITER);
@@ -108,16 +101,6 @@ impl SnapshotPackagerService {
let handling_time_us = measure_handling.end_as_us();
datapoint_info!(
"snapshot_packager_service",
(
"num_outstanding_snapshot_packages",
num_outstanding_snapshot_packages,
i64
),
(
"num_re_enqueued_snapshot_packages",
num_re_enqueued_snapshot_packages,
i64
),
("enqueued_time_us", enqueued_time.as_micros(), i64),
("handling_time_us", handling_time_us, i64),
("archive_time_us", archive_time_us, i64),
@@ -146,148 +129,10 @@ impl SnapshotPackagerService {
self.t_snapshot_packager.join()
}

/// Get the next snapshot package to handle
///
/// Look through the snapshot package channel to find the highest priority one to handle next.
/// If there are no snapshot packages in the channel, return None. Otherwise return the
/// highest priority one. Unhandled snapshot packages with slots GREATER-THAN the handled one
/// will be re-enqueued. The remaining will be dropped.
///
/// Also return the number of snapshot packages initially in the channel, and the number of
/// ones re-enqueued.
/// Returns the next snapshot package to handle
fn get_next_snapshot_package(
snapshot_package_sender: &Sender<SnapshotPackage>,
snapshot_package_receiver: &Receiver<SnapshotPackage>,
) -> Option<(
SnapshotPackage,
/*num outstanding snapshot packages*/ usize,
/*num re-enqueued snapshot packages*/ usize,
)> {
let mut snapshot_packages: Vec<_> = snapshot_package_receiver.try_iter().collect();
// `select_nth()` panics if the slice is empty, so return if that's the case
if snapshot_packages.is_empty() {
return None;
}
let snapshot_packages_len = snapshot_packages.len();
debug!("outstanding snapshot packages ({snapshot_packages_len}): {snapshot_packages:?}");

snapshot_packages.select_nth_unstable_by(
snapshot_packages_len - 1,
snapshot_package::cmp_snapshot_packages_by_priority,
);
// SAFETY: We know `snapshot_packages` is not empty, so its len is >= 1,
// therefore there is always an element to pop.
let snapshot_package = snapshot_packages.pop().unwrap();
let handled_snapshot_package_slot = snapshot_package.slot;
// re-enqueue any remaining snapshot packages for slots GREATER-THAN the snapshot package
// that will be handled
let num_re_enqueued_snapshot_packages = snapshot_packages
.into_iter()
.filter(|snapshot_package| snapshot_package.slot > handled_snapshot_package_slot)
.map(|snapshot_package| {
snapshot_package_sender
.try_send(snapshot_package)
.expect("re-enqueue snapshot package")
})
.count();

Some((
snapshot_package,
snapshot_packages_len,
num_re_enqueued_snapshot_packages,
))
}
}

#[cfg(test)]
mod tests {
use {
super::*,
rand::seq::SliceRandom,
solana_runtime::snapshot_package::{SnapshotKind, SnapshotPackage},
solana_sdk::clock::Slot,
};

/// Ensure that unhandled snapshot packages are properly re-enqueued or dropped
///
/// The snapshot package handler should re-enqueue unhandled snapshot packages, if those
/// unhandled snapshot packages are for slots GREATER-THAN the last handled snapshot package.
/// Otherwise, they should be dropped.
#[test]
fn test_get_next_snapshot_package() {
fn new(snapshot_kind: SnapshotKind, slot: Slot) -> SnapshotPackage {
SnapshotPackage {
snapshot_kind,
slot,
block_height: slot,
..SnapshotPackage::default_for_tests()
}
}
fn new_full(slot: Slot) -> SnapshotPackage {
new(SnapshotKind::FullSnapshot, slot)
}
fn new_incr(slot: Slot, base: Slot) -> SnapshotPackage {
new(SnapshotKind::IncrementalSnapshot(base), slot)
}

let (snapshot_package_sender, snapshot_package_receiver) = crossbeam_channel::unbounded();

// Populate the channel so that re-enqueueing and dropping will be tested
let mut snapshot_packages = [
new_full(100),
new_incr(110, 100),
new_incr(210, 100),
new_full(300),
new_incr(310, 300),
new_full(400), // <-- handle 1st
new_incr(410, 400),
new_incr(420, 400), // <-- handle 2nd
];
// Shuffle the snapshot packages to simulate receiving new snapshot packages from AHV
// simultaneously as SPS is handling them.
snapshot_packages.shuffle(&mut rand::thread_rng());
snapshot_packages
.into_iter()
.for_each(|snapshot_package| snapshot_package_sender.send(snapshot_package).unwrap());

// The Full Snapshot from slot 400 is handled 1st
// (the older full snapshots are skipped and dropped)
let (
snapshot_package,
_num_outstanding_snapshot_packages,
num_re_enqueued_snapshot_packages,
) = SnapshotPackagerService::get_next_snapshot_package(
&snapshot_package_sender,
&snapshot_package_receiver,
)
.unwrap();
assert_eq!(snapshot_package.snapshot_kind, SnapshotKind::FullSnapshot,);
assert_eq!(snapshot_package.slot, 400);
assert_eq!(num_re_enqueued_snapshot_packages, 2);

// The Incremental Snapshot from slot 420 is handled 2nd
// (the older incremental snapshot from slot 410 is skipped and dropped)
let (
snapshot_package,
_num_outstanding_snapshot_packages,
num_re_enqueued_snapshot_packages,
) = SnapshotPackagerService::get_next_snapshot_package(
&snapshot_package_sender,
&snapshot_package_receiver,
)
.unwrap();
assert_eq!(
snapshot_package.snapshot_kind,
SnapshotKind::IncrementalSnapshot(400),
);
assert_eq!(snapshot_package.slot, 420);
assert_eq!(num_re_enqueued_snapshot_packages, 0);

// And now the snapshot package channel is empty!
assert!(SnapshotPackagerService::get_next_snapshot_package(
&snapshot_package_sender,
&snapshot_package_receiver
)
.is_none());
pending_snapshot_packages: &Mutex<PendingSnapshotPackages>,
) -> Option<SnapshotPackage> {
pending_snapshot_packages.lock().unwrap().pop()
}
}
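For reference on what was removed: the old get_next_snapshot_package drained the channel into a Vec, used select_nth_unstable_by with cmp_snapshot_packages_by_priority to move the single highest-priority package to the last index in linear time without fully sorting, popped it, and re-enqueued any leftover packages with slots greater than the handled one; the deleted test and the num_outstanding/num_re_enqueued metrics covered exactly that behavior. With one pending slot per snapshot kind, popping becomes the constant-time lock-and-take shown above. A tiny sketch of the select_nth_unstable_by idiom the old code relied on, using plain integers as stand-ins for package priority:

fn main() {
    let mut priorities = vec![3, 1, 4, 1, 5, 9, 2, 6];
    let last = priorities.len() - 1;
    // After this call the greatest element (per the comparator) sits at the last
    // index; the rest of the slice is only partially reordered, which is cheaper
    // than a full sort when only the maximum is needed.
    priorities.select_nth_unstable_by(last, |a, b| a.cmp(b));
    let handled = priorities.pop().unwrap();
    assert_eq!(handled, 9);
    // The old code then re-enqueued leftover packages with slots greater than the
    // handled one and dropped the rest; the new PendingSnapshotPackages makes both
    // steps unnecessary.
    println!("handled priority {handled}, {} left behind", priorities.len());
}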