From a044b88a43103943ac59bbb6808770a61facfc6d Mon Sep 17 00:00:00 2001 From: StemCll Date: Fri, 20 Oct 2023 09:20:10 +0200 Subject: [PATCH] feat(gossipsub): remove `fast_message_id_fn` Resolves #4138. Pull-Request: #4285. --- protocols/gossipsub/CHANGELOG.md | 2 + protocols/gossipsub/src/behaviour.rs | 58 +++------------ protocols/gossipsub/src/behaviour/tests.rs | 87 +--------------------- protocols/gossipsub/src/config.rs | 34 +-------- protocols/gossipsub/src/lib.rs | 2 +- protocols/gossipsub/src/time_cache.rs | 10 --- protocols/gossipsub/src/types.rs | 60 ++++++--------- 7 files changed, 36 insertions(+), 217 deletions(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 88ea170a885..b86ec4de6d4 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,7 @@ ## 0.46.0 - unreleased +- Remove `fast_message_id_fn` mechanism from `Config`. + See [PR 4285](https://github.com/libp2p/rust-libp2p/pull/4285). - Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`. See [PR 4642](https://github.com/libp2p/rust-libp2p/pull/4642). - Return typed error from config builder. diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index d20fb564ad2..69fa36b002f 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -55,12 +55,12 @@ use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; -use crate::time_cache::{DuplicateCache, TimeCache}; +use crate::time_cache::DuplicateCache; use crate::topic::{Hasher, Topic, TopicHash}; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::{ - ControlAction, FastMessageId, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, - Subscription, SubscriptionAction, + ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription, + SubscriptionAction, }; use crate::types::{PeerConnections, PeerKind, Rpc}; use crate::{rpc_proto::proto, TopicScoreParams}; @@ -323,9 +323,6 @@ pub struct Behaviour { /// our own messages back if the messages are anonymous or use a random author. published_message_ids: DuplicateCache, - /// Short term cache for fast message ids mapping them to the real message ids - fast_message_id_cache: TimeCache, - /// The filter used to handle message subscriptions. subscription_filter: F, @@ -446,7 +443,6 @@ where control_pool: HashMap::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), - fast_message_id_cache: TimeCache::new(config.duplicate_cache_time()), topic_peers: HashMap::new(), peer_topics: HashMap::new(), explicit_peers: HashSet::new(), @@ -1755,31 +1751,6 @@ where metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len()); } - let fast_message_id = self.config.fast_message_id(&raw_message); - - if let Some(fast_message_id) = fast_message_id.as_ref() { - if let Some(msg_id) = self.fast_message_id_cache.get(fast_message_id) { - let msg_id = msg_id.clone(); - // Report the duplicate - if self.message_is_valid(&msg_id, &mut raw_message, propagation_source) { - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.duplicated_message( - propagation_source, - &msg_id, - &raw_message.topic, - ); - } - // Update the cache, informing that we have received a duplicate from another peer. - // The peers in this cache are used to prevent us forwarding redundant messages onto - // these peers. - self.mcache.observe_duplicate(&msg_id, propagation_source); - } - - // This message has been seen previously. Ignore it - return; - } - } - // Try and perform the data transform to the message. If it fails, consider it invalid. let message = match self.data_transform.inbound_transform(raw_message.clone()) { Ok(message) => message, @@ -1805,14 +1776,6 @@ where return; } - // Add the message to the duplicate caches - if let Some(fast_message_id) = fast_message_id { - // add id to cache - self.fast_message_id_cache - .entry(fast_message_id) - .or_insert_with(|| msg_id.clone()); - } - if !self.duplicate_cache.insert(msg_id.clone()) { debug!("Message already received, ignoring. Message: {}", msg_id); if let Some((peer_score, ..)) = &mut self.peer_score { @@ -1887,20 +1850,17 @@ where metrics.register_invalid_message(&raw_message.topic); } - let fast_message_id_cache = &self.fast_message_id_cache; + if let Ok(message) = self.data_transform.inbound_transform(raw_message.clone()) { + let message_id = self.config.message_id(&message); - if let Some(msg_id) = self - .config - .fast_message_id(raw_message) - .and_then(|id| fast_message_id_cache.get(&id)) - { peer_score.reject_message( propagation_source, - msg_id, - &raw_message.topic, + &message_id, + &message.topic, reject_reason, ); - gossip_promises.reject_message(msg_id, &reject_reason); + + gossip_promises.reject_message(&message_id, &reject_reason); } else { // The message is invalid, we reject it ignoring any gossip promises. If a peer is // advertising this message via an IHAVE and it's invalid it will be double diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index e8643294162..46415288d9b 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -24,17 +24,12 @@ use super::*; use crate::protocol::ProtocolConfig; use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; -use crate::types::FastMessageId; use crate::ValidationError; -use crate::{ - config::Config, config::ConfigBuilder, IdentTopic as Topic, Message, TopicScoreParams, -}; +use crate::{config::Config, config::ConfigBuilder, IdentTopic as Topic, TopicScoreParams}; use async_std::net::Ipv4Addr; use byteorder::{BigEndian, ByteOrder}; use libp2p_core::{ConnectedPoint, Endpoint}; use rand::Rng; -use std::collections::hash_map::DefaultHasher; -use std::hash::{Hash, Hasher}; use std::thread::sleep; use std::time::Duration; @@ -5064,86 +5059,6 @@ fn test_public_api() { ); } -#[test] -fn test_msg_id_fn_only_called_once_with_fast_message_ids() { - struct Pointers { - slow_counter: u32, - fast_counter: u32, - } - - let mut counters = Pointers { - slow_counter: 0, - fast_counter: 0, - }; - - let counters_pointer: *mut Pointers = &mut counters; - - let counters_address = counters_pointer as u64; - - macro_rules! get_counters_pointer { - ($m: expr) => {{ - let mut address_bytes: [u8; 8] = Default::default(); - address_bytes.copy_from_slice($m.as_slice()); - let address = u64::from_be_bytes(address_bytes); - address as *mut Pointers - }}; - } - - macro_rules! get_counters_and_hash { - ($m: expr) => {{ - let mut hasher = DefaultHasher::new(); - $m.hash(&mut hasher); - let id = hasher.finish().to_be_bytes().into(); - (id, get_counters_pointer!($m)) - }}; - } - - let message_id_fn = |m: &Message| -> MessageId { - let (mut id, counters_pointer): (MessageId, *mut Pointers) = - get_counters_and_hash!(&m.data); - unsafe { - (*counters_pointer).slow_counter += 1; - } - id.0.reverse(); - id - }; - let fast_message_id_fn = |m: &RawMessage| -> FastMessageId { - let (id, counters_pointer) = get_counters_and_hash!(&m.data); - unsafe { - (*counters_pointer).fast_counter += 1; - } - id - }; - let config = ConfigBuilder::default() - .message_id_fn(message_id_fn) - .fast_message_id_fn(fast_message_id_fn) - .build() - .unwrap(); - let (mut gs, _, topic_hashes) = inject_nodes1() - .peer_no(0) - .topics(vec![String::from("topic1")]) - .to_subscribe(true) - .gs_config(config) - .create_network(); - - let message = RawMessage { - source: None, - data: counters_address.to_be_bytes().to_vec(), - sequence_number: None, - topic: topic_hashes[0].clone(), - signature: None, - key: None, - validated: true, - }; - - for _ in 0..5 { - gs.handle_received_message(message.clone(), &PeerId::random()); - } - - assert_eq!(counters.fast_counter, 5); - assert_eq!(counters.slow_counter, 1); -} - #[test] fn test_subscribe_to_invalid_topic() { let t1 = Topic::new("t1"); diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 30fd9343722..7e79912cc4a 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -24,7 +24,7 @@ use std::time::Duration; use crate::error::ConfigBuilderError; use crate::protocol::{ProtocolConfig, ProtocolId, FLOODSUB_PROTOCOL}; -use crate::types::{FastMessageId, Message, MessageId, PeerKind, RawMessage}; +use crate::types::{Message, MessageId, PeerKind}; use libp2p_identity::PeerId; use libp2p_swarm::StreamProtocol; @@ -78,7 +78,6 @@ pub struct Config { duplicate_cache_time: Duration, validate_messages: bool, message_id_fn: Arc MessageId + Send + Sync + 'static>, - fast_message_id_fn: Option FastMessageId + Send + Sync + 'static>>, allow_self_origin: bool, do_px: bool, prune_peers: usize, @@ -218,20 +217,6 @@ impl Config { (self.message_id_fn)(message) } - /// A user-defined optional function that computes fast ids from raw messages. This can be used - /// to avoid possibly expensive transformations from [`RawMessage`] to - /// [`Message`] for duplicates. Two semantically different messages must always - /// have different fast message ids, but it is allowed that two semantically identical messages - /// have different fast message ids as long as the message_id_fn produces the same id for them. - /// - /// The function takes a [`RawMessage`] as input and outputs a String to be - /// interpreted as the fast message id. Default is None. - pub fn fast_message_id(&self, message: &RawMessage) -> Option { - self.fast_message_id_fn - .as_ref() - .map(|fast_message_id_fn| fast_message_id_fn(message)) - } - /// By default, gossipsub will reject messages that are sent to us that have the same message /// source as we have specified locally. Enabling this, allows these messages and prevents /// penalizing the peer that sent us the message. Default is false. @@ -415,7 +400,6 @@ impl Default for ConfigBuilder { .push_str(&message.sequence_number.unwrap_or_default().to_string()); MessageId::from(source_string) }), - fast_message_id_fn: None, allow_self_origin: false, do_px: false, prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented. @@ -634,22 +618,6 @@ impl ConfigBuilder { self } - /// A user-defined optional function that computes fast ids from raw messages. This can be used - /// to avoid possibly expensive transformations from [`RawMessage`] to - /// [`Message`] for duplicates. Two semantically different messages must always - /// have different fast message ids, but it is allowed that two semantically identical messages - /// have different fast message ids as long as the message_id_fn produces the same id for them. - /// - /// The function takes a [`Message`] as input and outputs a String to be interpreted - /// as the fast message id. Default is None. - pub fn fast_message_id_fn(&mut self, fast_id_fn: F) -> &mut Self - where - F: Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static, - { - self.config.fast_message_id_fn = Some(Arc::new(fast_id_fn)); - self - } - /// Enables Peer eXchange. This should be enabled in bootstrappers and other well /// connected/trusted nodes. The default is false. /// diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index fdc5c05ac76..1d70febf296 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -171,7 +171,7 @@ pub use self::subscription_filter::{ }; pub use self::topic::{Hasher, Topic, TopicHash}; pub use self::transform::{DataTransform, IdentityTransform}; -pub use self::types::{FastMessageId, Message, MessageAcceptance, MessageId, RawMessage, Rpc}; +pub use self::types::{Message, MessageAcceptance, MessageId, RawMessage, Rpc}; pub type IdentTopic = Topic; pub type Sha256Topic = Topic; diff --git a/protocols/gossipsub/src/time_cache.rs b/protocols/gossipsub/src/time_cache.rs index ffc95a474f4..89fd4afee09 100644 --- a/protocols/gossipsub/src/time_cache.rs +++ b/protocols/gossipsub/src/time_cache.rs @@ -93,12 +93,6 @@ impl<'a, K: 'a, V: 'a> Entry<'a, K, V> where K: Eq + std::hash::Hash + Clone, { - pub(crate) fn or_insert_with V>(self, default: F) -> &'a mut V { - match self { - Entry::Occupied(entry) => entry.into_mut(), - Entry::Vacant(entry) => entry.insert(default()), - } - } pub(crate) fn or_default(self) -> &'a mut V where V: Default, @@ -159,10 +153,6 @@ where pub(crate) fn contains_key(&self, key: &Key) -> bool { self.map.contains_key(key) } - - pub(crate) fn get(&self, key: &Key) -> Option<&Value> { - self.map.get(key).map(|e| &e.element) - } } pub(crate) struct DuplicateCache(TimeCache); diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index f1865635454..196468b8d32 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -43,49 +43,33 @@ pub enum MessageAcceptance { Ignore, } -/// Macro for declaring message id types -macro_rules! declare_message_id_type { - ($name: ident, $name_string: expr) => { - #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] - #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] - pub struct $name(pub Vec); - - impl $name { - pub fn new(value: &[u8]) -> Self { - Self(value.to_vec()) - } - } - - impl>> From for $name { - fn from(value: T) -> Self { - Self(value.into()) - } - } +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct MessageId(pub Vec); - impl std::fmt::Display for $name { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", hex_fmt::HexFmt(&self.0)) - } - } +impl MessageId { + pub fn new(value: &[u8]) -> Self { + Self(value.to_vec()) + } +} - impl std::fmt::Debug for $name { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}({})", $name_string, hex_fmt::HexFmt(&self.0)) - } - } - }; +impl>> From for MessageId { + fn from(value: T) -> Self { + Self(value.into()) + } } -// A type for gossipsub message ids. -declare_message_id_type!(MessageId, "MessageId"); +impl std::fmt::Display for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex_fmt::HexFmt(&self.0)) + } +} -// A type for gossipsub fast messsage ids, not to confuse with "real" message ids. -// -// A fast-message-id is an optional message_id that can be used to filter duplicates quickly. On -// high intensive networks with lots of messages, where the message_id is based on the result of -// decompressed traffic, it is beneficial to specify a `fast-message-id` that can identify and -// filter duplicates quickly without performing the overhead of decompression. -declare_message_id_type!(FastMessageId, "FastMessageId"); +impl std::fmt::Debug for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0)) + } +} #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct PeerConnections {