Skip to content

Commit

Permalink
Document features and implement drop release
Browse files Browse the repository at this point in the history
  • Loading branch information
udoprog committed May 12, 2023
1 parent adc38b0 commit 4f69ab4
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ readme = "README.md"
homepage = "https://github.com/udoprog/leaky-bucket"
repository = "https://github.com/udoprog/leaky-bucket"
license = "MIT/Apache-2.0"
keywords = ["ratelimit", "throttle", "tokenbucket", "async", "futures"]
keywords = ["async", "futures", "ratelimit", "throttle", "tokenbucket"]
categories = ["algorithms", "concurrency", "network-programming"]

[dependencies]
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ of tokens has been drained from the bucket.
Since this crate uses timing facilities from tokio it has to be used within
a Tokio runtime with the [`time` feature] enabled.

This library has some neat features, which includes:
* Not requiring a background thread or task to *drip*. Instead, one of the
waiting tasks assumes the duty as coordinator called the *core*. See below
for more details.
* Dropped tasks releases their partially acquired tokens.

<br>

## Usage
Expand Down
47 changes: 41 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@
//! Since this crate uses timing facilities from tokio it has to be used within
//! a Tokio runtime with the [`time` feature] enabled.
//!
//! This library has some neat features, which includes:
//!
//! **Not requiring a background task**. This is usually needed by token bucket
//! rate limiters to drive progress. Instead, one of the waiting tasks
//! temporarily assumes the role as coordinator (called the *core*). This
//! reduces the amount of tasks needing to sleep, which can be a source of
//! jitter for imprecise sleeping implementations and tight limiters. See below
//! for more details.
//!
//! **Dropped tasks** release any resources they've reserved. So that
//! constructing and cancellaing asynchronous tasks to not end up taking up wait
//! slots it never uses which would be the case for cell-based rate limiters.
//!
//! <br>
//!
//! ## Usage
Expand Down Expand Up @@ -195,6 +208,9 @@

extern crate alloc;

#[macro_use]
extern crate std;

use core::cell::UnsafeCell;
use core::convert::TryFrom as _;
use core::fmt;
Expand Down Expand Up @@ -896,6 +912,27 @@ impl AcquireState {
}
}

/// Release any remaining tokens which are associated with this particular task.
unsafe fn release_remaining(
&mut self,
critical: &mut MutexGuard<'_, Critical>,
permits: usize,
lim: &RateLimiter,
) {
if mem::take(&mut self.linked) {
critical.waiters.remove(self.task_mut().into());
}

// Hand back permits which we've acquired so far.
let release = permits.saturating_sub(self.linking.get_mut().task.remaining);

// Temporarily assume the role of core and release the remaining
// tokens to waiting tasks.
if release > 0 {
self.drain_wait_queue(critical, release, lim);
}
}

/// Refill the wait queue with the given number of tokens.
#[tracing::instrument(skip(self, critical, lim), level = "trace")]
fn drain_wait_queue(
Expand Down Expand Up @@ -1356,15 +1393,13 @@ where
// ensure it's only accessed under a lock, but once it's been
// unlinked we can do what we want with it.
let mut critical = lim.critical.lock();
critical.waiters.remove(self.internal.task_mut().into());
self.internal
.release_remaining(&mut critical, self.permits, lim);
},
State::Core { .. } => unsafe {
let mut critical = lim.critical.lock();

if mem::take(&mut self.internal.linked) {
critical.waiters.remove(self.internal.task_mut().into());
}

self.internal
.release_remaining(&mut critical, self.permits, lim);
critical.release();
},
_ => (),
Expand Down
57 changes: 57 additions & 0 deletions tests/test_drop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use leaky_bucket::RateLimiter;
use tokio::task::JoinSet;
use tokio::time::sleep;

#[tokio::test]
async fn test_drop() -> anyhow::Result<()> {
let limiter = Arc::new(
RateLimiter::builder()
.initial(0)
.refill(10)
.interval(Duration::from_millis(50))
.max(100)
.build(),
);

let limiter = limiter.clone();

let mut task = pin!(Some(limiter.acquire(10000)));

let mut join_set = JoinSet::new();

for _ in 0..10 {
let limiter = limiter.clone();

join_set.spawn(async move {
limiter.acquire(10).await;
});
}

tokio::select! {
_ = sleep(Duration::from_millis(1000)) => {
// Drop the task
task.set(None);
}
_ = Option::as_pin_mut(task.as_mut()).unwrap() => {
}
// Should never complete, because we have a giant task waiting.
_ = join_set.join_next() => {
}
}

let mut released = 0;

let start = Instant::now();

while join_set.join_next().await.is_some() {
released += 1;
}

assert!(Instant::now().duration_since(start).as_millis() <= 10);
assert_eq!(released, 10);
Ok(())
}

0 comments on commit 4f69ab4

Please sign in to comment.