From e18df90dadeca5a1558a300f33d6631fcf5b84cf Mon Sep 17 00:00:00 2001 From: Cheskel Twersky Date: Sun, 10 Mar 2024 17:53:46 +0200 Subject: [PATCH] fix: #1864 --- src/message-queues.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/message-queues.ts b/src/message-queues.ts index e40be2265..d948af9c3 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -53,6 +53,7 @@ export interface BatchOptions { callOptions?: CallOptions; maxMessages?: number; maxMilliseconds?: number; + maxBytes?: number; } /** @@ -93,6 +94,7 @@ export class BatchError extends DebugMessage { * @property {number} [maxMilliseconds=100] Maximum duration to wait before * sending a batch. Batches can be sent earlier if the maxMessages option * is met before the configured duration has passed. + * @property {number} [maxBytes=512000] Maximum number of bytes to allow in */ /** * Class for buffering ack/modAck requests. @@ -107,6 +109,7 @@ export abstract class MessageQueue { numPendingRequests: number; numInFlightRequests: number; numInRetryRequests: number; + bytes: number; protected _onFlush?: defer.DeferredPromise; protected _onDrain?: defer.DeferredPromise; protected _options!: BatchOptions; @@ -121,6 +124,7 @@ export abstract class MessageQueue { this.numPendingRequests = 0; this.numInFlightRequests = 0; this.numInRetryRequests = 0; + this.bytes = 0; this._requests = []; this._subscriber = sub; this._retrier = new ExponentialRetry( @@ -188,7 +192,7 @@ export abstract class MessageQueue { } } - const {maxMessages, maxMilliseconds} = this._options; + const {maxMessages, maxMilliseconds, maxBytes} = this._options; const responsePromise = defer(); this._requests.push({ @@ -199,8 +203,9 @@ export abstract class MessageQueue { }); this.numPendingRequests++; this.numInFlightRequests++; + this.bytes += Buffer.byteLength(ackId, 'utf8'); - if (this._requests.length >= maxMessages!) { + if (this._requests.length >= maxMessages! || this.bytes >= maxBytes!) { this.flush(); } else if (!this._timer) { this._timer = setTimeout(() => this.flush(), maxMilliseconds!); @@ -264,6 +269,7 @@ export abstract class MessageQueue { const deferred = this._onFlush; this._requests = []; + this.bytes = 0; this.numPendingRequests -= batchSize; delete this._onFlush; @@ -330,7 +336,11 @@ export abstract class MessageQueue { * @private */ setOptions(options: BatchOptions): void { - const defaults: BatchOptions = {maxMessages: 3000, maxMilliseconds: 100}; + const defaults: BatchOptions = { + maxMessages: 3000, + maxMilliseconds: 100, + maxBytes: 512000, + }; this._options = Object.assign(defaults, options); }