Skip to content

Commit

Permalink
Merge pull request #488 from hatoo/optimize
Browse files Browse the repository at this point in the history
Optimize work_until_*
  • Loading branch information
hatoo authored May 6, 2024
2 parents edef8a3 + 29ba90e commit 598799f
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 121 deletions.
122 changes: 64 additions & 58 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@ use hyper::{
};
use hyper_util::rt::{TokioExecutor, TokioIo};
use rand::prelude::*;
use std::{pin::Pin, sync::Arc, time::Instant};
use std::{
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc,
},
time::Instant,
};
use thiserror::Error;
use tokio::net::TcpStream;
use url::{ParseError, Url};

use crate::{
signal::Signal,
url_generator::{UrlGenerator, UrlGeneratorError},
ConnectToEntry,
};
Expand Down Expand Up @@ -1317,43 +1323,41 @@ pub async fn work_until(
let _ = f.await;
}
} else {
let mut signal = Signal::new();
let is_end = Arc::new(AtomicBool::new(false));

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
let report_tx = report_tx.clone();
let mut client_state = ClientStateHttp1::default();
let signal = signal.recv_signal();
let is_end = is_end.clone();
tokio::spawn(async move {
tokio::select! {
// This is where HTTP1 loops to make all the requests for a given client
_ = async {
loop {
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel {
break;
}
}
} => {},
_ = signal => {
report_tx
.send_async(Err(ClientError::Deadline))
.await
.unwrap();
loop {
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
// TODO: Fix the case when aborted right here
report_tx.send_async(res).await.unwrap();
if is_cancel || is_end.load(Relaxed) {
break;
}
}
})
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
signal.signal();
is_end.store(true, Relaxed);

for f in futures {
let _ = f.await;
f.abort();
if let Err(e) = f.await {
if e.is_cancelled() {
report_tx
.send_async(Err(ClientError::Deadline))
.await
.unwrap();
}
}
}
};
}
Expand Down Expand Up @@ -1498,40 +1502,41 @@ pub async fn work_until_with_qps(
let _ = f.await;
}
} else {
let mut signal = Signal::new();
let is_end = Arc::new(AtomicBool::new(false));

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
let mut client_state = ClientStateHttp1::default();
let report_tx = report_tx.clone();
let rx = rx.clone();
let signal = signal.recv_signal();
let is_end = is_end.clone();
tokio::spawn(async move {
tokio::select! {
_ = async {
while let Ok(()) = rx.recv_async().await {
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel {
break;
}
}
} => {},
_ = signal => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
while let Ok(()) = rx.recv_async().await {
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel || is_end.load(Relaxed) {
break;
}
}
})
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
signal.signal();
is_end.store(true, Relaxed);

for f in futures {
let _ = f.await;
f.abort();
if let Err(e) = f.await {
if e.is_cancelled() {
report_tx
.send_async(Err(ClientError::Deadline))
.await
.unwrap();
}
}
}
}
}
Expand Down Expand Up @@ -1675,41 +1680,42 @@ pub async fn work_until_with_qps_latency_correction(
let _ = f.await;
}
} else {
let mut signal = Signal::new();
let is_end = Arc::new(AtomicBool::new(false));

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
let mut client_state = ClientStateHttp1::default();
let report_tx = report_tx.clone();
let rx = rx.clone();
let signal = signal.recv_signal();
let is_end = is_end.clone();
tokio::spawn(async move {
tokio::select! {
_ = async {
while let Ok(start) = rx.recv_async().await {
let mut res = client.work_http1(&mut client_state).await;
set_start_latency_correction(&mut res, start);
let is_cancel = is_cancel_error(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel {
break;
}
}
} => {}
_ = signal => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
while let Ok(start) = rx.recv_async().await {
let mut res = client.work_http1(&mut client_state).await;
set_start_latency_correction(&mut res, start);
let is_cancel = is_cancel_error(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel || is_end.load(Relaxed) {
break;
}
}
})
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
signal.signal();
is_end.store(true, Relaxed);

for f in futures {
let _ = f.await;
f.abort();
if let Err(e) = f.await {
if e.is_cancelled() {
report_tx
.send_async(Err(ClientError::Deadline))
.await
.unwrap();
}
}
}
}
}
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod histogram;
mod monitor;
mod printer;
mod result_data;
mod signal;
mod timescale;
mod url_generator;

Expand Down
62 changes: 0 additions & 62 deletions src/signal.rs

This file was deleted.

0 comments on commit 598799f

Please sign in to comment.