Skip to content

Commit

Permalink
refactor(relay): keep connection alive while we are using it
Browse files Browse the repository at this point in the history
Previously, the relay client was applying a 10 second timeout to idle connections. Instead, we now compute the connection keep alive based on whether we are still using the connection.

This makes all tests pass apart from 1: `reuse_connection`. This makes sense as that connection is immediately idle once dialed. To make that test pass, we configure a global idle connection timeout of 1 second.

In a real-world scenario, reusing a connection only applies if the connection is still alive due to other protocols being active.

Related: #3844.

Pull-Request: #4696.
  • Loading branch information
thomaseizinger authored Oct 20, 2023
1 parent a044b88 commit b709a40
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 30 deletions.
53 changes: 25 additions & 28 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use futures::stream::{FuturesUnordered, StreamExt};
use futures::TryFutureExt;
use futures_bounded::{PushError, Timeout};
use futures_timer::Delay;
use instant::Instant;
use libp2p_core::multiaddr::Protocol;
use libp2p_core::upgrade::ReadyUpgrade;
use libp2p_core::Multiaddr;
Expand Down Expand Up @@ -122,8 +121,6 @@ pub struct Handler {
Either<inbound_stop::FatalUpgradeError, outbound_hop::FatalUpgradeError>,
>,
>,
/// Until when to keep the connection alive.
keep_alive: KeepAlive,

/// Queue of events to return when polled.
queued_events: VecDeque<
Expand Down Expand Up @@ -152,10 +149,6 @@ pub struct Handler {
///
/// Contains a [`futures::future::Future`] for each lend out substream that
/// resolves once the substream is dropped.
///
/// Once all substreams are dropped and this handler has no other work,
/// [`KeepAlive::Until`] can be set, allowing the connection to be closed
/// eventually.
alive_lend_out_substreams: FuturesUnordered<oneshot::Receiver<void::Void>>,

open_circuit_futs:
Expand Down Expand Up @@ -194,7 +187,6 @@ impl Handler {
MAX_NUMBER_DENYING_CIRCUIT,
),
send_error_futs: Default::default(),
keep_alive: KeepAlive::Yes,
}
}

Expand Down Expand Up @@ -328,7 +320,27 @@ impl ConnectionHandler for Handler {
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
if self.reservation.is_some() {
return KeepAlive::Yes;
}

if !self.alive_lend_out_substreams.is_empty() {
return KeepAlive::Yes;
}

if !self.circuit_deny_futs.is_empty() {
return KeepAlive::Yes;
}

if !self.open_circuit_futs.is_empty() {
return KeepAlive::Yes;
}

if !self.outbound_circuits.is_empty() {
return KeepAlive::Yes;
}

KeepAlive::No
}

fn poll(
Expand Down Expand Up @@ -488,25 +500,6 @@ impl ConnectionHandler for Handler {
}
}

// Update keep-alive handling.
#[allow(deprecated)]
{
if matches!(self.reservation, Reservation::None)
&& self.alive_lend_out_substreams.is_empty()
&& self.circuit_deny_futs.is_empty()
{
match self.keep_alive {
KeepAlive::Yes => {
self.keep_alive =
KeepAlive::Until(Instant::now() + Duration::from_secs(10));
}
KeepAlive::Until(_) => {}
KeepAlive::No => panic!("Handler never sets KeepAlive::No."),
}
} else {
self.keep_alive = KeepAlive::Yes;
}
}
Poll::Pending
}

Expand Down Expand Up @@ -639,6 +632,10 @@ impl Reservation {
Event::ReservationReqAccepted { renewal, limit }
}

fn is_some(&self) -> bool {
matches!(self, Self::Accepted { .. } | Self::Renewing { .. })
}

/// Marks the current reservation as failed.
///
/// Returns whether the reservation request was a renewal.
Expand Down
12 changes: 10 additions & 2 deletions protocols/relay/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,11 @@ fn reuse_connection() {
.clone()
.with(Protocol::P2p(relay_peer_id))
.with(Protocol::P2pCircuit);
let mut client = build_client();

// To reuse the connection, we need to ensure it is not shut down due to being idle.
let mut client = build_client_with_config(
Config::with_async_std_executor().with_idle_connection_timeout(Duration::from_secs(1)),
);
let client_peer_id = *client.local_peer_id();

client.dial(relay_addr).unwrap();
Expand Down Expand Up @@ -328,6 +332,10 @@ fn build_relay() -> Swarm<Relay> {
}

fn build_client() -> Swarm<Client> {
build_client_with_config(Config::with_async_std_executor())
}

fn build_client_with_config(config: Config) -> Swarm<Client> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = local_key.public().to_peer_id();

Expand All @@ -344,7 +352,7 @@ fn build_client() -> Swarm<Client> {
relay: behaviour,
},
local_peer_id,
Config::with_async_std_executor(),
config,
)
}

Expand Down

0 comments on commit b709a40

Please sign in to comment.