Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): add ConnectionHandler::connection_keep_alive default implementation #4703

Closed
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e5aaa04
add no_keep_alive fn to Stream and counter to Connection
leonzchang Oct 5, 2023
55525cc
call fn no_keep_alive when ping protocol stream received
leonzchang Oct 6, 2023
340ecc9
apply suggested fixes
leonzchang Oct 6, 2023
55b2c6d
apply suggested fixes
leonzchang Oct 11, 2023
e07e274
update root Cargo.toml
leonzchang Oct 11, 2023
0fc1a2a
fix rustdoc intra-doc links
leonzchang Oct 11, 2023
f5a7cd4
fix tests
leonzchang Oct 11, 2023
b9f0039
compute the shutdown only when negotiated streams are finished
leonzchang Oct 12, 2023
346ba26
remove stream tracking code in protocols
leonzchang Oct 12, 2023
9dc3ff3
apply suggested fixes
leonzchang Oct 13, 2023
c1b5d98
correct crate version
leonzchang Oct 16, 2023
3006d84
add allow deprecated to compute_new_shutdown
leonzchang Oct 18, 2023
8b276e5
fix rustfmt
leonzchang Oct 18, 2023
7b29afe
update libp2p-noise version in root Cargo.toml
leonzchang Oct 18, 2023
9cc7d93
Merge branch 'master' into feat/stream-counter
thomaseizinger Oct 20, 2023
2ed8071
Fix compile error
thomaseizinger Oct 20, 2023
ad8104d
Always compute shutdown
thomaseizinger Oct 20, 2023
8f34ba8
Merge branch 'master' into feat/stream-counter
leonzchang Oct 20, 2023
5db7968
fix checking keep alive status condition in relay
leonzchang Oct 20, 2023
aeb0480
Merge branch 'master' into feat/stream-counter
leonzchang Oct 20, 2023
464b803
Merge branch 'master' into feat/stream-counter
leonzchang Oct 20, 2023
021da14
dispatch no_keep_alive on outbound streams in ping protocols
leonzchang Oct 20, 2023
9c049e6
Merge branch 'master' into feat/stream-counter
leonzchang Oct 20, 2023
f6531aa
remove protocols keep alive stream check
leonzchang Oct 20, 2023
b8dea1e
Merge branch 'master' into feat/stream-counter
leonzchang Oct 20, 2023
ae19cc2
fix idiomatic return
leonzchang Oct 20, 2023
1837ad6
Merge branch 'master' into feat/stream-counter
leonzchang Oct 22, 2023
77b840e
apply partial suggested fixes
leonzchang Oct 22, 2023
b9c37fc
add connection_keep_alive default impl
leonzchang Oct 22, 2023
8a22bc8
update changelog & docs
leonzchang Oct 22, 2023
7401ed2
remove the remaining KeepAlive::Until & related test
leonzchang Oct 22, 2023
20a6582
remove unuse import & KeepAlive::Until related tests
leonzchang Oct 22, 2023
c8481fe
Merge branch 'feat/stream-counter' into feat/ConnectionHandler-defaul…
leonzchang Oct 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3265,7 +3265,6 @@ where
type ConnectionHandler = Handler;
type ToSwarm = Event;

#[allow(deprecated)]
fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
Expand All @@ -3276,7 +3275,6 @@ where
Ok(Handler::new(self.config.protocol_config()))
}

#[allow(deprecated)]
fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
Expand Down
1 change: 0 additions & 1 deletion protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ impl ConnectionHandler for Handler {
return KeepAlive::Yes;
}

#[allow(deprecated)]
KeepAlive::No
}
Handler::Disabled(_) => KeepAlive::No,
Expand Down
4 changes: 0 additions & 4 deletions protocols/relay/src/behaviour/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,6 @@ pub struct Handler {
///
/// Contains a [`futures::future::Future`] for each lend out substream that
/// resolves once the substream is dropped.
///
/// Once all substreams are dropped and this handler has no other work,
/// [`KeepAlive::Until`] can be set, allowing the connection to be closed
/// eventually.
alive_lend_out_substreams: FuturesUnordered<oneshot::Receiver<()>>,
/// Futures relaying data for circuit between two peers.
circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,
Expand Down
2 changes: 2 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
See [PR 4225](https://github.com/libp2p/rust-libp2p/pull/4225).
- Remove deprecated `keep_alive_timeout` in `OneShotHandlerConfig`.
See [PR 4677](https://github.com/libp2p/rust-libp2p/pull/4677).
- Add `ConnectionHandler::connection_keep_alive` default implementation that returns `KeepAlive::No`.
See [PR 4703](https://github.com/libp2p/rust-libp2p/pull/4703).

## 0.43.6

Expand Down
152 changes: 1 addition & 151 deletions swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use libp2p_core::upgrade;
use libp2p_core::upgrade::{NegotiationError, ProtocolError};
use libp2p_core::Endpoint;
use libp2p_identity::PeerId;
use std::cmp::max;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::future::Future;
Expand Down Expand Up @@ -450,44 +449,9 @@ fn compute_new_shutdown(
current_shutdown: &Shutdown,
idle_timeout: Duration,
) -> Option<Shutdown> {
#[allow(deprecated)]
match (current_shutdown, handler_keep_alive) {
(Shutdown::Later(_, deadline), KeepAlive::Until(t)) => {
let now = Instant::now();

if *deadline != t {
let deadline = t;
if let Some(new_duration) = deadline.checked_duration_since(Instant::now()) {
let effective_keep_alive = max(new_duration, idle_timeout);

let safe_keep_alive = checked_add_fraction(now, effective_keep_alive);
return Some(Shutdown::Later(Delay::new(safe_keep_alive), deadline));
}
}
None
}
(_, KeepAlive::Until(earliest_shutdown)) => {
let now = Instant::now();

if let Some(requested) = earliest_shutdown.checked_duration_since(now) {
let effective_keep_alive = max(requested, idle_timeout);

let safe_keep_alive = checked_add_fraction(now, effective_keep_alive);

// Important: We store the _original_ `Instant` given by the `ConnectionHandler` in the `Later` instance to ensure we can compare it in the above branch.
// This is quite subtle but will hopefully become simpler soon once `KeepAlive::Until` is fully deprecated. See <https://github.com/libp2p/rust-libp2p/issues/3844>/
return Some(Shutdown::Later(
Delay::new(safe_keep_alive),
earliest_shutdown,
));
}
None
}
(_, KeepAlive::No) if idle_timeout == Duration::ZERO => Some(Shutdown::Asap),
(Shutdown::Later(_, _), KeepAlive::No) => {
// Do nothing, i.e. let the shutdown timer continue to tick.
None
}
(Shutdown::Later(_, _), KeepAlive::No) => None, // Do nothing, i.e. let the shutdown timer continue to tick.
(_, KeepAlive::No) => {
let now = Instant::now();
let safe_keep_alive = checked_add_fraction(now, idle_timeout);
Expand Down Expand Up @@ -933,68 +897,6 @@ mod tests {
));
}

#[tokio::test]
async fn idle_timeout_with_keep_alive_until_greater_than_idle_timeout() {
let idle_timeout = Duration::from_millis(100);

let mut connection = Connection::new(
StreamMuxerBox::new(PendingStreamMuxer),
KeepAliveUntilConnectionHandler {
until: Instant::now() + idle_timeout * 2,
},
None,
0,
idle_timeout,
);

assert!(connection.poll_noop_waker().is_pending());

tokio::time::sleep(idle_timeout).await;

assert!(
connection.poll_noop_waker().is_pending(),
"`KeepAlive::Until` is greater than idle-timeout, continue sleeping"
);

tokio::time::sleep(idle_timeout).await;

assert!(matches!(
connection.poll_noop_waker(),
Poll::Ready(Err(ConnectionError::KeepAliveTimeout))
));
}

#[tokio::test]
async fn idle_timeout_with_keep_alive_until_less_than_idle_timeout() {
let idle_timeout = Duration::from_millis(100);

let mut connection = Connection::new(
StreamMuxerBox::new(PendingStreamMuxer),
KeepAliveUntilConnectionHandler {
until: Instant::now() + idle_timeout / 2,
},
None,
0,
idle_timeout,
);

assert!(connection.poll_noop_waker().is_pending());

tokio::time::sleep(idle_timeout / 2).await;

assert!(
connection.poll_noop_waker().is_pending(),
"`KeepAlive::Until` is less than idle-timeout, honor idle-timeout"
);

tokio::time::sleep(idle_timeout / 2).await;

assert!(matches!(
connection.poll_noop_waker(),
Poll::Ready(Err(ConnectionError::KeepAliveTimeout))
));
}

#[test]
fn checked_add_fraction_can_add_u64_max() {
let _ = env_logger::try_init();
Expand Down Expand Up @@ -1058,58 +960,6 @@ mod tests {
QuickCheck::new().quickcheck(prop as fn(_, _, _));
}

struct KeepAliveUntilConnectionHandler {
until: Instant,
}

impl ConnectionHandler for KeepAliveUntilConnectionHandler {
type FromBehaviour = Void;
type ToBehaviour = Void;
type Error = Void;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = DeniedUpgrade;
type InboundOpenInfo = ();
type OutboundOpenInfo = Void;

fn listen_protocol(
&self,
) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(DeniedUpgrade, ())
}

fn connection_keep_alive(&self) -> KeepAlive {
#[allow(deprecated)]
KeepAlive::Until(self.until)
}

fn poll(
&mut self,
_: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
> {
Poll::Pending
}

fn on_behaviour_event(&mut self, _: Self::FromBehaviour) {}

fn on_connection_event(
&mut self,
_: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
}
}

struct DummyStreamMuxer {
counter: Arc<()>,
}
Expand Down
41 changes: 11 additions & 30 deletions swarm/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ pub use select::ConnectionHandlerSelect;

use crate::StreamProtocol;
use ::either::Either;
use instant::Instant;
use libp2p_core::Multiaddr;
use once_cell::sync::Lazy;
use smallvec::SmallVec;
Expand Down Expand Up @@ -125,25 +124,23 @@ pub trait ConnectionHandler: Send + 'static {

/// Returns until when the connection should be kept alive.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to completely overhaul this together with #4595. What do you think of merging this PR into #4595? It is so closely related, I think it makes sense to just merge it as a single-patch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, what do your think we also change KeepAlive to boolean in #4595

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, what do your think we also change KeepAlive to boolean in #4595

I think it makes sense. With only two variants, we don't really gain anything from having another type and I cannot see how somebody could misinterpret what true and false mean in this case.

///
/// This method is called by the `Swarm` after each invocation of
/// [`ConnectionHandler::poll`] to determine if the connection and the associated
/// [`ConnectionHandler`]s should be kept alive as far as this handler is concerned
/// and if so, for how long.
/// This method is an optional implementation and can be called by the `Swarm` after
/// each invocation of [`ConnectionHandler::poll`] to determine if the connection
/// and the associated [`ConnectionHandler`]s should be kept alive.
///
/// Returning [`KeepAlive::No`] indicates that the connection should be
/// closed and this handler destroyed immediately.
///
/// Returning [`KeepAlive::Until`] indicates that the connection may be closed
/// and this handler destroyed after the specified `Instant`.
///
/// Returning [`KeepAlive::Yes`] indicates that the connection should
/// be kept alive until the next call to this method.
///
/// > **Note**: The connection is always closed and the handler destroyed
/// > when [`ConnectionHandler::poll`] returns an error. Furthermore, the
/// > connection may be closed for reasons outside of the control
/// > of the handler.
fn connection_keep_alive(&self) -> KeepAlive;
fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::No
}

/// Should behave like `Stream::poll()`.
fn poll(
Expand Down Expand Up @@ -727,11 +724,6 @@ where
/// How long the connection should be kept alive.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum KeepAlive {
/// If nothing new happens, the connection should be closed at the given `Instant`.
#[deprecated(
note = "Use `swarm::Config::with_idle_connection_timeout` instead. See <https://github.com/libp2p/rust-libp2p/issues/3844> for details."
)]
Until(Instant),
/// Keep the connection alive.
Yes,
/// Close the connection as soon as possible.
Expand All @@ -751,35 +743,24 @@ impl PartialOrd for KeepAlive {
}
}

#[allow(deprecated)]
impl Ord for KeepAlive {
fn cmp(&self, other: &KeepAlive) -> Ordering {
use self::KeepAlive::*;

match (self, other) {
(No, No) | (Yes, Yes) => Ordering::Equal,
(No, _) | (_, Yes) => Ordering::Less,
(_, No) | (Yes, _) => Ordering::Greater,
(Until(t1), Until(t2)) => t1.cmp(t2),
(Yes, No) => Ordering::Less,
(No, Yes) => Ordering::Greater,
}
}
}

#[cfg(test)]
impl quickcheck::Arbitrary for KeepAlive {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
match quickcheck::GenRange::gen_range(g, 1u8..4) {
1 =>
{
#[allow(deprecated)]
KeepAlive::Until(
Instant::now()
.checked_add(Duration::arbitrary(g))
.unwrap_or(Instant::now()),
)
}
2 => KeepAlive::Yes,
3 => KeepAlive::No,
match quickcheck::GenRange::gen_range(g, 1u8..3) {
1 => KeepAlive::Yes,
2 => KeepAlive::No,
_ => unreachable!(),
}
}
Expand Down
2 changes: 0 additions & 2 deletions swarm/src/handler/one_shot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ pub struct OneShotHandlerConfig {
}

impl Default for OneShotHandlerConfig {
#[allow(deprecated)]
fn default() -> Self {
OneShotHandlerConfig {
outbound_substream_timeout: Duration::from_secs(10),
Expand All @@ -249,7 +248,6 @@ mod tests {
use void::Void;

#[test]
#[allow(deprecated)]
fn do_not_keep_idle_connection_alive() {
let mut handler: OneShotHandler<_, DeniedUpgrade, Void> = OneShotHandler::new(
SubstreamProtocol::new(DeniedUpgrade {}, ()),
Expand Down