Skip to content

Commit

Permalink
Fix(core / tracing): Auto bind queue to exchange (#75)
Browse files Browse the repository at this point in the history
* Fix(core/measurement): Auto bind storage to exchange

* Fix(tracing): Auto bind pipeline messages queue to xchg

* Add context to err messages
  • Loading branch information
TimVosch authored Jan 11, 2024
1 parent 31cebf4 commit 18d69f9
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 38 deletions.
1 change: 1 addition & 0 deletions docs/development/core.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The Core service performs multiple core functions in SensorBucket:
| DB_DSN | The PostgreSQL connection string | yes | |
| AMQP_HOST | The RabbitMQ host | yes | |
| AMQP_QUEUE_MEASUREMENTS | Queue from which to read measurements that need to be stored | no | measurements |
| AMQP_XCHG_MEASUREMENTS_TOPIC| The RabbitMQ exchange topic for incoming measurement storage | no | storage |
| AMQP_QUEUE_INGRESS | Queue from which to read new incoming raw data | no | core-ingress |
| AMQP_XCHG_INGRESS | The RabbitMQ exchange for incoming raw data | no | ingress |
| AMQP_XCHG_INGRESS_TOPIC | The RabbitMQ exchange topic for incoming raw data | no | ingress.* |
Expand Down
5 changes: 3 additions & 2 deletions docs/development/tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ The tracing service requires the following environment variables to be set in or
| --------------------------- | ------------------------------------------------------------------------------------------------------------------------- | -------- | --------------- |
| DB_DSN | The connection string for the PostgreSQL database | Yes | |
| AMQP_HOST | The RabbitMQ host | Yes | |
| AMQP_QUEUE_PIPELINEMESSAGES | The queue on which pipeline messages appear | Yes | |
| AMQP_QUEUE_ERRORS | The queue on which any errors produced by workers appear | Yes | |
| AMQP_QUEUE_PIPELINEMESSAGES | The queue on which pipeline messages appear | Yes | tracing_pipeline_messages |
| AMQP_QUEUE_INGRESS | The topic on which new datapoints will appear | No | archive-ingress |
| AMQP_XCHG_INGRESS | The exchange on which ingress messages will appear. The tracing service will declare the exchange if it doesn't exist yet | No | ingress |
| AMQP_XCHG_INGRESS_TOPIC | The topic on the exhange where ingress messages will be read from | No | ingress.* |
| AMQP_XCHG_PIPELINEMESSAGES| The exchange on which all pipeline messages are published | Yes | pipeline.messages |
| AMQP_XCHG_PIPELINEMESSAGES_TOPIC | The binding between the pipeline messages exchange and the queue | Yes | # |

## Domain

Expand Down
32 changes: 20 additions & 12 deletions services/core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ import (
)

var (
DB_DSN = env.Must("DB_DSN")
AMQP_HOST = env.Must("AMQP_HOST")
AMQP_QUEUE_MEASUREMENTS = env.Could("AMQP_QUEUE_MEASUREMENTS", "measurements")
AMQP_QUEUE_INGRESS = env.Could("AMQP_QUEUE_INGRESS", "core-ingress")
AMQP_QUEUE_ERRORS = env.Could("AMQP_QUEUE_ERRORS", "errors")
AMQP_XCHG_INGRESS = env.Could("AMQP_XCHG_INGRESS", "ingress")
AMQP_XCHG_INGRESS_TOPIC = env.Could("AMQP_XCHG_INGRESS_TOPIC", "ingress.*")
AMQP_XCHG_PIPELINE_MESSAGES = env.Could("AMQP_XCHG_PIPELINE_MESSAGES", "pipeline.messages")
HTTP_ADDR = env.Could("HTTP_ADDR", ":3000")
HTTP_BASE = env.Could("HTTP_BASE", "http://localhost:3000/api")
SYS_ARCHIVE_TIME = env.Could("SYS_ARCHIVE_TIME", "30")
DB_DSN = env.Must("DB_DSN")
AMQP_HOST = env.Must("AMQP_HOST")
AMQP_XCHG_INGRESS_TOPIC = env.Could("AMQP_XCHG_INGRESS_TOPIC", "ingress.*")
AMQP_XCHG_PIPELINE_MESSAGES = env.Could("AMQP_XCHG_PIPELINE_MESSAGES", "pipeline.messages")
AMQP_QUEUE_MEASUREMENTS = env.Could("AMQP_QUEUE_MEASUREMENTS", "measurements")
AMQP_XCHG_MEASUREMENTS_TOPIC = env.Could("AMQP_XCHG_MEASUREMENTS_TOPIC", "storage")
AMQP_QUEUE_INGRESS = env.Could("AMQP_QUEUE_INGRESS", "core-ingress")
AMQP_XCHG_INGRESS = env.Could("AMQP_XCHG_INGRESS", "ingress")
AMQP_QUEUE_ERRORS = env.Could("AMQP_QUEUE_ERRORS", "errors")
HTTP_ADDR = env.Could("HTTP_ADDR", ":3000")
HTTP_BASE = env.Could("HTTP_BASE", "http://localhost:3000/api")
SYS_ARCHIVE_TIME = env.Could("SYS_ARCHIVE_TIME", "30")
)

func main() {
Expand Down Expand Up @@ -92,7 +93,14 @@ func Run() error {
log.Printf("HTTP Listening: %s\n", httpsrv.Addr)

// Setup MQ Transports
measurementtransport.StartMQ(measurementservice, amqpConn, AMQP_QUEUE_MEASUREMENTS, AMQP_XCHG_PIPELINE_MESSAGES, AMQP_QUEUE_ERRORS)
measurementtransport.StartMQ(
measurementservice,
amqpConn,
AMQP_XCHG_PIPELINE_MESSAGES,
AMQP_QUEUE_MEASUREMENTS,
AMQP_XCHG_MEASUREMENTS_TOPIC,
AMQP_QUEUE_ERRORS,
)
go processingtransport.StartIngressDTOConsumer(
amqpConn,
processingservice,
Expand Down
35 changes: 27 additions & 8 deletions services/core/measurements/transport/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package measurementtransport
import (
"context"
"encoding/json"
"fmt"
"log"
"time"

Expand All @@ -13,14 +14,32 @@ import (
"sensorbucket.nl/sensorbucket/services/core/measurements"
)

func StartMQ(svc *measurements.Service, conn *mq.AMQPConnection, queue, xchg, errorTopic string) func() {
func StartMQ(
svc *measurements.Service,
conn *mq.AMQPConnection,
pipelineMessagesExchange,
measurementQueue,
measurementStorageTopic,
measurementErrorTopic string,
) func() {
done := make(chan struct{})
consume := mq.Consume(conn, queue, func(c *amqp091.Channel) error {
_, err := c.QueueDeclare(queue, true, false, false, false, nil)
return err
consume := mq.Consume(conn, measurementQueue, func(c *amqp091.Channel) error {
q, err := c.QueueDeclare(measurementQueue, true, false, false, false, nil)
if err != nil {
return fmt.Errorf("error declaring amqp queue: %w", err)
}
err = c.ExchangeDeclare(pipelineMessagesExchange, "topic", true, false, false, false, nil)
if err != nil {
return fmt.Errorf("error declaring amqp exchange: %w", err)
}
err = c.QueueBind(q.Name, measurementStorageTopic, pipelineMessagesExchange, false, nil)
if err != nil {
return fmt.Errorf("error binding amqp queue to exchange: %w", err)
}
return nil
})
publish := mq.Publisher(conn, xchg, func(c *amqp091.Channel) error {
err := c.ExchangeDeclare(xchg, "topic", true, false, false, false, nil)
publish := mq.Publisher(conn, pipelineMessagesExchange, func(c *amqp091.Channel) error {
err := c.ExchangeDeclare(pipelineMessagesExchange, "topic", true, false, false, false, nil)
return err
})

Expand Down Expand Up @@ -52,7 +71,7 @@ func StartMQ(svc *measurements.Service, conn *mq.AMQPConnection, queue, xchg, er
continue
}
publish <- mq.PublishMessage{
Topic: errorTopic,
Topic: measurementErrorTopic,
Publishing: amqp091.Publishing{
Body: msgErrorBytes,
},
Expand All @@ -62,7 +81,7 @@ func StartMQ(svc *measurements.Service, conn *mq.AMQPConnection, queue, xchg, er
}
msg.Ack(false)
case <-done:
break
return
}
}
}()
Expand Down
27 changes: 17 additions & 10 deletions services/tracing/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ import (
)

var (
HTTP_ADDR = env.Could("HTTP_ADDR", ":3000")
HTTP_BASE = env.Could("HTTP_BASE", "http://localhost:3000/api")
DB_DSN = env.Must("DB_DSN")
AMQP_HOST = env.Must("AMQP_HOST")
AMQP_QUEUE_PIPELINEMESSAGES = env.Must("AMQP_QUEUE_PIPELINEMESSAGES")
AMQP_QUEUE_ERRORS = env.Must("AMQP_QUEUE_ERRORS")
AMQP_QUEUE_INGRESS = env.Could("AMQP_QUEUE_INGRESS", "archive-ingress")
AMQP_XCHG_INGRESS = env.Could("AMQP_XCHG_INGRESS", "ingress")
AMQP_XCHG_INGRESS_TOPIC = env.Could("AMQP_XCHG_INGRESS_TOPIC", "ingress.*")
HTTP_ADDR = env.Could("HTTP_ADDR", ":3000")
HTTP_BASE = env.Could("HTTP_BASE", "http://localhost:3000/api")
DB_DSN = env.Must("DB_DSN")
AMQP_HOST = env.Must("AMQP_HOST")
AMQP_QUEUE_PIPELINEMESSAGES = env.Could("AMQP_QUEUE_PIPELINEMESSAGES", "tracing_pipeline_messages")
AMQP_XCHG_PIPELINEMESSAGES = env.Could("AMQP_XCHG_PIPELINEMESSAGES", "pipeline.messages")
AMQP_XCHG_PIPELINEMESSAGES_TOPIC = env.Could("AMQP_XCHG_PIPELINEMESSAGES_TOPIC", "#")
AMQP_QUEUE_INGRESS = env.Could("AMQP_QUEUE_INGRESS", "archive-ingress")
AMQP_XCHG_INGRESS = env.Could("AMQP_XCHG_INGRESS", "ingress")
AMQP_XCHG_INGRESS_TOPIC = env.Could("AMQP_XCHG_INGRESS_TOPIC", "ingress.*")
)

func main() {
Expand Down Expand Up @@ -66,7 +67,13 @@ func main() {
go func() {
tracingStepStore := tracinginfra.NewStorePSQL(db)
tracingService := tracing.New(tracingStepStore)
go tracingtransport.StartMQ(tracingService, mqConn, AMQP_QUEUE_ERRORS, AMQP_QUEUE_PIPELINEMESSAGES)
go tracingtransport.StartMQ(
tracingService,
mqConn,
AMQP_QUEUE_PIPELINEMESSAGES,
AMQP_XCHG_PIPELINEMESSAGES,
AMQP_XCHG_PIPELINEMESSAGES_TOPIC,
)
tracinghttp := tracingtransport.NewHTTP(tracingService, HTTP_BASE)
tracinghttp.SetupRoutes(r)
}()
Expand Down
24 changes: 18 additions & 6 deletions services/tracing/tracing/transport/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tracingtransport

import (
"encoding/json"
"fmt"
"log"
"time"

Expand All @@ -12,14 +13,14 @@ import (
"sensorbucket.nl/sensorbucket/services/tracing/tracing"
)

func StartMQ(svc *tracing.Service, conn *mq.AMQPConnection, errQueue string, queue string) {
pipelineMessages := mq.Consume(conn, queue, setupFunc(queue))
func StartMQ(svc *tracing.Service, conn *mq.AMQPConnection, queue, xchg, topic string) {
pipelineMessages := mq.Consume(conn, queue, setupFunc(queue, xchg, topic))

log.Println("Measurement MQ Transport running")
go processMessage(queue, pipelineMessages, svc)
go processMessage(pipelineMessages, svc)
}

func processMessage(queue string, deliveries <-chan amqp091.Delivery, svc *tracing.Service) {
func processMessage(deliveries <-chan amqp091.Delivery, svc *tracing.Service) {
log.Println("Measurement MQ Transport running, tracing pipeline errors...")
for msg := range deliveries {
tsHeader, ok := msg.Headers["timestamp"]
Expand Down Expand Up @@ -72,9 +73,20 @@ func processMessage(queue string, deliveries <-chan amqp091.Delivery, svc *traci
}
}

func setupFunc(queue string) mq.AMQPSetupFunc {
func setupFunc(queue, xchg, topic string) mq.AMQPSetupFunc {
return func(c *amqp091.Channel) error {
_, err := c.QueueDeclare(queue, true, false, false, false, nil)
return err
if err != nil {
return fmt.Errorf("error declaring amqp queue: %w", err)
}
err = c.ExchangeDeclare(xchg, "topic", true, false, false, false, nil)
if err != nil {
return fmt.Errorf("error declaring amqp exchange: %w", err)
}
err = c.QueueBind(queue, topic, xchg, false, nil)
if err != nil {
return fmt.Errorf("error binding amqp queue to exchange: %w", err)
}
return nil
}
}

0 comments on commit 18d69f9

Please sign in to comment.