-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathworkers.go
490 lines (425 loc) · 11 KB
/
workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
package xsync
import (
container "container/list"
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/gobwas/xsync/internal/buildtags"
)
// WorkerContext represents worker goroutine context.
type WorkerContext struct {
context.Context
id uint32
}
// ID returns worker identifier within its group.
// Returned id is always an integer that is less than worker group size.
//
// NOTE: ID might be reused after worker exit as idle.
func (c *WorkerContext) ID() uint32 {
return c.id
}
// Task is the interface that holds task implementaion.
type Task interface {
// Exec executes the task.
// Given context holds the worker goroutine related info. Context might be
// canceled when worker group is closing.
Exec(*WorkerContext)
}
// TaskFunc is an adapter to allow the use of ordinary functions as Task.
type TaskFunc func(*WorkerContext)
// Exec implements Task.
func (f TaskFunc) Exec(ctx *WorkerContext) {
f(ctx)
}
// WorkerGroup contains options and logic of managing worker goroutines and
// sharing work between them.
type WorkerGroup struct {
// QueueSize specifies the size of the internal tasks queue. Note that
// workers fetch tasks from the queue in accordance with task priority (if
// any was given).
//
// The greater queue size the more tasks with high priority will be
// executed at first. The less queue size, the less difference in execution
// order between tasks with different priorities.
//
// Note that FetchSize field also affects workers behaviour.
QueueSize int
// FetchSize specifies how many tasks will be pulled from the queue per
// each scheduling cycle.
//
// The smaller FetchSize the higher starvation rate for the low priority
// tasks. Contrariwise, when FetchSize is equal to QueueSize, then all
// previously scheduled tasks will be fetched from the queue; that is,
// queue will be drained.
//
// FetchSize must not be greater than QueueSize.
FetchSize int
// SizeLimit specifies the capacity of the worker group.
// If SizeLimit is zero then worker group will contain one worker.
SizeLimit int
// IdleLimit specifies the maximum number of idle workers.
// When set, IdleTimeout must also be set.
// If IdleLimit is zero then no idle limit is used.
IdleLimit int
// IdleTimeout specifies the duration after which worker is considered
// idle.
IdleTimeout time.Duration
// OnStart is an optional callback that will be called right after worker
// goroutine has started.
OnStart func(*WorkerContext)
// OnComplete is an optional callback that will be called right before
// worker goroutine complete.
OnComplete func(*WorkerContext)
initOnce sync.Once
closeOnce sync.Once
start time.Time // Used for monotonic clocks.
sem chan struct{}
work chan Task
manageDone chan struct{}
ctx context.Context
cancel context.CancelFunc
queue queue
drain uint32
mu sync.RWMutex
// workers is a list of allocated workers.
// Note that it may contain workers that already stopped.
workers *container.List
id idPool
// These hooks are called only if debug buildtag passed.
hookFlushScheduled func()
}
func (w *WorkerGroup) init() {
w.initOnce.Do(func() {
w.start = time.Now()
w.queue.init(max(w.QueueSize, 1))
w.manageDone = make(chan struct{})
w.work = make(chan Task)
w.sem = make(chan struct{}, max(w.SizeLimit, 1))
w.ctx, w.cancel = context.WithCancel(context.Background())
w.workers = container.New()
// Start task manager goroutine.
// It receives tasks from the queue and puts them into the work
// channel starting new workers if needed and possible.
go w.manage()
})
}
func (w *WorkerGroup) manage() {
defer close(w.manageDone)
// Prepare tasks buffer where tasks would be moved to from the queue.
var (
full = make([]Task, max(w.QueueSize, 1))
part = full[:max(w.FetchSize, 1)]
)
// We are trying to receive as many tasks from queue as possible to reduce
// number of synchronizations with queue. That is, lock w.queue's mutex
// less times.
//
// This behavior also helps a bit to prevent tasks starvation, when lower
// priority tasks executed less frequently due to the high pressure of
// tasks with higher priority (this is not works when FetchSize is
// specified).
for {
var tasks []Task
if atomic.CompareAndSwapUint32(&w.drain, 1, 0) {
// Need to fetch all the tasks from queue.
tasks = full
} else {
tasks = part
}
n, err := w.queue.recvTo(tasks, nil)
if err == ErrClosed {
// Close w.work to signal all started workers that no more work
// will be feeded.
close(w.work)
return
}
if err != nil {
panic(fmt.Sprintf("xsync: workers: unexpected error: %v", err))
}
for i := 0; i < n; i++ {
var direct chan<- Task
for {
select {
case direct <- tasks[i]:
case w.work <- tasks[i]:
default:
select {
case direct <- tasks[i]:
case w.work <- tasks[i]:
case w.sem <- struct{}{}:
worker := w.startWorker()
direct = worker.direct
continue
}
}
break
}
}
}
}
func (w *WorkerGroup) startWorker() *worker {
worker := &worker{
common: w.work,
direct: make(chan Task, 1),
done: make(chan struct{}),
}
// NOTE: Need to do slow path in caller's goroutine to get rid of spurious
// workers start. Moreover we do Gosched() at the end of the function to
// give the worker goroutine chance to get its processor.
defer runtime.Gosched()
w.mu.Lock()
el := w.workers.PushBack(worker)
n := w.workers.Len()
w.mu.Unlock()
// NOTE: w.workers can contain stopped but not yet removed workers; but its
// not a problem for counting such workers while making decision on idle
// timer below. We always have "non-preemtible" goroutines there, so if the
// workers count is greater or equal than IdleLimit it would be so even
// after removal of stopped worker from the list.
var idle time.Duration
if limit := w.IdleLimit; limit > 0 && n > limit {
idle = w.IdleTimeout
}
id := w.id.acquire()
ctx, cancel := context.WithCancel(w.ctx)
wctx := &WorkerContext{
Context: ctx,
id: id,
}
go func() {
if fn := w.OnStart; fn != nil {
fn(wctx)
}
worker.work(wctx, idle)
if fn := w.OnComplete; fn != nil {
fn(wctx)
}
cancel()
w.id.release(id)
w.mu.Lock()
if w.workers != nil {
w.workers.Remove(el)
}
w.mu.Unlock()
<-w.sem
}()
return worker
}
type worker struct {
common <-chan Task
direct chan Task
done chan struct{}
idle time.Timer
}
func (w *worker) work(ctx *WorkerContext, idle time.Duration) {
defer func() {
cork(ctx, w.direct)
close(w.done)
}()
var (
timer *time.Timer
timeout <-chan time.Time
)
if idle > 0 {
timer = time.NewTimer(idle)
timeout = timer.C
}
for {
var (
t Task
ok bool
)
select {
case t = <-w.direct:
case t, ok = <-w.common:
if !ok {
return
}
case <-timeout:
return
}
t.Exec(ctx)
if timer != nil {
if !timer.Stop() {
<-timeout
}
timer.Reset(idle)
}
}
}
// Close terminates all spawned goroutines.
// It returns when all goroutines and all tasks scheduled before are done.
func (w *WorkerGroup) Close() {
_ = w.close(nil)
}
func (w *WorkerGroup) Shutdown(ctx context.Context) error {
return w.close(ctx.Done())
}
func (w *WorkerGroup) close(cancel <-chan struct{}) (err error) {
w.init()
w.closeOnce.Do(func() {
// Close of queue leads to close of w.work, which in turn leads to
// worker exit. So we don't need additional channels like manageDone.
w.queue.close()
// Cancel the parent context of all workers.
w.cancel()
// Need to wait for manager goroutine exit, to not compete with it in
// accessing the workers list.
select {
case <-w.manageDone:
case <-cancel:
err = ErrCanceled
return
}
w.mu.Lock()
workers := w.workers
w.workers = nil
w.mu.Unlock()
for el := workers.Front(); el != nil; el = el.Next() {
w := el.Value.(*worker)
select {
case <-w.done:
case <-cancel:
err = ErrCanceled
return
}
}
})
return err
}
// Exec makes t to be executed in one of the running workers.
func (w *WorkerGroup) Exec(d Demand, t Task) error {
w.init()
return w.queue.send(d, t)
}
// Flush waits for completion of all task successfully scheduled before.
//
// Note that Flush() call leads to one-time full queue fetch inside group.
// That is, it affects the priority of execution if w.FetchSize was set (and
// acts like w.FetchSize was not set).
func (w *WorkerGroup) Flush(ctx context.Context) error {
w.init()
return w.wait(ctx.Done())
}
// Barrier waits for completion of all currently running tasks.
//
// That is, having two workers in the group and three tasks T1, T2 and T3
// scheduled and T1 and T2 executing, call to Barrier() will block until T1 and
// T2 are done. It doesn't guarantee that T3 is done as well. To be sure that
// all tasks in the queue are done, use Flush().
func (w *WorkerGroup) Barrier(ctx context.Context) error {
return w.barrier(ctx.Done())
}
func (w *WorkerGroup) monotonic() int64 {
return int64(time.Since(w.start))
}
func (w *WorkerGroup) wait(cancel <-chan struct{}) error {
ch := make(chan struct{}, 1)
// Schedule task with lowest priority to be sure that it will be executed
// after previously scheduled normal tasks.
err := w.queue.send(
Demand{
Priority: LowestPriority(w.monotonic()),
Cancel: cancel,
},
TaskFunc(func(*WorkerContext) {
ch <- struct{}{}
}),
)
if err != nil {
return err
}
// Signal manager goroutine that it must drain queue.
atomic.StoreUint32(&w.drain, 1)
if buildtags.Debug {
if hook := w.hookFlushScheduled; hook != nil {
hook()
}
}
select {
case <-ch:
case <-cancel:
return ErrCanceled
}
// Ensure all tasks before ours are done.
return w.barrier(cancel)
}
func (w *WorkerGroup) barrier(cancel <-chan struct{}) error {
ch := make(chan struct{}, 1)
n, err := w.broadcast(TaskFunc(func(*WorkerContext) {
select {
case ch <- struct{}{}:
case <-cancel:
}
}), cancel)
for err == nil && n > 0 {
select {
case <-ch:
n--
case <-cancel:
err = ErrCanceled
}
}
return err
}
func (w *WorkerGroup) broadcast(t Task, cancel <-chan struct{}) (n int, err error) {
w.mu.RLock()
defer w.mu.RUnlock()
for el := w.workers.Front(); el != nil; el = el.Next() {
w := el.Value.(*worker)
select {
case w.direct <- t:
n++
case <-w.done:
// That is fine. Less things to do.
case <-cancel:
err = ErrCanceled
return
}
}
return n, err
}
func cork(ctx *WorkerContext, ch chan Task) {
for i, c := 0, cap(ch); i != c; {
corking:
for i = 0; i < c; i++ {
select {
case ch <- nil:
default:
break corking
}
}
for j := 0; i != c && j < c; j++ {
task := <-ch
if task != nil {
task.Exec(ctx)
}
}
}
}
type idPool struct {
mu sync.Mutex
free []uint32
next uint32
}
func (p *idPool) acquire() (id uint32) {
p.mu.Lock()
defer p.mu.Unlock()
if n := len(p.free); n > 0 {
id = p.free[0]
p.free[0] = p.free[n-1]
p.free = p.free[:n-1]
} else {
id = p.next
p.next++
}
return id
}
func (p *idPool) release(id uint32) {
p.mu.Lock()
defer p.mu.Unlock()
p.free = append(p.free, id)
}