diff --git a/Cargo.lock b/Cargo.lock index b1477d18..8733c67c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2203,6 +2203,7 @@ dependencies = [ "pin-project-lite", "predicates", "rand", + "rand_core", "rand_regex", "ratatui", "regex", diff --git a/Cargo.toml b/Cargo.toml index 29e8cd70..5c6f8361 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,7 @@ rustls-pki-types = { version = "1.7.0", optional = true } base64 = "0.22.1" rand = "0.8" +rand_core = "0.6.4" hickory-resolver = "0.24.1" rand_regex = "0.17.0" regex-syntax = "0.8.4" diff --git a/src/client.rs b/src/client.rs index 5a9331e1..0f7e3460 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,6 +3,7 @@ use hyper::http; use hyper_util::rt::{TokioExecutor, TokioIo}; use rand::prelude::*; use std::{ + borrow::Cow, sync::{ atomic::{AtomicBool, Ordering::Relaxed}, Arc, @@ -14,6 +15,7 @@ use tokio::net::TcpStream; use url::{ParseError, Url}; use crate::{ + pcg64si::Pcg64Si, url_generator::{UrlGenerator, UrlGeneratorError}, ConnectToEntry, }; @@ -30,6 +32,7 @@ pub struct ConnectionTime { #[derive(Debug, Clone)] /// a result for a request pub struct RequestResult { + pub rng: Pcg64Si, // When the query should started pub start_latency_correction: Option, /// When the query started @@ -177,28 +180,28 @@ pub struct Client { } struct ClientStateHttp1 { - rng: StdRng, + rng: Pcg64Si, send_request: Option, } impl Default for ClientStateHttp1 { fn default() -> Self { Self { - rng: StdRng::from_entropy(), + rng: SeedableRng::from_entropy(), send_request: None, } } } struct ClientStateHttp2 { - rng: StdRng, + rng: Pcg64Si, send_request: SendRequestHttp2, } impl Clone for ClientStateHttp2 { fn clone(&self) -> Self { Self { - rng: StdRng::from_entropy(), + rng: SeedableRng::from_entropy(), send_request: self.send_request.clone(), } } @@ -315,6 +318,11 @@ impl Client { Ok(()) } + pub fn generate_url(&self, rng: &mut Pcg64Si) -> Result<(Cow, Pcg64Si), ClientError> { + let snapshot = *rng; + Ok((self.url_generator.generate(rng)?, snapshot)) + } + async fn client( &self, url: &Url, @@ -467,7 +475,7 @@ impl Client { client_state: &mut ClientStateHttp1, ) -> Result { let do_req = async { - let url = self.url_generator.generate(&mut client_state.rng)?; + let (url, rng) = self.generate_url(&mut client_state.rng)?; let mut start = std::time::Instant::now(); let mut connection_time: Option = None; @@ -523,6 +531,7 @@ impl Client { let end = std::time::Instant::now(); let result = RequestResult { + rng, start_latency_correction: None, start, end, @@ -573,7 +582,7 @@ impl Client { client_state: &mut ClientStateHttp2, ) -> Result { let do_req = async { - let url = self.url_generator.generate(&mut client_state.rng)?; + let (url, rng) = self.generate_url(&mut client_state.rng)?; let start = std::time::Instant::now(); let connection_time: Option = None; @@ -591,6 +600,7 @@ impl Client { let end = std::time::Instant::now(); let result = RequestResult { + rng, start_latency_correction: None, start, end, @@ -760,7 +770,7 @@ fn is_hyper_error(res: &Result) -> bool { } async fn setup_http2(client: &Client) -> Result<(ConnectionTime, ClientStateHttp2), ClientError> { - let mut rng = StdRng::from_entropy(); + let mut rng = SeedableRng::from_entropy(); let url = client.url_generator.generate(&mut rng)?; let (connection_time, send_request) = client.connect_http2(&url, &mut rng).await?; @@ -804,7 +814,7 @@ fn set_start_latency_correction( /// Run n tasks by m workers pub async fn work_debug( - client: Client, + client: Arc, _report_tx: flume::Sender>, ) -> Result<(), ClientError> { let mut rng = StdRng::from_entropy(); @@ -836,7 +846,7 @@ pub async fn work_debug( /// Run n tasks by m workers pub async fn work( - client: Client, + client: Arc, report_tx: flume::Sender>, n_tasks: usize, n_connections: usize, @@ -845,8 +855,6 @@ pub async fn work( use std::sync::atomic::{AtomicUsize, Ordering}; let counter = Arc::new(AtomicUsize::new(0)); - let client = Arc::new(client); - if client.is_http2() { let futures = (0..n_connections) .map(|_| { @@ -947,7 +955,7 @@ pub async fn work( /// n tasks by m workers limit to qps works in a second pub async fn work_with_qps( - client: Client, + client: Arc, report_tx: flume::Sender>, query_limit: QueryLimit, n_tasks: usize, @@ -992,8 +1000,6 @@ pub async fn work_with_qps( Ok::<(), flume::SendError<_>>(()) }; - let client = Arc::new(client); - if client.is_http2() { let futures = (0..n_connections) .map(|_| { @@ -1094,7 +1100,7 @@ pub async fn work_with_qps( /// n tasks by m workers limit to qps works in a second with latency correction pub async fn work_with_qps_latency_correction( - client: Client, + client: Arc, report_tx: flume::Sender>, query_limit: QueryLimit, n_tasks: usize, @@ -1142,8 +1148,6 @@ pub async fn work_with_qps_latency_correction( Ok::<(), flume::SendError<_>>(()) }; - let client = Arc::new(client); - if client.is_http2() { let futures = (0..n_connections) .map(|_| { @@ -1245,14 +1249,13 @@ pub async fn work_with_qps_latency_correction( /// Run until dead_line by n workers pub async fn work_until( - client: Client, + client: Arc, report_tx: flume::Sender>, dead_line: std::time::Instant, n_connections: usize, n_http2_parallel: usize, wait_ongoing_requests_after_deadline: bool, ) { - let client = Arc::new(client); if client.is_http2() { // Using semaphore to control the deadline // Maybe there is a better concurrent primitive to do this @@ -1387,7 +1390,7 @@ pub async fn work_until( /// Run until dead_line by n workers limit to qps works in a second #[allow(clippy::too_many_arguments)] pub async fn work_until_with_qps( - client: Client, + client: Arc, report_tx: flume::Sender>, query_limit: QueryLimit, start: std::time::Instant, @@ -1434,8 +1437,6 @@ pub async fn work_until_with_qps( } }; - let client = Arc::new(client); - if client.is_http2() { let s = Arc::new(tokio::sync::Semaphore::new(0)); @@ -1572,7 +1573,7 @@ pub async fn work_until_with_qps( /// Run until dead_line by n workers limit to qps works in a second with latency correction #[allow(clippy::too_many_arguments)] pub async fn work_until_with_qps_latency_correction( - client: Client, + client: Arc, report_tx: flume::Sender>, query_limit: QueryLimit, start: std::time::Instant, @@ -1618,8 +1619,6 @@ pub async fn work_until_with_qps_latency_correction( } }; - let client = Arc::new(client); - if client.is_http2() { let s = Arc::new(tokio::sync::Semaphore::new(0)); diff --git a/src/db.rs b/src/db.rs index 94cfca50..ed212a92 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,10 +1,11 @@ use rusqlite::Connection; -use crate::client::RequestResult; +use crate::client::{Client, RequestResult}; fn create_db(conn: &Connection) -> Result { conn.execute( "CREATE TABLE oha ( + url TEXT NOT NULL, start REAL NOT NULL, start_latency_correction REAL, end REAL NOT NULL, @@ -17,6 +18,7 @@ fn create_db(conn: &Connection) -> Result { } pub fn store( + client: &Client, db_url: &str, start: std::time::Instant, request_records: &[RequestResult], @@ -28,9 +30,11 @@ pub fn store( let mut affected_rows = 0; for request in request_records { + let url = client.generate_url(&mut request.rng.clone()).unwrap().0; affected_rows += t.execute( - "INSERT INTO oha (start, start_latency_correction, end, duration, status, len_bytes) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + "INSERT INTO oha (url, start, start_latency_correction, end, duration, status, len_bytes) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", ( + url.to_string(), (request.start - start).as_secs_f64(), request.start_latency_correction.map(|d| (d - start).as_secs_f64()), (request.end - start).as_secs_f64(), @@ -48,12 +52,18 @@ pub fn store( #[cfg(test)] mod test_db { + use hyper::{HeaderMap, Method, Version}; + use rand::SeedableRng; + + use crate::{client::Dns, url_generator::UrlGenerator}; + use super::*; #[test] fn test_store() { let start = std::time::Instant::now(); let test_val = RequestResult { + rng: SeedableRng::seed_from_u64(0), status: hyper::StatusCode::OK, len_bytes: 100, start_latency_correction: None, @@ -62,7 +72,37 @@ mod test_db { end: std::time::Instant::now(), }; let test_vec = vec![test_val.clone(), test_val.clone()]; - let result = store(":memory:", start, &test_vec); + let client = Client { + http_version: Version::HTTP_11, + url_generator: UrlGenerator::new_static("http://example.com".parse().unwrap()), + method: Method::GET, + headers: HeaderMap::new(), + body: None, + dns: Dns { + resolver: hickory_resolver::AsyncResolver::tokio_from_system_conf().unwrap(), + connect_to: Vec::new(), + }, + timeout: None, + redirect_limit: 0, + disable_keepalive: false, + insecure: false, + #[cfg(unix)] + unix_socket: None, + #[cfg(feature = "vsock")] + vsock_addr: None, + #[cfg(feature = "rustls")] + // Cache rustls_native_certs::load_native_certs() because it's expensive. + root_cert_store: { + let mut root_cert_store = rustls::RootCertStore::empty(); + for cert in + rustls_native_certs::load_native_certs().expect("could not load platform certs") + { + root_cert_store.add(cert).unwrap(); + } + std::sync::Arc::new(root_cert_store) + }, + }; + let result = store(&client, ":memory:", start, &test_vec); assert_eq!(result.unwrap(), 2); } } diff --git a/src/main.rs b/src/main.rs index 570e0674..8ae00ab7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use rand::prelude::*; use rand_regex::Regex; use ratatui::crossterm; use result_data::ResultData; -use std::{env, io::Read, str::FromStr}; +use std::{env, io::Read, str::FromStr, sync::Arc}; use url::Url; use url_generator::UrlGenerator; @@ -20,6 +20,7 @@ mod client; mod db; mod histogram; mod monitor; +mod pcg64si; mod printer; mod result_data; mod timescale; @@ -444,7 +445,7 @@ async fn main() -> anyhow::Result<()> { resolver_opts.ip_strategy = ip_strategy; let resolver = hickory_resolver::AsyncResolver::tokio(config, resolver_opts); - let client = client::Client { + let client = Arc::new(client::Client { http_version, url_generator, method: opts.method, @@ -473,7 +474,7 @@ async fn main() -> anyhow::Result<()> { } std::sync::Arc::new(root_cert_store) }, - }; + }); if !opts.no_pre_lookup { client.pre_lookup().await?; @@ -540,7 +541,7 @@ async fn main() -> anyhow::Result<()> { Some(0) | None => match opts.burst_duration { None => { client::work_until( - client, + client.clone(), result_tx, start + duration.into(), opts.n_connections, @@ -552,7 +553,7 @@ async fn main() -> anyhow::Result<()> { Some(burst_duration) => { if opts.latency_correction { client::work_until_with_qps_latency_correction( - client, + client.clone(), result_tx, client::QueryLimit::Burst( burst_duration.into(), @@ -567,7 +568,7 @@ async fn main() -> anyhow::Result<()> { .await } else { client::work_until_with_qps( - client, + client.clone(), result_tx, client::QueryLimit::Burst( burst_duration.into(), @@ -586,7 +587,7 @@ async fn main() -> anyhow::Result<()> { Some(qps) => { if opts.latency_correction { client::work_until_with_qps_latency_correction( - client, + client.clone(), result_tx, client::QueryLimit::Qps(qps), start, @@ -598,7 +599,7 @@ async fn main() -> anyhow::Result<()> { .await } else { client::work_until_with_qps( - client, + client.clone(), result_tx, client::QueryLimit::Qps(qps), start, @@ -616,7 +617,7 @@ async fn main() -> anyhow::Result<()> { Some(0) | None => match opts.burst_duration { None => { client::work( - client, + client.clone(), result_tx, opts.n_requests, opts.n_connections, @@ -627,7 +628,7 @@ async fn main() -> anyhow::Result<()> { Some(burst_duration) => { if opts.latency_correction { client::work_with_qps_latency_correction( - client, + client.clone(), result_tx, client::QueryLimit::Burst( burst_duration.into(), @@ -640,7 +641,7 @@ async fn main() -> anyhow::Result<()> { .await } else { client::work_with_qps( - client, + client.clone(), result_tx, client::QueryLimit::Burst( burst_duration.into(), @@ -657,7 +658,7 @@ async fn main() -> anyhow::Result<()> { Some(qps) => { if opts.latency_correction { client::work_with_qps_latency_correction( - client, + client.clone(), result_tx, client::QueryLimit::Qps(qps), opts.n_requests, @@ -667,7 +668,7 @@ async fn main() -> anyhow::Result<()> { .await } else { client::work_with_qps( - client, + client.clone(), result_tx, client::QueryLimit::Qps(qps), opts.n_requests, @@ -696,7 +697,7 @@ async fn main() -> anyhow::Result<()> { if let Some(db_url) = opts.db_url { eprintln!("Storing results to {db_url}"); - let _ = db::store(&db_url, start, res.success()); + db::store(&client, &db_url, start, res.success())?; } Ok(()) diff --git a/src/pcg64si.rs b/src/pcg64si.rs new file mode 100644 index 00000000..558d0dad --- /dev/null +++ b/src/pcg64si.rs @@ -0,0 +1,45 @@ +// https://github.com/imneme/pcg-c +use rand::{Error, RngCore, SeedableRng}; +use rand_core::impls; + +#[derive(Debug, Copy, Clone)] +pub struct Pcg64Si { + state: u64, +} + +impl RngCore for Pcg64Si { + fn next_u32(&mut self) -> u32 { + self.next_u64() as u32 + } + + fn next_u64(&mut self) -> u64 { + let old_state = self.state; + self.state = self + .state + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + + let word = + ((old_state >> ((old_state >> 59) + 5)) ^ old_state).wrapping_mul(12605985483714917081); + (word >> 43) ^ word + } + + fn fill_bytes(&mut self, dest: &mut [u8]) { + impls::fill_bytes_via_next(self, dest) + } + + fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), Error> { + self.fill_bytes(dest); + Ok(()) + } +} + +impl SeedableRng for Pcg64Si { + type Seed = [u8; 8]; + + fn from_seed(seed: Self::Seed) -> Pcg64Si { + Pcg64Si { + state: u64::from_le_bytes(seed), + } + } +} diff --git a/src/result_data.rs b/src/result_data.rs index d205e86a..bd8c8a9d 100644 --- a/src/result_data.rs +++ b/src/result_data.rs @@ -179,6 +179,7 @@ impl ResultData { #[cfg(test)] mod tests { use float_cmp::assert_approx_eq; + use rand::SeedableRng; use super::*; use crate::client::{ClientError, ConnectionTime, RequestResult}; @@ -193,6 +194,7 @@ mod tests { ) -> Result { let now = Instant::now(); Ok(RequestResult { + rng: SeedableRng::seed_from_u64(0), start_latency_correction: None, start: now, connection_time: Some(ConnectionTime { diff --git a/src/url_generator.rs b/src/url_generator.rs index 1b4b613a..3e605fa2 100644 --- a/src/url_generator.rs +++ b/src/url_generator.rs @@ -43,6 +43,8 @@ impl UrlGenerator { #[cfg(test)] mod tests { + use crate::pcg64si::Pcg64Si; + use super::*; use rand_regex::Regex as RandRegex; use regex::Regex; @@ -70,4 +72,20 @@ mod tests { .captures(url.path()) .is_some()); } + + #[test] + fn test_url_generator_dynamic_consistency() { + let url_generator = UrlGenerator::new_dynamic( + RandRegex::compile(r"http://127\.0\.0\.1/[a-z][a-z][0-9]", 4).unwrap(), + ); + + for _ in 0..100 { + let rng: Pcg64Si = SeedableRng::from_entropy(); + + assert_eq!( + url_generator.generate(&mut rng.clone()).unwrap(), + url_generator.generate(&mut rng.clone()).unwrap() + ); + } + } }