-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
756e4cd
commit 3e07514
Showing
14 changed files
with
585 additions
and
162 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package bunnify | ||
|
||
import ( | ||
"encoding/json" | ||
"time" | ||
|
||
amqp "github.com/rabbitmq/amqp091-go" | ||
) | ||
|
||
func (c Consumer) loop(channel *amqp.Channel, deliveries <-chan amqp.Delivery) { | ||
for delivery := range deliveries { | ||
startTime := time.Now() | ||
deliveryInfo := getDeliveryInfo(c.queueName, delivery) | ||
EventReceived(c.queueName, deliveryInfo.RoutingKey) | ||
|
||
// Establish which handler is invoked | ||
handler, ok := c.options.handlers[deliveryInfo.RoutingKey] | ||
if !ok { | ||
if c.options.defaultHandler == nil { | ||
_ = delivery.Nack(false, false) | ||
EventWithoutHandler(c.queueName, deliveryInfo.RoutingKey) | ||
continue | ||
} | ||
handler = c.options.defaultHandler | ||
} | ||
|
||
uevt := unmarshalEvent{DeliveryInfo: deliveryInfo} | ||
|
||
// For this error to happen an event not published by Bunnify is required | ||
if err := json.Unmarshal(delivery.Body, &uevt); err != nil { | ||
_ = delivery.Nack(false, false) | ||
EventNotParsable(c.queueName, deliveryInfo.RoutingKey) | ||
continue | ||
} | ||
|
||
tracingCtx := extractToContext(delivery.Headers) | ||
if err := handler(tracingCtx, uevt); err != nil { | ||
elapsed := time.Since(startTime).Milliseconds() | ||
notifyEventHandlerFailed(c.options.notificationCh, deliveryInfo.RoutingKey, elapsed, err) | ||
_ = delivery.Nack(false, false) | ||
EventNack(c.queueName, deliveryInfo.RoutingKey, elapsed) | ||
continue | ||
} | ||
|
||
elapsed := time.Since(startTime).Milliseconds() | ||
notifyEventHandlerSucceed(c.options.notificationCh, deliveryInfo.RoutingKey, elapsed) | ||
_ = delivery.Ack(false) | ||
EventAck(c.queueName, deliveryInfo.RoutingKey, elapsed) | ||
} | ||
|
||
// If the for exits, the channel stopped. Close it, | ||
// notify the error and start the consumer so it will start another loop. | ||
if !channel.IsClosed() { | ||
channel.Close() | ||
} | ||
|
||
notifyChannelLost(c.options.notificationCh, NotificationSourceConsumer) | ||
|
||
if err := c.Consume(); err != nil { | ||
notifyChannelFailed(c.options.notificationCh, NotificationSourceConsumer, err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package bunnify | ||
|
||
import ( | ||
"encoding/json" | ||
) | ||
|
||
type consumerOption struct { | ||
deadLetterQueue string | ||
exchange string | ||
defaultHandler wrappedHandler | ||
handlers map[string]wrappedHandler | ||
prefetchCount int | ||
prefetchSize int | ||
quorumQueue bool | ||
notificationCh chan<- Notification | ||
} | ||
|
||
// WithBindingToExchange specifies the exchange on which the queue | ||
// will bind for the handlers provided. | ||
func WithBindingToExchange(exchange string) func(*consumerOption) { | ||
return func(opt *consumerOption) { | ||
opt.exchange = exchange | ||
} | ||
} | ||
|
||
// WithQoS specifies the prefetch count and size for the consumer. | ||
func WithQoS(prefetchCount, prefetchSize int) func(*consumerOption) { | ||
return func(opt *consumerOption) { | ||
opt.prefetchCount = prefetchCount | ||
opt.prefetchSize = prefetchSize | ||
} | ||
} | ||
|
||
// WithQuorumQueue specifies that the queue to consume will be created as quorum queue. | ||
// Quorum queues are used when data safety is the priority. | ||
func WithQuorumQueue() func(*consumerOption) { | ||
return func(opt *consumerOption) { | ||
opt.quorumQueue = true | ||
} | ||
} | ||
|
||
// WithDeadLetterQueue indicates which queue will receive the events | ||
// that were NACKed for this consumer. | ||
func WithDeadLetterQueue(queueName string) func(*consumerOption) { | ||
return func(opt *consumerOption) { | ||
opt.deadLetterQueue = queueName | ||
} | ||
} | ||
|
||
// WithDefaultHandler specifies a handler that can be use for any type | ||
// of routing key without a defined handler. This is mostly convenient if you | ||
// don't care about the specific payload of the event, which will be received as a byte array. | ||
func WithDefaultHandler(handler EventHandler[json.RawMessage]) func(*consumerOption) { | ||
return func(opt *consumerOption) { | ||
opt.defaultHandler = newWrappedHandler(handler) | ||
} | ||
} | ||
|
||
// WithHandler specifies under which routing key the provided handler will be invoked. | ||
// The routing key indicated here will be bound to the queue if the WithBindingToExchange is supplied. | ||
func WithHandler[T any](routingKey string, handler EventHandler[T]) func(*consumerOption) { | ||
return func(opt *consumerOption) { | ||
opt.handlers[routingKey] = newWrappedHandler(handler) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package bunnify | ||
|
||
import ( | ||
amqp "github.com/rabbitmq/amqp091-go" | ||
) | ||
|
||
func getDeliveryInfo(queueName string, delivery amqp.Delivery) DeliveryInfo { | ||
deliveryInfo := DeliveryInfo{ | ||
Queue: queueName, | ||
Exchange: delivery.Exchange, | ||
RoutingKey: delivery.RoutingKey, | ||
} | ||
|
||
// If routing key is empty, it is mostly due to the event being dead lettered. | ||
// Check for the original delivery information in the headers | ||
if delivery.RoutingKey == "" { | ||
deaths, ok := delivery.Headers["x-death"].([]interface{}) | ||
if !ok || len(deaths) == 0 { | ||
return deliveryInfo | ||
} | ||
|
||
death, ok := deaths[0].(amqp.Table) | ||
if !ok { | ||
return deliveryInfo | ||
} | ||
|
||
queue, ok := death["queue"].(string) | ||
if !ok { | ||
return deliveryInfo | ||
} | ||
deliveryInfo.Queue = queue | ||
|
||
exchange, ok := death["exchange"].(string) | ||
if !ok { | ||
return deliveryInfo | ||
} | ||
deliveryInfo.Exchange = exchange | ||
|
||
routingKeys, ok := death["routing-keys"].([]interface{}) | ||
if !ok || len(routingKeys) == 0 { | ||
return deliveryInfo | ||
} | ||
key, ok := routingKeys[0].(string) | ||
if !ok { | ||
return deliveryInfo | ||
} | ||
deliveryInfo.RoutingKey = key | ||
} | ||
|
||
return deliveryInfo | ||
} |
File renamed without changes.
Oops, something went wrong.