From 94b4c2343ec4a1f170ca6fc1556603605f7ef68f Mon Sep 17 00:00:00 2001 From: yanafu Date: Mon, 4 Nov 2024 19:53:13 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20worker=E9=80=89=E5=8F=96=E5=8F=8A?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6=E4=BC=98=E5=8C=96=20#311?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/pkg/manager/remote/mgr.go | 9 ++---- .../pkg/manager/remote/slotbyworkeroffer.go | 30 ----------------- .../controller/pkg/manager/remote/slots.go | 32 ------------------- 3 files changed, 3 insertions(+), 68 deletions(-) diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go index cf4535a8..5239775b 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/mgr.go @@ -39,8 +39,6 @@ const ( corkMaxSize = 1024 * 1024 * 10 // corkMaxSize = 1024 * 1024 * 1024 largeFileSize = 1024 * 1024 * 100 // 100MB - - //fileMaxFailCount = 5 ) // NewMgr get a new Remote Mgr @@ -1070,8 +1068,7 @@ func (m *Mgr) ensureFiles( if v.info.SendStatus == types.FileSendSucceed { wg <- nil continue - } else if v.info.SendStatus == types.FileSendFailed || - v.info.SendStatus == types.FileSendUnknown { + } else if v.info.SendStatus == types.FileSendFailed { wg <- types.ErrSendFileFailed continue } @@ -1436,7 +1433,7 @@ type matchResult struct { // checkOrLockCorkFiles 批量检查目标file的sendStatus, 如果已经被发送, 则返回当前状态和true; 如果没有被发送过, 则将其置于sending, 并返回false func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []matchResult { - if len(descs) == 0 || !descs[0].Retry { // 普通的文件 + if len(descs) == 0 || !descs[0].Retry { // 第一次发送的文件 m.fileSendMutex.Lock() target, ok := m.fileSendMap[server] if !ok { @@ -1446,7 +1443,7 @@ func (m *Mgr) checkOrLockCorkFiles(server string, descs []*dcSDK.FileDesc) []mat m.fileSendMutex.Unlock() return target.matchOrInserts(descs) - } else { // 失败的文件 + } else { // 失败重试的文件 m.fileSendMutex.Lock() target, ok := m.failFileSendMap[server] if !ok { diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go index bd3b545d..17b94f80 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slotbyworkeroffer.go @@ -282,36 +282,6 @@ func (wo *workerOffer) EnableWorker(host *dcProtocol.Host) { blog.Infof("remote slot: total slot:%d after enable host:%v", wo.validWorkerNum, *host) } -/*func (wo *workerOffer) CanWorkerRetry(host *dcProtocol.Host) bool { - if host == nil { - return false - } - - wo.workerLock.Lock() - defer wo.workerLock.Unlock() - - for _, wk := range wo.worker { - if !wk.host.Equal(host) { - continue - } - - if wk.dead { - blog.Infof("remote slot: host:%v is already dead,do nothing now", host) - return false - } - - if wk.status == Retrying { - blog.Infof("remote slot: host:%v is retrying,do nothing now", host) - return true - } - blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying) - wk.status = Retrying - return false - } - - return false -}*/ - func (wo *workerOffer) SetWorkerStatus(host *dcProtocol.Host, s Status) { wo.workerLock.Lock() defer wo.workerLock.Unlock() diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go index 1e309d32..6273946e 100644 --- a/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go +++ b/src/backend/booster/bk_dist/controller/pkg/manager/remote/slots.go @@ -28,7 +28,6 @@ type RemoteSlotMgr interface { RecoverDeadWorker(w *worker) DisableWorker(host *dcProtocol.Host) EnableWorker(host *dcProtocol.Host) - //CanWorkerRetry(host *dcProtocol.Host) bool // check if worker can retry, if can set worker status to retrying SetWorkerStatus(host *dcProtocol.Host, status Status) Lock(usage dcSDK.JobUsage, f string, banWorkerList []*dcProtocol.Host) *dcProtocol.Host Unlock(usage dcSDK.JobUsage, host *dcProtocol.Host) @@ -299,37 +298,6 @@ func (wr *resource) EnableWorker(host *dcProtocol.Host) { blog.Infof("remote slot: total slot:%d after enable host:%v", wr.totalSlots, *host) } -/*func (wr *resource) CanWorkerRetry(host *dcProtocol.Host) bool { - if host == nil { - return false - } - - wr.workerLock.Lock() - defer wr.workerLock.Unlock() - for _, wk := range wr.worker { - if !wk.host.Equal(host) { - continue - } - - if wk.dead { - blog.Infof("remote slot: host:%v is already dead, do nothing now", host) - return false - } - if !wk.disabled { - return false - } - if wk.status == Retrying { - blog.Infof("remote slot: host:%v is retrying, do nothing now", host) - return false - } - blog.Info("remote slot: host:%v can retry, change worker from %s to %s", host, wk.status, Retrying) - wk.status = Retrying - return true - } - - return false -}*/ - func (wr *resource) SetWorkerStatus(host *dcProtocol.Host, s Status) { wr.workerLock.Lock() defer wr.workerLock.Unlock()