Skip to content

Commit

Permalink
chore: use Context with Publisher (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-svensson authored Oct 24, 2022
1 parent 452b695 commit c45fc92
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 38 deletions.
19 changes: 9 additions & 10 deletions _integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (suite *IntegrationTestSuite) Test_RequestResponse() {
}, IncomingResponse{}))
defer client.Close()

err = publish.Publish(&Incoming{Query: clientQuery})
err = publish.PublishWithContext(context.Background(), &Incoming{Query: clientQuery})
require.NoError(suite.T(), err)

<-closer
Expand Down Expand Up @@ -234,12 +234,11 @@ func (suite *IntegrationTestSuite) Test_EventStream_MultipleConsumers() {
closer := make(chan bool, 2)
routingKey := "key1"
clientQuery := "test"
publish, err := NewPublisher2(Incoming{})
//publish, err := NewPublisher(
// Route{
// Type: Incoming{},
// Key: routingKey,
// })
publish, err := NewPublisher(
Route{
Type: Incoming{},
Key: routingKey,
})
require.NoError(suite.T(), err)
server := createConnection(suite, serverServiceName,
EventStreamPublisher(publish))
Expand All @@ -260,7 +259,7 @@ func (suite *IntegrationTestSuite) Test_EventStream_MultipleConsumers() {
}, Incoming{}))
defer client2.Close()

err = publish.Publish(&Incoming{Query: clientQuery})
err = publish.PublishWithContext(context.Background(), &Incoming{Query: clientQuery})
require.NoError(suite.T(), err)

go forceClose(closer, 3)
Expand Down Expand Up @@ -371,9 +370,9 @@ func (suite *IntegrationTestSuite) Test_EventStream() {
}, IncomingResponse{}))
defer client1.Close()

err = publish.Publish(&Incoming{Query: clientQuery})
err = publish.PublishWithContext(context.Background(), &Incoming{Query: clientQuery})
require.NoError(suite.T(), err)
err = publish.Publish(&IncomingResponse{Value: clientQuery})
err = publish.PublishWithContext(context.Background(), &IncomingResponse{Value: clientQuery})
require.NoError(suite.T(), err)

go forceClose(closer, 3)
Expand Down
16 changes: 10 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Connection struct {
}

// ServiceResponsePublisher represents the function that is called to publish a response
type ServiceResponsePublisher func(targetService, routingKey string, msg any) error
type ServiceResponsePublisher func(ctx context.Context, targetService, routingKey string, msg any) error

// QueueBindingConfig is a wrapper around the actual amqp queue configuration
type QueueBindingConfig struct {
Expand Down Expand Up @@ -346,8 +346,8 @@ func RequestResponseHandler(routingKey string, handler HandlerFunc, eventType an
}

// PublishServiceResponse sends a message to targetService as a handler response
func (c *Connection) PublishServiceResponse(targetService, routingKey string, msg any) error {
return c.publishMessage(msg, routingKey, serviceResponseExchangeName(c.serviceName), amqp.Table{headerService: targetService})
func (c *Connection) PublishServiceResponse(ctx context.Context, targetService, routingKey string, msg any) error {
return c.publishMessage(ctx, msg, routingKey, serviceResponseExchangeName(c.serviceName), amqp.Table{headerService: targetService})
}

// PublishNotify see amqp.Channel.Confirm
Expand Down Expand Up @@ -403,7 +403,9 @@ type AmqpChannel interface {
QueueBind(queue, key, exchange string, noWait bool, args amqp.Table) error
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
// Deprecated: Use PublishWithContext instead.
Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
NotifyPublish(confirm chan amqp.Confirmation) chan amqp.Confirmation
NotifyClose(c chan *amqp.Error) chan *amqp.Error
Expand Down Expand Up @@ -457,7 +459,9 @@ func responseWrapper(handler HandlerFunc, routingKey string, publisher ServiceRe
if err != nil {
return nil, errors.Wrap(err, "failed to extract service name")
}
err = publisher(service, routingKey, resp)
// TODO Pass context to HandlerFunc instead and use here?
ctx := context.Background()
err = publisher(ctx, service, routingKey, resp)
if err != nil {
return nil, errors.Wrapf(err, "failed to publish response")
}
Expand Down Expand Up @@ -585,7 +589,7 @@ func (c *Connection) parseMessage(jsonContent []byte, eventType eventType) (any,
return target, nil
}

func (c *Connection) publishMessage(msg any, routingKey, exchangeName string, headers amqp.Table) error {
func (c *Connection) publishMessage(ctx context.Context, msg any, routingKey, exchangeName string, headers amqp.Table) error {
jsonBytes, err := json.Marshal(msg)
if err != nil {
return err
Expand All @@ -599,7 +603,7 @@ func (c *Connection) publishMessage(msg any, routingKey, exchangeName string, he
Headers: headers,
}

return c.channel.Publish(exchangeName,
return c.channel.PublishWithContext(ctx, exchangeName,
routingKey,
false,
false,
Expand Down
28 changes: 14 additions & 14 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func Test_Publish(t *testing.T) {
channel: channel,
messageLogger: noOpMessageLogger(),
}
err := c.publishMessage(Message{true}, "key", "exchange", headers)
err := c.publishMessage(context.Background(), Message{true}, "key", "exchange", headers)
require.NoError(t, err)

publish := <-channel.Published
Expand Down Expand Up @@ -345,7 +345,7 @@ func Test_Publish_Marshal_Errir(t *testing.T) {
channel: channel,
messageLogger: noOpMessageLogger(),
}
err := c.publishMessage(math.Inf(1), "key", "exchange", headers)
err := c.publishMessage(context.Background(), math.Inf(1), "key", "exchange", headers)
require.EqualError(t, err, "json: unsupported value: +Inf")
}
func TestResponseWrapper(t *testing.T) {
Expand Down Expand Up @@ -558,13 +558,13 @@ func Test_EventStreamPublisher_Ok(t *testing.T) {
require.Equal(t, 0, len(channel.QueueDeclarations))
require.Equal(t, 0, len(channel.BindingDeclarations))

err = p.Publish(TestMessage{"test", true})
err = p.PublishWithContext(context.Background(), TestMessage{"test", true})
require.NoError(t, err)

published := <-channel.Published
require.Equal(t, "key", published.key)

err = p.Publish(TestMessage{Msg: "test"}, Header{"x-header", "header"})
err = p.PublishWithContext(context.Background(), TestMessage{Msg: "test"}, Header{"x-header", "header"})
require.NoError(t, err)
published = <-channel.Published

Expand All @@ -586,13 +586,13 @@ func Test_QueuePublisher_Ok(t *testing.T) {
require.Equal(t, 0, len(channel.QueueDeclarations))
require.Equal(t, 0, len(channel.BindingDeclarations))

err = p.Publish(TestMessage{"test", true})
err = p.PublishWithContext(context.Background(), TestMessage{"test", true})
require.NoError(t, err)

published := <-channel.Published
require.Equal(t, "key", published.key)

err = p.Publish(TestMessage{Msg: "test"}, Header{"x-header", "header"})
err = p.PublishWithContext(context.Background(), TestMessage{Msg: "test"}, Header{"x-header", "header"})
require.NoError(t, err)
published = <-channel.Published

Expand All @@ -605,7 +605,7 @@ func Test_QueuePublisher_Ok(t *testing.T) {
func Test_Publisher_ReservedHeader(t *testing.T) {
p, err := NewPublisher(Route{TestMessage{}, "key"}, Route{TestMessage2{}, "key2"})
require.NoError(t, err)
err = p.Publish(TestMessage{Msg: "test"}, Header{"service", "header"})
err = p.PublishWithContext(context.Background(), TestMessage{Msg: "test"}, Header{"service", "header"})
require.EqualError(t, err, "reserved key service used, please change to use another one")
}

Expand Down Expand Up @@ -635,7 +635,7 @@ func Test_UseMessageLogger(t *testing.T) {
)
require.NotNil(t, conn.messageLogger)

err = p.Publish(TestMessage{"test", true})
err = p.PublishWithContext(context.Background(), TestMessage{"test", true})
require.NoError(t, err)
<-channel.Published

Expand Down Expand Up @@ -830,7 +830,7 @@ func Test_ServicePublisher_Ok(t *testing.T) {
require.Equal(t, 0, len(channel.QueueDeclarations))
require.Equal(t, 0, len(channel.BindingDeclarations))

err = p.Publish(TestMessage{"test", true})
err = p.PublishWithContext(context.Background(), TestMessage{"test", true})
require.NoError(t, err)
published := <-channel.Published
require.Equal(t, "key", published.key)
Expand All @@ -852,11 +852,11 @@ func Test_ServicePublisher_Multiple(t *testing.T) {
require.Equal(t, 0, len(channel.QueueDeclarations))
require.Equal(t, 0, len(channel.BindingDeclarations))

err = p.Publish(TestMessage{"test", true})
err = p.PublishWithContext(context.Background(), TestMessage{"test", true})
require.NoError(t, err)
err = p.Publish(TestMessage2{Msg: "msg"})
err = p.PublishWithContext(context.Background(), TestMessage2{Msg: "msg"})
require.NoError(t, err)
err = p.Publish(TestMessage{"test2", false})
err = p.PublishWithContext(context.Background(), TestMessage{"test2", false})
require.NoError(t, err)
published := <-channel.Published
require.Equal(t, "key", published.key)
Expand Down Expand Up @@ -885,7 +885,7 @@ func Test_ServicePublisher_NoMatchingRoute(t *testing.T) {
require.Equal(t, 0, len(channel.QueueDeclarations))
require.Equal(t, 0, len(channel.BindingDeclarations))

err = p.Publish(&TestMessage{Msg: "test"})
err = p.PublishWithContext(context.Background(), &TestMessage{Msg: "test"})
require.EqualError(t, err, "no routingkey configured for message of type *goamqp.TestMessage")
}

Expand Down Expand Up @@ -1005,7 +1005,7 @@ type mockPublisher struct {
published any
}

func (m *mockPublisher) publish(targetService, routingKey string, msg any) error {
func (m *mockPublisher) publish(ctx context.Context, targetService, routingKey string, msg any) error {
if m.err != nil {
return m.err
}
Expand Down
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Example() {
EventStreamPublisher(publisher),
)
checkError(err)
err = publisher.Publish(IncomingMessage{"OK"})
err = publisher.PublishWithContext(ctx, IncomingMessage{"OK"})
checkError(err)
time.Sleep(time.Second)
err = connection.Close()
Expand Down
4 changes: 2 additions & 2 deletions examples/event-stream/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func ExampleEventStream() {
err = statService.Start(ctx)
checkError(err)

err = orderPublisher.Publish(OrderCreated{Id: "id"})
err = orderPublisher.PublishWithContext(context.Background(), OrderCreated{Id: "id"})
checkError(err)
err = orderPublisher.Publish(OrderUpdated{Id: "id"})
err = orderPublisher.PublishWithContext(context.Background(), OrderUpdated{Id: "id"})
checkError(err)

time.Sleep(2 * time.Second)
Expand Down
2 changes: 1 addition & 1 deletion examples/request-response/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func ExampleRequestResponse() {
)
checkError(err)

err = publisher.Publish(Request{Data: "test"})
err = publisher.PublishWithContext(context.Background(), Request{Data: "test"})
checkError(err)

time.Sleep(time.Second)
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/sparetimecoders/goamqp
go 1.18

require (
github.com/caarlos0/env v3.5.0+incompatible
github.com/google/uuid v1.3.0
github.com/pkg/errors v0.9.1
github.com/rabbitmq/amqp091-go v1.5.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/caarlos0/env v3.5.0+incompatible h1:Yy0UN8o9Wtr/jGHZDpCBLpNrzcFLLM2yixi/rBrKyJs=
github.com/caarlos0/env v3.5.0+incompatible/go.mod h1:tdCsowwCzMLdkqRYDlHpZCp2UooDD3MspDBjZ2AD02Y=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
4 changes: 4 additions & 0 deletions mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package goamqp

import (
"context"
"errors"
"reflect"

Expand Down Expand Up @@ -178,6 +179,9 @@ func (m *MockAmqpChannel) ExchangeDeclare(name, kind string, durable, autoDelete
return nil
}
func (m *MockAmqpChannel) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
return m.PublishWithContext(context.Background(), exchange, key, mandatory, immediate, msg)
}
func (m *MockAmqpChannel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
if key == "failed" {
return errors.New("failed")
}
Expand Down
8 changes: 7 additions & 1 deletion publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package goamqp

import (
"context"
"fmt"
"reflect"

Expand Down Expand Up @@ -55,7 +56,12 @@ func NewPublisher(routes ...Route) (*Publisher, error) {
}

// Publish publishes a message to a given exchange
// Deprecated: Use PublishWithContext instead.
func (p *Publisher) Publish(msg any, headers ...Header) error {
return p.PublishWithContext(context.Background(), msg, headers...)
}

func (p *Publisher) PublishWithContext(ctx context.Context, msg any, headers ...Header) error {
table := amqp.Table{}
for _, v := range p.defaultHeaders {
table[v.Key] = v.Value
Expand All @@ -73,7 +79,7 @@ func (p *Publisher) Publish(msg any, headers ...Header) error {
key = t.Elem()
}
if key, ok := p.typeToRoutingKey[key]; ok {
return p.connection.publishMessage(msg, key, p.exchange, table)
return p.connection.publishMessage(ctx, msg, key, p.exchange, table)
}
return fmt.Errorf("no routingkey configured for message of type %s", t)
}
Expand Down

0 comments on commit c45fc92

Please sign in to comment.