Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async reconnections #400

Merged
merged 7 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 61 additions & 33 deletions socketio/src/asynchronous/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures_util::{future::BoxFuture, StreamExt};
use futures_util::future::BoxFuture;
use log::trace;
use native_tls::TlsConnector;
use rust_engineio::{
Expand All @@ -8,7 +8,7 @@ use rust_engineio::{
use std::collections::HashMap;
use url::Url;

use crate::{error::Result, Error, Event, Payload, TransportType};
use crate::{error::Result, Event, Payload, TransportType};

use super::{
callback::{Callback, DynAsyncAnyCallback, DynAsyncCallback},
Expand All @@ -22,13 +22,19 @@ use crate::asynchronous::socket::Socket as InnerSocket;
/// acts the `build` method and returns a connected [`Client`].
pub struct ClientBuilder {
address: String,
on: HashMap<Event, Callback<DynAsyncCallback>>,
on_any: Option<Callback<DynAsyncAnyCallback>>,
namespace: String,
pub(crate) on: HashMap<Event, Callback<DynAsyncCallback>>,
pub(crate) on_any: Option<Callback<DynAsyncAnyCallback>>,
pub(crate) namespace: String,
tls_config: Option<TlsConnector>,
opening_headers: Option<HeaderMap>,
transport_type: TransportType,
auth: Option<serde_json::Value>,
pub(crate) auth: Option<serde_json::Value>,
pub(crate) reconnect: bool,
pub(crate) reconnect_on_disconnect: bool,
// None implies infinite attempts
pub(crate) max_reconnect_attempts: Option<u8>,
pub(crate) reconnect_delay_min: u64,
pub(crate) reconnect_delay_max: u64,
}

impl ClientBuilder {
Expand Down Expand Up @@ -81,6 +87,12 @@ impl ClientBuilder {
opening_headers: None,
transport_type: TransportType::Any,
auth: None,
reconnect: true,
reconnect_on_disconnect: false,
// None implies infinite attempts
max_reconnect_attempts: None,
reconnect_delay_min: 1000,
reconnect_delay_max: 5000,
}
}

Expand Down Expand Up @@ -315,6 +327,34 @@ impl ClientBuilder {
self
}

/// If set to `false` do not try to reconnect on network errors. Defaults to
/// `true`
pub fn reconnect(mut self, reconnect: bool) -> Self {
self.reconnect = reconnect;
self
}

/// If set to `true` try to reconnect when the server disconnects the
/// client. Defaults to `false`
pub fn reconnect_on_disconnect(mut self, reconnect_on_disconnect: bool) -> Self {
self.reconnect_on_disconnect = reconnect_on_disconnect;
self
}

/// Sets the minimum and maximum delay between reconnection attempts
pub fn reconnect_delay(mut self, min: u64, max: u64) -> Self {
self.reconnect_delay_min = min;
self.reconnect_delay_max = max;
self
}

/// Sets the maximum number of times to attempt reconnections. Defaults to
/// an infinite number of attempts
pub fn max_reconnect_attempts(mut self, reconnect_attempts: u8) -> Self {
self.max_reconnect_attempts = Some(reconnect_attempts);
self
}

/// Connects the socket to a certain endpoint. This returns a connected
/// [`Client`] instance. This method returns an [`std::result::Result::Err`]
/// value if something goes wrong during connection. Also starts a separate
Expand Down Expand Up @@ -343,26 +383,14 @@ impl ClientBuilder {
/// }
/// ```
pub async fn connect(self) -> Result<Client> {
let socket = self.connect_manual().await?;
rageshkrishna marked this conversation as resolved.
Show resolved Hide resolved
let socket_clone = socket.clone();

// Use thread to consume items in iterator in order to call callbacks
tokio::runtime::Handle::current().spawn(async move {
let mut stream = socket_clone.as_stream();
// Consume the stream until it returns None and the stream is closed.
while let Some(item) = stream.next().await {
if let e @ Err(Error::IncompleteResponseFromEngineIo(_)) = item {
trace!("Network error occurred: {}", e.unwrap_err());
}
}
});
let mut socket = self.connect_manual().await?;
socket.poll_stream().await?;

Ok(socket)
}

//TODO: 0.3.X stabilize
pub(crate) async fn connect_manual(self) -> Result<Client> {
// Parse url here rather than in new to keep new returning Self.
/// Creates a new Socket that can be used for reconnections
pub(crate) async fn inner_create(&self) -> Result<InnerSocket> {
let mut url = Url::parse(&self.address)?;

if url.path() == "/" {
Expand All @@ -371,11 +399,11 @@ impl ClientBuilder {

let mut builder = EngineIoClientBuilder::new(url);

if let Some(tls_config) = self.tls_config {
builder = builder.tls_config(tls_config);
if let Some(tls_config) = &self.tls_config {
builder = builder.tls_config(tls_config.to_owned());
}
if let Some(headers) = self.opening_headers {
builder = builder.headers(headers);
if let Some(headers) = &self.opening_headers {
builder = builder.headers(headers.to_owned());
}

let engine_client = match self.transport_type {
Expand All @@ -386,14 +414,14 @@ impl ClientBuilder {
};

let inner_socket = InnerSocket::new(engine_client)?;
Ok(inner_socket)
}

//TODO: 0.3.X stabilize
pub(crate) async fn connect_manual(self) -> Result<Client> {
let inner_socket = self.inner_create().await?;

let socket = Client::new(
inner_socket,
&self.namespace,
self.on,
self.on_any,
self.auth,
)?;
let socket = Client::new(inner_socket, self)?;
socket.connect().await?;

Ok(socket)
Expand Down
Loading
Loading