diff --git a/exporter/exporterhelper/documentation.md b/exporter/exporterhelper/documentation.md index a82163a2bfa..6ee750deb9a 100644 --- a/exporter/exporterhelper/documentation.md +++ b/exporter/exporterhelper/documentation.md @@ -78,6 +78,14 @@ Number of log record successfully sent to destination. | ---- | ----------- | ---------- | --------- | | {records} | Sum | Int | true | +### otelcol_exporter_sent_log_records_bytes + +Bytes of log records successfully sent to destination. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| By | Sum | Int | true | + ### otelcol_exporter_sent_metric_points Number of metric points successfully sent to destination. @@ -86,6 +94,14 @@ Number of metric points successfully sent to destination. | ---- | ----------- | ---------- | --------- | | {datapoints} | Sum | Int | true | +### otelcol_exporter_sent_metric_points_bytes + +Bytes of metric point successfully sent to destination. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| By | Sum | Int | true | + ### otelcol_exporter_sent_spans Number of spans successfully sent to destination. @@ -93,3 +109,11 @@ Number of spans successfully sent to destination. | Unit | Metric Type | Value Type | Monotonic | | ---- | ----------- | ---------- | --------- | | {spans} | Sum | Int | true | + +### otelcol_exporter_sent_spans_bytes + +Bytes of spans successfully sent to destination. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| By | Sum | Int | true | diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles.go b/exporter/exporterhelper/exporterhelperprofiles/profiles.go index 069613dbed3..347c81bc99f 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/profiles.go +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles.go @@ -28,12 +28,14 @@ var profilesUnmarshaler = &pprofile.ProtoUnmarshaler{} type profilesRequest struct { pd pprofile.Profiles pusher consumerprofiles.ConsumeProfilesFunc + sizer pprofile.Sizer } func newProfilesRequest(pd pprofile.Profiles, pusher consumerprofiles.ConsumeProfilesFunc) exporterhelper.Request { return &profilesRequest{ pd: pd, pusher: pusher, + sizer: &pprofile.ProtoMarshaler{}, } } @@ -67,6 +69,10 @@ func (req *profilesRequest) ItemsCount() int { return req.pd.SampleCount() } +func (req *profilesRequest) BytesSize() int { + return req.sizer.ProfilesSize(req.pd) +} + type profileExporter struct { *internal.BaseExporter consumerprofiles.Profiles @@ -157,6 +163,6 @@ func (tewo *profilesExporterWithObservability) Send(ctx context.Context, req exp numSamples := req.ItemsCount() // Forward the data to the next consumer (this pusher is the next). err := tewo.NextSender.Send(c, req) - tewo.obsrep.EndProfilesOp(c, numSamples, err) + tewo.obsrep.EndProfilesOp(c, numSamples, req.BytesSize(), err) return err } diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go index 9674e2d3fd4..3b4f7633345 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go @@ -156,6 +156,10 @@ func (req *dummyRequest) ItemsCount() int { return 1 } +func (req *dummyRequest) BytesSize() int { + return 0 +} + func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.Request) (exporterhelper.Request, error) { return nil, nil } diff --git a/exporter/exporterhelper/internal/metadata/generated_telemetry.go b/exporter/exporterhelper/internal/metadata/generated_telemetry.go index b61d6cda6f2..73fcd002ce6 100644 --- a/exporter/exporterhelper/internal/metadata/generated_telemetry.go +++ b/exporter/exporterhelper/internal/metadata/generated_telemetry.go @@ -39,8 +39,11 @@ type TelemetryBuilder struct { ExporterSendFailedMetricPoints metric.Int64Counter ExporterSendFailedSpans metric.Int64Counter ExporterSentLogRecords metric.Int64Counter + ExporterSentLogRecordsBytes metric.Int64Counter ExporterSentMetricPoints metric.Int64Counter + ExporterSentMetricPointsBytes metric.Int64Counter ExporterSentSpans metric.Int64Counter + ExporterSentSpansBytes metric.Int64Counter meters map[configtelemetry.Level]metric.Meter } @@ -142,17 +145,35 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{records}"), ) errs = errors.Join(errs, err) + builder.ExporterSentLogRecordsBytes, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_exporter_sent_log_records_bytes", + metric.WithDescription("Bytes of log records successfully sent to destination."), + metric.WithUnit("By"), + ) + errs = errors.Join(errs, err) builder.ExporterSentMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( "otelcol_exporter_sent_metric_points", metric.WithDescription("Number of metric points successfully sent to destination."), metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) + builder.ExporterSentMetricPointsBytes, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_exporter_sent_metric_points_bytes", + metric.WithDescription("Bytes of metric point successfully sent to destination."), + metric.WithUnit("By"), + ) + errs = errors.Join(errs, err) builder.ExporterSentSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( "otelcol_exporter_sent_spans", metric.WithDescription("Number of spans successfully sent to destination."), metric.WithUnit("{spans}"), ) errs = errors.Join(errs, err) + builder.ExporterSentSpansBytes, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_exporter_sent_spans_bytes", + metric.WithDescription("Bytes of spans successfully sent to destination."), + metric.WithUnit("By"), + ) + errs = errors.Join(errs, err) return &builder, errs } diff --git a/exporter/exporterhelper/internal/obsexporter.go b/exporter/exporterhelper/internal/obsexporter.go index 004e5c48248..ac1ee68be40 100644 --- a/exporter/exporterhelper/internal/obsexporter.go +++ b/exporter/exporterhelper/internal/obsexporter.go @@ -57,10 +57,10 @@ func (or *ObsReport) StartTracesOp(ctx context.Context) context.Context { } // EndTracesOp completes the export operation that was started with startTracesOp. -func (or *ObsReport) EndTracesOp(ctx context.Context, numSpans int, err error) { +func (or *ObsReport) EndTracesOp(ctx context.Context, numSpans, bytesSpans int, err error) { numSent, numFailedToSend := toNumItems(numSpans, err) - or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalTraces, numSent, numFailedToSend) - endSpan(ctx, err, numSent, numFailedToSend, SentSpansKey, FailedToSendSpansKey) + or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalTraces, numSent, int64(bytesSpans), numFailedToSend) + endSpan(ctx, err, numSent, int64(bytesSpans), numFailedToSend, SentSpansKey, SentSpansBytesKey, FailedToSendSpansKey) } // StartMetricsOp is called at the start of an Export operation. @@ -74,10 +74,10 @@ func (or *ObsReport) StartMetricsOp(ctx context.Context) context.Context { // startMetricsOp. // // If needed, report your use case in https://github.com/open-telemetry/opentelemetry-collector/issues/10592. -func (or *ObsReport) EndMetricsOp(ctx context.Context, numMetricPoints int, err error) { +func (or *ObsReport) EndMetricsOp(ctx context.Context, numMetricPoints, bytesMetricPoints int, err error) { numSent, numFailedToSend := toNumItems(numMetricPoints, err) - or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalMetrics, numSent, numFailedToSend) - endSpan(ctx, err, numSent, numFailedToSend, SentMetricPointsKey, FailedToSendMetricPointsKey) + or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalMetrics, numSent, int64(bytesMetricPoints), numFailedToSend) + endSpan(ctx, err, numSent, int64(bytesMetricPoints), numFailedToSend, SentMetricPointsKey, SentMetricPointsBytesKey, FailedToSendMetricPointsKey) } // StartLogsOp is called at the start of an Export operation. @@ -88,10 +88,10 @@ func (or *ObsReport) StartLogsOp(ctx context.Context) context.Context { } // EndLogsOp completes the export operation that was started with startLogsOp. -func (or *ObsReport) EndLogsOp(ctx context.Context, numLogRecords int, err error) { +func (or *ObsReport) EndLogsOp(ctx context.Context, numLogRecords int, bytesLogRecords int, err error) { numSent, numFailedToSend := toNumItems(numLogRecords, err) - or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalLogs, numSent, numFailedToSend) - endSpan(ctx, err, numSent, numFailedToSend, SentLogRecordsKey, FailedToSendLogRecordsKey) + or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalLogs, numSent, int64(bytesLogRecords), numFailedToSend) + endSpan(ctx, err, numSent, int64(bytesLogRecords), numFailedToSend, SentLogRecordsKey, SentLogRecordsBytesKey, FailedToSendLogRecordsKey) } // StartProfilesOp is called at the start of an Export operation. @@ -102,9 +102,9 @@ func (or *ObsReport) StartProfilesOp(ctx context.Context) context.Context { } // EndProfilesOp completes the export operation that was started with startProfilesOp. -func (or *ObsReport) EndProfilesOp(ctx context.Context, numSpans int, err error) { +func (or *ObsReport) EndProfilesOp(ctx context.Context, numSpans, byteSpans int, err error) { numSent, numFailedToSend := toNumItems(numSpans, err) - endSpan(ctx, err, numSent, numFailedToSend, SentSamplesKey, FailedToSendSamplesKey) + endSpan(ctx, err, numSent, int64(byteSpans), numFailedToSend, SentSamplesKey, SentSamplesBytesKey, FailedToSendSamplesKey) } // startOp creates the span used to trace the operation. Returning @@ -115,30 +115,35 @@ func (or *ObsReport) startOp(ctx context.Context, operationSuffix string) contex return ctx } -func (or *ObsReport) recordMetrics(ctx context.Context, signal pipeline.Signal, sent, failed int64) { - var sentMeasure, failedMeasure metric.Int64Counter +func (or *ObsReport) recordMetrics(ctx context.Context, signal pipeline.Signal, sent, bytes, failed int64) { + var sentMeasure, bytesMeasure, failedMeasure metric.Int64Counter switch signal { case pipeline.SignalTraces: sentMeasure = or.TelemetryBuilder.ExporterSentSpans + bytesMeasure = or.TelemetryBuilder.ExporterSentSpansBytes failedMeasure = or.TelemetryBuilder.ExporterSendFailedSpans case pipeline.SignalMetrics: sentMeasure = or.TelemetryBuilder.ExporterSentMetricPoints + bytesMeasure = or.TelemetryBuilder.ExporterSentMetricPointsBytes failedMeasure = or.TelemetryBuilder.ExporterSendFailedMetricPoints case pipeline.SignalLogs: sentMeasure = or.TelemetryBuilder.ExporterSentLogRecords + bytesMeasure = or.TelemetryBuilder.ExporterSentLogRecordsBytes failedMeasure = or.TelemetryBuilder.ExporterSendFailedLogRecords } sentMeasure.Add(ctx, sent, or.otelAttrs) + bytesMeasure.Add(ctx, bytes, or.otelAttrs) failedMeasure.Add(ctx, failed, or.otelAttrs) } -func endSpan(ctx context.Context, err error, numSent, numFailedToSend int64, sentItemsKey, failedToSendItemsKey string) { +func endSpan(ctx context.Context, err error, numSent, bytesSent, numFailedToSend int64, sentItemsKey, sentBytesKey, failedToSendItemsKey string) { span := trace.SpanFromContext(ctx) // End the span according to errors. if span.IsRecording() { span.SetAttributes( attribute.Int64(sentItemsKey, numSent), + attribute.Int64(sentBytesKey, bytesSent), attribute.Int64(failedToSendItemsKey, numFailedToSend), ) if err != nil { diff --git a/exporter/exporterhelper/internal/obsexporter_test.go b/exporter/exporterhelper/internal/obsexporter_test.go index cafe163581c..87419086b1f 100644 --- a/exporter/exporterhelper/internal/obsexporter_test.go +++ b/exporter/exporterhelper/internal/obsexporter_test.go @@ -42,7 +42,7 @@ func TestExportTraceDataOp(t *testing.T) { for i := range params { ctx := obsrep.StartTracesOp(parentCtx) assert.NotNil(t, ctx) - obsrep.EndTracesOp(ctx, params[i].items, params[i].err) + obsrep.EndTracesOp(ctx, params[i].items, 0, params[i].err) } spans := tt.SpanRecorder.Ended() @@ -91,7 +91,7 @@ func TestExportMetricsOp(t *testing.T) { ctx := obsrep.StartMetricsOp(parentCtx) assert.NotNil(t, ctx) - obsrep.EndMetricsOp(ctx, params[i].items, params[i].err) + obsrep.EndMetricsOp(ctx, params[i].items, 0, params[i].err) } spans := tt.SpanRecorder.Ended() @@ -140,7 +140,7 @@ func TestExportLogsOp(t *testing.T) { ctx := obsrep.StartLogsOp(parentCtx) assert.NotNil(t, ctx) - obsrep.EndLogsOp(ctx, params[i].items, params[i].err) + obsrep.EndLogsOp(ctx, params[i].items, 0, params[i].err) } spans := tt.SpanRecorder.Ended() @@ -182,7 +182,7 @@ func TestCheckExporterTracesViews(t *testing.T) { require.NoError(t, err) ctx := obsrep.StartTracesOp(context.Background()) require.NotNil(t, ctx) - obsrep.EndTracesOp(ctx, 7, nil) + obsrep.EndTracesOp(ctx, 7, 0, nil) require.NoError(t, tt.CheckExporterTraces(7, 0)) require.Error(t, tt.CheckExporterTraces(7, 7)) @@ -202,7 +202,7 @@ func TestCheckExporterMetricsViews(t *testing.T) { require.NoError(t, err) ctx := obsrep.StartMetricsOp(context.Background()) require.NotNil(t, ctx) - obsrep.EndMetricsOp(ctx, 7, nil) + obsrep.EndMetricsOp(ctx, 7, 0, nil) require.NoError(t, tt.CheckExporterMetrics(7, 0)) require.Error(t, tt.CheckExporterMetrics(7, 7)) @@ -222,7 +222,7 @@ func TestCheckExporterLogsViews(t *testing.T) { require.NoError(t, err) ctx := obsrep.StartLogsOp(context.Background()) require.NotNil(t, ctx) - obsrep.EndLogsOp(ctx, 7, nil) + obsrep.EndLogsOp(ctx, 7, 0, nil) require.NoError(t, tt.CheckExporterLogs(7, 0)) require.Error(t, tt.CheckExporterLogs(7, 7)) diff --git a/exporter/exporterhelper/internal/obsmetrics.go b/exporter/exporterhelper/internal/obsmetrics.go index ae9e89942b3..ad6b7f088df 100644 --- a/exporter/exporterhelper/internal/obsmetrics.go +++ b/exporter/exporterhelper/internal/obsmetrics.go @@ -15,11 +15,16 @@ const ( // SentSpansKey used to track spans sent by exporters. SentSpansKey = "sent_spans" + // SentSpansBytesKey used to track spans bytes sent by exporters. + SentSpansBytesKey = "sent_spans_bytes" // FailedToSendSpansKey used to track spans that failed to be sent by exporters. FailedToSendSpansKey = "send_failed_spans" - + // SentLogRecordsBytesKey used to track logs bytes sent by exporters. + SentLogRecordsBytesKey = "sent_log_records_bytes" // SentMetricPointsKey used to track metric points sent by exporters. SentMetricPointsKey = "sent_metric_points" + // SentMetricPointsBytesKey used to track metric points bytes sent by exporters. + SentMetricPointsBytesKey = "sent_metric_points_bytes" // FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters. FailedToSendMetricPointsKey = "send_failed_metric_points" @@ -30,6 +35,8 @@ const ( // SentSamplesKey used to track profiles samples sent by exporters. SentSamplesKey = "sent_samples" + // SentSamplesBytesKey used to track profiles samples bytes sent by exporters. + SentSamplesBytesKey = "sent_samples_bytes" // FailedToSendSamplesKey used to track samples that failed to be sent by exporters. FailedToSendSamplesKey = "send_failed_samples" diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 7f71d7e94ea..68482b3e205 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -56,6 +56,10 @@ func (r *fakeRequest) ItemsCount() int { return r.items } +func (r *fakeRequest) BytesSize() int { + return 0 +} + func (r *fakeRequest) Merge(_ context.Context, r2 internal.Request) (internal.Request, error) { if r == nil { diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index 9ebf4b1f5ad..6d64d7e6a87 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -349,6 +349,10 @@ func (mer *mockErrorRequest) ItemsCount() int { return 7 } +func (mer *mockErrorRequest) BytesSize() int { + return 0 +} + func (mer *mockErrorRequest) Merge(context.Context, internal.Request) (internal.Request, error) { return nil, nil } @@ -399,6 +403,10 @@ func (m *mockRequest) ItemsCount() int { return m.cnt } +func (m *mockRequest) BytesSize() int { + return 0 +} + func (m *mockRequest) Merge(context.Context, internal.Request) (internal.Request, error) { return nil, nil } diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 772a5673e24..3a8b69280c9 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -26,12 +26,14 @@ var logsUnmarshaler = &plog.ProtoUnmarshaler{} type logsRequest struct { ld plog.Logs pusher consumer.ConsumeLogsFunc + sizer plog.Sizer } func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request { return &logsRequest{ ld: ld, pusher: pusher, + sizer: &plog.ProtoMarshaler{}, } } @@ -65,6 +67,10 @@ func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } +func (req *logsRequest) BytesSize() int { + return req.sizer.LogsSize(req.ld) +} + type logsExporter struct { *internal.BaseExporter consumer.Logs @@ -164,6 +170,6 @@ func (lewo *logsExporterWithObservability) Send(ctx context.Context, req Request c := lewo.obsrep.StartLogsOp(ctx) numLogRecords := req.ItemsCount() err := lewo.NextSender.Send(c, req) - lewo.obsrep.EndLogsOp(c, numLogRecords, err) + lewo.obsrep.EndLogsOp(c, numLogRecords, req.BytesSize(), err) return err } diff --git a/exporter/exporterhelper/metadata.yaml b/exporter/exporterhelper/metadata.yaml index 042dccf0057..52e35c9c997 100644 --- a/exporter/exporterhelper/metadata.yaml +++ b/exporter/exporterhelper/metadata.yaml @@ -17,6 +17,14 @@ telemetry: value_type: int monotonic: true + exporter_sent_spans_bytes: + enabled: true + description: Bytes of spans successfully sent to destination. + unit: By + sum: + value_type: int + monotonic: true + exporter_send_failed_spans: enabled: true description: Number of spans in failed attempts to send to destination. @@ -41,6 +49,14 @@ telemetry: value_type: int monotonic: true + exporter_sent_metric_points_bytes: + enabled: true + description: Bytes of metric point successfully sent to destination. + unit: By + sum: + value_type: int + monotonic: true + exporter_send_failed_metric_points: enabled: true description: Number of metric points in failed attempts to send to destination. @@ -65,6 +81,14 @@ telemetry: value_type: int monotonic: true + exporter_sent_log_records_bytes: + enabled: true + description: Bytes of log records successfully sent to destination. + unit: By + sum: + value_type: int + monotonic: true + exporter_send_failed_log_records: enabled: true description: Number of log records in failed attempts to send to destination. diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index b2da8895f98..383896055cc 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -26,12 +26,14 @@ var metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} type metricsRequest struct { md pmetric.Metrics pusher consumer.ConsumeMetricsFunc + sizer pmetric.Sizer } func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) Request { return &metricsRequest{ md: md, pusher: pusher, + sizer: &pmetric.ProtoMarshaler{}, } } @@ -65,6 +67,10 @@ func (req *metricsRequest) ItemsCount() int { return req.md.DataPointCount() } +func (req *metricsRequest) BytesSize() int { + return req.sizer.MetricsSize(req.md) +} + type metricsExporter struct { *internal.BaseExporter consumer.Metrics @@ -164,6 +170,6 @@ func (mewo *metricsSenderWithObservability) Send(ctx context.Context, req Reques c := mewo.obsrep.StartMetricsOp(ctx) numMetricDataPoints := req.ItemsCount() err := mewo.NextSender.Send(c, req) - mewo.obsrep.EndMetricsOp(c, numMetricDataPoints, err) + mewo.obsrep.EndMetricsOp(c, numMetricDataPoints, req.BytesSize(), err) return err } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 7d7bedbd289..87bacd61b54 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -26,12 +26,14 @@ var tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} type tracesRequest struct { td ptrace.Traces pusher consumer.ConsumeTracesFunc + sizer ptrace.Sizer } func newTracesRequest(td ptrace.Traces, pusher consumer.ConsumeTracesFunc) Request { return &tracesRequest{ td: td, pusher: pusher, + sizer: &ptrace.ProtoMarshaler{}, } } @@ -65,6 +67,10 @@ func (req *tracesRequest) ItemsCount() int { return req.td.SpanCount() } +func (req *tracesRequest) BytesSize() int { + return req.sizer.TracesSize(req.td) +} + type tracesExporter struct { *internal.BaseExporter consumer.Traces @@ -165,6 +171,6 @@ func (tewo *tracesWithObservability) Send(ctx context.Context, req Request) erro numTraceSpans := req.ItemsCount() // Forward the data to the next consumer (this pusher is the next). err := tewo.NextSender.Send(c, req) - tewo.obsrep.EndTracesOp(c, numTraceSpans, err) + tewo.obsrep.EndTracesOp(c, numTraceSpans, req.BytesSize(), err) return err } diff --git a/exporter/internal/request.go b/exporter/internal/request.go index ed6ee39af1c..86731c4901c 100644 --- a/exporter/internal/request.go +++ b/exporter/internal/request.go @@ -34,6 +34,8 @@ type Request interface { // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, Request) ([]Request, error) + + BytesSize() int } // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial diff --git a/receiver/internal/obsmetrics.go b/receiver/internal/obsmetrics.go index 7a4ba33de32..6d8d8d76348 100644 --- a/receiver/internal/obsmetrics.go +++ b/receiver/internal/obsmetrics.go @@ -16,17 +16,23 @@ const ( // AcceptedSpansKey used to identify spans accepted by the Collector. AcceptedSpansKey = "accepted_spans" + // AcceptedSpansBytes used to identify spans accepted by the Collector in bytes. + AcceptedSpansBytesKey = "accepted_spans_bytes" // RefusedSpansKey used to identify spans refused (ie.: not ingested) by the Collector. RefusedSpansKey = "refused_spans" // AcceptedMetricPointsKey used to identify metric points accepted by the Collector. AcceptedMetricPointsKey = "accepted_metric_points" + // AcceptedMetricPointsBytes used to identify metric points accepted by the Collector in bytes. + AcceptedMetricPointsBytesKey = "accepted_metric_points_bytes" // RefusedMetricPointsKey used to identify metric points refused (ie.: not ingested) by the // Collector. RefusedMetricPointsKey = "refused_metric_points" // AcceptedLogRecordsKey used to identify log records accepted by the Collector. AcceptedLogRecordsKey = "accepted_log_records" + // AcceptedLogBytesKey used to identify log records accepted by the Collector in bytes. + AcceptedLogBytesKey = "accepted_log_bytes" // RefusedLogRecordsKey used to identify log records refused (ie.: not ingested) by the // Collector. RefusedLogRecordsKey = "refused_log_records" diff --git a/receiver/receiverhelper/obsreport.go b/receiver/receiverhelper/obsreport.go index 1be60915288..6ac0375ba42 100644 --- a/receiver/receiverhelper/obsreport.go +++ b/receiver/receiverhelper/obsreport.go @@ -81,9 +81,10 @@ func (rec *ObsReport) EndTracesOp( receiverCtx context.Context, format string, numReceivedSpans int, + bytesReceivedSpans int, err error, ) { - rec.endOp(receiverCtx, format, numReceivedSpans, err, pipeline.SignalTraces) + rec.endOp(receiverCtx, format, numReceivedSpans, bytesReceivedSpans, err, pipeline.SignalTraces) } // StartLogsOp is called when a request is received from a client. @@ -99,9 +100,10 @@ func (rec *ObsReport) EndLogsOp( receiverCtx context.Context, format string, numReceivedLogRecords int, + bytesReceivedLogRecords int, err error, ) { - rec.endOp(receiverCtx, format, numReceivedLogRecords, err, pipeline.SignalLogs) + rec.endOp(receiverCtx, format, numReceivedLogRecords, bytesReceivedLogRecords, err, pipeline.SignalLogs) } // StartMetricsOp is called when a request is received from a client. @@ -117,9 +119,10 @@ func (rec *ObsReport) EndMetricsOp( receiverCtx context.Context, format string, numReceivedPoints int, + bytesReceivedPoints int, err error, ) { - rec.endOp(receiverCtx, format, numReceivedPoints, err, pipeline.SignalMetrics) + rec.endOp(receiverCtx, format, numReceivedPoints, bytesReceivedPoints, err, pipeline.SignalMetrics) } // startOp creates the span used to trace the operation. Returning @@ -152,6 +155,7 @@ func (rec *ObsReport) endOp( receiverCtx context.Context, format string, numReceivedItems int, + bytesReceivedItems int, err error, signal pipeline.Signal, ) { @@ -168,22 +172,26 @@ func (rec *ObsReport) endOp( // end span according to errors if span.IsRecording() { - var acceptedItemsKey, refusedItemsKey string + var acceptedItemsKey, bytesItemsKey, refusedItemsKey string switch signal { case pipeline.SignalTraces: acceptedItemsKey = internal.AcceptedSpansKey + bytesItemsKey = internal.AcceptedSpansBytesKey refusedItemsKey = internal.RefusedSpansKey case pipeline.SignalMetrics: acceptedItemsKey = internal.AcceptedMetricPointsKey + bytesItemsKey = internal.AcceptedMetricPointsBytesKey refusedItemsKey = internal.RefusedMetricPointsKey case pipeline.SignalLogs: acceptedItemsKey = internal.AcceptedLogRecordsKey + bytesItemsKey = internal.AcceptedLogBytesKey refusedItemsKey = internal.RefusedLogRecordsKey } span.SetAttributes( attribute.String(internal.FormatKey, format), attribute.Int64(acceptedItemsKey, int64(numAccepted)), + attribute.Int64(bytesItemsKey, int64(bytesReceivedItems)), attribute.Int64(refusedItemsKey, int64(numRefused)), ) if err != nil {