Skip to content

Commit

Permalink
samples: add optimisticSubscribe, plus a few small library changes to…
Browse files Browse the repository at this point in the history
… support it (#1973)

* samples: add optimisticSubscribe, plus a few small library changes to support it

* fix: simplify DebugMessage handling of StatusError

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* chore: fix copyright date

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
feywind and gcf-owl-bot[bot] authored Sep 24, 2024
1 parent c3abf92 commit 37b9f71
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree
| Listen For Messages With Custom Attributes | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithCustomAttributes.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithCustomAttributes.js,samples/README.md) |
| Subscribe with OpenTelemetry Tracing | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithOpenTelemetryTracing.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithOpenTelemetryTracing.js,samples/README.md) |
| Modify Push Configuration | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/modifyPushConfig.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/modifyPushConfig.js,samples/README.md) |
| Optimistic Subscribe | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/optimisticSubscribe.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/optimisticSubscribe.js,samples/README.md) |
| Publish Avro Records to a Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishAvroRecords.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishAvroRecords.js,samples/README.md) |
| Publish Batched Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishBatchedMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishBatchedMessages.js,samples/README.md) |
| Publish Message | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishMessage.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishMessage.js,samples/README.md) |
Expand Down
20 changes: 20 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ guides.
* [Listen For Messages With Custom Attributes](#listen-for-messages-with-custom-attributes)
* [Subscribe with OpenTelemetry Tracing](#subscribe-with-opentelemetry-tracing)
* [Modify Push Configuration](#modify-push-configuration)
* [Optimistic Subscribe](#optimistic-subscribe)
* [Publish Avro Records to a Topic](#publish-avro-records-to-a-topic)
* [Publish Batched Messages](#publish-batched-messages)
* [Publish Message](#publish-message)
Expand Down Expand Up @@ -901,6 +902,25 @@ __Usage:__



### Optimistic Subscribe

Listens for messages from a subscription, creating it if needed.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/optimisticSubscribe.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/optimisticSubscribe.js,samples/README.md)

__Usage:__


`node optimisticSubscribe.js <subscription-name-or-id> <topic-name-or-id> [timeout-in-seconds]`


-----




### Publish Avro Records to a Topic

Publishes a record in Avro to a topic with a schema.
Expand Down
3 changes: 2 additions & 1 deletion samples/listenForMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const {PubSub} = require('@google-cloud/pubsub');
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId, timeout) {
// References an existing subscription
// References an existing subscription; if you are unsure if the
// subscription will exist, try the optimisticSubscribe sample.
const subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
Expand Down
101 changes: 101 additions & 0 deletions samples/optimisticSubscribe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to perform basic operations on
* subscriptions with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Optimistic Subscribe
// description: Listens for messages from a subscription, creating it if needed.
// usage: node optimisticSubscribe.js <subscription-name-or-id> <topic-name-or-id> [timeout-in-seconds]

// [START pubsub_optimistic_subscribe]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
// Try using an existing subscription
let subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;

// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Set an error handler so that we're notified if the subscription doesn't
// already exist.
subscription.on('error', async e => {
// Resource Not Found
if (e.code === 5) {
console.log('Subscription not found, creating it');
await pubSubClient.createSubscription(
topicNameOrId,
subscriptionNameOrId
);

// Refresh our subscriber object and re-attach the message handler.
subscription = pubSubClient.subscription(subscriptionNameOrId);
subscription.on('message', messageHandler);
}
});

// Listen for new messages until timeout is hit; this will attempt to
// open the actual subscriber streams. If it fails, the error handler
// above will be called.
subscription.on('message', messageHandler);

// Wait a while for the subscription to run. (Part of the sample only.)
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// [END pubsub_optimistic_subscribe]

function main(
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
timeout = 60
) {
timeout = Number(timeout);
optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout);
}

main(...process.argv.slice(2));
3 changes: 2 additions & 1 deletion samples/typescript/listenForMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import {PubSub, Message} from '@google-cloud/pubsub';
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId: string, timeout: number) {
// References an existing subscription
// References an existing subscription; if you are unsure if the
// subscription will exist, try the optimisticSubscribe sample.
const subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
Expand Down
101 changes: 101 additions & 0 deletions samples/typescript/optimisticSubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This sample demonstrates how to perform basic operations on
* subscriptions with the Google Cloud Pub/Sub API.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Optimistic Subscribe
// description: Listens for messages from a subscription, creating it if needed.
// usage: node optimisticSubscribe.js <subscription-name-or-id> <topic-name-or-id> [timeout-in-seconds]

// [START pubsub_optimistic_subscribe]
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
subscriptionNameOrId: string,
topicNameOrId: string,
timeout: number
) {
// Try using an existing subscription
let subscription = pubSubClient.subscription(subscriptionNameOrId);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = (message: Message) => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;

// "Ack" (acknowledge receipt of) the message
message.ack();
};

// Set an error handler so that we're notified if the subscription doesn't
// already exist.
subscription.on('error', async (e: StatusError) => {
// Resource Not Found
if (e.code === 5) {
console.log('Subscription not found, creating it');
await pubSubClient.createSubscription(
topicNameOrId,
subscriptionNameOrId
);

// Refresh our subscriber object and re-attach the message handler.
subscription = pubSubClient.subscription(subscriptionNameOrId);
subscription.on('message', messageHandler);
}
});

// Listen for new messages until timeout is hit; this will attempt to
// open the actual subscriber streams. If it fails, the error handler
// above will be called.
subscription.on('message', messageHandler);

// Wait a while for the subscription to run. (Part of the sample only.)
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
// [END pubsub_optimistic_subscribe]

function main(
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
timeout = 60
) {
timeout = Number(timeout);
optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout);
}

main(...process.argv.slice(2));
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export {
SeekResponse,
Snapshot,
} from './snapshot';
export {Message, SubscriberOptions} from './subscriber';
export {Message, StatusError, SubscriberOptions} from './subscriber';
export {
Schema,
CreateSchemaResponse,
Expand Down
8 changes: 6 additions & 2 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,14 @@ export class MessageStream extends PassThrough {
private _onEnd(index: number, status: grpc.StatusObject): void {
this._removeStream(index);

const statusError = new StatusError(status);

if (PullRetry.retry(status)) {
this.emit(
'debug',
new DebugMessage(
`Subscriber stream ${index} has ended with status ${status.code}; will be retried.`
`Subscriber stream ${index} has ended with status ${status.code}; will be retried.`,
statusError
)
);
if (PullRetry.resetFailures(status)) {
Expand All @@ -401,7 +404,8 @@ export class MessageStream extends PassThrough {
this.emit(
'debug',
new DebugMessage(
`Subscriber stream ${index} has ended with status ${status.code}; will not be retried.`
`Subscriber stream ${index} has ended with status ${status.code}; will not be retried.`,
statusError
)
);

Expand Down
2 changes: 2 additions & 0 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import * as tracing from './telemetry-tracing';
import {Duration} from './temporal';
import {EventEmitter} from 'events';

export {StatusError} from './message-stream';

export type PullResponse = google.pubsub.v1.IStreamingPullResponse;
export type SubscriptionProperties =
google.pubsub.v1.StreamingPullResponse.ISubscriptionProperties;
Expand Down

0 comments on commit 37b9f71

Please sign in to comment.