From 640dac626cc2944d25c2f85d7f99ff1b55ce2769 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 13 Jan 2024 16:46:16 +0900 Subject: [PATCH 01/12] fix work --- src/client.rs | 65 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/src/client.rs b/src/client.rs index 6f317266..cd9a5b7e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -766,34 +766,49 @@ pub async fn work( let counter = counter.clone(); let client = client.clone(); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(move |_| { - let report_tx = report_tx.clone(); - let counter = counter.clone(); - let client = client.clone(); + loop { + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); - let mut client_state = client_state.clone(); - tokio::spawn(async move { - while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_too_many_open_files(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + let mut client_state = client_state.clone(); + tokio::spawn(async move { + while counter.fetch_add(1, Ordering::Relaxed) < n_tasks + { + let mut res = + client.work_http2(&mut client_state).await; + let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_reconnect { + return is_cancel; + } } - } + + true + }) }) - }) - .collect::>(); - for f in futures { - let _ = f.await; + .collect::>(); + + for f in futures { + if f.await.unwrap_or(true) { + return; + } + } + } + Err(err) => { + report_tx.send_async(Err(err)).await.unwrap(); + + if !(counter.fetch_add(1, Ordering::Relaxed) < n_tasks) { + break; + } } } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), } }) }) @@ -1087,10 +1102,10 @@ pub async fn work_until( let mut res = client.work_http2(&mut client_state).await; let is_cancel = is_too_many_open_files(&res); - let is_hyper_error = is_hyper_error(&res); + let is_reconnect = is_hyper_error(&res); set_connection_time(&mut res, connection_time); report_tx.send_async(res).await.unwrap(); - if is_cancel || is_hyper_error { + if is_cancel || is_reconnect { break is_cancel; } } From 98f582afeb1e7beab3345fcc8a807d6ed3a99118 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 13 Jan 2024 17:02:12 +0900 Subject: [PATCH 02/12] may speed up --- src/client.rs | 52 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/src/client.rs b/src/client.rs index cd9a5b7e..780b74c0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -759,8 +759,8 @@ pub async fn work( let client = Arc::new(client); - let futures = if client.is_http2() { - (0..n_connections) + if client.is_http2() { + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let counter = counter.clone(); @@ -786,35 +786,55 @@ pub async fn work( set_connection_time(&mut res, connection_time); report_tx.send_async(res).await.unwrap(); if is_cancel || is_reconnect { - return is_cancel; + return (false, is_cancel); } } - true + (true, true) }) }) .collect::>(); + let mut connection_gone = false; for f in futures { - if f.await.unwrap_or(true) { - return; + match f.await { + Ok((true, _)) => { + // All works done + return true; + } + Ok((_, is_cancel)) => { + connection_gone |= is_cancel; + } + Err(_) => { + // Unexpected + return false; + } } } + + if connection_gone { + return false; + } } Err(err) => { report_tx.send_async(Err(err)).await.unwrap(); if !(counter.fetch_add(1, Ordering::Relaxed) < n_tasks) { - break; + return true; } } } } }) }) - .collect::>() + .collect::>(); + for f in futures { + if matches!(f.await, Ok(true)) { + break; + } + } } else { - (0..n_connections) + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let counter = counter.clone(); @@ -826,17 +846,19 @@ pub async fn work( let is_cancel = is_too_many_open_files(&res); report_tx.send_async(res).await.unwrap(); if is_cancel { - break; + return false; } } + true }) }) - .collect::>() + .collect::>(); + for f in futures { + if matches!(f.await, Ok(true)) { + break; + } + } }; - - for f in futures { - let _ = f.await; - } } /// n tasks by m workers limit to qps works in a second From 2520b5f85e5ae66d86e94784f811ca5517d17757 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 13 Jan 2024 17:24:52 +0900 Subject: [PATCH 03/12] clippy --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 780b74c0..f8e9c91b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -819,7 +819,7 @@ pub async fn work( Err(err) => { report_tx.send_async(Err(err)).await.unwrap(); - if !(counter.fetch_add(1, Ordering::Relaxed) < n_tasks) { + if counter.fetch_add(1, Ordering::Relaxed) >= n_tasks { return true; } } From 983f095ed7fdc8128e33c0fdf8072959e428fb36 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 14 Jan 2024 14:43:24 +0900 Subject: [PATCH 04/12] work_with_qps --- src/client.rs | 94 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 62 insertions(+), 32 deletions(-) diff --git a/src/client.rs b/src/client.rs index f8e9c91b..b3fde63f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -911,46 +911,77 @@ pub async fn work_with_qps( let client = Arc::new(client); - let futures = if client.is_http2() { - (0..n_connections) + if client.is_http2() { + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let rx = rx.clone(); let client = client.clone(); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let report_tx = report_tx.clone(); - let rx = rx.clone(); - let client = client.clone(); - let mut client_state = client_state.clone(); - tokio::spawn(async move { - while let Ok(()) = rx.recv_async().await { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_too_many_open_files(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + loop { + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let report_tx = report_tx.clone(); + let rx = rx.clone(); + let client = client.clone(); + let mut client_state = client_state.clone(); + tokio::spawn(async move { + while let Ok(()) = rx.recv_async().await { + let mut res = + client.work_http2(&mut client_state).await; + let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_reconnect { + return (false, is_cancel); + } } - } + (true, true) + }) }) - }) - .collect::>(); - for f in futures { - let _ = f.await; + .collect::>(); + let mut connection_gone = false; + for f in futures { + match f.await { + Ok((true, _)) => { + // All works done + return true; + } + Ok((_, is_cancel)) => { + connection_gone |= is_cancel; + } + Err(_) => { + // Unexpected + return false; + } + } + } + if connection_gone { + return false; + } + } + Err(err) => { + report_tx.send_async(Err(err)).await.unwrap(); + // Consume a task + if rx.recv_async().await.is_err() { + return true; + } } } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), } }) }) - .collect::>() + .collect::>(); + for f in futures { + if matches!(f.await, Ok(true)) { + break; + } + } } else { - (0..n_connections) + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let rx = rx.clone(); @@ -967,12 +998,11 @@ pub async fn work_with_qps( } }) }) - .collect::>() + .collect::>(); + for f in futures { + let _ = f.await; + } }; - - for f in futures { - let _ = f.await; - } } /// n tasks by m workers limit to qps works in a second with latency correction From bae0caa50746ba3e5778716d1fbde0f6dae1d4e6 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 14 Jan 2024 14:46:35 +0900 Subject: [PATCH 05/12] could be --- src/client.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index b3fde63f..8f78caa9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -993,14 +993,17 @@ pub async fn work_with_qps( let is_cancel = is_too_many_open_files(&res); report_tx.send_async(res).await.unwrap(); if is_cancel { - break; + return false; } } + true }) }) .collect::>(); for f in futures { - let _ = f.await; + if matches!(f.await, Ok(true)) { + break; + } } }; } From 45018ffe07ed7aa18268b2df3e041fc39009f297 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 14 Jan 2024 14:53:20 +0900 Subject: [PATCH 06/12] work_with_qps_latency_correction --- src/client.rs | 101 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 67 insertions(+), 34 deletions(-) diff --git a/src/client.rs b/src/client.rs index 8f78caa9..296735c3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1060,47 +1060,78 @@ pub async fn work_with_qps_latency_correction( let client = Arc::new(client); - let futures = if client.is_http2() { - (0..n_connections) + if client.is_http2() { + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let rx = rx.clone(); let client = client.clone(); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let report_tx = report_tx.clone(); - let rx = rx.clone(); - let client = client.clone(); - let mut client_state = client_state.clone(); - tokio::spawn(async move { - while let Ok(start) = rx.recv_async().await { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_too_many_open_files(&res); - set_connection_time(&mut res, connection_time); - set_start_latency_correction(&mut res, start); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + loop { + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let report_tx = report_tx.clone(); + let rx = rx.clone(); + let client = client.clone(); + let mut client_state = client_state.clone(); + tokio::spawn(async move { + while let Ok(start) = rx.recv_async().await { + let mut res = + client.work_http2(&mut client_state).await; + let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + set_start_latency_correction(&mut res, start); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_reconnect { + return (false, is_cancel); + } } - } + (true, true) + }) }) - }) - .collect::>(); - for f in futures { - let _ = f.await; + .collect::>(); + let mut connection_gone = false; + for f in futures { + match f.await { + Ok((true, _)) => { + // All works done + return true; + } + Ok((_, is_cancel)) => { + connection_gone |= is_cancel; + } + Err(_) => { + // Unexpected + return false; + } + } + } + if connection_gone { + return false; + } + } + Err(err) => { + report_tx.send_async(Err(err)).await.unwrap(); + // Consume a task + if rx.recv_async().await.is_err() { + return true; + } } } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), } }) }) - .collect::>() + .collect::>(); + for f in futures { + if matches!(f.await, Ok(true)) { + break; + } + } } else { - (0..n_connections) + let futures = (0..n_connections) .map(|_| { let client = client.clone(); let mut client_state = ClientStateHttp1::default(); @@ -1113,17 +1144,19 @@ pub async fn work_with_qps_latency_correction( let is_cancel = is_too_many_open_files(&res); report_tx.send_async(res).await.unwrap(); if is_cancel { - break; + return false; } } + true }) }) - .collect::>() + .collect::>(); + for f in futures { + if matches!(f.await, Ok(true)) { + break; + } + } }; - - for f in futures { - let _ = f.await; - } } /// Run until dead_line by n workers From 3b82bc7bdd3ed8c4fc6c9bd667378aaf55a5edda Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 14 Jan 2024 15:03:39 +0900 Subject: [PATCH 07/12] tmp --- src/client.rs | 73 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 24 deletions(-) diff --git a/src/client.rs b/src/client.rs index 296735c3..4b675d17 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1310,34 +1310,59 @@ pub async fn work_until_with_qps( let report_tx = report_tx.clone(); let rx = rx.clone(); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let client = client.clone(); - let report_tx = report_tx.clone(); - let rx = rx.clone(); - let mut client_state = client_state.clone(); - tokio::spawn(async move { - while let Ok(()) = rx.recv_async().await { - let mut res = - client.work_http2(&mut client_state).await; - let is_cancel = is_too_many_open_files(&res); - set_connection_time(&mut res, connection_time); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + loop { + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let client = client.clone(); + let report_tx = report_tx.clone(); + let rx = rx.clone(); + let mut client_state = client_state.clone(); + tokio::spawn(async move { + while let Ok(()) = rx.recv_async().await { + let mut res = + client.work_http2(&mut client_state).await; + let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time(&mut res, connection_time); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_reconnect { + return (false, is_cancel); + } } - } + (true, true) + }) }) - }) - .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; - for f in futures { - f.abort(); + .collect::>(); + let mut connection_gone = false; + for f in futures { + match f.await { + Ok((true, _)) => { + // All works done + return true; + } + Ok((_, is_cancel)) => { + connection_gone |= is_cancel; + } + Err(_) => { + // Unexpected + return false; + } + } + } + if connection_gone { + return false; + } + } + Err(err) => { + report_tx.send_async(Err(err)).await.unwrap(); + // Consume a task + if rx.recv_async().await.is_err() { + return true; + } } } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), } }) }) From 02d492f91d9e5bbd866d03e8f1d4122bdc43573d Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 14 Jan 2024 15:13:50 +0900 Subject: [PATCH 08/12] fix --- src/client.rs | 103 ++++++++++++++++++++------------------------------ 1 file changed, 42 insertions(+), 61 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4b675d17..b94663a3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -786,11 +786,11 @@ pub async fn work( set_connection_time(&mut res, connection_time); report_tx.send_async(res).await.unwrap(); if is_cancel || is_reconnect { - return (false, is_cancel); + return is_cancel; } } - (true, true) + true }) }) .collect::>(); @@ -798,29 +798,27 @@ pub async fn work( let mut connection_gone = false; for f in futures { match f.await { - Ok((true, _)) => { + Ok(true) => { // All works done - return true; - } - Ok((_, is_cancel)) => { - connection_gone |= is_cancel; + connection_gone = true; } Err(_) => { // Unexpected - return false; + connection_gone = true; } + _ => {} } } if connection_gone { - return false; + return; } } Err(err) => { - report_tx.send_async(Err(err)).await.unwrap(); - - if counter.fetch_add(1, Ordering::Relaxed) >= n_tasks { - return true; + if counter.fetch_add(1, Ordering::Relaxed) < n_tasks { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; } } } @@ -829,9 +827,7 @@ pub async fn work( }) .collect::>(); for f in futures { - if matches!(f.await, Ok(true)) { - break; - } + let _ = f.await; } } else { let futures = (0..n_connections) @@ -846,17 +842,14 @@ pub async fn work( let is_cancel = is_too_many_open_files(&res); report_tx.send_async(res).await.unwrap(); if is_cancel { - return false; + break; } } - true }) }) .collect::>(); for f in futures { - if matches!(f.await, Ok(true)) { - break; - } + let _ = f.await; } }; } @@ -936,38 +929,37 @@ pub async fn work_with_qps( set_connection_time(&mut res, connection_time); report_tx.send_async(res).await.unwrap(); if is_cancel || is_reconnect { - return (false, is_cancel); + return is_cancel; } } - (true, true) + true }) }) .collect::>(); let mut connection_gone = false; for f in futures { match f.await { - Ok((true, _)) => { + Ok(true) => { // All works done - return true; - } - Ok((_, is_cancel)) => { - connection_gone |= is_cancel; + connection_gone = true; } Err(_) => { // Unexpected - return false; + connection_gone = true; } + _ => {} } } if connection_gone { - return false; + return; } } Err(err) => { - report_tx.send_async(Err(err)).await.unwrap(); // Consume a task - if rx.recv_async().await.is_err() { - return true; + if let Ok(()) = rx.recv_async().await { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; } } } @@ -976,9 +968,7 @@ pub async fn work_with_qps( }) .collect::>(); for f in futures { - if matches!(f.await, Ok(true)) { - break; - } + let _ = f.await; } } else { let futures = (0..n_connections) @@ -993,17 +983,14 @@ pub async fn work_with_qps( let is_cancel = is_too_many_open_files(&res); report_tx.send_async(res).await.unwrap(); if is_cancel { - return false; + break; } } - true }) }) .collect::>(); for f in futures { - if matches!(f.await, Ok(true)) { - break; - } + let _ = f.await; } }; } @@ -1086,38 +1073,37 @@ pub async fn work_with_qps_latency_correction( set_start_latency_correction(&mut res, start); report_tx.send_async(res).await.unwrap(); if is_cancel || is_reconnect { - return (false, is_cancel); + return is_cancel; } } - (true, true) + true }) }) .collect::>(); let mut connection_gone = false; for f in futures { match f.await { - Ok((true, _)) => { + Ok(true) => { // All works done - return true; - } - Ok((_, is_cancel)) => { - connection_gone |= is_cancel; + connection_gone = true; } Err(_) => { // Unexpected - return false; + connection_gone = true; } + _ => {} } } if connection_gone { - return false; + return; } } Err(err) => { - report_tx.send_async(Err(err)).await.unwrap(); // Consume a task - if rx.recv_async().await.is_err() { - return true; + if let Ok(_) = rx.recv_async().await { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; } } } @@ -1126,9 +1112,7 @@ pub async fn work_with_qps_latency_correction( }) .collect::>(); for f in futures { - if matches!(f.await, Ok(true)) { - break; - } + let _ = f.await; } } else { let futures = (0..n_connections) @@ -1144,17 +1128,14 @@ pub async fn work_with_qps_latency_correction( let is_cancel = is_too_many_open_files(&res); report_tx.send_async(res).await.unwrap(); if is_cancel { - return false; + break; } } - true }) }) .collect::>(); for f in futures { - if matches!(f.await, Ok(true)) { - break; - } + let _ = f.await; } }; } From a03c7c52e36873637321af6a6e155667b9239972 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 14 Jan 2024 15:19:04 +0900 Subject: [PATCH 09/12] work_until_with_qps --- src/client.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/client.rs b/src/client.rs index b94663a3..ca1dd97a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1309,38 +1309,37 @@ pub async fn work_until_with_qps( set_connection_time(&mut res, connection_time); report_tx.send_async(res).await.unwrap(); if is_cancel || is_reconnect { - return (false, is_cancel); + return is_cancel; } } - (true, true) + true }) }) .collect::>(); let mut connection_gone = false; for f in futures { match f.await { - Ok((true, _)) => { + Ok(true) => { // All works done - return true; - } - Ok((_, is_cancel)) => { - connection_gone |= is_cancel; + connection_gone = true; } Err(_) => { // Unexpected - return false; + connection_gone = true; } + _ => {} } } if connection_gone { - return false; + return; } } Err(err) => { - report_tx.send_async(Err(err)).await.unwrap(); // Consume a task - if rx.recv_async().await.is_err() { - return true; + if let Ok(_) = rx.recv_async().await { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; } } } From 47e26f620be4fbd1adcad814c8e18d1440cab798 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 14 Jan 2024 15:21:46 +0900 Subject: [PATCH 10/12] work_until_with_qps_latency_correction --- src/client.rs | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index ca1dd97a..7e413f99 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1451,21 +1451,42 @@ pub async fn work_until_with_qps_latency_correction( set_start_latency_correction(&mut res, start); set_connection_time(&mut res, connection_time); let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; + if is_cancel || is_reconnect { + return is_cancel; } } + true }) }) .collect::>(); - tokio::time::sleep_until(dead_line.into()).await; + let mut connection_gone = false; for f in futures { - f.abort(); + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} + } + } + if connection_gone { + return; } } - Err(err) => report_tx.send_async(Err(err)).await.unwrap(), + Err(err) => { + if let Ok(_) = rx.recv_async().await { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; + } + } } }) }) From a480ba216f87cb09f8fa2bcb8a149fb6959f77fc Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 14 Jan 2024 15:32:24 +0900 Subject: [PATCH 11/12] fix --- src/client.rs | 88 ++++++++++++++++++++++++++------------------------- 1 file changed, 45 insertions(+), 43 deletions(-) diff --git a/src/client.rs b/src/client.rs index 7e413f99..05ed0cdc 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1436,55 +1436,57 @@ pub async fn work_until_with_qps_latency_correction( let report_tx = report_tx.clone(); let rx = rx.clone(); tokio::spawn(async move { - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let client = client.clone(); - let report_tx = report_tx.clone(); - let rx = rx.clone(); - let mut client_state = client_state.clone(); - tokio::spawn(async move { - while let Ok(start) = rx.recv_async().await { - let mut res = - client.work_http2(&mut client_state).await; - set_start_latency_correction(&mut res, start); - set_connection_time(&mut res, connection_time); - let is_cancel = is_too_many_open_files(&res); - let is_reconnect = is_hyper_error(&res); - report_tx.send_async(res).await.unwrap(); - if is_cancel || is_reconnect { - return is_cancel; + loop { + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let client = client.clone(); + let report_tx = report_tx.clone(); + let rx = rx.clone(); + let mut client_state = client_state.clone(); + tokio::spawn(async move { + while let Ok(start) = rx.recv_async().await { + let mut res = + client.work_http2(&mut client_state).await; + set_start_latency_correction(&mut res, start); + set_connection_time(&mut res, connection_time); + let is_cancel = is_too_many_open_files(&res); + let is_reconnect = is_hyper_error(&res); + report_tx.send_async(res).await.unwrap(); + if is_cancel || is_reconnect { + return is_cancel; + } } - } - true + true + }) }) - }) - .collect::>(); - let mut connection_gone = false; - for f in futures { - match f.await { - Ok(true) => { - // All works done - connection_gone = true; - } - Err(_) => { - // Unexpected - connection_gone = true; + .collect::>(); + let mut connection_gone = false; + for f in futures { + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} } - _ => {} + } + if connection_gone { + return; } } - if connection_gone { - return; - } - } - Err(err) => { - if let Ok(_) = rx.recv_async().await { - report_tx.send_async(Err(err)).await.unwrap(); - } else { - return; + Err(err) => { + if let Ok(_) = rx.recv_async().await { + report_tx.send_async(Err(err)).await.unwrap(); + } else { + return; + } } } } From c5c1af46f614bc47839c9d16a81a56f8dbec09dc Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 14 Jan 2024 15:35:59 +0900 Subject: [PATCH 12/12] clippy --- src/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 05ed0cdc..deee1e46 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1100,7 +1100,7 @@ pub async fn work_with_qps_latency_correction( } Err(err) => { // Consume a task - if let Ok(_) = rx.recv_async().await { + if rx.recv_async().await.is_ok() { report_tx.send_async(Err(err)).await.unwrap(); } else { return; @@ -1336,7 +1336,7 @@ pub async fn work_until_with_qps( } Err(err) => { // Consume a task - if let Ok(_) = rx.recv_async().await { + if rx.recv_async().await.is_ok() { report_tx.send_async(Err(err)).await.unwrap(); } else { return; @@ -1482,7 +1482,7 @@ pub async fn work_until_with_qps_latency_correction( } Err(err) => { - if let Ok(_) = rx.recv_async().await { + if rx.recv_async().await.is_ok() { report_tx.send_async(Err(err)).await.unwrap(); } else { return;