-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue_set.go
102 lines (85 loc) · 2.41 KB
/
queue_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package gokogeri
import (
"math/rand"
"time"
)
// A QueueSet implements a strategy for deciding which queues should be checked first by a group of workers. The set
// will be consulted every time there is a need to get the next job, so it is OK to return a different slice on every
// call to GetQueues.
type QueueSet interface {
// GetQueues returns the queues sorted by the desired priority.
GetQueues() []string
// Names returns a list of queue names in the same order as they were configured, ignoring the strategy of the set,
// such as randomization, for example. This is currently used for logging.
Names() []string
}
var _ QueueSet = (*RandomQueueSet)(nil)
// A RandomQueueSet returns the queues in random order, with the likelihood of each queue being first based on their
// relative weights.
type RandomQueueSet struct {
names []string
list []string
random []string
rand *rand.Rand
}
// NewRandomQueueSet returns a new instance.
func NewRandomQueueSet() *RandomQueueSet {
return &RandomQueueSet{
list: make([]string, 0),
random: make([]string, 0),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}
// Add adds a queue with the given relative weight.
//
// Example:
//
// qs.Add("low_priority", 1)
// qs.Add("high_priority", 3)
//
// The "low_priority" queue has a 25% chance of being checked first: 1 / (1 + 3).
//
// The "high_priority" queue has a 75% chance of being checked first: 3 / (1 + 3).
func (qs *RandomQueueSet) Add(q string, weight int) {
for i := 0; i < weight; i++ {
qs.list = append(qs.list, q)
}
qs.names = append(qs.names, q)
}
// GetQueues implements QueueSet.
func (qs *RandomQueueSet) GetQueues() []string {
qs.rand.Shuffle(len(qs.list), func(i, j int) {
tmp := qs.list[i]
qs.list[i] = qs.list[j]
qs.list[j] = tmp
})
qs.random = qs.random[:0]
for _, q := range qs.list {
found := false
for _, r := range qs.random {
if r == q {
found = true
break
}
}
if !found {
qs.random = append(qs.random, q)
}
}
return qs.random
}
// Names implements QueueSet.
func (qs *RandomQueueSet) Names() []string {
return qs.names
}
var _ QueueSet = OrderedQueueSet(nil)
// An OrderedQueueSet always returns the queues in the desired order.
type OrderedQueueSet []string
// GetQueues implements QueueSet.
func (qs OrderedQueueSet) GetQueues() []string {
return qs
}
// Names implements QueueSet.
func (qs OrderedQueueSet) Names() []string {
return qs
}