From 18d332cf58c2b46544449031507a77345c173788 Mon Sep 17 00:00:00 2001
From: dmathieu <42@dmathieu.com>
Date: Fri, 24 Jan 2025 10:09:49 +0100
Subject: [PATCH] move esIndex into elasticsearch.Index

---
 .../data_stream_router.go                     | 30 ++++---------
 .../data_stream_router_test.go                |  8 +++--
 exporter/elasticsearchexporter/exporter.go    | 23 +++++++-------
 .../internal/elasticsearch/index.go           | 26 ++++++++++++++++
 exporter/elasticsearchexporter/model.go       | 30 +++++++++----------
 exporter/elasticsearchexporter/model_test.go  | 29 +++++++++---------
 .../elasticsearchexporter/pdata_serializer.go | 12 ++++----
 .../pdata_serializer_test.go                  |  6 ++--
 8 files changed, 89 insertions(+), 75 deletions(-)
 create mode 100644 exporter/elasticsearchexporter/internal/elasticsearch/index.go

diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go
index 9e8054efc7e5..2a55d3c4bb82 100644
--- a/exporter/elasticsearchexporter/data_stream_router.go
+++ b/exporter/elasticsearchexporter/data_stream_router.go
@@ -10,6 +10,8 @@ import (
 	"unicode"
 
 	"go.opentelemetry.io/collector/pdata/pcommon"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
 )
 
 var receiverRegex = regexp.MustCompile(`/receiver/(\w*receiver)`)
@@ -46,7 +48,7 @@ func routeWithDefaults(defaultDSType string) func(
 	string,
 	bool,
 	string,
-) esIndex {
+) elasticsearch.Index {
 	return func(
 		recordAttr pcommon.Map,
 		scopeAttr pcommon.Map,
@@ -54,7 +56,7 @@ func routeWithDefaults(defaultDSType string) func(
 		fIndex string,
 		otel bool,
 		scopeName string,
-	) esIndex {
+	) elasticsearch.Index {
 		// Order:
 		// 1. read data_stream.* from attributes
 		// 2. read elasticsearch.index.* from attributes
@@ -67,7 +69,7 @@ func routeWithDefaults(defaultDSType string) func(
 			prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr)
 			suffix, suffixExists := getFromAttributes(indexSuffix, "", resourceAttr, scopeAttr, recordAttr)
 			if prefixExists || suffixExists {
-				return esIndex{Index: fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)}
+				return elasticsearch.Index{Index: fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)}
 			}
 		}
 
@@ -89,30 +91,10 @@ func routeWithDefaults(defaultDSType string) func(
 		dataset = sanitizeDataStreamField(dataset, disallowedDatasetRunes, datasetSuffix)
 		namespace = sanitizeDataStreamField(namespace, disallowedNamespaceRunes, "")
 
-		return newDataStream(defaultDSType, dataset, namespace)
-	}
-}
-
-type esIndex struct {
-	Index     string
-	Type      string
-	Dataset   string
-	Namespace string
-}
-
-func newDataStream(typ, dataset, namespace string) esIndex {
-	return esIndex{
-		Index:     fmt.Sprintf("%s-%s-%s", typ, dataset, namespace),
-		Type:      typ,
-		Dataset:   dataset,
-		Namespace: namespace,
+		return elasticsearch.NewDataStreamIndex(defaultDSType, dataset, namespace)
 	}
 }
 
-func (i esIndex) isDataStream() bool {
-	return i.Type != "" && i.Dataset != "" && i.Namespace != ""
-}
-
 var (
 	// routeLogRecord returns the name of the index to send the log record to according to data stream routing related attributes.
 	// This function may mutate record attributes.
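
For reference, the routing above resolves either to a data stream name of the form "<type>-<dataset>-<namespace>" (via elasticsearch.NewDataStreamIndex) or, on the elasticsearch.index.prefix/suffix fallback, to a plain index with no data stream metadata. A minimal, self-contained sketch of both paths; the Index type is copied inline here purely for illustration, since the real package is internal to the exporter and not importable:

package main

import "fmt"

// Index mirrors the new internal elasticsearch.Index type from this patch.
type Index struct {
	Index     string
	Type      string
	Dataset   string
	Namespace string
}

// NewDataStreamIndex composes the index name the same way the patch does:
// "<type>-<dataset>-<namespace>".
func NewDataStreamIndex(typ, dataset, namespace string) Index {
	return Index{
		Index:     fmt.Sprintf("%s-%s-%s", typ, dataset, namespace),
		Type:      typ,
		Dataset:   dataset,
		Namespace: namespace,
	}
}

func main() {
	// Data stream routing: data_stream.* attributes (or the defaults)
	// produce a fully populated Index.
	fmt.Println(NewDataStreamIndex("logs", "nginx.access", "default").Index)
	// logs-nginx.access-default

	// Prefix/suffix fallback: a bare index name is assembled around the
	// configured index, carrying no data stream metadata.
	prefix, base, suffix := "my-", "otel-logs", "-archive"
	fmt.Println(Index{Index: fmt.Sprintf("%s%s%s", prefix, base, suffix)}.Index)
	// my-otel-logs-archive
}
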
diff --git a/exporter/elasticsearchexporter/data_stream_router_test.go b/exporter/elasticsearchexporter/data_stream_router_test.go
index 3bce5cea6d84..20993d230d7c 100644
--- a/exporter/elasticsearchexporter/data_stream_router_test.go
+++ b/exporter/elasticsearchexporter/data_stream_router_test.go
@@ -8,21 +8,23 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"go.opentelemetry.io/collector/pdata/pcommon"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
 )
 
 type routeTestCase struct {
 	name      string
 	otel      bool
 	scopeName string
-	want      esIndex
+	want      elasticsearch.Index
 }
 
 func createRouteTests(dsType string) []routeTestCase {
-	renderWantRoute := func(dsType, dsDataset string, otel bool) esIndex {
+	renderWantRoute := func(dsType, dsDataset string, otel bool) elasticsearch.Index {
 		if otel {
 			dsDataset += ".otel"
 		}
-		return newDataStream(dsType, dsDataset, defaultDataStreamNamespace)
+		return elasticsearch.NewDataStreamIndex(dsType, dsDataset, defaultDataStreamNamespace)
 	}
 
 	return []routeTestCase{
diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go
index 52bb13a599e3..6022abf30595 100644
--- a/exporter/elasticsearchexporter/exporter.go
+++ b/exporter/elasticsearchexporter/exporter.go
@@ -19,6 +19,7 @@ import (
 	"go.opentelemetry.io/collector/pdata/ptrace"
 	"go.uber.org/zap"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"
 )
 
@@ -167,7 +168,7 @@ func (e *elasticsearchExporter) pushLogRecord(
 	scopeSchemaURL string,
 	bulkIndexerSession bulkIndexerSession,
 ) error {
-	fIndex := esIndex{Index: e.index}
+	fIndex := elasticsearch.Index{Index: e.index}
 	if e.dynamicIndex {
 		fIndex = routeLogRecord(record.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name())
 	}
@@ -177,7 +178,7 @@ func (e *elasticsearchExporter) pushLogRecord(
 		if err != nil {
 			return err
 		}
-		fIndex = esIndex{Index: formattedIndex}
+		fIndex = elasticsearch.Index{Index: formattedIndex}
 	}
 
 	buf := e.bufferPool.NewPooledBuffer()
@@ -216,7 +217,7 @@ func (e *elasticsearchExporter) pushMetricsData(
 		var validationErrs []error // log instead of returning these so that upstream does not retry
 		scopeMetrics := scopeMetrics.At(j)
 		scope := scopeMetrics.Scope()
-		groupedDataPointsByIndex := make(map[esIndex]map[uint32][]dataPoint)
+		groupedDataPointsByIndex := make(map[elasticsearch.Index]map[uint32][]dataPoint)
 		for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
 			metric := scopeMetrics.Metrics().At(k)
 
@@ -334,8 +335,8 @@ func (e *elasticsearchExporter) getMetricDataPointIndex(
 	resource pcommon.Resource,
 	scope pcommon.InstrumentationScope,
 	dataPoint dataPoint,
-) (esIndex, error) {
-	fIndex := esIndex{Index: e.index}
+) (elasticsearch.Index, error) {
+	fIndex := elasticsearch.Index{Index: e.index}
 	if e.dynamicIndex {
 		fIndex = routeDataPoint(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name())
 	}
@@ -343,9 +344,9 @@ func (e *elasticsearchExporter) getMetricDataPointIndex(
 	if e.logstashFormat.Enabled {
 		formattedIndex, err := generateIndexWithLogstashFormat(fIndex.Index, &e.logstashFormat, time.Now())
 		if err != nil {
-			return esIndex{}, err
+			return elasticsearch.Index{}, err
 		}
-		fIndex = esIndex{Index: formattedIndex}
+		fIndex = elasticsearch.Index{Index: formattedIndex}
 	}
 	return fIndex, nil
 }
@@ -409,7 +410,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
 	scopeSchemaURL string,
 	bulkIndexerSession bulkIndexerSession,
 ) error {
-	fIndex := esIndex{Index: e.index}
+	fIndex := elasticsearch.Index{Index: e.index}
 	if e.dynamicIndex {
 		fIndex = routeSpan(span.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, span.Name())
 	}
@@ -419,7 +420,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
 		if err != nil {
 			return err
 		}
-		fIndex = esIndex{Index: formattedIndex}
+		fIndex = elasticsearch.Index{Index: formattedIndex}
 	}
 
 	buf := e.bufferPool.NewPooledBuffer()
@@ -442,7 +443,7 @@ func (e *elasticsearchExporter) pushSpanEvent(
 	scopeSchemaURL string,
 	bulkIndexerSession bulkIndexerSession,
 ) error {
-	fIndex := esIndex{Index: e.index}
+	fIndex := elasticsearch.Index{Index: e.index}
 	if e.dynamicIndex {
 		fIndex = routeSpanEvent(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), e.index, e.otel, scope.Name())
 	}
@@ -452,7 +453,7 @@ func (e *elasticsearchExporter) pushSpanEvent(
 		if err != nil {
 			return err
 		}
-		fIndex = esIndex{Index: formattedIndex}
+		fIndex = elasticsearch.Index{Index: formattedIndex}
 	}
 	buf := e.bufferPool.NewPooledBuffer()
 	e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL, fIndex, buf.Buffer)
diff --git a/exporter/elasticsearchexporter/internal/elasticsearch/index.go b/exporter/elasticsearchexporter/internal/elasticsearch/index.go
new file mode 100644
index 000000000000..830d53c65613
--- /dev/null
+++ b/exporter/elasticsearchexporter/internal/elasticsearch/index.go
@@ -0,0 +1,26 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package elasticsearch // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
+
+import "fmt"
+
+type Index struct {
+	Index     string
+	Type      string
+	Dataset   string
+	Namespace string
+}
+
+func NewDataStreamIndex(typ, dataset, namespace string) Index {
+	return Index{
+		Index:     fmt.Sprintf("%s-%s-%s", typ, dataset, namespace),
+		Type:      typ,
+		Dataset:   dataset,
+		Namespace: namespace,
+	}
+}
+
+func (i Index) IsDataStream() bool {
+	return i.Type != "" && i.Dataset != "" && i.Namespace != ""
+}
diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go
index 3843e9bc0f60..e19aa8da4e26 100644
--- a/exporter/elasticsearchexporter/model.go
+++ b/exporter/elasticsearchexporter/model.go
@@ -76,12 +76,12 @@ var resourceAttrsToPreserve = map[string]bool{
 var ErrInvalidTypeForBodyMapMode = errors.New("invalid log record body type for 'bodymap' mapping mode")
 
 type mappingModel interface {
-	encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, esIndex, *bytes.Buffer) error
-	encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, esIndex, *bytes.Buffer) error
-	encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer)
+	encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string, elasticsearch.Index, *bytes.Buffer) error
+	encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string, elasticsearch.Index, *bytes.Buffer) error
+	encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx elasticsearch.Index, buf *bytes.Buffer)
 	hashDataPoint(dataPoint) uint32
 	encodeDocument(objmodel.Document, *bytes.Buffer) error
-	encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error)
+	encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error)
 }
 
 // encodeModel tries to keep the event as close to the original open telemetry semantics as is.
@@ -112,7 +112,7 @@ const (
 	attributeField = "attribute"
 )
 
-func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer) error {
+func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx elasticsearch.Index, buf *bytes.Buffer) error {
 	var document objmodel.Document
 	switch m.mode {
 	case MappingECS:
@@ -129,7 +129,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str
 	return document.Serialize(buf, m.dedot)
 }
 
-func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope, idx esIndex) objmodel.Document {
+func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope, idx elasticsearch.Index) objmodel.Document {
 	var document objmodel.Document
 
 	docTimeStamp := record.Timestamp()
@@ -160,7 +160,7 @@ func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord, buf *bytes.Buf
 	return nil
 }
 
-func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope, idx esIndex) objmodel.Document {
+func (m *encodeModel) encodeLogECSMode(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope, idx elasticsearch.Index) objmodel.Document {
 	var document objmodel.Document
 
 	// First, try to map resource-level attributes to ECS fields.
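
A note on the zero value: exporter.go seeds routing with elasticsearch.Index{Index: e.index}, which is deliberately not a data stream, since IsDataStream() requires Type, Dataset and Namespace to all be set. The addDataStreamAttributes hunk just below only emits data_stream.* fields when that holds. A self-contained sketch of the gating; the type is copied inline for illustration (the real package is internal), and a plain map stands in for objmodel.Document:

package main

import "fmt"

// Index and IsDataStream mirror the new internal package.
type Index struct {
	Index, Type, Dataset, Namespace string
}

func (i Index) IsDataStream() bool {
	return i.Type != "" && i.Dataset != "" && i.Namespace != ""
}

// addDataStreamAttributes mimics the model.go helper below.
func addDataStreamAttributes(doc map[string]string, key string, idx Index) {
	if idx.IsDataStream() {
		doc[key+"data_stream.type"] = idx.Type
		doc[key+"data_stream.dataset"] = idx.Dataset
		doc[key+"data_stream.namespace"] = idx.Namespace
	}
}

func main() {
	doc := map[string]string{}

	// A plain index (static `index` config, or the logstash path) carries
	// no data stream metadata, so nothing is added.
	addDataStreamAttributes(doc, "", Index{Index: "my-index"})
	fmt.Println(len(doc)) // 0

	// A routed data stream index adds all three fields.
	addDataStreamAttributes(doc, "", Index{
		Index: "logs-generic-default", Type: "logs", Dataset: "generic", Namespace: "default",
	})
	fmt.Println(doc)
	// map[data_stream.dataset:generic data_stream.namespace:default data_stream.type:logs]
}
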
@@ -224,7 +224,7 @@ func (m *encodeModel) hashDataPoint(dp dataPoint) uint32 {
 	}
 }
 
-func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error) {
+func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoints []dataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error) {
 	dp0 := dataPoints[0]
 	var document objmodel.Document
 	encodeAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap, resourceAttrsToPreserve)
@@ -245,15 +245,15 @@ func (m *encodeModel) encodeDataPointsECSMode(resource pcommon.Resource, dataPoi
 	return document.DynamicTemplates(), err
 }
 
-func addDataStreamAttributes(document *objmodel.Document, key string, idx esIndex) {
-	if idx.isDataStream() {
+func addDataStreamAttributes(document *objmodel.Document, key string, idx elasticsearch.Index) {
+	if idx.IsDataStream() {
 		document.AddString(key+"data_stream.type", idx.Type)
 		document.AddString(key+"data_stream.dataset", idx.Dataset)
 		document.AddString(key+"data_stream.namespace", idx.Namespace)
 	}
 }
 
-func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error) {
+func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error) {
 	switch m.mode {
 	case MappingOTel:
 		return serializeMetrics(resource, resourceSchemaURL, scope, scopeSchemaURL, dataPoints, validationErrors, idx, buf)
@@ -494,7 +494,7 @@ func (dp numberDataPoint) Metric() pmetric.Metric {
 
 var errInvalidNumberDataPoint = errors.New("invalid number data point")
 
-func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer) error {
+func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx elasticsearch.Index, buf *bytes.Buffer) error {
 	var document objmodel.Document
 	switch m.mode {
 	case MappingOTel:
@@ -507,7 +507,7 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL st
 	return err
 }
 
-func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope, idx esIndex) objmodel.Document {
+func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope, idx elasticsearch.Index) objmodel.Document {
 	var document objmodel.Document
 	document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTimestamp("EndTimestamp", span.EndTimestamp()) @@ -527,7 +527,7 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra return document } -func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx esIndex, buf *bytes.Buffer) { +func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string, idx elasticsearch.Index, buf *bytes.Buffer) { if m.mode != MappingOTel { // Currently span events are stored separately only in OTel mapping mode. // In other modes, they are stored within the span document. @@ -536,7 +536,7 @@ func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaU serializeSpanEvent(resource, resourceSchemaURL, scope, scopeSchemaURL, span, spanEvent, idx, buf) } -func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map, idx esIndex) { +func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map, idx elasticsearch.Index) { key := "Attributes" if m.mode == MappingRaw { key = "" diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 34315b2f67b0..aef4263538c5 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.22.0" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" ) @@ -57,7 +58,7 @@ func TestEncodeSpan(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceSpans() var buf bytes.Buffer - err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), "", td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(), "", esIndex{}, &buf) + err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), "", td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope(), "", elasticsearch.Index{}, &buf) assert.NoError(t, err) assert.Equal(t, expectedSpanBody, buf.String()) } @@ -68,7 +69,7 @@ func TestEncodeLog(t *testing.T) { td := mockResourceLogs() td.ScopeLogs().At(0).LogRecords().At(0).SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Date(2023, 4, 19, 3, 4, 5, 6, time.UTC))) var buf bytes.Buffer - err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), esIndex{}, &buf) + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), elasticsearch.Index{}, &buf) assert.NoError(t, err) assert.Equal(t, expectedLogBody, buf.String()) }) @@ -77,7 +78,7 @@ func TestEncodeLog(t *testing.T) { model := &encodeModel{dedot: false} td := mockResourceLogs() var buf bytes.Buffer - err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), esIndex{}, &buf) + err := model.encodeLog(td.Resource(), 
td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), elasticsearch.Index{}, &buf) assert.NoError(t, err) assert.Equal(t, expectedLogBodyWithEmptyTimestamp, buf.String()) }) @@ -87,7 +88,7 @@ func TestEncodeLog(t *testing.T) { td := mockResourceLogs() td.Resource().Attributes().PutStr("foo.bar", "baz") var buf bytes.Buffer - err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), esIndex{}, &buf) + err := model.encodeLog(td.Resource(), td.SchemaUrl(), td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), td.ScopeLogs().At(0).SchemaUrl(), elasticsearch.Index{}, &buf) require.NoError(t, err) require.Equal(t, expectedLogBodyDeDottedWithEmptyTimestamp, buf.String()) }) @@ -124,7 +125,7 @@ func TestEncodeMetric(t *testing.T) { for _, dataPoints := range groupedDataPoints { var buf bytes.Buffer errors := make([]error, 0) - _, err := model.encodeMetrics(rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), dataPoints, &errors, esIndex{}, &buf) + _, err := model.encodeMetrics(rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), dataPoints, &errors, elasticsearch.Index{}, &buf) require.Empty(t, errors, err) require.NoError(t, err) docsBytes = append(docsBytes, buf.Bytes()) @@ -253,7 +254,7 @@ func TestEncodeAttributes(t *testing.T) { } doc := objmodel.Document{} - m.encodeAttributes(&doc, attributes, esIndex{}) + m.encodeAttributes(&doc, attributes, elasticsearch.Index{}) require.Equal(t, test.want(), doc) }) } @@ -350,7 +351,7 @@ func TestEncodeLogECSModeDuplication(t *testing.T) { dedot: true, } var buf bytes.Buffer - err = m.encodeLog(resource, "", record, scope, "", esIndex{}, &buf) + err = m.encodeLog(resource, "", record, scope, "", elasticsearch.Index{}, &buf) require.NoError(t, err) assert.Equal(t, want, buf.String()) @@ -424,7 +425,7 @@ func TestEncodeLogECSMode(t *testing.T) { var buf bytes.Buffer m := encodeModel{} - doc := m.encodeLogECSMode(resource, record, scope, esIndex{}) + doc := m.encodeLogECSMode(resource, record, scope, elasticsearch.Index{}) require.NoError(t, doc.Serialize(&buf, false)) require.JSONEq(t, `{ @@ -557,7 +558,7 @@ func TestEncodeLogECSModeAgentName(t *testing.T) { var buf bytes.Buffer m := encodeModel{} - doc := m.encodeLogECSMode(resource, record, scope, esIndex{}) + doc := m.encodeLogECSMode(resource, record, scope, elasticsearch.Index{}) require.NoError(t, doc.Serialize(&buf, false)) require.JSONEq(t, fmt.Sprintf(`{ "@timestamp": "2024-03-13T23:50:59.123456789Z", @@ -611,7 +612,7 @@ func TestEncodeLogECSModeAgentVersion(t *testing.T) { var buf bytes.Buffer m := encodeModel{} - doc := m.encodeLogECSMode(resource, record, scope, esIndex{}) + doc := m.encodeLogECSMode(resource, record, scope, elasticsearch.Index{}) require.NoError(t, doc.Serialize(&buf, false)) if test.expectedAgentVersion == "" { @@ -720,7 +721,7 @@ func TestEncodeLogECSModeHostOSType(t *testing.T) { var buf bytes.Buffer m := encodeModel{} logs.MarkReadOnly() - doc := m.encodeLogECSMode(resource, record, scope, esIndex{}) + doc := m.encodeLogECSMode(resource, record, scope, elasticsearch.Index{}) require.NoError(t, doc.Serialize(&buf, false)) expectedJSON := `{"@timestamp":"2024-03-13T23:50:59.123456789Z", "agent.name":"otlp"` @@ -771,7 +772,7 @@ func TestEncodeLogECSModeTimestamps(t *testing.T) { var buf bytes.Buffer m := encodeModel{} - doc := m.encodeLogECSMode(resource, record, scope, esIndex{}) + 
doc := m.encodeLogECSMode(resource, record, scope, elasticsearch.Index{}) require.NoError(t, doc.Serialize(&buf, false)) require.JSONEq(t, fmt.Sprintf( @@ -1265,7 +1266,7 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) { td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo", "scalar") td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo.bar", "baz") var buf bytes.Buffer - err := model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", esIndex{}, &buf) + err := model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", elasticsearch.Index{}, &buf) assert.NoError(t, err) encoded := buf.Bytes() @@ -1279,7 +1280,7 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) { // If there is an attribute named "foo.value", then "foo" would be omitted rather than renamed. td.ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("foo.value", "foovalue") buf = bytes.Buffer{} - err = model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", esIndex{}, &buf) + err = model.encodeLog(td.Resource(), "", td.ScopeLogs().At(0).LogRecords().At(0), td.ScopeLogs().At(0).Scope(), "", elasticsearch.Index{}, &buf) assert.NoError(t, err) encoded = buf.Bytes() diff --git a/exporter/elasticsearchexporter/pdata_serializer.go b/exporter/elasticsearchexporter/pdata_serializer.go index 76eb2a988372..8c9834357974 100644 --- a/exporter/elasticsearchexporter/pdata_serializer.go +++ b/exporter/elasticsearchexporter/pdata_serializer.go @@ -21,7 +21,7 @@ import ( const tsLayout = "2006-01-02T15:04:05.000000000Z" -func serializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx esIndex, buf *bytes.Buffer) (map[string]string, error) { +func serializeMetrics(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, dataPoints []dataPoint, validationErrors *[]error, idx elasticsearch.Index, buf *bytes.Buffer) (map[string]string, error) { if len(dataPoints) == 0 { return nil, nil } @@ -94,7 +94,7 @@ func serializeDataPoints(v *json.Visitor, dataPoints []dataPoint, validationErro return dynamicTemplates } -func serializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, idx esIndex, buf *bytes.Buffer) { +func serializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, idx elasticsearch.Index, buf *bytes.Buffer) { v := json.NewVisitor(buf) // Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1. // This is required to generate the correct dynamic mapping in ES. 
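
The ExplicitRadixPoint comment carried along in these serializer hunks is worth a note: Elasticsearch's dynamic mapping infers a field's numeric type from the JSON token it first sees, so an integral float must keep its radix point to be mapped as a float. A small stdlib-only sketch of the distinction (the production code uses go-structform's visitor, not encoding/json):

package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

func main() {
	// encoding/json renders an integral float64 without a radix point,
	// which Elasticsearch's dynamic mapping would type as a long.
	b, _ := json.Marshal(map[string]float64{"value": 1.0})
	fmt.Println(string(b)) // {"value":1}

	// Keeping the radix point ("1.0") makes dynamic mapping infer a
	// floating-point type instead; that is what the visitor's
	// ExplicitRadixPoint option guarantees.
	fmt.Println(`{"value":` + strconv.FormatFloat(1.0, 'f', 1, 64) + `}`) // {"value":1.0}
}
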
@@ -121,7 +121,7 @@ func serializeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, sco
 	_ = v.OnObjectFinished()
 }
 
-func serializeSpan(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, idx esIndex, buf *bytes.Buffer) error {
+func serializeSpan(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, span ptrace.Span, idx elasticsearch.Index, buf *bytes.Buffer) error {
 	v := json.NewVisitor(buf)
 	// Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1.
 	// This is required to generate the correct dynamic mapping in ES.
@@ -181,7 +181,7 @@ func serializeMap(m pcommon.Map, buf *bytes.Buffer) {
 	writeMap(v, m, false)
 }
 
-func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, record plog.LogRecord, idx esIndex, buf *bytes.Buffer) error {
+func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, record plog.LogRecord, idx elasticsearch.Index, buf *bytes.Buffer) error {
 	v := json.NewVisitor(buf)
 	// Enable ExplicitRadixPoint such that 1.0 is encoded as 1.0 instead of 1.
 	// This is required to generate the correct dynamic mapping in ES.
@@ -215,8 +215,8 @@ func serializeLog(resource pcommon.Resource, resourceSchemaURL string, scope pco
 	return nil
 }
 
-func writeDataStream(v *json.Visitor, idx esIndex) {
-	if !idx.isDataStream() {
+func writeDataStream(v *json.Visitor, idx elasticsearch.Index) {
+	if !idx.IsDataStream() {
 		return
 	}
 	_ = v.OnKey("data_stream")
diff --git a/exporter/elasticsearchexporter/pdata_serializer_test.go b/exporter/elasticsearchexporter/pdata_serializer_test.go
index 26d514757fd4..673fb338d7b9 100644
--- a/exporter/elasticsearchexporter/pdata_serializer_test.go
+++ b/exporter/elasticsearchexporter/pdata_serializer_test.go
@@ -13,6 +13,8 @@ import (
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
 )
 
 func TestSerializeLog(t *testing.T) {
@@ -185,7 +187,7 @@ func TestSerializeLog(t *testing.T) {
 			logs.MarkReadOnly()
 
 			var buf bytes.Buffer
-			err := serializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record, esIndex{}, &buf)
+			err := serializeLog(resourceLogs.Resource(), "", scopeLogs.Scope(), "", record, elasticsearch.Index{}, &buf)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("serializeLog() error = %v, wantErr %v", err, tt.wantErr)
 			}
@@ -220,7 +222,7 @@ func TestSerializeMetricsConflict(t *testing.T) {
 
 	var validationErrors []error
 	var buf bytes.Buffer
-	_, err := serializeMetrics(resourceMetrics.Resource(), "", scopeMetrics.Scope(), "", dataPoints, &validationErrors, esIndex{}, &buf)
+	_, err := serializeMetrics(resourceMetrics.Resource(), "", scopeMetrics.Scope(), "", dataPoints, &validationErrors, elasticsearch.Index{}, &buf)
 	if err != nil {
 		t.Errorf("serializeMetrics() error = %v", err)
 	}
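
Taken together: a document routed to a data stream ends up with a data_stream object alongside its other fields, while plain indices and logstash-formatted indices (which the exporter wraps back into a bare elasticsearch.Index{Index: ...}) never satisfy IsDataStream() and so omit it. A sketch of the resulting document shape, using encoding/json in place of the visitor API the real serializer uses; the field layout inside data_stream follows the Type/Dataset/Namespace fields written above:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	idx := struct {
		Type, Dataset, Namespace string
	}{"logs", "generic.otel", "default"}

	// The shape writeDataStream contributes to a serialized document.
	doc := map[string]any{
		"@timestamp": "2025-01-24T09:09:49.000000000Z", // tsLayout format
		"data_stream": map[string]string{
			"type":      idx.Type,
			"dataset":   idx.Dataset,
			"namespace": idx.Namespace,
		},
	}
	b, _ := json.Marshal(doc)
	fmt.Println(string(b))
	// {"@timestamp":"2025-01-24T09:09:49.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"logs"}}
}
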