From 69d6ab05346948d405978fd136f2cbe7fee87044 Mon Sep 17 00:00:00 2001 From: zyxkad Date: Sat, 8 Jun 2024 08:46:39 -0600 Subject: [PATCH] fix sync logic --- handler.go | 2 +- lang/en/us.go | 27 ++++++++++++++------------- lang/zh/cn.go | 27 ++++++++++++++------------- sync.go | 37 ++++++++++++++++++++++--------------- 4 files changed, 51 insertions(+), 42 deletions(-) diff --git a/handler.go b/handler.go index d80f2b0e..baf45d26 100644 --- a/handler.go +++ b/handler.go @@ -212,7 +212,7 @@ func (cr *Cluster) getRecordMiddleWare() utils.MiddleWareFunc { case <-updateTicker.C: cr.stats.Lock() - log.Infof("Served %d requests, total responsed body = %s, total used CPU time = %.2fs", + log.Infof("Served %d requests, total responsed body = %s, total IO waiting time = %.2fs", total, utils.BytesToUnit(totalBytes), totalUsed) for ua, v := range uas { if ua == "" { diff --git a/lang/en/us.go b/lang/en/us.go index 13f50700..33309a63 100644 --- a/lang/en/us.go +++ b/lang/en/us.go @@ -58,19 +58,20 @@ var areaUS = map[string]string{ "error.check.open.failed": "Cannot open %q: %v", "error.check.hash.failed": "Cannot calculate hash for %s: %v", - "info.sync.prepare": "Preparing to sync files, length of filelist is %d ...", - "hint.sync.start": "Starting sync files, count: %d, bytes: %s", - "hint.sync.done": "All files were synchronized, use time: %v, %s/s", - "error.sync.failed": "File sync failed: %v", - "info.sync.none": "All files were synchronized", - "warn.sync.interrupted": "File sync interrupted", - "info.sync.config": "Sync config: %#v", - "hint.sync.total": "Total: ", - "hint.sync.downloading": "> Downloading ", - "hint.sync.downloading.handler": "Downloading %s from handler", - "info.sync.downloaded": "Downloaded %s [%s] %.2f%%", - "error.sync.download.failed": "Download error %s:\n\t%s", - "error.sync.create.failed": "Cannot create %s/%s: %v", + "info.sync.prepare": "Preparing to sync files, length of filelist is %d ...", + "hint.sync.start": "Starting sync files, count: %d, bytes: %s", + "hint.sync.done": "All files were synchronized, use time: %v, %s/s", + "error.sync.failed": "File sync failed: %v", + "info.sync.none": "All files were synchronized", + "warn.sync.interrupted": "File sync interrupted", + "info.sync.config": "Sync config: %#v", + "hint.sync.total": "Total: ", + "hint.sync.downloading": "> Downloading ", + "hint.sync.downloading.handler": "Downloading %s from handler", + "info.sync.downloaded": "Downloaded %s [%s] %.2f%%", + "error.sync.download.failed": "Download error %s:\n\t%s", + "error.sync.download.failed.retry": "Download error %s, retry after %v:\n\t%s", + "error.sync.create.failed": "Cannot create %s/%s: %v", "info.gc.start": "Starting garbage collector for %s", "info.gc.done": "Garbage collect finished for %s", diff --git a/lang/zh/cn.go b/lang/zh/cn.go index 6e21f433..a016356d 100644 --- a/lang/zh/cn.go +++ b/lang/zh/cn.go @@ -58,19 +58,20 @@ var areaCN = map[string]string{ "error.check.open.failed": "无法打开 %q: %v", "error.check.hash.failed": "无法为 %s 计算哈希值: %v", - "info.sync.prepare": "准备同步中, 文件列表长度为 %d ...", - "hint.sync.start": "开始同步, 总计: %d, 字节: %s", - "hint.sync.done": "文件同步完成, 用时: %v, %s/s", - "error.sync.failed": "文件同步失败: %v", - "info.sync.none": "所有文件已同步", - "warn.sync.interrupted": "同步已中断", - "info.sync.config": "同步配置: %#v", - "hint.sync.total": "总计: ", - "hint.sync.downloading": "> 下载中 ", - "hint.sync.downloading.handler": "Downloading %s from handler", - "info.sync.downloaded": "已下载 %s [%s] %.2f%%", - "error.sync.download.failed": "下载失败 %s:\n\t%s", - "error.sync.create.failed": "无法创建 %s/%s: %v", + "info.sync.prepare": "准备同步中, 文件列表长度为 %d ...", + "hint.sync.start": "开始同步, 总计: %d, 字节: %s", + "hint.sync.done": "文件同步完成, 用时: %v, %s/s", + "error.sync.failed": "文件同步失败: %v", + "info.sync.none": "所有文件已同步", + "warn.sync.interrupted": "同步已中断", + "info.sync.config": "同步配置: %#v", + "hint.sync.total": "总计: ", + "hint.sync.downloading": "> 下载中 ", + "hint.sync.downloading.handler": "Downloading %s from handler", + "info.sync.downloaded": "已下载 %s [%s] %.2f%%", + "error.sync.download.failed": "下载失败 %s:\n\t%s", + "error.sync.download.failed.retry": "下载失败 %s, %v 后重新尝试:\n\t%s", + "error.sync.create.failed": "无法创建 %s/%s: %v", "info.gc.start": "正在清理 %s", "info.gc.done": "已清理 %s", diff --git a/sync.go b/sync.go index 1b95dc12..6d3e3185 100644 --- a/sync.go +++ b/sync.go @@ -267,7 +267,7 @@ func (cr *Cluster) checkFileFor( mpb.AppendDecorators( decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), decor.NewPercentage("%d", decor.WCSyncSpaceR), - decor.EwmaETA(decor.ET_STYLE_GO, 30), + decor.EwmaETA(decor.ET_STYLE_GO, 60), ), mpb.BarExtender((mpb.BarFillerFunc)(func(w io.Writer, _ decor.Statistics) (err error) { if checkingHashMux.TryLock() { @@ -489,7 +489,7 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b var stats syncStats stats.pg = pg stats.noOpen = syncCfg.Source == "center" - stats.slots = limited.NewBufSlots(syncCfg.Concurrency) + stats.slots = limited.NewBufSlots(syncCfg.Concurrency + 1) stats.totalFiles = totalFiles for _, f := range missing { stats.totalSize += f.Size @@ -669,15 +669,15 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) defer close(pathRes) var barUnit decor.SizeB1024 - var trycount atomic.Int32 - trycount.Store(1) + var tried atomic.Int32 + tried.Store(1) bar := stats.pg.AddBar(f.Size, mpb.BarRemoveOnComplete(), mpb.BarPriority(slotId), mpb.PrependDecorators( decor.Name(Tr("hint.sync.downloading")), decor.Any(func(decor.Statistics) string { - tc := trycount.Load() + tc := tried.Load() if tc <= 1 { return "" } @@ -688,22 +688,23 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) mpb.AppendDecorators( decor.NewPercentage("%d", decor.WCSyncSpace), decor.Counters(barUnit, "[%.1f / %.1f]", decor.WCSyncSpace), - decor.EwmaSpeed(barUnit, "%.1f", 10, decor.WCSyncSpace), + decor.EwmaSpeed(barUnit, "%.1f", 30, decor.WCSyncSpace), decor.OnComplete( - decor.EwmaETA(decor.ET_STYLE_GO, 10, decor.WCSyncSpace), "done", + decor.EwmaETA(decor.ET_STYLE_GO, 30, decor.WCSyncSpace), "done", ), ), ) defer bar.Abort(true) noOpen := stats.noOpen + badOpen := false interval := time.Second for { bar.SetCurrent(0) hashMethod, err := getHashMethod(len(f.Hash)) if err == nil { var path string - if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, noOpen, func(r io.Reader) io.Reader { + if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, noOpen, badOpen, func(r io.Reader) io.Reader { return ProxyReader(r, bar, stats.totalBar, &stats.lastInc) }); err == nil { pathRes <- path @@ -716,14 +717,15 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) } bar.SetRefill(bar.Current()) - log.Errorf(Tr("error.sync.download.failed"), f.Path, err) - c := trycount.Add(1) + c := tried.Add(1) if c > maxRetryCount { + log.Errorf(Tr("error.sync.download.failed"), f.Path, err) break } if c > maxTryWithOpen { - noOpen = true + badOpen = true } + log.Errorf(Tr("error.sync.download.failed.retry"), f.Path, interval, err) select { case <-time.After(interval): interval *= 2 @@ -739,20 +741,25 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) func (cr *Cluster) fetchFileWithBuf( ctx context.Context, f FileInfo, hashMethod crypto.Hash, buf []byte, - noOpen bool, + noOpen bool, badOpen bool, wrapper func(io.Reader) io.Reader, ) (path string, err error) { var ( reqPath = f.Path + query url.Values req *http.Request res *http.Response fd *os.File r io.Reader ) - if noOpen { + if badOpen { reqPath = "/openbmclapi/download/" + f.Hash + } else if noOpen { + query = url.Values{ + "noopen": {"1"}, + } } - if req, err = cr.makeReqWithAuth(ctx, http.MethodGet, reqPath, nil); err != nil { + if req, err = cr.makeReqWithAuth(ctx, http.MethodGet, reqPath, query); err != nil { return } req.Header.Set("Accept-Encoding", "gzip, deflate") @@ -904,7 +911,7 @@ func (cr *Cluster) DownloadFile(ctx context.Context, hash string) (err error) { } defer free() - path, err := cr.fetchFileWithBuf(ctx, f, hashMethod, buf, true, nil) + path, err := cr.fetchFileWithBuf(ctx, f, hashMethod, buf, true, true, nil) if err != nil { return }