Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hash-cache-tool: Scan files with mmap and bins #2504

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 90 additions & 27 deletions accounts-db/accounts-hash-cache-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ use {
},
memmap2::Mmap,
solana_accounts_db::{
accounts_hash::AccountHash, parse_cache_hash_data_filename, CacheHashDataFileEntry,
CacheHashDataFileHeader, ParsedCacheHashDataFilename,
accounts_hash::AccountHash, parse_cache_hash_data_filename,
pubkey_bins::PubkeyBinCalculator24, CacheHashDataFileEntry, CacheHashDataFileHeader,
ParsedCacheHashDataFilename,
},
solana_program::pubkey::Pubkey,
std::{
cmp::{self, Ordering},
fs::{self, File, Metadata},
io::{self, BufReader, Read},
iter,
mem::size_of,
num::Saturating,
path::{Path, PathBuf},
Expand Down Expand Up @@ -295,10 +297,7 @@ fn do_diff_dirs(
dir2: impl AsRef<Path>,
then_diff_files: bool,
) -> Result<(), String> {
let _timer = ElapsedOnDrop {
message: "diffing directories took ".to_string(),
start: Instant::now(),
};
let _timer = ElapsedOnDrop::new("diffing directories took ");

let files1 = get_cache_files_in(dir1)
.map_err(|err| format!("failed to get cache files in dir1: {err}"))?;
Expand Down Expand Up @@ -358,10 +357,10 @@ fn do_diff_dirs(
}

// if the file headers have different entry counts, they are not equal
let Ok((mmap1, header1)) = map_file(&file1.path, false) else {
let Ok((mmap1, header1)) = mmap_file(&file1.path, false) else {
return false;
};
let Ok((mmap2, header2)) = map_file(&file2.path, false) else {
let Ok((mmap2, header2)) = mmap_file(&file2.path, false) else {
return false;
};
if header1.count != header2.count {
Expand Down Expand Up @@ -490,33 +489,81 @@ fn get_cache_files_in(dir: impl AsRef<Path>) -> Result<Vec<CacheFileInfo>, io::E
///
/// If there are multiple entries for a pubkey, only the latest is returned.
fn extract_latest_entries_in(file: impl AsRef<Path>) -> Result<LatestEntriesInfo, String> {
let force = false; // skipping sanity checks is not supported when extracting entries
let (reader, header) = open_file(&file, force).map_err(|err| {
format!(
"failed to open accounts hash cache file '{}': {err}",
file.as_ref().display(),
)
})?;
const NUM_BINS: usize = 1;
let BinnedLatestEntriesInfo {
latest_entries,
capitalization,
} = extract_binned_latest_entries_in(iter::once(file), NUM_BINS)?;
assert_eq!(latest_entries.len(), NUM_BINS);
let mut latest_entries = Vec::from(latest_entries);
let latest_entries = latest_entries.pop().unwrap();

// entries in the file are sorted by pubkey then slot,
// so we want to keep the *last* entry (if there are duplicates)
Ok(LatestEntriesInfo {
latest_entries,
capitalization,
})
}

/// Returns the entries in `files`, binned by pubkey, and the capitalization
///
/// If there are multiple entries for a pubkey, only the latest is returned.
///
/// Note: `files` must be sorted in ascending order, as insertion order is
/// relied on to guarantee the latest entry is returned.
fn extract_binned_latest_entries_in(
files: impl IntoIterator<Item = impl AsRef<Path>>,
bins: usize,
) -> Result<BinnedLatestEntriesInfo, String> {
let binner = PubkeyBinCalculator24::new(bins);
let mut entries: Box<_> = iter::repeat_with(HashMap::default).take(bins).collect();
let mut capitalization = Saturating(0);
let mut entries = HashMap::default();
scan_file(reader, header.count, |entry| {
capitalization += entry.lamports;
let old_value = entries.insert(entry.pubkey, (entry.hash, entry.lamports));
if let Some((_, old_lamports)) = old_value {
// back out the old value's lamports, so we only keep the latest's for capitalization
capitalization -= old_lamports;

for file in files.into_iter() {
let force = false; // skipping sanity checks is not supported when extracting entries
let (mmap, header) = mmap_file(&file, force).map_err(|err| {
format!(
"failed to open accounts hash cache file '{}': {err}",
file.as_ref().display(),
)
})?;

let num_entries = scan_mmap(&mmap, |entry| {
capitalization += entry.lamports;
let bin = binner.bin_from_pubkey(&entry.pubkey);
let old_value = entries[bin].insert(entry.pubkey, (entry.hash, entry.lamports));
if let Some((_, old_lamports)) = old_value {
// back out the old value's lamports, so we only keep the latest's for capitalization
capitalization -= old_lamports;
}
});

if num_entries != header.count {
return Err(format!(
"mismatched number of entries when scanning '{}': expected: {}, actual: {num_entries}",
file.as_ref().display(), header.count,
));
}
})?;
}

Ok(LatestEntriesInfo {
Ok(BinnedLatestEntriesInfo {
latest_entries: entries,
capitalization: capitalization.0,
})
}

/// Scans `mmap` and applies `user_fn` to each entry
///
/// Returns the number of entries visited.  The mmap is interpreted as a
/// `CacheHashDataFileHeader` followed by tightly packed
/// `CacheHashDataFileEntry` records; a trailing partial record, if any,
/// is ignored (`chunks_exact` semantics).
fn scan_mmap(mmap: &Mmap, mut user_fn: impl FnMut(&CacheHashDataFileEntry)) -> usize {
    const SIZE_OF_ENTRY: usize = size_of::<CacheHashDataFileEntry>();
    // Skip past the header; everything after it is entry data.
    let entry_bytes = &mmap[size_of::<CacheHashDataFileHeader>()..];
    entry_bytes
        .chunks_exact(SIZE_OF_ENTRY)
        .fold(Saturating(0usize), |num_entries, chunk| {
            user_fn(bytemuck::from_bytes(chunk));
            num_entries + Saturating(1)
        })
        .0
}

/// Scans file with `reader` and applies `user_fn` to each entry
///
/// NOTE: `reader`'s cursor must already be at the first entry; i.e. *past* the header.
Expand Down Expand Up @@ -551,7 +598,7 @@ fn scan_file(
Ok(())
}

fn map_file(
fn mmap_file(
path: impl AsRef<Path>,
force: bool,
) -> Result<(Mmap, CacheHashDataFileHeader), String> {
Expand Down Expand Up @@ -612,12 +659,28 @@ struct LatestEntriesInfo {
capitalization: u64, // lamports
}

/// Latest entries extracted from one or more cache files, binned by pubkey,
/// plus the total capitalization of those entries
///
/// `latest_entries` holds one map per pubkey bin (its length equals the
/// number of bins requested when scanning).  Each map stores, per pubkey,
/// the hash and lamports of the latest entry seen for that pubkey.
#[derive(Debug)]
struct BinnedLatestEntriesInfo {
    latest_entries: Box<[HashMap<Pubkey, (AccountHash, /* lamports */ u64)>]>,
    capitalization: u64, // lamports
}

/// Guard that measures how long it stays alive
///
/// `start` is recorded at construction; the `Drop` impl computes
/// `start.elapsed()` — presumably reporting it together with `message`
/// (Drop body not fully visible here; confirm against the impl below).
#[derive(Debug)]
struct ElapsedOnDrop {
    message: String, // text emitted alongside the elapsed duration
    start: Instant,  // when the guard was created
}

impl ElapsedOnDrop {
    /// Starts a new elapsed-time guard with the given `message`
    ///
    /// The elapsed time is reported when the returned guard is dropped,
    /// so ignoring the return value would defeat the purpose (`must_use`).
    #[must_use]
    fn new(message: impl Into<String>) -> Self {
        // Convert the message before sampling the clock, matching the
        // field-evaluation order of the original struct literal so the
        // timer starts as late as possible.
        let message = message.into();
        let start = Instant::now();
        Self { message, start }
    }
}

impl Drop for ElapsedOnDrop {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
Expand Down
2 changes: 1 addition & 1 deletion accounts-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub mod epoch_accounts_hash;
mod file_io;
pub mod hardened_unpack;
pub mod partitioned_rewards;
mod pubkey_bins;
pub mod pubkey_bins;
mod read_only_accounts_cache;
mod rolling_bit_field;
pub mod secondary_index;
Expand Down
4 changes: 2 additions & 2 deletions accounts-db/src/pubkey_bins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl PubkeyBinCalculator24 {
Self::num_bits::<u32>() as u32 - x.leading_zeros() - 1
}

pub(crate) fn new(bins: usize) -> Self {
pub fn new(bins: usize) -> Self {
const MAX_BITS: u32 = 24;
assert!(bins > 0);
let max_plus_1 = 1 << MAX_BITS;
Expand All @@ -29,7 +29,7 @@ impl PubkeyBinCalculator24 {
}

#[inline]
pub(crate) fn bin_from_pubkey(&self, pubkey: &Pubkey) -> usize {
pub fn bin_from_pubkey(&self, pubkey: &Pubkey) -> usize {
let as_ref = pubkey.as_ref();
((as_ref[0] as usize) << 16 | (as_ref[1] as usize) << 8 | (as_ref[2] as usize))
>> self.shift_bits
Expand Down