diff --git a/docs/README.md b/docs/README.md index d3c4a37..a7c310f 100644 --- a/docs/README.md +++ b/docs/README.md @@ -34,7 +34,7 @@ anything. ```go conn, _ := goamqp.NewFromURL("our-service", "amqp://user:password@localhost:5672/") -_ = conn.Start() +_ = conn.Start(context.Background()) _ = conn.Close() ``` @@ -42,20 +42,18 @@ To actually create `exchanges`, `queues` and `bindings` we must pass one or more ### Publishing -The `Publisher` is used to send a message (event) of a certain type. +The `Publisher` is used to send a messages (events) of a certain types. ```go type AccountCreated struct { Name string `json:"name"` } -publisher := goamqp.Must( - goamqp.NewPublisher(goamqp.Route{ - Type: AccountCreated{}, - Key: "Account.Created", - })) - - +publisher := goamqp.NewPublisher() +conn.Start( + context.Background(), + goamqp.WithTypeMapping("Account.Created", AccountCreated{}), +) ``` This will create a `Publisher` that will publish `AccountCreated` messages with an associated routing key @@ -69,14 +67,17 @@ In this simple case we send messages to the default `event stream` by using the `EventStreamPublisher`. ```go -_ = conn.Start(goamqp.EventStreamPublisher(publisher)) +conn.Start( + context.Background(), + goamqp.WithTypeMapping("Account.Created", AccountCreated{}), + goamqp.EventStreamPublisher(publisher)) ``` Now, when we call `Start` entities will be created on the message broker. A new exchange called `events.topic.exchange` will be created (if it doesn't already exist). Now when we do: ```go -_ = publisher.Publish(&AccountCreated{Name: "test"}) +publisher.Publish(&AccountCreated{Name: "test"}) ``` the `AccountCreated` struct will be marshalled into JSON: @@ -96,12 +97,13 @@ Let's consume messages (in the same service, i.e. we send a message to ourselves using the `Setup` func `EventStreamConsumer`. ```go -_ = conn.Start( - goamqp.EventStreamPublisher(publisher), - goamqp.EventStreamConsumer("Account.Created", func(msg any, headers goamqp.Headers) (response any, err error) { - fmt.Println("Message received") - return nil, nil - }, AccountCreated{})) +conn.Start( + context.Background(), + goamqp.EventStreamPublisher(publisher), + goamqp.EventStreamConsumer("Account.Created", func(ctx context.Context, event goamqp.ConsumableEvent[AccountCreated]) error { + fmt.Println("Message received") + return nil + })) ``` @@ -118,40 +120,40 @@ The complete simple program below will send (and receive) a message and print it ``` Message received &{test} ``` + ```go package main import ( - "fmt" - "time" + "context" + "fmt" + "time" - "github.com/sparetimecoders/goamqp" + "github.com/sparetimecoders/goamqp" ) type AccountCreated struct { - Name string `json:"name"` + Name string `json:"name"` } func main() { - publisher := goamqp.Must( - goamqp.NewPublisher(goamqp.Route{ - Type: AccountCreated{}, - Key: "Account.Created", - })) + publisher := goamqp.NewPublisher() - conn, _ := goamqp.NewFromURL("our-service", "amqp://user:password@localhost:5672/") + conn := goamqp.Must(goamqp.NewFromURL("our-service", "amqp://user:password@localhost:5672/")) - _ = conn.Start( - goamqp.EventStreamPublisher(publisher), - goamqp.EventStreamConsumer("Account.Created", func(msg any, headers goamqp.Headers) (response any, err error) { - fmt.Printf("Message received %s", msg) - return nil, nil - }, AccountCreated{})) + _ = conn.Start( + context.Background(), + goamqp.EventStreamPublisher(publisher), + goamqp.WithTypeMapping("Account.Created", AccountCreated{}), + goamqp.EventStreamConsumer("Account.Created", func(ctx context.Context, event goamqp.ConsumableEvent[AccountCreated]) error { + fmt.Printf("Message received %s", event.Payload.Name) + return nil + })) - _ = publisher.Publish(&AccountCreated{Name: "test"}) + _ = publisher.Publish(context.Background(), &AccountCreated{Name: "test"}) - time.Sleep(time.Second) - _ = conn.Close() + time.Sleep(time.Second) + _ = conn.Close() } ``` diff --git a/docs/message_processing.md b/docs/message_processing.md index ce2d118..5b87fe6 100644 --- a/docs/message_processing.md +++ b/docs/message_processing.md @@ -1,46 +1,36 @@ ## Message processing and error handling -The `HandlerFunc` is called by goamqp when a message arrive in a queue. +A registered `EventHandler` is called by goamqp when an event arrives on a queue. ```go -type HandlerFunc func(msg any, headers Headers) (response any, err error) +EventHandler[T any] func (ctx context.Context, event ConsumableEvent[T]) error ``` -For most purposes an application is only interested in the `msg` parameter, which will be our consumed message. Most -implementations will look like this: +For most purposes a handler is just interested in the `event.Payload`. +You register a handler using one of the `..Consumer` functions, for example: ```go -func handler(msg any, headers Headers) (response any, err error) { - switch msg.(type) { - case *Message: - default: - fmt.Println("Unknown message type") - } - return nil, nil +goamqp.EventStreamConsumer("Order.Created", func(ctx context.Context, event goamqp.ConsumableEvent[Message]) error { + fmt.Printf("handled %s", event.Payload.Text) + return nil +}) ``` -The `msg` will be a pointer to the type specified when calling `EventStreamConsumer`, for example: +For request-response, use the `RequestResponseEventHandler` and register it with the `RequestResponseHandler`: + ```go -EventStreamConsumer("Order.Created", handler, Message{}) +RequestResponseHandler[T any, R any](routingKey string, handler RequestResponseEventHandler[T, R]) ``` -For normal event processing the returned `response` is ignored. -The same `HandlerFunc` is used for request-response handlers however, for example: - ```go -RequestResponseHandler(routingKey, handleRequest, Request{}) - -func handleRequest(msg any, headers Headers) (response any, err error) { - return Response{}}, nil +goamqp.RequestResponseHandler("req.resp", func (ctx context.Context, event goamqp.ConsumableEvent[Request]) (Response, error) { + return Response{Output: event.Payload.Input}, nil +}) ``` -And in this case the returned `response` (`Response{}` in the code above) will be returned to the calling service. - -Returning `nil` as error will Acknowledge the message and it will be removed from the queue. - ### Errors -If anything but `nil` is returned from `HandlerFunc` the message will be rejected and requeued (which means that it will -be processed again). +If anything but `nil` is returned from `EventHandler` the event is considered Not Acknowledge and re-queued (which means +that it will be processed again). -If goamqp fails to unmarshal the JSON content in the message, the message will be rejected and **not** requeued again. +If unmarshal the JSON payload in the event, the event will be rejected but **not** re-queued again.