Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor http2 workers #508

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 102 additions & 108 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ pub async fn work(
}
Err(err) => {
if counter.fetch_add(1, Ordering::Relaxed) < n_tasks {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
} else {
return;
}
Expand Down Expand Up @@ -1036,7 +1036,7 @@ pub async fn work_with_qps(
Err(err) => {
// Consume a task
if let Ok(()) = rx.recv_async().await {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
} else {
return;
}
Expand Down Expand Up @@ -1186,7 +1186,7 @@ pub async fn work_with_qps_latency_correction(
Err(err) => {
// Consume a task
if rx.recv_async().await.is_ok() {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
} else {
return;
}
Expand Down Expand Up @@ -1263,29 +1263,19 @@ pub async fn work_until(
let s = s.clone();
tokio::spawn(async move {
// This is where HTTP2 loops to make all the requests for a given client and worker
tokio::select! {
is_cancel = async {
loop {
let (is_cancel, is_reconnect) =
work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
)
.await;

if is_cancel || is_reconnect {
break is_cancel;
}
}
} => {
is_cancel
}
_ = s.acquire() => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
true
loop {
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
)
.await;

let is_cancel = is_cancel || s.is_closed();
if is_cancel || is_reconnect {
break is_cancel;
}
}
})
Expand All @@ -1294,16 +1284,24 @@ pub async fn work_until(

let mut connection_gone = false;
for f in futures {
match f.await {
Ok(true) => {
// All works done
connection_gone = true;
tokio::select! {
r = f => {
match r {
Ok(true) => {
// All works done
connection_gone = true;
}
Err(_) => {
// Unexpected
connection_gone = true;
}
_ => {}
}
}
Err(_) => {
// Unexpected
_ = s.acquire() => {
report_tx.send(Err(ClientError::Deadline)).unwrap();
connection_gone = true;
}
_ => {}
}
}
if connection_gone {
Expand All @@ -1312,7 +1310,7 @@ pub async fn work_until(
}

Err(err) => {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
if s.is_closed() {
break;
}
Expand Down Expand Up @@ -1358,10 +1356,7 @@ pub async fn work_until(
f.abort();
if let Err(e) = f.await {
if e.is_cancelled() {
report_tx
.send_async(Err(ClientError::Deadline))
.await
.unwrap();
report_tx.send(Err(ClientError::Deadline)).unwrap();
}
}
}
Expand Down Expand Up @@ -1439,46 +1434,45 @@ pub async fn work_until_with_qps(
let mut client_state = client_state.clone();
let s = s.clone();
tokio::spawn(async move {
tokio::select! {
is_cancel = async {
while let Ok(()) = rx.recv_async().await {
let (is_cancel, is_reconnect) =
work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
)
.await;
if is_cancel || is_reconnect {
return is_cancel;
}
}
true
} => {
is_cancel
}
_ = s.acquire() => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
true
while let Ok(()) = rx.recv_async().await {
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
None,
)
.await;

let is_cancel = is_cancel || s.is_closed();
if is_cancel || is_reconnect {
return is_cancel;
}
}
true
})
})
.collect::<Vec<_>>();
let mut connection_gone = false;
for f in futures {
match f.await {
Ok(true) => {
// All works done
connection_gone = true;
tokio::select! {
r = f => {
match r {
Ok(true) => {
// All works done
connection_gone = true;
}
Err(_) => {
// Unexpected
connection_gone = true;
}
_ => {}
}
}
Err(_) => {
// Unexpected
_ = s.acquire() => {
report_tx.send(Err(ClientError::Deadline)).unwrap();
connection_gone = true;
}
_ => {}
}
}
if connection_gone {
Expand All @@ -1488,10 +1482,14 @@ pub async fn work_until_with_qps(
Err(err) => {
// Consume a task
if rx.recv_async().await.is_ok() {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
} else {
return;
}

if s.is_closed() {
return;
}
}
}
}
Expand Down Expand Up @@ -1535,10 +1533,7 @@ pub async fn work_until_with_qps(
f.abort();
if let Err(e) = f.await {
if e.is_cancelled() {
report_tx
.send_async(Err(ClientError::Deadline))
.await
.unwrap();
report_tx.send(Err(ClientError::Deadline)).unwrap();
}
}
}
Expand Down Expand Up @@ -1615,46 +1610,44 @@ pub async fn work_until_with_qps_latency_correction(
let mut client_state = client_state.clone();
let s = s.clone();
tokio::spawn(async move {
tokio::select! {
is_cancel = async {
while let Ok(start) = rx.recv_async().await {
let (is_cancel, is_reconnect) =
work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
Some(start),
)
.await;
if is_cancel || is_reconnect {
return is_cancel;
}
}
true
} => {
is_cancel
}
_ = s.acquire() => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
true
while let Ok(start) = rx.recv_async().await {
let (is_cancel, is_reconnect) = work_http2_once(
&client,
&mut client_state,
&report_tx,
connection_time,
Some(start),
)
.await;
let is_cancel = is_cancel || s.is_closed();
if is_cancel || is_reconnect {
return is_cancel;
}
}
true
})
})
.collect::<Vec<_>>();
let mut connection_gone = false;
for f in futures {
match f.await {
Ok(true) => {
// All works done
connection_gone = true;
tokio::select! {
r = f => {
match r {
Ok(true) => {
// All works done
connection_gone = true;
}
Err(_) => {
// Unexpected
connection_gone = true;
}
_ => {}
}
}
Err(_) => {
// Unexpected
_ = s.acquire() => {
report_tx.send(Err(ClientError::Deadline)).unwrap();
connection_gone = true;
}
_ => {}
}
}
if connection_gone {
Expand All @@ -1664,10 +1657,14 @@ pub async fn work_until_with_qps_latency_correction(

Err(err) => {
if rx.recv_async().await.is_ok() {
report_tx.send_async(Err(err)).await.unwrap();
report_tx.send(Err(err)).unwrap();
} else {
return;
}

if s.is_closed() {
return;
}
}
}
}
Expand Down Expand Up @@ -1712,10 +1709,7 @@ pub async fn work_until_with_qps_latency_correction(
f.abort();
if let Err(e) = f.await {
if e.is_cancelled() {
report_tx
.send_async(Err(ClientError::Deadline))
.await
.unwrap();
report_tx.send(Err(ClientError::Deadline)).unwrap();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ async fn test_unix_socket() {
actix_web::App::new().service(actix_web::web::resource("/").to(move || {
let tx = tx.clone();
async move {
tx.send_async(()).await.unwrap();
tx.send(()).unwrap();
"Hello World"
}
}))
Expand Down