From e17590aebb770d489f2fbc2060c207b5415d180a Mon Sep 17 00:00:00 2001 From: Tim van Osch Date: Tue, 17 Dec 2024 11:18:33 +0100 Subject: [PATCH 1/7] chore: remove dead code --- pkg/worker/worker.go | 160 ------------------------------- pkg/worker/worker_test.go | 193 -------------------------------------- 2 files changed, 353 deletions(-) delete mode 100644 pkg/worker/worker.go delete mode 100644 pkg/worker/worker_test.go diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go deleted file mode 100644 index e6524af1..00000000 --- a/pkg/worker/worker.go +++ /dev/null @@ -1,160 +0,0 @@ -package worker - -import ( - "encoding/json" - "errors" - "fmt" - "log" - "strconv" - - "github.com/rabbitmq/amqp091-go" - - "sensorbucket.nl/sensorbucket/internal/env" - "sensorbucket.nl/sensorbucket/pkg/mq" - "sensorbucket.nl/sensorbucket/pkg/pipeline" -) - -// errors -var ErrNoDeviceMatch = errors.New("no device in device service matches EUI of uplink") - -func NewWorker(name string, version string, processsor processor) *worker { - // First ensure all the required env variables are present - w := worker{ - id: fmt.Sprintf("%s@%s-(%s)", name, version, env.Must("HOSTNAME")), - mqQueue: env.Must("AMQP_QUEUE"), - mqErrTopic: env.Must("AMQP_ERR_TOPIC"), - mqHost: env.Must("AMQP_HOST"), - mqXchg: env.Must("AMQP_XCHG"), - mqPrefetch: env.Could("AMQP_PREFETCH", "5"), - } - - prefetch, err := strconv.Atoi(w.mqPrefetch) - if err != nil { - panic(err) - } - conn := mq.NewConnection(w.mqHost) - publisher := conn.Publisher(w.mqXchg, func(c *amqp091.Channel) error { - return c.ExchangeDeclare(w.mqXchg, "topic", true, false, false, false, nil) - }) - consumer := conn.Consume(w.mqQueue, func(c *amqp091.Channel) error { - if _, err := c.QueueDeclare(w.mqQueue, true, false, false, false, amqp091.Table{}); err != nil { - return err - } - if err := c.Qos(prefetch, 0, true); err != nil { - return err - } - return nil - }) - cancelToken := make(chan any, 1) - - go conn.Start() - go func(conn *mq.AMQPConnection) { - // Whenever a value is put in the cancelToken, shutdown the AMQP connnection - <-cancelToken - conn.Shutdown() - }(conn) - - w.processor = processsor - w.cancelToken = cancelToken - w.publisher = publisher - w.consumer = consumer - - return &w -} - -// Will run the given processor. Any returned message will be sent to it's next defined step in the pipeline -func (w *worker) Run() { - processError := func(delivery amqp091.Delivery, incoming, result pipeline.Message, err error) { - if err := w.publishError(incoming, result, err); err != nil { - log.Printf("Error publishing error to queue: %v\n", err) - } - if err := delivery.Nack(false, false); err != nil { - log.Printf("Error Nacking delivery message: %v\n", err) - } - } - - // Await any messages that appear on the message queue - for delivery := range w.consumer { - var incoming pipeline.Message - if err := json.Unmarshal(delivery.Body, &incoming); err != nil { - log.Printf("Error converting delivery: %v\n", err) - processError(delivery, incoming, pipeline.Message{}, err) - continue - } - - // Once a message has been received, process it using the worker-unique processor - result, err := w.processor(incoming) - if err != nil { - log.Printf("Error processing delivery: %v\n", err) - processError(delivery, incoming, result, err) - continue - } - - // If the worker succesfully processed the result, publish it to the next message queue - topic, err := result.NextStep() - if err != nil { - log.Printf("Error getting next step: %v\n", err) - processError(delivery, incoming, result, err) - continue - } - msgJSON, err := json.Marshal(result) - if err != nil { - log.Printf("Error marshalling result: %v\n", err) - processError(delivery, incoming, result, err) - continue - } - w.publisher <- mq.PublishMessage{Topic: topic, Publishing: amqp091.Publishing{ - MessageId: result.TracingID, - Body: msgJSON, - }} - - // The message was succesfully handled, ack the message. - if err := delivery.Ack(false); err != nil { - log.Printf("Error Acking delivery message: %v\n", err) - } - } - - // Shutdown the MQ connection - w.cancelToken <- true -} - -func (w *worker) publishError(message pipeline.Message, attempt pipeline.Message, err error) error { - errJSON, err := json.Marshal(pipeline.PipelineError{ - ReceivedByWorker: message, - ProcessingAttempt: attempt, - Timestamp: message.Timestamp, - Queue: w.mqQueue, - Worker: w.id, - Error: err.Error(), - }) - if err != nil { - return fmt.Errorf("could not marshal json: %w", err) - } - w.publisher <- mq.PublishMessage{Topic: w.mqErrTopic, Publishing: amqp091.Publishing{ - Body: errJSON, - }} - return nil -} - -type worker struct { - // Worker info - id string - - // MQ settings - mqHost string - mqQueue string - mqErrTopic string - mqXchg string - mqPrefetch string - - processor processor - cancelToken chan any - publisher publisher - consumer consumer -} - -type ( - processor func(pipeline.Message) (pipeline.Message, error) - publisher chan<- mq.PublishMessage - consumer <-chan amqp091.Delivery -) diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go deleted file mode 100644 index f36cf8a0..00000000 --- a/pkg/worker/worker_test.go +++ /dev/null @@ -1,193 +0,0 @@ -package worker - -import ( - "encoding/json" - "fmt" - "testing" - - "github.com/google/uuid" - "github.com/rabbitmq/amqp091-go" - "github.com/stretchr/testify/suite" - - "sensorbucket.nl/sensorbucket/pkg/mq" - "sensorbucket.nl/sensorbucket/pkg/pipeline" -) - -func TestWorkerSuite(t *testing.T) { - suite.Run(t, new(workerSuite)) -} - -func (s *workerSuite) TestWorkerProcessorReturnsAnError() { - // Arrange - acker := &ackMock{} - publisher := make(chan mq.PublishMessage) - consumer := make(chan amqp091.Delivery) - id := uuid.NewString() - incomingMessage := pipeline.Message{ - TracingID: id, - } - expectedMessage := pipeline.PipelineError{ - ReceivedByWorker: pipeline.Message{ - TracingID: id, - }, - Worker: "some-worker", - Queue: "message came from this topic", - Error: "unexpected error occurred!!", - } - w := worker{ - id: "some-worker", - mqErrTopic: "this is an error topic", - mqQueue: "message came from this topic", - publisher: publisher, - consumer: consumer, - processor: func(m pipeline.Message) (pipeline.Message, error) { - return pipeline.Message{}, fmt.Errorf("unexpected error occurred!!") - }, - cancelToken: make(chan any), - } - - // Act - go w.Run() - consumer <- amqp091.Delivery{ - Acknowledger: acker, - Body: toBytes(incomingMessage), - } - result := <-publisher - close(consumer) - <-w.cancelToken - - // Assert - s.Equal(0, acker.ackCalled) - s.Equal(1, acker.nackCalled) - s.Equal(0, acker.rejectCalled) - s.Equal(mq.PublishMessage{ - Topic: "this is an error topic", - Publishing: amqp091.Publishing{ - Body: toBytes(expectedMessage), - }, - }, result) -} - -func (s *workerSuite) TestIncomingMessageIsInvalidJson() { - // Arrange - acker := &ackMock{} - publisher := make(chan mq.PublishMessage) - consumer := make(chan amqp091.Delivery) - expectedMessage := pipeline.PipelineError{ - Worker: "some-worker", - ReceivedByWorker: pipeline.Message{}, - Queue: "message came from this topic", - Error: "json: cannot unmarshal string into Go value of type pipeline.Message", - } - w := worker{ - id: "some-worker", - mqQueue: "message came from this topic", - mqErrTopic: "this is an error topic", - publisher: publisher, - consumer: consumer, - cancelToken: make(chan any), - } - - // Act - go w.Run() - consumer <- amqp091.Delivery{ - Acknowledger: acker, - Body: toBytes("broken json!!"), - } - result := <-publisher - close(consumer) - <-w.cancelToken - - // Assert - s.Equal(0, acker.ackCalled) - s.Equal(1, acker.nackCalled) - s.Equal(0, acker.rejectCalled) - s.Equal(mq.PublishMessage{ - Topic: "this is an error topic", - Publishing: amqp091.Publishing{ - Body: toBytes(expectedMessage), - }, - }, result) -} - -func (s *workerSuite) TestIncomingMessageWithNextStep() { - // Arrange - acker := &ackMock{} - publisher := make(chan mq.PublishMessage) - consumer := make(chan amqp091.Delivery) - incomingMessage := pipeline.Message{ - TracingID: "very-unique-id", - StepIndex: 0, - PipelineSteps: []string{"step1", "step2"}, - Measurements: []pipeline.Measurement{}, - } - expectedMessage := pipeline.Message{ - TracingID: "very-unique-id", - StepIndex: 1, - PipelineSteps: []string{"step1", "step2"}, - Measurements: []pipeline.Measurement{}, - } - w := worker{ - publisher: publisher, - consumer: consumer, - processor: func(m pipeline.Message) (pipeline.Message, error) { - return m, nil - }, - cancelToken: make(chan any), - } - - // Act - go w.Run() - consumer <- amqp091.Delivery{ - Acknowledger: acker, - Body: toBytes(incomingMessage), - } - result := <-publisher - close(consumer) - <-w.cancelToken - - // Assert - s.Equal(1, acker.ackCalled) - s.Equal(0, acker.nackCalled) - s.Equal(0, acker.rejectCalled) - s.Equal(mq.PublishMessage{ - Topic: "step2", - Publishing: amqp091.Publishing{ - MessageId: expectedMessage.TracingID, - Body: toBytes(expectedMessage), - }, - }, result) -} - -type workerSuite struct { - suite.Suite -} - -func toBytes[T interface{}](val T) []byte { - b, err := json.Marshal(&val) - if err != nil { - panic(err) - } - return b -} - -type ackMock struct { - ackCalled int - nackCalled int - rejectCalled int -} - -func (m *ackMock) Ack(tag uint64, multiple bool) error { - m.ackCalled++ - return nil -} - -func (m *ackMock) Nack(tag uint64, multiple bool, requeue bool) error { - m.nackCalled++ - return nil -} - -func (m *ackMock) Reject(tag uint64, requeue bool) error { - m.rejectCalled++ - return nil -} From 37bc9fbb11adbfdf4c16d57d71a244da4525b609 Mon Sep 17 00:00:00 2001 From: Tim van Osch Date: Fri, 3 Jan 2025 11:52:02 +0100 Subject: [PATCH 2/7] refactor mq to use setup funcs --- internal/env/env.go | 14 + pkg/mq/amqp_connection.go | 4 +- pkg/mq/amqp_consumer.go | 6 +- pkg/mq/amqp_publisher.go | 6 +- pkg/mq/queue_processor.go | 47 +++ pkg/mq/setup_builder.go | 85 ++++ services/core/main.go | 15 +- services/core/measurements/application.go | 143 ++----- .../core/measurements/application_test.go | 87 ++-- services/core/measurements/datastreams.go | 24 -- .../core/measurements/datastreams_test.go | 43 -- .../core/measurements/infra/store_psql.go | 105 +++++ services/core/measurements/measurements.go | 13 +- services/core/measurements/mock_test.go | 388 ++++++++---------- .../core/measurements/pipeline_message.go | 106 +++++ services/core/measurements/transport/mq.go | 31 +- .../transport/ingressDTOConsumer.go | 56 --- services/fission-rmq-connector/main.go | 43 +- services/tracing/tracing/transport/mq.go | 22 - 19 files changed, 638 insertions(+), 600 deletions(-) create mode 100644 pkg/mq/queue_processor.go create mode 100644 pkg/mq/setup_builder.go delete mode 100644 services/core/measurements/datastreams_test.go create mode 100644 services/core/measurements/pipeline_message.go delete mode 100644 services/core/processing/transport/ingressDTOConsumer.go diff --git a/internal/env/env.go b/internal/env/env.go index f6a8078c..7a908a14 100644 --- a/internal/env/env.go +++ b/internal/env/env.go @@ -2,7 +2,9 @@ package env import ( "fmt" + "log" "os" + "strconv" ) func Could(key, value string) string { @@ -13,6 +15,18 @@ func Could(key, value string) string { return v } +func CouldInt(key string, fallback int) int { + str := os.Getenv(key) + if str == "" { + return fallback + } + v, err := strconv.Atoi(str) + if err != nil { + log.Fatalf("env %s should be an integer: %s\n", key, err.Error()) + } + return v +} + func Must(key string) string { v := os.Getenv(key) if v == "" { diff --git a/pkg/mq/amqp_connection.go b/pkg/mq/amqp_connection.go index 04fa5aef..1090adca 100644 --- a/pkg/mq/amqp_connection.go +++ b/pkg/mq/amqp_connection.go @@ -134,10 +134,10 @@ func (c *AMQPConnection) UseConnection() <-chan *amqp.Connection { return user } -func (c *AMQPConnection) Consume(queue string, setup AMQPSetupFunc) <-chan amqp.Delivery { +func (c *AMQPConnection) Consume(queue string, setup ...SetupOption) <-chan amqp.Delivery { return Consume(c, queue, setup) } -func (c *AMQPConnection) Publisher(xchg string, setup AMQPSetupFunc) chan<- PublishMessage { +func (c *AMQPConnection) Publisher(xchg string, setup ...SetupOption) chan<- PublishMessage { return Publisher(c, xchg, setup) } diff --git a/pkg/mq/amqp_consumer.go b/pkg/mq/amqp_consumer.go index 92f8f7e0..30b08213 100644 --- a/pkg/mq/amqp_consumer.go +++ b/pkg/mq/amqp_consumer.go @@ -4,8 +4,8 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) -func Consume(conn *AMQPConnection, queue string, setup AMQPSetupFunc) <-chan amqp.Delivery { - ch := make(chan amqp.Delivery, 10) +func Consume(conn *AMQPConnection, queue string, opts ...SetupOption) <-chan amqp.Delivery { + ch := make(chan amqp.Delivery, DefaultPrefetch()) newConnection := conn.UseConnection() go func() { @@ -19,7 +19,7 @@ func Consume(conn *AMQPConnection, queue string, setup AMQPSetupFunc) <-chan amq if err != nil { continue } - err = setup(amqpChan) + err = setupChannel(amqpChan, opts) if err != nil { continue } diff --git a/pkg/mq/amqp_publisher.go b/pkg/mq/amqp_publisher.go index 2a6c041b..5efdfdea 100644 --- a/pkg/mq/amqp_publisher.go +++ b/pkg/mq/amqp_publisher.go @@ -12,8 +12,8 @@ type PublishMessage struct { Publishing amqp.Publishing } -func Publisher(conn *AMQPConnection, xchg string, setup AMQPSetupFunc) chan<- PublishMessage { - ch := make(chan PublishMessage, 10) +func Publisher(conn *AMQPConnection, xchg string, opts ...SetupOption) chan<- PublishMessage { + ch := make(chan PublishMessage, DefaultPrefetch()) newConnection := conn.UseConnection() go func() { @@ -30,7 +30,7 @@ func Publisher(conn *AMQPConnection, xchg string, setup AMQPSetupFunc) chan<- Pu } returns := make(chan amqp.Return) amqpChan.NotifyReturn(returns) - err = setup(amqpChan) + err = setupChannel(amqpChan, opts) if err != nil { continue } diff --git a/pkg/mq/queue_processor.go b/pkg/mq/queue_processor.go new file mode 100644 index 00000000..1350f56c --- /dev/null +++ b/pkg/mq/queue_processor.go @@ -0,0 +1,47 @@ +package mq + +import ( + "encoding/json" + "fmt" + "sync" +) + +func StartQueueProcessor[T any](conn *AMQPConnection, queue, exchange, topic string, processFn func(T) error) { + var wg sync.WaitGroup + + consume := conn.Consume(queue, WithDefaults(), WithTopicBinding(queue, exchange, topic)) + + for i := 0; i < DefaultPrefetch(); i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + var dto T + + for delivery := range consume { + dto = *new(T) + + if err := json.Unmarshal(delivery.Body, &dto); err != nil { + fmt.Printf("Error unmarshalling ingress DTO: %v\n", err) + if err := delivery.Nack(false, false); err != nil { + fmt.Printf("Error Nacking ingress delivery: %v\n", err) + } + continue + } + + if err := processFn(dto); err != nil { + fmt.Printf("Error processing ingress DTO: %v\n", err) + if err := delivery.Nack(false, false); err != nil { + fmt.Printf("Error Nacking ingress delivery: %v\n", err) + } + continue + } + + if err := delivery.Ack(false); err != nil { + fmt.Printf("Error Nacking ingress delivery: %v\n", err) + } + } + }(i) + } + + wg.Wait() +} diff --git a/pkg/mq/setup_builder.go b/pkg/mq/setup_builder.go new file mode 100644 index 00000000..d9c97514 --- /dev/null +++ b/pkg/mq/setup_builder.go @@ -0,0 +1,85 @@ +package mq + +import ( + "fmt" + "log" + "os" + "strconv" + + "github.com/rabbitmq/amqp091-go" +) + +// Must set QOS +// Must default to Quoroum queue but offer alternative +// Must offer queue/xchg bind +// Must offer + +var defaultPrefetchCount int = 0 + +func DefaultPrefetch() int { + return defaultPrefetchCount +} + +type SetupOpts struct { + Queue string + Exchange string + Topic string +} + +type SetupOption func(c *amqp091.Channel) error + +func setupChannel(c *amqp091.Channel, opts []SetupOption) error { + for _, f := range opts { + if err := f(c); err != nil { + return err + } + } + return nil +} + +func WithDefaults() SetupOption { + return func(c *amqp091.Channel) error { + return c.Qos(defaultPrefetchCount, 0, false) + } +} + +func WithTopicBinding(queue, exchange, topic string) SetupOption { + return func(c *amqp091.Channel) error { + _, err := c.QueueDeclare(queue, true, false, false, false, amqp091.Table{ + "x-queue-type": "quorum", + }) + if err != nil { + return fmt.Errorf("error declaring amqp queue: %w", err) + } + err = c.ExchangeDeclare(exchange, "topic", true, false, false, false, nil) + if err != nil { + return fmt.Errorf("error declaring amqp exchange: %w", err) + } + err = c.QueueBind(queue, topic, exchange, false, nil) + if err != nil { + return fmt.Errorf("error binding amqp queue to exchange: %w", err) + } + return nil + } +} + +func WithExchange(exchange string) SetupOption { + return func(c *amqp091.Channel) error { + err := c.ExchangeDeclare(exchange, "topic", true, false, false, false, nil) + if err != nil { + return fmt.Errorf("error declaring amqp exchange: %w", err) + } + return nil + } +} + +func init() { + prefetchStr, ok := os.LookupEnv("AMQP_PREFETCH") + if ok { + prefetch, err := strconv.Atoi(prefetchStr) + if err != nil { + log.Fatalf("AMQP_PREFETCH env set but not a number: %s\n", err.Error()) + } + defaultPrefetchCount = prefetch + } +} diff --git a/services/core/main.go b/services/core/main.go index 145b9c96..ef7b234d 100644 --- a/services/core/main.go +++ b/services/core/main.go @@ -26,11 +26,9 @@ import ( deviceinfra "sensorbucket.nl/sensorbucket/services/core/devices/infra" "sensorbucket.nl/sensorbucket/services/core/measurements" measurementsinfra "sensorbucket.nl/sensorbucket/services/core/measurements/infra" - measurementtransport "sensorbucket.nl/sensorbucket/services/core/measurements/transport" "sensorbucket.nl/sensorbucket/services/core/migrations" "sensorbucket.nl/sensorbucket/services/core/processing" processinginfra "sensorbucket.nl/sensorbucket/services/core/processing/infra" - processingtransport "sensorbucket.nl/sensorbucket/services/core/processing/transport" coretransport "sensorbucket.nl/sensorbucket/services/core/transport" ) @@ -124,22 +122,19 @@ func Run(cleanup cleanupper.Cleanupper) error { log.Printf("HTTP Listening: %s\n", httpsrv.Addr) // Setup MQ Transports - measurementtransport.StartMQ( - measurementservice, + go mq.StartQueueProcessor( amqpConn, - AMQP_XCHG_PIPELINE_MESSAGES, AMQP_QUEUE_MEASUREMENTS, + AMQP_XCHG_PIPELINE_MESSAGES, AMQP_XCHG_MEASUREMENTS_TOPIC, - AMQP_QUEUE_ERRORS, - prefetch, + measurementservice.StorePipelineMessage, ) - go processingtransport.StartIngressDTOConsumer( + go mq.StartQueueProcessor( amqpConn, - processingservice, AMQP_QUEUE_INGRESS, AMQP_XCHG_INGRESS, AMQP_XCHG_INGRESS_TOPIC, - prefetch, + processingservice.ProcessIngressDTO, ) go amqpConn.Start() diff --git a/services/core/measurements/application.go b/services/core/measurements/application.go index 9056ec3a..ad5ed819 100644 --- a/services/core/measurements/application.go +++ b/services/core/measurements/application.go @@ -2,172 +2,75 @@ package measurements import ( "context" - "fmt" - "log" "time" "github.com/google/uuid" - "github.com/samber/lo" "sensorbucket.nl/sensorbucket/internal/pagination" "sensorbucket.nl/sensorbucket/pkg/auth" - "sensorbucket.nl/sensorbucket/pkg/pipeline" - "sensorbucket.nl/sensorbucket/services/core/devices" ) -// iService is an interface for the service's exported interface, it can be used as a developer reference -type iService interface { - StoreMeasurement(context.Context, Measurement) error - StorePipelineMessage(pipeline.Message) error - QueryMeasurements(context.Context, Filter, pagination.Request) (*pagination.Page[Measurement], error) - ListDatastreams(ctx context.Context, filter DatastreamFilter, r pagination.Request) (*pagination.Page[Datastream], error) - GetDatastream(ctx context.Context, id uuid.UUID) (*Datastream, error) -} - -// Ensure Service implements iService -var _ iService = (*Service)(nil) - // Store stores measurement data type Store interface { - DatastreamFinderCreater - - Insert(Measurement) error Query(Filter, pagination.Request) (*pagination.Page[Measurement], error) ListDatastreams(DatastreamFilter, pagination.Request) (*pagination.Page[Datastream], error) GetDatastream(id uuid.UUID, filter DatastreamFilter) (*Datastream, error) } +type MeasurementStoreBuilder interface { + Begin() (MeasurementStorer, error) +} +type MeasurementStorer interface { + GetDatastream(tenantID, sensorID int64, observedProperty, unitOfMeasurement string) (*Datastream, error) + AddMeasurements([]Measurement) error + Finish() error +} + // Service is the measurement service which stores measurement data. type Service struct { store Store + measurementStore MeasurementStoreBuilder systemArchiveTime int keyClient auth.JWKSClient } -func New(store Store, systemArchiveTime int, keyClient auth.JWKSClient) *Service { +func New(store Store, measurementStore MeasurementStoreBuilder, systemArchiveTime int, keyClient auth.JWKSClient) *Service { return &Service{ store: store, + measurementStore: measurementStore, systemArchiveTime: systemArchiveTime, keyClient: keyClient, } } -func (s *Service) StoreMeasurement(ctx context.Context, m Measurement) error { - if err := auth.MustHavePermissions(ctx, auth.Permissions{auth.WRITE_MEASUREMENTS}); err != nil { - return err - } - - if err := m.Validate(); err != nil { - return fmt.Errorf("validation failed for measurement: %w", err) - } - - return s.store.Insert(m) -} - -func (s *Service) StorePipelineMessage(msg pipeline.Message) error { - ctx, err := auth.AuthenticateContext(context.Background(), msg.AccessToken, s.keyClient) +func (s *Service) ProcessPipelineMessage(msg *PipelineMessage) error { + // Only error when internal error and not a business error + _, err := msg.Authorize(s.keyClient) if err != nil { return err } - if err := auth.MustHavePermissions(ctx, auth.Permissions{auth.WRITE_MEASUREMENTS}); err != nil { + if err := msg.Validate(); err != nil { return err } - // TODO: get organisation from context - - // Validate incoming message for completeness - if msg.Device == nil { - return ErrMissingDeviceInMeasurement - } - if len(msg.Measurements) == 0 { - log.Printf("[warn] got pipeline message (%v) but it has no measurements\n", msg.TracingID) - return nil - } - - for _, measurement := range msg.Measurements { - err := s.storePipelineMeasurement(ctx, msg, measurement) - if err != nil { - return err - } - } - - return nil -} - -func (s *Service) storePipelineMeasurement(ctx context.Context, msg pipeline.Message, m pipeline.Measurement) error { - dev := (*devices.Device)(msg.Device) - sensor, err := dev.GetSensorByExternalIDOrFallback(m.SensorExternalID) - if err != nil { - return fmt.Errorf("cannot get sensor: %w", err) - } - if sensor.ExternalID != m.SensorExternalID { - fmt.Printf("warning: no sensor found for external id '%s' on device id '%d' while storing pipeline measurements\n", m.SensorExternalID, msg.Device.ID) - m.ObservedProperty = m.SensorExternalID + "_" + m.ObservedProperty - } - - tenantID, err := auth.GetTenant(ctx) + storer, err := s.measurementStore.Begin() if err != nil { return err } - // Find or create datastream - ds, err := FindOrCreateDatastream(tenantID, sensor.ID, m.ObservedProperty, m.UnitOfMeasurement, s.store) + measurements, err := buildMeasurements(msg, storer, s.systemArchiveTime) if err != nil { return err } - // TODO: Get organisation archive time - // Time is by default in days - archiveTimeDays, _ := lo.Coalesce(sensor.ArchiveTime, &s.systemArchiveTime) // msg.Organisation.ArchiveTime) - - measurement := Measurement{ - UplinkMessageID: msg.TracingID, - - OrganisationID: int(msg.TenantID), - - DeviceID: msg.Device.ID, - DeviceCode: msg.Device.Code, - DeviceDescription: msg.Device.Description, - DeviceLatitude: msg.Device.Latitude, - DeviceLongitude: msg.Device.Longitude, - DeviceAltitude: msg.Device.Altitude, - DeviceLocationDescription: msg.Device.LocationDescription, - DeviceProperties: msg.Device.Properties, - DeviceState: msg.Device.State, - - SensorID: sensor.ID, - SensorCode: sensor.Code, - SensorDescription: sensor.Description, - SensorExternalID: sensor.ExternalID, - SensorProperties: sensor.Properties, - SensorBrand: sensor.Brand, - SensorArchiveTime: sensor.ArchiveTime, - SensorIsFallback: sensor.IsFallback, - - DatastreamID: ds.ID, - DatastreamDescription: ds.Description, - DatastreamObservedProperty: ds.ObservedProperty, - DatastreamUnitOfMeasurement: ds.UnitOfMeasurement, - - MeasurementTimestamp: time.UnixMilli(m.Timestamp), - MeasurementValue: m.Value, - MeasurementLatitude: msg.Device.Latitude, - MeasurementLongitude: msg.Device.Longitude, - MeasurementAltitude: msg.Device.Altitude, - MeasurementProperties: m.Properties, - MeasurementExpiration: time.UnixMilli(msg.ReceivedAt).Add(time.Duration(*archiveTimeDays) * 24 * time.Hour), - - CreatedAt: time.Now(), + if err := storer.AddMeasurements(measurements); err != nil { + return err } - // Measurement location is either explicitly set or falls back to device location - if m.Latitude != nil && m.Longitude != nil { - measurement.MeasurementLatitude = m.Latitude - measurement.MeasurementLongitude = m.Longitude - measurement.MeasurementAltitude = m.Altitude + if err := storer.Finish(); err != nil { + return err } - - return s.StoreMeasurement(ctx, measurement) + return nil } // Filter contains query information for a list of measurements diff --git a/services/core/measurements/application_test.go b/services/core/measurements/application_test.go index 096323f1..6622ac06 100644 --- a/services/core/measurements/application_test.go +++ b/services/core/measurements/application_test.go @@ -129,19 +129,17 @@ func TestShouldErrorIfNoDeviceOrNoSensor(t *testing.T) { err := msg.NewMeasurement().SetValue(5, tC.observationProperty, "1").SetSensor(tC.sensorExternalID).Add() msg.AccessToken = authtest.CreateToken() require.NoError(t, err) - store := &StoreMock{ - FindDatastreamFunc: func(tenantID, sensorID int64, obs string) (*measurements.Datastream, error) { - assert.Equal(t, tC.expectedObservationProperty, obs, "expected observation property to match in query") - return &measurements.Datastream{}, nil - }, - InsertFunc: func(measurement measurements.Measurement) error { - return nil + store := &StoreMock{} + measurementStorer := &MeasurementStorerMock{} + measurementStoreBuilder := &MeasurementStoreBuilderMock{ + BeginFunc: func() (measurements.MeasurementStorer, error) { + return measurementStorer, nil }, } - svc := measurements.New(store, 0, authtest.JWKS()) + svc := measurements.New(store, measurementStoreBuilder, 0, authtest.JWKS()) // Act - err = svc.StorePipelineMessage(*msg) + err = svc.ProcessPipelineMessage((*measurements.PipelineMessage)(msg)) if tC.err != nil { assert.Error(t, tC.err, err) } else { @@ -188,23 +186,33 @@ func TestShouldCopyOverDefaultFields(t *testing.T) { ObservedProperty: msg.Measurements[0].ObservedProperty, UnitOfMeasurement: msg.Measurements[0].UnitOfMeasurement, } - store := &StoreMock{ - FindDatastreamFunc: func(tenantID, sensorID int64, obs string) (*measurements.Datastream, error) { + store := &StoreMock{} + measurementStorer := &MeasurementStorerMock{ + GetDatastreamFunc: func(tenantID, sensorID int64, observedProperty, unitOfMeasurement string) (*measurements.Datastream, error) { return &ds, nil }, - InsertFunc: func(measurement measurements.Measurement) error { + AddMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { return nil }, } - svc := measurements.New(store, 0, authtest.JWKS()) + measurementStoreBuilder := &MeasurementStoreBuilderMock{ + BeginFunc: func() (measurements.MeasurementStorer, error) { + return measurementStorer, nil + }, + } + svc := measurements.New(store, measurementStoreBuilder, 0, authtest.JWKS()) // Act - err = svc.StorePipelineMessage(*msg) + err = svc.ProcessPipelineMessage((*measurements.PipelineMessage)(msg)) require.NoError(t, err) // Assert - require.Len(t, store.calls.Insert, 1, "SQL Insert should've been called") - measurement := store.calls.Insert[0].Measurement + assert.Len(t, measurementStoreBuilder.calls.Begin, 1, "SQL Insert should've been called") + assert.Greater(t, measurementStorer.calls.GetDatastream, 0, "SQL Insert should've been called") + require.Len(t, measurementStorer.calls.AddMeasurements, 1, "SQL Insert should've been called") + require.Len(t, measurementStorer.calls.AddMeasurements[0].MeasurementsMoqParam, 1, "SQL Insert should've been called") + assert.Len(t, measurementStorer.calls.Finish, 1, "SQL Insert should've been called") + measurement := measurementStorer.calls.AddMeasurements[0].MeasurementsMoqParam[0] assert.Equal(t, msg.TracingID, measurement.UplinkMessageID) // assert.Equal(t, OrganisationName, measurement.OrganisationName) // assert.Equal(t, OrganisationAddress, measurement.OrganisationAddress) @@ -325,24 +333,34 @@ func TestShouldChooseMeasurementLocationOverDeviceLocation(t *testing.T) { ObservedProperty: msg.Measurements[0].ObservedProperty, UnitOfMeasurement: msg.Measurements[0].UnitOfMeasurement, } - store := &StoreMock{ - FindDatastreamFunc: func(tenantID, sensorID int64, obs string) (*measurements.Datastream, error) { + store := &StoreMock{} + measurementStorer := &MeasurementStorerMock{ + GetDatastreamFunc: func(tenantID, sensorID int64, observedProperty, unitOfMeasurement string) (*measurements.Datastream, error) { return &ds, nil }, - InsertFunc: func(measurement measurements.Measurement) error { + AddMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { return nil }, } - svc := measurements.New(store, 0, authtest.JWKS()) + measurementStoreBuilder := &MeasurementStoreBuilderMock{ + BeginFunc: func() (measurements.MeasurementStorer, error) { + return measurementStorer, nil + }, + } + svc := measurements.New(store, measurementStoreBuilder, 0, authtest.JWKS()) // Act require.NoError(t, - svc.StorePipelineMessage(*msg), + svc.ProcessPipelineMessage((*measurements.PipelineMessage)(msg)), ) // Assert - require.Len(t, store.calls.Insert, 1, "SQL Insert should've been called") - measurement := store.calls.Insert[0].Measurement + assert.Len(t, measurementStoreBuilder.calls.Begin, 1, "SQL Insert should've been called") + assert.Greater(t, measurementStorer.calls.GetDatastream, 0, "SQL Insert should've been called") + require.Len(t, measurementStorer.calls.AddMeasurements, 1, "SQL Insert should've been called") + require.Len(t, measurementStorer.calls.AddMeasurements[0].MeasurementsMoqParam, 1, "SQL Insert should've been called") + assert.Len(t, measurementStorer.calls.Finish, 1, "SQL Insert should've been called") + measurement := measurementStorer.calls.AddMeasurements[0].MeasurementsMoqParam[0] assert.Equal(t, tC.ExpectedLatitude, measurement.MeasurementLatitude) assert.Equal(t, tC.ExpectedLongitude, measurement.MeasurementLongitude) assert.Equal(t, tC.ExpectedAltitude, measurement.MeasurementAltitude) @@ -406,28 +424,35 @@ func TestShouldSetExpirationDate(t *testing.T) { ObservedProperty: msg.Measurements[0].ObservedProperty, UnitOfMeasurement: msg.Measurements[0].UnitOfMeasurement, } - store := &StoreMock{ - FindDatastreamFunc: func(tenantID, sensorID int64, obs string) (*measurements.Datastream, error) { + store := &StoreMock{} + measurementStorer := &MeasurementStorerMock{ + GetDatastreamFunc: func(tenantID, sensorID int64, observedProperty, unitOfMeasurement string) (*measurements.Datastream, error) { return &ds, nil }, - InsertFunc: func(measurement measurements.Measurement) error { + AddMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { return nil }, } - svc := measurements.New(store, sysArchiveTime, authtest.JWKS()) + measurementStoreBuilder := &MeasurementStoreBuilderMock{ + BeginFunc: func() (measurements.MeasurementStorer, error) { + return measurementStorer, nil + }, + } + svc := measurements.New(store, measurementStoreBuilder, sysArchiveTime, authtest.JWKS()) // Act - err = svc.StorePipelineMessage(*msg) + err = svc.ProcessPipelineMessage((*measurements.PipelineMessage)(msg)) require.NoError(t, err) // Assert - require.Len(t, store.calls.Insert, 1, "SQL Insert should've been called") - measurement := store.calls.Insert[0].Measurement + require.Len(t, measurementStorer.calls.AddMeasurements, 1, "SQL Insert should've been called") + call := measurementStorer.calls.AddMeasurements[0].MeasurementsMoqParam + require.Len(t, call, 1, "Should've tried to insert 1 measurement") // Check if the difference in seconds is 0, otherwise there might be a subsecond difference // due to parsing assert.Equal(t, float64(0), - math.Abs(float64(tC.expectedArchiveTime.Unix()-measurement.MeasurementExpiration.Unix())), + math.Abs(float64(tC.expectedArchiveTime.Unix()-call[0].MeasurementExpiration.Unix())), "", ) } diff --git a/services/core/measurements/datastreams.go b/services/core/measurements/datastreams.go index d5f7f54e..13248d3c 100644 --- a/services/core/measurements/datastreams.go +++ b/services/core/measurements/datastreams.go @@ -1,7 +1,6 @@ package measurements import ( - "errors" "net/http" "time" @@ -16,11 +15,6 @@ var ( ErrInvalidSensorID = web.NewError(http.StatusBadRequest, "Invalid sensorID", "ERR_SENSORID_INVALID") ) -type DatastreamFinderCreater interface { - FindDatastream(tenantID, sensorID int64, observedProperty string) (*Datastream, error) - CreateDatastream(*Datastream) error -} - type Datastream struct { ID uuid.UUID `json:"id"` Description string `json:"description"` @@ -49,21 +43,3 @@ func newDatastream(tenantID, sensorID int64, obs, uom string) (*Datastream, erro CreatedAt: time.Now(), }, nil } - -func FindOrCreateDatastream(tenantID, sensorID int64, obs, uom string, store DatastreamFinderCreater) (*Datastream, error) { - ds, err := store.FindDatastream(tenantID, sensorID, obs) - if errors.Is(err, ErrDatastreamNotFound) { - ds, err := newDatastream(tenantID, sensorID, obs, uom) - if err != nil { - return nil, err - } - if err := store.CreateDatastream(ds); err != nil { - return nil, err - } - return ds, nil - } - if err != nil { - return nil, err - } - return ds, nil -} diff --git a/services/core/measurements/datastreams_test.go b/services/core/measurements/datastreams_test.go deleted file mode 100644 index 8ca46c21..00000000 --- a/services/core/measurements/datastreams_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package measurements_test - -import ( - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "sensorbucket.nl/sensorbucket/pkg/authtest" - "sensorbucket.nl/sensorbucket/services/core/measurements" -) - -func TestFindOrCreateDatastreamWorks(t *testing.T) { - store := &DatastreamFinderCreaterMock{ - FindDatastreamFunc: func(tenantID, sensorID int64, observationProperty string) (*measurements.Datastream, error) { - return nil, measurements.ErrDatastreamNotFound - }, - CreateDatastreamFunc: func(datastream *measurements.Datastream) error { - return nil - }, - } - sensorID := int64(5) - obs := "test_obs" - uom := "1/cm3" - - ds, err := measurements.FindOrCreateDatastream(authtest.DefaultTenantID, sensorID, obs, uom, store) - require.NoError(t, err) - assert.NotNil(t, ds, "FindOrCreateDatastream must return datastream if no error") - - // Should've tested existance - require.Len(t, store.calls.FindDatastream, 1) - assert.Equal(t, sensorID, store.calls.FindDatastream[0].SensorID) - assert.Equal(t, obs, store.calls.FindDatastream[0].ObservedProperty) - // Should create - require.Len(t, store.calls.CreateDatastream, 1) - cds := store.calls.CreateDatastream[0].Datastream - assert.NotEqual(t, uuid.UUID{}, cds.ID) - assert.Equal(t, sensorID, cds.SensorID) - assert.Equal(t, obs, cds.ObservedProperty) - assert.Equal(t, uom, cds.UnitOfMeasurement) - assert.Equal(t, authtest.DefaultTenantID, cds.TenantID) -} diff --git a/services/core/measurements/infra/store_psql.go b/services/core/measurements/infra/store_psql.go index c372bc78..e5e025db 100644 --- a/services/core/measurements/infra/store_psql.go +++ b/services/core/measurements/infra/store_psql.go @@ -1,6 +1,7 @@ package measurementsinfra import ( + "context" "database/sql" "errors" "fmt" @@ -352,3 +353,107 @@ func (s *MeasurementStorePSQL) GetDatastream(id uuid.UUID, filter measurements.D } return &ds, nil } + +func (s *MeasurementStorePSQL) BeginMeasurementTransaction() (*MeasurementPSQLStorer, error) { + tx, err := s.db.BeginTxx(context.Background(), nil) + if err != nil { + return nil, err + } + return &MeasurementPSQLStorer{tx, nil}, nil +} + +type MeasurementPSQLStorer struct { + tx *sqlx.Tx + error error +} + +func (measurementStorer *MeasurementPSQLStorer) GetDatastream(tenantID, sensorID int64, observedProperty, unitOfMeasurement string) (*measurements.Datastream, error) { +} + +func (measurementStorer *MeasurementPSQLStorer) AddMeasurements(measurements []measurements.Measurement) error { + q := pq.Insert("measurements").Columns( + "uplink_message_id", + "organisation_id", + "organisation_name", + "organisation_address", + "organisation_zipcode", + "organisation_city", + "organisation_chamber_of_commerce_id", + "organisation_headquarter_id", + "organisation_state", + "organisation_archive_time", + "device_id", + "device_code", + "device_description", + "device_location", + "device_altitude", + "device_location_description", + "device_state", + "device_properties", + "sensor_id", + "sensor_code", + "sensor_description", + "sensor_external_id", + "sensor_properties", + "sensor_brand", + "sensor_archive_time", + "datastream_id", + "datastream_description", + "datastream_observed_property", + "datastream_unit_of_measurement", + "measurement_timestamp", + "measurement_value", + "measurement_location", + "measurement_altitude", + "measurement_expiration", + "created_at", + ) + for _, measurement := range measurements { + q = q.Values( + measurement.UplinkMessageID, + measurement.OrganisationID, + measurement.OrganisationName, + measurement.OrganisationAddress, + measurement.OrganisationZipcode, + measurement.OrganisationCity, + measurement.OrganisationChamberOfCommerceID, + measurement.OrganisationHeadquarterID, + measurement.OrganisationState, + measurement.OrganisationArchiveTime, + measurement.DeviceID, + measurement.DeviceCode, + measurement.DeviceDescription, + sq.Expr("ST_SETSRID(ST_POINT(?,?),4326)", measurement.DeviceLongitude, measurement.DeviceLatitude), + measurement.DeviceAltitude, + measurement.DeviceLocationDescription, + measurement.DeviceState, + measurement.DeviceProperties, + measurement.SensorID, + measurement.SensorCode, + measurement.SensorDescription, + measurement.SensorExternalID, + measurement.SensorProperties, + measurement.SensorBrand, + measurement.SensorArchiveTime, + measurement.DatastreamID, + measurement.DatastreamDescription, + measurement.DatastreamObservedProperty, + measurement.DatastreamUnitOfMeasurement, + measurement.MeasurementTimestamp, + measurement.MeasurementValue, + sq.Expr("ST_SETSRID(ST_POINT(?,?),4326)", measurement.MeasurementLongitude, measurement.MeasurementLatitude), + measurement.MeasurementAltitude, + measurement.MeasurementExpiration, + measurement.CreatedAt, + ) + } + + return nil +} + +func (measurementStorer *MeasurementPSQLStorer) Finish() error { + if measurementStorer.error != nil { + return measurementStorer.tx.Rollback() + } + return measurementStorer.tx.Commit() +} diff --git a/services/core/measurements/measurements.go b/services/core/measurements/measurements.go index ffec30b6..ff0bc3a7 100644 --- a/services/core/measurements/measurements.go +++ b/services/core/measurements/measurements.go @@ -1,6 +1,6 @@ package measurements -//go:generate moq -pkg measurements_test -out mock_test.go . Store DatastreamFinderCreater +//go:generate moq -pkg measurements_test -out mock_test.go . Store MeasurementStoreBuilder MeasurementStorer import ( "encoding/json" @@ -59,14 +59,3 @@ type Measurement struct { MeasurementExpiration time.Time `json:"measurement_expiration"` CreatedAt time.Time `json:"created_at"` } - -func (m *Measurement) Validate() error { - if m.DeviceID == 0 { - return ErrMissingDeviceInMeasurement - } - if m.MeasurementTimestamp.IsZero() { - return ErrMissingTimestampInMeasurement - } - // TODO: Add validation - return nil -} diff --git a/services/core/measurements/mock_test.go b/services/core/measurements/mock_test.go index b05f9427..cb3c9bd5 100644 --- a/services/core/measurements/mock_test.go +++ b/services/core/measurements/mock_test.go @@ -20,18 +20,9 @@ var _ measurements.Store = &StoreMock{} // // // make and configure a mocked measurements.Store // mockedStore := &StoreMock{ -// CreateDatastreamFunc: func(datastream *measurements.Datastream) error { -// panic("mock out the CreateDatastream method") -// }, -// FindDatastreamFunc: func(tenantID int64, sensorID int64, observedProperty string) (*measurements.Datastream, error) { -// panic("mock out the FindDatastream method") -// }, // GetDatastreamFunc: func(id uuid.UUID, filter measurements.DatastreamFilter) (*measurements.Datastream, error) { // panic("mock out the GetDatastream method") // }, -// InsertFunc: func(measurement measurements.Measurement) error { -// panic("mock out the Insert method") -// }, // ListDatastreamsFunc: func(datastreamFilter measurements.DatastreamFilter, request pagination.Request) (*pagination.Page[measurements.Datastream], error) { // panic("mock out the ListDatastreams method") // }, @@ -45,18 +36,9 @@ var _ measurements.Store = &StoreMock{} // // } type StoreMock struct { - // CreateDatastreamFunc mocks the CreateDatastream method. - CreateDatastreamFunc func(datastream *measurements.Datastream) error - - // FindDatastreamFunc mocks the FindDatastream method. - FindDatastreamFunc func(tenantID int64, sensorID int64, observedProperty string) (*measurements.Datastream, error) - // GetDatastreamFunc mocks the GetDatastream method. GetDatastreamFunc func(id uuid.UUID, filter measurements.DatastreamFilter) (*measurements.Datastream, error) - // InsertFunc mocks the Insert method. - InsertFunc func(measurement measurements.Measurement) error - // ListDatastreamsFunc mocks the ListDatastreams method. ListDatastreamsFunc func(datastreamFilter measurements.DatastreamFilter, request pagination.Request) (*pagination.Page[measurements.Datastream], error) @@ -65,20 +47,6 @@ type StoreMock struct { // calls tracks calls to the methods. calls struct { - // CreateDatastream holds details about calls to the CreateDatastream method. - CreateDatastream []struct { - // Datastream is the datastream argument value. - Datastream *measurements.Datastream - } - // FindDatastream holds details about calls to the FindDatastream method. - FindDatastream []struct { - // TenantID is the tenantID argument value. - TenantID int64 - // SensorID is the sensorID argument value. - SensorID int64 - // ObservedProperty is the observedProperty argument value. - ObservedProperty string - } // GetDatastream holds details about calls to the GetDatastream method. GetDatastream []struct { // ID is the id argument value. @@ -86,11 +54,6 @@ type StoreMock struct { // Filter is the filter argument value. Filter measurements.DatastreamFilter } - // Insert holds details about calls to the Insert method. - Insert []struct { - // Measurement is the measurement argument value. - Measurement measurements.Measurement - } // ListDatastreams holds details about calls to the ListDatastreams method. ListDatastreams []struct { // DatastreamFilter is the datastreamFilter argument value. @@ -106,84 +69,9 @@ type StoreMock struct { Request pagination.Request } } - lockCreateDatastream sync.RWMutex - lockFindDatastream sync.RWMutex - lockGetDatastream sync.RWMutex - lockInsert sync.RWMutex - lockListDatastreams sync.RWMutex - lockQuery sync.RWMutex -} - -// CreateDatastream calls CreateDatastreamFunc. -func (mock *StoreMock) CreateDatastream(datastream *measurements.Datastream) error { - if mock.CreateDatastreamFunc == nil { - panic("StoreMock.CreateDatastreamFunc: method is nil but Store.CreateDatastream was just called") - } - callInfo := struct { - Datastream *measurements.Datastream - }{ - Datastream: datastream, - } - mock.lockCreateDatastream.Lock() - mock.calls.CreateDatastream = append(mock.calls.CreateDatastream, callInfo) - mock.lockCreateDatastream.Unlock() - return mock.CreateDatastreamFunc(datastream) -} - -// CreateDatastreamCalls gets all the calls that were made to CreateDatastream. -// Check the length with: -// -// len(mockedStore.CreateDatastreamCalls()) -func (mock *StoreMock) CreateDatastreamCalls() []struct { - Datastream *measurements.Datastream -} { - var calls []struct { - Datastream *measurements.Datastream - } - mock.lockCreateDatastream.RLock() - calls = mock.calls.CreateDatastream - mock.lockCreateDatastream.RUnlock() - return calls -} - -// FindDatastream calls FindDatastreamFunc. -func (mock *StoreMock) FindDatastream(tenantID int64, sensorID int64, observedProperty string) (*measurements.Datastream, error) { - if mock.FindDatastreamFunc == nil { - panic("StoreMock.FindDatastreamFunc: method is nil but Store.FindDatastream was just called") - } - callInfo := struct { - TenantID int64 - SensorID int64 - ObservedProperty string - }{ - TenantID: tenantID, - SensorID: sensorID, - ObservedProperty: observedProperty, - } - mock.lockFindDatastream.Lock() - mock.calls.FindDatastream = append(mock.calls.FindDatastream, callInfo) - mock.lockFindDatastream.Unlock() - return mock.FindDatastreamFunc(tenantID, sensorID, observedProperty) -} - -// FindDatastreamCalls gets all the calls that were made to FindDatastream. -// Check the length with: -// -// len(mockedStore.FindDatastreamCalls()) -func (mock *StoreMock) FindDatastreamCalls() []struct { - TenantID int64 - SensorID int64 - ObservedProperty string -} { - var calls []struct { - TenantID int64 - SensorID int64 - ObservedProperty string - } - mock.lockFindDatastream.RLock() - calls = mock.calls.FindDatastream - mock.lockFindDatastream.RUnlock() - return calls + lockGetDatastream sync.RWMutex + lockListDatastreams sync.RWMutex + lockQuery sync.RWMutex } // GetDatastream calls GetDatastreamFunc. @@ -222,38 +110,6 @@ func (mock *StoreMock) GetDatastreamCalls() []struct { return calls } -// Insert calls InsertFunc. -func (mock *StoreMock) Insert(measurement measurements.Measurement) error { - if mock.InsertFunc == nil { - panic("StoreMock.InsertFunc: method is nil but Store.Insert was just called") - } - callInfo := struct { - Measurement measurements.Measurement - }{ - Measurement: measurement, - } - mock.lockInsert.Lock() - mock.calls.Insert = append(mock.calls.Insert, callInfo) - mock.lockInsert.Unlock() - return mock.InsertFunc(measurement) -} - -// InsertCalls gets all the calls that were made to Insert. -// Check the length with: -// -// len(mockedStore.InsertCalls()) -func (mock *StoreMock) InsertCalls() []struct { - Measurement measurements.Measurement -} { - var calls []struct { - Measurement measurements.Measurement - } - mock.lockInsert.RLock() - calls = mock.calls.Insert - mock.lockInsert.RUnlock() - return calls -} - // ListDatastreams calls ListDatastreamsFunc. func (mock *StoreMock) ListDatastreams(datastreamFilter measurements.DatastreamFilter, request pagination.Request) (*pagination.Page[measurements.Datastream], error) { if mock.ListDatastreamsFunc == nil { @@ -326,124 +182,226 @@ func (mock *StoreMock) QueryCalls() []struct { return calls } -// Ensure, that DatastreamFinderCreaterMock does implement measurements.DatastreamFinderCreater. +// Ensure, that MeasurementStoreBuilderMock does implement measurements.MeasurementStoreBuilder. // If this is not the case, regenerate this file with moq. -var _ measurements.DatastreamFinderCreater = &DatastreamFinderCreaterMock{} +var _ measurements.MeasurementStoreBuilder = &MeasurementStoreBuilderMock{} -// DatastreamFinderCreaterMock is a mock implementation of measurements.DatastreamFinderCreater. +// MeasurementStoreBuilderMock is a mock implementation of measurements.MeasurementStoreBuilder. // -// func TestSomethingThatUsesDatastreamFinderCreater(t *testing.T) { +// func TestSomethingThatUsesMeasurementStoreBuilder(t *testing.T) { // -// // make and configure a mocked measurements.DatastreamFinderCreater -// mockedDatastreamFinderCreater := &DatastreamFinderCreaterMock{ -// CreateDatastreamFunc: func(datastream *measurements.Datastream) error { -// panic("mock out the CreateDatastream method") +// // make and configure a mocked measurements.MeasurementStoreBuilder +// mockedMeasurementStoreBuilder := &MeasurementStoreBuilderMock{ +// BeginFunc: func() (measurements.MeasurementStorer, error) { +// panic("mock out the Begin method") // }, -// FindDatastreamFunc: func(tenantID int64, sensorID int64, observedProperty string) (*measurements.Datastream, error) { -// panic("mock out the FindDatastream method") +// } +// +// // use mockedMeasurementStoreBuilder in code that requires measurements.MeasurementStoreBuilder +// // and then make assertions. +// +// } +type MeasurementStoreBuilderMock struct { + // BeginFunc mocks the Begin method. + BeginFunc func() (measurements.MeasurementStorer, error) + + // calls tracks calls to the methods. + calls struct { + // Begin holds details about calls to the Begin method. + Begin []struct { + } + } + lockBegin sync.RWMutex +} + +// Begin calls BeginFunc. +func (mock *MeasurementStoreBuilderMock) Begin() (measurements.MeasurementStorer, error) { + if mock.BeginFunc == nil { + panic("MeasurementStoreBuilderMock.BeginFunc: method is nil but MeasurementStoreBuilder.Begin was just called") + } + callInfo := struct { + }{} + mock.lockBegin.Lock() + mock.calls.Begin = append(mock.calls.Begin, callInfo) + mock.lockBegin.Unlock() + return mock.BeginFunc() +} + +// BeginCalls gets all the calls that were made to Begin. +// Check the length with: +// +// len(mockedMeasurementStoreBuilder.BeginCalls()) +func (mock *MeasurementStoreBuilderMock) BeginCalls() []struct { +} { + var calls []struct { + } + mock.lockBegin.RLock() + calls = mock.calls.Begin + mock.lockBegin.RUnlock() + return calls +} + +// Ensure, that MeasurementStorerMock does implement measurements.MeasurementStorer. +// If this is not the case, regenerate this file with moq. +var _ measurements.MeasurementStorer = &MeasurementStorerMock{} + +// MeasurementStorerMock is a mock implementation of measurements.MeasurementStorer. +// +// func TestSomethingThatUsesMeasurementStorer(t *testing.T) { +// +// // make and configure a mocked measurements.MeasurementStorer +// mockedMeasurementStorer := &MeasurementStorerMock{ +// AddMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { +// panic("mock out the AddMeasurements method") +// }, +// FinishFunc: func() error { +// panic("mock out the Finish method") +// }, +// GetDatastreamFunc: func(tenantID int64, sensorID int64, observedProperty string, unitOfMeasurement string) (*measurements.Datastream, error) { +// panic("mock out the GetDatastream method") // }, // } // -// // use mockedDatastreamFinderCreater in code that requires measurements.DatastreamFinderCreater +// // use mockedMeasurementStorer in code that requires measurements.MeasurementStorer // // and then make assertions. // // } -type DatastreamFinderCreaterMock struct { - // CreateDatastreamFunc mocks the CreateDatastream method. - CreateDatastreamFunc func(datastream *measurements.Datastream) error +type MeasurementStorerMock struct { + // AddMeasurementsFunc mocks the AddMeasurements method. + AddMeasurementsFunc func(measurementsMoqParam []measurements.Measurement) error - // FindDatastreamFunc mocks the FindDatastream method. - FindDatastreamFunc func(tenantID int64, sensorID int64, observedProperty string) (*measurements.Datastream, error) + // FinishFunc mocks the Finish method. + FinishFunc func() error + + // GetDatastreamFunc mocks the GetDatastream method. + GetDatastreamFunc func(tenantID int64, sensorID int64, observedProperty string, unitOfMeasurement string) (*measurements.Datastream, error) // calls tracks calls to the methods. calls struct { - // CreateDatastream holds details about calls to the CreateDatastream method. - CreateDatastream []struct { - // Datastream is the datastream argument value. - Datastream *measurements.Datastream + // AddMeasurements holds details about calls to the AddMeasurements method. + AddMeasurements []struct { + // MeasurementsMoqParam is the measurementsMoqParam argument value. + MeasurementsMoqParam []measurements.Measurement } - // FindDatastream holds details about calls to the FindDatastream method. - FindDatastream []struct { + // Finish holds details about calls to the Finish method. + Finish []struct { + } + // GetDatastream holds details about calls to the GetDatastream method. + GetDatastream []struct { // TenantID is the tenantID argument value. TenantID int64 // SensorID is the sensorID argument value. SensorID int64 // ObservedProperty is the observedProperty argument value. ObservedProperty string + // UnitOfMeasurement is the unitOfMeasurement argument value. + UnitOfMeasurement string } } - lockCreateDatastream sync.RWMutex - lockFindDatastream sync.RWMutex + lockAddMeasurements sync.RWMutex + lockFinish sync.RWMutex + lockGetDatastream sync.RWMutex } -// CreateDatastream calls CreateDatastreamFunc. -func (mock *DatastreamFinderCreaterMock) CreateDatastream(datastream *measurements.Datastream) error { - if mock.CreateDatastreamFunc == nil { - panic("DatastreamFinderCreaterMock.CreateDatastreamFunc: method is nil but DatastreamFinderCreater.CreateDatastream was just called") +// AddMeasurements calls AddMeasurementsFunc. +func (mock *MeasurementStorerMock) AddMeasurements(measurementsMoqParam []measurements.Measurement) error { + if mock.AddMeasurementsFunc == nil { + panic("MeasurementStorerMock.AddMeasurementsFunc: method is nil but MeasurementStorer.AddMeasurements was just called") } callInfo := struct { - Datastream *measurements.Datastream + MeasurementsMoqParam []measurements.Measurement }{ - Datastream: datastream, + MeasurementsMoqParam: measurementsMoqParam, + } + mock.lockAddMeasurements.Lock() + mock.calls.AddMeasurements = append(mock.calls.AddMeasurements, callInfo) + mock.lockAddMeasurements.Unlock() + return mock.AddMeasurementsFunc(measurementsMoqParam) +} + +// AddMeasurementsCalls gets all the calls that were made to AddMeasurements. +// Check the length with: +// +// len(mockedMeasurementStorer.AddMeasurementsCalls()) +func (mock *MeasurementStorerMock) AddMeasurementsCalls() []struct { + MeasurementsMoqParam []measurements.Measurement +} { + var calls []struct { + MeasurementsMoqParam []measurements.Measurement } - mock.lockCreateDatastream.Lock() - mock.calls.CreateDatastream = append(mock.calls.CreateDatastream, callInfo) - mock.lockCreateDatastream.Unlock() - return mock.CreateDatastreamFunc(datastream) + mock.lockAddMeasurements.RLock() + calls = mock.calls.AddMeasurements + mock.lockAddMeasurements.RUnlock() + return calls } -// CreateDatastreamCalls gets all the calls that were made to CreateDatastream. +// Finish calls FinishFunc. +func (mock *MeasurementStorerMock) Finish() error { + if mock.FinishFunc == nil { + panic("MeasurementStorerMock.FinishFunc: method is nil but MeasurementStorer.Finish was just called") + } + callInfo := struct { + }{} + mock.lockFinish.Lock() + mock.calls.Finish = append(mock.calls.Finish, callInfo) + mock.lockFinish.Unlock() + return mock.FinishFunc() +} + +// FinishCalls gets all the calls that were made to Finish. // Check the length with: // -// len(mockedDatastreamFinderCreater.CreateDatastreamCalls()) -func (mock *DatastreamFinderCreaterMock) CreateDatastreamCalls() []struct { - Datastream *measurements.Datastream +// len(mockedMeasurementStorer.FinishCalls()) +func (mock *MeasurementStorerMock) FinishCalls() []struct { } { var calls []struct { - Datastream *measurements.Datastream } - mock.lockCreateDatastream.RLock() - calls = mock.calls.CreateDatastream - mock.lockCreateDatastream.RUnlock() + mock.lockFinish.RLock() + calls = mock.calls.Finish + mock.lockFinish.RUnlock() return calls } -// FindDatastream calls FindDatastreamFunc. -func (mock *DatastreamFinderCreaterMock) FindDatastream(tenantID int64, sensorID int64, observedProperty string) (*measurements.Datastream, error) { - if mock.FindDatastreamFunc == nil { - panic("DatastreamFinderCreaterMock.FindDatastreamFunc: method is nil but DatastreamFinderCreater.FindDatastream was just called") +// GetDatastream calls GetDatastreamFunc. +func (mock *MeasurementStorerMock) GetDatastream(tenantID int64, sensorID int64, observedProperty string, unitOfMeasurement string) (*measurements.Datastream, error) { + if mock.GetDatastreamFunc == nil { + panic("MeasurementStorerMock.GetDatastreamFunc: method is nil but MeasurementStorer.GetDatastream was just called") } callInfo := struct { - TenantID int64 - SensorID int64 - ObservedProperty string + TenantID int64 + SensorID int64 + ObservedProperty string + UnitOfMeasurement string }{ - TenantID: tenantID, - SensorID: sensorID, - ObservedProperty: observedProperty, + TenantID: tenantID, + SensorID: sensorID, + ObservedProperty: observedProperty, + UnitOfMeasurement: unitOfMeasurement, } - mock.lockFindDatastream.Lock() - mock.calls.FindDatastream = append(mock.calls.FindDatastream, callInfo) - mock.lockFindDatastream.Unlock() - return mock.FindDatastreamFunc(tenantID, sensorID, observedProperty) + mock.lockGetDatastream.Lock() + mock.calls.GetDatastream = append(mock.calls.GetDatastream, callInfo) + mock.lockGetDatastream.Unlock() + return mock.GetDatastreamFunc(tenantID, sensorID, observedProperty, unitOfMeasurement) } -// FindDatastreamCalls gets all the calls that were made to FindDatastream. +// GetDatastreamCalls gets all the calls that were made to GetDatastream. // Check the length with: // -// len(mockedDatastreamFinderCreater.FindDatastreamCalls()) -func (mock *DatastreamFinderCreaterMock) FindDatastreamCalls() []struct { - TenantID int64 - SensorID int64 - ObservedProperty string +// len(mockedMeasurementStorer.GetDatastreamCalls()) +func (mock *MeasurementStorerMock) GetDatastreamCalls() []struct { + TenantID int64 + SensorID int64 + ObservedProperty string + UnitOfMeasurement string } { var calls []struct { - TenantID int64 - SensorID int64 - ObservedProperty string + TenantID int64 + SensorID int64 + ObservedProperty string + UnitOfMeasurement string } - mock.lockFindDatastream.RLock() - calls = mock.calls.FindDatastream - mock.lockFindDatastream.RUnlock() + mock.lockGetDatastream.RLock() + calls = mock.calls.GetDatastream + mock.lockGetDatastream.RUnlock() return calls } diff --git a/services/core/measurements/pipeline_message.go b/services/core/measurements/pipeline_message.go new file mode 100644 index 00000000..a75d0802 --- /dev/null +++ b/services/core/measurements/pipeline_message.go @@ -0,0 +1,106 @@ +package measurements + +import ( + "context" + "fmt" + "time" + + "github.com/samber/lo" + "sensorbucket.nl/sensorbucket/pkg/auth" + "sensorbucket.nl/sensorbucket/pkg/pipeline" + "sensorbucket.nl/sensorbucket/services/core/devices" +) + +type PipelineMessage pipeline.Message + +func (msg *PipelineMessage) Authorize(keyClient auth.JWKSClient) (context.Context, error) { + ctx, err := auth.AuthenticateContext(context.Background(), msg.AccessToken, keyClient) + if err != nil { + return ctx, err + } + if err := auth.MustHavePermissions(ctx, auth.Permissions{auth.WRITE_MEASUREMENTS}); err != nil { + return ctx, err + } + msg.TenantID, err = auth.GetTenant(ctx) + if err != nil { + return ctx, err + } + return ctx, nil +} + +func (msg *PipelineMessage) Validate() error { + if msg.Device == nil { + return ErrMissingDeviceInMeasurement + } + return nil +} + +func buildMeasurements(msg *PipelineMessage, storer MeasurementStorer, archiveTime int) ([]Measurement, error) { + dev := (*devices.Device)(msg.Device) + + baseMeasurement := Measurement{ + UplinkMessageID: msg.TracingID, + OrganisationID: int(msg.TenantID), + DeviceID: msg.Device.ID, + DeviceCode: msg.Device.Code, + DeviceDescription: msg.Device.Description, + DeviceLatitude: msg.Device.Latitude, + DeviceLongitude: msg.Device.Longitude, + DeviceAltitude: msg.Device.Altitude, + DeviceLocationDescription: msg.Device.LocationDescription, + DeviceProperties: msg.Device.Properties, + DeviceState: msg.Device.State, + MeasurementLatitude: msg.Device.Latitude, + MeasurementLongitude: msg.Device.Longitude, + MeasurementAltitude: msg.Device.Altitude, + CreatedAt: time.Now(), + } + + measurements := make([]Measurement, len(msg.Measurements)) + for ix, m := range msg.Measurements { + + sensor, err := dev.GetSensorByExternalIDOrFallback(m.SensorExternalID) + if err != nil { + return nil, fmt.Errorf("cannot get sensor: %w", err) + } + if sensor.ExternalID != m.SensorExternalID { + m.ObservedProperty = m.SensorExternalID + "_" + m.ObservedProperty + } + + archiveTimeDays, _ := lo.Coalesce(sensor.ArchiveTime, &archiveTime) // msg.Organisation.ArchiveTime) + + ds, err := storer.GetDatastream(msg.TenantID, sensor.ID, m.ObservedProperty, m.UnitOfMeasurement) + if err != nil { + return nil, err + } + + measurement := baseMeasurement + measurement.SensorID = sensor.ID + measurement.SensorCode = sensor.Code + measurement.SensorDescription = sensor.Description + measurement.SensorExternalID = sensor.ExternalID + measurement.SensorProperties = sensor.Properties + measurement.SensorBrand = sensor.Brand + measurement.SensorArchiveTime = sensor.ArchiveTime + measurement.SensorIsFallback = sensor.IsFallback + measurement.DatastreamID = ds.ID + measurement.DatastreamDescription = ds.Description + measurement.DatastreamObservedProperty = ds.ObservedProperty + measurement.DatastreamUnitOfMeasurement = ds.UnitOfMeasurement + measurement.MeasurementTimestamp = time.UnixMilli(m.Timestamp) + measurement.MeasurementValue = m.Value + measurement.MeasurementProperties = m.Properties + measurement.MeasurementExpiration = time.UnixMilli(msg.ReceivedAt).Add(time.Duration(*archiveTimeDays) * 24 * time.Hour) + + // Measurement location is either explicitly set or falls back to device location + if m.Latitude != nil && m.Longitude != nil { + measurement.MeasurementLatitude = m.Latitude + measurement.MeasurementLongitude = m.Longitude + measurement.MeasurementAltitude = m.Altitude + } + + measurements[ix] = measurement + } + + return measurements, nil +} diff --git a/services/core/measurements/transport/mq.go b/services/core/measurements/transport/mq.go index e0fe809f..2f34610e 100644 --- a/services/core/measurements/transport/mq.go +++ b/services/core/measurements/transport/mq.go @@ -1,6 +1,7 @@ package measurementtransport import ( + "context" "encoding/json" "fmt" "log" @@ -8,6 +9,7 @@ import ( "github.com/rabbitmq/amqp091-go" + "sensorbucket.nl/sensorbucket/internal/cleanupper" "sensorbucket.nl/sensorbucket/pkg/mq" "sensorbucket.nl/sensorbucket/pkg/pipeline" "sensorbucket.nl/sensorbucket/services/core/measurements" @@ -21,30 +23,10 @@ func StartMQ( measurementStorageTopic, measurementErrorTopic string, prefetch int, -) func() { +) cleanupper.Shutdown { done := make(chan struct{}) - consume := mq.Consume(conn, measurementQueue, func(c *amqp091.Channel) error { - if err := c.Qos(prefetch, 0, false); err != nil { - return fmt.Errorf("error setting Qos with prefetch on amqp: %w", err) - } - q, err := c.QueueDeclare(measurementQueue, true, false, false, false, nil) - if err != nil { - return fmt.Errorf("error declaring amqp queue: %w", err) - } - err = c.ExchangeDeclare(pipelineMessagesExchange, "topic", true, false, false, false, nil) - if err != nil { - return fmt.Errorf("error declaring amqp exchange: %w", err) - } - err = c.QueueBind(q.Name, measurementStorageTopic, pipelineMessagesExchange, false, nil) - if err != nil { - return fmt.Errorf("error binding amqp queue to exchange: %w", err) - } - return nil - }) - publish := mq.Publisher(conn, pipelineMessagesExchange, func(c *amqp091.Channel) error { - err := c.ExchangeDeclare(pipelineMessagesExchange, "topic", true, false, false, false, nil) - return err - }) + consume := mq.Consume(conn, measurementQueue, mq.WithDefaults(), mq.WithTopicBinding()) + publish := mq.Publisher(conn, pipelineMessagesExchange, mq.WithDefaults(), mq.WithExchange()) go func() { log.Println("Measurement MQ Transport running...") @@ -95,7 +77,8 @@ func StartMQ( } }() - return func() { + return func(ctx context.Context) error { close(done) + return nil } } diff --git a/services/core/processing/transport/ingressDTOConsumer.go b/services/core/processing/transport/ingressDTOConsumer.go deleted file mode 100644 index abccf165..00000000 --- a/services/core/processing/transport/ingressDTOConsumer.go +++ /dev/null @@ -1,56 +0,0 @@ -package processingtransport - -import ( - "encoding/json" - "fmt" - - "github.com/rabbitmq/amqp091-go" - - "sensorbucket.nl/sensorbucket/pkg/mq" - "sensorbucket.nl/sensorbucket/services/core/processing" -) - -func StartIngressDTOConsumer(conn *mq.AMQPConnection, svc *processing.Service, queue, xchg, topic string, prefetch int) { - consume := conn.Consume(queue, func(c *amqp091.Channel) error { - if err := c.Qos(prefetch, 0, false); err != nil { - return fmt.Errorf("error setting Qos with prefetch on amqp: %w", err) - } - _, err := c.QueueDeclare(queue, true, false, false, false, nil) - if err != nil { - return err - } - - // Create exchange and bind if both arguments are provided, this is optional - if xchg != "" && topic != "" { - if err := c.ExchangeDeclare(xchg, "topic", true, false, false, false, nil); err != nil { - return err - } - if err := c.QueueBind(queue, topic, xchg, false, nil); err != nil { - return err - } - } - return nil - }) - - for delivery := range consume { - var dto processing.IngressDTO - if err := json.Unmarshal(delivery.Body, &dto); err != nil { - fmt.Printf("Error unmarshalling ingress DTO: %v\n", err) - if err := delivery.Nack(false, false); err != nil { - fmt.Printf("Error Nacking ingress delivery: %v\n", err) - } - continue - } - - if err := svc.ProcessIngressDTO(dto); err != nil { - fmt.Printf("Error processing ingress DTO: %v\n", err) - if err := delivery.Nack(false, false); err != nil { - fmt.Printf("Error Nacking ingress delivery: %v\n", err) - } - continue - } - if err := delivery.Ack(false); err != nil { - fmt.Printf("Error Nacking ingress delivery: %v\n", err) - } - } -} diff --git a/services/fission-rmq-connector/main.go b/services/fission-rmq-connector/main.go index 706682c7..3dc08138 100644 --- a/services/fission-rmq-connector/main.go +++ b/services/fission-rmq-connector/main.go @@ -56,7 +56,6 @@ var ( AMQP_HOST = env.Must("AMQP_HOST") AMQP_QUEUE = env.Must("QUEUE_NAME") AMQP_TOPIC = env.Must("TOPIC") - AMQP_PREFETCH = env.Could("AMQP_PREFETCH", "5") AMQP_XCHG = env.Must("EXCHANGE") HTTP_ENDPOINT = env.Must("HTTP_ENDPOINT") MAX_RETRIES = env.Could("MAX_RETRIES", "3") @@ -64,10 +63,6 @@ var ( ) func Run(cleanup cleanupper.Cleanupper) error { - prefetch, err := strconv.Atoi(AMQP_PREFETCH) - if err != nil { - return err - } maxRetries, err := strconv.Atoi(MAX_RETRIES) if err != nil { return err @@ -88,26 +83,10 @@ func Run(cleanup cleanupper.Cleanupper) error { successChan := conn.Publisher(AMQP_XCHG, func(c *amqp091.Channel) error { return nil }) - consumeChan := conn.Consume(AMQP_QUEUE, func(c *amqp091.Channel) error { - // Cannot set autodelete because KEDA can't scale if the queue does not exist at all - _, err := c.QueueDeclare(AMQP_QUEUE, true, false, false, false, nil) - if err != nil { - return err - } - err = c.ExchangeDeclare(AMQP_XCHG, "topic", true, false, false, false, nil) - if err != nil { - return err - } - err = c.QueueBind(AMQP_QUEUE, AMQP_TOPIC, AMQP_XCHG, false, nil) - if err != nil { - return err - } - err = c.Qos(prefetch, 0, true) - if err != nil { - return err - } - return nil - }) + consumeChan := conn.Consume(AMQP_QUEUE, + mq.WithDefaults(), + mq.WithTopicBinding(AMQP_QUEUE, AMQP_XCHG, AMQP_TOPIC), + ) go conn.Start() connector := Connector{ @@ -117,17 +96,11 @@ func Run(cleanup cleanupper.Cleanupper) error { Result: successChan, } - concurrency := make(chan int, prefetch) - go func(incoming <-chan amqp091.Delivery) { - for delivery := range incoming { - concurrency <- 1 - go func(delivery amqp091.Delivery) { - log.Printf("Processing: %s\n", delivery.MessageId) - connector.handleDelivery(delivery) - <-concurrency - }(delivery) + go func() { + for delivery := range consumeChan { + go connector.handleDelivery(delivery) } - }(consumeChan) + }() if METRICS_ADDR != "" { go func() { diff --git a/services/tracing/tracing/transport/mq.go b/services/tracing/tracing/transport/mq.go index b568572a..d57ddc26 100644 --- a/services/tracing/tracing/transport/mq.go +++ b/services/tracing/tracing/transport/mq.go @@ -2,7 +2,6 @@ package tracingtransport import ( "encoding/json" - "fmt" "log" "time" @@ -88,24 +87,3 @@ func processMessage(deliveries <-chan amqp091.Delivery, svc *tracing.Service) { } } } - -func setupFunc(prefetch int, queue, xchg, topic string) mq.AMQPSetupFunc { - return func(c *amqp091.Channel) error { - if err := c.Qos(prefetch, 0, false); err != nil { - return fmt.Errorf("error setting Qos with prefetch on amqp: %w", err) - } - _, err := c.QueueDeclare(queue, true, false, false, false, nil) - if err != nil { - return fmt.Errorf("error declaring amqp queue: %w", err) - } - err = c.ExchangeDeclare(xchg, "topic", true, false, false, false, nil) - if err != nil { - return fmt.Errorf("error declaring amqp exchange: %w", err) - } - err = c.QueueBind(queue, topic, xchg, false, nil) - if err != nil { - return fmt.Errorf("error binding amqp queue to exchange: %w", err) - } - return nil - } -} From a6433782a4053979a8c4230dfea771ad1402029f Mon Sep 17 00:00:00 2001 From: Tim van Osch Date: Fri, 3 Jan 2025 16:16:04 +0100 Subject: [PATCH 3/7] Use QueueProcessors and quorum queues --- pkg/mq/amqp_connection.go | 4 +- pkg/mq/queue_processor.go | 45 +++--- pkg/mq/setup_builder.go | 7 +- services/core/main.go | 43 +++--- services/core/measurements/application.go | 146 ++++++++++++++---- .../core/measurements/infra/store_psql.go | 34 ++-- .../core/measurements/pipeline_message.go | 74 --------- services/core/measurements/transport.go | 23 +++ services/core/measurements/transport/mq.go | 84 ---------- ...2842_add_datastream_get_or_create.down.sql | 1 + ...112842_add_datastream_get_or_create.up.sql | 43 ++++++ services/core/processing/transport.go | 22 +++ services/fission-rmq-connector/main.go | 2 +- .../service/dockerworker/main.py | 2 +- .../ingress-archiver/service/transport.go | 45 ++---- services/tracing/main.go | 17 +- services/tracing/tracing/transport/mq.go | 88 ++++------- 17 files changed, 314 insertions(+), 366 deletions(-) create mode 100644 services/core/measurements/transport.go delete mode 100644 services/core/measurements/transport/mq.go create mode 100644 services/core/migrations/20250103112842_add_datastream_get_or_create.down.sql create mode 100644 services/core/migrations/20250103112842_add_datastream_get_or_create.up.sql create mode 100644 services/core/processing/transport.go diff --git a/pkg/mq/amqp_connection.go b/pkg/mq/amqp_connection.go index 1090adca..2abf0b16 100644 --- a/pkg/mq/amqp_connection.go +++ b/pkg/mq/amqp_connection.go @@ -135,9 +135,9 @@ func (c *AMQPConnection) UseConnection() <-chan *amqp.Connection { } func (c *AMQPConnection) Consume(queue string, setup ...SetupOption) <-chan amqp.Delivery { - return Consume(c, queue, setup) + return Consume(c, queue, setup...) } func (c *AMQPConnection) Publisher(xchg string, setup ...SetupOption) chan<- PublishMessage { - return Publisher(c, xchg, setup) + return Publisher(c, xchg, setup...) } diff --git a/pkg/mq/queue_processor.go b/pkg/mq/queue_processor.go index 1350f56c..d9f935ac 100644 --- a/pkg/mq/queue_processor.go +++ b/pkg/mq/queue_processor.go @@ -1,12 +1,27 @@ package mq import ( - "encoding/json" + "errors" "fmt" "sync" + + "github.com/rabbitmq/amqp091-go" ) -func StartQueueProcessor[T any](conn *AMQPConnection, queue, exchange, topic string, processFn func(T) error) { +var ErrMalformed = errors.New("delivery malformed") + +type ( + ProcessorFunc = func(delivery amqp091.Delivery) error + ProcessorFuncBuilder = func() ProcessorFunc +) + +// StartQueueProcessor opens a basic consume channel with a queue and exchange topic binding. +// Multiple workers will be started based on the prefetch count. Each worker will call the ProcessFuncBuilder, +// which allows a closure per worker, ie an instantiation of a variable for that worker. +// The processor function will be called for each message received from the queue. +// In case of an error, the message will be requeued unless the error wraps mq.ErrMalformed +// The processFunc parameter is a builder which will be called for +func StartQueueProcessor(conn *AMQPConnection, queue, exchange, topic string, processFunc ProcessorFuncBuilder) { var wg sync.WaitGroup consume := conn.Consume(queue, WithDefaults(), WithTopicBinding(queue, exchange, topic)) @@ -15,29 +30,21 @@ func StartQueueProcessor[T any](conn *AMQPConnection, queue, exchange, topic str wg.Add(1) go func(id int) { defer wg.Done() - var dto T - + process := processFunc() for delivery := range consume { - dto = *new(T) - - if err := json.Unmarshal(delivery.Body, &dto); err != nil { - fmt.Printf("Error unmarshalling ingress DTO: %v\n", err) - if err := delivery.Nack(false, false); err != nil { - fmt.Printf("Error Nacking ingress delivery: %v\n", err) - } - continue - } - - if err := processFn(dto); err != nil { - fmt.Printf("Error processing ingress DTO: %v\n", err) - if err := delivery.Nack(false, false); err != nil { - fmt.Printf("Error Nacking ingress delivery: %v\n", err) + err := process(delivery) + if err != nil { + fmt.Printf("Error: QueueProcessorFunc failed: %s\n", err.Error()) + // Only requeue if err is not an ErrMalformed and it is not already redelivered + requeue := !errors.Is(err, ErrMalformed) && !delivery.Redelivered + if err := delivery.Nack(false, requeue); err != nil { + fmt.Printf("Error: could not NAck delivery: %s\n", err.Error()) } continue } if err := delivery.Ack(false); err != nil { - fmt.Printf("Error Nacking ingress delivery: %v\n", err) + fmt.Printf("Error: could not Ack delivery: %s\n", err.Error()) } } }(i) diff --git a/pkg/mq/setup_builder.go b/pkg/mq/setup_builder.go index d9c97514..ec9fc440 100644 --- a/pkg/mq/setup_builder.go +++ b/pkg/mq/setup_builder.go @@ -9,12 +9,7 @@ import ( "github.com/rabbitmq/amqp091-go" ) -// Must set QOS -// Must default to Quoroum queue but offer alternative -// Must offer queue/xchg bind -// Must offer - -var defaultPrefetchCount int = 0 +var defaultPrefetchCount int = 50 func DefaultPrefetch() int { return defaultPrefetchCount diff --git a/services/core/main.go b/services/core/main.go index ef7b234d..d16e925d 100644 --- a/services/core/main.go +++ b/services/core/main.go @@ -42,11 +42,12 @@ var ( AMQP_QUEUE_INGRESS = env.Could("AMQP_QUEUE_INGRESS", "core-ingress") AMQP_XCHG_INGRESS = env.Could("AMQP_XCHG_INGRESS", "ingress") AMQP_QUEUE_ERRORS = env.Could("AMQP_QUEUE_ERRORS", "errors") - AMQP_PREFETCH = env.Could("AMQP_PREFETCH", "5") HTTP_ADDR = env.Could("HTTP_ADDR", ":3000") HTTP_BASE = env.Could("HTTP_BASE", "http://localhost:3000/api") AUTH_JWKS_URL = env.Could("AUTH_JWKS_URL", "http://oathkeeper:4456/.well-known/jwks.json") SYS_ARCHIVE_TIME = env.Could("SYS_ARCHIVE_TIME", "30") + MEASUREMENT_BATCH_SIZE = env.CouldInt("MEASUREMENT_BATCH_SIZE", 1024) + MEASUREMENT_COMMIT_INTERVAL = env.CouldInt("MEASUREMENT_COMMIT_INTERVAL", 1) ) func main() { @@ -67,11 +68,6 @@ func Run(cleanup cleanupper.Cleanupper) error { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel() - prefetch, err := strconv.Atoi(AMQP_PREFETCH) - if err != nil { - return err - } - stopProfiler, err := web.RunProfiler() if err != nil { fmt.Printf("could not setup profiler server: %s\n", err) @@ -101,43 +97,44 @@ func Run(cleanup cleanupper.Cleanupper) error { } measurementstore := measurementsinfra.NewPSQL(db) measurementservice := measurements.New(measurementstore, sysArchiveTime, keyClient) + cleanup.Add(measurementservice.StartMeasurementBatchStorer(MEASUREMENT_BATCH_SIZE, time.Duration(MEASUREMENT_COMMIT_INTERVAL)*time.Second)) processingstore := processinginfra.NewPSQLStore(db) processingPipelinePublisher := processinginfra.NewPipelineMessagePublisher(amqpConn, AMQP_XCHG_PIPELINE_MESSAGES) processingservice := processing.New(processingstore, processingPipelinePublisher, keyClient) - // Setup HTTP Transport - httpsrv := createHTTPServer(coretransport.New( - HTTP_BASE, - keyClient, - deviceservice, - measurementservice, - processingservice, - )) - go func() { - if err := httpsrv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) && err != nil { - fmt.Printf("HTTP Server error: %v\n", err) - } - }() - log.Printf("HTTP Listening: %s\n", httpsrv.Addr) - // Setup MQ Transports go mq.StartQueueProcessor( amqpConn, AMQP_QUEUE_MEASUREMENTS, AMQP_XCHG_PIPELINE_MESSAGES, AMQP_XCHG_MEASUREMENTS_TOPIC, - measurementservice.StorePipelineMessage, + measurements.MQMessageProcessor(measurementservice), ) go mq.StartQueueProcessor( amqpConn, AMQP_QUEUE_INGRESS, AMQP_XCHG_INGRESS, AMQP_XCHG_INGRESS_TOPIC, - processingservice.ProcessIngressDTO, + processing.MQIngressDTOProcessor(processingservice), ) go amqpConn.Start() + // Setup HTTP Transport + httpsrv := createHTTPServer(coretransport.New( + HTTP_BASE, + keyClient, + deviceservice, + measurementservice, + processingservice, + )) + go func() { + if err := httpsrv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) && err != nil { + fmt.Printf("HTTP Server error: %v\n", err) + } + }() + log.Printf("HTTP Listening: %s\n", httpsrv.Addr) + healthShutdown := healthchecker.Create().WithEnv().WithMessagQueue(amqpConn).Start(ctx) cleanup.Add(healthShutdown) diff --git a/services/core/measurements/application.go b/services/core/measurements/application.go index ad5ed819..121be270 100644 --- a/services/core/measurements/application.go +++ b/services/core/measurements/application.go @@ -2,12 +2,18 @@ package measurements import ( "context" + "fmt" + "log" "time" "github.com/google/uuid" + "github.com/samber/lo" + "sensorbucket.nl/sensorbucket/internal/cleanupper" "sensorbucket.nl/sensorbucket/internal/pagination" "sensorbucket.nl/sensorbucket/pkg/auth" + "sensorbucket.nl/sensorbucket/pkg/pipeline" + "sensorbucket.nl/sensorbucket/services/core/devices" ) // Store stores measurement data @@ -15,35 +21,75 @@ type Store interface { Query(Filter, pagination.Request) (*pagination.Page[Measurement], error) ListDatastreams(DatastreamFilter, pagination.Request) (*pagination.Page[Datastream], error) GetDatastream(id uuid.UUID, filter DatastreamFilter) (*Datastream, error) -} - -type MeasurementStoreBuilder interface { - Begin() (MeasurementStorer, error) -} -type MeasurementStorer interface { - GetDatastream(tenantID, sensorID int64, observedProperty, unitOfMeasurement string) (*Datastream, error) - AddMeasurements([]Measurement) error - Finish() error + FindOrCreateDatastream(tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*Datastream, error) + StoreMeasurements([]Measurement) error } // Service is the measurement service which stores measurement data. type Service struct { store Store - measurementStore MeasurementStoreBuilder systemArchiveTime int keyClient auth.JWKSClient + measurementBatch chan Measurement } -func New(store Store, measurementStore MeasurementStoreBuilder, systemArchiveTime int, keyClient auth.JWKSClient) *Service { +func New(store Store, systemArchiveTime int, keyClient auth.JWKSClient) *Service { return &Service{ store: store, - measurementStore: measurementStore, systemArchiveTime: systemArchiveTime, keyClient: keyClient, } } -func (s *Service) ProcessPipelineMessage(msg *PipelineMessage) error { +func (s *Service) StartMeasurementBatchStorer(batchSize int, interval time.Duration) cleanupper.Shutdown { + log.Println("Measurement service batch storer started") + defer log.Println("Measurement service batch storer stopped!") + stop := make(chan struct{}) + done := make(chan struct{}) + s.measurementBatch = make(chan Measurement, batchSize) + measurements := make([]Measurement, 0, batchSize) + t := time.NewTicker(interval) + + commit := func() { + if len(measurements) > 0 { + log.Printf("Committing %d measurements\n", len(measurements)) + } + err := s.store.StoreMeasurements(measurements) + if err != nil { + log.Printf("Error storing measurements: %s\n", err.Error()) + } + measurements = measurements[:0] + } + + go func() { + outer: + for { + select { + case <-stop: + commit() + break outer + case m := <-s.measurementBatch: + measurements = append(measurements, m) + if len(measurements) == batchSize { + commit() + } + case <-t.C: + commit() + } + } + close(done) + }() + + return func(ctx context.Context) error { + close(stop) + <-done + return nil + } +} + +func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error { + msg := PipelineMessage(pmsg) + // Only error when internal error and not a business error _, err := msg.Authorize(s.keyClient) if err != nil { @@ -53,23 +99,71 @@ func (s *Service) ProcessPipelineMessage(msg *PipelineMessage) error { return err } - storer, err := s.measurementStore.Begin() - if err != nil { - return err + dev := (*devices.Device)(msg.Device) + baseMeasurement := Measurement{ + UplinkMessageID: msg.TracingID, + OrganisationID: int(msg.TenantID), + DeviceID: msg.Device.ID, + DeviceCode: msg.Device.Code, + DeviceDescription: msg.Device.Description, + DeviceLatitude: msg.Device.Latitude, + DeviceLongitude: msg.Device.Longitude, + DeviceAltitude: msg.Device.Altitude, + DeviceLocationDescription: msg.Device.LocationDescription, + DeviceProperties: msg.Device.Properties, + DeviceState: msg.Device.State, + MeasurementLatitude: msg.Device.Latitude, + MeasurementLongitude: msg.Device.Longitude, + MeasurementAltitude: msg.Device.Altitude, + CreatedAt: time.Now(), } - measurements, err := buildMeasurements(msg, storer, s.systemArchiveTime) - if err != nil { - return err + measurements := make([]Measurement, len(msg.Measurements)) + for ix, m := range msg.Measurements { + + sensor, err := dev.GetSensorByExternalIDOrFallback(m.SensorExternalID) + if err != nil { + return fmt.Errorf("cannot get sensor: %w", err) + } + if sensor.ExternalID != m.SensorExternalID { + m.ObservedProperty = m.SensorExternalID + "_" + m.ObservedProperty + } + + archiveTimeDays, _ := lo.Coalesce(sensor.ArchiveTime, &s.systemArchiveTime) // msg.Organisation.ArchiveTime) + + ds, err := s.store.FindOrCreateDatastream(msg.TenantID, sensor.ID, m.ObservedProperty, m.UnitOfMeasurement) + if err != nil { + return err + } + + measurement := baseMeasurement + measurement.SensorID = sensor.ID + measurement.SensorCode = sensor.Code + measurement.SensorDescription = sensor.Description + measurement.SensorExternalID = sensor.ExternalID + measurement.SensorProperties = sensor.Properties + measurement.SensorBrand = sensor.Brand + measurement.SensorArchiveTime = sensor.ArchiveTime + measurement.SensorIsFallback = sensor.IsFallback + measurement.DatastreamID = ds.ID + measurement.DatastreamDescription = ds.Description + measurement.DatastreamObservedProperty = ds.ObservedProperty + measurement.DatastreamUnitOfMeasurement = ds.UnitOfMeasurement + measurement.MeasurementTimestamp = time.UnixMilli(m.Timestamp) + measurement.MeasurementValue = m.Value + measurement.MeasurementProperties = m.Properties + measurement.MeasurementExpiration = time.UnixMilli(msg.ReceivedAt).Add(time.Duration(*archiveTimeDays) * 24 * time.Hour) + + // Measurement location is either explicitly set or falls back to device location + if m.Latitude != nil && m.Longitude != nil { + measurement.MeasurementLatitude = m.Latitude + measurement.MeasurementLongitude = m.Longitude + measurement.MeasurementAltitude = m.Altitude + } + + measurements[ix] = measurement } - if err := storer.AddMeasurements(measurements); err != nil { - return err - } - - if err := storer.Finish(); err != nil { - return err - } return nil } diff --git a/services/core/measurements/infra/store_psql.go b/services/core/measurements/infra/store_psql.go index e5e025db..ace0a181 100644 --- a/services/core/measurements/infra/store_psql.go +++ b/services/core/measurements/infra/store_psql.go @@ -1,7 +1,6 @@ package measurementsinfra import ( - "context" "database/sql" "errors" "fmt" @@ -354,23 +353,23 @@ func (s *MeasurementStorePSQL) GetDatastream(id uuid.UUID, filter measurements.D return &ds, nil } -func (s *MeasurementStorePSQL) BeginMeasurementTransaction() (*MeasurementPSQLStorer, error) { - tx, err := s.db.BeginTxx(context.Background(), nil) +func (s *MeasurementStorePSQL) FindOrCreateDatastream(tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*measurements.Datastream, error) { + var ds measurements.Datastream + err := s.db.QueryRowx( + `SELECT + id, description, sensor_id, observed_property, unit_of_measurement, created_at, tenant_id + FROM find_or_create_datastream($1, $2, $3, $4)`, + tenantID, sensorID, observedProperty, UnitOfMeasurement, + ).Scan( + &ds.ID, &ds.Description, &ds.SensorID, &ds.ObservedProperty, &ds.UnitOfMeasurement, &ds.CreatedAt, &ds.TenantID, + ) if err != nil { - return nil, err + return nil, fmt.Errorf("could not query datastream: %w", err) } - return &MeasurementPSQLStorer{tx, nil}, nil -} - -type MeasurementPSQLStorer struct { - tx *sqlx.Tx - error error -} - -func (measurementStorer *MeasurementPSQLStorer) GetDatastream(tenantID, sensorID int64, observedProperty, unitOfMeasurement string) (*measurements.Datastream, error) { + return &ds, nil } -func (measurementStorer *MeasurementPSQLStorer) AddMeasurements(measurements []measurements.Measurement) error { +func (s *MeasurementStorePSQL) StoreMeasurements(measurements []measurements.Measurement) error { q := pq.Insert("measurements").Columns( "uplink_message_id", "organisation_id", @@ -450,10 +449,3 @@ func (measurementStorer *MeasurementPSQLStorer) AddMeasurements(measurements []m return nil } - -func (measurementStorer *MeasurementPSQLStorer) Finish() error { - if measurementStorer.error != nil { - return measurementStorer.tx.Rollback() - } - return measurementStorer.tx.Commit() -} diff --git a/services/core/measurements/pipeline_message.go b/services/core/measurements/pipeline_message.go index a75d0802..11a13a71 100644 --- a/services/core/measurements/pipeline_message.go +++ b/services/core/measurements/pipeline_message.go @@ -2,13 +2,9 @@ package measurements import ( "context" - "fmt" - "time" - "github.com/samber/lo" "sensorbucket.nl/sensorbucket/pkg/auth" "sensorbucket.nl/sensorbucket/pkg/pipeline" - "sensorbucket.nl/sensorbucket/services/core/devices" ) type PipelineMessage pipeline.Message @@ -34,73 +30,3 @@ func (msg *PipelineMessage) Validate() error { } return nil } - -func buildMeasurements(msg *PipelineMessage, storer MeasurementStorer, archiveTime int) ([]Measurement, error) { - dev := (*devices.Device)(msg.Device) - - baseMeasurement := Measurement{ - UplinkMessageID: msg.TracingID, - OrganisationID: int(msg.TenantID), - DeviceID: msg.Device.ID, - DeviceCode: msg.Device.Code, - DeviceDescription: msg.Device.Description, - DeviceLatitude: msg.Device.Latitude, - DeviceLongitude: msg.Device.Longitude, - DeviceAltitude: msg.Device.Altitude, - DeviceLocationDescription: msg.Device.LocationDescription, - DeviceProperties: msg.Device.Properties, - DeviceState: msg.Device.State, - MeasurementLatitude: msg.Device.Latitude, - MeasurementLongitude: msg.Device.Longitude, - MeasurementAltitude: msg.Device.Altitude, - CreatedAt: time.Now(), - } - - measurements := make([]Measurement, len(msg.Measurements)) - for ix, m := range msg.Measurements { - - sensor, err := dev.GetSensorByExternalIDOrFallback(m.SensorExternalID) - if err != nil { - return nil, fmt.Errorf("cannot get sensor: %w", err) - } - if sensor.ExternalID != m.SensorExternalID { - m.ObservedProperty = m.SensorExternalID + "_" + m.ObservedProperty - } - - archiveTimeDays, _ := lo.Coalesce(sensor.ArchiveTime, &archiveTime) // msg.Organisation.ArchiveTime) - - ds, err := storer.GetDatastream(msg.TenantID, sensor.ID, m.ObservedProperty, m.UnitOfMeasurement) - if err != nil { - return nil, err - } - - measurement := baseMeasurement - measurement.SensorID = sensor.ID - measurement.SensorCode = sensor.Code - measurement.SensorDescription = sensor.Description - measurement.SensorExternalID = sensor.ExternalID - measurement.SensorProperties = sensor.Properties - measurement.SensorBrand = sensor.Brand - measurement.SensorArchiveTime = sensor.ArchiveTime - measurement.SensorIsFallback = sensor.IsFallback - measurement.DatastreamID = ds.ID - measurement.DatastreamDescription = ds.Description - measurement.DatastreamObservedProperty = ds.ObservedProperty - measurement.DatastreamUnitOfMeasurement = ds.UnitOfMeasurement - measurement.MeasurementTimestamp = time.UnixMilli(m.Timestamp) - measurement.MeasurementValue = m.Value - measurement.MeasurementProperties = m.Properties - measurement.MeasurementExpiration = time.UnixMilli(msg.ReceivedAt).Add(time.Duration(*archiveTimeDays) * 24 * time.Hour) - - // Measurement location is either explicitly set or falls back to device location - if m.Latitude != nil && m.Longitude != nil { - measurement.MeasurementLatitude = m.Latitude - measurement.MeasurementLongitude = m.Longitude - measurement.MeasurementAltitude = m.Altitude - } - - measurements[ix] = measurement - } - - return measurements, nil -} diff --git a/services/core/measurements/transport.go b/services/core/measurements/transport.go new file mode 100644 index 00000000..c0aee224 --- /dev/null +++ b/services/core/measurements/transport.go @@ -0,0 +1,23 @@ +package measurements + +import ( + "encoding/json" + "fmt" + + "github.com/rabbitmq/amqp091-go" + + "sensorbucket.nl/sensorbucket/pkg/mq" + "sensorbucket.nl/sensorbucket/pkg/pipeline" +) + +func MQMessageProcessor(svc *Service) mq.ProcessorFuncBuilder { + return func() mq.ProcessorFunc { + var msg pipeline.Message + return func(delivery amqp091.Delivery) error { + if err := json.Unmarshal(delivery.Body, &msg); err != nil { + return fmt.Errorf("%w: could not unmarshal delivery body as Pipeline Message: %w", mq.ErrMalformed, err) + } + return svc.ProcessPipelineMessage(msg) + } + } +} diff --git a/services/core/measurements/transport/mq.go b/services/core/measurements/transport/mq.go deleted file mode 100644 index 2f34610e..00000000 --- a/services/core/measurements/transport/mq.go +++ /dev/null @@ -1,84 +0,0 @@ -package measurementtransport - -import ( - "context" - "encoding/json" - "fmt" - "log" - "time" - - "github.com/rabbitmq/amqp091-go" - - "sensorbucket.nl/sensorbucket/internal/cleanupper" - "sensorbucket.nl/sensorbucket/pkg/mq" - "sensorbucket.nl/sensorbucket/pkg/pipeline" - "sensorbucket.nl/sensorbucket/services/core/measurements" -) - -func StartMQ( - svc *measurements.Service, - conn *mq.AMQPConnection, - pipelineMessagesExchange, - measurementQueue, - measurementStorageTopic, - measurementErrorTopic string, - prefetch int, -) cleanupper.Shutdown { - done := make(chan struct{}) - consume := mq.Consume(conn, measurementQueue, mq.WithDefaults(), mq.WithTopicBinding()) - publish := mq.Publisher(conn, pipelineMessagesExchange, mq.WithDefaults(), mq.WithExchange()) - - go func() { - log.Println("Measurement MQ Transport running...") - for { - select { - case msg := <-consume: - var pmsg pipeline.Message - if err := json.Unmarshal(msg.Body, &pmsg); err != nil { - if nerr := msg.Nack(false, false); nerr != nil { - err = fmt.Errorf("error nacking message: %w, while handling another error: %w", nerr, err) - } - log.Printf("Error unmarshalling amqp message body to pipeline.Message: %v", err) - continue - } - - if err := svc.StorePipelineMessage(pmsg); err != nil { - if nerr := msg.Nack(false, false); nerr != nil { - err = fmt.Errorf("error nacking message: %w, while handling another error: %w", nerr, err) - } - log.Printf("Error storing pipeline message: %v\n", err) - // Create error - msgError := pipeline.PipelineError{ - ReceivedByWorker: pmsg, - Error: err.Error(), - Timestamp: time.Now().UnixMilli(), - Worker: "core-measurements", - } - msgErrorBytes, err := json.Marshal(msgError) - if err != nil { - log.Printf("error marshalling pipeline ErrorMessage into json: %v\n", err) - continue - } - publish <- mq.PublishMessage{ - Topic: measurementErrorTopic, - Publishing: amqp091.Publishing{ - Body: msgErrorBytes, - }, - } - - continue - } - if err := msg.Ack(false); err != nil { - log.Printf("Error Acking message: %s\n", err.Error()) - } - case <-done: - return - } - } - }() - - return func(ctx context.Context) error { - close(done) - return nil - } -} diff --git a/services/core/migrations/20250103112842_add_datastream_get_or_create.down.sql b/services/core/migrations/20250103112842_add_datastream_get_or_create.down.sql new file mode 100644 index 00000000..c5175366 --- /dev/null +++ b/services/core/migrations/20250103112842_add_datastream_get_or_create.down.sql @@ -0,0 +1 @@ +DROP FUNCTION find_or_create_datastream; diff --git a/services/core/migrations/20250103112842_add_datastream_get_or_create.up.sql b/services/core/migrations/20250103112842_add_datastream_get_or_create.up.sql new file mode 100644 index 00000000..ac666ca4 --- /dev/null +++ b/services/core/migrations/20250103112842_add_datastream_get_or_create.up.sql @@ -0,0 +1,43 @@ +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + +CREATE OR REPLACE FUNCTION find_or_create_datastream( + arg_tenant_id datastreams.tenant_id%TYPE, + arg_sensor_id datastreams.sensor_id%TYPE, + arg_observed_property datastreams.observed_property%TYPE, + arg_unit_of_measurement datastreams.unit_of_measurement%TYPE +) +RETURNS SETOF datastreams AS $$ +DECLARE + return_datastreams datastreams%ROWTYPE; +BEGIN + SELECT + id, description, sensor_id, observed_property, unit_of_measurement, created_at, tenant_id + INTO return_datastreams FROM datastreams WHERE + tenant_id = arg_tenant_id + AND observed_property = arg_observed_property + AND sensor_id = arg_sensor_id + AND unit_of_measurement = arg_unit_of_measurement; + IF FOUND THEN + RETURN NEXT return_datastreams; + ELSE + BEGIN + RETURN QUERY INSERT INTO datastreams ( + id, tenant_id, sensor_id, observed_property, unit_of_measurement + ) VALUES ( + uuid_generate_v4(), arg_tenant_id, arg_sensor_id, arg_observed_property, arg_unit_of_measurement + ) RETURNING + id, description, sensor_id, observed_property, + unit_of_measurement, created_at, tenant_id; + EXCEPTION WHEN unique_violation THEN + RETURN QUERY SELECT + id, description, sensor_id, observed_property, unit_of_measurement, + created_at, tenant_id + FROM datastreams WHERE + tenant_id = tenant_id + AND observed_property = observed_property + AND sensor_id = sensor_id + AND unit_of_measurement = unit_of_measurement; + END; + END IF; +END; +$$ LANGUAGE plpgsql; diff --git a/services/core/processing/transport.go b/services/core/processing/transport.go new file mode 100644 index 00000000..4334f7f9 --- /dev/null +++ b/services/core/processing/transport.go @@ -0,0 +1,22 @@ +package processing + +import ( + "encoding/json" + "fmt" + + "github.com/rabbitmq/amqp091-go" + + "sensorbucket.nl/sensorbucket/pkg/mq" +) + +func MQIngressDTOProcessor(svc *Service) mq.ProcessorFuncBuilder { + return func() mq.ProcessorFunc { + var dto IngressDTO + return func(delivery amqp091.Delivery) error { + if err := json.Unmarshal(delivery.Body, &dto); err != nil { + return fmt.Errorf("%w: could not unmarshal delivery body as IngressDTO: %w", mq.ErrMalformed, err) + } + return svc.ProcessIngressDTO(dto) + } + } +} diff --git a/services/fission-rmq-connector/main.go b/services/fission-rmq-connector/main.go index 3dc08138..42015692 100644 --- a/services/fission-rmq-connector/main.go +++ b/services/fission-rmq-connector/main.go @@ -194,7 +194,7 @@ func doHTTPRequest(body []byte, endpoint string, retries int) (*http.Response, e return nil, fmt.Errorf("error creating invocation for function: %s, error: %w", endpoint, err) } req.Header.Set("X-AMQP-Topic", AMQP_TOPIC) - res, err := http.DefaultClient.Do(req) + res, err = http.DefaultClient.Do(req) if err != nil { log.Printf("Invocation for %s failed with: %v\n", endpoint, err) continue diff --git a/services/fission-user-workers/service/dockerworker/main.py b/services/fission-user-workers/service/dockerworker/main.py index 06313cc0..a8698775 100644 --- a/services/fission-user-workers/service/dockerworker/main.py +++ b/services/fission-user-workers/service/dockerworker/main.py @@ -82,7 +82,7 @@ def on_message(channel, method_frame, header_frame, body): parameters = pika.URLParameters(amqp_url) conn = pika.BlockingConnection(parameters=parameters) chan = conn.channel() - chan.queue_declare("worker_" + worker_id, False, False, False, True, {}) + chan.queue_declare("worker_" + worker_id, False, True, False, False, {"x-queue-type": "quorum"}) chan.queue_bind("worker_" + worker_id, amqp_xchg, worker_id) chan.basic_consume("worker_" + worker_id, on_message) print("Worker ready. Starting consuming....") diff --git a/services/tracing/ingress-archiver/service/transport.go b/services/tracing/ingress-archiver/service/transport.go index 02873d24..f6b23e96 100644 --- a/services/tracing/ingress-archiver/service/transport.go +++ b/services/tracing/ingress-archiver/service/transport.go @@ -14,43 +14,18 @@ import ( "sensorbucket.nl/sensorbucket/pkg/mq" ) -func StartIngressDTOConsumer(conn *mq.AMQPConnection, svc *Application, queue, xchg, topic string, prefetch int) { - consume := conn.Consume(queue, func(c *amqp091.Channel) error { - if err := c.Qos(prefetch, 0, false); err != nil { - return fmt.Errorf("error setting Qos with prefetch on amqp: %w", err) - } - _, err := c.QueueDeclare(queue, true, false, false, false, nil) - if err != nil { - return err - } - // Create exchange and bind if both arguments are provided, this is optional - if xchg != "" && topic != "" { - if err := c.ExchangeDeclare(xchg, "topic", true, false, false, false, nil); err != nil { - return err +func MQIngressProcessor(svc *Application) mq.ProcessorFuncBuilder { + return func() mq.ProcessorFunc { + return func(delivery amqp091.Delivery) error { + tracingID, err := uuid.Parse(delivery.MessageId) + if err != nil { + fmt.Printf("Delivery TracingID is not a UUID (%v)\n", err.Error()) + tracingID = uuid.UUID{} } - if err := c.QueueBind(queue, topic, xchg, false, nil); err != nil { - return err - } - } - return nil - }) - - for delivery := range consume { - tracingID, err := uuid.Parse(delivery.MessageId) - if err != nil { - fmt.Printf("Delivery TracingID is not a UUID (%v)\n", err.Error()) - tracingID = uuid.UUID{} - } - rawMessage := delivery.Body - if err := svc.ArchiveIngressDTO(tracingID, rawMessage); err != nil { - fmt.Printf("Error processing ingress DTO: %v\n", err) - if err := delivery.Nack(false, false); err != nil { - fmt.Printf("Error Nacking message: %s\n", err.Error()) + if err := svc.ArchiveIngressDTO(tracingID, delivery.Body); err != nil { + return fmt.Errorf("processing ingress DTO: %w", err) } - continue - } - if err := delivery.Ack(false); err != nil { - fmt.Printf("Error Acking message: %s\n", err.Error()) + return nil } } } diff --git a/services/tracing/main.go b/services/tracing/main.go index a5be2b41..30143b1b 100644 --- a/services/tracing/main.go +++ b/services/tracing/main.go @@ -8,7 +8,6 @@ import ( "net/http" "os" "os/signal" - "strconv" "time" "github.com/go-chi/chi/v5" @@ -63,11 +62,6 @@ func Run(cleanup cleanupper.Cleanupper) error { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel() - prefetch, err := strconv.Atoi(AMQP_PREFETCH) - if err != nil { - return err - } - stopProfiler, err := web.RunProfiler() if err != nil { log.Printf("could not setup profiler server: %s\n", err) @@ -97,10 +91,10 @@ func Run(cleanup cleanupper.Cleanupper) error { { store := ingressarchiver.NewStorePSQL(db) svc := ingressarchiver.New(store) - go ingressarchiver.StartIngressDTOConsumer( - mqConn, svc, + go mq.StartQueueProcessor( + mqConn, AMQP_QUEUE_INGRESS, AMQP_XCHG_INGRESS, AMQP_XCHG_INGRESS_TOPIC, - prefetch, + ingressarchiver.MQIngressProcessor(svc), ) ingressarchiver.CreateHTTPTransport(r, svc) } @@ -109,13 +103,12 @@ func Run(cleanup cleanupper.Cleanupper) error { { tracingStepStore := tracinginfra.NewStorePSQL(db) tracingService := tracing.New(tracingStepStore) - go tracingtransport.StartMQ( - tracingService, + go mq.StartQueueProcessor( mqConn, AMQP_QUEUE_PIPELINEMESSAGES, AMQP_XCHG_PIPELINEMESSAGES, AMQP_XCHG_PIPELINEMESSAGES_TOPIC, - prefetch, + tracingtransport.MQMessageProcessor(tracingService), ) tracinghttp := tracingtransport.NewHTTP(tracingService, HTTP_BASE) tracinghttp.SetupRoutes(r) diff --git a/services/tracing/tracing/transport/mq.go b/services/tracing/tracing/transport/mq.go index d57ddc26..8fb1bfa8 100644 --- a/services/tracing/tracing/transport/mq.go +++ b/services/tracing/tracing/transport/mq.go @@ -2,7 +2,7 @@ package tracingtransport import ( "encoding/json" - "log" + "fmt" "time" "github.com/rabbitmq/amqp091-go" @@ -12,78 +12,42 @@ import ( "sensorbucket.nl/sensorbucket/services/tracing/tracing" ) -func StartMQ(svc *tracing.Service, conn *mq.AMQPConnection, queue, xchg, topic string, prefetch int) { - pipelineMessages := mq.Consume(conn, queue, setupFunc(prefetch, queue, xchg, topic)) - - log.Println("Measurement MQ Transport running") - go processMessage(pipelineMessages, svc) -} - -func processMessage(deliveries <-chan amqp091.Delivery, svc *tracing.Service) { - log.Println("Measurement MQ Transport running, tracing pipeline errors...") - for msg := range deliveries { - tsHeader, ok := msg.Headers["timestamp"] - if !ok { - if err := msg.Nack(false, false); err != nil { - log.Printf("Error: failed to NACK message: %s\n", err.Error()) +func MQMessageProcessor(svc *tracing.Service) mq.ProcessorFuncBuilder { + return func() mq.ProcessorFunc { + return func(delivery amqp091.Delivery) error { + tsHeader, ok := delivery.Headers["timestamp"] + if !ok { + return fmt.Errorf("%w: message missing timestamp HEADER", mq.ErrMalformed) } - log.Printf("Error: Message missing timestamp HEADER\n") - continue - } - tsMilli, ok := tsHeader.(int64) - if !ok { - if err := msg.Nack(false, false); err != nil { - log.Printf("Error: failed to NACK message: %s\n", err.Error()) + tsMilli, ok := tsHeader.(int64) + if !ok { + return fmt.Errorf("%w: message timestamp header is invalid type: %T", mq.ErrMalformed, tsHeader) } - log.Printf("Error: Message timestamp header is invalid type: %T\n", tsHeader) - continue - } - ts := time.UnixMilli(tsMilli) - if ts.IsZero() { - if err := msg.Nack(false, false); err != nil { - log.Printf("Error: failed to NACK message: %s\n", err.Error()) + ts := time.UnixMilli(tsMilli) + if ts.IsZero() { + return fmt.Errorf("%w: delivery timestamp cannot be empty", mq.ErrMalformed) } - log.Printf("Error: msg timestamp cannot be empty\n") - continue - } - if msg.RoutingKey == "errors" { - var res pipeline.PipelineError - if err := json.Unmarshal(msg.Body, &res); err != nil { - if err := msg.Nack(false, false); err != nil { - log.Printf("Error: failed to NACK message: %s\n", err.Error()) + if delivery.RoutingKey == "errors" { + var res pipeline.PipelineError + if err := json.Unmarshal(delivery.Body, &res); err != nil { + return fmt.Errorf("%w: unmarshalling amqp message body to pipeline.Message: %w", mq.ErrMalformed, err) } - log.Printf("Error unmarshalling amqp message body to pipeline.Message: %v", err) - continue - } - if err := svc.HandlePipelineError(res, ts); err != nil { - if err := msg.Nack(false, false); err != nil { - log.Printf("Error: failed to NACK message: %s\n", err.Error()) + if err := svc.HandlePipelineError(res, ts); err != nil { + return fmt.Errorf("handling pipeline message: %w", err) } - log.Printf("Error handling pipeline message: %v\n", err) - continue - } - } else { - var res pipeline.Message - if err := json.Unmarshal(msg.Body, &res); err != nil { - if err := msg.Nack(false, false); err != nil { - log.Printf("Error: failed to NACK message: %s\n", err.Error()) + } else { + var res pipeline.Message + if err := json.Unmarshal(delivery.Body, &res); err != nil { + return fmt.Errorf("%w: unmarshalling amqp message body to pipeline.Message: %w", mq.ErrMalformed, err) } - log.Printf("Error unmarshalling amqp message body to pipeline.Message: %v", err) - continue - } - if err := svc.HandlePipelineMessage(res, ts); err != nil { - if err := msg.Nack(false, false); err != nil { - log.Printf("Error: failed to NACK message: %s\n", err.Error()) + if err := svc.HandlePipelineMessage(res, ts); err != nil { + return fmt.Errorf("handling pipeline message: %w", err) } - log.Printf("Error handling pipeline message: %v\n", err) - continue } - } - if err := msg.Ack(false); err != nil { - log.Printf("Error: failed to ACK message: %s\n", err.Error()) + return nil } } } From 7cf0b09cbbe114070b0b453c92373d973301fb59 Mon Sep 17 00:00:00 2001 From: Tim van Osch Date: Fri, 3 Jan 2025 16:32:36 +0100 Subject: [PATCH 4/7] Fixes --- services/core/main.go | 4 ++-- services/core/measurements/application.go | 10 +++++----- services/core/measurements/infra/store_psql.go | 5 +++++ 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/services/core/main.go b/services/core/main.go index d16e925d..ef419a7d 100644 --- a/services/core/main.go +++ b/services/core/main.go @@ -47,7 +47,7 @@ var ( AUTH_JWKS_URL = env.Could("AUTH_JWKS_URL", "http://oathkeeper:4456/.well-known/jwks.json") SYS_ARCHIVE_TIME = env.Could("SYS_ARCHIVE_TIME", "30") MEASUREMENT_BATCH_SIZE = env.CouldInt("MEASUREMENT_BATCH_SIZE", 1024) - MEASUREMENT_COMMIT_INTERVAL = env.CouldInt("MEASUREMENT_COMMIT_INTERVAL", 1) + MEASUREMENT_COMMIT_INTERVAL = env.CouldInt("MEASUREMENT_COMMIT_INTERVAL", 1000) ) func main() { @@ -97,7 +97,7 @@ func Run(cleanup cleanupper.Cleanupper) error { } measurementstore := measurementsinfra.NewPSQL(db) measurementservice := measurements.New(measurementstore, sysArchiveTime, keyClient) - cleanup.Add(measurementservice.StartMeasurementBatchStorer(MEASUREMENT_BATCH_SIZE, time.Duration(MEASUREMENT_COMMIT_INTERVAL)*time.Second)) + cleanup.Add(measurementservice.StartMeasurementBatchStorer(MEASUREMENT_BATCH_SIZE, time.Duration(MEASUREMENT_COMMIT_INTERVAL)*time.Millisecond)) processingstore := processinginfra.NewPSQLStore(db) processingPipelinePublisher := processinginfra.NewPipelineMessagePublisher(amqpConn, AMQP_XCHG_PIPELINE_MESSAGES) diff --git a/services/core/measurements/application.go b/services/core/measurements/application.go index 121be270..ee339003 100644 --- a/services/core/measurements/application.go +++ b/services/core/measurements/application.go @@ -51,9 +51,10 @@ func (s *Service) StartMeasurementBatchStorer(batchSize int, interval time.Durat t := time.NewTicker(interval) commit := func() { - if len(measurements) > 0 { - log.Printf("Committing %d measurements\n", len(measurements)) + if len(measurements) == 0 { + return } + log.Printf("Committing %d measurements\n", len(measurements)) err := s.store.StoreMeasurements(measurements) if err != nil { log.Printf("Error storing measurements: %s\n", err.Error()) @@ -118,8 +119,7 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error { CreatedAt: time.Now(), } - measurements := make([]Measurement, len(msg.Measurements)) - for ix, m := range msg.Measurements { + for _, m := range msg.Measurements { sensor, err := dev.GetSensorByExternalIDOrFallback(m.SensorExternalID) if err != nil { @@ -161,7 +161,7 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error { measurement.MeasurementAltitude = m.Altitude } - measurements[ix] = measurement + s.measurementBatch <- measurement } return nil diff --git a/services/core/measurements/infra/store_psql.go b/services/core/measurements/infra/store_psql.go index ace0a181..248755bb 100644 --- a/services/core/measurements/infra/store_psql.go +++ b/services/core/measurements/infra/store_psql.go @@ -447,5 +447,10 @@ func (s *MeasurementStorePSQL) StoreMeasurements(measurements []measurements.Mea ) } + _, err := q.RunWith(s.db).Exec() + if err != nil { + return fmt.Errorf("could not insert measurements: %w", err) + } + return nil } From d1a076273328b67be9bb70c8d3cf989e693ca1f6 Mon Sep 17 00:00:00 2001 From: Tim van Osch Date: Tue, 7 Jan 2025 15:32:55 +0100 Subject: [PATCH 5/7] fix: tests --- services/core/main.go | 4 +- services/core/measurements/application.go | 70 ++-- .../core/measurements/application_test.go | 104 +++--- services/core/measurements/measurements.go | 2 - services/core/measurements/mock_test.go | 300 ++++++------------ services/core/transport/devices_test.go | 2 +- 6 files changed, 172 insertions(+), 310 deletions(-) diff --git a/services/core/main.go b/services/core/main.go index ef419a7d..7bce7049 100644 --- a/services/core/main.go +++ b/services/core/main.go @@ -96,8 +96,8 @@ func Run(cleanup cleanupper.Cleanupper) error { return fmt.Errorf("could not convert SYS_ARCHIVE_TIME to integer: %w", err) } measurementstore := measurementsinfra.NewPSQL(db) - measurementservice := measurements.New(measurementstore, sysArchiveTime, keyClient) - cleanup.Add(measurementservice.StartMeasurementBatchStorer(MEASUREMENT_BATCH_SIZE, time.Duration(MEASUREMENT_COMMIT_INTERVAL)*time.Millisecond)) + measurementservice := measurements.New(measurementstore, MEASUREMENT_BATCH_SIZE, sysArchiveTime, keyClient) + cleanup.Add(measurementservice.StartMeasurementBatchStorer(time.Duration(MEASUREMENT_COMMIT_INTERVAL) * time.Millisecond)) processingstore := processinginfra.NewPSQLStore(db) processingPipelinePublisher := processinginfra.NewPipelineMessagePublisher(amqpConn, AMQP_XCHG_PIPELINE_MESSAGES) diff --git a/services/core/measurements/application.go b/services/core/measurements/application.go index ee339003..f7db00cb 100644 --- a/services/core/measurements/application.go +++ b/services/core/measurements/application.go @@ -1,5 +1,7 @@ package measurements +//go:generate moq -pkg measurements_test -out mock_test.go . Store + import ( "context" "fmt" @@ -27,55 +29,44 @@ type Store interface { // Service is the measurement service which stores measurement data. type Service struct { - store Store - systemArchiveTime int - keyClient auth.JWKSClient - measurementBatch chan Measurement + store Store + systemArchiveTime int + keyClient auth.JWKSClient + measurementBatchChan chan Measurement + measurementBatch []Measurement } -func New(store Store, systemArchiveTime int, keyClient auth.JWKSClient) *Service { +func New(store Store, systemArchiveTime, batchSize int, keyClient auth.JWKSClient) *Service { return &Service{ - store: store, - systemArchiveTime: systemArchiveTime, - keyClient: keyClient, + store: store, + systemArchiveTime: systemArchiveTime, + keyClient: keyClient, + measurementBatch: make([]Measurement, 0, batchSize), + measurementBatchChan: make(chan Measurement, batchSize), } } -func (s *Service) StartMeasurementBatchStorer(batchSize int, interval time.Duration) cleanupper.Shutdown { +func (s *Service) StartMeasurementBatchStorer(interval time.Duration) cleanupper.Shutdown { log.Println("Measurement service batch storer started") defer log.Println("Measurement service batch storer stopped!") stop := make(chan struct{}) done := make(chan struct{}) - s.measurementBatch = make(chan Measurement, batchSize) - measurements := make([]Measurement, 0, batchSize) t := time.NewTicker(interval) - commit := func() { - if len(measurements) == 0 { - return - } - log.Printf("Committing %d measurements\n", len(measurements)) - err := s.store.StoreMeasurements(measurements) - if err != nil { - log.Printf("Error storing measurements: %s\n", err.Error()) - } - measurements = measurements[:0] - } - go func() { outer: for { select { case <-stop: - commit() + s.CommitBatch(false) break outer - case m := <-s.measurementBatch: - measurements = append(measurements, m) - if len(measurements) == batchSize { - commit() + case m := <-s.measurementBatchChan: + s.measurementBatch = append(s.measurementBatch, m) + if len(s.measurementBatch) == cap(s.measurementBatch) { + s.CommitBatch(false) } case <-t.C: - commit() + s.CommitBatch(false) } } close(done) @@ -88,6 +79,25 @@ func (s *Service) StartMeasurementBatchStorer(batchSize int, interval time.Durat } } +func (s *Service) CommitBatch(collect bool) error { + if len(s.measurementBatch) == 0 { + if !collect || len(s.measurementBatchChan) == 0 { + return nil + } + count := len(s.measurementBatchChan) + for i := 0; i < count; i++ { + s.measurementBatch = append(s.measurementBatch, <-s.measurementBatchChan) + } + } + log.Printf("Committing %d measurements\n", len(s.measurementBatch)) + err := s.store.StoreMeasurements(s.measurementBatch) + if err != nil { + return fmt.Errorf("committing measurements failed: %w", err) + } + s.measurementBatch = s.measurementBatch[:0] + return nil +} + func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error { msg := PipelineMessage(pmsg) @@ -161,7 +171,7 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error { measurement.MeasurementAltitude = m.Altitude } - s.measurementBatch <- measurement + s.measurementBatchChan <- measurement } return nil diff --git a/services/core/measurements/application_test.go b/services/core/measurements/application_test.go index 6622ac06..cf18b9f7 100644 --- a/services/core/measurements/application_test.go +++ b/services/core/measurements/application_test.go @@ -20,8 +20,8 @@ func ptr[T any](v T) *T { return &v } -func newPipelineMessage(plID string, steps []string) *pipeline.Message { - return &pipeline.Message{ +func newPipelineMessage(plID string, steps []string) pipeline.Message { + return pipeline.Message{ TracingID: uuid.NewString(), ReceivedAt: time.Now().UnixMilli(), Timestamp: time.Now().UnixMilli(), @@ -129,17 +129,16 @@ func TestShouldErrorIfNoDeviceOrNoSensor(t *testing.T) { err := msg.NewMeasurement().SetValue(5, tC.observationProperty, "1").SetSensor(tC.sensorExternalID).Add() msg.AccessToken = authtest.CreateToken() require.NoError(t, err) - store := &StoreMock{} - measurementStorer := &MeasurementStorerMock{} - measurementStoreBuilder := &MeasurementStoreBuilderMock{ - BeginFunc: func() (measurements.MeasurementStorer, error) { - return measurementStorer, nil + + store := &StoreMock{ + FindOrCreateDatastreamFunc: func(tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*measurements.Datastream, error) { + return &measurements.Datastream{}, nil }, } - svc := measurements.New(store, measurementStoreBuilder, 0, authtest.JWKS()) + svc := measurements.New(store, 0, 1, authtest.JWKS()) // Act - err = svc.ProcessPipelineMessage((*measurements.PipelineMessage)(msg)) + err = svc.ProcessPipelineMessage(msg) if tC.err != nil { assert.Error(t, tC.err, err) } else { @@ -155,13 +154,13 @@ func TestShouldCopyOverDefaultFields(t *testing.T) { msg := newPipelineMessage(uuid.NewString(), []string{}) msg.Device = &pipeline.Device{ ID: 1, - Code: "", + Code: "123", Description: "", TenantID: 10, Sensors: []devices.Sensor{ { ID: 1, - Code: "", + Code: "123", Description: "", Brand: "", ArchiveTime: nil, @@ -186,33 +185,23 @@ func TestShouldCopyOverDefaultFields(t *testing.T) { ObservedProperty: msg.Measurements[0].ObservedProperty, UnitOfMeasurement: msg.Measurements[0].UnitOfMeasurement, } - store := &StoreMock{} - measurementStorer := &MeasurementStorerMock{ - GetDatastreamFunc: func(tenantID, sensorID int64, observedProperty, unitOfMeasurement string) (*measurements.Datastream, error) { + store := &StoreMock{ + FindOrCreateDatastreamFunc: func(tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*measurements.Datastream, error) { return &ds, nil }, - AddMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { - return nil - }, + StoreMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { return nil }, } - measurementStoreBuilder := &MeasurementStoreBuilderMock{ - BeginFunc: func() (measurements.MeasurementStorer, error) { - return measurementStorer, nil - }, - } - svc := measurements.New(store, measurementStoreBuilder, 0, authtest.JWKS()) + svc := measurements.New(store, 0, 1, authtest.JWKS()) // Act - err = svc.ProcessPipelineMessage((*measurements.PipelineMessage)(msg)) + err = svc.ProcessPipelineMessage(msg) require.NoError(t, err) + svc.CommitBatch(true) // Assert - assert.Len(t, measurementStoreBuilder.calls.Begin, 1, "SQL Insert should've been called") - assert.Greater(t, measurementStorer.calls.GetDatastream, 0, "SQL Insert should've been called") - require.Len(t, measurementStorer.calls.AddMeasurements, 1, "SQL Insert should've been called") - require.Len(t, measurementStorer.calls.AddMeasurements[0].MeasurementsMoqParam, 1, "SQL Insert should've been called") - assert.Len(t, measurementStorer.calls.Finish, 1, "SQL Insert should've been called") - measurement := measurementStorer.calls.AddMeasurements[0].MeasurementsMoqParam[0] + require.Len(t, store.calls.StoreMeasurements, 1, "StoreMeasurements should've been called") + require.Len(t, store.calls.StoreMeasurements[0].MeasurementsMoqParam, 1, "StoreMeasurements should've been supplied a measurements") + measurement := store.calls.StoreMeasurements[0].MeasurementsMoqParam[0] assert.Equal(t, msg.TracingID, measurement.UplinkMessageID) // assert.Equal(t, OrganisationName, measurement.OrganisationName) // assert.Equal(t, OrganisationAddress, measurement.OrganisationAddress) @@ -333,34 +322,24 @@ func TestShouldChooseMeasurementLocationOverDeviceLocation(t *testing.T) { ObservedProperty: msg.Measurements[0].ObservedProperty, UnitOfMeasurement: msg.Measurements[0].UnitOfMeasurement, } - store := &StoreMock{} - measurementStorer := &MeasurementStorerMock{ - GetDatastreamFunc: func(tenantID, sensorID int64, observedProperty, unitOfMeasurement string) (*measurements.Datastream, error) { + store := &StoreMock{ + FindOrCreateDatastreamFunc: func(tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*measurements.Datastream, error) { return &ds, nil }, - AddMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { - return nil - }, + StoreMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { return nil }, } - measurementStoreBuilder := &MeasurementStoreBuilderMock{ - BeginFunc: func() (measurements.MeasurementStorer, error) { - return measurementStorer, nil - }, - } - svc := measurements.New(store, measurementStoreBuilder, 0, authtest.JWKS()) + svc := measurements.New(store, 0, 1, authtest.JWKS()) // Act require.NoError(t, - svc.ProcessPipelineMessage((*measurements.PipelineMessage)(msg)), + svc.ProcessPipelineMessage(msg), ) + svc.CommitBatch(true) // Assert - assert.Len(t, measurementStoreBuilder.calls.Begin, 1, "SQL Insert should've been called") - assert.Greater(t, measurementStorer.calls.GetDatastream, 0, "SQL Insert should've been called") - require.Len(t, measurementStorer.calls.AddMeasurements, 1, "SQL Insert should've been called") - require.Len(t, measurementStorer.calls.AddMeasurements[0].MeasurementsMoqParam, 1, "SQL Insert should've been called") - assert.Len(t, measurementStorer.calls.Finish, 1, "SQL Insert should've been called") - measurement := measurementStorer.calls.AddMeasurements[0].MeasurementsMoqParam[0] + require.Len(t, store.calls.StoreMeasurements, 1, "StoreMeasurements should've been called") + require.Len(t, store.calls.StoreMeasurements[0].MeasurementsMoqParam, 1, "StoreMeasurements should've been supplied a measurements") + measurement := store.calls.StoreMeasurements[0].MeasurementsMoqParam[0] assert.Equal(t, tC.ExpectedLatitude, measurement.MeasurementLatitude) assert.Equal(t, tC.ExpectedLongitude, measurement.MeasurementLongitude) assert.Equal(t, tC.ExpectedAltitude, measurement.MeasurementAltitude) @@ -424,35 +403,28 @@ func TestShouldSetExpirationDate(t *testing.T) { ObservedProperty: msg.Measurements[0].ObservedProperty, UnitOfMeasurement: msg.Measurements[0].UnitOfMeasurement, } - store := &StoreMock{} - measurementStorer := &MeasurementStorerMock{ - GetDatastreamFunc: func(tenantID, sensorID int64, observedProperty, unitOfMeasurement string) (*measurements.Datastream, error) { + store := &StoreMock{ + FindOrCreateDatastreamFunc: func(tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*measurements.Datastream, error) { return &ds, nil }, - AddMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { - return nil - }, - } - measurementStoreBuilder := &MeasurementStoreBuilderMock{ - BeginFunc: func() (measurements.MeasurementStorer, error) { - return measurementStorer, nil - }, + StoreMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { return nil }, } - svc := measurements.New(store, measurementStoreBuilder, sysArchiveTime, authtest.JWKS()) + svc := measurements.New(store, sysArchiveTime, 1, authtest.JWKS()) // Act - err = svc.ProcessPipelineMessage((*measurements.PipelineMessage)(msg)) + err = svc.ProcessPipelineMessage(msg) require.NoError(t, err) + svc.CommitBatch(true) // Assert - require.Len(t, measurementStorer.calls.AddMeasurements, 1, "SQL Insert should've been called") - call := measurementStorer.calls.AddMeasurements[0].MeasurementsMoqParam - require.Len(t, call, 1, "Should've tried to insert 1 measurement") + require.Len(t, store.calls.StoreMeasurements, 1, "StoreMeasurements should've been called") + require.Len(t, store.calls.StoreMeasurements[0].MeasurementsMoqParam, 1, "StoreMeasurements should've been supplied a measurements") + measurement := store.calls.StoreMeasurements[0].MeasurementsMoqParam[0] // Check if the difference in seconds is 0, otherwise there might be a subsecond difference // due to parsing assert.Equal(t, float64(0), - math.Abs(float64(tC.expectedArchiveTime.Unix()-call[0].MeasurementExpiration.Unix())), + math.Abs(float64(tC.expectedArchiveTime.Unix()-measurement.MeasurementExpiration.Unix())), "", ) } diff --git a/services/core/measurements/measurements.go b/services/core/measurements/measurements.go index ff0bc3a7..942f78e5 100644 --- a/services/core/measurements/measurements.go +++ b/services/core/measurements/measurements.go @@ -1,7 +1,5 @@ package measurements -//go:generate moq -pkg measurements_test -out mock_test.go . Store MeasurementStoreBuilder MeasurementStorer - import ( "encoding/json" "errors" diff --git a/services/core/measurements/mock_test.go b/services/core/measurements/mock_test.go index cb3c9bd5..afde437e 100644 --- a/services/core/measurements/mock_test.go +++ b/services/core/measurements/mock_test.go @@ -20,6 +20,9 @@ var _ measurements.Store = &StoreMock{} // // // make and configure a mocked measurements.Store // mockedStore := &StoreMock{ +// FindOrCreateDatastreamFunc: func(tenantID int64, sensorID int64, observedProperty string, UnitOfMeasurement string) (*measurements.Datastream, error) { +// panic("mock out the FindOrCreateDatastream method") +// }, // GetDatastreamFunc: func(id uuid.UUID, filter measurements.DatastreamFilter) (*measurements.Datastream, error) { // panic("mock out the GetDatastream method") // }, @@ -29,6 +32,9 @@ var _ measurements.Store = &StoreMock{} // QueryFunc: func(filter measurements.Filter, request pagination.Request) (*pagination.Page[measurements.Measurement], error) { // panic("mock out the Query method") // }, +// StoreMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { +// panic("mock out the StoreMeasurements method") +// }, // } // // // use mockedStore in code that requires measurements.Store @@ -36,6 +42,9 @@ var _ measurements.Store = &StoreMock{} // // } type StoreMock struct { + // FindOrCreateDatastreamFunc mocks the FindOrCreateDatastream method. + FindOrCreateDatastreamFunc func(tenantID int64, sensorID int64, observedProperty string, UnitOfMeasurement string) (*measurements.Datastream, error) + // GetDatastreamFunc mocks the GetDatastream method. GetDatastreamFunc func(id uuid.UUID, filter measurements.DatastreamFilter) (*measurements.Datastream, error) @@ -45,8 +54,22 @@ type StoreMock struct { // QueryFunc mocks the Query method. QueryFunc func(filter measurements.Filter, request pagination.Request) (*pagination.Page[measurements.Measurement], error) + // StoreMeasurementsFunc mocks the StoreMeasurements method. + StoreMeasurementsFunc func(measurementsMoqParam []measurements.Measurement) error + // calls tracks calls to the methods. calls struct { + // FindOrCreateDatastream holds details about calls to the FindOrCreateDatastream method. + FindOrCreateDatastream []struct { + // TenantID is the tenantID argument value. + TenantID int64 + // SensorID is the sensorID argument value. + SensorID int64 + // ObservedProperty is the observedProperty argument value. + ObservedProperty string + // UnitOfMeasurement is the UnitOfMeasurement argument value. + UnitOfMeasurement string + } // GetDatastream holds details about calls to the GetDatastream method. GetDatastream []struct { // ID is the id argument value. @@ -68,10 +91,61 @@ type StoreMock struct { // Request is the request argument value. Request pagination.Request } + // StoreMeasurements holds details about calls to the StoreMeasurements method. + StoreMeasurements []struct { + // MeasurementsMoqParam is the measurementsMoqParam argument value. + MeasurementsMoqParam []measurements.Measurement + } + } + lockFindOrCreateDatastream sync.RWMutex + lockGetDatastream sync.RWMutex + lockListDatastreams sync.RWMutex + lockQuery sync.RWMutex + lockStoreMeasurements sync.RWMutex +} + +// FindOrCreateDatastream calls FindOrCreateDatastreamFunc. +func (mock *StoreMock) FindOrCreateDatastream(tenantID int64, sensorID int64, observedProperty string, UnitOfMeasurement string) (*measurements.Datastream, error) { + if mock.FindOrCreateDatastreamFunc == nil { + panic("StoreMock.FindOrCreateDatastreamFunc: method is nil but Store.FindOrCreateDatastream was just called") + } + callInfo := struct { + TenantID int64 + SensorID int64 + ObservedProperty string + UnitOfMeasurement string + }{ + TenantID: tenantID, + SensorID: sensorID, + ObservedProperty: observedProperty, + UnitOfMeasurement: UnitOfMeasurement, + } + mock.lockFindOrCreateDatastream.Lock() + mock.calls.FindOrCreateDatastream = append(mock.calls.FindOrCreateDatastream, callInfo) + mock.lockFindOrCreateDatastream.Unlock() + return mock.FindOrCreateDatastreamFunc(tenantID, sensorID, observedProperty, UnitOfMeasurement) +} + +// FindOrCreateDatastreamCalls gets all the calls that were made to FindOrCreateDatastream. +// Check the length with: +// +// len(mockedStore.FindOrCreateDatastreamCalls()) +func (mock *StoreMock) FindOrCreateDatastreamCalls() []struct { + TenantID int64 + SensorID int64 + ObservedProperty string + UnitOfMeasurement string +} { + var calls []struct { + TenantID int64 + SensorID int64 + ObservedProperty string + UnitOfMeasurement string } - lockGetDatastream sync.RWMutex - lockListDatastreams sync.RWMutex - lockQuery sync.RWMutex + mock.lockFindOrCreateDatastream.RLock() + calls = mock.calls.FindOrCreateDatastream + mock.lockFindOrCreateDatastream.RUnlock() + return calls } // GetDatastream calls GetDatastreamFunc. @@ -182,226 +256,34 @@ func (mock *StoreMock) QueryCalls() []struct { return calls } -// Ensure, that MeasurementStoreBuilderMock does implement measurements.MeasurementStoreBuilder. -// If this is not the case, regenerate this file with moq. -var _ measurements.MeasurementStoreBuilder = &MeasurementStoreBuilderMock{} - -// MeasurementStoreBuilderMock is a mock implementation of measurements.MeasurementStoreBuilder. -// -// func TestSomethingThatUsesMeasurementStoreBuilder(t *testing.T) { -// -// // make and configure a mocked measurements.MeasurementStoreBuilder -// mockedMeasurementStoreBuilder := &MeasurementStoreBuilderMock{ -// BeginFunc: func() (measurements.MeasurementStorer, error) { -// panic("mock out the Begin method") -// }, -// } -// -// // use mockedMeasurementStoreBuilder in code that requires measurements.MeasurementStoreBuilder -// // and then make assertions. -// -// } -type MeasurementStoreBuilderMock struct { - // BeginFunc mocks the Begin method. - BeginFunc func() (measurements.MeasurementStorer, error) - - // calls tracks calls to the methods. - calls struct { - // Begin holds details about calls to the Begin method. - Begin []struct { - } - } - lockBegin sync.RWMutex -} - -// Begin calls BeginFunc. -func (mock *MeasurementStoreBuilderMock) Begin() (measurements.MeasurementStorer, error) { - if mock.BeginFunc == nil { - panic("MeasurementStoreBuilderMock.BeginFunc: method is nil but MeasurementStoreBuilder.Begin was just called") - } - callInfo := struct { - }{} - mock.lockBegin.Lock() - mock.calls.Begin = append(mock.calls.Begin, callInfo) - mock.lockBegin.Unlock() - return mock.BeginFunc() -} - -// BeginCalls gets all the calls that were made to Begin. -// Check the length with: -// -// len(mockedMeasurementStoreBuilder.BeginCalls()) -func (mock *MeasurementStoreBuilderMock) BeginCalls() []struct { -} { - var calls []struct { - } - mock.lockBegin.RLock() - calls = mock.calls.Begin - mock.lockBegin.RUnlock() - return calls -} - -// Ensure, that MeasurementStorerMock does implement measurements.MeasurementStorer. -// If this is not the case, regenerate this file with moq. -var _ measurements.MeasurementStorer = &MeasurementStorerMock{} - -// MeasurementStorerMock is a mock implementation of measurements.MeasurementStorer. -// -// func TestSomethingThatUsesMeasurementStorer(t *testing.T) { -// -// // make and configure a mocked measurements.MeasurementStorer -// mockedMeasurementStorer := &MeasurementStorerMock{ -// AddMeasurementsFunc: func(measurementsMoqParam []measurements.Measurement) error { -// panic("mock out the AddMeasurements method") -// }, -// FinishFunc: func() error { -// panic("mock out the Finish method") -// }, -// GetDatastreamFunc: func(tenantID int64, sensorID int64, observedProperty string, unitOfMeasurement string) (*measurements.Datastream, error) { -// panic("mock out the GetDatastream method") -// }, -// } -// -// // use mockedMeasurementStorer in code that requires measurements.MeasurementStorer -// // and then make assertions. -// -// } -type MeasurementStorerMock struct { - // AddMeasurementsFunc mocks the AddMeasurements method. - AddMeasurementsFunc func(measurementsMoqParam []measurements.Measurement) error - - // FinishFunc mocks the Finish method. - FinishFunc func() error - - // GetDatastreamFunc mocks the GetDatastream method. - GetDatastreamFunc func(tenantID int64, sensorID int64, observedProperty string, unitOfMeasurement string) (*measurements.Datastream, error) - - // calls tracks calls to the methods. - calls struct { - // AddMeasurements holds details about calls to the AddMeasurements method. - AddMeasurements []struct { - // MeasurementsMoqParam is the measurementsMoqParam argument value. - MeasurementsMoqParam []measurements.Measurement - } - // Finish holds details about calls to the Finish method. - Finish []struct { - } - // GetDatastream holds details about calls to the GetDatastream method. - GetDatastream []struct { - // TenantID is the tenantID argument value. - TenantID int64 - // SensorID is the sensorID argument value. - SensorID int64 - // ObservedProperty is the observedProperty argument value. - ObservedProperty string - // UnitOfMeasurement is the unitOfMeasurement argument value. - UnitOfMeasurement string - } - } - lockAddMeasurements sync.RWMutex - lockFinish sync.RWMutex - lockGetDatastream sync.RWMutex -} - -// AddMeasurements calls AddMeasurementsFunc. -func (mock *MeasurementStorerMock) AddMeasurements(measurementsMoqParam []measurements.Measurement) error { - if mock.AddMeasurementsFunc == nil { - panic("MeasurementStorerMock.AddMeasurementsFunc: method is nil but MeasurementStorer.AddMeasurements was just called") +// StoreMeasurements calls StoreMeasurementsFunc. +func (mock *StoreMock) StoreMeasurements(measurementsMoqParam []measurements.Measurement) error { + if mock.StoreMeasurementsFunc == nil { + panic("StoreMock.StoreMeasurementsFunc: method is nil but Store.StoreMeasurements was just called") } callInfo := struct { MeasurementsMoqParam []measurements.Measurement }{ MeasurementsMoqParam: measurementsMoqParam, } - mock.lockAddMeasurements.Lock() - mock.calls.AddMeasurements = append(mock.calls.AddMeasurements, callInfo) - mock.lockAddMeasurements.Unlock() - return mock.AddMeasurementsFunc(measurementsMoqParam) + mock.lockStoreMeasurements.Lock() + mock.calls.StoreMeasurements = append(mock.calls.StoreMeasurements, callInfo) + mock.lockStoreMeasurements.Unlock() + return mock.StoreMeasurementsFunc(measurementsMoqParam) } -// AddMeasurementsCalls gets all the calls that were made to AddMeasurements. +// StoreMeasurementsCalls gets all the calls that were made to StoreMeasurements. // Check the length with: // -// len(mockedMeasurementStorer.AddMeasurementsCalls()) -func (mock *MeasurementStorerMock) AddMeasurementsCalls() []struct { +// len(mockedStore.StoreMeasurementsCalls()) +func (mock *StoreMock) StoreMeasurementsCalls() []struct { MeasurementsMoqParam []measurements.Measurement } { var calls []struct { MeasurementsMoqParam []measurements.Measurement } - mock.lockAddMeasurements.RLock() - calls = mock.calls.AddMeasurements - mock.lockAddMeasurements.RUnlock() - return calls -} - -// Finish calls FinishFunc. -func (mock *MeasurementStorerMock) Finish() error { - if mock.FinishFunc == nil { - panic("MeasurementStorerMock.FinishFunc: method is nil but MeasurementStorer.Finish was just called") - } - callInfo := struct { - }{} - mock.lockFinish.Lock() - mock.calls.Finish = append(mock.calls.Finish, callInfo) - mock.lockFinish.Unlock() - return mock.FinishFunc() -} - -// FinishCalls gets all the calls that were made to Finish. -// Check the length with: -// -// len(mockedMeasurementStorer.FinishCalls()) -func (mock *MeasurementStorerMock) FinishCalls() []struct { -} { - var calls []struct { - } - mock.lockFinish.RLock() - calls = mock.calls.Finish - mock.lockFinish.RUnlock() - return calls -} - -// GetDatastream calls GetDatastreamFunc. -func (mock *MeasurementStorerMock) GetDatastream(tenantID int64, sensorID int64, observedProperty string, unitOfMeasurement string) (*measurements.Datastream, error) { - if mock.GetDatastreamFunc == nil { - panic("MeasurementStorerMock.GetDatastreamFunc: method is nil but MeasurementStorer.GetDatastream was just called") - } - callInfo := struct { - TenantID int64 - SensorID int64 - ObservedProperty string - UnitOfMeasurement string - }{ - TenantID: tenantID, - SensorID: sensorID, - ObservedProperty: observedProperty, - UnitOfMeasurement: unitOfMeasurement, - } - mock.lockGetDatastream.Lock() - mock.calls.GetDatastream = append(mock.calls.GetDatastream, callInfo) - mock.lockGetDatastream.Unlock() - return mock.GetDatastreamFunc(tenantID, sensorID, observedProperty, unitOfMeasurement) -} - -// GetDatastreamCalls gets all the calls that were made to GetDatastream. -// Check the length with: -// -// len(mockedMeasurementStorer.GetDatastreamCalls()) -func (mock *MeasurementStorerMock) GetDatastreamCalls() []struct { - TenantID int64 - SensorID int64 - ObservedProperty string - UnitOfMeasurement string -} { - var calls []struct { - TenantID int64 - SensorID int64 - ObservedProperty string - UnitOfMeasurement string - } - mock.lockGetDatastream.RLock() - calls = mock.calls.GetDatastream - mock.lockGetDatastream.RUnlock() + mock.lockStoreMeasurements.RLock() + calls = mock.calls.StoreMeasurements + mock.lockStoreMeasurements.RUnlock() return calls } diff --git a/services/core/transport/devices_test.go b/services/core/transport/devices_test.go index ae869c87..6aba63f8 100644 --- a/services/core/transport/devices_test.go +++ b/services/core/transport/devices_test.go @@ -109,7 +109,7 @@ func (s *IntegrationTestSuite) SetupSuite() { // Create measurements service measurementStore := measurementsinfra.NewPSQL(db) - s.measurements = measurements.New(measurementStore, 10, authtest.JWKS()) + s.measurements = measurements.New(measurementStore, 10, 1, authtest.JWKS()) // Create processing service processingStore := processinginfra.NewPSQLStore(db) From 5d5c4dde9be2bf1c98136c46e53d8924fbb5aec3 Mon Sep 17 00:00:00 2001 From: Tim van Osch Date: Tue, 7 Jan 2025 15:41:18 +0100 Subject: [PATCH 6/7] style: Fix golangci lint issues --- services/core/measurements/application.go | 12 +++++++++--- .../core/measurements/application_test.go | 6 +++--- services/core/measurements/datastreams.go | 19 ------------------- 3 files changed, 12 insertions(+), 25 deletions(-) diff --git a/services/core/measurements/application.go b/services/core/measurements/application.go index f7db00cb..8b80df42 100644 --- a/services/core/measurements/application.go +++ b/services/core/measurements/application.go @@ -58,15 +58,21 @@ func (s *Service) StartMeasurementBatchStorer(interval time.Duration) cleanupper for { select { case <-stop: - s.CommitBatch(false) + if err := s.CommitBatch(false); err != nil { + log.Printf("error committing batch: %s\n", err.Error()) + } break outer case m := <-s.measurementBatchChan: s.measurementBatch = append(s.measurementBatch, m) if len(s.measurementBatch) == cap(s.measurementBatch) { - s.CommitBatch(false) + if err := s.CommitBatch(false); err != nil { + log.Printf("error committing batch: %s\n", err.Error()) + } } case <-t.C: - s.CommitBatch(false) + if err := s.CommitBatch(false); err != nil { + log.Printf("error committing batch: %s\n", err.Error()) + } } } close(done) diff --git a/services/core/measurements/application_test.go b/services/core/measurements/application_test.go index cf18b9f7..549cf63b 100644 --- a/services/core/measurements/application_test.go +++ b/services/core/measurements/application_test.go @@ -196,7 +196,7 @@ func TestShouldCopyOverDefaultFields(t *testing.T) { // Act err = svc.ProcessPipelineMessage(msg) require.NoError(t, err) - svc.CommitBatch(true) + assert.NoError(t, svc.CommitBatch(true)) // Assert require.Len(t, store.calls.StoreMeasurements, 1, "StoreMeasurements should've been called") @@ -334,7 +334,7 @@ func TestShouldChooseMeasurementLocationOverDeviceLocation(t *testing.T) { require.NoError(t, svc.ProcessPipelineMessage(msg), ) - svc.CommitBatch(true) + assert.NoError(t, svc.CommitBatch(true)) // Assert require.Len(t, store.calls.StoreMeasurements, 1, "StoreMeasurements should've been called") @@ -414,7 +414,7 @@ func TestShouldSetExpirationDate(t *testing.T) { // Act err = svc.ProcessPipelineMessage(msg) require.NoError(t, err) - svc.CommitBatch(true) + assert.NoError(t, svc.CommitBatch(true)) // Assert require.Len(t, store.calls.StoreMeasurements, 1, "StoreMeasurements should've been called") diff --git a/services/core/measurements/datastreams.go b/services/core/measurements/datastreams.go index 13248d3c..5d83cca3 100644 --- a/services/core/measurements/datastreams.go +++ b/services/core/measurements/datastreams.go @@ -24,22 +24,3 @@ type Datastream struct { CreatedAt time.Time `json:"created_at" db:"created_at"` TenantID int64 `json:"-"` } - -func newDatastream(tenantID, sensorID int64, obs, uom string) (*Datastream, error) { - // TODO: Check UoM conforms to UCUM - if uom == "" || false { - return nil, ErrUoMInvalid - } - if sensorID == 0 { - return nil, ErrInvalidSensorID - } - return &Datastream{ - ID: uuid.New(), - TenantID: tenantID, - Description: "", - SensorID: sensorID, - ObservedProperty: obs, - UnitOfMeasurement: uom, - CreatedAt: time.Now(), - }, nil -} From 4c634f68ed6f151c874e6a0d17e6953f639f3d2f Mon Sep 17 00:00:00 2001 From: Tim van Osch Date: Tue, 7 Jan 2025 17:07:02 +0100 Subject: [PATCH 7/7] fix: add logs and error logging --- pkg/mq/amqp_consumer.go | 5 +++++ services/core/measurements/application.go | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/mq/amqp_consumer.go b/pkg/mq/amqp_consumer.go index 30b08213..73fd1a43 100644 --- a/pkg/mq/amqp_consumer.go +++ b/pkg/mq/amqp_consumer.go @@ -1,6 +1,8 @@ package mq import ( + "fmt" + amqp "github.com/rabbitmq/amqp091-go" ) @@ -17,15 +19,18 @@ func Consume(conn *AMQPConnection, queue string, opts ...SetupOption) <-chan amq } amqpChan, err := amqpConn.Channel() if err != nil { + fmt.Printf("Error creating channel: %s\n", err.Error()) continue } err = setupChannel(amqpChan, opts) if err != nil { + fmt.Printf("Error setting up consume channel: %s\n", err.Error()) continue } amqpDeliveryChan, err := amqpChan.Consume(queue, "", false, false, false, false, nil) if err != nil { + fmt.Printf("Error consuming from consume channel: %s\n", err.Error()) continue } diff --git a/services/core/measurements/application.go b/services/core/measurements/application.go index 8b80df42..1b29d949 100644 --- a/services/core/measurements/application.go +++ b/services/core/measurements/application.go @@ -47,13 +47,13 @@ func New(store Store, systemArchiveTime, batchSize int, keyClient auth.JWKSClien } func (s *Service) StartMeasurementBatchStorer(interval time.Duration) cleanupper.Shutdown { - log.Println("Measurement service batch storer started") - defer log.Println("Measurement service batch storer stopped!") stop := make(chan struct{}) done := make(chan struct{}) t := time.NewTicker(interval) go func() { + log.Println("Measurement service batch storer started") + defer log.Println("Measurement service batch storer stopped!") outer: for { select {