Skip to content

Commit

Permalink
refactor: request command use fallback neighbours request (#20)
Browse files Browse the repository at this point in the history
* refactor: added request from neighbours in request command

* wip: added request-response test

* fix: resolved test

* refactor: upd tests

* trying to fix compiler bug

* fix visibility in tests

* refactor: renamed OperatorLever

* refactor: moved handling of request-response message to separate function
  • Loading branch information
NikitaMasych authored Jan 21, 2025
1 parent 3af2daf commit d49ae98
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 164 deletions.
1 change: 1 addition & 0 deletions crates/db/src/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tracing::warn;

use super::{DBResult, Repository, RepositoryError};

#[derive(Clone)]
pub struct AsyncDB {
pool: ThreadPool,
db: Arc<Db>,
Expand Down
38 changes: 5 additions & 33 deletions crates/p2p/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,20 @@
use bitcoin::hashes::sha256;
use libp2p::PeerId;
use prost::Message;
use strata_p2p_wire::p2p::v1::{GossipsubMsg, GossipsubMsgKind};
use strata_p2p_wire::p2p::v1::GossipsubMsg;

/// Events emitted from P2P to handle from operator side.
///
/// `DepositSetupPayload` is generic as some implementation details for different BitVM2
/// applications may vary. The only requirement for them to be decodable from bytes as protobuf
/// message.
#[derive(Clone, Debug)]
pub struct Event<DepositSetupPayload: Message + Clone> {
pub peer_id: PeerId,
pub kind: EventKind<DepositSetupPayload>,
}

impl<DSP: Message + Clone> Event<DSP> {
pub fn new(peer_id: PeerId, kind: EventKind<DSP>) -> Self {
Self { peer_id, kind }
}

pub fn scope(&self) -> Option<sha256::Hash> {
// TODD(Velnbur): when other tpes of event are added, remove this one:
#[allow(irrefutable_let_patterns)]
let EventKind::GossipsubMsg(GossipsubMsg { kind, .. }) = &self.kind
else {
return None;
};

match kind {
GossipsubMsgKind::Deposit { scope, .. } => Some(*scope),
_ => None,
}
}
}

#[derive(Debug, Clone)]
pub enum EventKind<DepositSetupPayload: Message + Clone> {
GossipsubMsg(GossipsubMsg<DepositSetupPayload>),
pub enum Event<DepositSetupPayload: Message + Clone> {
ReceivedMessage(GossipsubMsg<DepositSetupPayload>),
}

impl<DepositSetupPayload: Message + Clone> From<GossipsubMsg<DepositSetupPayload>>
for EventKind<DepositSetupPayload>
for Event<DepositSetupPayload>
{
fn from(v: GossipsubMsg<DepositSetupPayload>) -> Self {
Self::GossipsubMsg(v)
Self::ReceivedMessage(v)
}
}
123 changes: 87 additions & 36 deletions crates/p2p/src/swarm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@ use tokio::{
sync::{broadcast, mpsc},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use tracing::{debug, error, info, instrument};

use crate::{
commands::Command,
events::{Event, EventKind},
};
use crate::{commands::Command, events::Event};

mod behavior;
mod codec;
Expand Down Expand Up @@ -330,7 +327,7 @@ where
// them something, knowing signer's (operator's) node peer id.
self.db.set_peer_for_signer_pubkey(&msg.key, source).await?;

let event = Event::new(source, EventKind::GossipsubMsg(msg));
let event = Event::from(msg);

let _ = self
.swarm
Expand Down Expand Up @@ -432,20 +429,32 @@ where
}
Command::RequestMessage(request) => {
let request_target_pubkey = request.operator_pubkey();
let distributor_peer_id = self

let maybe_distributor = self
.db
.get_peer_by_signer_pubkey(request_target_pubkey)
.await?;

let Some(distributor_peer_id) = distributor_peer_id else {
warn!("Tried to sent request for operator that has no corresponding peer");
return Ok(());
};
let request = request.into_msg();

if let Some(distributor_peer_id) = maybe_distributor {
if self.swarm.is_connected(&distributor_peer_id) {
self.swarm
.behaviour_mut()
.request_response
.send_request(&distributor_peer_id, request);
return Ok(());
} // TODO: try to establish connection?
}

let connected_peers = self.swarm.connected_peers().cloned().collect::<Vec<_>>();
for peer in connected_peers {
self.swarm
.behaviour_mut()
.request_response
.send_request(&peer, request.clone());
}

self.swarm
.behaviour_mut()
.request_response
.send_request(&distributor_peer_id, request.into_msg());
Ok(())
}
}
Expand All @@ -456,34 +465,68 @@ where
&mut self,
event: RequestResponseEvent<GetMessageRequest, GetMessageResponse>,
) -> P2PResult<()> {
if let RequestResponseEvent::InboundFailure {
peer,
request_id,
error,
} = event
{
debug!(%peer, %error, %request_id, "Failed to send response");
return Ok(());
match event {
RequestResponseEvent::Message { peer, message } => {
self.handle_message_event(peer, message).await?
}
RequestResponseEvent::OutboundFailure {
peer,
request_id,
error,
} => {
debug!(%peer, %error, %request_id, "Outbound failure")
}
RequestResponseEvent::InboundFailure {
peer,
request_id,
error,
} => {
debug!(%peer, %error, %request_id, "Inbound failure")
}
RequestResponseEvent::ResponseSent { peer, request_id } => {
debug!(%peer, %request_id, "Response sent")
}
}
let RequestResponseEvent::Message { peer, message } = event else {
return Ok(());
};

match message {
Ok(())
}

async fn handle_message_event(
&mut self,
peer_id: PeerId,
msg: request_response::Message<GetMessageRequest, GetMessageResponse, GetMessageResponse>,
) -> P2PResult<()> {
match msg {
request_response::Message::Request {
request_id,
request,
channel,
} => {
let empty_response = GetMessageResponse { msg: vec![] };

let Some(req) = v1::GetMessageRequest::from_msg(request) else {
debug!(%peer, "Peer sent invalid get message request, disconnecting it");
let _ = self.swarm.disconnect_peer_id(peer);
debug!(%peer_id, "Peer sent invalid get message request, disconnecting it");
let _ = self.swarm.disconnect_peer_id(peer_id);
let _ = self
.swarm
.behaviour_mut()
.request_response
.send_response(channel, empty_response)
.inspect_err(|_| debug!("Failed to send response"));

return Ok(());
};

let Some(msg) = self.handle_get_message_request(req).await? else {
debug!(%request_id, "Have no needed data, requesting from neighbours");
return Ok(()); // TODO(NikitaMasych): launch recursive request.
debug!(%request_id, "Have no needed data");
let _ = self
.swarm
.behaviour_mut()
.request_response
.send_response(channel, empty_response)
.inspect_err(|_| debug!("Failed to send response"));

return Ok(());
};

let response = GetMessageResponse { msg: vec![msg] };
Expand All @@ -501,8 +544,8 @@ where
response,
} => {
if response.msg.is_empty() {
debug!(%request_id, "Have no needed data, requesting from neighbours");
return Ok(()); // TODO(NikitaMasych): launch recursive request.
debug!(%request_id, "Received empty response");
return Ok(());
}

for msg in response.msg.into_iter() {
Expand All @@ -514,12 +557,12 @@ where
let msg = match GossipsubMsg::from_proto(msg.clone()) {
Ok(msg) => msg,
Err(err) => {
debug!(%peer, reason=%err, "Peer sent invalid message");
debug!(%peer_id, reason=%err, "Peer sent invalid message");
continue;
}
};
if let Err(err) = self.validate_gossipsub_msg(&msg) {
debug!(%peer, reason=%err, "Message failed validation");
debug!(%peer_id, reason=%err, "Message failed validation");
continue;
}

Expand Down Expand Up @@ -602,7 +645,15 @@ where
}

async fn handle_get_message_response(&mut self, msg: GossipsubMsg<DSP>) -> P2PResult<()> {
self.add_msg_if_not_exists(&msg).await?;
let new_event = self.add_msg_if_not_exists(&msg).await?;

if new_event {
let event = Event::from(msg);

self.events
.send(event)
.map_err(|e| ProtocolError::EventsChannelClosed(e.into()))?;
}

Ok(())
}
Expand Down
4 changes: 3 additions & 1 deletion crates/p2p/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct Operator {
pub p2p: P2P<(), AsyncDB>,
pub handle: P2PHandle<()>,
pub kp: SecpKeypair,
pub db: AsyncDB,
}

impl Operator {
Expand All @@ -35,12 +36,13 @@ impl Operator {

let swarm = swarm::with_inmemory_transport(&config)?;
let db = AsyncDB::new(ThreadPool::new(1), Arc::new(db));
let (p2p, handle) = P2P::<(), AsyncDB>::from_config(config, cancel, db, swarm)?;
let (p2p, handle) = P2P::<(), AsyncDB>::from_config(config, cancel, db.clone(), swarm)?;

Ok(Self {
handle,
p2p,
kp: keypair,
db,
})
}
}
Loading

0 comments on commit d49ae98

Please sign in to comment.