Skip to content

Commit

Permalink
feat: 支持 cmdbmeta v2 数据格式 (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx authored Dec 15, 2023
1 parent e8076ed commit 8d355af
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 19 deletions.
74 changes: 65 additions & 9 deletions pkg/transfer/template/etl/formatter/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,73 @@ func MetricsCutterHandler(record *define.ETLRecord, next define.ETLRecordHandler
return nil
}

func TransferRecordCutterByExtraMetaCreator(store define.Store, enable bool) define.ETLRecordChainingHandler {
if !enable {
func tryDecodeExtraMeta(s string) ([]map[string]string, error) {
type V1Meta struct {
Common map[string]string `json:"common"`
Custom []map[string]string `json:"custom"`
}

parseV0Meta := func(b []byte) ([]map[string]string, error) {
ret := make([]map[string]string, 0)
err := json.Unmarshal(b, &ret)
if err != nil {
return nil, err
}
return ret, nil
}

parseV1Meta := func(b []byte) ([]map[string]string, error) {
var v1Meta V1Meta
if err := json.Unmarshal(b, &v1Meta); err != nil {
return nil, err
}
ret := make([]map[string]string, 0)
for _, custom := range v1Meta.Custom {
item := make(map[string]string)
for k, v := range custom {
item[k] = v
}
for k, v := range v1Meta.Common {
item[k] = v
}
ret = append(ret, item)
}
return ret, nil
}

type tryV1 struct {
Version string `json:"version"`
}

var tryv1 tryV1
var ret []map[string]string

// 尝试用最小代价解析 version 字段,判断其是否为 v1 格式
// 不同版本格式
// v0: []map[string]string
// v1: V1Meta
err := json.Unmarshal([]byte(s), &tryv1)
if err == nil && tryv1.Version == "v1" {
ret, err = parseV1Meta([]byte(s))
} else {
ret, err = parseV0Meta([]byte(s))
}

if err != nil {
return nil, err
}
if len(ret) <= 0 {
return nil, errors.New("empty extra meta record items")
}
return ret, nil
}

func TransferRecordCutterByExtraMetaCreator(store define.Store, enabled bool) define.ETLRecordChainingHandler {
if !enabled {
return nil
}

return func(record *define.ETLRecord, next define.ETLRecordHandler) error {
items := make([]map[string]string, 0)
body, err := fetchExtraMetaResponseStore(record, store)
if err != nil {
return errors.Wrap(err, "failed to fetch extra meta response")
Expand All @@ -377,13 +437,9 @@ func TransferRecordCutterByExtraMetaCreator(store define.Store, enable bool) def
return errors.New("empty extra meta response")
}

err = json.Unmarshal([]byte(body), &items)
items, err := tryDecodeExtraMeta(body)
if err != nil {
return errors.Wrap(err, "failed to decode extra-meta field or empty items")
}

if len(items) <= 0 {
return errors.New("empty extra meta record items")
return err
}

// 维度补充
Expand Down
22 changes: 12 additions & 10 deletions pkg/transfer/template/etl/formatter/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,15 @@ func (s *HandlerSuite) TestFillBizIDHandlerCreator() {
})
}

func (s *HandlerSuite) TestCutterByDbmMetaMatched() {
func (s *HandlerSuite) TestCutterByDbmMetaV0() {
hostInfo := models.CCHostInfo{
IP: "127.0.0.1",
CloudID: 1,
CCTopoBaseModelInfo: &models.CCTopoBaseModelInfo{
BizID: []int{2},
Topo: []map[string]string{},
},
DbmMeta: `[{"role":"master","cluster":"ssd.nvmessd.dba.db "},{"role":"slave","cluster":"ssd.abcd.dba.db "}]`,
DbmMeta: `[{"role":"master","cluster":"ssd.nvmessd.dba.db"},{"role":"slave","cluster":"ssd.abcd.dba.db"}]`,
}
s.StoreHost(&hostInfo).AnyTimes()
s.Store.EXPECT().Get(gomock.Any()).Return(nil, define.ErrItemNotFound).AnyTimes()
Expand All @@ -244,7 +244,7 @@ func (s *HandlerSuite) TestCutterByDbmMetaMatched() {
dims := record.Dimensions
s.NotNil(dims["role"])
s.NotNil(dims["cluster"])
s.T().Logf("dbm-meta record: %+v", record)
s.T().Logf("dbm-meta/v0 record: %+v", record)
}, []handlerCase{
// 有biz id 无ip cloud
{
Expand All @@ -259,31 +259,33 @@ func (s *HandlerSuite) TestCutterByDbmMetaMatched() {
})
}

func (s *HandlerSuite) TestCutterByDbmMetaMiss() {
func (s *HandlerSuite) TestCutterByDbmMetaV1() {
hostInfo := models.CCHostInfo{
IP: "127.0.0.1",
CloudID: 1,
CCTopoBaseModelInfo: &models.CCTopoBaseModelInfo{
BizID: []int{2},
Topo: []map[string]string{},
},
DbmMeta: `[{"role":"master","cluster":"ssd.nvmessd.dba.db "},{"role":"slave","cluster":"ssd.abcd.dba.db "}]`,
DbmMeta: `{"version":"v1","common":{"region":"gz","status":"prod"},"custom":[{"role":"master","cluster":"ssd.nvmessd.dba.db"},{"role":"slave","cluster":"ssd.abcd.dba.db"}]}`,
}
s.StoreHost(&hostInfo).AnyTimes()
s.Store.EXPECT().Get(gomock.Any()).Return(nil, define.ErrItemNotFound).AnyTimes()

s.runHandler(TransferRecordCutterByExtraMetaCreator(s.Store, true), func(record *define.ETLRecord) {
dims := record.Dimensions
s.Nil(dims["role"])
s.Nil(dims["cluster"])
s.NotNil(dims["role"])
s.NotNil(dims["cluster"])
s.NotNil(dims["region"])
s.NotNil(dims["status"])
s.T().Logf("dbm-meta/v1 record: %+v", record)
}, []handlerCase{
// 有biz id 无ip cloud
{
1, nil, define.ETLRecord{
2, nil, define.ETLRecord{
Dimensions: map[string]interface{}{
define.RecordBizIDFieldName: 3,
define.RecordIPFieldName: "127.0.0.1",
define.RecordCloudIDFieldName: "2",
define.RecordCloudIDFieldName: "1",
},
},
},
Expand Down

0 comments on commit 8d355af

Please sign in to comment.