-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathkettle.go
253 lines (199 loc) · 5.45 KB
/
kettle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package kettle
import (
"fmt"
"log"
"os"
"sync/atomic"
"time"
"github.com/fatih/color"
"github.com/go-redsync/redsync"
"github.com/gofrs/uuid/v5"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
)
// Colorizing helpers for the log-level tags used by Kettle's logging methods:
// red is built from color.FgRed and wraps the "[error]" tag, green is built
// from color.FgGreen and wraps the "[info]" tag.
var (
red = color.New(color.FgRed).SprintFunc()
green = color.New(color.FgGreen).SprintFunc()
)
// DistLocker abstracts a distributed locker.
//
// Kettle calls Lock on every election attempt and treats a nil error as
// having become master. A custom implementation can be supplied through
// WithDistLocker; otherwise New builds a redsync mutex and uses that.
type DistLocker interface {
// Lock tries to acquire the distributed lock. A nil error means the lock
// was acquired; any non-nil error is treated by Kettle as "not master".
Lock() error
// Unlock releases the lock. It is not called by the code visible in this
// file. NOTE(review): the bool presumably reports whether the lock was
// actually released — confirm against the implementation in use.
Unlock() (bool, error)
}
// KettleOption configures Kettle.
//
// New calls Apply on every option, in the order given, before it decides
// whether a default distributed locker has to be created.
type KettleOption interface {
// Apply writes this option's setting into the given Kettle.
Apply(*Kettle)
}
// withName is the KettleOption that carries an instance name.
type withName string

// Apply copies the carried name into the given Kettle.
func (n withName) Apply(k *Kettle) {
	k.name = string(n)
}

// WithName returns a KettleOption that sets the Kettle instance's name.
// The name appears in log lines and is the prefix of the default lock key
// built by New.
func WithName(v string) KettleOption {
	opt := withName(v)
	return opt
}
// withVerbose is the KettleOption that carries the log verbosity flag.
type withVerbose bool

// Apply copies the carried verbosity flag into the given Kettle.
func (vb withVerbose) Apply(k *Kettle) {
	k.verbose = bool(vb)
}

// WithVerbose returns a KettleOption that switches the Kettle instance's
// info/error logging on (true) or off (false).
func WithVerbose(v bool) KettleOption {
	opt := withVerbose(v)
	return opt
}
// withDistLocker is the KettleOption that carries a caller-supplied
// distributed locker.
type withDistLocker struct {
	dl DistLocker
}

// Apply installs the carried locker as the given Kettle's lock.
func (opt withDistLocker) Apply(k *Kettle) {
	k.lock = opt.dl
}

// WithDistLocker returns a KettleOption that makes the Kettle instance use v
// as its DistLocker. When the resulting lock is nil, New falls back to
// creating its own locker.
func WithDistLocker(v DistLocker) KettleOption {
	return withDistLocker{dl: v}
}
// withTickTime is the KettleOption that carries the tick interval, in seconds.
type withTickTime int64

// Apply copies the carried tick interval into the given Kettle.
func (t withTickTime) Apply(k *Kettle) {
	k.tickTime = int64(t)
}

// WithTickTime returns a KettleOption that sets the Kettle instance's tick
// interval in seconds. The interval drives the master-election ticker and is
// also the expiry of the default lock created by New, so v should be positive.
func WithTickTime(v int64) KettleOption {
	opt := withTickTime(v)
	return opt
}
// Kettle provides functions that abstract the master election of a group of workers
// at a given interval time.
//
// On every tick an instance tries to take a distributed lock; while it holds
// the lock it considers itself master and invokes the callback supplied
// through StartInput. Instances are created with New and started with Start.
type Kettle struct {
name string // instance name; defaults to "kettle", used in log lines and the default lock key
verbose bool // when false, the info/error logging helpers are no-ops
pool *redis.Pool // Redis pool created by New when no DistLocker was supplied; otherwise left nil
lock DistLocker // lock whose acquisition decides mastership
master int32 // 1 if we are master, otherwise, 0; accessed through sync/atomic
hostname string // set by Start: OS hostname + "__" + a version-4 UUID; used in log lines
startInput *StartInput // the *StartInput passed to Start (the pointer is stored, not a copy of the struct)
masterQuit chan error // signal master set to quit
masterDone chan error // master termination done
tickTime int64 // seconds between election attempts; defaults to 30; also the default lock's expiry
}
// Name returns the instance's name.
//
// The receiver is a pointer so that calling Name does not copy the whole
// Kettle; a copy would read the master flag non-atomically while the election
// loop may be storing to it.
func (k *Kettle) Name() string { return k.name }
// IsVerbose returns the verbosity setting.
//
// The receiver is a pointer so that calling IsVerbose does not copy the whole
// Kettle; a copy would read the master flag non-atomically while the election
// loop may be storing to it.
func (k *Kettle) IsVerbose() bool { return k.verbose }
// IsMaster returns master status: true when the most recent election attempt
// acquired the lock, false otherwise.
//
// The flag is loaded atomically straight from the receiver. A value receiver
// would first copy the struct with a plain read, which races with the atomic
// stores performed by the election loop.
func (k *Kettle) IsMaster() bool { return atomic.LoadInt32(&k.master) == 1 }
// Pool returns the configured Redis connection pool. It is nil when the
// instance was built with a caller-supplied DistLocker, because New only
// creates a pool for its default locker.
//
// The receiver is a pointer so that calling Pool does not copy the whole
// Kettle; a copy would read the master flag non-atomically while the election
// loop may be storing to it.
func (k *Kettle) Pool() *redis.Pool { return k.pool }
// info logs v at info level through the standard logger, formatted as by
// fmt.Sprintln and prefixed with a green "[info]" tag. It does nothing unless
// the instance is verbose.
//
// The receiver is a pointer: info is called from goroutines that run
// alongside the election loop, and a value receiver would copy the struct,
// reading the master flag non-atomically while it is being stored to.
func (k *Kettle) info(v ...interface{}) {
	if !k.verbose {
		return
	}

	m := fmt.Sprintln(v...)
	log.Printf("%s %s", green("[info]"), m)
}
// infof logs a message at info level through the standard logger, formatted
// as by fmt.Sprintf and prefixed with a green "[info]" tag. It does nothing
// unless the instance is verbose.
//
// The receiver is a pointer: infof is called from goroutines that run
// alongside the election loop, and a value receiver would copy the struct,
// reading the master flag non-atomically while it is being stored to.
func (k *Kettle) infof(format string, v ...interface{}) {
	if !k.verbose {
		return
	}

	m := fmt.Sprintf(format, v...)
	log.Printf("%s %s", green("[info]"), m)
}
// error logs v at error level through the standard logger, formatted as by
// fmt.Sprintln and prefixed with a red "[error]" tag. It does nothing unless
// the instance is verbose.
//
// The receiver is a pointer so that logging does not copy the whole Kettle;
// a copy would read the master flag non-atomically while the election loop
// may be storing to it.
func (k *Kettle) error(v ...interface{}) {
	if !k.verbose {
		return
	}

	m := fmt.Sprintln(v...)
	log.Printf("%s %s", red("[error]"), m)
}
// errorf logs a message at error level through the standard logger, formatted
// as by fmt.Sprintf and prefixed with a red "[error]" tag. It does nothing
// unless the instance is verbose.
//
// The receiver is a pointer so that logging does not copy the whole Kettle;
// a copy would read the master flag non-atomically while the election loop
// may be storing to it.
func (k *Kettle) errorf(format string, v ...interface{}) {
	if !k.verbose {
		return
	}

	m := fmt.Sprintf(format, v...)
	log.Printf("%s %s", red("[error]"), m)
}
// fatal reports v as an error and then terminates the process with exit
// status 1.
//
// The error helper is silent when the instance is not verbose, which used to
// make the process exit without any explanation. In that case the message is
// now written directly to the standard logger (uncolored) so the reason for
// the exit is never lost.
//
// The receiver is a pointer so that the call does not copy the whole Kettle,
// including its atomically updated master flag.
func (k *Kettle) fatal(v ...interface{}) {
	if k.verbose {
		k.error(v...)
	} else {
		log.Print("[error] ", fmt.Sprintln(v...))
	}

	os.Exit(1)
}
// fatalf reports a formatted error message and then terminates the process
// with exit status 1.
//
// The errorf helper is silent when the instance is not verbose, which used to
// make the process exit without any explanation. In that case the message is
// now written directly to the standard logger (uncolored) so the reason for
// the exit is never lost.
//
// The receiver is a pointer so that the call does not copy the whole Kettle,
// including its atomically updated master flag.
func (k *Kettle) fatalf(format string, v ...interface{}) {
	if k.verbose {
		k.errorf(format, v...)
	} else {
		log.Printf("[error] "+format, v...)
	}

	os.Exit(1)
}
// isMaster reports whether the master flag is currently 1.
//
// It must use a pointer receiver: with a value receiver the struct is copied
// by a plain read before the atomic load runs on the copy, so the load would
// neither be atomic with respect to the real field nor free of a data race
// with setMaster's atomic stores.
func (k *Kettle) isMaster() bool {
	return atomic.LoadInt32(&k.master) == 1
}
// setMaster makes one attempt to take the distributed lock and records the
// outcome in the master flag: 1 when Lock returns nil, 0 when it returns an
// error. A successful attempt is also logged at info level.
func (k *Kettle) setMaster() {
	var flag int32 // stays 0 unless the lock is acquired

	if lockErr := k.lock.Lock(); lockErr == nil {
		k.infof("[%v] %v set to master", k.name, k.hostname)
		flag = 1
	}

	atomic.StoreInt32(&k.master, flag)
}
// doMaster runs the master-election loop.
//
// It performs one election attempt immediately and then starts a goroutine
// that repeats the attempt every tickTime seconds. Each attempt calls
// setMaster; when the instance is master afterwards and a Master callback was
// supplied, the callback is invoked with MasterCtx. An error returned by the
// callback is logged (when verbose) and does not stop the loop.
//
// The goroutine ends when a value arrives on masterQuit: it stops the ticker,
// sends nil on masterDone and returns.
func (k *Kettle) doMaster() {
	masterTicker := time.NewTicker(time.Second * time.Duration(k.tickTime))

	attempt := func() {
		// Attempt to be master here.
		k.setMaster()

		// Only if we are master.
		if !k.isMaster() {
			return
		}

		if k.startInput.Master == nil {
			return
		}

		if err := k.startInput.Master(k.startInput.MasterCtx); err != nil {
			k.errorf("[%v] master callback failed: %v", k.name, err)
		}
	}

	attempt() // first invoke before tick

	go func() {
		// Release the ticker's resources once the loop is told to quit.
		defer masterTicker.Stop()

		for {
			select {
			case <-masterTicker.C:
				attempt() // succeeding ticks
			case <-k.masterQuit:
				k.masterDone <- nil
				return
			}
		}
	}()
}
// StartInput configures the Start function.
//
// Start stores the pointer it is given, so the fields are read while the
// election loop runs and should not be modified after Start is called.
type StartInput struct {
Master func(ctx interface{}) error // function to call every time we are master; may be nil
MasterCtx interface{} // callback function parameter, passed to Master unchanged
Quit chan error // signal for us to terminate; Start waits for a receive on this channel
Done chan error // report that we are done; nil is sent here after the election loop has stopped
}
// Start starts Kettle's main function.
//
// It stores in, derives a per-instance identity of the form
// "<os hostname>__<uuid>" for log lines, and launches two goroutines: the
// election loop (doMaster) and a watcher that waits for a receive on in.Quit.
// When the watcher is signalled it asks the election loop to stop, waits for
// it to finish and then sends nil on in.Done. A nil in.Done is allowed and is
// simply skipped; sending on it would otherwise block the watcher forever.
//
// Failing to obtain the hostname or to generate the UUID only degrades the
// identity used in logs, so those errors are logged rather than returned.
//
// Start returns an error only when in is nil; it does not wait for the first
// election attempt.
func (k *Kettle) Start(in *StartInput) error {
	if in == nil {
		return errors.Errorf("input cannot be nil")
	}

	k.startInput = in

	hostname, err := os.Hostname()
	if err != nil {
		k.errorf("[%v] os.Hostname failed: %v", k.name, err)
	}

	id, err := uuid.NewV4()
	if err != nil {
		k.errorf("[%v] uuid.NewV4 failed: %v", k.name, err)
	}

	k.hostname = hostname + fmt.Sprintf("__%v", id)

	// Buffered so that neither side of the shutdown handshake blocks on the
	// other being ready.
	k.masterQuit = make(chan error, 1)
	k.masterDone = make(chan error, 1)

	go func() {
		<-in.Quit
		k.infof("[%v] requested to terminate", k.name)

		// Attempt to gracefully terminate master.
		k.masterQuit <- nil
		<-k.masterDone

		k.infof("[%v] terminate complete", k.name)

		// A send on a nil channel never completes; treat Done as optional.
		if in.Done != nil {
			in.Done <- nil
		}
	}()

	go k.doMaster()

	return nil
}
// New returns an instance of Kettle.
//
// The instance starts with the name "kettle" and a tick time of 30 seconds;
// opts are then applied in the order given. The resulting tick time must be
// positive, because it is used as the interval of the election ticker, which
// cannot be created with a non-positive duration. An invalid value is
// reported here instead of surfacing later inside a goroutine.
//
// When no DistLocker was supplied, New creates a Redis pool with
// NewRedisPool and builds a redsync mutex named "<name>-distlocker" whose
// expiry equals the tick time. An error from NewRedisPool is returned as is.
func New(opts ...KettleOption) (*Kettle, error) {
	k := &Kettle{
		name:     "kettle",
		tickTime: 30,
	}

	for _, opt := range opts {
		opt.Apply(k)
	}

	if k.tickTime <= 0 {
		return nil, fmt.Errorf("tick time must be positive, got %d", k.tickTime)
	}

	if k.lock == nil {
		pool, err := NewRedisPool()
		if err != nil {
			return nil, err
		}

		k.pool = pool
		pools := []redsync.Pool{pool}
		rs := redsync.New(pools)
		k.lock = rs.NewMutex(
			fmt.Sprintf("%v-distlocker", k.name),
			// The lock lives for one tick, so mastership lapses unless it is
			// re-acquired on a later attempt.
			redsync.SetExpiry(time.Second*time.Duration(k.tickTime)),
		)
	}

	return k, nil
}