diff --git a/src/bench.rs b/src/bench.rs index dbac8a6..232ec07 100644 --- a/src/bench.rs +++ b/src/bench.rs @@ -380,7 +380,7 @@ pub(crate) fn init(text: &str) -> (BenchKVMap, Vec>) { (map, phases) } -fn bench_phase_should_break( +fn bench_repeat_should_break( len: &Length, count: u64, start: Instant, @@ -631,28 +631,20 @@ fn bench_stat_final( /// throughput is achieved. Otherwise, it does nothing. struct RateLimiter { ops: u64, + start: Instant, } impl RateLimiter { - fn new(kops: u64, nr_threads: usize) -> Self { - let ops = match kops { - 0 => 0, - r => { - let per = r * 1000 / u64::try_from(nr_threads).unwrap() + 1; - per - } - }; - Self { ops } + fn new(kops: u64, nr_threads: usize, start: Instant) -> Self { + assert!(kops > 0); + let ops = kops * 1000 / u64::try_from(nr_threads).unwrap(); + Self { ops, start } } /// Returns whether the backoff is done. #[inline(always)] - fn try_backoff(&self, count: u64, start: Instant) -> bool { - if self.ops == 0 { - return true; - } - // self.kops is the target rate in kops, which is op/ms - let elapsed = u64::try_from(start.elapsed().as_nanos()).unwrap(); + fn try_backoff(&self, count: u64) -> bool { + let elapsed = u64::try_from(self.start.elapsed().as_nanos()).unwrap(); let ops = count * 1_000_000_000 / elapsed; if ops <= self.ops { return true; @@ -662,15 +654,9 @@ impl RateLimiter { /// Blocking backoff. #[inline(always)] - fn backoff(&self, count: u64, start: Instant) { - if self.ops == 0 { - return; - } - // self.kops is the target rate in kops, which is op/ms + fn backoff(&self, count: u64) { loop { - let elapsed = u64::try_from(start.elapsed().as_nanos()).unwrap(); - let ops = count * 1_000_000_000 / elapsed; - if ops <= self.ops { + if self.try_backoff(count) { break; } } @@ -701,17 +687,23 @@ fn bench_worker_regular(map: Arc>, context: WorkerContext) { Some(_) => || Some(Instant::now()), None => || None, }; - let rate_limiter = RateLimiter::new(benchmark.rate_limit, thread_info.1); let mut handle = map.handle(); let mut rng = rand::thread_rng(); let mut workload = Workload::new(&benchmark.wopt, Some(thread_info)); - let start = Instant::now(); // for thread 0 + let start = Instant::now(); + for i in 0..benchmark.repeat { let counter = measurements[id].counters[i].reference(); // start the benchmark phase at roughly the same time barrier.wait(); let start = Instant::now(); + + let rate_limiter = match benchmark.rate_limit { + 0 => None, + r => Some(RateLimiter::new(r, thread_info.1, start)), + }; + // start benchmark loop { let op = workload.next(&mut rng); @@ -731,16 +723,17 @@ fn bench_worker_regular(map: Arc>, context: WorkerContext) { } } let op_end = latency_tick(); - if let Some(ref mut l) = latency { + if let Some(l) = &mut latency { l.record(op_end.unwrap() - op_start.unwrap()); } *counter += 1; - // internally it will not do anything if no rate limiter is in place - rate_limiter.backoff(*counter, start); + if let Some(r) = &rate_limiter { + r.backoff(*counter); + } // check if we need to break - if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) { + if bench_repeat_should_break(&benchmark.len, *counter, start, &mut workload) { workload.reset(); break; } @@ -810,7 +803,6 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) { true => Some(measurements[id].latency.lock()), false => None, }; - let rate_limiter = RateLimiter::new(benchmark.rate_limit, thread_info.1); let responder = Rc::new(RefCell::new(Vec::::new())); let mut handle = map.handle(responder.clone()); @@ -820,12 +812,19 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) { let mut pending = 0usize; let mut requests = Vec::::with_capacity(benchmark.batch); let mut rid = 0usize; - let start = Instant::now(); // for thread 0 + let start = Instant::now(); + for i in 0..benchmark.repeat { let counter = measurements[id].counters[i].reference(); // start the benchmark phase at roughly the same time barrier.wait(); let start = Instant::now(); + + let rate_limiter = match benchmark.rate_limit { + 0 => None, + r => Some(RateLimiter::new(r, thread_info.1, start)), + }; + // start benchmark loop { // first clear the requests vector @@ -839,22 +838,31 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) { // otherwise the last check may fail because the time check is after a certain // interval, but the mod is never 0 *counter += 1; - if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) { + // stop the batch generation if the repeat is done + if bench_repeat_should_break(&benchmark.len, *counter, start, &mut workload) { break; } + // if a rate limiter is in place and no further backoff is needed, break early to + // send the batch immediately + if let Some(r) = &rate_limiter { + if r.try_backoff(*counter) { + break; + } + } } + // now we have a batch, send it all, whatever its size is let len = requests.len(); handle.submit(&requests); pending += len; - if let Some(ref mut l) = latency { + if let Some(l) = &mut latency { let submit = Instant::now(); for r in requests.iter() { l.async_register(r.id, submit); } } - if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) { + if bench_repeat_should_break(&benchmark.len, *counter, start, &mut workload) { workload.reset(); break; } @@ -867,15 +875,25 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) { handle.drain(); let responses = responder.replace_with(|_| Vec::new()); pending -= responses.len(); - if let Some(ref mut l) = latency { + if let Some(l) = &mut latency { let submit = Instant::now(); for r in responses.iter() { l.async_record(r.id, submit); } } } - // if the pending queue is under depth (can be 0), and backoff is done - if pending <= benchmark.qd && rate_limiter.try_backoff(*counter, start) { + let backoff_free = match &rate_limiter { + None => true, + Some(r) => { + if r.try_backoff(*counter) { + true + } else { + false + } + } + }; + // if the pending queue is under depth (can be 0) and no further backoff is needed + if pending <= benchmark.qd && backoff_free { break; } } @@ -914,7 +932,7 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) { handle.drain(); let responses = responder.replace_with(|_| Vec::new()); pending -= responses.len(); - if let Some(ref mut l) = latency { + if let Some(l) = &mut latency { let submit = Instant::now(); for r in responses.iter() { l.async_record(r.id, submit);