Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dcutr): exchange address candidates #4624

Merged
merged 22 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5b9b8c0
Fix TODO
thomaseizinger Oct 11, 2023
377eb99
Use address candidates in dcutr
thomaseizinger Oct 11, 2023
a6b59d5
Remove log
thomaseizinger Oct 11, 2023
72d1f78
Reuse `is_relayed`
thomaseizinger Oct 11, 2023
507d46c
Extract dedicated `Candidates` struct
thomaseizinger Oct 11, 2023
4ee89b0
Update changelog and version
thomaseizinger Oct 11, 2023
987fc5f
Merge branch 'master' into fix/dcutr-candidates
thomaseizinger Oct 15, 2023
d144362
Remove unnecessary `.clone`
thomaseizinger Oct 16, 2023
bfd6e8a
No longer add external address explicitly
thomaseizinger Oct 16, 2023
ffc5fa6
Merge branch 'master' into fix/dcutr-candidates
thomaseizinger Oct 19, 2023
fd58272
Update protocols/dcutr/src/behaviour.rs
thomaseizinger Oct 19, 2023
5761253
Merge branch 'master' into fix/dcutr-candidates
thomaseizinger Oct 20, 2023
35daa57
Merge branch 'master' into fix/dcutr-candidates
thomaseizinger Oct 25, 2023
990ff07
Clarify log for @mxinden
thomaseizinger Oct 25, 2023
0c612ba
Apply suggestions from code review
thomaseizinger Oct 25, 2023
4367585
Ensure `src` and `dst` don't have external addresses
thomaseizinger Oct 25, 2023
9feb28b
Merge remote-tracking branch 'origin/fix/dcutr-candidates' into fix/d…
thomaseizinger Oct 25, 2023
4200e11
Add assertion
thomaseizinger Oct 25, 2023
5d975d3
Merge branch 'master' into fix/dcutr-candidates
thomaseizinger Oct 25, 2023
bdad319
Merge branch 'master' into fix/dcutr-candidates
thomaseizinger Oct 26, 2023
b9e235f
Don't add external address
thomaseizinger Oct 26, 2023
221df28
Merge branch 'master' into fix/dcutr-candidates
mergify[bot] Oct 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions examples/dcutr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
info: identify::Info { observed_addr, .. },
..
})) => {
info!("Relay told us our public address: {:?}", observed_addr);
swarm.add_external_address(observed_addr);
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
info!("Relay told us our observed address: {observed_addr}");
learned_observed_addr = true;
}
event => panic!("{event:?}"),
Expand Down
1 change: 0 additions & 1 deletion hole-punching-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ async fn client_connect_to_relay(
..
})) => {
log::info!("Relay told us our public address: {observed_addr}");
swarm.add_external_address(observed_addr);
break;
}
SwarmEvent::ConnectionEstablished { connection_id, .. }
Expand Down
4 changes: 4 additions & 0 deletions protocols/dcutr/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

[PR 4558]: https://github.com/libp2p/rust-libp2p/pull/4558

- Exchange address _candidates_ instead of external addresses in `CONNECT`.
If hole-punching wasn't working properly for you until now, this might be the reason why.
See [PR 4624](https://github.com/libp2p/rust-libp2p/pull/4624).

## 0.10.0

- Raise MSRV to 1.65.
Expand Down
1 change: 1 addition & 0 deletions protocols/dcutr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ quick-protobuf = "0.8"
quick-protobuf-codec = { workspace = true }
thiserror = "1.0"
void = "1"
lru = "0.11.1"

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm};
use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, THandler, THandlerOutEvent,
};
use libp2p_swarm::{
ExternalAddresses, NetworkBehaviour, NotifyHandler, StreamUpgradeError, THandlerInEvent,
ToSwarm,
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, NewExternalAddrCandidate, THandler,
THandlerOutEvent,
};
use libp2p_swarm::{NetworkBehaviour, NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm};
use lru::LruCache;
use std::collections::{HashMap, HashSet, VecDeque};
use std::num::NonZeroUsize;
use std::task::{Context, Poll};
use thiserror::Error;
use void::Void;
Expand Down Expand Up @@ -79,9 +79,7 @@ pub struct Behaviour {
/// All direct (non-relayed) connections.
direct_connections: HashMap<PeerId, HashSet<ConnectionId>>,

external_addresses: ExternalAddresses,

local_peer_id: PeerId,
address_candidates: Candidates,

direct_to_relayed_connections: HashMap<ConnectionId, ConnectionId>,

Expand All @@ -95,20 +93,14 @@ impl Behaviour {
Behaviour {
queued_events: Default::default(),
direct_connections: Default::default(),
external_addresses: Default::default(),
local_peer_id,
address_candidates: Candidates::new(local_peer_id),
direct_to_relayed_connections: Default::default(),
outgoing_direct_connection_attempts: Default::default(),
}
}

fn observed_addresses(&self) -> Vec<Multiaddr> {
self.external_addresses
.iter()
.filter(|a| !a.iter().any(|p| p == Protocol::P2pCircuit))
.cloned()
.map(|a| a.with(Protocol::P2p(self.local_peer_id)))
.collect()
self.address_candidates.iter().cloned().collect()
}

fn on_dial_failure(
Expand Down Expand Up @@ -359,13 +351,14 @@ impl NetworkBehaviour for Behaviour {
}

fn on_swarm_event(&mut self, event: FromSwarm) {
self.external_addresses.on_swarm_event(&event);

match event {
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => {
self.address_candidates.add(addr.clone());
}
FromSwarm::AddressChange(_)
| FromSwarm::ConnectionEstablished(_)
| FromSwarm::ListenFailure(_)
Expand All @@ -374,13 +367,48 @@ impl NetworkBehaviour for Behaviour {
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_)
| FromSwarm::ExternalAddrConfirmed(_) => {}
}
}
}

/// Stores our address candidates.
///
/// We use an [`LruCache`] to favor addresses that are reported more often.
/// When attempting a hole-punch, we will try more frequent addresses first.
/// Most of these addresses will come from observations by other nodes (via e.g. the identify protocol).
/// More common observations mean a more likely stable port-mapping and thus a higher chance of a successful hole-punch.
struct Candidates {
inner: LruCache<Multiaddr, ()>,
me: PeerId,
}

impl Candidates {
fn new(me: PeerId) -> Self {
Self {
inner: LruCache::new(NonZeroUsize::new(20).expect("20 > 0")),
me,
}
}

fn add(&mut self, mut address: Multiaddr) {
if is_relayed(&address) {
return;
}

if address.iter().last() != Some(Protocol::P2p(self.me)) {
address.push(Protocol::P2p(self.me));
}

self.inner.push(address, ());
}

fn iter(&self) -> impl Iterator<Item = &Multiaddr> {
self.inner.iter().map(|(a, _)| a)
}
}

fn is_relayed(addr: &Multiaddr) -> bool {
addr.iter().any(|p| p == Protocol::P2pCircuit)
}
2 changes: 1 addition & 1 deletion protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

//! [`ConnectionHandler`] handling relayed connection potentially upgraded to a direct connection.

use crate::behaviour_impl::MAX_NUMBER_OF_UPGRADE_ATTEMPTS;
use crate::behaviour::MAX_NUMBER_OF_UPGRADE_ATTEMPTS;
use crate::protocol;
use either::Either;
use futures::future;
Expand Down
6 changes: 2 additions & 4 deletions protocols/dcutr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod behaviour_impl; // TODO: Rename back `behaviour` once deprecation symbols are removed.
mod behaviour;
mod handler;
mod protocol;

Expand All @@ -33,9 +33,7 @@ mod proto {
pub(crate) use self::holepunch::pb::{mod_HolePunch::*, HolePunch};
}

pub use behaviour_impl::Behaviour;
pub use behaviour_impl::Error;
pub use behaviour_impl::Event;
pub use behaviour::{Behaviour, Error, Event};
pub use protocol::PROTOCOL_NAME;
pub mod inbound {
pub use crate::protocol::inbound::UpgradeError;
Expand Down
72 changes: 52 additions & 20 deletions protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use libp2p_core::multiaddr::{Multiaddr, Protocol};
use libp2p_core::transport::upgrade::Version;
use libp2p_core::transport::{MemoryTransport, Transport};
use libp2p_dcutr as dcutr;
use libp2p_identify as identify;
use libp2p_identity as identity;
use libp2p_identity::PeerId;
use libp2p_plaintext as plaintext;
Expand All @@ -38,10 +39,19 @@ async fn connect() {
let mut dst = build_client();
let mut src = build_client();

// Have all swarms listen on a local memory address.
let (relay_addr, _) = relay.listen().await;
let (dst_addr, _) = dst.listen().await;
src.listen().await;
// Have all swarms listen on a local TCP address.
let (memory_addr, relay_addr) = relay.listen().await;
relay.remove_external_address(&memory_addr);
relay.add_external_address(relay_addr.clone());

let (dst_mem_addr, dst_tcp_addr) = dst.listen().await;
let (src_mem_addr, _) = src.listen().await;

dst.remove_external_address(&dst_mem_addr);
src.remove_external_address(&src_mem_addr);

assert!(src.external_addresses().next().is_none());
assert!(dst.external_addresses().next().is_none());

let relay_peer_id = *relay.local_peer_id();
let dst_peer_id = *dst.local_peer_id();
Expand Down Expand Up @@ -80,11 +90,12 @@ async fn connect() {
break;
}
}
ClientEvent::Identify(_) => {}
other => panic!("Unexpected event: {other:?}."),
}
}

let dst_addr = dst_addr.with(Protocol::P2p(dst_peer_id));
let dst_addr = dst_tcp_addr.with(Protocol::P2p(dst_peer_id));

let established_conn_id = src
.wait(move |e| match e {
Expand All @@ -109,20 +120,33 @@ async fn connect() {
assert_eq!(established_conn_id, reported_conn_id);
}

fn build_relay() -> Swarm<relay::Behaviour> {
fn build_relay() -> Swarm<Relay> {
Swarm::new_ephemeral(|identity| {
let local_peer_id = identity.public().to_peer_id();

relay::Behaviour::new(
local_peer_id,
relay::Config {
reservation_duration: Duration::from_secs(2),
..Default::default()
},
)
Relay {
relay: relay::Behaviour::new(
local_peer_id,
relay::Config {
reservation_duration: Duration::from_secs(2),
..Default::default()
},
),
identify: identify::Behaviour::new(identify::Config::new(
"/relay".to_owned(),
identity.public(),
)),
}
})
}

#[derive(NetworkBehaviour)]
#[behaviour(prelude = "libp2p_swarm::derive_prelude")]
struct Relay {
relay: relay::Behaviour,
identify: identify::Behaviour,
}

fn build_client() -> Swarm<Client> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = local_key.public().to_peer_id();
Expand All @@ -142,6 +166,10 @@ fn build_client() -> Swarm<Client> {
Client {
relay: behaviour,
dcutr: dcutr::Behaviour::new(local_peer_id),
identify: identify::Behaviour::new(identify::Config::new(
"/client".to_owned(),
local_key.public(),
)),
},
local_peer_id,
Config::with_async_std_executor(),
Expand All @@ -153,6 +181,7 @@ fn build_client() -> Swarm<Client> {
struct Client {
relay: relay::client::Behaviour,
dcutr: dcutr::Behaviour,
identify: identify::Behaviour,
}

async fn wait_for_reservation(
Expand All @@ -163,14 +192,16 @@ async fn wait_for_reservation(
) {
let mut new_listen_addr_for_relayed_addr = false;
let mut reservation_req_accepted = false;
let mut addr_observed = false;

loop {
if new_listen_addr_for_relayed_addr && reservation_req_accepted && addr_observed {
break;
}

match client.next_swarm_event().await {
SwarmEvent::NewListenAddr { address, .. } if address != client_addr => {}
SwarmEvent::NewListenAddr { address, .. } if address == client_addr => {
new_listen_addr_for_relayed_addr = true;
if reservation_req_accepted {
break;
}
}
SwarmEvent::Behaviour(ClientEvent::Relay(
relay::client::Event::ReservationReqAccepted {
Expand All @@ -180,15 +211,16 @@ async fn wait_for_reservation(
},
)) if relay_peer_id == peer_id && renewal == is_renewal => {
reservation_req_accepted = true;
if new_listen_addr_for_relayed_addr {
break;
}
}
SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
SwarmEvent::Behaviour(ClientEvent::Identify(identify::Event::Received { .. })) => {
addr_observed = true;
}
SwarmEvent::Behaviour(ClientEvent::Identify(_)) => {}
e => panic!("{e:?}"),
}
}
Expand Down