diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 16a1a3cc566492..9bb1882bd32a37 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -4809,8 +4809,7 @@ fn update_completed_data_indexes(
         .windows(2)
         .filter(|ix| {
             let (begin, end) = (ix[0] as u64, ix[1] as u64);
-            let num_shreds = (end - begin) as usize;
-            received_data_shreds.range(begin..end).count() == num_shreds
+            received_data_shreds.is_complete(begin..end)
         })
         .map(|ix| (ix[0], ix[1] - 1))
         .collect()
diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs
index 8a674b2a3f5be1..a8d05f1671dbb7 100644
--- a/ledger/src/blockstore_db.rs
+++ b/ledger/src/blockstore_db.rs
@@ -796,6 +796,10 @@ pub trait ColumnName {
 
 pub trait TypedColumn: Column {
     type Type: Serialize + DeserializeOwned;
+
+    fn bincode_deserialize(bytes: &[u8]) -> bincode::Result<Self::Type> {
+        bincode::deserialize(bytes)
+    }
 }
 
 impl TypedColumn for columns::AddressSignatures {
@@ -1210,6 +1214,18 @@ impl ColumnName for columns::Index {
 }
 impl TypedColumn for columns::Index {
     type Type = blockstore_meta::Index;
+
+    fn bincode_deserialize(bytes: &[u8]) -> bincode::Result<Self::Type> {
+        // For backward compatibility, first try to read a LegacyIndex and
+        // convert from it. If that fails, retry reading an Index. It can be
+        // shown that serialized bytes obtained from an Index will always
+        // fail to deserialize into a LegacyIndex because there are not
+        // enough trailing bytes in the payload.
+        bincode::deserialize::<blockstore_meta::LegacyIndex>(bytes)
+            .as_ref()
+            .map(blockstore_meta::Index::from)
+            .or_else(|_| bincode::deserialize::<blockstore_meta::Index>(bytes))
+    }
 }
 
 impl SlotColumn for columns::DeadSlots {}
@@ -1647,7 +1663,7 @@ where
         let result = self
             .backend
             .multi_get_cf(self.handle(), &keys)
-            .map(|out| Ok(out?.as_deref().map(deserialize).transpose()?))
+            .map(|out| Ok(out?.as_deref().map(C::bincode_deserialize).transpose()?))
             .collect::<Result<Vec<Option<C::Type>>>>();
         if let Some(op_start_instant) = is_perf_enabled {
             // use multi-get instead
@@ -1675,7 +1691,7 @@
             &self.read_perf_status,
         );
         if let Some(pinnable_slice) = self.backend.get_pinned_cf(self.handle(), key)? {
-            let value = deserialize(pinnable_slice.as_ref())?;
+            let value = C::bincode_deserialize(pinnable_slice.as_ref())?;
             result = Ok(Some(value))
         }
 
diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs
index 3b39b41cd92935..8ab84e8dd51fc4 100644
--- a/ledger/src/blockstore_meta.rs
+++ b/ledger/src/blockstore_meta.rs
@@ -1,15 +1,15 @@
 use {
-    crate::shred::{Shred, ShredType},
+    crate::{
+        blockstore::MAX_DATA_SHREDS_PER_SLOT,
+        shred::{Shred, ShredType},
+    },
     bitflags::bitflags,
     serde::{Deserialize, Deserializer, Serialize, Serializer},
     solana_sdk::{
         clock::{Slot, UnixTimestamp},
         hash::Hash,
     },
-    std::{
-        collections::BTreeSet,
-        ops::{Range, RangeBounds},
-    },
+    std::{collections::BTreeSet, ops::Range},
 };
 
 bitflags! {
@@ -104,6 +104,8 @@ mod serde_compat {
     }
 }
 
+// TODO: For downgrade safety, manually implement
+// Serialize to first convert to LegacyIndex.
 #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
 /// Index recording presence/absence of shreds
 pub struct Index {
@@ -112,12 +114,67 @@ pub struct Index {
     coding: ShredIndex,
 }
 
-#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
+#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
 pub struct ShredIndex {
+    #[serde(with = "serde_bytes")]
+    bytes: Vec<u8>,
+    num_shreds: usize,
+}
+
+impl Default for ShredIndex {
+    fn default() -> Self {
+        Self {
+            bytes: vec![0u8; Self::MIN_NUM_BYTES],
+            num_shreds: 0,
+        }
+    }
+}
+
+#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
+pub struct LegacyIndex {
+    pub(crate) slot: Slot,
+    data: LegacyShredIndex,
+    coding: LegacyShredIndex,
+}
+
+#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
+pub struct LegacyShredIndex {
     /// Map representing presence/absence of shreds
     index: BTreeSet<u64>,
 }
 
+impl From<&LegacyIndex> for Index {
+    fn from(LegacyIndex { slot, data, coding }: &LegacyIndex) -> Self {
+        Self {
+            slot: *slot,
+            data: ShredIndex::from(data),
+            coding: ShredIndex::from(coding),
+        }
+    }
+}
+
+impl From<&LegacyShredIndex> for ShredIndex {
+    fn from(value: &LegacyShredIndex) -> Self {
+        let num_bytes = Self::MIN_NUM_BYTES.max(
+            value
+                .index
+                .last()
+                .copied()
+                .map(Self::get_byte_index)
+                .unwrap_or_default()
+                .saturating_add(1),
+        );
+        let mut bytes = vec![0u8; num_bytes];
+        for &k in &value.index {
+            bytes[Self::get_byte_index(k)] |= Self::get_bit_mask(k);
+        }
+        Self {
+            bytes,
+            num_shreds: value.index.len(),
+        }
+    }
+}
+
 #[derive(Clone, Copy, Debug, Deserialize, Serialize, Eq, PartialEq)]
 /// Erasure coding information
 pub struct ErasureMeta {
@@ -228,46 +285,134 @@ pub struct FrozenHashStatus {
 
 impl Index {
     pub(crate) fn new(slot: Slot) -> Self {
-        Index {
+        Self {
             slot,
             data: ShredIndex::default(),
             coding: ShredIndex::default(),
         }
     }
 
+    #[inline]
     pub fn data(&self) -> &ShredIndex {
         &self.data
     }
-    pub fn coding(&self) -> &ShredIndex {
+
+    #[inline]
+    pub(crate) fn coding(&self) -> &ShredIndex {
         &self.coding
     }
 
+    #[inline]
     pub(crate) fn data_mut(&mut self) -> &mut ShredIndex {
         &mut self.data
     }
+
+    #[inline]
     pub(crate) fn coding_mut(&mut self) -> &mut ShredIndex {
         &mut self.coding
    }
 }
 
+static_assertions::const_assert!(ShredIndex::MIN_NUM_BYTES >= (MAX_DATA_SHREDS_PER_SLOT + 7) / 8);
+
 impl ShredIndex {
+    const MIN_NUM_BYTES: usize = 4_096;
+
+    #[inline]
     pub fn num_shreds(&self) -> usize {
-        self.index.len()
+        self.num_shreds
     }
 
-    pub(crate) fn range<R>(&self, bounds: R) -> impl Iterator<Item = &u64>
-    where
-        R: RangeBounds<u64>,
-    {
-        self.index.range(bounds)
+    // Returns number of shreds received within the range.
+    fn count_range(&self, Range { mut start, end }: Range<u64>) -> usize {
+        let mut num_ones = 0;
+        // Count bits one by one until byte boundary.
+        while start % 8 != 0 && start < end {
+            if self.contains(start) {
+                num_ones += 1;
+            }
+            start += 1;
+        }
+        // Count ones in full bytes.
+        let mut byte_index = Self::get_byte_index(start);
+        while start.saturating_add(7) < end {
+            num_ones += self
+                .bytes
+                .get(byte_index)
+                .map(|byte| byte.count_ones())
+                .unwrap_or_default() as usize;
+            start += 8;
+            byte_index += 1;
+        }
+        // Count remaining bits one by one.
+        num_ones
+            + (start..end)
+                .map(|index| if self.contains(index) { 1 } else { 0usize })
+                .sum::<usize>()
+    }
+
+    // Returns true if all shreds within the range are received.
+    pub(crate) fn is_complete(&self, Range { mut start, end }: Range<u64>) -> bool {
+        // Check bits one by one until byte boundary.
+        while start % 8 != 0 && start < end {
+            if !self.contains(start) {
+                return false;
+            }
+            start += 1;
+        }
+        // Check full bytes.
+        let mut byte_index = Self::get_byte_index(start);
+        while start.saturating_add(7) < end {
+            if self.bytes.get(byte_index).copied() != Some(0xFF) {
+                return false;
+            }
+            start += 8;
+            byte_index += 1;
+        }
+        // Check remaining bits one by one.
+        (start..end).all(|index| self.contains(index))
     }
 
+    #[inline]
     pub(crate) fn contains(&self, index: u64) -> bool {
-        self.index.contains(&index)
+        self.bytes
+            .get(Self::get_byte_index(index))
+            .map(|byte| (byte & Self::get_bit_mask(index)) != 0)
+            .unwrap_or_default()
     }
 
     pub(crate) fn insert(&mut self, index: u64) {
-        self.index.insert(index);
+        let byte_index = Self::get_byte_index(index);
+        if self.bytes.len() <= byte_index {
+            // This branch should not be taken, but resize just in case.
+            self.bytes.resize(byte_index + 1, 0u8);
+        }
+        let bit_mask = Self::get_bit_mask(index);
+        if self.bytes[byte_index] & bit_mask == 0u8 {
+            self.bytes[byte_index] |= bit_mask;
+            self.num_shreds += 1;
+        }
+    }
+
+    #[inline]
+    fn get_bit_mask(shred_index: u64) -> u8 {
+        1u8 << (shred_index & 7)
+    }
+
+    #[inline]
+    fn get_byte_index(shred_index: u64) -> usize {
+        (shred_index >> 3) as usize
+    }
+
+    #[cfg(test)]
+    fn remove(&mut self, shred_index: u64) {
+        let byte_index = Self::get_byte_index(shred_index);
+        let bit_mask = Self::get_bit_mask(shred_index);
+        // Test-only method, so no bounds check!
+        if self.bytes[byte_index] & bit_mask != 0u8 {
+            self.bytes[byte_index] ^= bit_mask;
+            self.num_shreds -= 1;
+        }
     }
 }
 
@@ -442,8 +587,8 @@ impl ErasureMeta {
     pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus {
         use ErasureMetaStatus::*;
 
-        let num_coding = index.coding().range(self.coding_shreds_indices()).count();
-        let num_data = index.data().range(self.data_shreds_indices()).count();
+        let num_coding = index.coding().count_range(self.coding_shreds_indices());
+        let num_data = index.data().count_range(self.data_shreds_indices());
 
         let (data_missing, num_needed) = (
             self.config.num_data.saturating_sub(num_data),
@@ -637,7 +782,7 @@ mod test {
             .collect::<Vec<_>>()
             .choose_multiple(&mut rng, erasure_config.num_data)
         {
-            index.data_mut().index.remove(&idx);
+            index.data_mut().remove(idx);
             assert_eq!(e_meta.status(&index), CanRecover);
         }
 
@@ -650,7 +795,7 @@
             .collect::<Vec<_>>()
             .choose_multiple(&mut rng, erasure_config.num_coding)
         {
-            index.coding_mut().index.remove(&idx);
+            index.coding_mut().remove(idx);
             assert_eq!(e_meta.status(&index), DataFull);
         }
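
Review note: the new `ShredIndex` swaps a `BTreeSet<u64>` for a flat bit vector, so `count_range` and `is_complete` must agree exactly with the old `range(..).count()` semantics. Below is a minimal standalone sketch of that bookkeeping; `BitIndex`, its methods, and the sample data are illustrative stand-ins rather than code from this patch, though the byte/bit addressing mirrors `get_byte_index` / `get_bit_mask` above.

```rust
use std::{collections::BTreeSet, ops::Range};

// Illustrative stand-in for the new ShredIndex: one bit per shred index.
struct BitIndex {
    bytes: Vec<u8>,
}

impl BitIndex {
    fn new(num_bits: u64) -> Self {
        Self {
            bytes: vec![0u8; ((num_bits + 7) / 8) as usize],
        }
    }

    // Same addressing as ShredIndex::{get_byte_index, get_bit_mask}.
    fn insert(&mut self, index: u64) {
        self.bytes[(index >> 3) as usize] |= 1u8 << (index & 7);
    }

    fn contains(&self, index: u64) -> bool {
        self.bytes
            .get((index >> 3) as usize)
            .map(|byte| byte & (1u8 << (index & 7)) != 0)
            .unwrap_or_default()
    }

    // Mirrors ShredIndex::count_range: bit by bit up to the byte boundary,
    // one count_ones() per full byte, then bit by bit for the tail.
    fn count_range(&self, Range { mut start, end }: Range<u64>) -> usize {
        let mut num_ones = 0;
        while start % 8 != 0 && start < end {
            num_ones += usize::from(self.contains(start));
            start += 1;
        }
        let mut byte_index = (start >> 3) as usize;
        while start.saturating_add(7) < end {
            num_ones += self
                .bytes
                .get(byte_index)
                .map_or(0, |byte| byte.count_ones() as usize);
            start += 8;
            byte_index += 1;
        }
        num_ones + (start..end).filter(|&index| self.contains(index)).count()
    }

    // Specification of ShredIndex::is_complete; the real method instead
    // short-circuits on the first non-0xFF byte, which is equivalent.
    fn is_complete(&self, range: Range<u64>) -> bool {
        self.count_range(range.clone()) == (range.end - range.start) as usize
    }
}

fn main() {
    let shreds = [1u64, 2, 9, 10, 11, 40, 41, 63, 64, 500];
    let set: BTreeSet<u64> = shreds.iter().copied().collect();
    let mut bits = BitIndex::new(512);
    shreds.iter().for_each(|&index| bits.insert(index));
    // count_range must agree with the BTreeSet::range().count() it replaces.
    for range in [0..8, 3..70, 9..12, 64..501, 0..512] {
        assert_eq!(bits.count_range(range.clone()), set.range(range).count());
    }
    // is_complete matches the old `count == num_shreds` check replaced in
    // update_completed_data_indexes.
    assert!(bits.is_complete(9..12));
    assert!(!bits.is_complete(9..13));
}
```

The payoff is the middle phase of the scan: a full byte of shreds costs a single `count_ones()` instead of a tree walk, and the index occupies a flat `MIN_NUM_BYTES` (4 KiB, which the `const_assert` above pins to at least `MAX_DATA_SHREDS_PER_SLOT / 8` rounded up) regardless of how many shreds are present.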
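
The fallback order in `bincode_deserialize` for `columns::Index` relies on new payloads never parsing as a `LegacyIndex`. A sketch of why, using simplified hypothetical stand-in types (assuming bincode 1.x defaults, i.e. fixed-width little-endian integers, plus the `serde` and `serde_bytes` crates already used in this diff):

```rust
use {
    serde::{Deserialize, Serialize},
    std::collections::BTreeSet,
};

// Hypothetical reduction of the new bit-vector layout.
#[derive(Serialize, Deserialize)]
struct NewIndex {
    slot: u64,
    #[serde(with = "serde_bytes")]
    bytes: Vec<u8>, // always at least MIN_NUM_BYTES = 4_096 long
    num_shreds: u64,
}

// Hypothetical reduction of the legacy BTreeSet layout.
#[derive(Serialize, Deserialize)]
struct OldIndex {
    slot: u64,
    index: BTreeSet<u64>,
}

fn main() {
    let new = NewIndex {
        slot: 42,
        bytes: vec![0u8; 4_096],
        num_shreds: 0,
    };
    let payload = bincode::serialize(&new).unwrap();
    // Parsed as the legacy layout, the byte-vector length (4_096) is read as
    // a set length, demanding 4_096 * 8 bytes of u64 elements; the remaining
    // payload is only ~4 KiB, so deserialization runs out of bytes.
    assert!(bincode::deserialize::<OldIndex>(&payload).is_err());
    // The new layout round-trips, so the or_else fallback succeeds.
    assert!(bincode::deserialize::<NewIndex>(&payload).is_ok());
}
```

In other words, reading a new payload as the legacy layout always demands far more trailing bytes than exist, so the `LegacyIndex` attempt fails cleanly and never silently misinterprets the new format.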