Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(identify): implement signedPeerRecord #5785

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ use std::{
};

use libp2p_core::{
multiaddr, multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint, Multiaddr,
multiaddr::{self, Protocol},
transport::PortUse,
ConnectedPoint, Endpoint, Multiaddr,
};
use libp2p_identity::{PeerId, PublicKey};
use libp2p_identity::{Keypair, PeerId, PublicKey};
use libp2p_swarm::{
behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
ConnectionDenied, ConnectionId, DialError, ExternalAddresses, ListenAddresses,
Expand Down Expand Up @@ -118,7 +120,7 @@ pub struct Config {
/// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
protocol_version: String,
/// The public key of the local node. To report on the wire.
drHuangMHT marked this conversation as resolved.
Show resolved Hide resolved
local_public_key: PublicKey,
local_key: CryptoKey,
/// Name and version of the local peer implementation, similar to the
/// `User-Agent` header in the HTTP protocol.
///
Expand Down Expand Up @@ -156,12 +158,25 @@ pub struct Config {

impl Config {
/// Creates a new configuration for the identify [`Behaviour`] that
/// advertises the given protocol version and public key.
pub fn new(protocol_version: String, local_public_key: PublicKey) -> Self {
/// advertises the given protocol version and public key.
/// Use `new_with_keypair` for `SignedPeerRecord` support.
pub fn new(protocol_version: String, public_key: PublicKey) -> Self {
Self {
protocol_version,
agent_version: format!("rust-libp2p/{}", env!("CARGO_PKG_VERSION")),
local_public_key,
local_key: public_key.into(),
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
interval: Duration::from_secs(5 * 60),
push_listen_addr_updates: false,
cache_size: 100,
hide_listen_addrs: false,
}
}

pub fn new_with_keypair(protocol_version: String, local_keypair: Keypair) -> Self {
drHuangMHT marked this conversation as resolved.
Show resolved Hide resolved
Self {
protocol_version,
agent_version: format!("rust-libp2p/{}", env!("CARGO_PKG_VERSION")),
local_key: local_keypair.into(),
interval: Duration::from_secs(5 * 60),
push_listen_addr_updates: false,
cache_size: 100,
Expand Down Expand Up @@ -209,7 +224,7 @@ impl Config {

/// Get the local public key of the Config.
pub fn local_public_key(&self) -> &PublicKey {
&self.local_public_key
self.local_key.public_key()
}

/// Get the agent version of the Config.
Expand Down Expand Up @@ -380,7 +395,7 @@ impl NetworkBehaviour for Behaviour {
Ok(Handler::new(
self.config.interval,
peer,
self.config.local_public_key.clone(),
self.config.local_key.clone(),
self.config.protocol_version.clone(),
self.config.agent_version.clone(),
remote_addr.clone(),
Expand Down Expand Up @@ -413,7 +428,7 @@ impl NetworkBehaviour for Behaviour {
Ok(Handler::new(
self.config.interval,
peer,
self.config.local_public_key.clone(),
self.config.local_key.clone(),
self.config.protocol_version.clone(),
self.config.agent_version.clone(),
// TODO: This is weird? That is the public address we dialed,
Expand Down Expand Up @@ -670,6 +685,39 @@ impl PeerCache {
}
}

#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum CryptoKey {
drHuangMHT marked this conversation as resolved.
Show resolved Hide resolved
// With public key only the behaviour will not
// be able to produce a `SignedEnvelope`.
Public(PublicKey),
Keypair {
keypair: Keypair,
public_key: PublicKey,
},
}
impl From<PublicKey> for CryptoKey {
fn from(value: PublicKey) -> Self {
Self::Public(value)
}
}
impl From<Keypair> for CryptoKey {
fn from(value: Keypair) -> Self {
Self::Keypair {
public_key: value.public(),
keypair: value,
}
}
}
impl CryptoKey {
pub(crate) fn public_key(&self) -> &PublicKey {
match &self {
CryptoKey::Public(pubkey) => pubkey,
CryptoKey::Keypair { public_key, .. } => public_key,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
7 changes: 7 additions & 0 deletions protocols/identify/src/generated/structs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,11 @@ message Identify {
optional bytes observedAddr = 4;

repeated string protocols = 3;

// signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord,
// signed by the sending node. It contains the same addresses as the listenAddrs field, but
// in a form that lets us share authenticated addrs with other peers.
// see github.com/libp2p/go-libp2p/core/record/pb/envelope.proto and
// github.com/libp2p/go-libp2p/core/peer/pb/peer_record.proto for message definitions.
optional bytes signedPeerRecord = 8;
}
6 changes: 5 additions & 1 deletion protocols/identify/src/generated/structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct Identify {
pub listenAddrs: Vec<Vec<u8>>,
pub observedAddr: Option<Vec<u8>>,
pub protocols: Vec<String>,
pub signedPeerRecord: Option<Vec<u8>>,
}

impl<'a> MessageRead<'a> for Identify {
Expand All @@ -35,6 +36,7 @@ impl<'a> MessageRead<'a> for Identify {
Ok(18) => msg.listenAddrs.push(r.read_bytes(bytes)?.to_owned()),
Ok(34) => msg.observedAddr = Some(r.read_bytes(bytes)?.to_owned()),
Ok(26) => msg.protocols.push(r.read_string(bytes)?.to_owned()),
Ok(66) => msg.signedPeerRecord = Some(r.read_bytes(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
Expand All @@ -43,7 +45,7 @@ impl<'a> MessageRead<'a> for Identify {
}
}

impl MessageWrite for Identify {
impl<'a> MessageWrite for Identify {
fn get_size(&self) -> usize {
0
+ self.protocolVersion.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
Expand All @@ -52,6 +54,7 @@ impl MessageWrite for Identify {
+ self.listenAddrs.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
+ self.observedAddr.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.protocols.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
+ self.signedPeerRecord.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
}

fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
Expand All @@ -61,6 +64,7 @@ impl MessageWrite for Identify {
for s in &self.listenAddrs { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
if let Some(ref s) = self.observedAddr { w.write_with_tag(34, |w| w.write_bytes(&**s))?; }
for s in &self.protocols { w.write_with_tag(26, |w| w.write_string(&**s))?; }
if let Some(ref s) = self.signedPeerRecord { w.write_with_tag(66, |w| w.write_bytes(&**s))?; }
Ok(())
}
}
Expand Down
24 changes: 17 additions & 7 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use libp2p_core::{
upgrade::{ReadyUpgrade, SelectUpgrade},
Multiaddr,
};
use libp2p_identity::{PeerId, PublicKey};
use libp2p_identity::PeerId;
use libp2p_swarm::{
handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
Expand All @@ -45,8 +45,8 @@ use smallvec::SmallVec;
use tracing::Level;

use crate::{
protocol,
protocol::{Info, PushInfo, UpgradeError},
behaviour::CryptoKey,
protocol::{self, Info, PushInfo, UpgradeError},
PROTOCOL_NAME, PUSH_PROTOCOL_NAME,
};

Expand Down Expand Up @@ -81,7 +81,7 @@ pub struct Handler {
interval: Duration,

/// The public key of the local peer.
public_key: PublicKey,
local_key: CryptoKey,

/// Application-specific version of the protocol family used by the peer,
/// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
Expand Down Expand Up @@ -128,7 +128,7 @@ impl Handler {
pub fn new(
interval: Duration,
remote_peer_id: PeerId,
public_key: PublicKey,
public_key: CryptoKey,
protocol_version: String,
agent_version: String,
observed_addr: Multiaddr,
Expand All @@ -144,7 +144,7 @@ impl Handler {
trigger_next_identify: Delay::new(Duration::ZERO),
exchanged_one_periodic_identify: false,
interval,
public_key,
local_key: public_key,
protocol_version,
agent_version,
observed_addr,
Expand Down Expand Up @@ -232,13 +232,23 @@ impl Handler {
}

fn build_info(&mut self) -> Info {
let signed_envelope = match &self.local_key {
CryptoKey::Public(_) => None,
CryptoKey::Keypair { keypair, .. } => libp2p_core::PeerRecord::new(
keypair,
Vec::from_iter(self.external_addresses.iter().cloned()),
)
.ok()
.map(|r| r.into_signed_envelope()),
};
Info {
public_key: self.public_key.clone(),
public_key: self.local_key.public_key().clone(),
protocol_version: self.protocol_version.clone(),
agent_version: self.agent_version.clone(),
listen_addrs: Vec::from_iter(self.external_addresses.iter().cloned()),
protocols: Vec::from_iter(self.local_supported_protocols.iter().cloned()),
observed_addr: self.observed_addr.clone(),
signed_peer_record: signed_envelope,
}
}

Expand Down
21 changes: 15 additions & 6 deletions protocols/identify/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::io;

use asynchronous_codec::{FramedRead, FramedWrite};
use futures::prelude::*;
use libp2p_core::{multiaddr, Multiaddr};
use libp2p_core::{multiaddr, Multiaddr, SignedEnvelope};
use libp2p_identity as identity;
use libp2p_identity::PublicKey;
use libp2p_swarm::StreamProtocol;
Expand Down Expand Up @@ -53,6 +53,7 @@ pub struct Info {
pub protocols: Vec<StreamProtocol>,
/// Address observed by or for the remote.
pub observed_addr: Multiaddr,
pub signed_peer_record: Option<SignedEnvelope>,
}

impl Info {
Expand Down Expand Up @@ -108,6 +109,10 @@ where
listenAddrs: listen_addrs,
observedAddr: Some(info.observed_addr.to_vec()),
protocols: info.protocols.iter().map(|p| p.to_string()).collect(),
signedPeerRecord: info
.signed_peer_record
.clone()
.map(|r| r.into_protobuf_encoding()),
};

let mut framed_io = FramedWrite::new(
Expand Down Expand Up @@ -166,7 +171,7 @@ where
fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
listen_addrs
.into_iter()
.filter_map(|bytes| match Multiaddr::try_from(bytes) {
.filter_map(|bytes| match Multiaddr::try_from(bytes.to_vec()) {
Ok(a) => Some(a),
Err(e) => {
tracing::debug!("Unable to parse multiaddr: {e:?}");
Expand All @@ -179,7 +184,7 @@ fn parse_listen_addrs(listen_addrs: Vec<Vec<u8>>) -> Vec<Multiaddr> {
fn parse_protocols(protocols: Vec<String>) -> Vec<StreamProtocol> {
protocols
.into_iter()
.filter_map(|p| match StreamProtocol::try_from_owned(p) {
.filter_map(|p| match StreamProtocol::try_from_owned(p.to_string()) {
Ok(p) => Some(p),
Err(e) => {
tracing::debug!("Received invalid protocol from peer: {e}");
Expand All @@ -200,7 +205,7 @@ fn parse_public_key(public_key: Option<Vec<u8>>) -> Option<PublicKey> {
}

fn parse_observed_addr(observed_addr: Option<Vec<u8>>) -> Option<Multiaddr> {
observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes) {
observed_addr.and_then(|bytes| match Multiaddr::try_from(bytes.to_vec()) {
Ok(a) => Some(a),
Err(e) => {
tracing::debug!("Unable to parse observed multiaddr: {e:?}");
Expand Down Expand Up @@ -228,6 +233,9 @@ impl TryFrom<proto::Identify> for Info {
listen_addrs: parse_listen_addrs(msg.listenAddrs),
protocols: parse_protocols(msg.protocols),
observed_addr: parse_observed_addr(msg.observedAddr).unwrap_or(Multiaddr::empty()),
signed_peer_record: msg
.signedPeerRecord
.and_then(|b| SignedEnvelope::from_protobuf_encoding(b.as_ref()).ok()),
};

Ok(info)
Expand All @@ -240,8 +248,8 @@ impl TryFrom<proto::Identify> for PushInfo {
fn try_from(msg: proto::Identify) -> Result<Self, Self::Error> {
let info = PushInfo {
public_key: parse_public_key(msg.publicKey),
protocol_version: msg.protocolVersion,
agent_version: msg.agentVersion,
protocol_version: msg.protocolVersion.map(|v| v.to_string()),
agent_version: msg.agentVersion.map(|v| v.to_string()),
listen_addrs: parse_listen_addrs(msg.listenAddrs),
protocols: parse_protocols(msg.protocols),
observed_addr: parse_observed_addr(msg.observedAddr),
Expand Down Expand Up @@ -293,6 +301,7 @@ mod tests {
.public()
.encode_protobuf(),
),
signedPeerRecord: None,
};

let info = PushInfo::try_from(payload).expect("not to fail");
Expand Down
Loading