diff --git a/pkg/bk-monitor-worker/config/config.go b/pkg/bk-monitor-worker/config/config.go index 2dce15a3a..f6a529bc2 100644 --- a/pkg/bk-monitor-worker/config/config.go +++ b/pkg/bk-monitor-worker/config/config.go @@ -19,9 +19,8 @@ import ( "github.com/spf13/viper" "golang.org/x/exp/slices" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/utils/jsonx" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" ) var ( diff --git a/pkg/bk-monitor-worker/internal/alarm/cmdbcache/cmdb_watch.go b/pkg/bk-monitor-worker/internal/alarm/cmdbcache/cmdb_watch.go index 9c8a2a8af..aee3f0f26 100644 --- a/pkg/bk-monitor-worker/internal/alarm/cmdbcache/cmdb_watch.go +++ b/pkg/bk-monitor-worker/internal/alarm/cmdbcache/cmdb_watch.go @@ -32,11 +32,10 @@ import ( "github.com/pkg/errors" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/alarm/redis" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api/cmdb" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" ) // CmdbResourceType cmdb监听资源类型 diff --git a/pkg/bk-monitor-worker/internal/alarm/cmdbcache/host.go b/pkg/bk-monitor-worker/internal/alarm/cmdbcache/host.go index 0aabfbe8a..672d8ae29 100644 --- a/pkg/bk-monitor-worker/internal/alarm/cmdbcache/host.go +++ b/pkg/bk-monitor-worker/internal/alarm/cmdbcache/host.go @@ -35,11 +35,10 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pkg/errors" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/alarm/redis" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api/cmdb" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" ) // hostFields 主机字段 diff --git a/pkg/unify-query/metadata/struct.go b/pkg/unify-query/metadata/struct.go index 260f3e99e..206fe2b2a 100644 --- a/pkg/unify-query/metadata/struct.go +++ b/pkg/unify-query/metadata/struct.go @@ -116,6 +116,7 @@ type Query struct { From int Size int Orders Orders + NeedAddTime bool } type Orders map[string]bool diff --git a/pkg/unify-query/query/interfaces.go b/pkg/unify-query/query/interfaces.go index a7759c520..786e98a64 100644 --- a/pkg/unify-query/query/interfaces.go +++ b/pkg/unify-query/query/interfaces.go @@ -38,7 +38,9 @@ type TsDBV2 struct { MetricName string `json:"metric_name"` ExpandMetricNames []string `json:"expand_metric_names"` // timeField - TimeField metadata.TimeField + TimeField metadata.TimeField `json:"time_field"` + // NeedAddTime + NeedAddTime bool `json:"need_add_time"` } func (z *TsDBV2) IsSplit() bool { diff --git a/pkg/unify-query/query/structured/query_ts.go b/pkg/unify-query/query/structured/query_ts.go index 8c360e926..1bf1b9bbf 100644 --- a/pkg/unify-query/query/structured/query_ts.go +++ b/pkg/unify-query/query/structured/query_ts.go @@ -562,6 +562,7 @@ func (q *Query) BuildMetadataQuery( span.Set("tsdb-db", db) span.Set("tsdb-measurements", fmt.Sprintf("%+v", measurements)) span.Set("tsdb-time-field", tsDB.TimeField) + span.Set("tsdb-need-add-time", tsDB.NeedAddTime) if q.Offset != "" { dTmp, err := model.ParseDuration(q.Offset) @@ -728,6 +729,7 @@ func (q *Query) BuildMetadataQuery( query.Fields = fields query.Measurements = measurements query.TimeField = tsDB.TimeField + query.NeedAddTime = tsDB.NeedAddTime query.Condition = whereList.String() query.VmCondition, query.VmConditionNum = allCondition.VMString(vmRt, vmMetric, q.IsRegexp) @@ -784,6 +786,7 @@ func (q *Query) BuildMetadataQuery( span.Set("query-cluster-name", query.ClusterName) span.Set("query-tag-keys", query.TagsKey) span.Set("query-vm-rt", query.VmRt) + span.Set("query-need-add-time", query.NeedAddTime) return query, nil } diff --git a/pkg/unify-query/query/structured/space.go b/pkg/unify-query/query/structured/space.go index 6a2d920a0..c5caad70e 100644 --- a/pkg/unify-query/query/structured/space.go +++ b/pkg/unify-query/query/structured/space.go @@ -147,6 +147,7 @@ func (s *SpaceFilter) NewTsDBs(spaceTable *routerInfluxdb.SpaceResultTable, fiel Type: rtDetail.Options.TimeField.Type, Unit: rtDetail.Options.TimeField.Unit, }, + NeedAddTime: rtDetail.Options.NeedAddTime, } // 字段为空时,需要返回结果表的信息,表示无需过滤字段过滤 // bklog 或者 bkapm 则不判断 field 是否存在 diff --git a/pkg/unify-query/tsdb/elasticsearch/format.go b/pkg/unify-query/tsdb/elasticsearch/format.go index b9359e713..5a5b6bf22 100644 --- a/pkg/unify-query/tsdb/elasticsearch/format.go +++ b/pkg/unify-query/tsdb/elasticsearch/format.go @@ -226,8 +226,11 @@ func (f *FormatFactory) WithOrders(orders map[string]bool) *FormatFactory { return f } -func (f *FormatFactory) WithMapping(mapping map[string]any) *FormatFactory { - mapProperties("", mapping, f.mapping) +// WithMappings 合并 mapping,后面的合并前面的 +func (f *FormatFactory) WithMappings(mappings ...map[string]any) *FormatFactory { + for _, mapping := range mappings { + mapProperties("", mapping, f.mapping) + } return f } diff --git a/pkg/unify-query/tsdb/elasticsearch/instance.go b/pkg/unify-query/tsdb/elasticsearch/instance.go index 5fb10325c..342303f53 100644 --- a/pkg/unify-query/tsdb/elasticsearch/instance.go +++ b/pkg/unify-query/tsdb/elasticsearch/instance.go @@ -95,7 +95,7 @@ type InstanceOption struct { } type queryOption struct { - index string + indexes []string // 单位是 s start int64 end int64 @@ -154,7 +154,7 @@ func NewInstance(ctx context.Context, opt *InstanceOption) (*Instance, error) { return ins, nil } -func (i *Instance) getMapping(ctx context.Context, alias string) (map[string]interface{}, error) { +func (i *Instance) getMappings(ctx context.Context, aliases []string) ([]map[string]any, error) { var ( err error ) @@ -169,32 +169,26 @@ func (i *Instance) getMapping(ctx context.Context, alias string) (map[string]int span.End(&err) }() - span.Set("alias", alias) + span.Set("alias", aliases) + mappingMap, err := i.client.GetMapping().Index(aliases...).Do(ctx) - indexs, err := i.getIndexes(ctx, alias) - if err != nil { - return nil, err + indexes := make([]string, 0, len(mappingMap)) + for index := range mappingMap { + indexes = append(indexes, index) } + // 按照正序排列,最新的覆盖老的 + sort.Strings(indexes) + span.Set("indexes", indexes) - span.Set("indexs", indexs) - - for _, index := range indexs { - mappings, err := i.client.GetMapping().Index(index).Do(ctx) - if err != nil { - return nil, err - } - - if mapping, ok := mappings[index].(map[string]any)["mappings"].(map[string]any); ok { - span.Set("index", index) - span.Set("mapping", mapping) - - return mapping, nil - } else { - return nil, fmt.Errorf("get mappings error with index: %s", index) + mappings := make([]map[string]any, 0, len(mappingMap)) + for _, index := range indexes { + if mapping, ok := mappingMap[index].(map[string]any)["mappings"].(map[string]any); ok { + log.Infof(ctx, "elasticsearch-get-mapping: es [%s] mapping %+v", index, mapping) + mappings = append(mappings, mapping) } } - return nil, nil + return mappings, nil } func (i *Instance) esQuery(ctx context.Context, qo *queryOption, fact *FormatFactory) (*elastic.SearchResult, error) { @@ -278,14 +272,13 @@ func (i *Instance) esQuery(ctx context.Context, qo *queryOption, fact *FormatFac bodyJson, _ := json.Marshal(body) bodyString := string(bodyJson) - span.Set("query-index", qo.index) - span.Set("query-body", bodyString) + span.Set("query-indexes", qo.indexes) - log.Infof(ctx, "es query index: %s", qo.index) - log.Infof(ctx, "es query body: %s", bodyString) + log.Infof(ctx, "elasticsearch-query indexes: %s", qo.indexes) + log.Infof(ctx, "elasticsearch-query body: %s", bodyString) startAnaylize := time.Now() - search := i.client.Search().Index(qo.index).SearchSource(source) + search := i.client.Search().Index(qo.indexes...).SearchSource(source) res, err := search.Do(ctx) @@ -401,6 +394,9 @@ func (i *Instance) getIndexes(ctx context.Context, aliases ...string) ([]string, indexes = append(indexes, idx) } + sort.Slice(indexes, func(i, j int) bool { + return indexes[i] > indexes[j] + }) return indexes, nil } @@ -463,6 +459,57 @@ func (i *Instance) mergeTimeSeries(rets chan *TimeSeriesResult) (*prompb.QueryRe return qr, nil } +func (i *Instance) getAlias(ctx context.Context, db string, needAddTime bool, start, end time.Time, timezone string) []string { + var ( + alias []string + _, span = trace.NewSpan(ctx, "get-alias") + err error + ) + defer span.End(&err) + + span.Set("need-add-time", needAddTime) + + if needAddTime { + loc, err := time.LoadLocation(timezone) + if err != nil { + loc = time.UTC + } + start = start.In(loc) + end = end.In(loc) + + left := end.Unix() - start.Unix() + // 超过 6 个月 + + span.Set("timezone", loc.String()) + span.Set("start", start.String()) + span.Set("end", end.String()) + span.Set("left", left) + + addMonth := 0 + addDay := 1 + dateFormat := "20060102" + if left > int64(time.Hour.Seconds()*24*14) { + addDay = 0 + addMonth = 1 + dateFormat = "200601" + halfYear := time.Hour * 24 * 30 * 6 + if left > int64(halfYear.Seconds()) { + start = end.Add(halfYear * -1) + } + } + + for d := start; !d.After(end); d = d.AddDate(0, addMonth, addDay) { + alias = append(alias, fmt.Sprintf("%s_%s*", db, d.Format(dateFormat))) + } + } else { + alias = append(alias, db) + } + + span.Set("alias_num", len(alias)) + + return alias +} + // QueryRaw 给 PromEngine 提供查询接口 func (i *Instance) QueryRaw( ctx context.Context, @@ -494,14 +541,21 @@ func (i *Instance) QueryRaw( close(rets) }() + aliases := i.getAlias(ctx, query.DB, query.NeedAddTime, start, end, query.Timezone) + qo := &queryOption{ - index: query.DB, - start: start.Unix(), - end: end.Unix(), - query: query, + indexes: aliases, + start: start.Unix(), + end: end.Unix(), + query: query, + } + + mappings, err1 := i.getMappings(ctx, qo.indexes) + // index 不存在,mappings 获取异常直接返回空 + if len(mappings) == 0 { + return } - mapping, err1 := i.getMapping(ctx, qo.index) if err1 != nil { rets <- &TimeSeriesResult{ Error: err1, @@ -518,7 +572,7 @@ func (i *Instance) QueryRaw( fact := NewFormatFactory(ctx). WithIsReference(metadata.GetQueryParams(ctx).IsReference). WithQuery(query.Field, query.TimeField, qo.start, qo.end, query.From, size). - WithMapping(mapping). + WithMappings(mappings...). WithOrders(query.Orders). WithTransform(i.toEs, i.toProm) diff --git a/pkg/unify-query/tsdb/elasticsearch/instance_test.go b/pkg/unify-query/tsdb/elasticsearch/instance_test.go index 0d852bf23..46656250c 100644 --- a/pkg/unify-query/tsdb/elasticsearch/instance_test.go +++ b/pkg/unify-query/tsdb/elasticsearch/instance_test.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/spf13/viper" + "github.com/stretchr/testify/assert" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata" @@ -343,3 +344,65 @@ func TestInstance_queryReference(t *testing.T) { }) } } + +func TestInstance_getAlias(t *testing.T) { + metadata.InitMetadata() + ctx := metadata.InitHashID(context.Background()) + inst, err := NewInstance(ctx, &InstanceOption{ + Timeout: time.Minute, + }) + if err != nil { + log.Panicf(ctx, err.Error()) + } + + for name, c := range map[string]struct { + start time.Time + end time.Time + timezone string + + expected []string + }{ + "3d with UTC": { + start: time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC), + end: time.Date(2024, 1, 3, 20, 0, 0, 0, time.UTC), + expected: []string{"db_test_20240101*", "db_test_20240102*", "db_test_20240103*"}, + }, + "3d with Asia/ShangHai": { + start: time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC), + end: time.Date(2024, 1, 3, 20, 0, 0, 0, time.UTC), + timezone: "Asia/ShangHai", + expected: []string{"db_test_20240102*", "db_test_20240103*", "db_test_20240104*"}, + }, + "14d with Asia/ShangHai": { + start: time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC), + end: time.Date(2024, 1, 15, 20, 0, 0, 0, time.UTC), + timezone: "Asia/ShangHai", + expected: []string{"db_test_20240102*", "db_test_20240103*", "db_test_20240104*", "db_test_20240105*", "db_test_20240106*", "db_test_20240107*", "db_test_20240108*", "db_test_20240109*", "db_test_20240110*", "db_test_20240111*", "db_test_20240112*", "db_test_20240113*", "db_test_20240114*", "db_test_20240115*", "db_test_20240116*"}, + }, + "15d with Asia/ShangHai": { + start: time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC), + end: time.Date(2024, 1, 16, 20, 0, 0, 0, time.UTC), + timezone: "Asia/ShangHai", + expected: []string{"db_test_202401*"}, + }, + "6m with Asia/ShangHai": { + start: time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC), + end: time.Date(2024, 7, 1, 20, 0, 0, 0, time.UTC), + timezone: "Asia/ShangHai", + expected: []string{"db_test_202401*", "db_test_202402*", "db_test_202403*", "db_test_202404*", "db_test_202405*", "db_test_202406*"}, + }, + "7m with Asia/ShangHai": { + start: time.Date(2024, 1, 1, 20, 0, 0, 0, time.UTC), + end: time.Date(2024, 8, 1, 20, 0, 0, 0, time.UTC), + timezone: "Asia/ShangHai", + expected: []string{"db_test_202402*", "db_test_202403*", "db_test_202404*", "db_test_202405*", "db_test_202406*", "db_test_202407*"}, + }, + } { + t.Run(name, func(t *testing.T) { + ctx = metadata.InitHashID(ctx) + actual := inst.getAlias(ctx, "db_test", true, c.start, c.end, c.timezone) + + assert.Equal(t, c.expected, actual) + }) + } +} diff --git a/pkg/utils/router/influxdb/space.go b/pkg/utils/router/influxdb/space.go index a1417a748..accaa2e5f 100644 --- a/pkg/utils/router/influxdb/space.go +++ b/pkg/utils/router/influxdb/space.go @@ -53,11 +53,14 @@ type ResultTableDetail struct { TagsKey []string `json:"tags_key"` DataId int64 `json:"bk_data_id"` Options struct { + // 自定义时间聚合字段 TimeField struct { Name string `json:"name"` Type string `json:"type"` Unit string `json:"unit"` } `json:"time_field"` + // db 是否拼接时间格式 + NeedAddTime bool `json:"need_add_time"` } `json:"options"` } diff --git a/pkg/utils/router/influxdb/space_gen.go b/pkg/utils/router/influxdb/space_gen.go index acbd98387..8a1209a73 100644 --- a/pkg/utils/router/influxdb/space_gen.go +++ b/pkg/utils/router/influxdb/space_gen.go @@ -490,6 +490,12 @@ func (z *ResultTableDetail) DecodeMsg(dc *msgp.Reader) (err error) { } } } + case "NeedAddTime": + z.Options.NeedAddTime, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Options", "NeedAddTime") + return + } default: err = dc.Skip() if err != nil { @@ -661,9 +667,9 @@ func (z *ResultTableDetail) EncodeMsg(en *msgp.Writer) (err error) { if err != nil { return } - // map header, size 1 + // map header, size 2 // write "TimeField" - err = en.Append(0x81, 0xa9, 0x54, 0x69, 0x6d, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64) + err = en.Append(0x82, 0xa9, 0x54, 0x69, 0x6d, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64) if err != nil { return } @@ -698,6 +704,16 @@ func (z *ResultTableDetail) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Options", "TimeField", "Unit") return } + // write "NeedAddTime" + err = en.Append(0xab, 0x4e, 0x65, 0x65, 0x64, 0x41, 0x64, 0x64, 0x54, 0x69, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteBool(z.Options.NeedAddTime) + if err != nil { + err = msgp.WrapError(err, "Options", "NeedAddTime") + return + } return } @@ -752,9 +768,9 @@ func (z *ResultTableDetail) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendInt64(o, z.DataId) // string "Options" o = append(o, 0xa7, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73) - // map header, size 1 + // map header, size 2 // string "TimeField" - o = append(o, 0x81, 0xa9, 0x54, 0x69, 0x6d, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64) + o = append(o, 0x82, 0xa9, 0x54, 0x69, 0x6d, 0x65, 0x46, 0x69, 0x65, 0x6c, 0x64) // map header, size 3 // string "Name" o = append(o, 0x83, 0xa4, 0x4e, 0x61, 0x6d, 0x65) @@ -765,6 +781,9 @@ func (z *ResultTableDetail) MarshalMsg(b []byte) (o []byte, err error) { // string "Unit" o = append(o, 0xa4, 0x55, 0x6e, 0x69, 0x74) o = msgp.AppendString(o, z.Options.TimeField.Unit) + // string "NeedAddTime" + o = append(o, 0xab, 0x4e, 0x65, 0x65, 0x64, 0x41, 0x64, 0x64, 0x54, 0x69, 0x6d, 0x65) + o = msgp.AppendBool(o, z.Options.NeedAddTime) return } @@ -946,6 +965,12 @@ func (z *ResultTableDetail) UnmarshalMsg(bts []byte) (o []byte, err error) { } } } + case "NeedAddTime": + z.Options.NeedAddTime, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Options", "NeedAddTime") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -976,7 +1001,7 @@ func (z *ResultTableDetail) Msgsize() (s int) { for za0002 := range z.TagsKey { s += msgp.StringPrefixSize + len(z.TagsKey[za0002]) } - s += 7 + msgp.Int64Size + 8 + 1 + 10 + 1 + 5 + msgp.StringPrefixSize + len(z.Options.TimeField.Name) + 5 + msgp.StringPrefixSize + len(z.Options.TimeField.Type) + 5 + msgp.StringPrefixSize + len(z.Options.TimeField.Unit) + s += 7 + msgp.Int64Size + 8 + 1 + 10 + 1 + 5 + msgp.StringPrefixSize + len(z.Options.TimeField.Name) + 5 + msgp.StringPrefixSize + len(z.Options.TimeField.Type) + 5 + msgp.StringPrefixSize + len(z.Options.TimeField.Unit) + 12 + msgp.BoolSize return }