diff --git a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs index 695645a8211..922bc4e7a07 100644 --- a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs +++ b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs @@ -172,12 +172,12 @@ impl ArtifactAssembler .start_timer(); let mut assembler = BlockProposalAssembler::new(stripped_block_proposal); - let missing_ingress_ids = assembler.missing_ingress_messages(); + let stripped_ingress_ids = assembler.missing_ingress_messages(); // For each stripped object in the message, try to fetch it either from the local pools // or from a random peer who is advertising it. - for missing_ingress_id in missing_ingress_ids { + for stripped_ingress_id in stripped_ingress_ids { join_set.spawn(get_or_fetch( - missing_ingress_id, + stripped_ingress_id, self.ingress_pool.clone(), self.transport.clone(), id.as_ref().clone(), @@ -265,6 +265,8 @@ async fn get_or_fetch( .unwrap() .get(&signed_ingress_id.ingress_message_id) { + // Make sure that this is the correct ingress message. [`IngressMessageId`] does _not_ + // uniquely identify ingress messages, we thus need to perform an extra check. if SignedIngressId::from(&ingress_message) == signed_ingress_id { return (ingress_message, node_id); } @@ -309,7 +311,7 @@ impl BlockProposalAssembler { .stripped_ingress_payload .ingress_messages .iter() - .map(|ingress_message| (ingress_message.clone(), None)) + .map(|signed_ingress_id| (signed_ingress_id.clone(), None)) .collect(), stripped_block_proposal, } @@ -319,9 +321,9 @@ impl BlockProposalAssembler { pub(crate) fn missing_ingress_messages(&self) -> Vec { self.ingress_messages .iter() - .filter_map(|(ingress_message_id, maybe_ingress)| { + .filter_map(|(signed_ingress_id, maybe_ingress)| { if maybe_ingress.is_none() { - Some(ingress_message_id) + Some(signed_ingress_id) } else { None } diff --git a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/download.rs b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/download.rs index 54279c78382..d3880ae850a 100644 --- a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/download.rs +++ b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/download.rs @@ -68,6 +68,8 @@ impl Pools { // First check if the requested ingress message exists in the Ingress Pool. if let Some(ingress_message) = self.ingress_pool.read().unwrap().get(ingress_message_id) { + // Make sure that this is the correct ingress message. [`IngressMessageId`] does _not_ + // uniquely identify ingress messages, we thus need to perform an extra check. if SignedIngressId::from(&ingress_message) == *signed_ingress_id { self.metrics.ingress_messages_in_ingress_pool.inc(); return Ok(ingress_message.into()); @@ -96,6 +98,9 @@ impl Pools { .get_serialized_by_id(ingress_message_id) { Some(bytes) + // Make sure that this is the correct ingress message. [`IngressMessageId`] + // does _not_ uniquely identify ingress messages, we thus need to perform + // an extra check. if SignedIngressId::new(ingress_message_id.clone(), bytes) == *signed_ingress_id => { @@ -226,7 +231,7 @@ fn parse_response( }; let Ok(ingress) = SignedIngress::try_from(response.serialized_ingress_message) else { - metrics.report_download_error("ingress_deserializedion_failed"); + metrics.report_download_error("ingress_deserialization_failed"); return None; }; @@ -321,7 +326,7 @@ mod tests { let pools = mock_pools( Some(ingress_message.clone()), None, - /*expect_consensus_pool_acces=*/ false, + /*expect_consensus_pool_access=*/ false, ); let router = build_axum_router(pools); @@ -348,7 +353,7 @@ mod tests { let pools = mock_pools( None, Some(block.clone()), - /*expect_consensus_pool_acces=*/ true, + /*expect_consensus_pool_access=*/ true, ); let router = build_axum_router(pools); @@ -372,7 +377,7 @@ mod tests { async fn rpc_get_not_found_test() { let ingress_message = SignedIngressBuilder::new().nonce(1).build(); let block = fake_block_proposal(vec![]); - let pools = mock_pools(None, None, /*expect_consensus_pool_acces=*/ true); + let pools = mock_pools(None, None, /*expect_consensus_pool_access=*/ true); let router = build_axum_router(pools); let response = send_request( @@ -417,7 +422,7 @@ mod tests { let pools = mock_pools( Some(ingress_message_1.clone()), Some(block.clone()), - /*expect_consensus_pool_acces=*/ true, + /*expect_consensus_pool_access=*/ true, ); let router = build_axum_router(pools); @@ -440,7 +445,7 @@ mod tests { let pools = mock_pools( None, Some(block.clone()), - /*expect_consensus_pool_acces=*/ true, + /*expect_consensus_pool_access=*/ true, ); let router = build_axum_router(pools);