Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-v0.112.0 #2

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions exporter/exporterhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ Number of log record successfully sent to destination.
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |

### otelcol_exporter_sent_log_records_bytes

Bytes of log records successfully sent to destination.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |

### otelcol_exporter_sent_metric_points

Number of metric points successfully sent to destination.
Expand All @@ -86,10 +94,26 @@ Number of metric points successfully sent to destination.
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |

### otelcol_exporter_sent_metric_points_bytes

Bytes of metric point successfully sent to destination.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |

### otelcol_exporter_sent_spans

Number of spans successfully sent to destination.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_exporter_sent_spans_bytes

Bytes of spans successfully sent to destination.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| By | Sum | Int | true |
8 changes: 7 additions & 1 deletion exporter/exporterhelper/exporterhelperprofiles/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ var profilesUnmarshaler = &pprofile.ProtoUnmarshaler{}
type profilesRequest struct {
pd pprofile.Profiles
pusher consumerprofiles.ConsumeProfilesFunc
sizer pprofile.Sizer
}

func newProfilesRequest(pd pprofile.Profiles, pusher consumerprofiles.ConsumeProfilesFunc) exporterhelper.Request {
return &profilesRequest{
pd: pd,
pusher: pusher,
sizer: &pprofile.ProtoMarshaler{},
}
}

Expand Down Expand Up @@ -67,6 +69,10 @@ func (req *profilesRequest) ItemsCount() int {
return req.pd.SampleCount()
}

func (req *profilesRequest) BytesSize() int {
return req.sizer.ProfilesSize(req.pd)
}

type profileExporter struct {
*internal.BaseExporter
consumerprofiles.Profiles
Expand Down Expand Up @@ -157,6 +163,6 @@ func (tewo *profilesExporterWithObservability) Send(ctx context.Context, req exp
numSamples := req.ItemsCount()
// Forward the data to the next consumer (this pusher is the next).
err := tewo.NextSender.Send(c, req)
tewo.obsrep.EndProfilesOp(c, numSamples, err)
tewo.obsrep.EndProfilesOp(c, numSamples, req.BytesSize(), err)
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func (req *dummyRequest) ItemsCount() int {
return 1
}

func (req *dummyRequest) BytesSize() int {
return 0
}

func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.Request) (exporterhelper.Request, error) {
return nil, nil
}
Expand Down
21 changes: 21 additions & 0 deletions exporter/exporterhelper/internal/metadata/generated_telemetry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 19 additions & 14 deletions exporter/exporterhelper/internal/obsexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func (or *ObsReport) StartTracesOp(ctx context.Context) context.Context {
}

// EndTracesOp completes the export operation that was started with startTracesOp.
func (or *ObsReport) EndTracesOp(ctx context.Context, numSpans int, err error) {
func (or *ObsReport) EndTracesOp(ctx context.Context, numSpans, bytesSpans int, err error) {
numSent, numFailedToSend := toNumItems(numSpans, err)
or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalTraces, numSent, numFailedToSend)
endSpan(ctx, err, numSent, numFailedToSend, SentSpansKey, FailedToSendSpansKey)
or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalTraces, numSent, int64(bytesSpans), numFailedToSend)
endSpan(ctx, err, numSent, int64(bytesSpans), numFailedToSend, SentSpansKey, SentSpansBytesKey, FailedToSendSpansKey)
}

// StartMetricsOp is called at the start of an Export operation.
Expand All @@ -74,10 +74,10 @@ func (or *ObsReport) StartMetricsOp(ctx context.Context) context.Context {
// startMetricsOp.
//
// If needed, report your use case in https://github.com/open-telemetry/opentelemetry-collector/issues/10592.
func (or *ObsReport) EndMetricsOp(ctx context.Context, numMetricPoints int, err error) {
func (or *ObsReport) EndMetricsOp(ctx context.Context, numMetricPoints, bytesMetricPoints int, err error) {
numSent, numFailedToSend := toNumItems(numMetricPoints, err)
or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalMetrics, numSent, numFailedToSend)
endSpan(ctx, err, numSent, numFailedToSend, SentMetricPointsKey, FailedToSendMetricPointsKey)
or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalMetrics, numSent, int64(bytesMetricPoints), numFailedToSend)
endSpan(ctx, err, numSent, int64(bytesMetricPoints), numFailedToSend, SentMetricPointsKey, SentMetricPointsBytesKey, FailedToSendMetricPointsKey)
}

// StartLogsOp is called at the start of an Export operation.
Expand All @@ -88,10 +88,10 @@ func (or *ObsReport) StartLogsOp(ctx context.Context) context.Context {
}

// EndLogsOp completes the export operation that was started with startLogsOp.
func (or *ObsReport) EndLogsOp(ctx context.Context, numLogRecords int, err error) {
func (or *ObsReport) EndLogsOp(ctx context.Context, numLogRecords int, bytesLogRecords int, err error) {
numSent, numFailedToSend := toNumItems(numLogRecords, err)
or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalLogs, numSent, numFailedToSend)
endSpan(ctx, err, numSent, numFailedToSend, SentLogRecordsKey, FailedToSendLogRecordsKey)
or.recordMetrics(context.WithoutCancel(ctx), pipeline.SignalLogs, numSent, int64(bytesLogRecords), numFailedToSend)
endSpan(ctx, err, numSent, int64(bytesLogRecords), numFailedToSend, SentLogRecordsKey, SentLogRecordsBytesKey, FailedToSendLogRecordsKey)
}

// StartProfilesOp is called at the start of an Export operation.
Expand All @@ -102,9 +102,9 @@ func (or *ObsReport) StartProfilesOp(ctx context.Context) context.Context {
}

// EndProfilesOp completes the export operation that was started with startProfilesOp.
func (or *ObsReport) EndProfilesOp(ctx context.Context, numSpans int, err error) {
func (or *ObsReport) EndProfilesOp(ctx context.Context, numSpans, byteSpans int, err error) {
numSent, numFailedToSend := toNumItems(numSpans, err)
endSpan(ctx, err, numSent, numFailedToSend, SentSamplesKey, FailedToSendSamplesKey)
endSpan(ctx, err, numSent, int64(byteSpans), numFailedToSend, SentSamplesKey, SentSamplesBytesKey, FailedToSendSamplesKey)
}

// startOp creates the span used to trace the operation. Returning
Expand All @@ -115,30 +115,35 @@ func (or *ObsReport) startOp(ctx context.Context, operationSuffix string) contex
return ctx
}

func (or *ObsReport) recordMetrics(ctx context.Context, signal pipeline.Signal, sent, failed int64) {
var sentMeasure, failedMeasure metric.Int64Counter
func (or *ObsReport) recordMetrics(ctx context.Context, signal pipeline.Signal, sent, bytes, failed int64) {
var sentMeasure, bytesMeasure, failedMeasure metric.Int64Counter
switch signal {
case pipeline.SignalTraces:
sentMeasure = or.TelemetryBuilder.ExporterSentSpans
bytesMeasure = or.TelemetryBuilder.ExporterSentSpansBytes
failedMeasure = or.TelemetryBuilder.ExporterSendFailedSpans
case pipeline.SignalMetrics:
sentMeasure = or.TelemetryBuilder.ExporterSentMetricPoints
bytesMeasure = or.TelemetryBuilder.ExporterSentMetricPointsBytes
failedMeasure = or.TelemetryBuilder.ExporterSendFailedMetricPoints
case pipeline.SignalLogs:
sentMeasure = or.TelemetryBuilder.ExporterSentLogRecords
bytesMeasure = or.TelemetryBuilder.ExporterSentLogRecordsBytes
failedMeasure = or.TelemetryBuilder.ExporterSendFailedLogRecords
}

sentMeasure.Add(ctx, sent, or.otelAttrs)
bytesMeasure.Add(ctx, bytes, or.otelAttrs)
failedMeasure.Add(ctx, failed, or.otelAttrs)
}

func endSpan(ctx context.Context, err error, numSent, numFailedToSend int64, sentItemsKey, failedToSendItemsKey string) {
func endSpan(ctx context.Context, err error, numSent, bytesSent, numFailedToSend int64, sentItemsKey, sentBytesKey, failedToSendItemsKey string) {
span := trace.SpanFromContext(ctx)
// End the span according to errors.
if span.IsRecording() {
span.SetAttributes(
attribute.Int64(sentItemsKey, numSent),
attribute.Int64(sentBytesKey, bytesSent),
attribute.Int64(failedToSendItemsKey, numFailedToSend),
)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/internal/obsexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestExportTraceDataOp(t *testing.T) {
for i := range params {
ctx := obsrep.StartTracesOp(parentCtx)
assert.NotNil(t, ctx)
obsrep.EndTracesOp(ctx, params[i].items, params[i].err)
obsrep.EndTracesOp(ctx, params[i].items, 0, params[i].err)
}

spans := tt.SpanRecorder.Ended()
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestExportMetricsOp(t *testing.T) {
ctx := obsrep.StartMetricsOp(parentCtx)
assert.NotNil(t, ctx)

obsrep.EndMetricsOp(ctx, params[i].items, params[i].err)
obsrep.EndMetricsOp(ctx, params[i].items, 0, params[i].err)
}

spans := tt.SpanRecorder.Ended()
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestExportLogsOp(t *testing.T) {
ctx := obsrep.StartLogsOp(parentCtx)
assert.NotNil(t, ctx)

obsrep.EndLogsOp(ctx, params[i].items, params[i].err)
obsrep.EndLogsOp(ctx, params[i].items, 0, params[i].err)
}

spans := tt.SpanRecorder.Ended()
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestCheckExporterTracesViews(t *testing.T) {
require.NoError(t, err)
ctx := obsrep.StartTracesOp(context.Background())
require.NotNil(t, ctx)
obsrep.EndTracesOp(ctx, 7, nil)
obsrep.EndTracesOp(ctx, 7, 0, nil)

require.NoError(t, tt.CheckExporterTraces(7, 0))
require.Error(t, tt.CheckExporterTraces(7, 7))
Expand All @@ -202,7 +202,7 @@ func TestCheckExporterMetricsViews(t *testing.T) {
require.NoError(t, err)
ctx := obsrep.StartMetricsOp(context.Background())
require.NotNil(t, ctx)
obsrep.EndMetricsOp(ctx, 7, nil)
obsrep.EndMetricsOp(ctx, 7, 0, nil)

require.NoError(t, tt.CheckExporterMetrics(7, 0))
require.Error(t, tt.CheckExporterMetrics(7, 7))
Expand All @@ -222,7 +222,7 @@ func TestCheckExporterLogsViews(t *testing.T) {
require.NoError(t, err)
ctx := obsrep.StartLogsOp(context.Background())
require.NotNil(t, ctx)
obsrep.EndLogsOp(ctx, 7, nil)
obsrep.EndLogsOp(ctx, 7, 0, nil)

require.NoError(t, tt.CheckExporterLogs(7, 0))
require.Error(t, tt.CheckExporterLogs(7, 7))
Expand Down
9 changes: 8 additions & 1 deletion exporter/exporterhelper/internal/obsmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ const (

// SentSpansKey used to track spans sent by exporters.
SentSpansKey = "sent_spans"
// SentSpansBytesKey used to track spans bytes sent by exporters.
SentSpansBytesKey = "sent_spans_bytes"
// FailedToSendSpansKey used to track spans that failed to be sent by exporters.
FailedToSendSpansKey = "send_failed_spans"

// SentLogRecordsBytesKey used to track logs bytes sent by exporters.
SentLogRecordsBytesKey = "sent_log_records_bytes"
// SentMetricPointsKey used to track metric points sent by exporters.
SentMetricPointsKey = "sent_metric_points"
// SentMetricPointsBytesKey used to track metric points bytes sent by exporters.
SentMetricPointsBytesKey = "sent_metric_points_bytes"
// FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters.
FailedToSendMetricPointsKey = "send_failed_metric_points"

Expand All @@ -30,6 +35,8 @@ const (

// SentSamplesKey used to track profiles samples sent by exporters.
SentSamplesKey = "sent_samples"
// SentSamplesBytesKey used to track profiles samples bytes sent by exporters.
SentSamplesBytesKey = "sent_samples_bytes"
// FailedToSendSamplesKey used to track samples that failed to be sent by exporters.
FailedToSendSamplesKey = "send_failed_samples"

Expand Down
4 changes: 4 additions & 0 deletions exporter/exporterhelper/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (r *fakeRequest) ItemsCount() int {
return r.items
}

func (r *fakeRequest) BytesSize() int {
return 0
}

func (r *fakeRequest) Merge(_ context.Context,
r2 internal.Request) (internal.Request, error) {
if r == nil {
Expand Down
8 changes: 8 additions & 0 deletions exporter/exporterhelper/internal/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ func (mer *mockErrorRequest) ItemsCount() int {
return 7
}

func (mer *mockErrorRequest) BytesSize() int {
return 0
}

func (mer *mockErrorRequest) Merge(context.Context, internal.Request) (internal.Request, error) {
return nil, nil
}
Expand Down Expand Up @@ -399,6 +403,10 @@ func (m *mockRequest) ItemsCount() int {
return m.cnt
}

func (m *mockRequest) BytesSize() int {
return 0
}

func (m *mockRequest) Merge(context.Context, internal.Request) (internal.Request, error) {
return nil, nil
}
Expand Down
8 changes: 7 additions & 1 deletion exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ var logsUnmarshaler = &plog.ProtoUnmarshaler{}
type logsRequest struct {
ld plog.Logs
pusher consumer.ConsumeLogsFunc
sizer plog.Sizer
}

func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
return &logsRequest{
ld: ld,
pusher: pusher,
sizer: &plog.ProtoMarshaler{},
}
}

Expand Down Expand Up @@ -65,6 +67,10 @@ func (req *logsRequest) ItemsCount() int {
return req.ld.LogRecordCount()
}

func (req *logsRequest) BytesSize() int {
return req.sizer.LogsSize(req.ld)
}

type logsExporter struct {
*internal.BaseExporter
consumer.Logs
Expand Down Expand Up @@ -164,6 +170,6 @@ func (lewo *logsExporterWithObservability) Send(ctx context.Context, req Request
c := lewo.obsrep.StartLogsOp(ctx)
numLogRecords := req.ItemsCount()
err := lewo.NextSender.Send(c, req)
lewo.obsrep.EndLogsOp(c, numLogRecords, err)
lewo.obsrep.EndLogsOp(c, numLogRecords, req.BytesSize(), err)
return err
}
Loading
Loading