Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sync util #481

Merged
merged 2 commits into from
May 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::net::TcpStream;
use url::{ParseError, Url};

use crate::{
flag::Flag,
signal::Signal,
url_generator::{UrlGenerator, UrlGeneratorError},
ConnectToEntry,
};
Expand Down Expand Up @@ -1345,14 +1345,14 @@ pub async fn work_until(
let _ = f.await;
}
} else {
let flag = Flag::new();
let mut signal = Signal::new();

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
let report_tx = report_tx.clone();
let mut client_state = ClientStateHttp1::default();
let flag = flag.clone();
let signal = signal.recv_signal();
tokio::spawn(async move {
tokio::select! {
// This is where HTTP1 loops to make all the requests for a given client
Expand All @@ -1366,7 +1366,7 @@ pub async fn work_until(
}
}
} => {},
_ = flag => {
_ = signal => {
report_tx
.send_async(Err(ClientError::Deadline))
.await
Expand All @@ -1378,7 +1378,7 @@ pub async fn work_until(
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
flag.signal();
signal.signal();

for f in futures {
let _ = f.await;
Expand Down Expand Up @@ -1526,15 +1526,15 @@ pub async fn work_until_with_qps(
let _ = f.await;
}
} else {
let flag = Flag::new();
let mut signal = Signal::new();

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
let mut client_state = ClientStateHttp1::default();
let report_tx = report_tx.clone();
let rx = rx.clone();
let flag = flag.clone();
let signal = signal.recv_signal();
tokio::spawn(async move {
tokio::select! {
_ = async {
Expand All @@ -1547,7 +1547,7 @@ pub async fn work_until_with_qps(
}
}
} => {},
_ = flag => {
_ = signal => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
}
}
Expand All @@ -1556,7 +1556,7 @@ pub async fn work_until_with_qps(
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
flag.signal();
signal.signal();

for f in futures {
let _ = f.await;
Expand Down Expand Up @@ -1704,15 +1704,15 @@ pub async fn work_until_with_qps_latency_correction(
let _ = f.await;
}
} else {
let flag = Flag::new();
let mut signal = Signal::new();

let futures = (0..n_connections)
.map(|_| {
let client = client.clone();
let mut client_state = ClientStateHttp1::default();
let report_tx = report_tx.clone();
let rx = rx.clone();
let flag = flag.clone();
let signal = signal.recv_signal();
tokio::spawn(async move {
tokio::select! {
_ = async {
Expand All @@ -1726,7 +1726,7 @@ pub async fn work_until_with_qps_latency_correction(
}
}
} => {}
_ = flag => {
_ = signal => {
report_tx.send_async(Err(ClientError::Deadline)).await.unwrap();
}
}
Expand All @@ -1735,7 +1735,7 @@ pub async fn work_until_with_qps_latency_correction(
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
flag.signal();
signal.signal();

for f in futures {
let _ = f.await;
Expand Down
51 changes: 0 additions & 51 deletions src/flag.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use url::Url;
use url_generator::UrlGenerator;

mod client;
mod flag;
mod histogram;
mod monitor;
mod printer;
mod result_data;
mod signal;
mod timescale;
mod url_generator;

Expand Down
62 changes: 62 additions & 0 deletions src/signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use futures::future::Future;
use futures::task::{AtomicWaker, Context, Poll};
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, Weak};

struct SignalFutureInner {
waker: AtomicWaker,
woken: Arc<AtomicBool>,
}

pub struct SignalFuture(Arc<SignalFutureInner>);

pub struct Signal {
wakers: Vec<Weak<SignalFutureInner>>,
woken: Arc<AtomicBool>,
}

impl Signal {
pub fn new() -> Self {
Self {
wakers: Vec::new(),
woken: Arc::new(AtomicBool::new(false)),
}
}

pub fn signal(self) {
self.woken.store(true, Relaxed);

for waker in self.wakers {
if let Some(waker) = waker.upgrade() {
waker.waker.wake();
}
}
}

pub fn recv_signal(&mut self) -> SignalFuture {
let inner = Arc::new(SignalFutureInner {
waker: AtomicWaker::new(),
woken: self.woken.clone(),
});

self.wakers.push(Arc::downgrade(&inner));

SignalFuture(inner)
}
}

impl Future for SignalFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.0.waker.register(cx.waker());

if self.0.woken.load(Relaxed) {
Poll::Ready(())
} else {
Poll::Pending
}
}
}