Skip to content

Commit

Permalink
chore: tests
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-svensson committed Feb 9, 2024
1 parent 50880b6 commit f9d7a05
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 42 deletions.
20 changes: 4 additions & 16 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,22 +173,8 @@ func (c *Connection) connectToAmqpURL() error {
return nil
}

func (c *Connection) addHandler(queueName, routingKey string, handler wrappedHandler) error {
return c.queueConsumers.add(queueName, routingKey, handler)
}

func getDeliveryInfo(queueName string, delivery amqp.Delivery) DeliveryInfo {
deliveryInfo := DeliveryInfo{
Queue: queueName,
Exchange: delivery.Exchange,
RoutingKey: delivery.RoutingKey,
Headers: Headers(delivery.Headers),
}
return deliveryInfo
}

func (c *Connection) messageHandlerBindQueueToExchange(cfg *QueueBindingConfig) error {
if err := c.addHandler(cfg.queueName, cfg.routingKey, cfg.handler); err != nil {
if err := c.queueConsumers.add(cfg.queueName, cfg.routingKey, cfg.handler); err != nil {
return err
}

Expand Down Expand Up @@ -242,8 +228,10 @@ func newConnection(serviceName string, uri amqp.URI) *Connection {

func (c *Connection) setup() error {
for _, consumer := range *c.queueConsumers {
if err := consumer.consume(c.channel, c.keyToType, c.notificationCh); err != nil {
if deliveries, err := consumer.consume(c.channel, c.keyToType, c.notificationCh); err != nil {
return fmt.Errorf("failed to create consumer for queue %s. %v", consumer.queue, err)
} else {
go consumer.loop(deliveries)
}
}
return nil
Expand Down
19 changes: 0 additions & 19 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,25 +334,6 @@ func TestResponseWrapper(t *testing.T) {
}
}

func Test_messageHandlerBindQueueToExchange(t *testing.T) {
e := errors.New("failed to create queue")
channel := &MockAmqpChannel{
QueueDeclarationError: &e,
}
conn := mockConnection(channel)

cfg := &QueueBindingConfig{
routingKey: "routingkey",
handler: nil,
queueName: "queue",
exchangeName: "exchange",
kind: kindDirect,
headers: nil,
}
err := conn.messageHandlerBindQueueToExchange(cfg)
require.EqualError(t, err, "failed to create queue")
}

func Test_Publisher_ReservedHeader(t *testing.T) {
p := NewPublisher()
err := p.Publish(context.Background(), TestMessage{Msg: "test"}, Header{"service", "header"})
Expand Down
22 changes: 15 additions & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ type queueConsumer struct {
notificationCh chan<- Notification
}

func (c *queueConsumer) consume(channel AmqpChannel, routingKeyToType routingKeyToType, notificationCh chan<- Notification) error {
func (c *queueConsumer) consume(channel AmqpChannel, routingKeyToType routingKeyToType, notificationCh chan<- Notification) (<-chan amqp.Delivery, error) {
c.routingKeyToType = routingKeyToType
c.notificationCh = notificationCh
deliveries, err := channel.Consume(c.queue, "", false, false, false, false, nil)
if err != nil {
return err
return nil, err
}
c.routingKeyToType = routingKeyToType
c.notificationCh = notificationCh
go c.loop(deliveries)

return nil
return deliveries, nil
}

func (c *queueConsumer) loop(deliveries <-chan amqp.Delivery) {
Expand Down Expand Up @@ -104,3 +102,13 @@ func (c *queueConsumers) add(queueName, routingKey string, handler wrappedHandle
consumerForQueue.handlers.add(routingKey, handler)
return nil
}

func getDeliveryInfo(queueName string, delivery amqp.Delivery) DeliveryInfo {
deliveryInfo := DeliveryInfo{
Queue: queueName,
Exchange: delivery.Exchange,
RoutingKey: delivery.RoutingKey,
Headers: Headers(delivery.Headers),
}
return deliveryInfo
}
118 changes: 118 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// MIT License
//
// Copyright (c) 2024 sparetimecoders
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package goamqp

import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"

amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/require"
)

func Test_Consume(t *testing.T) {
consumer := queueConsumer{
queue: "aQueue",
handlers: routingKeyHandler{},
}
channel := &MockAmqpChannel{consumeFn: func(queue, consumerName string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
require.Equal(t, consumer.queue, queue)
require.Equal(t, "", consumerName)
require.False(t, autoAck)
require.False(t, exclusive)
require.False(t, noLocal)
require.False(t, noWait)
require.Nil(t, args)
deliveries := make(chan amqp.Delivery, 1)
deliveries <- amqp.Delivery{
MessageId: "MESSAGE_ID",
}
close(deliveries)
return deliveries, nil
}}

deliveries, err := consumer.consume(channel, nil, nil)
require.NoError(t, err)
delivery := <-deliveries
require.Equal(t, "MESSAGE_ID", delivery.MessageId)
}

func Test_Consume_Failing(t *testing.T) {
consumer := queueConsumer{
queue: "aQueue",
handlers: routingKeyHandler{},
}
channel := &MockAmqpChannel{consumeFn: func(queue, consumerName string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
return nil, fmt.Errorf("failed")
}}

_, err := consumer.consume(channel, nil, nil)
require.EqualError(t, err, "failed")
}

func Test_ConsumerLoop(t *testing.T) {
acker := MockAcknowledger{
Acks: make(chan Ack, 2),
Nacks: make(chan Nack, 1),
Rejects: make(chan Reject, 1),
}
handler := newWrappedHandler(func(ctx context.Context, msg ConsumableEvent[Message]) error {
if msg.Payload.Ok {
return nil
}
return errors.New("failed")
})

consumer := queueConsumer{
handlers: routingKeyHandler{},
}
consumer.handlers.add("key1", handler)
consumer.handlers.add("key2", handler)

queueDeliveries := make(chan amqp.Delivery, 4)

queueDeliveries <- delivery(acker, "key1", true)
queueDeliveries <- delivery(acker, "key2", true)
queueDeliveries <- delivery(acker, "key2", false)
queueDeliveries <- delivery(acker, "missing", true)
close(queueDeliveries)

consumer.loop(queueDeliveries)

require.Len(t, acker.Rejects, 1)
require.Len(t, acker.Nacks, 1)
require.Len(t, acker.Acks, 2)
}

func delivery(acker MockAcknowledger, routingKey string, success bool) amqp.Delivery {
body, _ := json.Marshal(Message{success})

return amqp.Delivery{
Body: body,
RoutingKey: routingKey,
Acknowledger: &acker,
}
}

0 comments on commit f9d7a05

Please sign in to comment.