From 37b9f71e75df6122789e6a2ec27487d3253646f0 Mon Sep 17 00:00:00 2001 From: Megan Potter <57276408+feywind@users.noreply.github.com> Date: Tue, 24 Sep 2024 19:11:03 -0400 Subject: [PATCH] samples: add optimisticSubscribe, plus a few small library changes to support it (#1973) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * samples: add optimisticSubscribe, plus a few small library changes to support it * fix: simplify DebugMessage handling of StatusError * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * chore: fix copyright date * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- README.md | 1 + samples/README.md | 20 +++++ samples/listenForMessages.js | 3 +- samples/optimisticSubscribe.js | 101 ++++++++++++++++++++++ samples/typescript/listenForMessages.ts | 3 +- samples/typescript/optimisticSubscribe.ts | 101 ++++++++++++++++++++++ src/index.ts | 2 +- src/message-stream.ts | 8 +- src/subscriber.ts | 2 + 9 files changed, 236 insertions(+), 5 deletions(-) create mode 100644 samples/optimisticSubscribe.js create mode 100644 samples/typescript/optimisticSubscribe.ts diff --git a/README.md b/README.md index 94ac16b3d..cafebdefe 100644 --- a/README.md +++ b/README.md @@ -165,6 +165,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree | Listen For Messages With Custom Attributes | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithCustomAttributes.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithCustomAttributes.js,samples/README.md) | | Subscribe with OpenTelemetry Tracing | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithOpenTelemetryTracing.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithOpenTelemetryTracing.js,samples/README.md) | | Modify Push Configuration | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/modifyPushConfig.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/modifyPushConfig.js,samples/README.md) | +| Optimistic Subscribe | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/optimisticSubscribe.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/optimisticSubscribe.js,samples/README.md) | | Publish Avro Records to a Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishAvroRecords.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishAvroRecords.js,samples/README.md) | | Publish Batched Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishBatchedMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishBatchedMessages.js,samples/README.md) | | Publish Message | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishMessage.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishMessage.js,samples/README.md) | diff --git a/samples/README.md b/samples/README.md index 33710a253..c3e17d913 100644 --- a/samples/README.md +++ b/samples/README.md @@ -62,6 +62,7 @@ guides. * [Listen For Messages With Custom Attributes](#listen-for-messages-with-custom-attributes) * [Subscribe with OpenTelemetry Tracing](#subscribe-with-opentelemetry-tracing) * [Modify Push Configuration](#modify-push-configuration) + * [Optimistic Subscribe](#optimistic-subscribe) * [Publish Avro Records to a Topic](#publish-avro-records-to-a-topic) * [Publish Batched Messages](#publish-batched-messages) * [Publish Message](#publish-message) @@ -901,6 +902,25 @@ __Usage:__ +### Optimistic Subscribe + +Listens for messages from a subscription, creating it if needed. + +View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/optimisticSubscribe.js). + +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/optimisticSubscribe.js,samples/README.md) + +__Usage:__ + + +`node optimisticSubscribe.js [timeout-in-seconds]` + + +----- + + + + ### Publish Avro Records to a Topic Publishes a record in Avro to a topic with a schema. diff --git a/samples/listenForMessages.js b/samples/listenForMessages.js index 3f3f35547..ebfdbbeb7 100644 --- a/samples/listenForMessages.js +++ b/samples/listenForMessages.js @@ -44,7 +44,8 @@ const {PubSub} = require('@google-cloud/pubsub'); const pubSubClient = new PubSub(); function listenForMessages(subscriptionNameOrId, timeout) { - // References an existing subscription + // References an existing subscription; if you are unsure if the + // subscription will exist, try the optimisticSubscribe sample. const subscription = pubSubClient.subscription(subscriptionNameOrId); // Create an event handler to handle messages diff --git a/samples/optimisticSubscribe.js b/samples/optimisticSubscribe.js new file mode 100644 index 000000000..758410d60 --- /dev/null +++ b/samples/optimisticSubscribe.js @@ -0,0 +1,101 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is a generated sample, using the typeless sample bot. Please +// look for the source TypeScript sample (.ts) for modifications. +'use strict'; + +/** + * This sample demonstrates how to perform basic operations on + * subscriptions with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Optimistic Subscribe +// description: Listens for messages from a subscription, creating it if needed. +// usage: node optimisticSubscribe.js [timeout-in-seconds] + +// [START pubsub_optimistic_subscribe] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; +// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; +// const timeout = 60; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) { + // Try using an existing subscription + let subscription = pubSubClient.subscription(subscriptionNameOrId); + + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = message => { + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${message.data}`); + console.log(`\tAttributes: ${message.attributes}`); + messageCount += 1; + + // "Ack" (acknowledge receipt of) the message + message.ack(); + }; + + // Set an error handler so that we're notified if the subscription doesn't + // already exist. + subscription.on('error', async e => { + // Resource Not Found + if (e.code === 5) { + console.log('Subscription not found, creating it'); + await pubSubClient.createSubscription( + topicNameOrId, + subscriptionNameOrId + ); + + // Refresh our subscriber object and re-attach the message handler. + subscription = pubSubClient.subscription(subscriptionNameOrId); + subscription.on('message', messageHandler); + } + }); + + // Listen for new messages until timeout is hit; this will attempt to + // open the actual subscriber streams. If it fails, the error handler + // above will be called. + subscription.on('message', messageHandler); + + // Wait a while for the subscription to run. (Part of the sample only.) + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); +} +// [END pubsub_optimistic_subscribe] + +function main( + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + timeout = 60 +) { + timeout = Number(timeout); + optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/listenForMessages.ts b/samples/typescript/listenForMessages.ts index 72cb468df..582e04649 100644 --- a/samples/typescript/listenForMessages.ts +++ b/samples/typescript/listenForMessages.ts @@ -40,7 +40,8 @@ import {PubSub, Message} from '@google-cloud/pubsub'; const pubSubClient = new PubSub(); function listenForMessages(subscriptionNameOrId: string, timeout: number) { - // References an existing subscription + // References an existing subscription; if you are unsure if the + // subscription will exist, try the optimisticSubscribe sample. const subscription = pubSubClient.subscription(subscriptionNameOrId); // Create an event handler to handle messages diff --git a/samples/typescript/optimisticSubscribe.ts b/samples/typescript/optimisticSubscribe.ts new file mode 100644 index 000000000..e1fb6aafa --- /dev/null +++ b/samples/typescript/optimisticSubscribe.ts @@ -0,0 +1,101 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to perform basic operations on + * subscriptions with the Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Optimistic Subscribe +// description: Listens for messages from a subscription, creating it if needed. +// usage: node optimisticSubscribe.js [timeout-in-seconds] + +// [START pubsub_optimistic_subscribe] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID'; +// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID'; +// const timeout = 60; + +// Imports the Google Cloud client library +import {PubSub, Message, StatusError} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubSubClient = new PubSub(); + +function optimisticSubscribe( + subscriptionNameOrId: string, + topicNameOrId: string, + timeout: number +) { + // Try using an existing subscription + let subscription = pubSubClient.subscription(subscriptionNameOrId); + + // Create an event handler to handle messages + let messageCount = 0; + const messageHandler = (message: Message) => { + console.log(`Received message ${message.id}:`); + console.log(`\tData: ${message.data}`); + console.log(`\tAttributes: ${message.attributes}`); + messageCount += 1; + + // "Ack" (acknowledge receipt of) the message + message.ack(); + }; + + // Set an error handler so that we're notified if the subscription doesn't + // already exist. + subscription.on('error', async (e: StatusError) => { + // Resource Not Found + if (e.code === 5) { + console.log('Subscription not found, creating it'); + await pubSubClient.createSubscription( + topicNameOrId, + subscriptionNameOrId + ); + + // Refresh our subscriber object and re-attach the message handler. + subscription = pubSubClient.subscription(subscriptionNameOrId); + subscription.on('message', messageHandler); + } + }); + + // Listen for new messages until timeout is hit; this will attempt to + // open the actual subscriber streams. If it fails, the error handler + // above will be called. + subscription.on('message', messageHandler); + + // Wait a while for the subscription to run. (Part of the sample only.) + setTimeout(() => { + subscription.removeListener('message', messageHandler); + console.log(`${messageCount} message(s) received.`); + }, timeout * 1000); +} +// [END pubsub_optimistic_subscribe] + +function main( + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + timeout = 60 +) { + timeout = Number(timeout); + optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout); +} + +main(...process.argv.slice(2)); diff --git a/src/index.ts b/src/index.ts index abc8afb08..15b5a1d32 100644 --- a/src/index.ts +++ b/src/index.ts @@ -124,7 +124,7 @@ export { SeekResponse, Snapshot, } from './snapshot'; -export {Message, SubscriberOptions} from './subscriber'; +export {Message, StatusError, SubscriberOptions} from './subscriber'; export { Schema, CreateSchemaResponse, diff --git a/src/message-stream.ts b/src/message-stream.ts index 80b714496..073658382 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -384,11 +384,14 @@ export class MessageStream extends PassThrough { private _onEnd(index: number, status: grpc.StatusObject): void { this._removeStream(index); + const statusError = new StatusError(status); + if (PullRetry.retry(status)) { this.emit( 'debug', new DebugMessage( - `Subscriber stream ${index} has ended with status ${status.code}; will be retried.` + `Subscriber stream ${index} has ended with status ${status.code}; will be retried.`, + statusError ) ); if (PullRetry.resetFailures(status)) { @@ -401,7 +404,8 @@ export class MessageStream extends PassThrough { this.emit( 'debug', new DebugMessage( - `Subscriber stream ${index} has ended with status ${status.code}; will not be retried.` + `Subscriber stream ${index} has ended with status ${status.code}; will not be retried.`, + statusError ) ); diff --git a/src/subscriber.ts b/src/subscriber.ts index fd3dc30a4..c20b63be6 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -30,6 +30,8 @@ import * as tracing from './telemetry-tracing'; import {Duration} from './temporal'; import {EventEmitter} from 'events'; +export {StatusError} from './message-stream'; + export type PullResponse = google.pubsub.v1.IStreamingPullResponse; export type SubscriptionProperties = google.pubsub.v1.StreamingPullResponse.ISubscriptionProperties;