Skip to content

Commit

Permalink
fix work
Browse files Browse the repository at this point in the history
  • Loading branch information
hatoo committed Jan 13, 2024
1 parent 44cdfe3 commit 640dac6
Showing 1 changed file with 40 additions and 25 deletions.
65 changes: 40 additions & 25 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,34 +766,49 @@ pub async fn work(
let counter = counter.clone();
let client = client.clone();
tokio::spawn(async move {
match setup_http2(&client).await {
Ok((connection_time, client_state)) => {
let futures = (0..n_http2_parallel)
.map(move |_| {
let report_tx = report_tx.clone();
let counter = counter.clone();
let client = client.clone();
loop {
match setup_http2(&client).await {
Ok((connection_time, client_state)) => {
let futures = (0..n_http2_parallel)
.map(|_| {
let report_tx = report_tx.clone();
let counter = counter.clone();
let client = client.clone();

let mut client_state = client_state.clone();
tokio::spawn(async move {
while counter.fetch_add(1, Ordering::Relaxed) < n_tasks {
let mut res =
client.work_http2(&mut client_state).await;
let is_cancel = is_too_many_open_files(&res);
set_connection_time(&mut res, connection_time);
report_tx.send_async(res).await.unwrap();
if is_cancel {
break;
let mut client_state = client_state.clone();
tokio::spawn(async move {
while counter.fetch_add(1, Ordering::Relaxed) < n_tasks
{
let mut res =
client.work_http2(&mut client_state).await;
let is_cancel = is_too_many_open_files(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
report_tx.send_async(res).await.unwrap();
if is_cancel || is_reconnect {
return is_cancel;
}
}
}

true
})
})
})
.collect::<Vec<_>>();
for f in futures {
let _ = f.await;
.collect::<Vec<_>>();

for f in futures {
if f.await.unwrap_or(true) {
return;
}
}
}
Err(err) => {
report_tx.send_async(Err(err)).await.unwrap();

if !(counter.fetch_add(1, Ordering::Relaxed) < n_tasks) {
break;
}
}
}
Err(err) => report_tx.send_async(Err(err)).await.unwrap(),
}
})
})
Expand Down Expand Up @@ -1087,10 +1102,10 @@ pub async fn work_until(
let mut res =
client.work_http2(&mut client_state).await;
let is_cancel = is_too_many_open_files(&res);
let is_hyper_error = is_hyper_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
report_tx.send_async(res).await.unwrap();
if is_cancel || is_hyper_error {
if is_cancel || is_reconnect {
break is_cancel;
}
}
Expand Down

0 comments on commit 640dac6

Please sign in to comment.