Skip to content

Commit

Permalink
feat: es 查询支持自定义时间字段 --story=118404350 (#452)
Browse files Browse the repository at this point in the history
  • Loading branch information
shamcleren authored Jul 23, 2024
1 parent f33fbee commit a8d08cd
Show file tree
Hide file tree
Showing 11 changed files with 558 additions and 64 deletions.
22 changes: 15 additions & 7 deletions pkg/unify-query/metadata/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ const (
UUID = "query_uuid"
)

// TimeField describes the field that holds the timestamp of a record, so that
// queries are not tied to a single hard-coded time field.
type TimeField struct {
// Name is the name of the time field.
Name string
// Type is the storage type of the time field.
// NOTE(review): presumably "date" or "long", matching the Elasticsearch
// TimeFieldType* constants — confirm against the consumers.
Type string
// Unit is the time unit in which the field's values are expressed.
Unit string
// UnitRate is a numeric scale factor associated with Unit.
// NOTE(review): looks like it is derived from Unit by the query layer rather
// than configured directly — confirm.
UnitRate int64
}

// Aggregate 聚合方法
type Aggregate struct {
Name string
Expand Down Expand Up @@ -72,13 +79,14 @@ type Query struct {
IsSingleMetric bool

// 兼容 InfluxDB 结构体
RetentionPolicy string // 存储 RP
DB string // 存储 DB
Measurement string // 存储 Measurement
Field string // 存储 Field
Timezone string // 存储 Timezone
Fields []string // 存储命中的 Field 列表,一般情况下为一个,当 Field 为模糊匹配时,解析为多个
Measurements []string // 存储命中的 Measurement 列表,一般情况下为一个,当 Measurement 为模糊匹配时,解析为多个
RetentionPolicy string // 存储 RP
DB string // 存储 DB
Measurement string // 存储 Measurement
Field string // 存储 Field
TimeField TimeField // 时间字段
Timezone string // 存储 Timezone
Fields []string // 存储命中的 Field 列表,一般情况下为一个,当 Field 为模糊匹配时,解析为多个
Measurements []string // 存储命中的 Measurement 列表,一般情况下为一个,当 Measurement 为模糊匹配时,解析为多个

// 用于 promql 查询
IsHasOr bool // 标记是否有 or 条件
Expand Down
3 changes: 3 additions & 0 deletions pkg/unify-query/query/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package query
import (
"fmt"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/redis"
)

Expand All @@ -36,6 +37,8 @@ type TsDBV2 struct {
// 补充检索的元信息
MetricName string `json:"metric_name"`
ExpandMetricNames []string `json:"expand_metric_names"`
// timeField
TimeField metadata.TimeField
}

func (z *TsDBV2) IsSplit() bool {
Expand Down
8 changes: 8 additions & 0 deletions pkg/unify-query/query/structured/query_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ func (q *QueryTs) ToPromExpr(
return result, nil
}

// TimeField is the JSON-serializable description of a time field. Every
// attribute carries `omitempty`, so empty values are left out when encoding.
type TimeField struct {
// Name is encoded under the "name" key.
Name string `json:"name,omitempty"`
// Type is encoded under the "type" key.
Type string `json:"type,omitempty"`
// Unit is encoded under the "unit" key.
Unit string `json:"unit,omitempty"`
}

type Query struct {
// DataSource 暂不使用
DataSource string `json:"data_source,omitempty" swaggerignore:"true"`
Expand Down Expand Up @@ -555,6 +561,7 @@ func (q *Query) BuildMetadataQuery(
span.Set("tsdb-vm-rt", vmRt)
span.Set("tsdb-db", db)
span.Set("tsdb-measurements", fmt.Sprintf("%+v", measurements))
span.Set("tsdb-time-field", tsDB.TimeField)

if q.Offset != "" {
dTmp, err := model.ParseDuration(q.Offset)
Expand Down Expand Up @@ -720,6 +727,7 @@ func (q *Query) BuildMetadataQuery(
query.Timezone = timezone
query.Fields = fields
query.Measurements = measurements
query.TimeField = tsDB.TimeField

query.Condition = whereList.String()
query.VmCondition, query.VmConditionNum = allCondition.VMString(vmRt, vmMetric, q.IsRegexp)
Expand Down
5 changes: 5 additions & 0 deletions pkg/unify-query/query/structured/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func (s *SpaceFilter) NewTsDBs(spaceTable *routerInfluxdb.SpaceResultTable, fiel
VmRt: rtDetail.VmRt,
StorageName: rtDetail.StorageName,
MetricName: fieldName,
TimeField: metadata.TimeField{
Name: rtDetail.Options.TimeField.Name,
Type: rtDetail.Options.TimeField.Type,
Unit: rtDetail.Options.TimeField.Unit,
},
}
// 字段为空时,需要返回结果表的信息,表示无需过滤字段过滤
// bklog 或者 bkapm 则不判断 field 是否存在
Expand Down
120 changes: 93 additions & 27 deletions pkg/unify-query/tsdb/elasticsearch/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const (
FieldValue = "_value"
FieldTime = "_time"

Timestamp = "dtEventTimeStamp"
TimeFormat = "epoch_millis"
DefaultTimeFieldName = "dtEventTimeStamp"
DefaultTimeFieldType = TimeFieldTypeTime
DefaultTimeFieldUnit = Millisecond

Type = "type"
Properties = "properties"
Expand All @@ -59,6 +60,23 @@ const (
Date = "date"
)

// Supported time-field types. RangeQuery switches on these values: a "date"
// field is filtered with an explicit epoch format, while a "long" field is
// filtered with plain integers scaled by the field's unit rate.
// NOTE(review): the literals presumably mirror Elasticsearch mapping type
// names — confirm.
const (
// TimeFieldTypeTime marks a time field stored as a date.
TimeFieldTypeTime = "date"
// TimeFieldTypeInt marks a time field stored as an integer.
TimeFieldTypeInt = "long"
)

// Time-unit names and epoch format names used when querying a time field.
const (
// Unit names accepted in TimeField.Unit; WithQuery maps them to a unit rate
// (any unit other than milli/micro/nanosecond gets a rate of 1).
Second = "second"
Millisecond = "millisecond"
Microsecond = "microsecond"
Nanosecond = "nanosecond"

// Epoch format names. EpochSecond is passed to the range query's Format
// for date-typed time fields.
// NOTE(review): confirm that "epoch_microseconds" and "epoch_nanoseconds"
// are accepted by the target Elasticsearch version before using them.
EpochSecond = "epoch_second"
EpochMillis = "epoch_millis"
EpochMicroseconds = "epoch_microseconds"
EpochNanoseconds = "epoch_nanoseconds"
)

type TimeSeriesResult struct {
TimeSeriesMap map[string]*prompb.TimeSeries
Error error
Expand Down Expand Up @@ -126,7 +144,8 @@ type aggInfoList []any
type FormatFactory struct {
ctx context.Context

valueKey string
valueField string
timeField metadata.TimeField

toEs func(k string) string
toProm func(k string) string
Expand Down Expand Up @@ -162,11 +181,35 @@ func (f *FormatFactory) WithIsReference(isReference bool) *FormatFactory {
return f
}

func (f *FormatFactory) WithQuery(valueKey string, start, end int64, timezone string, from, size int) *FormatFactory {
f.valueKey = valueKey
func (f *FormatFactory) WithQuery(valueKey string, timeField metadata.TimeField, start, end int64, from, size int) *FormatFactory {
if timeField.Name == "" {
timeField.Name = DefaultTimeFieldName
}
if timeField.Type == "" {
timeField.Type = DefaultTimeFieldType
}
if timeField.Unit == "" {
timeField.Unit = DefaultTimeFieldUnit
}

timeField.UnitRate = int64(1)
switch timeField.Unit {
case Millisecond:
timeField.UnitRate = 1e3
case Microsecond:
timeField.UnitRate = 1e6
case Nanosecond:
timeField.UnitRate = 1e9
default:
timeField.UnitRate = 1
}

// 根据时间单位把考试时间和结束实践转换为对应值
f.start = start
f.end = end
f.timezone = timezone

f.valueField = valueKey
f.timeField = timeField
f.from = from
f.size = size
return f
Expand All @@ -188,6 +231,24 @@ func (f *FormatFactory) WithMapping(mapping map[string]any) *FormatFactory {
return f
}

// RangeQuery builds the half-open [start, end) range filter on the configured
// time field.
//
// For an integer-typed field the bounds are multiplied by the field's
// UnitRate; for a date-typed field the raw bounds are sent with the
// epoch_second format. Any other field type yields a nil query and an error.
func (f *FormatFactory) RangeQuery() (elastic.Query, error) {
	tf := f.timeField

	switch tf.Type {
	case TimeFieldTypeInt:
		// Integer fields are compared numerically, so scale the bounds.
		lower := f.start * tf.UnitRate
		upper := f.end * tf.UnitRate
		return elastic.NewRangeQuery(tf.Name).Gte(lower).Lt(upper), nil
	case TimeFieldTypeTime:
		// Date fields take the bounds as-is, tagged with their format.
		return elastic.NewRangeQuery(tf.Name).Gte(f.start).Lt(f.end).Format(EpochSecond), nil
	}

	return nil, fmt.Errorf("time field type is error %s", tf.Type)
}

func (f *FormatFactory) timeAgg(name string, window, timezone string) {
f.aggInfoList = append(
f.aggInfoList, TimeAgg{
Expand Down Expand Up @@ -301,7 +362,7 @@ func (f *FormatFactory) AggDataFormat(data elastic.Aggregations) (map[string]*pr
}

if im.timestamp == 0 {
im.timestamp = f.start
im.timestamp = f.start * 1e3
}

timeSeriesMap[seriesKey].Labels = tsLabels
Expand Down Expand Up @@ -332,7 +393,7 @@ func (f *FormatFactory) Agg() (name string, agg elastic.Aggregation, err error)
switch info.FuncType {
case Min:
curName := FieldValue
curAgg := elastic.NewMinAggregation().Field(f.valueKey)
curAgg := elastic.NewMinAggregation().Field(f.valueField)
if agg != nil {
curAgg = curAgg.SubAggregation(name, agg)
}
Expand All @@ -341,7 +402,7 @@ func (f *FormatFactory) Agg() (name string, agg elastic.Aggregation, err error)
name = curName
case Max:
curName := FieldValue
curAgg := elastic.NewMaxAggregation().Field(f.valueKey)
curAgg := elastic.NewMaxAggregation().Field(f.valueField)
if agg != nil {
curAgg = curAgg.SubAggregation(name, agg)
}
Expand All @@ -350,7 +411,7 @@ func (f *FormatFactory) Agg() (name string, agg elastic.Aggregation, err error)
name = curName
case Avg:
curName := FieldValue
curAgg := elastic.NewAvgAggregation().Field(f.valueKey)
curAgg := elastic.NewAvgAggregation().Field(f.valueField)
if agg != nil {
curAgg = curAgg.SubAggregation(name, agg)
}
Expand All @@ -359,7 +420,7 @@ func (f *FormatFactory) Agg() (name string, agg elastic.Aggregation, err error)
name = curName
case Sum:
curName := FieldValue
curAgg := elastic.NewSumAggregation().Field(f.valueKey)
curAgg := elastic.NewSumAggregation().Field(f.valueField)
if agg != nil {
curAgg = curAgg.SubAggregation(name, agg)
}
Expand All @@ -368,7 +429,7 @@ func (f *FormatFactory) Agg() (name string, agg elastic.Aggregation, err error)
name = curName
case Count:
curName := FieldValue
curAgg := elastic.NewValueCountAggregation().Field(f.valueKey)
curAgg := elastic.NewValueCountAggregation().Field(f.valueField)
if agg != nil {
curAgg = curAgg.SubAggregation(name, agg)
}
Expand All @@ -377,7 +438,7 @@ func (f *FormatFactory) Agg() (name string, agg elastic.Aggregation, err error)
name = curName
case Cardinality:
curName := FieldValue
curAgg := elastic.NewCardinalityAggregation().Field(f.valueKey)
curAgg := elastic.NewCardinalityAggregation().Field(f.valueField)
if agg != nil {
curAgg = curAgg.SubAggregation(name, agg)
}
Expand All @@ -403,7 +464,7 @@ func (f *FormatFactory) Agg() (name string, agg elastic.Aggregation, err error)
percents = append(percents, percent)
}

curAgg := elastic.NewPercentilesAggregation().Field(f.valueKey).Percentiles(percents...)
curAgg := elastic.NewPercentilesAggregation().Field(f.valueField).Percentiles(percents...)
curName := FieldValue
if agg != nil {
curAgg = curAgg.SubAggregation(name, agg)
Expand All @@ -417,9 +478,14 @@ func (f *FormatFactory) Agg() (name string, agg elastic.Aggregation, err error)
}
case TimeAgg:
curName := info.Name

curAgg := elastic.NewDateHistogramAggregation().
Field(Timestamp).FixedInterval(info.Window).TimeZone(info.Timezone).
MinDocCount(0).ExtendedBounds(f.start, f.end)
Field(f.timeField.Name).FixedInterval(info.Window).MinDocCount(0).
ExtendedBounds(f.start*f.timeField.UnitRate, f.end*f.timeField.UnitRate)
// https://github.com/elastic/elasticsearch/issues/42270 非date类型不支持timezone, time format也无效
if f.timeField.Type == TimeFieldTypeTime {
curAgg = curAgg.TimeZone(info.Timezone)
}
if agg != nil {
curAgg = curAgg.SubAggregation(name, agg)
}
Expand Down Expand Up @@ -459,14 +525,14 @@ func (f *FormatFactory) EsAgg(aggregates metadata.Aggregates) (string, elastic.A
for _, am := range aggregates {
switch am.Name {
case DateHistogram:
f.timeAgg(Timestamp, shortDur(am.Window), f.timezone)
f.timeAgg(f.timeField.Name, shortDur(am.Window), am.TimeZone)
case Max, Min, Avg, Sum, Count, Cardinality, Percentiles:
f.valueAgg(FieldValue, am.Name, am.Args...)
f.nestedAgg(f.valueKey)
f.nestedAgg(f.valueField)

if am.Window > 0 && !am.Without {
// 增加时间函数
f.timeAgg(Timestamp, shortDur(am.Window), am.TimeZone)
f.timeAgg(f.timeField.Name, shortDur(am.Window), am.TimeZone)
}

for idx, dim := range am.Dimensions {
Expand All @@ -491,9 +557,9 @@ func (f *FormatFactory) Order() map[string]bool {
order := make(map[string]bool)
for name, asc := range f.orders {
if name == FieldValue {
name = f.valueKey
name = f.valueField
} else if name == FieldTime {
name = Timestamp
name = f.timeField.Name
}

if _, ok := f.mapping[name]; ok {
Expand Down Expand Up @@ -612,7 +678,7 @@ func (f *FormatFactory) Sample() (prompb.Sample, error) {
return sample, nil
}

if value, ok = f.data[f.valueKey]; ok {
if value, ok = f.data[f.valueField]; ok {
switch value.(type) {
case float64:
sample.Value = value.(float64)
Expand All @@ -626,13 +692,13 @@ func (f *FormatFactory) Sample() (prompb.Sample, error) {
return sample, err
}
default:
return sample, fmt.Errorf("value key %s type is error: %T, %v", f.valueKey, value, value)
return sample, fmt.Errorf("value key %s type is error: %T, %v", f.valueField, value, value)
}
} else {
sample.Value = 0
}

if timestamp, ok = f.data[Timestamp]; ok {
if timestamp, ok = f.data[f.timeField.Name]; ok {
switch timestamp.(type) {
case int64:
sample.Timestamp = timestamp.(int64) * 1e3
Expand All @@ -644,7 +710,7 @@ func (f *FormatFactory) Sample() (prompb.Sample, error) {
return sample, fmt.Errorf("timestamp key type is error: %T, %v", timestamp, timestamp)
}
} else {
return sample, fmt.Errorf("timestamp is empty %s", Timestamp)
return sample, fmt.Errorf("timestamp is empty %s", f.timeField.Name)
}

return sample, nil
Expand All @@ -655,10 +721,10 @@ func (f *FormatFactory) Labels() (lbs *prompb.Labels, err error) {
for k := range f.data {
// 只有 promEngine 查询的场景需要跳过该字段
if !f.isReference {
if k == f.valueKey {
if k == f.valueField {
continue
}
if k == Timestamp {
if k == f.timeField.Name {
continue
}
}
Expand Down
Loading

0 comments on commit a8d08cd

Please sign in to comment.