Skip to content

Commit

Permalink
Merge pull request #612 from hatoo/save-url
Browse files Browse the repository at this point in the history
Re:  Write URL of the succeed requests to the sqlite database. #606
  • Loading branch information
hatoo authored Nov 27, 2024
2 parents 17b992c + 6edbb0a commit 468c858
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ rustls-pki-types = { version = "1.7.0", optional = true }

base64 = "0.22.1"
rand = "0.8"
rand_core = "0.6.4"
hickory-resolver = "0.24.1"
rand_regex = "0.17.0"
regex-syntax = "0.8.4"
Expand Down
49 changes: 24 additions & 25 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use hyper::http;
use hyper_util::rt::{TokioExecutor, TokioIo};
use rand::prelude::*;
use std::{
borrow::Cow,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc,
Expand All @@ -14,6 +15,7 @@ use tokio::net::TcpStream;
use url::{ParseError, Url};

use crate::{
pcg64si::Pcg64Si,
url_generator::{UrlGenerator, UrlGeneratorError},
ConnectToEntry,
};
Expand All @@ -30,6 +32,7 @@ pub struct ConnectionTime {
#[derive(Debug, Clone)]
/// a result for a request
pub struct RequestResult {
pub rng: Pcg64Si,
// When the query should started
pub start_latency_correction: Option<std::time::Instant>,
/// When the query started
Expand Down Expand Up @@ -177,28 +180,28 @@ pub struct Client {
}

struct ClientStateHttp1 {
rng: StdRng,
rng: Pcg64Si,
send_request: Option<SendRequestHttp1>,
}

impl Default for ClientStateHttp1 {
fn default() -> Self {
Self {
rng: StdRng::from_entropy(),
rng: SeedableRng::from_entropy(),
send_request: None,
}
}
}

struct ClientStateHttp2 {
rng: StdRng,
rng: Pcg64Si,
send_request: SendRequestHttp2,
}

impl Clone for ClientStateHttp2 {
fn clone(&self) -> Self {
Self {
rng: StdRng::from_entropy(),
rng: SeedableRng::from_entropy(),
send_request: self.send_request.clone(),
}
}
Expand Down Expand Up @@ -315,6 +318,11 @@ impl Client {
Ok(())
}

pub fn generate_url(&self, rng: &mut Pcg64Si) -> Result<(Cow<Url>, Pcg64Si), ClientError> {
let snapshot = *rng;
Ok((self.url_generator.generate(rng)?, snapshot))
}

async fn client<R: Rng>(
&self,
url: &Url,
Expand Down Expand Up @@ -467,7 +475,7 @@ impl Client {
client_state: &mut ClientStateHttp1,
) -> Result<RequestResult, ClientError> {
let do_req = async {
let url = self.url_generator.generate(&mut client_state.rng)?;
let (url, rng) = self.generate_url(&mut client_state.rng)?;
let mut start = std::time::Instant::now();
let mut connection_time: Option<ConnectionTime> = None;

Expand Down Expand Up @@ -523,6 +531,7 @@ impl Client {
let end = std::time::Instant::now();

let result = RequestResult {
rng,
start_latency_correction: None,
start,
end,
Expand Down Expand Up @@ -573,7 +582,7 @@ impl Client {
client_state: &mut ClientStateHttp2,
) -> Result<RequestResult, ClientError> {
let do_req = async {
let url = self.url_generator.generate(&mut client_state.rng)?;
let (url, rng) = self.generate_url(&mut client_state.rng)?;
let start = std::time::Instant::now();
let connection_time: Option<ConnectionTime> = None;

Expand All @@ -591,6 +600,7 @@ impl Client {
let end = std::time::Instant::now();

let result = RequestResult {
rng,
start_latency_correction: None,
start,
end,
Expand Down Expand Up @@ -760,7 +770,7 @@ fn is_hyper_error(res: &Result<RequestResult, ClientError>) -> bool {
}

async fn setup_http2(client: &Client) -> Result<(ConnectionTime, ClientStateHttp2), ClientError> {
let mut rng = StdRng::from_entropy();
let mut rng = SeedableRng::from_entropy();
let url = client.url_generator.generate(&mut rng)?;
let (connection_time, send_request) = client.connect_http2(&url, &mut rng).await?;

Expand Down Expand Up @@ -804,7 +814,7 @@ fn set_start_latency_correction<E>(

/// Run n tasks by m workers
pub async fn work_debug(
client: Client,
client: Arc<Client>,
_report_tx: flume::Sender<Result<RequestResult, ClientError>>,
) -> Result<(), ClientError> {
let mut rng = StdRng::from_entropy();
Expand Down Expand Up @@ -836,7 +846,7 @@ pub async fn work_debug(

/// Run n tasks by m workers
pub async fn work(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
n_tasks: usize,
n_connections: usize,
Expand All @@ -845,8 +855,6 @@ pub async fn work(
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = Arc::new(AtomicUsize::new(0));

let client = Arc::new(client);

if client.is_http2() {
let futures = (0..n_connections)
.map(|_| {
Expand Down Expand Up @@ -947,7 +955,7 @@ pub async fn work(

/// n tasks by m workers limit to qps works in a second
pub async fn work_with_qps(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
n_tasks: usize,
Expand Down Expand Up @@ -992,8 +1000,6 @@ pub async fn work_with_qps(
Ok::<(), flume::SendError<_>>(())
};

let client = Arc::new(client);

if client.is_http2() {
let futures = (0..n_connections)
.map(|_| {
Expand Down Expand Up @@ -1094,7 +1100,7 @@ pub async fn work_with_qps(

/// n tasks by m workers limit to qps works in a second with latency correction
pub async fn work_with_qps_latency_correction(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
n_tasks: usize,
Expand Down Expand Up @@ -1142,8 +1148,6 @@ pub async fn work_with_qps_latency_correction(
Ok::<(), flume::SendError<_>>(())
};

let client = Arc::new(client);

if client.is_http2() {
let futures = (0..n_connections)
.map(|_| {
Expand Down Expand Up @@ -1245,14 +1249,13 @@ pub async fn work_with_qps_latency_correction(

/// Run until dead_line by n workers
pub async fn work_until(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
dead_line: std::time::Instant,
n_connections: usize,
n_http2_parallel: usize,
wait_ongoing_requests_after_deadline: bool,
) {
let client = Arc::new(client);
if client.is_http2() {
// Using semaphore to control the deadline
// Maybe there is a better concurrent primitive to do this
Expand Down Expand Up @@ -1387,7 +1390,7 @@ pub async fn work_until(
/// Run until dead_line by n workers limit to qps works in a second
#[allow(clippy::too_many_arguments)]
pub async fn work_until_with_qps(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
start: std::time::Instant,
Expand Down Expand Up @@ -1434,8 +1437,6 @@ pub async fn work_until_with_qps(
}
};

let client = Arc::new(client);

if client.is_http2() {
let s = Arc::new(tokio::sync::Semaphore::new(0));

Expand Down Expand Up @@ -1572,7 +1573,7 @@ pub async fn work_until_with_qps(
/// Run until dead_line by n workers limit to qps works in a second with latency correction
#[allow(clippy::too_many_arguments)]
pub async fn work_until_with_qps_latency_correction(
client: Client,
client: Arc<Client>,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
query_limit: QueryLimit,
start: std::time::Instant,
Expand Down Expand Up @@ -1618,8 +1619,6 @@ pub async fn work_until_with_qps_latency_correction(
}
};

let client = Arc::new(client);

if client.is_http2() {
let s = Arc::new(tokio::sync::Semaphore::new(0));

Expand Down
46 changes: 43 additions & 3 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use rusqlite::Connection;

use crate::client::RequestResult;
use crate::client::{Client, RequestResult};

fn create_db(conn: &Connection) -> Result<usize, rusqlite::Error> {
conn.execute(
"CREATE TABLE oha (
url TEXT NOT NULL,
start REAL NOT NULL,
start_latency_correction REAL,
end REAL NOT NULL,
Expand All @@ -17,6 +18,7 @@ fn create_db(conn: &Connection) -> Result<usize, rusqlite::Error> {
}

pub fn store(
client: &Client,
db_url: &str,
start: std::time::Instant,
request_records: &[RequestResult],
Expand All @@ -28,9 +30,11 @@ pub fn store(
let mut affected_rows = 0;

for request in request_records {
let url = client.generate_url(&mut request.rng.clone()).unwrap().0;
affected_rows += t.execute(
"INSERT INTO oha (start, start_latency_correction, end, duration, status, len_bytes) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
"INSERT INTO oha (url, start, start_latency_correction, end, duration, status, len_bytes) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
(
url.to_string(),
(request.start - start).as_secs_f64(),
request.start_latency_correction.map(|d| (d - start).as_secs_f64()),
(request.end - start).as_secs_f64(),
Expand All @@ -48,12 +52,18 @@ pub fn store(

#[cfg(test)]
mod test_db {
use hyper::{HeaderMap, Method, Version};
use rand::SeedableRng;

use crate::{client::Dns, url_generator::UrlGenerator};

use super::*;

#[test]
fn test_store() {
let start = std::time::Instant::now();
let test_val = RequestResult {
rng: SeedableRng::seed_from_u64(0),
status: hyper::StatusCode::OK,
len_bytes: 100,
start_latency_correction: None,
Expand All @@ -62,7 +72,37 @@ mod test_db {
end: std::time::Instant::now(),
};
let test_vec = vec![test_val.clone(), test_val.clone()];
let result = store(":memory:", start, &test_vec);
let client = Client {
http_version: Version::HTTP_11,
url_generator: UrlGenerator::new_static("http://example.com".parse().unwrap()),
method: Method::GET,
headers: HeaderMap::new(),
body: None,
dns: Dns {
resolver: hickory_resolver::AsyncResolver::tokio_from_system_conf().unwrap(),
connect_to: Vec::new(),
},
timeout: None,
redirect_limit: 0,
disable_keepalive: false,
insecure: false,
#[cfg(unix)]
unix_socket: None,
#[cfg(feature = "vsock")]
vsock_addr: None,
#[cfg(feature = "rustls")]
// Cache rustls_native_certs::load_native_certs() because it's expensive.
root_cert_store: {
let mut root_cert_store = rustls::RootCertStore::empty();
for cert in
rustls_native_certs::load_native_certs().expect("could not load platform certs")
{
root_cert_store.add(cert).unwrap();
}
std::sync::Arc::new(root_cert_store)
},
};
let result = store(&client, ":memory:", start, &test_vec);
assert_eq!(result.unwrap(), 2);
}
}
Loading

0 comments on commit 468c858

Please sign in to comment.