-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathreader_supervisor_test.go
68 lines (59 loc) · 2.02 KB
/
reader_supervisor_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package streams
import (
"context"
"runtime"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TestReaderSupervisor_ForkNode checks that forkNode registers reader nodes
// under their stream name, that an empty stream name registers nothing, and
// that per-node options override the hub-wide base reader options.
func TestReaderSupervisor_ForkNode(t *testing.T) {
	h := NewHub(WithReaderBaseOptions(WithGroup("bar-queue")))
	sv := newReaderSupervisor(h)

	// firstNode returns the first node registered for stream. It fails the
	// test with a readable message instead of panicking on a bad type
	// assertion when no ReaderNode is stored at index 0.
	firstNode := func(stream string) ReaderNode {
		t.Helper()
		// The second result of Get is intentionally discarded: an absent
		// entry is caught by the checked type assertion below.
		raw, _ := sv.readerRegistry[stream].Get(0)
		node, ok := raw.(ReaderNode)
		if !ok {
			t.Fatalf("stream %q: expected a ReaderNode at index 0, got %T", stream, raw)
		}
		return node
	}

	// An empty stream name must not add anything to the registry.
	sv.forkNode("")
	assert.Empty(t, sv.readerRegistry)

	// A node forked without options takes the package defaults plus the
	// hub's base options (here: the "bar-queue" group).
	sv.forkNode("foo")
	item := firstNode("foo")
	assert.Equal(t, DefaultConcurrencyLevel, item.ConcurrencyLevel)
	assert.Equal(t, DefaultRetryInitialInterval, item.RetryInitialInterval)
	assert.Equal(t, DefaultRetryMaxInterval, item.RetryMaxInterval)
	assert.Equal(t, DefaultRetryTimeout, item.RetryTimeout)
	assert.Equal(t, "bar-queue", item.Group)

	// A second stream inherits the same base group.
	sv.forkNode("baz")
	item = firstNode("baz")
	assert.Equal(t, "bar-queue", item.Group)

	// Options passed to forkNode take precedence over the base options.
	sv.forkNode("foobar", WithGroup("barbaz-queue"), WithHandlerFunc(func(ctx context.Context, message Message) error {
		return nil
	}))
	item = firstNode("foobar")
	assert.Equal(t, "barbaz-queue", item.Group)
	assert.NotNil(t, item.HandlerFunc)
}
// BenchmarkReaderSupervisor_ForkNode measures the cost, including
// allocations, of forking a reader node with a handler on a hub that has
// base reader options configured.
func BenchmarkReaderSupervisor_ForkNode(b *testing.B) {
	h := NewHub(WithReaderBaseOptions(WithGroup("bar-queue"), WithConcurrencyLevel(10)))
	sv := newReaderSupervisor(h)
	// The handler is built once so the closure is not part of the measurement.
	var handler ReaderHandleFunc = func(_ context.Context, _ Message) error {
		return nil
	}

	// Enable allocation reporting once and exclude the setup above from the
	// timed region.
	b.ReportAllocs()
	b.ResetTimer()
	// NOTE(review): every iteration forks another node for the same "baz"
	// stream, so the registry entry presumably grows with b.N — confirm this
	// is the intended workload.
	for i := 0; i < b.N; i++ {
		sv.forkNode("baz", WithHandlerFunc(handler))
	}
}
// TestReaderSupervisor_StartNodes checks that startNodes launches one
// goroutine per forked node and that those goroutines exit once the context
// passed to startNodes is done.
//
// Goroutine counts are compared against a baseline captured before the nodes
// start, so the test does not depend on how many goroutines the test binary
// already has running.
func TestReaderSupervisor_StartNodes(t *testing.T) {
	const (
		// Long enough that the nodes are still alive at the first assertion.
		ctxTimeout = 50 * time.Millisecond
		// Upper bound on how long to wait for the nodes to exit.
		shutdownWait = time.Second
		pollInterval = 5 * time.Millisecond
		forkedNodes  = 2
	)

	baseCtx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
	defer cancel()

	h := NewHub(WithReaderBaseOptions(WithDriver(readerNoop{}), WithHandlerFunc(func(ctx context.Context, message Message) error {
		return nil
	})))
	sv := newReaderSupervisor(h)
	sv.forkNode("foo")
	sv.forkNode("foo")

	baseline := runtime.NumGoroutine()
	sv.startNodes(baseCtx)
	assert.Equal(t, baseline+forkedNodes, runtime.NumGoroutine())

	// Poll rather than sleeping a fixed amount: return as soon as the nodes
	// have exited, and tolerate slow machines up to shutdownWait.
	deadline := time.Now().Add(shutdownWait)
	for runtime.NumGoroutine() > baseline && time.Now().Before(deadline) {
		time.Sleep(pollInterval)
	}
	assert.Equal(t, baseline, runtime.NumGoroutine())
}