diff --git a/.gitignore b/.gitignore index d4f03a0df..661747bf1 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ /.nyc_output /docs/ /out/ -/build/ +**/build/ system-test/secrets.js system-test/*key.json *.lock diff --git a/README.md b/README.md index 735571360..94ac16b3d 100644 --- a/README.md +++ b/README.md @@ -163,8 +163,8 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree | Listen with exactly-once delivery | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForMessagesWithExactlyOnceDelivery.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForMessagesWithExactlyOnceDelivery.js,samples/README.md) | | Listen For Protobuf Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenForProtobufMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenForProtobufMessages.js,samples/README.md) | | Listen For Messages With Custom Attributes | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithCustomAttributes.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithCustomAttributes.js,samples/README.md) | +| Subscribe with OpenTelemetry Tracing | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithOpenTelemetryTracing.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithOpenTelemetryTracing.js,samples/README.md) | | Modify Push Configuration | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/modifyPushConfig.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/modifyPushConfig.js,samples/README.md) | -| OpenTelemetry Tracing | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/openTelemetryTracing.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/openTelemetryTracing.js,samples/README.md) | | Publish Avro Records to a Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishAvroRecords.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishAvroRecords.js,samples/README.md) | | Publish Batched Messages | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishBatchedMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishBatchedMessages.js,samples/README.md) | | Publish Message | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishMessage.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishMessage.js,samples/README.md) | @@ -172,6 +172,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree | Publish Ordered Message | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishOrderedMessage.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishOrderedMessage.js,samples/README.md) | | Publish Protobuf Messages to a Topic | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishProtobufMessages.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishProtobufMessages.js,samples/README.md) | | Publish with flow control | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishWithFlowControl.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishWithFlowControl.js,samples/README.md) | +| Publish with OpenTelemetry Tracing | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishWithOpenTelemetryTracing.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishWithOpenTelemetryTracing.js,samples/README.md) | | Publish With Retry Settings | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishWithRetrySettings.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishWithRetrySettings.js,samples/README.md) | | Quickstart | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/quickstart.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/quickstart.js,samples/README.md) | | Remove Dead Letter Policy | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/removeDeadLetterPolicy.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/removeDeadLetterPolicy.js,samples/README.md) | diff --git a/package.json b/package.json index 48eb81474..61816dec2 100644 --- a/package.json +++ b/package.json @@ -51,8 +51,8 @@ "@google-cloud/precise-date": "^4.0.0", "@google-cloud/projectify": "^4.0.0", "@google-cloud/promisify": "^4.0.0", - "@opentelemetry/api": "~1.8.0", - "@opentelemetry/semantic-conventions": "~1.21.0", + "@opentelemetry/api": "~1.9.0", + "@opentelemetry/semantic-conventions": "~1.25.1", "arrify": "^2.0.0", "extend": "^3.0.2", "google-auth-library": "^9.3.0", @@ -64,7 +64,8 @@ }, "devDependencies": { "@grpc/proto-loader": "^0.7.0", - "@opentelemetry/tracing": "^0.24.0", + "@opentelemetry/core": "^1.17.0", + "@opentelemetry/sdk-trace-base": "^1.17.0", "@types/duplexify": "^3.6.4", "@types/extend": "^3.0.0", "@types/lodash.snakecase": "^4.1.6", diff --git a/protos/protos.json b/protos/protos.json index 70e570e04..ad639ed00 100644 --- a/protos/protos.json +++ b/protos/protos.json @@ -1,4 +1,7 @@ { + "options": { + "syntax": "proto3" + }, "nested": { "google": { "nested": { diff --git a/samples/README.md b/samples/README.md index f4f59a37c..33710a253 100644 --- a/samples/README.md +++ b/samples/README.md @@ -60,8 +60,8 @@ guides. * [Listen with exactly-once delivery](#listen-with-exactly-once-delivery) * [Listen For Protobuf Messages](#listen-for-protobuf-messages) * [Listen For Messages With Custom Attributes](#listen-for-messages-with-custom-attributes) + * [Subscribe with OpenTelemetry Tracing](#subscribe-with-opentelemetry-tracing) * [Modify Push Configuration](#modify-push-configuration) - * [OpenTelemetry Tracing](#opentelemetry-tracing) * [Publish Avro Records to a Topic](#publish-avro-records-to-a-topic) * [Publish Batched Messages](#publish-batched-messages) * [Publish Message](#publish-message) @@ -69,6 +69,7 @@ guides. * [Publish Ordered Message](#publish-ordered-message) * [Publish Protobuf Messages to a Topic](#publish-protobuf-messages-to-a-topic) * [Publish with flow control](#publish-with-flow-control) + * [Publish with OpenTelemetry Tracing](#publish-with-opentelemetry-tracing) * [Publish With Retry Settings](#publish-with-retry-settings) * [Quickstart](#quickstart) * [Remove Dead Letter Policy](#remove-dead-letter-policy) @@ -862,18 +863,18 @@ __Usage:__ -### Modify Push Configuration +### Subscribe with OpenTelemetry Tracing -Modifies the configuration of an existing push subscription. +Demonstrates how to enable OpenTelemetry tracing in a subscriber. -View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/modifyPushConfig.js). +View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/listenWithOpenTelemetryTracing.js). -[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/modifyPushConfig.js,samples/README.md) +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/listenWithOpenTelemetryTracing.js,samples/README.md) __Usage:__ -`node modifyPushConfig.js ` +`node listenWithOpenTelemetryTracing.js ` ----- @@ -881,18 +882,18 @@ __Usage:__ -### OpenTelemetry Tracing +### Modify Push Configuration -Demonstrates how to enable OpenTelemetry tracing in a publisher or subscriber. +Modifies the configuration of an existing push subscription. -View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/openTelemetryTracing.js). +View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/modifyPushConfig.js). -[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/openTelemetryTracing.js,samples/README.md) +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/modifyPushConfig.js,samples/README.md) __Usage:__ -`node openTelemetryTracing.js ` +`node modifyPushConfig.js ` ----- @@ -1033,6 +1034,25 @@ __Usage:__ +### Publish with OpenTelemetry Tracing + +Demonstrates how to enable OpenTelemetry tracing in a publisher. + +View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/publishWithOpenTelemetryTracing.js). + +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/publishWithOpenTelemetryTracing.js,samples/README.md) + +__Usage:__ + + +`node openTelemetryTracing.js ` + + +----- + + + + ### Publish With Retry Settings Publishes a message to a topic with retry settings. diff --git a/samples/listenWithOpenTelemetryTracing.js b/samples/listenWithOpenTelemetryTracing.js new file mode 100644 index 000000000..55178dde5 --- /dev/null +++ b/samples/listenWithOpenTelemetryTracing.js @@ -0,0 +1,134 @@ +// Copyright 2020-2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is a generated sample, using the typeless sample bot. Please +// look for the source TypeScript sample (.ts) for modifications. +'use strict'; + +/** + * This sample demonstrates how to add OpenTelemetry tracing to the + * Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Subscribe with OpenTelemetry Tracing +// description: Demonstrates how to enable OpenTelemetry tracing in a subscriber. +// usage: node listenWithOpenTelemetryTracing.js + +const OTEL_TIMEOUT = 2; +const SUBSCRIBER_TIMEOUT = 10; + +// [START pubsub_subscribe_otel_tracing] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID'; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Imports the OpenTelemetry API +const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node'); +const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api'); +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + +// To output to the console for testing, use the ConsoleSpanExporter. +// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base'; + +// To output to Cloud Trace, import the OpenTelemetry bridge library. +const { + TraceExporter, +} = require('@google-cloud/opentelemetry-cloud-trace-exporter'); + +const {Resource} = require('@opentelemetry/resources'); +const { + SEMRESATTRS_SERVICE_NAME, +} = require('@opentelemetry/semantic-conventions'); + +// Enable the diagnostic logger for OpenTelemetry +diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG); + +// Log spans out to the console, for testing. +// const exporter = new ConsoleSpanExporter(); + +// Log spans out to Cloud Trace, for production. +const exporter = new TraceExporter(); + +// Build a tracer provider and a span processor to do +// something with the spans we're generating. +const provider = new NodeTracerProvider({ + resource: new Resource({ + [SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example', + }), +}); +const processor = new SimpleSpanProcessor(exporter); +provider.addSpanProcessor(processor); +provider.register(); + +// Creates a client; cache this for further use. +const pubSubClient = new PubSub({enableOpenTelemetryTracing: true}); + +async function subscriptionListen(subscriptionNameOrId) { + const subscriber = pubSubClient.subscription(subscriptionNameOrId); + + // Message handler for subscriber + const messageHandler = async message => { + console.log(`Message ${message.id} received.`); + message.ack(); + }; + + // Error handler for subscriber + const errorHandler = async error => { + console.log('Received error:', error); + }; + + // Listens for new messages from the topic + subscriber.on('message', messageHandler); + subscriber.on('error', errorHandler); + + // Ensures that all spans got flushed by the exporter. This function + // is in service to making sure that any buffered Pub/Sub messages + // and/or OpenTelemetry spans are properly flushed to the server + // side. In normal usage, you'd only need to do something like this + // on process shutdown. + async function shutdown() { + await subscriber.close(); + await processor.forceFlush(); + await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000)); + } + + // Wait a bit for the subscription to receive messages, then shut down + // gracefully. This is for the sample only; normally you would not need + // this delay. + await new Promise(r => + setTimeout(async () => { + subscriber.removeAllListeners(); + await shutdown(); + r(); + }, SUBSCRIBER_TIMEOUT * 1000) + ); +} +// [END pubsub_subscribe_otel_tracing] + +function main(subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID') { + subscriptionListen(subscriptionNameOrId).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/openTelemetryTracing.js b/samples/openTelemetryTracing.js deleted file mode 100644 index 1c0af995a..000000000 --- a/samples/openTelemetryTracing.js +++ /dev/null @@ -1,136 +0,0 @@ -/*! - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* eslint-disable n/no-process-exit */ - -/** - * This sample demonstrates how to add OpenTelemetry tracing to the - * Google Cloud Pub/Sub API. - * - * For more information, see the README.md under /pubsub and the documentation - * at https://cloud.google.com/pubsub/docs. - */ - -'use strict'; - -// sample-metadata: -// title: OpenTelemetry Tracing -// description: Demonstrates how to enable OpenTelemetry tracing in -// a publisher or subscriber. -// usage: node openTelemetryTracing.js - -const SUBSCRIBER_TIMEOUT = 10; - -function main( - topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', - subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', - data = 'Hello, world!' -) { - // [START opentelemetry_tracing] - /** - * TODO(developer): Uncomment these variables before running the sample. - */ - // const topicNameOrId = 'YOUR_TOPIC_OR_ID'; - // const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID'; - // const data = 'Hello, world!"; - - // Imports the Google Cloud client library - const {PubSub} = require('@google-cloud/pubsub'); - - // Imports the OpenTelemetry API - const opentelemetry = require('@opentelemetry/api'); - - // Imports the OpenTelemetry span handlers and exporter - const { - SimpleSpanProcessor, - BasicTracerProvider, - ConsoleSpanExporter, - } = require('@opentelemetry/tracing'); - - // Set up span processing and specify the console as the span exporter - const provider = new BasicTracerProvider(); - const exporter = new ConsoleSpanExporter(); - provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); - // Enable the diagnostic logger for Opentelemetry - opentelemetry.diag.setLogger( - new opentelemetry.DiagConsoleLogger(), - opentelemetry.DiagLogLevel.INFO - ); - - provider.register(); - - // OpenTelemetry tracing is an optional feature and can be enabled by setting - // enableOpenTelemetryTracing as a publisher or subscriber option - const enableOpenTelemetryTracing = { - enableOpenTelemetryTracing: true, - }; - - // Creates a client; cache this for further use - const pubSubClient = new PubSub(); - - async function publishMessage() { - // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject) - const dataBuffer = Buffer.from(data); - const messageId = await pubSubClient - .topic(topicNameOrId, enableOpenTelemetryTracing) - .publish(dataBuffer); - console.log(`Message ${messageId} published.`); - } - - async function subscriptionListen() { - // Message handler for subscriber - const messageHandler = message => { - console.log(`Message ${message.id} received.`); - message.ack(); - - // Ensure that all spans got flushed by the exporter - console.log('Cleaning up Opentelemetry exporter...'); - exporter.shutdown().then(() => { - // Cleaned up exporter. - process.exit(0); - }); - }; - - const errorHandler = error => { - console.log('Received error:', error); - - console.log('Cleaning up Opentelemetry exporter...'); - exporter.shutdown().then(() => { - // Cleaned up exporter. - process.exit(0); - }); - }; - - // Listens for new messages from the topic - pubSubClient - .subscription(subscriptionNameOrId, enableOpenTelemetryTracing) - .on('message', messageHandler); - pubSubClient - .subscription(subscriptionNameOrId, enableOpenTelemetryTracing) - .on('error', errorHandler); - - setTimeout(() => { - pubSubClient - .subscription(subscriptionNameOrId, enableOpenTelemetryTracing) - .removeAllListeners(); - }, SUBSCRIBER_TIMEOUT * 1000); - } - - publishMessage().then(subscriptionListen()); - // [END opentelemetry_tracing] -} - -main(...process.argv.slice(2)); diff --git a/samples/package.json b/samples/package.json index 5d1cb416c..9d2d5ff6f 100644 --- a/samples/package.json +++ b/samples/package.json @@ -21,10 +21,14 @@ "precompile": "npm run clean" }, "dependencies": { + "@google-cloud/opentelemetry-cloud-trace-exporter": "^2.0.0", "@google-cloud/pubsub": "^4.6.0", "@google-cloud/storage": "^7.11.1", "@opentelemetry/api": "^1.6.0", - "@opentelemetry/tracing": "^0.24.0", + "@opentelemetry/resources": "^1.17.0", + "@opentelemetry/sdk-trace-base": "^1.17.0", + "@opentelemetry/sdk-trace-node": "^1.17.0", + "@opentelemetry/semantic-conventions": "^1.17.0", "avro-js": "^1.11.3", "p-defer": "^3.0.0", "protobufjs": "~7.3.0" diff --git a/samples/publishWithOpenTelemetryTracing.js b/samples/publishWithOpenTelemetryTracing.js new file mode 100644 index 000000000..42b529739 --- /dev/null +++ b/samples/publishWithOpenTelemetryTracing.js @@ -0,0 +1,110 @@ +// Copyright 2020-2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is a generated sample, using the typeless sample bot. Please +// look for the source TypeScript sample (.ts) for modifications. +'use strict'; + +/** + * This sample demonstrates how to add OpenTelemetry tracing to the + * Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Publish with OpenTelemetry Tracing +// description: Demonstrates how to enable OpenTelemetry tracing in a publisher. +// usage: node openTelemetryTracing.js + +const OTEL_TIMEOUT = 2; + +// [START pubsub_publish_otel_tracing] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const topicNameOrId = 'YOUR_TOPIC_OR_ID'; +// const data = 'Hello, world!"; + +// Imports the Google Cloud client library +const {PubSub} = require('@google-cloud/pubsub'); + +// Imports the OpenTelemetry API +const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node'); +const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api'); +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + +// To output to the console for testing, use the ConsoleSpanExporter. +// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base'; + +// To output to Cloud Trace, import the OpenTelemetry bridge library. +const { + TraceExporter, +} = require('@google-cloud/opentelemetry-cloud-trace-exporter'); + +const {Resource} = require('@opentelemetry/resources'); +const { + SEMRESATTRS_SERVICE_NAME, +} = require('@opentelemetry/semantic-conventions'); + +// Enable the diagnostic logger for OpenTelemetry +diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG); + +// Log spans out to the console, for testing. +// const exporter = new ConsoleSpanExporter(); + +// Log spans out to Cloud Trace, for production. +const exporter = new TraceExporter(); + +// Build a tracer provider and a span processor to do +// something with the spans we're generating. +const provider = new NodeTracerProvider({ + resource: new Resource({ + [SEMRESATTRS_SERVICE_NAME]: 'otel publisher example', + }), +}); +const processor = new SimpleSpanProcessor(exporter); +provider.addSpanProcessor(processor); +provider.register(); + +// Creates a client; cache this for further use. +const pubSubClient = new PubSub({enableOpenTelemetryTracing: true}); + +async function publishMessage(topicNameOrId, data) { + // Publishes the message as a string, e.g. "Hello, world!" + // or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + const publisher = pubSubClient.topic(topicNameOrId); + const messageId = await publisher.publishMessage({data: dataBuffer}); + console.log(`Message ${messageId} published.`); + + // The rest of the sample is in service to making sure that any + // buffered Pub/Sub messages and/or OpenTelemetry spans are properly + // flushed to the server side. In normal usage, you'd only need to do + // something like this on process shutdown. + await publisher.flush(); + await processor.forceFlush(); + await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000)); +} +// [END pubsub_publish_otel_tracing] + +function main(topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', data = 'Hello, world!') { + publishMessage(topicNameOrId, data).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/system-test/openTelemetryTracing.test.ts b/samples/system-test/openTelemetryTracing.test.ts index 90d101ab5..14ff1c9d3 100644 --- a/samples/system-test/openTelemetryTracing.test.ts +++ b/samples/system-test/openTelemetryTracing.test.ts @@ -22,7 +22,7 @@ describe('openTelemetry', () => { const projectId = process.env.GCLOUD_PROJECT; const pubsub = new PubSub({projectId}); - const resources = new TestResources('quickstart'); + const resources = new TestResources('otel'); const topicName = resources.generateName('ot'); const subName = resources.generateName('ot'); @@ -43,12 +43,21 @@ describe('openTelemetry', () => { ); }); - it('should run the openTelemetryTracing sample', async () => { + it('should run the WithOpenTelemetryTracing samples', async () => { const stdout = execSync( - `${commandFor('openTelemetryTracing')} ${topicName} ${subName}` + `${commandFor('publishWithOpenTelemetryTracing')} ${topicName}` ); assert.match(stdout, /Message .* published./); - assert.match(stdout, /Message .* received/); + assert.match(stdout, /Cloud Trace batch writing traces/); + assert.match(stdout, /batchWriteSpans successfully/); assert.notMatch(stdout, /Received error/); + + const stdoutSub = execSync( + `${commandFor('listenWithOpenTelemetryTracing')} ${subName}` + ); + assert.match(stdoutSub, /Message .* received/); + assert.match(stdoutSub, /Cloud Trace batch writing traces/); + assert.match(stdoutSub, /batchWriteSpans successfully/); + assert.notMatch(stdoutSub, /Received error/); }); }); diff --git a/samples/typescript/listenWithOpenTelemetryTracing.ts b/samples/typescript/listenWithOpenTelemetryTracing.ts new file mode 100644 index 000000000..198f51f8c --- /dev/null +++ b/samples/typescript/listenWithOpenTelemetryTracing.ts @@ -0,0 +1,126 @@ +// Copyright 2020-2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to add OpenTelemetry tracing to the + * Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Subscribe with OpenTelemetry Tracing +// description: Demonstrates how to enable OpenTelemetry tracing in a subscriber. +// usage: node listenWithOpenTelemetryTracing.js + +const OTEL_TIMEOUT = 2; +const SUBSCRIBER_TIMEOUT = 10; + +// [START pubsub_subscribe_otel_tracing] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID'; + +// Imports the Google Cloud client library +import {Message, PubSub} from '@google-cloud/pubsub'; + +// Imports the OpenTelemetry API +import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node'; +import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api'; +import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base'; + +// To output to the console for testing, use the ConsoleSpanExporter. +// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base'; + +// To output to Cloud Trace, import the OpenTelemetry bridge library. +import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter'; + +import {Resource} from '@opentelemetry/resources'; +import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions'; + +// Enable the diagnostic logger for OpenTelemetry +diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG); + +// Log spans out to the console, for testing. +// const exporter = new ConsoleSpanExporter(); + +// Log spans out to Cloud Trace, for production. +const exporter = new TraceExporter(); + +// Build a tracer provider and a span processor to do +// something with the spans we're generating. +const provider = new NodeTracerProvider({ + resource: new Resource({ + [SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example', + }), +}); +const processor = new SimpleSpanProcessor(exporter); +provider.addSpanProcessor(processor); +provider.register(); + +// Creates a client; cache this for further use. +const pubSubClient = new PubSub({enableOpenTelemetryTracing: true}); + +async function subscriptionListen(subscriptionNameOrId: string) { + const subscriber = pubSubClient.subscription(subscriptionNameOrId); + + // Message handler for subscriber + const messageHandler = async (message: Message) => { + console.log(`Message ${message.id} received.`); + message.ack(); + }; + + // Error handler for subscriber + const errorHandler = async (error: Error) => { + console.log('Received error:', error); + }; + + // Listens for new messages from the topic + subscriber.on('message', messageHandler); + subscriber.on('error', errorHandler); + + // Ensures that all spans got flushed by the exporter. This function + // is in service to making sure that any buffered Pub/Sub messages + // and/or OpenTelemetry spans are properly flushed to the server + // side. In normal usage, you'd only need to do something like this + // on process shutdown. + async function shutdown() { + await subscriber.close(); + await processor.forceFlush(); + await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000)); + } + + // Wait a bit for the subscription to receive messages, then shut down + // gracefully. This is for the sample only; normally you would not need + // this delay. + await new Promise(r => + setTimeout(async () => { + subscriber.removeAllListeners(); + await shutdown(); + r(); + }, SUBSCRIBER_TIMEOUT * 1000) + ); +} +// [END pubsub_subscribe_otel_tracing] + +function main(subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID') { + subscriptionListen(subscriptionNameOrId).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/publishWithOpenTelemetryTracing.ts b/samples/typescript/publishWithOpenTelemetryTracing.ts new file mode 100644 index 000000000..cd7a82b3a --- /dev/null +++ b/samples/typescript/publishWithOpenTelemetryTracing.ts @@ -0,0 +1,102 @@ +// Copyright 2020-2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to add OpenTelemetry tracing to the + * Google Cloud Pub/Sub API. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Publish with OpenTelemetry Tracing +// description: Demonstrates how to enable OpenTelemetry tracing in a publisher. +// usage: node openTelemetryTracing.js + +const OTEL_TIMEOUT = 2; + +// [START pubsub_publish_otel_tracing] +/** + * TODO(developer): Uncomment these variables before running the sample. + */ +// const topicNameOrId = 'YOUR_TOPIC_OR_ID'; +// const data = 'Hello, world!"; + +// Imports the Google Cloud client library +import {PubSub} from '@google-cloud/pubsub'; + +// Imports the OpenTelemetry API +import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node'; +import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api'; +import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base'; + +// To output to the console for testing, use the ConsoleSpanExporter. +// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base'; + +// To output to Cloud Trace, import the OpenTelemetry bridge library. +import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter'; + +import {Resource} from '@opentelemetry/resources'; +import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions'; + +// Enable the diagnostic logger for OpenTelemetry +diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG); + +// Log spans out to the console, for testing. +// const exporter = new ConsoleSpanExporter(); + +// Log spans out to Cloud Trace, for production. +const exporter = new TraceExporter(); + +// Build a tracer provider and a span processor to do +// something with the spans we're generating. +const provider = new NodeTracerProvider({ + resource: new Resource({ + [SEMRESATTRS_SERVICE_NAME]: 'otel publisher example', + }), +}); +const processor = new SimpleSpanProcessor(exporter); +provider.addSpanProcessor(processor); +provider.register(); + +// Creates a client; cache this for further use. +const pubSubClient = new PubSub({enableOpenTelemetryTracing: true}); + +async function publishMessage(topicNameOrId: string, data: string) { + // Publishes the message as a string, e.g. "Hello, world!" + // or JSON.stringify(someObject) + const dataBuffer = Buffer.from(data); + const publisher = pubSubClient.topic(topicNameOrId); + const messageId = await publisher.publishMessage({data: dataBuffer}); + console.log(`Message ${messageId} published.`); + + // The rest of the sample is in service to making sure that any + // buffered Pub/Sub messages and/or OpenTelemetry spans are properly + // flushed to the server side. In normal usage, you'd only need to do + // something like this on process shutdown. + await publisher.flush(); + await processor.forceFlush(); + await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000)); +} +// [END pubsub_publish_otel_tracing] + +function main(topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', data = 'Hello, world!') { + publishMessage(topicNameOrId, data).catch(err => { + console.error(err.message); + process.exitCode = 1; + }); +} + +main(...process.argv.slice(2)); diff --git a/src/iam.ts b/src/iam.ts index 87e4184ab..efc775013 100644 --- a/src/iam.ts +++ b/src/iam.ts @@ -34,6 +34,15 @@ export type SetPolicyCallback = RequestCallback; export type SetPolicyResponse = [Policy]; export type GetPolicyResponse = [Policy]; +/** + * Allows us to get the most up to date full name of an object. + * + * @private + */ +export interface Nameable { + name: string; +} + /** * Shows which IAM permissions is allowed. * The key to this object are the IAM permissions (string) and the values are @@ -95,12 +104,22 @@ export type TestIamPermissionsCallback = ResourceCallback< export class IAM { pubsub: PubSub; request: typeof PubSub.prototype.request; - id: string; + private nameable_: Nameable; - constructor(pubsub: PubSub, id: string) { + constructor(pubsub: PubSub, nameOrNameable: Nameable | string) { this.pubsub = pubsub; this.request = pubsub.request.bind(pubsub); - this.id = id; + if (typeof nameOrNameable === 'string') { + this.nameable_ = { + name: nameOrNameable, + }; + } else { + this.nameable_ = nameOrNameable; + } + } + + get id(): string { + return this.nameable_.name; } /** diff --git a/src/index.ts b/src/index.ts index aa80682ba..abc8afb08 100644 --- a/src/index.ts +++ b/src/index.ts @@ -184,3 +184,8 @@ if (process.env.DEBUG_GRPC) { } import * as protos from '../protos/protos'; export {protos}; + +// Deprecated; please see the updated OpenTelemetry sample +// for an example of how to use telemetry in this library. +import {legacyExports} from './telemetry-tracing'; +export {legacyExports as openTelemetry}; diff --git a/src/lease-manager.ts b/src/lease-manager.ts index d31c7111f..88e92640a 100644 --- a/src/lease-manager.ts +++ b/src/lease-manager.ts @@ -17,6 +17,7 @@ import {EventEmitter} from 'events'; import {AckError, Message, Subscriber} from './subscriber'; import {defaultOptions} from './default-options'; +import {Duration} from './temporal'; export interface FlowControlOptions { allowExcessMessages?: boolean; @@ -104,6 +105,8 @@ export class LeaseManager extends EventEmitter { this._messages.add(message); this.bytes += message.length; + message.subSpans.flowStart(); + if (allowExcessMessages! || !wasFull) { this._dispense(message); } else { @@ -120,13 +123,14 @@ export class LeaseManager extends EventEmitter { } } /** - * Removes ALL messages from inventory. + * Removes ALL messages from inventory, and returns the ones removed. * @private */ - clear(): void { + clear(): Message[] { const wasFull = this.isFull(); this._pending = []; + const remaining = Array.from(this._messages); this._messages.clear(); this.bytes = 0; @@ -135,6 +139,8 @@ export class LeaseManager extends EventEmitter { } this._cancelExtension(); + + return remaining; } /** * Indicates if we're at or over capacity. @@ -240,7 +246,11 @@ export class LeaseManager extends EventEmitter { */ private _dispense(message: Message): void { if (this._subscriber.isOpen) { - process.nextTick(() => this._subscriber.emit('message', message)); + message.subSpans.flowEnd(); + process.nextTick(() => { + message.subSpans.processingStart(this._subscriber.name); + this._subscriber.emit('message', message); + }); } } /** @@ -257,15 +267,24 @@ export class LeaseManager extends EventEmitter { const lifespan = (Date.now() - message.received) / (60 * 1000); if (lifespan < this._options.maxExtensionMinutes!) { + const deadlineDuration = Duration.from({seconds: deadline}); + message.subSpans.modAckStart(deadlineDuration, false); + if (this._subscriber.isExactlyOnceDelivery) { - message.modAckWithResponse(deadline).catch(e => { - // In the case of a permanent failure (temporary failures are retried), - // we need to stop trying to lease-manage the message. - message.ackFailed(e as AckError); - this.remove(message); - }); + message + .modAckWithResponse(deadline) + .catch(e => { + // In the case of a permanent failure (temporary failures are retried), + // we need to stop trying to lease-manage the message. + message.ackFailed(e as AckError); + this.remove(message); + }) + .finally(() => { + message.subSpans.modAckEnd(); + }); } else { message.modAck(deadline); + message.subSpans.modAckStart(deadlineDuration, false); } } else { this.remove(message); diff --git a/src/message-queues.ts b/src/message-queues.ts index e40be2265..6dfefc2cf 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -33,13 +33,19 @@ import { import {Duration} from './temporal'; import {addToBucket} from './util'; import {DebugMessage} from './debug'; +import * as tracing from './telemetry-tracing'; + +export interface ReducedMessage { + ackId: string; + tracingSpan?: tracing.Span; +} /** * @private */ export interface QueuedMessage { - ackId: string; - deadline?: number; + message: ReducedMessage; + deadline?: number; // seconds responsePromise?: defer.DeferredPromise; retryCount: number; } @@ -176,10 +182,10 @@ export abstract class MessageQueue { * Adds a message to the queue. * * @param {Message} message The message to add. - * @param {number} [deadline] The deadline. + * @param {number} [deadline] The deadline in seconds. * @private */ - add({ackId}: Message, deadline?: number): Promise { + add(message: Message, deadline?: number): Promise { if (this._closed) { if (this._subscriber.isExactlyOnceDelivery) { throw new AckError(AckResponses.Invalid, 'Subscriber closed'); @@ -192,7 +198,10 @@ export abstract class MessageQueue { const responsePromise = defer(); this._requests.push({ - ackId, + message: { + ackId: message.ackId, + tracingSpan: message.parentSpan, + }, deadline, responsePromise, retryCount: 0, @@ -379,9 +388,9 @@ export abstract class MessageQueue { const codes: AckErrorCodes = processAckErrorInfo(rpcError); for (const m of batch) { - if (codes.has(m.ackId)) { + if (codes.has(m.message.ackId)) { // This ack has an ErrorInfo entry, so use that to route it. - const code = codes.get(m.ackId)!; + const code = codes.get(m.message.ackId)!; if (code.transient) { // Transient errors get retried. toRetry.push(m); @@ -407,7 +416,7 @@ export abstract class MessageQueue { // stream message if an unknown error happens during ack. const others = toError.get(AckResponses.Other); if (others?.length) { - const otherIds = others.map(e => e.ackId); + const otherIds = others.map(e => e.message.ackId); const debugMsg = new BatchError(rpcError, otherIds, operation); this._subscriber.emit('debug', debugMsg); } @@ -468,8 +477,13 @@ export class AckQueue extends MessageQueue { * @return {Promise} */ protected async _sendBatch(batch: QueuedMessages): Promise { + const responseSpan = tracing.PubsubSpans.createAckRpcSpan( + batch.map(b => b.message.tracingSpan), + this._subscriber.name, + 'AckQueue._sendBatch' + ); const client = await this._subscriber.getClient(); - const ackIds = batch.map(({ackId}) => ackId); + const ackIds = batch.map(({message}) => message.ackId); const reqOpts = {subscription: this._subscriber.name, ackIds}; try { @@ -477,6 +491,7 @@ export class AckQueue extends MessageQueue { // It's okay if these pass through since they're successful anyway. this.handleAckSuccesses(batch); + responseSpan?.end(); return []; } catch (e) { // If exactly-once delivery isn't enabled, don't do error processing. We'll @@ -500,6 +515,7 @@ export class AckQueue extends MessageQueue { batch.forEach(m => { m.responsePromise?.reject(exc); }); + responseSpan?.end(); return []; } } @@ -541,13 +557,23 @@ export class ModAckQueue extends MessageQueue { const callOptions = this.getCallOptions(); const modAckRequests = Object.keys(modAckTable).map(async deadline => { const messages = modAckTable[deadline]; - const ackIds = messages.map(m => m.ackId); + const ackIds = messages.map(m => m.message.ackId); const ackDeadlineSeconds = Number(deadline); const reqOpts = {subscription, ackIds, ackDeadlineSeconds}; + const responseSpan = tracing.PubsubSpans.createModackRpcSpan( + messages.map(b => b.message.tracingSpan), + this._subscriber.name, + ackDeadlineSeconds === 0 ? 'nack' : 'modack', + 'ModAckQueue._sendBatch', + Duration.from({seconds: ackDeadlineSeconds}) + ); + try { await client.modifyAckDeadline(reqOpts, callOptions); + responseSpan?.end(); + // It's okay if these pass through since they're successful anyway. this.handleAckSuccesses(messages); return []; diff --git a/src/opentelemetry-tracing.ts b/src/opentelemetry-tracing.ts deleted file mode 100644 index df3836f72..000000000 --- a/src/opentelemetry-tracing.ts +++ /dev/null @@ -1,61 +0,0 @@ -/*! - * Copyright 2020 Google LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { - Tracer, - SpanAttributes, - SpanContext, - Span, - context, - trace, - SpanKind, -} from '@opentelemetry/api'; - -// eslint-disable-next-line @typescript-eslint/no-var-requires -const PKG = require('../../package.json'); - -/** - * @internal - * Instantiates a Opentelemetry tracer for the library - */ -const libraryTracer: Tracer = trace.getTracer( - '@google-cloud/pubsub', - PKG.version -); - -/** - * Creates a new span with the given properties - * - * @param {string} spanName the name for the span - * @param {Attributes?} attributes an object containing the attributes to be set for the span - * @param {SpanContext?} parent the context of the parent span to link to the span - */ -export function createSpan( - spanName: string, - kind: SpanKind, - attributes?: SpanAttributes, - parent?: SpanContext -): Span { - return libraryTracer.startSpan( - spanName, - { - // set the kind of the span - kind, - // set the attributes of the span - attributes: attributes, - }, - parent ? trace.setSpanContext(context.active(), parent) : undefined - ); -} diff --git a/src/publisher/flow-publisher.ts b/src/publisher/flow-publisher.ts index ebfee5b0f..74e3b3279 100644 --- a/src/publisher/flow-publisher.ts +++ b/src/publisher/flow-publisher.ts @@ -17,6 +17,7 @@ import {Publisher} from '.'; import {FlowControl} from './flow-control'; import {PubsubMessage, calculateMessageSize} from './pubsub-message'; +import * as tracing from '../telemetry-tracing'; /** * Encapsulates a series of message publishes from a rapid loop (or similar @@ -76,7 +77,11 @@ export class FlowControlledPublisher { * ``` */ publish(message: PubsubMessage): Promise | null { + const flowSpan = message.parentSpan + ? tracing.PubsubSpans.createPublishFlowSpan(message) + : undefined; const doPublish = () => { + flowSpan?.end(); this.doPublish(message); }; diff --git a/src/publisher/index.ts b/src/publisher/index.ts index aa017a46d..a4ab81fa6 100644 --- a/src/publisher/index.ts +++ b/src/publisher/index.ts @@ -16,15 +16,14 @@ import * as extend from 'extend'; import {CallOptions} from 'google-gax'; -import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; -import {isSpanContextValid, Span, SpanKind} from '@opentelemetry/api'; +import {isSpanContextValid, Span} from '@opentelemetry/api'; import {BatchPublishOptions} from './message-batch'; import {Queue, OrderedQueue} from './message-queues'; import {Topic} from '../topic'; import {RequestCallback, EmptyCallback} from '../pubsub'; import {defaultOptions} from '../default-options'; -import {createSpan} from '../opentelemetry-tracing'; +import * as tracing from '../telemetry-tracing'; import {FlowControl, FlowControlOptions} from './flow-control'; import {promisifySome} from '../util'; @@ -39,6 +38,8 @@ export interface PublishOptions { flowControlOptions?: FlowControlOptions; gaxOpts?: CallOptions; messageOrdering?: boolean; + + /** @deprecated Unset and use context propagation. */ enableOpenTelemetryTracing?: boolean; } @@ -209,29 +210,23 @@ export class Publisher { } } - const span: Span | undefined = this.constructSpan(message); + // Ensure that there's a parent span for subsequent publishes + // to hang off of. + this.getParentSpan(message, 'Publisher.publishMessage'); if (!message.orderingKey) { this.queue.add(message, callback!); - if (span) { - span.end(); - } - return; - } - - const key = message.orderingKey; - - if (!this.orderedQueues.has(key)) { - const queue = new OrderedQueue(this, key); - this.orderedQueues.set(key, queue); - queue.once('drain', () => this.orderedQueues.delete(key)); - } + } else { + const key = message.orderingKey; - const queue = this.orderedQueues.get(key)!; - queue.add(message, callback!); + if (!this.orderedQueues.has(key)) { + const queue = new OrderedQueue(this, key); + this.orderedQueues.set(key, queue); + queue.once('drain', () => this.orderedQueues.delete(key)); + } - if (span) { - span.end(); + const queue = this.orderedQueues.get(key)!; + queue.add(message, callback!); } } @@ -332,54 +327,31 @@ export class Publisher { } /** - * Constructs an OpenTelemetry span + * Finds or constructs an telemetry publish/parent span for a message. * * @private * * @param {PubsubMessage} message The message to create a span for */ - constructSpan(message: PubsubMessage): Span | undefined { - if (!this.settings.enableOpenTelemetryTracing) { + getParentSpan(message: PubsubMessage, caller: string): Span | undefined { + const enabled = tracing.isEnabled(this.settings); + if (!enabled) { return undefined; } - const spanAttributes = { - // Add Opentelemetry semantic convention attributes to the span, based on: - // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/trace/semantic_conventions/messaging.md - [SemanticAttributes.MESSAGING_TEMP_DESTINATION]: false, - [SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub', - [SemanticAttributes.MESSAGING_OPERATION]: 'send', - [SemanticAttributes.MESSAGING_DESTINATION]: this.topic.name, - [SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic', - [SemanticAttributes.MESSAGING_MESSAGE_ID]: message.messageId, - [SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub', - [SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: - message.data?.length, - 'messaging.pubsub.ordering_key': message.orderingKey, - } as Attributes; - - const span: Span = createSpan( - `${this.topic.name} send`, - SpanKind.PRODUCER, - spanAttributes - ); + if (message.parentSpan) { + return message.parentSpan; + } - // If the span's context is valid we should pass the span context special attribute - if (isSpanContextValid(span.spanContext())) { - if ( - message.attributes && - message.attributes['googclient_OpenTelemetrySpanContext'] - ) { - console.warn( - 'googclient_OpenTelemetrySpanContext key set as message attribute, but will be overridden.' - ); - } - if (!message.attributes) { - message.attributes = {}; - } + const span = tracing.PubsubSpans.createPublisherSpan( + message, + this.topic.name, + caller + ); - message.attributes['googclient_OpenTelemetrySpanContext'] = - JSON.stringify(span.spanContext()); + // If the span's context is valid we should inject the propagation trace context. + if (span && isSpanContextValid(span.spanContext())) { + tracing.injectSpan(span, message, enabled); } return span; diff --git a/src/publisher/message-batch.ts b/src/publisher/message-batch.ts index 07a6222ce..a1ecdb0bf 100644 --- a/src/publisher/message-batch.ts +++ b/src/publisher/message-batch.ts @@ -16,6 +16,7 @@ import {BATCH_LIMITS, PubsubMessage, PublishCallback} from './'; import {calculateMessageSize} from './pubsub-message'; +import * as tracing from '../telemetry-tracing'; export interface BatchPublishOptions { maxBytes?: number; @@ -23,6 +24,17 @@ export interface BatchPublishOptions { maxMilliseconds?: number; } +/** + * Encapsulates a completed batch of messages. + * + * @private + * @internal + */ +export interface BatchResults { + messages: PubsubMessage[]; + callbacks: PublishCallback[]; +} + /** * @typedef BatchPublishOptions * @property {number} [maxBytes=1 * 1024 * 1024] The maximum number of bytes to @@ -40,13 +52,15 @@ export interface BatchPublishOptions { * @param {BatchPublishOptions} options The batching options. */ export class MessageBatch { - options: BatchPublishOptions; messages: PubsubMessage[]; callbacks: PublishCallback[]; created: number; bytes: number; - constructor(options: BatchPublishOptions) { - this.options = options; + + constructor( + public options: BatchPublishOptions, + public topicName: string + ) { this.messages = []; this.callbacks = []; this.created = Date.now(); @@ -72,7 +86,18 @@ export class MessageBatch { this.messages.push(message); this.callbacks.push(callback); this.bytes += calculateMessageSize(message); + + tracing.PubsubSpans.createPublishSchedulerSpan(message); + } + + end(): BatchResults { + this.messages.forEach(m => m.publishSchedulerSpan?.end()); + return { + messages: this.messages, + callbacks: this.callbacks, + }; } + /** * Indicates if a given message can fit in the batch. * @@ -86,6 +111,7 @@ export class MessageBatch { this.bytes + calculateMessageSize(message) <= maxBytes! ); } + /** * Checks to see if this batch is at the maximum allowed payload size. * When publishing ordered messages, it is ok to exceed the user configured @@ -97,6 +123,7 @@ export class MessageBatch { const {maxMessages, maxBytes} = BATCH_LIMITS; return this.messages.length >= maxMessages! || this.bytes >= maxBytes!; } + /** * Indicates if the batch is at capacity. * diff --git a/src/publisher/message-queues.ts b/src/publisher/message-queues.ts index be5742cbf..8ed396bc8 100644 --- a/src/publisher/message-queues.ts +++ b/src/publisher/message-queues.ts @@ -21,7 +21,8 @@ import {BatchPublishOptions, MessageBatch} from './message-batch'; import {PublishError} from './publish-error'; import {Publisher, PubsubMessage, PublishCallback} from './'; import {google} from '../../protos/protos'; - +import * as tracing from '../telemetry-tracing'; +import {filterMessage} from './pubsub-message'; import {promisify} from 'util'; /** @@ -94,12 +95,32 @@ export abstract class MessageQueue extends EventEmitter { const {topic, settings} = this.publisher; const reqOpts = { topic: topic.name, - messages, + messages: messages.map(filterMessage), }; if (messages.length === 0) { return; } + // Make sure we have a projectId filled in to update telemetry spans. + // The overall spans may not have the correct projectId because it wasn't + // known at the time publishMessage was called. + const spanMessages = messages.filter(m => !!m.parentSpan); + if (spanMessages.length) { + if (!topic.pubsub.isIdResolved) { + await topic.pubsub.getClientConfig(); + } + spanMessages.forEach(m => { + tracing.PubsubSpans.updatePublisherTopicName(m.parentSpan!, topic.name); + tracing.PubsubEvents.publishStart(m); + }); + } + + const rpcSpan = tracing.PubsubSpans.createPublishRpcSpan( + spanMessages, + topic.name, + 'MessageQueue._publish' + ); + const requestCallback = topic.request; const request = promisify(requestCallback.bind(topic)); try { @@ -119,6 +140,14 @@ export abstract class MessageQueue extends EventEmitter { callbacks.forEach(callback => callback(err)); throw e; + } finally { + messages.forEach(m => { + // We're finished with both the RPC and the whole publish operation, + // so close out all of the related spans. + rpcSpan?.end(); + tracing.PubsubEvents.publishEnd(m); + m.parentSpan?.end(); + }); } } } @@ -135,7 +164,7 @@ export class Queue extends MessageQueue { batch: MessageBatch; constructor(publisher: Publisher) { super(publisher); - this.batch = new MessageBatch(this.batchOptions); + this.batch = new MessageBatch(this.batchOptions, this.publisher.topic.name); } // This needs to update our existing message batch. @@ -200,9 +229,9 @@ export class Queue extends MessageQueue { * @emits Queue#drain when all messages are sent. */ async _publishInternal(fullyDrain: boolean): Promise { - const {messages, callbacks} = this.batch; + const {messages, callbacks} = this.batch.end(); - this.batch = new MessageBatch(this.batchOptions); + this.batch = new MessageBatch(this.batchOptions, this.publisher.topic.name); if (this.pending) { clearTimeout(this.pending); @@ -326,7 +355,7 @@ export class OrderedQueue extends MessageQueue { * @returns {MessageBatch} */ createBatch(): MessageBatch { - return new MessageBatch(this.batchOptions); + return new MessageBatch(this.batchOptions, this.publisher.topic.name); } /** * In the event of a publish failure, we need to cache the error in question @@ -369,7 +398,7 @@ export class OrderedQueue extends MessageQueue { delete this.pending; } - const {messages, callbacks} = this.batches.pop()!; + const {messages, callbacks} = this.batches.pop()!.end(); try { await this._publish(messages, callbacks); diff --git a/src/publisher/pubsub-message.ts b/src/publisher/pubsub-message.ts index a1e1283d5..ac69563b1 100644 --- a/src/publisher/pubsub-message.ts +++ b/src/publisher/pubsub-message.ts @@ -15,6 +15,7 @@ */ import {google} from '../../protos/protos'; +import * as tracing from '../telemetry-tracing'; /** * Strings are the only allowed values for keys and values in message attributes. @@ -24,7 +25,9 @@ export type Attributes = Record; /** * The basic {data, attributes} for a message to be published. */ -export interface PubsubMessage extends google.pubsub.v1.IPubsubMessage { +export interface PubsubMessage + extends google.pubsub.v1.IPubsubMessage, + tracing.MessageWithAttributes { /** * If we've calculated the size of this message, it will be cached here. * This is done to avoid having to build up the attribute size over and over. @@ -33,8 +36,76 @@ export interface PubsubMessage extends google.pubsub.v1.IPubsubMessage { * may change, and it may disappear later. * * @private + * @internal */ calculatedSize?: number; + + // The following are here instead of inside an object (like subs) because we + // don't get to control what these objects are. They come from grpc. + + /** + * If tracing is enabled, track the message span. + * + * @private + * @internal + */ + messageSpan?: tracing.Span; + + /** + * If tracing is enabled, track the batching (publish scheduling) period. + * + * @private + * @internal + */ + publishSchedulerSpan?: tracing.Span; + + /** + * If this is a message being received from a subscription, expose the ackId + * internally. Primarily for tracing. + * + * @private + * @internal + */ + ackId?: string; + + /** + * If this is a message being received from a subscription, expose the exactly + * once delivery flag internally. Primarily for tracing. + * + * @private + * @internal + */ + isExactlyOnceDelivery?: boolean; +} + +/** + * Since we tag a fair number of extra things into messages sent to the Pub/Sub + * server, this filters everything down to what needs to be sent. This should be + * used right before gRPC calls. + * + * @private + * @internal + */ +export function filterMessage( + message: PubsubMessage +): google.pubsub.v1.IPubsubMessage { + const filtered = {} as PubsubMessage; + if (message.data) { + filtered.data = message.data; + } + if (message.attributes) { + filtered.attributes = message.attributes; + } + if (message.messageId) { + filtered.messageId = message.messageId; + } + if (message.publishTime) { + filtered.publishTime = message.publishTime; + } + if (message.orderingKey) { + filtered.orderingKey = message.orderingKey; + } + return filtered; } /** @@ -52,6 +123,7 @@ export interface PubsubMessage extends google.pubsub.v1.IPubsubMessage { * may change. * * @private + * @internal */ export function calculateMessageSize( message: PubsubMessage | google.pubsub.v1.IPubsubMessage diff --git a/src/pubsub.ts b/src/pubsub.ts index 03a2ff9eb..cbf0548c5 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -58,6 +58,7 @@ import {CallOptions} from 'google-gax'; import {Transform} from 'stream'; import {google} from '../protos/protos'; import {SchemaServiceClient} from './v1'; +import * as tracing from './telemetry-tracing'; /** * Project ID placeholder. @@ -88,6 +89,12 @@ export interface ClientConfig extends gax.GrpcClientOptions { servicePath?: string; port?: string | number; sslCreds?: gax.grpc.ChannelCredentials; + + /** + * Enables OpenTelemetry tracing (newer, more full implementation). This + * defaults to false/undefined + */ + enableOpenTelemetryTracing?: boolean; } export interface PageOptions { @@ -316,6 +323,11 @@ export class PubSub { }, options ); + + if (this.options.enableOpenTelemetryTracing) { + tracing.setGloballyEnabled(true); + } + /** * @name PubSub#isEmulator * @type {boolean} @@ -578,16 +590,6 @@ export class PubSub { } subscription.metadata = resp!; - // If this is the first call we've made, the projectId might be empty still. - if (subscription.name?.includes(PROJECT_ID_PLACEHOLDER)) { - if (subscription.metadata && subscription.metadata.name) { - subscription.name = Subscription.formatName_( - this.projectId, - subscription.metadata.name - ); - } - } - callback!(null, subscription, resp!); } ); @@ -683,13 +685,6 @@ export class PubSub { } topic.metadata = resp!; - // If this is the first call we've made, the projectId might be empty still. - if (topic.name?.includes(PROJECT_ID_PLACEHOLDER)) { - if (topic.metadata && topic.metadata.name) { - topic.name = Topic.formatName_(this.projectId, topic.metadata.name); - } - } - callback!(null, topic, resp!); } ); diff --git a/src/subscriber.ts b/src/subscriber.ts index fe5defc8e..731c018f5 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -17,9 +17,6 @@ import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {replaceProjectIdToken} from '@google-cloud/projectify'; import {promisify} from '@google-cloud/promisify'; -import {EventEmitter} from 'events'; -import {SpanContext, Span, SpanKind} from '@opentelemetry/api'; -import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; import {google} from '../protos/protos'; import {Histogram} from './histogram'; @@ -29,8 +26,9 @@ import {MessageStream, MessageStreamOptions} from './message-stream'; import {Subscription} from './subscription'; import {defaultOptions} from './default-options'; import {SubscriberClient} from './v1'; -import {createSpan} from './opentelemetry-tracing'; +import * as tracing from './telemetry-tracing'; import {Duration} from './temporal'; +import {EventEmitter} from 'events'; export type PullResponse = google.pubsub.v1.IStreamingPullResponse; export type SubscriptionProperties = @@ -64,6 +62,136 @@ export class AckError extends Error { } } +/** + * Tracks the various spans related to subscriber/receive tracing. + * + * @private + */ +export class SubscriberSpans { + parent: tracing.MessageWithAttributes; + + // These are always attached to a message. + constructor(parent: tracing.MessageWithAttributes) { + this.parent = parent; + } + + // Start a flow control span if needed. + flowStart() { + if (!this.flow) { + this.flow = tracing.PubsubSpans.createReceiveFlowSpan(this.parent); + } + } + + // End any flow control span. + flowEnd() { + if (this.flow) { + this.flow.end(); + this.flow = undefined; + } + } + + // Emit an event for starting to send an ack. + ackStart() { + tracing.PubsubEvents.ackStart(this.parent); + } + + // Emit an event for the ack having been sent. + ackEnd() { + tracing.PubsubEvents.ackEnd(this.parent); + } + + // Emit an event for calling ack. + ackCall() { + if (this.processing) { + tracing.PubsubEvents.ackCalled(this.processing); + } + } + + // Emit an event for starting to send a nack. + nackStart() { + tracing.PubsubEvents.nackStart(this.parent); + } + + // Emit an event for the nack having been sent. + nackEnd() { + tracing.PubsubEvents.nackEnd(this.parent); + } + + // Emit an event for calling nack. + nackCall() { + if (this.processing) { + tracing.PubsubEvents.nackCalled(this.processing); + } + } + + // Emit an event for starting to send a modAck. + modAckStart(deadline: Duration, isInitial: boolean) { + tracing.PubsubEvents.modAckStart(this.parent, deadline, isInitial); + } + + // Emit an event for the modAck having been sent. + modAckEnd() { + tracing.PubsubEvents.modAckEnd(this.parent); + } + + // Emit an event for calling modAck. + // Note that we don't currently support users calling modAck directly, but + // this may be used in the future for things like fully managed pull + // subscriptions. + modAckCall(deadline: Duration) { + if (this.processing) { + tracing.PubsubEvents.modAckCalled(this.processing, deadline); + } + } + + // Start a scheduler span if needed. + // Note: This is not currently used in Node, because there is no + // scheduler process, due to the way messages are delivered one at a time. + schedulerStart() { + if (!this.scheduler) { + this.scheduler = tracing.PubsubSpans.createReceiveSchedulerSpan( + this.parent + ); + } + } + + // End any scheduler span. + schedulerEnd() { + if (this.scheduler) { + this.scheduler.end(); + this.scheduler = undefined; + } + } + + // Start a processing span if needed. + // This is for user processing, during on('message') delivery. + processingStart(subName: string) { + if (!this.processing) { + this.processing = tracing.PubsubSpans.createReceiveProcessSpan( + this.parent, + subName + ); + } + } + + // End any processing span. + processingEnd() { + if (this.processing) { + this.processing.end(); + this.processing = undefined; + } + } + + // If we shut down before processing can finish. + shutdown() { + tracing.PubsubEvents.shutdown(this.parent); + } + + private flow?: tracing.Span; + private scheduler?: tracing.Span; + private processing?: tracing.Span; +} + /** * Date object with nanosecond precision. Supports all standard Date arguments * in addition to several custom types. @@ -91,7 +219,7 @@ export class AckError extends Error { * }); * ``` */ -export class Message { +export class Message implements tracing.MessageWithAttributes { ackId: string; attributes: {[key: string]: string}; data: Buffer; @@ -105,6 +233,46 @@ export class Message { private _subscriber: Subscriber; private _ackFailed?: AckError; + /** + * @private + * + * Tracks a telemetry tracing parent span through the receive process. This will + * be the original publisher-side span if we have one; otherwise we'll create + * a "publisher" span to hang new subscriber spans onto. + * + * This needs to be declared explicitly here, because having a public class + * implement a private interface seems to confuse TypeScript. (And it's needed + * in unit tests.) + */ + parentSpan?: tracing.Span; + + /** + * We'll save the state of the subscription's exactly once delivery flag at the + * time the message was received. This is pretty much only for tracing, as we will + * generally use the live state of the subscription to figure out how to respond. + * + * @private + * @internal + */ + isExactlyOnceDelivery: boolean; + + /** + * @private + * + * Ends any open subscribe telemetry tracing span. + */ + endParentSpan() { + this.parentSpan?.end(); + delete this.parentSpan; + } + + /** + * @private + * + * Tracks subscriber-specific telemetry objects through the library. + */ + subSpans: SubscriberSpans; + /** * @hideconstructor * @@ -182,6 +350,21 @@ export class Message { */ this.received = Date.now(); + /** + * Telemetry tracing objects. + * + * @private + */ + this.subSpans = new SubscriberSpans(this); + + /** + * Save the state of the subscription into the message for later tracing. + * + * @private + * @internal + */ + this.isExactlyOnceDelivery = sub.isExactlyOnceDelivery; + this._handled = false; this._length = this.data.length; this._subscriber = sub; @@ -219,6 +402,8 @@ export class Message { ack(): void { if (!this._handled) { this._handled = true; + this.subSpans.ackCall(); + this.subSpans.processingEnd(); this._subscriber.ack(this); } } @@ -246,6 +431,8 @@ export class Message { if (!this._handled) { this._handled = true; + this.subSpans.ackCall(); + this.subSpans.processingEnd(); try { return await this._subscriber.ackWithResponse(this); } catch (e) { @@ -259,12 +446,14 @@ export class Message { /** * Modifies the ack deadline. + * At present time, this should generally not be called by users. * * @param {number} deadline The number of seconds to extend the deadline. * @private */ modAck(deadline: number): void { if (!this._handled) { + this.subSpans.modAckCall(Duration.from({seconds: deadline})); this._subscriber.modAck(this, deadline); } } @@ -272,6 +461,7 @@ export class Message { /** * Modifies the ack deadline, expecting a response (for exactly-once delivery subscriptions). * If exactly-once delivery is not enabled, this will immediately resolve successfully. + * At present time, this should generally not be called by users. * * @param {number} deadline The number of seconds to extend the deadline. * @private @@ -287,6 +477,7 @@ export class Message { } if (!this._handled) { + this.subSpans.modAckCall(Duration.from({seconds: deadline})); try { return await this._subscriber.modAckWithResponse(this, deadline); } catch (e) { @@ -311,6 +502,8 @@ export class Message { nack(): void { if (!this._handled) { this._handled = true; + this.subSpans.nackCall(); + this.subSpans.processingEnd(); this._subscriber.nack(this); } } @@ -339,6 +532,8 @@ export class Message { if (!this._handled) { this._handled = true; + this.subSpans.nackCall(); + this.subSpans.processingEnd(); try { return await this._subscriber.nackWithResponse(this); } catch (e) { @@ -380,6 +575,9 @@ export interface SubscriberOptions { flowControl?: FlowControlOptions; useLegacyFlowControl?: boolean; streamingOptions?: MessageStreamOptions; + + /** @deprecated Unset this and instantiate a tracer; support will be + * enabled automatically. */ enableOpenTelemetryTracing?: boolean; } @@ -403,7 +601,7 @@ export class Subscriber extends EventEmitter { private _acks!: AckQueue; private _histogram: Histogram; private _inventory!: LeaseManager; - private _useOpentelemetry: boolean; + private _useLegacyOpenTelemetry: boolean; private _latencies: Histogram; private _modAcks!: ModAckQueue; private _name!: string; @@ -421,7 +619,7 @@ export class Subscriber extends EventEmitter { this.maxBytes = defaultOptions.subscription.maxOutstandingBytes; this.useLegacyFlowControl = false; this.isOpen = false; - this._useOpentelemetry = false; + this._useLegacyOpenTelemetry = false; this._histogram = new Histogram({min: 10, max: 600}); this._latencies = new Histogram(); this._subscription = subscription; @@ -565,12 +763,18 @@ export class Subscriber extends EventEmitter { const ackTimeSeconds = (Date.now() - message.received) / 1000; this.updateAckDeadline(ackTimeSeconds); + tracing.PubsubEvents.ackStart(message); + // Ignore this in this version of the method (but hook catch // to avoid unhandled exceptions). const resultPromise = this._acks.add(message); resultPromise.catch(() => {}); await this._acks.onFlush(); + + tracing.PubsubEvents.ackEnd(message); + message.endParentSpan(); + this._inventory.remove(message); } @@ -586,7 +790,13 @@ export class Subscriber extends EventEmitter { const ackTimeSeconds = (Date.now() - message.received) / 1000; this.updateAckDeadline(ackTimeSeconds); + tracing.PubsubEvents.ackStart(message); + await this._acks.add(message); + + tracing.PubsubEvents.ackEnd(message); + message.endParentSpan(); + this._inventory.remove(message); // No exception means Success. @@ -607,10 +817,15 @@ export class Subscriber extends EventEmitter { this.isOpen = false; this._stream.destroy(); - this._inventory.clear(); + const remaining = this._inventory.clear(); await this._waitForFlush(); + remaining.forEach(m => { + m.subSpans.shutdown(); + m.endParentSpan(); + }); + this.emit('close'); this._acks.close(); @@ -636,7 +851,7 @@ export class Subscriber extends EventEmitter { * Modifies the acknowledge deadline for the provided message. * * @param {Message} message The message to modify. - * @param {number} deadline The deadline. + * @param {number} deadline The deadline in seconds. * @returns {Promise} * @private */ @@ -685,7 +900,10 @@ export class Subscriber extends EventEmitter { * @private */ async nack(message: Message): Promise { + message.subSpans.nackStart(); await this.modAck(message, 0); + message.subSpans.nackEnd(); + message.endParentSpan(); this._inventory.remove(message); } @@ -699,7 +917,11 @@ export class Subscriber extends EventEmitter { * @private */ async nackWithResponse(message: Message): Promise { - return await this.modAckWithResponse(message, 0); + message.subSpans.nackStart(); + const response = await this.modAckWithResponse(message, 0); + message.subSpans.nackEnd(); + message.endParentSpan(); + return response; } /** @@ -741,7 +963,7 @@ export class Subscriber extends EventEmitter { setOptions(options: SubscriberOptions): void { this._options = options; - this._useOpentelemetry = options.enableOpenTelemetryTracing || false; + this._useLegacyOpenTelemetry = options.enableOpenTelemetryTracing || false; // The user-set ackDeadline value basically pegs the extension time. // We'll emulate it by overwriting min/max. @@ -779,58 +1001,18 @@ export class Subscriber extends EventEmitter { } /** - * Constructs an OpenTelemetry span from the incoming message. + * Constructs a telemetry span from the incoming message. * * @param {Message} message One of the received messages * @private */ - private _constructSpan(message: Message): Span | undefined { - // Handle cases where OpenTelemetry is disabled or no span context was sent through message - if ( - !this._useOpentelemetry || - !message.attributes || - !message.attributes['googclient_OpenTelemetrySpanContext'] - ) { - return undefined; - } - - const spanValue = message.attributes['googclient_OpenTelemetrySpanContext']; - const parentSpanContext: SpanContext | undefined = spanValue - ? JSON.parse(spanValue) - : undefined; - const spanAttributes = { - // Original span attributes - ackId: message.ackId, - deliveryAttempt: message.deliveryAttempt, - // - // based on https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers - [SemanticAttributes.MESSAGING_SYSTEM]: 'pubsub', - [SemanticAttributes.MESSAGING_OPERATION]: 'process', - [SemanticAttributes.MESSAGING_DESTINATION]: this.name, - [SemanticAttributes.MESSAGING_DESTINATION_KIND]: 'topic', - [SemanticAttributes.MESSAGING_MESSAGE_ID]: message.id, - [SemanticAttributes.MESSAGING_PROTOCOL]: 'pubsub', - [SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: ( - message.data as Buffer - ).length, - // Not in Opentelemetry semantic convention but mimics naming - 'messaging.pubsub.received_at': message.received, - 'messaging.pubsub.acknowlege_id': message.ackId, - 'messaging.pubsub.delivery_attempt': message.deliveryAttempt, - }; - - // Subscriber spans should always have a publisher span as a parent. - // Return undefined if no parent is provided - const spanName = `${this.name} process`; - const span = parentSpanContext - ? createSpan( - spanName.trim(), - SpanKind.CONSUMER, - spanAttributes, - parentSpanContext - ) - : undefined; - return span; + private createParentSpan(message: Message): void { + const enabled = tracing.isEnabled({ + enableOpenTelemetryTracing: this._useLegacyOpenTelemetry, + }); + if (enabled) { + tracing.extractSpan(message, this.name, enabled); + } } /** @@ -860,12 +1042,16 @@ export class Subscriber extends EventEmitter { for (const data of receivedMessages!) { const message = new Message(this, data); - const span: Span | undefined = this._constructSpan(message); + this.createParentSpan(message); if (this.isOpen) { if (this.isExactlyOnceDelivery) { // For exactly-once delivery, we must validate that we got a valid // lease on the message before actually leasing it. + message.subSpans.modAckStart( + Duration.from({seconds: this.ackDeadline}), + true + ); message .modAckWithResponse(this.ackDeadline) .then(() => { @@ -875,17 +1061,23 @@ export class Subscriber extends EventEmitter { // Temporary failures will retry, so if an error reaches us // here, that means a permanent failure. Silently drop these. this._discardMessage(message); + }) + .finally(() => { + message.subSpans.modAckEnd(); }); } else { + message.subSpans.modAckStart( + Duration.from({seconds: this.ackDeadline}), + true + ); message.modAck(this.ackDeadline); + message.subSpans.modAckEnd(); this._inventory.add(message); } } else { + message.subSpans.shutdown(); message.nack(); } - if (span) { - span.end(); - } } } diff --git a/src/subscription.ts b/src/subscription.ts index a39f66e8c..0df829a67 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -14,7 +14,6 @@ * limitations under the License. */ -import {EventEmitter} from 'events'; import * as extend from 'extend'; import {CallOptions} from 'google-gax'; import snakeCase = require('lodash.snakecase'); @@ -47,6 +46,7 @@ import {Topic} from './topic'; import {promisifySome} from './util'; import {StatusError} from './message-stream'; import {DebugMessage} from './debug'; +import {EventEmitter} from 'stream'; export {AckError, AckResponse, AckResponses} from './subscriber'; @@ -265,13 +265,16 @@ export declare interface Subscription { * ``` */ export class Subscription extends EventEmitter { + // Note: WrappingEmitter is used here to wrap user processing callbacks. + // We do this to be able to build telemetry spans around them. pubsub: PubSub; iam: IAM; - name: string; topic?: Topic | string; metadata?: google.pubsub.v1.ISubscription; request: typeof PubSub.prototype.request; + private _subscriber: Subscriber; + constructor(pubsub: PubSub, name: string, options?: SubscriptionOptions) { super(); @@ -279,7 +282,7 @@ export class Subscription extends EventEmitter { this.pubsub = pubsub; this.request = pubsub.request.bind(pubsub); - this.name = Subscription.formatName_(this.projectId, name); + this.id_ = name; this.topic = options.topic; /** @@ -320,7 +323,7 @@ export class Subscription extends EventEmitter { * }); * ``` */ - this.iam = new IAM(pubsub, this.name); + this.iam = new IAM(pubsub, this); this._subscriber = new Subscriber(this, options); this._subscriber @@ -332,6 +335,11 @@ export class Subscription extends EventEmitter { this._listen(); } + private id_: string; + get name(): string { + return Subscription.formatName_(this.pubsub.projectId, this.id_); + } + /** * Indicates if the Subscription is open and receiving messages. * @@ -1194,6 +1202,7 @@ export class Subscription extends EventEmitter { return formatted as google.pubsub.v1.ISubscription; } + /*! * Format the name of a subscription. A subscription's full name is in the * format of projects/{projectId}/subscriptions/{subName}. diff --git a/src/telemetry-tracing.ts b/src/telemetry-tracing.ts new file mode 100644 index 000000000..b921518ab --- /dev/null +++ b/src/telemetry-tracing.ts @@ -0,0 +1,911 @@ +/*! + * Copyright 2020-2024 Google LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + Tracer, + SpanContext, + Span, + context, + trace, + propagation, + SpanKind, + TextMapGetter, + TextMapSetter, + ROOT_CONTEXT, + Context, + Link, +} from '@opentelemetry/api'; +import {Attributes, PubsubMessage} from './publisher/pubsub-message'; +import {PublishOptions} from './publisher/index'; +import {Duration} from './temporal'; + +export {Span}; + +// We need this to get the library version. +// eslint-disable-next-line @typescript-eslint/no-var-requires +const packageJson = require('../../package.json'); + +/** + * Instantiates a Opentelemetry tracer for the library + * + * @private + * @internal + */ +let cachedTracer: Tracer | undefined; +function getTracer(): Tracer { + const tracer = + cachedTracer ?? + trace.getTracer('@google-cloud/pubsub', packageJson.version); + cachedTracer = tracer; + return cachedTracer; +} + +/** + * Determination of the level of OTel support we're providing. + * + * @private + * @internal + */ +export enum OpenTelemetryLevel { + /** + * None: OTel support is not enabled because we found no trace provider, or + * the user has not enabled it. + */ + None = 0, + + /** + * Legacy: We found a trace provider, but the user also specified the old + * manual enable flag; this will trigger the legacy attribute being included. + * The modern propagation attribute will _also_ be included. + */ + Legacy = 1, + + /** + * Modern: We will only inject/extract the modern propagation attribute. + */ + Modern = 2, +} + +// True if user code elsewhere wants to enable OpenTelemetry support. +let globallyEnabled = false; + +/** + * Manually set the OpenTelemetry enabledness. + * + * @param enabled The enabled flag to use, to override any automated methods. + * @private + * @internal + */ +export function setGloballyEnabled(enabled: boolean) { + globallyEnabled = enabled; +} + +/** + * Tries to divine what sort of OpenTelemetry we're supporting. See the enum + * for the meaning of the values, and other notes. + * + * Legacy OTel is no longer officially supported, but we don't want to + * break anyone at a non-major. + * + * @private + * @internal + */ +export function isEnabled( + publishSettings?: PublishOptions +): OpenTelemetryLevel { + // If we're not enabled, skip everything. + if (!globallyEnabled) { + return OpenTelemetryLevel.None; + } + + if (publishSettings?.enableOpenTelemetryTracing) { + return OpenTelemetryLevel.Legacy; + } + + // Enable modern support. + return OpenTelemetryLevel.Modern; +} + +/** + * Our Carrier object for propagation is anything with an 'attributes' + * object, which is one of several possible Message classes. (They're + * different for publish and subscribe.) + * + * Also we add a parentSpan optional member for passing around the + * actual Span object within the client library. This can be a publish + * or subscriber span, depending on the context. + * + * @private + * @internal + */ +export interface MessageWithAttributes { + attributes?: Attributes | null | undefined; + parentSpan?: Span; +} + +/** + * Implements common members for the TextMap getter and setter interfaces for Pub/Sub messages. + * + * @private + * @internal + */ +export class PubsubMessageGetSet { + static keyPrefix = 'googclient_'; + + keys(carrier: MessageWithAttributes): string[] { + return Object.getOwnPropertyNames(carrier.attributes) + .filter(n => n.startsWith(PubsubMessageGetSet.keyPrefix)) + .map(n => n.substring(PubsubMessageGetSet.keyPrefix.length)); + } + + protected attributeName(key: string): string { + return `${PubsubMessageGetSet.keyPrefix}${key}`; + } +} + +/** + * Implements the TextMap getter interface for Pub/Sub messages. + * + * @private + * @internal + */ +export class PubsubMessageGet + extends PubsubMessageGetSet + implements TextMapGetter +{ + get( + carrier: MessageWithAttributes, + key: string + ): string | string[] | undefined { + return carrier?.attributes?.[this.attributeName(key)]; + } +} + +/** + * Implements the TextMap setter interface for Pub/Sub messages. + * + * @private + * @internal + */ +export class PubsubMessageSet + extends PubsubMessageGetSet + implements TextMapSetter +{ + set(carrier: MessageWithAttributes, key: string, value: string): void { + if (!carrier.attributes) { + carrier.attributes = {}; + } + carrier.attributes[this.attributeName(key)] = value; + } +} + +/** + * The getter to use when calling extract() on a Pub/Sub message. + * + * @private + * @internal + */ +export const pubsubGetter = new PubsubMessageGet(); + +/** + * The setter to use when calling inject() on a Pub/Sub message. + * + * @private + * @internal + */ +export const pubsubSetter = new PubsubMessageSet(); + +/** + * Description of the data structure passed for span attributes. + * + * @private + * @internal + */ +export interface SpanAttributes { + [x: string]: string | number | boolean; +} + +/** + * Converts a SpanContext to a full Context, as needed. + * + * @private + * @internal + */ +export function spanContextToContext( + parent?: SpanContext +): Context | undefined { + return parent ? trace.setSpanContext(context.active(), parent) : undefined; +} + +/** + * The modern propagation attribute name. + * + * Technically this is determined by the OpenTelemetry library, but + * in practice, it follows the W3C spec, so this should be the right + * one. The only thing we're using it for, anyway, is emptying user + * supplied attributes. + * + * @private + * @internal + */ +export const modernAttributeName = 'googclient_traceparent'; + +/** + * The old legacy attribute name. + * + * @private + * @internal + */ +export const legacyAttributeName = 'googclient_OpenTelemetrySpanContext'; + +export interface AttributeParams { + // Fully qualified. + topicName?: string; + subName?: string; + + // These are generally split from the fully qualified names. + projectId?: string; + topicId?: string; + subId?: string; +} + +/** + * Break down the subscription's full name into its project and ID. + * + * @private + * @internal + */ +export function getSubscriptionInfo(fullName: string): AttributeParams { + const results = fullName.match(/projects\/([^/]+)\/subscriptions\/(.+)/); + if (!results?.[0]) { + return { + subName: fullName, + }; + } + + return { + subName: fullName, + projectId: results[1], + subId: results[2], + }; +} + +/** + * Break down the subscription's full name into its project and ID. + * + * @private + * @internal + */ +export function getTopicInfo(fullName: string): AttributeParams { + const results = fullName.match(/projects\/([^/]+)\/topics\/(.+)/); + if (!results?.[0]) { + return { + topicName: fullName, + }; + } + + return { + topicName: fullName, + projectId: results[1], + topicId: results[2], + }; +} + +// Determines if a trace is to be sampled. There doesn't appear to be a sanctioned +// way to do this currently (isRecording does something different). +// +// Based on this: https://github.com/open-telemetry/opentelemetry-js/issues/4193 +function isSampled(span: Span) { + const FLAG_MASK_SAMPLED = 0x1; + const spanContext = span.spanContext(); + const traceFlags = spanContext?.traceFlags; + const sampled = !!( + traceFlags && (traceFlags & FLAG_MASK_SAMPLED) === FLAG_MASK_SAMPLED + ); + + return sampled; +} + +/** + * Contains utility methods for creating spans. + * + * @private + * @internal + */ +export class PubsubSpans { + static createAttributes( + params: AttributeParams, + message?: PubsubMessage, + caller?: string + ): SpanAttributes { + const destinationName = params.topicName ?? params.subName; + const destinationId = params.topicId ?? params.subId; + const projectId = params.projectId; + + // Purposefully leaving this debug check here as a comment - this should + // always be true, but we don't want to fail in prod if it's not. + /*if ( + (params.topicName && params.subName) || + (!destinationName && !projectId && !destinationId) + ) { + throw new Error( + 'One of topicName or subName must be specified, and must be fully qualified' + ); + }*/ + + const spanAttributes = { + // Add Opentelemetry semantic convention attributes to the span, based on: + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/trace/semantic_conventions/messaging.md + ['messaging.system']: 'gcp_pubsub', + ['messaging.destination.name']: destinationId ?? destinationName, + ['gcp.project_id']: projectId, + ['code.function']: caller ?? 'unknown', + } as SpanAttributes; + + if (message) { + if (message.calculatedSize) { + spanAttributes['messaging.message.envelope.size'] = + message.calculatedSize; + } else { + if (message.data?.length) { + spanAttributes['messaging.message.envelope.size'] = + message.data?.length; + } + } + if (message.orderingKey) { + spanAttributes['messaging.gcp_pubsub.message.ordering_key'] = + message.orderingKey; + } + if (message.isExactlyOnceDelivery) { + spanAttributes['messaging.gcp_pubsub.message.exactly_once_delivery'] = + message.isExactlyOnceDelivery; + } + if (message.ackId) { + spanAttributes['messaging.gcp_pubsub.message.ack_id'] = message.ackId; + } + } + + return spanAttributes; + } + + static createPublisherSpan( + message: PubsubMessage, + topicName: string, + caller: string + ): Span | undefined { + if (!globallyEnabled) { + return undefined; + } + + const topicInfo = getTopicInfo(topicName); + const span: Span = getTracer().startSpan(`${topicName} create`, { + kind: SpanKind.PRODUCER, + attributes: PubsubSpans.createAttributes(topicInfo, message, caller), + }); + if (topicInfo.topicId) { + span.updateName(`${topicInfo.topicId} create`); + span.setAttribute('messaging.destination.name', topicInfo.topicId); + } + + return span; + } + + static updatePublisherTopicName(span: Span, topicName: string) { + const topicInfo = getTopicInfo(topicName); + if (topicInfo.topicId) { + span.updateName(`${topicInfo.topicId} create`); + span.setAttribute('messaging.destination.name', topicInfo.topicId); + } else { + span.updateName(`${topicName} create`); + } + if (topicInfo.projectId) { + span.setAttribute('gcp.project_id', topicInfo.projectId); + } + } + + static createReceiveSpan( + message: PubsubMessage, + subName: string, + parent: Context | undefined, + caller: string + ): Span | undefined { + if (!globallyEnabled) { + return undefined; + } + + const subInfo = getSubscriptionInfo(subName); + const name = `${subInfo.subId ?? subName} subscribe`; + const attributes = this.createAttributes(subInfo, message, caller); + if (subInfo.subId) { + attributes['messaging.destination.name'] = subInfo.subId; + } + + if (context) { + return getTracer().startSpan( + name, + { + kind: SpanKind.CONSUMER, + attributes, + }, + parent + ); + } else { + return getTracer().startSpan(name, { + kind: SpanKind.CONSUMER, + attributes, + }); + } + } + + static createChildSpan( + name: string, + message?: PubsubMessage, + parentSpan?: Span, + attributes?: SpanAttributes + ): Span | undefined { + if (!globallyEnabled) { + return undefined; + } + + const parent = message?.parentSpan ?? parentSpan; + if (parent) { + return getTracer().startSpan( + name, + { + kind: SpanKind.INTERNAL, + attributes: attributes ?? {}, + }, + spanContextToContext(parent.spanContext()) + ); + } else { + return undefined; + } + } + + static createPublishFlowSpan(message: PubsubMessage): Span | undefined { + return PubsubSpans.createChildSpan('publisher flow control', message); + } + + static createPublishSchedulerSpan(message: PubsubMessage): Span | undefined { + return PubsubSpans.createChildSpan('publisher batching', message); + } + + static createPublishRpcSpan( + messages: MessageWithAttributes[], + topicName: string, + caller: string + ): Span | undefined { + if (!globallyEnabled) { + return undefined; + } + + const spanAttributes = PubsubSpans.createAttributes( + getTopicInfo(topicName), + undefined, + caller + ); + const links: Link[] = messages + .filter(m => m.parentSpan && isSampled(m.parentSpan)) + .map(m => ({context: m.parentSpan!.spanContext()}) as Link) + .filter(l => l.context); + const span: Span = getTracer().startSpan( + `${topicName} send`, + { + kind: SpanKind.PRODUCER, + attributes: spanAttributes, + links, + }, + ROOT_CONTEXT + ); + span?.setAttribute('messaging.batch.message_count', messages.length); + if (span) { + // Also attempt to link from message spans back to the publish RPC span. + messages.forEach(m => { + if (m.parentSpan && isSampled(m.parentSpan)) { + m.parentSpan.addLink({context: span.spanContext()}); + } + }); + } + + return span; + } + + static createAckRpcSpan( + messageSpans: (Span | undefined)[], + subName: string, + caller: string + ): Span | undefined { + if (!globallyEnabled) { + return undefined; + } + + const subInfo = getSubscriptionInfo(subName); + + const spanAttributes = PubsubSpans.createAttributes( + subInfo, + undefined, + caller + ); + const links: Link[] = messageSpans + .filter(m => m && isSampled(m)) + .map(m => ({context: m!.spanContext()}) as Link) + .filter(l => l.context); + const span: Span = getTracer().startSpan( + `${subInfo.subId ?? subInfo.subName} ack`, + { + kind: SpanKind.CONSUMER, + attributes: spanAttributes, + links, + }, + ROOT_CONTEXT + ); + + span?.setAttribute('messaging.batch.message_count', messageSpans.length); + + if (span) { + // Also attempt to link from the subscribe span(s) back to the publish RPC span. + messageSpans.forEach(m => { + if (m && isSampled(m)) { + m.addLink({context: span.spanContext()}); + } + }); + } + + return span; + } + + static createModackRpcSpan( + messageSpans: (Span | undefined)[], + subName: string, + type: 'modack' | 'nack', + caller: string, + deadline?: Duration, + isInitial?: boolean + ): Span | undefined { + if (!globallyEnabled) { + return undefined; + } + + const subInfo = getSubscriptionInfo(subName); + + const spanAttributes = PubsubSpans.createAttributes( + subInfo, + undefined, + caller + ); + const links: Link[] = messageSpans + .filter(m => m && isSampled(m)) + .map(m => ({context: m!.spanContext()}) as Link) + .filter(l => l.context); + const span: Span = getTracer().startSpan( + `${subInfo.subId ?? subInfo.subName} ${type}`, + { + kind: SpanKind.CONSUMER, + attributes: spanAttributes, + links, + }, + ROOT_CONTEXT + ); + + span?.setAttribute('messaging.batch.message_count', messageSpans.length); + + if (span) { + // Also attempt to link from the subscribe span(s) back to the publish RPC span. + messageSpans.forEach(m => { + if (m && isSampled(m)) { + m.addLink({context: span.spanContext()}); + } + }); + } + + if (deadline) { + span?.setAttribute( + 'messaging.gcp_pubsub.message.ack_deadline_seconds', + deadline.totalOf('second') + ); + } + + if (isInitial !== undefined) { + span?.setAttribute('messaging.gcp_pubsub.is_receipt_modack', isInitial); + } + + return span; + } + + static createReceiveFlowSpan( + message: MessageWithAttributes + ): Span | undefined { + return PubsubSpans.createChildSpan( + 'subscriber concurrency control', + message + ); + } + + static createReceiveSchedulerSpan( + message: MessageWithAttributes + ): Span | undefined { + return PubsubSpans.createChildSpan('subscriber scheduler', message); + } + + static createReceiveProcessSpan( + message: MessageWithAttributes, + subName: string + ): Span | undefined { + const subInfo = getSubscriptionInfo(subName); + return PubsubSpans.createChildSpan( + `${subInfo.subId ?? subName} process`, + message + ); + } + + static setReceiveProcessResult(span: Span, isAck: boolean) { + span.setAttribute('messaging.gcp_pubsub.result', isAck ? 'ack' : 'nack'); + } +} + +/** + * Creates and manipulates Pub/Sub-related events on spans. + * + * @private + * @internal + */ +export class PubsubEvents { + static addEvent( + text: string, + message: MessageWithAttributes, + attributes?: Attributes + ): void { + const parent = message.parentSpan; + if (!parent) { + return; + } + + parent.addEvent(text, attributes); + } + + static publishStart(message: MessageWithAttributes) { + PubsubEvents.addEvent('publish start', message); + } + + static publishEnd(message: MessageWithAttributes) { + PubsubEvents.addEvent('publish end', message); + } + + static ackStart(message: MessageWithAttributes) { + PubsubEvents.addEvent('ack start', message); + } + + static ackEnd(message: MessageWithAttributes) { + PubsubEvents.addEvent('ack end', message); + } + + static modackStart(message: MessageWithAttributes) { + PubsubEvents.addEvent('modack start', message); + } + + static modackEnd(message: MessageWithAttributes) { + PubsubEvents.addEvent('modack end', message); + } + + static nackStart(message: MessageWithAttributes) { + PubsubEvents.addEvent('nack start', message); + } + + static nackEnd(message: MessageWithAttributes) { + PubsubEvents.addEvent('nack end', message); + } + + static ackCalled(span: Span) { + span.addEvent('ack called'); + } + + static nackCalled(span: Span) { + span.addEvent('nack called'); + } + + static modAckCalled(span: Span, deadline: Duration) { + // User-called modAcks are never initial ones. + span.addEvent('modack called', { + 'messaging.gcp_pubsub.modack_deadline_seconds': `${deadline.totalOf( + 'second' + )}`, + 'messaging.gcp_pubsub.is_receipt_modack': 'false', + }); + } + + static modAckStart( + message: MessageWithAttributes, + deadline: Duration, + isInitial: boolean + ) { + PubsubEvents.addEvent('modack start', message, { + 'messaging.gcp_pubsub.modack_deadline_seconds': `${deadline.totalOf( + 'second' + )}`, + 'messaging.gcp_pubsub.is_receipt_modack': isInitial ? 'true' : 'false', + }); + } + + static modAckEnd(message: MessageWithAttributes) { + PubsubEvents.addEvent('modack end', message); + } + + // Add this event any time the process is shut down before processing + // of the message can complete. + static shutdown(message: MessageWithAttributes) { + PubsubEvents.addEvent('shutdown', message); + } +} + +/** + * Injects the trace context into a Pub/Sub message (or other object with + * an 'attributes' object) for propagation. + * + * This is for the publish side. + * + * @private + * @internal + */ +export function injectSpan( + span: Span, + message: MessageWithAttributes, + enabled: OpenTelemetryLevel +): void { + if (!globallyEnabled) { + return; + } + + if (!message.attributes) { + message.attributes = {}; + } + + if (message.attributes[modernAttributeName]) { + console.warn( + `${modernAttributeName} key set as message attribute, but will be overridden.` + ); + + delete message.attributes[modernAttributeName]; + } + + // If we're in legacy mode, add that header as well. + if (enabled === OpenTelemetryLevel.Legacy) { + if (message.attributes[legacyAttributeName]) { + console.warn( + `${legacyAttributeName} key set as message attribute, but will be overridden.` + ); + } + message.attributes[legacyAttributeName] = JSON.stringify( + span.spanContext() + ); + } + + // Always do propagation injection with the trace context. + const context = trace.setSpanContext(ROOT_CONTEXT, span.spanContext()); + propagation.inject(context, message, pubsubSetter); + + // Also put the direct reference to the Span object for while we're + // passing it around in the client library. + message.parentSpan = span; +} + +/** + * Returns true if this message potentially contains a span context. + * + * @private + * @internal + */ +export function containsSpanContext(message: MessageWithAttributes): boolean { + if (message.parentSpan) { + return true; + } + + if (!message.attributes) { + return false; + } + + const keys = Object.getOwnPropertyNames(message.attributes); + return !!keys.find( + n => n === legacyAttributeName || n === modernAttributeName + ); +} + +/** + * Extracts the trace context from a Pub/Sub message (or other object with + * an 'attributes' object) from a propagation, for receive processing. If no + * context was present, create a new parent span. + * + * This is for the receive side. + * + * @private + * @internal + */ +export function extractSpan( + message: MessageWithAttributes, + subName: string, + enabled: OpenTelemetryLevel +): Span | undefined { + if (!globallyEnabled) { + return undefined; + } + + if (message.parentSpan) { + return message.parentSpan; + } + + const keys = Object.getOwnPropertyNames(message.attributes ?? {}); + + let context: Context | undefined; + + if (enabled === OpenTelemetryLevel.Legacy) { + // Only prefer the legacy attributes to no trace context attribute. + if ( + keys.includes(legacyAttributeName) && + !keys.includes(modernAttributeName) + ) { + const legacyValue = message.attributes?.[legacyAttributeName]; + if (legacyValue) { + const parentSpanContext: SpanContext | undefined = legacyValue + ? JSON.parse(legacyValue) + : undefined; + if (parentSpanContext) { + context = spanContextToContext(parentSpanContext); + } + } + } + } else { + if (keys.includes(modernAttributeName)) { + context = propagation.extract(ROOT_CONTEXT, message, pubsubGetter); + } + } + + const span = PubsubSpans.createReceiveSpan( + message, + subName, + context, + 'extractSpan' + ); + message.parentSpan = span; + return span; +} + +// Since these were exported on the main Pub/Sub index in the previous +// version, we have to export them until the next major. +export const legacyExports = { + /** + * @deprecated + * Use the new telemetry functionality instead; see the updated OpenTelemetry + * sample for an example. + */ + createSpan: function ( + spanName: string, + kind: SpanKind, + attributes?: SpanAttributes, + parent?: SpanContext + ): Span { + if (!globallyEnabled) { + // This isn't great, but it's the fact of the situation. + return undefined as unknown as Span; + } else { + return getTracer().startSpan( + spanName, + { + kind, + attributes, + }, + parent ? trace.setSpanContext(context.active(), parent) : undefined + ); + } + }, +}; diff --git a/src/topic.ts b/src/topic.ts index 96b33490e..e0a5d03b6 100644 --- a/src/topic.ts +++ b/src/topic.ts @@ -106,7 +106,6 @@ export type MessageOptions = PubsubMessage & {json?: any}; * ``` */ export class Topic { - name: string; parent: PubSub; pubsub: PubSub; request: typeof PubSub.prototype.request; @@ -119,12 +118,12 @@ export class Topic { constructor(pubsub: PubSub, name: string, options?: PublishOptions) { /** - * The fully qualified name of this topic. + * The fully qualified name of this topic. May have a placeholder for + * the projectId if it's not been resolved. * @name Topic#name * @type {string} */ - this.name = Topic.formatName_(pubsub.projectId, name); - this.publisher = new Publisher(this, options); + this.id_ = name; /** * The parent {@link PubSub} instance of this topic instance. * @name Topic#pubsub @@ -136,6 +135,7 @@ export class Topic { * @type {PubSub} */ this.parent = this.pubsub = pubsub; + this.publisher = new Publisher(this, options); this.request = pubsub.request.bind(pubsub); /** * [IAM (Identity and Access @@ -180,7 +180,12 @@ export class Topic { * }); * ``` */ - this.iam = new IAM(pubsub, this.name); + this.iam = new IAM(pubsub, this); + } + + private id_: string; + get name(): string { + return Topic.formatName_(this.parent.projectId, this.id_); } /** diff --git a/test/iam.ts b/test/iam.ts index a2dab3ddc..d5f5763e2 100644 --- a/test/iam.ts +++ b/test/iam.ts @@ -81,10 +81,19 @@ describe('IAM', () => { assert.strictEqual(iam.request, fakeRequest); }); - it('should localize the ID', () => { + it('should localize the ID string', () => { assert.strictEqual(iam.id, ID); }); + it('should localize the ID getter', () => { + iam = new IAM(PUBSUB, { + get name() { + return 'test'; + }, + }); + assert.strictEqual(iam.id, 'test'); + }); + it('should promisify some of the things', () => { assert(promisified); }); diff --git a/test/lease-manager.ts b/test/lease-manager.ts index 2ceb17cf7..ca26a4be0 100644 --- a/test/lease-manager.ts +++ b/test/lease-manager.ts @@ -47,9 +47,22 @@ class FakeSubscriber extends EventEmitter { isExactlyOnceDelivery = false; } +class FakeSubscriberTelemetry { + flowStart() {} + flowEnd() {} + schedulerStart() {} + schedulerEnd() {} + modAckStart() {} + modAckStop() {} + processingStart() {} + processingEnd() {} +} + class FakeMessage { length = 20; received: number; + subSpans: FakeSubscriberTelemetry = new FakeSubscriberTelemetry(); + constructor() { this.received = Date.now(); } @@ -58,6 +71,7 @@ class FakeMessage { return AckResponses.Success; } ackFailed() {} + endParentSpan() {} } interface LeaseManagerInternals { @@ -132,6 +146,15 @@ describe('LeaseManager', () => { }); describe('add', () => { + it('should start a flow span', () => { + const message = new FakeMessage() as {} as Message; + const stub = sandbox.spy(message.subSpans, 'flowStart'); + + leaseManager.add(message); + + assert.strictEqual(stub.calledOnce, true); + }); + it('should update the bytes/size values', () => { const message = new FakeMessage() as {} as Message; diff --git a/test/message-queues.ts b/test/message-queues.ts index 6fe7f829f..98e514f98 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -54,7 +54,7 @@ class FakeSubscriber extends EventEmitter { constructor() { super(); - this.name = uuid.v4(); + this.name = `projects/test/subscriptions/${uuid.v4()}`; this.client = new FakeClient(); this.iEOS = false; } @@ -252,7 +252,7 @@ describe('MessageQueues', () => { messageQueue.flush(); const [batch] = messageQueue.batches; - assert.strictEqual(batch[0].ackId, message.ackId); + assert.strictEqual(batch[0].message.ackId, message.ackId); assert.strictEqual(batch[0].deadline, deadline); assert.ok(batch[0].responsePromise?.resolve); }); @@ -593,7 +593,7 @@ describe('MessageQueues', () => { clock.tick(1000); assert.strictEqual(ackQueue.requests.length, 1); - assert.strictEqual(ackQueue.requests[0].ackId, message.ackId); + assert.strictEqual(ackQueue.requests[0].message.ackId, message.ackId); assert.strictEqual(ackQueue.numInRetryRequests, 0); assert.strictEqual(ackQueue.numPendingRequests, 1); }); diff --git a/test/opentelemetry-tracing.ts b/test/opentelemetry-tracing.ts deleted file mode 100644 index dc6a1423e..000000000 --- a/test/opentelemetry-tracing.ts +++ /dev/null @@ -1,60 +0,0 @@ -/*! - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import * as assert from 'assert'; -import {describe, it, beforeEach} from 'mocha'; - -import * as api from '@opentelemetry/api'; -import * as trace from '@opentelemetry/tracing'; -import {createSpan} from '../src/opentelemetry-tracing'; -import {exporter} from './tracing'; -import {SpanKind} from '@opentelemetry/api'; - -describe('OpenTelemetryTracer', () => { - let span: trace.Span; - const spanName = 'test-span'; - const spanContext: api.SpanContext = { - traceId: 'd4cda95b652f4a1592b449d5929fda1b', - spanId: '6e0c63257de34c92', - traceFlags: api.TraceFlags.SAMPLED, - }; - const spanAttributes: api.SpanAttributes = { - foo: 'bar', - }; - - beforeEach(() => { - exporter.reset(); - }); - - it('creates a span', () => { - span = createSpan( - spanName, - SpanKind.PRODUCER, - spanAttributes, - spanContext - ) as trace.Span; - span.end(); - - const spans = exporter.getFinishedSpans(); - assert.notStrictEqual(spans.length, 0); - const exportedSpan = spans.concat().pop()!; - - assert.strictEqual(exportedSpan.name, spanName); - assert.deepStrictEqual(exportedSpan.attributes, spanAttributes); - assert.strictEqual(exportedSpan.parentSpanId, spanContext.spanId); - assert.strictEqual(exportedSpan.kind, SpanKind.PRODUCER); - }); -}); diff --git a/test/publisher/flow-publisher.ts b/test/publisher/flow-publisher.ts index cdeebdb37..c4c0ed6d4 100644 --- a/test/publisher/flow-publisher.ts +++ b/test/publisher/flow-publisher.ts @@ -27,10 +27,11 @@ import { } from '../../src/publisher'; import {FlowControl} from '../../src/publisher/flow-control'; import * as fp from '../../src/publisher/flow-publisher'; +import * as tracing from '../../src/telemetry-tracing'; class FakePublisher { flowControl!: FlowControl; - publishMessage() {} + async publishMessage() {} setOptions(options: PublishOptions) { this.flowControl.setOptions(options.flowControlOptions!); } @@ -47,6 +48,30 @@ describe('Flow control publisher', () => { afterEach(() => { sandbox.restore(); + tracing.setGloballyEnabled(false); + }); + + it('should create a flow span if a parent exists', async () => { + tracing.setGloballyEnabled(true); + + const fcp = new fp.FlowControlledPublisher(publisher); + const message = { + data: Buffer.from('foo'), + parentSpan: tracing.PubsubSpans.createPublisherSpan( + {}, + 'projects/foo/topics/topic', + 'tests' + ), + }; + fcp.publish(message as unknown as PubsubMessage); + assert.strictEqual(!!message.parentSpan, true); + }); + + it('should not create a flow span if no parent exists', async () => { + const fcp = new fp.FlowControlledPublisher(publisher); + const message = {data: Buffer.from('foo'), parentSpan: undefined}; + fcp.publish(message as unknown as PubsubMessage); + assert.strictEqual(!message.parentSpan, true); }); it('should get no promise if there is flow control space left', async () => { diff --git a/test/publisher/index.ts b/test/publisher/index.ts index 451bb82f2..a4e6f28fb 100644 --- a/test/publisher/index.ts +++ b/test/publisher/index.ts @@ -28,9 +28,9 @@ import {PublishError} from '../../src/publisher/publish-error'; import * as util from '../../src/util'; import {defaultOptions} from '../../src/default-options'; +import * as tracing from '../../src/telemetry-tracing'; import {exporter} from '../tracing'; import {SpanKind} from '@opentelemetry/api'; -import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; let promisified = false; const fakeUtil = Object.assign({}, util, { @@ -99,9 +99,11 @@ class FakeOrderedQueue extends FakeQueue { describe('Publisher', () => { let sandbox: sinon.SinonSandbox; let spy: sinon.SinonSpy; + const topicId = 'topic-name'; + const projectId = 'PROJECT_ID'; const topic = { - name: 'topic-name', - pubsub: {projectId: 'PROJECT_ID'}, + name: `projects/${projectId}/topics/${topicId}`, + pubsub: {projectId}, } as Topic; // tslint:disable-next-line variable-name @@ -127,6 +129,7 @@ describe('Publisher', () => { afterEach(() => { sandbox.restore(); + tracing.setGloballyEnabled(false); }); describe('initialization', () => { @@ -184,20 +187,24 @@ describe('Publisher', () => { describe('OpenTelemetry tracing', () => { let tracingPublisher: p.Publisher = {} as p.Publisher; - const enableTracing: p.PublishOptions = { - enableOpenTelemetryTracing: true, - }; const buffer = Buffer.from('Hello, world!'); beforeEach(() => { exporter.reset(); }); - it('export created spans', () => { + it('export created spans', async () => { + tracing.setGloballyEnabled(true); + // Setup trace exporting - tracingPublisher = new Publisher(topic, enableTracing); + tracingPublisher = new Publisher(topic); + const msg = {data: buffer} as p.PubsubMessage; + tracingPublisher.publishMessage(msg); + + // publishMessage is only the first part of the process now, + // so we need to manually end the span. + msg.parentSpan?.end(); - tracingPublisher.publish(buffer); const spans = exporter.getFinishedSpans(); assert.notStrictEqual(spans.length, 0, 'has span'); const createdSpan = spans.concat().pop()!; @@ -206,22 +213,14 @@ describe('Publisher', () => { opentelemetry.SpanStatusCode.UNSET ); assert.strictEqual( - createdSpan.attributes[SemanticAttributes.MESSAGING_OPERATION], - 'send' - ); - assert.strictEqual( - createdSpan.attributes[SemanticAttributes.MESSAGING_SYSTEM], - 'pubsub' + createdSpan.attributes['messaging.system'], + 'gcp_pubsub' ); assert.strictEqual( - createdSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION], - topic.name + createdSpan.attributes['messaging.destination.name'], + topicId ); - assert.strictEqual( - createdSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND], - 'topic' - ); - assert.strictEqual(createdSpan.name, 'topic-name send'); + assert.strictEqual(createdSpan.name, `${topicId} create`); assert.strictEqual( createdSpan.kind, SpanKind.PRODUCER, @@ -378,9 +377,11 @@ describe('Publisher', () => { }); it('should issue a warning if OpenTelemetry span context key is set', () => { + tracing.setGloballyEnabled(true); + const warnSpy = sinon.spy(console, 'warn'); const attributes = { - googclient_OpenTelemetrySpanContext: 'foobar', + [tracing.legacyAttributeName]: 'foobar', }; const fakeMessageWithOTKey = {data, attributes}; const publisherTracing = new Publisher(topic, { diff --git a/test/publisher/message-batch.ts b/test/publisher/message-batch.ts index 161756500..f1cfe0b29 100644 --- a/test/publisher/message-batch.ts +++ b/test/publisher/message-batch.ts @@ -33,7 +33,7 @@ describe('MessageBatch', () => { }; beforeEach(() => { - batch = new MessageBatch(Object.assign({}, options)); + batch = new MessageBatch(Object.assign({}, options), 'topicName'); }); afterEach(() => { @@ -57,7 +57,7 @@ describe('MessageBatch', () => { const now = Date.now(); sandbox.stub(Date, 'now').returns(now); - batch = new MessageBatch(options); + batch = new MessageBatch(options, 'topicName'); assert.strictEqual(batch.created, now); }); @@ -198,4 +198,10 @@ describe('MessageBatch', () => { assert.strictEqual(newOptions, batch.options); }); }); + + it('returns data from end()', () => { + const output = batch.end(); + assert.strictEqual(output.messages, batch.messages); + assert.strictEqual(output.callbacks, batch.callbacks); + }); }); diff --git a/test/publisher/message-queues.ts b/test/publisher/message-queues.ts index a74b60f53..b9cefea53 100644 --- a/test/publisher/message-queues.ts +++ b/test/publisher/message-queues.ts @@ -28,7 +28,7 @@ import * as q from '../../src/publisher/message-queues'; import {PublishError} from '../../src/publisher/publish-error'; class FakeTopic { - name = 'fake-topic'; + name = 'projects/foo/topics/fake-topic'; // eslint-disable-next-line @typescript-eslint/no-unused-vars request(config: RequestConfig, callback: RequestCallback): void {} } @@ -54,11 +54,13 @@ class FakeMessageBatch { messages: p.PubsubMessage[]; options: b.BatchPublishOptions; bytes: number; - constructor(options = {} as b.BatchPublishOptions) { + topicName: string; + constructor(options = {} as b.BatchPublishOptions, topicName = 'topicName') { this.callbacks = []; this.created = Date.now(); this.messages = []; this.options = options; + this.topicName = topicName; this.bytes = 0; } // eslint-disable-next-line @typescript-eslint/no-unused-vars @@ -76,6 +78,12 @@ class FakeMessageBatch { setOptions(options: b.BatchPublishOptions) { this.options = options; } + end() { + return { + messages: this.messages, + callbacks: this.callbacks, + }; + } } class FakePublishError { diff --git a/test/pubsub.ts b/test/pubsub.ts index d76312fd0..df330f692 100644 --- a/test/pubsub.ts +++ b/test/pubsub.ts @@ -25,6 +25,7 @@ import {google} from '../protos/protos'; import * as pubsubTypes from '../src/pubsub'; import {Snapshot} from '../src/snapshot'; import * as subby from '../src/subscription'; +import * as tracing from '../src/telemetry-tracing'; import {Topic} from '../src/topic'; import * as util from '../src/util'; import {Schema, SchemaTypes, ISchema, SchemaViews} from '../src/schema'; @@ -97,6 +98,20 @@ class FakeTopic { this.calledWith_ = args; } + // Simulate the on-demand name getter for Topic, unless a test + // explicitly sets one. + setName?: string; + get name() { + if (this.setName) { + return this.setName; + } + const pubsub = this.calledWith_[0] as pubsubTypes.PubSub; + return pubsub.projectId + '/foo'; + } + set name(val: string) { + this.setName = val; + } + static formatName_(): string { return 'foo'; } @@ -289,6 +304,17 @@ describe('PubSub', () => { assert.strictEqual(pubsub.isOpen, true); }); + it('should enable OpenTelemetry if requested', () => { + const options: pubsubTypes.ClientConfig = { + enableOpenTelemetryTracing: true, + }; + const pubsub = new PubSub(options); + assert.strictEqual( + tracing.isEnabled(), + tracing.OpenTelemetryLevel.Modern + ); + }); + it('should not be in the opened state after close()', async () => { await pubsub.close?.(); assert.strictEqual(pubsub.isOpen, false); @@ -592,18 +618,13 @@ describe('PubSub', () => { }); it('should fill the subscription object name if projectId was empty', async () => { - const subscription = {}; - pubsub.projectId = undefined; - sandbox.stub(pubsub, 'subscription').callsFake(() => { - // Simulate the project ID not being resolved. - const sub = subscription as subby.Subscription; - sub.name = '{{projectId}}/foo/bar'; - return sub; - }); + // Simulate the project ID not being resolved. + pubsub.projectId = '{{projectId}}'; sandbox .stub(pubsub, 'request') .callsFake((config, callback: Function) => { + pubsub.projectId = 'something'; callback(null, apiResponse); }); @@ -611,7 +632,6 @@ describe('PubSub', () => { TOPIC_NAME, SUB_NAME )!; - assert.strictEqual(sub, subscription); assert.strictEqual(sub.name.includes('{{'), false); assert.strictEqual(resp, apiResponse); }); @@ -704,16 +724,9 @@ describe('PubSub', () => { it('should fill the topic object name if projectId was empty', async () => { const topicName = 'new-topic'; - const topicInstance = {}; - sandbox.stub(pubsub, 'topic').callsFake(name => { - assert.strictEqual(name, topicName); - - // Simulate the project ID not being resolved. - const topic = topicInstance as Topic; - topic.name = 'projects/{{projectId}}/topics/new-topic'; - return topic; - }); + // Simulate the project ID not being resolved. + pubsub.projectId = '{{projectId}}'; requestStub.restore(); sandbox @@ -724,7 +737,6 @@ describe('PubSub', () => { }); const [topic, resp] = await pubsub.createTopic!(topicName)!; - assert.strictEqual(topic, topicInstance); assert.strictEqual(topic.name.includes('{{'), false); assert.strictEqual(resp, apiResponse); }); diff --git a/test/subscriber.ts b/test/subscriber.ts index cab00e286..f6bc4d2e2 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -28,14 +28,14 @@ import {google} from '../protos/protos'; import * as defer from 'p-defer'; import {HistogramOptions} from '../src/histogram'; -import {FlowControlOptions} from '../src/lease-manager'; +import {FlowControlOptions, LeaseManager} from '../src/lease-manager'; import {BatchOptions} from '../src/message-queues'; import {MessageStreamOptions} from '../src/message-stream'; import * as s from '../src/subscriber'; import {Subscription} from '../src/subscription'; import {SpanKind} from '@opentelemetry/api'; -import {SemanticAttributes} from '@opentelemetry/semantic-conventions'; import {Duration} from '../src'; +import * as tracing from '../src/telemetry-tracing'; type PullResponse = google.pubsub.v1.IStreamingPullResponse; @@ -58,12 +58,18 @@ class FakePubSub { } } +const projectId = uuid.v4(); +const subId = uuid.v4(); + class FakeSubscription { - name = uuid.v4(); - projectId = uuid.v4(); + name = `projects/${projectId}/subscriptions/${subId}`; pubsub = new FakePubSub(); } +interface PublicInventory { + _inventory: LeaseManager; +} + class FakeHistogram { options?: HistogramOptions; constructor(options?: HistogramOptions) { @@ -89,7 +95,9 @@ class FakeLeaseManager extends EventEmitter { } // eslint-disable-next-line @typescript-eslint/no-unused-vars add(message: s.Message): void {} - clear(): void {} + clear(): s.Message[] { + return []; + } // eslint-disable-next-line @typescript-eslint/no-unused-vars remove(message: s.Message): void {} } @@ -189,9 +197,15 @@ describe('Subscriber', () => { beforeEach(() => { sandbox = sinon.createSandbox(); fakeProjectify = { - replaceProjectIdToken: sandbox.stub().callsFake((name, projectId) => { - return `projects/${projectId}/name/${name}`; - }), + replaceProjectIdToken: sandbox + .stub() + .callsFake((name: string, projectId: string) => { + if (name.indexOf('/') >= 0) { + return name; + } else { + return `projects/${projectId}/name/${name}`; + } + }), }; const s = proxyquire('../src/subscriber.js', { @@ -219,6 +233,7 @@ describe('Subscriber', () => { afterEach(() => { sandbox.restore(); subscriber.close(); + tracing.setGloballyEnabled(false); }); describe('initialization', () => { @@ -543,12 +558,15 @@ describe('Subscriber', () => { assert.strictEqual(stub.callCount, 1); }); - it('should clear the inventory', () => { + it('should clear the inventory', async () => { + const message = new Message(subscriber, RECEIVED_MESSAGE); + const shutdownStub = sandbox.stub(tracing.PubsubEvents, 'shutdown'); const inventory: FakeLeaseManager = stubs.get('inventory'); - const stub = sandbox.stub(inventory, 'clear'); + const stub = sandbox.stub(inventory, 'clear').returns([message]); - subscriber.close(); + await subscriber.close(); assert.strictEqual(stub.callCount, 1); + assert.strictEqual(shutdownStub.callCount, 1); }); it('should emit a close event', done => { @@ -559,6 +577,7 @@ describe('Subscriber', () => { it('should nack any messages that come in after', () => { const stream: FakeMessageStream = stubs.get('messageStream'); const stub = sandbox.stub(subscriber, 'nack'); + const shutdownStub = sandbox.stub(tracing.PubsubEvents, 'shutdown'); const pullResponse = {receivedMessages: [RECEIVED_MESSAGE]}; subscriber.close(); @@ -566,6 +585,7 @@ describe('Subscriber', () => { const [{ackId}] = stub.lastCall.args; assert.strictEqual(ackId, RECEIVED_MESSAGE.ackId); + assert.strictEqual(shutdownStub.callCount, 1); }); describe('flushing the queues', () => { @@ -756,6 +776,8 @@ describe('Subscriber', () => { }); it('should add messages to the inventory', done => { + const message = new Message(subscriber, RECEIVED_MESSAGE); + subscriber.open(); const modAckStub = sandbox.stub(subscriber, 'modAck'); @@ -766,6 +788,15 @@ describe('Subscriber', () => { const inventory: FakeLeaseManager = stubs.get('inventory'); const addStub = sandbox.stub(inventory, 'add').callsFake(() => { const [addMsg] = addStub.lastCall.args; + + // OTel is enabled during tests, so we need to delete the baggage. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const [addMsgAny, msgAny] = [addMsg as any, message as any]; + delete addMsgAny.parentSpan; + delete addMsgAny.subSpans; + delete msgAny.parentSpan; + delete msgAny.subSpans; + assert.deepStrictEqual(addMsg, message); // test for receipt @@ -850,31 +881,33 @@ describe('Subscriber', () => { it('should not instantiate a tracer when tracing is disabled', () => { subscriber = new Subscriber(subscription, {}); - assert.strictEqual(subscriber['_useOpentelemetry'], false); + assert.strictEqual(subscriber['_useLegacyOpenTelemetry'], false); }); it('should instantiate a tracer when tracing is enabled through constructor', () => { subscriber = new Subscriber(subscription, enableTracing); - assert.ok(subscriber['_useOpentelemetry']); + assert.ok(subscriber['_useLegacyOpenTelemetry']); }); it('should instantiate a tracer when tracing is enabled through setOptions', () => { subscriber = new Subscriber(subscription, {}); subscriber.setOptions(enableTracing); - assert.ok(subscriber['_useOpentelemetry']); + assert.ok(subscriber['_useLegacyOpenTelemetry']); }); it('should disable tracing when tracing is disabled through setOptions', () => { subscriber = new Subscriber(subscription, enableTracing); subscriber.setOptions(disableTracing); - assert.strictEqual(subscriber['_useOpentelemetry'], false); + assert.strictEqual(subscriber['_useLegacyOpenTelemetry'], false); }); it('exports a span once it is created', () => { + tracing.setGloballyEnabled(true); + subscription = new FakeSubscription() as {} as Subscription; subscriber = new Subscriber(subscription, enableTracing); message = new Message(subscriber, RECEIVED_MESSAGE); - assert.strictEqual(subscriber['_useOpentelemetry'], true); + assert.strictEqual(subscriber['_useLegacyOpenTelemetry'], true); subscriber.open(); // Construct mock of received message with span context @@ -900,18 +933,26 @@ describe('Subscriber', () => { receivedMessages: [messageWithSpanContext], }; + const openedSub = subscriber as unknown as PublicInventory; + sandbox.stub(openedSub._inventory, 'add').callsFake((m: s.Message) => { + message = m; + }); + // Receive message and assert that it was exported const msgStream = stubs.get('messageStream'); msgStream.emit('data', pullResponse); + message.endParentSpan(); + const spans = exporter.getFinishedSpans(); assert.strictEqual(spans.length, 1); - const firstSpan = spans.concat().shift(); + assert.strictEqual(spans[0].events.length, 2); + const firstSpan = spans.pop(); assert.ok(firstSpan); assert.strictEqual(firstSpan.parentSpanId, parentSpanContext.spanId); assert.strictEqual( firstSpan.name, - `${subscriber.name} process`, + `${subId} subscribe`, 'name of span should match' ); assert.strictEqual( @@ -919,42 +960,29 @@ describe('Subscriber', () => { SpanKind.CONSUMER, 'span kind should be CONSUMER' ); - assert.strictEqual( - firstSpan.attributes[SemanticAttributes.MESSAGING_OPERATION], - 'process', - 'span messaging operation should match' - ); - assert.strictEqual( - firstSpan.attributes[SemanticAttributes.MESSAGING_SYSTEM], - 'pubsub' - ); - assert.strictEqual( - firstSpan.attributes[SemanticAttributes.MESSAGING_MESSAGE_ID], - messageWithSpanContext.message.messageId, - 'span messaging id should match' - ); - assert.strictEqual( - firstSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION], - subscriber.name, - 'span messaging destination should match' - ); - assert.strictEqual( - firstSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND], - 'topic' - ); }); - it('does not export a span when a span context is not present on message', () => { + it('exports a span even when a span context is not present on message', () => { + tracing.setGloballyEnabled(true); + subscriber = new Subscriber(subscription, enableTracing); + subscriber.open(); const pullResponse: s.PullResponse = { receivedMessages: [RECEIVED_MESSAGE], }; + const openedSub = subscriber as unknown as PublicInventory; + sandbox.stub(openedSub._inventory, 'add').callsFake((m: s.Message) => { + message = m; + }); + // Receive message and assert that it was exported const stream: FakeMessageStream = stubs.get('messageStream'); stream.emit('data', pullResponse); - assert.strictEqual(exporter.getFinishedSpans().length, 0); + + message.endParentSpan(); + assert.strictEqual(exporter.getFinishedSpans().length, 1); }); }); @@ -1165,4 +1193,102 @@ describe('Subscriber', () => { }); }); }); + + describe('SubscriberSpans', () => { + const message: tracing.MessageWithAttributes = { + attributes: {}, + parentSpan: undefined, + }; + const spans = new s.SubscriberSpans(message); + const fakeSpan = { + end() {}, + } as unknown as opentelemetry.Span; + + it('starts a flow span', () => { + const stub = sandbox + .stub(tracing.PubsubSpans, 'createReceiveFlowSpan') + .returns(fakeSpan); + spans.flowStart(); + assert.strictEqual(stub.calledOnce, true); + assert.strictEqual(stub.args[0][0], message); + spans.flowStart(); + assert.strictEqual(stub.calledOnce, true); + }); + + it('ends a flow span', () => { + sandbox + .stub(tracing.PubsubSpans, 'createReceiveFlowSpan') + .returns(fakeSpan); + spans.flowStart(); + const spy = sandbox.spy(fakeSpan, 'end'); + spans.flowEnd(); + assert.strictEqual(spy.calledOnce, true); + spans.flowEnd(); + assert.strictEqual(spy.calledOnce, true); + }); + + it('fires a modAck start event', () => { + const stub = sandbox.stub(tracing.PubsubEvents, 'modAckStart'); + spans.modAckStart(Duration.from({seconds: 10}), true); + assert.strictEqual(stub.args[0][0], message); + assert.strictEqual(stub.args[0][1].totalOf('second'), 10); + assert.strictEqual(stub.args[0][2], true); + assert.strictEqual(stub.calledOnce, true); + }); + + it('fires a modAck end event', () => { + const stub = sandbox.stub(tracing.PubsubEvents, 'modAckEnd'); + spans.modAckEnd(); + assert.strictEqual(stub.args[0][0], message); + assert.strictEqual(stub.calledOnce, true); + }); + + it('starts a scheduler span', () => { + const stub = sandbox + .stub(tracing.PubsubSpans, 'createReceiveSchedulerSpan') + .returns(fakeSpan); + spans.schedulerStart(); + assert.strictEqual(stub.args[0][0], message); + assert.strictEqual(stub.calledOnce, true); + spans.schedulerStart(); + assert.strictEqual(stub.calledOnce, true); + }); + + it('ends a scheduler span', () => { + sandbox + .stub(tracing.PubsubSpans, 'createReceiveSchedulerSpan') + .returns(fakeSpan); + spans.schedulerStart(); + const spy = sandbox.spy(fakeSpan, 'end'); + spans.schedulerEnd(); + assert.strictEqual(spy.calledOnce, true); + spans.schedulerEnd(); + assert.strictEqual(spy.calledOnce, true); + }); + + it('starts a processing span', () => { + const stub = sandbox + .stub(tracing.PubsubSpans, 'createReceiveProcessSpan') + .returns(fakeSpan); + const subName = 'foozle'; + spans.processingStart(subName); + assert.strictEqual(stub.args[0][0], message); + assert.strictEqual(stub.args[0][1], subName); + assert.strictEqual(stub.calledOnce, true); + spans.processingStart('boo'); + assert.strictEqual(stub.calledOnce, true); + }); + + it('ends a processing span', () => { + sandbox + .stub(tracing.PubsubSpans, 'createReceiveSchedulerSpan') + .returns(fakeSpan); + spans.processingStart('foozle'); + const spy = sandbox.spy(fakeSpan, 'end'); + spans.processingEnd(); + assert.strictEqual(spy.calledOnce, true); + spans.processingEnd(); + assert.strictEqual(spy.calledOnce, true); + }); + }); }); diff --git a/test/subscription.ts b/test/subscription.ts index c90835c1a..77717f617 100644 --- a/test/subscription.ts +++ b/test/subscription.ts @@ -170,7 +170,7 @@ describe('Subscription', () => { assert(subscription.iam instanceof FakeIAM); const args = (subscription.iam as {} as FakeIAM).calledWith_; assert.strictEqual(args[0], PUBSUB); - assert.strictEqual(args[1], subscription.name); + assert.strictEqual(args[1], subscription); }); it('should create a Subscriber', () => { @@ -200,7 +200,12 @@ describe('Subscription', () => { }); it('should emit messages', done => { - const message = {}; + const message = { + subSpans: { + processingStart() {}, + processingEnd() {}, + }, + }; subscription.on?.('message', (msg: Message) => { assert.strictEqual(msg, message); diff --git a/test/telemetry-tracing.ts b/test/telemetry-tracing.ts new file mode 100644 index 000000000..d624a6146 --- /dev/null +++ b/test/telemetry-tracing.ts @@ -0,0 +1,397 @@ +/*! + * Copyright 2020-2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {describe, it, beforeEach} from 'mocha'; + +import * as trace from '@opentelemetry/sdk-trace-base'; +import * as otel from '../src/telemetry-tracing'; +import {exporter} from './tracing'; +import {SpanKind} from '@opentelemetry/api'; +import sinon = require('sinon'); +import {PubsubMessage} from '../src/publisher'; + +describe('OpenTelemetryTracer', () => { + beforeEach(() => { + exporter.reset(); + otel.setGloballyEnabled(true); + }); + afterEach(() => { + exporter.reset(); + otel.setGloballyEnabled(false); + }); + + describe('project parser', () => { + it('parses subscription info', () => { + const name = 'projects/project-name/subscriptions/sub-name'; + const info = otel.getSubscriptionInfo(name); + assert.strictEqual(info.subName, name); + assert.strictEqual(info.projectId, 'project-name'); + assert.strictEqual(info.subId, 'sub-name'); + assert.strictEqual(info.topicId, undefined); + assert.strictEqual(info.topicName, undefined); + }); + + it('parses topic info', () => { + const name = 'projects/project-name/topics/topic-name'; + const info = otel.getTopicInfo(name); + assert.strictEqual(info.topicName, name); + assert.strictEqual(info.projectId, 'project-name'); + assert.strictEqual(info.topicId, 'topic-name'); + assert.strictEqual(info.subId, undefined); + assert.strictEqual(info.subName, undefined); + }); + + it('parses broken subscription info', () => { + const name = 'projec/foo_foo/subs/sub_sub'; + const info = otel.getSubscriptionInfo(name); + assert.strictEqual(info.subName, name); + assert.strictEqual(info.projectId, undefined); + assert.strictEqual(info.subId, undefined); + assert.strictEqual(info.topicId, undefined); + assert.strictEqual(info.topicName, undefined); + }); + + it('parses broken topic info', () => { + const name = 'projec/foo_foo/tops/top_top'; + const info = otel.getTopicInfo(name); + assert.strictEqual(info.subName, undefined); + assert.strictEqual(info.projectId, undefined); + assert.strictEqual(info.subId, undefined); + assert.strictEqual(info.topicId, undefined); + assert.strictEqual(info.topicName, name); + }); + }); + + describe('basic span creation', () => { + it('creates a span', () => { + const message: PubsubMessage = {}; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'projects/test/topics/topicfoo', + 'tests' + ) as trace.Span; + span.end(); + + const spans = exporter.getFinishedSpans(); + assert.notStrictEqual(spans.length, 0); + const exportedSpan = spans.concat().pop()!; + + assert.strictEqual(exportedSpan.name, 'topicfoo create'); + assert.strictEqual(exportedSpan.kind, SpanKind.PRODUCER); + }); + + it('injects a trace context', () => { + const message: PubsubMessage = { + attributes: {}, + }; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'projects/test/topics/topicfoo', + 'tests' + ) as trace.Span; + + otel.injectSpan(span, message, otel.OpenTelemetryLevel.Modern); + + assert.strictEqual( + Object.getOwnPropertyNames(message.attributes).includes( + otel.modernAttributeName + ), + true + ); + }); + }); + + describe('context propagation', () => { + it('injects a trace context and legacy baggage', () => { + const message: PubsubMessage = { + attributes: {}, + }; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'projects/test/topics/topicfoo', + 'tests' + ); + assert.ok(span); + + otel.injectSpan(span, message, otel.OpenTelemetryLevel.Legacy); + + assert.strictEqual( + Object.getOwnPropertyNames(message.attributes).includes( + otel.modernAttributeName + ), + true + ); + assert.strictEqual( + Object.getOwnPropertyNames(message.attributes).includes( + otel.legacyAttributeName + ), + true + ); + }); + + it('should issue a warning if OpenTelemetry span context key is set', () => { + const message: PubsubMessage = { + attributes: { + [otel.legacyAttributeName]: 'foobar', + [otel.modernAttributeName]: 'bazbar', + }, + }; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'projects/test/topics/topicfoo', + 'tests' + ); + assert.ok(span); + + const warnSpy = sinon.spy(console, 'warn'); + try { + otel.injectSpan(span, message, otel.OpenTelemetryLevel.Legacy); + assert.strictEqual(warnSpy.callCount, 2); + } finally { + warnSpy.restore(); + } + }); + + it('should be able to determine if attributes are present', () => { + let message: otel.MessageWithAttributes = { + attributes: { + [otel.legacyAttributeName]: 'foobar', + }, + }; + assert.strictEqual(otel.containsSpanContext(message), true); + + message = { + attributes: { + [otel.modernAttributeName]: 'foobar', + }, + }; + assert.strictEqual(otel.containsSpanContext(message), true); + + message = {}; + assert.strictEqual(otel.containsSpanContext(message), false); + }); + + it('extracts a trace context', () => { + const message = { + attributes: { + [otel.modernAttributeName]: + '00-d4cda95b652f4a1592b449d5929fda1b-553964cd9101a314-01', + }, + }; + + const childSpan = otel.extractSpan( + message, + 'projects/test/subscriptions/subfoo', + otel.OpenTelemetryLevel.Modern + ); + assert.strictEqual( + childSpan!.spanContext().traceId, + 'd4cda95b652f4a1592b449d5929fda1b' + ); + }); + }); + + describe('attribute creation', () => { + it('creates attributes for publish', () => { + const topicInfo: otel.AttributeParams = { + topicName: 'projects/foo/topics/top', + topicId: 'top', + projectId: 'foo', + }; + const message: PubsubMessage = { + data: Buffer.from('test'), + attributes: {}, + calculatedSize: 1234, + orderingKey: 'key', + isExactlyOnceDelivery: true, + ackId: 'ackack', + }; + + const topicAttrs = otel.PubsubSpans.createAttributes( + topicInfo, + message, + 'tests' + ); + assert.deepStrictEqual(topicAttrs, { + 'messaging.system': 'gcp_pubsub', + 'messaging.destination.name': topicInfo.topicId, + 'gcp.project_id': topicInfo.projectId, + 'messaging.message.envelope.size': message.calculatedSize, + 'messaging.gcp_pubsub.message.ordering_key': message.orderingKey, + 'messaging.gcp_pubsub.message.exactly_once_delivery': + message.isExactlyOnceDelivery, + 'messaging.gcp_pubsub.message.ack_id': message.ackId, + 'code.function': 'tests', + }); + + // Check again with no calculated size and other parameters missing. + delete message.calculatedSize; + delete message.orderingKey; + delete message.isExactlyOnceDelivery; + delete message.ackId; + + const topicAttrs2 = otel.PubsubSpans.createAttributes( + topicInfo, + message, + 'tests' + ); + assert.deepStrictEqual(topicAttrs2, { + 'messaging.system': 'gcp_pubsub', + 'messaging.destination.name': topicInfo.topicId, + 'gcp.project_id': topicInfo.projectId, + 'messaging.message.envelope.size': message.data?.length, + 'code.function': 'tests', + }); + }); + }); + + describe('specialized span creation', () => { + const tests = { + topicInfo: { + topicName: 'projects/foo/topics/top', + topicId: 'top', + projectId: 'foo', + } as otel.AttributeParams, + subInfo: { + subName: 'projects/foo/subscriptions/sub', + subId: 'sub', + projectId: 'foo', + } as otel.AttributeParams, + message: { + data: Buffer.from('test'), + attributes: {}, + calculatedSize: 1234, + orderingKey: 'key', + isExactlyOnceDelivery: true, + ackId: 'ackack', + } as PubsubMessage, + }; + + it('creates publisher spans', () => { + const span = otel.PubsubSpans.createPublisherSpan( + tests.message, + tests.topicInfo.topicName!, + 'tests' + ); + assert.ok(span); + span.end(); + + const spans = exporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1); + + const firstSpan = spans.pop(); + assert.ok(firstSpan); + assert.strictEqual(firstSpan.name, `${tests.topicInfo.topicId} create`); + assert.strictEqual( + firstSpan.attributes['messaging.destination.name'], + tests.topicInfo.topicId + ); + assert.strictEqual( + firstSpan.attributes['messaging.system'], + 'gcp_pubsub' + ); + }); + + it('updates publisher topic names', () => { + const span = otel.PubsubSpans.createPublisherSpan( + tests.message, + tests.topicInfo.topicName!, + 'tests' + ); + assert.ok(span); + otel.PubsubSpans.updatePublisherTopicName( + span, + 'projects/foo/topics/other' + ); + span.end(); + + const spans = exporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1); + + const firstSpan = spans.pop(); + assert.ok(firstSpan); + assert.strictEqual(firstSpan.name, 'other create'); + + assert.strictEqual( + firstSpan.attributes['messaging.destination.name'], + 'other' + ); + }); + + it('creates receive spans', () => { + const parentSpan = otel.PubsubSpans.createPublisherSpan( + tests.message, + tests.topicInfo.topicName!, + 'tests' + ); + assert.ok(parentSpan); + const span = otel.PubsubSpans.createReceiveSpan( + tests.message, + tests.subInfo.subName!, + otel.spanContextToContext(parentSpan.spanContext()), + 'tests' + ); + assert.ok(span); + span.end(); + parentSpan.end(); + + const spans = exporter.getFinishedSpans(); + const parentReadSpan = spans.pop(); + const childReadSpan = spans.pop(); + assert.ok(parentReadSpan && childReadSpan); + + assert.strictEqual(childReadSpan.name, 'sub subscribe'); + assert.strictEqual( + childReadSpan.attributes['messaging.destination.name'], + 'sub' + ); + assert.strictEqual(childReadSpan.kind, SpanKind.CONSUMER); + assert.ok(childReadSpan.parentSpanId); + }); + + it('creates publish RPC spans', () => { + const message: PubsubMessage = {}; + const topicName = 'projects/test/topics/topicfoo'; + const span = otel.PubsubSpans.createPublisherSpan( + message, + topicName, + 'test' + ) as trace.Span; + message.parentSpan = span; + span.end(); + + const publishSpan = otel.PubsubSpans.createPublishRpcSpan( + [message], + topicName, + 'test' + ); + + publishSpan?.end(); + const spans = exporter.getFinishedSpans(); + const publishReadSpan = spans.pop(); + const childReadSpan = spans.pop(); + assert.ok(publishReadSpan && childReadSpan); + + assert.strictEqual( + publishReadSpan.attributes['messaging.batch.message_count'], + 1 + ); + assert.strictEqual(publishReadSpan.links.length, 1); + assert.strictEqual(childReadSpan.links.length, 1); + }); + }); +}); diff --git a/test/topic.ts b/test/topic.ts index 1179c718b..bdbb7c808 100644 --- a/test/topic.ts +++ b/test/topic.ts @@ -188,7 +188,7 @@ describe('Topic', () => { }); it('should create an iam object', () => { - assert.deepStrictEqual(topic.iam.calledWith_, [PUBSUB, TOPIC_NAME]); + assert.deepStrictEqual(topic.iam.calledWith_, [PUBSUB, topic]); }); }); diff --git a/test/tracing.ts b/test/tracing.ts index 8b1b31146..7689253ad 100644 --- a/test/tracing.ts +++ b/test/tracing.ts @@ -18,7 +18,7 @@ import { BasicTracerProvider, InMemorySpanExporter, SimpleSpanProcessor, -} from '@opentelemetry/tracing'; +} from '@opentelemetry/sdk-trace-base'; /** * This file is used to initialise a global tracing provider and span exporter