Skip to content

Commit

Permalink
Merge pull request TencentBlueKing#187 from tbs60/dev_tming_p2p
Browse files Browse the repository at this point in the history
Dev tming p2p
  • Loading branch information
tming authored Apr 1, 2024
2 parents 751decf + 4dcb696 commit 8fcea27
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 78 deletions.
21 changes: 17 additions & 4 deletions src/backend/booster/server/pkg/engine/disttask/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,14 +541,19 @@ func (de *disttaskEngine) launchTask(tb *engine.TaskBasic, queueName string) err
func (de *disttaskEngine) launchDirectP2PTask(task *distTask, tb *engine.TaskBasic, queueName string) error {
blog.Infof("engine(%s) ready to launch direct p2p task(%s) with queue:%s", EngineName, tb.ID, queueName)

purequeue := getQueueNamePure(queueName)
condition := &resourceCondition{
queueName: getQueueNamePure(queueName),
queueName: purequeue,
leastCPU: task.InheritSetting.LeastCPU,
maxCPU: task.InheritSetting.RequestCPU,
}

// TODO : resourceSelector 需要改为 p2p 定制的,之前的选择条件不能满足p2p的场景
resourceList, err := de.directMgr.GetFreeP2PResource(tb.ID, condition, p2pResourceSelector, nil)
resourceList, err := de.directMgr.GetFreeP2PResource(tb.ID,
condition,
p2pResourceSelector,
purequeue,
getPlatform(queueName))
// add task into public queue
if err == engine.ErrorNoEnoughResources {
if publicQueue := de.getPublicQueueByQueueName(queueName); publicQueue != nil &&
Expand Down Expand Up @@ -982,9 +987,11 @@ func (de *disttaskEngine) releaseTask(taskID string) error {
}

if matchDirectResource(task.InheritSetting.QueueName) {
// TODO : deal with p2p
// TODO : p2p的也需要通知下,方便资源的管理
if containsP2P(task.InheritSetting.QueueName) {
return nil
return de.releaseP2PDirectTask(task,
getPlatform(task.InheritSetting.QueueName),
getQueueNamePure(task.InheritSetting.QueueName))
}
return de.releaseDirectTask(task)
}
Expand Down Expand Up @@ -1022,6 +1029,12 @@ func (de *disttaskEngine) releaseDirectTask(task *distTask) error {
return nil
}

func (de *disttaskEngine) releaseP2PDirectTask(task *distTask, platform, groupKey string) error {
blog.Infof("engine(%s) try to release p2p direct task(%s)", EngineName, task.ID)

return de.directMgr.ReleaseP2PResource(task.ID, platform, groupKey)
}

func (de *disttaskEngine) releaseCRMTask(task *distTask) error {
crmMgr := de.getCrMgr(task.InheritSetting.QueueName)
if crmMgr == nil {
Expand Down
58 changes: 48 additions & 10 deletions src/backend/booster/server/pkg/manager/normal/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ type Cleaner interface {
func NewCleaner(layer TaskBasicLayer) Cleaner {
return &cleaner{
layer: layer,
tasks: map[string]bool{},
}
}

type cleaner struct {
ctx context.Context
layer TaskBasicLayer

tasks map[string]bool
tasksLock sync.RWMutex

c chan *engine.TaskBasic
}

Expand Down Expand Up @@ -73,7 +77,7 @@ func (c *cleaner) check() {
return
}

var wg sync.WaitGroup
// var wg sync.WaitGroup
for _, tb := range terminatedTaskList {
if tb.Status.Released {
continue
Expand All @@ -86,18 +90,51 @@ func (c *cleaner) check() {
continue
}

wg.Add(1)
go c.clean(tb.ID, egn, &wg)
// wg.Add(1)
// go c.clean(tb.ID, egn, &wg)
go c.clean(tb.ID, egn)
}
wg.Wait()
// wg.Wait()
}

func (c *cleaner) clean(taskID string, egn engine.Engine, wg *sync.WaitGroup) {
defer wg.Done()
// 确保一个taskid只有一个协程处理,避免拉起太多协程
func (c *cleaner) setFlag(taskID string) bool {
c.tasksLock.Lock()
defer c.tasksLock.Unlock()

if _, ok := c.tasks[taskID]; !ok {
c.tasks[taskID] = true
return true
} else {
return false
}
}

func (c *cleaner) unsetFlag(taskID string) {
c.tasksLock.Lock()
defer c.tasksLock.Unlock()

delete(c.tasks, taskID)
}

// func (c *cleaner) clean(taskID string, egn engine.Engine, wg *sync.WaitGroup) {
func (c *cleaner) clean(taskID string, egn engine.Engine) {
// defer wg.Done()

blog.Infof("cleaner: start clean task(%s)", taskID)

if c.setFlag(taskID) {
defer c.unsetFlag(taskID)
} else {
blog.Infof("cleaner: task (%s) is cleaning by others, do nothing", taskID)
return
}

c.layer.LockTask(taskID, "clean_of_cleaner")
defer c.layer.UnLockTask(taskID)

blog.Infof("cleaner: start get basic info for task(%s)", taskID)

tb, err := c.layer.GetTaskBasic(taskID)
if err != nil {
blog.Errorf("cleaner: get task(%s) failed: %v", taskID, err)
Expand Down Expand Up @@ -161,10 +198,11 @@ func (c *cleaner) onCleanNotify(tb *engine.TaskBasic) error {
}

// wg 不需要,只是为了保持clean的调用方式
var wg sync.WaitGroup
wg.Add(1)
c.clean(tb.ID, egn, &wg)
wg.Wait()
// var wg sync.WaitGroup
// wg.Add(1)
// c.clean(tb.ID, egn, &wg)
c.clean(tb.ID, egn)
// wg.Wait()

return nil
}
Expand Down
Loading

0 comments on commit 8fcea27

Please sign in to comment.