forked from donovanhide/eventsource
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathretry_delay.go
146 lines (130 loc) · 4.73 KB
/
retry_delay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package eventsource
import (
"math"
"math/rand"
"sync"
"time"
)
// retryDelayStrategy encapsulates configurable backoff/jitter behavior.
//
// - The system can either be in a "good" state or a "bad" state. The initial state is "bad"; the
// caller is responsible for indicating when it transitions to "good" (SetGoodSince). Asking for a
// new retry delay (NextRetryDelay) implies the state is now transitioning to "bad".
//
// - There is a configurable base delay, which can be changed at any time with SetBaseDelay (for
// example if the SSE server sends us a "retry:" directive).
//
// - There are optional strategies for applying backoff and jitter to the delay.
//
// NextRetryDelay, SetGoodSince, and SetBaseDelay acquire lock around their access to the mutable
// fields; the backoff and jitter strategies are invoked by NextRetryDelay while lock is held.
// NOTE(review): this comment previously described the type as single-goroutine only - confirm
// whether any caller depends on the locking before relying on it.
type retryDelayStrategy struct {
// Starting delay, before any backoff or jitter is applied. Replaced by SetBaseDelay.
baseDelay time.Duration
// Optional; when non-nil, NextRetryDelay passes baseDelay and retryCount through it.
backoff backoffStrategy
// Optional; when non-nil, NextRetryDelay passes the (possibly backed-off) delay through it.
jitter jitterStrategy
// When positive, the minimum time the state must have been "good" for NextRetryDelay to reset
// retryCount to zero. Zero or negative disables that reset.
resetInterval time.Duration
// Number of delays handed out since the last reset; incremented by every NextRetryDelay call and
// zeroed by SetBaseDelay.
retryCount int
goodSince time.Time // nonzero only if the state is currently "good"
// Guards baseDelay, retryCount, and goodSince in the methods listed above.
lock sync.Mutex
}
// backoffStrategy is the abstraction for backoff delay behavior.
//
// applyBackoff receives the base delay and the current retry count (NextRetryDelay passes 0 for the
// first delay after a reset) and returns the delay to use before any jitter is applied.
type backoffStrategy interface {
applyBackoff(baseDelay time.Duration, retryCount int) time.Duration
}
// jitterStrategy is the abstraction for delay jitter behavior.
//
// applyJitter receives the delay computed so far (after backoff, if a backoff strategy is set) and
// returns the delay that NextRetryDelay hands back to its caller.
type jitterStrategy interface {
applyJitter(computedDelay time.Duration) time.Duration
}
// defaultBackoffStrategy is the stock backoffStrategy: exponential growth capped at maxDelay.
type defaultBackoffStrategy struct {
	maxDelay time.Duration // cap applied to every computed delay
}

// newDefaultBackoff returns the default exponential backoff implementation, which doubles the delay
// with each successive retry until it reaches maxDelay.
//
// Resetting is handled by retryDelayStrategy rather than by this type: if a resetInterval was
// configured there and the system has been in a "good" state for at least that long, the retry count
// goes back to zero, so the delay returns to the base. This avoids perpetually increasing delays in
// a situation where failures are rare.
func newDefaultBackoff(maxDelay time.Duration) backoffStrategy {
	return defaultBackoffStrategy{maxDelay: maxDelay}
}

// applyBackoff returns baseDelay multiplied by 2^retryCount, limited to maxDelay. The arithmetic is
// done in float64 so that a very large retryCount saturates at the cap instead of wrapping around.
func (b defaultBackoffStrategy) applyBackoff(baseDelay time.Duration, retryCount int) time.Duration {
	growth := math.Pow(2, float64(retryCount))
	uncapped := float64(baseDelay) * growth
	ceiling := float64(b.maxDelay)
	return time.Duration(math.Min(uncapped, ceiling))
}
// defaultJitterStrategy is the stock jitterStrategy: it shortens each delay by a pseudo-random
// amount proportional to the delay.
type defaultJitterStrategy struct {
	ratio  float64    // fraction of the delay that may be subtracted (at most 1.0)
	random *rand.Rand // private generator, seeded in newDefaultJitter
}

// newDefaultJitter creates the default implementation of jitter, which subtracts a pseudo-random
// amount from each delay.
//
// The ratio parameter should be greater than 0 and less than or equal to 1.0; values above 1.0 are
// clamped to 1.0, and a ratio of zero or less results in no jitter being applied. If randSeed is
// zero or negative, the generator is seeded from the current time instead.
func newDefaultJitter(ratio float64, randSeed int64) jitterStrategy {
	if randSeed <= 0 {
		randSeed = time.Now().UnixNano()
	}
	if ratio > 1.0 {
		ratio = 1.0
	}
	//nolint:gosec // This isn't a cryptographic use-case, weak RNG is acceptable
	return &defaultJitterStrategy{ratio: ratio, random: rand.New(rand.NewSource(randSeed))}
}

// applyJitter returns computedDelay reduced by a pseudo-random amount in the half-open range
// [0, computedDelay*ratio).
//
// If computedDelay*ratio truncates to zero or less (a zero delay, a delay of a few nanoseconds, or a
// non-positive ratio) there is nothing to subtract and computedDelay is returned unchanged. The
// check is required because rand.Int63n panics when its bound is not positive.
func (s *defaultJitterStrategy) applyJitter(computedDelay time.Duration) time.Duration {
	maxJitter := int64(float64(computedDelay) * s.ratio)
	if maxJitter <= 0 {
		return computedDelay
	}
	jitter := time.Duration(s.random.Int63n(maxJitter))
	return computedDelay - jitter
}
// newRetryDelayStrategy creates a retryDelayStrategy with the given base delay, reset interval, and
// optional backoff and jitter strategies (either may be nil to disable that step).
//
// The returned strategy starts with a retry count of zero and in the "bad" state, i.e. with no
// "good since" time recorded.
func newRetryDelayStrategy(
	baseDelay time.Duration,
	resetInterval time.Duration,
	backoff backoffStrategy,
	jitter jitterStrategy,
) *retryDelayStrategy {
	strategy := new(retryDelayStrategy)
	strategy.baseDelay = baseDelay
	strategy.resetInterval = resetInterval
	strategy.backoff = backoff
	strategy.jitter = jitter
	return strategy
}
// NextRetryDelay computes the next retry interval and marks the current state as "bad".
//
// If the state was "good", a positive resetInterval is configured, and at least that much time has
// passed between the recorded "good since" time and currentTime, the retry count is first reset to
// zero. The base delay is then passed through the backoff strategy (if any) using the current retry
// count, the retry count is incremented, and the result is passed through the jitter strategy
// (if any).
//
// currentTime is supplied by the caller rather than read from the clock so that tests get
// predictable behavior.
func (r *retryDelayStrategy) NextRetryDelay(currentTime time.Time) time.Duration {
	r.lock.Lock()
	defer r.lock.Unlock()

	if wasGood := !r.goodSince.IsZero(); wasGood && r.resetInterval > 0 {
		if goodFor := currentTime.Sub(r.goodSince); goodFor >= r.resetInterval {
			r.retryCount = 0
		}
	}
	// Asking for a retry delay means we are no longer in the "good" state.
	r.goodSince = time.Time{}

	result := r.baseDelay
	if r.backoff != nil {
		result = r.backoff.applyBackoff(result, r.retryCount)
	}
	r.retryCount++

	if r.jitter == nil {
		return result
	}
	return r.jitter.applyJitter(result)
}
// SetGoodSince marks the current state as "good" and records the time at which that happened.
// NextRetryDelay compares this time against resetInterval to decide whether the retry count should
// be reset.
func (r *retryDelayStrategy) SetGoodSince(goodSince time.Time) {
	r.lock.Lock()
	defer r.lock.Unlock()

	r.goodSince = goodSince
}
// SetBaseDelay replaces the base retry delay and zeroes the retry count, so the next retry starts
// again from the new base value.
//
// This implements the optional SSE behavior where the server sends a "retry:" command to set the
// base retry to a specific value. Jitter, if enabled, is still applied to that value, and later
// retries still grow according to the backoff strategy.
func (r *retryDelayStrategy) SetBaseDelay(baseDelay time.Duration) {
	r.lock.Lock()
	defer r.lock.Unlock()

	r.baseDelay, r.retryCount = baseDelay, 0
}
// hasJitter reports whether a jitter strategy has been configured.
func (r *retryDelayStrategy) hasJitter() bool { //nolint:unused // used only in tests
	configured := r.jitter != nil
	return configured
}