-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathreader_behaviour.go
108 lines (99 loc) · 3.39 KB
/
reader_behaviour.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package streams
import (
"context"
"github.com/cenkalti/backoff/v4"
)
// ReaderBehaviour is a middleware function that adds extra functionality and is executed prior to a
// ReaderHandleFunc or Reader component for every stream-reading job instance registered into a Hub.
//
// Each behaviour receives the ReaderNode (the stream-reading job to be executed), the root Hub instance and the
// next ReaderHandleFunc in the chain, and returns the ReaderHandleFunc that wraps it.
//
// Built-in behaviours ready to be used with streams:
//
//   - Retry backoff
//   - Correlation and causation ID injection
//   - Consumer group injection
//   - Auto-unmarshalling (only if using a reflection-based stream registry or if GoType was defined when
//     registering the stream)
//   - Logging*
//   - Metrics*
//   - Tracing*
//
// *Manual specification on configuration required.
type ReaderBehaviour func(node *ReaderNode, hub *Hub, next ReaderHandleFunc) ReaderHandleFunc
// ReaderBaseBehaviours is the default set of ReaderBehaviours: auto-unmarshalling, consumer group injection,
// correlation/causation ID injection and retry backoff.
//
// Behaviours will be executed in descending order.
// NOTE(review): the code that applies this slice is not part of this file; confirm the execution order there
// before reordering entries.
var ReaderBaseBehaviours = []ReaderBehaviour{
unmarshalReaderBehaviour,
injectGroupReaderBehaviour,
injectTxIDsReaderBehaviour,
retryReaderBehaviour,
}
// ReaderBaseBehavioursNoUnmarshal is the default set of ReaderBehaviours without the unmarshalling behaviour:
// consumer group injection, correlation/causation ID injection and retry backoff.
//
// Behaviours will be executed in descending order.
// NOTE(review): the code that applies this slice is not part of this file; confirm the execution order there
// before reordering entries.
var ReaderBaseBehavioursNoUnmarshal = []ReaderBehaviour{
injectGroupReaderBehaviour,
injectTxIDsReaderBehaviour,
retryReaderBehaviour,
}
// retryReaderBehaviour wraps next so that a failed handling attempt is retried with an exponential backoff policy.
//
// The policy is configured from the ReaderNode: RetryInitialInterval, RetryMaxInterval and RetryTimeout are used as
// the backoff's InitialInterval, MaxInterval and MaxElapsedTime respectively. The settings are captured once, when
// the behaviour is built.
//
// A fresh policy instance is created for every handled message: backoff.ExponentialBackOff is stateful and is
// documented as not thread-safe, so sharing a single instance between concurrent invocations of the returned
// handler would be a data race. The policy is also bound to the message context so that retries stop as soon as
// ctx is cancelled instead of running until the retry timeout elapses.
var retryReaderBehaviour ReaderBehaviour = func(node *ReaderNode, _ *Hub, next ReaderHandleFunc) ReaderHandleFunc {
	initialInterval := node.RetryInitialInterval
	maxInterval := node.RetryMaxInterval
	maxElapsedTime := node.RetryTimeout
	return func(ctx context.Context, message Message) error {
		policy := backoff.NewExponentialBackOff()
		policy.InitialInterval = initialInterval
		policy.MaxInterval = maxInterval
		policy.MaxElapsedTime = maxElapsedTime

		var retryPolicy backoff.BackOff = policy
		if ctx != nil {
			// backoff.WithContext panics on a nil context; keep tolerating one as the previous
			// implementation did.
			retryPolicy = backoff.WithContext(policy, ctx)
		}
		return backoff.Retry(func() error {
			return next(ctx, message)
		}, retryPolicy)
	}
}
// unmarshalReaderBehaviour decodes the raw message data before handing the message to next.
//
// For every message it:
//
//  1. Looks up the stream metadata in the Hub's StreamRegistry using message.Stream.
//  2. If the Hub has a SchemaRegistry, fetches the schema definition for the metadata's schema name and version;
//     otherwise an empty schema definition is used.
//  3. If the metadata carries a GoType, creates a new value from it, unmarshals message.Data into that value with
//     the Hub's Marshaler and stores the result in message.DecodedData. For MarshalerProtoContentType the value
//     returned by GoType.New is stored as-is; for any other content type the result of GoType.Indirect is stored.
//
// Any error from the lookups or from unmarshalling is returned unchanged and next is not called.
var unmarshalReaderBehaviour ReaderBehaviour = func(_ *ReaderNode, h *Hub, next ReaderHandleFunc) ReaderHandleFunc {
	return func(ctx context.Context, msg Message) error {
		streamMeta, lookupErr := h.StreamRegistry.GetByStreamName(msg.Stream)
		if lookupErr != nil {
			return lookupErr
		}

		schema := ""
		if h.SchemaRegistry != nil {
			definition, schemaErr := h.SchemaRegistry.GetSchemaDefinition(
				streamMeta.SchemaDefinitionName, streamMeta.SchemaVersion)
			if schemaErr != nil {
				return schemaErr
			}
			schema = definition
		}

		// Without a registered Go type there is nothing to decode into; pass the message through untouched.
		if streamMeta.GoType == nil {
			return next(ctx, msg)
		}

		target := streamMeta.GoType.New()
		if decodeErr := h.Marshaler.Unmarshal(schema, msg.Data, target); decodeErr != nil {
			return decodeErr
		}

		if h.Marshaler.ContentType() == MarshalerProtoContentType {
			msg.DecodedData = target
		} else {
			msg.DecodedData = streamMeta.GoType.Indirect(target)
		}
		return next(ctx, msg)
	}
}
// injectGroupReaderBehaviour sets the message's GroupName to the ReaderNode's Group before calling next.
var injectGroupReaderBehaviour ReaderBehaviour = func(node *ReaderNode, _ *Hub, next ReaderHandleFunc) ReaderHandleFunc {
	withGroup := func(ctx context.Context, msg Message) error {
		msg.GroupName = node.Group
		return next(ctx, msg)
	}
	return withGroup
}
// injectTxIDsReaderBehaviour enriches the context passed to next with transaction identifiers taken from the message:
// the message's CorrelationID is stored under ContextCorrelationID and the message's own ID is stored under
// ContextCausationID. Both values are converted to MessageContextKey before being stored.
var injectTxIDsReaderBehaviour ReaderBehaviour = func(_ *ReaderNode, _ *Hub, next ReaderHandleFunc) ReaderHandleFunc {
	return func(ctx context.Context, msg Message) error {
		correlationID := MessageContextKey(msg.CorrelationID)
		causationID := MessageContextKey(msg.ID)

		enriched := context.WithValue(ctx, ContextCorrelationID, correlationID)
		enriched = context.WithValue(enriched, ContextCausationID, causationID)
		return next(enriched, msg)
	}
}