reworks args needed for erasure recovery when inserting shreds into blockstore

When inserting our own shreds during leader slots, we shouldn't try to
recover shreds. If shreds are not to be recovered, we don't need the
retransmit channel either. Otherwise, if we are inserting shreds from
another leader, we need to try erasure recovery and retransmit recovered
shreds.

So the Reed-Solomon cache and the retransmit sender should only be present
together. The commit makes this explicit by adding

    should_recover_shreds: Option<(
        &ReedSolomonCache,
        &Sender<Vec</*shred:*/ Vec<u8>>>, // retransmit_sender
    )>,

as an argument to Blockstore::do_insert_shreds.
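A minimal, self-contained sketch of the intent behind that signature (stub
names such as ReedSolomonCacheStub and insert_shreds_stub are illustrative
only, not the real API): bundling the cache and the sender in one Option
makes it impossible to enable recovery without a retransmit channel, or to
supply a channel when recovery is disabled.

    use crossbeam_channel::{unbounded, Sender};

    // Stand-in for ReedSolomonCache; only here so the sketch compiles.
    struct ReedSolomonCacheStub;

    // Recovery and retransmission are either both enabled or both disabled,
    // so the cache and the channel travel together in a single Option.
    fn insert_shreds_stub(
        shreds: Vec<Vec<u8>>,
        should_recover_shreds: Option<(&ReedSolomonCacheStub, &Sender<Vec<Vec<u8>>>)>,
    ) {
        if let Some((_cache, retransmit_sender)) = should_recover_shreds {
            // Pretend these bytes were recovered via erasure coding.
            let recovered = shreds.clone();
            let _ = retransmit_sender.send(recovered);
        }
        // ... insert `shreds` into the blockstore ...
    }

    fn main() {
        let (retransmit_sender, retransmit_receiver) = unbounded();
        let cache = ReedSolomonCacheStub;

        // Leader path: inserting our own shreds, so no recovery and no channel.
        insert_shreds_stub(vec![vec![1, 2, 3]], None);

        // Non-leader path: recovery enabled, recovered shreds get retransmitted.
        insert_shreds_stub(vec![vec![4, 5, 6]], Some((&cache, &retransmit_sender)));
        assert_eq!(retransmit_receiver.try_recv().unwrap(), vec![vec![4u8, 5, 6]]);
    }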
behzadnouri committed Jan 17, 2025
1 parent baf53a4 commit 59a3b2f
Showing 2 changed files with 43 additions and 26 deletions.
5 changes: 3 additions & 2 deletions core/src/window_service.rs
@@ -347,7 +347,7 @@ where
         repairs,
         Some(leader_schedule_cache),
         false, // is_trusted
-        Some(retransmit_sender),
+        retransmit_sender,
         &handle_duplicate,
         reed_solomon_cache,
         metrics,
@@ -708,6 +708,7 @@ mod test {
             let _ = duplicate_shred_sender.send(shred);
         };
         let num_trials = 100;
+        let (dummy_retransmit_sender, _) = crossbeam_channel::bounded(0);
         for slot in 0..num_trials {
             let (shreds, _) = make_many_slot_entries(slot, 1, 10);
             let duplicate_index = 0;
@@ -724,7 +725,7 @@
                 vec![false, false],
                 None,
                 false, // is_trusted
-                None,
+                &dummy_retransmit_sender,
                 &handle_duplicate,
                 &ReedSolomonCache::default(),
                 &mut BlockstoreInsertionMetrics::default(),
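The test above builds its retransmit channel with crossbeam_channel::bounded(0)
and drops the receiver on the spot, so any send on the channel fails
immediately; that is harmless as long as the send result is discarded, as the
blockstore does with `let _ = retransmit_sender.send(...)`. A standalone
sketch of that behavior (not part of the commit):

    use crossbeam_channel::bounded;

    fn main() {
        // Zero-capacity channel whose receiver is dropped immediately: every
        // send returns an error, which callers can safely ignore.
        let (dummy_retransmit_sender, _) = bounded::<Vec<Vec<u8>>>(0);
        assert!(dummy_retransmit_sender.send(vec![vec![0u8; 8]]).is_err());
    }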
64 changes: 40 additions & 24 deletions ledger/src/blockstore.rs
@@ -982,7 +982,7 @@ impl Blockstore {
         leader_schedule: Option<&LeaderScheduleCache>,
         reed_solomon_cache: &ReedSolomonCache,
         shred_insertion_tracker: &mut ShredInsertionTracker,
-        retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
+        retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
         is_trusted: bool,
         metrics: &mut BlockstoreInsertionMetrics,
     ) {
@@ -1008,9 +1008,7 @@
             })
             .collect();
         if !recovered_shreds.is_empty() {
-            if let Some(retransmit_sender) = retransmit_sender {
-                let _ = retransmit_sender.send(recovered_shreds);
-            }
+            let _ = retransmit_sender.send(recovered_shreds);
         }
         for shred in recovered_data_shreds.into_iter().flatten() {
             metrics.num_recovered += 1;
@@ -1218,8 +1216,15 @@
         is_repaired: Vec<bool>,
         leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
-        retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
-        reed_solomon_cache: &ReedSolomonCache,
+        // When inserting own shreds during leader slots, we shouldn't try to
+        // recover shreds. If shreds are not to be recovered we don't need the
+        // retransmit channel either. Otherwise, if we are inserting shreds
+        // from another leader, we need to try erasure recovery and retransmit
+        // recovered shreds.
+        should_recover_shreds: Option<(
+            &ReedSolomonCache,
+            &Sender<Vec</*shred:*/ Vec<u8>>>, // retransmit_sender
+        )>,
         metrics: &mut BlockstoreInsertionMetrics,
     ) -> Result<InsertResults> {
         assert_eq!(shreds.len(), is_repaired.len());
@@ -1242,16 +1247,16 @@
             &mut shred_insertion_tracker,
             metrics,
         );
-
-        self.handle_shred_recovery(
-            leader_schedule,
-            reed_solomon_cache,
-            &mut shred_insertion_tracker,
-            retransmit_sender,
-            is_trusted,
-            metrics,
-        );
-
+        if let Some((reed_solomon_cache, retransmit_sender)) = should_recover_shreds {
+            self.handle_shred_recovery(
+                leader_schedule,
+                reed_solomon_cache,
+                &mut shred_insertion_tracker,
+                retransmit_sender,
+                is_trusted,
+                metrics,
+            );
+        }
         // Handle chaining for the members of the slot_meta_working_set that
         // were inserted into, drop the others.
         self.handle_chaining(
@@ -1289,13 +1294,16 @@
         })
     }

+    // Attempts to recover and retransmit recovered shreds (also identifying
+    // and handling duplicate shreds). Broadcast stage should instead call
+    // Blockstore::insert_shreds when inserting own shreds during leader slots.
     pub fn insert_shreds_handle_duplicate<F>(
         &self,
         shreds: Vec<Shred>,
         is_repaired: Vec<bool>,
         leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
-        retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
+        retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
         handle_duplicate: &F,
         reed_solomon_cache: &ReedSolomonCache,
         metrics: &mut BlockstoreInsertionMetrics,
@@ -1311,8 +1319,7 @@
             is_repaired,
             leader_schedule,
             is_trusted,
-            retransmit_sender,
-            reed_solomon_cache,
+            Some((reed_solomon_cache, retransmit_sender)),
             metrics,
         )?;

@@ -1369,6 +1376,8 @@
         }
     }

+    // Bypasses erasure recovery because it is called from broadcast stage
+    // when inserting own shreds during leader slots.
     pub fn insert_shreds(
         &self,
         shreds: Vec<Shred>,
@@ -1381,8 +1390,7 @@
             vec![false; shreds_len],
             leader_schedule,
             is_trusted,
-            None, // retransmit-sender
-            &ReedSolomonCache::default(),
+            None, // (reed_solomon_cache, retransmit_sender)
             &mut BlockstoreInsertionMetrics::default(),
         )?;
         Ok(insert_results.completed_data_set_infos)
@@ -1400,8 +1408,7 @@
             vec![false],
             Some(leader_schedule),
             false,
-            None, // retransmit-sender
-            &ReedSolomonCache::default(),
+            None, // (reed_solomon_cache, retransmit_sender)
             &mut BlockstoreInsertionMetrics::default(),
         )
         .unwrap();
@@ -10231,8 +10238,17 @@ pub mod tests {
         let (data_shreds, coding_shreds, leader_schedule_cache) =
             setup_erasure_shreds(slot, 0, 100);

+        let (dummy_retransmit_sender, _) = crossbeam_channel::bounded(0);
+        let is_repaired = vec![false; coding_shreds.len()];
         blockstore
-            .insert_shreds(coding_shreds, Some(&leader_schedule_cache), false)
+            .do_insert_shreds(
+                coding_shreds,
+                is_repaired,
+                Some(&leader_schedule_cache),
+                false, // is_trusted
+                Some((&ReedSolomonCache::default(), &dummy_retransmit_sender)),
+                &mut BlockstoreInsertionMetrics::default(),
+            )
             .unwrap();
         let shred_bufs: Vec<_> = data_shreds.iter().map(Shred::payload).cloned().collect();

