From 9a0723e588ec39a65b10c5b8cb17a93d7f5f2cbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BD=D0=B8=D1=81=20=D0=A1=D0=BE=D0=BA=D0=BE?= =?UTF-8?q?=D0=BB=D0=BE=D0=B2?= Date: Sun, 26 Jan 2025 00:19:05 +0300 Subject: [PATCH] init traces exporter tests --- .../ydbexporter/internal/logs/exporter.go | 76 ++++---- .../internal/logs/exporter_test.go | 7 +- .../{exporter_mock.go => logging_exporter.go} | 12 +- .../ydbexporter/internal/traces/exporter.go | 49 +++--- .../internal/traces/exporter_test.go | 166 ++++++++++++++---- .../internal/traces/logging_exporter.go | 41 +++++ 6 files changed, 242 insertions(+), 109 deletions(-) rename exporter/ydbexporter/internal/logs/{exporter_mock.go => logging_exporter.go} (68%) create mode 100644 exporter/ydbexporter/internal/traces/logging_exporter.go diff --git a/exporter/ydbexporter/internal/logs/exporter.go b/exporter/ydbexporter/internal/logs/exporter.go index e69722dc4901..b16ecd610f0b 100644 --- a/exporter/ydbexporter/internal/logs/exporter.go +++ b/exporter/ydbexporter/internal/logs/exporter.go @@ -80,44 +80,6 @@ func (e *Exporter) PushData(ctx context.Context, ld plog.Logs) error { return e.client.BulkUpsert(ctx, e.cfg.Name, rows) } -func (e *Exporter) createRecord(resourceLog plog.ResourceLogs, record plog.LogRecord, scopeLog plog.ScopeLogs) (types.Value, error) { - var serviceName string - if v, ok := resourceLog.Resource().Attributes().Get(conventions.AttributeServiceName); ok { - serviceName = v.Str() - } - recordAttributes, err := json.Marshal(record.Attributes().AsRaw()) - if err != nil { - return nil, fmt.Errorf("%w: %q", errCannotMarshal, err) - } - scopeAttributes, err := json.Marshal(scopeLog.Scope().Attributes().AsRaw()) - if err != nil { - return nil, fmt.Errorf("%w: %q", errCannotMarshal, err) - } - resourceAttributes, err := json.Marshal(resourceLog.Resource().Attributes().AsRaw()) - if err != nil { - return nil, fmt.Errorf("%w: %q", errCannotMarshal, err) - } - - return types.StructValue( - types.StructFieldValue("timestamp", types.TimestampValueFromTime(record.Timestamp().AsTime())), - types.StructFieldValue("uuid", types.UTF8Value(uuid.New().String())), - types.StructFieldValue("traceId", types.UTF8Value(traceutil.TraceIDToHexOrEmptyString(record.TraceID()))), - types.StructFieldValue("spanId", types.UTF8Value(traceutil.SpanIDToHexOrEmptyString(record.SpanID()))), - types.StructFieldValue("traceFlags", types.Uint32Value(uint32(record.Flags()))), - types.StructFieldValue("severityText", types.UTF8Value(record.SeverityText())), - types.StructFieldValue("severityNumber", types.Int32Value(int32(record.SeverityNumber()))), - types.StructFieldValue("serviceName", types.UTF8Value(serviceName)), - types.StructFieldValue("body", types.OptionalValue(types.UTF8Value(record.Body().AsString()))), - types.StructFieldValue("resourceSchemaUrl", types.UTF8Value(resourceLog.SchemaUrl())), - types.StructFieldValue("resourceAttributes", types.JSONDocumentValueFromBytes(resourceAttributes)), - types.StructFieldValue("scopeSchemaUrl", types.UTF8Value(scopeLog.SchemaUrl())), - types.StructFieldValue("scopeName", types.UTF8Value(scopeLog.Scope().Name())), - types.StructFieldValue("scopeVersion", types.UTF8Value(scopeLog.Scope().Version())), - types.StructFieldValue("scopeAttributes", types.JSONDocumentValueFromBytes(scopeAttributes)), - types.StructFieldValue("logAttributes", types.JSONDocumentValueFromBytes(recordAttributes)), - ), nil -} - func (e *Exporter) createTable(ctx context.Context) error { opts := []options.CreateTableOption{ options.WithColumn("timestamp", types.TypeTimestamp), @@ -158,3 +120,41 @@ func (e *Exporter) createTable(ctx context.Context) error { } return e.client.CreateTable(ctx, e.cfg.Name, opts...) } + +func (e *Exporter) createRecord(resourceLog plog.ResourceLogs, record plog.LogRecord, scopeLog plog.ScopeLogs) (types.Value, error) { + var serviceName string + if v, ok := resourceLog.Resource().Attributes().Get(conventions.AttributeServiceName); ok { + serviceName = v.Str() + } + recordAttributes, err := json.Marshal(record.Attributes().AsRaw()) + if err != nil { + return nil, fmt.Errorf("%w: %q", errCannotMarshal, err) + } + scopeAttributes, err := json.Marshal(scopeLog.Scope().Attributes().AsRaw()) + if err != nil { + return nil, fmt.Errorf("%w: %q", errCannotMarshal, err) + } + resourceAttributes, err := json.Marshal(resourceLog.Resource().Attributes().AsRaw()) + if err != nil { + return nil, fmt.Errorf("%w: %q", errCannotMarshal, err) + } + + return types.StructValue( + types.StructFieldValue("timestamp", types.TimestampValueFromTime(record.Timestamp().AsTime())), + types.StructFieldValue("uuid", types.UTF8Value(uuid.New().String())), + types.StructFieldValue("traceId", types.UTF8Value(traceutil.TraceIDToHexOrEmptyString(record.TraceID()))), + types.StructFieldValue("spanId", types.UTF8Value(traceutil.SpanIDToHexOrEmptyString(record.SpanID()))), + types.StructFieldValue("traceFlags", types.Uint32Value(uint32(record.Flags()))), + types.StructFieldValue("severityText", types.UTF8Value(record.SeverityText())), + types.StructFieldValue("severityNumber", types.Int32Value(int32(record.SeverityNumber()))), + types.StructFieldValue("serviceName", types.UTF8Value(serviceName)), + types.StructFieldValue("body", types.OptionalValue(types.UTF8Value(record.Body().AsString()))), + types.StructFieldValue("resourceSchemaUrl", types.UTF8Value(resourceLog.SchemaUrl())), + types.StructFieldValue("resourceAttributes", types.JSONDocumentValueFromBytes(resourceAttributes)), + types.StructFieldValue("scopeSchemaUrl", types.UTF8Value(scopeLog.SchemaUrl())), + types.StructFieldValue("scopeName", types.UTF8Value(scopeLog.Scope().Name())), + types.StructFieldValue("scopeVersion", types.UTF8Value(scopeLog.Scope().Version())), + types.StructFieldValue("scopeAttributes", types.JSONDocumentValueFromBytes(scopeAttributes)), + types.StructFieldValue("logAttributes", types.JSONDocumentValueFromBytes(recordAttributes)), + ), nil +} diff --git a/exporter/ydbexporter/internal/logs/exporter_test.go b/exporter/ydbexporter/internal/logs/exporter_test.go index 86f2502c894f..83233596d1a3 100644 --- a/exporter/ydbexporter/internal/logs/exporter_test.go +++ b/exporter/ydbexporter/internal/logs/exporter_test.go @@ -63,7 +63,7 @@ func TestLogsExporter_createTable(t *testing.T) { }) } -func TestLogsExporter_createRecord(t *testing.T) { +func TestLogsExporter_PushData(t *testing.T) { t.Run("test check records metadata", func(t *testing.T) { assertRecordData(t, func(t *testing.T, exporter *loggingExporter) { mustPushLogsData(t, exporter, simpleLogs(1)) @@ -126,10 +126,7 @@ func TestLogsExporter_createRecord(t *testing.T) { require.Equal(t, "ydb", v.Str()) }) }) -} - -func TestLogsExporter_PushData(t *testing.T) { - t.Run("push data success", func(t *testing.T) { + t.Run("push data more than once success", func(t *testing.T) { exporter := newTestLogsExporter(t, defaultEndpoint) loggingExporter := newLoggingExporter(exporter) diff --git a/exporter/ydbexporter/internal/logs/exporter_mock.go b/exporter/ydbexporter/internal/logs/logging_exporter.go similarity index 68% rename from exporter/ydbexporter/internal/logs/exporter_mock.go rename to exporter/ydbexporter/internal/logs/logging_exporter.go index e51a859316ab..e20cbe58f1d3 100644 --- a/exporter/ydbexporter/internal/logs/exporter_mock.go +++ b/exporter/ydbexporter/internal/logs/logging_exporter.go @@ -2,7 +2,6 @@ package logs import ( "context" - "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "go.opentelemetry.io/collector/pdata/plog" ) @@ -15,21 +14,13 @@ type loggingExporter struct { } func newLoggingExporter(exporter *Exporter) *loggingExporter { - return &loggingExporter{exporter: exporter, lastUpsertedRecord: plog.LogRecord{}} + return &loggingExporter{exporter: exporter} } func (l *loggingExporter) createTable(ctx context.Context) error { return l.exporter.createTable(ctx) } -func (l *loggingExporter) createRecord(resourceLog plog.ResourceLogs, record plog.LogRecord, scopeLog plog.ScopeLogs) (types.Value, error) { - l.lastUpsertedResourceLog = resourceLog - l.lastUpsertedRecord = record - l.lastUpsertedScopeLog = scopeLog - l.rows++ - return l.exporter.createRecord(resourceLog, record, scopeLog) -} - func (l *loggingExporter) pushData(ctx context.Context, ld plog.Logs) error { for i := 0; i < ld.ResourceLogs().Len(); i++ { resourceLog := ld.ResourceLogs().At(i) @@ -37,6 +28,7 @@ func (l *loggingExporter) pushData(ctx context.Context, ld plog.Logs) error { scopeLog := resourceLog.ScopeLogs().At(j) for k := 0; k < scopeLog.LogRecords().Len(); k++ { record := scopeLog.LogRecords().At(k) + l.lastUpsertedResourceLog = resourceLog l.lastUpsertedRecord = record l.lastUpsertedScopeLog = scopeLog diff --git a/exporter/ydbexporter/internal/traces/exporter.go b/exporter/ydbexporter/internal/traces/exporter.go index 9203781c87e6..a0eabf34f015 100644 --- a/exporter/ydbexporter/internal/traces/exporter.go +++ b/exporter/ydbexporter/internal/traces/exporter.go @@ -52,6 +52,25 @@ func (e *Exporter) Shutdown(ctx context.Context) error { return nil } +func (e *Exporter) PushData(ctx context.Context, td ptrace.Traces) error { + var values []types.Value + for i := 0; i < td.ResourceSpans().Len(); i++ { + resourceSpans := td.ResourceSpans().At(i) + for j := 0; j < resourceSpans.ScopeSpans().Len(); j++ { + scopeSpans := resourceSpans.ScopeSpans().At(j) + for k := 0; k < scopeSpans.Spans().Len(); k++ { + span := scopeSpans.Spans().At(k) + value, err := e.createRecord(resourceSpans, scopeSpans, span) + if err != nil { + return err + } + values = append(values, value) + } + } + } + return e.client.BulkUpsert(ctx, e.cfg.Name, values) +} + func (e *Exporter) createTable(ctx context.Context) error { opts := []options.CreateTableOption{ options.WithColumn("timestamp", types.TypeTimestamp), @@ -94,39 +113,19 @@ func (e *Exporter) createTable(ctx context.Context) error { return e.client.CreateTable(ctx, e.cfg.Name, opts...) } -func (e *Exporter) PushData(ctx context.Context, td ptrace.Traces) error { - var values []types.Value - for i := 0; i < td.ResourceSpans().Len(); i++ { - resourceSpan := td.ResourceSpans().At(i) - for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ { - scopeSpans := resourceSpan.ScopeSpans().At(j) - for k := 0; k < scopeSpans.Spans().Len(); k++ { - span := scopeSpans.Spans().At(k) - value, err := e.createRecord(resourceSpan, scopeSpans, span) - if err != nil { - return err - } - values = append(values, value) - } - } - } - return e.client.BulkUpsert(ctx, e.cfg.Name, values) -} - -func (e *Exporter) createRecord(spans ptrace.ResourceSpans, scopeSpans ptrace.ScopeSpans, span ptrace.Span) (types.Value, error) { +func (e *Exporter) createRecord(resourceSpans ptrace.ResourceSpans, scopeSpans ptrace.ScopeSpans, span ptrace.Span) (types.Value, error) { var serviceName string - if v, ok := spans.Resource().Attributes().Get(conventions.AttributeServiceName); ok { + if v, ok := resourceSpans.Resource().Attributes().Get(conventions.AttributeServiceName); ok { serviceName = v.Str() } - resourceAttributes, err := json.Marshal(spans.Resource().Attributes().AsRaw()) + resourceAttributes, err := json.Marshal(resourceSpans.Resource().Attributes().AsRaw()) if err != nil { return nil, err } startTime := span.StartTimestamp().AsTime() duration := span.EndTimestamp().AsTime().Sub(startTime) - spanAttributes := span.Attributes().AsRaw() - spanAttributesJSON, err := json.Marshal(spanAttributes) + spanAttributes, err := json.Marshal(span.Attributes().AsRaw()) if err != nil { return nil, err } @@ -155,7 +154,7 @@ func (e *Exporter) createRecord(spans ptrace.ResourceSpans, scopeSpans ptrace.Sc types.StructFieldValue("resourceAttributes", types.JSONDocumentValueFromBytes(resourceAttributes)), types.StructFieldValue("scopeName", types.UTF8Value(scopeSpans.Scope().Name())), types.StructFieldValue("scopeVersion", types.UTF8Value(scopeSpans.Scope().Version())), - types.StructFieldValue("spanAttributes", types.JSONDocumentValueFromBytes(spanAttributesJSON)), + types.StructFieldValue("spanAttributes", types.JSONDocumentValueFromBytes(spanAttributes)), types.StructFieldValue("duration", types.Uint64Value(uint64(duration.Nanoseconds()))), types.StructFieldValue("statusCode", types.UTF8Value(traceutil.StatusCodeStr(span.Status().Code()))), types.StructFieldValue("statusMessage", types.UTF8Value(span.Status().Message())), diff --git a/exporter/ydbexporter/internal/traces/exporter_test.go b/exporter/ydbexporter/internal/traces/exporter_test.go index 9ea95e5851ca..8957aca6ac2a 100644 --- a/exporter/ydbexporter/internal/traces/exporter_test.go +++ b/exporter/ydbexporter/internal/traces/exporter_test.go @@ -50,9 +50,6 @@ func TestTracesExporter_New(t *testing.T) { } func TestTracesExporter_createTable(t *testing.T) { - if os.Getenv("RUN_DOCKER_TESTS") == "" { - t.Skip() - } createTable := func(t *testing.T, endpoint string) error { exporter, err := NewExporter(zaptest.NewLogger(t), withTestExporterConfig()(endpoint)) require.NoError(t, err) @@ -65,28 +62,120 @@ func TestTracesExporter_createTable(t *testing.T) { }) } -func TestTracesExporter_createRecord(t *testing.T) { - if os.Getenv("RUN_DOCKER_TESTS") == "" { - t.Skip() - } - t.Run("create record success", func(t *testing.T) { - exporter := newTestTracesExporter(t, defaultEndpoint) - _, err := exporter.createRecord(ptrace.NewResourceSpans(), ptrace.NewScopeSpans(), ptrace.NewSpan()) - require.NoError(t, err) +func TestTracesExporter_PushData(t *testing.T) { + t.Run("test check span metadata", func(t *testing.T) { + assertSpanData(t, func(t *testing.T, exporter *loggingExporter) { + mustPushTracesData(t, exporter, simpleTraces(1)) + + startTime := exporter.lastUpsertedSpan.StartTimestamp().AsTime() + + require.Equal(t, time.Minute, exporter.lastUpsertedSpan.EndTimestamp().AsTime().Sub(startTime)) + require.Equal(t, pcommon.SpanID{2, 3}, exporter.lastUpsertedSpan.SpanID()) + require.Equal(t, pcommon.TraceID{4, 5}, exporter.lastUpsertedSpan.TraceID()) + require.Equal(t, pcommon.SpanID{6, 7}, exporter.lastUpsertedSpan.ParentSpanID()) + require.Equal(t, "span1", exporter.lastUpsertedSpan.Name()) + require.Equal(t, ptrace.SpanKind(1), exporter.lastUpsertedSpan.Kind()) + }) }) -} + t.Run("test check scope metadata", func(t *testing.T) { + assertSpanData(t, func(t *testing.T, exporter *loggingExporter) { + mustPushTracesData(t, exporter, simpleTraces(1)) -func TestTracesExporter_PushData(t *testing.T) { - if os.Getenv("RUN_DOCKER_TESTS") == "" { - t.Skip() - } - t.Run("push data success", func(t *testing.T) { + require.Equal(t, + "io.opentelemetry.contrib.ydb", + exporter.lastUpsertedScopeSpan.Scope().Name()) + + require.Equal(t, "1.0.0", exporter.lastUpsertedScopeSpan.Scope().Version()) + }) + }) + t.Run("test check span event metadata", func(t *testing.T) { + assertSpanData(t, func(t *testing.T, exporter *loggingExporter) { + mustPushTracesData(t, exporter, simpleTraces(1)) + + require.Equal(t, "event1", exporter.lastUpsertedSpan.Events().At(0).Name()) + require.Equal(t, "event2", exporter.lastUpsertedSpan.Events().At(1).Name()) + }) + }) + t.Run("test check span link metadata", func(t *testing.T) { + assertSpanData(t, func(t *testing.T, exporter *loggingExporter) { + mustPushTracesData(t, exporter, simpleTraces(1)) + + require.Equal(t, pcommon.SpanID{8, 9}, exporter.lastUpsertedSpan.Links().At(0).SpanID()) + require.Equal(t, pcommon.TraceID{10, 11}, exporter.lastUpsertedSpan.Links().At(0).TraceID()) + require.Equal(t, pcommon.SpanID{12, 13}, exporter.lastUpsertedSpan.Links().At(1).SpanID()) + require.Equal(t, pcommon.TraceID{14, 15}, exporter.lastUpsertedSpan.Links().At(1).TraceID()) + }) + }) + t.Run("test check span attributes", func(t *testing.T) { + assertSpanData(t, func(t *testing.T, exporter *loggingExporter) { + mustPushTracesData(t, exporter, simpleTraces(1)) + + v, ok := exporter.lastUpsertedSpan.Attributes().Get(conventions.AttributeServiceName) + require.True(t, ok) + require.Equal(t, "span", v.Str()) + }) + }) + t.Run("test check resource attributes", func(t *testing.T) { + assertSpanData(t, func(t *testing.T, exporter *loggingExporter) { + mustPushTracesData(t, exporter, simpleTraces(1)) + + v, ok := exporter.lastUpsertedResourceSpan.Resource().Attributes().Get(conventions.AttributeServiceName) + require.True(t, ok) + require.Equal(t, "test-service", v.Str()) + }) + }) + t.Run("test check scope attributes", func(t *testing.T) { + assertSpanData(t, func(t *testing.T, exporter *loggingExporter) { + mustPushTracesData(t, exporter, simpleTraces(1)) + + v, ok := exporter.lastUpsertedScopeSpan.Scope().Attributes().Get("lib") + require.True(t, ok) + require.Equal(t, "ydb", v.Str()) + }) + }) + t.Run("test check span event attributes", func(t *testing.T) { + assertSpanData(t, func(t *testing.T, exporter *loggingExporter) { + mustPushTracesData(t, exporter, simpleTraces(1)) + + v, ok := exporter.lastUpsertedSpan.Events().At(0).Attributes().Get("event1") + require.True(t, ok) + require.Equal(t, "1", v.Str()) + + v, ok = exporter.lastUpsertedSpan.Events().At(1).Attributes().Get("event2") + require.True(t, ok) + require.Equal(t, "2", v.Str()) + }) + }) + t.Run("test check span links attributes", func(t *testing.T) { + assertSpanData(t, func(t *testing.T, exporter *loggingExporter) { + mustPushTracesData(t, exporter, simpleTraces(1)) + + v, ok := exporter.lastUpsertedSpan.Links().At(0).Attributes().Get("link1") + require.True(t, ok) + require.Equal(t, "3", v.Str()) + + v, ok = exporter.lastUpsertedSpan.Links().At(1).Attributes().Get("link2") + require.True(t, ok) + require.Equal(t, "4", v.Str()) + }) + }) + t.Run("push data more than once success", func(t *testing.T) { exporter := newTestTracesExporter(t, defaultEndpoint) - mustPushTracesData(t, exporter, simpleTraces(1)) - mustPushTracesData(t, exporter, simpleTraces(2)) + loggingExporter := newLoggingExporter(exporter) + + mustPushTracesData(t, loggingExporter, simpleTraces(1)) + mustPushTracesData(t, loggingExporter, simpleTraces(2)) + require.Equal(t, 3, loggingExporter.rows) }) } +func assertSpanData(t *testing.T, assert func(*testing.T, *loggingExporter)) { + exporter := newTestTracesExporter(t, defaultEndpoint) + loggingExporter := newLoggingExporter(exporter) + + assert(t, loggingExporter) +} + func newTestTracesExporter(t *testing.T, dsn string, fns ...func(*config.Config)) *Exporter { exporter, err := NewExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(dsn)) require.NoError(t, err) @@ -113,30 +202,45 @@ func withTestExporterConfig(fns ...func(*config.Config)) func(string) *config.Co func simpleTraces(count int) ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() - rs.SetSchemaUrl("https://opentelemetry.io/schemas/1.4.0") - rs.Resource().SetDroppedAttributesCount(10) - rs.Resource().Attributes().PutStr("service.name", "test-service") + rs.Resource().Attributes().PutStr(conventions.AttributeServiceName, "test-service") ss := rs.ScopeSpans().AppendEmpty() ss.Scope().SetName("io.opentelemetry.contrib.ydb") ss.Scope().SetVersion("1.0.0") - ss.SetSchemaUrl("https://opentelemetry.io/schemas/1.7.0") - ss.Scope().SetDroppedAttributesCount(20) ss.Scope().Attributes().PutStr("lib", "ydb") for i := 0; i < count; i++ { s := ss.Spans().AppendEmpty() - s.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) - s.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) - s.Attributes().PutStr(conventions.AttributeServiceName, "v") + now := time.Now() + s.SetStartTimestamp(pcommon.NewTimestampFromTime(now)) + s.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(time.Minute))) + s.SetSpanID([8]byte{2, 3}) + s.SetTraceID([16]byte{4, 5}) + s.SetParentSpanID([8]byte{6, 7}) + s.SetName("span1") + s.SetKind(1) + s.Attributes().PutStr(conventions.AttributeServiceName, "span") + event := s.Events().AppendEmpty() event.SetName("event1") - event.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + event.Attributes().PutStr("event1", "1") + + event = s.Events().AppendEmpty() + event.SetName("event2") + event.Attributes().PutStr("event2", "2") + link := s.Links().AppendEmpty() - link.Attributes().PutStr("k", "v") + link.SetSpanID([8]byte{8, 9}) + link.SetTraceID([16]byte{10, 11}) + link.Attributes().PutStr("link1", "3") + + link = s.Links().AppendEmpty() + link.SetSpanID([8]byte{12, 13}) + link.SetTraceID([16]byte{14, 15}) + link.Attributes().PutStr("link2", "4") } return traces } -func mustPushTracesData(t *testing.T, exporter *Exporter, td ptrace.Traces) { - err := exporter.PushData(context.TODO(), td) +func mustPushTracesData(t *testing.T, exporter *loggingExporter, td ptrace.Traces) { + err := exporter.pushData(context.TODO(), td) require.NoError(t, err) } diff --git a/exporter/ydbexporter/internal/traces/logging_exporter.go b/exporter/ydbexporter/internal/traces/logging_exporter.go new file mode 100644 index 000000000000..1c0d06c4f610 --- /dev/null +++ b/exporter/ydbexporter/internal/traces/logging_exporter.go @@ -0,0 +1,41 @@ +package traces + +import ( + "context" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type loggingExporter struct { + exporter *Exporter + rows int + lastUpsertedResourceSpan ptrace.ResourceSpans + lastUpsertedScopeSpan ptrace.ScopeSpans + lastUpsertedSpan ptrace.Span +} + +func newLoggingExporter(exporter *Exporter) *loggingExporter { + return &loggingExporter{exporter: exporter} +} + +func (l *loggingExporter) createTable(ctx context.Context) error { + return l.exporter.createTable(ctx) +} + +func (l *loggingExporter) pushData(ctx context.Context, td ptrace.Traces) error { + for i := 0; i < td.ResourceSpans().Len(); i++ { + resourceSpan := td.ResourceSpans().At(i) + for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ { + scopeSpans := resourceSpan.ScopeSpans().At(j) + for k := 0; k < scopeSpans.Spans().Len(); k++ { + span := scopeSpans.Spans().At(k) + + l.lastUpsertedResourceSpan = resourceSpan + l.lastUpsertedScopeSpan = scopeSpans + l.lastUpsertedSpan = span + l.rows++ + } + } + } + + return l.exporter.PushData(ctx, td) +}