diff --git a/Cargo.lock b/Cargo.lock index 7574ea76ac7..d4d274e7f71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3558,6 +3558,8 @@ dependencies = [ "serde", "serde_with", "sha2 0.10.8", + "strum 0.25.0", + "strum_macros 0.25.3", "thiserror", "tokio", "tracing", diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index fbc9f32afb4..872835f4da8 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -178,7 +178,7 @@ impl Bootstrap { if request_message == RequestMessage::TxPoolAllTransactionsIds { let _ = bootstrap.send_response_msg( request_id, - ResponseMessage::TxPoolAllTransactionsIds(Some(vec![])), + ResponseMessage::TxPoolAllTransactionsIds(Ok(vec![])), ); } } diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index 5455e83c5f3..09662d26311 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -48,6 +48,8 @@ rayon = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_with = { workspace = true } sha2 = "0.10" +strum = { workspace = true } +strum_macros = { workspace = true } thiserror = "1.0.47" tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 2b689eb3949..9929dabbf46 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -112,15 +112,16 @@ impl FuelBehaviour { BlockHeight::default(), ); - let req_res_protocol = - core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full)); + let req_res_protocol = codec + .get_req_res_protocols() + .map(|protocol| (protocol, ProtocolSupport::Full)); let req_res_config = request_response::Config::default() .with_request_timeout(p2p_config.set_request_timeout) .with_max_concurrent_streams(p2p_config.max_concurrent_streams); let request_response = request_response::Behaviour::with_codec( - codec, + codec.clone(), req_res_protocol, req_res_config, ); diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index c22aacd5671..283fef1ee6c 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -41,5 +41,7 @@ pub trait NetworkCodec: { /// Returns RequestResponse's Protocol /// Needed for initialization of RequestResponse Behaviour - fn get_req_res_protocol(&self) -> ::Protocol; + fn get_req_res_protocols( + &self, + ) -> impl Iterator::Protocol>; } diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 94f23cd6fd2..4fea7484786 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -9,9 +9,11 @@ use crate::{ GossipsubMessage, }, request_response::messages::{ + LegacyResponseMessage, RequestMessage, ResponseMessage, REQUEST_RESPONSE_PROTOCOL_ID, + REQUEST_RESPONSE_WITH_ERROR_CODES_PROTOCOL_ID, }, }; use async_trait::async_trait; @@ -26,6 +28,8 @@ use serde::{ Serialize, }; use std::io; +use strum::IntoEnumIterator; +use strum_macros::EnumIter; /// Helper method for decoding data /// Reusable across `RequestResponseCodec` and `GossipsubCodec` @@ -69,13 +73,13 @@ impl PostcardCodec { /// run into a timeout waiting for the response. #[async_trait] impl request_response::Codec for PostcardCodec { - type Protocol = MessageExchangePostcardProtocol; + type Protocol = PostcardProtocol; type Request = RequestMessage; type Response = ResponseMessage; async fn read_request( &mut self, - _: &Self::Protocol, + _protocol: &Self::Protocol, socket: &mut T, ) -> io::Result where @@ -91,7 +95,7 @@ impl request_response::Codec for PostcardCodec { async fn read_response( &mut self, - _: &Self::Protocol, + protocol: &Self::Protocol, socket: &mut T, ) -> io::Result where @@ -103,7 +107,13 @@ impl request_response::Codec for PostcardCodec { .read_to_end(&mut response) .await?; - deserialize(&response) + match protocol { + PostcardProtocol::V1 => { + let legacy_response = deserialize::(&response)?; + Ok(legacy_response.into()) + } + PostcardProtocol::V2 => deserialize::(&response), + } } async fn write_request( @@ -122,14 +132,20 @@ impl request_response::Codec for PostcardCodec { async fn write_response( &mut self, - _protocol: &Self::Protocol, + protocol: &Self::Protocol, socket: &mut T, res: Self::Response, ) -> io::Result<()> where T: futures::AsyncWrite + Unpin + Send, { - let encoded_data = serialize(&res)?; + let encoded_data = match protocol { + PostcardProtocol::V1 => { + let legacy_response: LegacyResponseMessage = res.into(); + serialize(&legacy_response)? + } + PostcardProtocol::V2 => serialize(&res)?, + }; socket.write_all(&encoded_data).await?; Ok(()) } @@ -161,17 +177,31 @@ impl GossipsubCodec for PostcardCodec { } impl NetworkCodec for PostcardCodec { - fn get_req_res_protocol(&self) -> ::Protocol { - MessageExchangePostcardProtocol {} + fn get_req_res_protocols( + &self, + ) -> impl Iterator::Protocol> { + PostcardProtocol::iter() } } -#[derive(Default, Debug, Clone)] -pub struct MessageExchangePostcardProtocol; +#[derive(Debug, Clone, EnumIter)] +pub enum PostcardProtocol { + V1, + V2, +} -impl AsRef for MessageExchangePostcardProtocol { +impl Default for PostcardProtocol { + fn default() -> Self { + PostcardProtocol::V1 + } +} + +impl AsRef for PostcardProtocol { fn as_ref(&self) -> &str { - REQUEST_RESPONSE_PROTOCOL_ID + match self { + PostcardProtocol::V1 => REQUEST_RESPONSE_PROTOCOL_ID, + PostcardProtocol::V2 => REQUEST_RESPONSE_WITH_ERROR_CODES_PROTOCOL_ID, + } } } diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 024cc006785..556c7b1de29 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -675,7 +675,8 @@ impl FuelP2PService { let send_ok = match channel { ResponseSender::SealedHeaders(c) => match response { ResponseMessage::SealedHeaders(v) => { - c.send((peer, Ok(v))).is_ok() + // TODO[AC]: Change type of ResponseSender and remove the .ok() here + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -687,7 +688,7 @@ impl FuelP2PService { }, ResponseSender::Transactions(c) => match response { ResponseMessage::Transactions(v) => { - c.send((peer, Ok(v))).is_ok() + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -699,7 +700,7 @@ impl FuelP2PService { }, ResponseSender::TxPoolAllTransactionsIds(c) => match response { ResponseMessage::TxPoolAllTransactionsIds(v) => { - c.send((peer, Ok(v))).is_ok() + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -711,7 +712,7 @@ impl FuelP2PService { }, ResponseSender::TxPoolFullTransactions(c) => match response { ResponseMessage::TxPoolFullTransactions(v) => { - c.send((peer, Ok(v))).is_ok() + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -1778,16 +1779,16 @@ mod tests { RequestMessage::SealedHeaders(range) => { let sealed_headers: Vec<_> = arbitrary_headers_for_range(range.clone()); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Ok(sealed_headers))); } RequestMessage::Transactions(_) => { let txs = (0..5).map(|_| Transaction::default_test_tx()).collect(); let transactions = vec![Transactions(txs)]; - let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Some(transactions))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Ok(transactions))); } RequestMessage::TxPoolAllTransactionsIds => { let tx_ids = (0..5).map(|_| Transaction::default_test_tx().id(&ChainId::new(1))).collect(); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolAllTransactionsIds(Some(tx_ids))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolAllTransactionsIds(Ok(tx_ids))); } RequestMessage::TxPoolFullTransactions(tx_ids) => { let txs = tx_ids.iter().enumerate().map(|(i, _)| { @@ -1797,7 +1798,7 @@ mod tests { Some(NetworkableTransactionPool::Transaction(Transaction::default_test_tx())) } }).collect(); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolFullTransactions(Some(txs))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolFullTransactions(Ok(txs))); } } } @@ -1905,7 +1906,7 @@ mod tests { // 2. Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator if let Some(FuelP2PEvent::InboundRequestMessage{ request_id, request_message: _ }) = &node_b_event { let sealed_headers: Vec<_> = arbitrary_headers_for_range(1..3); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Ok(sealed_headers))); } tracing::info!("Node B Event: {:?}", node_b_event); diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 83f3f7a3a50..ef0d49b69a2 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -19,6 +19,8 @@ use thiserror::Error; use tokio::sync::oneshot; pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; +pub(crate) const REQUEST_RESPONSE_WITH_ERROR_CODES_PROTOCOL_ID: &str = + "/fuel/req_res/0.0.2"; /// Max Size in Bytes of the Request Message #[cfg(test)] @@ -32,14 +34,77 @@ pub enum RequestMessage { TxPoolFullTransactions(Vec), } +// TODO: Do we want explicit status codes or an Error type? +#[derive(Error, Debug, Clone, Serialize, Deserialize)] +pub enum ResponseMessageErrorCode { + /// The peer sent an empty response using protocol `/fuel/req_res/0.0.1` + #[error("Empty response sent by peer using legacy protocol /fuel/req_res/0.0.1")] + ProtocolV1EmptyResponse = 0, +} + #[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ResponseMessage { +pub enum LegacyResponseMessage { SealedHeaders(Option>), Transactions(Option>), TxPoolAllTransactionsIds(Option>), TxPoolFullTransactions(Option>>), } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ResponseMessage { + SealedHeaders(Result, ResponseMessageErrorCode>), + Transactions(Result, ResponseMessageErrorCode>), + TxPoolAllTransactionsIds(Result, ResponseMessageErrorCode>), + TxPoolFullTransactions( + Result>, ResponseMessageErrorCode>, + ), +} + +impl From for ResponseMessage { + fn from(v1_response: LegacyResponseMessage) -> Self { + match v1_response { + LegacyResponseMessage::SealedHeaders(sealed_headers) => { + ResponseMessage::SealedHeaders( + sealed_headers + .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + LegacyResponseMessage::Transactions(vec) => ResponseMessage::Transactions( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ), + LegacyResponseMessage::TxPoolAllTransactionsIds(vec) => { + ResponseMessage::TxPoolAllTransactionsIds( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + LegacyResponseMessage::TxPoolFullTransactions(vec) => { + ResponseMessage::TxPoolFullTransactions( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + } + } +} + +impl From for LegacyResponseMessage { + fn from(response: ResponseMessage) -> Self { + match response { + ResponseMessage::SealedHeaders(sealed_headers) => { + LegacyResponseMessage::SealedHeaders(sealed_headers.ok()) + } + ResponseMessage::Transactions(transactions) => { + LegacyResponseMessage::Transactions(transactions.ok()) + } + ResponseMessage::TxPoolAllTransactionsIds(tx_ids) => { + LegacyResponseMessage::TxPoolAllTransactionsIds(tx_ids.ok()) + } + ResponseMessage::TxPoolFullTransactions(tx_pool) => { + LegacyResponseMessage::TxPoolFullTransactions(tx_pool.ok()) + } + } + } +} + pub type OnResponse = oneshot::Sender<(PeerId, Result)>; #[derive(Debug)] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 3ff09da0beb..22f7d50c778 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -23,6 +23,7 @@ use crate::{ OnResponse, RequestMessage, ResponseMessage, + ResponseMessageErrorCode, ResponseSender, }, }; @@ -136,19 +137,20 @@ pub enum TaskRequest { reporting_service: &'static str, }, DatabaseTransactionsLookUp { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, DatabaseHeaderLookUp { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, TxPoolAllTransactionsIds { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, TxPoolFullTransactions { - response: Option>>, + response: + Result>, ResponseMessageErrorCode>, request_id: InboundRequestId, }, } @@ -532,8 +534,11 @@ where where DbLookUpFn: Fn(&V::LatestView, Range) -> anyhow::Result> + Send + 'static, - ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, - TaskRequestFn: Fn(Option, InboundRequestId) -> TaskRequest + Send + 'static, + ResponseSenderFn: + Fn(Result) -> ResponseMessage + Send + 'static, + TaskRequestFn: Fn(Result, InboundRequestId) -> TaskRequest + + Send + + 'static, R: Send + 'static, { let instant = Instant::now(); @@ -550,7 +555,8 @@ where "Requested range is too big" ); // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 - let response = None; + // TODO[AC] Use more meaningful error codes + let response = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service .send_response_msg(request_id, response_sender(response)); @@ -564,17 +570,23 @@ where return; } - let response = db_lookup(&view, range.clone()).ok().flatten(); + // TODO[AC] Assign an error code to this + let response = db_lookup(&view, range.clone()) + .ok() + .flatten() + .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = response_channel .try_send(task_request(response, request_id)) .trace_err("Failed to send response to the request channel"); }); + // TODO[AC]: Handle error cases and return meaningful status codes if result.is_err() { + let err = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service - .send_response_msg(request_id, response_sender(None)); + .send_response_msg(request_id, response_sender(err)); } Ok(()) @@ -624,8 +636,11 @@ where task_request: TaskRequestFn, ) -> anyhow::Result<()> where - ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, - TaskRequestFn: Fn(Option, InboundRequestId) -> TaskRequest + Send + 'static, + ResponseSenderFn: + Fn(Result) -> ResponseMessage + Send + 'static, + TaskRequestFn: Fn(Result, InboundRequestId) -> TaskRequest + + Send + + 'static, F: Future> + Send + 'static, { let instant = Instant::now(); @@ -644,14 +659,16 @@ where // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 let _ = response_channel - .try_send(task_request(Some(response), request_id)) + .try_send(task_request(Ok(response), request_id)) .trace_err("Failed to send response to the request channel"); }); if result.is_err() { + // TODO[AC]: return better error code + let res = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service - .send_response_msg(request_id, response_sender(None)); + .send_response_msg(request_id, response_sender(res)); } Ok(()) @@ -680,10 +697,13 @@ where request_id: InboundRequestId, ) -> anyhow::Result<()> { // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 + // TODO[AC] Use more meaningful error codes if tx_ids.len() > self.max_txs_per_request { self.p2p_service.send_response_msg( request_id, - ResponseMessage::TxPoolFullTransactions(None), + ResponseMessage::TxPoolFullTransactions(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )), )?; return Ok(()); }