Skip to content

Commit

Permalink
init traces exporter tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Денис Соколов committed Jan 25, 2025
1 parent 70cd61e commit 9a0723e
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 109 deletions.
76 changes: 38 additions & 38 deletions exporter/ydbexporter/internal/logs/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,44 +80,6 @@ func (e *Exporter) PushData(ctx context.Context, ld plog.Logs) error {
return e.client.BulkUpsert(ctx, e.cfg.Name, rows)
}

func (e *Exporter) createRecord(resourceLog plog.ResourceLogs, record plog.LogRecord, scopeLog plog.ScopeLogs) (types.Value, error) {
var serviceName string
if v, ok := resourceLog.Resource().Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}
recordAttributes, err := json.Marshal(record.Attributes().AsRaw())
if err != nil {
return nil, fmt.Errorf("%w: %q", errCannotMarshal, err)
}
scopeAttributes, err := json.Marshal(scopeLog.Scope().Attributes().AsRaw())
if err != nil {
return nil, fmt.Errorf("%w: %q", errCannotMarshal, err)
}
resourceAttributes, err := json.Marshal(resourceLog.Resource().Attributes().AsRaw())
if err != nil {
return nil, fmt.Errorf("%w: %q", errCannotMarshal, err)
}

return types.StructValue(
types.StructFieldValue("timestamp", types.TimestampValueFromTime(record.Timestamp().AsTime())),
types.StructFieldValue("uuid", types.UTF8Value(uuid.New().String())),
types.StructFieldValue("traceId", types.UTF8Value(traceutil.TraceIDToHexOrEmptyString(record.TraceID()))),
types.StructFieldValue("spanId", types.UTF8Value(traceutil.SpanIDToHexOrEmptyString(record.SpanID()))),
types.StructFieldValue("traceFlags", types.Uint32Value(uint32(record.Flags()))),
types.StructFieldValue("severityText", types.UTF8Value(record.SeverityText())),
types.StructFieldValue("severityNumber", types.Int32Value(int32(record.SeverityNumber()))),
types.StructFieldValue("serviceName", types.UTF8Value(serviceName)),
types.StructFieldValue("body", types.OptionalValue(types.UTF8Value(record.Body().AsString()))),
types.StructFieldValue("resourceSchemaUrl", types.UTF8Value(resourceLog.SchemaUrl())),
types.StructFieldValue("resourceAttributes", types.JSONDocumentValueFromBytes(resourceAttributes)),
types.StructFieldValue("scopeSchemaUrl", types.UTF8Value(scopeLog.SchemaUrl())),
types.StructFieldValue("scopeName", types.UTF8Value(scopeLog.Scope().Name())),
types.StructFieldValue("scopeVersion", types.UTF8Value(scopeLog.Scope().Version())),
types.StructFieldValue("scopeAttributes", types.JSONDocumentValueFromBytes(scopeAttributes)),
types.StructFieldValue("logAttributes", types.JSONDocumentValueFromBytes(recordAttributes)),
), nil
}

func (e *Exporter) createTable(ctx context.Context) error {
opts := []options.CreateTableOption{
options.WithColumn("timestamp", types.TypeTimestamp),
Expand Down Expand Up @@ -158,3 +120,41 @@ func (e *Exporter) createTable(ctx context.Context) error {
}
return e.client.CreateTable(ctx, e.cfg.Name, opts...)
}

func (e *Exporter) createRecord(resourceLog plog.ResourceLogs, record plog.LogRecord, scopeLog plog.ScopeLogs) (types.Value, error) {
var serviceName string
if v, ok := resourceLog.Resource().Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}
recordAttributes, err := json.Marshal(record.Attributes().AsRaw())
if err != nil {
return nil, fmt.Errorf("%w: %q", errCannotMarshal, err)
}
scopeAttributes, err := json.Marshal(scopeLog.Scope().Attributes().AsRaw())
if err != nil {
return nil, fmt.Errorf("%w: %q", errCannotMarshal, err)
}
resourceAttributes, err := json.Marshal(resourceLog.Resource().Attributes().AsRaw())
if err != nil {
return nil, fmt.Errorf("%w: %q", errCannotMarshal, err)
}

return types.StructValue(
types.StructFieldValue("timestamp", types.TimestampValueFromTime(record.Timestamp().AsTime())),
types.StructFieldValue("uuid", types.UTF8Value(uuid.New().String())),
types.StructFieldValue("traceId", types.UTF8Value(traceutil.TraceIDToHexOrEmptyString(record.TraceID()))),
types.StructFieldValue("spanId", types.UTF8Value(traceutil.SpanIDToHexOrEmptyString(record.SpanID()))),
types.StructFieldValue("traceFlags", types.Uint32Value(uint32(record.Flags()))),
types.StructFieldValue("severityText", types.UTF8Value(record.SeverityText())),
types.StructFieldValue("severityNumber", types.Int32Value(int32(record.SeverityNumber()))),
types.StructFieldValue("serviceName", types.UTF8Value(serviceName)),
types.StructFieldValue("body", types.OptionalValue(types.UTF8Value(record.Body().AsString()))),
types.StructFieldValue("resourceSchemaUrl", types.UTF8Value(resourceLog.SchemaUrl())),
types.StructFieldValue("resourceAttributes", types.JSONDocumentValueFromBytes(resourceAttributes)),
types.StructFieldValue("scopeSchemaUrl", types.UTF8Value(scopeLog.SchemaUrl())),
types.StructFieldValue("scopeName", types.UTF8Value(scopeLog.Scope().Name())),
types.StructFieldValue("scopeVersion", types.UTF8Value(scopeLog.Scope().Version())),
types.StructFieldValue("scopeAttributes", types.JSONDocumentValueFromBytes(scopeAttributes)),
types.StructFieldValue("logAttributes", types.JSONDocumentValueFromBytes(recordAttributes)),
), nil
}
7 changes: 2 additions & 5 deletions exporter/ydbexporter/internal/logs/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestLogsExporter_createTable(t *testing.T) {
})
}

func TestLogsExporter_createRecord(t *testing.T) {
func TestLogsExporter_PushData(t *testing.T) {
t.Run("test check records metadata", func(t *testing.T) {
assertRecordData(t, func(t *testing.T, exporter *loggingExporter) {
mustPushLogsData(t, exporter, simpleLogs(1))
Expand Down Expand Up @@ -126,10 +126,7 @@ func TestLogsExporter_createRecord(t *testing.T) {
require.Equal(t, "ydb", v.Str())
})
})
}

func TestLogsExporter_PushData(t *testing.T) {
t.Run("push data success", func(t *testing.T) {
t.Run("push data more than once success", func(t *testing.T) {
exporter := newTestLogsExporter(t, defaultEndpoint)
loggingExporter := newLoggingExporter(exporter)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package logs

import (
"context"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.opentelemetry.io/collector/pdata/plog"
)

Expand All @@ -15,28 +14,21 @@ type loggingExporter struct {
}

func newLoggingExporter(exporter *Exporter) *loggingExporter {
return &loggingExporter{exporter: exporter, lastUpsertedRecord: plog.LogRecord{}}
return &loggingExporter{exporter: exporter}
}

func (l *loggingExporter) createTable(ctx context.Context) error {
return l.exporter.createTable(ctx)
}

func (l *loggingExporter) createRecord(resourceLog plog.ResourceLogs, record plog.LogRecord, scopeLog plog.ScopeLogs) (types.Value, error) {
l.lastUpsertedResourceLog = resourceLog
l.lastUpsertedRecord = record
l.lastUpsertedScopeLog = scopeLog
l.rows++
return l.exporter.createRecord(resourceLog, record, scopeLog)
}

func (l *loggingExporter) pushData(ctx context.Context, ld plog.Logs) error {
for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLog := ld.ResourceLogs().At(i)
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLog := resourceLog.ScopeLogs().At(j)
for k := 0; k < scopeLog.LogRecords().Len(); k++ {
record := scopeLog.LogRecords().At(k)

l.lastUpsertedResourceLog = resourceLog
l.lastUpsertedRecord = record
l.lastUpsertedScopeLog = scopeLog
Expand Down
49 changes: 24 additions & 25 deletions exporter/ydbexporter/internal/traces/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
return nil
}

func (e *Exporter) PushData(ctx context.Context, td ptrace.Traces) error {
var values []types.Value
for i := 0; i < td.ResourceSpans().Len(); i++ {
resourceSpans := td.ResourceSpans().At(i)
for j := 0; j < resourceSpans.ScopeSpans().Len(); j++ {
scopeSpans := resourceSpans.ScopeSpans().At(j)
for k := 0; k < scopeSpans.Spans().Len(); k++ {
span := scopeSpans.Spans().At(k)
value, err := e.createRecord(resourceSpans, scopeSpans, span)
if err != nil {
return err
}
values = append(values, value)
}
}
}
return e.client.BulkUpsert(ctx, e.cfg.Name, values)
}

func (e *Exporter) createTable(ctx context.Context) error {
opts := []options.CreateTableOption{
options.WithColumn("timestamp", types.TypeTimestamp),
Expand Down Expand Up @@ -94,39 +113,19 @@ func (e *Exporter) createTable(ctx context.Context) error {
return e.client.CreateTable(ctx, e.cfg.Name, opts...)
}

func (e *Exporter) PushData(ctx context.Context, td ptrace.Traces) error {
var values []types.Value
for i := 0; i < td.ResourceSpans().Len(); i++ {
resourceSpan := td.ResourceSpans().At(i)
for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
scopeSpans := resourceSpan.ScopeSpans().At(j)
for k := 0; k < scopeSpans.Spans().Len(); k++ {
span := scopeSpans.Spans().At(k)
value, err := e.createRecord(resourceSpan, scopeSpans, span)
if err != nil {
return err
}
values = append(values, value)
}
}
}
return e.client.BulkUpsert(ctx, e.cfg.Name, values)
}

func (e *Exporter) createRecord(spans ptrace.ResourceSpans, scopeSpans ptrace.ScopeSpans, span ptrace.Span) (types.Value, error) {
func (e *Exporter) createRecord(resourceSpans ptrace.ResourceSpans, scopeSpans ptrace.ScopeSpans, span ptrace.Span) (types.Value, error) {
var serviceName string
if v, ok := spans.Resource().Attributes().Get(conventions.AttributeServiceName); ok {
if v, ok := resourceSpans.Resource().Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}
resourceAttributes, err := json.Marshal(spans.Resource().Attributes().AsRaw())
resourceAttributes, err := json.Marshal(resourceSpans.Resource().Attributes().AsRaw())
if err != nil {
return nil, err
}
startTime := span.StartTimestamp().AsTime()
duration := span.EndTimestamp().AsTime().Sub(startTime)

spanAttributes := span.Attributes().AsRaw()
spanAttributesJSON, err := json.Marshal(spanAttributes)
spanAttributes, err := json.Marshal(span.Attributes().AsRaw())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -155,7 +154,7 @@ func (e *Exporter) createRecord(spans ptrace.ResourceSpans, scopeSpans ptrace.Sc
types.StructFieldValue("resourceAttributes", types.JSONDocumentValueFromBytes(resourceAttributes)),
types.StructFieldValue("scopeName", types.UTF8Value(scopeSpans.Scope().Name())),
types.StructFieldValue("scopeVersion", types.UTF8Value(scopeSpans.Scope().Version())),
types.StructFieldValue("spanAttributes", types.JSONDocumentValueFromBytes(spanAttributesJSON)),
types.StructFieldValue("spanAttributes", types.JSONDocumentValueFromBytes(spanAttributes)),
types.StructFieldValue("duration", types.Uint64Value(uint64(duration.Nanoseconds()))),
types.StructFieldValue("statusCode", types.UTF8Value(traceutil.StatusCodeStr(span.Status().Code()))),
types.StructFieldValue("statusMessage", types.UTF8Value(span.Status().Message())),
Expand Down
Loading

0 comments on commit 9a0723e

Please sign in to comment.