From 981f98786c659bf1308e2db3704e474a291f7f5a Mon Sep 17 00:00:00 2001
From: reugn
A rate limiters package for Go.
-Pick one of the rate limiters to throttle requests and control quota. +The `equalizer` package provides a set of simple and easy-to-use rate limiters for Go. +These rate limiters can be used to limit the rate of requests to any resource, such as a database, +API, or file. + +The package includes the following rate limiters: * [Equalizer](#equalizer) * [Slider](#slider) * [TokenBucket](#tokenbucket) ## Equalizer -`Equalizer` is a rate limiter that manages quota based on previous requests' statuses and slows down or accelerates accordingly. +Equalizer is a rate limiter that adjusts the rate of requests based on the outcomes of previous requests. +If previous attempts have failed, Equalizer will slow down the rate of requests to avoid overloading the system. +Conversely, if previous attempts have been successful, Equalizer will accelerate the rate of requests to make +the most of the available capacity. ### Usage ```go @@ -24,14 +30,14 @@ offset := equalizer.NewRandomOffset(96) eq := equalizer.NewEqualizer(96, 16, offset) // non-blocking quota request -haveQuota := eq.Ask() +haveQuota := eq.TryAcquire() -// update with ten previous successful requests -eq.Notify(true, 10) +// update on successful request +eq.Success() ``` ### Benchmarks -```sh +```console BenchmarkEqualizerShortAskStep-16 30607452 37.5 ns/op 0 B/op 0 allocs/op BenchmarkEqualizerShortAskRandom-16 31896340 34.5 ns/op 0 B/op 0 allocs/op BenchmarkEqualizerShortNotify-16 12715494 81.9 ns/op 0 B/op 0 allocs/op @@ -41,49 +47,54 @@ BenchmarkEqualizerLongNotify-16 59935 20343 ns/op ``` ## Slider -`Slider` rate limiter is based on a sliding window with a specified quota capacity. -Implements the `Limiter` interface. +Slider tracks the number of requests that have been processed in a recent time window. +If the number of requests exceeds the limit, the rate limiter will block new requests until the window +has moved forward. +Implements the `equalizer.Limiter` interface. ### Usage ```go // a Slider with one second window size, 100 millis sliding interval // and the capacity of 32 -slider := equalizer.NewSlider(time.Second, time.Millisecond*100, 32) +slider := equalizer.NewSlider(time.Second, 100*time.Millisecond, 32) // non-blocking quota request -haveQuota := slider.Ask() +haveQuota := slider.TryAcquire() // blocking call -slider.Take() +slider.Acquire(context.Background()) ``` ### Benchmarks -```sh +```console BenchmarkSliderShortWindow-16 123488035 9.67 ns/op 0 B/op 0 allocs/op BenchmarkSliderLongerWindow-16 128023276 9.76 ns/op 0 B/op 0 allocs/op ``` ## TokenBucket -`TokenBucket` rate limiter is based on the token bucket algorithm with a refill interval. -Implements the `Limiter` interface. +TokenBucket maintains a fixed number of tokens. Each token represents a request that can be processed. +When a request is made, the rate limiter checks to see if there are any available tokens. If there are, +the request is processed and one token is removed from the bucket. If there are no available tokens, +the request is blocked until a token becomes available. +Implements the `equalizer.Limiter` interface. ### Usage ```go // a TokenBucket with the capacity of 32 and 100 millis refill interval -tokenBucket := equalizer.NewTokenBucket(32, time.Millisecond*100) +tokenBucket := equalizer.NewTokenBucket(32, 100*time.Millisecond) // non-blocking quota request -haveQuota := tokenBucket.Ask() +haveQuota := tokenBucket.TryAcquire() // blocking call -tokenBucket.Take() +tokenBucket.Acquire(context.Background()) ``` ### Benchmarks -```sh +```console BenchmarkTokenBucketDenseRefill-16 212631714 5.64 ns/op 0 B/op 0 allocs/op BenchmarkTokenBucketSparseRefill-16 211491368 5.63 ns/op 0 B/op 0 allocs/op ``` ## License -Licensed under the MIT License. +Licensed under the [MIT](./LICENSE) License. diff --git a/bench_test.go b/bench_test.go index fb97484..770d7b6 100644 --- a/bench_test.go +++ b/bench_test.go @@ -11,7 +11,7 @@ func BenchmarkEqualizerShortAskStep(b *testing.B) { eq := NewEqualizer(96, 16, offset) b.ResetTimer() for i := 0; i < b.N; i++ { - eq.Ask() + eq.TryAcquire() } } @@ -20,7 +20,7 @@ func BenchmarkEqualizerShortAskRandom(b *testing.B) { eq := NewEqualizer(96, 16, offset) b.ResetTimer() for i := 0; i < b.N; i++ { - eq.Ask() + eq.TryAcquire() } } @@ -29,7 +29,7 @@ func BenchmarkEqualizerShortNotify(b *testing.B) { eq := NewEqualizer(96, 16, offset) b.ResetTimer() for i := 0; i < b.N; i++ { - eq.Notify(false, 1) + eq.Failure() } } @@ -38,7 +38,7 @@ func BenchmarkEqualizerLongAskStep(b *testing.B) { eq := NewEqualizer(1048576, 16, offset) b.ResetTimer() for i := 0; i < b.N; i++ { - eq.Ask() + eq.TryAcquire() } } @@ -47,7 +47,7 @@ func BenchmarkEqualizerLongAskRandom(b *testing.B) { eq := NewEqualizer(1048576, 16, offset) b.ResetTimer() for i := 0; i < b.N; i++ { - eq.Ask() + eq.TryAcquire() } } @@ -56,34 +56,38 @@ func BenchmarkEqualizerLongNotify(b *testing.B) { eq := NewEqualizer(1048576, 16, offset) b.ResetTimer() for i := 0; i < b.N; i++ { - eq.Notify(false, 1) + eq.Failure() } } func BenchmarkSliderShortWindow(b *testing.B) { slider := NewSlider(time.Millisecond*100, time.Millisecond*10, 32) + b.ResetTimer() for i := 0; i < b.N; i++ { - slider.Ask() + slider.TryAcquire() } } func BenchmarkSliderLongerWindow(b *testing.B) { slider := NewSlider(time.Second, time.Millisecond*100, 32) + b.ResetTimer() for i := 0; i < b.N; i++ { - slider.Ask() + slider.TryAcquire() } } func BenchmarkTokenBucketDenseRefill(b *testing.B) { tokenBucket := NewTokenBucket(32, time.Millisecond*10) + b.ResetTimer() for i := 0; i < b.N; i++ { - tokenBucket.Ask() + tokenBucket.TryAcquire() } } func BenchmarkTokenBucketSparseRefill(b *testing.B) { tokenBucket := NewTokenBucket(32, time.Second) + b.ResetTimer() for i := 0; i < b.N; i++ { - tokenBucket.Ask() + tokenBucket.TryAcquire() } } diff --git a/equalizer.go b/equalizer.go index ee6a4f3..94e5a4f 100644 --- a/equalizer.go +++ b/equalizer.go @@ -2,116 +2,106 @@ package equalizer import ( "math/big" + "strings" "sync" ) -// An Equalizer represents a bitmap based adaptive rate limiter. -// The quota management algorithm is based on a Round-robin bitmap tape with a moving head. +// An Equalizer represents a bitmap-based adaptive rate limiter. // -// An Equalizer is safe for use by multiple goroutines simultaneously. -// -// Use Ask function to request quota. -// Use Notify to update the bitmap tape with previous requests' statuses. +// The Equalizer uses a round-robin bitmap tape with a moving head to manage +// quotas. +// The quota management algorithm is simple and works in the following way. +// To request a permit in a non-blocking manner use the TryAcquire method. +// The Equalizer will locate the appropriate position on the tape using the +// offset manager and return the value, denoting whether the request is allowed +// or not. To update the tape state, a notification method (Success or Failure) +// should be invoked based on the operation status. +// The Reset and Purge methods allow for immediate transition of the limiter +// state to whether permissive or restrictive. // +// An Equalizer is safe for use by multiple goroutines simultaneously. type Equalizer struct { sync.RWMutex - // tape is the underlying bitmap tape tape *big.Int - - // mask is the positive bits mask + // seed is the initial state of the bitmap tape + seed *big.Int + // mask is the positive bitmask mask *big.Int - // offset is the next index offset manager offset Offset - // size is the bitmap tape size size int - - // reserved is the number of reserved positive bits - reserved int } -// NewEqualizer allocates and returns a new Equalizer rate limiter. -// -// len is the size of the bitmap. -// reserved is the number of reserved positive bits. -// offset is the equalizer.Offset strategy instance. -func NewEqualizer(size int, reserved int, offset Offset) *Equalizer { - // init the bitmap tape - var tape big.Int - fill(&tape, 0, size, 1) +// NewEqualizer instantiates and returns a new Equalizer rate limiter, where +// len is the size of the bitmap, reserved is the number of reserved positive +// bits and offset is an instance of the equalizer.Offset strategy. +func NewEqualizer(size, reserved int, offset Offset) *Equalizer { + // init the seed bitmap tape + var seed big.Int + seed.SetString(strings.Repeat("1", size), 2) - // init the positive bits mask + // init the positive bitmask var mask big.Int - fill(&mask, 0, reserved, 1) + mask.SetString(strings.Repeat("1", reserved), 2) mask.Lsh(&mask, uint(size-reserved)) + // init the bitmap tape + var tape big.Int + tape.Set(&seed) + return &Equalizer{ - tape: &tape, - mask: &mask, - offset: offset, - size: size, - reserved: reserved, + tape: &tape, + seed: &seed, + mask: &mask, + offset: offset, + size: size, } } -// Ask moves the tape head to the next index and returns the value. -func (eq *Equalizer) Ask() bool { +// TryAcquire moves the tape head to the next index and returns the value. +func (eq *Equalizer) TryAcquire() bool { eq.RLock() defer eq.RUnlock() - head := eq.next() + head := eq.offset.NextIndex() return eq.tape.Bit(head) > 0 } -// Notify shifts the tape left by n bits and appends the specified value n times. -// Use n > 1 to update the same value in a bulk to gain performance. -// value is the boolean representation of a bit value (false -> 0, true -> 1). -// n is the number of bits to set. -func (eq *Equalizer) Notify(value bool, n uint) { - eq.Lock() - defer eq.Unlock() +// Success notifies the equalizer with a successful operation. +func (eq *Equalizer) Success() { + eq.notify(1) +} - bit := boolToUint(value) - var shift big.Int - fill(&shift, 0, int(n), bit) - eq.tape.Lsh(eq.tape, n) - fill(eq.tape, eq.size, eq.size+int(n), 0) - eq.tape.Or(eq.tape, &shift).Or(eq.tape, eq.mask) +// Failure notifies the equalizer with a failed operation. +func (eq *Equalizer) Failure() { + eq.notify(0) } -// ResetPositive resets the tape with positive bits. -func (eq *Equalizer) ResetPositive() { +// notify shifts the tape left by 1 bits and prepends the given value. +func (eq *Equalizer) notify(value uint) { eq.Lock() defer eq.Unlock() - fill(eq.tape, 0, eq.size, 1) + eq.tape.Lsh(eq.tape, 1). + SetBit(eq.tape, eq.size, 0). + Or(eq.tape, eq.mask). + SetBit(eq.tape, 0, value) } -// Reset resets the tape to initial state. +// Reset resets the tape to its initial state. func (eq *Equalizer) Reset() { eq.Lock() defer eq.Unlock() - fill(eq.tape, 0, eq.reserved, 1) - fill(eq.tape, eq.reserved, eq.size, 0) -} - -// next returns the next index of the tape head. -func (eq *Equalizer) next() int { - return eq.offset.NextIndex() + eq.tape.Set(eq.seed) } -func fill(n *big.Int, from int, to int, bit uint) { - for i := from; i < to; i++ { - n.SetBit(n, i, bit) - } -} +// Purge blanks out the tape to the positive bitmask state. +func (eq *Equalizer) Purge() { + eq.Lock() + defer eq.Unlock() -func boolToUint(b bool) uint { - if b { - return 1 - } - return 0 + eq.tape.Set(eq.mask) } diff --git a/equalizer_test.go b/equalizer_test.go index 5452c56..cde4f43 100644 --- a/equalizer_test.go +++ b/equalizer_test.go @@ -1,39 +1,48 @@ package equalizer import ( + "strings" "testing" ) func TestEqualizer(t *testing.T) { - offset := NewStepOffset(96, 15) - eq := NewEqualizer(96, 16, offset) + offset := NewStepOffset(32, 5) + eq := NewEqualizer(32, 8, offset) - assertEqual(t, eq.tape.Text(2), "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111") - assertEqual(t, eq.mask.Text(2), "111111111111111100000000000000000000000000000000000000000000000000000000000000000000000000000000") + assertEqual(t, eq.tape.Text(2), strings.Repeat("1", 32)) + assertEqual(t, eq.mask.Text(2), strings.Repeat("1", 8)+strings.Repeat("0", 24)) + assertEqual(t, eq.mask.Bit(0), uint(0)) - eq.Notify(false, 50) - assertEqual(t, eq.tape.Text(2), "111111111111111111111111111111111111111111111100000000000000000000000000000000000000000000000000") - - assertEqual(t, eq.Ask(), false) - assertEqual(t, eq.Ask(), false) - assertEqual(t, eq.Ask(), false) - assertEqual(t, eq.Ask(), true) + for i := 0; i < 16; i++ { + eq.Failure() + } + assertEqual(t, eq.tape.Text(2), strings.Repeat("1", 16)+strings.Repeat("0", 16)) - eq.Notify(true, 10) - assertEqual(t, eq.tape.Text(2), "111111111111111111111111111111111111000000000000000000000000000000000000000000000000001111111111") + assertEqual(t, eq.TryAcquire(), false) + assertEqual(t, eq.TryAcquire(), false) + assertEqual(t, eq.TryAcquire(), false) + assertEqual(t, eq.TryAcquire(), true) - eq.Notify(false, 1) - assertEqual(t, eq.tape.Text(2), "111111111111111111111111111111111110000000000000000000000000000000000000000000000000011111111110") + for i := 0; i < 10; i++ { + eq.Success() + } + assertEqual(t, eq.tape.Text(2), strings.Repeat("1", 8)+strings.Repeat("0", 14)+ + strings.Repeat("1", 10)) - eq.ResetPositive() - assertEqual(t, eq.tape.Text(2), "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111") + eq.Failure() + assertEqual(t, eq.tape.Text(2), strings.Repeat("1", 8)+strings.Repeat("0", 13)+ + strings.Repeat("1", 10)+strings.Repeat("0", 1)) eq.Reset() - assertEqual(t, eq.mask.Text(2), "111111111111111100000000000000000000000000000000000000000000000000000000000000000000000000000000") + assertEqual(t, eq.tape.Text(2), strings.Repeat("1", 32)) + + eq.Purge() + assertEqual(t, eq.tape.Text(2), strings.Repeat("1", 8)+strings.Repeat("0", 24)) } func assertEqual(t *testing.T, a interface{}, b interface{}) { if a != b { + t.Helper() t.Fatalf("%s != %s", a, b) } } diff --git a/internal/async/task.go b/internal/async/task.go new file mode 100644 index 0000000..295b828 --- /dev/null +++ b/internal/async/task.go @@ -0,0 +1,45 @@ +package async + +import "time" + +// Callback represents a background task to run. +type Callback func() + +// Task orchestrates a recurring callback execution within an independent +// goroutine. +type Task struct { + interval time.Duration + stop chan struct{} +} + +// New returns a new Task configured to execute periodically at the +// specified interval. +func NewTask(interval time.Duration) *Task { + return &Task{ + interval: interval, + stop: make(chan struct{}), + } +} + +// Run initiates a goroutine to execute the provided callback at +// the specified interval. +func (t *Task) Run(callback Callback) { + go func() { + ticker := time.NewTicker(t.interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + callback() + case <-t.stop: + return + } + } + }() +} + +// Stop signals the termination of the worker goroutine by closing the +// stop channel. +func (t *Task) Stop() { + close(t.stop) +} diff --git a/internal/sweeper.go b/internal/sweeper.go deleted file mode 100644 index a192afd..0000000 --- a/internal/sweeper.go +++ /dev/null @@ -1,26 +0,0 @@ -package internal - -import "time" - -// Callback represents a background task to run. -type Callback func() - -// Sweeper verifies shutting down background processing. -type Sweeper struct { - Interval time.Duration - Stop chan interface{} -} - -// Run Sweeper with the specified callback. -func (s *Sweeper) Run(callback Callback) { - ticker := time.NewTicker(s.Interval) - for { - select { - case <-ticker.C: - callback() - case <-s.Stop: - ticker.Stop() - return - } - } -} diff --git a/limiter.go b/limiter.go index 24f6706..a960491 100644 --- a/limiter.go +++ b/limiter.go @@ -1,9 +1,27 @@ package equalizer +import "context" + +// A token represents a single unit of capacity for the rate limiter. type token struct{} // Limiter represents a rate limiter. +// +// Rate limiters control the rate at which requests are processed by allocating +// a certain number of tokens. Each token represents the ability to process a +// single request. When a request is made, the limiter checks if there are any +// available tokens. If there are, it deducts one token and allows the request +// to proceed. If there are no tokens available, the request is blocked until a +// token becomes available. +// +// By controlling the number of tokens available, rate limiters can ensure that +// requests are processed at a controlled rate, preventing overloading and +// ensuring fair access to resources. type Limiter interface { - Ask() bool - Take() + // Acquire blocks the calling goroutine until a token is acquired, the Context + // is canceled, or the wait time exceeds the Context's Deadline. + Acquire(ctx context.Context) error + // TryAcquire attempts to acquire a token without blocking. + // Returns true if a token was acquired, false if no tokens are available. + TryAcquire() bool } diff --git a/offset.go b/offset.go index eb49864..353bbaa 100644 --- a/offset.go +++ b/offset.go @@ -5,42 +5,49 @@ import ( "sync/atomic" ) -// Offset is the Equalizer's tape offset manager interface. +// The Offset component is responsible for advancing the head position of the +// Equalizer tape after each request. Implementations of Offset must be thread-safe. type Offset interface { NextIndex() int } -// RandomOffset is the random based offset manager. +// RandomOffset is an offset manager that uses a random-based offset approach. type RandomOffset struct { Len int } -// NewRandomOffset allocates and returns a new RandomOffset. -// len is the bitmap length. +var _ Offset = (*RandomOffset)(nil) + +// NewRandomOffset returns a new RandomOffset, where len is the bitmap tape length. func NewRandomOffset(len int) *RandomOffset { - return &RandomOffset{len} + return &RandomOffset{Len: len} } -// NextIndex returns the next random index. +// NextIndex returns the next random index within a tape. func (ro *RandomOffset) NextIndex() int { return rand.Intn(ro.Len) } -// StepOffset is the step based offset manager. +// StepOffset is an offset manager that uses a fixed step approach. type StepOffset struct { - Len int - Step int64 - previousIndex int64 + Len int + Step int64 + prev int64 } -// NewStepOffset allocates and returns a new StepOffset. -// len is the bitmap length. -// step is the offset from the previous index. -func NewStepOffset(len int, step int64) *StepOffset { - return &StepOffset{len, step, 0} +var _ Offset = (*StepOffset)(nil) + +// NewStepOffset allocates and returns a new StepOffset, where len is the length +// of the bitmap tape and step is the offset to be taken from the previous position. +func NewStepOffset(len, step int) *StepOffset { + return &StepOffset{ + Len: len, + Step: int64(step), + } } -// NextIndex returns the next index in the Round-robin way. +// NextIndex returns the next index in a round-robin fashion, +// utilizing the specified step value to advance along the tape. func (so *StepOffset) NextIndex() int { - return int(atomic.AddInt64(&so.previousIndex, so.Step)) % so.Len + return int(atomic.AddInt64(&so.prev, so.Step)) % so.Len } diff --git a/slider.go b/slider.go index f99f7d2..e7507b8 100644 --- a/slider.go +++ b/slider.go @@ -1,11 +1,12 @@ package equalizer import ( + "context" "runtime" "sync/atomic" "time" - "github.com/reugn/equalizer/internal" + "github.com/reugn/equalizer/internal/async" ) // A Slider represents a rate limiter which is based on a sliding window @@ -17,11 +18,12 @@ import ( // not keep the main Slider object from being garbage collected. When it is // garbage collected, the finalizer stops the background goroutine, after // which the underlying slider can be collected. -// type Slider struct { slider *slider } +var _ Limiter = (*Slider)(nil) + type slider struct { window time.Duration slidingInterval time.Duration @@ -29,46 +31,46 @@ type slider struct { permits chan token issued chan int64 windowStart int64 - sweeper *internal.Sweeper + windowShifter *async.Task } -// NewSlider allocates and returns a new Slider rate limiter. -// -// window is the fixed duration of the sliding window. -// slidingInterval controls how frequently a new sliding window is started. +// NewSlider allocates and returns a new Slider rate limiter, where +// window is the fixed duration of the sliding window, +// slidingInterval controls how frequently a new sliding window is started and // capacity is the quota limit for the window. -func NewSlider(window time.Duration, slidingInterval time.Duration, capacity int) *Slider { +func NewSlider(window, slidingInterval time.Duration, capacity int) *Slider { underlying := &slider{ window: window, slidingInterval: slidingInterval, capacity: capacity, permits: make(chan token, capacity), issued: make(chan int64, capacity), + windowShifter: async.NewTask(slidingInterval), } - underlying.initSlider() - + underlying.init() + // initialize a goroutine responsible for periodically moving the + // window forward + underlying.windowShifter.Run(underlying.slide) slider := &Slider{ slider: underlying, } - - // start the sliding goroutine - goSlide(underlying, slidingInterval) - // the finalizer may run as soon as the slider becomes unreachable. - runtime.SetFinalizer(slider, stopSliderSweeper) + // the finalizer may run as soon as the slider becomes unreachable + runtime.SetFinalizer(slider, stopWindowShifter) return slider } -// initSlider prefills the permits channel. -func (s *slider) initSlider() { +// init initializes the permits channel by populating it with the +// specified number of tokens. +func (s *slider) init() { for i := 0; i < s.capacity; i++ { s.permits <- token{} } } -// doSlide starts a new sliding window. -func (s *slider) doSlide() { - atomic.StoreInt64(&s.windowStart, nowNano()) +// slide moves the sliding window forward. +func (s *slider) slide() { + atomic.StoreInt64(&s.windowStart, time.Now().UnixNano()) for ts := range s.issued { s.permits <- token{} if ts > s.windowStart { @@ -77,9 +79,21 @@ func (s *slider) doSlide() { } } -// Ask requires a permit. -// It is a non blocking call, returns true or false. -func (s *Slider) Ask() bool { +// Acquire blocks the calling goroutine until a token is acquired, the Context +// is canceled, or the wait time exceeds the Context's Deadline. +func (s *Slider) Acquire(ctx context.Context) error { + select { + case <-s.slider.permits: + s.slider.issued <- atomic.LoadInt64(&s.slider.windowStart) + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// TryAcquire attempts to acquire a token without blocking. +// returns true if a token was acquired, false if no tokens are available. +func (s *Slider) TryAcquire() bool { select { case <-s.slider.permits: s.slider.issued <- atomic.LoadInt64(&s.slider.windowStart) @@ -89,30 +103,8 @@ func (s *Slider) Ask() bool { } } -// Take blocks to get a permit. -func (s *Slider) Take() { - <-s.slider.permits - s.slider.issued <- atomic.LoadInt64(&s.slider.windowStart) -} - -func nowNano() int64 { - return time.Now().UTC().UnixNano() -} - -// stopSliderSweeper is the callback to stop the sliding goroutine. -func stopSliderSweeper(s *Slider) { - s.slider.sweeper.Stop <- struct{}{} +// stopWindowShifter is the callback function used to terminate the goroutine +// responsible for periodically moving the sliding window. +func stopWindowShifter(s *Slider) { + s.slider.windowShifter.Stop() } - -// goSlide starts the sliding goroutine. -func goSlide(slider *slider, slidingInterval time.Duration) { - sweeper := &internal.Sweeper{ - Interval: slidingInterval, - Stop: make(chan interface{}), - } - slider.sweeper = sweeper - go sweeper.Run(slider.doSlide) -} - -// Verify Slider satisfies the equalizer.Limiter interface. -var _ Limiter = (*Slider)(nil) diff --git a/slider_test.go b/slider_test.go index 3965333..a2c6c13 100644 --- a/slider_test.go +++ b/slider_test.go @@ -1,25 +1,26 @@ package equalizer import ( + "context" "testing" "time" ) func TestSlider(t *testing.T) { - slider := NewSlider(time.Second, time.Millisecond*100, 32) + slider := NewSlider(time.Second, 100*time.Millisecond, 32) var quota bool for i := 0; i < 32; i++ { - quota = slider.Ask() + quota = slider.TryAcquire() } assertEqual(t, quota, true) - quota = slider.Ask() + quota = slider.TryAcquire() assertEqual(t, quota, false) - time.Sleep(time.Millisecond * 1010) + time.Sleep(1010 * time.Millisecond) - quota = slider.Ask() + quota = slider.TryAcquire() assertEqual(t, quota, true) - slider.Take() + slider.Acquire(context.Background()) } diff --git a/token_bucket.go b/token_bucket.go index 06a351d..515a97e 100644 --- a/token_bucket.go +++ b/token_bucket.go @@ -1,11 +1,11 @@ package equalizer import ( + "context" "runtime" - "sync/atomic" "time" - "github.com/reugn/equalizer/internal" + "github.com/reugn/equalizer/internal/async" ) // A TokenBucket represents a rate limiter based on a custom implementation of the @@ -18,45 +18,43 @@ import ( // not keep the main TokenBucket object from being garbage collected. When it is // garbage collected, the finalizer stops the background goroutine, after // which the underlying tokenBucket can be collected. -// type TokenBucket struct { t *tokenBucket } +var _ Limiter = (*TokenBucket)(nil) + type tokenBucket struct { - capacity int32 + capacity int refillInterval time.Duration permits chan token - issued int32 - sweeper *internal.Sweeper + tokenSupplier *async.Task } -// NewTokenBucket allocates and returns a new TokenBucket rate limiter. -// -// capacity is the token bucket capacity. -// refillInterval is the bucket refill interval. -func NewTokenBucket(capacity int32, refillInterval time.Duration) *TokenBucket { +// NewTokenBucket allocates and returns a new TokenBucket rate limiter, where +// capacity is the capacity of the token bucket and refillInterval is the token +// bucket refill interval. +func NewTokenBucket(capacity int, refillInterval time.Duration) *TokenBucket { underlying := &tokenBucket{ capacity: capacity, refillInterval: refillInterval, permits: make(chan token, capacity), - issued: 0, + tokenSupplier: async.NewTask(refillInterval), } - underlying.fillTokenBucket(int(capacity)) - + underlying.fillTokenBucket(capacity) + // initialize a goroutine responsible for periodically refilling the + // token bucket + underlying.tokenSupplier.Run(underlying.refill) tokenBucket := &TokenBucket{ t: underlying, } - - // start the refilling goroutine - goRefill(underlying, refillInterval) - // the finalizer may run as soon as the tokenBucket becomes unreachable. - runtime.SetFinalizer(tokenBucket, stopSweeper) + // the finalizer may run as soon as the tokenBucket becomes unreachable + runtime.SetFinalizer(tokenBucket, stopTokenSupplier) return tokenBucket } -// fillTokenBucket fills the permits channel. +// fillTokenBucket fills up the permits channel with the capacity number of tokens. func (tb *tokenBucket) fillTokenBucket(capacity int) { for i := 0; i < capacity; i++ { tb.permits <- token{} @@ -65,42 +63,34 @@ func (tb *tokenBucket) fillTokenBucket(capacity int) { // refill refills the token bucket. func (tb *tokenBucket) refill() { - issued := atomic.SwapInt32(&tb.issued, 0) - tb.fillTokenBucket(int(issued)) + issued := tb.capacity - len(tb.permits) + tb.fillTokenBucket(issued) +} + +// Acquire blocks the calling goroutine until a token is acquired, the Context +// is canceled, or the wait time exceeds the Context's Deadline. +func (tb *TokenBucket) Acquire(ctx context.Context) error { + select { + case <-tb.t.permits: + return nil + case <-ctx.Done(): + return ctx.Err() + } } -// Ask requires a permit. -// It is a non blocking call, returns true or false. -func (tb *TokenBucket) Ask() bool { +// TryAcquire attempts to acquire a token without blocking. +// Returns true if a token was acquired, false if no tokens are available. +func (tb *TokenBucket) TryAcquire() bool { select { case <-tb.t.permits: - atomic.AddInt32(&tb.t.issued, 1) return true default: return false } } -// Take blocks to get a permit. -func (tb *TokenBucket) Take() { - <-tb.t.permits - atomic.AddInt32(&tb.t.issued, 1) +// stopTokenSupplier is the callback function used to terminate the +// goroutine responsible for periodically refilling the token bucket. +func stopTokenSupplier(t *TokenBucket) { + t.t.tokenSupplier.Stop() } - -// stopSweeper is the callback to stop the refilling goroutine. -func stopSweeper(t *TokenBucket) { - t.t.sweeper.Stop <- struct{}{} -} - -// goRefill starts the refilling goroutine. -func goRefill(t *tokenBucket, refillInterval time.Duration) { - sweeper := &internal.Sweeper{ - Interval: refillInterval, - Stop: make(chan interface{}), - } - t.sweeper = sweeper - go sweeper.Run(t.refill) -} - -// Verify TokenBucket satisfies the equalizer.Limiter interface. -var _ Limiter = (*TokenBucket)(nil) diff --git a/token_bucket_test.go b/token_bucket_test.go index ad9eec8..b480e39 100644 --- a/token_bucket_test.go +++ b/token_bucket_test.go @@ -1,25 +1,26 @@ package equalizer import ( + "context" "testing" "time" ) func TestTokenBucket(t *testing.T) { - tokenBucket := NewTokenBucket(32, time.Millisecond*100) + tokenBucket := NewTokenBucket(32, 100*time.Millisecond) var quota bool for i := 0; i < 32; i++ { - quota = tokenBucket.Ask() + quota = tokenBucket.TryAcquire() } assertEqual(t, quota, true) - quota = tokenBucket.Ask() + quota = tokenBucket.TryAcquire() assertEqual(t, quota, false) - time.Sleep(time.Millisecond * 110) + time.Sleep(110 * time.Millisecond) - quota = tokenBucket.Ask() + quota = tokenBucket.TryAcquire() assertEqual(t, quota, true) - tokenBucket.Take() + tokenBucket.Acquire(context.Background()) } From 16043dbcc3904580212e76c777313781bce2fd50 Mon Sep 17 00:00:00 2001 From: reugn- + diff --git a/equalizer.go b/equalizer.go index 5dbb662..a70d4d1 100644 --- a/equalizer.go +++ b/equalizer.go @@ -7,9 +7,9 @@ import ( "sync" ) -// An Equalizer represents a bitmap-based adaptive rate limiter. +// An Equalizer represents an adaptive rate limiter based on a bit array. // -// The Equalizer uses a round-robin bitmap tape with a moving head to manage +// The Equalizer uses a round-robin bit array with a moving head to manage // quotas. // The quota management algorithm is simple and works in the following way. // To request a permit in a non-blocking manner use the TryAcquire method. @@ -23,9 +23,9 @@ import ( // An Equalizer is safe for use by multiple goroutines simultaneously. type Equalizer struct { sync.RWMutex - // tape is the underlying bitmap tape + // tape is the underlying bit array tape *big.Int - // seed is the initial state of the bitmap tape + // seed is the initial state of the bit array tape seed *big.Int // mask is the positive bitmask mask *big.Int @@ -38,8 +38,8 @@ type Equalizer struct { } // NewEqualizer instantiates and returns a new Equalizer rate limiter, where -// len is the size of the bitmap, reserved is the number of reserved positive -// bits and offset is an instance of the equalizer.Offset strategy. +// size is the length of the bit array, reserved is the number of reserved +// positive bits and offset is an instance of the equalizer.Offset strategy. func NewEqualizer(size, reserved int, offset Offset) (*Equalizer, error) { if offset == nil { return nil, fmt.Errorf("offset is nil") @@ -53,7 +53,7 @@ func NewEqualizer(size, reserved int, offset Offset) (*Equalizer, error) { if reserved > size { return nil, fmt.Errorf("reserved must not exceed size") } - // init the seed bitmap tape + // init the seed bit array var seed big.Int seed.SetString(strings.Repeat("1", size), 2) @@ -62,7 +62,7 @@ func NewEqualizer(size, reserved int, offset Offset) (*Equalizer, error) { mask.SetString(strings.Repeat("1", reserved), 2) mask.Lsh(&mask, uint(size-reserved)) - // init the bitmap tape + // init the operational bit array tape var tape big.Int tape.Set(&seed) diff --git a/internal/async/task.go b/internal/async/task.go index 295b828..68d1abd 100644 --- a/internal/async/task.go +++ b/internal/async/task.go @@ -12,7 +12,7 @@ type Task struct { stop chan struct{} } -// New returns a new Task configured to execute periodically at the +// NewTask returns a new Task configured to execute periodically at the // specified interval. func NewTask(interval time.Duration) *Task { return &Task{