Skip to content

Commit

Permalink
feat: es 查询优化 --story=118743765 (#456)
Browse files Browse the repository at this point in the history
  • Loading branch information
shamcleren authored Jul 31, 2024
1 parent 4f6c303 commit 214fed0
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 47 deletions.
3 changes: 1 addition & 2 deletions pkg/bk-monitor-worker/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import (
"github.com/spf13/viper"
"golang.org/x/exp/slices"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/utils/jsonx"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"
)

var (
Expand Down
3 changes: 1 addition & 2 deletions pkg/bk-monitor-worker/internal/alarm/cmdbcache/cmdb_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ import (

"github.com/pkg/errors"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/alarm/redis"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api/cmdb"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"
)

// CmdbResourceType cmdb监听资源类型
Expand Down
3 changes: 1 addition & 2 deletions pkg/bk-monitor-worker/internal/alarm/cmdbcache/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/alarm/redis"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api/cmdb"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"
)

// hostFields 主机字段
Expand Down
1 change: 1 addition & 0 deletions pkg/unify-query/metadata/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type Query struct {
From int
Size int
Orders Orders
NeedAddTime bool
}

type Orders map[string]bool
Expand Down
4 changes: 3 additions & 1 deletion pkg/unify-query/query/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ type TsDBV2 struct {
MetricName string `json:"metric_name"`
ExpandMetricNames []string `json:"expand_metric_names"`
// timeField
TimeField metadata.TimeField
TimeField metadata.TimeField `json:"time_field"`
// NeedAddTime
NeedAddTime bool `json:"need_add_time"`
}

func (z *TsDBV2) IsSplit() bool {
Expand Down
3 changes: 3 additions & 0 deletions pkg/unify-query/query/structured/query_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ func (q *Query) BuildMetadataQuery(
span.Set("tsdb-db", db)
span.Set("tsdb-measurements", fmt.Sprintf("%+v", measurements))
span.Set("tsdb-time-field", tsDB.TimeField)
span.Set("tsdb-need-add-time", tsDB.NeedAddTime)

if q.Offset != "" {
dTmp, err := model.ParseDuration(q.Offset)
Expand Down Expand Up @@ -728,6 +729,7 @@ func (q *Query) BuildMetadataQuery(
query.Fields = fields
query.Measurements = measurements
query.TimeField = tsDB.TimeField
query.NeedAddTime = tsDB.NeedAddTime

query.Condition = whereList.String()
query.VmCondition, query.VmConditionNum = allCondition.VMString(vmRt, vmMetric, q.IsRegexp)
Expand Down Expand Up @@ -784,6 +786,7 @@ func (q *Query) BuildMetadataQuery(
span.Set("query-cluster-name", query.ClusterName)
span.Set("query-tag-keys", query.TagsKey)
span.Set("query-vm-rt", query.VmRt)
span.Set("query-need-add-time", query.NeedAddTime)

return query, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/unify-query/query/structured/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (s *SpaceFilter) NewTsDBs(spaceTable *routerInfluxdb.SpaceResultTable, fiel
Type: rtDetail.Options.TimeField.Type,
Unit: rtDetail.Options.TimeField.Unit,
},
NeedAddTime: rtDetail.Options.NeedAddTime,
}
// 字段为空时,需要返回结果表的信息,表示无需过滤字段过滤
// bklog 或者 bkapm 则不判断 field 是否存在
Expand Down
7 changes: 5 additions & 2 deletions pkg/unify-query/tsdb/elasticsearch/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,11 @@ func (f *FormatFactory) WithOrders(orders map[string]bool) *FormatFactory {
return f
}

func (f *FormatFactory) WithMapping(mapping map[string]any) *FormatFactory {
mapProperties("", mapping, f.mapping)
// WithMappings folds one or more ES index mappings into the factory's
// flattened field table; mappings passed later overwrite entries that
// earlier ones contributed (newest index wins).
func (f *FormatFactory) WithMappings(mappings ...map[string]any) *FormatFactory {
	for idx := range mappings {
		mapProperties("", mappings[idx], f.mapping)
	}
	return f
}

Expand Down
120 changes: 87 additions & 33 deletions pkg/unify-query/tsdb/elasticsearch/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type InstanceOption struct {
}

type queryOption struct {
index string
indexes []string
// 单位是 s
start int64
end int64
Expand Down Expand Up @@ -154,7 +154,7 @@ func NewInstance(ctx context.Context, opt *InstanceOption) (*Instance, error) {
return ins, nil
}

func (i *Instance) getMapping(ctx context.Context, alias string) (map[string]interface{}, error) {
func (i *Instance) getMappings(ctx context.Context, aliases []string) ([]map[string]any, error) {
var (
err error
)
Expand All @@ -169,32 +169,26 @@ func (i *Instance) getMapping(ctx context.Context, alias string) (map[string]int
span.End(&err)
}()

span.Set("alias", alias)
span.Set("alias", aliases)
mappingMap, err := i.client.GetMapping().Index(aliases...).Do(ctx)

indexs, err := i.getIndexes(ctx, alias)
if err != nil {
return nil, err
indexes := make([]string, 0, len(mappingMap))
for index := range mappingMap {
indexes = append(indexes, index)
}
// 按照正序排列,最新的覆盖老的
sort.Strings(indexes)
span.Set("indexes", indexes)

span.Set("indexs", indexs)

for _, index := range indexs {
mappings, err := i.client.GetMapping().Index(index).Do(ctx)
if err != nil {
return nil, err
}

if mapping, ok := mappings[index].(map[string]any)["mappings"].(map[string]any); ok {
span.Set("index", index)
span.Set("mapping", mapping)

return mapping, nil
} else {
return nil, fmt.Errorf("get mappings error with index: %s", index)
mappings := make([]map[string]any, 0, len(mappingMap))
for _, index := range indexes {
if mapping, ok := mappingMap[index].(map[string]any)["mappings"].(map[string]any); ok {
log.Infof(ctx, "elasticsearch-get-mapping: es [%s] mapping %+v", index, mapping)
mappings = append(mappings, mapping)
}
}

return nil, nil
return mappings, nil
}

func (i *Instance) esQuery(ctx context.Context, qo *queryOption, fact *FormatFactory) (*elastic.SearchResult, error) {
Expand Down Expand Up @@ -278,14 +272,13 @@ func (i *Instance) esQuery(ctx context.Context, qo *queryOption, fact *FormatFac
bodyJson, _ := json.Marshal(body)
bodyString := string(bodyJson)

span.Set("query-index", qo.index)
span.Set("query-body", bodyString)
span.Set("query-indexes", qo.indexes)

log.Infof(ctx, "es query index: %s", qo.index)
log.Infof(ctx, "es query body: %s", bodyString)
log.Infof(ctx, "elasticsearch-query indexes: %s", qo.indexes)
log.Infof(ctx, "elasticsearch-query body: %s", bodyString)

startAnaylize := time.Now()
search := i.client.Search().Index(qo.index).SearchSource(source)
search := i.client.Search().Index(qo.indexes...).SearchSource(source)

res, err := search.Do(ctx)

Expand Down Expand Up @@ -401,6 +394,9 @@ func (i *Instance) getIndexes(ctx context.Context, aliases ...string) ([]string,
indexes = append(indexes, idx)
}

sort.Slice(indexes, func(i, j int) bool {
return indexes[i] > indexes[j]
})
return indexes, nil
}

Expand Down Expand Up @@ -463,6 +459,57 @@ func (i *Instance) mergeTimeSeries(rets chan *TimeSeriesResult) (*prompb.QueryRe
return qr, nil
}

// getAlias expands a table name (db) into the list of index aliases to query.
// When needAddTime is false the db name is returned as-is. When it is true,
// date-suffixed wildcard aliases are generated over [start, end] in the given
// timezone: daily suffixes ("db_20060102*") for windows up to 14 days, monthly
// suffixes ("db_200601*") beyond that, with the window clamped to the last
// 6 months (6 * 30 days) of the range.
func (i *Instance) getAlias(ctx context.Context, db string, needAddTime bool, start, end time.Time, timezone string) []string {
	var (
		alias []string

		_, span = trace.NewSpan(ctx, "get-alias")
		err     error
	)
	defer span.End(&err)

	span.Set("need-add-time", needAddTime)

	if needAddTime {
		// Best-effort timezone resolution: an unknown zone silently falls
		// back to UTC (the shadowed err is intentional so span.End does not
		// record it as a failure).
		loc, err := time.LoadLocation(timezone)
		if err != nil {
			loc = time.UTC
		}
		start = start.In(loc)
		end = end.In(loc)

		// Window width in seconds; drives the daily/monthly switch below.
		left := end.Unix() - start.Unix()

		span.Set("timezone", loc.String())
		span.Set("start", start.String())
		span.Set("end", end.String())
		span.Set("left", left)

		addMonth := 0
		addDay := 1
		dateFormat := "20060102"
		// More than 14 days: switch from daily to monthly index suffixes.
		if left > int64(time.Hour.Seconds()*24*14) {
			addDay = 0
			addMonth = 1
			dateFormat = "200601"
			// More than ~6 months (6 * 30 days): clamp the start so at most
			// half a year of monthly aliases is generated.
			halfYear := time.Hour * 24 * 30 * 6
			if left > int64(halfYear.Seconds()) {
				start = end.Add(halfYear * -1)
			}
		}

		// NOTE(review): the cursor keeps start's day-of-month / time-of-day.
		// In monthly mode AddDate(0, 1, 0) from day 29-31 normalizes past the
		// next month (e.g. Jan 31 -> Mar 2), which can skip a month's alias;
		// and because the final step can land just past `end`, the suffix for
		// the period containing `end` may be omitted — TODO confirm whether
		// these boundary cases matter for real index layouts.
		for d := start; !d.After(end); d = d.AddDate(0, addMonth, addDay) {
			alias = append(alias, fmt.Sprintf("%s_%s*", db, d.Format(dateFormat)))
		}
	} else {
		// No time suffixing: query the table name directly.
		alias = append(alias, db)
	}

	span.Set("alias_num", len(alias))

	return alias
}

// QueryRaw 给 PromEngine 提供查询接口
func (i *Instance) QueryRaw(
ctx context.Context,
Expand Down Expand Up @@ -494,14 +541,21 @@ func (i *Instance) QueryRaw(
close(rets)
}()

aliases := i.getAlias(ctx, query.DB, query.NeedAddTime, start, end, query.Timezone)

qo := &queryOption{
index: query.DB,
start: start.Unix(),
end: end.Unix(),
query: query,
indexes: aliases,
start: start.Unix(),
end: end.Unix(),
query: query,
}

mappings, err1 := i.getMappings(ctx, qo.indexes)
// index 不存在,mappings 获取异常直接返回空
if len(mappings) == 0 {
return
}

mapping, err1 := i.getMapping(ctx, qo.index)
if err1 != nil {
rets <- &TimeSeriesResult{
Error: err1,
Expand All @@ -518,7 +572,7 @@ func (i *Instance) QueryRaw(
fact := NewFormatFactory(ctx).
WithIsReference(metadata.GetQueryParams(ctx).IsReference).
WithQuery(query.Field, query.TimeField, qo.start, qo.end, query.From, size).
WithMapping(mapping).
WithMappings(mappings...).
WithOrders(query.Orders).
WithTransform(i.toEs, i.toProm)

Expand Down
63 changes: 63 additions & 0 deletions pkg/unify-query/tsdb/elasticsearch/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata"
Expand Down Expand Up @@ -343,3 +344,65 @@ func TestInstance_queryReference(t *testing.T) {
})
}
}

// TestInstance_getAlias verifies date-suffixed alias expansion across the
// daily (<= 14d), monthly (> 14d), and clamped (> 6 months) regimes, in UTC
// and in a UTC+8 zone.
//
// Fix: the IANA zone id was misspelled "Asia/ShangHai". time.LoadLocation is
// case-sensitive (tzdata lookup), so on Linux it failed and getAlias silently
// fell back to UTC, making every +8h-shifted expectation below fail. The
// correct id is "Asia/Shanghai".
func TestInstance_getAlias(t *testing.T) {
	metadata.InitMetadata()
	ctx := metadata.InitHashID(context.Background())
	inst, err := NewInstance(ctx, &InstanceOption{
		Timeout: time.Minute,
	})
	if err != nil {
		log.Panicf(ctx, err.Error())
	}

	for name, c := range map[string]struct {
		start    time.Time
		end      time.Time
		timezone string

		expected []string
	}{
		"3d with UTC": {
			start:    time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC),
			end:      time.Date(2024, 1, 3, 20, 0, 0, 0, time.UTC),
			expected: []string{"db_test_20240101*", "db_test_20240102*", "db_test_20240103*"},
		},
		"3d with Asia/Shanghai": {
			start:    time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC),
			end:      time.Date(2024, 1, 3, 20, 0, 0, 0, time.UTC),
			timezone: "Asia/Shanghai",
			expected: []string{"db_test_20240102*", "db_test_20240103*", "db_test_20240104*"},
		},
		"14d with Asia/Shanghai": {
			start:    time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC),
			end:      time.Date(2024, 1, 15, 20, 0, 0, 0, time.UTC),
			timezone: "Asia/Shanghai",
			expected: []string{"db_test_20240102*", "db_test_20240103*", "db_test_20240104*", "db_test_20240105*", "db_test_20240106*", "db_test_20240107*", "db_test_20240108*", "db_test_20240109*", "db_test_20240110*", "db_test_20240111*", "db_test_20240112*", "db_test_20240113*", "db_test_20240114*", "db_test_20240115*", "db_test_20240116*"},
		},
		"15d with Asia/Shanghai": {
			start:    time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC),
			end:      time.Date(2024, 1, 16, 20, 0, 0, 0, time.UTC),
			timezone: "Asia/Shanghai",
			expected: []string{"db_test_202401*"},
		},
		"6m with Asia/Shanghai": {
			start:    time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC),
			end:      time.Date(2024, 7, 1, 20, 0, 0, 0, time.UTC),
			timezone: "Asia/Shanghai",
			expected: []string{"db_test_202401*", "db_test_202402*", "db_test_202403*", "db_test_202404*", "db_test_202405*", "db_test_202406*"},
		},
		"7m with Asia/Shanghai": {
			start:    time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC),
			end:      time.Date(2024, 8, 1, 20, 0, 0, 0, time.UTC),
			timezone: "Asia/Shanghai",
			expected: []string{"db_test_202402*", "db_test_202403*", "db_test_202404*", "db_test_202405*", "db_test_202406*", "db_test_202407*"},
		},
	} {
		t.Run(name, func(t *testing.T) {
			ctx = metadata.InitHashID(ctx)
			actual := inst.getAlias(ctx, "db_test", true, c.start, c.end, c.timezone)

			assert.Equal(t, c.expected, actual)
		})
	}
}
3 changes: 3 additions & 0 deletions pkg/utils/router/influxdb/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ type ResultTableDetail struct {
TagsKey []string `json:"tags_key"`
DataId int64 `json:"bk_data_id"`
Options struct {
// 自定义时间聚合字段
TimeField struct {
Name string `json:"name"`
Type string `json:"type"`
Unit string `json:"unit"`
} `json:"time_field"`
// db 是否拼接时间格式
NeedAddTime bool `json:"need_add_time"`
} `json:"options"`
}

Expand Down
Loading

0 comments on commit 214fed0

Please sign in to comment.