diff --git a/bunnify/connection.go b/bunnify/connection.go index 616d172..0698c34 100644 --- a/bunnify/connection.go +++ b/bunnify/connection.go @@ -62,13 +62,19 @@ func NewConnection(opts ...func(*connectionOption)) *Connection { } // Start establishes the connection towards the AMQP server. -func (c *Connection) Start() { +// Only returns errors when the uri is not valid (retry won't do a thing) +func (c *Connection) Start() error { var err error var conn *amqp.Connection ticker := time.NewTicker(c.options.reconnectInterval) + uri, err := amqp.ParseURI(c.options.uri) + if err != nil { + return err + } + for { - conn, err = amqp.Dial(c.options.uri) + conn, err = amqp.Dial(uri.String()) if err == nil { break } @@ -84,9 +90,11 @@ func (c *Connection) Start() { <-conn.NotifyClose(make(chan *amqp.Error)) if !c.connectionClosedBySystem { notifyConnectionLost(c.options.notificationChannel) - c.Start() + _ = c.Start() } }() + + return nil } // Closes connection with towards the AMQP server diff --git a/tests/consumer_invalid_options_test.go b/tests/consumer_invalid_options_test.go index 524aee2..1062dcb 100644 --- a/tests/consumer_invalid_options_test.go +++ b/tests/consumer_invalid_options_test.go @@ -7,13 +7,30 @@ import ( "go.uber.org/goleak" ) +func TestConnectionReturnErrorWhenNotValidURI(t *testing.T) { + // Setup + connection := bunnify.NewConnection(bunnify.WithURI("13123")) + + // Exercise + err := connection.Start() + + // Assert + if err == nil { + t.Fatal(err) + } + + goleak.VerifyNone(t) +} + func TestConsumerShouldReturnErrorWhenNoHandlersSpecified(t *testing.T) { // Setup connection := bunnify.NewConnection() - connection.Start() - consumer := connection.NewConsumer("queueName") + if err := connection.Start(); err != nil { + t.Fatal(err) + } // Exercise + consumer := connection.NewConsumer("queueName") err := consumer.Consume() // Assert diff --git a/tests/consumer_publish_metrics_test.go b/tests/consumer_publish_metrics_test.go index 13c1e38..19cdd15 100644 --- a/tests/consumer_publish_metrics_test.go +++ b/tests/consumer_publish_metrics_test.go @@ -17,7 +17,10 @@ import ( func TestConsumerPublisherMetrics(t *testing.T) { t.Run("ACK event", func(t *testing.T) { connection := bunnify.NewConnection() - connection.Start() + if err := connection.Start(); err != nil { + t.Fatal(err) + } + publisher := connection.NewPublisher() queueName := uuid.NewString() @@ -75,7 +78,10 @@ func TestConsumerPublisherMetrics(t *testing.T) { t.Run("NACK event", func(t *testing.T) { connection := bunnify.NewConnection() - connection.Start() + if err := connection.Start(); err != nil { + t.Fatal(err) + } + publisher := connection.NewPublisher() queueName := uuid.NewString() @@ -133,7 +139,9 @@ func TestConsumerPublisherMetrics(t *testing.T) { } connection := bunnify.NewConnection() - connection.Start() + if err := connection.Start(); err != nil { + t.Fatal(err) + } queueName := uuid.NewString() exchangeName := uuid.NewString() @@ -157,7 +165,9 @@ func TestConsumerPublisherMetrics(t *testing.T) { } connection = bunnify.NewConnection() - connection.Start() + if err := connection.Start(); err != nil { + t.Fatal(err) + } // Register again but with other routing key // The existing binding on the AMQP instance still exists diff --git a/tests/consumer_publish_test.go b/tests/consumer_publish_test.go index 6111828..6189204 100644 --- a/tests/consumer_publish_test.go +++ b/tests/consumer_publish_test.go @@ -40,7 +40,9 @@ func TestConsumerPublisher(t *testing.T) { bunnify.WithReconnectInterval(1*time.Second), bunnify.WithNotificationChannel(notificationChannel)) - connection.Start() + if err := connection.Start(); err != nil { + t.Fatal(err) + } var consumedEvent bunnify.ConsumableEvent[orderCreated] eventHandler := func(ctx context.Context, event bunnify.ConsumableEvent[orderCreated]) error { diff --git a/tests/consumer_publish_tracer_test.go b/tests/consumer_publish_tracer_test.go index 941fa46..bc9b430 100644 --- a/tests/consumer_publish_tracer_test.go +++ b/tests/consumer_publish_tracer_test.go @@ -25,7 +25,9 @@ func TestConsumerPublisherTracing(t *testing.T) { routingKey := uuid.NewString() connection := bunnify.NewConnection() - connection.Start() + if err := connection.Start(); err != nil { + t.Fatal(err) + } // Exercise consuming var actualTraceID trace.TraceID diff --git a/tests/dead_letter_receives_event_test.go b/tests/dead_letter_receives_event_test.go index 6c2797e..86b2708 100644 --- a/tests/dead_letter_receives_event_test.go +++ b/tests/dead_letter_receives_event_test.go @@ -44,7 +44,9 @@ func TestDeadLetterReceivesEvent(t *testing.T) { // Exercise connection := bunnify.NewConnection() - connection.Start() + if err := connection.Start(); err != nil { + t.Fatal(err) + } consumer := connection.NewConsumer( queueName, diff --git a/tests/go_routines_not_leaked_test.go b/tests/go_routines_not_leaked_test.go index 86fa81c..1984f53 100644 --- a/tests/go_routines_not_leaked_test.go +++ b/tests/go_routines_not_leaked_test.go @@ -15,7 +15,9 @@ func TestGoRoutinesAreNotLeaked(t *testing.T) { // Setup ticker := time.NewTicker(2 * time.Second) connection := bunnify.NewConnection() - connection.Start() + if err := connection.Start(); err != nil { + t.Fatal(err) + } // Exercise for i := 0; i < 100; i++ {