Skip to content

Commit

Permalink
Merge pull request #476 from hatoo/update-work-until
Browse files Browse the repository at this point in the history
Update work_until_*
  • Loading branch information
hatoo authored May 4, 2024
2 parents 7adec35 + fe5c25f commit 701e797
Showing 1 changed file with 20 additions and 36 deletions.
56 changes: 20 additions & 36 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1257,12 +1257,6 @@ pub async fn work_until(
// Maybe there is a better concurrent primitive to do this
let s = Arc::new(tokio::sync::Semaphore::new(0));

let s1 = s.clone();
tokio::spawn(async move {
tokio::time::sleep_until(dead_line.into()).await;
s1.close();
});

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
Expand Down Expand Up @@ -1325,18 +1319,16 @@ pub async fn work_until(
})
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
s.close();

for f in futures {
let _ = f.await;
}
} else {
let s = Arc::new(tokio::sync::Semaphore::new(0));

let s1 = s.clone();
tokio::spawn(async move {
tokio::time::sleep_until(dead_line.into()).await;
s1.close();
});

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
Expand All @@ -1363,6 +1355,10 @@ pub async fn work_until(
})
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
s.close();

for f in futures {
let _ = f.await;
}
Expand Down Expand Up @@ -1424,12 +1420,6 @@ pub async fn work_until_with_qps(
if client.is_http2() {
let s = Arc::new(tokio::sync::Semaphore::new(0));

let s1 = s.clone();
tokio::spawn(async move {
tokio::time::sleep_until(dead_line.into()).await;
s1.close();
});

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
Expand Down Expand Up @@ -1499,18 +1489,15 @@ pub async fn work_until_with_qps(
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
s.close();

for f in futures {
let _ = f.await;
}
} else {
let s = Arc::new(tokio::sync::Semaphore::new(0));

let s1 = s.clone();
tokio::spawn(async move {
tokio::time::sleep_until(dead_line.into()).await;
s1.close();
});

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
Expand Down Expand Up @@ -1539,6 +1526,9 @@ pub async fn work_until_with_qps(
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
s.close();

for f in futures {
let _ = f.await;
}
Expand Down Expand Up @@ -1599,12 +1589,6 @@ pub async fn work_until_with_qps_latency_correction(
if client.is_http2() {
let s = Arc::new(tokio::sync::Semaphore::new(0));

let s1 = s.clone();
tokio::spawn(async move {
tokio::time::sleep_until(dead_line.into()).await;
s1.close();
});

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
Expand Down Expand Up @@ -1674,18 +1658,15 @@ pub async fn work_until_with_qps_latency_correction(
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
s.close();

for f in futures {
let _ = f.await;
}
} else {
let s = Arc::new(tokio::sync::Semaphore::new(0));

let s1 = s.clone();
tokio::spawn(async move {
tokio::time::sleep_until(dead_line.into()).await;
s1.close();
});

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
Expand Down Expand Up @@ -1714,6 +1695,9 @@ pub async fn work_until_with_qps_latency_correction(
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
s.close();

for f in futures {
let _ = f.await;
}
Expand Down

0 comments on commit 701e797

Please sign in to comment.