Skip to content

Commit

Permalink
perf: 优化 ES index 生成逻辑 (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx authored Dec 11, 2023
1 parent ed49d07 commit ef16362
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 74 deletions.
2 changes: 1 addition & 1 deletion pkg/transfer/elasticsearch/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *BulkHandlerSuite) TestFormatTime() {

s.mockBulkWriter.EXPECT().Write(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, index string, records elasticsearch.Records) (*elasticsearch.Response, error) {
s.Len(records, 1)
record := records[0].Document.(map[string]interface{})
record := records[0].Document
s.Equal(conv.String(ts), record["source_time"])
s.Equal(conv.String(ts), record["time"])

Expand Down
4 changes: 2 additions & 2 deletions pkg/transfer/elasticsearch/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func (r Records) AsBody() (io.Reader, error) {
// ESRecord
type Record struct {
Meta map[string]interface{}
Document interface{}
Document map[string]interface{}
}

// NewRecord
func NewRecord(document interface{}) *Record {
func NewRecord(document map[string]interface{}) *Record {
return &Record{
Meta: make(map[string]interface{}),
Document: document,
Expand Down
63 changes: 10 additions & 53 deletions pkg/transfer/elasticsearch/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
package elasticsearch

import (
"bytes"
"strings"
"text/template"
"time"

"github.com/pkg/errors"
Expand All @@ -30,25 +27,6 @@ func FixedIndexRender(name string) IndexRenderFn {
}
}

// TemplateRender
func TemplateRender(tmpl *template.Template, context interface{}) IndexRenderFn {
return func(record *Record) (index string, err error) {
buf := bytes.NewBuffer(nil)
defer utils.RecoverError(func(e error) {
err = e
logging.Errorf("render index with context %#v error %v", context, e)
})
err = tmpl.Execute(buf, struct {
Record *Record
Context interface{}
}{
Record: record,
Context: context,
})
return buf.String(), err
}
}

// ConfigTemplateRender
func ConfigTemplateRender(config *config.ElasticSearchMetaClusterInfo) (IndexRenderFn, error) {
storageConf := utils.NewMapHelper(config.StorageConfig)
Expand All @@ -58,33 +36,17 @@ func ConfigTemplateRender(config *config.ElasticSearchMetaClusterInfo) (IndexRen
field := storageConf.GetOrDefault("index_datetime_field", "time").(string)
timezone := int(storageConf.GetOrDefault("index_datetime_timezone", 0.0).(float64))
format := storageConf.GetOrDefault("index_datetime_format", "20060102").(string)
timeTemplate := storageConf.GetOrDefault(
"index_datetime_template",
`{{ format_time ( index .Record.Document ( index .Context "field" ) ) ( index .Context "format" ) ( index .Context "timezone" ) }}`,
).(string)
stringTemplate := storageConf.GetOrDefault("index_template", strings.Join(
[]string{timeTemplate, index}, separator,
)).(string)

tmpl, err := template.New(index).Funcs(template.FuncMap{
"format_time": func(v interface{}, format string, timezone *time.Location) string {
tm, err := utils.ParseTime(v)
if err != nil {
logging.Warnf("parse time %v error %v, use local time instead", v, err)
tm = time.Now()
}
return tm.In(timezone).Format(format)
},
}).Parse(stringTemplate)
if err != nil {
return nil, err
}
return func(record *Record) (string, error) {
tm, err := utils.ParseTime(record.Document[field])
if err != nil {
logging.Warnf("parse time %v error %v, use local time instead", tm, err)
tm = time.Now()
}

return TemplateRender(tmpl, map[string]interface{}{
"field": field,
"format": format,
"timezone": utils.ParseFixedTimeZone(timezone),
}), nil
s := tm.In(utils.ParseFixedTimeZone(timezone)).Format(format) + separator + index
return s, nil
}, nil
}

// TimeBasedIndexAliasRender
Expand All @@ -98,12 +60,7 @@ func TimeBasedIndexAliasRender(config *config.ElasticSearchMetaClusterInfo) (Ind
}

return func(record *Record) (s string, e error) {
values, ok := record.Document.(map[string]interface{})
if !ok {
return "", errors.Wrapf(define.ErrType, "document type %T", record.Document)
}

value, ok := values[field]
value, ok := record.Document[field]
if !ok {
return "", errors.Wrapf(define.ErrKey, "document field %s not found", field)
}
Expand Down
18 changes: 0 additions & 18 deletions pkg/transfer/elasticsearch/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,6 @@ func (s *IndexRenderSuite) TestConfigTemplateRender() {
s.Equal("20191019_test", index)
}

// TestConfigTemplateRenderByCustomTemplate
func (s *IndexRenderSuite) TestConfigTemplateRenderByCustomTemplate() {
conf := s.ShipperConfig.AsElasticSearchCluster()
conf.SetIndex("test")

storageConf := utils.NewMapHelper(conf.StorageConfig)
storageConf.Set("index_template", `{{ index .Record.Document "Index" }}`)

fn, err := elasticsearch.ConfigTemplateRender(conf)
s.NoError(err)

index, err := fn(elasticsearch.NewRecord(map[string]interface{}{
"Index": "banana",
}))
s.NoError(err)
s.Equal("banana", index)
}

// TestConfigTemplateRenderByConfig
func (s *IndexRenderSuite) TestConfigTemplateRenderByConfig() {
conf := config.MetaClusterInfo{
Expand Down

0 comments on commit ef16362

Please sign in to comment.