From 5cbb86207b74cb7e0a594d46db63babd01224b43 Mon Sep 17 00:00:00 2001 From: harold Date: Fri, 5 Jan 2024 11:40:31 -0500 Subject: [PATCH 1/5] Issue 361 - Improve http2 connection error handling - Partial fix for #361 - ONLY implemented for the `-z 10s` (work_until) case - TODO: - [ ] The futures are not aborted when the timer is hit, which will cause long running requests to delay the program exit - this is only due to a borrow/move problem that I cannot figure out - [ ] Implement for the non-`work_until` cases - [ ] Add a timeout to the TCP socket setup - this appears to be where some of the delay on shutdown is happening if the server closes after startup - [ ] Consider adding a delay to the reconnect loop so that it will not try to connect more than 1 time per second per concurrent connection - Without this the connect loop will spin at ~23k connect attempts/second for `-c 20`, for example - Test cases: - Start with the server not running at all (never connects) - Currently this will exit on time - IMPROVED: Previously this would attempt to connect once for each `-c`, fail, and immediately exit - IMPROVED: Currently this will repeatedly try to connect until the specified timeout expires, then it will exit - Start with the server running and leave it running - This works fine as before - Start with the server running, exit the server, then restart the server before the test completes - This initially makes requests - IMPROVED: Previously this would OOM even if the server restarted - IMPROVED: Currently this will reconnect and continue making requests if the server restarts --- src/client.rs | 115 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 90 insertions(+), 25 deletions(-) diff --git a/src/client.rs b/src/client.rs index 51f3854b..0244f962 100644 --- a/src/client.rs +++ b/src/client.rs @@ -273,6 +273,9 @@ impl Client { url: &Url, ) -> Result { if url.scheme() == "https" { + // TODO: This may be where some of the connection loops are hanging when + // the server closes; we may need to add a timeout here so that we can exit + // more closely to the configured timeout self.tls_client(addr, url).await } else if let Some(socket_path) = &self.unix_socket { Ok(Stream::Unix( @@ -423,6 +426,8 @@ impl Client { .await .is_err() { + // This gets hit when the connection for HTTP/1.1 faults + // This re-connects start = std::time::Instant::now(); let addr = self.dns.lookup(&url, &mut client_state.rng).await?; let dns_lookup = std::time::Instant::now(); @@ -698,6 +703,20 @@ fn is_too_many_open_files(res: &Result) -> bool { .unwrap_or(false) } +/// Check error was any Hyper error (primarily for HTTP2 connection errors) +fn is_hyper_error(res: &Result) -> bool { + res.as_ref() + .err() + .map(|err| match err { + // REVIEW: IoErrors, if indicating the underlying connection has failed, + // should also cause a stop of HTTP2 requests + // ClientError::IoError(_) => true, + ClientError::HyperError(_) => true, + _ => false, + }) + .unwrap_or(false) +} + async fn setup_http2(client: &Client) -> Result<(ConnectionTime, ClientStateHttp2), ClientError> { let mut rng = StdRng::from_entropy(); let url = client.url_generator.generate(&mut rng)?; @@ -1041,42 +1060,87 @@ pub async fn work_until( n_connections: usize, n_http2_parallel: usize, ) { + use std::sync::atomic::{AtomicBool, Ordering}; let client = Arc::new(client); if client.is_http2() { + let should_exit = Arc::new(AtomicBool::new(false)); let futures = (0..n_connections) .map(|_| { let client = client.clone(); let report_tx = report_tx.clone(); + let should_exit = Arc::clone(&should_exit); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let client = client.clone(); - let report_tx = report_tx.clone(); - let mut client_state = client_state.clone(); - tokio::spawn(async move { - loop { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_too_many_open_files(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + // Keep trying to establish or re-establish connections up to the deadline + loop { + // Stop if the deadline has passed + // This happens when a connection is never able to be established + // since the response handling / timeout logic never gets a chance to run + if std::time::Instant::now() > dead_line.into() { + break; + } + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + // Setup the parallel workers for each HTTP2 connection + loop { + if std::time::Instant::now() > dead_line.into() { + break; + } + let futures = (0..n_http2_parallel) + .map(|_| { + let client = client.clone(); + let report_tx = report_tx.clone(); + let mut client_state = client_state.clone(); + let should_exit = Arc::clone(&should_exit); + tokio::spawn(async move { + // This is where HTTP2 loops to make all the requests for a given client and worker + loop { + let mut res = + client.work_http2(&mut client_state).await; + let is_cancel = is_too_many_open_files(&res); + let is_hyper_error = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_hyper_error || should_exit.load(Ordering::Relaxed) { + break; + } + } + }) + }) + .collect::>(); + + tokio::select! { + result = futures::future::try_join_all(futures) => { + match result { + Ok(_) => { + // All tasks completed successfully + break; + } + Err(_) => { + // Re-establish the connection and restart the tasks + should_exit.store(true, Ordering::Relaxed); + break; + } } } - }) - }) - .collect::>(); - - tokio::time::sleep_until(dead_line.into()).await; - for f in futures { - f.abort(); + _ = tokio::time::sleep_until(dead_line.into()) => { + // TODO: Ideally we would abort all the futures here + // This would be particurlarly important for cases where an tested request + // takes, say, 30 seconds to complete on avg. In that case it will be really + // obvious that the test does not exit at any specific configured time, but + // rather up to 30 seconds after that time. + // I cannot figure out how to abort the futures without triggering borrow/move errors + // for f in futures { + // f.abort(); + // } + should_exit.store(true, Ordering::Relaxed); + break; + } + } + } } - } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), + Err(err) => report_tx.send_async(Err(err)).await.unwrap(), + } } }) }) @@ -1092,6 +1156,7 @@ pub async fn work_until( let mut client_state = ClientStateHttp1::default(); tokio::spawn(async move { loop { + // This is where HTTP1 loops to make all the requests for a given client let res = client.work_http1(&mut client_state).await; let is_cancel = is_too_many_open_files(&res); report_tx.send_async(res).await.unwrap(); From 9c4840e6e2313276dd4cfa67b2960304c4485e44 Mon Sep 17 00:00:00 2001 From: harold Date: Fri, 5 Jan 2024 14:10:08 -0500 Subject: [PATCH 2/5] Add connect timeouts / fix restart --- src/client.rs | 47 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/src/client.rs b/src/client.rs index 0244f962..5a34e622 100644 --- a/src/client.rs +++ b/src/client.rs @@ -272,20 +272,37 @@ impl Client { addr: (std::net::IpAddr, u16), url: &Url, ) -> Result { + // TODO: Allow the connect timeout to be configured + let timeout_duration = tokio::time::Duration::from_secs(5); + if url.scheme() == "https" { - // TODO: This may be where some of the connection loops are hanging when - // the server closes; we may need to add a timeout here so that we can exit - // more closely to the configured timeout - self.tls_client(addr, url).await + // If we do not put a timeout here then the connections attempts will + // linger long past the configured timeout + let stream = tokio::time::timeout(timeout_duration, self.tls_client(addr, url)).await; + match stream { + Ok(Ok(stream)) => Ok(stream), + Ok(Err(err)) => Err(err), + Err(_) => Err(ClientError::Timeout), + } } else if let Some(socket_path) = &self.unix_socket { - Ok(Stream::Unix( - tokio::net::UnixStream::connect(socket_path).await?, - )) + let stream = tokio::time::timeout(timeout_duration, + tokio::net::UnixStream::connect(socket_path) + ).await; + match stream { + Ok(Ok(stream)) => Ok(Stream::Unix(stream)), + Ok(Err(err)) => Err(ClientError::IoError(err)), + Err(_) => Err(ClientError::Timeout), + } } else { - let stream = tokio::net::TcpStream::connect(addr).await?; - stream.set_nodelay(true)?; - // stream.set_keepalive(std::time::Duration::from_secs(1).into())?; - Ok(Stream::Tcp(stream)) + let stream = tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await; + match stream { + Ok(Ok(stream)) => { + stream.set_nodelay(true)?; + return Ok(Stream::Tcp(stream)); + }, + Ok(Err(err)) => Err(ClientError::IoError(err)), + Err(_) => Err(ClientError::Timeout), + } } } @@ -710,7 +727,7 @@ fn is_hyper_error(res: &Result) -> bool { .map(|err| match err { // REVIEW: IoErrors, if indicating the underlying connection has failed, // should also cause a stop of HTTP2 requests - // ClientError::IoError(_) => true, + ClientError::IoError(_) => true, ClientError::HyperError(_) => true, _ => false, }) @@ -1063,12 +1080,10 @@ pub async fn work_until( use std::sync::atomic::{AtomicBool, Ordering}; let client = Arc::new(client); if client.is_http2() { - let should_exit = Arc::new(AtomicBool::new(false)); let futures = (0..n_connections) .map(|_| { let client = client.clone(); let report_tx = report_tx.clone(); - let should_exit = Arc::clone(&should_exit); tokio::spawn(async move { // Keep trying to establish or re-establish connections up to the deadline loop { @@ -1080,9 +1095,10 @@ pub async fn work_until( } match setup_http2(&client).await { Ok((connection_time, client_state)) => { + let should_exit = Arc::new(AtomicBool::new(false)); // Setup the parallel workers for each HTTP2 connection loop { - if std::time::Instant::now() > dead_line.into() { + if std::time::Instant::now() > dead_line.into() || should_exit.load(Ordering::Relaxed){ break; } let futures = (0..n_http2_parallel) @@ -1113,6 +1129,7 @@ pub async fn work_until( match result { Ok(_) => { // All tasks completed successfully + should_exit.store(true, Ordering::Relaxed); break; } Err(_) => { From 62166dbf7913921c34d89ff50495c9b19c3280c0 Mon Sep 17 00:00:00 2001 From: harold Date: Fri, 5 Jan 2024 16:15:45 -0500 Subject: [PATCH 3/5] Fix format --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 5a34e622..c7342fb3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1098,7 +1098,7 @@ pub async fn work_until( let should_exit = Arc::new(AtomicBool::new(false)); // Setup the parallel workers for each HTTP2 connection loop { - if std::time::Instant::now() > dead_line.into() || should_exit.load(Ordering::Relaxed){ + if std::time::Instant::now() > dead_line.into() || should_exit.load(Ordering::Relaxed) { break; } let futures = (0..n_http2_parallel) From fb90d37dd2c7646832564f9749aca8bfecc5d266 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 6 Jan 2024 17:34:24 +0900 Subject: [PATCH 4/5] Update work_until and cargo fmt --- src/client.rs | 107 +++++++++++++++++++------------------------------- 1 file changed, 40 insertions(+), 67 deletions(-) diff --git a/src/client.rs b/src/client.rs index c7342fb3..05dd81fb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -285,21 +285,24 @@ impl Client { Err(_) => Err(ClientError::Timeout), } } else if let Some(socket_path) = &self.unix_socket { - let stream = tokio::time::timeout(timeout_duration, - tokio::net::UnixStream::connect(socket_path) - ).await; + let stream = tokio::time::timeout( + timeout_duration, + tokio::net::UnixStream::connect(socket_path), + ) + .await; match stream { Ok(Ok(stream)) => Ok(Stream::Unix(stream)), Ok(Err(err)) => Err(ClientError::IoError(err)), Err(_) => Err(ClientError::Timeout), } } else { - let stream = tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await; + let stream = + tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await; match stream { Ok(Ok(stream)) => { stream.set_nodelay(true)?; return Ok(Stream::Tcp(stream)); - }, + } Ok(Err(err)) => Err(ClientError::IoError(err)), Err(_) => Err(ClientError::Timeout), } @@ -1077,7 +1080,6 @@ pub async fn work_until( n_connections: usize, n_http2_parallel: usize, ) { - use std::sync::atomic::{AtomicBool, Ordering}; let client = Arc::new(client); if client.is_http2() { let futures = (0..n_connections) @@ -1087,72 +1089,43 @@ pub async fn work_until( tokio::spawn(async move { // Keep trying to establish or re-establish connections up to the deadline loop { - // Stop if the deadline has passed - // This happens when a connection is never able to be established - // since the response handling / timeout logic never gets a chance to run - if std::time::Instant::now() > dead_line.into() { - break; - } match setup_http2(&client).await { Ok((connection_time, client_state)) => { - let should_exit = Arc::new(AtomicBool::new(false)); // Setup the parallel workers for each HTTP2 connection - loop { - if std::time::Instant::now() > dead_line.into() || should_exit.load(Ordering::Relaxed) { - break; - } - let futures = (0..n_http2_parallel) - .map(|_| { - let client = client.clone(); - let report_tx = report_tx.clone(); - let mut client_state = client_state.clone(); - let should_exit = Arc::clone(&should_exit); - tokio::spawn(async move { - // This is where HTTP2 loops to make all the requests for a given client and worker - loop { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_too_many_open_files(&res); - let is_hyper_error = is_hyper_error(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); - if is_cancel || is_hyper_error || should_exit.load(Ordering::Relaxed) { - break; - } - } - }) - }) - .collect::>(); - - tokio::select! { - result = futures::future::try_join_all(futures) => { - match result { - Ok(_) => { - // All tasks completed successfully - should_exit.store(true, Ordering::Relaxed); - break; - } - Err(_) => { - // Re-establish the connection and restart the tasks - should_exit.store(true, Ordering::Relaxed); - break; + let futures = (0..n_http2_parallel) + .map(|_| { + let client = client.clone(); + let report_tx = report_tx.clone(); + let mut client_state = client_state.clone(); + tokio::spawn(async move { + // This is where HTTP2 loops to make all the requests for a given client and worker + loop { + let mut res = + client.work_http2(&mut client_state).await; + let is_cancel = is_too_many_open_files(&res); + let is_hyper_error = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_hyper_error { + break is_cancel; } } - } - _ = tokio::time::sleep_until(dead_line.into()) => { - // TODO: Ideally we would abort all the futures here - // This would be particurlarly important for cases where an tested request - // takes, say, 30 seconds to complete on avg. In that case it will be really - // obvious that the test does not exit at any specific configured time, but - // rather up to 30 seconds after that time. - // I cannot figure out how to abort the futures without triggering borrow/move errors - // for f in futures { - // f.abort(); - // } - should_exit.store(true, Ordering::Relaxed); - break; - } - } + }) + }) + .chain(std::iter::once(tokio::spawn(async move { + tokio::time::sleep_until(dead_line.into()).await; + true + }))) + .collect::>(); + + let (is_cancel, _, rest) = + futures::future::select_all(futures).await; + for f in rest { + f.abort(); + } + + if matches!(is_cancel, Ok(true)) { + break; } } From 2d55312367fc15c2b6fe3afd2abd5f38c91a4799 Mon Sep 17 00:00:00 2001 From: hatoo Date: Wed, 10 Jan 2024 20:22:46 +0900 Subject: [PATCH 5/5] Add connection timeout to #[not(unix)] --- src/client.rs | 48 ++++++++++++++++-------------------------------- 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/src/client.rs b/src/client.rs index 05dd81fb..6f317266 100644 --- a/src/client.rs +++ b/src/client.rs @@ -266,7 +266,6 @@ impl Client { self.http_version == http::Version::HTTP_2 } - #[cfg(unix)] async fn client( &self, addr: (std::net::IpAddr, u16), @@ -279,49 +278,34 @@ impl Client { // If we do not put a timeout here then the connections attempts will // linger long past the configured timeout let stream = tokio::time::timeout(timeout_duration, self.tls_client(addr, url)).await; - match stream { + return match stream { Ok(Ok(stream)) => Ok(stream), Ok(Err(err)) => Err(err), Err(_) => Err(ClientError::Timeout), - } - } else if let Some(socket_path) = &self.unix_socket { + }; + } + #[cfg(unix)] + if let Some(socket_path) = &self.unix_socket { let stream = tokio::time::timeout( timeout_duration, tokio::net::UnixStream::connect(socket_path), ) .await; - match stream { + return match stream { Ok(Ok(stream)) => Ok(Stream::Unix(stream)), Ok(Err(err)) => Err(ClientError::IoError(err)), Err(_) => Err(ClientError::Timeout), - } - } else { - let stream = - tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await; - match stream { - Ok(Ok(stream)) => { - stream.set_nodelay(true)?; - return Ok(Stream::Tcp(stream)); - } - Ok(Err(err)) => Err(ClientError::IoError(err)), - Err(_) => Err(ClientError::Timeout), - } + }; } - } - - #[cfg(not(unix))] - async fn client( - &self, - addr: (std::net::IpAddr, u16), - url: &Url, - ) -> Result { - if url.scheme() == "https" { - self.tls_client(addr, url).await - } else { - let stream = tokio::net::TcpStream::connect(addr).await?; - stream.set_nodelay(true)?; - // stream.set_keepalive(std::time::Duration::from_secs(1).into())?; - Ok(Stream::Tcp(stream)) + let stream = + tokio::time::timeout(timeout_duration, tokio::net::TcpStream::connect(addr)).await; + match stream { + Ok(Ok(stream)) => { + stream.set_nodelay(true)?; + Ok(Stream::Tcp(stream)) + } + Ok(Err(err)) => Err(ClientError::IoError(err)), + Err(_) => Err(ClientError::Timeout), } }