Skip to content

Commit

Permalink
Merge pull request #508 from hatoo/refactor-http2
Browse files Browse the repository at this point in the history
refactor http2 workers
  • Loading branch information
hatoo authored May 29, 2024
2 parents 5171ee2 + 3df55ea commit 84980cc
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 109 deletions.
210 changes: 102 additions & 108 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ pub async fn work(
}
Err(err) => {
if counter.fetch_add(1, Ordering::Relaxed) < n_tasks {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
} else {
return;
}
Expand Down Expand Up @@ -1036,7 +1036,7 @@ pub async fn work_with_qps(
Err(err) => {
// Consume a task
if let Ok(()) = rx.recv_async().await {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
} else {
return;
}
Expand Down Expand Up @@ -1186,7 +1186,7 @@ pub async fn work_with_qps_latency_correction(
Err(err) => {
// Consume a task
if rx.recv_async().await.is_ok() {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
} else {
return;
}
Expand Down Expand Up @@ -1263,29 +1263,19 @@ pub async fn work_until(
let s = s.clone();
tokio::spawn(async move {
// This is where HTTP2 loops to make all the requests for a given client and worker
tokio::select! {
is_cancel = async {
loop {
let (is_cancel, is_reconnect) =
work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
)
.await;

if is_cancel || is_reconnect {
break is_cancel;
}
}
} => {
is_cancel
}
_ = s.acquire() => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
true
loop {
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
)
.await;

let is_cancel = is_cancel || s.is_closed();
if is_cancel || is_reconnect {
break is_cancel;
}
}
})
Expand All @@ -1294,16 +1284,24 @@ pub async fn work_until(

let mut connection_gone = false;
for f in futures {
match f.await {
Ok(true) => {
// All works done
connection_gone = true;
tokio::select! {
r = f => {
match r {
Ok(true) => {
// All works done
connection_gone = true;
}
Err(_) => {
// Unexpected
connection_gone = true;
}
_ => {}
}
}
Err(_) => {
// Unexpected
_ = s.acquire() => {
report_tx.send(Err(ClientError::Deadline)).unwrap();
connection_gone = true;
}
_ => {}
}
}
if connection_gone {
Expand All @@ -1312,7 +1310,7 @@ pub async fn work_until(
}

Err(err) => {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
if s.is_closed() {
break;
}
Expand Down Expand Up @@ -1358,10 +1356,7 @@ pub async fn work_until(
f.abort();
if let Err(e) = f.await {
if e.is_cancelled() {
report_tx
.send_async(Err(ClientError::Deadline))
.await
.unwrap();
report_tx.send(Err(ClientError::Deadline)).unwrap();
}
}
}
Expand Down Expand Up @@ -1439,46 +1434,45 @@ pub async fn work_until_with_qps(
let mut client_state = client_state.clone();
let s = s.clone();
tokio::spawn(async move {
tokio::select! {
is_cancel = async {
while let Ok(()) = rx.recv_async().await {
let (is_cancel, is_reconnect) =
work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
)
.await;
if is_cancel || is_reconnect {
return is_cancel;
}
}
true
} => {
is_cancel
}
_ = s.acquire() => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
true
while let Ok(()) = rx.recv_async().await {
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
)
.await;

let is_cancel = is_cancel || s.is_closed();
if is_cancel || is_reconnect {
return is_cancel;
}
}
true
})
})
.collect::<Vec<_>>();
let mut connection_gone = false;
for f in futures {
match f.await {
Ok(true) => {
// All works done
connection_gone = true;
tokio::select! {
r = f => {
match r {
Ok(true) => {
// All works done
connection_gone = true;
}
Err(_) => {
// Unexpected
connection_gone = true;
}
_ => {}
}
}
Err(_) => {
// Unexpected
_ = s.acquire() => {
report_tx.send(Err(ClientError::Deadline)).unwrap();
connection_gone = true;
}
_ => {}
}
}
if connection_gone {
Expand All @@ -1488,10 +1482,14 @@ pub async fn work_until_with_qps(
Err(err) => {
// Consume a task
if rx.recv_async().await.is_ok() {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
} else {
return;
}

if s.is_closed() {
return;
}
}
}
}
Expand Down Expand Up @@ -1535,10 +1533,7 @@ pub async fn work_until_with_qps(
f.abort();
if let Err(e) = f.await {
if e.is_cancelled() {
report_tx
.send_async(Err(ClientError::Deadline))
.await
.unwrap();
report_tx.send(Err(ClientError::Deadline)).unwrap();
}
}
}
Expand Down Expand Up @@ -1615,46 +1610,44 @@ pub async fn work_until_with_qps_latency_correction(
let mut client_state = client_state.clone();
let s = s.clone();
tokio::spawn(async move {
tokio::select! {
is_cancel = async {
while let Ok(start) = rx.recv_async().await {
let (is_cancel, is_reconnect) =
work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
Some(start),
)
.await;
if is_cancel || is_reconnect {
return is_cancel;
}
}
true
} => {
is_cancel
}
_ = s.acquire() => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
true
while let Ok(start) = rx.recv_async().await {
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
Some(start),
)
.await;
let is_cancel = is_cancel || s.is_closed();
if is_cancel || is_reconnect {
return is_cancel;
}
}
true
})
})
.collect::<Vec<_>>();
let mut connection_gone = false;
for f in futures {
match f.await {
Ok(true) => {
// All works done
connection_gone = true;
tokio::select! {
r = f => {
match r {
Ok(true) => {
// All works done
connection_gone = true;
}
Err(_) => {
// Unexpected
connection_gone = true;
}
_ => {}
}
}
Err(_) => {
// Unexpected
_ = s.acquire() => {
report_tx.send(Err(ClientError::Deadline)).unwrap();
connection_gone = true;
}
_ => {}
}
}
if connection_gone {
Expand All @@ -1664,10 +1657,14 @@ pub async fn work_until_with_qps_latency_correction(

Err(err) => {
if rx.recv_async().await.is_ok() {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
} else {
return;
}

if s.is_closed() {
return;
}
}
}
}
Expand Down Expand Up @@ -1712,10 +1709,7 @@ pub async fn work_until_with_qps_latency_correction(
f.abort();
if let Err(e) = f.await {
if e.is_cancelled() {
report_tx
.send_async(Err(ClientError::Deadline))
.await
.unwrap();
report_tx.send(Err(ClientError::Deadline)).unwrap();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ async fn test_unix_socket() {
actix_web::App::new().service(actix_web::web::resource("/").to(move || {
let tx = tx.clone();
async move {
tx.send_async(()).await.unwrap();
tx.send(()).unwrap();
"Hello World"
}
}))
Expand Down

0 comments on commit 84980cc

Please sign in to comment.