From c8b9393714e27443f44c9f24e30a85f54287b6ca Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Mon, 3 Feb 2025 15:01:03 -0600 Subject: [PATCH] Introduce component logger with appropriate attributes --- .chloggen/component-logger-api.yaml | 25 ++++++++ .chloggen/component-logger-memory.yaml | 25 ++++++++ .chloggen/component-logger-otlp.yaml | 25 ++++++++ .chloggen/component-logger.yaml | 26 ++++++++ cmd/otelcorecol/builder-config.yaml | 1 + cmd/otelcorecol/go.mod | 5 +- component/componentattribute/Makefile | 1 + component/componentattribute/attribute.go | 12 ++++ component/componentattribute/go.mod | 30 +++++++++ component/componentattribute/go.sum | 33 ++++++++++ component/componentattribute/logger.go | 52 +++++++++++++++ component/componentattribute/logger_test.go | 63 ++++++++++++++++++ internal/e2e/go.mod | 3 + otelcol/go.mod | 3 + otelcol/otelcoltest/go.mod | 3 + processor/memorylimiterprocessor/factory.go | 9 +++ processor/memorylimiterprocessor/go.mod | 5 ++ receiver/otlpreceiver/go.mod | 5 ++ receiver/otlpreceiver/otlp.go | 4 ++ service/extensions/extensions.go | 13 +++- service/go.mod | 3 + service/internal/attribute/attribute.go | 67 +++++++++----------- service/internal/attribute/attribute_test.go | 64 ++++++++++--------- service/internal/components/loggers.go | 58 ----------------- service/internal/components/package_test.go | 14 ---- service/internal/graph/capabilities.go | 7 +- service/internal/graph/connector.go | 13 ++-- service/internal/graph/exporter.go | 11 ++-- service/internal/graph/fanout.go | 7 +- service/internal/graph/graph_test.go | 6 +- service/internal/graph/node.go | 22 ------- service/internal/graph/processor.go | 11 ++-- service/internal/graph/receiver.go | 11 ++-- service/internal/graph/util_test.go | 5 +- 34 files changed, 440 insertions(+), 202 deletions(-) create mode 100644 .chloggen/component-logger-api.yaml create mode 100644 .chloggen/component-logger-memory.yaml create mode 100644 .chloggen/component-logger-otlp.yaml create mode 100644 .chloggen/component-logger.yaml create mode 100644 component/componentattribute/Makefile create mode 100644 component/componentattribute/attribute.go create mode 100644 component/componentattribute/go.mod create mode 100644 component/componentattribute/go.sum create mode 100644 component/componentattribute/logger.go create mode 100644 component/componentattribute/logger_test.go delete mode 100644 service/internal/components/loggers.go delete mode 100644 service/internal/components/package_test.go delete mode 100644 service/internal/graph/node.go diff --git a/.chloggen/component-logger-api.yaml b/.chloggen/component-logger-api.yaml new file mode 100644 index 00000000000..02d84010631 --- /dev/null +++ b/.chloggen/component-logger-api.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: component/componentattribute + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: New module codifies component attributes and provides a zap.Logger for components. + +# One or more tracking issues or pull requests related to the change +issues: [12217] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/component-logger-memory.yaml b/.chloggen/component-logger-memory.yaml new file mode 100644 index 00000000000..8418f816b87 --- /dev/null +++ b/.chloggen/component-logger-memory.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: memorylimiter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Logger no longer attributes to single signal, pipeline, or component. + +# One or more tracking issues or pull requests related to the change +issues: [12217] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/component-logger-otlp.yaml b/.chloggen/component-logger-otlp.yaml new file mode 100644 index 00000000000..585fd2d2b2c --- /dev/null +++ b/.chloggen/component-logger-otlp.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlpreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Logger no longer attributes to random signal when receiving multiple signals. + +# One or more tracking issues or pull requests related to the change +issues: [12217] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/component-logger.yaml b/.chloggen/component-logger.yaml new file mode 100644 index 00000000000..7ce701d9fb8 --- /dev/null +++ b/.chloggen/component-logger.yaml @@ -0,0 +1,26 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Align component logger attributes with those defined in RFC + +# One or more tracking issues or pull requests related to the change +issues: [12217] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + See [Pipeline Component Telemetry RFC](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/component-universal-telemetry.md#attributes) + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/otelcorecol/builder-config.yaml b/cmd/otelcorecol/builder-config.yaml index f98ba9eae43..6d7eb699403 100644 --- a/cmd/otelcorecol/builder-config.yaml +++ b/cmd/otelcorecol/builder-config.yaml @@ -41,6 +41,7 @@ replaces: - go.opentelemetry.io/collector/client => ../../client - go.opentelemetry.io/collector/component => ../../component - go.opentelemetry.io/collector/component/componenttest => ../../component/componenttest + - go.opentelemetry.io/collector/component/componentattribute => ../../component/componentattribute - go.opentelemetry.io/collector/component/componentstatus => ../../component/componentstatus - go.opentelemetry.io/collector/config/configauth => ../../config/configauth - go.opentelemetry.io/collector/config/configcompression => ../../config/configcompression diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index c06fb37dd90..8be7ef27dd3 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -4,7 +4,7 @@ module go.opentelemetry.io/collector/cmd/otelcorecol go 1.22.0 -toolchain go1.22.11 +toolchain go1.23.5 require ( go.opentelemetry.io/collector/component v0.119.0 @@ -82,6 +82,7 @@ require ( go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector v0.119.0 // indirect go.opentelemetry.io/collector/client v1.25.0 // indirect + go.opentelemetry.io/collector/component/componentattribute v0.119.0 // indirect go.opentelemetry.io/collector/component/componentstatus v0.119.0 // indirect go.opentelemetry.io/collector/component/componenttest v0.119.0 // indirect go.opentelemetry.io/collector/config/configauth v0.119.0 // indirect @@ -168,6 +169,8 @@ replace go.opentelemetry.io/collector/component => ../../component replace go.opentelemetry.io/collector/component/componenttest => ../../component/componenttest +replace go.opentelemetry.io/collector/component/componentattribute => ../../component/componentattribute + replace go.opentelemetry.io/collector/component/componentstatus => ../../component/componentstatus replace go.opentelemetry.io/collector/config/configauth => ../../config/configauth diff --git a/component/componentattribute/Makefile b/component/componentattribute/Makefile new file mode 100644 index 00000000000..ded7a36092d --- /dev/null +++ b/component/componentattribute/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/component/componentattribute/attribute.go b/component/componentattribute/attribute.go new file mode 100644 index 00000000000..3fb20784b10 --- /dev/null +++ b/component/componentattribute/attribute.go @@ -0,0 +1,12 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package componentattribute // import "go.opentelemetry.io/collector/component/componentattribute" + +const ( + ComponentKindKey = "otelcol.component.kind" + ComponentIDKey = "otelcol.component.id" + PipelineIDKey = "otelcol.pipeline.id" + SignalKey = "otelcol.signal" + SignalOutputKey = "otelcol.signal.output" +) diff --git a/component/componentattribute/go.mod b/component/componentattribute/go.mod new file mode 100644 index 00000000000..5c404fe0275 --- /dev/null +++ b/component/componentattribute/go.mod @@ -0,0 +1,30 @@ +module go.opentelemetry.io/collector/component/componentattribute + +go 1.22.0 + +require ( + github.com/stretchr/testify v1.10.0 + go.opentelemetry.io/collector/pipeline v0.119.0 + go.opentelemetry.io/otel v1.34.0 + go.uber.org/zap v1.27.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry + +replace go.opentelemetry.io/collector/component => ../ + +replace go.opentelemetry.io/collector/pdata => ../../pdata + +replace go.opentelemetry.io/collector/pipeline => ../../pipeline + +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline diff --git a/component/componentattribute/go.sum b/component/componentattribute/go.sum new file mode 100644 index 00000000000..b679fbf9bf5 --- /dev/null +++ b/component/componentattribute/go.sum @@ -0,0 +1,33 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/component/componentattribute/logger.go b/component/componentattribute/logger.go new file mode 100644 index 00000000000..6844ecddaf2 --- /dev/null +++ b/component/componentattribute/logger.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package componentattribute // import "go.opentelemetry.io/collector/component/componentattribute" + +import ( + "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +var _ zapcore.Core = (*Core)(nil) + +type Core struct { + zapcore.Core + from *zap.Logger + attrs attribute.Set +} + +func NewLogger(from *zap.Logger, attrs *attribute.Set) *zap.Logger { + withAttributes := from + for _, kv := range attrs.ToSlice() { + withAttributes = withAttributes.With(zap.String(string(kv.Key), kv.Value.AsString())) + } + return zap.New(&Core{ + Core: withAttributes.Core(), + from: from, + attrs: *attrs, + }) +} + +func (l *Core) Without(keys ...string) *zap.Logger { + excludeKeys := make(map[string]struct{}) + for _, key := range keys { + excludeKeys[key] = struct{}{} + } + + newAttrs := []attribute.KeyValue{} + withAttributes := l.from + for _, kv := range l.attrs.ToSlice() { + if _, excluded := excludeKeys[string(kv.Key)]; !excluded { + newAttrs = append(newAttrs, kv) + withAttributes = withAttributes.With(zap.String(string(kv.Key), kv.Value.AsString())) + } + } + + return zap.New(&Core{ + Core: withAttributes.Core(), + from: l.from, + attrs: attribute.NewSet(newAttrs...), + }) +} diff --git a/component/componentattribute/logger_test.go b/component/componentattribute/logger_test.go new file mode 100644 index 00000000000..03155ae673b --- /dev/null +++ b/component/componentattribute/logger_test.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package componentattribute_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + + "go.opentelemetry.io/collector/component/componentattribute" + "go.opentelemetry.io/collector/pipeline" +) + +func TestLogger(t *testing.T) { + core, observed := observer.New(zap.DebugLevel) + logger := zap.New(core).With(zap.String("preexisting", "value")) + + attrs := attribute.NewSet( + attribute.String(componentattribute.SignalKey, pipeline.SignalLogs.String()), + attribute.String(componentattribute.ComponentIDKey, "filelog"), + ) + + parent := componentattribute.NewLogger(logger, &attrs) + parent.Info("test parent before child") + child := parent.Core().(*componentattribute.Core).Without(string(componentattribute.SignalKey)) + child.Info("test child") + parent.Info("test parent after child") + + observedLogs := observed.All() + require.Len(t, observedLogs, 3) + + parentContext := map[string]string{ + "preexisting": "value", + componentattribute.SignalKey: pipeline.SignalLogs.String(), + componentattribute.ComponentIDKey: "filelog", + } + childContext := map[string]string{ + "preexisting": "value", + componentattribute.ComponentIDKey: "filelog", + } + + require.Equal(t, "test parent before child", observedLogs[0].Message) + require.Len(t, observedLogs[0].Context, len(parentContext)) + for _, field := range observedLogs[0].Context { + require.Equal(t, parentContext[field.Key], field.String) + } + + require.Equal(t, "test child", observedLogs[1].Message) + require.Len(t, observedLogs[1].Context, len(childContext)) + for _, field := range observedLogs[1].Context { + require.Equal(t, childContext[field.Key], field.String) + } + + require.Equal(t, "test parent after child", observedLogs[2].Message) + require.Len(t, observedLogs[2].Context, len(parentContext)) + for _, field := range observedLogs[2].Context { + require.Equal(t, parentContext[field.Key], field.String) + } +} diff --git a/internal/e2e/go.mod b/internal/e2e/go.mod index 07447d0a7fc..184f34ef110 100644 --- a/internal/e2e/go.mod +++ b/internal/e2e/go.mod @@ -79,6 +79,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/client v1.25.0 // indirect + go.opentelemetry.io/collector/component/componentattribute v0.119.0 // indirect go.opentelemetry.io/collector/config/configauth v0.119.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.25.0 // indirect go.opentelemetry.io/collector/config/confignet v1.25.0 // indirect @@ -242,3 +243,5 @@ replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension replace go.opentelemetry.io/collector/extension/auth/authtest => ../../extension/auth/authtest replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xextension + +replace go.opentelemetry.io/collector/component/componentattribute => ../../component/componentattribute diff --git a/otelcol/go.mod b/otelcol/go.mod index 06e5762c0f9..993f6cf4c0c 100644 --- a/otelcol/go.mod +++ b/otelcol/go.mod @@ -70,6 +70,7 @@ require ( github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector/component/componentattribute v0.119.0 // indirect go.opentelemetry.io/collector/component/componenttest v0.119.0 // indirect go.opentelemetry.io/collector/connector/xconnector v0.119.0 // indirect go.opentelemetry.io/collector/consumer v1.25.0 // indirect @@ -204,3 +205,5 @@ replace go.opentelemetry.io/collector/extension/extensiontest => ../extension/ex replace go.opentelemetry.io/collector/extension/auth/authtest => ../extension/auth/authtest replace go.opentelemetry.io/collector/extension/xextension => ../extension/xextension + +replace go.opentelemetry.io/collector/component/componentattribute => ../component/componentattribute diff --git a/otelcol/otelcoltest/go.mod b/otelcol/otelcoltest/go.mod index 66904323ffd..e28fcc5123a 100644 --- a/otelcol/otelcoltest/go.mod +++ b/otelcol/otelcoltest/go.mod @@ -65,6 +65,7 @@ require ( github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector/component/componentattribute v0.119.0 // indirect go.opentelemetry.io/collector/component/componentstatus v0.119.0 // indirect go.opentelemetry.io/collector/component/componenttest v0.119.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.119.0 // indirect @@ -219,3 +220,5 @@ replace go.opentelemetry.io/collector/confmap => ../../confmap replace go.opentelemetry.io/collector/processor/processortest => ../../processor/processortest replace go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata + +replace go.opentelemetry.io/collector/component/componentattribute => ../../component/componentattribute diff --git a/processor/memorylimiterprocessor/factory.go b/processor/memorylimiterprocessor/factory.go index 03e18aeedc6..46884a2e76a 100644 --- a/processor/memorylimiterprocessor/factory.go +++ b/processor/memorylimiterprocessor/factory.go @@ -10,6 +10,7 @@ import ( "sync" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentattribute" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata" @@ -105,6 +106,14 @@ func (f *factory) getMemoryLimiter(set processor.Settings, cfg component.Config) return memLimiter, nil } + if c, ok := set.Logger.Core().(*componentattribute.Core); ok { + set.Logger = c.Without( + componentattribute.SignalKey, + componentattribute.PipelineIDKey, + componentattribute.ComponentIDKey, + ) + } + memLimiter, err := newMemoryLimiterProcessor(set, cfg.(*Config)) if err != nil { return nil, err diff --git a/processor/memorylimiterprocessor/go.mod b/processor/memorylimiterprocessor/go.mod index 61358e355e8..3493a44709d 100644 --- a/processor/memorylimiterprocessor/go.mod +++ b/processor/memorylimiterprocessor/go.mod @@ -5,6 +5,7 @@ go 1.22.0 require ( github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.119.0 + go.opentelemetry.io/collector/component/componentattribute v0.119.0 go.opentelemetry.io/collector/component/componenttest v0.119.0 go.opentelemetry.io/collector/confmap v1.25.0 go.opentelemetry.io/collector/consumer v1.25.0 @@ -103,3 +104,7 @@ replace go.opentelemetry.io/collector/pipeline => ../../pipeline replace go.opentelemetry.io/collector/internal/memorylimiter => ../../internal/memorylimiter replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/consumererror + +replace go.opentelemetry.io/collector/component/componentattribute => ../../component/componentattribute + +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline diff --git a/receiver/otlpreceiver/go.mod b/receiver/otlpreceiver/go.mod index f6f19494d6c..791f7483c20 100644 --- a/receiver/otlpreceiver/go.mod +++ b/receiver/otlpreceiver/go.mod @@ -8,6 +8,7 @@ require ( github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector v0.119.0 go.opentelemetry.io/collector/component v0.119.0 + go.opentelemetry.io/collector/component/componentattribute v0.119.0 go.opentelemetry.io/collector/component/componentstatus v0.119.0 go.opentelemetry.io/collector/component/componenttest v0.119.0 go.opentelemetry.io/collector/config/configauth v0.119.0 @@ -121,6 +122,8 @@ replace go.opentelemetry.io/collector/consumer/consumertest => ../../consumer/co replace go.opentelemetry.io/collector/client => ../../client +replace go.opentelemetry.io/collector/component/componentattribute => ../../component/componentattribute + replace go.opentelemetry.io/collector/component/componentstatus => ../../component/componentstatus replace go.opentelemetry.io/collector/receiver/xreceiver => ../xreceiver @@ -139,3 +142,5 @@ retract ( ) replace go.opentelemetry.io/collector/extension/auth/authtest => ../../extension/auth/authtest + +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 0f8d3c66378..379cbdb11b7 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentattribute" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" @@ -52,6 +53,9 @@ type otlpReceiver struct { // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. func newOtlpReceiver(cfg *Config, set *receiver.Settings) (*otlpReceiver, error) { + if c, ok := set.Logger.Core().(*componentattribute.Core); ok { + set.Logger = c.Without(componentattribute.SignalKey) + } r := &otlpReceiver{ cfg: cfg, nextTraces: nil, diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index fcefcb33f20..b51e8911855 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -13,12 +13,13 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentattribute" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensioncapabilities" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -38,7 +39,10 @@ type Extensions struct { func (bes *Extensions) Start(ctx context.Context, host component.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for _, extID := range bes.extensionIDs { - extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID) + extLogger := componentattribute.NewLogger( + bes.telemetry.Logger, + attribute.Extension(extID).Set(), + ) extLogger.Info("Extension is starting...") instanceID := bes.instanceIDs[extID] ext := bes.extMap[extID] @@ -216,7 +220,10 @@ func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Ext BuildInfo: set.BuildInfo, ModuleInfo: set.ModuleInfo, } - extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID) + extSet.TelemetrySettings.Logger = componentattribute.NewLogger( + set.Telemetry.Logger, + attribute.Extension(extID).Set(), + ) ext, err := set.Extensions.Create(ctx, extSet) if err != nil { diff --git a/service/go.mod b/service/go.mod index d706b1de331..fb5631da67c 100644 --- a/service/go.mod +++ b/service/go.mod @@ -10,6 +10,7 @@ require ( github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector v0.119.0 go.opentelemetry.io/collector/component v0.119.0 + go.opentelemetry.io/collector/component/componentattribute v0.119.0 go.opentelemetry.io/collector/component/componentstatus v0.119.0 go.opentelemetry.io/collector/component/componenttest v0.119.0 go.opentelemetry.io/collector/config/confighttp v0.119.0 @@ -136,6 +137,8 @@ replace go.opentelemetry.io/collector/component => ../component replace go.opentelemetry.io/collector/component/componenttest => ../component/componenttest +replace go.opentelemetry.io/collector/component/componentattribute => ../component/componentattribute + replace go.opentelemetry.io/collector/component/componentstatus => ../component/componentstatus replace go.opentelemetry.io/collector/pdata => ../pdata diff --git a/service/internal/attribute/attribute.go b/service/internal/attribute/attribute.go index 7bfdf217961..0dcd0cfc35b 100644 --- a/service/internal/attribute/attribute.go +++ b/service/internal/attribute/attribute.go @@ -9,16 +9,11 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentattribute" "go.opentelemetry.io/collector/pipeline" ) const ( - componentKindKey = "otelcol.component.kind" - componentIDKey = "otelcol.component.id" - pipelineIDKey = "otelcol.pipeline.id" - signalKey = "otelcol.signal" - signalOutputKey = "otelcol.signal.output" - capabiltiesKind = "capabilities" fanoutKind = "fanout" ) @@ -28,18 +23,18 @@ type Attributes struct { id int64 } -func newAttributes(attrs ...attribute.KeyValue) *Attributes { +func newAttributes(attrs ...attribute.KeyValue) Attributes { h := fnv.New64a() for _, kv := range attrs { h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")")) } - return &Attributes{ + return Attributes{ set: attribute.NewSet(attrs...), id: int64(h.Sum64()), // #nosec G115 } } -func (a Attributes) Attributes() *attribute.Set { +func (a Attributes) Set() *attribute.Set { return &a.set } @@ -47,57 +42,57 @@ func (a Attributes) ID() int64 { return a.id } -func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes { +func Receiver(pipelineType pipeline.Signal, id component.ID) Attributes { return newAttributes( - attribute.String(componentKindKey, component.KindReceiver.String()), - attribute.String(signalKey, pipelineType.String()), - attribute.String(componentIDKey, id.String()), + attribute.String(componentattribute.ComponentKindKey, component.KindReceiver.String()), + attribute.String(componentattribute.SignalKey, pipelineType.String()), + attribute.String(componentattribute.ComponentIDKey, id.String()), ) } -func Processor(pipelineID pipeline.ID, id component.ID) *Attributes { +func Processor(pipelineID pipeline.ID, id component.ID) Attributes { return newAttributes( - attribute.String(componentKindKey, component.KindProcessor.String()), - attribute.String(signalKey, pipelineID.Signal().String()), - attribute.String(pipelineIDKey, pipelineID.String()), - attribute.String(componentIDKey, id.String()), + attribute.String(componentattribute.ComponentKindKey, component.KindProcessor.String()), + attribute.String(componentattribute.SignalKey, pipelineID.Signal().String()), + attribute.String(componentattribute.PipelineIDKey, pipelineID.String()), + attribute.String(componentattribute.ComponentIDKey, id.String()), ) } -func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes { +func Exporter(pipelineType pipeline.Signal, id component.ID) Attributes { return newAttributes( - attribute.String(componentKindKey, component.KindExporter.String()), - attribute.String(signalKey, pipelineType.String()), - attribute.String(componentIDKey, id.String()), + attribute.String(componentattribute.ComponentKindKey, component.KindExporter.String()), + attribute.String(componentattribute.SignalKey, pipelineType.String()), + attribute.String(componentattribute.ComponentIDKey, id.String()), ) } -func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes { +func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) Attributes { return newAttributes( - attribute.String(componentKindKey, component.KindConnector.String()), - attribute.String(signalKey, exprPipelineType.String()), - attribute.String(signalOutputKey, rcvrPipelineType.String()), - attribute.String(componentIDKey, id.String()), + attribute.String(componentattribute.ComponentKindKey, component.KindConnector.String()), + attribute.String(componentattribute.SignalKey, exprPipelineType.String()), + attribute.String(componentattribute.SignalOutputKey, rcvrPipelineType.String()), + attribute.String(componentattribute.ComponentIDKey, id.String()), ) } -func Capabilities(pipelineID pipeline.ID) *Attributes { +func Extension(id component.ID) Attributes { return newAttributes( - attribute.String(componentKindKey, capabiltiesKind), - attribute.String(pipelineIDKey, pipelineID.String()), + attribute.String(componentattribute.ComponentKindKey, component.KindExtension.String()), + attribute.String(componentattribute.ComponentIDKey, id.String()), ) } -func Fanout(pipelineID pipeline.ID) *Attributes { +func Capabilities(pipelineID pipeline.ID) Attributes { return newAttributes( - attribute.String(componentKindKey, fanoutKind), - attribute.String(pipelineIDKey, pipelineID.String()), + attribute.String(componentattribute.ComponentKindKey, capabiltiesKind), + attribute.String(componentattribute.PipelineIDKey, pipelineID.String()), ) } -func Extension(id component.ID) *Attributes { +func Fanout(pipelineID pipeline.ID) Attributes { return newAttributes( - attribute.String(componentKindKey, component.KindExtension.String()), - attribute.String(componentIDKey, id.String()), + attribute.String(componentattribute.ComponentKindKey, fanoutKind), + attribute.String(componentattribute.PipelineIDKey, pipelineID.String()), ) } diff --git a/service/internal/attribute/attribute_test.go b/service/internal/attribute/attribute_test.go index 6025f77a20b..eb8eb6ea594 100644 --- a/service/internal/attribute/attribute_test.go +++ b/service/internal/attribute/attribute_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package attribute +package attribute_test import ( "testing" @@ -9,8 +9,10 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentattribute" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/service/internal/attribute" ) var ( @@ -42,16 +44,16 @@ var ( func TestReceiver(t *testing.T) { for _, sig := range signals { for _, id := range cIDs { - r := Receiver(sig, id) - componentKind, ok := r.Attributes().Value(componentKindKey) + r := attribute.Receiver(sig, id) + componentKind, ok := r.Set().Value(componentattribute.ComponentKindKey) require.True(t, ok) require.Equal(t, component.KindReceiver.String(), componentKind.AsString()) - signal, ok := r.Attributes().Value(signalKey) + signal, ok := r.Set().Value(componentattribute.SignalKey) require.True(t, ok) require.Equal(t, sig.String(), signal.AsString()) - componentID, ok := r.Attributes().Value(componentIDKey) + componentID, ok := r.Set().Value(componentattribute.ComponentIDKey) require.True(t, ok) require.Equal(t, id.String(), componentID.AsString()) } @@ -61,16 +63,16 @@ func TestReceiver(t *testing.T) { func TestProcessor(t *testing.T) { for _, pID := range pIDs { for _, id := range cIDs { - p := Processor(pID, id) - componentKind, ok := p.Attributes().Value(componentKindKey) + p := attribute.Processor(pID, id) + componentKind, ok := p.Set().Value(componentattribute.ComponentKindKey) require.True(t, ok) require.Equal(t, component.KindProcessor.String(), componentKind.AsString()) - pipelineID, ok := p.Attributes().Value(pipelineIDKey) + pipelineID, ok := p.Set().Value(componentattribute.PipelineIDKey) require.True(t, ok) require.Equal(t, pID.String(), pipelineID.AsString()) - componentID, ok := p.Attributes().Value(componentIDKey) + componentID, ok := p.Set().Value(componentattribute.ComponentIDKey) require.True(t, ok) require.Equal(t, id.String(), componentID.AsString()) } @@ -80,16 +82,16 @@ func TestProcessor(t *testing.T) { func TestExporter(t *testing.T) { for _, sig := range signals { for _, id := range cIDs { - e := Exporter(sig, id) - componentKind, ok := e.Attributes().Value(componentKindKey) + e := attribute.Exporter(sig, id) + componentKind, ok := e.Set().Value(componentattribute.ComponentKindKey) require.True(t, ok) require.Equal(t, component.KindExporter.String(), componentKind.AsString()) - signal, ok := e.Attributes().Value(signalKey) + signal, ok := e.Set().Value(componentattribute.SignalKey) require.True(t, ok) require.Equal(t, sig.String(), signal.AsString()) - componentID, ok := e.Attributes().Value(componentIDKey) + componentID, ok := e.Set().Value(componentattribute.ComponentIDKey) require.True(t, ok) require.Equal(t, id.String(), componentID.AsString()) } @@ -100,20 +102,20 @@ func TestConnector(t *testing.T) { for _, exprSig := range signals { for _, rcvrSig := range signals { for _, id := range cIDs { - c := Connector(exprSig, rcvrSig, id) - componentKind, ok := c.Attributes().Value(componentKindKey) + c := attribute.Connector(exprSig, rcvrSig, id) + componentKind, ok := c.Set().Value(componentattribute.ComponentKindKey) require.True(t, ok) require.Equal(t, component.KindConnector.String(), componentKind.AsString()) - signal, ok := c.Attributes().Value(signalKey) + signal, ok := c.Set().Value(componentattribute.SignalKey) require.True(t, ok) require.Equal(t, exprSig.String(), signal.AsString()) - signalOutput, ok := c.Attributes().Value(signalOutputKey) + signalOutput, ok := c.Set().Value(componentattribute.SignalOutputKey) require.True(t, ok) require.Equal(t, rcvrSig.String(), signalOutput.AsString()) - componentID, ok := c.Attributes().Value(componentIDKey) + componentID, ok := c.Set().Value(componentattribute.ComponentIDKey) require.True(t, ok) require.Equal(t, id.String(), componentID.AsString()) } @@ -122,8 +124,8 @@ func TestConnector(t *testing.T) { } func TestExtension(t *testing.T) { - e := Extension(component.MustNewID("foo")) - componentKind, ok := e.Attributes().Value(componentKindKey) + e := attribute.Extension(component.MustNewID("foo")) + componentKind, ok := e.Set().Value(componentattribute.ComponentKindKey) require.True(t, ok) require.Equal(t, component.KindExtension.String(), componentKind.AsString()) } @@ -137,36 +139,38 @@ func TestSetEquality(t *testing.T) { for j, ej := range setJ { if i == j { require.Equal(t, ei.ID(), ej.ID()) - require.True(t, ei.Attributes().Equals(ej.Attributes())) + si, sj := ei.Set(), ej.Set() + require.True(t, si.Equals(sj)) } else { require.NotEqual(t, ei.ID(), ej.ID()) - require.False(t, ei.Attributes().Equals(ej.Attributes())) + si, sj := ei.Set(), ej.Set() + require.False(t, si.Equals(sj)) } } } } -func createExampleSets() []*Attributes { - sets := []*Attributes{} +func createExampleSets() []attribute.Attributes { + sets := []attribute.Attributes{} // Receiver examples. for _, sig := range signals { for _, id := range cIDs { - sets = append(sets, Receiver(sig, id)) + sets = append(sets, attribute.Receiver(sig, id)) } } // Processor examples. for _, pID := range pIDs { for _, cID := range cIDs { - sets = append(sets, Processor(pID, cID)) + sets = append(sets, attribute.Processor(pID, cID)) } } // Exporter examples. for _, sig := range signals { for _, id := range cIDs { - sets = append(sets, Exporter(sig, id)) + sets = append(sets, attribute.Exporter(sig, id)) } } @@ -174,19 +178,19 @@ func createExampleSets() []*Attributes { for _, exprSig := range signals { for _, rcvrSig := range signals { for _, id := range cIDs { - sets = append(sets, Connector(exprSig, rcvrSig, id)) + sets = append(sets, attribute.Connector(exprSig, rcvrSig, id)) } } } // Capabilities examples. for _, pID := range pIDs { - sets = append(sets, Capabilities(pID)) + sets = append(sets, attribute.Capabilities(pID)) } // Fanout examples. for _, pID := range pIDs { - sets = append(sets, Fanout(pID)) + sets = append(sets, attribute.Fanout(pID)) } return sets diff --git a/service/internal/components/loggers.go b/service/internal/components/loggers.go deleted file mode 100644 index f02d19fb082..00000000000 --- a/service/internal/components/loggers.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package components // import "go.opentelemetry.io/collector/service/internal/components" - -import ( - "strings" - - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pipeline" -) - -const ( - zapKindKey = "kind" - zapNameKey = "name" - zapDataTypeKey = "data_type" - zapStabilityKey = "stability" - zapPipelineKey = "pipeline" - zapExporterInPipeline = "exporter_in_pipeline" - zapReceiverInPipeline = "receiver_in_pipeline" -) - -func ReceiverLogger(logger *zap.Logger, id component.ID, dt pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindReceiver.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapDataTypeKey, dt.String())) -} - -func ProcessorLogger(logger *zap.Logger, id component.ID, pipelineID pipeline.ID) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindProcessor.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapPipelineKey, pipelineID.String())) -} - -func ExporterLogger(logger *zap.Logger, id component.ID, dt pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindExporter.String())), - zap.String(zapDataTypeKey, dt.String()), - zap.String(zapNameKey, id.String())) -} - -func ExtensionLogger(logger *zap.Logger, id component.ID) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindExtension.String())), - zap.String(zapNameKey, id.String())) -} - -func ConnectorLogger(logger *zap.Logger, id component.ID, expDT, rcvDT pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindConnector.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapExporterInPipeline, expDT.String()), - zap.String(zapReceiverInPipeline, rcvDT.String())) -} diff --git a/service/internal/components/package_test.go b/service/internal/components/package_test.go deleted file mode 100644 index 30b5a82311a..00000000000 --- a/service/internal/components/package_test.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package components - -import ( - "testing" - - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} diff --git a/service/internal/graph/capabilities.go b/service/internal/graph/capabilities.go index 128e6fde926..8b013ae1623 100644 --- a/service/internal/graph/capabilities.go +++ b/service/internal/graph/capabilities.go @@ -7,10 +7,9 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/service/internal/attribute" ) -const capabilitiesSeed = "capabilities" - var _ consumerNode = (*capabilitiesNode)(nil) // Every pipeline has a "virtual" capabilities node immediately after the receiver(s). @@ -19,7 +18,7 @@ var _ consumerNode = (*capabilitiesNode)(nil) // 2. Present a consistent "first consumer" for each pipeline. // The nodeID is derived from "pipeline ID". type capabilitiesNode struct { - nodeID + attribute.Attributes pipelineID pipeline.ID baseConsumer consumer.ConsumeTracesFunc @@ -30,7 +29,7 @@ type capabilitiesNode struct { func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode { return &capabilitiesNode{ - nodeID: newNodeID(capabilitiesSeed, pipelineID.String()), + Attributes: attribute.Capabilities(pipelineID), pipelineID: pipelineID, } } diff --git a/service/internal/graph/connector.go b/service/internal/graph/connector.go index 1f654454ee6..dff3e55c480 100644 --- a/service/internal/graph/connector.go +++ b/service/internal/graph/connector.go @@ -7,25 +7,22 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentattribute" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" - "go.opentelemetry.io/collector/service/internal/components" ) -const connectorSeed = "connector" - var _ consumerNode = (*connectorNode)(nil) -// A connector instance connects one pipeline type to one other pipeline type. -// Therefore, nodeID is derived from "exporter pipeline type", "receiver pipeline type", and "component ID". type connectorNode struct { - nodeID + attribute.Attributes componentID component.ID exprPipelineType pipeline.Signal rcvrPipelineType pipeline.Signal @@ -34,7 +31,7 @@ type connectorNode struct { func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID) *connectorNode { return &connectorNode{ - nodeID: newNodeID(connectorSeed, connID.String(), exprPipelineType.String(), rcvrPipelineType.String()), + Attributes: attribute.Connector(exprPipelineType, rcvrPipelineType, connID), componentID: connID, exprPipelineType: exprPipelineType, rcvrPipelineType: rcvrPipelineType, @@ -52,7 +49,7 @@ func (n *connectorNode) buildComponent( builder *builders.ConnectorBuilder, nexts []baseConsumer, ) error { - tel.Logger = components.ConnectorLogger(tel.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType) + tel.Logger = componentattribute.NewLogger(tel.Logger, n.Attributes.Set()) set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} switch n.rcvrPipelineType { case pipeline.SignalTraces: diff --git a/service/internal/graph/exporter.go b/service/internal/graph/exporter.go index ab7d0f6392b..e36536df0ec 100644 --- a/service/internal/graph/exporter.go +++ b/service/internal/graph/exporter.go @@ -8,21 +8,20 @@ import ( "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentattribute" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" ) -const exporterSeed = "exporter" - var _ consumerNode = (*exporterNode)(nil) // An exporter instance can be shared by multiple pipelines of the same type. // Therefore, nodeID is derived from "pipeline type" and "component ID". type exporterNode struct { - nodeID + attribute.Attributes componentID component.ID pipelineType pipeline.Signal component.Component @@ -30,7 +29,7 @@ type exporterNode struct { func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode { return &exporterNode{ - nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()), + Attributes: attribute.Exporter(pipelineType, exprID), componentID: exprID, pipelineType: pipelineType, } @@ -46,7 +45,7 @@ func (n *exporterNode) buildComponent( info component.BuildInfo, builder *builders.ExporterBuilder, ) error { - tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType) + tel.Logger = componentattribute.NewLogger(tel.Logger, n.Attributes.Set()) set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error switch n.pipelineType { diff --git a/service/internal/graph/fanout.go b/service/internal/graph/fanout.go index 13c8d4ad1c5..284eb4405a5 100644 --- a/service/internal/graph/fanout.go +++ b/service/internal/graph/fanout.go @@ -5,23 +5,22 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph" import ( "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/service/internal/attribute" ) -const fanOutToExporters = "fanout_to_exporters" - var _ consumerNode = (*fanOutNode)(nil) // Each pipeline has one fan-out node before exporters. // Therefore, nodeID is derived from "pipeline ID". type fanOutNode struct { - nodeID + attribute.Attributes pipelineID pipeline.ID baseConsumer } func newFanOutNode(pipelineID pipeline.ID) *fanOutNode { return &fanOutNode{ - nodeID: newNodeID(fanOutToExporters, pipelineID.String()), + Attributes: attribute.Fanout(pipelineID), pipelineID: pipelineID, } } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index be4a98e4135..80f46c9f115 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -2231,11 +2231,11 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/conn1" (traces to traces) -> ` + - `processor "nop" in pipeline "traces/2" -> ` + `connector "nop/conn" (traces to traces) -> ` + `processor "nop" in pipeline "traces/1" -> ` + - `connector "nop/conn1" (traces to traces)`, + `connector "nop/conn1" (traces to traces) -> ` + + `processor "nop" in pipeline "traces/2" -> ` + + `connector "nop/conn" (traces to traces)`, }, { name: "not_allowed_deep_cycle_metrics.yaml", diff --git a/service/internal/graph/node.go b/service/internal/graph/node.go deleted file mode 100644 index 81946a79df4..00000000000 --- a/service/internal/graph/node.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package graph // import "go.opentelemetry.io/collector/service/internal/graph" - -import ( - "hash/fnv" - "strings" -) - -type nodeID int64 - -func (n nodeID) ID() int64 { - return int64(n) -} - -func newNodeID(parts ...string) nodeID { - h := fnv.New64a() - h.Write([]byte(strings.Join(parts, "|"))) - //nolint:gosec - return nodeID(h.Sum64()) -} diff --git a/service/internal/graph/processor.go b/service/internal/graph/processor.go index 3288a505d80..715a8901271 100644 --- a/service/internal/graph/processor.go +++ b/service/internal/graph/processor.go @@ -8,23 +8,22 @@ import ( "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentattribute" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" ) -const processorSeed = "processor" - var _ consumerNode = (*processorNode)(nil) // Every processor instance is unique to one pipeline. // Therefore, nodeID is derived from "pipeline ID" and "component ID". type processorNode struct { - nodeID + attribute.Attributes componentID component.ID pipelineID pipeline.ID component.Component @@ -32,7 +31,7 @@ type processorNode struct { func newProcessorNode(pipelineID pipeline.ID, procID component.ID) *processorNode { return &processorNode{ - nodeID: newNodeID(processorSeed, pipelineID.String(), procID.String()), + Attributes: attribute.Processor(pipelineID, procID), componentID: procID, pipelineID: pipelineID, } @@ -48,7 +47,7 @@ func (n *processorNode) buildComponent(ctx context.Context, builder *builders.ProcessorBuilder, next baseConsumer, ) error { - tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID) + tel.Logger = componentattribute.NewLogger(tel.Logger, n.Attributes.Set()) set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error switch n.pipelineID.Signal() { diff --git a/service/internal/graph/receiver.go b/service/internal/graph/receiver.go index 48f7a36d148..095e15d98de 100644 --- a/service/internal/graph/receiver.go +++ b/service/internal/graph/receiver.go @@ -8,22 +8,21 @@ import ( "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentattribute" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/internal/fanoutconsumer" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" ) -const receiverSeed = "receiver" - // A receiver instance can be shared by multiple pipelines of the same type. // Therefore, nodeID is derived from "pipeline type" and "component ID". type receiverNode struct { - nodeID + attribute.Attributes componentID component.ID pipelineType pipeline.Signal component.Component @@ -31,7 +30,7 @@ type receiverNode struct { func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode { return &receiverNode{ - nodeID: newNodeID(receiverSeed, pipelineType.String(), recvID.String()), + Attributes: attribute.Receiver(pipelineType, recvID), componentID: recvID, pipelineType: pipelineType, } @@ -43,7 +42,7 @@ func (n *receiverNode) buildComponent(ctx context.Context, builder *builders.ReceiverBuilder, nexts []baseConsumer, ) error { - tel.Logger = components.ReceiverLogger(tel.Logger, n.componentID, n.pipelineType) + tel.Logger = componentattribute.NewLogger(tel.Logger, n.Attributes.Set()) set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error switch n.pipelineType { diff --git a/service/internal/graph/util_test.go b/service/internal/graph/util_test.go index 60be0a3d57a..5e27bf6f340 100644 --- a/service/internal/graph/util_test.go +++ b/service/internal/graph/util_test.go @@ -6,6 +6,7 @@ package graph import ( "context" "errors" + "hash/fnv" "sync" "go.opentelemetry.io/collector/component" @@ -37,7 +38,9 @@ type testNode struct { // ID satisfies the graph.Node interface, allowing // testNode to be used in a simple.DirectedGraph func (n *testNode) ID() int64 { - return int64(newNodeID(n.id.String())) + h := fnv.New64a() + h.Write([]byte(n.id.String())) + return int64(h.Sum64()) // #nosec G115 } func (n *testNode) Start(ctx context.Context, _ component.Host) error {