-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpublisher.go
63 lines (53 loc) · 1.53 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package bunnify
import (
"context"
"encoding/json"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
// Publisher is used for publishing events.
type Publisher struct {
inUseChannel *amqp.Channel
getNewChannel func() (*amqp.Channel, bool)
}
// NewPublisher creates a publisher using the specified connection.
func (c *Connection) NewPublisher() *Publisher {
return &Publisher{
getNewChannel: func() (*amqp.Channel, bool) {
return c.getNewChannel(NotificationSourcePublisher)
},
}
}
// Publish publishes an event to the specified exchange.
// If the channel is closed, it will retry until a channel is obtained.
func (p *Publisher) Publish(
ctx context.Context,
exchange, routingKey string,
event PublishableEvent) error {
if p.inUseChannel == nil || p.inUseChannel.IsClosed() {
channel, connectionClosed := p.getNewChannel()
if connectionClosed {
return fmt.Errorf("connection closed by system, channel will not reconnect")
}
p.inUseChannel = channel
}
b, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("could not marshal event: %w", err)
}
publishing := amqp.Publishing{
ContentEncoding: "application/json",
CorrelationId: event.CorrelationID,
MessageId: event.ID,
Timestamp: event.Timestamp,
Body: b,
Headers: injectToHeaders(ctx),
}
err = p.inUseChannel.PublishWithContext(ctx, exchange, routingKey, true, false, publishing)
if err != nil {
eventPublishFailed(exchange, routingKey)
return err
}
eventPublishSucceed(exchange, routingKey)
return nil
}