From d49ae98cbc7ce063b198927210ec5abe2c37c201 Mon Sep 17 00:00:00 2001 From: Nikita Masych <92444221+NikitaMasych@users.noreply.github.com> Date: Tue, 21 Jan 2025 17:57:32 +0200 Subject: [PATCH] refactor: request command use fallback neighbours request (#20) * refactor: added request from neighbours in request command * wip: added request-response test * fix: resolved test * refactor: upd tests * trying to fix compiler bug * fix visibility in tests * refactor: renamed OperatorLever * refactor: moved handling of request-response message to separate function --- crates/db/src/sled.rs | 1 + crates/p2p/src/events.rs | 38 +---- crates/p2p/src/swarm/mod.rs | 123 +++++++++++----- crates/p2p/tests/common.rs | 4 +- crates/p2p/tests/gossipsub.rs | 267 ++++++++++++++++++++++------------ 5 files changed, 269 insertions(+), 164 deletions(-) diff --git a/crates/db/src/sled.rs b/crates/db/src/sled.rs index 434abdd..0ae72b9 100644 --- a/crates/db/src/sled.rs +++ b/crates/db/src/sled.rs @@ -8,6 +8,7 @@ use tracing::warn; use super::{DBResult, Repository, RepositoryError}; +#[derive(Clone)] pub struct AsyncDB { pool: ThreadPool, db: Arc, diff --git a/crates/p2p/src/events.rs b/crates/p2p/src/events.rs index 76216aa..82a4db2 100644 --- a/crates/p2p/src/events.rs +++ b/crates/p2p/src/events.rs @@ -1,7 +1,5 @@ -use bitcoin::hashes::sha256; -use libp2p::PeerId; use prost::Message; -use strata_p2p_wire::p2p::v1::{GossipsubMsg, GossipsubMsgKind}; +use strata_p2p_wire::p2p::v1::GossipsubMsg; /// Events emitted from P2P to handle from operator side. /// @@ -9,40 +7,14 @@ use strata_p2p_wire::p2p::v1::{GossipsubMsg, GossipsubMsgKind}; /// applications may vary. The only requirement for them to be decodable from bytes as protobuf /// message. #[derive(Clone, Debug)] -pub struct Event { - pub peer_id: PeerId, - pub kind: EventKind, -} - -impl Event { - pub fn new(peer_id: PeerId, kind: EventKind) -> Self { - Self { peer_id, kind } - } - - pub fn scope(&self) -> Option { - // TODD(Velnbur): when other tpes of event are added, remove this one: - #[allow(irrefutable_let_patterns)] - let EventKind::GossipsubMsg(GossipsubMsg { kind, .. }) = &self.kind - else { - return None; - }; - - match kind { - GossipsubMsgKind::Deposit { scope, .. } => Some(*scope), - _ => None, - } - } -} - -#[derive(Debug, Clone)] -pub enum EventKind { - GossipsubMsg(GossipsubMsg), +pub enum Event { + ReceivedMessage(GossipsubMsg), } impl From> - for EventKind + for Event { fn from(v: GossipsubMsg) -> Self { - Self::GossipsubMsg(v) + Self::ReceivedMessage(v) } } diff --git a/crates/p2p/src/swarm/mod.rs b/crates/p2p/src/swarm/mod.rs index fd81c9c..0bddefc 100644 --- a/crates/p2p/src/swarm/mod.rs +++ b/crates/p2p/src/swarm/mod.rs @@ -33,12 +33,9 @@ use tokio::{ sync::{broadcast, mpsc}, }; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{debug, error, info, instrument}; -use crate::{ - commands::Command, - events::{Event, EventKind}, -}; +use crate::{commands::Command, events::Event}; mod behavior; mod codec; @@ -330,7 +327,7 @@ where // them something, knowing signer's (operator's) node peer id. self.db.set_peer_for_signer_pubkey(&msg.key, source).await?; - let event = Event::new(source, EventKind::GossipsubMsg(msg)); + let event = Event::from(msg); let _ = self .swarm @@ -432,20 +429,32 @@ where } Command::RequestMessage(request) => { let request_target_pubkey = request.operator_pubkey(); - let distributor_peer_id = self + + let maybe_distributor = self .db .get_peer_by_signer_pubkey(request_target_pubkey) .await?; - let Some(distributor_peer_id) = distributor_peer_id else { - warn!("Tried to sent request for operator that has no corresponding peer"); - return Ok(()); - }; + let request = request.into_msg(); + + if let Some(distributor_peer_id) = maybe_distributor { + if self.swarm.is_connected(&distributor_peer_id) { + self.swarm + .behaviour_mut() + .request_response + .send_request(&distributor_peer_id, request); + return Ok(()); + } // TODO: try to establish connection? + } + + let connected_peers = self.swarm.connected_peers().cloned().collect::>(); + for peer in connected_peers { + self.swarm + .behaviour_mut() + .request_response + .send_request(&peer, request.clone()); + } - self.swarm - .behaviour_mut() - .request_response - .send_request(&distributor_peer_id, request.into_msg()); Ok(()) } } @@ -456,34 +465,68 @@ where &mut self, event: RequestResponseEvent, ) -> P2PResult<()> { - if let RequestResponseEvent::InboundFailure { - peer, - request_id, - error, - } = event - { - debug!(%peer, %error, %request_id, "Failed to send response"); - return Ok(()); + match event { + RequestResponseEvent::Message { peer, message } => { + self.handle_message_event(peer, message).await? + } + RequestResponseEvent::OutboundFailure { + peer, + request_id, + error, + } => { + debug!(%peer, %error, %request_id, "Outbound failure") + } + RequestResponseEvent::InboundFailure { + peer, + request_id, + error, + } => { + debug!(%peer, %error, %request_id, "Inbound failure") + } + RequestResponseEvent::ResponseSent { peer, request_id } => { + debug!(%peer, %request_id, "Response sent") + } } - let RequestResponseEvent::Message { peer, message } = event else { - return Ok(()); - }; - match message { + Ok(()) + } + + async fn handle_message_event( + &mut self, + peer_id: PeerId, + msg: request_response::Message, + ) -> P2PResult<()> { + match msg { request_response::Message::Request { request_id, request, channel, } => { + let empty_response = GetMessageResponse { msg: vec![] }; + let Some(req) = v1::GetMessageRequest::from_msg(request) else { - debug!(%peer, "Peer sent invalid get message request, disconnecting it"); - let _ = self.swarm.disconnect_peer_id(peer); + debug!(%peer_id, "Peer sent invalid get message request, disconnecting it"); + let _ = self.swarm.disconnect_peer_id(peer_id); + let _ = self + .swarm + .behaviour_mut() + .request_response + .send_response(channel, empty_response) + .inspect_err(|_| debug!("Failed to send response")); + return Ok(()); }; let Some(msg) = self.handle_get_message_request(req).await? else { - debug!(%request_id, "Have no needed data, requesting from neighbours"); - return Ok(()); // TODO(NikitaMasych): launch recursive request. + debug!(%request_id, "Have no needed data"); + let _ = self + .swarm + .behaviour_mut() + .request_response + .send_response(channel, empty_response) + .inspect_err(|_| debug!("Failed to send response")); + + return Ok(()); }; let response = GetMessageResponse { msg: vec![msg] }; @@ -501,8 +544,8 @@ where response, } => { if response.msg.is_empty() { - debug!(%request_id, "Have no needed data, requesting from neighbours"); - return Ok(()); // TODO(NikitaMasych): launch recursive request. + debug!(%request_id, "Received empty response"); + return Ok(()); } for msg in response.msg.into_iter() { @@ -514,12 +557,12 @@ where let msg = match GossipsubMsg::from_proto(msg.clone()) { Ok(msg) => msg, Err(err) => { - debug!(%peer, reason=%err, "Peer sent invalid message"); + debug!(%peer_id, reason=%err, "Peer sent invalid message"); continue; } }; if let Err(err) = self.validate_gossipsub_msg(&msg) { - debug!(%peer, reason=%err, "Message failed validation"); + debug!(%peer_id, reason=%err, "Message failed validation"); continue; } @@ -602,7 +645,15 @@ where } async fn handle_get_message_response(&mut self, msg: GossipsubMsg) -> P2PResult<()> { - self.add_msg_if_not_exists(&msg).await?; + let new_event = self.add_msg_if_not_exists(&msg).await?; + + if new_event { + let event = Event::from(msg); + + self.events + .send(event) + .map_err(|e| ProtocolError::EventsChannelClosed(e.into()))?; + } Ok(()) } diff --git a/crates/p2p/tests/common.rs b/crates/p2p/tests/common.rs index 6aecc67..c4eb764 100644 --- a/crates/p2p/tests/common.rs +++ b/crates/p2p/tests/common.rs @@ -11,6 +11,7 @@ pub struct Operator { pub p2p: P2P<(), AsyncDB>, pub handle: P2PHandle<()>, pub kp: SecpKeypair, + pub db: AsyncDB, } impl Operator { @@ -35,12 +36,13 @@ impl Operator { let swarm = swarm::with_inmemory_transport(&config)?; let db = AsyncDB::new(ThreadPool::new(1), Arc::new(db)); - let (p2p, handle) = P2P::<(), AsyncDB>::from_config(config, cancel, db, swarm)?; + let (p2p, handle) = P2P::<(), AsyncDB>::from_config(config, cancel, db.clone(), swarm)?; Ok(Self { handle, p2p, kp: keypair, + db, }) } } diff --git a/crates/p2p/tests/gossipsub.rs b/crates/p2p/tests/gossipsub.rs index e48b984..1058769 100644 --- a/crates/p2p/tests/gossipsub.rs +++ b/crates/p2p/tests/gossipsub.rs @@ -12,20 +12,29 @@ use libp2p::{ }; use strata_p2p::{ commands::{Command, UnsignedPublishMessage}, - events::EventKind, + events::Event, swarm::handle::P2PHandle, }; +use strata_p2p_db::{sled::AsyncDB, GenesisInfoEntry, RepositoryExt}; use strata_p2p_types::OperatorPubKey; -use strata_p2p_wire::p2p::v1::{GossipsubMsg, GossipsubMsgDepositKind, GossipsubMsgKind}; +use strata_p2p_wire::p2p::v1::{GetMessageRequest, GossipsubMsgDepositKind, GossipsubMsgKind}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tracing::info; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; mod common; +/// Auxiliary structure to control operators from outside. +struct OperatorHandle { + handle: P2PHandle<()>, + peer_id: PeerId, + kp: SecpKeypair, + db: AsyncDB, // We include DB here to manipulate internal data and flow mechanics. +} + struct Setup { cancel: CancellationToken, - operators: Vec<(P2PHandle<()>, PeerId, SecpKeypair)>, + operators: Vec, tasks: TaskTracker, } @@ -62,12 +71,12 @@ impl Setup { operators.push(operator); } - let (handles, tasks) = Self::start_operators(operators).await; + let (operators, tasks) = Self::start_operators(operators).await; Ok(Self { cancel, tasks, - operators: handles, + operators, }) } @@ -91,10 +100,8 @@ impl Setup { /// Wait until all operators established connections with other operators, /// and then spawn [`P2P::listen`]s in separate tasks using [`TaskTracker`]. - async fn start_operators( - mut operators: Vec, - ) -> (Vec<(P2PHandle<()>, PeerId, SecpKeypair)>, TaskTracker) { - // wait until all of of them established connections and subscriptions + async fn start_operators(mut operators: Vec) -> (Vec, TaskTracker) { + // wait until all of them established connections and subscriptions join_all( operators .iter_mut() @@ -103,15 +110,22 @@ impl Setup { ) .await; - let mut handles = Vec::new(); + let mut levers = Vec::new(); let tasks = TaskTracker::new(); for operator in operators { let peer_id = operator.p2p.local_peer_id(); tasks.spawn(operator.p2p.listen()); - handles.push((operator.handle, peer_id, operator.kp)); + + levers.push(OperatorHandle { + handle: operator.handle, + peer_id, + kp: operator.kp, + db: operator.db, + }); } + tasks.close(); - (handles, tasks) + (levers, tasks) } } @@ -144,6 +158,74 @@ async fn test_all_to_all_one_scope() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 5)] +async fn test_request_response() -> anyhow::Result<()> { + const OPERATORS_NUM: usize = 4; + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let Setup { + mut operators, + cancel, + tasks, + } = Setup::all_to_all(OPERATORS_NUM).await?; + + // last operator won't send his info to others + exchange_genesis_info(&mut operators[..OPERATORS_NUM - 1], OPERATORS_NUM - 1).await?; + + // create command to request info from the last operator + let operator_pk: OperatorPubKey = operators[OPERATORS_NUM - 1].kp.public().clone().into(); + let command = Command::<()>::RequestMessage(GetMessageRequest::Genesis { + operator_pk: operator_pk.clone(), + }); + + // put data in the last operator, so that he can respond it + match mock_genesis_info(&operators[OPERATORS_NUM - 1].kp.clone()) { + Command::PublishMessage(msg) => match msg.msg { + UnsignedPublishMessage::GenesisInfo { + pre_stake_outpoint, + checkpoint_pubkeys, + } => { + let entry = GenesisInfoEntry { + entry: (pre_stake_outpoint, checkpoint_pubkeys), + signature: msg.signature, + key: msg.key, + }; + >::set_genesis_info_if_not_exists::<'_, '_>( + &operators[OPERATORS_NUM - 1].db, + entry, + ) + .await?; + } + _ => unreachable!(), + }, + _ => unreachable!(), + } + + operators[0].handle.send_command(command).await; + + let event = operators[0].handle.next_event().await?; + + match event { + Event::ReceivedMessage(msg) + if matches!(msg.kind, GossipsubMsgKind::GenesisInfo(_)) && msg.key == operator_pk => + { + info!("Got genesis info from the last operator") + } + + _ => bail!("Got event other than 'genesis_info' - {:?}", event), + } + + cancel.cancel(); + + tasks.wait().await; + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_all_to_all_multiple_scopes() -> anyhow::Result<()> { const OPERATORS_NUM: usize = 10; @@ -183,139 +265,136 @@ async fn test_all_to_all_multiple_scopes() -> anyhow::Result<()> { } async fn exchange_genesis_info( - operators: &mut [(P2PHandle<()>, PeerId, SecpKeypair)], + operators: &mut [OperatorHandle], operators_num: usize, ) -> anyhow::Result<()> { - for (operator, _, kp) in operators.iter() { - operator.send_command(mock_genesis_info(kp)).await; + for operator in operators.iter() { + operator + .handle + .send_command(mock_genesis_info(&operator.kp)) + .await; } - for (operator, peer_id, _) in operators.iter_mut() { + for operator in operators.iter_mut() { // received genesis info from other n-1 operators for _ in 0..operators_num - 1 { - let event = operator.next_event().await.unwrap(); - - if !matches!( - event.kind, - EventKind::GossipsubMsg(GossipsubMsg { - kind: GossipsubMsgKind::GenesisInfo(_), - .. - }) - ) { - bail!("Got event other than 'genesis_info' - {:?}", event); + let event = operator.handle.next_event().await?; + + match event { + Event::ReceivedMessage(msg) + if matches!(msg.kind, GossipsubMsgKind::GenesisInfo(_)) => + { + info!(to=%operator.peer_id, "Got genesis info") + } + + _ => bail!("Got event other than 'genesis_info' - {:?}", event), } - info!(to=%peer_id, from=%event.peer_id, "Got genesis info"); } - assert!(operator.events_is_empty()); + + assert!(operator.handle.events_is_empty()); } Ok(()) } async fn exchange_deposit_setup( - operators: &mut [(P2PHandle<()>, PeerId, SecpKeypair)], + operators: &mut [OperatorHandle], operators_num: usize, scope_hash: sha256::Hash, ) -> anyhow::Result<()> { - for (operator, _, kp) in operators.iter() { + for operator in operators.iter() { operator - .send_command(mock_deposit_setup(kp, scope_hash)) + .handle + .send_command(mock_deposit_setup(&operator.kp, scope_hash)) .await; } - for (operator, peer_id, _) in operators.iter_mut() { + for operator in operators.iter_mut() { for _ in 0..operators_num - 1 { - let event = operator.next_event().await.unwrap(); - // Skip messages from other scopes. - if matches!(event.scope(), Some(scope) if scope != scope_hash) { - continue; - } - if !matches!( - event.kind, - EventKind::GossipsubMsg(GossipsubMsg { - kind: GossipsubMsgKind::Deposit { - kind: GossipsubMsgDepositKind::Setup(_), - .. - }, - .. - }) - ) { - bail!("Got event other than 'deposit_setup' - {:?}", event); + let event = operator.handle.next_event().await?; + match event { + Event::ReceivedMessage(msg) + if matches!( + msg.kind, + GossipsubMsgKind::Deposit { + kind: GossipsubMsgDepositKind::Setup(_), + .. + }, + ) => + { + info!(to=%operator.peer_id, "Got deposit setup") + } + _ => bail!("Got event other than 'deposit_setup' - {:?}", event), } - info!(to=%peer_id, from=%event.peer_id, "Got deposit setup"); } - assert!(operator.events_is_empty()); + assert!(operator.handle.events_is_empty()); } Ok(()) } async fn exchange_deposit_nonces( - operators: &mut [(P2PHandle<()>, PeerId, SecpKeypair)], + operators: &mut [OperatorHandle], operators_num: usize, scope_hash: sha256::Hash, ) -> anyhow::Result<()> { - for (operator, _, kp) in operators.iter() { + for operator in operators.iter() { operator - .send_command(mock_deposit_nonces(kp, scope_hash)) + .handle + .send_command(mock_deposit_nonces(&operator.kp, scope_hash)) .await; } - for (operator, peer_id, _) in operators.iter_mut() { + for operator in operators.iter_mut() { for _ in 0..operators_num - 1 { - let event = operator.next_event().await.unwrap(); - // Skip messages from other scopes. - if matches!(event.scope(), Some(scope) if scope != scope_hash) { - continue; + let event = operator.handle.next_event().await?; + match event { + Event::ReceivedMessage(msg) + if matches!( + msg.kind, + GossipsubMsgKind::Deposit { + kind: GossipsubMsgDepositKind::Nonces(_), + .. + }, + ) => + { + info!(to=%operator.peer_id, "Got deposit nonces") + } + _ => bail!("Got event other than 'deposit_nonces' - {:?}", event), } - if !matches!( - event.kind, - EventKind::GossipsubMsg(GossipsubMsg { - kind: GossipsubMsgKind::Deposit { - kind: GossipsubMsgDepositKind::Nonces(_), - .. - }, - .. - }) - ) { - bail!("Got event other than 'deposit_nonces' - {:?}", event); - } - info!(to=%peer_id, from=%event.peer_id, "Got deposit nonces"); } - assert!(operator.events_is_empty()); + assert!(operator.handle.events_is_empty()); } Ok(()) } async fn exchange_deposit_sigs( - operators: &mut [(P2PHandle<()>, PeerId, SecpKeypair)], + operators: &mut [OperatorHandle], operators_num: usize, scope_hash: sha256::Hash, ) -> anyhow::Result<()> { - for (operator, _, kp) in operators.iter() { + for operator in operators.iter() { operator - .send_command(mock_deposit_sigs(kp, scope_hash)) + .handle + .send_command(mock_deposit_sigs(&operator.kp, scope_hash)) .await; } - for (operator, peer_id, _) in operators.iter_mut() { + for operator in operators.iter_mut() { for _ in 0..operators_num - 1 { - let event = operator.next_event().await.unwrap(); - // Skip messages from other scopes. - if matches!(event.scope(), Some(scope) if scope != scope_hash) { - continue; - } - if !matches!( - event.kind, - EventKind::GossipsubMsg(GossipsubMsg { - kind: GossipsubMsgKind::Deposit { - kind: GossipsubMsgDepositKind::Sigs(_), - .. - }, - .. - }) - ) { - bail!("Got event other than 'deposit_sigs' - {:?}", event); + let event = operator.handle.next_event().await?; + match event { + Event::ReceivedMessage(msg) + if matches!( + msg.kind, + GossipsubMsgKind::Deposit { + kind: GossipsubMsgDepositKind::Sigs(_), + .. + }, + ) => + { + info!(to=%operator.peer_id, "Got deposit sigs") + } + _ => bail!("Got event other than 'deposit_sigs' - {:?}", event), } - info!(to=%peer_id, from=%event.peer_id, "Got deposit sigs"); } - assert!(operator.events_is_empty()); + assert!(operator.handle.events_is_empty()); } Ok(())