Skip to content

Commit

Permalink
feat: support disableable qos message size check
Browse files Browse the repository at this point in the history
- allow disableable qos message size check
- this will match parodus's default behavior
  • Loading branch information
denopink committed Aug 1, 2024
1 parent 31387eb commit ce5d4f8
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
5 changes: 1 addition & 4 deletions internal/wrphandlers/qos/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
)

const (
DefaultMaxQueueBytes = 1 * 1024 * 1024 // 1MB max/queue
DefaultMaxMessageBytes = 256 * 1024 // 256 KB
DefaultMaxQueueBytes = 1 * 1024 * 1024 // 1MB max/queue
)

// MaxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload.
Expand All @@ -37,8 +36,6 @@ func MaxMessageBytes(s int) Option {
func(h *Handler) error {
if s < 0 {
return fmt.Errorf("%w: negative MaxMessageBytes", ErrMisconfiguredQOS)
} else if s == 0 {
s = DefaultMaxMessageBytes
}

h.maxMessageBytes = s
Expand Down
3 changes: 2 additions & 1 deletion internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
// Enqueue queues the given message.
func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
// Check whether msg violates maxMessageBytes.
if len(msg.Payload) > pq.maxMessageBytes {
// The zero value of `pq.maxMessageBytes` will disable the wrp.Payload size check.
if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes {
return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes)
}

Expand Down
6 changes: 6 additions & 0 deletions internal/wrphandlers/qos/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ func testEnqueueDequeue(t *testing.T) {
maxMessageBytes: len(largeCriticalQOSMsg.Payload) - 1,
expectedQueueSize: 0,
},
{
description: "allow any message size",
messages: []wrp.Message{largeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
},
{
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg},
Expand Down
11 changes: 11 additions & 0 deletions internal/wrphandlers/qos/qos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ func TestHandler_HandleWrp(t *testing.T) {
expectedHandleWRPErr error
}{
// success cases
{
description: "enqueued and delivered message prioritizing newer messages with no message size restriction",
maxQueueBytes: 100,
priority: qos.NewestType,
nextCallCount: 1,
next: wrpkit.HandlerFunc(func(wrp.Message) error {
nextCallCount.Add(1)

return nil
}),
},
{
description: "enqueued and delivered message prioritizing newer messages",
maxQueueBytes: 100,
Expand Down

0 comments on commit ce5d4f8

Please sign in to comment.