Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hatoo committed Jan 14, 2024
1 parent 47e26f6 commit a480ba2
Showing 1 changed file with 45 additions and 43 deletions.
88 changes: 45 additions & 43 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1436,55 +1436,57 @@ pub async fn work_until_with_qps_latency_correction(
let report_tx = report_tx.clone();
let rx = rx.clone();
tokio::spawn(async move {
match setup_http2(&client).await {
Ok((connection_time, client_state)) => {
let futures = (0..n_http2_parallel)
.map(|_| {
let client = client.clone();
let report_tx = report_tx.clone();
let rx = rx.clone();
let mut client_state = client_state.clone();
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
let mut res =
client.work_http2(&mut client_state).await;
set_start_latency_correction(&mut res, start);
set_connection_time(&mut res, connection_time);
let is_cancel = is_too_many_open_files(&res);
let is_reconnect = is_hyper_error(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel || is_reconnect {
return is_cancel;
loop {
match setup_http2(&client).await {
Ok((connection_time, client_state)) => {
let futures = (0..n_http2_parallel)
.map(|_| {
let client = client.clone();
let report_tx = report_tx.clone();
let rx = rx.clone();
let mut client_state = client_state.clone();
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
let mut res =
client.work_http2(&mut client_state).await;
set_start_latency_correction(&mut res, start);
set_connection_time(&mut res, connection_time);
let is_cancel = is_too_many_open_files(&res);
let is_reconnect = is_hyper_error(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel || is_reconnect {
return is_cancel;
}
}
}
true
true
})
})
})
.collect::<Vec<_>>();
let mut connection_gone = false;
for f in futures {
match f.await {
Ok(true) => {
// All works done
connection_gone = true;
}
Err(_) => {
// Unexpected
connection_gone = true;
.collect::<Vec<_>>();
let mut connection_gone = false;
for f in futures {
match f.await {
Ok(true) => {
// All works done
connection_gone = true;
}
Err(_) => {
// Unexpected
connection_gone = true;
}
_ => {}
}
_ => {}
}
if connection_gone {
return;
}
}
if connection_gone {
return;
}
}

Err(err) => {
if let Ok(_) = rx.recv_async().await {
report_tx.send_async(Err(err)).await.unwrap();
} else {
return;
Err(err) => {
if let Ok(_) = rx.recv_async().await {
report_tx.send_async(Err(err)).await.unwrap();
} else {
return;
}
}
}
}
Expand Down

0 comments on commit a480ba2

Please sign in to comment.