From 59a3b2f59d4f9254508d82f75d6d69d8ee667aa0 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Fri, 17 Jan 2025 11:13:52 -0600
Subject: [PATCH] reworks args needed for erasure recovery when inserting
 shreds into blockstore

When inserting own shreds during leader slots, we shouldn't try to
recover shreds. If shreds are not to be recovered, we don't need the
retransmit channel either. Otherwise, if we are inserting shreds from
another leader, we need to try erasure recovery and retransmit recovered
shreds. So the Reed-Solomon cache and the retransmit sender should only
ever be present together. The commit makes this explicit by adding a

    should_recover_shreds: Option<(
        &ReedSolomonCache,
        &Sender<Vec<Vec<u8>>>, // retransmit_sender
    )>,

argument to Blockstore::do_insert_shreds.
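To illustrate, the two insertion paths now look roughly as follows.
This is only a sketch: the variable names and argument values below are
placeholders, not copied from the actual call sites.

    // Broadcast stage, inserting shreds for our own leader slot:
    // no erasure recovery, so neither a Reed-Solomon cache nor a
    // retransmit channel is needed.
    blockstore.insert_shreds(
        shreds,
        None, // leader_schedule
        true, // is_trusted
    )?;

    // Window service, inserting shreds from another leader: the
    // Reed-Solomon cache and the retransmit sender are provided
    // (and consumed) together.
    blockstore.insert_shreds_handle_duplicate(
        shreds,
        is_repaired,
        Some(&leader_schedule_cache),
        false, // is_trusted
        &retransmit_sender,
        &handle_duplicate,
        &reed_solomon_cache,
        &mut metrics,
    )?;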
---
 core/src/window_service.rs |  5 +--
 ledger/src/blockstore.rs   | 64 ++++++++++++++++++++++++--------------
 2 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index ebd56f02864b62..63e1f06ad110b9 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -347,7 +347,7 @@ where
         repairs,
         Some(leader_schedule_cache),
         false, // is_trusted
-        Some(retransmit_sender),
+        retransmit_sender,
         &handle_duplicate,
         reed_solomon_cache,
         metrics,
@@ -708,6 +708,7 @@ mod test {
             let _ = duplicate_shred_sender.send(shred);
         };
         let num_trials = 100;
+        let (dummy_retransmit_sender, _) = crossbeam_channel::bounded(0);
         for slot in 0..num_trials {
             let (shreds, _) = make_many_slot_entries(slot, 1, 10);
             let duplicate_index = 0;
@@ -724,7 +725,7 @@ mod test {
                    vec![false, false],
                    None,
                    false, // is_trusted
-                   None,
+                   &dummy_retransmit_sender,
                    &handle_duplicate,
                    &ReedSolomonCache::default(),
                    &mut BlockstoreInsertionMetrics::default(),
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index d8bfb21b5bb3e2..61f9ab543c94e1 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -982,7 +982,7 @@ impl Blockstore {
         leader_schedule: Option<&LeaderScheduleCache>,
         reed_solomon_cache: &ReedSolomonCache,
         shred_insertion_tracker: &mut ShredInsertionTracker,
-        retransmit_sender: Option<&Sender<Vec<Vec<u8>>>>,
+        retransmit_sender: &Sender<Vec<Vec<u8>>>,
         is_trusted: bool,
         metrics: &mut BlockstoreInsertionMetrics,
     ) {
@@ -1008,9 +1008,7 @@ impl Blockstore {
             })
             .collect();
         if !recovered_shreds.is_empty() {
-            if let Some(retransmit_sender) = retransmit_sender {
-                let _ = retransmit_sender.send(recovered_shreds);
-            }
+            let _ = retransmit_sender.send(recovered_shreds);
         }
         for shred in recovered_data_shreds.into_iter().flatten() {
             metrics.num_recovered += 1;
@@ -1218,8 +1216,15 @@ impl Blockstore {
         is_repaired: Vec<bool>,
         leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
-        retransmit_sender: Option<&Sender<Vec<Vec<u8>>>>,
-        reed_solomon_cache: &ReedSolomonCache,
+        // When inserting own shreds during leader slots, we shouldn't try to
+        // recover shreds. If shreds are not to be recovered we don't need the
+        // retransmit channel either. Otherwise, if we are inserting shreds
+        // from another leader, we need to try erasure recovery and retransmit
+        // recovered shreds.
+        should_recover_shreds: Option<(
+            &ReedSolomonCache,
+            &Sender<Vec<Vec<u8>>>, // retransmit_sender
+        )>,
         metrics: &mut BlockstoreInsertionMetrics,
     ) -> Result<InsertResults> {
         assert_eq!(shreds.len(), is_repaired.len());
@@ -1242,16 +1247,16 @@ impl Blockstore {
             &mut shred_insertion_tracker,
             metrics,
         );
-
-        self.handle_shred_recovery(
-            leader_schedule,
-            reed_solomon_cache,
-            &mut shred_insertion_tracker,
-            retransmit_sender,
-            is_trusted,
-            metrics,
-        );
-
+        if let Some((reed_solomon_cache, retransmit_sender)) = should_recover_shreds {
+            self.handle_shred_recovery(
+                leader_schedule,
+                reed_solomon_cache,
+                &mut shred_insertion_tracker,
+                retransmit_sender,
+                is_trusted,
+                metrics,
+            );
+        }
         // Handle chaining for the members of the slot_meta_working_set that
         // were inserted into, drop the others.
         self.handle_chaining(
@@ -1289,13 +1294,16 @@ impl Blockstore {
         })
     }

+    // Attempts to recover and retransmit recovered shreds (also identifying
+    // and handling duplicate shreds). Broadcast stage should instead call
+    // Blockstore::insert_shreds when inserting own shreds during leader slots.
     pub fn insert_shreds_handle_duplicate<F>(
         &self,
         shreds: Vec<Shred>,
         is_repaired: Vec<bool>,
         leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
-        retransmit_sender: Option<&Sender<Vec<Vec<u8>>>>,
+        retransmit_sender: &Sender<Vec<Vec<u8>>>,
         handle_duplicate: &F,
         reed_solomon_cache: &ReedSolomonCache,
         metrics: &mut BlockstoreInsertionMetrics,
@@ -1311,8 +1319,7 @@ impl Blockstore {
             is_repaired,
             leader_schedule,
             is_trusted,
-            retransmit_sender,
-            reed_solomon_cache,
+            Some((reed_solomon_cache, retransmit_sender)),
             metrics,
         )?;

@@ -1369,6 +1376,8 @@ impl Blockstore {
         }
     }

+    // Bypasses erasure recovery because it is called from broadcast stage
+    // when inserting own shreds during leader slots.
     pub fn insert_shreds(
         &self,
         shreds: Vec<Shred>,
@@ -1381,8 +1390,7 @@ impl Blockstore {
             vec![false; shreds_len],
             leader_schedule,
             is_trusted,
-            None, // retransmit-sender
-            &ReedSolomonCache::default(),
+            None, // (reed_solomon_cache, retransmit_sender)
             &mut BlockstoreInsertionMetrics::default(),
         )?;
         Ok(insert_results.completed_data_set_infos)
@@ -1400,8 +1408,7 @@ impl Blockstore {
             vec![false],
             Some(leader_schedule),
             false,
-            None, // retransmit-sender
-            &ReedSolomonCache::default(),
+            None, // (reed_solomon_cache, retransmit_sender)
             &mut BlockstoreInsertionMetrics::default(),
         )
         .unwrap();
@@ -10231,8 +10238,17 @@ pub mod tests {
         let (data_shreds, coding_shreds, leader_schedule_cache) =
             setup_erasure_shreds(slot, 0, 100);
+        let (dummy_retransmit_sender, _) = crossbeam_channel::bounded(0);
+        let is_repaired = vec![false; coding_shreds.len()];
         blockstore
-            .insert_shreds(coding_shreds, Some(&leader_schedule_cache), false)
+            .do_insert_shreds(
+                coding_shreds,
+                is_repaired,
+                Some(&leader_schedule_cache),
+                false, // is_trusted
+                Some((&ReedSolomonCache::default(), &dummy_retransmit_sender)),
+                &mut BlockstoreInsertionMetrics::default(),
+            )
             .unwrap();

         let shred_bufs: Vec<_> = data_shreds.iter().map(Shred::payload).cloned().collect();