Skip to content

Commit

Permalink
feat: BMW:数据源刷新任务,在对单个数据源执行操作时,从DB中刷新获取最新记录并进行double check,实现数据强一致性 -…
Browse files Browse the repository at this point in the history
…-story=121088968 (#654)
  • Loading branch information
EASYGOING45 authored Dec 6, 2024
1 parent d8565de commit ca70e93
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions pkg/bk-monitor-worker/internal/metadata/task/config_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,15 @@ func RefreshDatasource(ctx context.Context, t *t.Task) error {
}
}

// 初次筛选数据源
var dataSourceList []resulttable.DataSource
// data id 数量可控,先不拆分;仅刷新未迁移到计算平台的数据源 ID 及通过 gse 创建的数据源 ID
if err := resulttable.NewDataSourceQuerySet(db).CreatedFromEq(common.DataIdFromBkGse).IsEnableEq(true).
BkDataIdIn(dataIdList...).OrderDescByLastModifyTime().All(&dataSourceList); err != nil {
if err := resulttable.NewDataSourceQuerySet(db).
CreatedFromEq(common.DataIdFromBkGse).
IsEnableEq(true).
BkDataIdIn(dataIdList...).
OrderDescByLastModifyTime().
All(&dataSourceList); err != nil {
logger.Errorf("RefreshDatasource: query datasource record error, %v", err)
return err
}
Expand All @@ -254,17 +259,35 @@ func RefreshDatasource(ctx context.Context, t *t.Task) error {
return nil
}

// 协程控制
wg := &sync.WaitGroup{}
ch := make(chan struct{}, GetGoroutineLimit("refresh_datasource"))
wg.Add(len(dataSourceList))

for _, dataSource := range dataSourceList {
ch <- struct{}{}
go func(ds resulttable.DataSource, wg *sync.WaitGroup, ch chan struct{}) {
defer func() {
<-ch
wg.Done()
}()
dsSvc := service.NewDataSourceSvc(&ds)
// 处理单个数据源前,刷新以从DB中获取最新数据,进行double check
var latestDataSource resulttable.DataSource
if err := resulttable.NewDataSourceQuerySet(db).
BkDataIdEq(ds.BkDataId). // 根据数据ID精确查询
Select("bk_data_id", "is_enable", "created_from", "last_modify_time").
One(&latestDataSource); err != nil {
logger.Warnf("RefreshDatasource: data_id [%v] not found or query error, skip", ds.BkDataId)
return
}

// double check bkdata v4数据源不应进行刷新
if !latestDataSource.IsEnable || latestDataSource.CreatedFrom == common.DataIdFromBkData {
logger.Warnf("RefreshDatasource: data_id [%v] is not enable or created from bkdata, skip", ds.BkDataId)
return
}

dsSvc := service.NewDataSourceSvc(&latestDataSource)
consulClient, err := consul.GetInstance()
if err != nil {
logger.Errorf("RefreshDatasource: data_id [%v] failed to get consul client, %v,skip", dsSvc.BkDataId, err)
Expand Down

0 comments on commit ca70e93

Please sign in to comment.