diff --git a/src/main.rs b/src/main.rs index cb4353f7..4b3d0c78 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use hyper::http::{ self, header::{HeaderName, HeaderValue}, }; -use printer::PrintMode; +use printer::{PrintConfig, PrintMode}; use rand::prelude::*; use rand_regex::Regex; use ratatui::crossterm; @@ -17,7 +17,7 @@ use std::{ env, fs::File, io::{BufRead, Read}, - path::Path, + path::{Path, PathBuf}, pin::Pin, str::FromStr, sync::Arc, @@ -248,6 +248,12 @@ Note: If qps is specified, burst will be ignored", help = "Perform a single request and dump the request and response" )] debug: bool, + #[arg( + help = "Output file to write the results to. If not specified, results are written to stdout.", + long, + short + )] + output: Option, } /// An entry specified by `connect-to` to override DNS resolution and default @@ -505,10 +511,24 @@ async fn run() -> anyhow::Result<()> { _ => None, }; - let print_mode = if opts.json { - PrintMode::Json - } else { - PrintMode::Text + let print_config = { + let mode = if opts.json { + PrintMode::Json + } else { + PrintMode::Text + }; + let output: Box = if let Some(output) = opts.output { + Box::new(File::create(output)?) + } else { + Box::new(std::io::stdout()) + }; + + PrintConfig { + mode, + output, + disable_color: opts.disable_color, + stats_success_breakdown: opts.stats_success_breakdown, + } }; let ip_strategy = match (opts.ipv4, opts.ipv6) { @@ -575,190 +595,204 @@ async fn run() -> anyhow::Result<()> { let start = std::time::Instant::now(); - let res = match work_mode { - WorkMode::FixedNumber { - n_requests, - n_connections, - n_http2_parallel, - query_limit: None, - latency_correction: _, - } if no_tui => { - // Use optimized worker of no_tui mode. - let (result_tx, result_rx) = flume::unbounded(); - - client::fast::work( - client.clone(), - result_tx, + let data_collect_future: Pin>> = + match work_mode { + WorkMode::FixedNumber { n_requests, n_connections, n_http2_parallel, - ) - .await; + query_limit: None, + latency_correction: _, + } if no_tui => { + // Use optimized worker of no_tui mode. + let (result_tx, result_rx) = flume::unbounded(); + + client::fast::work( + client.clone(), + result_tx, + n_requests, + n_connections, + n_http2_parallel, + ) + .await; - Box::pin(async move { - let mut res = ResultData::default(); - while let Ok(r) = result_rx.recv() { - res.merge(r); - } - res - }) as Pin>> - } - WorkMode::Until { - duration, - n_connections, - n_http2_parallel, - query_limit: None, - latency_correction: _, - wait_ongoing_requests_after_deadline, - } if no_tui => { - // Use optimized worker of no_tui mode. - let (result_tx, result_rx) = flume::unbounded(); - - client::fast::work_until( - client.clone(), - result_tx, - start + duration, + Box::pin(async move { + let mut res = ResultData::default(); + while let Ok(r) = result_rx.recv() { + res.merge(r); + } + (res, print_config) + }) + } + WorkMode::Until { + duration, n_connections, n_http2_parallel, + query_limit: None, + latency_correction: _, wait_ongoing_requests_after_deadline, - ) - .await; - - Box::pin(async move { - let mut res = ResultData::default(); - while let Ok(r) = result_rx.recv() { - res.merge(r); - } - res - }) as Pin>> - } - mode => { - let (result_tx, result_rx) = flume::unbounded(); - let data_collector = if no_tui { - // When `--no-tui` is enabled, just collect all data. - - let result_rx_ctrl_c = result_rx.clone(); - tokio::spawn(async move { - let _ = tokio::signal::ctrl_c().await; - let mut all: ResultData = Default::default(); - for report in result_rx_ctrl_c.drain() { - all.push(report); - } - let _ = printer::print_result( - &mut std::io::stdout(), - print_mode, - start, - &all, - start.elapsed(), - opts.disable_color, - opts.stats_success_breakdown, - ); - std::process::exit(libc::EXIT_SUCCESS); - }); + } if no_tui => { + // Use optimized worker of no_tui mode. + let (result_tx, result_rx) = flume::unbounded(); + + client::fast::work_until( + client.clone(), + result_tx, + start + duration, + n_connections, + n_http2_parallel, + wait_ongoing_requests_after_deadline, + ) + .await; Box::pin(async move { - let mut all = ResultData::default(); - while let Ok(res) = result_rx.recv() { - all.push(res); - } - all - }) as Pin>> - } else { - // Spawn monitor future which draws realtime tui - let join_handle = tokio::spawn( - monitor::Monitor { - print_mode, - end_line: opts - .duration - .map(|d| monitor::EndLine::Duration(d.into())) - .unwrap_or(monitor::EndLine::NumQuery(opts.n_requests)), - report_receiver: result_rx, - start, - fps: opts.fps, - disable_color: opts.disable_color, - stats_success_breakdown: opts.stats_success_breakdown, + let mut res = ResultData::default(); + while let Ok(r) = result_rx.recv() { + res.merge(r); } - .monitor(), - ); + (res, print_config) + }) + } + mode => { + let (result_tx, result_rx) = flume::unbounded(); + let data_collector = if no_tui { + // When `--no-tui` is enabled, just collect all data. + + let token = tokio_util::sync::CancellationToken::new(); + let result_rx_ctrl_c = result_rx.clone(); + let token_ctrl_c = token.clone(); + let ctrl_c = tokio::spawn(async move { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + let mut all: ResultData = Default::default(); + for report in result_rx_ctrl_c.drain() { + all.push(report); + } + let _ = printer::print_result(print_config, start, &all, start.elapsed()); + std::process::exit(libc::EXIT_SUCCESS); + } + _ = token_ctrl_c.cancelled() => { + print_config + } + + } + }); + + Box::pin(async move { + token.cancel(); + let config = ctrl_c.await.unwrap(); + let mut all = ResultData::default(); + while let Ok(res) = result_rx.recv() { + all.push(res); + } + (all, config) + }) + as Pin>> + } else { + // Spawn monitor future which draws realtime tui + let join_handle = tokio::spawn( + monitor::Monitor { + print_config, + end_line: opts + .duration + .map(|d| monitor::EndLine::Duration(d.into())) + .unwrap_or(monitor::EndLine::NumQuery(opts.n_requests)), + report_receiver: result_rx, + start, + fps: opts.fps, + } + .monitor(), + ); - Box::pin(async { join_handle.await.unwrap().unwrap() }) - as Pin>> - }; + Box::pin(async { join_handle.await.unwrap().unwrap() }) + as Pin>> + }; - match mode { - WorkMode::Debug => { - if let Err(e) = client::work_debug(client).await { - eprintln!("{e}"); + match mode { + WorkMode::Debug => { + if let Err(e) = client::work_debug(client).await { + eprintln!("{e}"); + } + std::process::exit(libc::EXIT_SUCCESS) } - std::process::exit(libc::EXIT_SUCCESS) - } - WorkMode::FixedNumber { - n_requests, - n_connections, - n_http2_parallel, - query_limit, - latency_correction, - } => { - if let Some(query_limit) = query_limit { - if latency_correction { - client::work_with_qps( - client.clone(), - result_tx, - query_limit, - n_requests, - n_connections, - n_http2_parallel, - ) - .await; + WorkMode::FixedNumber { + n_requests, + n_connections, + n_http2_parallel, + query_limit, + latency_correction, + } => { + if let Some(query_limit) = query_limit { + if latency_correction { + client::work_with_qps( + client.clone(), + result_tx, + query_limit, + n_requests, + n_connections, + n_http2_parallel, + ) + .await; + } else { + client::work_with_qps_latency_correction( + client.clone(), + result_tx, + query_limit, + n_requests, + n_connections, + n_http2_parallel, + ) + .await; + } } else { - client::work_with_qps_latency_correction( + client::work( client.clone(), result_tx, - query_limit, n_requests, n_connections, n_http2_parallel, ) .await; } - } else { - client::work( - client.clone(), - result_tx, - n_requests, - n_connections, - n_http2_parallel, - ) - .await; } - } - WorkMode::Until { - duration, - n_connections, - n_http2_parallel, - query_limit, - latency_correction, - wait_ongoing_requests_after_deadline, - } => { - if let Some(query_limit) = query_limit { - if latency_correction { - client::work_until_with_qps_latency_correction( - client.clone(), - result_tx, - query_limit, - start, - start + duration, - n_connections, - n_http2_parallel, - wait_ongoing_requests_after_deadline, - ) - .await; + WorkMode::Until { + duration, + n_connections, + n_http2_parallel, + query_limit, + latency_correction, + wait_ongoing_requests_after_deadline, + } => { + if let Some(query_limit) = query_limit { + if latency_correction { + client::work_until_with_qps_latency_correction( + client.clone(), + result_tx, + query_limit, + start, + start + duration, + n_connections, + n_http2_parallel, + wait_ongoing_requests_after_deadline, + ) + .await; + } else { + client::work_until_with_qps( + client.clone(), + result_tx, + query_limit, + start, + start + duration, + n_connections, + n_http2_parallel, + wait_ongoing_requests_after_deadline, + ) + .await; + } } else { - client::work_until_with_qps( + client::work_until( client.clone(), result_tx, - query_limit, - start, start + duration, n_connections, n_http2_parallel, @@ -766,36 +800,17 @@ async fn run() -> anyhow::Result<()> { ) .await; } - } else { - client::work_until( - client.clone(), - result_tx, - start + duration, - n_connections, - n_http2_parallel, - wait_ongoing_requests_after_deadline, - ) - .await; } } - } - data_collector - } - }; + data_collector + } + }; let duration = start.elapsed(); - let res = res.await; - - printer::print_result( - &mut std::io::stdout(), - print_mode, - start, - &res, - duration, - opts.disable_color, - opts.stats_success_breakdown, - )?; + let (res, print_config) = data_collect_future.await; + + printer::print_result(print_config, start, &res, duration)?; if let Some(db_url) = opts.db_url { eprintln!("Storing results to {db_url}"); diff --git a/src/monitor.rs b/src/monitor.rs index 79501a5f..6cd969d4 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -17,7 +17,7 @@ use std::{collections::BTreeMap, io}; use crate::{ client::{ClientError, RequestResult}, - printer::PrintMode, + printer::PrintConfig, result_data::{MinMaxMean, ResultData}, timescale::{TimeLabel, TimeScale}, }; @@ -53,7 +53,7 @@ impl ColorScheme { } pub struct Monitor { - pub print_mode: PrintMode, + pub print_config: PrintConfig, pub end_line: EndLine, /// All workers sends each result to this channel pub report_receiver: flume::Receiver>, @@ -61,8 +61,6 @@ pub struct Monitor { pub start: std::time::Instant, // Frame per second of TUI pub fps: usize, - pub disable_color: bool, - pub stats_success_breakdown: bool, } struct IntoRawMode; @@ -85,7 +83,7 @@ impl Drop for IntoRawMode { } impl Monitor { - pub async fn monitor(self) -> Result { + pub async fn monitor(self) -> Result<(ResultData, PrintConfig), std::io::Error> { let raw_mode = IntoRawMode::new()?; let mut terminal = { @@ -107,7 +105,7 @@ impl Monitor { let mut timescale_auto = None; let mut colors = ColorScheme::new(); - if !self.disable_color { + if !self.print_config.disable_color { colors.set_colors(); } @@ -434,13 +432,10 @@ impl Monitor { }) => { drop(raw_mode); let _ = crate::printer::print_result( - &mut std::io::stdout(), - self.print_mode, + self.print_config, self.start, &all, now - self.start, - self.disable_color, - self.stats_success_breakdown, ); std::process::exit(libc::EXIT_SUCCESS); } @@ -454,6 +449,6 @@ impl Monitor { tokio::time::sleep(per_frame - elapsed).await; } } - Ok(all) + Ok((all, self.print_config)) } } diff --git a/src/printer.rs b/src/printer.rs index a5bc9c45..620eb4f4 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -91,30 +91,40 @@ impl StyleScheme { } } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] pub enum PrintMode { Text, Json, } -pub fn print_result( - w: &mut W, - mode: PrintMode, +pub struct PrintConfig { + pub output: Box, + pub mode: PrintMode, + pub disable_color: bool, + pub stats_success_breakdown: bool, +} + +pub fn print_result( + mut config: PrintConfig, start: Instant, res: &ResultData, total_duration: Duration, - disable_color: bool, - stats_success_breakdown: bool, ) -> anyhow::Result<()> { - match mode { + match config.mode { PrintMode::Text => print_summary( - w, + &mut config.output, + res, + total_duration, + config.disable_color, + config.stats_success_breakdown, + )?, + PrintMode::Json => print_json( + &mut config.output, + start, res, total_duration, - disable_color, - stats_success_breakdown, + config.stats_success_breakdown, )?, - PrintMode::Json => print_json(w, start, res, total_duration, stats_success_breakdown)?, } Ok(()) }