-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlist.go
272 lines (235 loc) · 6.36 KB
/
list.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
package xsync
import (
"sync"
"sync/atomic"
"github.com/gobwas/xsync/internal/buildtags"
)
// list is a ticket-based prioritized notification list.
//
// Ticket-based means that a caller first calls Add() to receive a list ticket
// and then eventually calls Wait() with it. Receiving a ticket lets the caller
// fix its position in the queue under its own synchronization and then call
// Wait() without holding that synchronization.
//
// list maintains an atomic counter which consists of two parts – the first is
// the number of goroutines in transition from Add() to Wait() (pending count);
// the second is a monotonically increasing ticket id.
// We need the pending counter to be able to answer the following question
// during Notify() processing: if the list's heap is empty now – does it mean
// that the list is empty, or did we just win the race with some goroutine in
// transition?
//
// For simplicity of logic and implementation list's Notify() does not wait for
// pending goroutines if there is at least one element in the heap. Such
// behavior can lead to incomplete fairness – suppose that G1 is in the heap
// right now, and G2 is in transition; G1 has much lower priority; G1 has
// a ticket older than G2's; but G2 loses the race with G1's Wait() and some
// other's Notify(). On the one hand – we must wait for G2 and signal it
// instead of G1, but on the other hand – we do not know G2's priority
// exactly until we receive it in Wait(). That is, if G2 is lower than G1,
// we could delay G1's signaling without any reason.
type list struct {
// mu is held by Wait, Notify and NotifyAll while they touch heap and inbox
// and while they modify done.
mu sync.Mutex
// heap holds the waiters that entered Wait() and were not yet notified or
// canceled. Wait() pushes into it; Notify() pops from it; NotifyAll() swaps
// it for an empty one.
heap wgHeap
// wait is an atomic counter of [ pending uint32, waiters uint32 ].
// Add() increments both parts and returns the second one as the ticket;
// Wait() decrements the pending part under the mu lock.
wait dcounter
// done is the number of signaled/canceled tickets.
// It may be changed atomically only under the mu lock.
// It is possible to atomically read it without taking the lock.
done uint32
// inbox is the number of notifications reserved for tickets that are still
// in transition from Add() to Wait(). A Wait() call that finds inbox > 0
// consumes one and returns immediately. It is read and written only under
// the mu lock.
inbox uint32
// NOTE: for complete fairness list may hold cond's sync.Locker and Lock()
// it before sending event into wg.c.
// These hooks are called only if debug buildtag passed.
// hookInsert is invoked by Wait() right after a waiter is pushed into heap.
hookInsert func(*wg)
// hookNotify is invoked with the waiter that is about to be notified, and
// by Wait() when it consumes a reserved notification from inbox.
hookNotify func(*wg)
}
// Add returns list ticket without locking.
func (l *list) Add() (ticket uint32) {
	const (
		// pendingDelta accounts for one more ticket in transition (pending)
		// from Add() to Wait().
		pendingDelta = 1
		// ticketDelta advances the ticket counter; its new value is handed
		// out to the caller as the ticket.
		ticketDelta = 1
	)
	_, ticket = l.wait.Add(pendingDelta, ticketDelta)
	return ticket
}
// Wait blocks caller until notification received.
func (l *list) Wait(ticket uint32, d Demand) bool {
	// The waiter is prepared before taking the lock to keep the critical
	// section short.
	g := acquireG()
	g.t = ticket
	g.p = d.Priority

	l.mu.Lock()
	// Caller arrived with ticket. Decrement the pending part of a counter by
	// one.
	//
	// NOTE: we decrementing counter only under the mutex.
	// That is, received pending count may only grow until we release the
	// mutex.
	l.wait.Add(minusOne, 0)
	if l.inbox > 0 {
		// A notification was reserved for a ticket in transition while the
		// heap was empty; consume it instead of blocking.
		l.inbox--
		if buildtags.Debug {
			if l.hookNotify != nil {
				l.hookNotify(g)
			}
		}
		l.mu.Unlock()
		// g was never pushed into the heap, so nobody can send on g.c.
		// Return it to the pool instead of dropping it.
		releaseG(g)
		return true
	}
	// Insert ticket into the list and block until notification.
	l.heap.Push(g)
	if buildtags.Debug {
		if hook := l.hookInsert; hook != nil {
			hook(g)
		}
	}
	l.mu.Unlock()

	var cancel <-chan struct{}
	if d.Cancel != nil {
		cancel = d.Cancel
	}
	if cancel == nil {
		// Nothing can cancel the wait; a plain receive is enough.
		<-g.c
		releaseG(g)
		return true
	}
	select {
	case <-g.c:
		releaseG(g)
		return true
	case <-cancel:
		l.mu.Lock()
		evicted := l.heap.Remove(g)
		if evicted {
			// A canceled ticket counts as done.
			atomic.AddUint32(&l.done, 1)
		}
		l.mu.Unlock()
		if !evicted {
			// Ticket was already removed from the list. That is, it means that
			// it was notified successfully and we lost the race on removing g
			// from the list with the notification process. We must interpret
			// this case like successful await, and not drop the notification.
			//
			// g is intentionally not returned to the pool here: the notifier
			// sends (or has already sent) on g.c, and that signal is not
			// drained on this path.
			return true
		}
		// G is successfully evicted from the list thus we can reuse it to
		// reduce the pressure on GC and allocator.
		releaseG(g)
		return false
	}
}
// Notify signals at most one ticket.
//
// If the heap holds a waiter, the topmost one is popped and notified. If the
// heap is empty but there are tickets still in transition from Add() to
// Wait(), the notification is parked in the inbox so that one of them picks
// it up as soon as it calls Wait(). If every issued ticket is already done,
// Notify is a no-op.
//
// It panics with "xsync: inconsistent list state" when there is an undone
// ticket that is neither in the heap nor pending without a reserved
// notification.
func (l *list) Notify() {
	// Lock-free fast path: nothing to do if all issued tickets are done.
	_, last := l.wait.Load()
	if equalbits(last, atomic.LoadUint32(&l.done)) {
		return
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	// Re-check under the lock if we need to do something.
	// Invariants:
	// - pending can grow;
	// - done can not change.
	pending, last := l.wait.Load()
	done := atomic.LoadUint32(&l.done)
	if equalbits(last, done) {
		return
	}

	g := l.heap.Pop()
	if g == nil {
		if pending > l.inbox {
			// No landed tickets yet.
			// Increase the inbox counter to let them grab it immediately.
			atomic.AddUint32(&l.done, 1)
			l.inbox++
			return
		}
		// There is an undone ticket, but it is neither in the heap nor
		// pending without a reserved notification. Fail explicitly rather
		// than dereferencing a nil waiter below.
		panic("xsync: inconsistent list state")
	}
	if buildtags.Debug {
		if l.hookNotify != nil {
			l.hookNotify(g)
		}
	}
	g.notify()
	atomic.AddUint32(&l.done, 1)
}
// TODO: test notify all while there are pending waiters – land pendings after
// NotifyAll() and call NotifyOne().
//
func (l *list) NotifyAll() {
	// Lock-free fast path: every issued ticket is already done.
	if _, issued := l.wait.Load(); equalbits(issued, atomic.LoadUint32(&l.done)) {
		return
	}

	l.mu.Lock()
	// Re-check under the lock if we need to do something.
	// Invariants:
	// - pending can grow;
	// - done can not change.
	pending, issued := l.wait.Load()
	if equalbits(issued, atomic.LoadUint32(&l.done)) {
		l.mu.Unlock()
		return
	}

	// Reserve a notification for every ticket that is still in transition
	// and take over all landed waiters, leaving an empty heap behind.
	reserved := l.inbox
	l.inbox = pending
	landed := l.heap
	l.heap = wgHeap{}

	// Account for landed waiters plus the newly reserved inbox slots.
	atomic.AddUint32(&l.done, uint32(landed.Size())+(pending-reserved))
	l.mu.Unlock()

	// Waiters are signaled after the lock is released.
	for _, waiter := range landed.data {
		if buildtags.Debug {
			if l.hookNotify != nil {
				l.hookNotify(waiter)
			}
		}
		waiter.notify()
	}
}
// gp is the pool of *wg values: acquireG takes waiters from it and releaseG
// puts them back.
var gp sync.Pool
// wg is a single waiter of the list. It is obtained with acquireG and handed
// back with releaseG.
type wg struct {
// c carries the notification: notify() sends on it and Wait() receives
// from it. acquireG creates it with a buffer of one element.
c chan struct{}
// t is the ticket the waiter called Wait() with.
t uint32
// p is the priority taken from the Demand passed to Wait().
p Priority
// pos is the position within the list's heap.
// It MUST be populated with h.mu held.
// NOTE(review): "h.mu" presumably refers to the owning list's mu – confirm.
// acquireG and releaseG set pos to -1; presumably that marks a waiter that
// is not in the heap – verify against wgHeap.
pos int
}
// less reports whether compare orders g strictly before b, i.e. whether
// compare(g, b) is negative.
func (g *wg) less(b *wg) bool {
	order := compare(g, b)
	return order < 0
}
// compare orders two waiters: first by priority via comparePriority and, when
// priorities are equal, by ticket with the arguments swapped.
func compare(g1, g2 *wg) (c int) {
	// TODO: fix low priority tickets starvation case.
	// Low priority tickets could starve even for current counter generation
	// change. We must detect the change of the generation of the current
	// counter and release the lower priority tickets having previous
	// generation.
	c = comparePriority(g1.p, g2.p)
	if c != 0 {
		// Priorities differ, so they alone decide the order.
		return c
	}
	// Demands are equal so compare tickets but in reverse order – least
	// ticket is the topmost item in the queue.
	return comparebits(g2.t, g1.t)
}
// invert returns the arithmetic negation of c, flipping the sign of a
// comparison result. Like any two's complement negation it maps the minimum
// int value to itself.
func invert(c int) int {
	return -c
}
// notify delivers the wake-up signal to the waiter by sending an empty value
// on its channel.
func (g *wg) notify() {
	var signal struct{}
	g.c <- signal
}
// acquireG returns a waiter taken from the gp pool. When the pool has nothing
// to offer, a fresh waiter is created with a one-element buffered channel and
// pos set to -1.
func acquireG() *wg {
	pooled, ok := gp.Get().(*wg)
	if !ok {
		pooled = &wg{
			c:   make(chan struct{}, 1),
			pos: -1,
		}
	}
	return pooled
}
// releaseG resets g, keeping only its channel and setting pos back to -1, and
// puts it into the gp pool for reuse.
func releaseG(g *wg) {
	// The right-hand side is evaluated before the store, so g.c survives.
	*g = wg{c: g.c, pos: -1}
	gp.Put(g)
}