From 4f01fe093c88281740d98505a548c3fc1bfcaf43 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 4 Jan 2025 15:39:00 +0900 Subject: [PATCH 01/26] save --- src/client.rs | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 2 +- 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index d77c749f..fe5a7158 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1088,6 +1088,103 @@ pub async fn work( }; } +/// Run n tasks by m workers +pub async fn work2( + client: Arc, + report_tx: flume::Sender>, + n_tasks: usize, + n_connections: usize, + _n_http2_parallel: usize, +) { + use std::sync::atomic::{AtomicUsize, Ordering}; + let counter = Arc::new(AtomicUsize::new(0)); + + if client.is_work_http2() { + todo!() + } else { + let num_threads = 15; //num_cpus::get(); + + let handles = (0..num_threads) + .filter_map(|i| { + let num_connection = n_connections / num_threads + + (if (n_connections % num_threads) > i { + 1 + } else { + 0 + }); + if num_connection > 0 { + Some(num_connection) + } else { + None + } + }) + .map(|num_connection| { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let futures = (0..num_connection) + .map(|_| { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + rt.spawn(async move { + let mut client_state = ClientStateHttp1::default(); + while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { + let res = client.work_http1(&mut client_state).await; + let is_cancel = is_cancel_error(&res); + report_tx.send(res).unwrap(); + if is_cancel { + break; + } + } + }) + }) + .collect::>(); + rt.block_on(async { + for f in futures { + let _ = f.await; + } + }); + }) + }) + .collect::>(); + + for handle in handles { + let _ = tokio::task::block_in_place(move || handle.join()); + } + + /* + let futures = (0..n_connections) + .map(|_| { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + tokio::spawn(async move { + let mut client_state = ClientStateHttp1::default(); + while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { + let res = client.work_http1(&mut client_state).await; + let is_cancel = is_cancel_error(&res); + report_tx.send(res).unwrap(); + if is_cancel { + break; + } + } + }) + }) + .collect::>(); + for f in futures { + let _ = f.await; + } + */ + }; +} + /// n tasks by m workers limit to qps works in a second pub async fn work_with_qps( client: Arc, diff --git a/src/main.rs b/src/main.rs index 64d34efa..cc2a226b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -659,7 +659,7 @@ async fn run() -> anyhow::Result<()> { match opts.query_per_second { Some(0) | None => match opts.burst_duration { None => { - client::work( + client::work2( client.clone(), result_tx, opts.n_requests, From 6e03e8cb1c8bd836458a3f2df3232ff547b2e8f1 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 4 Jan 2025 15:46:21 +0900 Subject: [PATCH 02/26] unstable --- .cargo/config.toml | 2 ++ src/client.rs | 7 ++----- 2 files changed, 4 insertions(+), 5 deletions(-) create mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 00000000..bff29e6e --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/src/client.rs b/src/client.rs index fe5a7158..8d03f7c1 100644 --- 
a/src/client.rs +++ b/src/client.rs @@ -1123,17 +1123,14 @@ pub async fn work2( let counter = counter.clone(); let client = client.clone(); std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); + let rt = tokio::runtime::LocalRuntime::new().unwrap(); let futures = (0..num_connection) .map(|_| { let report_tx = report_tx.clone(); let counter = counter.clone(); let client = client.clone(); - rt.spawn(async move { + rt.spawn_local(async move { let mut client_state = ClientStateHttp1::default(); while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { let res = client.work_http1(&mut client_state).await; From 190d7f358ac4ff21d272ee4fb924429c89807ce9 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 4 Jan 2025 15:54:06 +0900 Subject: [PATCH 03/26] a --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 8d03f7c1..261282f2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1102,7 +1102,7 @@ pub async fn work2( if client.is_work_http2() { todo!() } else { - let num_threads = 15; //num_cpus::get(); + let num_threads = 16; //num_cpus::get(); let handles = (0..num_threads) .filter_map(|i| { From 6bea6cfd797d8ff54a27feea52431c5f56c2f1ab Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 4 Jan 2025 16:32:46 +0900 Subject: [PATCH 04/26] phys --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 261282f2..a200ed3c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1102,7 +1102,7 @@ pub async fn work2( if client.is_work_http2() { todo!() } else { - let num_threads = 16; //num_cpus::get(); + let num_threads = num_cpus::get_physical(); let handles = (0..num_threads) .filter_map(|i| { From b2300fcfc76ed7cd66f0b69256d8004580c95dac Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 4 Jan 2025 16:48:07 +0900 Subject: [PATCH 05/26] aa --- src/client.rs | 35 ++++++----------------------------- 1 file changed, 6 insertions(+), 29 deletions(-) diff --git a/src/client.rs b/src/client.rs index a200ed3c..26520042 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1135,7 +1135,7 @@ pub async fn work2( while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { let res = client.work_http1(&mut client_state).await; let is_cancel = is_cancel_error(&res); - report_tx.send(res).unwrap(); + report_tx.send_async(res).await.unwrap(); if is_cancel { break; } @@ -1151,34 +1151,11 @@ pub async fn work2( }) }) .collect::>(); - - for handle in handles { - let _ = tokio::task::block_in_place(move || handle.join()); - } - - /* - let futures = (0..n_connections) - .map(|_| { - let report_tx = report_tx.clone(); - let counter = counter.clone(); - let client = client.clone(); - tokio::spawn(async move { - let mut client_state = ClientStateHttp1::default(); - while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { - let res = client.work_http1(&mut client_state).await; - let is_cancel = is_cancel_error(&res); - report_tx.send(res).unwrap(); - if is_cancel { - break; - } - } - }) - }) - .collect::>(); - for f in futures { - let _ = f.await; - } - */ + tokio::task::block_in_place(|| { + for handle in handles { + let _ = handle.join(); + } + }); }; } From 0576d05aa03924d652d24c8b06c3c1b20f46ef35 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 4 Jan 2025 17:45:45 +0900 Subject: [PATCH 06/26] fix warning --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 26520042..7ba2b0ee 100644 
--- a/src/client.rs +++ b/src/client.rs @@ -980,7 +980,7 @@ pub async fn work_debug( } /// Run n tasks by m workers -pub async fn work( +pub async fn _work( client: Arc, report_tx: flume::Sender>, n_tasks: usize, From 40fe69bfff5982d8edbdfa1db7161999156ad5b9 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sat, 4 Jan 2025 19:22:31 +0900 Subject: [PATCH 07/26] no tokio-unstable --- .cargo/config.toml | 2 -- src/client.rs | 7 +++++-- 2 files changed, 5 insertions(+), 4 deletions(-) delete mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml deleted file mode 100644 index bff29e6e..00000000 --- a/.cargo/config.toml +++ /dev/null @@ -1,2 +0,0 @@ -[build] -rustflags = ["--cfg", "tokio_unstable"] diff --git a/src/client.rs b/src/client.rs index 7ba2b0ee..862c4854 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1123,14 +1123,17 @@ pub async fn work2( let counter = counter.clone(); let client = client.clone(); std::thread::spawn(move || { - let rt = tokio::runtime::LocalRuntime::new().unwrap(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); let futures = (0..num_connection) .map(|_| { let report_tx = report_tx.clone(); let counter = counter.clone(); let client = client.clone(); - rt.spawn_local(async move { + rt.spawn(async move { let mut client_state = ClientStateHttp1::default(); while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { let res = client.work_http1(&mut client_state).await; From 8b09f9f522c2dd6459d34164eac1ea644af12b40 Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 5 Jan 2025 19:14:11 +0900 Subject: [PATCH 08/26] I don't know --- src/client.rs | 48 ++++++++++++++++++++++-------------------------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/src/client.rs b/src/client.rs index 862c4854..0e86b3be 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1122,35 +1122,31 @@ pub async fn work2( let report_tx = report_tx.clone(); let counter = counter.clone(); let client = client.clone(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - let futures = (0..num_connection) - .map(|_| { - let report_tx = report_tx.clone(); - let counter = counter.clone(); - let client = client.clone(); - rt.spawn(async move { - let mut client_state = ClientStateHttp1::default(); - while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { - let res = client.work_http1(&mut client_state).await; - let is_cancel = is_cancel_error(&res); - report_tx.send_async(res).await.unwrap(); - if is_cancel { - break; - } + let local = tokio::task::LocalSet::new(); + + (0..num_connection).for_each(|_| { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + local.spawn_local(async move { + let mut client_state = ClientStateHttp1::default(); + while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { + let res = client.work_http1(&mut client_state).await; + let is_cancel = is_cancel_error(&res); + report_tx.send_async(res).await.unwrap(); + if is_cancel { + break; } - }) - }) - .collect::>(); - rt.block_on(async { - for f in futures { - let _ = f.await; - } + } + }); }); + rt.block_on(local); }) }) .collect::>(); From fce00a8e51d12518c0df048d81a56e813ec9579a Mon Sep 17 00:00:00 2001 From: hatoo Date: Sun, 5 Jan 2025 23:32:01 +0900 Subject: [PATCH 09/26] tweak --- src/client.rs | 4 ++-- 1 file changed, 2 
insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index 0e86b3be..9c1ed813 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1130,7 +1130,7 @@ pub async fn work2( std::thread::spawn(move || { let local = tokio::task::LocalSet::new(); - (0..num_connection).for_each(|_| { + for _ in 0..num_connection { let report_tx = report_tx.clone(); let counter = counter.clone(); let client = client.clone(); @@ -1145,7 +1145,7 @@ pub async fn work2( } } }); - }); + } rt.block_on(local); }) }) From 0619f1dd144d5a02c8c6c6aea0d0196fff44f2c4 Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 6 Jan 2025 15:53:33 +0900 Subject: [PATCH 10/26] looks good --- src/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index 9c1ed813..477b42f1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1134,7 +1134,7 @@ pub async fn work2( let report_tx = report_tx.clone(); let counter = counter.clone(); let client = client.clone(); - local.spawn_local(async move { + local.spawn_local(Box::pin(async move { let mut client_state = ClientStateHttp1::default(); while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { let res = client.work_http1(&mut client_state).await; @@ -1144,7 +1144,7 @@ pub async fn work2( break; } } - }); + })); } rt.block_on(local); }) From 8391f5e663f83648d9d7d92bd9a85ba77b8a9e10 Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 6 Jan 2025 17:36:15 +0900 Subject: [PATCH 11/26] WIP --- src/client.rs | 7 +++++-- src/main.rs | 22 +++++++++++++++++++--- src/result_data.rs | 8 ++++++++ 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index 477b42f1..f23ee688 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,6 +19,7 @@ use url::{ParseError, Url}; use crate::{ pcg64si::Pcg64Si, + result_data::ResultData, url_generator::{UrlGenerator, UrlGeneratorError}, ConnectToEntry, }; @@ -1091,7 +1092,7 @@ pub async fn _work( /// Run n tasks by m workers pub async fn work2( client: Arc, - report_tx: flume::Sender>, + report_tx: flume::Sender, n_tasks: usize, n_connections: usize, _n_http2_parallel: usize, @@ -1136,14 +1137,16 @@ pub async fn work2( let client = client.clone(); local.spawn_local(Box::pin(async move { let mut client_state = ClientStateHttp1::default(); + let mut result_data = ResultData::default(); while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { let res = client.work_http1(&mut client_state).await; let is_cancel = is_cancel_error(&res); - report_tx.send_async(res).await.unwrap(); + result_data.push(res); if is_cancel { break; } } + report_tx.send(result_data).unwrap(); })); } rt.block_on(local); diff --git a/src/main.rs b/src/main.rs index cc2a226b..94cc749b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use rand_regex::Regex; use ratatui::crossterm; use result_data::ResultData; use std::{ + convert::Infallible, env, fs::File, io::{BufRead, Read}, @@ -532,8 +533,8 @@ async fn run() -> anyhow::Result<()> { let mut all: ResultData = Default::default(); tokio::select! 
{ _ = async { - while let Ok(report) = result_rx.recv_async().await { - all.push(report); + while let Ok(res) = result_rx.recv_async().await { + all.merge(res); } } => {} _ = tokio::signal::ctrl_c() => { @@ -542,10 +543,12 @@ async fn run() -> anyhow::Result<()> { std::process::exit(libc::EXIT_SUCCESS); } } - Ok(all) + Ok::<_, Infallible>(all) }) } else { + todo!() // Spawn monitor future which draws realtime tui + /* tokio::spawn( monitor::Monitor { print_mode, @@ -561,6 +564,7 @@ async fn run() -> anyhow::Result<()> { } .monitor(), ) + */ }; // When panics, reset terminal mode and exit immediately. std::panic::set_hook(Box::new(move |info| { @@ -575,11 +579,16 @@ async fn run() -> anyhow::Result<()> { })); if opts.debug { + todo!() + /* if let Err(e) = client::work_debug(client, result_tx).await { eprintln!("{e}"); } std::process::exit(libc::EXIT_SUCCESS); + */ } else if let Some(duration) = opts.duration.take() { + todo!() + /* match opts.query_per_second { Some(0) | None => match opts.burst_duration { None => { @@ -655,6 +664,7 @@ async fn run() -> anyhow::Result<()> { } } } + */ } else { match opts.query_per_second { Some(0) | None => match opts.burst_duration { @@ -669,6 +679,7 @@ async fn run() -> anyhow::Result<()> { .await } Some(burst_duration) => { + /* if opts.latency_correction { client::work_with_qps_latency_correction( client.clone(), @@ -696,9 +707,12 @@ async fn run() -> anyhow::Result<()> { ) .await } + */ + todo!() } }, Some(qps) => { + /* if opts.latency_correction { client::work_with_qps_latency_correction( client.clone(), @@ -720,6 +734,8 @@ async fn run() -> anyhow::Result<()> { ) .await } + */ + todo!() } } } diff --git a/src/result_data.rs b/src/result_data.rs index bd8c8a9d..c82b7918 100644 --- a/src/result_data.rs +++ b/src/result_data.rs @@ -66,6 +66,14 @@ impl ResultData { self.success.len() + self.error_distribution.values().sum::() } + pub fn merge(&mut self, other: ResultData) { + self.success.extend(other.success); + for (k, v) in other.error_distribution { + let count = self.error_distribution.entry(k).or_insert(0); + *count += v; + } + } + // An existence of this method doesn't prevent us to using hdrhistogram. // Because this is only called from `monitor` and `monitor` can collect own data. 
pub fn success(&self) -> &[RequestResult] { From 255609993b1d785d1748e92abea47e0180008e58 Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 6 Jan 2025 20:08:28 +0900 Subject: [PATCH 12/26] support ctrl-c --- Cargo.lock | 1 + Cargo.toml | 1 + src/client.rs | 31 +++++++++++++++++++++++-------- src/main.rs | 21 ++++++--------------- 4 files changed, 31 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4bc55102..623ce654 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2348,6 +2348,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", + "tokio-util", "tokio-vsock", "url", ] diff --git a/Cargo.toml b/Cargo.toml index a01836fb..f6ef7cb0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ hyper-util = { version = "0.1.6", features = ["tokio"] } tokio-vsock = { version = "0.5.0", optional = true } rusqlite = { version = "0.32.0", features = ["bundled"] } num_cpus = "1.16.0" +tokio-util = "0.7.13" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/src/client.rs b/src/client.rs index f23ee688..f41c1139 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1105,6 +1105,7 @@ pub async fn work2( } else { let num_threads = num_cpus::get_physical(); + let token = tokio_util::sync::CancellationToken::new(); let handles = (0..num_threads) .filter_map(|i| { let num_connection = n_connections / num_threads @@ -1128,6 +1129,7 @@ pub async fn work2( .build() .unwrap(); + let token = token.clone(); std::thread::spawn(move || { let local = tokio::task::LocalSet::new(); @@ -1135,16 +1137,23 @@ pub async fn work2( let report_tx = report_tx.clone(); let counter = counter.clone(); let client = client.clone(); + let token = token.clone(); local.spawn_local(Box::pin(async move { - let mut client_state = ClientStateHttp1::default(); let mut result_data = ResultData::default(); - while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { - let res = client.work_http1(&mut client_state).await; - let is_cancel = is_cancel_error(&res); - result_data.push(res); - if is_cancel { - break; - } + + tokio::select! { + _ = token.cancelled() => {} + _ = async { + let mut client_state = ClientStateHttp1::default(); + while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { + let res = client.work_http1(&mut client_state).await; + let is_cancel = is_cancel_error(&res); + result_data.push(res); + if is_cancel { + break; + } + } + } => {} } report_tx.send(result_data).unwrap(); })); @@ -1153,6 +1162,12 @@ pub async fn work2( }) }) .collect::>(); + + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + token.cancel(); + }); + tokio::task::block_in_place(|| { for handle in handles { let _ = handle.join(); diff --git a/src/main.rs b/src/main.rs index 94cc749b..2b5c4810 100644 --- a/src/main.rs +++ b/src/main.rs @@ -529,22 +529,13 @@ async fn run() -> anyhow::Result<()> { let no_tui = opts.no_tui || !std::io::stdout().is_tty() || opts.debug; let data_collector = if no_tui { // When `--no-tui` is enabled, just collect all data. - tokio::spawn(async move { + async { let mut all: ResultData = Default::default(); - tokio::select! { - _ = async { - while let Ok(res) = result_rx.recv_async().await { - all.merge(res); - } - } => {} - _ = tokio::signal::ctrl_c() => { - // User pressed ctrl-c. 
- let _ = printer::print_result(&mut std::io::stdout(), print_mode, start, &all, start.elapsed(), opts.disable_color, opts.stats_success_breakdown); - std::process::exit(libc::EXIT_SUCCESS); - } + while let Ok(res) = result_rx.recv_async().await { + all.merge(res); } - Ok::<_, Infallible>(all) - }) + all + } } else { todo!() // Spawn monitor future which draws realtime tui @@ -742,7 +733,7 @@ async fn run() -> anyhow::Result<()> { let duration = start.elapsed(); - let res: ResultData = data_collector.await??; + let res: ResultData = data_collector.await; printer::print_result( &mut std::io::stdout(), From 21fb7af0171616172b8818546f702ba0a1afc754 Mon Sep 17 00:00:00 2001 From: hatoo Date: Tue, 7 Jan 2025 17:16:15 +0900 Subject: [PATCH 13/26] work_until2 --- src/client.rs | 101 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 11 ++++-- 2 files changed, 108 insertions(+), 4 deletions(-) diff --git a/src/client.rs b/src/client.rs index f41c1139..a01e1baf 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1610,6 +1610,107 @@ pub async fn work_until( }; } +/// Run until dead_line by n workers +pub async fn work_until2( + client: Arc, + report_tx: flume::Sender, + dead_line: std::time::Instant, + n_connections: usize, + n_http2_parallel: usize, + wait_ongoing_requests_after_deadline: bool, +) { + if client.is_work_http2() { + todo!() + } else { + let num_threads = num_cpus::get_physical(); + + let is_end = Arc::new(AtomicBool::new(false)); + let token = tokio_util::sync::CancellationToken::new(); + + let handles = (0..num_threads) + .filter_map(|i| { + let num_connection = n_connections / num_threads + + (if (n_connections % num_threads) > i { + 1 + } else { + 0 + }); + if num_connection > 0 { + Some(num_connection) + } else { + None + } + }) + .map(|num_connection| { + let report_tx = report_tx.clone(); + let is_end = is_end.clone(); + let client = client.clone(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let token = token.clone(); + std::thread::spawn(move || { + let local = tokio::task::LocalSet::new(); + + for _ in 0..num_connection { + let report_tx = report_tx.clone(); + let is_end = is_end.clone(); + let client = client.clone(); + let token = token.clone(); + local.spawn_local(Box::pin(async move { + let mut result_data = ResultData::default(); + + let work = async { + let mut client_state = ClientStateHttp1::default(); + loop { + let res = client.work_http1(&mut client_state).await; + let is_cancel = is_cancel_error(&res); + result_data.push(res); + if is_cancel || is_end.load(Relaxed) { + break; + } + } + }; + + tokio::select! { + _ = work => { + } + _ = token.cancelled() => { + result_data.push(Err(ClientError::Deadline)); + } + } + report_tx.send(result_data).unwrap(); + })); + } + rt.block_on(local); + }) + }) + .collect::>(); + + tokio::select! 
{ + _ = tokio::time::sleep_until(dead_line.into()) => { + } + _ = tokio::signal::ctrl_c() => { + } + } + + is_end.store(true, Relaxed); + + if wait_ongoing_requests_after_deadline { + for handle in handles { + let _ = handle.join(); + } + } else { + token.cancel(); + for handle in handles { + let _ = handle.join(); + } + } + }; +} + /// Run until dead_line by n workers limit to qps works in a second #[allow(clippy::too_many_arguments)] pub async fn work_until_with_qps( diff --git a/src/main.rs b/src/main.rs index 2b5c4810..a8696361 100644 --- a/src/main.rs +++ b/src/main.rs @@ -578,12 +578,10 @@ async fn run() -> anyhow::Result<()> { std::process::exit(libc::EXIT_SUCCESS); */ } else if let Some(duration) = opts.duration.take() { - todo!() - /* match opts.query_per_second { Some(0) | None => match opts.burst_duration { None => { - client::work_until( + client::work_until2( client.clone(), result_tx, start + duration.into(), @@ -593,6 +591,8 @@ async fn run() -> anyhow::Result<()> { ) .await } + _ => todo!(), + /* Some(burst_duration) => { if opts.latency_correction { client::work_until_with_qps_latency_correction( @@ -626,7 +626,10 @@ async fn run() -> anyhow::Result<()> { .await } } + */ }, + _ => todo!(), + /* Some(qps) => { if opts.latency_correction { client::work_until_with_qps_latency_correction( @@ -654,8 +657,8 @@ async fn run() -> anyhow::Result<()> { .await } } + */ } - */ } else { match opts.query_per_second { Some(0) | None => match opts.burst_duration { From 1a2d2c1f44747495fa55406b554e6ab1b2849947 Mon Sep 17 00:00:00 2001 From: hatoo Date: Tue, 7 Jan 2025 18:17:29 +0900 Subject: [PATCH 14/26] whatever --- src/client.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/client.rs b/src/client.rs index a01e1baf..5858270d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1698,15 +1698,11 @@ pub async fn work_until2( is_end.store(true, Relaxed); - if wait_ongoing_requests_after_deadline { - for handle in handles { - let _ = handle.join(); - } - } else { + if !wait_ongoing_requests_after_deadline { token.cancel(); - for handle in handles { - let _ = handle.join(); - } + } + for handle in handles { + let _ = handle.join(); } }; } From 501922b711232e6e6c8c37f49361e25cd1b75b3a Mon Sep 17 00:00:00 2001 From: hatoo Date: Tue, 7 Jan 2025 19:40:24 +0900 Subject: [PATCH 15/26] wip glue --- src/client.rs | 4 +- src/main.rs | 349 ++++++++++++++++++++++++++++---------------------- 2 files changed, 201 insertions(+), 152 deletions(-) diff --git a/src/client.rs b/src/client.rs index 5858270d..ccd97e0f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -981,7 +981,7 @@ pub async fn work_debug( } /// Run n tasks by m workers -pub async fn _work( +pub async fn work( client: Arc, report_tx: flume::Sender>, n_tasks: usize, @@ -1616,7 +1616,7 @@ pub async fn work_until2( report_tx: flume::Sender, dead_line: std::time::Instant, n_connections: usize, - n_http2_parallel: usize, + _n_http2_parallel: usize, wait_ongoing_requests_after_deadline: bool, ) { if client.is_work_http2() { diff --git a/src/main.rs b/src/main.rs index a8696361..14fe8bc7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,6 @@ use rand_regex::Regex; use ratatui::crossterm; use result_data::ResultData; use std::{ - convert::Infallible, env, fs::File, io::{BufRead, Read}, @@ -475,8 +474,6 @@ async fn run() -> anyhow::Result<()> { PrintMode::Text }; - let (result_tx, result_rx) = flume::unbounded(); - let ip_strategy = match (opts.ipv4, opts.ipv6) { (false, false) => Default::default(), (true, 
false) => hickory_resolver::config::LookupIpStrategy::Ipv4Only, @@ -527,36 +524,7 @@ async fn run() -> anyhow::Result<()> { let start = std::time::Instant::now(); let no_tui = opts.no_tui || !std::io::stdout().is_tty() || opts.debug; - let data_collector = if no_tui { - // When `--no-tui` is enabled, just collect all data. - async { - let mut all: ResultData = Default::default(); - while let Ok(res) = result_rx.recv_async().await { - all.merge(res); - } - all - } - } else { - todo!() - // Spawn monitor future which draws realtime tui - /* - tokio::spawn( - monitor::Monitor { - print_mode, - end_line: opts - .duration - .map(|d| monitor::EndLine::Duration(d.into())) - .unwrap_or(monitor::EndLine::NumQuery(opts.n_requests)), - report_receiver: result_rx, - start, - fps: opts.fps, - disable_color: opts.disable_color, - stats_success_breakdown: opts.stats_success_breakdown, - } - .monitor(), - ) - */ - }; + // When panics, reset terminal mode and exit immediately. std::panic::set_hook(Box::new(move |info| { if !no_tui { @@ -569,39 +537,160 @@ async fn run() -> anyhow::Result<()> { std::process::exit(libc::EXIT_FAILURE); })); - if opts.debug { - todo!() - /* - if let Err(e) = client::work_debug(client, result_tx).await { - eprintln!("{e}"); + let res = if no_tui + && !opts.debug + && opts.duration.is_some() + && opts.query_per_second.is_none() + && opts.burst_duration.is_none() + && client.http_version == http::Version::HTTP_11 + { + // Use optimized work_until2 for duration only mode. + + let duration = opts.duration.unwrap(); + let (result_tx, result_rx) = flume::unbounded(); + + client::work_until2( + client.clone(), + result_tx, + start + duration.into(), + opts.n_connections, + opts.n_http2_parallel, + opts.wait_ongoing_requests_after_deadline, + ) + .await; + + let mut res = ResultData::default(); + + while let Ok(r) = result_rx.recv() { + res.merge(r); + } + + res + } else if no_tui + && !opts.debug + && opts.duration.is_none() + && opts.query_per_second.is_none() + && opts.burst_duration.is_none() + && client.http_version == http::Version::HTTP_11 + { + // Use optimized work_until2 for duration only mode. + + let (result_tx, result_rx) = flume::unbounded(); + + client::work2( + client.clone(), + result_tx, + opts.n_requests, + opts.n_connections, + opts.n_http2_parallel, + ) + .await; + + let mut res = ResultData::default(); + + while let Ok(r) = result_rx.recv() { + res.merge(r); } - std::process::exit(libc::EXIT_SUCCESS); - */ - } else if let Some(duration) = opts.duration.take() { - match opts.query_per_second { - Some(0) | None => match opts.burst_duration { - None => { - client::work_until2( - client.clone(), - result_tx, - start + duration.into(), - opts.n_connections, - opts.n_http2_parallel, - opts.wait_ongoing_requests_after_deadline, - ) - .await + + res + } else { + let (result_tx, result_rx) = flume::unbounded(); + let data_collector = if no_tui { + // When `--no-tui` is enabled, just collect all data. + tokio::spawn(async move { + let mut all: ResultData = Default::default(); + tokio::select! { + _ = async { + while let Ok(report) = result_rx.recv_async().await { + all.push(report); + } + } => {} + _ = tokio::signal::ctrl_c() => { + // User pressed ctrl-c. 
+ let _ = printer::print_result(&mut std::io::stdout(), print_mode, start, &all, start.elapsed(), opts.disable_color, opts.stats_success_breakdown); + std::process::exit(libc::EXIT_SUCCESS); + } + } + Ok(all) + }) + } else { + // Spawn monitor future which draws realtime tui + tokio::spawn( + monitor::Monitor { + print_mode, + end_line: opts + .duration + .map(|d| monitor::EndLine::Duration(d.into())) + .unwrap_or(monitor::EndLine::NumQuery(opts.n_requests)), + report_receiver: result_rx, + start, + fps: opts.fps, + disable_color: opts.disable_color, + stats_success_breakdown: opts.stats_success_breakdown, } - _ => todo!(), - /* - Some(burst_duration) => { + .monitor(), + ) + }; + + if opts.debug { + if let Err(e) = client::work_debug(client, result_tx).await { + eprintln!("{e}"); + } + std::process::exit(libc::EXIT_SUCCESS) + } else if let Some(duration) = opts.duration.take() { + match opts.query_per_second { + Some(0) | None => match opts.burst_duration { + None => { + client::work_until( + client.clone(), + result_tx, + start + duration.into(), + opts.n_connections, + opts.n_http2_parallel, + opts.wait_ongoing_requests_after_deadline, + ) + .await + } + Some(burst_duration) => { + if opts.latency_correction { + client::work_until_with_qps_latency_correction( + client.clone(), + result_tx, + client::QueryLimit::Burst( + burst_duration.into(), + opts.burst_requests.unwrap_or(1), + ), + start, + start + duration.into(), + opts.n_connections, + opts.n_http2_parallel, + opts.wait_ongoing_requests_after_deadline, + ) + .await + } else { + client::work_until_with_qps( + client.clone(), + result_tx, + client::QueryLimit::Burst( + burst_duration.into(), + opts.burst_requests.unwrap_or(1), + ), + start, + start + duration.into(), + opts.n_connections, + opts.n_http2_parallel, + opts.wait_ongoing_requests_after_deadline, + ) + .await + } + } + }, + Some(qps) => { if opts.latency_correction { client::work_until_with_qps_latency_correction( client.clone(), result_tx, - client::QueryLimit::Burst( - burst_duration.into(), - opts.burst_requests.unwrap_or(1), - ), + client::QueryLimit::Qps(qps), start, start + duration.into(), opts.n_connections, @@ -613,10 +702,7 @@ async fn run() -> anyhow::Result<()> { client::work_until_with_qps( client.clone(), result_tx, - client::QueryLimit::Burst( - burst_duration.into(), - opts.burst_requests.unwrap_or(1), - ), + client::QueryLimit::Qps(qps), start, start + duration.into(), opts.n_connections, @@ -626,62 +712,56 @@ async fn run() -> anyhow::Result<()> { .await } } - */ - }, - _ => todo!(), - /* - Some(qps) => { - if opts.latency_correction { - client::work_until_with_qps_latency_correction( - client.clone(), - result_tx, - client::QueryLimit::Qps(qps), - start, - start + duration.into(), - opts.n_connections, - opts.n_http2_parallel, - opts.wait_ongoing_requests_after_deadline, - ) - .await - } else { - client::work_until_with_qps( - client.clone(), - result_tx, - client::QueryLimit::Qps(qps), - start, - start + duration.into(), - opts.n_connections, - opts.n_http2_parallel, - opts.wait_ongoing_requests_after_deadline, - ) - .await - } } - */ - } - } else { - match opts.query_per_second { - Some(0) | None => match opts.burst_duration { - None => { - client::work2( - client.clone(), - result_tx, - opts.n_requests, - opts.n_connections, - opts.n_http2_parallel, - ) - .await - } - Some(burst_duration) => { - /* + } else { + match opts.query_per_second { + Some(0) | None => match opts.burst_duration { + None => { + client::work( + client.clone(), + result_tx, + 
opts.n_requests, + opts.n_connections, + opts.n_http2_parallel, + ) + .await + } + Some(burst_duration) => { + if opts.latency_correction { + client::work_with_qps_latency_correction( + client.clone(), + result_tx, + client::QueryLimit::Burst( + burst_duration.into(), + opts.burst_requests.unwrap_or(1), + ), + opts.n_requests, + opts.n_connections, + opts.n_http2_parallel, + ) + .await + } else { + client::work_with_qps( + client.clone(), + result_tx, + client::QueryLimit::Burst( + burst_duration.into(), + opts.burst_requests.unwrap_or(1), + ), + opts.n_requests, + opts.n_connections, + opts.n_http2_parallel, + ) + .await + } + } + }, + Some(qps) => { if opts.latency_correction { client::work_with_qps_latency_correction( client.clone(), result_tx, - client::QueryLimit::Burst( - burst_duration.into(), - opts.burst_requests.unwrap_or(1), - ), + client::QueryLimit::Qps(qps), opts.n_requests, opts.n_connections, opts.n_http2_parallel, @@ -691,52 +771,21 @@ async fn run() -> anyhow::Result<()> { client::work_with_qps( client.clone(), result_tx, - client::QueryLimit::Burst( - burst_duration.into(), - opts.burst_requests.unwrap_or(1), - ), + client::QueryLimit::Qps(qps), opts.n_requests, opts.n_connections, opts.n_http2_parallel, ) .await } - */ - todo!() - } - }, - Some(qps) => { - /* - if opts.latency_correction { - client::work_with_qps_latency_correction( - client.clone(), - result_tx, - client::QueryLimit::Qps(qps), - opts.n_requests, - opts.n_connections, - opts.n_http2_parallel, - ) - .await - } else { - client::work_with_qps( - client.clone(), - result_tx, - client::QueryLimit::Qps(qps), - opts.n_requests, - opts.n_connections, - opts.n_http2_parallel, - ) - .await } - */ - todo!() } } - } - let duration = start.elapsed(); + data_collector.await?? 
+ }; - let res: ResultData = data_collector.await; + let duration = start.elapsed(); printer::print_result( &mut std::io::stdout(), From e69455e9c41e5f8f63d75eb970b26ca913396e9a Mon Sep 17 00:00:00 2001 From: hatoo Date: Tue, 7 Jan 2025 22:32:02 +0900 Subject: [PATCH 16/26] a --- src/client.rs | 5 +---- src/main.rs | 16 +++++++--------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/src/client.rs b/src/client.rs index ccd97e0f..3d245318 100644 --- a/src/client.rs +++ b/src/client.rs @@ -949,10 +949,7 @@ fn set_start_latency_correction( } /// Run n tasks by m workers -pub async fn work_debug( - client: Arc, - _report_tx: flume::Sender>, -) -> Result<(), ClientError> { +pub async fn work_debug(client: Arc) -> Result<(), ClientError> { let mut rng = StdRng::from_entropy(); let url = client.url_generator.generate(&mut rng)?; println!("URL: {}", url); diff --git a/src/main.rs b/src/main.rs index 14fe8bc7..707eb6e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -537,8 +537,12 @@ async fn run() -> anyhow::Result<()> { std::process::exit(libc::EXIT_FAILURE); })); - let res = if no_tui - && !opts.debug + let res = if opts.debug { + if let Err(e) = client::work_debug(client).await { + eprintln!("{e}"); + } + std::process::exit(libc::EXIT_SUCCESS) + } else if no_tui && opts.duration.is_some() && opts.query_per_second.is_none() && opts.burst_duration.is_none() @@ -567,7 +571,6 @@ async fn run() -> anyhow::Result<()> { res } else if no_tui - && !opts.debug && opts.duration.is_none() && opts.query_per_second.is_none() && opts.burst_duration.is_none() @@ -632,12 +635,7 @@ async fn run() -> anyhow::Result<()> { ) }; - if opts.debug { - if let Err(e) = client::work_debug(client, result_tx).await { - eprintln!("{e}"); - } - std::process::exit(libc::EXIT_SUCCESS) - } else if let Some(duration) = opts.duration.take() { + if let Some(duration) = opts.duration.take() { match opts.query_per_second { Some(0) | None => match opts.burst_duration { None => { From 149409417508e5785e3ef7b0cbb2656478ca35a8 Mon Sep 17 00:00:00 2001 From: hatoo Date: Tue, 7 Jan 2025 22:34:26 +0900 Subject: [PATCH 17/26] a --- src/client.rs | 2 +- src/main.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 3d245318..07dbcc67 100644 --- a/src/client.rs +++ b/src/client.rs @@ -324,7 +324,7 @@ impl Client { self.proxy_http_version == http::Version::HTTP_2 } - fn is_work_http2(&self) -> bool { + pub fn is_work_http2(&self) -> bool { if self.proxy_url.is_some() { let url = self.url_generator.generate(&mut thread_rng()).unwrap(); if url.scheme() == "https" { diff --git a/src/main.rs b/src/main.rs index 707eb6e2..03ba3125 100644 --- a/src/main.rs +++ b/src/main.rs @@ -546,7 +546,7 @@ async fn run() -> anyhow::Result<()> { && opts.duration.is_some() && opts.query_per_second.is_none() && opts.burst_duration.is_none() - && client.http_version == http::Version::HTTP_11 + && !client.is_work_http2() { // Use optimized work_until2 for duration only mode. @@ -574,7 +574,7 @@ async fn run() -> anyhow::Result<()> { && opts.duration.is_none() && opts.query_per_second.is_none() && opts.burst_duration.is_none() - && client.http_version == http::Version::HTTP_11 + && !client.is_work_http2() { // Use optimized work_until2 for duration only mode. 
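An aside on the shape the series has settled into by this point: one OS thread per physical core, each running a single-threaded Tokio runtime that drives a LocalSet of per-connection tasks, with a shared atomic counter handing out the remaining request budget. The following is a minimal, self-contained sketch of that pattern, not code from the patches; it assumes tokio and num_cpus as dependencies, and fake_request stands in for Client::work_http1 (the report channel is omitted):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Stand-in for one HTTP/1.1 request on an established connection.
async fn fake_request(_conn: usize) {
    tokio::task::yield_now().await;
}

fn main() {
    let n_tasks = 1_000usize;
    let n_connections = 8usize;
    let num_threads = num_cpus::get_physical();
    let counter = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..num_threads)
        .filter_map(|i| {
            // Same split as the patches: spread connections over threads and
            // skip threads that would get none.
            let n = n_connections / num_threads
                + usize::from(n_connections % num_threads > i);
            (n > 0).then_some(n)
        })
        .map(|num_connection| {
            let counter = Arc::clone(&counter);
            std::thread::spawn(move || {
                // A current-thread runtime pins every task to this core, so
                // spawned futures do not need to be Send.
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                let local = tokio::task::LocalSet::new();
                for conn in 0..num_connection {
                    let counter = Arc::clone(&counter);
                    local.spawn_local(async move {
                        // fetch_add claims one task index at a time until the
                        // global budget of n_tasks is spent.
                        while counter.fetch_add(1, Ordering::Relaxed) < n_tasks {
                            fake_request(conn).await;
                        }
                    });
                }
                // A LocalSet resolves once all of its local tasks complete.
                rt.block_on(local);
            })
        })
        .collect();

    for handle in handles {
        let _ = handle.join();
    }
}

The per-thread integer split is also why the filter in patch 01 exists: with more threads than connections, some threads would get zero connections, and spawning them would be wasted work.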
From 67f8b8357cfb90977231967ed8dfb037f23d8aae Mon Sep 17 00:00:00 2001 From: hatoo Date: Tue, 7 Jan 2025 22:48:22 +0900 Subject: [PATCH 18/26] delay --- src/main.rs | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index 03ba3125..e2a62afc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use std::{ fs::File, io::{BufRead, Read}, path::Path, + pin::Pin, str::FromStr, sync::Arc, }; @@ -563,13 +564,13 @@ async fn run() -> anyhow::Result<()> { ) .await; - let mut res = ResultData::default(); - - while let Ok(r) = result_rx.recv() { - res.merge(r); - } - - res + Box::pin(async move { + let mut res = ResultData::default(); + while let Ok(r) = result_rx.recv() { + res.merge(r); + } + res + }) as Pin>> } else if no_tui && opts.duration.is_none() && opts.query_per_second.is_none() @@ -589,13 +590,13 @@ async fn run() -> anyhow::Result<()> { ) .await; - let mut res = ResultData::default(); - - while let Ok(r) = result_rx.recv() { - res.merge(r); - } - - res + Box::pin(async move { + let mut res = ResultData::default(); + while let Ok(r) = result_rx.recv() { + res.merge(r); + } + res + }) as Pin>> } else { let (result_tx, result_rx) = flume::unbounded(); let data_collector = if no_tui { @@ -780,10 +781,12 @@ async fn run() -> anyhow::Result<()> { } } - data_collector.await?? + Box::pin(async move { data_collector.await.unwrap().unwrap() }) + as Pin>> }; let duration = start.elapsed(); + let res = res.await; printer::print_result( &mut std::io::stdout(), From 6d2e426678d4339dd73108e97ff50d239f4ee0f3 Mon Sep 17 00:00:00 2001 From: hatoo Date: Tue, 7 Jan 2025 22:52:21 +0900 Subject: [PATCH 19/26] wip --- src/main.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index e2a62afc..3cb8d754 100644 --- a/src/main.rs +++ b/src/main.rs @@ -601,7 +601,7 @@ async fn run() -> anyhow::Result<()> { let (result_tx, result_rx) = flume::unbounded(); let data_collector = if no_tui { // When `--no-tui` is enabled, just collect all data. - tokio::spawn(async move { + let join_handle = tokio::spawn(async move { let mut all: ResultData = Default::default(); tokio::select! 
{ _ = async { @@ -615,11 +615,14 @@ async fn run() -> anyhow::Result<()> { std::process::exit(libc::EXIT_SUCCESS); } } - Ok(all) - }) + all + }); + + Box::pin(async { join_handle.await.unwrap() }) + as Pin>> } else { // Spawn monitor future which draws realtime tui - tokio::spawn( + let join_handle = tokio::spawn( monitor::Monitor { print_mode, end_line: opts @@ -633,7 +636,10 @@ async fn run() -> anyhow::Result<()> { stats_success_breakdown: opts.stats_success_breakdown, } .monitor(), - ) + ); + + Box::pin(async { join_handle.await.unwrap().unwrap() }) + as Pin>> }; if let Some(duration) = opts.duration.take() { @@ -781,8 +787,7 @@ async fn run() -> anyhow::Result<()> { } } - Box::pin(async move { data_collector.await.unwrap().unwrap() }) - as Pin>> + data_collector }; let duration = start.elapsed(); From 0be8611f82babb0a627e1c46ea79aabd1dbfd0ff Mon Sep 17 00:00:00 2001 From: hatoo Date: Tue, 7 Jan 2025 22:56:44 +0900 Subject: [PATCH 20/26] OK --- src/main.rs | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3cb8d754..b2bb861d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -601,25 +601,33 @@ async fn run() -> anyhow::Result<()> { let (result_tx, result_rx) = flume::unbounded(); let data_collector = if no_tui { // When `--no-tui` is enabled, just collect all data. - let join_handle = tokio::spawn(async move { + + let result_rx_ctrl_c = result_rx.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; let mut all: ResultData = Default::default(); - tokio::select! { - _ = async { - while let Ok(report) = result_rx.recv_async().await { - all.push(report); - } - } => {} - _ = tokio::signal::ctrl_c() => { - // User pressed ctrl-c. - let _ = printer::print_result(&mut std::io::stdout(), print_mode, start, &all, start.elapsed(), opts.disable_color, opts.stats_success_breakdown); - std::process::exit(libc::EXIT_SUCCESS); - } + for report in result_rx_ctrl_c.drain() { + all.push(report); } - all + let _ = printer::print_result( + &mut std::io::stdout(), + print_mode, + start, + &all, + start.elapsed(), + opts.disable_color, + opts.stats_success_breakdown, + ); + std::process::exit(libc::EXIT_SUCCESS); }); - Box::pin(async { join_handle.await.unwrap() }) - as Pin>> + Box::pin(async move { + let mut all = ResultData::default(); + while let Ok(res) = result_rx.recv() { + all.push(res); + } + all + }) as Pin>> } else { // Spawn monitor future which draws realtime tui let join_handle = tokio::spawn( From f0cb277fc1aa5db56d54bbb62e93e13ad71502ef Mon Sep 17 00:00:00 2001 From: hatoo Date: Wed, 8 Jan 2025 20:08:09 +0900 Subject: [PATCH 21/26] work2_http2 --- src/client.rs | 154 ++++++++++++++++++++++++++++++++++++++++++++------ src/main.rs | 1 - 2 files changed, 136 insertions(+), 19 deletions(-) diff --git a/src/client.rs b/src/client.rs index 07dbcc67..f2d3f117 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1092,31 +1092,149 @@ pub async fn work2( report_tx: flume::Sender, n_tasks: usize, n_connections: usize, - _n_http2_parallel: usize, + n_http2_parallel: usize, ) { use std::sync::atomic::{AtomicUsize, Ordering}; let counter = Arc::new(AtomicUsize::new(0)); + let num_threads = num_cpus::get_physical(); + let connections = (0..num_threads).filter_map(|i| { + let num_connection = n_connections / num_threads + + (if (n_connections % num_threads) > i { + 1 + } else { + 0 + }); + if num_connection > 0 { + Some(num_connection) + } else { + None + } + }); + let token = 
tokio_util::sync::CancellationToken::new(); if client.is_work_http2() { - todo!() - } else { - let num_threads = num_cpus::get_physical(); + let handles = connections + .map(|num_connections| { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let token = token.clone(); - let token = tokio_util::sync::CancellationToken::new(); - let handles = (0..num_threads) - .filter_map(|i| { - let num_connection = n_connections / num_threads - + (if (n_connections % num_threads) > i { - 1 - } else { - 0 - }); - if num_connection > 0 { - Some(num_connection) - } else { - None - } + std::thread::spawn(move || { + let client = client.clone(); + let local = tokio::task::LocalSet::new(); + for _ in 0..num_connections { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + let token = token.clone(); + local.spawn_local(Box::pin(async move { + loop { + let client = client.clone(); + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let mut client_state = client_state.clone(); + let counter = counter.clone(); + let client = client.clone(); + let report_tx = report_tx.clone(); + let token = token.clone(); + tokio::task::spawn_local(async move { + let mut result_data = ResultData::default(); + + let work = async { + while counter + .fetch_add(1, Ordering::Relaxed) + < n_tasks + { + let mut res = client + .work_http2(&mut client_state) + .await; + let is_cancel = is_cancel_error(&res); + let is_reconnect = is_hyper_error(&res); + set_connection_time( + &mut res, + connection_time, + ); + + result_data.push(res); + + if is_cancel || is_reconnect { + return is_cancel; + } + } + true + }; + + let is_cancel = tokio::select! { + is_cancel = work => { + is_cancel + } + _ = token.cancelled() => { + true + } + }; + + report_tx.send(result_data).unwrap(); + is_cancel + }) + }) + .collect::>(); + + let mut connection_gone = false; + for f in futures { + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} + } + } + + if connection_gone { + return; + } + } + Err(err) => { + if counter.fetch_add(1, Ordering::Relaxed) < n_tasks { + let mut result_data = ResultData::default(); + result_data.push(Err(err)); + report_tx.send(result_data).unwrap(); + } else { + return; + } + } + } + } + })); + } + + rt.block_on(local); + }) }) + .collect::>(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + token.cancel(); + }); + + tokio::task::block_in_place(|| { + for handle in handles { + let _ = handle.join(); + } + }); + } else { + let handles = connections .map(|num_connection| { let report_tx = report_tx.clone(); let counter = counter.clone(); diff --git a/src/main.rs b/src/main.rs index b2bb861d..3b7ef0cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -575,7 +575,6 @@ async fn run() -> anyhow::Result<()> { && opts.duration.is_none() && opts.query_per_second.is_none() && opts.burst_duration.is_none() - && !client.is_work_http2() { // Use optimized work_until2 for duration only mode. 
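Two mechanisms introduced above deserve a closer look: each task accumulates its outcomes in a local ResultData and performs exactly one send at the end (patch 11 onward), and a clone of one CancellationToken per task turns Ctrl-C into an orderly drain (patch 12 onward). Below is a condensed sketch of just that reporting-plus-cancellation shape, not taken from the patches; it assumes tokio (with signal and time features), tokio-util, and flume as dependencies, and a Vec<u32> stands in for ResultData:

use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let (tx, rx) = flume::unbounded::<Vec<u32>>();

    for worker in 0..4u32 {
        let token = token.clone();
        let tx = tx.clone();
        tokio::spawn(async move {
            let mut batch = Vec::new();
            tokio::select! {
                // Hard stop: abandon whatever is in flight.
                _ = token.cancelled() => {}
                // Normal path: a bounded amount of work.
                _ = async {
                    for i in 0..100u32 {
                        batch.push(worker * 1_000 + i);
                        tokio::time::sleep(Duration::from_millis(10)).await;
                    }
                } => {}
            }
            // Either way the locally batched results leave in one send,
            // mirroring report_tx.send(result_data) in the patches.
            let _ = tx.send(batch);
        });
    }
    drop(tx); // the drain below ends once every worker has reported

    tokio::spawn({
        let token = token.clone();
        async move {
            let _ = tokio::signal::ctrl_c().await;
            token.cancel();
        }
    });

    let mut total = 0;
    while let Ok(batch) = rx.recv_async().await {
        total += batch.len();
    }
    println!("collected {total} results");
}

Batching matters here because the earlier per-request report_tx.send put a channel operation on every request's critical path; one send per task amortizes that cost to almost nothing.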
From 8e35829cb67a7654520b6f4d9554f5656c15c345 Mon Sep 17 00:00:00 2001 From: hatoo Date: Wed, 8 Jan 2025 20:32:32 +0900 Subject: [PATCH 22/26] reorder --- src/main.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3b7ef0cb..98485334 100644 --- a/src/main.rs +++ b/src/main.rs @@ -544,23 +544,20 @@ async fn run() -> anyhow::Result<()> { } std::process::exit(libc::EXIT_SUCCESS) } else if no_tui - && opts.duration.is_some() + && opts.duration.is_none() && opts.query_per_second.is_none() && opts.burst_duration.is_none() - && !client.is_work_http2() { // Use optimized work_until2 for duration only mode. - let duration = opts.duration.unwrap(); let (result_tx, result_rx) = flume::unbounded(); - client::work_until2( + client::work2( client.clone(), result_tx, - start + duration.into(), + opts.n_requests, opts.n_connections, opts.n_http2_parallel, - opts.wait_ongoing_requests_after_deadline, ) .await; @@ -572,20 +569,23 @@ async fn run() -> anyhow::Result<()> { res }) as Pin>> } else if no_tui - && opts.duration.is_none() + && opts.duration.is_some() && opts.query_per_second.is_none() && opts.burst_duration.is_none() + && !client.is_work_http2() { // Use optimized work_until2 for duration only mode. + let duration = opts.duration.unwrap(); let (result_tx, result_rx) = flume::unbounded(); - client::work2( + client::work_until2( client.clone(), result_tx, - opts.n_requests, + start + duration.into(), opts.n_connections, opts.n_http2_parallel, + opts.wait_ongoing_requests_after_deadline, ) .await; From 651c5397b789647d3c7a9ffd3b4d6ac1677b616d Mon Sep 17 00:00:00 2001 From: hatoo Date: Wed, 8 Jan 2025 21:29:03 +0900 Subject: [PATCH 23/26] work_until2_http2 --- src/client.rs | 145 +++++++++++++++++++++++++++++++++++++++++++++++--- src/main.rs | 1 - 2 files changed, 139 insertions(+), 7 deletions(-) diff --git a/src/client.rs b/src/client.rs index f2d3f117..4373fd80 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1731,17 +1731,150 @@ pub async fn work_until2( report_tx: flume::Sender, dead_line: std::time::Instant, n_connections: usize, - _n_http2_parallel: usize, + n_http2_parallel: usize, wait_ongoing_requests_after_deadline: bool, ) { + use std::sync::atomic::{AtomicBool, Ordering}; + let num_threads = num_cpus::get_physical(); + + let is_end = Arc::new(AtomicBool::new(false)); + let connections = (0..num_threads).filter_map(|i| { + let num_connection = n_connections / num_threads + + (if (n_connections % num_threads) > i { + 1 + } else { + 0 + }); + if num_connection > 0 { + Some(num_connection) + } else { + None + } + }); + let token = tokio_util::sync::CancellationToken::new(); if client.is_work_http2() { - todo!() - } else { - let num_threads = num_cpus::get_physical(); + let handles = connections + .map(|num_connections| { + let report_tx = report_tx.clone(); + let client = client.clone(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let token = token.clone(); + let is_end = is_end.clone(); - let is_end = Arc::new(AtomicBool::new(false)); - let token = tokio_util::sync::CancellationToken::new(); + std::thread::spawn(move || { + let client = client.clone(); + let local = tokio::task::LocalSet::new(); + for _ in 0..num_connections { + let report_tx = report_tx.clone(); + let client = client.clone(); + let token = token.clone(); + let is_end = is_end.clone(); + local.spawn_local(Box::pin(async move { + loop { + let client = client.clone(); + match setup_http2(&client).await 
{ + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let mut client_state = client_state.clone(); + let client = client.clone(); + let report_tx = report_tx.clone(); + let token = token.clone(); + let is_end = is_end.clone(); + tokio::task::spawn_local(async move { + let mut result_data = ResultData::default(); + + let work = async { + loop { + let mut res = client + .work_http2(&mut client_state) + .await; + let is_cancel = is_cancel_error(&res) || is_end.load(Ordering::Relaxed); + let is_reconnect = is_hyper_error(&res); + set_connection_time( + &mut res, + connection_time, + ); + + result_data.push(res); + + if is_cancel || is_reconnect { + return is_cancel; + } + } + }; + + let is_cancel = tokio::select! { + is_cancel = work => { + is_cancel + } + _ = token.cancelled() => { + result_data.push(Err(ClientError::Deadline)); + true + } + }; + + report_tx.send(result_data).unwrap(); + is_cancel + }) + }) + .collect::>(); + + let mut connection_gone = false; + for f in futures { + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} + } + } + if connection_gone { + return; + } + } + Err(err) => { + let mut result_data = ResultData::default(); + result_data.push(Err(err)); + report_tx.send(result_data).unwrap(); + if is_end.load(Ordering::Relaxed) { + return; + } + } + } + } + })); + } + + rt.block_on(local); + }) + }) + .collect::>(); + tokio::select! { + _ = tokio::time::sleep_until(dead_line.into()) => { + } + _ = tokio::signal::ctrl_c() => { + } + } + + is_end.store(true, Relaxed); + + if !wait_ongoing_requests_after_deadline { + token.cancel(); + } + for handle in handles { + let _ = handle.join(); + } + } else { let handles = (0..num_threads) .filter_map(|i| { let num_connection = n_connections / num_threads diff --git a/src/main.rs b/src/main.rs index 98485334..8bc20a51 100644 --- a/src/main.rs +++ b/src/main.rs @@ -572,7 +572,6 @@ async fn run() -> anyhow::Result<()> { && opts.duration.is_some() && opts.query_per_second.is_none() && opts.burst_duration.is_none() - && !client.is_work_http2() { // Use optimized work_until2 for duration only mode. 
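The deadline run above layers two stop signals: an AtomicBool consulted between requests, which lets an in-flight request finish (the wait_ongoing_requests_after_deadline case), and a CancellationToken that aborts the request mid-await. Below is a reduced sketch of that two-level shutdown, not code from the patches; it assumes tokio and tokio-util as dependencies, with do_request standing in for the real client call:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Stand-in for one request/response round trip.
async fn do_request() {
    tokio::time::sleep(Duration::from_millis(30)).await;
}

#[tokio::main]
async fn main() {
    let is_end = Arc::new(AtomicBool::new(false));
    let token = CancellationToken::new();
    let wait_ongoing_requests_after_deadline = false;

    let worker = {
        let is_end = Arc::clone(&is_end);
        let token = token.clone();
        tokio::spawn(async move {
            let mut completed = 0u32;
            let work = async {
                // Soft stop: the flag is only read between requests, so the
                // current request always runs to completion.
                while !is_end.load(Ordering::Relaxed) {
                    do_request().await;
                    completed += 1;
                }
            };
            tokio::select! {
                _ = work => {}
                // Hard stop: the in-flight request is dropped mid-await.
                _ = token.cancelled() => {}
            }
            completed
        })
    };

    tokio::time::sleep(Duration::from_millis(100)).await; // deadline reached
    is_end.store(true, Ordering::Relaxed);
    if !wait_ongoing_requests_after_deadline {
        token.cancel();
    }
    println!("completed {} requests", worker.await.unwrap());
}

This also appears to be why the patches push Err(ClientError::Deadline) only on the cancellation branch: a hard stop terminates a request that never produced a result, and the marker lets the aborted in-flight request still show up in the totals.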
From 71d246dd82317a17a7a9cdd7b38642fc8e3ec9b3 Mon Sep 17 00:00:00 2001 From: hatoo Date: Thu, 9 Jan 2025 16:03:47 +0900 Subject: [PATCH 24/26] mod --- src/client.rs | 972 +++++++++++++++++++++++++------------------------- src/main.rs | 4 +- 2 files changed, 496 insertions(+), 480 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4373fd80..c26158ff 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,7 +19,6 @@ use url::{ParseError, Url}; use crate::{ pcg64si::Pcg64Si, - result_data::ResultData, url_generator::{UrlGenerator, UrlGeneratorError}, ConnectToEntry, }; @@ -1086,211 +1085,6 @@ pub async fn work( }; } -/// Run n tasks by m workers -pub async fn work2( - client: Arc, - report_tx: flume::Sender, - n_tasks: usize, - n_connections: usize, - n_http2_parallel: usize, -) { - use std::sync::atomic::{AtomicUsize, Ordering}; - let counter = Arc::new(AtomicUsize::new(0)); - let num_threads = num_cpus::get_physical(); - let connections = (0..num_threads).filter_map(|i| { - let num_connection = n_connections / num_threads - + (if (n_connections % num_threads) > i { - 1 - } else { - 0 - }); - if num_connection > 0 { - Some(num_connection) - } else { - None - } - }); - let token = tokio_util::sync::CancellationToken::new(); - - if client.is_work_http2() { - let handles = connections - .map(|num_connections| { - let report_tx = report_tx.clone(); - let counter = counter.clone(); - let client = client.clone(); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let token = token.clone(); - - std::thread::spawn(move || { - let client = client.clone(); - let local = tokio::task::LocalSet::new(); - for _ in 0..num_connections { - let report_tx = report_tx.clone(); - let counter = counter.clone(); - let client = client.clone(); - let token = token.clone(); - local.spawn_local(Box::pin(async move { - loop { - let client = client.clone(); - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let mut client_state = client_state.clone(); - let counter = counter.clone(); - let client = client.clone(); - let report_tx = report_tx.clone(); - let token = token.clone(); - tokio::task::spawn_local(async move { - let mut result_data = ResultData::default(); - - let work = async { - while counter - .fetch_add(1, Ordering::Relaxed) - < n_tasks - { - let mut res = client - .work_http2(&mut client_state) - .await; - let is_cancel = is_cancel_error(&res); - let is_reconnect = is_hyper_error(&res); - set_connection_time( - &mut res, - connection_time, - ); - - result_data.push(res); - - if is_cancel || is_reconnect { - return is_cancel; - } - } - true - }; - - let is_cancel = tokio::select! 
{ - is_cancel = work => { - is_cancel - } - _ = token.cancelled() => { - true - } - }; - - report_tx.send(result_data).unwrap(); - is_cancel - }) - }) - .collect::>(); - - let mut connection_gone = false; - for f in futures { - match f.await { - Ok(true) => { - // All works done - connection_gone = true; - } - Err(_) => { - // Unexpected - connection_gone = true; - } - _ => {} - } - } - - if connection_gone { - return; - } - } - Err(err) => { - if counter.fetch_add(1, Ordering::Relaxed) < n_tasks { - let mut result_data = ResultData::default(); - result_data.push(Err(err)); - report_tx.send(result_data).unwrap(); - } else { - return; - } - } - } - } - })); - } - - rt.block_on(local); - }) - }) - .collect::>(); - tokio::spawn(async move { - tokio::signal::ctrl_c().await.unwrap(); - token.cancel(); - }); - - tokio::task::block_in_place(|| { - for handle in handles { - let _ = handle.join(); - } - }); - } else { - let handles = connections - .map(|num_connection| { - let report_tx = report_tx.clone(); - let counter = counter.clone(); - let client = client.clone(); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - let token = token.clone(); - std::thread::spawn(move || { - let local = tokio::task::LocalSet::new(); - - for _ in 0..num_connection { - let report_tx = report_tx.clone(); - let counter = counter.clone(); - let client = client.clone(); - let token = token.clone(); - local.spawn_local(Box::pin(async move { - let mut result_data = ResultData::default(); - - tokio::select! { - _ = token.cancelled() => {} - _ = async { - let mut client_state = ClientStateHttp1::default(); - while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { - let res = client.work_http1(&mut client_state).await; - let is_cancel = is_cancel_error(&res); - result_data.push(res); - if is_cancel { - break; - } - } - } => {} - } - report_tx.send(result_data).unwrap(); - })); - } - rt.block_on(local); - }) - }) - .collect::>(); - - tokio::spawn(async move { - tokio::signal::ctrl_c().await.unwrap(); - token.cancel(); - }); - - tokio::task::block_in_place(|| { - for handle in handles { - let _ = handle.join(); - } - }); - }; -} - /// n tasks by m workers limit to qps works in a second pub async fn work_with_qps( client: Arc, @@ -1725,288 +1519,58 @@ pub async fn work_until( }; } -/// Run until dead_line by n workers -pub async fn work_until2( +/// Run until dead_line by n workers limit to qps works in a second +#[allow(clippy::too_many_arguments)] +pub async fn work_until_with_qps( client: Arc, - report_tx: flume::Sender, + report_tx: flume::Sender>, + query_limit: QueryLimit, + start: std::time::Instant, dead_line: std::time::Instant, n_connections: usize, n_http2_parallel: usize, wait_ongoing_requests_after_deadline: bool, ) { - use std::sync::atomic::{AtomicBool, Ordering}; - let num_threads = num_cpus::get_physical(); - - let is_end = Arc::new(AtomicBool::new(false)); - let connections = (0..num_threads).filter_map(|i| { - let num_connection = n_connections / num_threads - + (if (n_connections % num_threads) > i { - 1 - } else { - 0 + let rx = match query_limit { + QueryLimit::Qps(qps) => { + let (tx, rx) = flume::unbounded(); + tokio::spawn(async move { + for i in 0.. 
{ + if std::time::Instant::now() > dead_line { + break; + } + tokio::time::sleep_until( + (start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(), + ) + .await; + let _ = tx.send(()); + } + // tx gone }); - if num_connection > 0 { - Some(num_connection) - } else { - None + rx } - }); - let token = tokio_util::sync::CancellationToken::new(); - if client.is_work_http2() { - let handles = connections - .map(|num_connections| { - let report_tx = report_tx.clone(); - let client = client.clone(); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let token = token.clone(); - let is_end = is_end.clone(); - - std::thread::spawn(move || { - let client = client.clone(); - let local = tokio::task::LocalSet::new(); - for _ in 0..num_connections { - let report_tx = report_tx.clone(); - let client = client.clone(); - let token = token.clone(); - let is_end = is_end.clone(); - local.spawn_local(Box::pin(async move { - loop { - let client = client.clone(); - match setup_http2(&client).await { - Ok((connection_time, client_state)) => { - let futures = (0..n_http2_parallel) - .map(|_| { - let mut client_state = client_state.clone(); - let client = client.clone(); - let report_tx = report_tx.clone(); - let token = token.clone(); - let is_end = is_end.clone(); - tokio::task::spawn_local(async move { - let mut result_data = ResultData::default(); + QueryLimit::Burst(duration, rate) => { + let (tx, rx) = flume::unbounded(); + tokio::spawn(async move { + // Handle via rate till deadline is reached + for _ in 0.. { + if std::time::Instant::now() > dead_line { + break; + } - let work = async { - loop { - let mut res = client - .work_http2(&mut client_state) - .await; - let is_cancel = is_cancel_error(&res) || is_end.load(Ordering::Relaxed); - let is_reconnect = is_hyper_error(&res); - set_connection_time( - &mut res, - connection_time, - ); + tokio::time::sleep(duration).await; + for _ in 0..rate { + let _ = tx.send(()); + } + } + // tx gone + }); + rx + } + }; - result_data.push(res); - - if is_cancel || is_reconnect { - return is_cancel; - } - } - }; - - let is_cancel = tokio::select! { - is_cancel = work => { - is_cancel - } - _ = token.cancelled() => { - result_data.push(Err(ClientError::Deadline)); - true - } - }; - - report_tx.send(result_data).unwrap(); - is_cancel - }) - }) - .collect::>(); - - let mut connection_gone = false; - for f in futures { - match f.await { - Ok(true) => { - // All works done - connection_gone = true; - } - Err(_) => { - // Unexpected - connection_gone = true; - } - _ => {} - } - } - - if connection_gone { - return; - } - } - Err(err) => { - let mut result_data = ResultData::default(); - result_data.push(Err(err)); - report_tx.send(result_data).unwrap(); - if is_end.load(Ordering::Relaxed) { - return; - } - } - } - } - })); - } - - rt.block_on(local); - }) - }) - .collect::>(); - tokio::select! 
{ - _ = tokio::time::sleep_until(dead_line.into()) => { - } - _ = tokio::signal::ctrl_c() => { - } - } - - is_end.store(true, Relaxed); - - if !wait_ongoing_requests_after_deadline { - token.cancel(); - } - for handle in handles { - let _ = handle.join(); - } - } else { - let handles = (0..num_threads) - .filter_map(|i| { - let num_connection = n_connections / num_threads - + (if (n_connections % num_threads) > i { - 1 - } else { - 0 - }); - if num_connection > 0 { - Some(num_connection) - } else { - None - } - }) - .map(|num_connection| { - let report_tx = report_tx.clone(); - let is_end = is_end.clone(); - let client = client.clone(); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - let token = token.clone(); - std::thread::spawn(move || { - let local = tokio::task::LocalSet::new(); - - for _ in 0..num_connection { - let report_tx = report_tx.clone(); - let is_end = is_end.clone(); - let client = client.clone(); - let token = token.clone(); - local.spawn_local(Box::pin(async move { - let mut result_data = ResultData::default(); - - let work = async { - let mut client_state = ClientStateHttp1::default(); - loop { - let res = client.work_http1(&mut client_state).await; - let is_cancel = is_cancel_error(&res); - result_data.push(res); - if is_cancel || is_end.load(Relaxed) { - break; - } - } - }; - - tokio::select! { - _ = work => { - } - _ = token.cancelled() => { - result_data.push(Err(ClientError::Deadline)); - } - } - report_tx.send(result_data).unwrap(); - })); - } - rt.block_on(local); - }) - }) - .collect::>(); - - tokio::select! { - _ = tokio::time::sleep_until(dead_line.into()) => { - } - _ = tokio::signal::ctrl_c() => { - } - } - - is_end.store(true, Relaxed); - - if !wait_ongoing_requests_after_deadline { - token.cancel(); - } - for handle in handles { - let _ = handle.join(); - } - }; -} - -/// Run until dead_line by n workers limit to qps works in a second -#[allow(clippy::too_many_arguments)] -pub async fn work_until_with_qps( - client: Arc, - report_tx: flume::Sender>, - query_limit: QueryLimit, - start: std::time::Instant, - dead_line: std::time::Instant, - n_connections: usize, - n_http2_parallel: usize, - wait_ongoing_requests_after_deadline: bool, -) { - let rx = match query_limit { - QueryLimit::Qps(qps) => { - let (tx, rx) = flume::unbounded(); - tokio::spawn(async move { - for i in 0.. { - if std::time::Instant::now() > dead_line { - break; - } - tokio::time::sleep_until( - (start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(), - ) - .await; - let _ = tx.send(()); - } - // tx gone - }); - rx - } - QueryLimit::Burst(duration, rate) => { - let (tx, rx) = flume::unbounded(); - tokio::spawn(async move { - // Handle via rate till deadline is reached - for _ in 0.. 
{ - if std::time::Instant::now() > dead_line { - break; - } - - tokio::time::sleep(duration).await; - for _ in 0..rate { - let _ = tx.send(()); - } - } - // tx gone - }); - rx - } - }; - - if client.is_work_http2() { - let s = Arc::new(tokio::sync::Semaphore::new(0)); + if client.is_work_http2() { + let s = Arc::new(tokio::sync::Semaphore::new(0)); let futures = (0..n_connections) .map(|_| { @@ -2319,3 +1883,455 @@ pub async fn work_until_with_qps_latency_correction( } } } + +/// Optimized workers for `--no-tui` mode +pub mod fast { + use std::sync::Arc; + + use crate::{ + client::{ + is_cancel_error, is_hyper_error, set_connection_time, setup_http2, ClientError, + ClientStateHttp1, + }, + result_data::ResultData, + }; + + use super::Client; + + /// Run n tasks by m workers + pub async fn work( + client: Arc, + report_tx: flume::Sender, + n_tasks: usize, + n_connections: usize, + n_http2_parallel: usize, + ) { + use std::sync::atomic::{AtomicUsize, Ordering}; + let counter = Arc::new(AtomicUsize::new(0)); + let num_threads = num_cpus::get_physical(); + let connections = (0..num_threads).filter_map(|i| { + let num_connection = n_connections / num_threads + + (if (n_connections % num_threads) > i { + 1 + } else { + 0 + }); + if num_connection > 0 { + Some(num_connection) + } else { + None + } + }); + let token = tokio_util::sync::CancellationToken::new(); + + if client.is_work_http2() { + let handles = connections + .map(|num_connections| { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let token = token.clone(); + + std::thread::spawn(move || { + let client = client.clone(); + let local = tokio::task::LocalSet::new(); + for _ in 0..num_connections { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + let token = token.clone(); + local.spawn_local(Box::pin(async move { + loop { + let client = client.clone(); + match setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let mut client_state = client_state.clone(); + let counter = counter.clone(); + let client = client.clone(); + let report_tx = report_tx.clone(); + let token = token.clone(); + tokio::task::spawn_local(async move { + let mut result_data = ResultData::default(); + + let work = async { + while counter + .fetch_add(1, Ordering::Relaxed) + < n_tasks + { + let mut res = client + .work_http2(&mut client_state) + .await; + let is_cancel = + is_cancel_error(&res); + let is_reconnect = + is_hyper_error(&res); + set_connection_time( + &mut res, + connection_time, + ); + + result_data.push(res); + + if is_cancel || is_reconnect { + return is_cancel; + } + } + true + }; + + let is_cancel = tokio::select! 
{ + is_cancel = work => { + is_cancel + } + _ = token.cancelled() => { + true + } + }; + + report_tx.send(result_data).unwrap(); + is_cancel + }) + }) + .collect::>(); + + let mut connection_gone = false; + for f in futures { + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} + } + } + + if connection_gone { + return; + } + } + Err(err) => { + if counter.fetch_add(1, Ordering::Relaxed) < n_tasks { + let mut result_data = ResultData::default(); + result_data.push(Err(err)); + report_tx.send(result_data).unwrap(); + } else { + return; + } + } + } + } + })); + } + + rt.block_on(local); + }) + }) + .collect::>(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + token.cancel(); + }); + + tokio::task::block_in_place(|| { + for handle in handles { + let _ = handle.join(); + } + }); + } else { + let handles = connections + .map(|num_connection| { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let token = token.clone(); + std::thread::spawn(move || { + let local = tokio::task::LocalSet::new(); + + for _ in 0..num_connection { + let report_tx = report_tx.clone(); + let counter = counter.clone(); + let client = client.clone(); + let token = token.clone(); + local.spawn_local(Box::pin(async move { + let mut result_data = ResultData::default(); + + tokio::select! { + _ = token.cancelled() => {} + _ = async { + let mut client_state = ClientStateHttp1::default(); + while counter.fetch_add(1, Ordering::Relaxed) < n_tasks { + let res = client.work_http1(&mut client_state).await; + let is_cancel = is_cancel_error(&res); + result_data.push(res); + if is_cancel { + break; + } + } + } => {} + } + report_tx.send(result_data).unwrap(); + })); + } + rt.block_on(local); + }) + }) + .collect::>(); + + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + token.cancel(); + }); + + tokio::task::block_in_place(|| { + for handle in handles { + let _ = handle.join(); + } + }); + }; + } + + /// Run until dead_line by n workers + pub async fn work_until( + client: Arc, + report_tx: flume::Sender, + dead_line: std::time::Instant, + n_connections: usize, + n_http2_parallel: usize, + wait_ongoing_requests_after_deadline: bool, + ) { + use std::sync::atomic::{AtomicBool, Ordering}; + let num_threads = num_cpus::get_physical(); + + let is_end = Arc::new(AtomicBool::new(false)); + let connections = (0..num_threads).filter_map(|i| { + let num_connection = n_connections / num_threads + + (if (n_connections % num_threads) > i { + 1 + } else { + 0 + }); + if num_connection > 0 { + Some(num_connection) + } else { + None + } + }); + let token = tokio_util::sync::CancellationToken::new(); + if client.is_work_http2() { + let handles = connections + .map(|num_connections| { + let report_tx = report_tx.clone(); + let client = client.clone(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let token = token.clone(); + let is_end = is_end.clone(); + + std::thread::spawn(move || { + let client = client.clone(); + let local = tokio::task::LocalSet::new(); + for _ in 0..num_connections { + let report_tx = report_tx.clone(); + let client = client.clone(); + let token = token.clone(); + let is_end = is_end.clone(); + local.spawn_local(Box::pin(async move { + loop { + let client = client.clone(); + match 
setup_http2(&client).await { + Ok((connection_time, client_state)) => { + let futures = (0..n_http2_parallel) + .map(|_| { + let mut client_state = client_state.clone(); + let client = client.clone(); + let report_tx = report_tx.clone(); + let token = token.clone(); + let is_end = is_end.clone(); + tokio::task::spawn_local(async move { + let mut result_data = ResultData::default(); + + let work = async { + loop { + let mut res = client + .work_http2(&mut client_state) + .await; + let is_cancel = is_cancel_error(&res) || is_end.load(Ordering::Relaxed); + let is_reconnect = is_hyper_error(&res); + set_connection_time( + &mut res, + connection_time, + ); + + result_data.push(res); + + if is_cancel || is_reconnect { + return is_cancel; + } + } + }; + + let is_cancel = tokio::select! { + is_cancel = work => { + is_cancel + } + _ = token.cancelled() => { + result_data.push(Err(ClientError::Deadline)); + true + } + }; + + report_tx.send(result_data).unwrap(); + is_cancel + }) + }) + .collect::>(); + + let mut connection_gone = false; + for f in futures { + match f.await { + Ok(true) => { + // All works done + connection_gone = true; + } + Err(_) => { + // Unexpected + connection_gone = true; + } + _ => {} + } + } + + if connection_gone { + return; + } + } + Err(err) => { + let mut result_data = ResultData::default(); + result_data.push(Err(err)); + report_tx.send(result_data).unwrap(); + if is_end.load(Ordering::Relaxed) { + return; + } + } + } + } + })); + } + + rt.block_on(local); + }) + }) + .collect::>(); + tokio::select! { + _ = tokio::time::sleep_until(dead_line.into()) => { + } + _ = tokio::signal::ctrl_c() => { + } + } + + is_end.store(true, Ordering::Relaxed); + + if !wait_ongoing_requests_after_deadline { + token.cancel(); + } + for handle in handles { + let _ = handle.join(); + } + } else { + let handles = (0..num_threads) + .filter_map(|i| { + let num_connection = n_connections / num_threads + + (if (n_connections % num_threads) > i { + 1 + } else { + 0 + }); + if num_connection > 0 { + Some(num_connection) + } else { + None + } + }) + .map(|num_connection| { + let report_tx = report_tx.clone(); + let is_end = is_end.clone(); + let client = client.clone(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let token = token.clone(); + std::thread::spawn(move || { + let local = tokio::task::LocalSet::new(); + + for _ in 0..num_connection { + let report_tx = report_tx.clone(); + let is_end = is_end.clone(); + let client = client.clone(); + let token = token.clone(); + local.spawn_local(Box::pin(async move { + let mut result_data = ResultData::default(); + + let work = async { + let mut client_state = ClientStateHttp1::default(); + loop { + let res = client.work_http1(&mut client_state).await; + let is_cancel = is_cancel_error(&res); + result_data.push(res); + if is_cancel || is_end.load(Ordering::Relaxed) { + break; + } + } + }; + + tokio::select! { + _ = work => { + } + _ = token.cancelled() => { + result_data.push(Err(ClientError::Deadline)); + } + } + report_tx.send(result_data).unwrap(); + })); + } + rt.block_on(local); + }) + }) + .collect::>(); + + tokio::select! 
{ + _ = tokio::time::sleep_until(dead_line.into()) => { + } + _ = tokio::signal::ctrl_c() => { + } + } + + is_end.store(true, Ordering::Relaxed); + + if !wait_ongoing_requests_after_deadline { + token.cancel(); + } + for handle in handles { + let _ = handle.join(); + } + }; + } +} diff --git a/src/main.rs b/src/main.rs index 8bc20a51..3dde813b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -552,7 +552,7 @@ async fn run() -> anyhow::Result<()> { let (result_tx, result_rx) = flume::unbounded(); - client::work2( + client::fast::work( client.clone(), result_tx, opts.n_requests, @@ -578,7 +578,7 @@ async fn run() -> anyhow::Result<()> { let duration = opts.duration.unwrap(); let (result_tx, result_rx) = flume::unbounded(); - client::work_until2( + client::fast::work_until( client.clone(), result_tx, start + duration.into(), From 85bf2ae733bf0b8903a21e47cd34a8e42904f510 Mon Sep 17 00:00:00 2001 From: hatoo Date: Thu, 9 Jan 2025 16:08:10 +0900 Subject: [PATCH 25/26] tweak --- src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3dde813b..01016a2e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -522,8 +522,6 @@ async fn run() -> anyhow::Result<()> { client.pre_lookup().await?; } - let start = std::time::Instant::now(); - let no_tui = opts.no_tui || !std::io::stdout().is_tty() || opts.debug; // When panics, reset terminal mode and exit immediately. @@ -538,6 +536,8 @@ async fn run() -> anyhow::Result<()> { std::process::exit(libc::EXIT_FAILURE); })); + let start = std::time::Instant::now(); + let res = if opts.debug { if let Err(e) = client::work_debug(client).await { eprintln!("{e}"); From 7a71d2cca36f44c20b3ca72f0c99164e861485a2 Mon Sep 17 00:00:00 2001 From: hatoo Date: Thu, 9 Jan 2025 16:37:17 +0900 Subject: [PATCH 26/26] work_mode enum --- src/main.rs | 452 +++++++++++++++++++++++++++------------------------- 1 file changed, 233 insertions(+), 219 deletions(-) diff --git a/src/main.rs b/src/main.rs index 01016a2e..44a8a179 100644 --- a/src/main.rs +++ b/src/main.rs @@ -302,7 +302,8 @@ impl FromStr for VsockAddr { } async fn run() -> anyhow::Result<()> { - let mut opts: Opts = Opts::parse(); + let opts: Opts = Opts::parse(); + let work_mode = opts.work_mode(); let parse_http_version = |is_http2: bool, version: Option<&str>| match (is_http2, version) { (true, Some(_)) => anyhow::bail!("--http2 and --http-version are exclusive"), @@ -538,262 +539,213 @@ async fn run() -> anyhow::Result<()> { let start = std::time::Instant::now(); - let res = if opts.debug { - if let Err(e) = client::work_debug(client).await { - eprintln!("{e}"); - } - std::process::exit(libc::EXIT_SUCCESS) - } else if no_tui - && opts.duration.is_none() - && opts.query_per_second.is_none() - && opts.burst_duration.is_none() - { - // Use optimized work_until2 for duration only mode. - - let (result_tx, result_rx) = flume::unbounded(); - - client::fast::work( - client.clone(), - result_tx, - opts.n_requests, - opts.n_connections, - opts.n_http2_parallel, - ) - .await; - - Box::pin(async move { - let mut res = ResultData::default(); - while let Ok(r) = result_rx.recv() { - res.merge(r); - } - res - }) as Pin>> - } else if no_tui - && opts.duration.is_some() - && opts.query_per_second.is_none() - && opts.burst_duration.is_none() - { - // Use optimized work_until2 for duration only mode. 
- - let duration = opts.duration.unwrap(); - let (result_tx, result_rx) = flume::unbounded(); - - client::fast::work_until( - client.clone(), - result_tx, - start + duration.into(), - opts.n_connections, - opts.n_http2_parallel, - opts.wait_ongoing_requests_after_deadline, - ) - .await; - - Box::pin(async move { - let mut res = ResultData::default(); - while let Ok(r) = result_rx.recv() { - res.merge(r); - } - res - }) as Pin>> - } else { - let (result_tx, result_rx) = flume::unbounded(); - let data_collector = if no_tui { - // When `--no-tui` is enabled, just collect all data. - - let result_rx_ctrl_c = result_rx.clone(); - tokio::spawn(async move { - let _ = tokio::signal::ctrl_c().await; - let mut all: ResultData = Default::default(); - for report in result_rx_ctrl_c.drain() { - all.push(report); - } - let _ = printer::print_result( - &mut std::io::stdout(), - print_mode, - start, - &all, - start.elapsed(), - opts.disable_color, - opts.stats_success_breakdown, - ); - std::process::exit(libc::EXIT_SUCCESS); - }); + let res = match work_mode { + WorkMode::FixedNumber { + n_requests, + n_connections, + n_http2_parallel, + query_limit: None, + latency_correction: _, + } if no_tui => { + // Use optimized worker of no_tui mode. + let (result_tx, result_rx) = flume::unbounded(); + + client::fast::work( + client.clone(), + result_tx, + n_requests, + n_connections, + n_http2_parallel, + ) + .await; Box::pin(async move { - let mut all = ResultData::default(); - while let Ok(res) = result_rx.recv() { - all.push(res); + let mut res = ResultData::default(); + while let Ok(r) = result_rx.recv() { + res.merge(r); } - all + res }) as Pin>> - } else { - // Spawn monitor future which draws realtime tui - let join_handle = tokio::spawn( - monitor::Monitor { - print_mode, - end_line: opts - .duration - .map(|d| monitor::EndLine::Duration(d.into())) - .unwrap_or(monitor::EndLine::NumQuery(opts.n_requests)), - report_receiver: result_rx, - start, - fps: opts.fps, - disable_color: opts.disable_color, - stats_success_breakdown: opts.stats_success_breakdown, + } + WorkMode::Until { + duration, + n_connections, + n_http2_parallel, + query_limit: None, + latency_correction: _, + wait_ongoing_requests_after_deadline, + } if no_tui => { + // Use optimized worker of no_tui mode. + let (result_tx, result_rx) = flume::unbounded(); + + client::fast::work_until( + client.clone(), + result_tx, + start + duration, + n_connections, + n_http2_parallel, + wait_ongoing_requests_after_deadline, + ) + .await; + + Box::pin(async move { + let mut res = ResultData::default(); + while let Ok(r) = result_rx.recv() { + res.merge(r); } - .monitor(), - ); + res + }) as Pin>> + } + mode => { + let (result_tx, result_rx) = flume::unbounded(); + let data_collector = if no_tui { + // When `--no-tui` is enabled, just collect all data. 
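+                // On Ctrl-C, drain whatever has been reported so far, print the partial result, and exit.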
+
+                let result_rx_ctrl_c = result_rx.clone();
+                tokio::spawn(async move {
+                    let _ = tokio::signal::ctrl_c().await;
+                    let mut all: ResultData = Default::default();
+                    for report in result_rx_ctrl_c.drain() {
+                        all.push(report);
+                    }
+                    let _ = printer::print_result(
+                        &mut std::io::stdout(),
+                        print_mode,
+                        start,
+                        &all,
+                        start.elapsed(),
+                        opts.disable_color,
+                        opts.stats_success_breakdown,
+                    );
+                    std::process::exit(libc::EXIT_SUCCESS);
+                });
+
+                Box::pin(async move {
+                    let mut all = ResultData::default();
+                    while let Ok(res) = result_rx.recv() {
+                        all.push(res);
+                    }
+                    all
+                }) as Pin<Box<dyn Future<Output = ResultData>>>
+            } else {
+                // Spawn monitor future which draws realtime tui
+                let join_handle = tokio::spawn(
+                    monitor::Monitor {
+                        print_mode,
+                        end_line: opts
+                            .duration
+                            .map(|d| monitor::EndLine::Duration(d.into()))
+                            .unwrap_or(monitor::EndLine::NumQuery(opts.n_requests)),
+                        report_receiver: result_rx,
+                        start,
+                        fps: opts.fps,
+                        disable_color: opts.disable_color,
+                        stats_success_breakdown: opts.stats_success_breakdown,
+                    }
+                    .monitor(),
+                );

-            Box::pin(async { join_handle.await.unwrap().unwrap() })
-                as Pin<Box<dyn Future<Output = ResultData>>>
-        };
+                Box::pin(async { join_handle.await.unwrap().unwrap() })
+                    as Pin<Box<dyn Future<Output = ResultData>>>
+            };

-        if let Some(duration) = opts.duration.take() {
-            match opts.query_per_second {
-                Some(0) | None => match opts.burst_duration {
-                    None => {
-                        client::work_until(
-                            client.clone(),
-                            result_tx,
-                            start + duration.into(),
-                            opts.n_connections,
-                            opts.n_http2_parallel,
-                            opts.wait_ongoing_requests_after_deadline,
-                        )
-                        .await
+            match mode {
+                WorkMode::Debug => {
+                    if let Err(e) = client::work_debug(client).await {
+                        eprintln!("{e}");
                     }
-                    Some(burst_duration) => {
-                        if opts.latency_correction {
-                            client::work_until_with_qps_latency_correction(
+                    std::process::exit(libc::EXIT_SUCCESS)
+                }
+                WorkMode::FixedNumber {
+                    n_requests,
+                    n_connections,
+                    n_http2_parallel,
+                    query_limit,
+                    latency_correction,
+                } => {
+                    if let Some(query_limit) = query_limit {
+                        if latency_correction {
+                            client::work_with_qps_latency_correction(
                                 client.clone(),
                                 result_tx,
-                                client::QueryLimit::Burst(
-                                    burst_duration.into(),
-                                    opts.burst_requests.unwrap_or(1),
-                                ),
-                                start,
-                                start + duration.into(),
-                                opts.n_connections,
-                                opts.n_http2_parallel,
-                                opts.wait_ongoing_requests_after_deadline,
+                                query_limit,
+                                n_requests,
+                                n_connections,
+                                n_http2_parallel,
                             )
-                            .await
+                            .await;
                         } else {
-                            client::work_until_with_qps(
+                            client::work_with_qps(
                                 client.clone(),
                                 result_tx,
-                                client::QueryLimit::Burst(
-                                    burst_duration.into(),
-                                    opts.burst_requests.unwrap_or(1),
-                                ),
-                                start,
-                                start + duration.into(),
-                                opts.n_connections,
-                                opts.n_http2_parallel,
-                                opts.wait_ongoing_requests_after_deadline,
+                                query_limit,
+                                n_requests,
+                                n_connections,
+                                n_http2_parallel,
                             )
-                            .await
+                            .await;
                         }
-                    }
-                },
-                Some(qps) => {
-                    if opts.latency_correction {
-                        client::work_until_with_qps_latency_correction(
-                            client.clone(),
-                            result_tx,
-                            client::QueryLimit::Qps(qps),
-                            start,
-                            start + duration.into(),
-                            opts.n_connections,
-                            opts.n_http2_parallel,
-                            opts.wait_ongoing_requests_after_deadline,
-                        )
-                        .await
                     } else {
-                        client::work_until_with_qps(
-                            client.clone(),
-                            result_tx,
-                            client::QueryLimit::Qps(qps),
-                            start,
-                            start + duration.into(),
-                            opts.n_connections,
-                            opts.n_http2_parallel,
-                            opts.wait_ongoing_requests_after_deadline,
-                        )
-                        .await
-                    }
-                }
-            }
-        } else {
-            match opts.query_per_second {
-                Some(0) | None => match opts.burst_duration {
-                    None => {
                         client::work(
                             client.clone(),
                             result_tx,
-                            opts.n_requests,
-                            opts.n_connections,
-                            opts.n_http2_parallel,
+                            n_requests,
+                            n_connections,
+                            n_http2_parallel,
+                        )
+                        .await;
+                    }
+                }
+                WorkMode::Until {
+                    duration,
+                    n_connections,
+                    n_http2_parallel,
+                    query_limit,
+                    latency_correction,
+                    wait_ongoing_requests_after_deadline,
+                } => {
+                    if let Some(query_limit) = query_limit {
+                        if latency_correction {
+                            client::work_until_with_qps_latency_correction(
+                                client.clone(),
+                                result_tx,
+                                query_limit,
+                                start,
+                                start + duration,
+                                n_connections,
+                                n_http2_parallel,
+                                wait_ongoing_requests_after_deadline,
+                            )
+                            .await;
+                        } else {
+                            client::work_until_with_qps(
+                                client.clone(),
+                                result_tx,
+                                query_limit,
+                                start,
+                                start + duration,
+                                n_connections,
+                                n_http2_parallel,
+                                wait_ongoing_requests_after_deadline,
+                            )
+                            .await;
+                        }
+                    } else {
+                        client::work_until(
+                            client.clone(),
+                            result_tx,
+                            start + duration,
+                            n_connections,
+                            n_http2_parallel,
+                            wait_ongoing_requests_after_deadline,
+                        )
+                        .await;
+                    }
+                }
+            }

-        data_collector
+            data_collector
+        }
     };

     let duration = start.elapsed();
@@ -846,3 +798,65 @@ fn main() {
         .unwrap();
     rt.block_on(run()).unwrap();
 }
+
+enum WorkMode {
+    Debug,
+    FixedNumber {
+        n_requests: usize,
+        n_connections: usize,
+        n_http2_parallel: usize,
+        query_limit: Option<client::QueryLimit>,
+        // ignored when query_limit is None
+        latency_correction: bool,
+    },
+    Until {
+        duration: std::time::Duration,
+        n_connections: usize,
+        n_http2_parallel: usize,
+        query_limit: Option<client::QueryLimit>,
+        // ignored when query_limit is None
+        latency_correction: bool,
+        wait_ongoing_requests_after_deadline: bool,
+    },
+}
+
+impl Opts {
+    fn work_mode(&self) -> WorkMode {
+        if self.debug {
+            WorkMode::Debug
+        } else if let Some(duration) = self.duration {
+            WorkMode::Until {
+                duration: duration.into(),
+                n_connections: self.n_connections,
+                n_http2_parallel: self.n_http2_parallel,
+                query_limit: match self.query_per_second {
+                    Some(0) | None => self.burst_duration.map(|burst_duration| {
+                        client::QueryLimit::Burst(
+                            burst_duration.into(),
+                            self.burst_requests.unwrap_or(1),
+                        )
+                    }),
+                    Some(qps) => Some(client::QueryLimit::Qps(qps)),
+                },
+                latency_correction: self.latency_correction,
+                wait_ongoing_requests_after_deadline: self.wait_ongoing_requests_after_deadline,
+            }
+        } else {
+            WorkMode::FixedNumber {
+                n_requests: self.n_requests,
+                n_connections: self.n_connections,
+                n_http2_parallel: self.n_http2_parallel,
+                query_limit: match self.query_per_second {
+                    Some(0) | None => self.burst_duration.map(|burst_duration| {
+                        client::QueryLimit::Burst(
+                            burst_duration.into(),
+                            self.burst_requests.unwrap_or(1),
+                        )
+                    }),
+                    Some(qps) => Some(client::QueryLimit::Qps(qps)),
+                },
+                latency_correction: self.latency_correction,
+            }
+        }
+    }
+}
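A note for readers following this series: the scheduling pattern behind `client::fast` reduces to the standalone sketch below. This is an illustrative sketch only, not part of any patch; it assumes the `tokio` and `num_cpus` crates and stands in a `yield_now` for the real HTTP request.

    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    fn main() {
        let n_tasks = 100;
        let n_connections = 10;
        let num_threads = num_cpus::get_physical();
        let counter = Arc::new(AtomicUsize::new(0));

        let handles: Vec<_> = (0..num_threads)
            .filter_map(|i| {
                // Same split as the patches: floor(n/m) connections per thread,
                // plus one extra for the first (n mod m) threads; threads that
                // would get zero connections are skipped.
                let n = n_connections / num_threads
                    + usize::from(n_connections % num_threads > i);
                (n > 0).then_some(n)
            })
            .map(|num_connection| {
                let counter = counter.clone();
                std::thread::spawn(move || {
                    // One single-threaded runtime per OS thread: no work stealing,
                    // and spawn_local keeps each connection's state on this thread.
                    let rt = tokio::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .unwrap();
                    let local = tokio::task::LocalSet::new();
                    for _ in 0..num_connection {
                        let counter = counter.clone();
                        local.spawn_local(async move {
                            // A shared atomic counter hands out task numbers until
                            // n_tasks have been claimed across all threads.
                            while counter.fetch_add(1, Ordering::Relaxed) < n_tasks {
                                tokio::task::yield_now().await; // stand-in for one request
                            }
                        });
                    }
                    // Runs until every spawned local task has finished.
                    rt.block_on(local);
                })
            })
            .collect();

        for handle in handles {
            let _ = handle.join();
        }
    }

The thread-per-physical-core layout plus `LocalSet` lets the worker loops hold non-`Send` per-connection state while still using every core, something `tokio::spawn` on the multi-thread runtime used elsewhere in main.rs cannot do, since it requires `Send` futures.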