From 01e8810c1e79e6155f4c969ac6cd714e8fe7621c Mon Sep 17 00:00:00 2001 From: binarybaron Date: Wed, 23 Oct 2024 17:09:15 +0200 Subject: [PATCH] fix(libp2p): Use Instant instead of KeepAlive in swap/src/network/swap_setup/alice.rs See: https://github.com/libp2p/rust-libp2p/issues/3844 --- swap/src/network/swap_setup/alice.rs | 29 ++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/swap/src/network/swap_setup/alice.rs b/swap/src/network/swap_setup/alice.rs index b3fce7e8e..0eae9eb38 100644 --- a/swap/src/network/swap_setup/alice.rs +++ b/swap/src/network/swap_setup/alice.rs @@ -21,8 +21,9 @@ use libp2p::{Multiaddr, PeerId}; use std::collections::VecDeque; use std::fmt::Debug; use std::task::Poll; -use std::time::Duration; +use std::time::{Instant, Duration}; use uuid::Uuid; +use futures::AsyncWriteExt; #[derive(Debug)] #[allow(clippy::large_enum_variant)] @@ -219,8 +220,12 @@ pub struct Handler { latest_rate: LR, resume_only: bool, - timeout: Duration, - keep_alive: KeepAlive, + // This is the timeout for the negotiation phase where Alice and Bob exchange messages + negotiation_timeout: Duration, + + // If set to None, we will keep the connection alive indefinitely + // If set to Some, we will keep the connection alive until the given instant + keep_alive_until: Option, } impl Handler { @@ -239,8 +244,8 @@ impl Handler { env_config, latest_rate, resume_only, - timeout: Duration::from_secs(120), - keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(10)), + negotiation_timeout: Duration::from_secs(120), + keep_alive_until: Some(Instant::now() + Duration::from_secs(30)), } } } @@ -268,8 +273,10 @@ where } fn connection_keep_alive(&self) -> bool { - // self.keep_alive - false + match self.keep_alive_until { + None => true, + Some(keep_alive_until) => Instant::now() < keep_alive_until, + } } #[allow(clippy::type_complexity)] @@ -313,7 +320,9 @@ where ) { match event { ConnectionEvent::FullyNegotiatedInbound(substream) => { - self.keep_alive = KeepAlive::Yes; + self.keep_alive_until = None; + + let mut substream = substream.protocol; let (sender, receiver) = bmrng::channel_with_timeout::( 1, @@ -325,7 +334,7 @@ where let latest_rate = self.latest_rate.latest_rate(); let env_config = self.env_config; - let protocol = tokio::time::timeout(self.timeout, async move { + let protocol = tokio::time::timeout(self.negotiation_timeout, async move { let request = swap_setup::read_cbor_message::(&mut substream) .await .context("Failed to read spot price request")?; @@ -449,7 +458,7 @@ where Ok((swap_id, state3)) }); - let max_seconds = self.timeout.as_secs(); + let max_seconds = self.negotiation_timeout.as_secs(); self.inbound_stream = OptionFuture::from(Some( async move { protocol.await.with_context(|| {