diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index 87a16610547290..78b6c2d0298c80 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -1,6 +1,7 @@ //! Service to calculate accounts hashes use { + crate::snapshot_packager_service::PendingSnapshotPackages, crossbeam_channel::{Receiver, Sender}, solana_accounts_db::{ accounts_db::CalcAccountsHashKind, @@ -24,7 +25,7 @@ use { io::Result as IoResult, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Mutex, }, thread::{self, Builder, JoinHandle}, time::Duration, @@ -39,7 +40,7 @@ impl AccountsHashVerifier { pub fn new( accounts_package_sender: Sender, accounts_package_receiver: Receiver, - snapshot_package_sender: Option>, + pending_snapshot_packages: Arc>, exit: Arc, snapshot_config: SnapshotConfig, ) -> Self { @@ -71,9 +72,8 @@ impl AccountsHashVerifier { let (result, handling_time_us) = measure_us!(Self::process_accounts_package( accounts_package, - snapshot_package_sender.as_ref(), + &pending_snapshot_packages, &snapshot_config, - &exit, )); if let Err(err) = result { error!("Stopping AccountsHashVerifier! Fatal error while processing accounts package: {err}"); @@ -208,9 +208,8 @@ impl AccountsHashVerifier { #[allow(clippy::too_many_arguments)] fn process_accounts_package( accounts_package: AccountsPackage, - snapshot_package_sender: Option<&Sender>, + pending_snapshot_packages: &Mutex, snapshot_config: &SnapshotConfig, - exit: &AtomicBool, ) -> IoResult<()> { let (accounts_hash_kind, bank_incremental_snapshot_persistence) = Self::calculate_and_verify_accounts_hash(&accounts_package, snapshot_config)?; @@ -221,11 +220,10 @@ impl AccountsHashVerifier { Self::submit_for_packaging( accounts_package, - snapshot_package_sender, + pending_snapshot_packages, snapshot_config, accounts_hash_kind, bank_incremental_snapshot_persistence, - exit, ); Ok(()) @@ -462,11 +460,10 @@ impl AccountsHashVerifier { fn submit_for_packaging( accounts_package: AccountsPackage, - snapshot_package_sender: Option<&Sender>, + pending_snapshot_packages: &Mutex, snapshot_config: &SnapshotConfig, accounts_hash_kind: AccountsHashKind, bank_incremental_snapshot_persistence: Option, - exit: &AtomicBool, ) { if !snapshot_config.should_generate_snapshots() || !matches!( @@ -476,24 +473,16 @@ impl AccountsHashVerifier { { return; } - let Some(snapshot_package_sender) = snapshot_package_sender else { - return; - }; let snapshot_package = SnapshotPackage::new( accounts_package, accounts_hash_kind, bank_incremental_snapshot_persistence, ); - let send_result = snapshot_package_sender.send(snapshot_package); - if let Err(err) = send_result { - // Sending the snapshot package should never fail *unless* we're shutting down. - let snapshot_package = &err.0; - assert!( - exit.load(Ordering::Relaxed), - "Failed to send snapshot package: {err}, {snapshot_package:?}" - ); - } + pending_snapshot_packages + .lock() + .unwrap() + .push(snapshot_package); } pub fn join(self) -> thread::Result<()> { diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index ebfa0a9bbe869a..f9c40e4f9b13fe 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -1,20 +1,19 @@ +mod pending_snapshot_packages; mod snapshot_gossip_manager; +pub use pending_snapshot_packages::PendingSnapshotPackages; use { - crossbeam_channel::{Receiver, Sender}, snapshot_gossip_manager::SnapshotGossipManager, solana_gossip::cluster_info::ClusterInfo, solana_measure::{measure::Measure, measure_us}, solana_perf::thread::renice_this_thread, solana_runtime::{ - snapshot_config::SnapshotConfig, - snapshot_hash::StartingSnapshotHashes, - snapshot_package::{self, SnapshotPackage}, - snapshot_utils, + snapshot_config::SnapshotConfig, snapshot_hash::StartingSnapshotHashes, + snapshot_package::SnapshotPackage, snapshot_utils, }, std::{ sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Mutex, }, thread::{self, Builder, JoinHandle}, time::Duration, @@ -30,8 +29,7 @@ impl SnapshotPackagerService { const LOOP_LIMITER: Duration = Duration::from_millis(100); pub fn new( - snapshot_package_sender: Sender, - snapshot_package_receiver: Receiver, + pending_snapshot_packages: Arc>, starting_snapshot_hashes: Option, exit: Arc, cluster_info: Arc, @@ -51,13 +49,8 @@ impl SnapshotPackagerService { break; } - let Some(( - snapshot_package, - num_outstanding_snapshot_packages, - num_re_enqueued_snapshot_packages, - )) = Self::get_next_snapshot_package( - &snapshot_package_sender, - &snapshot_package_receiver, + let Some(snapshot_package) = Self::get_next_snapshot_package( + &pending_snapshot_packages, ) else { std::thread::sleep(Self::LOOP_LIMITER); @@ -108,16 +101,6 @@ impl SnapshotPackagerService { let handling_time_us = measure_handling.end_as_us(); datapoint_info!( "snapshot_packager_service", - ( - "num_outstanding_snapshot_packages", - num_outstanding_snapshot_packages, - i64 - ), - ( - "num_re_enqueued_snapshot_packages", - num_re_enqueued_snapshot_packages, - i64 - ), ("enqueued_time_us", enqueued_time.as_micros(), i64), ("handling_time_us", handling_time_us, i64), ("archive_time_us", archive_time_us, i64), @@ -146,148 +129,10 @@ impl SnapshotPackagerService { self.t_snapshot_packager.join() } - /// Get the next snapshot package to handle - /// - /// Look through the snapshot package channel to find the highest priority one to handle next. - /// If there are no snapshot packages in the channel, return None. Otherwise return the - /// highest priority one. Unhandled snapshot packages with slots GREATER-THAN the handled one - /// will be re-enqueued. The remaining will be dropped. - /// - /// Also return the number of snapshot packages initially in the channel, and the number of - /// ones re-enqueued. + /// Returns the next snapshot package to handle fn get_next_snapshot_package( - snapshot_package_sender: &Sender, - snapshot_package_receiver: &Receiver, - ) -> Option<( - SnapshotPackage, - /*num outstanding snapshot packages*/ usize, - /*num re-enqueued snapshot packages*/ usize, - )> { - let mut snapshot_packages: Vec<_> = snapshot_package_receiver.try_iter().collect(); - // `select_nth()` panics if the slice is empty, so return if that's the case - if snapshot_packages.is_empty() { - return None; - } - let snapshot_packages_len = snapshot_packages.len(); - debug!("outstanding snapshot packages ({snapshot_packages_len}): {snapshot_packages:?}"); - - snapshot_packages.select_nth_unstable_by( - snapshot_packages_len - 1, - snapshot_package::cmp_snapshot_packages_by_priority, - ); - // SAFETY: We know `snapshot_packages` is not empty, so its len is >= 1, - // therefore there is always an element to pop. - let snapshot_package = snapshot_packages.pop().unwrap(); - let handled_snapshot_package_slot = snapshot_package.slot; - // re-enqueue any remaining snapshot packages for slots GREATER-THAN the snapshot package - // that will be handled - let num_re_enqueued_snapshot_packages = snapshot_packages - .into_iter() - .filter(|snapshot_package| snapshot_package.slot > handled_snapshot_package_slot) - .map(|snapshot_package| { - snapshot_package_sender - .try_send(snapshot_package) - .expect("re-enqueue snapshot package") - }) - .count(); - - Some(( - snapshot_package, - snapshot_packages_len, - num_re_enqueued_snapshot_packages, - )) - } -} - -#[cfg(test)] -mod tests { - use { - super::*, - rand::seq::SliceRandom, - solana_runtime::snapshot_package::{SnapshotKind, SnapshotPackage}, - solana_sdk::clock::Slot, - }; - - /// Ensure that unhandled snapshot packages are properly re-enqueued or dropped - /// - /// The snapshot package handler should re-enqueue unhandled snapshot packages, if those - /// unhandled snapshot packages are for slots GREATER-THAN the last handled snapshot package. - /// Otherwise, they should be dropped. - #[test] - fn test_get_next_snapshot_package() { - fn new(snapshot_kind: SnapshotKind, slot: Slot) -> SnapshotPackage { - SnapshotPackage { - snapshot_kind, - slot, - block_height: slot, - ..SnapshotPackage::default_for_tests() - } - } - fn new_full(slot: Slot) -> SnapshotPackage { - new(SnapshotKind::FullSnapshot, slot) - } - fn new_incr(slot: Slot, base: Slot) -> SnapshotPackage { - new(SnapshotKind::IncrementalSnapshot(base), slot) - } - - let (snapshot_package_sender, snapshot_package_receiver) = crossbeam_channel::unbounded(); - - // Populate the channel so that re-enqueueing and dropping will be tested - let mut snapshot_packages = [ - new_full(100), - new_incr(110, 100), - new_incr(210, 100), - new_full(300), - new_incr(310, 300), - new_full(400), // <-- handle 1st - new_incr(410, 400), - new_incr(420, 400), // <-- handle 2nd - ]; - // Shuffle the snapshot packages to simulate receiving new snapshot packages from AHV - // simultaneously as SPS is handling them. - snapshot_packages.shuffle(&mut rand::thread_rng()); - snapshot_packages - .into_iter() - .for_each(|snapshot_package| snapshot_package_sender.send(snapshot_package).unwrap()); - - // The Full Snapshot from slot 400 is handled 1st - // (the older full snapshots are skipped and dropped) - let ( - snapshot_package, - _num_outstanding_snapshot_packages, - num_re_enqueued_snapshot_packages, - ) = SnapshotPackagerService::get_next_snapshot_package( - &snapshot_package_sender, - &snapshot_package_receiver, - ) - .unwrap(); - assert_eq!(snapshot_package.snapshot_kind, SnapshotKind::FullSnapshot,); - assert_eq!(snapshot_package.slot, 400); - assert_eq!(num_re_enqueued_snapshot_packages, 2); - - // The Incremental Snapshot from slot 420 is handled 2nd - // (the older incremental snapshot from slot 410 is skipped and dropped) - let ( - snapshot_package, - _num_outstanding_snapshot_packages, - num_re_enqueued_snapshot_packages, - ) = SnapshotPackagerService::get_next_snapshot_package( - &snapshot_package_sender, - &snapshot_package_receiver, - ) - .unwrap(); - assert_eq!( - snapshot_package.snapshot_kind, - SnapshotKind::IncrementalSnapshot(400), - ); - assert_eq!(snapshot_package.slot, 420); - assert_eq!(num_re_enqueued_snapshot_packages, 0); - - // And now the snapshot package channel is empty! - assert!(SnapshotPackagerService::get_next_snapshot_package( - &snapshot_package_sender, - &snapshot_package_receiver - ) - .is_none()); + pending_snapshot_packages: &Mutex, + ) -> Option { + pending_snapshot_packages.lock().unwrap().pop() } } diff --git a/core/src/snapshot_packager_service/pending_snapshot_packages.rs b/core/src/snapshot_packager_service/pending_snapshot_packages.rs new file mode 100644 index 00000000000000..78423037fd7a13 --- /dev/null +++ b/core/src/snapshot_packager_service/pending_snapshot_packages.rs @@ -0,0 +1,290 @@ +use { + solana_runtime::snapshot_package::{ + cmp_snapshot_packages_by_priority, SnapshotKind, SnapshotPackage, + }, + std::cmp::Ordering::Greater, +}; + +/// Snapshot packages that are pending for archival +#[derive(Debug, Default)] +pub struct PendingSnapshotPackages { + full: Option, + incremental: Option, +} + +impl PendingSnapshotPackages { + /// Adds `snapshot_package` as a pending snapshot package + /// + /// This will overwrite currently-pending in-kind packages. + /// + /// Note: This function will panic if `snapshot_package` is *older* + /// than any currently-pending in-kind packages. + pub fn push(&mut self, snapshot_package: SnapshotPackage) { + match snapshot_package.snapshot_kind { + SnapshotKind::FullSnapshot => { + if let Some(pending_full_snapshot_package) = self.full.as_ref() { + // snapshots are monotonically increasing; only overwrite *old* packages + assert!(pending_full_snapshot_package + .snapshot_kind + .is_full_snapshot()); + assert_eq!( + cmp_snapshot_packages_by_priority( + &snapshot_package, + pending_full_snapshot_package, + ), + Greater, + "full snapshot package must be newer than pending package, \ + old: {pending_full_snapshot_package:?}, new: {snapshot_package:?}", + ); + info!( + "overwrote pending full snapshot package, old slot: {}, new slot: {}", + pending_full_snapshot_package.slot, snapshot_package.slot, + ); + } + self.full = Some(snapshot_package) + } + SnapshotKind::IncrementalSnapshot(_) => { + if let Some(pending_incremental_snapshot_package) = self.incremental.as_ref() { + // snapshots are monotonically increasing; only overwrite *old* packages + assert!(pending_incremental_snapshot_package + .snapshot_kind + .is_incremental_snapshot()); + assert_eq!( + cmp_snapshot_packages_by_priority( + &snapshot_package, + pending_incremental_snapshot_package, + ), + Greater, + "incremental snapshot package must be newer than pending package, \ + old: {pending_incremental_snapshot_package:?}, new: {snapshot_package:?}", + ); + info!( + "overwrote pending incremental snapshot package, old slot: {}, new slot: {}", + pending_incremental_snapshot_package.slot, snapshot_package.slot, + ); + } + self.incremental = Some(snapshot_package) + } + } + } + + /// Returns the next pending snapshot package to handle + pub fn pop(&mut self) -> Option { + let pending_full = self.full.take(); + let pending_incremental = self.incremental.take(); + match (pending_full, pending_incremental) { + (Some(pending_full), pending_incremental) => { + // If there is a pending incremental snapshot package, check its slot. + // If its slot is greater than the full snapshot package's, + // re-enqueue it, otherwise drop it. + // Note that it is *not supported* to handle incremental snapshots with + // slots *older* than the latest full snapshot. This is why we do not + // re-enqueue every incremental snapshot. + if let Some(pending_incremental) = pending_incremental { + let SnapshotKind::IncrementalSnapshot(base_slot) = + &pending_incremental.snapshot_kind + else { + panic!( + "the pending incremental snapshot package must be of kind \ + IncrementalSnapshot, but instead was {pending_incremental:?}", + ); + }; + if pending_incremental.slot > pending_full.slot + && *base_slot >= pending_full.slot + { + self.incremental = Some(pending_incremental); + } + } + + assert!(pending_full.snapshot_kind.is_full_snapshot()); + Some(pending_full) + } + (None, Some(pending_incremental)) => { + assert!(pending_incremental.snapshot_kind.is_incremental_snapshot()); + Some(pending_incremental) + } + (None, None) => None, + } + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_runtime::snapshot_package::{SnapshotKind, SnapshotPackage}, + solana_sdk::clock::Slot, + }; + + fn new(snapshot_kind: SnapshotKind, slot: Slot) -> SnapshotPackage { + SnapshotPackage { + snapshot_kind, + slot, + block_height: slot, + ..SnapshotPackage::default_for_tests() + } + } + fn new_full(slot: Slot) -> SnapshotPackage { + new(SnapshotKind::FullSnapshot, slot) + } + fn new_incr(slot: Slot, base: Slot) -> SnapshotPackage { + new(SnapshotKind::IncrementalSnapshot(base), slot) + } + + #[test] + fn test_default() { + let pending_snapshot_packages = PendingSnapshotPackages::default(); + assert!(pending_snapshot_packages.full.is_none()); + assert!(pending_snapshot_packages.incremental.is_none()); + } + + #[test] + fn test_push() { + let mut pending_snapshot_packages = PendingSnapshotPackages::default(); + + // ensure we can push full snapshot packages + let slot = 100; + pending_snapshot_packages.push(new_full(slot)); + assert_eq!(pending_snapshot_packages.full.as_ref().unwrap().slot, slot); + assert!(pending_snapshot_packages.incremental.is_none()); + + // ensure we can overwrite full snapshot packages + let slot = slot + 100; + pending_snapshot_packages.push(new_full(slot)); + assert_eq!(pending_snapshot_packages.full.as_ref().unwrap().slot, slot); + assert!(pending_snapshot_packages.incremental.is_none()); + + // ensure we can push incremental packages + let full_slot = slot; + let slot = full_slot + 10; + pending_snapshot_packages.push(new_incr(slot, full_slot)); + assert_eq!( + pending_snapshot_packages.full.as_ref().unwrap().slot, + full_slot, + ); + assert_eq!( + pending_snapshot_packages.incremental.as_ref().unwrap().slot, + slot, + ); + + // ensure we can overwrite incremental packages + let slot = slot + 10; + pending_snapshot_packages.push(new_incr(slot, full_slot)); + assert_eq!( + pending_snapshot_packages.full.as_ref().unwrap().slot, + full_slot, + ); + assert_eq!( + pending_snapshot_packages.incremental.as_ref().unwrap().slot, + slot, + ); + + // ensure pushing a full package doesn't affect the incremental package + // (we already tested above that pushing an incremental doesn't affect the full) + let incremental_slot = slot; + let slot = full_slot + 100; + pending_snapshot_packages.push(new_full(slot)); + assert_eq!(pending_snapshot_packages.full.as_ref().unwrap().slot, slot); + assert_eq!( + pending_snapshot_packages.incremental.as_ref().unwrap().slot, + incremental_slot, + ); + } + + #[test] + #[should_panic(expected = "full snapshot package must be newer than pending package")] + fn test_push_older_full() { + let slot = 100; + let mut pending_snapshot_packages = PendingSnapshotPackages { + full: Some(new_full(slot)), + incremental: None, + }; + + // pushing an older full should panic + pending_snapshot_packages.push(new_full(slot - 1)); + } + + #[test] + #[should_panic(expected = "incremental snapshot package must be newer than pending package")] + fn test_push_older_incremental() { + let base = 100; + let slot = base + 20; + let mut pending_snapshot_packages = PendingSnapshotPackages { + full: None, + incremental: Some(new_incr(slot, base)), + }; + + // pushing an older incremental should panic + pending_snapshot_packages.push(new_incr(slot - 1, base)); + } + + #[test] + fn test_pop() { + let mut pending_snapshot_packages = PendingSnapshotPackages::default(); + + // ensure we can call pop when there are no pending packages + assert!(pending_snapshot_packages.pop().is_none()); + + // ensure pop returns full when there's only a full + let slot = 100; + pending_snapshot_packages.full = Some(new_full(slot)); + pending_snapshot_packages.incremental = None; + let snapshot_package = pending_snapshot_packages.pop().unwrap(); + assert!(snapshot_package.snapshot_kind.is_full_snapshot()); + assert_eq!(snapshot_package.slot, slot); + + // ensure pop returns incremental when there's only an incremental + let base = 100; + let slot = base + 10; + pending_snapshot_packages.full = None; + pending_snapshot_packages.incremental = Some(new_incr(slot, base)); + let snapshot_package = pending_snapshot_packages.pop().unwrap(); + assert!(snapshot_package.snapshot_kind.is_incremental_snapshot()); + assert_eq!(snapshot_package.slot, slot); + + // ensure pop returns full when there's both a full and newer incremental + let full_slot = 100; + let incr_slot = full_slot + 10; + pending_snapshot_packages.full = Some(new_full(full_slot)); + pending_snapshot_packages.incremental = Some(new_incr(incr_slot, full_slot)); + let snapshot_package = pending_snapshot_packages.pop().unwrap(); + assert!(snapshot_package.snapshot_kind.is_full_snapshot()); + assert_eq!(snapshot_package.slot, full_slot); + + // ..and then the second pop returns the incremental + let snapshot_package = pending_snapshot_packages.pop().unwrap(); + assert!(snapshot_package.snapshot_kind.is_incremental_snapshot()); + assert_eq!(snapshot_package.slot, incr_slot); + + // but, if there's a full and *older* incremental, pop should return + // the full and *not* re-enque the incremental + let full_slot = 200; + let incr_slot = full_slot - 10; + pending_snapshot_packages.full = Some(new_full(full_slot)); + pending_snapshot_packages.incremental = Some(new_incr(incr_slot, full_slot)); + let snapshot_package = pending_snapshot_packages.pop().unwrap(); + assert!(snapshot_package.snapshot_kind.is_full_snapshot()); + assert_eq!(snapshot_package.slot, full_slot); + assert!(pending_snapshot_packages.incremental.is_none()); + } + + #[test] + #[should_panic] + fn test_pop_invalid_pending_full() { + let mut pending_snapshot_packages = PendingSnapshotPackages { + full: Some(new_incr(110, 100)), // <-- invalid! `full` is IncrementalSnapshot + incremental: None, + }; + pending_snapshot_packages.pop(); + } + + #[test] + #[should_panic] + fn test_pop_invalid_pending_incremental() { + let mut pending_snapshot_packages = PendingSnapshotPackages { + full: None, + incremental: Some(new_full(100)), // <-- invalid! `incremental` is FullSnapshot + }; + pending_snapshot_packages.pop(); + } +} diff --git a/core/src/validator.rs b/core/src/validator.rs index 4052a48e144a6f..467fd843640a20 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -19,7 +19,7 @@ use { rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, sample_performance_service::SamplePerformanceService, sigverify, - snapshot_packager_service::SnapshotPackagerService, + snapshot_packager_service::{PendingSnapshotPackages, SnapshotPackagerService}, stats_reporter_service::StatsReporterService, system_monitor_service::{ verify_net_stats_access, SystemMonitorService, SystemMonitorStatsReportConfig, @@ -130,7 +130,7 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, RwLock, + Arc, Mutex, RwLock, }, thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -774,33 +774,27 @@ impl Validator { config.accounts_hash_interval_slots, )); - let (snapshot_package_sender, snapshot_packager_service) = - if config.snapshot_config.should_generate_snapshots() { - let enable_gossip_push = true; - let (snapshot_package_sender, snapshot_package_receiver) = - crossbeam_channel::unbounded(); - let snapshot_packager_service = SnapshotPackagerService::new( - snapshot_package_sender.clone(), - snapshot_package_receiver, - starting_snapshot_hashes, - exit.clone(), - cluster_info.clone(), - config.snapshot_config.clone(), - enable_gossip_push, - ); - ( - Some(snapshot_package_sender), - Some(snapshot_packager_service), - ) - } else { - (None, None) - }; + let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default())); + let snapshot_packager_service = if config.snapshot_config.should_generate_snapshots() { + let enable_gossip_push = true; + let snapshot_packager_service = SnapshotPackagerService::new( + pending_snapshot_packages.clone(), + starting_snapshot_hashes, + exit.clone(), + cluster_info.clone(), + config.snapshot_config.clone(), + enable_gossip_push, + ); + Some(snapshot_packager_service) + } else { + None + }; let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded(); let accounts_hash_verifier = AccountsHashVerifier::new( accounts_package_sender.clone(), accounts_package_receiver, - snapshot_package_sender, + pending_snapshot_packages, exit.clone(), config.snapshot_config.clone(), ); diff --git a/core/tests/epoch_accounts_hash.rs b/core/tests/epoch_accounts_hash.rs index 938031fa05ffbe..f70815a640c595 100755 --- a/core/tests/epoch_accounts_hash.rs +++ b/core/tests/epoch_accounts_hash.rs @@ -12,7 +12,7 @@ use { }, solana_core::{ accounts_hash_verifier::AccountsHashVerifier, - snapshot_packager_service::SnapshotPackagerService, + snapshot_packager_service::{PendingSnapshotPackages, SnapshotPackagerService}, }, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_runtime::{ @@ -43,7 +43,7 @@ use { mem::ManuallyDrop, sync::{ atomic::{AtomicBool, Ordering}, - Arc, RwLock, + Arc, Mutex, RwLock, }, time::Duration, }, @@ -180,10 +180,9 @@ impl BackgroundServices { ) -> Self { info!("Starting background services..."); - let (snapshot_package_sender, snapshot_package_receiver) = crossbeam_channel::unbounded(); + let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default())); let snapshot_packager_service = SnapshotPackagerService::new( - snapshot_package_sender.clone(), - snapshot_package_receiver, + pending_snapshot_packages.clone(), None, exit.clone(), cluster_info.clone(), @@ -195,7 +194,7 @@ impl BackgroundServices { let accounts_hash_verifier = AccountsHashVerifier::new( accounts_package_sender.clone(), accounts_package_receiver, - Some(snapshot_package_sender), + pending_snapshot_packages, exit.clone(), snapshot_config.clone(), ); diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index de56e29e0d3b47..2a6c77ddb0a0c0 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -12,7 +12,7 @@ use { }, solana_core::{ accounts_hash_verifier::AccountsHashVerifier, - snapshot_packager_service::SnapshotPackagerService, + snapshot_packager_service::{PendingSnapshotPackages, SnapshotPackagerService}, }, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_runtime::{ @@ -50,7 +50,7 @@ use { path::PathBuf, sync::{ atomic::{AtomicBool, Ordering}, - Arc, RwLock, + Arc, Mutex, RwLock, }, time::{Duration, Instant}, }, @@ -667,7 +667,7 @@ fn test_snapshots_with_background_services( let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); let (accounts_package_sender, accounts_package_receiver) = unbounded(); - let (snapshot_package_sender, snapshot_package_receiver) = unbounded(); + let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default())); let bank_forks = snapshot_test_config.bank_forks.clone(); bank_forks @@ -700,8 +700,7 @@ fn test_snapshots_with_background_services( let exit = Arc::new(AtomicBool::new(false)); let snapshot_packager_service = SnapshotPackagerService::new( - snapshot_package_sender.clone(), - snapshot_package_receiver, + pending_snapshot_packages.clone(), None, exit.clone(), cluster_info.clone(), @@ -712,7 +711,7 @@ fn test_snapshots_with_background_services( let accounts_hash_verifier = AccountsHashVerifier::new( accounts_package_sender, accounts_package_receiver, - Some(snapshot_package_sender), + pending_snapshot_packages, exit.clone(), snapshot_test_config.snapshot_config.clone(), ); diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 152ec84ed66f70..94778ee4b22407 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -8,7 +8,8 @@ use { utils::{create_all_accounts_run_and_snapshot_dirs, move_and_async_delete_path_contents}, }, solana_core::{ - accounts_hash_verifier::AccountsHashVerifier, validator::BlockVerificationMethod, + accounts_hash_verifier::AccountsHashVerifier, + snapshot_packager_service::PendingSnapshotPackages, validator::BlockVerificationMethod, }, solana_geyser_plugin_manager::geyser_plugin_service::{ GeyserPluginService, GeyserPluginServiceError, @@ -48,7 +49,7 @@ use { process::exit, sync::{ atomic::{AtomicBool, Ordering}, - Arc, RwLock, + Arc, Mutex, RwLock, }, }, thiserror::Error, @@ -335,11 +336,12 @@ pub fn load_and_process_ledger( } } + let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default())); let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded(); let accounts_hash_verifier = AccountsHashVerifier::new( accounts_package_sender.clone(), accounts_package_receiver, - None, + pending_snapshot_packages, exit.clone(), SnapshotConfig::new_load_only(), );