diff --git a/Cargo.lock b/Cargo.lock index f70c30ce41c..725044ebd7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5416,6 +5416,7 @@ dependencies = [ "futures", "http-body-util", "ic-base-types", + "ic-canister-client-sender", "ic-consensus-manager", "ic-interfaces", "ic-logger", diff --git a/rs/p2p/artifact_downloader/BUILD.bazel b/rs/p2p/artifact_downloader/BUILD.bazel index 0cec405e726..0c1714af4ff 100644 --- a/rs/p2p/artifact_downloader/BUILD.bazel +++ b/rs/p2p/artifact_downloader/BUILD.bazel @@ -27,6 +27,7 @@ DEPENDENCIES = [ ] DEV_DEPENDENCIES = [ + "//rs/canister_client/sender", "//rs/p2p/test_utils", "//rs/test_utilities/consensus", "//rs/test_utilities/types", diff --git a/rs/p2p/artifact_downloader/Cargo.toml b/rs/p2p/artifact_downloader/Cargo.toml index 40a7e00ed5f..df311983432 100644 --- a/rs/p2p/artifact_downloader/Cargo.toml +++ b/rs/p2p/artifact_downloader/Cargo.toml @@ -13,6 +13,7 @@ axum = { workspace = true } backoff = { workspace = true } bytes = { workspace = true } ic-base-types = { path = "../../types/base_types" } +ic-canister-client-sender = { path = "../../canister_client/sender" } ic-consensus-manager = { path = "../consensus_manager" } ic-interfaces = { path = "../../interfaces" } ic-logger = { path = "../../monitoring/logger" } diff --git a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs index 98ab7e9bfaf..922bc4e7a07 100644 --- a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs +++ b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs @@ -26,8 +26,11 @@ use super::{ download::download_ingress, metrics::{FetchStrippedConsensusArtifactMetrics, IngressMessageSource, IngressSenderMetrics}, stripper::Strippable, - types::stripped::{ - MaybeStrippedConsensusMessage, StrippedBlockProposal, StrippedConsensusMessageId, + types::{ + stripped::{ + MaybeStrippedConsensusMessage, StrippedBlockProposal, StrippedConsensusMessageId, + }, + SignedIngressId, }, }; @@ -169,12 +172,12 @@ impl ArtifactAssembler .start_timer(); let mut assembler = BlockProposalAssembler::new(stripped_block_proposal); - let missing_ingress_ids = assembler.missing_ingress_messages(); + let stripped_ingress_ids = assembler.missing_ingress_messages(); // For each stripped object in the message, try to fetch it either from the local pools // or from a random peer who is advertising it. - for missing_ingress_id in missing_ingress_ids { + for stripped_ingress_id in stripped_ingress_ids { join_set.spawn(get_or_fetch( - missing_ingress_id, + stripped_ingress_id, self.ingress_pool.clone(), self.transport.clone(), id.as_ref().clone(), @@ -246,7 +249,7 @@ impl ArtifactAssembler /// Tries to get the missing object either from the pool(s) or from the peers who are advertising /// it. async fn get_or_fetch( - ingress_message_id: IngressMessageId, + signed_ingress_id: SignedIngressId, ingress_pool: ValidatedPoolReaderRef, transport: Arc, // Id of the *full* artifact which should contain the missing data @@ -257,13 +260,21 @@ async fn get_or_fetch( peer_rx: P, ) -> (SignedIngress, NodeId) { // First check if the ingress message exists in the Ingress Pool. - if let Some(ingress_message) = ingress_pool.read().unwrap().get(&ingress_message_id) { - return (ingress_message, node_id); + if let Some(ingress_message) = ingress_pool + .read() + .unwrap() + .get(&signed_ingress_id.ingress_message_id) + { + // Make sure that this is the correct ingress message. [`IngressMessageId`] does _not_ + // uniquely identify ingress messages, we thus need to perform an extra check. + if SignedIngressId::from(&ingress_message) == signed_ingress_id { + return (ingress_message, node_id); + } } download_ingress( transport, - ingress_message_id, + signed_ingress_id, full_consensus_message_id, &log, &metrics, @@ -290,7 +301,7 @@ pub(crate) enum AssemblyError { struct BlockProposalAssembler { stripped_block_proposal: StrippedBlockProposal, - ingress_messages: Vec<(IngressMessageId, Option)>, + ingress_messages: Vec<(SignedIngressId, Option)>, } impl BlockProposalAssembler { @@ -300,19 +311,19 @@ impl BlockProposalAssembler { .stripped_ingress_payload .ingress_messages .iter() - .map(|ingress_message_id| (ingress_message_id.clone(), None)) + .map(|signed_ingress_id| (signed_ingress_id.clone(), None)) .collect(), stripped_block_proposal, } } - /// Returns the list of [`IngressMessageId`]s which have been stripped from the block. - pub(crate) fn missing_ingress_messages(&self) -> Vec { + /// Returns the list of ingress messages which have been stripped from the block. + pub(crate) fn missing_ingress_messages(&self) -> Vec { self.ingress_messages .iter() - .filter_map(|(ingress_message_id, maybe_ingress)| { + .filter_map(|(signed_ingress_id, maybe_ingress)| { if maybe_ingress.is_none() { - Some(ingress_message_id) + Some(signed_ingress_id) } else { None } @@ -326,14 +337,14 @@ impl BlockProposalAssembler { &mut self, ingress_message: SignedIngress, ) -> Result<(), InsertionError> { - let ingress_message_id = IngressMessageId::from(&ingress_message); + let signed_ingress_id = SignedIngressId::from(&ingress_message); // We can have at most 1000 elements in the vector, so it should be reasonably fast to do a // linear scan here. let (_, ingress) = self .ingress_messages .iter_mut() - .find(|(id, _maybe_ingress)| *id == ingress_message_id) + .find(|(id, _maybe_ingress)| *id == signed_ingress_id) .ok_or(InsertionError::NotNeeded)?; if ingress.is_some() { @@ -356,7 +367,9 @@ impl BlockProposalAssembler { let ingresses = self .ingress_messages .into_iter() - .map(|(id, message)| message.ok_or_else(|| AssemblyError::Missing(id))) + .map(|(id, message)| { + message.ok_or_else(|| AssemblyError::Missing(id.ingress_message_id)) + }) .collect::, _>>()?; let reconstructed_ingress_payload = IngressPayload::from(ingresses); @@ -377,8 +390,18 @@ impl BlockProposalAssembler { mod tests { use crate::fetch_stripped_artifact::test_utils::{ fake_block_proposal_with_ingresses, fake_ingress_message, - fake_ingress_message_with_arg_size, fake_stripped_block_proposal_with_ingresses, + fake_ingress_message_with_arg_size, fake_ingress_message_with_sig, + fake_stripped_block_proposal_with_ingresses, }; + use crate::fetch_stripped_artifact::types::rpc::GetIngressMessageInBlockResponse; + use bytes::Bytes; + use ic_interfaces::p2p::consensus::BouncerValue; + use ic_logger::no_op_logger; + use ic_p2p_test_utils::mocks::MockBouncerFactory; + use ic_p2p_test_utils::mocks::MockTransport; + use ic_p2p_test_utils::mocks::MockValidatedPoolReader; + use ic_protobuf::proxy::ProtoProxy; + use ic_types_test_utils::ids::NODE_1; use super::*; @@ -502,4 +525,94 @@ mod tests { Err(InsertionError::NotNeeded) ); } + + #[derive(Clone)] + struct MockPeers(NodeId); + + impl Peers for MockPeers { + fn peers(&self) -> Vec { + vec![self.0] + } + } + + fn set_up_assembler_with_fake_dependencies( + ingress_pool_message: Option, + peers_message: Option, + ) -> FetchStrippedConsensusArtifact { + let mut mock_transport = MockTransport::new(); + let mut ingress_pool = MockValidatedPoolReader::::default(); + + if let Some(ingress_message) = ingress_pool_message { + ingress_pool.expect_get().return_const(ingress_message); + } + + if let Some(ingress_message) = peers_message { + let fake_response = axum::response::Response::builder() + .body(Bytes::from( + pb::GetIngressMessageInBlockResponse::proxy_encode( + GetIngressMessageInBlockResponse { + serialized_ingress_message: ingress_message.binary().clone(), + }, + ), + )) + .unwrap(); + + mock_transport + .expect_rpc() + .returning(move |_, _| (Ok(fake_response.clone()))); + } + + let consensus_pool = MockValidatedPoolReader::::default(); + let mut mock_bouncer_factory = MockBouncerFactory::default(); + mock_bouncer_factory + .expect_new_bouncer() + .returning(|_| Box::new(|_| BouncerValue::Wants)); + + let f = FetchStrippedConsensusArtifact::new( + no_op_logger(), + tokio::runtime::Handle::current(), + Arc::new(RwLock::new(consensus_pool)), + Arc::new(RwLock::new(ingress_pool)), + Arc::new(mock_bouncer_factory), + MetricsRegistry::new(), + NODE_1, + ) + .0; + + (f)(Arc::new(mock_transport)) + } + + /// Tests whether the assembler uses the ingress message with the correct signature in the case + /// when the local ingress pool contains an ingress message with the same content as the one in + /// the stripped block proposal but with a different signature. + #[tokio::test] + async fn roundtrip_test_with_two_identical_ingress_messages_different_signatures() { + let (ingress_1, _ingress_1_id) = fake_ingress_message_with_sig("fake_1", vec![1, 2, 3]); + let (ingress_2, _ingress_2_id) = fake_ingress_message_with_sig("fake_1", vec![2, 3, 4]); + assert_eq!( + IngressMessageId::from(&ingress_1), + IngressMessageId::from(&ingress_2) + ); + let block_proposal = fake_block_proposal_with_ingresses(vec![ingress_2.clone()]); + + let assembler = set_up_assembler_with_fake_dependencies( + /*ingress_pool_message=*/ Some(ingress_1.clone()), + /*consensus_pool_message=*/ Some(ingress_2.clone()), + ); + let stripped_block_proposal = + assembler.disassemble_message(ConsensusMessage::BlockProposal(block_proposal.clone())); + let reassembled_block_proposal = assembler + .assemble_message( + stripped_block_proposal.id(), + Some((stripped_block_proposal, NODE_1)), + MockPeers(NODE_1), + ) + .await + .expect("should reassemble the message given the dependencies"); + + assert_eq!( + reassembled_block_proposal, + (ConsensusMessage::BlockProposal(block_proposal), NODE_1) + ); + } } diff --git a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/download.rs b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/download.rs index 4e6d6c0587b..d3880ae850a 100644 --- a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/download.rs +++ b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/download.rs @@ -16,9 +16,9 @@ use ic_logger::{warn, ReplicaLogger}; use ic_protobuf::{proxy::ProtoProxy, types::v1 as pb}; use ic_quic_transport::Transport; use ic_types::{ - artifact::{ConsensusMessageId, IngressMessageId}, + artifact::ConsensusMessageId, consensus::{BlockPayload, ConsensusMessage}, - messages::SignedIngress, + messages::{SignedIngress, SignedRequestBytes}, NodeId, }; use rand::{rngs::SmallRng, seq::IteratorRandom, SeedableRng}; @@ -26,7 +26,10 @@ use tokio::time::{sleep_until, timeout_at, Instant}; use super::{ metrics::{FetchStrippedConsensusArtifactMetrics, IngressSenderMetrics}, - types::rpc::{GetIngressMessageInBlockRequest, GetIngressMessageInBlockResponse}, + types::{ + rpc::{GetIngressMessageInBlockRequest, GetIngressMessageInBlockResponse}, + SignedIngressId, + }, }; type ValidatedPoolReaderRef = Arc + Send + Sync>>; @@ -58,13 +61,19 @@ impl Pools { /// Retrieves the request [`SignedIngress`] from either of the pools. fn get( &self, - ingress_message_id: &IngressMessageId, + signed_ingress_id: &SignedIngressId, block_proposal_id: &ConsensusMessageId, - ) -> Result { + ) -> Result { + let ingress_message_id = &signed_ingress_id.ingress_message_id; + // First check if the requested ingress message exists in the Ingress Pool. if let Some(ingress_message) = self.ingress_pool.read().unwrap().get(ingress_message_id) { - self.metrics.ingress_messages_in_ingress_pool.inc(); - return Ok(ingress_message); + // Make sure that this is the correct ingress message. [`IngressMessageId`] does _not_ + // uniquely identify ingress messages, we thus need to perform an extra check. + if SignedIngressId::from(&ingress_message) == *signed_ingress_id { + self.metrics.ingress_messages_in_ingress_pool.inc(); + return Ok(ingress_message.into()); + } } // Otherwise find the block which should contain the ingress message. @@ -83,10 +92,20 @@ impl Pools { return Err(PoolsAccessError::SummaryBlock); }; - match data_payload.batch.ingress.get_by_id(ingress_message_id) { - Ok(Some(ingress_message)) => { + match data_payload + .batch + .ingress + .get_serialized_by_id(ingress_message_id) + { + Some(bytes) + // Make sure that this is the correct ingress message. [`IngressMessageId`] + // does _not_ uniquely identify ingress messages, we thus need to perform + // an extra check. + if SignedIngressId::new(ingress_message_id.clone(), bytes) + == *signed_ingress_id => + { self.metrics.ingress_messages_in_block.inc(); - Ok(ingress_message) + Ok(bytes.clone()) } _ => { self.metrics.ingress_messages_not_found.inc(); @@ -112,10 +131,12 @@ async fn rpc_handler(State(pools): State, payload: Bytes) -> Result Ok::<_, StatusCode>(Bytes::from( + match pools.get(&request.signed_ingress_id, &request.block_proposal_id) { + Ok(serialized_ingress_message) => Ok::<_, StatusCode>(Bytes::from( pb::GetIngressMessageInBlockResponse::proxy_encode( - GetIngressMessageInBlockResponse { ingress_message }, + GetIngressMessageInBlockResponse { + serialized_ingress_message, + }, ), )), Err(PoolsAccessError::IngressMessageNotFound | PoolsAccessError::BlockNotFound) => { @@ -137,7 +158,7 @@ async fn rpc_handler(State(pools): State, payload: Bytes) -> Result( transport: Arc, - ingress_message_id: IngressMessageId, + signed_ingress_id: SignedIngressId, block_proposal_id: ConsensusMessageId, log: &ReplicaLogger, metrics: &FetchStrippedConsensusArtifactMetrics, @@ -153,7 +174,7 @@ pub(crate) async fn download_ingress( let mut rng = SmallRng::from_entropy(); let request = GetIngressMessageInBlockRequest { - ingress_message_id: ingress_message_id.clone(), + signed_ingress_id: signed_ingress_id.clone(), block_proposal_id, }; let bytes = Bytes::from(pb::GetIngressMessageInBlockRequest::proxy_encode(request)); @@ -167,16 +188,12 @@ pub(crate) async fn download_ingress( if let Some(peer) = { peer_rx.peers().into_iter().choose(&mut rng) } { match timeout_at(next_request_at, transport.rpc(&peer, request.clone())).await { Ok(Ok(response)) if response.status() == StatusCode::OK => { - let body = response.into_body(); - if let Ok(response) = pb::GetIngressMessageInBlockResponse::proxy_decode(&body) - .and_then(|proto: pb::GetIngressMessageInBlockResponse| { - GetIngressMessageInBlockResponse::try_from(proto) - }) - { - if IngressMessageId::from(&response.ingress_message) == ingress_message_id { + if let Some(ingress_message) = parse_response(response.into_body(), metrics) { + if SignedIngressId::from(&ingress_message) == signed_ingress_id { metrics.active_ingress_message_downloads.dec(); - return (response.ingress_message, peer); + return (ingress_message, peer); } else { + metrics.report_download_error("mismatched_signed_ingress_id"); warn!( log, "Peer {} responded with wrong artifact for advert", peer @@ -184,8 +201,14 @@ pub(crate) async fn download_ingress( } } } - _ => { - metrics.total_ingress_message_download_errors.inc(); + Ok(Ok(_response)) => { + metrics.report_download_error("status_not_ok"); + } + Ok(Err(_rpc_error)) => { + metrics.report_download_error("rpc_error"); + } + Err(_timeout) => { + metrics.report_download_error("timeout"); } } } @@ -194,6 +217,27 @@ pub(crate) async fn download_ingress( } } +fn parse_response( + body: Bytes, + metrics: &FetchStrippedConsensusArtifactMetrics, +) -> Option { + let Ok(response) = pb::GetIngressMessageInBlockResponse::proxy_decode(&body).and_then( + |proto: pb::GetIngressMessageInBlockResponse| { + GetIngressMessageInBlockResponse::try_from(proto) + }, + ) else { + metrics.report_download_error("response_decoding_failed"); + return None; + }; + + let Ok(ingress) = SignedIngress::try_from(response.serialized_ingress_message) else { + metrics.report_download_error("ingress_deserialization_failed"); + return None; + }; + + Some(ingress) +} + #[cfg(test)] mod tests { use crate::fetch_stripped_artifact::test_utils::{ @@ -203,19 +247,20 @@ mod tests { use super::*; use http_body_util::Full; + use ic_canister_client_sender::Sender; use ic_logger::no_op_logger; use ic_metrics::MetricsRegistry; use ic_p2p_test_utils::mocks::{MockPeers, MockTransport, MockValidatedPoolReader}; use ic_test_utilities_types::messages::SignedIngressBuilder; + use ic_types::{artifact::IngressMessageId, time::UNIX_EPOCH}; use ic_types_test_utils::ids::NODE_1; use tower::ServiceExt; fn mock_pools( ingress_message: Option, consensus_message: Option, + expect_consensus_pool_access: bool, ) -> Pools { - let should_call_consensus_pool = ingress_message.is_none(); - let mut ingress_pool = MockValidatedPoolReader::::default(); if let Some(ingress_message) = ingress_message { ingress_pool @@ -238,7 +283,7 @@ mod tests { ))) .once() .return_const(consensus_message.clone()); - } else if should_call_consensus_pool { + } else if expect_consensus_pool_access { consensus_pool.expect_get().once().return_const(None); } @@ -278,54 +323,114 @@ mod tests { async fn rpc_get_from_ingress_pool_test() { let ingress_message = SignedIngressBuilder::new().nonce(1).build(); let block = fake_block_proposal(vec![]); - let pools = mock_pools(Some(ingress_message.clone()), None); + let pools = mock_pools( + Some(ingress_message.clone()), + None, + /*expect_consensus_pool_access=*/ false, + ); let router = build_axum_router(pools); let response = send_request( router, request( ConsensusMessageId::from(&block), - IngressMessageId::from(&ingress_message), + SignedIngressId::from(&ingress_message), ), ) .await .expect("Should return a valid response"); - assert_eq!(response.ingress_message, ingress_message); + assert_eq!( + &response.serialized_ingress_message, + ingress_message.binary() + ); } #[tokio::test] async fn rpc_get_from_consensus_pool_test() { let ingress_message = SignedIngressBuilder::new().nonce(1).build(); let block = fake_block_proposal(vec![ingress_message.clone()]); - let pools = mock_pools(None, Some(block.clone())); + let pools = mock_pools( + None, + Some(block.clone()), + /*expect_consensus_pool_access=*/ true, + ); let router = build_axum_router(pools); let response = send_request( router, request( ConsensusMessageId::from(&block), - IngressMessageId::from(&ingress_message), + SignedIngressId::from(&ingress_message), ), ) .await .expect("Should return a valid response"); - assert_eq!(response.ingress_message, ingress_message); + assert_eq!( + &response.serialized_ingress_message, + ingress_message.binary() + ); } #[tokio::test] async fn rpc_get_not_found_test() { let ingress_message = SignedIngressBuilder::new().nonce(1).build(); let block = fake_block_proposal(vec![]); - let pools = mock_pools(None, None); + let pools = mock_pools(None, None, /*expect_consensus_pool_access=*/ true); let router = build_axum_router(pools); let response = send_request( router, request( ConsensusMessageId::from(&block), - IngressMessageId::from(&ingress_message), + SignedIngressId::from(&ingress_message), + ), + ) + .await; + + assert_eq!(response, Err(StatusCode::NOT_FOUND)); + } + + #[tokio::test] + async fn rpc_get_not_found_mismatched_hash_test() { + let ingress_message = |signature: Vec| { + SignedIngressBuilder::new() + .nonce(1) + .expiry_time(UNIX_EPOCH) + .sign_for_sender(&Sender::Node { + pub_key: vec![0, 1, 2, 3], + sign: Arc::new(move |_| Ok(signature.clone())), + }) + .build() + }; + + let ingress_message_1 = ingress_message(vec![1, 1, 1]); + let ingress_message_2 = ingress_message(vec![2, 2, 2]); + let ingress_message_3 = ingress_message(vec![3, 3, 3]); + + assert_eq!( + IngressMessageId::from(&ingress_message_1), + IngressMessageId::from(&ingress_message_2) + ); + assert_eq!( + IngressMessageId::from(&ingress_message_2), + IngressMessageId::from(&ingress_message_3) + ); + + let block = fake_block_proposal(vec![ingress_message_2.clone()]); + let pools = mock_pools( + Some(ingress_message_1.clone()), + Some(block.clone()), + /*expect_consensus_pool_access=*/ true, + ); + let router = build_axum_router(pools); + + let response = send_request( + router, + request( + ConsensusMessageId::from(&block), + SignedIngressId::from(&ingress_message_3), ), ) .await; @@ -337,14 +442,18 @@ mod tests { async fn rpc_get_summary_block_returns_bad_request_test() { let ingress_message = SignedIngressBuilder::new().nonce(1).build(); let block = fake_summary_block_proposal(); - let pools = mock_pools(None, Some(block.clone())); + let pools = mock_pools( + None, + Some(block.clone()), + /*expect_consensus_pool_access=*/ true, + ); let router = build_axum_router(pools); let response = send_request( router, request( ConsensusMessageId::from(&block), - IngressMessageId::from(&ingress_message), + SignedIngressId::from(&ingress_message), ), ) .await; @@ -366,7 +475,7 @@ mod tests { let response = download_ingress( Arc::new(mock_transport), - IngressMessageId::from(&ingress_message), + SignedIngressId::from(&ingress_message), ConsensusMessageId::from(&block), &no_op_logger(), &FetchStrippedConsensusArtifactMetrics::new(&MetricsRegistry::new()), @@ -378,7 +487,6 @@ mod tests { } // Utility functions below - fn fake_block_proposal(ingress_messages: Vec) -> ConsensusMessage { let block_proposal = fake_block_proposal_with_ingresses(ingress_messages); @@ -387,10 +495,10 @@ mod tests { fn request( consensus_message_id: ConsensusMessageId, - ingress_message_id: IngressMessageId, + signed_ingress_id: SignedIngressId, ) -> Bytes { let request = GetIngressMessageInBlockRequest { - ingress_message_id, + signed_ingress_id, block_proposal_id: consensus_message_id, }; @@ -401,7 +509,9 @@ mod tests { axum::response::Response::builder() .body(Bytes::from( pb::GetIngressMessageInBlockResponse::proxy_encode( - GetIngressMessageInBlockResponse { ingress_message }, + GetIngressMessageInBlockResponse { + serialized_ingress_message: ingress_message.binary().clone(), + }, ), )) .unwrap() diff --git a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/metrics.rs b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/metrics.rs index 6ea3c0b6488..9d3336a4b29 100644 --- a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/metrics.rs +++ b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/metrics.rs @@ -1,5 +1,5 @@ use ic_metrics::{buckets::decimal_buckets_with_zero, MetricsRegistry}; -use prometheus::{Histogram, HistogramVec, IntCounter, IntGauge}; +use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge}; const SOURCE_LABEL: &str = "source"; @@ -10,7 +10,7 @@ pub(super) struct FetchStrippedConsensusArtifactMetrics { pub(super) missing_ingress_messages_bytes: Histogram, pub(super) total_block_assembly_duration: Histogram, pub(super) active_ingress_message_downloads: IntGauge, - pub(super) total_ingress_message_download_errors: IntCounter, + pub(super) total_ingress_message_download_errors: IntCounterVec, } #[derive(Copy, Clone)] @@ -58,10 +58,11 @@ impl FetchStrippedConsensusArtifactMetrics { "ic_stripped_consensus_artifact_active_ingress_message_downloads", "The number of active missing ingress message downloads", ), - total_ingress_message_download_errors: metrics_registry.int_counter( + total_ingress_message_download_errors: metrics_registry.int_counter_vec( "ic_stripped_consensus_artifact_total_ingress_message_download_errors", "The total number of errors occurred while downloading \ missing ingress messages", + &["error"], ), } } @@ -71,6 +72,12 @@ impl FetchStrippedConsensusArtifactMetrics { .with_label_values(&[source.as_str()]) .observe(count as f64) } + + pub(super) fn report_download_error(&self, label: &str) { + self.total_ingress_message_download_errors + .with_label_values(&[label]) + .inc() + } } #[derive(Clone)] diff --git a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/stripper.rs b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/stripper.rs index 30fab0ed7ff..819a7816972 100644 --- a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/stripper.rs +++ b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/stripper.rs @@ -3,8 +3,9 @@ use ic_types::{ artifact::IdentifiableArtifact, batch::IngressPayload, consensus::ConsensusMessage, }; -use super::types::stripped::{ - MaybeStrippedConsensusMessage, StrippedBlockProposal, StrippedIngressPayload, +use super::types::{ + stripped::{MaybeStrippedConsensusMessage, StrippedBlockProposal, StrippedIngressPayload}, + SignedIngressId, }; /// Provides functionality for stripping objects of given information. @@ -25,11 +26,11 @@ impl Strippable for ConsensusMessage { match self { // We only strip data blocks. - ConsensusMessage::BlockProposal(block_proposal) + ConsensusMessage::BlockProposal(ref block_proposal) if block_proposal.as_ref().payload.payload_type() == ic_types::consensus::PayloadType::Data => { - let mut proto = pb::BlockProposal::from(&block_proposal); + let mut proto = pb::BlockProposal::from(block_proposal); // Remove the ingress payload from the proto. if let Some(block) = proto.value.as_mut() { @@ -54,9 +55,12 @@ impl Strippable for &IngressPayload { type Output = StrippedIngressPayload; fn strip(self) -> Self::Output { - Self::Output { - ingress_messages: self.message_ids().cloned().collect(), - } + let ingress_messages = self + .iter_serialized() + .map(|(id, bytes)| SignedIngressId::new(id.clone(), bytes)) + .collect(); + + StrippedIngressPayload { ingress_messages } } } diff --git a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/test_utils.rs b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/test_utils.rs index f6eee4a723a..91be6f561e4 100644 --- a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/test_utils.rs +++ b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/test_utils.rs @@ -4,7 +4,7 @@ use ic_test_utilities_consensus::{ make_genesis, }; use ic_types::{ - artifact::{ConsensusMessageId, IngressMessageId}, + artifact::ConsensusMessageId, batch::{BatchPayload, IngressPayload}, consensus::{ dkg::{DkgDataPayload, Summary}, @@ -13,22 +13,40 @@ use ic_types::{ }, crypto::{CryptoHash, CryptoHashOf}, messages::{Blob, HttpCallContent, HttpCanisterUpdate, HttpRequestEnvelope, SignedIngress}, - time::expiry_time_from_now, + time::UNIX_EPOCH, Height, }; use ic_types_test_utils::ids::node_test_id; -use super::types::stripped::{StrippedBlockProposal, StrippedIngressPayload}; +use super::types::{ + stripped::{StrippedBlockProposal, StrippedIngressPayload}, + SignedIngressId, +}; -pub(crate) fn fake_ingress_message(method_name: &str) -> (SignedIngress, IngressMessageId) { - fake_ingress_message_with_arg_size(method_name, 0) +pub(crate) fn fake_ingress_message(method_name: &str) -> (SignedIngress, SignedIngressId) { + fake_ingress_message_with_arg_size_and_sig(method_name, 0, vec![1; 32]) +} + +pub(crate) fn fake_ingress_message_with_sig( + method_name: &str, + sig: Vec, +) -> (SignedIngress, SignedIngressId) { + fake_ingress_message_with_arg_size_and_sig(method_name, 0, sig) } pub(crate) fn fake_ingress_message_with_arg_size( method_name: &str, arg_size: usize, -) -> (SignedIngress, IngressMessageId) { - let ingress_expiry = expiry_time_from_now(); +) -> (SignedIngress, SignedIngressId) { + fake_ingress_message_with_arg_size_and_sig(method_name, arg_size, vec![1; 32]) +} + +pub(crate) fn fake_ingress_message_with_arg_size_and_sig( + method_name: &str, + arg_size: usize, + sig: Vec, +) -> (SignedIngress, SignedIngressId) { + let ingress_expiry = UNIX_EPOCH; let content = HttpCallContent::Call { update: HttpCanisterUpdate { canister_id: Blob(vec![42; 8]), @@ -39,17 +57,18 @@ pub(crate) fn fake_ingress_message_with_arg_size( ingress_expiry: ingress_expiry.as_nanos_since_unix_epoch(), }, }; - let ingress = HttpRequestEnvelope:: { + let ingress: SignedIngress = HttpRequestEnvelope:: { content, sender_pubkey: Some(Blob(vec![2; 32])), - sender_sig: Some(Blob(vec![1; 32])), + sender_sig: Some(Blob(sig)), sender_delegation: None, } .try_into() .unwrap(); - let ingress_id = IngressMessageId::from(&ingress); - (ingress, ingress_id) + let signed_ingress_id = SignedIngressId::from(&ingress); + + (ingress, signed_ingress_id) } pub(crate) fn fake_block_proposal_with_ingresses( @@ -77,7 +96,7 @@ pub(crate) fn fake_block_proposal_with_ingresses( } pub(crate) fn fake_stripped_block_proposal_with_ingresses( - ingress_messages: Vec, + ingress_messages: Vec, ) -> StrippedBlockProposal { StrippedBlockProposal { block_proposal_without_ingresses_proto: pb::BlockProposal::default(), diff --git a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types.rs b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types.rs index 66e598f8246..4e119073286 100644 --- a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types.rs +++ b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types.rs @@ -1,2 +1,51 @@ +use ic_protobuf::proxy::{try_from_option_field, ProxyDecodeError}; +use ic_protobuf::types::v1 as pb; +use ic_types::crypto::CryptoHash; +use ic_types::messages::SignedIngress; +use ic_types::{artifact::IngressMessageId, crypto::CryptoHashOf, messages::SignedRequestBytes}; + pub(super) mod rpc; pub(super) mod stripped; + +type IngressBytesHash = CryptoHashOf; + +/// A unique identifier of a [`SignedIngress`]. +/// Note that the hash of [`SignedIngress::binary`] should be enough to uniquely identify a +/// [`SignedIngress`] because all the fields of [`SignedIngress`] are derived from it. +/// Note also that [`IngressMessageId`] is not strictly required here to uniquely identify +/// [`SignedIngress`] but we keep it here because of [`IngressPool`] API. +#[derive(Clone, Debug, PartialEq)] +pub(crate) struct SignedIngressId { + pub(crate) ingress_message_id: IngressMessageId, + pub(crate) ingress_bytes_hash: IngressBytesHash, +} + +impl SignedIngressId { + pub(crate) fn new(ingress_message_id: IngressMessageId, bytes: &SignedRequestBytes) -> Self { + Self { + ingress_message_id, + ingress_bytes_hash: ic_types::crypto::crypto_hash(bytes), + } + } +} + +impl From<&SignedIngress> for SignedIngressId { + fn from(value: &SignedIngress) -> Self { + Self::new(IngressMessageId::from(value), value.binary()) + } +} + +impl TryFrom for SignedIngressId { + type Error = ProxyDecodeError; + + fn try_from(value: pb::StrippedIngressMessage) -> Result { + let ingress_message_id = + try_from_option_field(value.stripped, "StrippedIngressMessage::stripped")?; + let ingress_bytes_hash = CryptoHashOf::from(CryptoHash(value.ingress_bytes_hash)); + + Ok(SignedIngressId { + ingress_message_id, + ingress_bytes_hash, + }) + } +} diff --git a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types/rpc.rs b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types/rpc.rs index 3f2941855b2..401b9fd15da 100644 --- a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types/rpc.rs +++ b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types/rpc.rs @@ -3,18 +3,19 @@ use ic_protobuf::{ types::v1 as pb, }; use ic_types::{ - artifact::{ConsensusMessageId, IngressMessageId}, + artifact::ConsensusMessageId, consensus::ConsensusMessageHash, - messages::{SignedIngress, SignedRequestBytes}, + crypto::{CryptoHash, CryptoHashOf}, + messages::SignedRequestBytes, }; -use bytes::Bytes; +use super::SignedIngressId; /// Parameters for the `/block/ingress/` rpc requests. #[derive(Clone, Debug, PartialEq)] // FIXME(kpop): check that it's a block proposal indeed pub(crate) struct GetIngressMessageInBlockRequest { - pub(crate) ingress_message_id: IngressMessageId, + pub(crate) signed_ingress_id: SignedIngressId, pub(crate) block_proposal_id: ConsensusMessageId, } @@ -30,6 +31,7 @@ impl TryFrom for GetIngressMessageInBlockRe value.block_proposal_id, "GetIngressMessageInBlockRequest::block_proposal_id", )?; + let ingress_bytes_hash = CryptoHashOf::from(CryptoHash(value.ingress_bytes_hash)); match &consensus_message_id.hash { ConsensusMessageHash::BlockProposal(_) => {} @@ -42,8 +44,11 @@ impl TryFrom for GetIngressMessageInBlockRe }; Ok(Self { - ingress_message_id, block_proposal_id: consensus_message_id, + signed_ingress_id: SignedIngressId { + ingress_message_id, + ingress_bytes_hash, + }, }) } } @@ -51,8 +56,9 @@ impl TryFrom for GetIngressMessageInBlockRe impl From for pb::GetIngressMessageInBlockRequest { fn from(value: GetIngressMessageInBlockRequest) -> Self { Self { - ingress_message_id: Some(value.ingress_message_id.into()), + ingress_message_id: Some(value.signed_ingress_id.ingress_message_id.into()), block_proposal_id: Some(value.block_proposal_id.into()), + ingress_bytes_hash: value.signed_ingress_id.ingress_bytes_hash.get().0, } } } @@ -60,23 +66,23 @@ impl From for pb::GetIngressMessageInBlockReque /// `/block/ingress/` rpc response. #[derive(Debug, PartialEq)] pub(crate) struct GetIngressMessageInBlockResponse { - pub(crate) ingress_message: SignedIngress, + pub(crate) serialized_ingress_message: SignedRequestBytes, } impl TryFrom for GetIngressMessageInBlockResponse { type Error = ProxyDecodeError; fn try_from(value: pb::GetIngressMessageInBlockResponse) -> Result { - let ingress_message = SignedIngress::try_from(Bytes::from(value.ingress_message))?; - - Ok(Self { ingress_message }) + Ok(Self { + serialized_ingress_message: SignedRequestBytes::from(value.ingress_message), + }) } } impl From for pb::GetIngressMessageInBlockResponse { fn from(value: GetIngressMessageInBlockResponse) -> Self { pb::GetIngressMessageInBlockResponse { - ingress_message: SignedRequestBytes::from(value.ingress_message).into(), + ingress_message: value.serialized_ingress_message.into(), } } } @@ -84,6 +90,7 @@ impl From for pb::GetIngressMessageInBlockResp #[cfg(test)] mod tests { use ic_types::{ + artifact::IngressMessageId, crypto::{CryptoHash, CryptoHashOf}, time::UNIX_EPOCH, Height, @@ -95,7 +102,10 @@ mod tests { #[test] fn get_ingress_message_in_block_request_serialization_test() { let request = GetIngressMessageInBlockRequest { - ingress_message_id: IngressMessageId::new(UNIX_EPOCH, message_test_id(42)), + signed_ingress_id: SignedIngressId { + ingress_message_id: IngressMessageId::new(UNIX_EPOCH, message_test_id(42)), + ingress_bytes_hash: CryptoHashOf::from(CryptoHash(vec![1, 2, 3])), + }, block_proposal_id: ConsensusMessageId { hash: ConsensusMessageHash::BlockProposal(CryptoHashOf::from(CryptoHash( Vec::new(), diff --git a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types/stripped.rs b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types/stripped.rs index 0f40063b8f4..f2a45aef3d6 100644 --- a/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types/stripped.rs +++ b/rs/p2p/artifact_downloader/src/fetch_stripped_artifact/types/stripped.rs @@ -3,14 +3,16 @@ use ic_protobuf::{ types::v1 as pb, }; use ic_types::{ - artifact::{ConsensusMessageId, IdentifiableArtifact, IngressMessageId, PbArtifact}, + artifact::{ConsensusMessageId, IdentifiableArtifact, PbArtifact}, consensus::ConsensusMessage, }; +use super::SignedIngressId; + /// Stripped version of the [`IngressPayload`]. #[derive(Clone, Debug, Default, PartialEq)] pub(crate) struct StrippedIngressPayload { - pub(crate) ingress_messages: Vec, + pub(crate) ingress_messages: Vec, } /// Stripped version of the [`BlockProposal`]. @@ -76,12 +78,7 @@ impl TryFrom for StrippedBlockProposal { ingress_messages: value .ingress_messages .into_iter() - .map(|stripped_ingress| { - try_from_option_field( - stripped_ingress.stripped, - "StrippedIngressMessage::stripped", - ) - }) + .map(SignedIngressId::try_from) .collect::, _>>()?, }, unstripped_consensus_message_id: try_from_option_field( @@ -102,8 +99,9 @@ impl From for pb::StrippedBlockProposal { .stripped_ingress_payload .ingress_messages .into_iter() - .map(|ingress_id| pb::StrippedIngressMessage { - stripped: Some(ingress_id.into()), + .map(|signed_ingress_id| pb::StrippedIngressMessage { + stripped: Some(signed_ingress_id.ingress_message_id.into()), + ingress_bytes_hash: signed_ingress_id.ingress_bytes_hash.get().0, }) .collect(), unstripped_consensus_message_id: Some(value.unstripped_consensus_message_id.into()), diff --git a/rs/protobuf/def/types/v1/consensus.proto b/rs/protobuf/def/types/v1/consensus.proto index 8737a307d34..51a7db0f0df 100644 --- a/rs/protobuf/def/types/v1/consensus.proto +++ b/rs/protobuf/def/types/v1/consensus.proto @@ -219,6 +219,7 @@ message IngressPayload { message GetIngressMessageInBlockRequest { IngressMessageId ingress_message_id = 1; ConsensusMessageId block_proposal_id = 2; + bytes ingress_bytes_hash = 3; } message GetIngressMessageInBlockResponse { @@ -233,6 +234,7 @@ message StrippedBlockProposal { message StrippedIngressMessage { IngressMessageId stripped = 1; + bytes ingress_bytes_hash = 2; } message StrippedConsensusMessage { diff --git a/rs/protobuf/src/gen/types/types.v1.rs b/rs/protobuf/src/gen/types/types.v1.rs index 1ac349addae..b6f72194e8e 100644 --- a/rs/protobuf/src/gen/types/types.v1.rs +++ b/rs/protobuf/src/gen/types/types.v1.rs @@ -1548,6 +1548,8 @@ pub struct GetIngressMessageInBlockRequest { pub ingress_message_id: ::core::option::Option, #[prost(message, optional, tag = "2")] pub block_proposal_id: ::core::option::Option, + #[prost(bytes = "vec", tag = "3")] + pub ingress_bytes_hash: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetIngressMessageInBlockResponse { @@ -1567,6 +1569,8 @@ pub struct StrippedBlockProposal { pub struct StrippedIngressMessage { #[prost(message, optional, tag = "1")] pub stripped: ::core::option::Option, + #[prost(bytes = "vec", tag = "2")] + pub ingress_bytes_hash: ::prost::alloc::vec::Vec, } #[allow(clippy::large_enum_variant)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/rs/types/types/src/batch/ingress.rs b/rs/types/types/src/batch/ingress.rs index e1379a92870..79ad4c296ac 100644 --- a/rs/types/types/src/batch/ingress.rs +++ b/rs/types/types/src/batch/ingress.rs @@ -89,16 +89,12 @@ impl IngressPayload { self.serialized_ingress_messages.is_empty() } - /// Return the [`SignedIngress`] referenced by the [`IngressMessageId`]. - /// Return [`IngressPayloadError`] if we fail to deserialize the message. - pub fn get_by_id( + /// Return the [`SignedRequestBytes`] referenced by the [`IngressMessageId`]. + pub fn get_serialized_by_id( &self, ingress_message_id: &IngressMessageId, - ) -> Result, IngressPayloadError> { - self.serialized_ingress_messages - .get(ingress_message_id) - .map(|bytes| SignedIngress::try_from(bytes.clone()).map_err(IngressPayloadError)) - .transpose() + ) -> Option<&SignedRequestBytes> { + self.serialized_ingress_messages.get(ingress_message_id) } /// Iterates over the ingress messages in their deserialized form. @@ -117,6 +113,12 @@ impl IngressPayload { ) }) } + + pub fn iter_serialized( + &self, + ) -> impl Iterator { + self.serialized_ingress_messages.iter() + } } impl CountBytes for IngressPayload { @@ -149,7 +151,6 @@ impl From> for IngressPayload { impl TryFrom for Vec { type Error = IngressPayloadError; - fn try_from(payload: IngressPayload) -> Result, Self::Error> { payload .serialized_ingress_messages @@ -267,10 +268,10 @@ mod tests { payload ); // Individual lookup works. - assert_eq!(payload.get_by_id(&id1), Ok(Some(m1))); - assert_eq!(payload.get_by_id(&id2), Ok(Some(m2))); - assert_eq!(payload.get_by_id(&id3), Ok(Some(m3))); - assert_eq!(payload.get_by_id(&id4), Ok(None)); + assert_eq!(payload.get_serialized_by_id(&id1), Some(m1.binary())); + assert_eq!(payload.get_serialized_by_id(&id2), Some(m2.binary())); + assert_eq!(payload.get_serialized_by_id(&id3), Some(m3.binary())); + assert_eq!(payload.get_serialized_by_id(&id4), None); // Converting back to messages should match original assert_eq!(msgs, >::try_from(payload).unwrap()); }