Skip to content

Commit

Permalink
Async reconnections (#400)
Browse files Browse the repository at this point in the history
* Add reconnect settings to async builder

The builder can now be configured with the following properties to
control automatic reconnections on network errors or server disconnects:

- reconnect
- reconnect_on_disconnect
- reconnect_delay (min and max)
- max_reconnect_attempts

* Introduce `DisconnectReason` enum

The enum replaces the need for multiple `AtomicBool`'s to maintain the
disconnection reason. This makes the code easier to read and more
ergonomic to maintain the state.

* Serialize the reconnect tests

Tests that rely on the reconnect socket server cannot run in parallel
because they ask the server to disconnect and restart as part of the
test. This can cause other tests running in parallel to fail in
mysterious ways.

By using the `serial_test` module and applying it selectively to the
affected tests, we can ensure that they are never executed concurrently.
  • Loading branch information
rageshkrishna authored Mar 22, 2024
1 parent 4324d7b commit 5ae3d4d
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 79 deletions.
103 changes: 97 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions socketio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ serde = "1.0.193"

[dev-dependencies]
cargo-tarpaulin = "0.18.5"
serial_test = "3.0.0"

[dev-dependencies.tokio]
version = "1.36.0"
Expand Down
94 changes: 61 additions & 33 deletions socketio/src/asynchronous/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures_util::{future::BoxFuture, StreamExt};
use futures_util::future::BoxFuture;
use log::trace;
use native_tls::TlsConnector;
use rust_engineio::{
Expand All @@ -8,7 +8,7 @@ use rust_engineio::{
use std::collections::HashMap;
use url::Url;

use crate::{error::Result, Error, Event, Payload, TransportType};
use crate::{error::Result, Event, Payload, TransportType};

use super::{
callback::{Callback, DynAsyncAnyCallback, DynAsyncCallback},
Expand All @@ -22,13 +22,19 @@ use crate::asynchronous::socket::Socket as InnerSocket;
/// acts the `build` method and returns a connected [`Client`].
pub struct ClientBuilder {
address: String,
on: HashMap<Event, Callback<DynAsyncCallback>>,
on_any: Option<Callback<DynAsyncAnyCallback>>,
namespace: String,
pub(crate) on: HashMap<Event, Callback<DynAsyncCallback>>,
pub(crate) on_any: Option<Callback<DynAsyncAnyCallback>>,
pub(crate) namespace: String,
tls_config: Option<TlsConnector>,
opening_headers: Option<HeaderMap>,
transport_type: TransportType,
auth: Option<serde_json::Value>,
pub(crate) auth: Option<serde_json::Value>,
pub(crate) reconnect: bool,
pub(crate) reconnect_on_disconnect: bool,
// None implies infinite attempts
pub(crate) max_reconnect_attempts: Option<u8>,
pub(crate) reconnect_delay_min: u64,
pub(crate) reconnect_delay_max: u64,
}

impl ClientBuilder {
Expand Down Expand Up @@ -81,6 +87,12 @@ impl ClientBuilder {
opening_headers: None,
transport_type: TransportType::Any,
auth: None,
reconnect: true,
reconnect_on_disconnect: false,
// None implies infinite attempts
max_reconnect_attempts: None,
reconnect_delay_min: 1000,
reconnect_delay_max: 5000,
}
}

Expand Down Expand Up @@ -315,6 +327,34 @@ impl ClientBuilder {
self
}

/// If set to `false` do not try to reconnect on network errors. Defaults to
/// `true`
pub fn reconnect(mut self, reconnect: bool) -> Self {
self.reconnect = reconnect;
self
}

/// If set to `true` try to reconnect when the server disconnects the
/// client. Defaults to `false`
pub fn reconnect_on_disconnect(mut self, reconnect_on_disconnect: bool) -> Self {
self.reconnect_on_disconnect = reconnect_on_disconnect;
self
}

/// Sets the minimum and maximum delay between reconnection attempts
pub fn reconnect_delay(mut self, min: u64, max: u64) -> Self {
self.reconnect_delay_min = min;
self.reconnect_delay_max = max;
self
}

/// Sets the maximum number of times to attempt reconnections. Defaults to
/// an infinite number of attempts
pub fn max_reconnect_attempts(mut self, reconnect_attempts: u8) -> Self {
self.max_reconnect_attempts = Some(reconnect_attempts);
self
}

/// Connects the socket to a certain endpoint. This returns a connected
/// [`Client`] instance. This method returns an [`std::result::Result::Err`]
/// value if something goes wrong during connection. Also starts a separate
Expand Down Expand Up @@ -343,26 +383,14 @@ impl ClientBuilder {
/// }
/// ```
pub async fn connect(self) -> Result<Client> {
let socket = self.connect_manual().await?;
let socket_clone = socket.clone();

// Use thread to consume items in iterator in order to call callbacks
tokio::runtime::Handle::current().spawn(async move {
let mut stream = socket_clone.as_stream();
// Consume the stream until it returns None and the stream is closed.
while let Some(item) = stream.next().await {
if let e @ Err(Error::IncompleteResponseFromEngineIo(_)) = item {
trace!("Network error occurred: {}", e.unwrap_err());
}
}
});
let mut socket = self.connect_manual().await?;
socket.poll_stream().await?;

Ok(socket)
}

//TODO: 0.3.X stabilize
pub(crate) async fn connect_manual(self) -> Result<Client> {
// Parse url here rather than in new to keep new returning Self.
/// Creates a new Socket that can be used for reconnections
pub(crate) async fn inner_create(&self) -> Result<InnerSocket> {
let mut url = Url::parse(&self.address)?;

if url.path() == "/" {
Expand All @@ -371,11 +399,11 @@ impl ClientBuilder {

let mut builder = EngineIoClientBuilder::new(url);

if let Some(tls_config) = self.tls_config {
builder = builder.tls_config(tls_config);
if let Some(tls_config) = &self.tls_config {
builder = builder.tls_config(tls_config.to_owned());
}
if let Some(headers) = self.opening_headers {
builder = builder.headers(headers);
if let Some(headers) = &self.opening_headers {
builder = builder.headers(headers.to_owned());
}

let engine_client = match self.transport_type {
Expand All @@ -386,14 +414,14 @@ impl ClientBuilder {
};

let inner_socket = InnerSocket::new(engine_client)?;
Ok(inner_socket)
}

//TODO: 0.3.X stabilize
pub(crate) async fn connect_manual(self) -> Result<Client> {
let inner_socket = self.inner_create().await?;

let socket = Client::new(
inner_socket,
&self.namespace,
self.on,
self.on_any,
self.auth,
)?;
let socket = Client::new(inner_socket, self)?;
socket.connect().await?;

Ok(socket)
Expand Down
Loading

0 comments on commit 5ae3d4d

Please sign in to comment.