Skip to content

Commit

Permalink
moves repair nonce verification from window-service to shred-fetch-stage
Browse files Browse the repository at this point in the history
Because repair nonce is a randomly generated u32 for each repair
request, it can be verified before sigverify so that shreds with invalid
repair nonce do not waste sigverify resources.
  • Loading branch information
behzadnouri committed Jan 29, 2025
1 parent 323039e commit 6355743
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 176 deletions.
14 changes: 7 additions & 7 deletions core/src/repair/outstanding_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct OutstandingRequests<T> {
requests: LruCache<Nonce, RequestStatus<T>>,
}

impl<T, S> OutstandingRequests<T>
impl<T, S: ?Sized> OutstandingRequests<T>
where
T: RequestResponse<Response = S>,
{
Expand Down Expand Up @@ -118,7 +118,7 @@ pub(crate) mod tests {
.expire_timestamp;

assert!(outstanding_requests
.register_response(nonce, &shred, expire_timestamp + 1, |_| ())
.register_response(nonce, shred.payload(), expire_timestamp + 1, |_| ())
.is_none());
assert!(outstanding_requests.requests.get(&nonce).is_none());
}
Expand All @@ -144,7 +144,7 @@ pub(crate) mod tests {

// Response that passes all checks should decrease num_expected_responses
assert!(outstanding_requests
.register_response(nonce, &shred, expire_timestamp - 1, |_| ())
.register_response(nonce, shred.payload(), expire_timestamp - 1, |_| ())
.is_some());
num_expected_responses -= 1;
assert_eq!(
Expand All @@ -158,10 +158,10 @@ pub(crate) mod tests {

// Response with incorrect nonce is ignored
assert!(outstanding_requests
.register_response(nonce + 1, &shred, expire_timestamp - 1, |_| ())
.register_response(nonce + 1, shred.payload(), expire_timestamp - 1, |_| ())
.is_none());
assert!(outstanding_requests
.register_response(nonce + 1, &shred, expire_timestamp, |_| ())
.register_response(nonce + 1, shred.payload(), expire_timestamp, |_| ())
.is_none());
assert_eq!(
outstanding_requests
Expand All @@ -175,7 +175,7 @@ pub(crate) mod tests {
// Response with timestamp over limit should remove status, preventing late
// responses from being accepted
assert!(outstanding_requests
.register_response(nonce, &shred, expire_timestamp, |_| ())
.register_response(nonce, shred.payload(), expire_timestamp, |_| ())
.is_none());
assert!(outstanding_requests.requests.get(&nonce).is_none());

Expand All @@ -195,7 +195,7 @@ pub(crate) mod tests {
for _ in 0..num_expected_responses {
assert!(outstanding_requests.requests.get(&nonce).is_some());
assert!(outstanding_requests
.register_response(nonce, &shred, expire_timestamp - 1, |_| ())
.register_response(nonce, shred.payload(), expire_timestamp - 1, |_| ())
.is_some());
}
assert!(outstanding_requests.requests.get(&nonce).is_none());
Expand Down
9 changes: 0 additions & 9 deletions core/src/repair/repair_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ pub fn repair_response_packet_from_bytes(
Some(packet)
}

pub(crate) fn nonce(packet: &Packet) -> Option<Nonce> {
// Nonces are attached to both repair and ancestor hashes responses.
let data = packet.data(..)?;
let offset = data.len().checked_sub(SIZE_OF_NONCE)?;
<[u8; SIZE_OF_NONCE]>::try_from(&data[offset..])
.map(Nonce::from_le_bytes)
.ok()
}

#[cfg(test)]
mod test {
use {
Expand Down
2 changes: 1 addition & 1 deletion core/src/repair/request_response.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub trait RequestResponse {
type Response;
type Response: ?Sized;
fn num_expected_responses(&self) -> u32;
fn verify_response(&self, response: &Self::Response) -> bool;
}
78 changes: 45 additions & 33 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use {
solana_ledger::{
ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash},
blockstore::Blockstore,
shred::{Nonce, Shred, ShredFetchStats, SIZE_OF_NONCE},
shred::{self, Nonce, ShredFetchStats, SIZE_OF_NONCE},
},
solana_perf::{
data_budget::DataBudget,
Expand Down Expand Up @@ -113,21 +113,28 @@ impl ShredRepairType {
}

impl RequestResponse for ShredRepairType {
type Response = Shred;
type Response = [u8]; // shred's payload
fn num_expected_responses(&self) -> u32 {
match self {
ShredRepairType::Orphan(_) => MAX_ORPHAN_REPAIR_RESPONSES as u32,
ShredRepairType::Shred(_, _) | ShredRepairType::HighestShred(_, _) => 1,
}
}
fn verify_response(&self, response_shred: &Shred) -> bool {
fn verify_response(&self, shred: &Self::Response) -> bool {
#[inline]
fn get_shred_index(shred: &[u8]) -> Option<u64> {
shred::layout::get_index(shred).map(u64::from)
}
let Some(shred_slot) = shred::layout::get_slot(shred) else {
return false;
};
match self {
ShredRepairType::Orphan(slot) => response_shred.slot() <= *slot,
ShredRepairType::Orphan(slot) => shred_slot <= *slot,
ShredRepairType::HighestShred(slot, index) => {
response_shred.slot() == *slot && response_shred.index() as u64 >= *index
shred_slot == *slot && get_shred_index(shred) >= Some(*index)
}
ShredRepairType::Shred(slot, index) => {
response_shred.slot() == *slot && response_shred.index() as u64 == *index
shred_slot == *slot && get_shred_index(shred) == Some(*index)
}
}
}
Expand Down Expand Up @@ -1450,7 +1457,7 @@ mod tests {
get_tmp_ledger_path_auto_delete,
shred::{max_ticks_per_n_shreds, Shred, ShredFlags},
},
solana_perf::packet::{deserialize_from_with_limit, Packet},
solana_perf::packet::{deserialize_from_with_limit, Packet, PacketFlags},
solana_runtime::bank::Bank,
solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Keypair, timing::timestamp},
solana_streamer::socket::SocketAddrSpace,
Expand Down Expand Up @@ -1897,7 +1904,7 @@ mod tests {
);

let index = 1;
let rv = ServeRepair::run_highest_window_request(
let mut rv = ServeRepair::run_highest_window_request(
&recycler,
&socketaddr_any!(),
&blockstore,
Expand All @@ -1910,10 +1917,13 @@ mod tests {
verify_responses(&request, rv.iter());

let rv: Vec<Shred> = rv
.into_iter()
.filter_map(|p| {
assert_eq!(repair_response::nonce(p).unwrap(), nonce);
Shred::new_from_serialized_shred(p.data(..).unwrap().to_vec()).ok()
.iter_mut()
.map(|packet| {
packet.meta_mut().flags |= PacketFlags::REPAIR;
let (shred, repair_nonce) =
shred::layout::get_shred_and_repair_nonce(packet).unwrap();
assert_eq!(repair_nonce.unwrap(), nonce);
Shred::new_from_serialized_shred(shred.to_vec()).unwrap()
})
.collect();
assert!(!rv.is_empty());
Expand Down Expand Up @@ -1959,7 +1969,7 @@ mod tests {
.expect("Expect successful ledger write");

let index = 1;
let rv = ServeRepair::run_window_request(
let mut rv = ServeRepair::run_window_request(
&recycler,
&socketaddr_any!(),
&blockstore,
Expand All @@ -1971,10 +1981,13 @@ mod tests {
let request = ShredRepairType::Shred(slot, index);
verify_responses(&request, rv.iter());
let rv: Vec<Shred> = rv
.into_iter()
.filter_map(|p| {
assert_eq!(repair_response::nonce(p).unwrap(), nonce);
Shred::new_from_serialized_shred(p.data(..).unwrap().to_vec()).ok()
.iter_mut()
.map(|packet| {
packet.meta_mut().flags |= PacketFlags::REPAIR;
let (shred, repair_nonce) =
shred::layout::get_shred_and_repair_nonce(packet).unwrap();
assert_eq!(repair_nonce.unwrap(), nonce);
Shred::new_from_serialized_shred(shred.to_vec()).unwrap()
})
.collect();
assert_eq!(rv[0].index(), 1);
Expand Down Expand Up @@ -2108,7 +2121,7 @@ mod tests {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let rv =
ServeRepair::run_orphan(&recycler, &socketaddr_any!(), &blockstore, slot, 0, nonce);
ServeRepair::run_orphan(&recycler, &socketaddr_any!(), &blockstore, slot, 5, nonce);
assert!(rv.is_none());

// Create slots [slot, slot + num_slots) with 5 shreds apiece
Expand Down Expand Up @@ -2145,7 +2158,7 @@ mod tests {
.collect();

// Verify responses
let request = ShredRepairType::Orphan(slot);
let request = ShredRepairType::Orphan(slot + num_slots - 1);
verify_responses(&request, rv.iter());

let expected: Vec<_> = (slot..slot + num_slots)
Expand Down Expand Up @@ -2422,40 +2435,39 @@ mod tests {
// Orphan
let shred = new_test_data_shred(slot, 0);
let request = ShredRepairType::Orphan(slot);
assert!(request.verify_response(&shred));
assert!(request.verify_response(shred.payload()));
let shred = new_test_data_shred(slot - 1, 0);
assert!(request.verify_response(&shred));
assert!(request.verify_response(shred.payload()));
let shred = new_test_data_shred(slot + 1, 0);
assert!(!request.verify_response(&shred));
assert!(!request.verify_response(shred.payload()));

// HighestShred
let shred = new_test_data_shred(slot, index);
let request = ShredRepairType::HighestShred(slot, index as u64);
assert!(request.verify_response(&shred));
assert!(request.verify_response(shred.payload()));
let shred = new_test_data_shred(slot, index + 1);
assert!(request.verify_response(&shred));
assert!(request.verify_response(shred.payload()));
let shred = new_test_data_shred(slot, index - 1);
assert!(!request.verify_response(&shred));
assert!(!request.verify_response(shred.payload()));
let shred = new_test_data_shred(slot - 1, index);
assert!(!request.verify_response(&shred));
assert!(!request.verify_response(shred.payload()));
let shred = new_test_data_shred(slot + 1, index);
assert!(!request.verify_response(&shred));
assert!(!request.verify_response(shred.payload()));

// Shred
let shred = new_test_data_shred(slot, index);
let request = ShredRepairType::Shred(slot, index as u64);
assert!(request.verify_response(&shred));
assert!(request.verify_response(shred.payload()));
let shred = new_test_data_shred(slot, index + 1);
assert!(!request.verify_response(&shred));
assert!(!request.verify_response(shred.payload()));
let shred = new_test_data_shred(slot + 1, index);
assert!(!request.verify_response(&shred));
assert!(!request.verify_response(shred.payload()));
}

fn verify_responses<'a>(request: &ShredRepairType, packets: impl Iterator<Item = &'a Packet>) {
for packet in packets {
let shred_payload = packet.data(..).unwrap().to_vec();
let shred = Shred::new_from_serialized_shred(shred_payload).unwrap();
request.verify_response(&shred);
let shred = shred::layout::get_shred(packet).unwrap();
assert!(request.verify_response(shred));
}
}

Expand Down
Loading

0 comments on commit 6355743

Please sign in to comment.