diff --git a/CHANGELOG.md b/CHANGELOG.md index 17329a646aa..318f3d35c06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message. - [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce custom enum for representing result of running service tasks - [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently. +- [2388](https://github.com/FuelLabs/fuel-core/pull/2388): Rework the P2P service codecs to avoid unnecessary coupling between components. The refactoring makes it explicit that the Gossipsub and RequestResponse codecs only share encoding/decoding functionalities from the Postcard codec. It also makes handling Gossipsub and RequestResponse messages completely independent of each other. #### Breaking - [2389](https://github.com/FuelLabs/fuel-core/pull/2258): Updated the `messageProof` GraphQL schema to return a non-nullable `MessageProof`. diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index 3a5f6e78009..211df26030e 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -23,7 +23,10 @@ use fuel_core_chain_config::{ StateConfig, }; use fuel_core_p2p::{ - codecs::postcard::PostcardCodec, + codecs::{ + gossipsub::GossipsubMessageHandler, + request_response::RequestResponseMessageHandler, + }, network_service::FuelP2PService, p2p_service::FuelP2PEvent, request_response::messages::{ @@ -142,10 +145,18 @@ impl Bootstrap { /// Spawn a bootstrap node. pub async fn new(node_config: &Config) -> anyhow::Result { let bootstrap_config = extract_p2p_config(node_config).await; - let codec = PostcardCodec::new(bootstrap_config.max_block_size); + let request_response_codec = + RequestResponseMessageHandler::new(bootstrap_config.max_block_size); + let gossipsub_codec = GossipsubMessageHandler::new(); let (sender, _) = broadcast::channel(bootstrap_config.reserved_nodes.len().saturating_add(1)); - let mut bootstrap = FuelP2PService::new(sender, bootstrap_config, codec).await?; + let mut bootstrap = FuelP2PService::new( + sender, + bootstrap_config, + gossipsub_codec, + request_response_codec, + ) + .await?; bootstrap.start().await?; let listeners = bootstrap.multiaddrs(); diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 9fe58c5dec5..0e194c88442 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -1,7 +1,8 @@ use crate::{ codecs::{ postcard::PostcardCodec, - NetworkCodec, + request_response::RequestResponseMessageHandler, + RequestResponseProtocols, }, config::Config, discovery, @@ -59,11 +60,15 @@ pub struct FuelBehaviour { discovery: discovery::Behaviour, /// RequestResponse protocol - request_response: request_response::Behaviour, + request_response: + request_response::Behaviour>, } impl FuelBehaviour { - pub(crate) fn new(p2p_config: &Config, codec: PostcardCodec) -> anyhow::Result { + pub(crate) fn new( + p2p_config: &Config, + request_response_codec: RequestResponseMessageHandler, + ) -> anyhow::Result { let local_public_key = p2p_config.keypair.public(); let local_peer_id = PeerId::from_public_key(&local_public_key); @@ -110,7 +115,7 @@ impl FuelBehaviour { BlockHeight::default(), ); - let req_res_protocol = codec + let req_res_protocol = request_response_codec .get_req_res_protocols() .map(|protocol| (protocol, ProtocolSupport::Full)); @@ -119,7 +124,7 @@ impl FuelBehaviour { .with_max_concurrent_streams(p2p_config.max_concurrent_streams); let request_response = request_response::Behaviour::with_codec( - codec.clone(), + request_response_codec.clone(), req_res_protocol, req_res_config, ); diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 505cf40c9bf..d9b0e3444a7 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -1,18 +1,60 @@ +pub mod gossipsub; pub mod postcard; +pub mod request_response; -use crate::{ - gossipsub::messages::{ - GossipTopicTag, - GossipsubBroadcastRequest, - GossipsubMessage, - }, - request_response::messages::{ - RequestMessage, - V2ResponseMessage, - }, +use crate::gossipsub::messages::GossipTopicTag; +use libp2p::request_response as libp2p_request_response; + +use std::{ + borrow::Cow, + io, }; -use libp2p::request_response; -use std::io; + +// TODO: https://github.com/FuelLabs/fuel-core/issues/2403 +// This trait is largely a copy-paste from the storage crate. +// It would be best to have this trait in a separate crate that both storage and p2p can depend on. +pub trait Encoder: Send { + /// Returns the serialized object as a slice. + fn as_bytes(&self) -> Cow<[u8]>; +} + +/// The trait encodes the type to the bytes and passes it to the `Encoder`, +/// which stores it and provides a reference to it. That allows gives more +/// flexibility and more performant encoding, allowing the use of slices and arrays +/// instead of vectors in some cases. Since the [`Encoder`] returns `Cow<[u8]>`, +/// it is always possible to take ownership of the serialized value. +// TODO: https://github.com/FuelLabs/fuel-core/issues/2403 +// This trait is largely a copy-paste from the storage crate. +// It would be best to have this trait in a separate crate that both storage and p2p can depend on. +pub trait Encode { + type Error; + /// The encoder type that stores serialized object. + type Encoder<'a>: Encoder + where + T: 'a; + + /// Encodes the object to the bytes and passes it to the `Encoder`. + fn encode<'a>(&self, t: &'a T) -> Result, Self::Error>; +} + +/// The trait decodes the type from the bytes. +// TODO: https://github.com/FuelLabs/fuel-core/issues/2403 +// This trait is largely a copy-paste from the storage crate. +// It would be best to have this trait in a separate crate that both storage and p2p can depend on. +pub trait Decode { + type Error; + /// Decodes the type `T` from the bytes. + fn decode(&self, bytes: &[u8]) -> Result; +} + +impl<'a> Encoder for Cow<'a, [u8]> { + fn as_bytes(&self) -> Cow<'_, [u8]> { + match self { + Cow::Borrowed(borrowed) => Cow::Borrowed(borrowed), + Cow::Owned(owned) => Cow::Borrowed(owned.as_ref()), + } + } +} /// Implement this in order to handle serialization & deserialization of Gossipsub messages pub trait GossipsubCodec { @@ -28,22 +70,10 @@ pub trait GossipsubCodec { ) -> Result; } -// TODO: https://github.com/FuelLabs/fuel-core/issues/2368 -// Remove this trait -/// Main Codec trait -/// Needs to be implemented and provided to FuelBehaviour -pub trait NetworkCodec: - GossipsubCodec< - RequestMessage = GossipsubBroadcastRequest, - ResponseMessage = GossipsubMessage, - > + request_response::Codec - + Clone - + Send - + 'static -{ +pub trait RequestResponseProtocols: libp2p_request_response::Codec { /// Returns RequestResponse's Protocol /// Needed for initialization of RequestResponse Behaviour fn get_req_res_protocols( &self, - ) -> impl Iterator::Protocol>; + ) -> impl Iterator::Protocol>; } diff --git a/crates/services/p2p/src/codecs/gossipsub.rs b/crates/services/p2p/src/codecs/gossipsub.rs new file mode 100644 index 00000000000..e5bab789134 --- /dev/null +++ b/crates/services/p2p/src/codecs/gossipsub.rs @@ -0,0 +1,53 @@ +use std::io; + +use fuel_core_types::fuel_tx::Transaction; + +use crate::gossipsub::messages::{ + GossipTopicTag, + GossipsubBroadcastRequest, + GossipsubMessage, +}; + +use super::{ + Decode, + Encode, + Encoder, + GossipsubCodec, +}; + +#[derive(Debug, Clone, Default)] +pub struct GossipsubMessageHandler { + pub(crate) codec: Codec, +} + +impl GossipsubCodec for GossipsubMessageHandler +where + Codec: Encode + + Decode + + Send, +{ + type RequestMessage = GossipsubBroadcastRequest; + type ResponseMessage = GossipsubMessage; + + fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { + match data { + GossipsubBroadcastRequest::NewTx(tx) => { + Ok(self.codec.encode(&tx)?.as_bytes().into_owned()) + } + } + } + + fn decode( + &self, + encoded_data: &[u8], + gossipsub_tag: GossipTopicTag, + ) -> Result { + let decoded_response = match gossipsub_tag { + GossipTopicTag::NewTx => { + GossipsubMessage::NewTx(self.codec.decode(encoded_data)?) + } + }; + + Ok(decoded_response) + } +} diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index e2b7a97f9b1..615b2a4e0e8 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -1,206 +1,63 @@ use super::{ - GossipsubCodec, - NetworkCodec, + gossipsub::GossipsubMessageHandler, + request_response::RequestResponseMessageHandler, + Decode, + Encode, }; -use crate::{ - gossipsub::messages::{ - GossipTopicTag, - GossipsubBroadcastRequest, - GossipsubMessage, - }, - request_response::messages::{ - RequestMessage, - V1ResponseMessage, - V2ResponseMessage, - V1_REQUEST_RESPONSE_PROTOCOL_ID, - V2_REQUEST_RESPONSE_PROTOCOL_ID, - }, -}; -use async_trait::async_trait; -use futures::{ - AsyncRead, - AsyncReadExt, - AsyncWriteExt, -}; -use libp2p::request_response; -use serde::{ - Deserialize, - Serialize, -}; -use std::io; -use strum::IntoEnumIterator; -use strum_macros::EnumIter; - -/// Helper method for decoding data -/// Reusable across `RequestResponseCodec` and `GossipsubCodec` -fn deserialize<'a, R: Deserialize<'a>>(encoded_data: &'a [u8]) -> Result { - postcard::from_bytes(encoded_data) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) -} -fn serialize(data: &D) -> Result, io::Error> { - postcard::to_stdvec(&data) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) -} +use std::{ + borrow::Cow, + io, +}; -#[derive(Debug, Clone)] -pub struct PostcardCodec { - /// Used for `max_size` parameter when reading Response Message - /// Necessary in order to avoid DoS attacks - /// Currently the size mostly depends on the max size of the Block - max_response_size: usize, -} +#[derive(Clone, Default)] +pub struct PostcardCodec; -impl PostcardCodec { +impl RequestResponseMessageHandler { pub fn new(max_block_size: usize) -> Self { assert_ne!( max_block_size, 0, - "PostcardCodec does not support zero block size" + "RequestResponseMessageHandler does not support zero block size" ); Self { + codec: PostcardCodec, max_response_size: max_block_size, } } } -/// Since Postcard does not support async reads or writes out of the box -/// We prefix Request & Response Messages with the length of the data in bytes -/// We expect the substream to be properly closed when response channel is dropped. -/// Since the request protocol used here expects a response, the sender considers this -/// early close as a protocol violation which results in the connection being closed. -/// If the substream was not properly closed when dropped, the sender would instead -/// run into a timeout waiting for the response. -#[async_trait] -impl request_response::Codec for PostcardCodec { - type Protocol = PostcardProtocol; - type Request = RequestMessage; - type Response = V2ResponseMessage; - - async fn read_request( - &mut self, - _protocol: &Self::Protocol, - socket: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut response = Vec::new(); - socket - .take(self.max_response_size as u64) - .read_to_end(&mut response) - .await?; - deserialize(&response) - } - - async fn read_response( - &mut self, - protocol: &Self::Protocol, - socket: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut response = Vec::new(); - socket - .take(self.max_response_size as u64) - .read_to_end(&mut response) - .await?; - - match protocol { - PostcardProtocol::V1 => { - let v1_response = deserialize::(&response)?; - Ok(v1_response.into()) - } - PostcardProtocol::V2 => deserialize::(&response), +impl GossipsubMessageHandler { + pub fn new() -> Self { + GossipsubMessageHandler { + codec: PostcardCodec, } } - - async fn write_request( - &mut self, - _protocol: &Self::Protocol, - socket: &mut T, - req: Self::Request, - ) -> io::Result<()> - where - T: futures::AsyncWrite + Unpin + Send, - { - let encoded_data = serialize(&req)?; - socket.write_all(&encoded_data).await?; - Ok(()) - } - - async fn write_response( - &mut self, - protocol: &Self::Protocol, - socket: &mut T, - res: Self::Response, - ) -> io::Result<()> - where - T: futures::AsyncWrite + Unpin + Send, - { - let encoded_data = match protocol { - PostcardProtocol::V1 => { - let v1_response: V1ResponseMessage = res.into(); - serialize(&v1_response)? - } - PostcardProtocol::V2 => serialize(&res)?, - }; - socket.write_all(&encoded_data).await?; - Ok(()) - } } -impl GossipsubCodec for PostcardCodec { - type RequestMessage = GossipsubBroadcastRequest; - type ResponseMessage = GossipsubMessage; - - fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { - let encoded_data = match data { - GossipsubBroadcastRequest::NewTx(tx) => postcard::to_stdvec(&*tx), - }; - - encoded_data.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) - } - - fn decode( - &self, - encoded_data: &[u8], - gossipsub_tag: GossipTopicTag, - ) -> Result { - let decoded_response = match gossipsub_tag { - GossipTopicTag::NewTx => GossipsubMessage::NewTx(deserialize(encoded_data)?), - }; - - Ok(decoded_response) +impl Encode for PostcardCodec +where + T: ?Sized + serde::Serialize, +{ + type Encoder<'a> = Cow<'a, [u8]> where T: 'a; + type Error = io::Error; + + fn encode<'a>(&self, value: &'a T) -> Result, Self::Error> { + Ok(Cow::Owned(postcard::to_allocvec(value).map_err(|e| { + io::Error::new(io::ErrorKind::Other, e.to_string()) + })?)) } } -impl NetworkCodec for PostcardCodec { - fn get_req_res_protocols( - &self, - ) -> impl Iterator::Protocol> { - // TODO: https://github.com/FuelLabs/fuel-core/issues/2374 - // Iterating over versions in reverse order should prefer - // peers to use V2 over V1 for exchanging messages. However, this is - // not guaranteed by the specs for the `request_response` protocol, - // and it should be tested. - PostcardProtocol::iter().rev() - } -} +impl Decode for PostcardCodec +where + T: serde::de::DeserializeOwned, +{ + type Error = io::Error; -#[derive(Debug, Clone, EnumIter)] -pub enum PostcardProtocol { - V1, - V2, -} - -impl AsRef for PostcardProtocol { - fn as_ref(&self) -> &str { - match self { - PostcardProtocol::V1 => V1_REQUEST_RESPONSE_PROTOCOL_ID, - PostcardProtocol::V2 => V2_REQUEST_RESPONSE_PROTOCOL_ID, - } + fn decode(&self, bytes: &[u8]) -> Result { + postcard::from_bytes(bytes) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) } } @@ -208,12 +65,21 @@ impl AsRef for PostcardProtocol { #[allow(non_snake_case)] mod tests { use fuel_core_types::blockchain::SealedBlockHeader; - use request_response::Codec as _; + use libp2p::request_response::Codec; use super::*; - use crate::request_response::messages::{ - ResponseMessageErrorCode, - MAX_REQUEST_SIZE, + use crate::{ + codecs::request_response::RequestResponseMessageHandler, + request_response::{ + messages::{ + RequestMessage, + ResponseMessageErrorCode, + V1ResponseMessage, + V2ResponseMessage, + MAX_REQUEST_SIZE, + }, + protocols::RequestResponseProtocol, + }, }; #[test] @@ -229,17 +95,18 @@ mod tests { // Given let sealed_block_headers = vec![SealedBlockHeader::default()]; let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); - let mut codec = PostcardCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); // When codec - .write_response(&PostcardProtocol::V2, &mut buf, response) + .write_response(&RequestResponseProtocol::V2, &mut buf, response) .await .expect("Valid Vec should be serialized using v1"); let deserialized = codec - .read_response(&PostcardProtocol::V2, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V2, &mut buf.as_slice()) .await .expect("Valid Vec should be deserialized using v1"); @@ -256,17 +123,18 @@ mod tests { // Given let sealed_block_headers = vec![SealedBlockHeader::default()]; let response = V2ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); - let mut codec = PostcardCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); // When codec - .write_response(&PostcardProtocol::V1, &mut buf, response) + .write_response(&RequestResponseProtocol::V1, &mut buf, response) .await .expect("Valid Vec should be serialized using v1"); let deserialized = codec - .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V1, &mut buf.as_slice()) .await .expect("Valid Vec should be deserialized using v1"); @@ -283,17 +151,18 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec = PostcardCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); // When codec - .write_response(&PostcardProtocol::V2, &mut buf, response.clone()) + .write_response(&RequestResponseProtocol::V2, &mut buf, response.clone()) .await .expect("Valid Vec is serialized using v1"); let deserialized = codec - .read_response(&PostcardProtocol::V2, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V2, &mut buf.as_slice()) .await .expect("Valid Vec is deserialized using v1"); @@ -313,17 +182,18 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::RequestedRangeTooLarge, )); - let mut codec = PostcardCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); // When codec - .write_response(&PostcardProtocol::V1, &mut buf, response.clone()) + .write_response(&RequestResponseProtocol::V1, &mut buf, response.clone()) .await .expect("Valid Vec is serialized using v1"); let deserialized = codec - .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V1, &mut buf.as_slice()) .await .expect("Valid Vec is deserialized using v1"); @@ -342,19 +212,20 @@ mod tests { let response = V2ResponseMessage::SealedHeaders(Err( ResponseMessageErrorCode::ProtocolV1EmptyResponse, )); - let mut codec = PostcardCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); let mut buf = Vec::with_capacity(1024); // When codec - .write_response(&PostcardProtocol::V1, &mut buf, response.clone()) + .write_response(&RequestResponseProtocol::V1, &mut buf, response.clone()) .await .expect("Valid Vec is serialized using v1"); let deserialized_as_v1 = // We cannot access the codec trait from an old node here, // so we deserialize directly using the `V1ResponseMessage` type. - deserialize::(&buf).expect("Deserialization as V1ResponseMessage should succeed"); + codec.codec.decode(&buf).expect("Deserialization as V1ResponseMessage should succeed"); // Then assert!(matches!( @@ -367,13 +238,16 @@ mod tests { async fn codec__read_response_is_backwards_compatible_with_v1() { // Given let response = V1ResponseMessage::SealedHeaders(None); - let mut codec = PostcardCodec::new(1024); + let mut codec: RequestResponseMessageHandler = + RequestResponseMessageHandler::new(1024); // When - let buf = serialize(&response) + let buf = codec + .codec + .encode(&response) .expect("Serialization as V1ResponseMessage should succeed"); let deserialized = codec - .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .read_response(&RequestResponseProtocol::V1, &mut &*buf) .await .expect("Valid Vec is deserialized using v1"); diff --git a/crates/services/p2p/src/codecs/request_response.rs b/crates/services/p2p/src/codecs/request_response.rs new file mode 100644 index 00000000000..5afb2c0557c --- /dev/null +++ b/crates/services/p2p/src/codecs/request_response.rs @@ -0,0 +1,153 @@ +use std::io; + +use crate::request_response::{ + messages::{ + RequestMessage, + V1ResponseMessage, + V2ResponseMessage, + }, + protocols::RequestResponseProtocol, +}; +use async_trait::async_trait; +use futures::{ + AsyncRead, + AsyncReadExt, + AsyncWriteExt, +}; +use libp2p::request_response; +use strum::IntoEnumIterator as _; + +use super::{ + Decode, + Encode, + Encoder, + RequestResponseProtocols, +}; + +#[derive(Debug, Clone)] +pub struct RequestResponseMessageHandler { + pub(crate) codec: Codec, + /// Used for `max_size` parameter when reading Response Message + /// Necessary in order to avoid DoS attacks + /// Currently the size mostly depends on the max size of the Block + // TODO: https://github.com/FuelLabs/fuel-core/issues/2459. + // Make this a u64 instead of usize. + pub(crate) max_response_size: usize, +} + +/// Since Postcard does not support async reads or writes out of the box +/// We prefix Request & Response Messages with the length of the data in bytes +/// We expect the substream to be properly closed when response channel is dropped. +/// Since the request protocol used here expects a response, the sender considers this +/// early close as a protocol violation which results in the connection being closed. +/// If the substream was not properly closed when dropped, the sender would instead +/// run into a timeout waiting for the response. +#[async_trait] +impl request_response::Codec for RequestResponseMessageHandler +where + Codec: Encode + + Decode + + Encode + + Decode + + Encode + + Decode + + Send, +{ + type Protocol = RequestResponseProtocol; + type Request = RequestMessage; + type Response = V2ResponseMessage; + + async fn read_request( + &mut self, + _protocol: &Self::Protocol, + socket: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut response = Vec::new(); + socket + .take(self.max_response_size as u64) + .read_to_end(&mut response) + .await?; + self.codec.decode(&response) + } + + async fn read_response( + &mut self, + protocol: &Self::Protocol, + socket: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut response = Vec::new(); + socket + .take(self.max_response_size as u64) + .read_to_end(&mut response) + .await?; + + match protocol { + RequestResponseProtocol::V1 => { + let v1_response: V1ResponseMessage = self.codec.decode(&response)?; + Ok(v1_response.into()) + } + RequestResponseProtocol::V2 => self.codec.decode(&response), + } + } + + async fn write_request( + &mut self, + _protocol: &Self::Protocol, + socket: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: futures::AsyncWrite + Unpin + Send, + { + let encoded_data = self.codec.encode(&req)?; + socket.write_all(&encoded_data.as_bytes()).await?; + Ok(()) + } + + async fn write_response( + &mut self, + protocol: &Self::Protocol, + socket: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: futures::AsyncWrite + Unpin + Send, + { + match protocol { + RequestResponseProtocol::V1 => { + let v1_response: V1ResponseMessage = res.into(); + let res = self.codec.encode(&v1_response)?; + let res = res.as_bytes(); + socket.write_all(&res).await?; + } + RequestResponseProtocol::V2 => { + let res = self.codec.encode(&res)?; + let res = res.as_bytes(); + socket.write_all(&res).await?; + } + }; + + Ok(()) + } +} + +impl RequestResponseProtocols for Codec +where + Codec: request_response::Codec, +{ + fn get_req_res_protocols( + &self, + ) -> impl Iterator::Protocol> { + // TODO: https://github.com/FuelLabs/fuel-core/issues/2458. + // Iterating over versions in reverse order should prefer + // peers to use V2 over V1 for exchanging messages. However, this is + // not guaranteed by the specs for the `request_response` protocol. + RequestResponseProtocol::iter().rev() + } +} diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 5de17783af6..e894549bebd 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -4,7 +4,9 @@ use crate::{ FuelBehaviourEvent, }, codecs::{ + gossipsub::GossipsubMessageHandler, postcard::PostcardCodec, + request_response::RequestResponseMessageHandler, GossipsubCodec, }, config::{ @@ -121,8 +123,8 @@ pub struct FuelP2PService { /// to the peer that requested it. inbound_requests_table: HashMap>, - /// NetworkCodec used as `` for encoding and decoding of Gossipsub messages - network_codec: PostcardCodec, + /// `UboundedCodec` as GossipsubCodec for encoding and decoding of Gossipsub messages + gossipsub_codec: GossipsubMessageHandler, /// Stores additional p2p network info network_metadata: NetworkMetadata, @@ -211,7 +213,8 @@ impl FuelP2PService { pub async fn new( reserved_peers_updates: broadcast::Sender, config: Config, - codec: PostcardCodec, + gossipsub_codec: GossipsubMessageHandler, + request_response_codec: RequestResponseMessageHandler, ) -> anyhow::Result { let metrics = config.metrics; @@ -227,7 +230,7 @@ impl FuelP2PService { // configure and build P2P Service let (transport_function, connection_state) = build_transport_function(&config); let tcp_config = tcp::Config::new().port_reuse(true); - let behaviour = FuelBehaviour::new(&config, codec.clone())?; + let behaviour = FuelBehaviour::new(&config, request_response_codec)?; let swarm_builder = SwarmBuilder::with_existing_identity(config.keypair.clone()) .with_tokio() @@ -287,7 +290,7 @@ impl FuelP2PService { local_address: config.address, tcp_port: config.tcp_port, swarm, - network_codec: codec, + gossipsub_codec, outbound_requests_table: HashMap::default(), inbound_requests_table: HashMap::default(), network_metadata, @@ -381,7 +384,7 @@ impl FuelP2PService { .topics .get_gossipsub_topic_hash(&message); - match self.network_codec.encode(message) { + match self.gossipsub_codec.encode(message) { Ok(encoded_data) => self .swarm .behaviour_mut() @@ -594,7 +597,7 @@ impl FuelP2PService { message_id, } => { let correct_topic = self.get_topic_tag(&message.topic)?; - match self.network_codec.decode(&message.data, correct_topic) { + match self.gossipsub_codec.decode(&message.data, correct_topic) { Ok(decoded_message) => Some(FuelP2PEvent::GossipsubMessage { peer_id: propagation_source, message_id, @@ -847,7 +850,10 @@ mod tests { PublishError, }; use crate::{ - codecs::postcard::PostcardCodec, + codecs::{ + gossipsub::GossipsubMessageHandler, + request_response::RequestResponseMessageHandler, + }, config::Config, gossipsub::{ messages::{ @@ -929,10 +935,14 @@ mod tests { let (sender, _) = broadcast::channel(p2p_config.reserved_nodes.len().saturating_add(1)); - let mut service = - FuelP2PService::new(sender, p2p_config, PostcardCodec::new(max_block_size)) - .await - .unwrap(); + let mut service = FuelP2PService::new( + sender, + p2p_config, + GossipsubMessageHandler::new(), + RequestResponseMessageHandler::new(max_block_size), + ) + .await + .unwrap(); service.start().await.unwrap(); service } @@ -1069,7 +1079,8 @@ mod tests { let mut service = FuelP2PService::new( sender, p2p_config, - PostcardCodec::new(max_block_size), + GossipsubMessageHandler::new(), + RequestResponseMessageHandler::new(max_block_size), ) .await .unwrap(); diff --git a/crates/services/p2p/src/request_response.rs b/crates/services/p2p/src/request_response.rs index ba63992f3cb..c1ad1b047e6 100644 --- a/crates/services/p2p/src/request_response.rs +++ b/crates/services/p2p/src/request_response.rs @@ -1 +1,2 @@ pub mod messages; +pub mod protocols; diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 5a453b784cf..46fb6eeb35e 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -1,3 +1,4 @@ +use crate::service::TaskError; use fuel_core_types::{ blockchain::SealedBlockHeader, fuel_tx::TxId, @@ -18,11 +19,6 @@ use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; -use crate::service::TaskError; - -pub(crate) const V1_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; -pub(crate) const V2_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.2"; - /// Max Size in Bytes of the Request Message #[cfg(test)] pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::(); diff --git a/crates/services/p2p/src/request_response/protocols.rs b/crates/services/p2p/src/request_response/protocols.rs new file mode 100644 index 00000000000..1430f0f6f9e --- /dev/null +++ b/crates/services/p2p/src/request_response/protocols.rs @@ -0,0 +1,20 @@ +use strum_macros::EnumIter; + +pub(crate) const V1_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; +pub(crate) const V2_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.2"; + +#[derive(Debug, Default, Clone, EnumIter)] +pub enum RequestResponseProtocol { + #[default] + V1, + V2, +} + +impl AsRef for RequestResponseProtocol { + fn as_ref(&self) -> &str { + match self { + RequestResponseProtocol::V1 => V1_REQUEST_RESPONSE_PROTOCOL_ID, + RequestResponseProtocol::V2 => V2_REQUEST_RESPONSE_PROTOCOL_ID, + } + } +} diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 5a12de61abf..6f62e1fc1db 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1,6 +1,9 @@ use crate::{ cached_view::CachedView, - codecs::postcard::PostcardCodec, + codecs::{ + gossipsub::GossipsubMessageHandler, + request_response::RequestResponseMessageHandler, + }, config::{ Config, NotInitialized, @@ -824,7 +827,8 @@ where let mut p2p_service = FuelP2PService::new( broadcast.reserved_peers_broadcast.clone(), config, - PostcardCodec::new(max_block_size), + GossipsubMessageHandler::new(), + RequestResponseMessageHandler::new(max_block_size), ) .await?; p2p_service.update_block_height(last_height);