diff --git a/cmd/ctr-cli/diff/diff.go b/cmd/ctr-cli/diff/diff.go index 60a4727..0a1ca4e 100644 --- a/cmd/ctr-cli/diff/diff.go +++ b/cmd/ctr-cli/diff/diff.go @@ -51,6 +51,18 @@ func DimgCommand() *cli.Command { Value: false, Required: false, }, + &cli.IntFlag{ + Name: "threadNum", + Usage: "The number of threads to process", + Value: 1, + Required: false, + }, + &cli.StringFlag{ + Name: "threadSchedMode", + Usage: "Multithread scheduling mode", + Value: "none", + Required: false, + }, }, } @@ -58,11 +70,12 @@ func DimgCommand() *cli.Command { } func dimgAction(c *cli.Context) error { - logger.Logger.SetLevel(logrus.WarnLevel) oldDimg := c.String("oldDimg") newDimg := c.String("newDimg") outDimg := c.String("outDimg") mode := c.String("mode") + threadNum := c.Int("threadNum") + threadSchedMode := c.String("threadSchedMode") enableBench := c.Bool("benchmark") logger.WithFields(logrus.Fields{ "oldDimg": oldDimg, @@ -87,7 +100,11 @@ func dimgAction(c *cli.Context) error { start := time.Now() - err = image.GenerateDiffFromDimg(oldDimg, newDimg, outDimg, mode == ModeDiffBinary) + dc := image.DiffMultihreadConfig{ + ThreadNum: threadNum, + ScheduleMode: threadSchedMode, + } + err = image.GenerateDiffFromDimg(oldDimg, newDimg, outDimg, mode == ModeDiffBinary, dc) if err != nil { panic(err) } @@ -147,6 +164,18 @@ func CdimgCommand() *cli.Command { Value: false, Required: false, }, + &cli.IntFlag{ + Name: "threadNum", + Usage: "The number of threads to process", + Value: 1, + Required: false, + }, + &cli.StringFlag{ + Name: "threadSchedMode", + Usage: "Multithread scheduling mode", + Value: "none", + Required: false, + }, }, } @@ -154,12 +183,13 @@ func CdimgCommand() *cli.Command { } func cdimgAction(c *cli.Context) error { - logger.Logger.SetLevel(logrus.WarnLevel) oldCdimg := c.String("oldCdimg") newCdimg := c.String("newCdimg") outCdimg := c.String("outCdimg") mode := c.String("mode") enableBench := c.Bool("benchmark") + threadNum := c.Int("threadNum") + threadSchedMode := c.String("threadSchedMode") logger.WithFields(logrus.Fields{ "oldCdimg": oldCdimg, "newCdimg": newCdimg, @@ -183,7 +213,11 @@ func cdimgAction(c *cli.Context) error { start := time.Now() - err = image.GenerateDiffFromCdimg(oldCdimg, newCdimg, outCdimg, mode == ModeDiffBinary) + dc := image.DiffMultihreadConfig{ + ThreadNum: threadNum, + ScheduleMode: threadSchedMode, + } + err = image.GenerateDiffFromCdimg(oldCdimg, newCdimg, outCdimg, mode == ModeDiffBinary, dc) if err != nil { panic(err) } diff --git a/pkg/image/diff.go b/pkg/image/diff.go index da944b0..1806baa 100644 --- a/pkg/image/diff.go +++ b/pkg/image/diff.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "sync" "github.com/icedream/go-bsdiff" "github.com/naoki9911/fuse-diff-containerd/pkg/utils" @@ -19,7 +20,7 @@ func getFileSize(path string) (int, error) { return int(fileInfo.Size()), nil } -func GenerateDiffFromDimg(oldDimgPath, newDimgPath, diffDimgPath string, isBinaryDiff bool) error { +func GenerateDiffFromDimg(oldDimgPath, newDimgPath, diffDimgPath string, isBinaryDiff bool, dc DiffMultihreadConfig) error { oldDimg, err := OpenDimgFile(oldDimgPath) if err != nil { return err @@ -39,7 +40,7 @@ func GenerateDiffFromDimg(oldDimgPath, newDimgPath, diffDimgPath string, isBinar defer diffFile.Close() diffOut := bytes.Buffer{} - _, err = generateDiffFromDimg(oldDimg, newDimg, &oldDimg.Header().FileEntry, &newDimg.Header().FileEntry, &diffOut, isBinaryDiff) + err = generateDiffMultithread(oldDimg, newDimg, &oldDimg.Header().FileEntry, &newDimg.Header().FileEntry, &diffOut, isBinaryDiff, dc) if err != nil { return err } @@ -58,7 +59,7 @@ func GenerateDiffFromDimg(oldDimgPath, newDimgPath, diffDimgPath string, isBinar return nil } -func GenerateDiffFromCdimg(oldCdimgPath, newCdimgPath, diffCdimgPath string, isBinaryDiff bool) error { +func GenerateDiffFromCdimg(oldCdimgPath, newCdimgPath, diffCdimgPath string, isBinaryDiff bool, dc DiffMultihreadConfig) error { oldCdimg, err := OpenCdimgFile(oldCdimgPath) if err != nil { return err @@ -80,7 +81,7 @@ func GenerateDiffFromCdimg(oldCdimgPath, newCdimgPath, diffCdimgPath string, isB defer diffCdimg.Close() diffOut := bytes.Buffer{} - _, err = generateDiffFromDimg(oldDimg, newDimg, &oldDimg.Header().FileEntry, &newDimg.Header().FileEntry, &diffOut, isBinaryDiff) + err = generateDiffMultithread(oldDimg, newDimg, &oldDimg.Header().FileEntry, &newDimg.Header().FileEntry, &diffOut, isBinaryDiff, dc) if err != nil { return err } @@ -108,15 +109,192 @@ func GenerateDiffFromCdimg(oldCdimgPath, newCdimgPath, diffCdimgPath string, isB return nil } -// @return bool: is entirly new ? -func generateDiffFromDimg(oldDimgFile, newDimgFile *DimgFile, oldEntry, newEntry *FileEntry, diffBody *bytes.Buffer, isBinaryDiff bool) (bool, error) { +type diffTask struct { + oldEntry *FileEntry + newEntry *FileEntry + data []byte +} + +const ( + DIFF_MULTI_SCHED_NONE = "none" + DIFF_MULTI_SCHED_SIZE_ORDERED = "size-ordered" +) + +type DiffMultihreadConfig struct { + ThreadNum int + ScheduleMode string +} + +func (dc *DiffMultihreadConfig) Validate() error { + if dc.ThreadNum <= 0 { + return fmt.Errorf("invalid ThreadNum: %d", dc.ThreadNum) + } + + if dc.ScheduleMode != DIFF_MULTI_SCHED_NONE && dc.ScheduleMode != DIFF_MULTI_SCHED_SIZE_ORDERED { + return fmt.Errorf("invalid ScheduleMode: %s", dc.ScheduleMode) + } + return nil +} + +func generateDiffMultithread(oldDimgFile, newDimgFile *DimgFile, oldEntry, newEntry *FileEntry, diffBody *bytes.Buffer, isBinaryDiff bool, dc DiffMultihreadConfig) error { + diffTasks := make(chan diffTask, 1000) + writeTasks := make(chan diffTask, 1000) + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + logger.Info("started diff task enqueu thread") + err := enqueueDiffTaskToChannel(oldDimgFile, newDimgFile, oldEntry, newEntry, diffTasks) + if err != nil { + logger.Errorf("failed to enque: %v", err) + } + close(diffTasks) + logger.Info("finished diff task enqueu thread") + }() + + wg.Add(1) + go func() { + defer wg.Done() + logger.Info("started diff write thread") + for { + wt, more := <-writeTasks + if !more { + break + } + wt.newEntry.Offset = int64(diffBody.Len()) + _, err := diffBody.Write(wt.data) + if err != nil { + logger.Errorf("failed to write to diffBody: %v", err) + return + } + } + logger.Info("finished diff write thread") + }() + + compWg := sync.WaitGroup{} + for i := 0; i < dc.ThreadNum; i++ { + wg.Add(1) + compWg.Add(1) + go func(threadId int) { + logger.Infof("started diff thread idx=%d", threadId) + defer wg.Done() + defer compWg.Done() + for { + dt, more := <-diffTasks + if !more { + break + } + + if dt.oldEntry == nil { + dt.data = make([]byte, dt.newEntry.CompressedSize) + _, err := newDimgFile.ReadAt(dt.data, dt.newEntry.Offset) + if err != nil { + logger.Errorf("failed to read from newDimgFile at 0x%x: %v", dt.newEntry.Offset, err) + break + } + } else { + newCompressedBytes := make([]byte, dt.newEntry.CompressedSize) + _, err := newDimgFile.ReadAt(newCompressedBytes, dt.newEntry.Offset) + if err != nil { + logger.Errorf("failed to read from newDimgFile at 0x%x: %v", dt.newEntry.Offset, err) + break + } + newBytes, err := utils.DecompressWithZstd(newCompressedBytes) + if err != nil { + logger.Errorf("failed to decompress newBytes: %v", err) + break + } + + oldCompressedBytes := make([]byte, dt.oldEntry.CompressedSize) + _, err = oldDimgFile.ReadAt(oldCompressedBytes, dt.oldEntry.Offset) + if err != nil { + logger.Errorf("failed to read from oldDimgFile at 0x%x: %v", dt.oldEntry.Offset, err) + break + } + oldBytes, err := utils.DecompressWithZstd(oldCompressedBytes) + if err != nil { + logger.Errorf("failed to decompress oldBytes: %v", err) + break + } + isSame := bytes.Equal(newBytes, oldBytes) + if isSame { + dt.newEntry.Type = FILE_ENTRY_FILE_SAME + dt.newEntry.CompressedSize = 0 + continue + } + if len(oldBytes) > 0 && isBinaryDiff { + // old File may be 0-bytes + diffWriter := new(bytes.Buffer) + //fmt.Printf("oldBytes=%d newBytes=%d old=%v new=%v\n", len(oldBytes), len(newBytes), *oldChildEntry, *newChildEntry) + err = bsdiff.Diff(bytes.NewBuffer(oldBytes), bytes.NewBuffer(newBytes), diffWriter) + if err != nil { + logger.Errorf("failed to bsdiff.Diff: %v", err) + break + } + dt.newEntry.Type = FILE_ENTRY_FILE_DIFF + dt.newEntry.CompressedSize = int64(diffWriter.Len()) + dt.data = diffWriter.Bytes() + } else { + dt.newEntry.Type = FILE_ENTRY_FILE_NEW + dt.data = make([]byte, dt.newEntry.CompressedSize) + _, err := newDimgFile.ReadAt(dt.data, dt.newEntry.Offset) + if err != nil { + logger.Errorf("failed to read from newDimgFile at 0x%x: %v", dt.newEntry.Offset, err) + break + } + } + } + writeTasks <- dt + } + logger.Infof("finished diff thread idx=%d", threadId) + }(i) + } + + go func() { + compWg.Wait() + close(writeTasks) + logger.Infof("all diff tasks finished") + }() + + wg.Wait() + + logger.Info("started to update dir entry") + updateDirFileEntry(newEntry) + logger.Info("finished to update dir entry") + return nil +} + +// updates FileEntry.Type to FILE_ENTRY_DIR or FILE_ENTRY_DIR_NEW +func updateDirFileEntry(entry *FileEntry) { + if !entry.IsDir() { + return + } + entireNew := true + for _, childEntry := range entry.Childs { + if childEntry.IsDir() { + updateDirFileEntry(childEntry) + } + if !childEntry.IsNew() { + entireNew = false + } + } + + if entireNew { + entry.Type = FILE_ENTRY_DIR_NEW + } else { + entry.Type = FILE_ENTRY_DIR + } +} + +func enqueueDiffTaskToChannel(oldDimgFile, newDimgFile *DimgFile, oldEntry, newEntry *FileEntry, taskChan chan diffTask) error { for fName := range newEntry.Childs { newChildEntry := newEntry.Childs[fName] if newChildEntry.Type == FILE_ENTRY_FILE_SAME || newChildEntry.Type == FILE_ENTRY_FILE_DIFF { - return false, fmt.Errorf("invalid dimg") + return fmt.Errorf("invalid dimg") } if newChildEntry.Type == FILE_ENTRY_OPAQUE || @@ -128,20 +306,14 @@ func generateDiffFromDimg(oldDimgFile, newDimgFile *DimgFile, oldEntry, newEntry // newly created file or directory if oldEntry == nil { if newChildEntry.IsDir() { - _, err := generateDiffFromDimg(oldDimgFile, newDimgFile, nil, newChildEntry, diffBody, isBinaryDiff) + err := enqueueDiffTaskToChannel(oldDimgFile, newDimgFile, nil, newChildEntry, taskChan) if err != nil { - return false, err + return err } } else { - newBytes := make([]byte, newChildEntry.CompressedSize) - _, err := newDimgFile.ReadAt(newBytes, newChildEntry.Offset) - if err != nil { - return false, err - } - newChildEntry.Offset = int64(len(diffBody.Bytes())) - _, err = diffBody.Write(newBytes) - if err != nil { - return false, err + taskChan <- diffTask{ + oldEntry: nil, + newEntry: newChildEntry, } } @@ -155,20 +327,14 @@ func generateDiffFromDimg(oldDimgFile, newDimgFile *DimgFile, oldEntry, newEntry oldChildEntry.Name != newChildEntry.Name || oldChildEntry.Type != newChildEntry.Type { if newChildEntry.IsDir() { - _, err := generateDiffFromDimg(oldDimgFile, newDimgFile, nil, newChildEntry, diffBody, isBinaryDiff) + err := enqueueDiffTaskToChannel(oldDimgFile, newDimgFile, nil, newChildEntry, taskChan) if err != nil { - return false, err + return err } } else { - newBytes := make([]byte, newChildEntry.CompressedSize) - _, err := newDimgFile.ReadAt(newBytes, newChildEntry.Offset) - if err != nil { - return false, err - } - newChildEntry.Offset = int64(len(diffBody.Bytes())) - _, err = diffBody.Write(newBytes) - if err != nil { - return false, err + taskChan <- diffTask{ + oldEntry: nil, + newEntry: newChildEntry, } } @@ -177,79 +343,18 @@ func generateDiffFromDimg(oldDimgFile, newDimgFile *DimgFile, oldEntry, newEntry // if both new and old are directory, recursively generate diff if newChildEntry.IsDir() { - new, err := generateDiffFromDimg(oldDimgFile, newDimgFile, oldChildEntry, newChildEntry, diffBody, isBinaryDiff) + err := enqueueDiffTaskToChannel(oldDimgFile, newDimgFile, oldChildEntry, newChildEntry, taskChan) if err != nil { - return false, err - } - if !new { - entireNew = false + return err } continue } - newCompressedBytes := make([]byte, newChildEntry.CompressedSize) - _, err := newDimgFile.ReadAt(newCompressedBytes, newChildEntry.Offset) - if err != nil { - return false, err - } - newBytes, err := utils.DecompressWithZstd(newCompressedBytes) - if err != nil { - return false, err - } - - oldCompressedBytes := make([]byte, oldChildEntry.CompressedSize) - _, err = oldDimgFile.ReadAt(oldCompressedBytes, oldChildEntry.Offset) - if err != nil { - return false, err - } - oldBytes, err := utils.DecompressWithZstd(oldCompressedBytes) - if err != nil { - return false, err - } - isSame := bytes.Equal(newBytes, oldBytes) - if isSame { - entireNew = false - newChildEntry.Type = FILE_ENTRY_FILE_SAME - continue - } - - // old File may be 0-bytes - if len(oldBytes) > 0 && isBinaryDiff { - entireNew = false - diffWriter := new(bytes.Buffer) - //fmt.Printf("oldBytes=%d newBytes=%d old=%v new=%v\n", len(oldBytes), len(newBytes), *oldChildEntry, *newChildEntry) - err = bsdiff.Diff(bytes.NewBuffer(oldBytes), bytes.NewBuffer(newBytes), diffWriter) - if err != nil { - return false, err - } - newChildEntry.Offset = int64(len(diffBody.Bytes())) - newChildEntry.CompressedSize = int64(len(diffWriter.Bytes())) - _, err = diffBody.Write(diffWriter.Bytes()) - if err != nil { - return false, err - } - newChildEntry.Type = FILE_ENTRY_FILE_DIFF - } else { - newBytes := make([]byte, newChildEntry.CompressedSize) - _, err := newDimgFile.ReadAt(newBytes, newChildEntry.Offset) - if err != nil { - return false, err - } - newChildEntry.Offset = int64(len(diffBody.Bytes())) - _, err = diffBody.Write(newBytes) - if err != nil { - return false, err - } - newChildEntry.Type = FILE_ENTRY_FILE_NEW - } - } - if newEntry.IsDir() { - if entireNew { - newEntry.Type = FILE_ENTRY_DIR_NEW - } else { - newEntry.Type = FILE_ENTRY_DIR + taskChan <- diffTask{ + oldEntry: oldChildEntry, + newEntry: newChildEntry, } } - return entireNew, nil + return nil } diff --git a/tests/bench_cdimg_impl.sh b/tests/bench_cdimg_impl.sh index 6949a41..4c7200a 100755 --- a/tests/bench_cdimg_impl.sh +++ b/tests/bench_cdimg_impl.sh @@ -51,7 +51,7 @@ for ((i=0; i < $(expr ${#IMAGE_VERSIONS[@]} - 1); i++));do for ((j=0; j < $RUN_NUM; j++));do NOW_COUNT=$(expr $j + 1) echo "Benchmark diff $DIFF_NAME binary-diff ($NOW_COUNT/$RUN_NUM)" - $BIN_CTR_CLI cdimg diff --oldCdimg=./$LOWER.cdimg --newCdimg=./$UPPER.cdimg --outCdimg=./diff_$DIFF_NAME.cdimg --mode=binary-diff --benchmark + $BIN_CTR_CLI cdimg diff --oldCdimg=./$LOWER.cdimg --newCdimg=./$UPPER.cdimg --outCdimg=./diff_$DIFF_NAME.cdimg --mode=binary-diff --benchmark --threadNum $THREAD_NUM done # packing diff data @@ -81,7 +81,7 @@ for ((i=0; i < $(expr ${#IMAGE_VERSIONS[@]} - 1); i++));do for ((j=0; j < $RUN_NUM; j++));do NOW_COUNT=$(expr $j + 1) echo "Benchmark diff $DIFF_NAME file-diff ($NOW_COUNT/$RUN_NUM)" - $BIN_CTR_CLI cdimg diff --oldCdimg=./$LOWER.cdimg --newCdimg=./$UPPER.cdimg --outCdimg=./diff_file_$DIFF_NAME.cdimg --mode=file-diff --benchmark + $BIN_CTR_CLI cdimg diff --oldCdimg=./$LOWER.cdimg --newCdimg=./$UPPER.cdimg --outCdimg=./diff_file_$DIFF_NAME.cdimg --mode=file-diff --benchmark --threadNum $THREAD_NUM done # packing diff data and test it @@ -111,14 +111,14 @@ fusermount3 -u /tmp/fuse for ((j=0; j < $RUN_NUM; j++));do NOW_COUNT=$(expr $j + 1) echo "Benchmark regen-diff $MERGED binary-diff ($NOW_COUNT/$RUN_NUM)" - $BIN_CTR_CLI cdimg diff --oldCdimg=./$IMAGE_LOWER.cdimg --newCdimg=./$IMAGE_UPPER.cdimg --outCdimg=./diff_$MERGED.cdimg --mode=binary-diff --benchmark + $BIN_CTR_CLI cdimg diff --oldCdimg=./$IMAGE_LOWER.cdimg --newCdimg=./$IMAGE_UPPER.cdimg --outCdimg=./diff_$MERGED.cdimg --mode=binary-diff --benchmark --threadNum $THREAD_NUM done ls -l diff_$MERGED.cdimg for ((j=0; j < $RUN_NUM; j++));do NOW_COUNT=$(expr $j + 1) echo "Benchmark regen-diff $MERGED file-diff ($NOW_COUNT/$RUN_NUM)" - $BIN_CTR_CLI cdimg diff --oldCdimg=./$IMAGE_LOWER.cdimg --newCdimg=./$IMAGE_UPPER.cdimg --outCdimg=./diff_file_$MERGED.cdimg --mode=file-diff --benchmark + $BIN_CTR_CLI cdimg diff --oldCdimg=./$IMAGE_LOWER.cdimg --newCdimg=./$IMAGE_UPPER.cdimg --outCdimg=./diff_file_$MERGED.cdimg --mode=file-diff --benchmark --threadNum $THREAD_NUM done ls -l diff_file_$MERGED.cdimg diff --git a/tests/bench_impl.sh b/tests/bench_impl.sh index 1b1132f..81197ad 100755 --- a/tests/bench_impl.sh +++ b/tests/bench_impl.sh @@ -49,7 +49,7 @@ for ((i=0; i < $(expr ${#IMAGE_VERSIONS[@]} - 1); i++));do for ((j=0; j < $RUN_NUM; j++));do NOW_COUNT=$(expr $j + 1) echo "Benchmark diff $DIFF_NAME binary-diff ($NOW_COUNT/$RUN_NUM)" - $BIN_CTR_CLI dimg diff --oldDimg=./$LOWER.dimg --newDimg=./$UPPER.dimg --outDimg=./diff_$DIFF_NAME.dimg --mode=binary-diff --benchmark + $BIN_CTR_CLI dimg diff --oldDimg=./$LOWER.dimg --newDimg=./$UPPER.dimg --outDimg=./diff_$DIFF_NAME.dimg --mode=binary-diff --benchmark --threadNum $THREAD_NUM done # packing diff data @@ -79,7 +79,7 @@ for ((i=0; i < $(expr ${#IMAGE_VERSIONS[@]} - 1); i++));do for ((j=0; j < $RUN_NUM; j++));do NOW_COUNT=$(expr $j + 1) echo "Benchmark diff $DIFF_NAME file-diff ($NOW_COUNT/$RUN_NUM)" - $BIN_CTR_CLI dimg diff --oldDimg=./$LOWER.dimg --newDimg=./$UPPER.dimg --outDimg=./diff_file_$DIFF_NAME.dimg --mode=file-diff --benchmark + $BIN_CTR_CLI dimg diff --oldDimg=./$LOWER.dimg --newDimg=./$UPPER.dimg --outDimg=./diff_file_$DIFF_NAME.dimg --mode=file-diff --benchmark --threadNum $THREAD_NUM done # packing diff data and test it @@ -109,14 +109,14 @@ fusermount3 -u /tmp/fuse for ((j=0; j < $RUN_NUM; j++));do NOW_COUNT=$(expr $j + 1) echo "Benchmark regen-diff $MERGED binary-diff ($NOW_COUNT/$RUN_NUM)" - $BIN_CTR_CLI dimg diff --oldDimg=./$IMAGE_LOWER.dimg --newDimg=./$IMAGE_UPPER.dimg --outDimg=./diff_$MERGED.dimg --mode=binary-diff --benchmark + $BIN_CTR_CLI dimg diff --oldDimg=./$IMAGE_LOWER.dimg --newDimg=./$IMAGE_UPPER.dimg --outDimg=./diff_$MERGED.dimg --mode=binary-diff --benchmark --threadNum $THREAD_NUM done ls -l diff_$MERGED.dimg for ((j=0; j < $RUN_NUM; j++));do NOW_COUNT=$(expr $j + 1) echo "Benchmark regen-diff $MERGED file-diff ($NOW_COUNT/$RUN_NUM)" - $BIN_CTR_CLI dimg diff --oldDimg=./$IMAGE_LOWER.dimg --newDimg=./$IMAGE_UPPER.dimg --outDimg=./diff_file_$MERGED.dimg --mode=file-diff --benchmark + $BIN_CTR_CLI dimg diff --oldDimg=./$IMAGE_LOWER.dimg --newDimg=./$IMAGE_UPPER.dimg --outDimg=./diff_file_$MERGED.dimg --mode=file-diff --benchmark --threadNum $THREAD_NUM done ls -l diff_file_$MERGED.dimg