Skip to content

Commit

Permalink
Refactor(MessageQueue/Measuremets): New mq.SetupOptions, Quorum Queue…
Browse files Browse the repository at this point in the history
… default, Batched Measurement inserts (#124)

* chore: remove dead code

* refactor mq to use setup funcs

* Use QueueProcessors and quorum queues

* Fixes

* fix: tests

* style: Fix golangci lint issues

* fix: add logs and error logging
  • Loading branch information
TimVosch authored Jan 7, 2025
1 parent dcbc8eb commit af24d6a
Show file tree
Hide file tree
Showing 29 changed files with 684 additions and 1,192 deletions.
14 changes: 14 additions & 0 deletions internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package env

import (
"fmt"
"log"
"os"
"strconv"
)

func Could(key, value string) string {
Expand All @@ -13,6 +15,18 @@ func Could(key, value string) string {
return v
}

func CouldInt(key string, fallback int) int {
str := os.Getenv(key)
if str == "" {
return fallback
}
v, err := strconv.Atoi(str)
if err != nil {
log.Fatalf("env %s should be an integer: %s\n", key, err.Error())
}
return v
}

func Must(key string) string {
v := os.Getenv(key)
if v == "" {
Expand Down
8 changes: 4 additions & 4 deletions pkg/mq/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ func (c *AMQPConnection) UseConnection() <-chan *amqp.Connection {
return user
}

func (c *AMQPConnection) Consume(queue string, setup AMQPSetupFunc) <-chan amqp.Delivery {
return Consume(c, queue, setup)
func (c *AMQPConnection) Consume(queue string, setup ...SetupOption) <-chan amqp.Delivery {
return Consume(c, queue, setup...)
}

func (c *AMQPConnection) Publisher(xchg string, setup AMQPSetupFunc) chan<- PublishMessage {
return Publisher(c, xchg, setup)
func (c *AMQPConnection) Publisher(xchg string, setup ...SetupOption) chan<- PublishMessage {
return Publisher(c, xchg, setup...)
}
11 changes: 8 additions & 3 deletions pkg/mq/amqp_consumer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package mq

import (
"fmt"

amqp "github.com/rabbitmq/amqp091-go"
)

func Consume(conn *AMQPConnection, queue string, setup AMQPSetupFunc) <-chan amqp.Delivery {
ch := make(chan amqp.Delivery, 10)
func Consume(conn *AMQPConnection, queue string, opts ...SetupOption) <-chan amqp.Delivery {
ch := make(chan amqp.Delivery, DefaultPrefetch())
newConnection := conn.UseConnection()

go func() {
Expand All @@ -17,15 +19,18 @@ func Consume(conn *AMQPConnection, queue string, setup AMQPSetupFunc) <-chan amq
}
amqpChan, err := amqpConn.Channel()
if err != nil {
fmt.Printf("Error creating channel: %s\n", err.Error())
continue
}
err = setup(amqpChan)
err = setupChannel(amqpChan, opts)
if err != nil {
fmt.Printf("Error setting up consume channel: %s\n", err.Error())
continue
}

amqpDeliveryChan, err := amqpChan.Consume(queue, "", false, false, false, false, nil)
if err != nil {
fmt.Printf("Error consuming from consume channel: %s\n", err.Error())
continue
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/mq/amqp_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type PublishMessage struct {
Publishing amqp.Publishing
}

func Publisher(conn *AMQPConnection, xchg string, setup AMQPSetupFunc) chan<- PublishMessage {
ch := make(chan PublishMessage, 10)
func Publisher(conn *AMQPConnection, xchg string, opts ...SetupOption) chan<- PublishMessage {
ch := make(chan PublishMessage, DefaultPrefetch())
newConnection := conn.UseConnection()

go func() {
Expand All @@ -30,7 +30,7 @@ func Publisher(conn *AMQPConnection, xchg string, setup AMQPSetupFunc) chan<- Pu
}
returns := make(chan amqp.Return)
amqpChan.NotifyReturn(returns)
err = setup(amqpChan)
err = setupChannel(amqpChan, opts)
if err != nil {
continue
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/mq/queue_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package mq

import (
"errors"
"fmt"
"sync"

"github.com/rabbitmq/amqp091-go"
)

var ErrMalformed = errors.New("delivery malformed")

type (
ProcessorFunc = func(delivery amqp091.Delivery) error
ProcessorFuncBuilder = func() ProcessorFunc
)

// StartQueueProcessor opens a basic consume channel with a queue and exchange topic binding.
// Multiple workers will be started based on the prefetch count. Each worker will call the ProcessFuncBuilder,
// which allows a closure per worker, ie an instantiation of a variable for that worker.
// The processor function will be called for each message received from the queue.
// In case of an error, the message will be requeued unless the error wraps mq.ErrMalformed
// The processFunc parameter is a builder which will be called for
func StartQueueProcessor(conn *AMQPConnection, queue, exchange, topic string, processFunc ProcessorFuncBuilder) {
var wg sync.WaitGroup

consume := conn.Consume(queue, WithDefaults(), WithTopicBinding(queue, exchange, topic))

for i := 0; i < DefaultPrefetch(); i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
process := processFunc()
for delivery := range consume {
err := process(delivery)
if err != nil {
fmt.Printf("Error: QueueProcessorFunc failed: %s\n", err.Error())
// Only requeue if err is not an ErrMalformed and it is not already redelivered
requeue := !errors.Is(err, ErrMalformed) && !delivery.Redelivered
if err := delivery.Nack(false, requeue); err != nil {
fmt.Printf("Error: could not NAck delivery: %s\n", err.Error())
}
continue
}

if err := delivery.Ack(false); err != nil {
fmt.Printf("Error: could not Ack delivery: %s\n", err.Error())
}
}
}(i)
}

wg.Wait()
}
80 changes: 80 additions & 0 deletions pkg/mq/setup_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package mq

import (
"fmt"
"log"
"os"
"strconv"

"github.com/rabbitmq/amqp091-go"
)

var defaultPrefetchCount int = 50

func DefaultPrefetch() int {
return defaultPrefetchCount
}

type SetupOpts struct {
Queue string
Exchange string
Topic string
}

type SetupOption func(c *amqp091.Channel) error

func setupChannel(c *amqp091.Channel, opts []SetupOption) error {
for _, f := range opts {
if err := f(c); err != nil {
return err
}
}
return nil
}

func WithDefaults() SetupOption {
return func(c *amqp091.Channel) error {
return c.Qos(defaultPrefetchCount, 0, false)
}
}

func WithTopicBinding(queue, exchange, topic string) SetupOption {
return func(c *amqp091.Channel) error {
_, err := c.QueueDeclare(queue, true, false, false, false, amqp091.Table{
"x-queue-type": "quorum",
})
if err != nil {
return fmt.Errorf("error declaring amqp queue: %w", err)
}
err = c.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
if err != nil {
return fmt.Errorf("error declaring amqp exchange: %w", err)
}
err = c.QueueBind(queue, topic, exchange, false, nil)
if err != nil {
return fmt.Errorf("error binding amqp queue to exchange: %w", err)
}
return nil
}
}

func WithExchange(exchange string) SetupOption {
return func(c *amqp091.Channel) error {
err := c.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
if err != nil {
return fmt.Errorf("error declaring amqp exchange: %w", err)
}
return nil
}
}

func init() {
prefetchStr, ok := os.LookupEnv("AMQP_PREFETCH")
if ok {
prefetch, err := strconv.Atoi(prefetchStr)
if err != nil {
log.Fatalf("AMQP_PREFETCH env set but not a number: %s\n", err.Error())
}
defaultPrefetchCount = prefetch
}
}
160 changes: 0 additions & 160 deletions pkg/worker/worker.go

This file was deleted.

Loading

0 comments on commit af24d6a

Please sign in to comment.