From 0dfc9e64c576c53418ac2cb3b22ad05df8e1c655 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:08:22 -0400 Subject: [PATCH 1/2] feat: add async helper for ordered async processing of messages --- src/async-helper.ts | 103 +++++++++++++++++++++++++++++++++++++++++++ test/async-helper.ts | 62 ++++++++++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 src/async-helper.ts create mode 100644 test/async-helper.ts diff --git a/src/async-helper.ts b/src/async-helper.ts new file mode 100644 index 000000000..c6b96a6d9 --- /dev/null +++ b/src/async-helper.ts @@ -0,0 +1,103 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {Message} from './subscriber'; + +/** + * Represents an async function that can process a message and return + * a Promise for the function's completion. + */ +export interface UserHandler { + (message: Message): Promise; +} + +/** + * A handler for sub.on('message', x) that can be passed to .on() to do + * the async processing in this class. + */ +export interface StreamHandler { + (message: Message): void; +} + +/** + * When executing an async function, the Node runtime is really getting + * a Promise; these are guaranteed not to complete until another cycle of + * the event loop (at least the micro-loop). This can be problematic for + * ordered queue receipts, since the library doesn't take special pains to + * deliver the messages one at a time to async functions (they all push + * through in one go, which just results in a bunch of outstanding Promises). + * + * This helper acts as a funnel for the subscriber so that it can do the + * normal "push all the messages" as before, but each message will be + * allowed to process fully before the next message is delivered to user code. + * + * This should not be used for non-async handlers. It's not being built + * into the library itself, because it's difficult for us to second-guess + * what users might want in a given situation. This lets you decide explicitly. + * (Also, event handlers for on() are not something we have direct access + * to, so guessing whether it's a Promise and waiting on it would be difficult.) + * + * @example + * ``` + * const {PubSub, AsyncHelper} = require('@google-cloud/pubsub'); + * const pubsub = new PubSub(); + * + * const sub = pubsub.subscription('my-sub'); + * const helper = new AsyncHelper(async (m) => console.log(m)); + * sub.on('message', helper.handler); + * ``` + */ +export class AsyncHelper { + // The queue of messages we need to process in order. + queue: Message[] = []; + + // The "tail" Promise, i.e. the previous processing step (or resolve()). + tailPromise: Promise = Promise.resolve(); + + // The user's handler that will be called to take a message and get back a Promise. + userHandler: UserHandler; + + /** + * @param userHandler The async function we'll call for each message. + */ + constructor(userHandler: UserHandler) { + this.userHandler = userHandler; + } + + /** + * A handler function that you can pass to .on('message'). + */ + get handler(): StreamHandler { + return this.streamHandler.bind(this); + } + + // Pushes new messages on the queue and starts (or chains) a + // processing step. + private streamHandler(message: Message): void { + this.queue.push(message); + + // This should be either Promise.resolve() (instant callback) + // or the previous work item the user's function returned. + this.tailPromise.then(() => { + const message = this.queue.shift(); + if (!message) { + // No message -> go back to resolve() to signal ready. + this.tailPromise = Promise.resolve(); + } else { + // Message -> chain to the previous tail and replace it. + this.tailPromise = this.userHandler(message); + } + }); + } +} diff --git a/test/async-helper.ts b/test/async-helper.ts new file mode 100644 index 000000000..49ceabe2e --- /dev/null +++ b/test/async-helper.ts @@ -0,0 +1,62 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {describe, it} from 'mocha'; +import * as assert from 'assert'; +import * as ah from '../src/async-helper'; +import {Message} from '../src/subscriber'; + +class FakeMessage { + constructor(public id: string) {} +} + +function fakeMessage(id: string) { + return new FakeMessage(id) as unknown as Message; +} + +describe('async-helper', () => { + it('processes new messages', async () => { + const helper = new ah.AsyncHelper(async (m: Message) => { + assert.strictEqual(m.id, '1'); + }); + const handler = helper.handler; + const msg = fakeMessage('1'); + handler(msg); + }); + + it('processes multiple messages in order', async () => { + const items = ['1', '2', '3']; + const helper = new ah.AsyncHelper(async (m: Message) => { + assert.strictEqual(m.id, items.shift()); + }); + const handler = helper.handler; + handler(fakeMessage('1')); + handler(fakeMessage('2')); + handler(fakeMessage('3')); + }); + + it('processes unevenly timed messages in order', async () => { + const items = ['1', '2', '3']; + const helper = new ah.AsyncHelper(async (m: Message) => { + if (m.id === '2') { + await new Promise(r => setTimeout(r, 100)); + } + assert.strictEqual(m.id, items.shift()); + }); + const handler = helper.handler; + handler(fakeMessage('1')); + handler(fakeMessage('2')); + handler(fakeMessage('3')); + }); +}); From f152573a9c869382634ab17d0783f4c715b34279 Mon Sep 17 00:00:00 2001 From: feywind <57276408+feywind@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:11:06 -0400 Subject: [PATCH 2/2] fix: export types from index --- src/async-helper.ts | 10 +++++----- src/index.ts | 5 +++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/async-helper.ts b/src/async-helper.ts index c6b96a6d9..5ffe87284 100644 --- a/src/async-helper.ts +++ b/src/async-helper.ts @@ -18,7 +18,7 @@ import {Message} from './subscriber'; * Represents an async function that can process a message and return * a Promise for the function's completion. */ -export interface UserHandler { +export interface AsyncMessageHandler { (message: Message): Promise; } @@ -26,7 +26,7 @@ export interface UserHandler { * A handler for sub.on('message', x) that can be passed to .on() to do * the async processing in this class. */ -export interface StreamHandler { +export interface StreamMessageHandler { (message: Message): void; } @@ -66,19 +66,19 @@ export class AsyncHelper { tailPromise: Promise = Promise.resolve(); // The user's handler that will be called to take a message and get back a Promise. - userHandler: UserHandler; + userHandler: AsyncMessageHandler; /** * @param userHandler The async function we'll call for each message. */ - constructor(userHandler: UserHandler) { + constructor(userHandler: AsyncMessageHandler) { this.userHandler = userHandler; } /** * A handler function that you can pass to .on('message'). */ - get handler(): StreamHandler { + get handler(): StreamMessageHandler { return this.streamHandler.bind(this); } diff --git a/src/index.ts b/src/index.ts index 15b5a1d32..e46f400f5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -175,6 +175,11 @@ export { } from './topic'; export {Duration, TotalOfUnit, DurationLike} from './temporal'; export {DebugMessage} from './debug'; +export { + AsyncHelper, + AsyncMessageHandler, + StreamMessageHandler, +} from './async-helper'; if (process.env.DEBUG_GRPC) { console.info('gRPC logging set to verbose');