Skip to content

Commit

Permalink
chore: updated some docs
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-svensson committed Feb 13, 2024
1 parent b91ed5c commit 93709b3
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 63 deletions.
74 changes: 38 additions & 36 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,26 @@ anything.

```go
conn, _ := goamqp.NewFromURL("our-service", "amqp://user:password@localhost:5672/")
_ = conn.Start()
_ = conn.Start(context.Background())
_ = conn.Close()
```

To actually create `exchanges`, `queues` and `bindings` we must pass one or more `Setup` funcs to the `Start` method.

### Publishing

The `Publisher` is used to send a message (event) of a certain type.
The `Publisher` is used to send a messages (events) of a certain types.

```go
type AccountCreated struct {
Name string `json:"name"`
}

publisher := goamqp.Must(
goamqp.NewPublisher(goamqp.Route{
Type: AccountCreated{},
Key: "Account.Created",
}))


publisher := goamqp.NewPublisher()
conn.Start(
context.Background(),
goamqp.WithTypeMapping("Account.Created", AccountCreated{}),
)

```
This will create a `Publisher` that will publish `AccountCreated` messages with an associated routing key
Expand All @@ -69,14 +67,17 @@ In this simple case we send messages to the default `event stream` by using the
`EventStreamPublisher`.

```go
_ = conn.Start(goamqp.EventStreamPublisher(publisher))
conn.Start(
context.Background(),
goamqp.WithTypeMapping("Account.Created", AccountCreated{}),
goamqp.EventStreamPublisher(publisher))
```

Now, when we call `Start` entities will be created on the message broker. A new exchange called `events.topic.exchange`
will be created (if it doesn't already exist). Now when we do:

```go
_ = publisher.Publish(&AccountCreated{Name: "test"})
publisher.Publish(&AccountCreated{Name: "test"})
```
the `AccountCreated` struct will be marshalled into JSON:

Expand All @@ -96,12 +97,13 @@ Let's consume messages (in the same service, i.e. we send a message to ourselves
using the `Setup` func `EventStreamConsumer`.

```go
_ = conn.Start(
goamqp.EventStreamPublisher(publisher),
goamqp.EventStreamConsumer("Account.Created", func(msg any, headers goamqp.Headers) (response any, err error) {
fmt.Println("Message received")
return nil, nil
}, AccountCreated{}))
conn.Start(
context.Background(),
goamqp.EventStreamPublisher(publisher),
goamqp.EventStreamConsumer("Account.Created", func(ctx context.Context, event goamqp.ConsumableEvent[AccountCreated]) error {
fmt.Println("Message received")
return nil
}))

```

Expand All @@ -118,40 +120,40 @@ The complete simple program below will send (and receive) a message and print it
```
Message received &{test}
```

```go
package main

import (
"fmt"
"time"
"context"
"fmt"
"time"

"github.com/sparetimecoders/goamqp"
"github.com/sparetimecoders/goamqp"
)

type AccountCreated struct {
Name string `json:"name"`
Name string `json:"name"`
}

func main() {

publisher := goamqp.Must(
goamqp.NewPublisher(goamqp.Route{
Type: AccountCreated{},
Key: "Account.Created",
}))
publisher := goamqp.NewPublisher()

conn, _ := goamqp.NewFromURL("our-service", "amqp://user:password@localhost:5672/")
conn := goamqp.Must(goamqp.NewFromURL("our-service", "amqp://user:password@localhost:5672/"))

_ = conn.Start(
goamqp.EventStreamPublisher(publisher),
goamqp.EventStreamConsumer("Account.Created", func(msg any, headers goamqp.Headers) (response any, err error) {
fmt.Printf("Message received %s", msg)
return nil, nil
}, AccountCreated{}))
_ = conn.Start(
context.Background(),
goamqp.EventStreamPublisher(publisher),
goamqp.WithTypeMapping("Account.Created", AccountCreated{}),
goamqp.EventStreamConsumer("Account.Created", func(ctx context.Context, event goamqp.ConsumableEvent[AccountCreated]) error {
fmt.Printf("Message received %s", event.Payload.Name)
return nil
}))

_ = publisher.Publish(&AccountCreated{Name: "test"})
_ = publisher.Publish(context.Background(), &AccountCreated{Name: "test"})

time.Sleep(time.Second)
_ = conn.Close()
time.Sleep(time.Second)
_ = conn.Close()
}
```
44 changes: 17 additions & 27 deletions docs/message_processing.md
Original file line number Diff line number Diff line change
@@ -1,46 +1,36 @@
## Message processing and error handling

The `HandlerFunc` is called by goamqp when a message arrive in a queue.
A registered `EventHandler` is called by goamqp when an event arrives on a queue.

```go
type HandlerFunc func(msg any, headers Headers) (response any, err error)
EventHandler[T any] func (ctx context.Context, event ConsumableEvent[T]) error
```

For most purposes an application is only interested in the `msg` parameter, which will be our consumed message. Most
implementations will look like this:
For most purposes a handler is just interested in the `event.Payload`.
You register a handler using one of the `..Consumer` functions, for example:

```go
func handler(msg any, headers Headers) (response any, err error) {
switch msg.(type) {
case *Message:
default:
fmt.Println("Unknown message type")
}
return nil, nil
goamqp.EventStreamConsumer("Order.Created", func(ctx context.Context, event goamqp.ConsumableEvent[Message]) error {
fmt.Printf("handled %s", event.Payload.Text)
return nil
})
```

The `msg` will be a pointer to the type specified when calling `EventStreamConsumer`, for example:
For request-response, use the `RequestResponseEventHandler` and register it with the `RequestResponseHandler`:

```go
EventStreamConsumer("Order.Created", handler, Message{})
RequestResponseHandler[T any, R any](routingKey string, handler RequestResponseEventHandler[T, R])
```

For normal event processing the returned `response` is ignored.
The same `HandlerFunc` is used for request-response handlers however, for example:
```go
RequestResponseHandler(routingKey, handleRequest, Request{})

func handleRequest(msg any, headers Headers) (response any, err error) {
return Response{}}, nil
goamqp.RequestResponseHandler("req.resp", func (ctx context.Context, event goamqp.ConsumableEvent[Request]) (Response, error) {
return Response{Output: event.Payload.Input}, nil
})
```

And in this case the returned `response` (`Response{}` in the code above) will be returned to the calling service.
Returning `nil` as error will Acknowledge the message and it will be removed from the queue.
### Errors

If anything but `nil` is returned from `HandlerFunc` the message will be rejected and requeued (which means that it will
be processed again).
If anything but `nil` is returned from `EventHandler` the event is considered Not Acknowledge and re-queued (which means
that it will be processed again).

If goamqp fails to unmarshal the JSON content in the message, the message will be rejected and **not** requeued again.
If unmarshal the JSON payload in the event, the event will be rejected but **not** re-queued again.

0 comments on commit 93709b3

Please sign in to comment.