Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus): fix the hashes-in-blocks feature implementation #3529

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rs/p2p/artifact_downloader/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ DEPENDENCIES = [
]

DEV_DEPENDENCIES = [
"//rs/canister_client/sender",
"//rs/p2p/test_utils",
"//rs/test_utilities/consensus",
"//rs/test_utilities/types",
Expand Down
1 change: 1 addition & 0 deletions rs/p2p/artifact_downloader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ axum = { workspace = true }
backoff = { workspace = true }
bytes = { workspace = true }
ic-base-types = { path = "../../types/base_types" }
ic-canister-client-sender = { path = "../../canister_client/sender" }
ic-consensus-manager = { path = "../consensus_manager" }
ic-interfaces = { path = "../../interfaces" }
ic-logger = { path = "../../monitoring/logger" }
Expand Down
139 changes: 125 additions & 14 deletions rs/p2p/artifact_downloader/src/fetch_stripped_artifact/assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ use super::{
download::download_ingress,
metrics::{FetchStrippedConsensusArtifactMetrics, IngressMessageSource, IngressSenderMetrics},
stripper::Strippable,
types::stripped::{
MaybeStrippedConsensusMessage, StrippedBlockProposal, StrippedConsensusMessageId,
types::{
stripped::{
MaybeStrippedConsensusMessage, StrippedBlockProposal, StrippedConsensusMessageId,
},
SignedIngressId,
},
};

Expand Down Expand Up @@ -246,7 +249,7 @@ impl ArtifactAssembler<ConsensusMessage, MaybeStrippedConsensusMessage>
/// Tries to get the missing object either from the pool(s) or from the peers who are advertising
/// it.
async fn get_or_fetch<P: Peers>(
ingress_message_id: IngressMessageId,
signed_ingress_id: SignedIngressId,
ingress_pool: ValidatedPoolReaderRef<SignedIngress>,
transport: Arc<dyn Transport>,
// Id of the *full* artifact which should contain the missing data
Expand All @@ -257,13 +260,19 @@ async fn get_or_fetch<P: Peers>(
peer_rx: P,
) -> (SignedIngress, NodeId) {
// First check if the ingress message exists in the Ingress Pool.
if let Some(ingress_message) = ingress_pool.read().unwrap().get(&ingress_message_id) {
return (ingress_message, node_id);
if let Some(ingress_message) = ingress_pool
.read()
.unwrap()
.get(&signed_ingress_id.ingress_message_id)
{
if SignedIngressId::from(&ingress_message) == signed_ingress_id {
kpop-dfinity marked this conversation as resolved.
Show resolved Hide resolved
return (ingress_message, node_id);
}
}

download_ingress(
transport,
ingress_message_id,
signed_ingress_id,
kpop-dfinity marked this conversation as resolved.
Show resolved Hide resolved
full_consensus_message_id,
&log,
&metrics,
Expand All @@ -290,7 +299,7 @@ pub(crate) enum AssemblyError {

struct BlockProposalAssembler {
stripped_block_proposal: StrippedBlockProposal,
ingress_messages: Vec<(IngressMessageId, Option<SignedIngress>)>,
ingress_messages: Vec<(SignedIngressId, Option<SignedIngress>)>,
}

impl BlockProposalAssembler {
Expand All @@ -300,14 +309,14 @@ impl BlockProposalAssembler {
.stripped_ingress_payload
.ingress_messages
.iter()
.map(|ingress_message_id| (ingress_message_id.clone(), None))
.map(|ingress_message| (ingress_message.clone(), None))
kpop-dfinity marked this conversation as resolved.
Show resolved Hide resolved
.collect(),
stripped_block_proposal,
}
}

/// Returns the list of [`IngressMessageId`]s which have been stripped from the block.
pub(crate) fn missing_ingress_messages(&self) -> Vec<IngressMessageId> {
/// Returns the list of ingress messages which have been stripped from the block.
pub(crate) fn missing_ingress_messages(&self) -> Vec<SignedIngressId> {
self.ingress_messages
.iter()
.filter_map(|(ingress_message_id, maybe_ingress)| {
Expand All @@ -326,14 +335,14 @@ impl BlockProposalAssembler {
&mut self,
ingress_message: SignedIngress,
) -> Result<(), InsertionError> {
let ingress_message_id = IngressMessageId::from(&ingress_message);
let signed_ingress_id = SignedIngressId::from(&ingress_message);

// We can have at most 1000 elements in the vector, so it should be reasonably fast to do a
// linear scan here.
let (_, ingress) = self
.ingress_messages
.iter_mut()
.find(|(id, _maybe_ingress)| *id == ingress_message_id)
.find(|(id, _maybe_ingress)| *id == signed_ingress_id)
.ok_or(InsertionError::NotNeeded)?;

if ingress.is_some() {
Expand All @@ -356,7 +365,9 @@ impl BlockProposalAssembler {
let ingresses = self
.ingress_messages
.into_iter()
.map(|(id, message)| message.ok_or_else(|| AssemblyError::Missing(id)))
.map(|(id, message)| {
message.ok_or_else(|| AssemblyError::Missing(id.ingress_message_id))
})
.collect::<Result<Vec<_>, _>>()?;
let reconstructed_ingress_payload = IngressPayload::from(ingresses);

Expand All @@ -377,8 +388,18 @@ impl BlockProposalAssembler {
mod tests {
use crate::fetch_stripped_artifact::test_utils::{
fake_block_proposal_with_ingresses, fake_ingress_message,
fake_ingress_message_with_arg_size, fake_stripped_block_proposal_with_ingresses,
fake_ingress_message_with_arg_size, fake_ingress_message_with_sig,
fake_stripped_block_proposal_with_ingresses,
};
use crate::fetch_stripped_artifact::types::rpc::GetIngressMessageInBlockResponse;
use bytes::Bytes;
use ic_interfaces::p2p::consensus::BouncerValue;
use ic_logger::no_op_logger;
use ic_p2p_test_utils::mocks::MockBouncerFactory;
use ic_p2p_test_utils::mocks::MockTransport;
use ic_p2p_test_utils::mocks::MockValidatedPoolReader;
use ic_protobuf::proxy::ProtoProxy;
use ic_types_test_utils::ids::NODE_1;

use super::*;

Expand Down Expand Up @@ -502,4 +523,94 @@ mod tests {
Err(InsertionError::NotNeeded)
);
}

#[derive(Clone)]
struct MockPeers(NodeId);

impl Peers for MockPeers {
fn peers(&self) -> Vec<NodeId> {
vec![self.0]
}
}

fn set_up_assembler_with_fake_dependencies(
ingress_pool_message: Option<SignedIngress>,
peers_message: Option<SignedIngress>,
) -> FetchStrippedConsensusArtifact {
let mut mock_transport = MockTransport::new();
let mut ingress_pool = MockValidatedPoolReader::<SignedIngress>::default();

if let Some(ingress_message) = ingress_pool_message {
ingress_pool.expect_get().return_const(ingress_message);
}

if let Some(ingress_message) = peers_message {
let fake_response = axum::response::Response::builder()
.body(Bytes::from(
pb::GetIngressMessageInBlockResponse::proxy_encode(
GetIngressMessageInBlockResponse {
serialized_ingress_message: ingress_message.binary().clone(),
},
),
))
.unwrap();

mock_transport
.expect_rpc()
.returning(move |_, _| (Ok(fake_response.clone())));
}

let consensus_pool = MockValidatedPoolReader::<ConsensusMessage>::default();
let mut mock_bouncer_factory = MockBouncerFactory::default();
mock_bouncer_factory
.expect_new_bouncer()
.returning(|_| Box::new(|_| BouncerValue::Wants));

let f = FetchStrippedConsensusArtifact::new(
no_op_logger(),
tokio::runtime::Handle::current(),
Arc::new(RwLock::new(consensus_pool)),
Arc::new(RwLock::new(ingress_pool)),
Arc::new(mock_bouncer_factory),
MetricsRegistry::new(),
NODE_1,
)
.0;

(f)(Arc::new(mock_transport))
}

/// Tests whether the assembler uses the ingress message with the correct signature in the case
/// when the local ingress pool contains an ingress message with the same content as the one in
/// the stripped block proposal but with a different signature.
#[tokio::test]
async fn roundtrip_test_with_two_identical_ingress_messages_different_signatures() {
let (ingress_1, _ingress_1_id) = fake_ingress_message_with_sig("fake_1", vec![1, 2, 3]);
let (ingress_2, _ingress_2_id) = fake_ingress_message_with_sig("fake_1", vec![2, 3, 4]);
assert_eq!(
IngressMessageId::from(&ingress_1),
IngressMessageId::from(&ingress_2)
);
let block_proposal = fake_block_proposal_with_ingresses(vec![ingress_2.clone()]);

let assembler = set_up_assembler_with_fake_dependencies(
/*ingress_pool_message=*/ Some(ingress_1.clone()),
/*consensus_pool_message=*/ Some(ingress_2.clone()),
);
let stripped_block_proposal =
assembler.disassemble_message(ConsensusMessage::BlockProposal(block_proposal.clone()));
let reassembled_block_proposal = assembler
.assemble_message(
stripped_block_proposal.id(),
Some((stripped_block_proposal, NODE_1)),
MockPeers(NODE_1),
)
.await
.expect("should reassemble the message given the dependencies");

assert_eq!(
reassembled_block_proposal,
(ConsensusMessage::BlockProposal(block_proposal), NODE_1)
);
}
}
Loading
Loading