-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection_options.go
225 lines (195 loc) · 6.77 KB
/
connection_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package clarimq
import (
"encoding/json"
"fmt"
"net"
"net/url"
"strconv"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
const (
defaultRecoveryInterval time.Duration = time.Second
defaultMaxRecoveryRetries int = 10
defaultBackOffFactor int = 2
defaultPrefetchCount int = 0
defaultConnectionNamePrefix string = "connection_"
amqpConnectionNameKey string = "connection_name"
)
type (
// ConnectionOption is an option for a Connection.
ConnectionOption func(*ConnectionOptions)
// Config is used in DialConfig and Open to specify the desired tuning
// parameters used during a connection open handshake. The negotiated tuning
// will be stored in the returned connection's Config field.
Config amqp.Config
// ConnectionOptions are used to describe how a new connection will be created.
ConnectionOptions struct {
ReturnHandler
loggers []Logger
Config *Config
codec *codec
uri string
uriMU *sync.RWMutex
PrefetchCount int
RecoveryInterval time.Duration
MaxRecoveryRetries int
BackOffFactor int
}
// ConnectionSettings holds settings for a broker connection.
ConnectionSettings struct {
// UserName contains the username of the broker user.
UserName string
// Password contains the password of the broker user.
Password string
// Host contains the hostname or ip of the broker.
Host string
// Post contains the port number the broker is listening on.
Port int
}
ReturnHandler func(Return)
)
// ToURI returns the URI representation of the ConnectionSettings.
// Includes url escaping for safe usage.
func (c *ConnectionSettings) ToURI() string {
return fmt.Sprintf("amqp://%s:%s@%s/",
url.QueryEscape(c.UserName),
url.QueryEscape(c.Password),
net.JoinHostPort(
url.QueryEscape(c.Host),
strconv.Itoa(c.Port),
),
)
}
func defaultConnectionOptions(uri string) *ConnectionOptions {
return &ConnectionOptions{
uri: uri,
uriMU: &sync.RWMutex{},
RecoveryInterval: defaultRecoveryInterval,
MaxRecoveryRetries: defaultMaxRecoveryRetries,
BackOffFactor: defaultBackOffFactor,
Config: &Config{
Properties: amqp.Table{
amqpConnectionNameKey: defaultConnectionNamePrefix + newRandomString(),
},
},
PrefetchCount: defaultPrefetchCount,
codec: &codec{
Encoder: json.Marshal,
Decoder: json.Unmarshal,
},
}
}
// WithCustomConnectionOptions sets the connection options.
//
// It can be used to set all connection options at once.
func WithCustomConnectionOptions(options *ConnectionOptions) ConnectionOption {
return func(opt *ConnectionOptions) {
if options != nil {
opt.PrefetchCount = options.PrefetchCount
opt.RecoveryInterval = options.RecoveryInterval
if options.Config != nil {
opt.Config = options.Config
}
if options.ReturnHandler != nil {
opt.ReturnHandler = options.ReturnHandler
}
}
}
}
// WithConnectionOptionConnectionName sets the name of the connection.
func WithConnectionOptionConnectionName(name string) ConnectionOption {
return func(options *ConnectionOptions) { options.Config.Properties.SetClientConnectionName(name) }
}
// WithConnectionOptionLoggers adds multiple loggers.
func WithConnectionOptionLoggers(loggers ...Logger) ConnectionOption {
return func(o *ConnectionOptions) {
o.loggers = append(o.loggers, loggers...)
}
}
// WithConnectionOptionAMQPConfig sets the amqp.Config that will be used to create the connection.
//
// Warning: this will override any values set in the connection config.
func WithConnectionOptionAMQPConfig(config *Config) ConnectionOption {
return func(o *ConnectionOptions) { o.Config = config }
}
// WithConnectionOptionPrefetchCount sets the number of messages that will be prefetched.
func WithConnectionOptionPrefetchCount(count int) ConnectionOption {
return func(o *ConnectionOptions) { o.PrefetchCount = count }
}
// WithConnectionOptionEncoder sets the encoder that will be used to encode messages.
func WithConnectionOptionEncoder(encoder JSONEncoder) ConnectionOption {
return func(options *ConnectionOptions) { options.codec.Encoder = encoder }
}
// WithConnectionOptionDecoder sets the decoder that will be used to decode messages.
func WithConnectionOptionDecoder(decoder JSONDecoder) ConnectionOption {
return func(options *ConnectionOptions) { options.codec.Decoder = decoder }
}
// WithConnectionOptionReturnHandler sets an Handler that can be used to handle undeliverable publishes.
//
// When a publish is undeliverable from being mandatory, it will be returned and can be handled
// by this return handler.
func WithConnectionOptionReturnHandler(returnHandler ReturnHandler) ConnectionOption {
return func(options *ConnectionOptions) { options.ReturnHandler = returnHandler }
}
// WithConnectionOptionRecoveryInterval sets the initial recovery interval.
//
// Default: 1s.
func WithConnectionOptionRecoveryInterval(interval time.Duration) ConnectionOption {
return func(options *ConnectionOptions) { options.RecoveryInterval = interval }
}
// WithConnectionOptionMaxRecoveryRetries sets the limit for maximum retries.
//
// Default: 10.
func WithConnectionOptionMaxRecoveryRetries(maxRetries int) ConnectionOption {
return func(options *ConnectionOptions) { options.MaxRecoveryRetries = maxRetries }
}
// WithConnectionOptionBackOffFactor sets the exponential back-off factor.
//
// Default: 2.
func WithConnectionOptionBackOffFactor(factor int) ConnectionOption {
return func(options *ConnectionOptions) { options.BackOffFactor = factor }
}
// SetLoggers provides possibility to add loggers.
func (c *Connection) SetLoggers(loggers ...Logger) {
if len(loggers) > 0 {
c.options.loggers = loggers
}
}
// SetReturnHandler provides possibility to set the json encoder.
func (c *Connection) SetEncoder(encoder JSONEncoder) {
if encoder != nil {
c.options.codec.Encoder = encoder
}
}
// SetReturnHandler provides possibility to set the json decoder.
func (c *Connection) SetDecoder(decoder JSONDecoder) {
if decoder != nil {
c.options.codec.Decoder = decoder
}
}
// SetReturnHandler provides possibility to add a return handler.
func (c *Connection) SetReturnHandler(returnHandler ReturnHandler) {
if returnHandler != nil {
c.returnHandler = returnHandler
}
}
// SetRecoveryInterval sets the recovery interval.
//
// Default: 1s.
func (c *Connection) SetRecoveryInterval(interval time.Duration) {
c.options.RecoveryInterval = interval
}
// SetMaxRecoveryRetries sets the limit for maximum retries.
//
// Default: 10.
func (c *Connection) SetMaxRecoveryRetries(maxRetries int) {
c.options.MaxRecoveryRetries = maxRetries
}
// SetBackOffFactor sets the exponential back-off factor.
//
// Default: 2.
func (c *Connection) SetBackOffFactor(factor int) {
c.options.BackOffFactor = factor
}