From 0b68ebb1bbd3d6f684229f6cbab1eaf10e363546 Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Sat, 30 Nov 2024 02:15:02 -0500 Subject: [PATCH 1/6] Implement Ping/Pong Mech to Improve Connection Reliability --- .../src/nonblocking/pubsub_client.rs | 40 ++++++++- pubsub-client/src/pubsub_client.rs | 90 +++++++++++++++---- 2 files changed, 111 insertions(+), 19 deletions(-) diff --git a/pubsub-client/src/nonblocking/pubsub_client.rs b/pubsub-client/src/nonblocking/pubsub_client.rs index 476cf935f93b3c..70f3973eb629cd 100644 --- a/pubsub-client/src/nonblocking/pubsub_client.rs +++ b/pubsub-client/src/nonblocking/pubsub_client.rs @@ -197,7 +197,7 @@ use { net::TcpStream, sync::{mpsc, oneshot}, task::JoinHandle, - time::{sleep, Duration}, + time::{interval, Interval, sleep, Duration}, }, tokio_stream::wrappers::UnboundedReceiverStream, tokio_tungstenite::{ @@ -213,6 +213,11 @@ use { pub type PubsubClientResult = Result; +/// The interval between pings measured in seconds +pub const DEFAULT_PING_DURATION_SECONDS: u64 = 10; +/// The maximum number of consecutive failed pings before considering the connection stale +pub const DEFAULT_MAX_FAILED_PINGS: usize = 3; + #[derive(Debug, Error)] pub enum PubsubClientError { #[error("url parse error")] @@ -249,6 +254,16 @@ pub enum PubsubClientError { UnexpectedGetVersionResponse(String), } +impl PubsubClientError { + pub fn is_timeout(&self) -> bool { + matches!( + self, + PubsubClientError::WsError(tungstenite::Error::Io(ref err)) + if err.kind() == std::io::ErrorKind::WouldBlock + ) + } +} + type UnsubscribeFn = Box BoxFuture<'static, ()> + Send>; type SubscribeResponseMsg = Result<(mpsc::UnboundedReceiver, UnsubscribeFn), PubsubClientError>; @@ -500,6 +515,9 @@ impl PubsubClient { let mut subscriptions = BTreeMap::new(); let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel(); + let mut ping_interval: Interval = interval(Duration::from_secs(DEFAULT_PING_DURATION_SECONDS)); + let mut unmatched_pings: usize = 0usize; + loop { tokio::select! { // Send close on shutdown signal @@ -510,8 +528,15 @@ impl PubsubClient { break; }, // Send `Message::Ping` each 10s if no any other communication - () = sleep(Duration::from_secs(10)) => { + _ = ping_interval.tick() => { ws.send(Message::Ping(Vec::new())).await?; + unmatched_pings += 1; + + if unmatched_pings > DEFAULT_MAX_FAILED_PINGS { + info!("No pong received after {} pings. Closing connection...", DEFAULT_MAX_FAILED_PINGS); + ws.close(Some(CloseFrame { code: CloseCode::Normal, reason: "No pong received".into() })).await?; + break; + } }, // Read message for subscribe Some((operation, params, response_sender)) = subscribe_receiver.recv() => { @@ -547,13 +572,20 @@ impl PubsubClient { // Get text from the message let text = match msg { - Message::Text(text) => text, + Message::Text(text) => { + unmatched_pings = 0; + text + }, Message::Binary(_data) => continue, // Ignore Message::Ping(data) => { ws.send(Message::Pong(data)).await?; + unmatched_pings = 0; + continue + }, + Message::Pong(_data) => { + unmatched_pings = 0; continue }, - Message::Pong(_data) => continue, Message::Close(_frame) => break, Message::Frame(_frame) => continue, }; diff --git a/pubsub-client/src/pubsub_client.rs b/pubsub-client/src/pubsub_client.rs index 77e1b12f6bf84b..3fc5c9fa3b19dc 100644 --- a/pubsub-client/src/pubsub_client.rs +++ b/pubsub-client/src/pubsub_client.rs @@ -125,6 +125,11 @@ use { url::Url, }; +/// The interval between pings measured in seconds +pub const DEFAULT_PING_DURATION_SECONDS: u64 = 10; +/// The maximum number of consecutive failed pings before considering the connection stale +pub const DEFAULT_MAX_FAILED_PINGS: usize = 3; + /// A subscription. /// /// The subscription is unsubscribed on drop, and note that unsubscription (and @@ -211,24 +216,35 @@ where fn read_message( writable_socket: &Arc>>>, ) -> Result, PubsubClientError> { - let message = writable_socket.write().unwrap().read()?; - if message.is_ping() { - return Ok(None); - } - let message_text = &message.into_text()?; - if let Ok(json_msg) = serde_json::from_str::>(message_text) { - if let Some(Object(params)) = json_msg.get("params") { - if let Some(result) = params.get("result") { - if let Ok(x) = serde_json::from_value::(result.clone()) { - return Ok(Some(x)); + match writable_socket.write().unwrap().read() { + Ok(message) => { + if message.is_ping() || message.is_pong() { + return Ok(None); + } + + let message_text = &message.into_text()?; + if let Ok(json_msg) = serde_json::from_str::>(message_text) { + if let Some(Object(params)) = json_msg.get("params") { + if let Some(result) = params.get("result") { + if let Ok(x) = serde_json::from_value::(result.clone()) { + return Ok(Some(x)); + } + } } } + + Err(PubsubClientError::UnexpectedMessageError(format!( + "msg={message_text}" + ))) + } + Err(tungstenite::Error::Io(ref err)) if err.kind() == std::io::ErrorKind::WouldBlock => { + // Read timed out + Err(PubsubClientError::WsError(tungstenite::Error::Io( + std::io::Error::from(std::io::ErrorKind::WouldBlock), + ))) } + Err(err) => Err(PubsubClientError::WsError(err)), } - - Err(PubsubClientError::UnexpectedMessageError(format!( - "msg={message_text}" - ))) } /// Shutdown the internel message receiver and wait for its thread to exit. @@ -795,15 +811,59 @@ impl PubsubClient { T: DeserializeOwned, F: Fn(T) + Send + 'static, { + let ping_interval: Duration = Duration::from_secs(DEFAULT_PING_DURATION_SECONDS); + let max_failed_pings: usize = DEFAULT_MAX_FAILED_PINGS; + let mut last_ping_time: std::time::Instant = std::time::Instant::now(); + let unmatched_pings: Arc = Arc::new(AtomicUsize::new(0)); + loop { if exit.load(Ordering::Relaxed) { break; } + // Send ping if the interval has passed + if last_ping_time.elapsed() >= ping_interval { + if let Err(err) = socket.write().unwrap().write_message(Message::Ping(vec![])) { + info!("Error sending ping: {:?}", err); + break; + } + + last_ping_time = std::time::Instant::now(); + let pings = unmatched_pings.fetch_add(1, Ordering::Relaxed) + 1; + + // Check if max_failed_pings has been exceeded + if pings > max_failed_pings { + info!( + "No pong received after {} pings. Closing connection...", + max_failed_pings + ); + + let _ = socket.write().unwrap().close(None); + break; + } + } + + // Read timeout to prevent indefinite blocking on `read_message` + socket + .write() + .unwrap() + .get_mut() + .get_mut() + .set_read_timeout(Some(Duration::from_secs(0.5))) + .unwrap(); + match PubsubClientSubscription::read_message(socket) { - Ok(Some(message)) => handler(message), + Ok(Some(message)) => { + unmatched_pings.store(0, Ordering::Relaxed); + handler(message) + } Ok(None) => { // Nothing useful, means we received a ping message + unmatched_pings.store(0, Ordering::Relaxed); + } + Err(ref err) if err.is_timeout() => { + // Read timed out - continue the loop + continue; } Err(err) => { info!("receive error: {:?}", err); From 4cad6eea4be432f56b42f3464f22d353e218e115 Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Wed, 8 Jan 2025 16:24:58 -0500 Subject: [PATCH 2/6] Implement Feedback --- .../src/nonblocking/pubsub_client.rs | 20 ++++++++----------- pubsub-client/src/pubsub_client.rs | 12 +++-------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/pubsub-client/src/nonblocking/pubsub_client.rs b/pubsub-client/src/nonblocking/pubsub_client.rs index 70f3973eb629cd..0880d93a2f05cc 100644 --- a/pubsub-client/src/nonblocking/pubsub_client.rs +++ b/pubsub-client/src/nonblocking/pubsub_client.rs @@ -197,7 +197,7 @@ use { net::TcpStream, sync::{mpsc, oneshot}, task::JoinHandle, - time::{interval, Interval, sleep, Duration}, + time::{interval, Interval, Duration}, }, tokio_stream::wrappers::UnboundedReceiverStream, tokio_tungstenite::{ @@ -210,14 +210,10 @@ use { }, url::Url, }; +use crate::nonblocking::pubsub_client::{DEFAULT_PING_DURATION_SECONDS, DEFAULT_MAX_FAILED_PINGS}; pub type PubsubClientResult = Result; -/// The interval between pings measured in seconds -pub const DEFAULT_PING_DURATION_SECONDS: u64 = 10; -/// The maximum number of consecutive failed pings before considering the connection stale -pub const DEFAULT_MAX_FAILED_PINGS: usize = 3; - #[derive(Debug, Error)] pub enum PubsubClientError { #[error("url parse error")] @@ -516,7 +512,7 @@ impl PubsubClient { let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel(); let mut ping_interval: Interval = interval(Duration::from_secs(DEFAULT_PING_DURATION_SECONDS)); - let mut unmatched_pings: usize = 0usize; + let mut elapsed_pings: usize = 0usize; loop { tokio::select! { @@ -530,9 +526,9 @@ impl PubsubClient { // Send `Message::Ping` each 10s if no any other communication _ = ping_interval.tick() => { ws.send(Message::Ping(Vec::new())).await?; - unmatched_pings += 1; + elapsed_pings += 1; - if unmatched_pings > DEFAULT_MAX_FAILED_PINGS { + if elapsed_pings > DEFAULT_MAX_FAILED_PINGS { info!("No pong received after {} pings. Closing connection...", DEFAULT_MAX_FAILED_PINGS); ws.close(Some(CloseFrame { code: CloseCode::Normal, reason: "No pong received".into() })).await?; break; @@ -573,17 +569,17 @@ impl PubsubClient { // Get text from the message let text = match msg { Message::Text(text) => { - unmatched_pings = 0; + elapsed_pings = 0; text }, Message::Binary(_data) => continue, // Ignore Message::Ping(data) => { ws.send(Message::Pong(data)).await?; - unmatched_pings = 0; + elapsed_pings = 0; continue }, Message::Pong(_data) => { - unmatched_pings = 0; + elapsed_pings = 0; continue }, Message::Close(_frame) => break, diff --git a/pubsub-client/src/pubsub_client.rs b/pubsub-client/src/pubsub_client.rs index 3fc5c9fa3b19dc..5832598e3031fb 100644 --- a/pubsub-client/src/pubsub_client.rs +++ b/pubsub-client/src/pubsub_client.rs @@ -115,7 +115,7 @@ use { marker::PhantomData, net::TcpStream, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, RwLock, }, thread::{sleep, JoinHandle}, @@ -237,12 +237,6 @@ where "msg={message_text}" ))) } - Err(tungstenite::Error::Io(ref err)) if err.kind() == std::io::ErrorKind::WouldBlock => { - // Read timed out - Err(PubsubClientError::WsError(tungstenite::Error::Io( - std::io::Error::from(std::io::ErrorKind::WouldBlock), - ))) - } Err(err) => Err(PubsubClientError::WsError(err)), } } @@ -823,7 +817,7 @@ impl PubsubClient { // Send ping if the interval has passed if last_ping_time.elapsed() >= ping_interval { - if let Err(err) = socket.write().unwrap().write_message(Message::Ping(vec![])) { + if let Err(err) = socket.write().unwrap().send(Message::Ping(vec![])) { info!("Error sending ping: {:?}", err); break; } @@ -849,7 +843,7 @@ impl PubsubClient { .unwrap() .get_mut() .get_mut() - .set_read_timeout(Some(Duration::from_secs(0.5))) + .set_read_timeout(Some(Duration::from_millis(500))) .unwrap(); match PubsubClientSubscription::read_message(socket) { From fefc87e468efe517e0ea8e999c3d65d456f6e170 Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Wed, 8 Jan 2025 16:25:25 -0500 Subject: [PATCH 3/6] Formatting --- pubsub-client/src/nonblocking/pubsub_client.rs | 7 ++++--- pubsub-client/src/pubsub_client.rs | 6 +++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pubsub-client/src/nonblocking/pubsub_client.rs b/pubsub-client/src/nonblocking/pubsub_client.rs index 0880d93a2f05cc..0d7be3264a133c 100644 --- a/pubsub-client/src/nonblocking/pubsub_client.rs +++ b/pubsub-client/src/nonblocking/pubsub_client.rs @@ -166,6 +166,7 @@ //! # Ok::<(), anyhow::Error>(()) //! ``` +use crate::nonblocking::pubsub_client::{DEFAULT_MAX_FAILED_PINGS, DEFAULT_PING_DURATION_SECONDS}; use { futures_util::{ future::{ready, BoxFuture, FutureExt}, @@ -197,7 +198,7 @@ use { net::TcpStream, sync::{mpsc, oneshot}, task::JoinHandle, - time::{interval, Interval, Duration}, + time::{interval, Duration, Interval}, }, tokio_stream::wrappers::UnboundedReceiverStream, tokio_tungstenite::{ @@ -210,7 +211,6 @@ use { }, url::Url, }; -use crate::nonblocking::pubsub_client::{DEFAULT_PING_DURATION_SECONDS, DEFAULT_MAX_FAILED_PINGS}; pub type PubsubClientResult = Result; @@ -511,7 +511,8 @@ impl PubsubClient { let mut subscriptions = BTreeMap::new(); let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel(); - let mut ping_interval: Interval = interval(Duration::from_secs(DEFAULT_PING_DURATION_SECONDS)); + let mut ping_interval: Interval = + interval(Duration::from_secs(DEFAULT_PING_DURATION_SECONDS)); let mut elapsed_pings: usize = 0usize; loop { diff --git a/pubsub-client/src/pubsub_client.rs b/pubsub-client/src/pubsub_client.rs index 5832598e3031fb..30d0fa1e1647c5 100644 --- a/pubsub-client/src/pubsub_client.rs +++ b/pubsub-client/src/pubsub_client.rs @@ -221,7 +221,7 @@ where if message.is_ping() || message.is_pong() { return Ok(None); } - + let message_text = &message.into_text()?; if let Ok(json_msg) = serde_json::from_str::>(message_text) { if let Some(Object(params)) = json_msg.get("params") { @@ -232,7 +232,7 @@ where } } } - + Err(PubsubClientError::UnexpectedMessageError(format!( "msg={message_text}" ))) @@ -836,7 +836,7 @@ impl PubsubClient { break; } } - + // Read timeout to prevent indefinite blocking on `read_message` socket .write() From e68884b372cb6a41ecdfd437c794e37fc5305e2a Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Thu, 9 Jan 2025 12:09:06 -0500 Subject: [PATCH 4/6] Fix Issues With Imports and Var Naming --- pubsub-client/src/nonblocking/pubsub_client.rs | 2 +- pubsub-client/src/pubsub_client.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pubsub-client/src/nonblocking/pubsub_client.rs b/pubsub-client/src/nonblocking/pubsub_client.rs index 0d7be3264a133c..742b63561cb5e9 100644 --- a/pubsub-client/src/nonblocking/pubsub_client.rs +++ b/pubsub-client/src/nonblocking/pubsub_client.rs @@ -166,7 +166,6 @@ //! # Ok::<(), anyhow::Error>(()) //! ``` -use crate::nonblocking::pubsub_client::{DEFAULT_MAX_FAILED_PINGS, DEFAULT_PING_DURATION_SECONDS}; use { futures_util::{ future::{ready, BoxFuture, FutureExt}, @@ -174,6 +173,7 @@ use { stream::{BoxStream, StreamExt}, }, log::*, + pubsub_client::{DEFAULT_MAX_FAILED_PINGS, DEFAULT_PING_DURATION_SECONDS}, serde::de::DeserializeOwned, serde_json::{json, Map, Value}, solana_account_decoder_client_types::UiAccount, diff --git a/pubsub-client/src/pubsub_client.rs b/pubsub-client/src/pubsub_client.rs index 30d0fa1e1647c5..470e0dfa438b5d 100644 --- a/pubsub-client/src/pubsub_client.rs +++ b/pubsub-client/src/pubsub_client.rs @@ -808,7 +808,7 @@ impl PubsubClient { let ping_interval: Duration = Duration::from_secs(DEFAULT_PING_DURATION_SECONDS); let max_failed_pings: usize = DEFAULT_MAX_FAILED_PINGS; let mut last_ping_time: std::time::Instant = std::time::Instant::now(); - let unmatched_pings: Arc = Arc::new(AtomicUsize::new(0)); + let elapsed_pings: Arc = Arc::new(AtomicUsize::new(0)); loop { if exit.load(Ordering::Relaxed) { @@ -823,7 +823,7 @@ impl PubsubClient { } last_ping_time = std::time::Instant::now(); - let pings = unmatched_pings.fetch_add(1, Ordering::Relaxed) + 1; + let pings = elapsed_pings.fetch_add(1, Ordering::Relaxed) + 1; // Check if max_failed_pings has been exceeded if pings > max_failed_pings { @@ -848,12 +848,12 @@ impl PubsubClient { match PubsubClientSubscription::read_message(socket) { Ok(Some(message)) => { - unmatched_pings.store(0, Ordering::Relaxed); + elapsed_pings.store(0, Ordering::Relaxed); handler(message) } Ok(None) => { // Nothing useful, means we received a ping message - unmatched_pings.store(0, Ordering::Relaxed); + elapsed_pings.store(0, Ordering::Relaxed); } Err(ref err) if err.is_timeout() => { // Read timed out - continue the loop From e0af650ee4db8f3ff6afb1c60b1ac9ce8cc703ec Mon Sep 17 00:00:00 2001 From: 0xIchigo <0xIchigo@protonmail.com> Date: Fri, 10 Jan 2025 18:12:34 -0500 Subject: [PATCH 5/6] Fix Compile Errors --- .../src/nonblocking/pubsub_client.rs | 2 +- pubsub-client/src/pubsub_client.rs | 20 +++++++++++-------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pubsub-client/src/nonblocking/pubsub_client.rs b/pubsub-client/src/nonblocking/pubsub_client.rs index 742b63561cb5e9..323b4530b74cfb 100644 --- a/pubsub-client/src/nonblocking/pubsub_client.rs +++ b/pubsub-client/src/nonblocking/pubsub_client.rs @@ -167,13 +167,13 @@ //! ``` use { + crate::pubsub_client::{DEFAULT_MAX_FAILED_PINGS, DEFAULT_PING_DURATION_SECONDS}, futures_util::{ future::{ready, BoxFuture, FutureExt}, sink::SinkExt, stream::{BoxStream, StreamExt}, }, log::*, - pubsub_client::{DEFAULT_MAX_FAILED_PINGS, DEFAULT_PING_DURATION_SECONDS}, serde::de::DeserializeOwned, serde_json::{json, Map, Value}, solana_account_decoder_client_types::UiAccount, diff --git a/pubsub-client/src/pubsub_client.rs b/pubsub-client/src/pubsub_client.rs index 470e0dfa438b5d..8ae2d86e83370f 100644 --- a/pubsub-client/src/pubsub_client.rs +++ b/pubsub-client/src/pubsub_client.rs @@ -837,14 +837,18 @@ impl PubsubClient { } } - // Read timeout to prevent indefinite blocking on `read_message` - socket - .write() - .unwrap() - .get_mut() - .get_mut() - .set_read_timeout(Some(Duration::from_millis(500))) - .unwrap(); + let mut ws = socket.write().unwrap(); + let maybe_tls_stream = ws.get_mut(); + + match maybe_tls_stream { + MaybeTlsStream::Plain(tcp_stream) => { + if let Err(e) = tcp_stream.set_read_timeout(Some(Duration::from_millis(500))) { + info!("Failed to set read timeout on TcpStream: {:?}", e); + } + } + // We can only set a read time out safely if it's a plain TCP connection + _ => {} + } match PubsubClientSubscription::read_message(socket) { Ok(Some(message)) => { From 7d797515eed5bdcd4fdf5ad668866c887c18043c Mon Sep 17 00:00:00 2001 From: 0xIchigo <0xIchigo@protonmail.com> Date: Tue, 14 Jan 2025 18:09:10 -0500 Subject: [PATCH 6/6] Change Match to if let --- pubsub-client/src/pubsub_client.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pubsub-client/src/pubsub_client.rs b/pubsub-client/src/pubsub_client.rs index 8ae2d86e83370f..f5349af41ce787 100644 --- a/pubsub-client/src/pubsub_client.rs +++ b/pubsub-client/src/pubsub_client.rs @@ -840,14 +840,11 @@ impl PubsubClient { let mut ws = socket.write().unwrap(); let maybe_tls_stream = ws.get_mut(); - match maybe_tls_stream { - MaybeTlsStream::Plain(tcp_stream) => { - if let Err(e) = tcp_stream.set_read_timeout(Some(Duration::from_millis(500))) { - info!("Failed to set read timeout on TcpStream: {:?}", e); - } + // We can only set a read time out safely if it's a plain TCP connection + if let MaybeTlsStream::Plain(tcp_stream) = maybe_tls_stream { + if let Err(e) = tcp_stream.set_read_timeout(Some(Duration::from_millis(500))) { + info!("Failed to set read timeout on TcpStream: {:?}", e); } - // We can only set a read time out safely if it's a plain TCP connection - _ => {} } match PubsubClientSubscription::read_message(socket) {