Skip to content

Commit

Permalink
refactor(chain): remove unwrap from filter header chain
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Dec 10, 2024
1 parent 43842ba commit b77a9ec
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
12 changes: 6 additions & 6 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,12 +655,12 @@ impl<H: HeaderStore> Chain<H> {
// Check for any obvious faults
self.audit_cf_headers(&batch).await?;
// We already have a message like this. Verify they are the same
if self.cf_header_chain.has_queue() {
Ok(self.cf_header_chain.verify(&mut batch).await)
} else {
// Associate the block hashes with the filter hashes and add them to the queue
let queue = self.construct_cf_header_queue(&mut batch).await?;
Ok(self.cf_header_chain.set_queue(queue).await)
match self.cf_header_chain.merged_queue.take() {
Some(queue) => Ok(self.cf_header_chain.verify(&mut batch, queue).await),
None => {
let queue = self.construct_cf_header_queue(&mut batch).await?;
Ok(self.cf_header_chain.set_queue(queue).await)
}
}
}

Expand Down
25 changes: 13 additions & 12 deletions src/filters/cfheader_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ impl QueuedCFHeader {
}
}

type Queue = Option<Vec<QueuedCFHeader>>;
type Queue = Vec<QueuedCFHeader>;

#[derive(Debug)]
pub(crate) struct CFHeaderChain {
pub(crate) merged_queue: Option<Queue>,
anchor_checkpoint: HeaderCheckpoint,
// We only really care about this relationship
hash_chain: HashMap<BlockHash, FilterHash>,
merged_queue: Queue,
prev_stophash_request: Option<BlockHash>,
prev_header: Option<FilterHeader>,
quorum_required: usize,
Expand All @@ -82,18 +82,18 @@ impl CFHeaderChain {

// Set a reference point for the block hashes and associated filter hash.
pub(crate) async fn set_queue(&mut self, cf_headers: Vec<QueuedCFHeader>) -> AppendAttempt {
self.merged_queue = Some(cf_headers);
self.current_quorum += 1;
self.attempt_merge().await
self.attempt_merge(cf_headers).await
}

// Verify a batch of filter headers and hashes is what we expect.
pub(crate) async fn verify(&mut self, cf_headers: &mut CFHeaderBatch) -> AppendAttempt {
pub(crate) async fn verify(
&mut self,
cf_headers: &mut CFHeaderBatch,
queue: Vec<QueuedCFHeader>,
) -> AppendAttempt {
// The caller is responsible for knowing if there is a queue or not
for ((block_hash, header_one, hash_one), (header_two, hash_two)) in self
.merged_queue
.as_ref()
.unwrap()
for ((block_hash, header_one, hash_one), (header_two, hash_two)) in queue
.iter()
.map(|queue| queue.tuple())
.zip(cf_headers.take_inner())
Expand All @@ -105,12 +105,11 @@ impl CFHeaderChain {
}
}
self.current_quorum += 1;
self.attempt_merge().await
self.attempt_merge(queue).await
}

// If enough peers have responded, insert those block hashes and filter hashes into a map.
async fn attempt_merge(&mut self) -> AppendAttempt {
let queue = self.merged_queue.as_ref().unwrap();
async fn attempt_merge(&mut self, queue: Vec<QueuedCFHeader>) -> AppendAttempt {
if self.current_quorum.ge(&self.quorum_required) {
for (block_hash, filter_hash) in queue.iter().map(|queue| queue.hash_tuple()) {
self.hash_chain.insert(block_hash, filter_hash);
Expand All @@ -121,6 +120,8 @@ impl CFHeaderChain {
self.merged_queue = None;
return AppendAttempt::Extended;
}
// The merge was not successful and we need to reset the queue
self.merged_queue = Some(queue);
AppendAttempt::AddedToQueue
}

Expand Down

0 comments on commit b77a9ec

Please sign in to comment.