From 0ebed119d10b6edd06ce968c5954b376caf76f0a Mon Sep 17 00:00:00 2001 From: Shamcle Ren Date: Wed, 4 Sep 2024 12:09:58 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20bkbase=20api=20=E5=A2=9E=E5=8A=A0=20app?= =?UTF-8?q?=5Fcode=20=E5=92=8C=20space=5Fuid=20=E5=8F=82=E6=95=B0=E4=BB=A5?= =?UTF-8?q?=E5=8F=8A=E4=BD=BF=E7=94=A8=20body=20=E9=89=B4=E6=9D=83=20--sto?= =?UTF-8?q?ry=3D119462756=20(#516)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/unify-query/bkapi/bkapi.go | 47 +- pkg/unify-query/bkapi/bkdata.go | 70 +-- pkg/unify-query/bkapi/bkdata_test.go | 25 + pkg/unify-query/bkapi/settings.go | 6 +- pkg/unify-query/cmdb/v1beta1/v1beta1_test.go | 30 +- pkg/unify-query/consul/tsdb.go | 2 +- pkg/unify-query/curl/mock.go | 69 +-- pkg/unify-query/metadata/headers.go | 24 + pkg/unify-query/mock/mock.go | 6 +- pkg/unify-query/service/http/api_test.go | 4 +- pkg/unify-query/service/http/query.go | 2 + pkg/unify-query/service/http/query_test.go | 57 +- pkg/unify-query/service/http/tsquery_test.go | 30 +- pkg/unify-query/tsdb/bksql/client.go | 16 +- pkg/unify-query/tsdb/bksql/instance.go | 20 +- pkg/unify-query/tsdb/bksql/instance_test.go | 27 +- .../tsdb/elasticsearch/instance.go | 175 +++--- .../tsdb/elasticsearch/instance_test.go | 2 +- pkg/unify-query/tsdb/influxdb/instance.go | 14 +- .../tsdb/influxdb/instance_test.go | 2 +- pkg/unify-query/tsdb/prometheus/querier.go | 127 ----- .../tsdb/prometheus/tsdb_instance.go | 150 ++++++ pkg/unify-query/tsdb/redis/instance.go | 44 +- .../tsdb/victoriaMetrics/instance.go | 17 +- .../tsdb/victoriaMetrics/instance_test.go | 499 ++---------------- 25 files changed, 582 insertions(+), 883 deletions(-) create mode 100644 pkg/unify-query/bkapi/bkdata_test.go create mode 100644 pkg/unify-query/metadata/headers.go create mode 100644 pkg/unify-query/tsdb/prometheus/tsdb_instance.go diff --git a/pkg/unify-query/bkapi/bkapi.go b/pkg/unify-query/bkapi/bkapi.go index 943107474..32220b99f 100644 --- a/pkg/unify-query/bkapi/bkapi.go +++ b/pkg/unify-query/bkapi/bkapi.go @@ -10,6 +10,7 @@ package bkapi import ( + "encoding/json" "fmt" "sync" @@ -18,45 +19,53 @@ import ( const ( AdminUserName = "admin" - BkApiAuthorization = "X-Bkapi-Authorization" + BkAPIAuthorization = "X-Bkapi-Authorization" + + BkUserNameKey = "bk_username" + BkAppCodeKey = "bk_app_code" + BkSecretKey = "bk_app_secret" ) -type BkApi struct { +type BkAPI struct { address string - code string - secret string + authConfig map[string]string } var ( - onceBkApi sync.Once - defaultBkApi *BkApi + onceBkAPI sync.Once + defaultBkAPI *BkAPI ) -func GetBkApi() *BkApi { - onceBkApi.Do(func() { - defaultBkApi = &BkApi{ - address: viper.GetString(BkApiAddressConfigPath), - code: viper.GetString(BkApiCodeConfigPath), - secret: viper.GetString(BkApiSecretConfigPath), +func GetBkAPI() *BkAPI { + onceBkAPI.Do(func() { + defaultBkAPI = &BkAPI{ + address: viper.GetString(BkAPIAddressConfigPath), + authConfig: map[string]string{ + BkAppCodeKey: viper.GetString(BkAPICodeConfigPath), + BkSecretKey: viper.GetString(BkAPISecretConfigPath), + BkUserNameKey: AdminUserName, + }, } }) - return defaultBkApi + return defaultBkAPI +} + +func (i *BkAPI) GetCode() string { + return i.authConfig[BkAppCodeKey] } -func (i *BkApi) Headers(headers map[string]string) map[string]string { +func (i *BkAPI) Headers(headers map[string]string) map[string]string { if len(headers) == 0 { headers = make(map[string]string) } - headers[BkApiAuthorization] = fmt.Sprintf( - `{"bk_username": "%s", "bk_app_code": "%s", "bk_app_secret": "%s"}`, - AdminUserName, i.code, i.secret, - ) + auth, _ := json.Marshal(i.authConfig) + headers[BkAPIAuthorization] = string(auth) return headers } -func (i *BkApi) Url(path string) string { +func (i *BkAPI) Url(path string) string { url := i.address if path != "" { url = fmt.Sprintf("%s/%s", i.address, path) diff --git a/pkg/unify-query/bkapi/bkdata.go b/pkg/unify-query/bkapi/bkdata.go index 3f232f62b..efb2690ea 100644 --- a/pkg/unify-query/bkapi/bkdata.go +++ b/pkg/unify-query/bkapi/bkdata.go @@ -10,8 +10,8 @@ package bkapi import ( + "encoding/json" "fmt" - "net/http" "sync" "github.com/spf13/viper" @@ -22,69 +22,71 @@ const ( QuerySync = "query_sync" QueryAsync = "query_async" + + BkDataAuthenticationMethodKey = "bkdata_authentication_method" + BkDataDataTokenKey = "bkdata_data_token" ) var ( - onceBkDataApi sync.Once - defaultBkDataApi *BkDataApi + onceBkDataAPI sync.Once + defaultBkDataAPI *BkDataAPI ) -type BkDataApi struct { - bkApi *BkApi +type BkDataAPI struct { + bkAPI *BkAPI + + uriPath string - uriPath string - token string - authenticationMethod string + authConfig map[string]string } -func GetBkDataApi() *BkDataApi { - onceBkDataApi.Do(func() { - defaultBkDataApi = &BkDataApi{ - bkApi: GetBkApi(), - token: viper.GetString(BkDataTokenConfigPath), - authenticationMethod: viper.GetString(BkDataAuthenticationMethodConfigPath), - uriPath: viper.GetString(BkDataUriPathConfigPath), +func GetBkDataAPI() *BkDataAPI { + onceBkDataAPI.Do(func() { + bkAPI := GetBkAPI() + defaultBkDataAPI = &BkDataAPI{ + bkAPI: bkAPI, + uriPath: viper.GetString(BkDataUriPathConfigPath), + authConfig: map[string]string{ + BkDataDataTokenKey: viper.GetString(BkDataTokenConfigPath), + BkDataAuthenticationMethodKey: viper.GetString(BkDataAuthenticationMethodConfigPath), + BkUserNameKey: AdminUserName, + BkAppCodeKey: bkAPI.GetCode(), + }, } }) - return defaultBkDataApi + return defaultBkDataAPI } -func (i *BkDataApi) HttpHeaders(headers map[string]string) http.Header { - headers = i.Headers(headers) - netHeaders := make(http.Header, len(headers)) - for k, v := range headers { - netHeaders[k] = []string{v} - } - return netHeaders +func (i *BkDataAPI) GetDataAuth() map[string]string { + return i.authConfig } -func (i *BkDataApi) Headers(headers map[string]string) map[string]string { +func (i *BkDataAPI) Headers(headers map[string]string) map[string]string { if len(headers) == 0 { headers = make(map[string]string) } - headers[BkDataAuthorization] = fmt.Sprintf( - `{"bkdata_authentication_method": "%s", "bkdata_data_token": "%s", "bk_username": "%s"}`, - i.authenticationMethod, i.token, AdminUserName, - ) - return i.bkApi.Headers(headers) + + auth, _ := json.Marshal(i.authConfig) + headers[BkDataAuthorization] = string(auth) + return i.bkAPI.Headers(headers) } -func (i *BkDataApi) url(path string) string { - url := i.bkApi.Url(i.uriPath) +func (i *BkDataAPI) url(path string) string { + url := i.bkAPI.Url(i.uriPath) if path != "" { url = fmt.Sprintf("%s/%s", url, path) } return url } -func (i *BkDataApi) QueryAsyncUrl() string { +func (i *BkDataAPI) QueryAsyncUrl() string { return i.url(QueryAsync) } -func (i *BkDataApi) QuerySyncUrl() string { +func (i *BkDataAPI) QuerySyncUrl() string { return i.url(QuerySync) } -func (i *BkDataApi) QueryEsUrl() string { +func (i *BkDataAPI) QueryEsUrl() string { return fmt.Sprintf("%s/es", i.QuerySyncUrl()) } diff --git a/pkg/unify-query/bkapi/bkdata_test.go b/pkg/unify-query/bkapi/bkdata_test.go new file mode 100644 index 000000000..b15860762 --- /dev/null +++ b/pkg/unify-query/bkapi/bkdata_test.go @@ -0,0 +1,25 @@ +// Tencent is pleased to support the open source community by making +// 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available. +// Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. +// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +// You may obtain a copy of the License at http://opensource.org/licenses/MIT +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +package bkapi + +import ( + "fmt" + "testing" + + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/mock" +) + +func TestGetDataAuth(t *testing.T) { + mock.Path = `../dist/local/unify-query.yaml` + mock.Init() + + headers := GetBkDataAPI().Headers(nil) + fmt.Println(headers) +} diff --git a/pkg/unify-query/bkapi/settings.go b/pkg/unify-query/bkapi/settings.go index 51491c016..26fa26900 100644 --- a/pkg/unify-query/bkapi/settings.go +++ b/pkg/unify-query/bkapi/settings.go @@ -16,7 +16,7 @@ const ( BkDataTokenConfigPath = "bk_data.token" // BKAPI 配置 - BkApiAddressConfigPath = "bk_api.address" - BkApiCodeConfigPath = "bk_api.code" - BkApiSecretConfigPath = "bk_api.secret" + BkAPIAddressConfigPath = "bk_api.address" + BkAPICodeConfigPath = "bk_api.code" + BkAPISecretConfigPath = "bk_api.secret" ) diff --git a/pkg/unify-query/cmdb/v1beta1/v1beta1_test.go b/pkg/unify-query/cmdb/v1beta1/v1beta1_test.go index 4f2eb367d..d9e173935 100644 --- a/pkg/unify-query/cmdb/v1beta1/v1beta1_test.go +++ b/pkg/unify-query/cmdb/v1beta1/v1beta1_test.go @@ -269,20 +269,28 @@ func TestModel_GetPath(t *testing.T) { } } -func mockData(ctx context.Context) *curl.TestCurl { - mockCurl := curl.NewMockCurl(map[string]string{ - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=2_bkmonitor_time_series_1572864&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+node_with_system_relation+where+time+%3E+1693973867000000000+and+time+%3C+1693973987000000000+and+%28bcs_cluster_id%3D%27BCS-K8S-00000%27+and+node%3D%27node-127-0-0-1%27%29++limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"series":[{"name":"node_with_system_relation","columns":["_time","_value","bcs_cluster_id","bk_biz_id","bk_endpoint_index","bk_endpoint_url","bk_instance","bk_job","bk_monitor_name","bk_monitor_namespace","bk_target_ip","endpoint","instance","job","monitor_type","namespace","node","pod","service"],"values":[[1693973874000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"],[1693973934000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"]]}]}]} +func mockData(ctx context.Context) *curl.MockCurl { + mockCurl := &curl.MockCurl{} + mockCurl.WithF(func(opt curl.Options) []byte { + res := map[string]string{ + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=2_bkmonitor_time_series_1572864&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+node_with_system_relation+where+time+%3E+1693973867000000000+and+time+%3C+1693973987000000000+and+%28bcs_cluster_id%3D%27BCS-K8S-00000%27+and+node%3D%27node-127-0-0-1%27%29++limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"series":[{"name":"node_with_system_relation","columns":["_time","_value","bcs_cluster_id","bk_biz_id","bk_endpoint_index","bk_endpoint_url","bk_instance","bk_job","bk_monitor_name","bk_monitor_namespace","bk_target_ip","endpoint","instance","job","monitor_type","namespace","node","pod","service"],"values":[[1693973874000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"],[1693973934000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"]]}]}]} `, - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=2_bkmonitor_time_series_1572864&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+node_with_system_relation+where+time+%3E+1693973867000000000+and+time+%3C+1693973987000000000+and+bk_target_ip%3D%27127.0.0.1%27++limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"series":[{"name":"node_with_system_relation","columns":["_time","_value","bcs_cluster_id","bk_biz_id","bk_endpoint_index","bk_endpoint_url","bk_instance","bk_job","bk_monitor_name","bk_monitor_namespace","bk_target_ip","endpoint","instance","job","monitor_type","namespace","node","pod","service"],"values":[[1693973874000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"],[1693973934000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"]]}]}]} + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=2_bkmonitor_time_series_1572864&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+node_with_system_relation+where+time+%3E+1693973867000000000+and+time+%3C+1693973987000000000+and+bk_target_ip%3D%27127.0.0.1%27++limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"series":[{"name":"node_with_system_relation","columns":["_time","_value","bcs_cluster_id","bk_biz_id","bk_endpoint_index","bk_endpoint_url","bk_instance","bk_job","bk_monitor_name","bk_monitor_namespace","bk_target_ip","endpoint","instance","job","monitor_type","namespace","node","pod","service"],"values":[[1693973874000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"],[1693973934000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"]]}]}]} `, - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=2_bkmonitor_time_series_1572864&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+node_with_pod_relation+where+time+%3E+1693973867000000000+and+time+%3C+1693973987000000000+and+%28bcs_cluster_id%3D%27BCS-K8S-00000%27+and+node%3D%27node-127-0-0-1%27%29++limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"series":[{"name":"node_with_pod_relation","columns":["_time","_value","bcs_cluster_id","bk_biz_id","bk_endpoint_index","bk_endpoint_url","bk_instance","bk_job","bk_monitor_name","bk_monitor_namespace","bk_target_ip","endpoint","instance","job","monitor_type","namespace","node","pod","service"],"values":[[1693973874000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-pod-1","bkmonitor-operator-operator"],[1693973934000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-pod-2","bkmonitor-operator-operator"]]}]}]} + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=2_bkmonitor_time_series_1572864&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+node_with_pod_relation+where+time+%3E+1693973867000000000+and+time+%3C+1693973987000000000+and+%28bcs_cluster_id%3D%27BCS-K8S-00000%27+and+node%3D%27node-127-0-0-1%27%29++limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"series":[{"name":"node_with_pod_relation","columns":["_time","_value","bcs_cluster_id","bk_biz_id","bk_endpoint_index","bk_endpoint_url","bk_instance","bk_job","bk_monitor_name","bk_monitor_namespace","bk_target_ip","endpoint","instance","job","monitor_type","namespace","node","pod","service"],"values":[[1693973874000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-pod-1","bkmonitor-operator-operator"],[1693973934000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-pod-2","bkmonitor-operator-operator"]]}]}]} `, - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=2_bkmonitor_time_series_1572864&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+node_with_pod_relation+where+time+%3E+1693973867000000000+and+time+%3C+1693973987000000000+and+%28bcs_cluster_id%3D%27BCS-K8S-00000%27+and+%28namespace%3D%27bkmonitor-operator%27+and+pod%3D%27bkm-pod-1%27%29%29++limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"series":[{"name":"node_with_pod_relation","columns":["_time","_value","bcs_cluster_id","bk_biz_id","bk_endpoint_index","bk_endpoint_url","bk_instance","bk_job","bk_monitor_name","bk_monitor_namespace","bk_target_ip","endpoint","instance","job","monitor_type","namespace","node","pod","service"],"values":[[1693973874000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-pod-1","bkmonitor-operator-operator"],[1693973934000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-pod-2","bkmonitor-operator-operator"]]}]}]} + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=2_bkmonitor_time_series_1572864&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+node_with_pod_relation+where+time+%3E+1693973867000000000+and+time+%3C+1693973987000000000+and+%28bcs_cluster_id%3D%27BCS-K8S-00000%27+and+%28namespace%3D%27bkmonitor-operator%27+and+pod%3D%27bkm-pod-1%27%29%29++limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"series":[{"name":"node_with_pod_relation","columns":["_time","_value","bcs_cluster_id","bk_biz_id","bk_endpoint_index","bk_endpoint_url","bk_instance","bk_job","bk_monitor_name","bk_monitor_namespace","bk_target_ip","endpoint","instance","job","monitor_type","namespace","node","pod","service"],"values":[[1693973874000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-pod-1","bkmonitor-operator-operator"],[1693973934000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-pod-2","bkmonitor-operator-operator"]]}]}]} `, - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=2_bkmonitor_time_series_1572864&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+node_with_system_relation+where+time+%3E+1693973867000000000+and+time+%3C+1693973987000000000+and+%28%28bcs_cluster_id%3D%27BCS-K8S-00000%27+and+node%3D%27node-127-0-0-1%27%29+or+%28bcs_cluster_id%3D%27BCS-K8S-00000%27+and+node%3D%27node-127-0-0-1%27%29%29++limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"series":[{"name":"node_with_system_relation","columns":["_time","_value","bcs_cluster_id","bk_biz_id","bk_endpoint_index","bk_endpoint_url","bk_instance","bk_job","bk_monitor_name","bk_monitor_namespace","bk_target_ip","endpoint","instance","job","monitor_type","namespace","node","pod","service"],"values":[[1693973874000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"],[1693973934000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"]]}]}]} + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=2_bkmonitor_time_series_1572864&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+node_with_system_relation+where+time+%3E+1693973867000000000+and+time+%3C+1693973987000000000+and+%28%28bcs_cluster_id%3D%27BCS-K8S-00000%27+and+node%3D%27node-127-0-0-1%27%29+or+%28bcs_cluster_id%3D%27BCS-K8S-00000%27+and+node%3D%27node-127-0-0-1%27%29%29++limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"series":[{"name":"node_with_system_relation","columns":["_time","_value","bcs_cluster_id","bk_biz_id","bk_endpoint_index","bk_endpoint_url","bk_instance","bk_job","bk_monitor_name","bk_monitor_namespace","bk_target_ip","endpoint","instance","job","monitor_type","namespace","node","pod","service"],"values":[[1693973874000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"],[1693973934000000000,1,"BCS-K8S-00000","2","0","http://127.0.0.1:8080/relation/metrics","127.0.0.1:8080","bkmonitor-operator-operator","bkmonitor-operator-operator","bkmonitor-operator","127.0.0.1","http","127.0.0.1:8080","bkmonitor-operator-operator","ServiceMonitor","bkmonitor-operator","node-127-0-0-1","bkm-operator-6b4768bb58-lxhnr","bkmonitor-operator-operator"]]}]}]} `, - `victoria_metric/api`: `{}`, - }, nil) + `victoria_metric/api`: `{}`, + } + if v, ok := res[opt.UrlPath]; ok { + return []byte(v) + } + + return nil + }) metadata.GetQueryRouter().MockSpaceUid(consul.VictoriaMetricsStorageType) @@ -290,7 +298,7 @@ func mockData(ctx context.Context) *curl.TestCurl { influxdbStorageID := "2" influxdbStorageIDInt := int64(2) - vmInstance, err := victoriaMetrics.NewInstance(ctx, victoriaMetrics.Options{ + vmInstance, err := victoriaMetrics.NewInstance(ctx, &victoriaMetrics.Options{ Curl: mockCurl, InfluxCompatible: true, UseNativeOr: true, @@ -305,7 +313,7 @@ func mockData(ctx context.Context) *curl.TestCurl { influxInstance, err := tsdbInfluxdb.NewInstance( context.TODO(), - tsdbInfluxdb.Options{ + &tsdbInfluxdb.Options{ Host: "127.0.0.1", Port: 80, Curl: mockCurl, diff --git a/pkg/unify-query/consul/tsdb.go b/pkg/unify-query/consul/tsdb.go index 8ed8ed53b..06daad0cb 100644 --- a/pkg/unify-query/consul/tsdb.go +++ b/pkg/unify-query/consul/tsdb.go @@ -19,7 +19,7 @@ const ( ElasticsearchStorageType = "elasticsearch" ) -var typeList = []string{InfluxDBStorageType, ElasticsearchStorageType} +var typeList = []string{InfluxDBStorageType, ElasticsearchStorageType, BkSqlStorageType} // GetTsDBStorageInfo 获取 tsDB 存储实例 func GetTsDBStorageInfo() (map[string]*Storage, error) { diff --git a/pkg/unify-query/curl/mock.go b/pkg/unify-query/curl/mock.go index 891187d81..5010e924d 100644 --- a/pkg/unify-query/curl/mock.go +++ b/pkg/unify-query/curl/mock.go @@ -10,74 +10,37 @@ package curl import ( - "bytes" "context" "encoding/json" "io" - "net/http" - - "github.com/pkg/errors" - - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log" ) -func NewMockCurl(data map[string]string, log *log.Logger) *TestCurl { - return &TestCurl{ - log: log, - data: data, - } +type MockCurl struct { + f func(opt Options) []byte + Opts Options } -type TestCurl struct { - log *log.Logger - data map[string]string - - Url string - Params []byte -} +var _ Curl = &MockCurl{} -func (c *TestCurl) WithDecoder(decoder func(ctx context.Context, reader io.Reader, resp interface{}) (int, error)) { +func (c *MockCurl) WithDecoder(decoder func(ctx context.Context, reader io.Reader, resp any) (int, error)) { return } -func (c *TestCurl) hashOption(opt Options) string { - s := opt.UrlPath + string(opt.Body) - return s +func (c *MockCurl) WithF(f func(opt Options) []byte) { + c.f = f } -func (c *TestCurl) Request(ctx context.Context, method string, opt Options, res interface{}) (int, error) { - c.log.Infof(ctx, "http %s: %s", method, opt.UrlPath) +func (c *MockCurl) Request(ctx context.Context, method string, opt Options, res interface{}) (int, error) { + c.Opts = opt - c.Url = opt.UrlPath - c.Params = opt.Body - - var ( - err error - out string - ok bool - ) - - hashKey := c.hashOption(opt) - if out, ok = c.data[hashKey]; ok { - err = json.Unmarshal([]byte(out), res) - } else { - err = errors.New("mock data is not exists: " + hashKey) + var out []byte + if c.f != nil { + out = c.f(opt) } - return len(out), err -} - -var _ Curl = &TestCurl{} - -func (c *TestCurl) resp(body string) *http.Response { - return &http.Response{ - Status: "200 OK", - StatusCode: 200, - Proto: "HTTP/1.1", - ProtoMajor: 1, - ProtoMinor: 1, - Body: io.NopCloser(bytes.NewBufferString(body)), - ContentLength: int64(len(body)), - Header: make(http.Header, 0), + if len(out) > 0 { + err := json.Unmarshal(out, res) + return len(out), err } + return 0, nil } diff --git a/pkg/unify-query/metadata/headers.go b/pkg/unify-query/metadata/headers.go new file mode 100644 index 000000000..87989a712 --- /dev/null +++ b/pkg/unify-query/metadata/headers.go @@ -0,0 +1,24 @@ +// Tencent is pleased to support the open source community by making +// 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available. +// Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. +// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +// You may obtain a copy of the License at http://opensource.org/licenses/MIT +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +package metadata + +import "context" + +// Headers 统一注入请求 header 头信息 +func Headers(ctx context.Context, headers map[string]string) map[string]string { + if headers == nil { + headers = make(map[string]string) + } + + user := GetUser(ctx) + headers[BkQuerySourceHeader] = user.Key + headers[SpaceUIDHeader] = user.SpaceUid + return headers +} diff --git a/pkg/unify-query/mock/mock.go b/pkg/unify-query/mock/mock.go index 533a653fb..a2ea761f9 100644 --- a/pkg/unify-query/mock/mock.go +++ b/pkg/unify-query/mock/mock.go @@ -29,11 +29,15 @@ import ( var ( once sync.Once + Path string ) func Init() { once.Do(func() { - config.CustomConfigFilePath = `../../dist/local/unify-query.yaml` + if Path == "" { + Path = `../../dist/local/unify-query.yaml` + } + config.CustomConfigFilePath = Path config.InitConfig() log.InitTestLogger() diff --git a/pkg/unify-query/service/http/api_test.go b/pkg/unify-query/service/http/api_test.go index 821948b5f..9bb2d9fe4 100644 --- a/pkg/unify-query/service/http/api_test.go +++ b/pkg/unify-query/service/http/api_test.go @@ -188,9 +188,9 @@ func TestQueryInfo(t *testing.T) { assert.Equal(t, c.err, err) } else { if c.spaceUid == consul.VictoriaMetricsStorageType { - assert.Equal(t, c.data, string(mockCurl.Params)) + assert.Equal(t, c.data, string(mockCurl.Opts.Body)) } else { - assert.Equal(t, c.data, mockCurl.Url) + assert.Equal(t, c.data, mockCurl.Opts.Body) } } }) diff --git a/pkg/unify-query/service/http/query.go b/pkg/unify-query/service/http/query.go index 667c32707..a83fea93d 100644 --- a/pkg/unify-query/service/http/query.go +++ b/pkg/unify-query/service/http/query.go @@ -449,6 +449,8 @@ func queryTsWithPromEngine(ctx context.Context, query *structured.QueryTs) (inte //} metadata.SetExpand(ctx, vmExpand) instance = prometheus.GetTsDbInstance(ctx, &metadata.Query{ + // 兼容 storage 结构体,用于单元测试 + StorageID: consul.VictoriaMetricsStorageType, StorageType: consul.VictoriaMetricsStorageType, }) if instance == nil { diff --git a/pkg/unify-query/service/http/query_test.go b/pkg/unify-query/service/http/query_test.go index b360e0c14..b2b8190e7 100644 --- a/pkg/unify-query/service/http/query_test.go +++ b/pkg/unify-query/service/http/query_test.go @@ -32,11 +32,12 @@ import ( "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/redis" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb" tsdbInfluxdb "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb/influxdb" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb/victoriaMetrics" ir "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/router/influxdb" ) // mockData comment lint rebel -func mockData(ctx context.Context, path, bucket string) *curl.TestCurl { +func mockData(ctx context.Context, path, bucket string) *curl.MockCurl { featureFlag.MockFeatureFlag(ctx, `{ "vm-query-or": { "variations": { @@ -297,8 +298,11 @@ func mockData(ctx context.Context, path, bucket string) *curl.TestCurl { }}, nil, nil, ) - mockCurl := curl.NewMockCurl(map[string]string{ - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=pushgateway_bkmonitor_unify_query&q=select+metric_value+as+_value%2C+time+as+_time%2C+bk_trace_id%2C+bk_span_id%2C+bk_trace_value%2C+bk_trace_timestamp+from+group2_cmdb_level+where+time+%3E+1682149980000000000+and+time+%3C+1682154605000000000+and+%28bk_obj_id%3D%27module%27+and+%28ip%3D%27127.0.0.2%27+and+%28bk_inst_id%3D%2714261%27+and+bk_biz_id%3D%277%27%29%29%29+and+metric_name+%3D+%27unify_query_request_count_total%27+and+%28bk_span_id+%21%3D+%27%27+or+bk_trace_id+%21%3D+%27%27%29++limit+100000000+slimit+100000000`: `{"results":[{"statement_id":0,"series":[{"name":"group2_cmdb_level","columns":["_time","_value","bk_trace_id","bk_span_id","bk_trace_value","bk_trace_timestamp"],"values":[["2023-04-22T07:53:28Z",114938716,"d8952469b9014ed6b36c19d396b15c61","0a97123ee5ad7fd8",1,1682150008967],["2023-04-22T07:53:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T07:53:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T07:53:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937],["2023-04-22T07:53:28Z",114939201,"771073eb573336a6d3365022a512d6d8","fca46f1c065452e8",1,1682150008969],["2023-04-22T07:54:28Z",114949368,"5b4931bbeb7bf497ff46d9cd9579ab60","0b0713e4e0106e55",1,1682150068965],["2023-04-22T07:54:28Z",114949853,"7c3c66f8763071d315fe8136bf8ff35c","159d9534754dc66d",1,1682150068965],["2023-04-22T07:54:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T07:54:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T07:54:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937]],"partial":true}],"partial":true}]} + + mockCurl := &curl.MockCurl{} + mockCurl.WithF(func(opt curl.Options) []byte { + res := map[string]string{ + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=pushgateway_bkmonitor_unify_query&q=select+metric_value+as+_value%2C+time+as+_time%2C+bk_trace_id%2C+bk_span_id%2C+bk_trace_value%2C+bk_trace_timestamp+from+group2_cmdb_level+where+time+%3E+1682149980000000000+and+time+%3C+1682154605000000000+and+%28bk_obj_id%3D%27module%27+and+%28ip%3D%27127.0.0.2%27+and+%28bk_inst_id%3D%2714261%27+and+bk_biz_id%3D%277%27%29%29%29+and+metric_name+%3D+%27unify_query_request_count_total%27+and+%28bk_span_id+%21%3D+%27%27+or+bk_trace_id+%21%3D+%27%27%29++limit+100000000+slimit+100000000`: `{"results":[{"statement_id":0,"series":[{"name":"group2_cmdb_level","columns":["_time","_value","bk_trace_id","bk_span_id","bk_trace_value","bk_trace_timestamp"],"values":[["2023-04-22T07:53:28Z",114938716,"d8952469b9014ed6b36c19d396b15c61","0a97123ee5ad7fd8",1,1682150008967],["2023-04-22T07:53:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T07:53:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T07:53:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937],["2023-04-22T07:53:28Z",114939201,"771073eb573336a6d3365022a512d6d8","fca46f1c065452e8",1,1682150008969],["2023-04-22T07:54:28Z",114949368,"5b4931bbeb7bf497ff46d9cd9579ab60","0b0713e4e0106e55",1,1682150068965],["2023-04-22T07:54:28Z",114949853,"7c3c66f8763071d315fe8136bf8ff35c","159d9534754dc66d",1,1682150068965],["2023-04-22T07:54:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T07:54:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T07:54:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"group2_cmdb_level","columns":["_time","_value","bk_trace_id","bk_span_id","bk_trace_value","bk_trace_timestamp"],"values":[["2023-04-22T07:55:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T07:55:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T07:55:28Z",114959046,"c9659d0b28bdb0c8afbd21aedd6bacd3","94bc25ffa3cc44e5",1,1682150128965],["2023-04-22T07:55:28Z",114959529,"c9659d0b28bdb0c8afbd21aedd6bacd3","94bc25ffa3cc44e5",1,1682150128962],["2023-04-22T07:55:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937],["2023-04-22T07:56:28Z",114968813,"5def2a6568efe57199022da6f7cfcf3f","1d761b6cc2aabe5e",1,1682150188964],["2023-04-22T07:56:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T07:56:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T07:56:28Z",114969300,"ca18ec94669be35fee1b4ae4a2e3df2a","c113aa392812404a",1,1682150188968],["2023-04-22T07:56:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"group2_cmdb_level","columns":["_time","_value","bk_trace_id","bk_span_id","bk_trace_value","bk_trace_timestamp"],"values":[["2023-04-22T07:57:28Z",114986134,"ab5b57bea973133a5cb1f89ee93ffd5a","191d6c2087b110de",1,1682150248965],["2023-04-22T07:57:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T07:57:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T07:57:28Z",114986622,"032fa9e22eaa486349f3a9d8ac1a0c76","7832acea944dc180",1,1682150248967],["2023-04-22T07:57:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937],["2023-04-22T07:58:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T07:58:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T07:58:28Z",114997601,"0bd0b98c89250d1ab3e4fc77cdc9d619","4d3a86e07ac11e04",1,1682150308961],["2023-04-22T07:58:28Z",114998085,"09aeca94699ae82eec038c85573b68c4","687b908edb32b09f",1,1682150308967],["2023-04-22T07:58:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"group2_cmdb_level","columns":["_time","_value","bk_trace_id","bk_span_id","bk_trace_value","bk_trace_timestamp"],"values":[["2023-04-22T07:59:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T07:59:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T07:59:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937],["2023-04-22T07:59:28Z",115009204,"595fb8f3671365cc1ad920d30a7c9c6e","d1f36e8cbdb25ce8",1,1682150368961],["2023-04-22T07:59:28Z",115008721,"595fb8f3671365cc1ad920d30a7c9c6e","d1f36e8cbdb25ce8",1,1682150368964],["2023-04-22T08:00:28Z",115021473,"5a6dbfa27835ac9b22d8a795477e3155","1954ae8cddc3a6fc",1,1682150428955],["2023-04-22T08:00:28Z",115020990,"5a6dbfa27835ac9b22d8a795477e3155","1954ae8cddc3a6fc",1,1682150428959],["2023-04-22T08:00:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T08:00:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T08:00:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937]],"partial":true}],"partial":true}]} @@ -338,7 +342,7 @@ func mockData(ctx context.Context, path, bucket string) *curl.TestCurl { {"results":[{"statement_id":0,"series":[{"name":"group2_cmdb_level","columns":["_time","_value","bk_trace_id","bk_span_id","bk_trace_value","bk_trace_timestamp"],"values":[["2023-04-22T09:07:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T09:07:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T09:07:28Z",115825085,"efd0e34e9126172d47ded9fdddec5a7b","8ddb9404521f177b",1,1682154448967],["2023-04-22T09:07:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937],["2023-04-22T09:07:28Z",115824600,"b0705f41034b923baad5b10e376a58de","e1f7bb8fd52d0a7b",1,1682154448962],["2023-04-22T09:08:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T09:08:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T09:08:28Z",115835645,"42bb8976800e050326fa14c25b281924","600873b41a6ff1b2",1,1682154508966],["2023-04-22T09:08:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937],["2023-04-22T09:08:28Z",115836130,"984e5e6ca13c09397100769e6d336039","db4129d930407fc8",1,1682154508964]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"group2_cmdb_level","columns":["_time","_value","bk_trace_id","bk_span_id","bk_trace_value","bk_trace_timestamp"],"values":[["2023-04-22T09:09:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900669],["2023-04-22T09:09:28Z",5,"b9cc0e45d58a70b61e8db6fffb5e3376","3d2a373cbeefa1f8",1,1680157900736],["2023-04-22T09:09:28Z",115849354,"ff3d29f3ec4a8b2364d2a5ba7b0751be","66103693e7484b95",1,1682154568967],["2023-04-22T09:09:28Z",115849837,"ff3d29f3ec4a8b2364d2a5ba7b0751be","66103693e7484b95",1,1682154568963],["2023-04-22T09:09:28Z",483,"fe45f0eccdce3e643a77504f6e6bd87a","c72dcc8fac9bcead",1,1682121442937]]}]}]} `, - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=system&q=select+mean%28%22usage%22%29+as+_value%2C+time+as+_time+from+cpu_summary+where+time+%3E+1677081599999000000+and+time+%3C+1677085659999000000++group+by+time%281m0s%29+limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T15:59:00Z",null],["2023-02-22T16:00:00Z",25.124152312094484],["2023-02-22T16:01:00Z",20.724334166696504],["2023-02-22T16:02:00Z",20.426171484280808],["2023-02-22T16:03:00Z",20.327529103992745]],"partial":true}],"partial":true}]} + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=system&q=select+mean%28%22usage%22%29+as+_value%2C+time+as+_time+from+cpu_summary+where+time+%3E+1677081599999000000+and+time+%3C+1677085659999000000++group+by+time%281m0s%29+limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T15:59:00Z",null],["2023-02-22T16:00:00Z",25.124152312094484],["2023-02-22T16:01:00Z",20.724334166696504],["2023-02-22T16:02:00Z",20.426171484280808],["2023-02-22T16:03:00Z",20.327529103992745]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T16:04:00Z",20.468538578157883],["2023-02-22T16:05:00Z",20.25296970605787],["2023-02-22T16:06:00Z",19.9283445874921],["2023-02-22T16:07:00Z",19.612237758778733],["2023-02-22T16:08:00Z",20.187296617920314]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T16:09:00Z",20.916380134086413],["2023-02-22T16:10:00Z",22.554908120339377],["2023-02-22T16:11:00Z",20.253084390783837],["2023-02-22T16:12:00Z",20.48536897192481],["2023-02-22T16:13:00Z",20.090785116663426]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T16:14:00Z",20.25654085898734],["2023-02-22T16:15:00Z",21.041731249213385],["2023-02-22T16:16:00Z",20.43003902957978],["2023-02-22T16:17:00Z",20.038367095325594],["2023-02-22T16:18:00Z",20.202399021312875]],"partial":true}],"partial":true}]} @@ -353,7 +357,7 @@ func mockData(ctx context.Context, path, bucket string) *curl.TestCurl { {"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T16:59:00Z",21.344305007289915],["2023-02-22T17:00:00Z",25.937044373952602],["2023-02-22T17:01:00Z",20.421952975501853],["2023-02-22T17:02:00Z",20.121773311320066],["2023-02-22T17:03:00Z",19.74360429634455]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T17:04:00Z",19.90800208328392],["2023-02-22T17:05:00Z",20.48559522490759],["2023-02-22T17:06:00Z",20.0645267193599],["2023-02-22T17:07:00Z",26.77071273642816]]}]}]} `, - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=system&q=select+mean%28%22rate%22%29+as+_value%2C+time+as+_time+from+cpu_summary+where+time+%3E+1677081599999000000+and+time+%3C+1677085659999000000++group+by+time%281m0s%29+limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T15:59:00Z",null],["2023-02-22T16:00:00Z",25.124152312094484],["2023-02-22T16:01:00Z",20.724334166696504],["2023-02-22T16:02:00Z",20.426171484280808],["2023-02-22T16:03:00Z",20.327529103992745]],"partial":true}],"partial":true}]} + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=system&q=select+mean%28%22rate%22%29+as+_value%2C+time+as+_time+from+cpu_summary+where+time+%3E+1677081599999000000+and+time+%3C+1677085659999000000++group+by+time%281m0s%29+limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T15:59:00Z",null],["2023-02-22T16:00:00Z",25.124152312094484],["2023-02-22T16:01:00Z",20.724334166696504],["2023-02-22T16:02:00Z",20.426171484280808],["2023-02-22T16:03:00Z",20.327529103992745]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T16:04:00Z",20.468538578157883],["2023-02-22T16:05:00Z",20.25296970605787],["2023-02-22T16:06:00Z",19.9283445874921],["2023-02-22T16:07:00Z",19.612237758778733],["2023-02-22T16:08:00Z",20.187296617920314]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T16:09:00Z",20.916380134086413],["2023-02-22T16:10:00Z",22.554908120339377],["2023-02-22T16:11:00Z",20.253084390783837],["2023-02-22T16:12:00Z",20.48536897192481],["2023-02-22T16:13:00Z",20.090785116663426]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T16:14:00Z",20.25654085898734],["2023-02-22T16:15:00Z",21.041731249213385],["2023-02-22T16:16:00Z",20.43003902957978],["2023-02-22T16:17:00Z",20.038367095325594],["2023-02-22T16:18:00Z",20.202399021312875]],"partial":true}],"partial":true}]} @@ -368,18 +372,24 @@ func mockData(ctx context.Context, path, bucket string) *curl.TestCurl { {"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T16:59:00Z",21.344305007289915],["2023-02-22T17:00:00Z",25.937044373952602],["2023-02-22T17:01:00Z",20.421952975501853],["2023-02-22T17:02:00Z",20.121773311320066],["2023-02-22T17:03:00Z",19.74360429634455]],"partial":true}],"partial":true}]} {"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T17:04:00Z",19.90800208328392],["2023-02-22T17:05:00Z",20.48559522490759],["2023-02-22T17:06:00Z",20.0645267193599],["2023-02-22T17:07:00Z",26.77071273642816]]}]}]} `, - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=custom_report_aggate&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+bkmonitor_action_notice_api_call_count_total+where+time+%3E+1692584759999000000+and+time+%3C+1692585659999000000+and+%28notice_way%3D%27weixin%27+and+status%3D%27failed%27%29++limit+100000000+slimit+100000000`: `{"results":[{"statement_id":0,"series":[{"name":"bkmonitor_action_notice_api_call_count_total","columns":["_time","_value","job","notice_way","status","target"],"values":[["2023-08-21T02:26:34.603Z",14568,"SLI","weixin","failed","unknown"],["2023-08-21T02:27:33.198Z",14568,"SLI","weixin","failed","unknown"],["2023-08-21T02:28:32.629Z",14568,"SLI","weixin","failed","unknown"],["2023-08-21T02:29:36.848Z",14568,"SLI","weixin","failed","unknown"],["2023-08-21T02:32:35.819Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:32:55.496Z",14569,"SLI","weixin","failed","unknown"],["2023-08-21T02:33:39.496Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:34:43.517Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:37:35.203Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:38:32.111Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:39:32.135Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:40:40.788Z",14570,"SLI","weixin","failed","unknown"]]}]}]} + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=custom_report_aggate&q=select+%22value%22+as+_value%2C+time+as+_time%2C%2A%3A%3Atag+from+bkmonitor_action_notice_api_call_count_total+where+time+%3E+1692584759999000000+and+time+%3C+1692585659999000000+and+%28notice_way%3D%27weixin%27+and+status%3D%27failed%27%29++limit+100000000+slimit+100000000`: `{"results":[{"statement_id":0,"series":[{"name":"bkmonitor_action_notice_api_call_count_total","columns":["_time","_value","job","notice_way","status","target"],"values":[["2023-08-21T02:26:34.603Z",14568,"SLI","weixin","failed","unknown"],["2023-08-21T02:27:33.198Z",14568,"SLI","weixin","failed","unknown"],["2023-08-21T02:28:32.629Z",14568,"SLI","weixin","failed","unknown"],["2023-08-21T02:29:36.848Z",14568,"SLI","weixin","failed","unknown"],["2023-08-21T02:32:35.819Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:32:55.496Z",14569,"SLI","weixin","failed","unknown"],["2023-08-21T02:33:39.496Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:34:43.517Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:37:35.203Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:38:32.111Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:39:32.135Z",14570,"SLI","weixin","failed","unknown"],["2023-08-21T02:40:40.788Z",14570,"SLI","weixin","failed","unknown"]]}]}]} `, - `victoria_metric/api`: `{"result": true, "code":"00", "data":{}}`, - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=system&q=select+count%28%22rate%22%29+as+_value%2C+time+as+_time+from+cpu_summary+where+time+%3E+1677081599999000000+and+time+%3C+1677085659999000000++group+by+time%281m0s%29+limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T15:59:00Z",null],["2023-02-22T16:00:00Z",25.124152312094484],["2023-02-22T16:01:00Z",20.724334166696504],["2023-02-22T16:02:00Z",20.426171484280808],["2023-02-22T16:03:00Z",20.327529103992745]],"partial":true}],"partial":true}]} + `victoria_metric/api`: `{"result": true, "code":"00", "data":{}}`, + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=system&q=select+count%28%22rate%22%29+as+_value%2C+time+as+_time+from+cpu_summary+where+time+%3E+1677081599999000000+and+time+%3C+1677085659999000000++group+by+time%281m0s%29+limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T15:59:00Z",null],["2023-02-22T16:00:00Z",25.124152312094484],["2023-02-22T16:01:00Z",20.724334166696504],["2023-02-22T16:02:00Z",20.426171484280808],["2023-02-22T16:03:00Z",20.327529103992745]],"partial":true}],"partial":true}]} `, - `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=system&q=select+count%28%22usage%22%29+as+_value%2C+time+as+_time+from+cpu_summary+where+time+%3E+1677081599999000000+and+time+%3C+1677085659999000000++group+by+time%281m0s%29+limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T15:59:00Z",null],["2023-02-22T16:00:00Z",25.124152312094484],["2023-02-22T16:01:00Z",20.724334166696504],["2023-02-22T16:02:00Z",20.426171484280808],["2023-02-22T16:03:00Z",20.327529103992745]],"partial":true}],"partial":true}]} + `http://127.0.0.1:80/query?chunk_size=10&chunked=true&db=system&q=select+count%28%22usage%22%29+as+_value%2C+time+as+_time+from+cpu_summary+where+time+%3E+1677081599999000000+and+time+%3C+1677085659999000000++group+by+time%281m0s%29+limit+100000000+slimit+100000000+tz%28%27UTC%27%29`: `{"results":[{"statement_id":0,"series":[{"name":"cpu_summary","columns":["_time","_value"],"values":[["2023-02-22T15:59:00Z",null],["2023-02-22T16:00:00Z",25.124152312094484],["2023-02-22T16:01:00Z",20.724334166696504],["2023-02-22T16:02:00Z",20.426171484280808],["2023-02-22T16:03:00Z",20.327529103992745]],"partial":true}],"partial":true}]} `, - }, log.DefaultLogger) + } + + if v, ok := res[opt.UrlPath]; ok { + return []byte(v) + } + return nil + }) inst, _ := tsdbInfluxdb.NewInstance( - context.TODO(), - tsdbInfluxdb.Options{ + ctx, + &tsdbInfluxdb.Options{ Host: "127.0.0.1", Port: 80, Curl: mockCurl, @@ -394,6 +404,18 @@ func mockData(ctx context.Context, path, bucket string) *curl.TestCurl { Instance: inst, }) + vmInst, _ := victoriaMetrics.NewInstance(ctx, &victoriaMetrics.Options{ + Address: "127.0.0.1", + Curl: mockCurl, + + InfluxCompatible: true, + UseNativeOr: true, + }) + tsdb.SetStorage(consul.VictoriaMetricsStorageType, &tsdb.Storage{ + Type: consul.VictoriaMetricsStorageType, + Instance: vmInst, + }) + mock.SetRedisClient(context.TODO(), "test") return mockCurl } @@ -1155,7 +1177,7 @@ func TestVmQueryParams(t *testing.T) { username: "vm-query", spaceUid: consul.VictoriaMetricsStorageType, query: `{"query_list":[{"field_name":"bk_split_measurement","function":[{"method":"sum","dimensions":["bcs_cluster_id","namespace"]}],"time_aggregation":{"function":"increase","window":"1m0s"},"reference_name":"a","conditions":{"field_list":[{"field_name":"bcs_cluster_id","value":["cls-2"],"op":"req"},{"field_name":"bcs_cluster_id","value":["cls-2"],"op":"req"},{"field_name":"bk_biz_id","value":["100801"],"op":"eq"}],"condition_list":["and", "and"]}},{"field_name":"bk_split_measurement","function":[{"method":"sum","dimensions":["bcs_cluster_id","namespace"]}],"time_aggregation":{"function":"delta","window":"1m0s"},"reference_name":"b"}],"metric_merge":"a / b","start_time":"0","end_time":"600","step":"60s"}`, - params: `{"influx_compatible":true,"use_native_or":true,"api_type":"query_range","cluster_name":"","api_params":{"query":"sum by (bcs_cluster_id, namespace) (increase_prometheus(a[1m] offset -59s999ms)) / sum by (bcs_cluster_id, namespace) (delta_prometheus(b[1m] offset -59s999ms))","start":0,"end":600,"step":60},"result_table_list":["victoria_metrics"],"metric_filter_condition":{"a":"filter=\"bk_split_measurement\", bcs_cluster_id=~\"cls-2\", bcs_cluster_id=~\"cls-2\", bk_biz_id=\"100801\", result_table_id=\"victoria_metrics\", __name__=\"bk_split_measurement_value\"","b":"filter=\"bk_split_measurement\", result_table_id=\"victoria_metrics\", __name__=\"bk_split_measurement_value\""}}`, + params: `{"influx_compatible":true,"use_native_or":true,"api_type":"query_range","cluster_name":"","api_params":{"query":"sum by (bcs_cluster_id, namespace) (increase(a[1m] offset -59s999ms)) / sum by (bcs_cluster_id, namespace) (delta(b[1m] offset -59s999ms))","start":0,"end":600,"step":60},"result_table_list":["victoria_metrics"],"metric_filter_condition":{"a":"filter=\"bk_split_measurement\", bcs_cluster_id=~\"cls-2\", bcs_cluster_id=~\"cls-2\", bk_biz_id=\"100801\", result_table_id=\"victoria_metrics\", __name__=\"bk_split_measurement_value\"","b":"filter=\"bk_split_measurement\", result_table_id=\"victoria_metrics\", __name__=\"bk_split_measurement_value\""}}`, }, { username: "vm-query-or", @@ -1207,7 +1229,7 @@ func TestVmQueryParams(t *testing.T) { query *structured.QueryTs err error ) - ctx, _ = context.WithCancel(ctx) + ctx := metadata.InitHashID(ctx) metadata.SetUser(ctx, fmt.Sprintf("username:%s", c.username), c.spaceUid, "") if c.promql != "" { @@ -1228,12 +1250,9 @@ func TestVmQueryParams(t *testing.T) { if c.error != nil { assert.Contains(t, err.Error(), c.error.Error()) } else { - if len(mockCurl.Params) == 0 { - assert.Nil(t, err) - } var vmParams map[string]string - if mockCurl.Params != nil { - err = json.Unmarshal(mockCurl.Params, &vmParams) + if mockCurl.Opts.Body != nil { + err = json.Unmarshal(mockCurl.Opts.Body, &vmParams) assert.Nil(t, err) } if vmParams != nil { diff --git a/pkg/unify-query/service/http/tsquery_test.go b/pkg/unify-query/service/http/tsquery_test.go index c19e1494f..8f8253a69 100644 --- a/pkg/unify-query/service/http/tsquery_test.go +++ b/pkg/unify-query/service/http/tsquery_test.go @@ -22,7 +22,6 @@ import ( "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/consul" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/curl" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/influxdb/decoder" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/mock" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/query/promql" @@ -116,20 +115,27 @@ func generateData(metricName string, startValue int, dimensionsPrefix string, di } func MockTsDB(t *testing.T) { - mockCurl := curl.NewMockCurl(map[string]string{ - `http://127.0.0.1:80/query?db=2_bkmonitor_time_series_1582626&q=select+count%28%22value%22%29+as+_value%2C+time+as+_time+from+container_cpu_system_seconds_total+where+time+%3E+1669717379999000000+and+time+%3C+1669717739999000000+and+bcs_cluster_id%3D%27BCS-K8S-40949%27++group+by+time%281m0s%29+tz%28%27UTC%27%29`: ``, - `http://127.0.0.1/api/query_range?end=1669717680&query=count%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%7D%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669717380,"35895"],[1669717440,"35900"],[1669717500,"39424"],[1669717560,"41380"],[1669717620,"43604"],[1669717680,"42659"]]}]}}`, - `http://127.0.0.1/api/query_range?end=1669717680&query=sum%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%7D%5B1m%5D+offset+-59s999ms%29%29+%2B+count%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%7D%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669717380,"70639"],[1669717440,"74007"],[1669717500,"79092"],[1669717560,"83808"],[1669717620,"85899"],[1669717680,"85261"]]}]}}`, - `http://127.0.0.1/api/query_range?end=1669717680&query=sum+by%28pod_name%29+%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cpod_name%3D~%22actor.%2A%22%7D%5B1m%5D+offset+-59s999ms%29%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{"pod_name":"actor-train-train-11291730-bot-1f42-0"},"values":[[1669717380,"2"],[1669717560,"2"],[1669717620,"2"],[1669717680,"2"]]}]}}`, - `http://127.0.0.1/api/query_range?end=1669717680&query=sum%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%7D%5B1m%5D+offset+-59s999ms%29%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669717380,"34744"],[1669717440,"38107"],[1669717500,"39668"],[1669717560,"42428"],[1669717620,"42295"],[1669717680,"42602"]]}]}}`, - `http://127.0.0.1/api/query_range?end=1669717680&query=sum%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cbk_biz_id%3D%22930%22%7D%5B1m%5D+offset+-59s999ms%29%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669717380,"34744"],[1669717440,"38107"],[1669717500,"39668"],[1669717560,"42428"],[1669717620,"42295"],[1669717680,"42602"]]}]}}`, - `http://127.0.0.1/api/query_range?end=1669717680&query=sum%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cbk_biz_id%3D%22930%22%7D%5B1m%5D+offset+-59s999ms%29%29+%2F+count%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cbcs_cluster_id%3D%22BCS-K8S-40949%22%7D%5B1m%5D+offset+-59s999ms%29%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669717380,"1"],[1669717440,"1"],[1669717500,"1"],[1669717560,"1"],[1669717620,"1"],[1669717680,"1"]]}]}}`, - `http://127.0.0.1:80/query?db=system&q=select+mean%28%22metric%22%29+as+_value%2C+time+as+_time+from+cpu_summary+where+time+%3E+1629820739999000000+and+time+%3C+1630252859999000000+and+dim_0%3D%271%27++group+by+time%282m0s%29`: generateData("metric", 0, "dim", 5, 1629861029, 1629861329, 2*time.Minute), - }, log.DefaultLogger) + mockCurl := &curl.MockCurl{} + mockCurl.WithF(func(opt curl.Options) []byte { + res := map[string]string{ + `http://127.0.0.1:80/query?db=2_bkmonitor_time_series_1582626&q=select+count%28%22value%22%29+as+_value%2C+time+as+_time+from+container_cpu_system_seconds_total+where+time+%3E+1669717379999000000+and+time+%3C+1669717739999000000+and+bcs_cluster_id%3D%27BCS-K8S-40949%27++group+by+time%281m0s%29+tz%28%27UTC%27%29`: ``, + `http://127.0.0.1/api/query_range?end=1669717680&query=count%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%7D%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669717380,"35895"],[1669717440,"35900"],[1669717500,"39424"],[1669717560,"41380"],[1669717620,"43604"],[1669717680,"42659"]]}]}}`, + `http://127.0.0.1/api/query_range?end=1669717680&query=sum%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%7D%5B1m%5D+offset+-59s999ms%29%29+%2B+count%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%7D%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669717380,"70639"],[1669717440,"74007"],[1669717500,"79092"],[1669717560,"83808"],[1669717620,"85899"],[1669717680,"85261"]]}]}}`, + `http://127.0.0.1/api/query_range?end=1669717680&query=sum+by%28pod_name%29+%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cpod_name%3D~%22actor.%2A%22%7D%5B1m%5D+offset+-59s999ms%29%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{"pod_name":"actor-train-train-11291730-bot-1f42-0"},"values":[[1669717380,"2"],[1669717560,"2"],[1669717620,"2"],[1669717680,"2"]]}]}}`, + `http://127.0.0.1/api/query_range?end=1669717680&query=sum%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%7D%5B1m%5D+offset+-59s999ms%29%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669717380,"34744"],[1669717440,"38107"],[1669717500,"39668"],[1669717560,"42428"],[1669717620,"42295"],[1669717680,"42602"]]}]}}`, + `http://127.0.0.1/api/query_range?end=1669717680&query=sum%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cbk_biz_id%3D%22930%22%7D%5B1m%5D+offset+-59s999ms%29%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669717380,"34744"],[1669717440,"38107"],[1669717500,"39668"],[1669717560,"42428"],[1669717620,"42295"],[1669717680,"42602"]]}]}}`, + `http://127.0.0.1/api/query_range?end=1669717680&query=sum%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cbk_biz_id%3D%22930%22%7D%5B1m%5D+offset+-59s999ms%29%29+%2F+count%28count_over_time%28container_cpu_system_seconds_total_value%7Bbcs_cluster_id%3D%22BCS-K8S-40949%22%2Cbcs_cluster_id%3D%22BCS-K8S-40949%22%7D%5B1m%5D+offset+-59s999ms%29%29&start=1669717380&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669717380,"1"],[1669717440,"1"],[1669717500,"1"],[1669717560,"1"],[1669717620,"1"],[1669717680,"1"]]}]}}`, + `http://127.0.0.1:80/query?db=system&q=select+mean%28%22metric%22%29+as+_value%2C+time+as+_time+from+cpu_summary+where+time+%3E+1629820739999000000+and+time+%3C+1630252859999000000+and+dim_0%3D%271%27++group+by+time%282m0s%29`: generateData("metric", 0, "dim", 5, 1629861029, 1629861329, 2*time.Minute), + } + if v, ok := res[opt.UrlPath]; ok { + return []byte(v) + } + return nil + }) inst, _ := tsdbInfluxdb.NewInstance( context.TODO(), - tsdbInfluxdb.Options{ + &tsdbInfluxdb.Options{ Host: "127.0.0.1", Port: 80, Curl: mockCurl, diff --git a/pkg/unify-query/tsdb/bksql/client.go b/pkg/unify-query/tsdb/bksql/client.go index 93cf8368b..8aa650c36 100644 --- a/pkg/unify-query/tsdb/bksql/client.go +++ b/pkg/unify-query/tsdb/bksql/client.go @@ -12,8 +12,10 @@ package bksql import ( "context" "encoding/json" + "fmt" "time" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/bkapi" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/consul" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/curl" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata" @@ -44,13 +46,19 @@ func (c *Client) WithHeader(headers map[string]string) *Client { } func (c *Client) curlGet(ctx context.Context, method, sql string, res *Result, span *trace.Span) error { + if sql == "" { + return fmt.Errorf("query sql is empty") + } + if method == "" { method = curl.Post } - params := &Params{} + params := make(map[string]string) + params["sql"] = sql - if sql != "" { - params.SQL = sql + // body 增加 bkdata auth 信息 + for k, v := range bkapi.GetBkDataAPI().GetDataAuth() { + params[k] = v } body, err := json.Marshal(params) @@ -64,7 +72,7 @@ func (c *Client) curlGet(ctx context.Context, method, sql string, res *Result, s curl.Options{ UrlPath: c.url, Body: body, - Headers: c.headers, + Headers: metadata.Headers(ctx, c.headers), }, res, ) diff --git a/pkg/unify-query/tsdb/bksql/instance.go b/pkg/unify-query/tsdb/bksql/instance.go index 8fd6e005e..5527eb2b0 100644 --- a/pkg/unify-query/tsdb/bksql/instance.go +++ b/pkg/unify-query/tsdb/bksql/instance.go @@ -58,25 +58,23 @@ type Options struct { Address string Headers map[string]string - Timeout time.Duration - IntervalTime time.Duration - MaxLimit int - Tolerance int + Timeout time.Duration + MaxLimit int + Tolerance int Curl curl.Curl } -func NewInstance(ctx context.Context, opt Options) (*Instance, error) { +func NewInstance(ctx context.Context, opt *Options) (*Instance, error) { if opt.Address == "" { return nil, fmt.Errorf("address is empty") } instance := &Instance{ - ctx: ctx, - timeout: opt.Timeout, - intervalTime: opt.IntervalTime, - maxLimit: opt.MaxLimit, - tolerance: opt.Tolerance, - client: (&Client{}).WithUrl(opt.Address).WithHeader(opt.Headers).WithCurl(opt.Curl), + ctx: ctx, + timeout: opt.Timeout, + maxLimit: opt.MaxLimit, + tolerance: opt.Tolerance, + client: (&Client{}).WithUrl(opt.Address).WithHeader(opt.Headers).WithCurl(opt.Curl), } return instance, nil } diff --git a/pkg/unify-query/tsdb/bksql/instance_test.go b/pkg/unify-query/tsdb/bksql/instance_test.go index f7b885f71..e7fc11224 100644 --- a/pkg/unify-query/tsdb/bksql/instance_test.go +++ b/pkg/unify-query/tsdb/bksql/instance_test.go @@ -27,15 +27,19 @@ func TestInstance_QueryRaw(t *testing.T) { ctx := context.Background() mock.Init() - cli := mockClient() - - ins := &Instance{ - Ctx: ctx, - IntervalTime: 3e2 * time.Millisecond, - Timeout: 3e1 * time.Second, - Client: cli, - MaxLimit: 1e4, - Tolerance: 5, + + ins, err := NewInstance(ctx, Options{ + Address: "localhost", + Timeout: time.Minute, + MaxLimit: 1e4, + Tolerance: 5, + }) + if err != nil { + log.Fatalf(ctx, err.Error()) + } + ins.client = mockClient() + if err != nil { + log.Fatalf(ctx, err.Error()) } end := time.Now() start := end.Add(time.Minute * -5) @@ -178,10 +182,7 @@ func TestInstance_bkSql(t *testing.T) { }, } - ins := Instance{ - MaxLimit: 2e5, - Tolerance: 5, - } + ins := &Instance{} for i, c := range testCases { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { diff --git a/pkg/unify-query/tsdb/elasticsearch/instance.go b/pkg/unify-query/tsdb/elasticsearch/instance.go index eece42578..00c0fffcb 100644 --- a/pkg/unify-query/tsdb/elasticsearch/instance.go +++ b/pkg/unify-query/tsdb/elasticsearch/instance.go @@ -41,12 +41,18 @@ import ( var _ tsdb.Instance = (*Instance)(nil) type Instance struct { - ctx context.Context - wg sync.WaitGroup - client *elastic.Client + ctx context.Context + wg sync.WaitGroup lock sync.Mutex + address string + username string + password string + healthCheck bool + + headers map[string]string + timeout time.Duration maxSize int @@ -54,41 +60,6 @@ type Instance struct { toProm func(string) string } -// QueryRange 使用 es 直接查询引擎 -func (i *Instance) QueryRange(ctx context.Context, referenceName string, start, end time.Time, step time.Duration) (promql.Matrix, error) { - //TODO implement me - panic("implement me") -} - -func (i *Instance) Query(ctx context.Context, qs string, end time.Time) (promql.Vector, error) { - //TODO implement me - panic("implement me") -} - -func (i *Instance) QueryExemplar(ctx context.Context, fields []string, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) (*decoder.Response, error) { - //TODO implement me - panic("implement me") -} - -func (i *Instance) LabelNames(ctx context.Context, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) { - //TODO implement me - panic("implement me") -} - -func (i *Instance) LabelValues(ctx context.Context, query *metadata.Query, name string, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) { - //TODO implement me - panic("implement me") -} - -func (i *Instance) Series(ctx context.Context, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) storage.SeriesSet { - //TODO implement me - panic("implement me") -} - -func (i *Instance) GetInstanceType() string { - return consul.ElasticsearchStorageType -} - type InstanceOption struct { Address string Username string @@ -96,7 +67,7 @@ type InstanceOption struct { MaxSize int MaxRouting int Timeout time.Duration - Headers http.Header + Headers map[string]string HealthCheck bool } @@ -125,46 +96,59 @@ var TimeSeriesResultPool = sync.Pool{ func NewInstance(ctx context.Context, opt *InstanceOption) (*Instance, error) { ins := &Instance{ ctx: ctx, - timeout: opt.Timeout, maxSize: opt.MaxSize, - toEs: structured.QueryRawFormat(ctx), - toProm: structured.PromQueryFormat(ctx), + + address: opt.Address, + username: opt.Username, + password: opt.Password, + headers: opt.Headers, + healthCheck: opt.HealthCheck, + timeout: opt.Timeout, + + toEs: structured.QueryRawFormat(ctx), + toProm: structured.PromQueryFormat(ctx), } if opt.Address == "" { return ins, errors.New("empty es client options") } + if opt.MaxRouting > 0 { + err := pool.Tune(opt.MaxRouting) + if err != nil { + return ins, err + } + } + + return ins, nil +} + +func (i *Instance) getClient(ctx context.Context) (*elastic.Client, error) { cliOpts := []elastic.ClientOptionFunc{ - elastic.SetURL(opt.Address), + elastic.SetURL(i.address), elastic.SetSniff(false), - elastic.SetHealthcheck(opt.HealthCheck), + elastic.SetHealthcheck(i.healthCheck), } - if len(opt.Headers) > 0 { - cliOpts = append(cliOpts, elastic.SetHeaders(opt.Headers)) + ctx, cancel := context.WithTimeout(ctx, i.timeout) + defer cancel() + + headers := metadata.Headers(ctx, i.headers) + if len(headers) > 0 { + httpHeaders := make(http.Header, len(headers)) + for k, v := range headers { + httpHeaders[k] = []string{v} + } + cliOpts = append(cliOpts, elastic.SetHeaders(httpHeaders)) } - if opt.Username != "" && opt.Password != "" { + if i.username != "" && i.password != "" { cliOpts = append( cliOpts, - elastic.SetBasicAuth(opt.Username, opt.Password), + elastic.SetBasicAuth(i.username, i.password), ) } - cli, err := elastic.NewClient(cliOpts...) - if err != nil { - return ins, err - } - - if opt.MaxRouting > 0 { - err = pool.Tune(opt.MaxRouting) - if err != nil { - return ins, err - } - } - - ins.client = cli - return ins, nil + return elastic.DialContext(ctx, cliOpts...) } func (i *Instance) Check(ctx context.Context, promql string, start, end time.Time, step time.Duration) string { @@ -187,7 +171,11 @@ func (i *Instance) getMappings(ctx context.Context, aliases []string) ([]map[str }() span.Set("alias", aliases) - mappingMap, err := i.client.GetMapping().Index(aliases...).Type("").Do(ctx) + client, err := i.getClient(ctx) + if err != nil { + return nil, err + } + mappingMap, err := client.GetMapping().Index(aliases...).Type("").Do(ctx) indexes := make([]string, 0, len(mappingMap)) for index := range mappingMap { @@ -294,8 +282,12 @@ func (i *Instance) esQuery(ctx context.Context, qo *queryOption, fact *FormatFac log.Infof(ctx, "elasticsearch-query indexes: %s", qo.indexes) log.Infof(ctx, "elasticsearch-query body: %s", bodyString) - startAnaylize := time.Now() - search := i.client.Search().Index(qo.indexes...).SearchSource(source) + startAnalyze := time.Now() + client, err := i.getClient(ctx) + if err != nil { + return nil, err + } + search := client.Search().Index(qo.indexes...).SearchSource(source) res, err := search.Do(ctx) @@ -314,7 +306,7 @@ func (i *Instance) esQuery(ctx context.Context, qo *queryOption, fact *FormatFac } } - queryCost := time.Since(startAnaylize) + queryCost := time.Since(startAnalyze) span.Set("query-cost", queryCost.String()) metric.TsDBRequestSecond( ctx, queryCost, user.SpaceUid, consul.ElasticsearchStorageType, @@ -397,7 +389,11 @@ func (i *Instance) queryWithoutAgg(ctx context.Context, qo *queryOption, fact *F } func (i *Instance) getIndexes(ctx context.Context, aliases ...string) ([]string, error) { - catAlias, err := i.client.CatAliases().Alias(aliases...).Do(ctx) + client, err := i.getClient(ctx) + if err != nil { + return nil, err + } + catAlias, err := client.CatAliases().Alias(aliases...).Do(ctx) if err != nil { return nil, err } @@ -418,7 +414,11 @@ func (i *Instance) getIndexes(ctx context.Context, aliases ...string) ([]string, } func (i *Instance) indexOption(ctx context.Context, index string) (docCount int64, storeSize int64, err error) { - cats, err := i.client.CatIndices().Index(index).Do(ctx) + client, err := i.getClient(ctx) + if err != nil { + return + } + cats, err := client.CatIndices().Index(index).Do(ctx) if err != nil { return } @@ -573,10 +573,6 @@ func (i *Instance) QueryRaw( ctx, span := trace.NewSpan(ctx, "elasticsearch-query-reference") defer span.End(&err) - if i.client == nil { - return storage.ErrSeriesSet(fmt.Errorf("es client is nil")) - } - rets := make(chan *TimeSeriesResult, 1) go func() { @@ -658,3 +654,38 @@ func (i *Instance) QueryRaw( return remote.FromQueryResult(false, qr) } + +// QueryRange 使用 es 直接查询引擎 +func (i *Instance) QueryRange(ctx context.Context, referenceName string, start, end time.Time, step time.Duration) (promql.Matrix, error) { + //TODO implement me + panic("implement me") +} + +func (i *Instance) Query(ctx context.Context, qs string, end time.Time) (promql.Vector, error) { + //TODO implement me + panic("implement me") +} + +func (i *Instance) QueryExemplar(ctx context.Context, fields []string, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) (*decoder.Response, error) { + //TODO implement me + panic("implement me") +} + +func (i *Instance) LabelNames(ctx context.Context, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) { + //TODO implement me + panic("implement me") +} + +func (i *Instance) LabelValues(ctx context.Context, query *metadata.Query, name string, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) { + //TODO implement me + panic("implement me") +} + +func (i *Instance) Series(ctx context.Context, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) storage.SeriesSet { + //TODO implement me + panic("implement me") +} + +func (i *Instance) GetInstanceType() string { + return consul.ElasticsearchStorageType +} diff --git a/pkg/unify-query/tsdb/elasticsearch/instance_test.go b/pkg/unify-query/tsdb/elasticsearch/instance_test.go index ee3e4c05e..915e4c54a 100644 --- a/pkg/unify-query/tsdb/elasticsearch/instance_test.go +++ b/pkg/unify-query/tsdb/elasticsearch/instance_test.go @@ -45,7 +45,7 @@ func TestInstance_queryReference(t *testing.T) { metadata.GetQueryParams(ctx).SetDataSource(structured.BkLog) if sourceType == "bkdata" { - address = bkapi.GetBkDataApi().QueryEsUrl() + address = bkapi.GetBkDataAPI().QueryEsUrl() } ins, err := NewInstance(ctx, &InstanceOption{ diff --git a/pkg/unify-query/tsdb/influxdb/instance.go b/pkg/unify-query/tsdb/influxdb/instance.go index 50ec74766..2c9ee6458 100644 --- a/pkg/unify-query/tsdb/influxdb/instance.go +++ b/pkg/unify-query/tsdb/influxdb/instance.go @@ -64,15 +64,7 @@ var ( ) // NewInstance 初始化引擎 -func NewInstance(ctx context.Context, opt Options) (*Instance, error) { - headers := map[string]string{} - if opt.Accept != "" { - headers[ContentType] = opt.Accept - } - if opt.AcceptEncoding != "" { - headers[ContentEncoding] = opt.AcceptEncoding - } - +func NewInstance(ctx context.Context, opt *Options) (*Instance, error) { if opt.Host == "" { return nil, fmt.Errorf("host is empty %+v", opt) } @@ -204,9 +196,9 @@ func (i *Instance) QueryExemplar(ctx context.Context, fields []string, query *me ctx, curl.Get, curl.Options{ UrlPath: urlPath, - Headers: map[string]string{ + Headers: metadata.Headers(ctx, map[string]string{ ContentType: i.contentType, - }, + }), UserName: i.username, Password: i.password, }, diff --git a/pkg/unify-query/tsdb/influxdb/instance_test.go b/pkg/unify-query/tsdb/influxdb/instance_test.go index 9b77c6415..79ca9baf8 100644 --- a/pkg/unify-query/tsdb/influxdb/instance_test.go +++ b/pkg/unify-query/tsdb/influxdb/instance_test.go @@ -83,7 +83,7 @@ func TestInstance_MakeSQL(t *testing.T) { } start := time.UnixMilli(1718175000000) end := time.UnixMilli(1718175600000) - option := Options{ + option := &Options{ Host: "127.0.0.1", Port: 80, Timeout: time.Hour, diff --git a/pkg/unify-query/tsdb/prometheus/querier.go b/pkg/unify-query/tsdb/prometheus/querier.go index 47b699e40..4858c95f7 100644 --- a/pkg/unify-query/tsdb/prometheus/querier.go +++ b/pkg/unify-query/tsdb/prometheus/querier.go @@ -20,21 +20,10 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/bkapi" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/consul" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/curl" - baseInfluxdb "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/influxdb" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/query/structured" - tsDBService "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/service/tsdb" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/trace" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb/bksql" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb/elasticsearch" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb/influxdb" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb/victoriaMetrics" - routerInfluxdb "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/router/influxdb" ) const ( @@ -377,119 +366,3 @@ func (q *Querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.War func (q *Querier) Close() error { return nil } - -func GetTsDbInstance(ctx context.Context, qry *metadata.Query) tsdb.Instance { - var ( - instance tsdb.Instance - err error - ) - - ctx, span := trace.NewSpan(ctx, "get-ts-db-instance") - defer func() { - if err != nil { - log.Errorf(ctx, err.Error()) - } - span.End(&err) - }() - - span.Set("storage-id", qry.StorageID) - - // 兼容原逻辑,storageType 通过 storageMap 获取 - stg, err := tsdb.GetStorage(qry.StorageID) - if stg != nil { - qry.StorageType = stg.Type - } - - span.Set("storage-type", qry.StorageType) - curlGet := &curl.HttpCurl{Log: log.DefaultLogger} - - switch qry.StorageType { - case consul.InfluxDBStorageType: - opt := influxdb.Options{ - Timeout: tsDBService.InfluxDBTimeout, - ContentType: tsDBService.InfluxDBContentType, - ChunkSize: tsDBService.InfluxDBChunkSize, - RawUriPath: tsDBService.InfluxDBQueryRawUriPath, - Accept: tsDBService.InfluxDBQueryRawAccept, - AcceptEncoding: tsDBService.InfluxDBQueryRawAcceptEncoding, - MaxLimit: tsDBService.InfluxDBMaxLimit, - MaxSlimit: tsDBService.InfluxDBMaxSLimit, - Tolerance: tsDBService.InfluxDBTolerance, - ReadRateLimit: tsDBService.InfluxDBQueryReadRateLimit, - Curl: curlGet, - } - var host *routerInfluxdb.Host - host, err = baseInfluxdb.GetInfluxDBRouter().GetInfluxDBHost( - ctx, qry.TagsKey, qry.ClusterName, qry.DB, qry.Measurement, qry.Condition, - ) - if err != nil { - return nil - } - opt.Host = host.DomainName - opt.Port = host.Port - opt.GrpcPort = host.GrpcPort - opt.Protocol = host.Protocol - opt.Username = host.Username - opt.Password = host.Password - // 如果 host 有单独配置,则替换默认限速配置 - if host.ReadRateLimit > 0 { - opt.ReadRateLimit = host.ReadRateLimit - } - - span.Set("cluster-name", qry.ClusterName) - span.Set("tag-keys", qry.TagsKey) - span.Set("ins-option", opt) - - instance, err = influxdb.NewInstance(ctx, opt) - case consul.ElasticsearchStorageType: - opt := &elasticsearch.InstanceOption{ - MaxSize: tsDBService.EsMaxSize, - Timeout: tsDBService.EsTimeout, - MaxRouting: tsDBService.EsMaxRouting, - } - if qry.SourceType == structured.BkData { - opt.Address = bkapi.GetBkDataApi().QueryEsUrl() - opt.Headers = bkapi.GetBkDataApi().HttpHeaders(nil) - opt.HealthCheck = false - } else { - if stg == nil { - err = fmt.Errorf("%s storage is nil in %s", consul.ElasticsearchStorageType, qry.StorageID) - return nil - } - opt.Address = stg.Address - opt.Username = stg.Username - opt.Password = stg.Password - opt.HealthCheck = true - } - instance, err = elasticsearch.NewInstance(ctx, opt) - case consul.BkSqlStorageType: - instance, err = bksql.NewInstance(ctx, bksql.Options{ - Address: bkapi.GetBkDataApi().QuerySyncUrl(), - Headers: bkapi.GetBkDataApi().Headers(map[string]string{ - bksql.ContentType: tsDBService.BkSqlContentType, - }), - Timeout: tsDBService.BkSqlTimeout, - IntervalTime: tsDBService.BkSqlIntervalTime, - MaxLimit: tsDBService.BkSqlLimit, - Tolerance: tsDBService.BkSqlTolerance, - Curl: curlGet, - }) - case consul.VictoriaMetricsStorageType: - instance, err = victoriaMetrics.NewInstance(ctx, victoriaMetrics.Options{ - Address: bkapi.GetBkDataApi().QuerySyncUrl(), - Headers: bkapi.GetBkDataApi().Headers(map[string]string{ - victoriaMetrics.ContentType: tsDBService.VmContentType, - }), - MaxConditionNum: tsDBService.VmMaxConditionNum, - Timeout: tsDBService.VmTimeout, - InfluxCompatible: tsDBService.VmInfluxCompatible, - UseNativeOr: tsDBService.VmUseNativeOr, - Curl: curlGet, - }) - default: - err = fmt.Errorf("sotrage type is error %+v", qry) - return nil - } - - return instance -} diff --git a/pkg/unify-query/tsdb/prometheus/tsdb_instance.go b/pkg/unify-query/tsdb/prometheus/tsdb_instance.go new file mode 100644 index 000000000..38d44b4eb --- /dev/null +++ b/pkg/unify-query/tsdb/prometheus/tsdb_instance.go @@ -0,0 +1,150 @@ +// Tencent is pleased to support the open source community by making +// 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available. +// Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. +// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +// You may obtain a copy of the License at http://opensource.org/licenses/MIT +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +package prometheus + +import ( + "context" + "fmt" + + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/bkapi" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/consul" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/curl" + baseInfluxdb "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/influxdb" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/query/structured" + tsDBService "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/service/tsdb" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/trace" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb/bksql" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb/elasticsearch" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb/influxdb" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb/victoriaMetrics" + routerInfluxdb "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/router/influxdb" +) + +func GetTsDbInstance(ctx context.Context, qry *metadata.Query) tsdb.Instance { + var ( + instance tsdb.Instance + err error + ) + + ctx, span := trace.NewSpan(ctx, "get-ts-db-instance") + defer func() { + if err != nil { + log.Errorf(ctx, err.Error()) + } + span.End(&err) + }() + + span.Set("storage-id", qry.StorageID) + + // 兼容原逻辑,storageType 通过 storageMap 获取 + stg, _ := tsdb.GetStorage(qry.StorageID) + if stg != nil { + span.Set("storage-info", stg) + qry.StorageType = stg.Type + } + if stg != nil && stg.Instance != nil { + return stg.Instance + } + + span.Set("storage-type", qry.StorageType) + curlGet := &curl.HttpCurl{Log: log.DefaultLogger} + + switch qry.StorageType { + case consul.InfluxDBStorageType: + opt := &influxdb.Options{ + Timeout: tsDBService.InfluxDBTimeout, + ContentType: tsDBService.InfluxDBContentType, + ChunkSize: tsDBService.InfluxDBChunkSize, + RawUriPath: tsDBService.InfluxDBQueryRawUriPath, + Accept: tsDBService.InfluxDBQueryRawAccept, + AcceptEncoding: tsDBService.InfluxDBQueryRawAcceptEncoding, + MaxLimit: tsDBService.InfluxDBMaxLimit, + MaxSlimit: tsDBService.InfluxDBMaxSLimit, + Tolerance: tsDBService.InfluxDBTolerance, + ReadRateLimit: tsDBService.InfluxDBQueryReadRateLimit, + Curl: curlGet, + } + var host *routerInfluxdb.Host + host, err = baseInfluxdb.GetInfluxDBRouter().GetInfluxDBHost( + ctx, qry.TagsKey, qry.ClusterName, qry.DB, qry.Measurement, qry.Condition, + ) + if err != nil { + return nil + } + opt.Host = host.DomainName + opt.Port = host.Port + opt.GrpcPort = host.GrpcPort + opt.Protocol = host.Protocol + opt.Username = host.Username + opt.Password = host.Password + // 如果 host 有单独配置,则替换默认限速配置 + if host.ReadRateLimit > 0 { + opt.ReadRateLimit = host.ReadRateLimit + } + + span.Set("cluster-name", qry.ClusterName) + span.Set("tag-keys", qry.TagsKey) + span.Set("ins-option", opt) + + instance, err = influxdb.NewInstance(ctx, opt) + case consul.ElasticsearchStorageType: + opt := &elasticsearch.InstanceOption{ + MaxSize: tsDBService.EsMaxSize, + Timeout: tsDBService.EsTimeout, + MaxRouting: tsDBService.EsMaxRouting, + } + if qry.SourceType == structured.BkData { + opt.Address = bkapi.GetBkDataAPI().QueryEsUrl() + opt.Headers = bkapi.GetBkDataAPI().Headers(nil) + opt.HealthCheck = false + } else { + if stg == nil { + err = fmt.Errorf("%s storage is nil in %s", consul.ElasticsearchStorageType, qry.StorageID) + return nil + } + opt.Address = stg.Address + opt.Username = stg.Username + opt.Password = stg.Password + opt.HealthCheck = true + } + instance, err = elasticsearch.NewInstance(ctx, opt) + case consul.BkSqlStorageType: + instance, err = bksql.NewInstance(ctx, &bksql.Options{ + Address: bkapi.GetBkDataAPI().QuerySyncUrl(), + Headers: bkapi.GetBkDataAPI().Headers(map[string]string{ + bksql.ContentType: tsDBService.BkSqlContentType, + }), + Timeout: tsDBService.BkSqlTimeout, + MaxLimit: tsDBService.BkSqlLimit, + Tolerance: tsDBService.BkSqlTolerance, + Curl: curlGet, + }) + case consul.VictoriaMetricsStorageType: + instance, err = victoriaMetrics.NewInstance(ctx, &victoriaMetrics.Options{ + Address: bkapi.GetBkDataAPI().QuerySyncUrl(), + Headers: bkapi.GetBkDataAPI().Headers(map[string]string{ + victoriaMetrics.ContentType: tsDBService.VmContentType, + }), + MaxConditionNum: tsDBService.VmMaxConditionNum, + Timeout: tsDBService.VmTimeout, + InfluxCompatible: tsDBService.VmInfluxCompatible, + UseNativeOr: tsDBService.VmUseNativeOr, + Curl: curlGet, + }) + default: + err = fmt.Errorf("sotrage type is error %+v", qry) + return nil + } + + return instance +} diff --git a/pkg/unify-query/tsdb/redis/instance.go b/pkg/unify-query/tsdb/redis/instance.go index 990b61c1e..0cdc79c2f 100644 --- a/pkg/unify-query/tsdb/redis/instance.go +++ b/pkg/unify-query/tsdb/redis/instance.go @@ -43,59 +43,59 @@ type Instance struct { ClusterMetricPrefix string } -func (instance *Instance) Check(ctx context.Context, promql string, start, end time.Time, step time.Duration) string { +func (i *Instance) Check(ctx context.Context, promql string, start, end time.Time, step time.Duration) string { //TODO implement me panic("implement me") } -func (instance *Instance) QueryRaw(ctx context.Context, query *metadata.Query, start, end time.Time) storage.SeriesSet { +func (i *Instance) QueryRaw(ctx context.Context, query *metadata.Query, start, end time.Time) storage.SeriesSet { //TODO implement me panic("implement me") } -func (instance *Instance) QueryExemplar(ctx context.Context, fields []string, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) (*decoder.Response, error) { +func (i *Instance) QueryExemplar(ctx context.Context, fields []string, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) (*decoder.Response, error) { //TODO implement me panic("implement me") } -func (instance *Instance) LabelNames(ctx context.Context, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) { +func (i *Instance) LabelNames(ctx context.Context, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) { //TODO implement me panic("implement me") } -func (instance *Instance) LabelValues(ctx context.Context, query *metadata.Query, name string, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) { +func (i *Instance) LabelValues(ctx context.Context, query *metadata.Query, name string, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) { //TODO implement me panic("implement me") } -func (instance *Instance) Series(ctx context.Context, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) storage.SeriesSet { +func (i *Instance) Series(ctx context.Context, query *metadata.Query, start, end time.Time, matchers ...*labels.Matcher) storage.SeriesSet { //TODO implement me panic("implement me") } -func (instance *Instance) GetInstanceType() string { +func (i *Instance) GetInstanceType() string { return consul.RedisStorageType } var _ tsdb.Instance = (*Instance)(nil) -func (instance *Instance) Query(ctx context.Context, qs string, end time.Time) (promql.Vector, error) { - df, err := instance.rawQuery(ctx, time.Time{}, end, time.Duration(0)) +func (i *Instance) Query(ctx context.Context, qs string, end time.Time) (promql.Vector, error) { + df, err := i.rawQuery(ctx, time.Time{}, end, time.Duration(0)) if err != nil { return nil, err } - return instance.vectorFormat(ctx, *df) + return i.vectorFormat(ctx, *df) } -func (instance *Instance) QueryRange(ctx context.Context, promql string, start, end time.Time, step time.Duration) (promql.Matrix, error) { - df, err := instance.rawQuery(ctx, start, end, step) +func (i *Instance) QueryRange(ctx context.Context, promql string, start, end time.Time, step time.Duration) (promql.Matrix, error) { + df, err := i.rawQuery(ctx, start, end, step) if err != nil { return nil, err } - return instance.matrixFormat(ctx, *df) + return i.matrixFormat(ctx, *df) } -func (instance *Instance) rawQuery(ctx context.Context, start, end time.Time, step time.Duration) (*dataframe.DataFrame, error) { +func (i *Instance) rawQuery(ctx context.Context, start, end time.Time, step time.Duration) (*dataframe.DataFrame, error) { var ( startAnaylize time.Time ) @@ -118,10 +118,10 @@ func (instance *Instance) rawQuery(ctx context.Context, start, end time.Time, st if len(clusterNames) == 0 { return nil, errors.Errorf("Dimension(%s) must be passed in query-condition ", ClusterMetricFieldClusterName) } - stoCtx, _ := context.WithTimeout(ctx, instance.Timeout) + stoCtx, _ := context.WithTimeout(ctx, i.Timeout) startAnaylize = time.Now() - sto := MetricStorage{ctx: stoCtx, storagePrefix: instance.ClusterMetricPrefix} + sto := MetricStorage{ctx: stoCtx, storagePrefix: i.ClusterMetricPrefix} metricMeta, err := sto.GetMetricMeta(metricName) if err != nil { // 指标配置不存在,则返回空 DF @@ -139,21 +139,21 @@ func (instance *Instance) rawQuery(ctx context.Context, start, end time.Time, st df = df.RBind(*dfPointer) } } - df = instance.handleDFQuery(df, query, start, end, step) + df = i.handleDFQuery(df, query, start, end, step) if df.Error() != nil { return nil, df.Error() } queryCost := time.Since(startAnaylize) metric.TsDBRequestSecond( - ctx, queryCost, user.SpaceUid, instance.GetInstanceType(), + ctx, queryCost, user.SpaceUid, i.GetInstanceType(), ) return &df, nil } -func (instance *Instance) vectorFormat(ctx context.Context, df dataframe.DataFrame) (promql.Vector, error) { +func (i *Instance) vectorFormat(ctx context.Context, df dataframe.DataFrame) (promql.Vector, error) { vector := make(promql.Vector, 0) - matrix, err := instance.matrixFormat(ctx, df) + matrix, err := i.matrixFormat(ctx, df) if err != nil { return nil, err } @@ -166,7 +166,7 @@ func (instance *Instance) vectorFormat(ctx context.Context, df dataframe.DataFra return vector, nil } -func (instance *Instance) matrixFormat(ctx context.Context, df dataframe.DataFrame) (promql.Matrix, error) { +func (i *Instance) matrixFormat(ctx context.Context, df dataframe.DataFrame) (promql.Matrix, error) { names := df.Names() groupPoints := map[string]promql.Series{} for idx, row := range df.Records() { @@ -227,7 +227,7 @@ func arrToPoint(colNames []string, row []string) (labels.Labels, *promql.Point, } // handleDFQuery 根据传入的查询配置,处理 DF 数据 -func (instance *Instance) handleDFQuery( +func (i *Instance) handleDFQuery( df dataframe.DataFrame, query *metadata.QueryClusterMetric, start, end time.Time, step time.Duration) dataframe.DataFrame { // 时间过滤 df = df.FilterAggregation( diff --git a/pkg/unify-query/tsdb/victoriaMetrics/instance.go b/pkg/unify-query/tsdb/victoriaMetrics/instance.go index 9f1028bd2..98494f8fa 100644 --- a/pkg/unify-query/tsdb/victoriaMetrics/instance.go +++ b/pkg/unify-query/tsdb/victoriaMetrics/instance.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/bkapi" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/consul" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/curl" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/influxdb/decoder" @@ -79,7 +80,7 @@ type Instance struct { var _ tsdb.Instance = (*Instance)(nil) -func NewInstance(ctx context.Context, opt Options) (*Instance, error) { +func NewInstance(ctx context.Context, opt *Options) (*Instance, error) { if opt.Address == "" { return nil, fmt.Errorf("address is empty") } @@ -341,6 +342,12 @@ func (i *Instance) vmQuery( params := make(map[string]string) params["sql"] = sql params["prefer_storage"] = PreferStorage + + // body 增加 bkdata auth 信息 + for k, v := range bkapi.GetBkDataAPI().GetDataAuth() { + params[k] = v + } + body, err := json.Marshal(params) if err != nil { return err @@ -356,8 +363,10 @@ func (i *Instance) vmQuery( span.Set("query-address", i.url) - headersString, _ := json.Marshal(i.headers) - span.Set("query-headers", headersString) + headers := metadata.Headers(ctx, i.headers) + + headersString, _ := json.Marshal(headers) + span.Set("query-headers", string(headersString)) log.Infof(ctx, "victoria metrics query: %s, headers: %s, body: %s", @@ -369,7 +378,7 @@ func (i *Instance) vmQuery( curl.Options{ UrlPath: i.url, Body: body, - Headers: i.headers, + Headers: headers, }, data, ) diff --git a/pkg/unify-query/tsdb/victoriaMetrics/instance_test.go b/pkg/unify-query/tsdb/victoriaMetrics/instance_test.go index 7b30487cd..816f88eb8 100644 --- a/pkg/unify-query/tsdb/victoriaMetrics/instance_test.go +++ b/pkg/unify-query/tsdb/victoriaMetrics/instance_test.go @@ -10,25 +10,16 @@ package victoriaMetrics import ( - "bufio" "context" "encoding/json" - "errors" - "fmt" - "os" - "strings" - "sync" "testing" "time" - "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/curl" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/mock" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/tsdb" ) const ( @@ -37,479 +28,63 @@ const ( ) var ( - once sync.Once - instance tsdb.Instance -) - -var ( - end = time.Now() - start = end.Add(-10 * time.Minute) + start = time.Unix(0, 0) + end = time.Unix(60*60*6, 0) step = time.Minute - rts = []string{"100147_vm_100768_bkmonitor_time_series_560915"} -) - -func mockData(ctx context.Context) { - metadata.SetExpand(ctx, &metadata.VmExpand{ - ResultTableList: []string{"vm1"}, - }) -} - -func query(ctx context.Context, promql string, rts []string, data map[string]float64) error { - if len(rts) > 0 { - metadata.SetExpand(ctx, &metadata.VmExpand{ - ResultTableList: rts, - }) - res, err := instance.Query(ctx, promql, time.Now()) - if err != nil { - return err - } - if len(res) > 0 { - for _, r := range res { - var ( - metric string - id string - ) - for _, l := range r.Metric { - switch { - case l.Name == "__name__": - metric = l.Value - case l.Name == "bcs_cluster_id": - id = l.Value - default: - panic(fmt.Sprintf("%s=%s", l.Name, l.Value)) - } - } - - if _, ok := data[metric+","+id]; !ok { - data[metric+","+id] = r.V - } - } - - return nil - } - } - return fmt.Errorf("empty data in %+v", rts) -} - -func TestPromQL(t *testing.T) { - ctx := context.Background() - mock.Init() - - once.Do(func() { - instance = &Instance{ - ContentType: "application/json", - InfluxCompatible: true, - UseNativeOr: true, - Timeout: time.Second * 30, - Curl: &curl.HttpCurl{ - Log: log.DefaultLogger, - }, - } - }) - - vectors := []string{ - `kube_node_status_allocatable_cpu_cores_value`, - `kube_node_status_capacity_cpu_cores_value`, - //`kube_pod_container_resource_requests_value{resource="cpu"}`, - `kube_pod_container_resource_requests_cpu_cores_value`, - //`kube_pod_container_resource_limits_value{resource="cpu"}`, - `kube_pod_container_resource_limits_cpu_cores_value`, - } - dims := []string{ - "bcs_cluster_id", - "__name__", - } - - var ( - data = make(map[string]float64) - ) - f, err := os.Open("vmrt.list") - if err != nil { - log.Errorf(ctx, err.Error()) - } - defer f.Close() - - batch := 100 - br := bufio.NewReader(f) - rts := make([]string, 0, batch) - for { - rt, _, readErr := br.ReadLine() - if len(rt) > 0 { - rts = append(rts, string(rt)) - } - - if readErr != nil || len(rts) == batch { - promql := fmt.Sprintf(`sum({__name__=~"%s", result_table_id=~"%s"})`, strings.Join(vectors, "|"), strings.Join(rts, "|")) - if len(dims) > 0 { - promql = fmt.Sprintf(`%s by (%s)`, promql, strings.Join(dims, ", ")) - } - - err = query(ctx, promql, rts, data) - if err != nil { - log.Errorf(ctx, err.Error()) - } - rts = rts[:0] - } - - if readErr != nil { - break - } - } - - file, err := os.Create("output.csv") - if err != nil { - return - } - defer file.Close() - for k, v := range data { - _, err = file.WriteString(fmt.Sprintf("%s,%.f\n", k, v)) - if err != nil { - log.Errorf(ctx, err.Error()) - } - } -} - -func TestRealQueryRange(t *testing.T) { - mock.Init() - ctx := metadata.InitHashID(context.Background()) - - vmRT := "2_bcs_prom_computation_result_table" - metric := "container_cpu_usage_seconds_total" - - a := "a" - metricFilterCondition := map[string]string{ - a: fmt.Sprintf(`__name__="%s_value", result_table_id="%s"`, metric, vmRT), - } - - timeout := time.Minute + url = "http://127.0.0.1/query_engine" + sourceKey = "username:kit" + spaceUid = "space_103" - ins := &Instance{ - ctx: ctx, - Timeout: timeout, - ContentType: "application/json", + resultTableList = []string{"victor_metrics_result_table_1"} + bkDataAuthorization = map[string]string{"bkdata_authentication_method": "user", "bkdata_data_token": "", "bk_username": "admin"} +) - Curl: &curl.HttpCurl{Log: log.DefaultLogger}, +func mockInstance(ctx context.Context, mockCurl *curl.MockCurl) *Instance { + headers := map[string]string{} + instance, _ := NewInstance(ctx, &Options{ + Address: url, + Timeout: time.Minute, + Curl: mockCurl, InfluxCompatible: true, UseNativeOr: true, - } - - testCase := map[string]struct { - q string - e *metadata.VmExpand - }{ - "test_1": { - q: `count(a)`, - e: &metadata.VmExpand{ - ResultTableList: []string{ - vmRT, - }, - // condition 需要进行二次转义 - MetricFilterCondition: metricFilterCondition, - }, - }, - } + Headers: headers, + }) - for n, c := range testCase { - t.Run(n, func(t *testing.T) { - metadata.SetExpand(ctx, c.e) - res, err := ins.Query(ctx, c.q, end) - if err != nil { - panic(err) - } - fmt.Println(res) - }) - } + return instance } -func TestInstance_Query_Url(t *testing.T) { +func mockData(ctx context.Context) { mock.Init() - mockCurl := curl.NewMockCurl(map[string]string{ - `http://127.0.0.1/api/{"sql":"{\"influx_compatible\":false,\"use_native_or\":false,\"api_type\":\"query\",\"cluster_name\":\"\",\"api_params\":{\"query\":\"count(container_cpu_system_seconds_total_value)\",\"time\":1669600800,\"timeout\":0},\"result_table_list\":[\"vm1\"],\"metric_filter_condition\":null}","bkdata_authentication_method":"","bk_app_code":"","prefer_storage":"vm","bkdata_data_token":""}`: `{ - "result": true, - "message": "成功", - "code": "00", - "data": { - "list": [ - { - "status": "success", - "isPartial": false, - "data": { - "resultType": "vector", - "result": [ - { - "metric": {}, - "value": [ - 1716522171, - "169.52247191011236" - ] - } - ] - }, - "stats": { - "seriesFetched": "40" - } - } - ], - "select_fields_order": [], - "sql": "count(container_cpu_system_seconds_total_value)", - "total_record_size": 1704, - "device": "vm" - } -}`, - `http://127.0.0.1/api/{"sql":"{\"influx_compatible\":false,\"use_native_or\":false,\"api_type\":\"query\",\"cluster_name\":\"\",\"api_params\":{\"query\":\"count by (__bk_db__, bk_biz_id, bcs_cluster_id) (container_cpu_system_seconds_total_value{})\",\"time\":1669600800,\"timeout\":0},\"result_table_list\":[\"vm1\"],\"metric_filter_condition\":null}","bkdata_authentication_method":"","bk_app_code":"","prefer_storage":"vm","bkdata_data_token":""}`: `{ -"result": true, - "message": "成功", - "code": "00", - "data": { - "list": [ - {"status":"success","isPartial":false,"data":{"resultType":"vector","result":[{"metric":{"__bk_db__":"mydb","bcs_cluster_id":"BCS-K8S-40949","bk_biz_id":"930"},"value":[1669600800,"31949"]}]}} - ], - "select_fields_order": [], - "sql": "count(container_cpu_system_seconds_total_value)", - "total_record_size": 1704, - "device": "vm" - } -}`, - `http://127.0.0.1/api/{"sql":"{\"influx_compatible\":false,\"use_native_or\":false,\"api_type\":\"query\",\"cluster_name\":\"\",\"api_params\":{\"query\":\"sum(111gggggggggggggggg11\",\"time\":1669600800,\"timeout\":0},\"result_table_list\":[\"vm1\"],\"metric_filter_condition\":null}","bkdata_authentication_method":"","bk_app_code":"","prefer_storage":"vm","bkdata_data_token":""}`: `{ - "result": false, - "message": "BKPromqlApi 接口调用异常", - "code": "1532618", - "data": null, - "errors": { - "error": "Failed to convert promql with influx filter" - } -}`, - }, log.DefaultLogger) - - ctx := metadata.InitHashID(context.Background()) - ins := &Instance{ - ctx: ctx, - Curl: mockCurl, - } - mockData(ctx) - - endTime, _ := time.ParseInLocation(ParseTime, TestTime, time.Local) - - testCases := map[string]struct { - promql string - expected string - err error - }{ - "count": { - promql: `count(container_cpu_system_seconds_total_value)`, - expected: `[{"metric":{},"value":[1716522171,"169.52247191011236"]}]`, - }, - "count rate metric": { - promql: `count by (__bk_db__, bk_biz_id, bcs_cluster_id) (container_cpu_system_seconds_total_value{})`, - expected: `[{"metric":{"__bk_db__":"mydb","bcs_cluster_id":"BCS-K8S-40949","bk_biz_id":"930"},"value":[1669600800,"31949"]}]`, - }, - "error metric 1": { - promql: `sum(111gggggggggggggggg11`, - err: errors.New(`BKPromqlApi 接口调用异常, Failed to convert promql with influx filter, `), - }, - } - - for name, c := range testCases { - t.Run(name, func(t *testing.T) { - data, err := ins.Query(ctx, c.promql, endTime) - if c.err != nil { - assert.Equal(t, c.err, err) - } else { - assert.Nil(t, err) - res, err1 := json.Marshal(data) - assert.Nil(t, err1) - assert.Equal(t, c.expected, string(res)) - } - - }) - } + metadata.SetExpand(ctx, &metadata.VmExpand{ + ResultTableList: resultTableList, + }) + metadata.SetUser(ctx, sourceKey, spaceUid, "") } -func TestInstance_QueryRange_Url(t *testing.T) { - log.InitTestLogger() +func TestOptions(t *testing.T) { ctx := context.Background() - - mockCurl := curl.NewMockCurl(map[string]string{ - `http://127.0.0.1/api/query_range?end=1669600800&query=count%28kube_pod_container_resource_limits_value%29&start=1669600500&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{},"values":[[1669600500,"61305"],[1669600560,"61305"],[1669600620,"61305"],[1669600680,"61311"],[1669600740,"61311"],[1669600800,"61314"]]}]}}`, - `http://127.0.0.1/api/query_range?end=1669600800&query=count+by+%28__bk_db__%2C+bk_biz_id%2C+bcs_cluster_id%29+%28container_cpu_system_seconds_total_value%7B%7D%29&start=1669600500&step=60`: `{"status":"success","isPartial":false,"data":{"resultType":"matrix","result":[{"metric":{"__bk_db__":"mydb","bcs_cluster_id":"BCS-K8S-40949","bk_biz_id":"930"},"values":[[1669600500,"31949"],[1669600560,"31949"],[1669600620,"31949"],[1669600680,"31949"],[1669600740,"31949"],[1669600800,"31949"]]}]}}`, - `http://127.0.0.1/api/query_range?end=1669600800&query=sum%28111gggggggggggggggg11&start=1669600500&step=60`: `{"status":"error","errorType":"422","error":"error when executing query=\"sum(111gggggggggggggggg11\" on the time range (start=1669600500000, end=1669600800000, step=60000): argList: unexpected token \"gggggggggggggggg11\"; want \",\", \")\"; unparsed data: \"gggggggggggggggg11\""}`, - `http://127.0.0.1/api/query_range?end=1669600800&query=top%28sum%28kube_pod_container_resource_limits_value%29%29&start=1669600500&step=60`: `{"status":"error","errorType":"422","error":"unknown func \"top\""}`, - }, log.DefaultLogger) - - ins := &Instance{ - ctx: ctx, - Timeout: time.Minute, - Curl: mockCurl, - } + ctx = metadata.InitHashID(ctx) mockData(ctx) - leftTime := time.Minute * -5 + mockCurl := &curl.MockCurl{} + instance := mockInstance(ctx, mockCurl) - endTime, _ := time.ParseInLocation(ParseTime, TestTime, time.Local) - startTime := endTime.Add(leftTime) - stepTime := time.Minute + q := "count(my_metric)" + _, _ = instance.QueryRange(ctx, q, start, end, step) - testCases := map[string]struct { - promql string - expected string - err error - }{ - "count": { - promql: `count(kube_pod_container_resource_limits_value)`, - expected: `[{"metric":{},"values":[[1669600500,"61305"],[1669600560,"61305"],[1669600620,"61305"],[1669600680,"61311"],[1669600740,"61311"],[1669600800,"61314"]]}]`, - }, - "count rate metric": { - promql: `count by (__bk_db__, bk_biz_id, bcs_cluster_id) (container_cpu_system_seconds_total_value{})`, - expected: `[{"metric":{"__bk_db__":"mydb","bcs_cluster_id":"BCS-K8S-40949","bk_biz_id":"930"},"values":[[1669600500,"31949"],[1669600560,"31949"],[1669600620,"31949"],[1669600680,"31949"],[1669600740,"31949"],[1669600800,"31949"]]}]`, - }, - "error metric 1": { - promql: `sum(111gggggggggggggggg11`, - err: errors.New(`error when executing query="sum(111gggggggggggggggg11" on the time range (start=1669600500000, end=1669600800000, step=60000): argList: unexpected token "gggggggggggggggg11"; want ",", ")"; unparsed data: "gggggggggggggggg11"`), - }, - "error metric 2": { - promql: `top(sum(kube_pod_container_resource_limits_value))`, - err: errors.New(`unknown func "top"`), - }, - } + assert.Equal(t, mockCurl.Opts.Headers[metadata.SpaceUIDHeader], spaceUid) + assert.Equal(t, mockCurl.Opts.Headers[metadata.BkQuerySourceHeader], sourceKey) - for name, c := range testCases { - t.Run(name, func(t *testing.T) { - data, err := ins.QueryRange(ctx, c.promql, startTime, endTime, stepTime) - if c.err != nil { - assert.Equal(t, c.err, err) - } else { - assert.Nil(t, err) - res, err1 := json.Marshal(data) - assert.Nil(t, err1) - assert.Equal(t, c.expected, string(res)) - } + params := make(map[string]string) + err := json.Unmarshal(mockCurl.Opts.Body, ¶ms) + assert.Nil(t, err) - }) - } -} - -func mockInstance(ctx context.Context) { - instance = &Instance{ - ctx: ctx, - ContentType: "application/json", - InfluxCompatible: true, - UseNativeOr: true, - Timeout: time.Minute, - Curl: &curl.HttpCurl{Log: log.DefaultLogger}, - } -} - -func TestInstance_QueryRange(t *testing.T) { - ctx := context.Background() - mock.Init() - mockInstance(ctx) - - for i, c := range []struct { - promQL string - filters map[string]string - }{ - { - promQL: `count(a[1m] offset -59s999ms) by (bcs_cluster_id)`, - filters: map[string]string{ - `a`: `result_table_id="100147_vm_100768_bkmonitor_time_series_560915", __name__="container_cpu_usage_seconds_total_value", bcs_cluster_id="BCS-K8S-41264"`, - }, - }, - } { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - metadata.SetExpand(ctx, &metadata.VmExpand{ - ResultTableList: rts, - MetricFilterCondition: c.filters, - }) - res, err := instance.QueryRange(ctx, c.promQL, start, end, step) - assert.Nil(t, err) - log.Infof(ctx, "%+v", res) - }) - } -} - -func TestInstance_Query(t *testing.T) { - ctx := context.Background() - mock.Init() - mockInstance(ctx) - - for i, c := range []struct { - promQL string - filters map[string]string - }{ - { - promQL: `count(a[1m] offset -59s999ms) by (bcs_cluster_id)`, - filters: map[string]string{ - `a`: `result_table_id="100147_vm_100768_bkmonitor_time_series_560915", __name__="container_cpu_usage_seconds_total_value", bcs_cluster_id="BCS-K8S-41264"`, - }, - }, - } { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - metadata.SetExpand(ctx, &metadata.VmExpand{ - ResultTableList: rts, - MetricFilterCondition: c.filters, - }) - res, err := instance.Query(ctx, c.promQL, end) - assert.Nil(t, err) - log.Infof(ctx, "%+v", res) - }) - } -} - -func TestInstance_LabelNames(t *testing.T) { - ctx := context.Background() - mock.Init() - mockInstance(ctx) - - lbl, _ := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "a") - - for i, c := range []struct { - filters map[string]string - }{ - { - filters: map[string]string{ - `a`: `result_table_id="100147_vm_100768_bkmonitor_time_series_560915", __name__="container_cpu_usage_seconds_total_value", bcs_cluster_id="BCS-K8S-41264"`, - }, - }, - } { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - metadata.SetExpand(ctx, &metadata.VmExpand{ - ResultTableList: rts, - MetricFilterCondition: c.filters, - }) - res, err := instance.LabelNames(ctx, nil, start, end, lbl) - assert.Nil(t, err) - log.Infof(ctx, "%+v", res) - }) - } -} - -func TestInstance_LabelValues(t *testing.T) { - ctx := context.Background() - mock.Init() - mockInstance(ctx) - - lbl, _ := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "a") - - for i, c := range []struct { - filters map[string]string - }{ - { - filters: map[string]string{ - `a`: `result_table_id="100147_vm_100768_bkmonitor_time_series_560915", __name__="container_cpu_usage_seconds_total_value", bcs_cluster_id="BCS-K8S-41264"`, - }, - }, - } { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - metadata.SetExpand(ctx, &metadata.VmExpand{ - ResultTableList: rts, - MetricFilterCondition: c.filters, - }) - res, err := instance.LabelValues(ctx, nil, "namespace", start, end, lbl) - assert.Nil(t, err) - log.Infof(ctx, "%+v", res) - }) + if err == nil { + for k, v := range bkDataAuthorization { + assert.Equal(t, params[k], v) + } } }