Skip to content

Commit

Permalink
fix progress bars
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Feb 20, 2024
1 parent e505436 commit 5fb09e5
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 74 deletions.
33 changes: 33 additions & 0 deletions bar.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,39 @@ import (
"github.com/vbauerster/mpb/v8"
)

type ProxiedReader struct {
io.Reader
bar, total *mpb.Bar
lastRead time.Time
lastInc *atomic.Int64
}

func ProxyReader(r io.Reader, bar, total *mpb.Bar, lastInc *atomic.Int64) *ProxiedReader {
return &ProxiedReader{
Reader: r,
bar: bar,
total: total,
lastInc: lastInc,
}
}

func (p *ProxiedReader) Read(buf []byte) (n int, err error) {
start := p.lastRead
if start.IsZero() {
start = time.Now()
}
n, err = p.Reader.Read(buf)
end := time.Now()
p.lastRead = end
used := end.Sub(start)

p.bar.EwmaIncrBy(n, used)
nowSt := end.UnixNano()
last := p.lastInc.Swap(nowSt)
p.total.EwmaIncrBy(n, (time.Duration)(nowSt-last)*time.Nanosecond)
return
}

type ProxiedReadSeeker struct {
io.ReadSeeker
bar, total *mpb.Bar
Expand Down
204 changes: 148 additions & 56 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,17 @@ type Cluster struct {
fileset map[string]int64

client *http.Client
bufSlots chan []byte
bufSlots chan slotInfo

handlerAPIv0 http.Handler
handlerAPIv1 http.Handler
}

type slotInfo struct {
id int
buf []byte
}

func NewCluster(
ctx context.Context,
prefix string,
Expand Down Expand Up @@ -122,9 +127,12 @@ func NewCluster(
}
close(cr.disabled)

cr.bufSlots = make(chan []byte, cr.maxConn)
cr.bufSlots = make(chan slotInfo, cr.maxConn)
for i := 0; i < cr.maxConn; i++ {
cr.bufSlots <- make([]byte, 1024*512)
cr.bufSlots <- slotInfo{
id: i,
buf: make([]byte, 1024*512),
}
}

{
Expand Down Expand Up @@ -157,6 +165,17 @@ func (cr *Cluster) Init(ctx context.Context) error {
return nil
}

func (cr *Cluster) allocBuf(ctx context.Context) (slotId int, buf []byte, free func()) {
select {
case slot := <-cr.bufSlots:
return slot.id, slot.buf, func() {
cr.bufSlots <- slot
}
case <-ctx.Done():
return 0, nil, nil
}
}

func (cr *Cluster) Connect(ctx context.Context) bool {
cr.mux.Lock()
defer cr.mux.Unlock()
Expand Down Expand Up @@ -552,10 +571,13 @@ func (cr *Cluster) GetConfig(ctx context.Context) (cfg *OpenbmclapiAgentConfig,
}

type syncStats struct {
totalsize float64
downloaded float64
fcount atomic.Int32
fl int
totalSize int64
okCount, failCount atomic.Int32
totalFiles int

pg *mpb.Progress
totalBar *mpb.Bar
lastInc atomic.Int64
}

func (cr *Cluster) SyncFiles(ctx context.Context, files []FileInfo, heavyCheck bool) {
Expand Down Expand Up @@ -601,17 +623,28 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m

logInfof("Start checking files for %s, heavy = %v", storage.String(), heavy)

pb := mpb.New(mpb.WithAutoRefresh())
setLogOutput(pb)
pg := mpb.New(mpb.WithAutoRefresh())
setLogOutput(pg)
defer setLogOutput(nil)

bar := pb.AddBar((int64)(len(files)),
var (
checkingHashMux sync.RWMutex
checkingHash string
)

bar := pg.AddBar((int64)(len(files)),
mpb.BarRemoveOnComplete(),
mpb.PrependDecorators(
decor.Name("Checking "+storage.String()),
decor.Any(func(decor.Statistics) string {
checkingHashMux.RLock()
defer checkingHashMux.RUnlock()
return "/" + checkingHash
}),
),
mpb.AppendDecorators(
decor.CountersNoUnit("%.d / %.d", decor.WCSyncSpaceR),
decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR),
decor.NewPercentage("%d"),
),
)

Expand All @@ -624,16 +657,17 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m
var buf [1024 * 32]byte
for _, f := range files {
hash := f.Hash
// logDebugf("Checking file %s [%.2f%%]", hash, (float32)(i+1)/(float32)(len(files))*100)
checkingHashMux.Lock()
checkingHash = hash
checkingHashMux.Unlock()
bar.Increment()
if f.Size == 0 {
logDebugf("Skipped empty file %s", hash)
continue
}
if size, ok := sizeMap[hash]; ok {
if size != f.Size {
logInfof("Found modified file: size of %q is %s, expect %s",
hash, bytesToUnit((float64)(size)), bytesToUnit((float64)(f.Size)))
logInfof("Found modified file: size of %q is %d, expect %d", hash, size, f.Size)
goto MISSING
}
if heavy {
Expand Down Expand Up @@ -666,7 +700,7 @@ func (cr *Cluster) checkFileFor(storage Storage, files []FileInfo, heavy bool, m
MISSING:
addMissing(f)
}
pb.Wait()
pg.Wait()
logInfof("File check finished for %s", storage.String())
return
}
Expand All @@ -677,8 +711,8 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b
cr.checkFileFor(s, files, heavyCheck, missingMap)
}

fl := len(missingMap.m)
if fl == 0 {
totalFiles := len(missingMap.m)
if totalFiles == 0 {
logInfo("All files was synchronized")
return nil
}
Expand All @@ -689,12 +723,38 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b
}

var stats syncStats
stats.fl = fl
stats.totalFiles = totalFiles
for _, f := range missing {
stats.totalsize += (float64)(f.Size)
stats.totalSize += f.Size
}

logInfof("Starting sync files, count: %d, total: %s", fl, bytesToUnit(stats.totalsize))
pg := mpb.New(mpb.WithAutoRefresh())
stats.pg = pg
setLogOutput(pg)
defer setLogOutput(nil)

var barUnit decor.SizeB1024
stats.lastInc.Store(time.Now().UnixNano())
stats.totalBar = pg.AddBar(stats.totalSize,
mpb.BarRemoveOnComplete(),
mpb.BarPriority(cap(cr.bufSlots)),
mpb.PrependDecorators(
decor.Name("Total: "),
decor.NewPercentage("%.2f"),
),
mpb.AppendDecorators(
decor.Any(func(decor.Statistics) string {
return fmt.Sprintf("(%d + %d / %d) ", stats.okCount.Load(), stats.failCount.Load(), stats.totalFiles)
}),
decor.Counters(barUnit, "(%.1f/%.1f) "),
decor.EwmaSpeed(barUnit, "%.1f ", 30),
decor.OnComplete(
decor.EwmaETA(decor.ET_STYLE_GO, 30), "done",
),
),
)

logInfof("Starting sync files, count: %d, total: %s", totalFiles, bytesToUnit((float64)(stats.totalSize)))
start := time.Now()

done := make(chan struct{}, 1)
Expand All @@ -718,10 +778,12 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b
if path != "" {
defer os.Remove(path)
// acquire slot here
buf := <-cr.bufSlots
defer func() {
cr.bufSlots <- buf
}()
slotId, buf, free := cr.allocBuf(ctx)
if buf == nil {
return
}
defer free()
_ = slotId
var srcFd *os.File
if srcFd, err = os.Open(path); err != nil {
return
Expand Down Expand Up @@ -754,7 +816,9 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b
}

use := time.Since(start)
logInfof("All files was synchronized, use time: %v, %s/s", use, bytesToUnit(stats.totalsize/use.Seconds()))
pg.Wait()

logInfof("All files was synchronized, use time: %v, %s/s", use, bytesToUnit((float64)(stats.totalSize)/use.Seconds()))
return nil
}

Expand Down Expand Up @@ -788,54 +852,77 @@ func (cr *Cluster) gcFor(s Storage) {
}

func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) (<-chan string, error) {
var buf []byte
WAIT_SLOT:
for {
select {
case buf = <-cr.bufSlots:
break WAIT_SLOT
case <-ctx.Done():
return nil, ctx.Err()
}
const maxRetryCount = 5

slotId, buf, free := cr.allocBuf(ctx)
if buf == nil {
return nil, ctx.Err()
}

pathRes := make(chan string, 1)
go func() {
defer func() {
cr.bufSlots <- buf
}()
defer free()
defer close(pathRes)

for trycount := 1; trycount <= 3; trycount++ {
logInfof("Downloading: %s [%s]", f.Path, bytesToUnit((float64)(f.Size)))
var barUnit decor.SizeB1024
var trycount atomic.Int32
trycount.Store(1)
bar := stats.pg.AddBar(f.Size,
mpb.BarRemoveOnComplete(),
mpb.BarPriority(slotId),
mpb.PrependDecorators(
decor.Name("> Downloading "),
decor.Any(func(decor.Statistics) string {
tc := trycount.Load()
if tc <= 1 {
return ""
}
return fmt.Sprintf("(%d/%d) ", tc, maxRetryCount)
}),
decor.Name(f.Path, decor.WCSyncSpaceR),
),
mpb.AppendDecorators(
decor.NewPercentage("%d", decor.WCSyncSpace),
decor.Counters(barUnit, "[%.1f / %.1f]", decor.WCSyncSpace),
decor.EwmaSpeed(barUnit, "%.1f", 10, decor.WCSyncSpace),
decor.OnComplete(
decor.EwmaETA(decor.ET_STYLE_GO, 10, decor.WCSyncSpace), "done",
),
),
)
defer bar.Abort(true)

interval := time.Second
for {
bar.SetCurrent(0)
hashMethod, err := getHashMethod(len(f.Hash))
if err == nil {
var path string
if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf); err == nil {
if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, func(r io.Reader) io.Reader {
return ProxyReader(r, bar, stats.totalBar, &stats.lastInc)
}); err == nil {
pathRes <- path
stats.downloaded += (float64)(f.Size)
stats.fcount.Add(1)
logInfof("Downloaded: %s [%s/%s ; %d/%d] %.2f%%", f.Path,
bytesToUnit(stats.downloaded), bytesToUnit(stats.totalsize),
stats.fcount.Load(), stats.fl,
stats.downloaded/stats.totalsize*100)
stats.okCount.Add(1)
logInfof("Downloaded %s [%s] %.2f%%", f.Path,
bytesToUnit((float64)(f.Size)),
(float64)(stats.totalBar.Current())/(float64)(stats.totalSize)*100)
return
}
}
bar.SetRefill(bar.Current())

logErrorf("File download error: %s [%s/%s ; %d/%d] %.2f%%\n\t%s",
f.Path,
bytesToUnit(stats.downloaded), bytesToUnit(stats.totalsize),
stats.fcount.Load(), stats.fl,
stats.downloaded/stats.totalsize*100,
err)
logErrorf("Download error %s:\n\t%s", f.Path, err)
if trycount.Add(1) > maxRetryCount {
break
}
select {
case <-time.After(time.Second):
case <-time.After(interval):
interval *= 2
case <-ctx.Done():
return
}
}
stats.fcount.Add(1)
stats.failCount.Add(1)
}()
return pathRes, nil
}
Expand All @@ -844,7 +931,9 @@ var noOpenQuery = url.Values{
"noopen": {"1"},
}

func (cr *Cluster) fetchFileWithBuf(ctx context.Context, f FileInfo, hashMethod crypto.Hash, buf []byte) (path string, err error) {
func (cr *Cluster) fetchFileWithBuf(ctx context.Context, f FileInfo,
hashMethod crypto.Hash, buf []byte,
wrapper func(io.Reader) io.Reader) (path string, err error) {
var (
query url.Values = nil
req *http.Request
Expand Down Expand Up @@ -885,6 +974,9 @@ func (cr *Cluster) fetchFileWithBuf(ctx context.Context, f FileInfo, hashMethod
err = fmt.Errorf("Unexpected Content-Encoding %q", ce)
return
}
if wrapper != nil {
r = wrapper(r)
}

hw := hashMethod.New()

Expand Down Expand Up @@ -960,7 +1052,7 @@ func (cr *Cluster) DownloadFile(ctx context.Context, hash string) (err error) {
done <- err
}()

path, err := cr.fetchFileWithBuf(ctx, f, hashMethod, buf)
path, err := cr.fetchFileWithBuf(ctx, f, hashMethod, buf, nil)
if err != nil {
return
}
Expand Down
Loading

0 comments on commit 5fb09e5

Please sign in to comment.