From bf02c60e2f378e8f140934f4f63529030592abb8 Mon Sep 17 00:00:00 2001 From: Ragesh Krishna Date: Mon, 18 Mar 2024 11:32:49 +0530 Subject: [PATCH 1/6] Add reconnect settings to async builder The builder can now be configured with the following properties to control automatic reconnections on network errors or server disconnects: - reconnect - reconnect_on_disconnect - reconnect_delay (min and max) - max_reconnect_attempts --- socketio/src/asynchronous/client/builder.rs | 40 +++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/socketio/src/asynchronous/client/builder.rs b/socketio/src/asynchronous/client/builder.rs index 99dd86df..2e4fec25 100644 --- a/socketio/src/asynchronous/client/builder.rs +++ b/socketio/src/asynchronous/client/builder.rs @@ -29,6 +29,12 @@ pub struct ClientBuilder { opening_headers: Option, transport_type: TransportType, auth: Option, + pub(crate) reconnect: bool, + pub(crate) reconnect_on_disconnect: bool, + // None implies infinite attempts + pub(crate) max_reconnect_attempts: Option, + pub(crate) reconnect_delay_min: u64, + pub(crate) reconnect_delay_max: u64, } impl ClientBuilder { @@ -81,6 +87,12 @@ impl ClientBuilder { opening_headers: None, transport_type: TransportType::Any, auth: None, + reconnect: true, + reconnect_on_disconnect: false, + // None implies infinite attempts + max_reconnect_attempts: None, + reconnect_delay_min: 1000, + reconnect_delay_max: 5000, } } @@ -315,6 +327,34 @@ impl ClientBuilder { self } + /// If set to `false` do not try to reconnect on network errors. Defaults to + /// `true` + pub fn reconnect(mut self, reconnect: bool) -> Self { + self.reconnect = reconnect; + self + } + + /// If set to `true` try to reconnect when the server disconnects the + /// client. Defaults to `false` + pub fn reconnect_on_disconnect(mut self, reconnect_on_disconnect: bool) -> Self { + self.reconnect_on_disconnect = reconnect_on_disconnect; + self + } + + /// Sets the minimum and maximum delay between reconnection attempts + pub fn reconnect_delay(mut self, min: u64, max: u64) -> Self { + self.reconnect_delay_min = min; + self.reconnect_delay_max = max; + self + } + + /// Sets the maximum number of times to attempt reconnections. Defaults to + /// an infinite number of attempts + pub fn max_reconnect_attempts(mut self, reconnect_attempts: u8) -> Self { + self.max_reconnect_attempts = Some(reconnect_attempts); + self + } + /// Connects the socket to a certain endpoint. This returns a connected /// [`Client`] instance. This method returns an [`std::result::Result::Err`] /// value if something goes wrong during connection. Also starts a separate From 795cdbd97448932c18e4ea571417bf643338c1e0 Mon Sep 17 00:00:00 2001 From: Ragesh Krishna Date: Mon, 18 Mar 2024 16:57:45 +0530 Subject: [PATCH 2/6] feat(async):Async reconnections The async client can now be configured to automatically reconnect on network errors or server disconnections --- socketio/src/asynchronous/client/builder.rs | 54 ++--- socketio/src/asynchronous/client/client.rs | 247 +++++++++++++++++--- 2 files changed, 231 insertions(+), 70 deletions(-) diff --git a/socketio/src/asynchronous/client/builder.rs b/socketio/src/asynchronous/client/builder.rs index 2e4fec25..496f5a4d 100644 --- a/socketio/src/asynchronous/client/builder.rs +++ b/socketio/src/asynchronous/client/builder.rs @@ -1,4 +1,4 @@ -use futures_util::{future::BoxFuture, StreamExt}; +use futures_util::future::BoxFuture; use log::trace; use native_tls::TlsConnector; use rust_engineio::{ @@ -8,7 +8,7 @@ use rust_engineio::{ use std::collections::HashMap; use url::Url; -use crate::{error::Result, Error, Event, Payload, TransportType}; +use crate::{error::Result, Event, Payload, TransportType}; use super::{ callback::{Callback, DynAsyncAnyCallback, DynAsyncCallback}, @@ -22,13 +22,13 @@ use crate::asynchronous::socket::Socket as InnerSocket; /// acts the `build` method and returns a connected [`Client`]. pub struct ClientBuilder { address: String, - on: HashMap>, - on_any: Option>, - namespace: String, + pub(crate) on: HashMap>, + pub(crate) on_any: Option>, + pub(crate) namespace: String, tls_config: Option, opening_headers: Option, transport_type: TransportType, - auth: Option, + pub(crate) auth: Option, pub(crate) reconnect: bool, pub(crate) reconnect_on_disconnect: bool, // None implies infinite attempts @@ -383,26 +383,14 @@ impl ClientBuilder { /// } /// ``` pub async fn connect(self) -> Result { - let socket = self.connect_manual().await?; - let socket_clone = socket.clone(); - - // Use thread to consume items in iterator in order to call callbacks - tokio::runtime::Handle::current().spawn(async move { - let mut stream = socket_clone.as_stream(); - // Consume the stream until it returns None and the stream is closed. - while let Some(item) = stream.next().await { - if let e @ Err(Error::IncompleteResponseFromEngineIo(_)) = item { - trace!("Network error occurred: {}", e.unwrap_err()); - } - } - }); + let mut socket = self.connect_manual().await?; + socket.poll_stream().await?; Ok(socket) } - //TODO: 0.3.X stabilize - pub(crate) async fn connect_manual(self) -> Result { - // Parse url here rather than in new to keep new returning Self. + /// Creates a new Socket that can be used for reconnections + pub(crate) async fn inner_create(&self) -> Result { let mut url = Url::parse(&self.address)?; if url.path() == "/" { @@ -411,11 +399,11 @@ impl ClientBuilder { let mut builder = EngineIoClientBuilder::new(url); - if let Some(tls_config) = self.tls_config { - builder = builder.tls_config(tls_config); + if let Some(tls_config) = &self.tls_config { + builder = builder.tls_config(tls_config.to_owned()); } - if let Some(headers) = self.opening_headers { - builder = builder.headers(headers); + if let Some(headers) = &self.opening_headers { + builder = builder.headers(headers.to_owned()); } let engine_client = match self.transport_type { @@ -426,14 +414,14 @@ impl ClientBuilder { }; let inner_socket = InnerSocket::new(engine_client)?; + Ok(inner_socket) + } + + //TODO: 0.3.X stabilize + pub(crate) async fn connect_manual(self) -> Result { + let inner_socket = self.inner_create().await?; - let socket = Client::new( - inner_socket, - &self.namespace, - self.on, - self.on_any, - self.auth, - )?; + let socket = Client::new(inner_socket, self)?; socket.connect().await?; Ok(socket) diff --git a/socketio/src/asynchronous/client/client.rs b/socketio/src/asynchronous/client/client.rs index dc72bcaa..2043f97f 100644 --- a/socketio/src/asynchronous/client/client.rs +++ b/socketio/src/asynchronous/client/client.rs @@ -1,17 +1,26 @@ -use std::{collections::HashMap, ops::DerefMut, pin::Pin, sync::Arc}; +use std::{ + ops::DerefMut, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; +use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use futures_util::{future::BoxFuture, stream, Stream, StreamExt}; use log::trace; use rand::{thread_rng, Rng}; use serde_json::Value; use tokio::{ sync::RwLock, - time::{Duration, Instant}, + time::{sleep, Duration, Instant}, }; use super::{ ack::Ack, - callback::{Callback, DynAsyncAnyCallback, DynAsyncCallback}, + builder::ClientBuilder, + callback::{Callback, DynAsyncCallback}, }; use crate::{ asynchronous::socket::Socket as InnerSocket, @@ -26,14 +35,15 @@ use crate::{ #[derive(Clone)] pub struct Client { /// The inner socket client to delegate the methods to. - socket: InnerSocket, - on: Arc>>>, - on_any: Arc>>>, + socket: Arc>, outstanding_acks: Arc>>, // namespace, for multiplexing messages nsp: String, // Data send in the opening packet (commonly used as for auth) auth: Option, + builder: Arc>, + manually_disconnected: Arc, + server_disconnected: Arc, } impl Client { @@ -41,20 +51,15 @@ impl Client { /// namespace. If `None` is passed in as namespace, the default namespace /// `"/"` is taken. /// ``` - pub(crate) fn new>( - socket: InnerSocket, - namespace: T, - on: HashMap>, - on_any: Option>, - auth: Option, - ) -> Result { + pub(crate) fn new(socket: InnerSocket, builder: ClientBuilder) -> Result { Ok(Client { - socket, - nsp: namespace.into(), - on: Arc::new(RwLock::new(on)), - on_any: Arc::new(RwLock::new(on_any)), + socket: Arc::new(RwLock::new(socket)), + nsp: builder.namespace.to_owned(), outstanding_acks: Arc::new(RwLock::new(Vec::new())), - auth, + auth: builder.auth.clone(), + builder: Arc::new(RwLock::new(builder)), + manually_disconnected: Arc::new(AtomicBool::new(false)), + server_disconnected: Arc::new(AtomicBool::new(false)), }) } @@ -62,13 +67,89 @@ impl Client { /// called to interact with the server. pub(crate) async fn connect(&self) -> Result<()> { // Connect the underlying socket - self.socket.connect().await?; + self.socket.read().await.connect().await?; // construct the opening packet let auth = self.auth.as_ref().map(|data| data.to_string()); let open_packet = Packet::new(PacketId::Connect, self.nsp.clone(), auth, None, 0, None); - self.socket.send(open_packet).await?; + self.socket.read().await.send(open_packet).await?; + + Ok(()) + } + + pub(crate) async fn reconnect(&mut self) -> Result<()> { + let builder = self.builder.write().await; + let socket = builder.inner_create().await?; + + // New inner socket that can be connected + let mut client_socket = self.socket.write().await; + *client_socket = socket; + drop(client_socket); + + self.connect().await?; + + Ok(()) + } + + /// Drives the stream using a thread so messages are processed + pub(crate) async fn poll_stream(&mut self) -> Result<()> { + let builder = self.builder.read().await; + let reconnect_delay_min = builder.reconnect_delay_min; + let reconnect_delay_max = builder.reconnect_delay_max; + let max_reconnect_attempts = builder.max_reconnect_attempts; + drop(builder); + + let mut client_clone = self.clone(); + + tokio::runtime::Handle::current().spawn(async move { + loop { + let mut stream = client_clone.as_stream(); + // Consume the stream until it returns None and the stream is closed. + while let Some(item) = stream.next().await { + if let Err(e) = item { + trace!("Network error occurred: {}", e); + } + } + + // Drop the stream so we can once again use `socket_clone` as mutable + drop(stream); + + if client_clone.should_reconnect().await { + let mut reconnect_attempts = 0; + let mut backoff = ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(reconnect_delay_min)) + .with_max_interval(Duration::from_millis(reconnect_delay_max)) + .build(); + + loop { + if let Some(max_reconnect_attempts) = max_reconnect_attempts { + reconnect_attempts += 1; + if reconnect_attempts > max_reconnect_attempts { + trace!("Max reconnect attempts reached without success"); + break; + } + } + match client_clone.reconnect().await { + Ok(_) => { + trace!("Reconnected after {reconnect_attempts} attempts"); + break; + } + Err(e) => { + trace!("Failed to reconnect: {e:?}"); + if let Some(delay) = backoff.next_backoff() { + let delay_ms = delay.as_millis(); + trace!("Waiting for {delay_ms}ms before reconnecting"); + sleep(delay).await; + } + } + } + } + } else { + break; + } + } + }); Ok(()) } @@ -111,7 +192,11 @@ impl Client { E: Into, D: Into, { - self.socket.emit(&self.nsp, event.into(), data.into()).await + self.socket + .read() + .await + .emit(&self.nsp, event.into(), data.into()) + .await } /// Disconnects this client from the server by sending a `socket.io` closing @@ -148,11 +233,13 @@ impl Client { /// } /// ``` pub async fn disconnect(&self) -> Result<()> { + self.manually_disconnected.store(true, Ordering::Release); + let disconnect_packet = Packet::new(PacketId::Disconnect, self.nsp.clone(), None, None, 0, None); - self.socket.send(disconnect_packet).await?; - self.socket.disconnect().await?; + self.socket.read().await.send(disconnect_packet).await?; + self.socket.read().await.disconnect().await?; Ok(()) } @@ -195,7 +282,7 @@ impl Client { /// Payload::String(str) => println!("{}", str), /// } /// }.boxed() - /// }; + /// }; /// /// /// let payload = json!({"token": 123}); @@ -234,33 +321,27 @@ impl Client { // add the ack to the tuple of outstanding acks self.outstanding_acks.write().await.push(ack); - self.socket.send(socket_packet).await + self.socket.read().await.send(socket_packet).await } async fn callback>(&self, event: &Event, payload: P) -> Result<()> { - let mut on = self.on.write().await; - let mut on_any = self.on_any.write().await; - - let on_lock = on.deref_mut(); - let on_any_lock = on_any.deref_mut(); + let mut builder = self.builder.write().await; let payload = payload.into(); - if let Some(callback) = on_lock.get_mut(event) { + if let Some(callback) = builder.on.get_mut(event) { callback(payload.clone(), self.clone()).await; } // Call on_any for all common and custom events. match event { Event::Message | Event::Custom(_) => { - if let Some(callback) = on_any_lock { + if let Some(callback) = builder.on_any.as_mut() { callback(event.clone(), payload, self.clone()).await; } } _ => (), } - drop(on); - drop(on_any); Ok(()) } @@ -375,9 +456,11 @@ impl Client { } } PacketId::Connect => { + self.server_disconnected.store(false, Ordering::Release); self.callback(&Event::Connect, "").await?; } PacketId::Disconnect => { + self.server_disconnected.store(true, Ordering::Release); self.callback(&Event::Close, "").await?; } PacketId::ConnectError => { @@ -401,13 +484,31 @@ impl Client { Ok(()) } + /// Indicates whether the client should try to reconnect + pub(crate) async fn should_reconnect(&self) -> bool { + let manually_disconnected = self.manually_disconnected.load(Ordering::Acquire); + let server_disconnected = self.server_disconnected.load(Ordering::Acquire); + + if server_disconnected { + self.builder.read().await.reconnect_on_disconnect + } else { + !manually_disconnected + } + } + /// Returns the packet stream for the client. pub(crate) fn as_stream<'a>( &'a self, ) -> Pin> + Send + 'a>> { - stream::unfold(self.socket.clone(), |mut socket| async { + let socket_clone = self.socket.clone(); + + stream::unfold(socket_clone, |socket| async { + let mut socket_read = { + let s = socket.read().await; + (*s).clone() + }; // wait for the next payload - let packet: Option> = socket.next().await; + let packet: Option> = socket_read.next().await; match packet { // end the stream if the underlying one is closed None => None, @@ -431,7 +532,13 @@ impl Client { #[cfg(test)] mod test { - use std::{sync::Arc, time::Duration}; + use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, + }; use bytes::Bytes; use futures_util::{FutureExt, StreamExt}; @@ -590,6 +697,68 @@ mod test { Ok(()) } + #[tokio::test] + async fn socket_io_reconnect_integration() -> Result<()> { + static CONNECT_NUM: AtomicUsize = AtomicUsize::new(0); + static MESSAGE_NUM: AtomicUsize = AtomicUsize::new(0); + + let url = crate::test::socket_io_restart_server(); + + let socket = ClientBuilder::new(url) + .reconnect(true) + .max_reconnect_attempts(100) + .reconnect_delay(100, 100) + .on("open", |_, socket| { + async move { + CONNECT_NUM.fetch_add(1, Ordering::Release); + let r = socket.emit_with_ack( + "message", + json!(""), + Duration::from_millis(100), + |_, _| async move {}.boxed(), + ); + assert!(r.await.is_ok(), "should emit message success"); + } + .boxed() + }) + .on("message", |_, _socket| { + async move { + // test the iterator implementation and make sure there is a constant + // stream of packets, even when reconnecting + MESSAGE_NUM.fetch_add(1, Ordering::Release); + } + .boxed() + }) + .connect() + .await; + + assert!(socket.is_ok(), "should connect success"); + let socket = socket.unwrap(); + + // waiting for server to emit message + sleep(Duration::from_millis(500)).await; + + assert_eq!(load(&CONNECT_NUM), 1, "should connect once"); + assert_eq!(load(&MESSAGE_NUM), 1, "should receive one"); + + let r = socket.emit("restart_server", json!("")).await; + assert!(r.is_ok(), "should emit restart success"); + + // waiting for server to restart + for _ in 0..10 { + sleep(Duration::from_millis(400)).await; + if load(&CONNECT_NUM) == 2 && load(&MESSAGE_NUM) == 2 { + break; + } + } + + assert_eq!(load(&CONNECT_NUM), 2, "should connect twice"); + assert_eq!(load(&MESSAGE_NUM), 2, "should receive two messages"); + + socket.disconnect().await?; + Ok(()) + } + #[tokio::test] async fn socket_io_builder_integration_iterator() -> Result<()> { let url = crate::test::socket_io_server(); @@ -894,4 +1063,8 @@ mod test { Ok(()) } + + fn load(num: &AtomicUsize) -> usize { + num.load(Ordering::Acquire) + } } From 210fe951fd4f41d8058e17889d2327c354f12513 Mon Sep 17 00:00:00 2001 From: Ragesh Krishna Date: Thu, 21 Mar 2024 11:17:31 +0530 Subject: [PATCH 3/6] Introduce `DisconnectReason` enum The enum replaces the need for multiple `AtomicBool`'s to maintain the disconnection reason. This makes the code easier to read and more ergonomic to maintain the state. --- socketio/src/asynchronous/client/client.rs | 58 ++++++++++++---------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/socketio/src/asynchronous/client/client.rs b/socketio/src/asynchronous/client/client.rs index 2043f97f..525ab39a 100644 --- a/socketio/src/asynchronous/client/client.rs +++ b/socketio/src/asynchronous/client/client.rs @@ -29,6 +29,17 @@ use crate::{ Event, Payload, }; +#[derive(Default)] +enum DisconnectReason { + /// There is no known reason for the disconnect; likely a network error + #[default] + Unknown, + /// The user disconnected manually + Manual, + /// The server disconnected + Server, +} + /// A socket which handles communication with the server. It's initialized with /// a specific address as well as an optional namespace to connect to. If `None` /// is given the client will connect to the default namespace `"/"`. @@ -42,8 +53,7 @@ pub struct Client { // Data send in the opening packet (commonly used as for auth) auth: Option, builder: Arc>, - manually_disconnected: Arc, - server_disconnected: Arc, + disconnect_reason: Arc>, } impl Client { @@ -58,8 +68,7 @@ impl Client { outstanding_acks: Arc::new(RwLock::new(Vec::new())), auth: builder.auth.clone(), builder: Arc::new(RwLock::new(builder)), - manually_disconnected: Arc::new(AtomicBool::new(false)), - server_disconnected: Arc::new(AtomicBool::new(false)), + disconnect_reason: Arc::new(RwLock::new(DisconnectReason::default())), }) } @@ -85,6 +94,9 @@ impl Client { // New inner socket that can be connected let mut client_socket = self.socket.write().await; *client_socket = socket; + + // Now that we have replaced `self.socket`, we drop the write lock + // because the `connect` method we call below will need to use it drop(client_socket); self.connect().await?; @@ -98,6 +110,8 @@ impl Client { let reconnect_delay_min = builder.reconnect_delay_min; let reconnect_delay_max = builder.reconnect_delay_max; let max_reconnect_attempts = builder.max_reconnect_attempts; + let reconnect = builder.reconnect; + let reconnect_on_disconnect = builder.reconnect_on_disconnect; drop(builder); let mut client_clone = self.clone(); @@ -115,7 +129,13 @@ impl Client { // Drop the stream so we can once again use `socket_clone` as mutable drop(stream); - if client_clone.should_reconnect().await { + let should_reconnect = match *(client_clone.disconnect_reason.read().await) { + DisconnectReason::Unknown => reconnect, + DisconnectReason::Manual => false, + DisconnectReason::Server => reconnect_on_disconnect, + }; + + if should_reconnect { let mut reconnect_attempts = 0; let mut backoff = ExponentialBackoffBuilder::new() .with_initial_interval(Duration::from_millis(reconnect_delay_min)) @@ -233,7 +253,7 @@ impl Client { /// } /// ``` pub async fn disconnect(&self) -> Result<()> { - self.manually_disconnected.store(true, Ordering::Release); + *(self.disconnect_reason.write().await) = DisconnectReason::Manual; let disconnect_packet = Packet::new(PacketId::Disconnect, self.nsp.clone(), None, None, 0, None); @@ -456,11 +476,11 @@ impl Client { } } PacketId::Connect => { - self.server_disconnected.store(false, Ordering::Release); + *(self.disconnect_reason.write().await) = DisconnectReason::default(); self.callback(&Event::Connect, "").await?; } PacketId::Disconnect => { - self.server_disconnected.store(true, Ordering::Release); + *(self.disconnect_reason.write().await) = DisconnectReason::Server; self.callback(&Event::Close, "").await?; } PacketId::ConnectError => { @@ -484,31 +504,15 @@ impl Client { Ok(()) } - /// Indicates whether the client should try to reconnect - pub(crate) async fn should_reconnect(&self) -> bool { - let manually_disconnected = self.manually_disconnected.load(Ordering::Acquire); - let server_disconnected = self.server_disconnected.load(Ordering::Acquire); - - if server_disconnected { - self.builder.read().await.reconnect_on_disconnect - } else { - !manually_disconnected - } - } - /// Returns the packet stream for the client. pub(crate) fn as_stream<'a>( &'a self, ) -> Pin> + Send + 'a>> { - let socket_clone = self.socket.clone(); + let socket_clone = (*self.socket.blocking_read()).clone(); - stream::unfold(socket_clone, |socket| async { - let mut socket_read = { - let s = socket.read().await; - (*s).clone() - }; + stream::unfold(socket_clone, |mut socket| async { // wait for the next payload - let packet: Option> = socket_read.next().await; + let packet: Option> = socket.next().await; match packet { // end the stream if the underlying one is closed None => None, From ac1ad928739519075609d4637f88c9bad8007739 Mon Sep 17 00:00:00 2001 From: Ragesh Krishna Date: Fri, 22 Mar 2024 14:01:42 +0530 Subject: [PATCH 4/6] Make `as_stream` async and remove blocking read --- socketio/src/asynchronous/client/client.rs | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/socketio/src/asynchronous/client/client.rs b/socketio/src/asynchronous/client/client.rs index 525ab39a..d89f1692 100644 --- a/socketio/src/asynchronous/client/client.rs +++ b/socketio/src/asynchronous/client/client.rs @@ -1,11 +1,4 @@ -use std::{ - ops::DerefMut, - pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, -}; +use std::{ops::DerefMut, pin::Pin, sync::Arc}; use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use futures_util::{future::BoxFuture, stream, Stream, StreamExt}; @@ -118,7 +111,7 @@ impl Client { tokio::runtime::Handle::current().spawn(async move { loop { - let mut stream = client_clone.as_stream(); + let mut stream = client_clone.as_stream().await; // Consume the stream until it returns None and the stream is closed. while let Some(item) = stream.next().await { if let Err(e) = item { @@ -505,10 +498,10 @@ impl Client { } /// Returns the packet stream for the client. - pub(crate) fn as_stream<'a>( + pub(crate) async fn as_stream<'a>( &'a self, ) -> Pin> + Send + 'a>> { - let socket_clone = (*self.socket.blocking_read()).clone(); + let socket_clone = (*self.socket.read().await).clone(); stream::unfold(socket_clone, |mut socket| async { // wait for the next payload @@ -854,7 +847,7 @@ mod test { .await?; // open packet - let mut socket_stream = socket.as_stream(); + let mut socket_stream = socket.as_stream().await; let _ = socket_stream.next().await.unwrap()?; println!("Here12"); @@ -916,7 +909,7 @@ mod test { async fn test_socketio_socket(socket: Client, nsp: String) -> Result<()> { // open packet - let mut socket_stream = socket.as_stream(); + let mut socket_stream = socket.as_stream().await; let _: Option = Some(socket_stream.next().await.unwrap()?); let packet: Option = Some(socket_stream.next().await.unwrap()?); From 1db90dbb5b56b3ccb8cc5238191d4a525845642d Mon Sep 17 00:00:00 2001 From: Ragesh Krishna Date: Fri, 22 Mar 2024 15:21:25 +0530 Subject: [PATCH 5/6] Remove unnecessary println in test --- socketio/src/asynchronous/client/client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/socketio/src/asynchronous/client/client.rs b/socketio/src/asynchronous/client/client.rs index d89f1692..1f62c77b 100644 --- a/socketio/src/asynchronous/client/client.rs +++ b/socketio/src/asynchronous/client/client.rs @@ -850,7 +850,6 @@ mod test { let mut socket_stream = socket.as_stream().await; let _ = socket_stream.next().await.unwrap()?; - println!("Here12"); let packet = socket_stream.next().await.unwrap()?; assert_eq!( packet, From e2eea62f3f8723bcfbbf90c55ebb12487cb5c472 Mon Sep 17 00:00:00 2001 From: Ragesh Krishna Date: Fri, 22 Mar 2024 16:57:26 +0530 Subject: [PATCH 6/6] Serialize the reconnect tests Tests that rely on the reconnect socket server cannot run in parallel because they ask the server to disconnect and restart as part of the test. This can cause other tests running in parallel to fail in mysterious ways. By using the `serial_test` module and applying it selectively to the affected tests, we can ensure that they are never executed concurrently. --- Cargo.lock | 103 +++++++++++++++++++-- socketio/Cargo.toml | 1 + socketio/src/asynchronous/client/client.rs | 2 + socketio/src/client/client.rs | 2 + 4 files changed, 102 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6faf0f52..0fd7636f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -577,6 +577,17 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "dashmap" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8858831f7781322e539ea39e72449c46b059638250c14344fec8d0aa6e539c" +dependencies = [ + "cfg-if", + "num_cpus", + "parking_lot", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -709,6 +720,7 @@ checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -717,9 +729,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.17" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -731,6 +743,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -766,6 +789,7 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1182,6 +1206,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" +[[package]] +name = "lock_api" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.20" @@ -1315,11 +1348,11 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.13.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.1.19", + "hermit-abi 0.3.1", "libc", ] @@ -1397,6 +1430,29 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall 0.4.1", + "smallvec", + "windows-targets", +] + [[package]] name = "percent-encoding" version = "2.3.0" @@ -1561,6 +1617,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "regex" version = "1.7.3" @@ -1668,6 +1733,7 @@ dependencies = [ "rust_engineio", "serde", "serde_json", + "serial_test", "thiserror", "tokio", "url", @@ -1827,6 +1893,31 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "953ad9342b3aaca7cb43c45c097dd008d4907070394bd0751a0aa8817e5a018d" +dependencies = [ + "dashmap", + "futures", + "lazy_static", + "log", + "parking_lot", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93fb4adc70021ac1b47f7d45e8cc4169baaa7ea58483bc5b721d19a26202212" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "sha1" version = "0.10.6" @@ -1943,7 +2034,7 @@ dependencies = [ "autocfg", "cfg-if", "fastrand", - "redox_syscall", + "redox_syscall 0.3.5", "rustix 0.37.25", "windows-sys", ] diff --git a/socketio/Cargo.toml b/socketio/Cargo.toml index aef63cf1..3620faf1 100644 --- a/socketio/Cargo.toml +++ b/socketio/Cargo.toml @@ -29,6 +29,7 @@ serde = "1.0.193" [dev-dependencies] cargo-tarpaulin = "0.18.5" +serial_test = "3.0.0" [dev-dependencies.tokio] version = "1.36.0" diff --git a/socketio/src/asynchronous/client/client.rs b/socketio/src/asynchronous/client/client.rs index d89f1692..c7d28f9f 100644 --- a/socketio/src/asynchronous/client/client.rs +++ b/socketio/src/asynchronous/client/client.rs @@ -541,6 +541,7 @@ mod test { use futures_util::{FutureExt, StreamExt}; use native_tls::TlsConnector; use serde_json::json; + use serial_test::serial; use tokio::{ sync::mpsc, time::{sleep, timeout}, @@ -695,6 +696,7 @@ mod test { } #[tokio::test] + #[serial(reconnect)] async fn socket_io_reconnect_integration() -> Result<()> { static CONNECT_NUM: AtomicUsize = AtomicUsize::new(0); static MESSAGE_NUM: AtomicUsize = AtomicUsize::new(0); diff --git a/socketio/src/client/client.rs b/socketio/src/client/client.rs index 695e4684..fe924307 100644 --- a/socketio/src/client/client.rs +++ b/socketio/src/client/client.rs @@ -278,10 +278,12 @@ mod test { use crate::error::Result; use crate::ClientBuilder; use serde_json::json; + use serial_test::serial; use std::time::{Duration, SystemTime}; use url::Url; #[test] + #[serial(reconnect)] fn socket_io_reconnect_integration() -> Result<()> { static CONNECT_NUM: AtomicUsize = AtomicUsize::new(0); static CLOSE_NUM: AtomicUsize = AtomicUsize::new(0);