diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..f65e1e4 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,33 @@ +name: Build + +on: + push: + branches: + - '**' + pull_request: + branches: + - master + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + go-version: [1.19.x, 1.22.x] + steps: + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: ${{ matrix.go-version }} + + - name: Checkout code + uses: actions/checkout@v4 + + - name: Run coverage + run: go test -race ./... -coverprofile=coverage.out -covermode=atomic + + - name: Upload coverage to Codecov + if: ${{ matrix.go-version == '1.19.x' }} + uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml new file mode 100644 index 0000000..3226b45 --- /dev/null +++ b/.github/workflows/golangci-lint.yml @@ -0,0 +1,28 @@ +name: golangci-lint + +on: + push: + branches: + - master + pull_request: + +permissions: + contents: read + +jobs: + golangci: + name: lint + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - uses: actions/setup-go@v5 + with: + go-version: '1.22' + cache: false + + - name: golangci-lint + uses: golangci/golangci-lint-action@v4 + with: + version: v1.56 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 0a95508..8b90e1d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ .idea/ -.vscode/ \ No newline at end of file +.vscode/ +coverage.out +go.work diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..50951ba --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,34 @@ +linters: + disable-all: true + enable: + - dupl + - errcheck + - errorlint + - exportloopref + - funlen + - gci + - goconst + - gocritic + - gocyclo + - gofmt + - goimports + - gosimple + - govet + - ineffassign + - lll + - misspell + - prealloc + - revive + - staticcheck + - stylecheck + - typecheck + - unconvert + - unparam + - unused + +issues: + exclude-rules: + - path: _test\.go + linters: + - unparam + - funlen \ No newline at end of file diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index c1e7469..0000000 --- a/.travis.yml +++ /dev/null @@ -1,16 +0,0 @@ -language: go - -go: - - "1.15" - -os: - - linux - -env: - - GO111MODULE=on - -script: - - go test ./... -coverprofile=coverage.txt -covermode=atomic - -after_success: - - bash <(curl -s https://codecov.io/bash) \ No newline at end of file diff --git a/README.md b/README.md index 8ed3dc8..197e513 100644 --- a/README.md +++ b/README.md @@ -1,89 +1,95 @@
A rate limiters package for Go.
-Pick one of the rate limiters to throttle requests and control quota. +The `equalizer` package provides a set of simple and easy-to-use rate limiters for Go. +These rate limiters can be used to limit the rate of requests to any resource, such as a database, +API, or file. + +The package includes the following rate limiters: * [Equalizer](#equalizer) * [Slider](#slider) -* [TokenBucket](#tokenbucket) +* [Token Bucket](#token-bucket) ## Equalizer -`Equalizer` is a rate limiter that manages quota based on previous requests' statuses and slows down or accelerates accordingly. +`Equalizer` is a rate limiter that adjusts the rate of requests based on the outcomes of previous requests. +If previous attempts have failed, Equalizer will slow down the rate of requests to avoid overloading the system. +Conversely, if previous attempts have been successful, Equalizer will accelerate the rate of requests to make +the most of the available capacity. -### Usage +### Usage Example ```go +// a random offset manager offset := equalizer.NewRandomOffset(96) - -// an Equalizer with the bitmap size of 96 with 16 reserved -// positive bits and the random offset manager +// an Equalizer with a bitmap size of 96, 16 reserved positive bits, and the random offset manager eq := equalizer.NewEqualizer(96, 16, offset) - // non-blocking quota request -haveQuota := eq.Ask() - -// update with ten previous successful requests -eq.Notify(true, 10) +haveQuota := eq.TryAcquire() +// update on the successful request +eq.Success(1) ``` ### Benchmarks -```sh -BenchmarkEqualizerShortAskStep-16 30607452 37.5 ns/op 0 B/op 0 allocs/op -BenchmarkEqualizerShortAskRandom-16 31896340 34.5 ns/op 0 B/op 0 allocs/op -BenchmarkEqualizerShortNotify-16 12715494 81.9 ns/op 0 B/op 0 allocs/op -BenchmarkEqualizerLongAskStep-16 34627239 35.4 ns/op 0 B/op 0 allocs/op -BenchmarkEqualizerLongAskRandom-16 32399748 34.0 ns/op 0 B/op 0 allocs/op -BenchmarkEqualizerLongNotify-16 59935 20343 ns/op 0 B/op 0 allocs/op +```console +BenchmarkEqualizer_ShortTryAcquireStep-16 31538967 38.33 ns/op 0 B/op 0 allocs/op +BenchmarkEqualizer_ShortTryAcquireRandom-16 37563639 31.66 ns/op 0 B/op 0 allocs/op +BenchmarkEqualizer_ShortNotify-16 29519719 40.43 ns/op 0 B/op 0 allocs/op +BenchmarkEqualizer_LongTryAcquireStep-16 32084402 38.36 ns/op 0 B/op 0 allocs/op +BenchmarkEqualizer_LongTryAcquireRandom-16 39996501 30.37 ns/op 0 B/op 0 allocs/op +BenchmarkEqualizer_LongNotify-16 29648655 40.46 ns/op 0 B/op 0 allocs/op ``` ## Slider -`Slider` rate limiter is based on a sliding window with a specified quota capacity. -Implements the `Limiter` interface. +`Slider` tracks the number of requests that have been processed in a recent time window. +If the number of requests exceeds the limit, the rate limiter will block new requests until the window +has moved forward. +Implements the `equalizer.Limiter` interface. -### Usage +### Usage Example ```go -// a Slider with one second window size, 100 millis sliding interval -// and the capacity of 32 -slider := equalizer.NewSlider(time.Second, time.Millisecond*100, 32) - +// a Slider with a one-second window size, a 100-millisecond sliding interval, +// and a capacity of 32 +slider := equalizer.NewSlider(time.Second, 100*time.Millisecond, 32) // non-blocking quota request -haveQuota := slider.Ask() - +haveQuota := slider.TryAcquire() // blocking call -slider.Take() +slider.Acquire(context.Background()) ``` ### Benchmarks -```sh -BenchmarkSliderShortWindow-16 123488035 9.67 ns/op 0 B/op 0 allocs/op -BenchmarkSliderLongerWindow-16 128023276 9.76 ns/op 0 B/op 0 allocs/op +```console +BenchmarkSlider_TryAcquire-16 293645348 4.033 ns/op 0 B/op 0 allocs/op +BenchmarkRateLimiter_Allow-16 9362382 127.4 ns/op 0 B/op 0 allocs/op ``` +* Compared to `rate.Limiter` from the [golang.org/x/time](https://pkg.go.dev/golang.org/x/time/rate) package. -## TokenBucket -`TokenBucket` rate limiter is based on the token bucket algorithm with a refill interval. -Implements the `Limiter` interface. +## Token Bucket +`TokenBucket` maintains a fixed number of tokens. Each token represents a request that can be processed. +When a request is made, the rate limiter checks to see if there are any available tokens. If there are, +the request is processed and one token is removed from the bucket. If there are no available tokens, +the request is blocked until a token becomes available. +Implements the `equalizer.Limiter` interface. -### Usage +### Usage Example ```go -// a TokenBucket with the capacity of 32 and 100 millis refill interval -tokenBucket := equalizer.NewTokenBucket(32, time.Millisecond*100) - +// a TokenBucket with the capacity of 32 and a 100-millisecond refill interval +tokenBucket := equalizer.NewTokenBucket(32, 100*time.Millisecond) // non-blocking quota request -haveQuota := tokenBucket.Ask() - +haveQuota := tokenBucket.TryAcquire() // blocking call -tokenBucket.Take() +tokenBucket.Acquire(context.Background()) ``` ### Benchmarks -```sh -BenchmarkTokenBucketDenseRefill-16 212631714 5.64 ns/op 0 B/op 0 allocs/op -BenchmarkTokenBucketSparseRefill-16 211491368 5.63 ns/op 0 B/op 0 allocs/op +```console +BenchmarkTokenBucket_TryAcquire-16 304653043 3.909 ns/op 0 B/op 0 allocs/op +BenchmarkRateLimiter_Allow-16 9362382 127.4 ns/op 0 B/op 0 allocs/op ``` +* Compared to `rate.Limiter` from the [golang.org/x/time](https://pkg.go.dev/golang.org/x/time/rate) package. ## License -Licensed under the MIT License. +Licensed under the [MIT](./LICENSE) License. diff --git a/bench_test.go b/bench_test.go deleted file mode 100644 index fb97484..0000000 --- a/bench_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package equalizer - -import ( - "testing" - "time" -) - -// go test -bench=. -benchmem -func BenchmarkEqualizerShortAskStep(b *testing.B) { - offset := NewStepOffset(96, 1) - eq := NewEqualizer(96, 16, offset) - b.ResetTimer() - for i := 0; i < b.N; i++ { - eq.Ask() - } -} - -func BenchmarkEqualizerShortAskRandom(b *testing.B) { - offset := NewRandomOffset(96) - eq := NewEqualizer(96, 16, offset) - b.ResetTimer() - for i := 0; i < b.N; i++ { - eq.Ask() - } -} - -func BenchmarkEqualizerShortNotify(b *testing.B) { - offset := NewStepOffset(96, 1) - eq := NewEqualizer(96, 16, offset) - b.ResetTimer() - for i := 0; i < b.N; i++ { - eq.Notify(false, 1) - } -} - -func BenchmarkEqualizerLongAskStep(b *testing.B) { - offset := NewStepOffset(1048576, 1) - eq := NewEqualizer(1048576, 16, offset) - b.ResetTimer() - for i := 0; i < b.N; i++ { - eq.Ask() - } -} - -func BenchmarkEqualizerLongAskRandom(b *testing.B) { - offset := NewRandomOffset(1048576) - eq := NewEqualizer(1048576, 16, offset) - b.ResetTimer() - for i := 0; i < b.N; i++ { - eq.Ask() - } -} - -func BenchmarkEqualizerLongNotify(b *testing.B) { - offset := NewStepOffset(1048576, 1) - eq := NewEqualizer(1048576, 16, offset) - b.ResetTimer() - for i := 0; i < b.N; i++ { - eq.Notify(false, 1) - } -} - -func BenchmarkSliderShortWindow(b *testing.B) { - slider := NewSlider(time.Millisecond*100, time.Millisecond*10, 32) - for i := 0; i < b.N; i++ { - slider.Ask() - } -} - -func BenchmarkSliderLongerWindow(b *testing.B) { - slider := NewSlider(time.Second, time.Millisecond*100, 32) - for i := 0; i < b.N; i++ { - slider.Ask() - } -} - -func BenchmarkTokenBucketDenseRefill(b *testing.B) { - tokenBucket := NewTokenBucket(32, time.Millisecond*10) - for i := 0; i < b.N; i++ { - tokenBucket.Ask() - } -} - -func BenchmarkTokenBucketSparseRefill(b *testing.B) { - tokenBucket := NewTokenBucket(32, time.Second) - for i := 0; i < b.N; i++ { - tokenBucket.Ask() - } -} diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go new file mode 100644 index 0000000..662237b --- /dev/null +++ b/benchmarks/benchmark_test.go @@ -0,0 +1,88 @@ +package benchmarks + +import ( + "testing" + "time" + + "github.com/reugn/equalizer" + "golang.org/x/time/rate" +) + +// go test -bench=. -benchmem +func BenchmarkEqualizer_ShortTryAcquireStep(b *testing.B) { + offset := equalizer.NewStepOffset(96, 1) + eq, _ := equalizer.NewEqualizer(96, 16, offset) + b.ResetTimer() + for i := 0; i < b.N; i++ { + eq.TryAcquire() + } +} + +func BenchmarkEqualizer_ShortTryAcquireRandom(b *testing.B) { + offset := equalizer.NewRandomOffset(96) + eq, _ := equalizer.NewEqualizer(96, 16, offset) + b.ResetTimer() + for i := 0; i < b.N; i++ { + eq.TryAcquire() + } +} + +func BenchmarkEqualizer_ShortNotify(b *testing.B) { + offset := equalizer.NewStepOffset(96, 1) + eq, _ := equalizer.NewEqualizer(96, 16, offset) + b.ResetTimer() + for i := 0; i < b.N; i++ { + eq.Failure(1) + } +} + +func BenchmarkEqualizer_LongTryAcquireStep(b *testing.B) { + offset := equalizer.NewStepOffset(1048576, 1) + eq, _ := equalizer.NewEqualizer(1048576, 16, offset) + b.ResetTimer() + for i := 0; i < b.N; i++ { + eq.TryAcquire() + } +} + +func BenchmarkEqualizer_LongTryAcquireRandom(b *testing.B) { + offset := equalizer.NewRandomOffset(1048576) + eq, _ := equalizer.NewEqualizer(1048576, 16, offset) + b.ResetTimer() + for i := 0; i < b.N; i++ { + eq.TryAcquire() + } +} + +func BenchmarkEqualizer_LongNotify(b *testing.B) { + offset := equalizer.NewStepOffset(1048576, 1) + eq, _ := equalizer.NewEqualizer(1048576, 16, offset) + b.ResetTimer() + for i := 0; i < b.N; i++ { + eq.Failure(1) + } +} + +func BenchmarkSlider_TryAcquire(b *testing.B) { + slider, _ := equalizer.NewSlider(time.Second, time.Millisecond*100, 32) + b.ResetTimer() + for i := 0; i < b.N; i++ { + slider.TryAcquire() + } +} + +func BenchmarkTokenBucket_TryAcquire(b *testing.B) { + tokenBucket, _ := equalizer.NewTokenBucket(32, time.Second) + b.ResetTimer() + for i := 0; i < b.N; i++ { + tokenBucket.TryAcquire() + } +} + +func BenchmarkRateLimiter_Allow(b *testing.B) { + limiter := rate.NewLimiter(rate.Limit(32), 32) + b.ResetTimer() + for i := 0; i < b.N; i++ { + limiter.Allow() + } +} diff --git a/benchmarks/go.mod b/benchmarks/go.mod new file mode 100644 index 0000000..ce058e1 --- /dev/null +++ b/benchmarks/go.mod @@ -0,0 +1,10 @@ +module github.com/reugn/equalizer/benchmarks + +go 1.19 + +require ( + github.com/reugn/equalizer v0.0.0 + golang.org/x/time v0.5.0 +) + +replace github.com/reugn/equalizer => ../ diff --git a/benchmarks/go.sum b/benchmarks/go.sum new file mode 100644 index 0000000..a2652c5 --- /dev/null +++ b/benchmarks/go.sum @@ -0,0 +1,2 @@ +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= diff --git a/equalizer.go b/equalizer.go index ee6a4f3..a70d4d1 100644 --- a/equalizer.go +++ b/equalizer.go @@ -1,117 +1,123 @@ package equalizer import ( + "fmt" "math/big" + "strings" "sync" ) -// An Equalizer represents a bitmap based adaptive rate limiter. -// The quota management algorithm is based on a Round-robin bitmap tape with a moving head. +// An Equalizer represents an adaptive rate limiter based on a bit array. // -// An Equalizer is safe for use by multiple goroutines simultaneously. -// -// Use Ask function to request quota. -// Use Notify to update the bitmap tape with previous requests' statuses. +// The Equalizer uses a round-robin bit array with a moving head to manage +// quotas. +// The quota management algorithm is simple and works in the following way. +// To request a permit in a non-blocking manner use the TryAcquire method. +// The Equalizer will locate the appropriate position on the tape using the +// offset manager and return the value, denoting whether the request is allowed +// or not. To update the tape state, a notification method (Success or Failure) +// should be invoked based on the operation status. +// The Reset and Purge methods allow for immediate transition of the limiter +// state to whether permissive or restrictive. // +// An Equalizer is safe for use by multiple goroutines simultaneously. type Equalizer struct { sync.RWMutex - - // tape is the underlying bitmap tape + // tape is the underlying bit array tape *big.Int - - // mask is the positive bits mask + // seed is the initial state of the bit array tape + seed *big.Int + // mask is the positive bitmask mask *big.Int - // offset is the next index offset manager offset Offset - - // size is the bitmap tape size - size int - - // reserved is the number of reserved positive bits - reserved int + // notifyHead is the moving pointer for notifications + notifyHead int + // adjustable is the number of unmasked bits + adjustable int } -// NewEqualizer allocates and returns a new Equalizer rate limiter. -// -// len is the size of the bitmap. -// reserved is the number of reserved positive bits. -// offset is the equalizer.Offset strategy instance. -func NewEqualizer(size int, reserved int, offset Offset) *Equalizer { - // init the bitmap tape - var tape big.Int - fill(&tape, 0, size, 1) +// NewEqualizer instantiates and returns a new Equalizer rate limiter, where +// size is the length of the bit array, reserved is the number of reserved +// positive bits and offset is an instance of the equalizer.Offset strategy. +func NewEqualizer(size, reserved int, offset Offset) (*Equalizer, error) { + if offset == nil { + return nil, fmt.Errorf("offset is nil") + } + if size < 1 { + return nil, fmt.Errorf("nonpositive size: %d", size) + } + if reserved < 1 { + return nil, fmt.Errorf("nonpositive reserved: %d", reserved) + } + if reserved > size { + return nil, fmt.Errorf("reserved must not exceed size") + } + // init the seed bit array + var seed big.Int + seed.SetString(strings.Repeat("1", size), 2) - // init the positive bits mask + // init the positive bitmask var mask big.Int - fill(&mask, 0, reserved, 1) + mask.SetString(strings.Repeat("1", reserved), 2) mask.Lsh(&mask, uint(size-reserved)) + // init the operational bit array tape + var tape big.Int + tape.Set(&seed) + return &Equalizer{ - tape: &tape, - mask: &mask, - offset: offset, - size: size, - reserved: reserved, - } + tape: &tape, + seed: &seed, + mask: &mask, + offset: offset, + adjustable: size - reserved, + }, nil } -// Ask moves the tape head to the next index and returns the value. -func (eq *Equalizer) Ask() bool { +// TryAcquire moves the tape head to the next index and returns the value. +func (eq *Equalizer) TryAcquire() bool { eq.RLock() defer eq.RUnlock() - head := eq.next() + head := eq.offset.NextIndex() return eq.tape.Bit(head) > 0 } -// Notify shifts the tape left by n bits and appends the specified value n times. -// Use n > 1 to update the same value in a bulk to gain performance. -// value is the boolean representation of a bit value (false -> 0, true -> 1). -// n is the number of bits to set. -func (eq *Equalizer) Notify(value bool, n uint) { - eq.Lock() - defer eq.Unlock() +// Success notifies the equalizer with n successful operations. +func (eq *Equalizer) Success(n int) { + eq.notify(n, 1) +} - bit := boolToUint(value) - var shift big.Int - fill(&shift, 0, int(n), bit) - eq.tape.Lsh(eq.tape, n) - fill(eq.tape, eq.size, eq.size+int(n), 0) - eq.tape.Or(eq.tape, &shift).Or(eq.tape, eq.mask) +// Failure notifies the equalizer with n failed operations. +func (eq *Equalizer) Failure(n int) { + eq.notify(n, 0) } -// ResetPositive resets the tape with positive bits. -func (eq *Equalizer) ResetPositive() { +// notify advances the notification head by n bits, assigning the given value. +func (eq *Equalizer) notify(n int, value uint) { eq.Lock() defer eq.Unlock() - fill(eq.tape, 0, eq.size, 1) + for i := 0; i < n; i++ { + pos := eq.notifyHead % eq.adjustable + eq.tape.SetBit(eq.tape, pos, value) + eq.notifyHead++ + } } -// Reset resets the tape to initial state. +// Reset resets the tape to its initial state. func (eq *Equalizer) Reset() { eq.Lock() defer eq.Unlock() - fill(eq.tape, 0, eq.reserved, 1) - fill(eq.tape, eq.reserved, eq.size, 0) + eq.tape.Set(eq.seed) } -// next returns the next index of the tape head. -func (eq *Equalizer) next() int { - return eq.offset.NextIndex() -} - -func fill(n *big.Int, from int, to int, bit uint) { - for i := from; i < to; i++ { - n.SetBit(n, i, bit) - } -} +// Purge blanks out the tape to the positive bitmask state. +func (eq *Equalizer) Purge() { + eq.Lock() + defer eq.Unlock() -func boolToUint(b bool) uint { - if b { - return 1 - } - return 0 + eq.tape.Set(eq.mask) } diff --git a/equalizer_test.go b/equalizer_test.go index 5452c56..d15a2b3 100644 --- a/equalizer_test.go +++ b/equalizer_test.go @@ -1,39 +1,69 @@ package equalizer import ( + "strings" "testing" + + "github.com/reugn/equalizer/internal/assert" ) func TestEqualizer(t *testing.T) { - offset := NewStepOffset(96, 15) - eq := NewEqualizer(96, 16, offset) + offset := NewStepOffset(32, 5) + eq, err := NewEqualizer(32, 8, offset) + assert.IsNil(t, err) + + assert.Equal(t, eq.tape.Text(2), strings.Repeat("1", 32)) + assert.Equal(t, eq.mask.Text(2), strings.Repeat("1", 8)+strings.Repeat("0", 24)) + assert.Equal(t, eq.mask.Bit(0), uint(0)) - assertEqual(t, eq.tape.Text(2), "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111") - assertEqual(t, eq.mask.Text(2), "111111111111111100000000000000000000000000000000000000000000000000000000000000000000000000000000") + eq.Failure(16) + assert.Equal(t, eq.tape.Text(2), strings.Repeat("1", 16)+strings.Repeat("0", 16)) - eq.Notify(false, 50) - assertEqual(t, eq.tape.Text(2), "111111111111111111111111111111111111111111111100000000000000000000000000000000000000000000000000") + assert.Equal(t, eq.TryAcquire(), false) + assert.Equal(t, eq.TryAcquire(), false) + assert.Equal(t, eq.TryAcquire(), false) + assert.Equal(t, eq.TryAcquire(), true) - assertEqual(t, eq.Ask(), false) - assertEqual(t, eq.Ask(), false) - assertEqual(t, eq.Ask(), false) - assertEqual(t, eq.Ask(), true) + eq.Success(10) + assert.Equal(t, eq.tape.Text(2), strings.Repeat("1", 16)+strings.Repeat("0", 14)+ + strings.Repeat("1", 2)) - eq.Notify(true, 10) - assertEqual(t, eq.tape.Text(2), "111111111111111111111111111111111111000000000000000000000000000000000000000000000000001111111111") + eq.Failure(1) + assert.Equal(t, eq.tape.Text(2), strings.Repeat("1", 16)+strings.Repeat("0", 14)+ + strings.Repeat("1", 2)) - eq.Notify(false, 1) - assertEqual(t, eq.tape.Text(2), "111111111111111111111111111111111110000000000000000000000000000000000000000000000000011111111110") + eq.Success(2) + assert.Equal(t, eq.tape.Text(2), strings.Repeat("1", 16)+strings.Repeat("0", 11)+ + "110"+strings.Repeat("1", 2)) - eq.ResetPositive() - assertEqual(t, eq.tape.Text(2), "111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111") + eq.Failure(32) + assert.Equal(t, eq.tape.Text(2), strings.Repeat("1", 8)+strings.Repeat("0", 24)) eq.Reset() - assertEqual(t, eq.mask.Text(2), "111111111111111100000000000000000000000000000000000000000000000000000000000000000000000000000000") + assert.Equal(t, eq.tape.Text(2), strings.Repeat("1", 32)) + + eq.Failure(-1) + assert.Equal(t, eq.tape.Text(2), strings.Repeat("1", 32)) + + eq.Purge() + assert.Equal(t, eq.tape.Text(2), strings.Repeat("1", 8)+strings.Repeat("0", 24)) } -func assertEqual(t *testing.T, a interface{}, b interface{}) { - if a != b { - t.Fatalf("%s != %s", a, b) - } +func TestEqualizer_Errors(t *testing.T) { + eq, err := NewEqualizer(32, 8, nil) + assert.ErrorContains(t, err, "offset is nil") + assert.IsNil(t, eq) + + offset := NewStepOffset(32, 5) + eq, err = NewEqualizer(-1, 8, offset) + assert.ErrorContains(t, err, "nonpositive size") + assert.IsNil(t, eq) + + eq, err = NewEqualizer(32, -1, offset) + assert.ErrorContains(t, err, "nonpositive reserved") + assert.IsNil(t, eq) + + eq, err = NewEqualizer(8, 32, offset) + assert.ErrorContains(t, err, "reserved must not exceed size") + assert.IsNil(t, eq) } diff --git a/go.mod b/go.mod index 746bb3d..20a060b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ module github.com/reugn/equalizer -go 1.15 +go 1.19 diff --git a/internal/assert/assertions.go b/internal/assert/assertions.go new file mode 100644 index 0000000..2676d31 --- /dev/null +++ b/internal/assert/assertions.go @@ -0,0 +1,70 @@ +package assert + +import ( + "errors" + "reflect" + "strings" + "testing" +) + +// Equal verifies equality of two objects. +func Equal[T any](t *testing.T, a, b T) { + if !reflect.DeepEqual(a, b) { + t.Helper() + t.Fatalf("%v != %v", a, b) + } +} + +// NotEqual verifies objects are not equal. +func NotEqual[T any](t *testing.T, a T, b T) { + if reflect.DeepEqual(a, b) { + t.Helper() + t.Fatalf("%v == %v", a, b) + } +} + +// IsNil verifies that the object is nil. +func IsNil(t *testing.T, obj any) { + if obj != nil { + value := reflect.ValueOf(obj) + switch value.Kind() { + case reflect.Ptr, reflect.Map, reflect.Slice, + reflect.Interface, reflect.Func, reflect.Chan: + if value.IsNil() { + return + } + } + t.Helper() + t.Fatalf("%v is not nil", obj) + } +} + +// ErrorContains checks whether the given error contains the specified string. +func ErrorContains(t *testing.T, err error, str string) { + if err == nil { + t.Helper() + t.Fatalf("Error is nil") + } else if !strings.Contains(err.Error(), str) { + t.Helper() + t.Fatalf("Error does not contain string: %s", str) + } +} + +// ErrorIs checks whether any error in err's tree matches target. +func ErrorIs(t *testing.T, err error, target error) { + if !errors.Is(err, target) { + t.Helper() + t.Fatalf("Error type mismatch: %v != %v", err, target) + } +} + +// Panics checks whether the given function panics. +func Panics(t *testing.T, f func()) { + defer func() { + if r := recover(); r == nil { + t.Helper() + t.Fatalf("Function did not panic") + } + }() + f() +} diff --git a/internal/async/task.go b/internal/async/task.go new file mode 100644 index 0000000..68d1abd --- /dev/null +++ b/internal/async/task.go @@ -0,0 +1,45 @@ +package async + +import "time" + +// Callback represents a background task to run. +type Callback func() + +// Task orchestrates a recurring callback execution within an independent +// goroutine. +type Task struct { + interval time.Duration + stop chan struct{} +} + +// NewTask returns a new Task configured to execute periodically at the +// specified interval. +func NewTask(interval time.Duration) *Task { + return &Task{ + interval: interval, + stop: make(chan struct{}), + } +} + +// Run initiates a goroutine to execute the provided callback at +// the specified interval. +func (t *Task) Run(callback Callback) { + go func() { + ticker := time.NewTicker(t.interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + callback() + case <-t.stop: + return + } + } + }() +} + +// Stop signals the termination of the worker goroutine by closing the +// stop channel. +func (t *Task) Stop() { + close(t.stop) +} diff --git a/internal/sweeper.go b/internal/sweeper.go deleted file mode 100644 index a192afd..0000000 --- a/internal/sweeper.go +++ /dev/null @@ -1,26 +0,0 @@ -package internal - -import "time" - -// Callback represents a background task to run. -type Callback func() - -// Sweeper verifies shutting down background processing. -type Sweeper struct { - Interval time.Duration - Stop chan interface{} -} - -// Run Sweeper with the specified callback. -func (s *Sweeper) Run(callback Callback) { - ticker := time.NewTicker(s.Interval) - for { - select { - case <-ticker.C: - callback() - case <-s.Stop: - ticker.Stop() - return - } - } -} diff --git a/limiter.go b/limiter.go index 24f6706..a960491 100644 --- a/limiter.go +++ b/limiter.go @@ -1,9 +1,27 @@ package equalizer +import "context" + +// A token represents a single unit of capacity for the rate limiter. type token struct{} // Limiter represents a rate limiter. +// +// Rate limiters control the rate at which requests are processed by allocating +// a certain number of tokens. Each token represents the ability to process a +// single request. When a request is made, the limiter checks if there are any +// available tokens. If there are, it deducts one token and allows the request +// to proceed. If there are no tokens available, the request is blocked until a +// token becomes available. +// +// By controlling the number of tokens available, rate limiters can ensure that +// requests are processed at a controlled rate, preventing overloading and +// ensuring fair access to resources. type Limiter interface { - Ask() bool - Take() + // Acquire blocks the calling goroutine until a token is acquired, the Context + // is canceled, or the wait time exceeds the Context's Deadline. + Acquire(ctx context.Context) error + // TryAcquire attempts to acquire a token without blocking. + // Returns true if a token was acquired, false if no tokens are available. + TryAcquire() bool } diff --git a/offset.go b/offset.go index eb49864..ba63bd3 100644 --- a/offset.go +++ b/offset.go @@ -5,42 +5,49 @@ import ( "sync/atomic" ) -// Offset is the Equalizer's tape offset manager interface. +// The Offset component is responsible for advancing the head position of the +// Equalizer tape after each request. Implementations of Offset must be thread-safe. type Offset interface { NextIndex() int } -// RandomOffset is the random based offset manager. +// RandomOffset is an offset manager that uses a random-based offset approach. type RandomOffset struct { Len int } -// NewRandomOffset allocates and returns a new RandomOffset. -// len is the bitmap length. +var _ Offset = (*RandomOffset)(nil) + +// NewRandomOffset returns a new RandomOffset, where len is the bitmap tape length. func NewRandomOffset(len int) *RandomOffset { - return &RandomOffset{len} + return &RandomOffset{Len: len} } -// NextIndex returns the next random index. +// NextIndex returns the next random index within a tape. func (ro *RandomOffset) NextIndex() int { return rand.Intn(ro.Len) } -// StepOffset is the step based offset manager. +// StepOffset is an offset manager that uses a fixed step approach. type StepOffset struct { - Len int - Step int64 - previousIndex int64 + Len int + Step int64 + prev atomic.Int64 } -// NewStepOffset allocates and returns a new StepOffset. -// len is the bitmap length. -// step is the offset from the previous index. -func NewStepOffset(len int, step int64) *StepOffset { - return &StepOffset{len, step, 0} +var _ Offset = (*StepOffset)(nil) + +// NewStepOffset allocates and returns a new StepOffset, where len is the length +// of the bitmap tape and step is the offset to be taken from the previous position. +func NewStepOffset(len, step int) *StepOffset { + return &StepOffset{ + Len: len, + Step: int64(step), + } } -// NextIndex returns the next index in the Round-robin way. +// NextIndex returns the next index in a round-robin fashion, +// utilizing the specified step value to advance along the tape. func (so *StepOffset) NextIndex() int { - return int(atomic.AddInt64(&so.previousIndex, so.Step)) % so.Len + return int(so.prev.Add(so.Step)) % so.Len } diff --git a/slider.go b/slider.go index f99f7d2..cebb26b 100644 --- a/slider.go +++ b/slider.go @@ -1,11 +1,13 @@ package equalizer import ( + "context" + "fmt" "runtime" "sync/atomic" "time" - "github.com/reugn/equalizer/internal" + "github.com/reugn/equalizer/internal/async" ) // A Slider represents a rate limiter which is based on a sliding window @@ -17,102 +19,100 @@ import ( // not keep the main Slider object from being garbage collected. When it is // garbage collected, the finalizer stops the background goroutine, after // which the underlying slider can be collected. -// type Slider struct { slider *slider } +var _ Limiter = (*Slider)(nil) + type slider struct { window time.Duration slidingInterval time.Duration capacity int permits chan token issued chan int64 - windowStart int64 - sweeper *internal.Sweeper + windowStart atomic.Int64 + windowShifter *async.Task } -// NewSlider allocates and returns a new Slider rate limiter. -// -// window is the fixed duration of the sliding window. -// slidingInterval controls how frequently a new sliding window is started. +// NewSlider allocates and returns a new Slider rate limiter, where +// window is the fixed duration of the sliding window, +// slidingInterval controls how frequently a new sliding window is started and // capacity is the quota limit for the window. -func NewSlider(window time.Duration, slidingInterval time.Duration, capacity int) *Slider { +func NewSlider(window, slidingInterval time.Duration, capacity int) (*Slider, error) { + if capacity < 1 { + return nil, fmt.Errorf("nonpositive capacity: %d", capacity) + } + if slidingInterval > window { + return nil, fmt.Errorf("slidingInterval must not exceed window") + } underlying := &slider{ window: window, slidingInterval: slidingInterval, capacity: capacity, permits: make(chan token, capacity), issued: make(chan int64, capacity), + windowShifter: async.NewTask(slidingInterval), } - underlying.initSlider() - + underlying.init() + // initialize a goroutine responsible for periodically moving the + // window forward + underlying.windowShifter.Run(underlying.slide) slider := &Slider{ slider: underlying, } + // the finalizer may run as soon as the slider becomes unreachable + runtime.SetFinalizer(slider, stopWindowShifter) - // start the sliding goroutine - goSlide(underlying, slidingInterval) - // the finalizer may run as soon as the slider becomes unreachable. - runtime.SetFinalizer(slider, stopSliderSweeper) - - return slider + return slider, nil } -// initSlider prefills the permits channel. -func (s *slider) initSlider() { +// init initializes the permits channel by populating it with the +// specified number of tokens. +func (s *slider) init() { for i := 0; i < s.capacity; i++ { s.permits <- token{} } } -// doSlide starts a new sliding window. -func (s *slider) doSlide() { - atomic.StoreInt64(&s.windowStart, nowNano()) - for ts := range s.issued { +// slide moves the sliding window forward. +func (s *slider) slide() { + now := time.Now().UnixNano() + s.windowStart.Store(now) + for issuedTime := range s.issued { s.permits <- token{} - if ts > s.windowStart { + if issuedTime >= now { break } } } -// Ask requires a permit. -// It is a non blocking call, returns true or false. -func (s *Slider) Ask() bool { +// Acquire blocks the calling goroutine until a token is acquired, the Context +// is canceled, or the wait time exceeds the Context's Deadline. +func (s *Slider) Acquire(ctx context.Context) error { + select { + case <-s.slider.permits: + s.slider.issued <- s.slider.windowStart.Load() + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// TryAcquire attempts to acquire a token without blocking. +// returns true if a token was acquired, false if no tokens are available. +func (s *Slider) TryAcquire() bool { select { case <-s.slider.permits: - s.slider.issued <- atomic.LoadInt64(&s.slider.windowStart) + s.slider.issued <- s.slider.windowStart.Load() return true default: return false } } -// Take blocks to get a permit. -func (s *Slider) Take() { - <-s.slider.permits - s.slider.issued <- atomic.LoadInt64(&s.slider.windowStart) -} - -func nowNano() int64 { - return time.Now().UTC().UnixNano() -} - -// stopSliderSweeper is the callback to stop the sliding goroutine. -func stopSliderSweeper(s *Slider) { - s.slider.sweeper.Stop <- struct{}{} +// stopWindowShifter is the callback function used to terminate the goroutine +// responsible for periodically moving the sliding window. +func stopWindowShifter(s *Slider) { + s.slider.windowShifter.Stop() } - -// goSlide starts the sliding goroutine. -func goSlide(slider *slider, slidingInterval time.Duration) { - sweeper := &internal.Sweeper{ - Interval: slidingInterval, - Stop: make(chan interface{}), - } - slider.sweeper = sweeper - go sweeper.Run(slider.doSlide) -} - -// Verify Slider satisfies the equalizer.Limiter interface. -var _ Limiter = (*Slider)(nil) diff --git a/slider_test.go b/slider_test.go index 3965333..5d27eae 100644 --- a/slider_test.go +++ b/slider_test.go @@ -1,25 +1,50 @@ package equalizer import ( + "context" "testing" "time" + + "github.com/reugn/equalizer/internal/assert" ) func TestSlider(t *testing.T) { - slider := NewSlider(time.Second, time.Millisecond*100, 32) + slider, err := NewSlider(time.Second, 100*time.Millisecond, 32) + assert.IsNil(t, err) var quota bool for i := 0; i < 32; i++ { - quota = slider.Ask() + quota = slider.TryAcquire() } - assertEqual(t, quota, true) + assert.Equal(t, quota, true) + + quota = slider.TryAcquire() + assert.Equal(t, quota, false) - quota = slider.Ask() - assertEqual(t, quota, false) + time.Sleep(1010 * time.Millisecond) - time.Sleep(time.Millisecond * 1010) + quota = slider.TryAcquire() + assert.Equal(t, quota, true) + + assert.IsNil(t, slider.Acquire(context.Background())) +} + +func TestSlider_Context(t *testing.T) { + slider, err := NewSlider(time.Second, 200*time.Millisecond, 1) + assert.IsNil(t, err) + assert.IsNil(t, slider.Acquire(context.Background())) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond)) + defer cancel() + err = slider.Acquire(ctx) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} - quota = slider.Ask() - assertEqual(t, quota, true) +func TestSlider_Errors(t *testing.T) { + slider, err := NewSlider(time.Second, 200*time.Millisecond, -1) + assert.ErrorContains(t, err, "nonpositive capacity") + assert.IsNil(t, slider) - slider.Take() + slider, err = NewSlider(100*time.Millisecond, time.Second, 32) + assert.ErrorContains(t, err, "slidingInterval must not exceed window") + assert.IsNil(t, slider) } diff --git a/token_bucket.go b/token_bucket.go index 06a351d..9ce9ea5 100644 --- a/token_bucket.go +++ b/token_bucket.go @@ -1,11 +1,12 @@ package equalizer import ( + "context" + "fmt" "runtime" - "sync/atomic" "time" - "github.com/reugn/equalizer/internal" + "github.com/reugn/equalizer/internal/async" ) // A TokenBucket represents a rate limiter based on a custom implementation of the @@ -18,45 +19,46 @@ import ( // not keep the main TokenBucket object from being garbage collected. When it is // garbage collected, the finalizer stops the background goroutine, after // which the underlying tokenBucket can be collected. -// type TokenBucket struct { t *tokenBucket } +var _ Limiter = (*TokenBucket)(nil) + type tokenBucket struct { - capacity int32 + capacity int refillInterval time.Duration permits chan token - issued int32 - sweeper *internal.Sweeper + tokenSupplier *async.Task } -// NewTokenBucket allocates and returns a new TokenBucket rate limiter. -// -// capacity is the token bucket capacity. -// refillInterval is the bucket refill interval. -func NewTokenBucket(capacity int32, refillInterval time.Duration) *TokenBucket { +// NewTokenBucket allocates and returns a new TokenBucket rate limiter, where +// capacity is the capacity of the token bucket and refillInterval is the token +// bucket refill interval. +func NewTokenBucket(capacity int, refillInterval time.Duration) (*TokenBucket, error) { + if capacity < 1 { + return nil, fmt.Errorf("nonpositive capacity: %d", capacity) + } underlying := &tokenBucket{ capacity: capacity, refillInterval: refillInterval, permits: make(chan token, capacity), - issued: 0, + tokenSupplier: async.NewTask(refillInterval), } - underlying.fillTokenBucket(int(capacity)) - + underlying.fillTokenBucket(capacity) + // initialize a goroutine responsible for periodically refilling the + // token bucket + underlying.tokenSupplier.Run(underlying.refill) tokenBucket := &TokenBucket{ t: underlying, } + // the finalizer may run as soon as the tokenBucket becomes unreachable + runtime.SetFinalizer(tokenBucket, stopTokenSupplier) - // start the refilling goroutine - goRefill(underlying, refillInterval) - // the finalizer may run as soon as the tokenBucket becomes unreachable. - runtime.SetFinalizer(tokenBucket, stopSweeper) - - return tokenBucket + return tokenBucket, nil } -// fillTokenBucket fills the permits channel. +// fillTokenBucket fills up the permits channel with the capacity number of tokens. func (tb *tokenBucket) fillTokenBucket(capacity int) { for i := 0; i < capacity; i++ { tb.permits <- token{} @@ -65,42 +67,34 @@ func (tb *tokenBucket) fillTokenBucket(capacity int) { // refill refills the token bucket. func (tb *tokenBucket) refill() { - issued := atomic.SwapInt32(&tb.issued, 0) - tb.fillTokenBucket(int(issued)) + issued := tb.capacity - len(tb.permits) + tb.fillTokenBucket(issued) } -// Ask requires a permit. -// It is a non blocking call, returns true or false. -func (tb *TokenBucket) Ask() bool { +// Acquire blocks the calling goroutine until a token is acquired, the Context +// is canceled, or the wait time exceeds the Context's Deadline. +func (tb *TokenBucket) Acquire(ctx context.Context) error { + select { + case <-tb.t.permits: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// TryAcquire attempts to acquire a token without blocking. +// Returns true if a token was acquired, false if no tokens are available. +func (tb *TokenBucket) TryAcquire() bool { select { case <-tb.t.permits: - atomic.AddInt32(&tb.t.issued, 1) return true default: return false } } -// Take blocks to get a permit. -func (tb *TokenBucket) Take() { - <-tb.t.permits - atomic.AddInt32(&tb.t.issued, 1) +// stopTokenSupplier is the callback function used to terminate the +// goroutine responsible for periodically refilling the token bucket. +func stopTokenSupplier(t *TokenBucket) { + t.t.tokenSupplier.Stop() } - -// stopSweeper is the callback to stop the refilling goroutine. -func stopSweeper(t *TokenBucket) { - t.t.sweeper.Stop <- struct{}{} -} - -// goRefill starts the refilling goroutine. -func goRefill(t *tokenBucket, refillInterval time.Duration) { - sweeper := &internal.Sweeper{ - Interval: refillInterval, - Stop: make(chan interface{}), - } - t.sweeper = sweeper - go sweeper.Run(t.refill) -} - -// Verify TokenBucket satisfies the equalizer.Limiter interface. -var _ Limiter = (*TokenBucket)(nil) diff --git a/token_bucket_test.go b/token_bucket_test.go index ad9eec8..eb9df66 100644 --- a/token_bucket_test.go +++ b/token_bucket_test.go @@ -1,25 +1,46 @@ package equalizer import ( + "context" "testing" "time" + + "github.com/reugn/equalizer/internal/assert" ) func TestTokenBucket(t *testing.T) { - tokenBucket := NewTokenBucket(32, time.Millisecond*100) + tokenBucket, err := NewTokenBucket(32, 100*time.Millisecond) + assert.IsNil(t, err) var quota bool for i := 0; i < 32; i++ { - quota = tokenBucket.Ask() + quota = tokenBucket.TryAcquire() } - assertEqual(t, quota, true) + assert.Equal(t, quota, true) + + quota = tokenBucket.TryAcquire() + assert.Equal(t, quota, false) + + time.Sleep(110 * time.Millisecond) - quota = tokenBucket.Ask() - assertEqual(t, quota, false) + quota = tokenBucket.TryAcquire() + assert.Equal(t, quota, true) - time.Sleep(time.Millisecond * 110) + assert.IsNil(t, tokenBucket.Acquire(context.Background())) +} + +func TestTokenBucket_Context(t *testing.T) { + tokenBucket, err := NewTokenBucket(1, time.Second) + assert.IsNil(t, err) + assert.IsNil(t, tokenBucket.Acquire(context.Background())) - quota = tokenBucket.Ask() - assertEqual(t, quota, true) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond)) + defer cancel() + err = tokenBucket.Acquire(ctx) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} - tokenBucket.Take() +func TestTokenBucket_Errors(t *testing.T) { + tokenBucket, err := NewTokenBucket(-1, time.Second) + assert.ErrorContains(t, err, "nonpositive capacity") + assert.IsNil(t, tokenBucket) }