Skip to content

Commit

Permalink
uses bit-mask for blockstore_meta::ShredIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Dec 19, 2024
1 parent 7fc3fbb commit fdd66b8
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 24 deletions.
3 changes: 1 addition & 2 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4809,8 +4809,7 @@ fn update_completed_data_indexes(
.windows(2)
.filter(|ix| {
let (begin, end) = (ix[0] as u64, ix[1] as u64);
let num_shreds = (end - begin) as usize;
received_data_shreds.range(begin..end).count() == num_shreds
received_data_shreds.is_complete(begin..end)
})
.map(|ix| (ix[0], ix[1] - 1))
.collect()
Expand Down
20 changes: 18 additions & 2 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,10 @@ pub trait ColumnName {

/// A column whose values have a typed (de)serializable representation.
pub trait TypedColumn: Column {
    type Type: Serialize + DeserializeOwned;

    /// Deserializes a value of this column from its serialized bytes.
    /// Default implementation is plain bincode; columns with a legacy
    /// on-disk layout (e.g. `columns::Index`) override this to try the
    /// old format first.
    fn bincode_deserialize(bytes: &[u8]) -> bincode::Result<Self::Type> {
        bincode::deserialize(bytes)
    }
}

impl TypedColumn for columns::AddressSignatures {
Expand Down Expand Up @@ -1210,6 +1214,18 @@ impl ColumnName for columns::Index {
}
impl TypedColumn for columns::Index {
    type Type = blockstore_meta::Index;

    /// Deserializes an `Index`, transparently migrating values that were
    /// written in the legacy on-disk format.
    fn bincode_deserialize(bytes: &[u8]) -> bincode::Result<Self::Type> {
        // For backward compatibility, first try to read a LegacyIndex and
        // convert from it. If that fails, retry reading an Index.
        // It can be shown that serialized bytes obtained from an Index will
        // always fail to deserialize into a LegacyIndex because there are
        // not enough trailing bytes in the payload.
        match bincode::deserialize::<blockstore_meta::LegacyIndex>(bytes) {
            Ok(legacy) => Ok(blockstore_meta::Index::from(&legacy)),
            Err(_) => bincode::deserialize::<blockstore_meta::Index>(bytes),
        }
    }
}

impl SlotColumn for columns::DeadSlots {}
Expand Down Expand Up @@ -1647,7 +1663,7 @@ where
let result = self
.backend
.multi_get_cf(self.handle(), &keys)
.map(|out| Ok(out?.as_deref().map(deserialize).transpose()?))
.map(|out| Ok(out?.as_deref().map(C::bincode_deserialize).transpose()?))
.collect::<Vec<Result<Option<_>>>>();
if let Some(op_start_instant) = is_perf_enabled {
// use multi-get instead
Expand Down Expand Up @@ -1675,7 +1691,7 @@ where
&self.read_perf_status,
);
if let Some(pinnable_slice) = self.backend.get_pinned_cf(self.handle(), key)? {
let value = deserialize(pinnable_slice.as_ref())?;
let value = C::bincode_deserialize(pinnable_slice.as_ref())?;
result = Ok(Some(value))
}

Expand Down
185 changes: 165 additions & 20 deletions ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use {
crate::shred::{Shred, ShredType},
crate::{
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{Shred, ShredType},
},
bitflags::bitflags,
serde::{Deserialize, Deserializer, Serialize, Serializer},
solana_sdk::{
clock::{Slot, UnixTimestamp},
hash::Hash,
},
std::{
collections::BTreeSet,
ops::{Range, RangeBounds},
},
std::{collections::BTreeSet, ops::Range},
};

bitflags! {
Expand Down Expand Up @@ -104,6 +104,8 @@ mod serde_compat {
}
}

// TODO: For downgrade safety, manually implement
// Serialize to first convert to LegacyIndex.
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
/// Index recording presence/absence of shreds
pub struct Index {
Expand All @@ -112,12 +114,67 @@ pub struct Index {
coding: ShredIndex,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct ShredIndex {
    // Bit-mask of received shreds: bit (k & 7) of bytes[k >> 3] is set
    // iff shred k has been received (see get_byte_index/get_bit_mask).
    #[serde(with = "serde_bytes")]
    bytes: Vec<u8>,
    // Cached count of set bits in `bytes`, i.e. number of received shreds.
    num_shreds: usize,
}

impl Default for ShredIndex {
fn default() -> Self {
Self {
bytes: vec![0u8; Self::MIN_NUM_BYTES],
num_shreds: 0,
}
}
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
/// Old BTreeSet-based layout of [`Index`]; retained so that values written
/// to the blockstore before the bit-mask migration can still be read.
pub struct LegacyIndex {
    pub(crate) slot: Slot,
    data: LegacyShredIndex,
    coding: LegacyShredIndex,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
/// Old BTreeSet-based layout of [`ShredIndex`], kept for reading
/// previously serialized values.
pub struct LegacyShredIndex {
    /// Map representing presence/absence of shreds
    index: BTreeSet<u64>,
}

impl From<&LegacyIndex> for Index {
fn from(LegacyIndex { slot, data, coding }: &LegacyIndex) -> Self {
Self {
slot: *slot,
data: ShredIndex::from(data),
coding: ShredIndex::from(coding),
}
}
}

impl From<&LegacyShredIndex> for ShredIndex {
fn from(value: &LegacyShredIndex) -> Self {
let num_bytes = Self::MIN_NUM_BYTES.max(
value
.index
.last()
.copied()
.map(Self::get_byte_index)
.unwrap_or_default()
.saturating_add(1),
);
let mut bytes = vec![0u8; num_bytes];
for &k in &value.index {
bytes[Self::get_byte_index(k)] |= Self::get_bit_mask(k);
}
Self {
bytes,
num_shreds: value.index.len(),
}
}
}

#[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
/// Erasure coding information
pub struct ErasureMeta {
Expand Down Expand Up @@ -228,46 +285,134 @@ pub struct FrozenHashStatus {

impl Index {
pub(crate) fn new(slot: Slot) -> Self {
Index {
Self {
slot,
data: ShredIndex::default(),
coding: ShredIndex::default(),
}
}

#[inline]
pub fn data(&self) -> &ShredIndex {
&self.data
}
pub fn coding(&self) -> &ShredIndex {

#[inline]
pub(crate) fn coding(&self) -> &ShredIndex {
&self.coding
}

#[inline]
pub(crate) fn data_mut(&mut self) -> &mut ShredIndex {
&mut self.data
}

#[inline]
pub(crate) fn coding_mut(&mut self) -> &mut ShredIndex {
&mut self.coding
}
}

// Guarantee that the default bitmap (MIN_NUM_BYTES bytes) can track every
// possible data shred index of a slot without resizing.
static_assertions::const_assert!(ShredIndex::MIN_NUM_BYTES >= (MAX_DATA_SHREDS_PER_SLOT + 7) / 8);

impl ShredIndex {
const MIN_NUM_BYTES: usize = 4_096;

#[inline]
pub fn num_shreds(&self) -> usize {
self.index.len()
self.num_shreds
}

pub(crate) fn range<R>(&self, bounds: R) -> impl Iterator<Item = &u64>
where
R: RangeBounds<u64>,
{
self.index.range(bounds)
// Returns number of shreds received within the range.
fn count_range(&self, Range { mut start, end }: Range<u64>) -> usize {
let mut num_ones = 0;
// Count bits one by one until byte boundary.
while start % 8 != 0 && start < end {
if self.contains(start) {
num_ones += 1;
}
start += 1;
}
// Count ones in full bytes.
let mut byte_index = Self::get_byte_index(start);
while start.saturating_add(7) < end {
num_ones += self
.bytes
.get(byte_index)
.map(|byte| byte.count_ones())
.unwrap_or_default() as usize;
start += 8;
byte_index += 1;
}
// Count remaining bits one by one.
num_ones
+ (start..end)
.map(|index| if self.contains(index) { 1 } else { 0usize })
.sum::<usize>()
}

// Returns true if all shreds within the range are received.
pub(crate) fn is_complete(&self, Range { mut start, end }: Range<u64>) -> bool {
// Check bits one by one until byte boundary.
while start % 8 != 0 && start < end {
if !self.contains(start) {
return false;
}
start += 1;
}
// Check bytes in full.
let mut byte_index = Self::get_byte_index(start);
while start.saturating_add(7) < end {
if self.bytes.get(byte_index).copied() != Some(0xFF) {
return false;
}
start += 8;
byte_index += 1;
}
// Check remaining bits one by one.
(start..end).all(|index| self.contains(index))
}

#[inline]
pub(crate) fn contains(&self, index: u64) -> bool {
self.index.contains(&index)
self.bytes
.get(Self::get_byte_index(index))
.map(|byte| (byte & Self::get_bit_mask(index)) != 0)
.unwrap_or_default()
}

pub(crate) fn insert(&mut self, index: u64) {
self.index.insert(index);
let byte_index = Self::get_byte_index(index);
if self.bytes.len() <= byte_index {
// This branch should not happen but just in case.
self.bytes.resize(byte_index + 1, 0u8);
}
let bit_mask = Self::get_bit_mask(index);
if self.bytes[byte_index] & bit_mask == 0u8 {
self.bytes[byte_index] |= bit_mask;
self.num_shreds += 1;
}
}

#[inline]
fn get_bit_mask(shred_index: u64) -> u8 {
1u8 << (shred_index & 7)
}

#[inline]
fn get_byte_index(shred_index: u64) -> usize {
(shred_index >> 3) as usize
}

#[cfg(test)]
fn remove(&mut self, shred_index: u64) {
let byte_index = Self::get_byte_index(shred_index);
let bit_mask = Self::get_bit_mask(shred_index);
// Test only method so no bounds check!
if self.bytes[byte_index] & bit_mask != 0u8 {
self.bytes[byte_index] ^= bit_mask;
self.num_shreds -= 1;
}
}
}

Expand Down Expand Up @@ -442,8 +587,8 @@ impl ErasureMeta {
pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus {
use ErasureMetaStatus::*;

let num_coding = index.coding().range(self.coding_shreds_indices()).count();
let num_data = index.data().range(self.data_shreds_indices()).count();
let num_coding = index.coding().count_range(self.coding_shreds_indices());
let num_data = index.data().count_range(self.data_shreds_indices());

let (data_missing, num_needed) = (
self.config.num_data.saturating_sub(num_data),
Expand Down Expand Up @@ -637,7 +782,7 @@ mod test {
.collect::<Vec<_>>()
.choose_multiple(&mut rng, erasure_config.num_data)
{
index.data_mut().index.remove(&idx);
index.data_mut().remove(idx);

assert_eq!(e_meta.status(&index), CanRecover);
}
Expand All @@ -650,7 +795,7 @@ mod test {
.collect::<Vec<_>>()
.choose_multiple(&mut rng, erasure_config.num_coding)
{
index.coding_mut().index.remove(&idx);
index.coding_mut().remove(idx);

assert_eq!(e_meta.status(&index), DataFull);
}
Expand Down

0 comments on commit fdd66b8

Please sign in to comment.