From ce5d4f8bc2a818b975fecb55815d313a8f5b88dc Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Thu, 1 Aug 2024 16:16:47 -0400 Subject: [PATCH] feat: support disableable qos message size check - allow disableable qos message size check - this will match parodus's default behavior --- internal/wrphandlers/qos/options.go | 5 +---- internal/wrphandlers/qos/priority_queue.go | 3 ++- internal/wrphandlers/qos/priority_queue_test.go | 6 ++++++ internal/wrphandlers/qos/qos_test.go | 11 +++++++++++ 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/internal/wrphandlers/qos/options.go b/internal/wrphandlers/qos/options.go index 4904f5b..16c26ce 100644 --- a/internal/wrphandlers/qos/options.go +++ b/internal/wrphandlers/qos/options.go @@ -9,8 +9,7 @@ import ( ) const ( - DefaultMaxQueueBytes = 1 * 1024 * 1024 // 1MB max/queue - DefaultMaxMessageBytes = 256 * 1024 // 256 KB + DefaultMaxQueueBytes = 1 * 1024 * 1024 // 1MB max/queue ) // MaxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload. @@ -37,8 +36,6 @@ func MaxMessageBytes(s int) Option { func(h *Handler) error { if s < 0 { return fmt.Errorf("%w: negative MaxMessageBytes", ErrMisconfiguredQOS) - } else if s == 0 { - s = DefaultMaxMessageBytes } h.maxMessageBytes = s diff --git a/internal/wrphandlers/qos/priority_queue.go b/internal/wrphandlers/qos/priority_queue.go index 39a59cb..6c6c97a 100644 --- a/internal/wrphandlers/qos/priority_queue.go +++ b/internal/wrphandlers/qos/priority_queue.go @@ -52,7 +52,8 @@ func (pq *priorityQueue) Dequeue() (wrp.Message, bool) { // Enqueue queues the given message. func (pq *priorityQueue) Enqueue(msg wrp.Message) error { // Check whether msg violates maxMessageBytes. - if len(msg.Payload) > pq.maxMessageBytes { + // The zero value of `pq.maxMessageBytes` will disable the wrp.Payload size check. + if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes { return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes) } diff --git a/internal/wrphandlers/qos/priority_queue_test.go b/internal/wrphandlers/qos/priority_queue_test.go index e6ca034..817b149 100644 --- a/internal/wrphandlers/qos/priority_queue_test.go +++ b/internal/wrphandlers/qos/priority_queue_test.go @@ -166,6 +166,12 @@ func testEnqueueDequeue(t *testing.T) { maxMessageBytes: len(largeCriticalQOSMsg.Payload) - 1, expectedQueueSize: 0, }, + { + description: "allow any message size", + messages: []wrp.Message{largeCriticalQOSMsg}, + maxQueueBytes: len(largeCriticalQOSMsg.Payload), + expectedQueueSize: 1, + }, { description: "message too large with a nonempty queue", messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg}, diff --git a/internal/wrphandlers/qos/qos_test.go b/internal/wrphandlers/qos/qos_test.go index c643718..dfc5c8d 100644 --- a/internal/wrphandlers/qos/qos_test.go +++ b/internal/wrphandlers/qos/qos_test.go @@ -45,6 +45,17 @@ func TestHandler_HandleWrp(t *testing.T) { expectedHandleWRPErr error }{ // success cases + { + description: "enqueued and delivered message prioritizing newer messages with no message size restriction", + maxQueueBytes: 100, + priority: qos.NewestType, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, { description: "enqueued and delivered message prioritizing newer messages", maxQueueBytes: 100,