-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconnection.go
132 lines (111 loc) · 3.34 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package bunnify
import (
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type connectionOption struct {
uri string
reconnectInterval time.Duration
notificationChannel chan<- Notification
}
// WithURI allows the consumer to specify the AMQP Server.
// It should be in the format of amqp://0.0.0.0:5672
func WithURI(URI string) func(*connectionOption) {
return func(opt *connectionOption) {
opt.uri = URI
}
}
// WithReconnectInterval establishes how much time to wait
// between each attempt of connection.
func WithReconnectInterval(interval time.Duration) func(*connectionOption) {
return func(opt *connectionOption) {
opt.reconnectInterval = interval
}
}
// WithNotificationChannel specifies a go channel to receive messages
// such as connection established, reconnecting, event published, consumed, etc.
func WithNotificationChannel(notificationCh chan<- Notification) func(*connectionOption) {
return func(opt *connectionOption) {
opt.notificationChannel = notificationCh
}
}
// Connection represents a connection towards the AMQP server.
// A single connection should be enough for the entire application as the
// consuming and publishing is handled by channels.
type Connection struct {
options connectionOption
connection *amqp.Connection
connectionClosedBySystem bool
}
// NewConnection creates a new AMQP connection using the indicated
// options. If the consumer does not supply options, it will by default
// connect to a localhost instance on and try to reconnect every 10 seconds.
func NewConnection(opts ...func(*connectionOption)) *Connection {
options := connectionOption{
reconnectInterval: 10 * time.Second,
uri: "amqp://localhost:5672",
}
for _, opt := range opts {
opt(&options)
}
return &Connection{
options: options,
}
}
// Start establishes the connection towards the AMQP server.
// Only returns errors when the uri is not valid (retry won't do a thing)
func (c *Connection) Start() error {
var err error
var conn *amqp.Connection
ticker := time.NewTicker(c.options.reconnectInterval)
uri, err := amqp.ParseURI(c.options.uri)
if err != nil {
return err
}
for {
conn, err = amqp.Dial(uri.String())
if err == nil {
break
}
notifyConnectionFailed(c.options.notificationChannel, err)
<-ticker.C
}
c.connection = conn
notifyConnectionEstablished(c.options.notificationChannel)
go func() {
<-conn.NotifyClose(make(chan *amqp.Error))
if !c.connectionClosedBySystem {
notifyConnectionLost(c.options.notificationChannel)
_ = c.Start()
}
}()
return nil
}
// Closes connection with towards the AMQP server
func (c *Connection) Close() error {
c.connectionClosedBySystem = true
if c.connection != nil {
notifyClosingConnection(c.options.notificationChannel)
return c.connection.Close()
}
return nil
}
func (c *Connection) getNewChannel(source NotificationSource) (*amqp.Channel, bool) {
if c.connectionClosedBySystem {
notifyConnectionClosedBySystem(c.options.notificationChannel)
return nil, true
}
var err error
var ch *amqp.Channel
ticker := time.NewTicker(c.options.reconnectInterval)
for {
ch, err = c.connection.Channel()
if err == nil {
break
}
notifyChannelFailed(c.options.notificationChannel, source, err)
<-ticker.C
}
notifyChannelEstablished(c.options.notificationChannel, source)
return ch, false
}