diff --git a/package.json b/package.json index 9eabe553b..366c63531 100644 --- a/package.json +++ b/package.json @@ -89,7 +89,9 @@ "mocha": "^9.2.2", "mv": "^2.1.1", "ncp": "^2.0.0", + "nise": "6.0.0", "null-loader": "^4.0.0", + "path-to-regexp": "6.2.2", "protobufjs": "^7.0.0", "proxyquire": "^2.0.0", "sinon": "^18.0.0", diff --git a/samples/package.json b/samples/package.json index 416121e92..979b5bc96 100644 --- a/samples/package.json +++ b/samples/package.json @@ -36,11 +36,11 @@ "devDependencies": { "@google-cloud/bigquery": "^7.0.0", "@types/chai": "^4.2.16", - "@types/rimraf": "^4.0.0", "chai": "^4.2.0", "gts": "^5.0.0", + "lru-cache": "9.1.2", "mocha": "^9.2.2", - "rimraf": "^5.0.0", + "rimraf": "5.0.9", "typescript": "^5.1.6", "uuid": "^9.0.0" } diff --git a/src/message-queues.ts b/src/message-queues.ts index 6dfefc2cf..a08330a36 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -61,6 +61,11 @@ export interface BatchOptions { maxMilliseconds?: number; } +// This is the maximum number of bytes we will send for a batch of +// ack/modack messages. The server itself has a maximum of 512KiB, so +// we just pull back a little from that in case of unknown fenceposts. +export const MAX_BATCH_BYTES = 510 * 1024 * 1024; + /** * Error class used to signal a batch failure. * @@ -113,6 +118,7 @@ export abstract class MessageQueue { numPendingRequests: number; numInFlightRequests: number; numInRetryRequests: number; + bytes: number; protected _onFlush?: defer.DeferredPromise; protected _onDrain?: defer.DeferredPromise; protected _options!: BatchOptions; @@ -127,6 +133,7 @@ export abstract class MessageQueue { this.numPendingRequests = 0; this.numInFlightRequests = 0; this.numInRetryRequests = 0; + this.bytes = 0; this._requests = []; this._subscriber = sub; this._retrier = new ExponentialRetry( @@ -195,7 +202,18 @@ export abstract class MessageQueue { } const {maxMessages, maxMilliseconds} = this._options; + const size = Buffer.byteLength(message.ackId, 'utf8'); + // If we will go over maxMessages or MAX_BATCH_BYTES by adding this + // message, flush first. (maxMilliseconds is handled by timers.) + if ( + this._requests.length + 1 >= maxMessages! || + this.bytes + size >= MAX_BATCH_BYTES + ) { + this.flush(); + } + + // Add the message to the current batch. const responsePromise = defer(); this._requests.push({ message: { @@ -208,10 +226,10 @@ export abstract class MessageQueue { }); this.numPendingRequests++; this.numInFlightRequests++; + this.bytes += size; - if (this._requests.length >= maxMessages!) { - this.flush(); - } else if (!this._timer) { + // Ensure that we are counting toward maxMilliseconds by timer. + if (!this._timer) { this._timer = setTimeout(() => this.flush(), maxMilliseconds!); } @@ -273,6 +291,7 @@ export abstract class MessageQueue { const deferred = this._onFlush; this._requests = []; + this.bytes = 0; this.numPendingRequests -= batchSize; delete this._onFlush; @@ -339,7 +358,10 @@ export abstract class MessageQueue { * @private */ setOptions(options: BatchOptions): void { - const defaults: BatchOptions = {maxMessages: 3000, maxMilliseconds: 100}; + const defaults: BatchOptions = { + maxMessages: 3000, + maxMilliseconds: 100, + }; this._options = Object.assign(defaults, options); } diff --git a/test/message-queues.ts b/test/message-queues.ts index 98e514f98..b5d42b768 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -24,7 +24,7 @@ import defer = require('p-defer'); import * as messageTypes from '../src/message-queues'; import {BatchError} from '../src/message-queues'; -import {AckError, Message, Subscriber} from '../src/subscriber'; +import {Message, Subscriber} from '../src/subscriber'; import {DebugMessage} from '../src/debug'; class FakeClient { @@ -99,36 +99,6 @@ class ModAckQueue extends messageTypes.ModAckQueue { } } -// This discount polyfill for Promise.allSettled can be removed after we drop Node 12. -type AllSettledResult = { - status: 'fulfilled' | 'rejected'; - value?: T; - reason?: U; -}; -function allSettled( - proms: Promise[] -): Promise[]> { - const checkedProms = proms.map((r: Promise) => - r - .then( - (value: T) => - ({ - status: 'fulfilled', - value, - }) as AllSettledResult - ) - .catch( - (error: U) => - ({ - status: 'rejected', - reason: error, - }) as AllSettledResult - ) - ); - - return Promise.all(checkedProms); -} - describe('MessageQueues', () => { const sandbox = sinon.createSandbox(); @@ -190,6 +160,15 @@ describe('MessageQueues', () => { assert.strictEqual(stub.callCount, 1); }); + it('should flush the queue if at byte capacity', () => { + const stub = sandbox.stub(messageQueue, 'flush'); + + messageQueue.bytes = messageTypes.MAX_BATCH_BYTES - 10; + messageQueue.add(new FakeMessage() as Message); + + assert.strictEqual(stub.callCount, 1); + }); + it('should schedule a flush if needed', () => { const clock = sandbox.useFakeTimers(); const stub = sandbox.stub(messageQueue, 'flush'); @@ -244,6 +223,13 @@ describe('MessageQueues', () => { assert.strictEqual(messageQueue.numPendingRequests, 0); }); + it('should remove the bytes of messages from the queue', () => { + messageQueue.add(new FakeMessage() as Message); + messageQueue.flush(); + + assert.strictEqual(messageQueue.bytes, 0); + }); + it('should send the batch', () => { const message = new FakeMessage(); const deadline = 10; @@ -498,7 +484,7 @@ describe('MessageQueues', () => { (r: messageTypes.QueuedMessage) => r.responsePromise!.promise ); await ackQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); const oneSuccess = {status: 'fulfilled', value: undefined}; assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]); }); @@ -522,7 +508,7 @@ describe('MessageQueues', () => { proms.shift(); await ackQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); assert.strictEqual(results[0].status, 'rejected'); assert.strictEqual(results[0].reason?.errorCode, 'OTHER'); assert.strictEqual(results[1].status, 'rejected'); @@ -552,7 +538,7 @@ describe('MessageQueues', () => { ]; await ackQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); assert.strictEqual(results[0].status, 'rejected'); assert.strictEqual(results[0].reason?.errorCode, 'INVALID'); @@ -789,7 +775,7 @@ describe('MessageQueues', () => { (r: messageTypes.QueuedMessage) => r.responsePromise!.promise ); await modAckQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); const oneSuccess = {status: 'fulfilled', value: undefined}; assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]); }); @@ -815,7 +801,7 @@ describe('MessageQueues', () => { proms.shift(); await modAckQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); assert.strictEqual(results[0].status, 'rejected'); assert.strictEqual(results[0].reason?.errorCode, 'OTHER'); assert.strictEqual(results[1].status, 'rejected'); @@ -847,7 +833,7 @@ describe('MessageQueues', () => { ]; await modAckQueue.flush(); - const results = await allSettled(proms); + const results = await Promise.allSettled(proms); assert.strictEqual(results[0].status, 'rejected'); assert.strictEqual(results[0].reason?.errorCode, 'INVALID');