From 216f1cd35f2ec7a0ec1a49f994991e2f5f903396 Mon Sep 17 00:00:00 2001 From: Cheskel Twersky Date: Sun, 10 Mar 2024 17:53:46 +0200 Subject: [PATCH 1/9] fix: #1864 --- src/message-queues.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/message-queues.ts b/src/message-queues.ts index 6dfefc2cf..b7e35f163 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -59,6 +59,7 @@ export interface BatchOptions { callOptions?: CallOptions; maxMessages?: number; maxMilliseconds?: number; + maxBytes?: number; } /** @@ -99,6 +100,7 @@ export class BatchError extends DebugMessage { * @property {number} [maxMilliseconds=100] Maximum duration to wait before * sending a batch. Batches can be sent earlier if the maxMessages option * is met before the configured duration has passed. + * @property {number} [maxBytes=512000] Maximum number of bytes to allow in */ /** * Class for buffering ack/modAck requests. @@ -113,6 +115,7 @@ export abstract class MessageQueue { numPendingRequests: number; numInFlightRequests: number; numInRetryRequests: number; + bytes: number; protected _onFlush?: defer.DeferredPromise; protected _onDrain?: defer.DeferredPromise; protected _options!: BatchOptions; @@ -127,6 +130,7 @@ export abstract class MessageQueue { this.numPendingRequests = 0; this.numInFlightRequests = 0; this.numInRetryRequests = 0; + this.bytes = 0; this._requests = []; this._subscriber = sub; this._retrier = new ExponentialRetry( @@ -194,7 +198,7 @@ export abstract class MessageQueue { } } - const {maxMessages, maxMilliseconds} = this._options; + const {maxMessages, maxMilliseconds, maxBytes} = this._options; const responsePromise = defer(); this._requests.push({ @@ -208,8 +212,9 @@ export abstract class MessageQueue { }); this.numPendingRequests++; this.numInFlightRequests++; + this.bytes += Buffer.byteLength(ackId, 'utf8'); - if (this._requests.length >= maxMessages!) { + if (this._requests.length >= maxMessages! || this.bytes >= maxBytes!) { this.flush(); } else if (!this._timer) { this._timer = setTimeout(() => this.flush(), maxMilliseconds!); @@ -273,6 +278,7 @@ export abstract class MessageQueue { const deferred = this._onFlush; this._requests = []; + this.bytes = 0; this.numPendingRequests -= batchSize; delete this._onFlush; @@ -339,7 +345,11 @@ export abstract class MessageQueue { * @private */ setOptions(options: BatchOptions): void { - const defaults: BatchOptions = {maxMessages: 3000, maxMilliseconds: 100}; + const defaults: BatchOptions = { + maxMessages: 3000, + maxMilliseconds: 100, + maxBytes: 512000, + }; this._options = Object.assign(defaults, options); } From c81d79a1acf38f5574f8d1584723c28ddcc86c57 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Fri, 6 Sep 2024 15:47:45 -0400 Subject: [PATCH 2/9] feat: update maxBytes changes to use a fixed max size --- src/message-queues.ts | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/message-queues.ts b/src/message-queues.ts index b7e35f163..828228e0a 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -59,9 +59,13 @@ export interface BatchOptions { callOptions?: CallOptions; maxMessages?: number; maxMilliseconds?: number; - maxBytes?: number; } +// This is the maximum number of bytes we will send for a batch of +// ack/modack messages. The server itself has a maximum of 512KiB, so +// we just pull back a little from that in case of unknown fenceposts. +const MAX_BATCH_BYTES = 510 * 1024 * 1024; + /** * Error class used to signal a batch failure. * @@ -100,7 +104,6 @@ export class BatchError extends DebugMessage { * @property {number} [maxMilliseconds=100] Maximum duration to wait before * sending a batch. Batches can be sent earlier if the maxMessages option * is met before the configured duration has passed. - * @property {number} [maxBytes=512000] Maximum number of bytes to allow in */ /** * Class for buffering ack/modAck requests. @@ -198,7 +201,7 @@ export abstract class MessageQueue { } } - const {maxMessages, maxMilliseconds, maxBytes} = this._options; + const {maxMessages, maxMilliseconds} = this._options; const responsePromise = defer(); this._requests.push({ @@ -212,9 +215,12 @@ export abstract class MessageQueue { }); this.numPendingRequests++; this.numInFlightRequests++; - this.bytes += Buffer.byteLength(ackId, 'utf8'); + this.bytes += Buffer.byteLength(message.ackId, 'utf8'); - if (this._requests.length >= maxMessages! || this.bytes >= maxBytes!) { + if ( + this._requests.length >= maxMessages! || + this.bytes >= MAX_BATCH_BYTES + ) { this.flush(); } else if (!this._timer) { this._timer = setTimeout(() => this.flush(), maxMilliseconds!); @@ -348,7 +354,6 @@ export abstract class MessageQueue { const defaults: BatchOptions = { maxMessages: 3000, maxMilliseconds: 100, - maxBytes: 512000, }; this._options = Object.assign(defaults, options); From eccca4bed242ce8883b33138d797f3b18a0f337c Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Fri, 6 Sep 2024 15:53:18 -0400 Subject: [PATCH 3/9] chore: remove "discount polyfill" for allSettled --- test/message-queues.ts | 42 ++++++------------------------------------ 1 file changed, 6 insertions(+), 36 deletions(-) diff --git a/test/message-queues.ts b/test/message-queues.ts index 98e514f98..9505d0521 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -99,36 +99,6 @@ class ModAckQueue extends messageTypes.ModAckQueue { } } -// This discount polyfill for Promise.allSettled can be removed after we drop Node 12. -type AllSettledResult = { - status: 'fulfilled' | 'rejected'; - value?: T; - reason?: U; -}; -function allSettled( - proms: Promise[] -): Promise[]> { - const checkedProms = proms.map((r: Promise) => - r - .then( - (value: T) => - ({ - status: 'fulfilled', - value, - }) as AllSettledResult - ) - .catch( - (error: U) => - ({ - status: 'rejected', - reason: error, - }) as AllSettledResult - ) - ); - - return Promise.all(checkedProms); -} - describe('MessageQueues', () => { const sandbox = sinon.createSandbox(); @@ -498,7 +468,7 @@ describe('MessageQueues', () => { (r: messageTypes.QueuedMessage) => r.responsePromise!.promise ); await ackQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); const oneSuccess = {status: 'fulfilled', value: undefined}; assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]); }); @@ -522,7 +492,7 @@ describe('MessageQueues', () => { proms.shift(); await ackQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); assert.strictEqual(results[0].status, 'rejected'); assert.strictEqual(results[0].reason?.errorCode, 'OTHER'); assert.strictEqual(results[1].status, 'rejected'); @@ -552,7 +522,7 @@ describe('MessageQueues', () => { ]; await ackQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); assert.strictEqual(results[0].status, 'rejected'); assert.strictEqual(results[0].reason?.errorCode, 'INVALID'); @@ -789,7 +759,7 @@ describe('MessageQueues', () => { (r: messageTypes.QueuedMessage) => r.responsePromise!.promise ); await modAckQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); const oneSuccess = {status: 'fulfilled', value: undefined}; assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]); }); @@ -815,7 +785,7 @@ describe('MessageQueues', () => { proms.shift(); await modAckQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); assert.strictEqual(results[0].status, 'rejected'); assert.strictEqual(results[0].reason?.errorCode, 'OTHER'); assert.strictEqual(results[1].status, 'rejected'); @@ -847,7 +817,7 @@ describe('MessageQueues', () => { ]; await modAckQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); assert.strictEqual(results[0].status, 'rejected'); assert.strictEqual(results[0].reason?.errorCode, 'INVALID'); From 96879286ce803eabb788645eede6f2a1520182f6 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Fri, 6 Sep 2024 16:12:02 -0400 Subject: [PATCH 4/9] tests: add unit tests for maxBytes handling --- src/message-queues.ts | 2 +- test/message-queues.ts | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/message-queues.ts b/src/message-queues.ts index 828228e0a..b660012a4 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -64,7 +64,7 @@ export interface BatchOptions { // This is the maximum number of bytes we will send for a batch of // ack/modack messages. The server itself has a maximum of 512KiB, so // we just pull back a little from that in case of unknown fenceposts. -const MAX_BATCH_BYTES = 510 * 1024 * 1024; +export const MAX_BATCH_BYTES = 510 * 1024 * 1024; /** * Error class used to signal a batch failure. diff --git a/test/message-queues.ts b/test/message-queues.ts index 9505d0521..b5d42b768 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -24,7 +24,7 @@ import defer = require('p-defer'); import * as messageTypes from '../src/message-queues'; import {BatchError} from '../src/message-queues'; -import {AckError, Message, Subscriber} from '../src/subscriber'; +import {Message, Subscriber} from '../src/subscriber'; import {DebugMessage} from '../src/debug'; class FakeClient { @@ -160,6 +160,15 @@ describe('MessageQueues', () => { assert.strictEqual(stub.callCount, 1); }); + it('should flush the queue if at byte capacity', () => { + const stub = sandbox.stub(messageQueue, 'flush'); + + messageQueue.bytes = messageTypes.MAX_BATCH_BYTES - 10; + messageQueue.add(new FakeMessage() as Message); + + assert.strictEqual(stub.callCount, 1); + }); + it('should schedule a flush if needed', () => { const clock = sandbox.useFakeTimers(); const stub = sandbox.stub(messageQueue, 'flush'); @@ -214,6 +223,13 @@ describe('MessageQueues', () => { assert.strictEqual(messageQueue.numPendingRequests, 0); }); + it('should remove the bytes of messages from the queue', () => { + messageQueue.add(new FakeMessage() as Message); + messageQueue.flush(); + + assert.strictEqual(messageQueue.bytes, 0); + }); + it('should send the batch', () => { const message = new FakeMessage(); const deadline = 10; From 37be5b97571cb14e20cd806246327e1a00927b93 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 11 Sep 2024 16:59:51 -0400 Subject: [PATCH 5/9] fix: do the maxMessages and maxBytes checks _before_ adding --- src/message-queues.ts | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/message-queues.ts b/src/message-queues.ts index b660012a4..a08330a36 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -202,7 +202,18 @@ export abstract class MessageQueue { } const {maxMessages, maxMilliseconds} = this._options; + const size = Buffer.byteLength(message.ackId, 'utf8'); + // If we will go over maxMessages or MAX_BATCH_BYTES by adding this + // message, flush first. (maxMilliseconds is handled by timers.) + if ( + this._requests.length + 1 >= maxMessages! || + this.bytes + size >= MAX_BATCH_BYTES + ) { + this.flush(); + } + + // Add the message to the current batch. const responsePromise = defer(); this._requests.push({ message: { @@ -215,14 +226,10 @@ export abstract class MessageQueue { }); this.numPendingRequests++; this.numInFlightRequests++; - this.bytes += Buffer.byteLength(message.ackId, 'utf8'); + this.bytes += size; - if ( - this._requests.length >= maxMessages! || - this.bytes >= MAX_BATCH_BYTES - ) { - this.flush(); - } else if (!this._timer) { + // Ensure that we are counting toward maxMilliseconds by timer. + if (!this._timer) { this._timer = setTimeout(() => this.flush(), maxMilliseconds!); } From 319382233340f56d0b565f1c58157246389d774f Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 11 Sep 2024 17:14:01 -0400 Subject: [PATCH 6/9] fix: roll back nise package to avoid semver breakage --- package.json | 2 ++ 1 file changed, 2 insertions(+) diff --git a/package.json b/package.json index 9eabe553b..b305e320a 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,8 @@ "heap-js": "^2.2.0", "is-stream-ended": "^0.1.4", "lodash.snakecase": "^4.1.1", + "nise": "6.0.0", + "path-to-regexp": "6.2.2", "p-defer": "^3.0.0" }, "devDependencies": { From 1134ccb978836f424af0e56fd18bd553b2a82d87 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 11 Sep 2024 17:18:44 -0400 Subject: [PATCH 7/9] chore: move the nise version fix to devDependencies (oops) --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index b305e320a..366c63531 100644 --- a/package.json +++ b/package.json @@ -60,8 +60,6 @@ "heap-js": "^2.2.0", "is-stream-ended": "^0.1.4", "lodash.snakecase": "^4.1.1", - "nise": "6.0.0", - "path-to-regexp": "6.2.2", "p-defer": "^3.0.0" }, "devDependencies": { @@ -91,7 +89,9 @@ "mocha": "^9.2.2", "mv": "^2.1.1", "ncp": "^2.0.0", + "nise": "6.0.0", "null-loader": "^4.0.0", + "path-to-regexp": "6.2.2", "protobufjs": "^7.0.0", "proxyquire": "^2.0.0", "sinon": "^18.0.0", From 1132880557fe6aa8d1b219536cdfa2152b5109c7 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Thu, 12 Sep 2024 16:15:07 -0400 Subject: [PATCH 8/9] chore: pin rimraf in samples package to avoid typescript breakage in lru-cache --- samples/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/package.json b/samples/package.json index 416121e92..a171d31e7 100644 --- a/samples/package.json +++ b/samples/package.json @@ -40,7 +40,7 @@ "chai": "^4.2.0", "gts": "^5.0.0", "mocha": "^9.2.2", - "rimraf": "^5.0.0", + "rimraf": "5.0.9", "typescript": "^5.1.6", "uuid": "^9.0.0" } From 08e250b581b6473a7219d7a7c766601ee49598da Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Thu, 12 Sep 2024 16:31:43 -0400 Subject: [PATCH 9/9] chore: also pin lru-cache, remove unneeded rimraf types --- samples/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/package.json b/samples/package.json index a171d31e7..979b5bc96 100644 --- a/samples/package.json +++ b/samples/package.json @@ -36,9 +36,9 @@ "devDependencies": { "@google-cloud/bigquery": "^7.0.0", "@types/chai": "^4.2.16", - "@types/rimraf": "^4.0.0", "chai": "^4.2.0", "gts": "^5.0.0", + "lru-cache": "9.1.2", "mocha": "^9.2.2", "rimraf": "5.0.9", "typescript": "^5.1.6",