Skip to content

Commit

Permalink
Merge pull request #297 from tbs60/dev_tming
Browse files Browse the repository at this point in the history
Dev tming
  • Loading branch information
tming authored Sep 24, 2024
2 parents da43513 + d9151d4 commit 619a47b
Show file tree
Hide file tree
Showing 14 changed files with 1,497 additions and 687 deletions.
130 changes: 130 additions & 0 deletions src/backend/booster/bk_dist/common/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"

"github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog"
)
Expand Down Expand Up @@ -168,3 +169,132 @@ func (i *Info) Md5() (string, error) {
md5string := fmt.Sprintf("%x", md5hash.Sum(nil))
return md5string, nil
}

var (
fileInfoCacheLock sync.RWMutex
fileInfoCache = map[string]*Info{}
)

func ResetFileInfoCache() {
fileInfoCacheLock.Lock()
defer fileInfoCacheLock.Unlock()

fileInfoCache = map[string]*Info{}
}

// 支持并发read,但会有重复Stat操作,考虑并发和去重的平衡
func GetFileInfo(fs []string, mustexisted bool, notdir bool, statbysearchdir bool) ([]*Info, error) {
// read
fileInfoCacheLock.RLock()
notfound := []string{}
is := make([]*Info, 0, len(fs))
for _, f := range fs {
i, ok := fileInfoCache[f]
if !ok {
notfound = append(notfound, f)
continue
}

if !i.Exist() {
if mustexisted {
// continue
// TODO : return fail if not existed
blog.Warnf("common util: depend file:%s not existed ", f)
return nil, fmt.Errorf("%s not existed", f)
} else {
continue
}
}

if notdir && i.Basic().IsDir() {
continue
}
is = append(is, i)
}
fileInfoCacheLock.RUnlock()

blog.Infof("common util: got %d file stat and %d not found", len(is), len(notfound))
if len(notfound) == 0 {
return is, nil
}

// query
tempis := make(map[string]*Info, len(notfound))
for _, notf := range notfound {
tempf := notf
try := 0
maxtry := 10
for {
var i *Info
if statbysearchdir {
i = GetFileInfoByEnumDir(tempf)
} else {
i = Lstat(tempf)
}
tempis[tempf] = i
try++

if !i.Exist() {
if mustexisted {
// TODO : return fail if not existed
// continue
blog.Warnf("common util: depend file:%s not existed ", tempf)
return nil, fmt.Errorf("%s not existed", tempf)
} else {
// continue
break
}
}

loopagain := false
if i.Basic().Mode()&os.ModeSymlink != 0 {
originFile, err := os.Readlink(tempf)
if err == nil {
if !filepath.IsAbs(originFile) {
originFile, err = filepath.Abs(filepath.Join(filepath.Dir(tempf), originFile))
if err == nil {
i.LinkTarget = originFile
blog.Infof("common util: symlink %s to %s", tempf, originFile)
} else {
blog.Infof("common util: symlink %s origin %s, got abs path error:%s",
tempf, originFile, err)
}
} else {
i.LinkTarget = originFile
blog.Infof("common util: symlink %s to %s", tempf, originFile)
}

// 如果是链接,并且指向了其它文件,则需要将指向的文件也包含进来
// 增加寻找次数限制,避免死循环
if try < maxtry {
loopagain = true
tempf = originFile
}
} else {
blog.Infof("common util: symlink %s Readlink error:%s", tempf, err)
}
}

if notdir && i.Basic().IsDir() {
continue
}

is = append(is, i)

if !loopagain {
break
}
}
}

// write
go func(tempis *map[string]*Info) {
fileInfoCacheLock.Lock()
for f, i := range *tempis {
fileInfoCache[f] = i
}
fileInfoCacheLock.Unlock()
}(&tempis)

return is, nil
}
57 changes: 23 additions & 34 deletions src/backend/booster/bk_dist/common/sdk/toolchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
dcFile "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/file"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol"

dcUtil "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/util"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog"
)

Expand Down Expand Up @@ -107,6 +108,7 @@ func checkAndAdd(i *dcFile.Info, remotepath string, files *[]FileDesc) error {
Targetrelativepath: remotepath,
Filemode: i.Mode32(),
LinkTarget: i.LinkTarget,
Priority: GetPriority(i),
}

if i.LinkTarget == "" {
Expand Down Expand Up @@ -200,22 +202,6 @@ func (t *Toolchain) ToFileDesc() ([]FileDesc, error) {
// TODO : 将链接展开,直到得到所有相关文件,比如 a->b,b->c,则需要将a/b/c都包含进来
toolfiles := make([]FileDesc, 0, 0)
for _, v := range t.Toolchains {
// existed, fileSize, modifyTime, fileMode := dcFile.Stat(v.ToolLocalFullPath).Batch()
// if !existed {
// err := fmt.Errorf("tool chain file %s not existed", v.ToolLocalFullPath)
// blog.Errorf("%v", err)
// return nil, err
// }

// toolfiles = append(toolfiles, FileDesc{
// FilePath: v.ToolLocalFullPath,
// Compresstype: protocol.CompressLZ4,
// FileSize: fileSize,
// Lastmodifytime: modifyTime,
// Md5: "",
// Targetrelativepath: v.ToolRemoteRelativePath,
// Filemode: fileMode,
// })
files, err := getAssociatedFiles(v.ToolLocalFullPath, v.ToolRemoteRelativePath)
if err != nil {
return nil, err
Expand All @@ -229,23 +215,6 @@ func (t *Toolchain) ToFileDesc() ([]FileDesc, error) {
}

for _, f := range v.Files {
// existed, fileSize, modifyTime, fileMode = dcFile.Stat(f.LocalFullPath).Batch()
// if !existed {
// err := fmt.Errorf("tool chain file %s not existed", f.LocalFullPath)
// blog.Errorf("%v", err)
// return nil, err
// }

// toolfiles = append(toolfiles, FileDesc{
// FilePath: f.LocalFullPath,
// Compresstype: protocol.CompressLZ4,
// FileSize: fileSize,
// Lastmodifytime: modifyTime,
// Md5: "",
// Targetrelativepath: f.RemoteRelativePath,
// Filemode: fileMode,
// })

files, err := getAssociatedFiles(f.LocalFullPath, f.RemoteRelativePath)
if err != nil {
return nil, err
Expand All @@ -260,11 +229,31 @@ func (t *Toolchain) ToFileDesc() ([]FileDesc, error) {
}
}

blog.Debugf("toolchain: get all files:%v", toolfiles)
// 将文件集合中涉及到目录的链接列出来,这种需要提前发送
getAllLinkDirs(&toolfiles)

blog.Infof("toolchain: get all files:%+v", toolfiles)

return toolfiles, nil
}

func getAllLinkDirs(files *[]FileDesc) ([]FileDesc, error) {
lines := make([]string, 0, len(*files))
for _, v := range *files {
lines = append(lines, v.FilePath)
}

uniqlines := dcUtil.UniqArr(lines)
linkdirs := dcUtil.GetAllLinkDir(uniqlines)
if len(linkdirs) > 0 {
for _, v := range linkdirs {
getRecursiveFiles(v, filepath.Dir(v), files)
}
}

return nil, nil
}

func GetAdditionFileKey() string {
return "addition\\file|key"
}
50 changes: 50 additions & 0 deletions src/backend/booster/bk_dist/common/sdk/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package sdk
import (
"fmt"
"net"
"os"

dcFile "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/file"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol"
dcProtocol "github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/protocol"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/bk_dist/common/syscall"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog"
)

// RemoteWorker describe the remote worker SDK
Expand Down Expand Up @@ -181,3 +184,50 @@ type BKQuerySlotResult struct {
type BKSlotRspAck struct {
Consumeslotnum int32 `json:"consume_slot_num"`
}

func GetPriority(i *dcFile.Info) FileDescPriority {
isLink := i.Basic().Mode()&os.ModeSymlink != 0
if !isLink {
if i.Basic().IsDir() {
return RealDirPriority
} else {
return RealFilePriority
}
}

// symlink 需要判断是指向文件还是目录
if i.LinkTarget != "" {
targetfs, err := dcFile.GetFileInfo([]string{i.LinkTarget}, true, false, false)
if err == nil && len(targetfs) > 0 {
if targetfs[0].Basic().IsDir() {
return LinkDirPriority
} else {
return LinkFilePriority
}
}
}

// 尝试读文件
linkpath := i.Path()
targetPath, err := os.Readlink(linkpath)
if err != nil {
blog.Infof("common util: Error reading symbolic link: %v", err)
return LinkFilePriority
}

// 获取符号链接指向路径的文件信息
targetInfo, err := os.Stat(targetPath)
if err != nil {
blog.Infof("common util: Error getting target file info: %v", err)
return LinkFilePriority
}

// 判断符号链接指向的路径是否是目录
if targetInfo.IsDir() {
blog.Infof("common util: %s is a symbolic link to a directory", linkpath)
return LinkDirPriority
} else {
blog.Infof("common util: %s is a symbolic link, but not to a directory", linkpath)
return LinkFilePriority
}
}
Loading

0 comments on commit 619a47b

Please sign in to comment.