Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add doc comments to some of the methods and remove redundant code #13

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/db/src/states.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
pub enum PeerDepositState {
/// State before sent setup for depostit.
PreSetup,
/// Peer sent setup for deposit.
Setup,
/// Peer sent nonces for deposit.
Nonces,
/// Peer sent signatures for deposit.
Sigs,
}
1 change: 0 additions & 1 deletion crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ libp2p = { version = "0.54.1", features = [
"tokio",
"secp256k1",
"macros",
"ping",
"yamux",
"identify",
] }
Expand Down
20 changes: 11 additions & 9 deletions crates/p2p/src/swarm/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ use std::collections::HashSet;
use libp2p::{
allow_block_list::{AllowedPeers, Behaviour as AllowListBehaviour},
gossipsub::{
self, Behaviour as Gossipsub, Hasher, IdentityTransform, MessageAuthenticity,
self, Behaviour as Gossipsub, IdentityTransform, MessageAuthenticity,
WhitelistSubscriptionFilter,
},
identify::{Behaviour as Identify, Config},
identity::{secp256k1::Keypair, PublicKey},
ping::Behaviour as Ping,
request_response::{
Behaviour as RequestResponse, Config as RequestResponseConfig, ProtocolSupport,
},
Expand All @@ -17,16 +16,23 @@ use libp2p::{
};
use strata_p2p_wire::p2p::v1::proto::{GetMessageRequest, GetMessageResponse};

use super::{codec, hasher::Sha256Hasher, TOPIC};
use super::{codec, TOPIC};

/// Alias for request-response behaviour with messages serialized by using
/// our codec implementation.
pub type RequestResponseProtoBehaviour<Req, Resp> = RequestResponse<codec::Codec<Req, Resp>>;

/// Composite behaviour which consists of other ones used by swarm in P2P
/// implementation.
#[derive(NetworkBehaviour)]
pub struct Behaviour {
/// Gossipsub - pub/sub model for messages distribution.
pub gossipsub: Gossipsub<IdentityTransform, WhitelistSubscriptionFilter>,
pub ping: Ping,
/// Identification of peers, address to connect to, public keys, etc.
pub identify: Identify,
/// Request-response model for recursive discovery of lost or skipped info.
pub request_response: RequestResponseProtoBehaviour<GetMessageRequest, GetMessageResponse>,
/// Connect only allowed peers by peer id.
pub allow_list: AllowListBehaviour<AllowedPeers>,
}

Expand All @@ -38,7 +44,7 @@ impl Behaviour {
}

let mut filter = HashSet::new();
filter.insert(Sha256Hasher::hash(TOPIC.to_owned()));
filter.insert(TOPIC.hash());

Self {
identify: Identify::new(Config::new(
Expand All @@ -51,17 +57,13 @@ impl Behaviour {
)),
gossipsub::ConfigBuilder::default()
.validation_mode(gossipsub::ValidationMode::Permissive)
.flood_publish(true)
.mesh_n_low(1)
.mesh_outbound_min(1)
.validate_messages()
.build()
.expect("gossipsub config at this stage must be valid"),
None,
WhitelistSubscriptionFilter(filter),
)
.unwrap(),
ping: Ping::default(),
request_response: RequestResponseProtoBehaviour::new(
[(StreamProtocol::new(protocol_name), ProtocolSupport::Full)],
RequestResponseConfig::default(),
Expand Down
5 changes: 5 additions & 0 deletions crates/p2p/src/swarm/codec.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
//! This module implement protobuf serialization/deserizliation codec for request-response
//! behaviour.
//!
//! Copied from `rust-libp2p/protocols/request-response/src/json.rs` and
//! rewritten using [`prost`] crate.

use std::{io, marker::PhantomData};

use async_trait::async_trait;
use futures::prelude::*;
use libp2p::swarm::StreamProtocol;

// NOTE(Velnbur): commit f096394 in rust-libp2p repo made this one
// configurable recently, so we way want too.
/// Max request size in bytes
const REQUEST_SIZE_MAXIMUM: u64 = 1024 * 1024;
/// Max response size in bytes
Expand Down
15 changes: 15 additions & 0 deletions crates/p2p/src/swarm/handle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! Entity to control P2P implementation, spawned in another async task,
//! and listem to its events through channels.

use bitcoin::{hashes::sha256, OutPoint, XOnlyPublicKey};
use musig2::{PartialSignature, PubNonce};
use tokio::sync::{
Expand All @@ -7,6 +10,8 @@ use tokio::sync::{

use crate::{commands::Command, events::Event};

/// Handle to interact with P2P implementation spawned in another async
/// task. To create new one, use [`super::P2P::new_handle`].
#[derive(Debug)]
pub struct P2PHandle<DSP>
where
Expand All @@ -27,6 +32,8 @@ where
Self { events, commands }
}

/// Send command for P2P implementation to distribute genesis info across
/// network.
pub async fn send_genesis_info(
&self,
pre_stake_outpoint: OutPoint,
Expand All @@ -41,20 +48,26 @@ where
.await;
}

/// Send command for P2P implementation to distribute deposit setup
/// across network.
pub async fn send_deposit_setup(&self, scope: sha256::Hash, payload: DSP) {
let _ = self
.commands
.send(Command::SendDepositSetup { scope, payload })
.await;
}

/// Send command for P2P implementation to distribute deposit nonces
/// across network.
pub async fn send_deposit_nonces(&self, scope: sha256::Hash, pub_nonces: Vec<PubNonce>) {
let _ = self
.commands
.send(Command::SendDepositNonces { scope, pub_nonces })
.await;
}

/// Send command for P2P implementation to distribute sigs nonces across
/// network.
pub async fn send_deposit_sigs(
&self,
scope: sha256::Hash,
Expand All @@ -69,10 +82,12 @@ where
.await;
}

/// Get next event from P2P from events channel.
pub async fn next_event(&mut self) -> Result<Event<DSP>, RecvError> {
self.events.recv().await
}

/// Check event's channel is empty or not.
pub fn events_is_empty(&self) -> bool {
self.events.is_empty()
}
Expand Down
12 changes: 0 additions & 12 deletions crates/p2p/src/swarm/hasher.rs

This file was deleted.

43 changes: 26 additions & 17 deletions crates/p2p/src/swarm/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use std::{collections::HashSet, time::Duration};
use std::{collections::HashSet, sync::LazyLock, time::Duration};

use behavior::{Behaviour, BehaviourEvent};
use bitcoin::hashes::Hash;
use futures::StreamExt as _;
use handle::P2PHandle;
use hasher::Sha256Hasher;
use libp2p::{
core::{muxing::StreamMuxerBox, transport::MemoryTransport, ConnectedPoint},
gossipsub::{
Event as GossipsubEvent, Hasher as _, Message, MessageAcceptance, MessageId, Topic,
},
gossipsub::{Event as GossipsubEvent, Message, MessageAcceptance, MessageId, Sha256Topic},
identity::secp256k1::Keypair,
noise,
request_response::Event as RequestResponseEvent,
Expand Down Expand Up @@ -42,10 +39,13 @@ use crate::{
mod behavior;
mod codec;
pub mod handle;
mod hasher;

const TOPIC: &str = "bitvm2";
// TODO(Velnbur): make this configurable later
/// Global topic name for gossipsub messages.
static TOPIC: LazyLock<Sha256Topic> = LazyLock::new(|| Sha256Topic::new("bitvm2"));

// TODO(Velnbur): make this configurable later
/// Global name of the protocol
const PROTOCOL_NAME: &str = "/strata-bitvm2";

#[derive(Snafu, Debug)]
Expand All @@ -63,6 +63,7 @@ pub enum Error {

pub type P2PResult<T> = Result<T, Error>;

/// Configuration options for [`P2P`].
pub struct P2PConfig {
/// Duration which will be considered stale after last moment current operator received message
/// that advanced it's state.
Expand All @@ -83,6 +84,7 @@ pub struct P2PConfig {
pub connect_to: Vec<Multiaddr>,
}

/// Implementation of p2p protocol for BitVM2 data exchange.
pub struct P2P<DepositSetupPayload: ProtoMsg + Clone, Repository> {
swarm: Swarm<Behaviour>,

Expand All @@ -99,8 +101,8 @@ pub struct P2P<DepositSetupPayload: ProtoMsg + Clone, Repository> {
config: P2PConfig,
}

/// Alias for returning without dropping channel to P2P and P2P itself.
type P2PWithHandle<DSP, Repository> = (P2P<DSP, Repository>, P2PHandle<DSP>);
/// Alias for P2P and P2PHandle tuple returned by `from_config`.
pub type P2PWithHandle<DSP, Repository> = (P2P<DSP, Repository>, P2PHandle<DSP>);

impl<DSP, DB: RepositoryExt<DSP>> P2P<DSP, DB>
where
Expand Down Expand Up @@ -163,7 +165,7 @@ where
.swarm
.behaviour_mut()
.gossipsub
.subscribe(&Topic::<Sha256Hasher>::new(TOPIC))
.subscribe(&TOPIC)
.inspect_err(|err| error!(%err, "Failed to subscribe for events"));

let mut subscriptions = 0;
Expand Down Expand Up @@ -197,6 +199,11 @@ where
info!("Established all connections and subscriptions");
}

/// Start listening and handling events from the network and commands from
/// handles.
///
/// This method should be spawned in separate async task or polled periodicly
/// to advance handling of new messages, event or commands.
pub async fn listen(mut self) {
loop {
let result = select! {
Expand Down Expand Up @@ -258,6 +265,11 @@ where
}
}

/// Handle new message from gossipsub network.
///
/// If message is not [`GossipsubMsg`] or is not signed, the message will
/// be rejected without propagation, otherwise if we didn't handled it
/// before, send an [`Event`] to handles, store it and reset timeout.
#[instrument(skip(self, message), fields(sender = %message.source.unwrap()))]
async fn handle_gossip_msg(
&mut self,
Expand Down Expand Up @@ -288,7 +300,7 @@ where
.source
.expect("Message must have author as ValidationMode set to Permissive");
if let Err(err) = validate_gossipsub_msg(source, &msg) {
debug!(reason=%err, "Got invalid message from peer, rejecting it.");
debug!(reason=%err, "Got invalid signature from peer, rejecting message.");
// no error should appear in case of message rejection
let _ = self
.swarm
Expand Down Expand Up @@ -334,8 +346,8 @@ where
Ok(())
}

/// Insert data received from event and reset timeout for this peer and deposit if it wasn't
/// set before.
/// Insert data received from event and reset timeout for this peer and
/// deposit if it wasn't set before.
///
/// Returns if the data was already presented or not.
async fn insert_msg_if_not_exists_with_timeout(
Expand Down Expand Up @@ -469,10 +481,7 @@ where
.swarm
.behaviour_mut()
.gossipsub
.publish(
/* TODO(Velnbur): make this constant */ Sha256Hasher::hash(TOPIC.to_owned()),
msg.into_raw().encode_to_vec(),
)
.publish(TOPIC.hash(), msg.into_raw().encode_to_vec())
.inspect_err(|err| debug!(%err, "Failed to publish msg through gossipsub"));

Ok(())
Expand Down
Loading