From 4b680fca5e5d0a1ce47ba6bf650b5d8adda58d3e Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Fri, 10 Jan 2025 18:24:10 +0200 Subject: [PATCH 1/4] feat: added basic request-response --- crates/p2p/src/swarm/mod.rs | 293 +++++++++++++++++++++++++++++------- 1 file changed, 237 insertions(+), 56 deletions(-) diff --git a/crates/p2p/src/swarm/mod.rs b/crates/p2p/src/swarm/mod.rs index e996ff2..6120ffa 100644 --- a/crates/p2p/src/swarm/mod.rs +++ b/crates/p2p/src/swarm/mod.rs @@ -1,14 +1,14 @@ use std::{collections::HashSet, sync::LazyLock, time::Duration}; use behavior::{Behaviour, BehaviourEvent}; -use bitcoin::hashes::Hash; +use bitcoin::hashes::{sha256, Hash}; use futures::StreamExt as _; use handle::P2PHandle; use libp2p::{ core::{muxing::StreamMuxerBox, transport::MemoryTransport, ConnectedPoint}, gossipsub::{Event as GossipsubEvent, Message, MessageAcceptance, MessageId, Sha256Topic}, identity::secp256k1::Keypair, - noise, + noise, request_response, request_response::Event as RequestResponseEvent, swarm::SwarmEvent, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, Transport, @@ -19,9 +19,17 @@ use strata_p2p_db::{ states::PeerDepositState, DBResult, DepositSetupEntry, GenesisInfoEntry, NoncesEntry, PartialSignaturesEntry, RepositoryError, RepositoryExt, }; -use strata_p2p_wire::p2p::v1::{ - proto::{get_message_request, DepositRequestKey, GetMessageRequest, GetMessageResponse}, - GossipsubMsg, GossipsubMsgDepositKind, GossipsubMsgKind, +use strata_p2p_wire::p2p::{ + v1, + v1::{ + proto, + proto::{ + get_message_request, gossipsub_msg::Body, DepositRequestKey, GetMessageRequest, + GetMessageResponse, + }, + DepositNonces, DepositSetup, DepositSigs, GetMessageRequestExchangeKind, GossipsubMsg, + GossipsubMsgDepositKind, GossipsubMsgKind, + }, }; use tokio::{ select, @@ -240,7 +248,7 @@ where match event { BehaviourEvent::Gossipsub(event) => self.handle_gossip_event(event).await, BehaviourEvent::RequestResponse(event) => { - self.handle_response_request_event(event).await + self.handle_request_response_event(event).await } BehaviourEvent::Identify(_event) => { // let identify::Event::Received() @@ -536,58 +544,231 @@ where Ok(()) } - #[instrument(skip(self, _event))] - async fn handle_response_request_event( + #[instrument(skip(self, event))] + async fn handle_request_response_event( + &mut self, + event: RequestResponseEvent, + ) -> P2PResult<()> { + let RequestResponseEvent::Message { peer, message } = event else { + return Ok(()); + }; + + match message { + request_response::Message::Request { + request_id, + request, + channel, + } => { + let Some(req) = v1::GetMessageRequest::from_msg(request) else { + debug!(%peer, "Peer sent invalid get message request, disconnecting it"); + let _ = self.swarm.disconnect_peer_id(peer); + return Ok(()); + }; + + let body = self.handle_get_message_request(req).await?; + + if body.is_none() { + debug!(%request_id, "Have no needed data, requesting from neighbours"); + return Ok(()); // TODO(NikitaMasych): launch recursive request. + } + + let msg = proto::GossipsubMsg { + key: vec![], // TODO(NikitaMasych): add sign msg + signature: vec![], // TODO(NikitaMasych): add sign msg + body, + }; + let response = GetMessageResponse { msg: vec![msg] }; + + let _ = self + .swarm + .behaviour_mut() + .request_response + .send_response(channel, response) + .inspect_err(|_| debug!("Failed to send response")); + } + + request_response::Message::Response { + request_id, + response, + } => { + if response.msg.is_empty() { + debug!(%request_id, "Have no needed data, requesting from neighbours"); + return Ok(()); // TODO(NikitaMasych): launch recursive request. + } + let mut all_messages_empty = true; + for msg in response.msg.into_iter() { + // if let Err(err) = validate_gossipsub_msg(peer, &msg) { + // debug!(reason=%err, "Got invalid signature from peer, rejecting + // message."); let _ = + // self.swarm.disconnect_peer_id(peer); // TODO(NikitaMasych): report validation + // error? return Ok(()); + // } + + if msg.body.is_none() { + continue; + } + + all_messages_empty = false; + + self.handle_get_message_response(peer, msg) + .await + .or_else(|err| { + if !matches!(err, Error::Repository { .. }) { + debug!(%peer, "Peer sent invalid message"); // TODO: punish? + } + Err(err) + })?; + } + if all_messages_empty { + debug!(%request_id, "Have no needed data, requesting from neighbours"); + return Ok(()); // TODO(NikitaMasych): launch recursive request. + } + } + }; + + Ok(()) + } + + async fn handle_get_message_request( + &mut self, + request: v1::GetMessageRequest, + ) -> P2PResult> { + let body = match request { + v1::GetMessageRequest::Genesis { operator_id } => { + let info = self + .db + .get_genesis_info(operator_id) + .await + .context(RepositorySnafu)?; + + info.map(|v| { + Body::GenesisInfo(proto::GenesisInfo { + pre_stake_vout: v.entry.0.vout, + pre_stake_txid: v.entry.0.txid.to_byte_array().to_vec(), + checkpoint_pubkeys: v + .entry + .1 + .iter() + .map(|k| k.serialize().to_vec()) + .collect(), + }) + }) + } + v1::GetMessageRequest::ExchangeSession { + scope, + operator_id, + kind, + } => match kind { + GetMessageRequestExchangeKind::Setup => { + let setup = self + .db + .get_deposit_setup(operator_id, scope) + .await + .context(RepositorySnafu)?; + + setup.map(|v| { + Body::Setup(proto::DepositSetupExchange { + scope: scope.to_byte_array().to_vec(), + payload: v.payload.encode_to_vec(), + }) + }) + } + GetMessageRequestExchangeKind::Nonces => { + let nonces = self + .db + .get_pub_nonces(operator_id, scope) + .await + .context(RepositorySnafu)?; + + nonces.map(|v| { + Body::Nonce(proto::DepositNoncesExchange { + scope: scope.to_byte_array().to_vec(), + pub_nonces: v.entry.iter().map(|n| n.serialize().to_vec()).collect(), + }) + }) + } + GetMessageRequestExchangeKind::Signatures => { + let sigs = self + .db + .get_partial_signatures(operator_id, scope) + .await + .context(RepositorySnafu)?; + + sigs.map(|v| { + Body::Sigs(proto::DepositSignaturesExchange { + scope: scope.to_byte_array().to_vec(), + partial_sigs: v.entry.iter().map(|n| n.serialize().to_vec()).collect(), + }) + }) + } + }, + }; + + Ok(body) + } + async fn handle_get_message_response( &mut self, - _event: RequestResponseEvent, + peer: PeerId, + msg: proto::GossipsubMsg, ) -> P2PResult<()> { - // let RequestResponseEvent::Message { peer, message } = event else { - // return Ok(()); - // }; - - // match message { - // request_response::Message::Request { - // request_id, - // request, - // channel, - // } => { - // let Some(request) = typed::GetMessageRequest::from_msg(request) else { - // debug!(%peer, "Peer sent invalid get message request, disconnecting it"); - // let _ = self.swarm.disconnect_peer_id(peer); - // return Ok(()); - // }; - // match request { - // typed::GetMessageRequest::Genesis { operator_id } => todo!(), - // typed::GetMessageRequest::ExchangeSession { - // scope, - // operator_id, - // kind, - // } => { - // match kind { - // typed::GetMessageRequestExchangeKind::Setup => self - // .db - // .get_deposit_setup(operator_id, scope) - // .map(|| GetMessageResponse { - // body: Some(get_message_response::Body::Setup( - // DepositSetupExchange { - // scope: todo!(), - // payload: todo!(), - // }, - // )), - // }), - // typed::GetMessageRequestExchangeKind::Nonces => self.db.get, - // typed::GetMessageRequestExchangeKind::Signatures => todo!(), - // }; - // } - // }; - // } - // request_response::Message::Response { - // request_id, - // response, - // } => todo!(), - // }; - - todo!() + match msg.body.unwrap() { + Body::Setup(v) => { + let scope = sha256::Hash::from_slice(&v.scope) + .whatever_context("failed to convert scope")?; + let setup = + DepositSetup::from_proto_msg(&v).whatever_context("failed to convert setup")?; + let entry = DepositSetupEntry { + payload: setup.payload, + signature: msg.signature, + }; + self.db + .set_deposit_setup(peer, scope, entry) + .await + .context(RepositorySnafu)? + } + Body::Nonce(v) => { + let scope = sha256::Hash::from_slice(&v.scope) + .whatever_context("failed to convert scope")?; + let nonces = DepositNonces::from_proto_msg(&v) + .whatever_context("failed to convert nonces")?; + let entry = NoncesEntry { + entry: nonces.nonces, + signature: msg.signature, + }; + self.db + .set_pub_nonces(peer, scope, entry) + .await + .context(RepositorySnafu)? + } + Body::Sigs(v) => { + let scope = sha256::Hash::from_slice(&v.scope) + .whatever_context("failed to convert scope")?; + let sigs = DepositSigs::from_proto_msg(&v) + .whatever_context("failed to convert signatures")?; + let entry = PartialSignaturesEntry { + entry: sigs.partial_sigs, + signature: msg.signature, + }; + self.db + .set_partial_signatures(peer, scope, entry) + .await + .context(RepositorySnafu)? + } + Body::GenesisInfo(v) => { + let info = v1::GenesisInfo::from_proto_msg(&v) + .whatever_context("failed to convert genesis info")?; + let entry = GenesisInfoEntry { + entry: (info.pre_stake_outpoint, info.checkpoint_pubkeys), + signature: msg.signature, + }; + self.db + .set_genesis_info(peer, entry) + .await + .context(RepositorySnafu)? + } + } + + Ok(()) } } From 4a13417a9942ad232c0b0ad3d53528ba1d81a425 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Mon, 13 Jan 2025 14:36:50 +0200 Subject: [PATCH 2/4] feat: added pub key storage in db entry and signature validation --- crates/db/src/lib.rs | 10 ++-- crates/db/src/sled.rs | 3 ++ crates/p2p/src/swarm/mod.rs | 96 ++++++++++++++++++++------------- crates/wire/src/p2p/v1/typed.rs | 9 ++++ 4 files changed, 77 insertions(+), 41 deletions(-) diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index 0d05976..46664db 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -24,14 +24,15 @@ pub enum RepositoryError { } #[derive(serde::Serialize, serde::Deserialize, Debug)] -pub struct EntryWithSig { +pub struct EntryWithSigAndKey { pub entry: T, pub signature: Vec, + pub key: Vec, // public key used to create a signature } -pub type PartialSignaturesEntry = EntryWithSig>; -pub type NoncesEntry = EntryWithSig>; -pub type GenesisInfoEntry = EntryWithSig<(OutPoint, Vec)>; +pub type PartialSignaturesEntry = EntryWithSigAndKey>; +pub type NoncesEntry = EntryWithSigAndKey>; +pub type GenesisInfoEntry = EntryWithSigAndKey<(OutPoint, Vec)>; #[async_trait] pub trait Repository: Send + Sync + 'static { @@ -177,4 +178,5 @@ pub struct DepositSetupEntry { #[serde(with = "prost_serde")] pub payload: DSP, pub signature: Vec, + pub key: Vec, // public key used to create a signature } diff --git a/crates/db/src/sled.rs b/crates/db/src/sled.rs index 6bb077b..4059e1f 100644 --- a/crates/db/src/sled.rs +++ b/crates/db/src/sled.rs @@ -62,6 +62,7 @@ mod tests { let nonces_entry = NoncesEntry { entry: vec![pub_nonce.clone()], signature: vec![0x8; 32], + key: vec![0x8; 32], }; db.set_pub_nonces(operator_id, tx_id, nonces_entry) @@ -77,6 +78,7 @@ mod tests { let sigs_entry = PartialSignaturesEntry { entry: vec![signature], signature: vec![], + key: vec![0x8; 32], }; db.set_partial_signatures(operator_id, tx_id, sigs_entry) @@ -96,6 +98,7 @@ mod tests { let entry = GenesisInfoEntry { entry: (outpoint, checkpoint_pubkeys.clone()), signature: vec![], + key: vec![0x8; 32], }; db.set_genesis_info(operator_id, entry).await.unwrap(); diff --git a/crates/p2p/src/swarm/mod.rs b/crates/p2p/src/swarm/mod.rs index 6120ffa..2885e47 100644 --- a/crates/p2p/src/swarm/mod.rs +++ b/crates/p2p/src/swarm/mod.rs @@ -375,6 +375,7 @@ where GenesisInfoEntry { entry: (info.pre_stake_outpoint, info.checkpoint_pubkeys.clone()), signature: msg.signature.clone(), + key: msg.key.to_bytes().to_vec(), }, ) .await?; @@ -399,6 +400,7 @@ where DepositSetupEntry { payload: dep.payload.clone(), signature: msg.signature.clone(), + key: msg.key.to_bytes().to_vec(), }, ) .await?; @@ -425,6 +427,7 @@ where NoncesEntry { entry: dep.nonces.clone(), signature: msg.signature.clone(), + key: msg.key.to_bytes().to_vec(), }, ) .await?; @@ -450,6 +453,7 @@ where PartialSignaturesEntry { entry: dep.partial_sigs.clone(), signature: msg.signature.clone(), + key: msg.key.to_bytes().to_vec(), }, ) .await?; @@ -565,19 +569,16 @@ where return Ok(()); }; - let body = self.handle_get_message_request(req).await?; + let msg = self.handle_get_message_request(req).await?; - if body.is_none() { + if msg.is_none() { debug!(%request_id, "Have no needed data, requesting from neighbours"); return Ok(()); // TODO(NikitaMasych): launch recursive request. } - let msg = proto::GossipsubMsg { - key: vec![], // TODO(NikitaMasych): add sign msg - signature: vec![], // TODO(NikitaMasych): add sign msg - body, + let response = GetMessageResponse { + msg: vec![msg.unwrap()], }; - let response = GetMessageResponse { msg: vec![msg] }; let _ = self .swarm @@ -597,27 +598,35 @@ where } let mut all_messages_empty = true; for msg in response.msg.into_iter() { - // if let Err(err) = validate_gossipsub_msg(peer, &msg) { - // debug!(reason=%err, "Got invalid signature from peer, rejecting - // message."); let _ = - // self.swarm.disconnect_peer_id(peer); // TODO(NikitaMasych): report validation - // error? return Ok(()); - // } - if msg.body.is_none() { continue; } + // TODO: report/punish peer for invalid message? + match GossipsubMsg::from_proto(msg.clone()) { + Ok(msg) => { + if let Err(err) = validate_gossipsub_msg::(peer, &msg) { + debug!(%peer, reason=%err, "Got invalid signature from peer, rejecting message."); + continue; + } + } + Err(err) => { + debug!(%peer, reason=%err, "Peer sent invalid message"); + continue; + } + } + all_messages_empty = false; - self.handle_get_message_response(peer, msg) - .await - .or_else(|err| { - if !matches!(err, Error::Repository { .. }) { - debug!(%peer, "Peer sent invalid message"); // TODO: punish? - } - Err(err) - })?; + let result = self.handle_get_message_response(peer, msg).await; + + if let Err(err) = result { + if matches!(err, Error::Repository { .. }) { + return Err(err); + } else { + debug!(%peer, reason=%err, "Peer sent invalid message"); + } + } } if all_messages_empty { debug!(%request_id, "Have no needed data, requesting from neighbours"); @@ -632,8 +641,8 @@ where async fn handle_get_message_request( &mut self, request: v1::GetMessageRequest, - ) -> P2PResult> { - let body = match request { + ) -> P2PResult> { + let msg = match request { v1::GetMessageRequest::Genesis { operator_id } => { let info = self .db @@ -641,8 +650,8 @@ where .await .context(RepositorySnafu)?; - info.map(|v| { - Body::GenesisInfo(proto::GenesisInfo { + info.map(|v| proto::GossipsubMsg { + body: Some(Body::GenesisInfo(proto::GenesisInfo { pre_stake_vout: v.entry.0.vout, pre_stake_txid: v.entry.0.txid.to_byte_array().to_vec(), checkpoint_pubkeys: v @@ -651,7 +660,9 @@ where .iter() .map(|k| k.serialize().to_vec()) .collect(), - }) + })), + signature: v.signature, + key: v.key, }) } v1::GetMessageRequest::ExchangeSession { @@ -666,11 +677,13 @@ where .await .context(RepositorySnafu)?; - setup.map(|v| { - Body::Setup(proto::DepositSetupExchange { + setup.map(|v| proto::GossipsubMsg { + body: Some(Body::Setup(proto::DepositSetupExchange { scope: scope.to_byte_array().to_vec(), payload: v.payload.encode_to_vec(), - }) + })), + signature: v.signature, + key: v.key, }) } GetMessageRequestExchangeKind::Nonces => { @@ -680,11 +693,13 @@ where .await .context(RepositorySnafu)?; - nonces.map(|v| { - Body::Nonce(proto::DepositNoncesExchange { + nonces.map(|v| proto::GossipsubMsg { + body: Some(Body::Nonce(proto::DepositNoncesExchange { scope: scope.to_byte_array().to_vec(), pub_nonces: v.entry.iter().map(|n| n.serialize().to_vec()).collect(), - }) + })), + signature: v.signature, + key: v.key, }) } GetMessageRequestExchangeKind::Signatures => { @@ -694,18 +709,21 @@ where .await .context(RepositorySnafu)?; - sigs.map(|v| { - Body::Sigs(proto::DepositSignaturesExchange { + sigs.map(|v| proto::GossipsubMsg { + body: Some(Body::Sigs(proto::DepositSignaturesExchange { scope: scope.to_byte_array().to_vec(), partial_sigs: v.entry.iter().map(|n| n.serialize().to_vec()).collect(), - }) + })), + signature: v.signature, + key: v.key, }) } }, }; - Ok(body) + Ok(msg) } + async fn handle_get_message_response( &mut self, peer: PeerId, @@ -720,6 +738,7 @@ where let entry = DepositSetupEntry { payload: setup.payload, signature: msg.signature, + key: msg.key, }; self.db .set_deposit_setup(peer, scope, entry) @@ -734,6 +753,7 @@ where let entry = NoncesEntry { entry: nonces.nonces, signature: msg.signature, + key: msg.key, }; self.db .set_pub_nonces(peer, scope, entry) @@ -748,6 +768,7 @@ where let entry = PartialSignaturesEntry { entry: sigs.partial_sigs, signature: msg.signature, + key: msg.key, }; self.db .set_partial_signatures(peer, scope, entry) @@ -760,6 +781,7 @@ where let entry = GenesisInfoEntry { entry: (info.pre_stake_outpoint, info.checkpoint_pubkeys), signature: msg.signature, + key: msg.key, }; self.db .set_genesis_info(peer, entry) diff --git a/crates/wire/src/p2p/v1/typed.rs b/crates/wire/src/p2p/v1/typed.rs index 4512a5c..6684617 100644 --- a/crates/wire/src/p2p/v1/typed.rs +++ b/crates/wire/src/p2p/v1/typed.rs @@ -335,6 +335,15 @@ where }) } + pub fn from_proto(msg: ProtoGossipMsg) -> Result { + Ok(Self { + signature: msg.signature, + key: PublicKey::try_from_bytes(&msg.key) + .map_err(|_| DecodeError::new("couldn't parse pub key"))?, + kind: GossipsubMsgKind::from_msg_proto(&msg.body.unwrap())?, + }) + } + pub fn into_raw(self) -> ProtoGossipMsg { ProtoGossipMsg { key: self.key.to_bytes().to_vec(), From d9936bae0076efaacce0d1842dc469f9ca7f5c56 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Mon, 13 Jan 2025 17:01:44 +0200 Subject: [PATCH 3/4] fix: review issues resolved --- crates/db/src/lib.rs | 39 ++++++++- crates/db/src/sled.rs | 18 ++++- crates/p2p/src/swarm/mod.rs | 155 ++++++++++++++++-------------------- 3 files changed, 118 insertions(+), 94 deletions(-) diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index 46664db..47bad06 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; use bitcoin::{OutPoint, XOnlyPublicKey, hashes::sha256}; -use libp2p_identity::PeerId; +use libp2p_identity::{PeerId, secp256k1::PublicKey}; use musig2::{PartialSignature, PubNonce}; -use serde::{Serialize, de::DeserializeOwned}; +use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned}; use snafu::{ResultExt, Snafu}; use crate::states::PeerDepositState; @@ -23,11 +23,42 @@ pub enum RepositoryError { InvalidData { source: Box }, } +#[derive(Debug, Clone)] +pub struct SerializablePublicKey(pub PublicKey); + +impl Serialize for SerializablePublicKey { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let key_bytes = self.0.to_bytes(); + serializer.serialize_bytes(&key_bytes) + } +} + +impl<'de> Deserialize<'de> for SerializablePublicKey { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let key_bytes: Vec = Deserialize::deserialize(deserializer)?; + PublicKey::try_from_bytes(&key_bytes) + .map(SerializablePublicKey) + .map_err(serde::de::Error::custom) + } +} + +impl From for SerializablePublicKey { + fn from(key: PublicKey) -> Self { + SerializablePublicKey(key) + } +} + #[derive(serde::Serialize, serde::Deserialize, Debug)] pub struct EntryWithSigAndKey { pub entry: T, pub signature: Vec, - pub key: Vec, // public key used to create a signature + pub key: SerializablePublicKey, } pub type PartialSignaturesEntry = EntryWithSigAndKey>; @@ -178,5 +209,5 @@ pub struct DepositSetupEntry { #[serde(with = "prost_serde")] pub payload: DSP, pub signature: Vec, - pub key: Vec, // public key used to create a signature + pub key: SerializablePublicKey, } diff --git a/crates/db/src/sled.rs b/crates/db/src/sled.rs index 4059e1f..0386e0f 100644 --- a/crates/db/src/sled.rs +++ b/crates/db/src/sled.rs @@ -35,7 +35,9 @@ mod tests { use rand::thread_rng; use secp256k1::{All, Keypair, Secp256k1}; - use crate::{GenesisInfoEntry, NoncesEntry, PartialSignaturesEntry, RepositoryExt}; + use crate::{ + GenesisInfoEntry, NoncesEntry, PartialSignaturesEntry, RepositoryExt, SerializablePublicKey, + }; #[tokio::test] async fn test_repository() { @@ -46,6 +48,7 @@ mod tests { let secp = Secp256k1::new(); let keypair = Keypair::new(&secp, &mut rand::thread_rng()); let message = b"message"; + let libp2p_pkey = generate_random_pubkey(); let sec_nonce = SecNonce::generate( [0u8; 32], @@ -62,7 +65,7 @@ mod tests { let nonces_entry = NoncesEntry { entry: vec![pub_nonce.clone()], signature: vec![0x8; 32], - key: vec![0x8; 32], + key: libp2p_pkey.clone(), }; db.set_pub_nonces(operator_id, tx_id, nonces_entry) @@ -78,7 +81,7 @@ mod tests { let sigs_entry = PartialSignaturesEntry { entry: vec![signature], signature: vec![], - key: vec![0x8; 32], + key: libp2p_pkey.clone(), }; db.set_partial_signatures(operator_id, tx_id, sigs_entry) @@ -98,7 +101,7 @@ mod tests { let entry = GenesisInfoEntry { entry: (outpoint, checkpoint_pubkeys.clone()), signature: vec![], - key: vec![0x8; 32], + key: libp2p_pkey.clone(), }; db.set_genesis_info(operator_id, entry).await.unwrap(); @@ -126,4 +129,11 @@ mod tests { let (xonly, _parity) = pubkey.x_only_public_key(); xonly } + + fn generate_random_pubkey() -> SerializablePublicKey { + let keypair = libp2p_identity::secp256k1::Keypair::generate(); + let pk = keypair.public().clone(); + + SerializablePublicKey::from(pk) + } } diff --git a/crates/p2p/src/swarm/mod.rs b/crates/p2p/src/swarm/mod.rs index 2885e47..345458a 100644 --- a/crates/p2p/src/swarm/mod.rs +++ b/crates/p2p/src/swarm/mod.rs @@ -1,7 +1,7 @@ use std::{collections::HashSet, sync::LazyLock, time::Duration}; use behavior::{Behaviour, BehaviourEvent}; -use bitcoin::hashes::{sha256, Hash}; +use bitcoin::hashes::Hash; use futures::StreamExt as _; use handle::P2PHandle; use libp2p::{ @@ -27,8 +27,7 @@ use strata_p2p_wire::p2p::{ get_message_request, gossipsub_msg::Body, DepositRequestKey, GetMessageRequest, GetMessageResponse, }, - DepositNonces, DepositSetup, DepositSigs, GetMessageRequestExchangeKind, GossipsubMsg, - GossipsubMsgDepositKind, GossipsubMsgKind, + GetMessageRequestExchangeKind, GossipsubMsg, GossipsubMsgDepositKind, GossipsubMsgKind, }, }; use tokio::{ @@ -375,7 +374,7 @@ where GenesisInfoEntry { entry: (info.pre_stake_outpoint, info.checkpoint_pubkeys.clone()), signature: msg.signature.clone(), - key: msg.key.to_bytes().to_vec(), + key: msg.key.clone().into(), }, ) .await?; @@ -400,7 +399,7 @@ where DepositSetupEntry { payload: dep.payload.clone(), signature: msg.signature.clone(), - key: msg.key.to_bytes().to_vec(), + key: msg.key.clone().into(), }, ) .await?; @@ -427,7 +426,7 @@ where NoncesEntry { entry: dep.nonces.clone(), signature: msg.signature.clone(), - key: msg.key.to_bytes().to_vec(), + key: msg.key.clone().into(), }, ) .await?; @@ -453,7 +452,7 @@ where PartialSignaturesEntry { entry: dep.partial_sigs.clone(), signature: msg.signature.clone(), - key: msg.key.to_bytes().to_vec(), + key: msg.key.clone().into(), }, ) .await?; @@ -553,6 +552,15 @@ where &mut self, event: RequestResponseEvent, ) -> P2PResult<()> { + if let RequestResponseEvent::InboundFailure { + peer, + request_id, + error, + } = event + { + debug!(peer=?peer, error=?error, request_id=?request_id, "Failed to send response"); + return Ok(()); + } let RequestResponseEvent::Message { peer, message } = event else { return Ok(()); }; @@ -569,17 +577,13 @@ where return Ok(()); }; - let msg = self.handle_get_message_request(req).await?; - - if msg.is_none() { + let Some(msg) = self.handle_get_message_request(req).await? else { debug!(%request_id, "Have no needed data, requesting from neighbours"); return Ok(()); // TODO(NikitaMasych): launch recursive request. - } - - let response = GetMessageResponse { - msg: vec![msg.unwrap()], }; + let response = GetMessageResponse { msg: vec![msg] }; + let _ = self .swarm .behaviour_mut() @@ -603,30 +607,21 @@ where } // TODO: report/punish peer for invalid message? - match GossipsubMsg::from_proto(msg.clone()) { - Ok(msg) => { - if let Err(err) = validate_gossipsub_msg::(peer, &msg) { - debug!(%peer, reason=%err, "Got invalid signature from peer, rejecting message."); - continue; - } - } + let msg = match GossipsubMsg::from_proto(msg.clone()) { + Ok(msg) => msg, Err(err) => { debug!(%peer, reason=%err, "Peer sent invalid message"); continue; } + }; + if let Err(err) = validate_gossipsub_msg::(peer, &msg) { + debug!(%peer, reason=%err, "Got invalid signature from peer, rejecting message."); + continue; } all_messages_empty = false; - let result = self.handle_get_message_response(peer, msg).await; - - if let Err(err) = result { - if matches!(err, Error::Repository { .. }) { - return Err(err); - } else { - debug!(%peer, reason=%err, "Peer sent invalid message"); - } - } + self.handle_get_message_response(peer, msg).await? } if all_messages_empty { debug!(%request_id, "Have no needed data, requesting from neighbours"); @@ -662,7 +657,7 @@ where .collect(), })), signature: v.signature, - key: v.key, + key: v.key.0.to_bytes().to_vec(), }) } v1::GetMessageRequest::ExchangeSession { @@ -683,7 +678,7 @@ where payload: v.payload.encode_to_vec(), })), signature: v.signature, - key: v.key, + key: v.key.0.to_bytes().to_vec(), }) } GetMessageRequestExchangeKind::Nonces => { @@ -699,7 +694,7 @@ where pub_nonces: v.entry.iter().map(|n| n.serialize().to_vec()).collect(), })), signature: v.signature, - key: v.key, + key: v.key.0.to_bytes().to_vec(), }) } GetMessageRequestExchangeKind::Signatures => { @@ -715,7 +710,7 @@ where partial_sigs: v.entry.iter().map(|n| n.serialize().to_vec()).collect(), })), signature: v.signature, - key: v.key, + key: v.key.0.to_bytes().to_vec(), }) } }, @@ -727,67 +722,55 @@ where async fn handle_get_message_response( &mut self, peer: PeerId, - msg: proto::GossipsubMsg, + msg: GossipsubMsg, ) -> P2PResult<()> { - match msg.body.unwrap() { - Body::Setup(v) => { - let scope = sha256::Hash::from_slice(&v.scope) - .whatever_context("failed to convert scope")?; - let setup = - DepositSetup::from_proto_msg(&v).whatever_context("failed to convert setup")?; - let entry = DepositSetupEntry { - payload: setup.payload, - signature: msg.signature, - key: msg.key, - }; - self.db - .set_deposit_setup(peer, scope, entry) - .await - .context(RepositorySnafu)? - } - Body::Nonce(v) => { - let scope = sha256::Hash::from_slice(&v.scope) - .whatever_context("failed to convert scope")?; - let nonces = DepositNonces::from_proto_msg(&v) - .whatever_context("failed to convert nonces")?; - let entry = NoncesEntry { - entry: nonces.nonces, - signature: msg.signature, - key: msg.key, - }; - self.db - .set_pub_nonces(peer, scope, entry) - .await - .context(RepositorySnafu)? - } - Body::Sigs(v) => { - let scope = sha256::Hash::from_slice(&v.scope) - .whatever_context("failed to convert scope")?; - let sigs = DepositSigs::from_proto_msg(&v) - .whatever_context("failed to convert signatures")?; - let entry = PartialSignaturesEntry { - entry: sigs.partial_sigs, - signature: msg.signature, - key: msg.key, - }; - self.db - .set_partial_signatures(peer, scope, entry) - .await - .context(RepositorySnafu)? - } - Body::GenesisInfo(v) => { - let info = v1::GenesisInfo::from_proto_msg(&v) - .whatever_context("failed to convert genesis info")?; + match msg.kind { + GossipsubMsgKind::GenesisInfo(v) => { let entry = GenesisInfoEntry { - entry: (info.pre_stake_outpoint, info.checkpoint_pubkeys), + entry: (v.pre_stake_outpoint, v.checkpoint_pubkeys), signature: msg.signature, - key: msg.key, + key: msg.key.clone().into(), }; self.db .set_genesis_info(peer, entry) .await .context(RepositorySnafu)? } + GossipsubMsgKind::Deposit { scope, kind } => match kind { + GossipsubMsgDepositKind::Sigs(v) => { + let entry = PartialSignaturesEntry { + entry: v.partial_sigs, + signature: msg.signature, + key: msg.key.clone().into(), + }; + self.db + .set_partial_signatures(peer, scope, entry) + .await + .context(RepositorySnafu)? + } + GossipsubMsgDepositKind::Setup(v) => { + let entry = DepositSetupEntry { + payload: v.payload, + signature: msg.signature, + key: msg.key.clone().into(), + }; + self.db + .set_deposit_setup(peer, scope, entry) + .await + .context(RepositorySnafu)? + } + GossipsubMsgDepositKind::Nonces(v) => { + let entry = NoncesEntry { + entry: v.nonces, + signature: msg.signature, + key: msg.key.clone().into(), + }; + self.db + .set_pub_nonces(peer, scope, entry) + .await + .context(RepositorySnafu)? + } + }, } Ok(()) From 6f93af65384a06bab083edd532d4820d1328124b Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Mon, 13 Jan 2025 18:41:34 +0200 Subject: [PATCH 4/4] fix: refactor pubkey serde + remove all_messages_empty --- crates/db/src/lib.rs | 29 ++++++++++------------------- crates/db/src/sled.rs | 14 +++++--------- crates/p2p/src/swarm/mod.rs | 34 ++++++++++++++-------------------- 3 files changed, 29 insertions(+), 48 deletions(-) diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index 47bad06..97bcbdb 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -23,34 +23,23 @@ pub enum RepositoryError { InvalidData { source: Box }, } -#[derive(Debug, Clone)] -pub struct SerializablePublicKey(pub PublicKey); +pub mod public_key_serde { + use super::*; -impl Serialize for SerializablePublicKey { - fn serialize(&self, serializer: S) -> Result + pub fn serialize(key: &PublicKey, serializer: S) -> Result where S: Serializer, { - let key_bytes = self.0.to_bytes(); + let key_bytes = key.to_bytes(); serializer.serialize_bytes(&key_bytes) } -} -impl<'de> Deserialize<'de> for SerializablePublicKey { - fn deserialize(deserializer: D) -> Result + pub fn deserialize<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, { let key_bytes: Vec = Deserialize::deserialize(deserializer)?; - PublicKey::try_from_bytes(&key_bytes) - .map(SerializablePublicKey) - .map_err(serde::de::Error::custom) - } -} - -impl From for SerializablePublicKey { - fn from(key: PublicKey) -> Self { - SerializablePublicKey(key) + PublicKey::try_from_bytes(&key_bytes).map_err(serde::de::Error::custom) } } @@ -58,7 +47,8 @@ impl From for SerializablePublicKey { pub struct EntryWithSigAndKey { pub entry: T, pub signature: Vec, - pub key: SerializablePublicKey, + #[serde(with = "public_key_serde")] + pub key: PublicKey, } pub type PartialSignaturesEntry = EntryWithSigAndKey>; @@ -209,5 +199,6 @@ pub struct DepositSetupEntry { #[serde(with = "prost_serde")] pub payload: DSP, pub signature: Vec, - pub key: SerializablePublicKey, + #[serde(with = "public_key_serde")] + pub key: PublicKey, } diff --git a/crates/db/src/sled.rs b/crates/db/src/sled.rs index 0386e0f..d3c2113 100644 --- a/crates/db/src/sled.rs +++ b/crates/db/src/sled.rs @@ -30,14 +30,12 @@ mod tests { OutPoint, XOnlyPublicKey, hashes::{Hash, sha256}, }; - use libp2p_identity::PeerId; + use libp2p_identity::{PeerId, secp256k1::PublicKey}; use musig2::{AggNonce, KeyAggContext, SecNonce, sign_partial}; use rand::thread_rng; use secp256k1::{All, Keypair, Secp256k1}; - use crate::{ - GenesisInfoEntry, NoncesEntry, PartialSignaturesEntry, RepositoryExt, SerializablePublicKey, - }; + use crate::{GenesisInfoEntry, NoncesEntry, PartialSignaturesEntry, RepositoryExt}; #[tokio::test] async fn test_repository() { @@ -130,10 +128,8 @@ mod tests { xonly } - fn generate_random_pubkey() -> SerializablePublicKey { - let keypair = libp2p_identity::secp256k1::Keypair::generate(); - let pk = keypair.public().clone(); - - SerializablePublicKey::from(pk) + fn generate_random_pubkey() -> PublicKey { + let kp = libp2p_identity::secp256k1::Keypair::generate(); + kp.public().clone() } } diff --git a/crates/p2p/src/swarm/mod.rs b/crates/p2p/src/swarm/mod.rs index 345458a..2d18a48 100644 --- a/crates/p2p/src/swarm/mod.rs +++ b/crates/p2p/src/swarm/mod.rs @@ -374,7 +374,7 @@ where GenesisInfoEntry { entry: (info.pre_stake_outpoint, info.checkpoint_pubkeys.clone()), signature: msg.signature.clone(), - key: msg.key.clone().into(), + key: msg.key.clone(), }, ) .await?; @@ -399,7 +399,7 @@ where DepositSetupEntry { payload: dep.payload.clone(), signature: msg.signature.clone(), - key: msg.key.clone().into(), + key: msg.key.clone(), }, ) .await?; @@ -426,7 +426,7 @@ where NoncesEntry { entry: dep.nonces.clone(), signature: msg.signature.clone(), - key: msg.key.clone().into(), + key: msg.key.clone(), }, ) .await?; @@ -452,7 +452,7 @@ where PartialSignaturesEntry { entry: dep.partial_sigs.clone(), signature: msg.signature.clone(), - key: msg.key.clone().into(), + key: msg.key.clone(), }, ) .await?; @@ -558,7 +558,7 @@ where error, } = event { - debug!(peer=?peer, error=?error, request_id=?request_id, "Failed to send response"); + debug!(%peer, %error, %request_id, "Failed to send response"); return Ok(()); } let RequestResponseEvent::Message { peer, message } = event else { @@ -600,7 +600,7 @@ where debug!(%request_id, "Have no needed data, requesting from neighbours"); return Ok(()); // TODO(NikitaMasych): launch recursive request. } - let mut all_messages_empty = true; + for msg in response.msg.into_iter() { if msg.body.is_none() { continue; @@ -619,14 +619,8 @@ where continue; } - all_messages_empty = false; - self.handle_get_message_response(peer, msg).await? } - if all_messages_empty { - debug!(%request_id, "Have no needed data, requesting from neighbours"); - return Ok(()); // TODO(NikitaMasych): launch recursive request. - } } }; @@ -657,7 +651,7 @@ where .collect(), })), signature: v.signature, - key: v.key.0.to_bytes().to_vec(), + key: v.key.to_bytes().to_vec(), }) } v1::GetMessageRequest::ExchangeSession { @@ -678,7 +672,7 @@ where payload: v.payload.encode_to_vec(), })), signature: v.signature, - key: v.key.0.to_bytes().to_vec(), + key: v.key.to_bytes().to_vec(), }) } GetMessageRequestExchangeKind::Nonces => { @@ -694,7 +688,7 @@ where pub_nonces: v.entry.iter().map(|n| n.serialize().to_vec()).collect(), })), signature: v.signature, - key: v.key.0.to_bytes().to_vec(), + key: v.key.to_bytes().to_vec(), }) } GetMessageRequestExchangeKind::Signatures => { @@ -710,7 +704,7 @@ where partial_sigs: v.entry.iter().map(|n| n.serialize().to_vec()).collect(), })), signature: v.signature, - key: v.key.0.to_bytes().to_vec(), + key: v.key.to_bytes().to_vec(), }) } }, @@ -729,7 +723,7 @@ where let entry = GenesisInfoEntry { entry: (v.pre_stake_outpoint, v.checkpoint_pubkeys), signature: msg.signature, - key: msg.key.clone().into(), + key: msg.key.clone(), }; self.db .set_genesis_info(peer, entry) @@ -741,7 +735,7 @@ where let entry = PartialSignaturesEntry { entry: v.partial_sigs, signature: msg.signature, - key: msg.key.clone().into(), + key: msg.key.clone(), }; self.db .set_partial_signatures(peer, scope, entry) @@ -752,7 +746,7 @@ where let entry = DepositSetupEntry { payload: v.payload, signature: msg.signature, - key: msg.key.clone().into(), + key: msg.key.clone(), }; self.db .set_deposit_setup(peer, scope, entry) @@ -763,7 +757,7 @@ where let entry = NoncesEntry { entry: v.nonces, signature: msg.signature, - key: msg.key.clone().into(), + key: msg.key.clone(), }; self.db .set_pub_nonces(peer, scope, entry)