Skip to content

Commit

Permalink
Add doc comments to some of the methods and remove redundant code
Browse files Browse the repository at this point in the history
  • Loading branch information
Velnbur committed Jan 10, 2025
1 parent 345e27b commit c2abc3a
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 59 deletions.
20 changes: 0 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/db/src/states.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
pub enum PeerDepositState {
/// State before sent setup for depostit.
PreSetup,
/// Peer sent setup for deposit.
Setup,
/// Peer sent nonces for deposit.
Nonces,
/// Peer sent signatures for deposit.
Sigs,
}
1 change: 0 additions & 1 deletion crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ libp2p = { version = "0.54.1", features = [
"tokio",
"secp256k1",
"macros",
"ping",
"yamux",
"identify",
] }
Expand Down
20 changes: 11 additions & 9 deletions crates/p2p/src/swarm/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ use std::collections::HashSet;
use libp2p::{
allow_block_list::{AllowedPeers, Behaviour as AllowListBehaviour},
gossipsub::{
self, Behaviour as Gossipsub, Hasher, IdentityTransform, MessageAuthenticity,
self, Behaviour as Gossipsub, IdentityTransform, MessageAuthenticity,
WhitelistSubscriptionFilter,
},
identify::{Behaviour as Identify, Config},
identity::{secp256k1::Keypair, PublicKey},
ping::Behaviour as Ping,
request_response::{
Behaviour as RequestResponse, Config as RequestResponseConfig, ProtocolSupport,
},
Expand All @@ -17,16 +16,23 @@ use libp2p::{
};
use strata_p2p_wire::p2p::v1::proto::{GetMessageRequest, GetMessageResponse};

use super::{codec, hasher::Sha256Hasher, TOPIC};
use super::{codec, TOPIC};

/// Alias for request-response behaviour with messages serialized by using
/// our codec implementation.
pub type RequestResponseProtoBehaviour<Req, Resp> = RequestResponse<codec::Codec<Req, Resp>>;

/// Composite behaviour which consists of other ones used by swarm in P2P
/// implementation.
#[derive(NetworkBehaviour)]
pub struct Behaviour {
/// Gossipsub - pub/sub model for messages distribution.
pub gossipsub: Gossipsub<IdentityTransform, WhitelistSubscriptionFilter>,
pub ping: Ping,
/// Identification of peers, address to connect to, public keys, etc.
pub identify: Identify,
/// Request-response model for recursive discovery of lost or skipped info.
pub request_response: RequestResponseProtoBehaviour<GetMessageRequest, GetMessageResponse>,
/// Connect only allowed peers by peer id.
pub allow_list: AllowListBehaviour<AllowedPeers>,
}

Expand All @@ -38,7 +44,7 @@ impl Behaviour {
}

let mut filter = HashSet::new();
filter.insert(Sha256Hasher::hash(TOPIC.to_owned()));
filter.insert(TOPIC.hash());

Self {
identify: Identify::new(Config::new(
Expand All @@ -51,17 +57,13 @@ impl Behaviour {
)),
gossipsub::ConfigBuilder::default()
.validation_mode(gossipsub::ValidationMode::Permissive)
.flood_publish(true)
.mesh_n_low(1)
.mesh_outbound_min(1)
.validate_messages()
.build()
.expect("gossipsub config at this stage must be valid"),
None,
WhitelistSubscriptionFilter(filter),
)
.unwrap(),
ping: Ping::default(),
request_response: RequestResponseProtoBehaviour::new(
[(StreamProtocol::new(protocol_name), ProtocolSupport::Full)],
RequestResponseConfig::default(),
Expand Down
5 changes: 5 additions & 0 deletions crates/p2p/src/swarm/codec.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
//! This module implement protobuf serialization/deserizliation codec for request-response
//! behaviour.
//!
//! Copied from `rust-libp2p/protocols/request-response/src/json.rs` and
//! rewritten using [`prost`] crate.
use std::{io, marker::PhantomData};

use async_trait::async_trait;
use futures::prelude::*;
use libp2p::swarm::StreamProtocol;

// NOTE(Velnbur): commit f096394 in rust-libp2p repo made this one
// configurable recently, so we way want too.
/// Max request size in bytes
const REQUEST_SIZE_MAXIMUM: u64 = 1024 * 1024;
/// Max response size in bytes
Expand Down
15 changes: 15 additions & 0 deletions crates/p2p/src/swarm/handle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! Entity to control P2P implementation, spawned in another async task,
//! and listem to its events through channels.
use bitcoin::{hashes::sha256, OutPoint, XOnlyPublicKey};
use musig2::{PartialSignature, PubNonce};
use tokio::sync::{
Expand All @@ -7,6 +10,8 @@ use tokio::sync::{

use crate::{commands::Command, events::Event};

/// Handle to interact with P2P implementation spawned in another async
/// task. To create new one, use [`super::P2P::new_handle`].
#[derive(Debug)]
pub struct P2PHandle<DSP>
where
Expand All @@ -27,6 +32,8 @@ where
Self { events, commands }
}

/// Send command for P2P implementation to distribute genesis info across
/// network.
pub async fn send_genesis_info(
&self,
pre_stake_outpoint: OutPoint,
Expand All @@ -41,20 +48,26 @@ where
.await;
}

/// Send command for P2P implementation to distribute deposit setup
/// across network.
pub async fn send_deposit_setup(&self, scope: sha256::Hash, payload: DSP) {
let _ = self
.commands
.send(Command::SendDepositSetup { scope, payload })
.await;
}

/// Send command for P2P implementation to distribute deposit nonces
/// across network.
pub async fn send_deposit_nonces(&self, scope: sha256::Hash, pub_nonces: Vec<PubNonce>) {
let _ = self
.commands
.send(Command::SendDepositNonces { scope, pub_nonces })
.await;
}

/// Send command for P2P implementation to distribute sigs nonces across
/// network.
pub async fn send_deposit_sigs(
&self,
scope: sha256::Hash,
Expand All @@ -69,10 +82,12 @@ where
.await;
}

/// Get next event from P2P from events channel.
pub async fn next_event(&mut self) -> Result<Event<DSP>, RecvError> {
self.events.recv().await
}

/// Check event's channel is empty or not.
pub fn events_is_empty(&self) -> bool {
self.events.is_empty()
}
Expand Down
12 changes: 0 additions & 12 deletions crates/p2p/src/swarm/hasher.rs

This file was deleted.

43 changes: 26 additions & 17 deletions crates/p2p/src/swarm/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use std::{collections::HashSet, time::Duration};
use std::{collections::HashSet, sync::LazyLock, time::Duration};

use behavior::{Behaviour, BehaviourEvent};
use bitcoin::hashes::Hash;
use futures::StreamExt as _;
use handle::P2PHandle;
use hasher::Sha256Hasher;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::MemoryTransport, ConnectedPoint},
gossipsub::{
Event as GossipsubEvent, Hasher as _, Message, MessageAcceptance, MessageId, Topic,
},
gossipsub::{Event as GossipsubEvent, Message, MessageAcceptance, MessageId, Sha256Topic},
identity::secp256k1::Keypair,
noise,
request_response::Event as RequestResponseEvent,
Expand Down Expand Up @@ -42,10 +39,13 @@ use crate::{
mod behavior;
mod codec;
pub mod handle;
mod hasher;

const TOPIC: &str = "bitvm2";
// TODO(Velnbur): make this configurable later
/// Global topic name for gossipsub messages.
static TOPIC: LazyLock<Sha256Topic> = LazyLock::new(|| Sha256Topic::new("bitvm2"));

// TODO(Velnbur): make this configurable later
/// Global name of the protocol
const PROTOCOL_NAME: &str = "/strata-bitvm2";

#[derive(Snafu, Debug)]
Expand All @@ -63,6 +63,7 @@ pub enum Error {

pub type P2PResult<T> = Result<T, Error>;

/// Configuration options for [`P2P`].
pub struct P2PConfig {
/// Duration which will be considered stale after last moment current operator received message
/// that advanced it's state.
Expand All @@ -83,6 +84,7 @@ pub struct P2PConfig {
pub connect_to: Vec<Multiaddr>,
}

/// Implementation of p2p protocol for BitVM2 data exchange.
pub struct P2P<DepositSetupPayload: ProtoMsg + Clone, Repository> {
swarm: Swarm<Behaviour>,

Expand All @@ -99,8 +101,8 @@ pub struct P2P<DepositSetupPayload: ProtoMsg + Clone, Repository> {
config: P2PConfig,
}

/// Alias for returning without dropping channel to P2P and P2P itself.
type P2PWithHandle<DSP, Repository> = (P2P<DSP, Repository>, P2PHandle<DSP>);
/// Alias for P2P and P2PHandle tuple returned by `from_config`.
pub type P2PWithHandle<DSP, Repository> = (P2P<DSP, Repository>, P2PHandle<DSP>);

impl<DSP, DB: RepositoryExt<DSP>> P2P<DSP, DB>
where
Expand Down Expand Up @@ -163,7 +165,7 @@ where
.swarm
.behaviour_mut()
.gossipsub
.subscribe(&Topic::<Sha256Hasher>::new(TOPIC))
.subscribe(&TOPIC)
.inspect_err(|err| error!(%err, "Failed to subscribe for events"));

let mut subscriptions = 0;
Expand Down Expand Up @@ -197,6 +199,11 @@ where
info!("Established all connections and subscriptions");
}

/// Start listening and handling events from the network and commands from
/// handles.
///
/// This method should be spawned in separate async task or polled periodicly
/// to advance handling of new messages, event or commands.
pub async fn listen(mut self) {
loop {
let result = select! {
Expand Down Expand Up @@ -258,6 +265,11 @@ where
}
}

/// Handle new message from gossipsub network.
///
/// If message is not [`GossipsubMsg`] or is not signed, the message will
/// be rejected without propagation, otherwise if we didn't handled it
/// before, send an [`Event`] to handles, store it and reset timeout.
#[instrument(skip(self, message), fields(sender = %message.source.unwrap()))]
async fn handle_gossip_msg(
&mut self,
Expand Down Expand Up @@ -288,7 +300,7 @@ where
.source
.expect("Message must have author as ValidationMode set to Permissive");
if let Err(err) = validate_gossipsub_msg(source, &msg) {
debug!(reason=%err, "Got invalid message from peer, rejecting it.");
debug!(reason=%err, "Got invalid signature from peer, rejecting message.");
// no error should appear in case of message rejection
let _ = self
.swarm
Expand Down Expand Up @@ -334,8 +346,8 @@ where
Ok(())
}

/// Insert data received from event and reset timeout for this peer and deposit if it wasn't
/// set before.
/// Insert data received from event and reset timeout for this peer and
/// deposit if it wasn't set before.
///
/// Returns if the data was already presented or not.
async fn insert_msg_if_not_exists_with_timeout(
Expand Down Expand Up @@ -469,10 +481,7 @@ where
.swarm
.behaviour_mut()
.gossipsub
.publish(
/* TODO(Velnbur): make this constant */ Sha256Hasher::hash(TOPIC.to_owned()),
msg.into_raw().encode_to_vec(),
)
.publish(TOPIC.hash(), msg.into_raw().encode_to_vec())
.inspect_err(|err| debug!(%err, "Failed to publish msg through gossipsub"));

Ok(())
Expand Down

0 comments on commit c2abc3a

Please sign in to comment.