Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor(MessageQueue/Measuremets): New mq.SetupOptions, Quorum Queue default, Batched Measurement inserts #124

Merged
merged 7 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package env

import (
"fmt"
"log"
"os"
"strconv"
)

func Could(key, value string) string {
Expand All @@ -13,6 +15,18 @@ func Could(key, value string) string {
return v
}

func CouldInt(key string, fallback int) int {
str := os.Getenv(key)
if str == "" {
return fallback
}
v, err := strconv.Atoi(str)
if err != nil {
log.Fatalf("env %s should be an integer: %s\n", key, err.Error())
}
return v
}

func Must(key string) string {
v := os.Getenv(key)
if v == "" {
Expand Down
8 changes: 4 additions & 4 deletions pkg/mq/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ func (c *AMQPConnection) UseConnection() <-chan *amqp.Connection {
return user
}

func (c *AMQPConnection) Consume(queue string, setup AMQPSetupFunc) <-chan amqp.Delivery {
return Consume(c, queue, setup)
func (c *AMQPConnection) Consume(queue string, setup ...SetupOption) <-chan amqp.Delivery {
return Consume(c, queue, setup...)
}

func (c *AMQPConnection) Publisher(xchg string, setup AMQPSetupFunc) chan<- PublishMessage {
return Publisher(c, xchg, setup)
func (c *AMQPConnection) Publisher(xchg string, setup ...SetupOption) chan<- PublishMessage {
return Publisher(c, xchg, setup...)
}
11 changes: 8 additions & 3 deletions pkg/mq/amqp_consumer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package mq

import (
"fmt"

amqp "github.com/rabbitmq/amqp091-go"
)

func Consume(conn *AMQPConnection, queue string, setup AMQPSetupFunc) <-chan amqp.Delivery {
ch := make(chan amqp.Delivery, 10)
func Consume(conn *AMQPConnection, queue string, opts ...SetupOption) <-chan amqp.Delivery {
ch := make(chan amqp.Delivery, DefaultPrefetch())
newConnection := conn.UseConnection()

go func() {
Expand All @@ -17,15 +19,18 @@ func Consume(conn *AMQPConnection, queue string, setup AMQPSetupFunc) <-chan amq
}
amqpChan, err := amqpConn.Channel()
if err != nil {
fmt.Printf("Error creating channel: %s\n", err.Error())
continue
}
err = setup(amqpChan)
err = setupChannel(amqpChan, opts)
if err != nil {
fmt.Printf("Error setting up consume channel: %s\n", err.Error())
continue
}

amqpDeliveryChan, err := amqpChan.Consume(queue, "", false, false, false, false, nil)
if err != nil {
fmt.Printf("Error consuming from consume channel: %s\n", err.Error())
continue
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/mq/amqp_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type PublishMessage struct {
Publishing amqp.Publishing
}

func Publisher(conn *AMQPConnection, xchg string, setup AMQPSetupFunc) chan<- PublishMessage {
ch := make(chan PublishMessage, 10)
func Publisher(conn *AMQPConnection, xchg string, opts ...SetupOption) chan<- PublishMessage {
ch := make(chan PublishMessage, DefaultPrefetch())
newConnection := conn.UseConnection()

go func() {
Expand All @@ -30,7 +30,7 @@ func Publisher(conn *AMQPConnection, xchg string, setup AMQPSetupFunc) chan<- Pu
}
returns := make(chan amqp.Return)
amqpChan.NotifyReturn(returns)
err = setup(amqpChan)
err = setupChannel(amqpChan, opts)
if err != nil {
continue
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/mq/queue_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package mq

import (
"errors"
"fmt"
"sync"

"github.com/rabbitmq/amqp091-go"
)

var ErrMalformed = errors.New("delivery malformed")

type (
ProcessorFunc = func(delivery amqp091.Delivery) error
ProcessorFuncBuilder = func() ProcessorFunc
)

// StartQueueProcessor opens a basic consume channel with a queue and exchange topic binding.
// Multiple workers will be started based on the prefetch count. Each worker will call the ProcessFuncBuilder,
// which allows a closure per worker, ie an instantiation of a variable for that worker.
// The processor function will be called for each message received from the queue.
// In case of an error, the message will be requeued unless the error wraps mq.ErrMalformed
// The processFunc parameter is a builder which will be called for
func StartQueueProcessor(conn *AMQPConnection, queue, exchange, topic string, processFunc ProcessorFuncBuilder) {
var wg sync.WaitGroup

consume := conn.Consume(queue, WithDefaults(), WithTopicBinding(queue, exchange, topic))

for i := 0; i < DefaultPrefetch(); i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
process := processFunc()
for delivery := range consume {
err := process(delivery)
if err != nil {
fmt.Printf("Error: QueueProcessorFunc failed: %s\n", err.Error())
// Only requeue if err is not an ErrMalformed and it is not already redelivered
requeue := !errors.Is(err, ErrMalformed) && !delivery.Redelivered
if err := delivery.Nack(false, requeue); err != nil {
fmt.Printf("Error: could not NAck delivery: %s\n", err.Error())
}
continue
}

if err := delivery.Ack(false); err != nil {
fmt.Printf("Error: could not Ack delivery: %s\n", err.Error())
}
}
}(i)
}

wg.Wait()
}
80 changes: 80 additions & 0 deletions pkg/mq/setup_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package mq

import (
"fmt"
"log"
"os"
"strconv"

"github.com/rabbitmq/amqp091-go"
)

var defaultPrefetchCount int = 50

func DefaultPrefetch() int {
return defaultPrefetchCount
}

type SetupOpts struct {
Queue string
Exchange string
Topic string
}

type SetupOption func(c *amqp091.Channel) error

func setupChannel(c *amqp091.Channel, opts []SetupOption) error {
for _, f := range opts {
if err := f(c); err != nil {
return err
}
}
return nil
}

func WithDefaults() SetupOption {
return func(c *amqp091.Channel) error {
return c.Qos(defaultPrefetchCount, 0, false)
}
}

func WithTopicBinding(queue, exchange, topic string) SetupOption {
return func(c *amqp091.Channel) error {
_, err := c.QueueDeclare(queue, true, false, false, false, amqp091.Table{
"x-queue-type": "quorum",
})
if err != nil {
return fmt.Errorf("error declaring amqp queue: %w", err)
}
err = c.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
if err != nil {
return fmt.Errorf("error declaring amqp exchange: %w", err)
}
err = c.QueueBind(queue, topic, exchange, false, nil)
if err != nil {
return fmt.Errorf("error binding amqp queue to exchange: %w", err)
}
return nil
}
}

func WithExchange(exchange string) SetupOption {
return func(c *amqp091.Channel) error {
err := c.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
if err != nil {
return fmt.Errorf("error declaring amqp exchange: %w", err)
}
return nil
}
}

func init() {
prefetchStr, ok := os.LookupEnv("AMQP_PREFETCH")
if ok {
prefetch, err := strconv.Atoi(prefetchStr)
if err != nil {
log.Fatalf("AMQP_PREFETCH env set but not a number: %s\n", err.Error())
}
defaultPrefetchCount = prefetch
}
}
160 changes: 0 additions & 160 deletions pkg/worker/worker.go

This file was deleted.

Loading
Loading