From 80112b5a9c3c8d0cc4f36a658622aef176f4615e Mon Sep 17 00:00:00 2001 From: Qiu Jian Date: Sun, 26 Jan 2025 12:49:32 +0800 Subject: [PATCH] fix: image support s3 parallel --- cmd/climc/shell/compute/hosts.go | 2 +- go.mod | 4 +- go.sum | 8 +- .../storageman/diskhandlers/diskhandler.go | 1 - pkg/image/drivers/s3/minio.go | 19 +- pkg/image/models/images.go | 59 +-- pkg/image/models/storage.go | 2 +- pkg/image/options/options.go | 3 + pkg/image/service/service.go | 1 + vendor/modules.txt | 4 +- .../cloudmux/pkg/cloudprovider/objectstore.go | 455 ++++++++++++++++-- .../pkg/multicloud/objectstore/shell.go | 59 ++- vendor/yunion.io/x/pkg/util/qemuimgfmt/fmt.go | 5 +- 13 files changed, 497 insertions(+), 125 deletions(-) diff --git a/cmd/climc/shell/compute/hosts.go b/cmd/climc/shell/compute/hosts.go index d8ebc3958b3..3c0be87d7d0 100644 --- a/cmd/climc/shell/compute/hosts.go +++ b/cmd/climc/shell/compute/hosts.go @@ -349,7 +349,7 @@ func init() { ID string `help:"ID or name of host"` IMAGE string `help:"ID or name of image"` Force bool `help:"Force refresh cache, even if the image exists in cache"` - Format string `help:"image format" choices:"iso|vmdk|qcow2|vhd"` + Format string `help:"image format" choices:"iso|vmdk|qcow2|vhd|tgz"` } R(&HostCacheImageActionOptions{}, "host-cache-image", "Ask a host to cache a image", func(s *mcclient.ClientSession, args *HostCacheImageActionOptions) error { params := jsonutils.NewDict() diff --git a/go.mod b/go.mod index 020688e0e67..7d4a79d9eac 100644 --- a/go.mod +++ b/go.mod @@ -93,12 +93,12 @@ require ( k8s.io/cri-api v0.22.17 k8s.io/klog/v2 v2.20.0 moul.io/http2curl/v2 v2.3.0 - yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75 + yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09 yunion.io/x/executor v0.0.0-20241205080005-48f5b1212256 yunion.io/x/jsonutils v1.0.1-0.20240930100528-1671a2d0d22f yunion.io/x/log v1.0.1-0.20240305175729-7cf2d6cd5a91 yunion.io/x/ovsdb v0.0.0-20230306173834-f164f413a900 - yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07 + yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882 yunion.io/x/s3cli v0.0.0-20241221171442-1c11599d28e1 yunion.io/x/sqlchemy v1.1.3-0.20240926163039-d41512b264e1 yunion.io/x/structarg v0.0.0-20231017124457-df4d5009457c diff --git a/go.sum b/go.sum index d8b976eb1dc..5e3898ec8de 100644 --- a/go.sum +++ b/go.sum @@ -1376,8 +1376,8 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= -yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75 h1:3GfPFiOXGcNlbMZbLg2TTov7VdRx2FS+FzrHQsmj64o= -yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75/go.mod h1:KQ/jWx7bZlmjCE711KEWuvHW/dzpdr/UTlBjjutkj0Y= +yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09 h1:xfZ2R9kuaqbxASpc1ohWHMswEQwaAUExoVg8gob7Iqo= +yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09/go.mod h1:KQ/jWx7bZlmjCE711KEWuvHW/dzpdr/UTlBjjutkj0Y= yunion.io/x/executor v0.0.0-20241205080005-48f5b1212256 h1:kLKQ6zbgPDQflRwoHFAjxNChcbhXIFgsUVLkJwiXu/8= yunion.io/x/executor v0.0.0-20241205080005-48f5b1212256/go.mod h1:Uxuou9WQIeJXNpy7t2fPLL0BYLvLiMvGQwY7Qc6aSws= yunion.io/x/jsonutils v0.0.0-20190625054549-a964e1e8a051/go.mod h1:4N0/RVzsYL3kH3WE/H1BjUQdFiWu50JGCFQuuy+Z634= @@ -1391,8 +1391,8 @@ 
yunion.io/x/ovsdb v0.0.0-20230306173834-f164f413a900 h1:Hu/4ERvoWaN6aiFs4h4/yvVB yunion.io/x/ovsdb v0.0.0-20230306173834-f164f413a900/go.mod h1:0vLkNEhlmA64HViPBAnSTUMrx5QP1CLsxXmxDKQ80tc= yunion.io/x/pkg v0.0.0-20190620104149-945c25821dbf/go.mod h1:t6rEGG2sQ4J7DhFxSZVOTjNd0YO/KlfWQyK1W4tog+E= yunion.io/x/pkg v0.0.0-20190628082551-f4033ba2ea30/go.mod h1:t6rEGG2sQ4J7DhFxSZVOTjNd0YO/KlfWQyK1W4tog+E= -yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07 h1:XZsL8+YpgVBHMcRwdpZ4cOfSZCc+KsgUCTdV3KeK8FI= -yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07/go.mod h1:0Bwxqd9MA3ACi119/l02FprY/o9gHahmYC2bsSbnVpM= +yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882 h1:Nl4qngZ4mrVe2xsMVrCHLe9t+nY/KpaBXO9a0sTbg4M= +yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882/go.mod h1:0Bwxqd9MA3ACi119/l02FprY/o9gHahmYC2bsSbnVpM= yunion.io/x/s3cli v0.0.0-20241221171442-1c11599d28e1 h1:1KJ3YYinydPHpDEQRXdr/T8SYcKZ5Er+m489H+PnaQ4= yunion.io/x/s3cli v0.0.0-20241221171442-1c11599d28e1/go.mod h1:0iFKpOs1y4lbCxeOmq3Xx/0AcQoewVPwj62eRluioEo= yunion.io/x/sqlchemy v1.1.3-0.20240926163039-d41512b264e1 h1:HWPqY1I5JSmM6Sks6FyK9hnq/MjL7FDghM6M8DXHob0= diff --git a/pkg/hostman/storageman/diskhandlers/diskhandler.go b/pkg/hostman/storageman/diskhandlers/diskhandler.go index f73ed8d29e2..1de7d9eb500 100644 --- a/pkg/hostman/storageman/diskhandlers/diskhandler.go +++ b/pkg/hostman/storageman/diskhandlers/diskhandler.go @@ -140,7 +140,6 @@ func perfetchImageCache(ctx context.Context, w http.ResponseWriter, r *http.Requ func deleteImageCache(ctx context.Context, w http.ResponseWriter, r *http.Request) { performImageCache(ctx, w, r, "delete") - } func getDiskStatus(ctx context.Context, w http.ResponseWriter, r *http.Request) { diff --git a/pkg/image/drivers/s3/minio.go b/pkg/image/drivers/s3/minio.go index 122e017dc22..1a13c9707f4 100644 --- a/pkg/image/drivers/s3/minio.go +++ b/pkg/image/drivers/s3/minio.go @@ -21,7 +21,6 @@ import ( "os" "yunion.io/x/cloudmux/pkg/cloudprovider" - "yunion.io/x/cloudmux/pkg/multicloud" "yunion.io/x/cloudmux/pkg/multicloud/objectstore" "yunion.io/x/log" "yunion.io/x/pkg/errors" @@ -54,11 +53,14 @@ func (c *S3Client) getBucket() (cloudprovider.ICloudBucket, error) { return c.osc.GetIBucketByName(c.bucket) } -func Init(endpoint, accessKey, secretKey, bucket string, useSSL bool) error { +func Init(endpoint, accessKey, secretKey, bucket string, useSSL bool, signVer string) error { if client != nil { return nil } cfg := objectstore.NewObjectStoreClientConfig(endpoint, accessKey, secretKey) + if len(signVer) > 0 { + cfg.SignVersion(objectstore.S3SignVersion(signVer)) + } minioClient, err := objectstore.NewObjectStoreClient(cfg) if err != nil { return errors.Wrap(err, "new minio client") @@ -88,7 +90,7 @@ func ensureBucket() error { return nil } -func PutStream(ctx context.Context, file io.Reader, fSize int64, objName string, progresser func(saved int64)) (string, error) { +func PutStream(ctx context.Context, file io.ReaderAt, fSize int64, objName string, partSizeMb int64, parallel int, progresser func(saved int64)) (string, error) { if client == nil { return "", ErrClientNotInit } @@ -96,13 +98,12 @@ func PutStream(ctx context.Context, file io.Reader, fSize int64, objName string, if err != nil { return "", errors.Wrap(err, "client.getBucket") } - const blockSizeMB = 100 - pFile := multicloud.NewProgress(fSize, 100, file, func(ratio float32) { + /* pFile := multicloud.NewProgress(fSize, 100, file, func(ratio float32) { if progresser != nil { progresser(int64(float64(ratio) * float64(fSize))) } - }) 
- err = cloudprovider.UploadObject(ctx, bucket, objName, blockSizeMB*1000*1000, pFile, fSize, cloudprovider.ACLPrivate, "", nil, false) + }) */ + err = cloudprovider.UploadObjectParallel(ctx, bucket, objName, partSizeMb*1000*1000, file, fSize, cloudprovider.ACLPrivate, "", nil, false, parallel) if err != nil { return "", errors.Wrap(err, "cloudprovider.UploadObject") } @@ -110,7 +111,7 @@ func PutStream(ctx context.Context, file io.Reader, fSize int64, objName string, return client.Location(objName), nil } -func Put(ctx context.Context, filePath, objName string, progresser func(int64)) (string, error) { +func Put(ctx context.Context, filePath, objName string, partSizeMb int64, parallel int, progresser func(int64)) (string, error) { finfo, err := os.Stat(filePath) if err != nil { return "", errors.Wrap(err, "os.Stat") @@ -121,7 +122,7 @@ func Put(ctx context.Context, filePath, objName string, progresser func(int64)) return "", errors.Wrap(err, "os.Open") } defer file.Close() - return PutStream(ctx, file, fSize, objName, progresser) + return PutStream(ctx, file, fSize, objName, partSizeMb, parallel, progresser) } func Get(ctx context.Context, fileName string) (int64, io.ReadCloser, error) { diff --git a/pkg/image/models/images.go b/pkg/image/models/images.go index 87b675eb76a..d68817a495b 100644 --- a/pkg/image/models/images.go +++ b/pkg/image/models/images.go @@ -224,7 +224,12 @@ func (self *SImage) CustomizedGetDetailsBody(ctx context.Context, userCred mccli if self.IsGuestImage.IsFalse() { formatStr := jsonutils.GetAnyString(query, []string{"format", "disk_format"}) - if len(formatStr) > 0 && formatStr != api.IMAGE_DISK_FORMAT_TGZ { + if len(formatStr) > 0 { + subImages := ImageSubformatManager.GetAllSubImages(self.Id) + if len(subImages) == 1 { + // ignore format field + formatStr = subImages[0].Format + } subimg := ImageSubformatManager.FetchSubImage(self.Id, formatStr) if subimg != nil { if strings.HasPrefix(subimg.Location, api.LocalFilePrefix) { @@ -648,7 +653,7 @@ func (self *SImage) PostCreate(ctx context.Context, userCred mcclient.TokenCrede // After image probe and customization, image size and checksum changed // will recalculate checksum in the end -func (self *SImage) StartImagePipeline( +func (image *SImage) StartImagePipeline( ctx context.Context, userCred mcclient.TokenCredential, skipProbe bool, ) error { data := jsonutils.NewDict() @@ -656,7 +661,7 @@ func (self *SImage) StartImagePipeline( data.Set("skip_probe", jsonutils.JSONTrue) } task, err := taskman.TaskManager.NewTask( - ctx, "ImagePipelineTask", self, userCred, data, "", "", nil) + ctx, "ImagePipelineTask", image, userCred, data, "", "", nil) if err != nil { return err } @@ -1421,33 +1426,33 @@ func (self *SImage) IsIso() bool { return self.DiskFormat == string(api.ImageTypeISO) } -func (self *SImage) isActive(useFast bool, noChecksum bool) bool { - active, reason := isActive(self.GetLocalLocation(), self.Size, self.Checksum, self.FastHash, useFast, noChecksum) +func (image *SImage) isActive(useFast bool, noChecksum bool) bool { + active, reason := isActive(image.GetLocalLocation(), image.Size, image.Checksum, image.FastHash, useFast, noChecksum) if active || reason != FileChecksumMismatch { return active } data := jsonutils.NewDict() - data.Set("name", jsonutils.NewString(self.Name)) + data.Set("name", jsonutils.NewString(image.Name)) notifyclient.SystemExceptionNotifyWithResult(context.TODO(), noapi.ActionChecksumTest, noapi.TOPIC_RESOURCE_IMAGE, noapi.ResultFailed, data) return false } -func (self *SImage) 
DoCheckStatus(ctx context.Context, userCred mcclient.TokenCredential, useFast bool) { - if utils.IsInStringArray(self.Status, api.ImageDeadStatus) { +func (image *SImage) DoCheckStatus(ctx context.Context, userCred mcclient.TokenCredential, useFast bool) { + if utils.IsInStringArray(image.Status, api.ImageDeadStatus) { return } - if IsCheckStatusEnabled(self) { - if self.isActive(useFast, true) { - if self.Status != api.IMAGE_STATUS_ACTIVE { - self.SetStatus(ctx, userCred, api.IMAGE_STATUS_ACTIVE, "check active") + if IsCheckStatusEnabled(image) { + if image.isActive(useFast, true) { + if image.Status != api.IMAGE_STATUS_ACTIVE { + image.SetStatus(ctx, userCred, api.IMAGE_STATUS_ACTIVE, "check active") } - if len(self.FastHash) == 0 { - fastHash, err := fileutils2.FastCheckSum(self.GetLocalLocation()) + if len(image.FastHash) == 0 { + fastHash, err := fileutils2.FastCheckSum(image.GetLocalLocation()) if err != nil { log.Errorf("DoCheckStatus fileutils2.FastChecksum fail %s", err) } else { - _, err := db.Update(self, func() error { - self.FastHash = fastHash + _, err := db.Update(image, func() error { + image.FastHash = fastHash return nil }) if err != nil { @@ -1455,33 +1460,33 @@ func (self *SImage) DoCheckStatus(ctx context.Context, userCred mcclient.TokenCr } } } - img, err := qemuimg.NewQemuImage(self.GetLocalLocation()) + img, err := qemuimg.NewQemuImage(image.GetLocalLocation()) if err == nil { format := string(img.Format) virtualSizeMB := int32(img.SizeBytes / 1024 / 1024) - if (len(format) > 0 && self.DiskFormat != format) || (virtualSizeMB > 0 && self.MinDiskMB != virtualSizeMB) { - db.Update(self, func() error { + if (len(format) > 0 && image.DiskFormat != format) || (virtualSizeMB > 0 && image.MinDiskMB != virtualSizeMB) { + db.Update(image, func() error { if len(format) > 0 { - self.DiskFormat = format + image.DiskFormat = format } - if virtualSizeMB > 0 && self.MinDiskMB < virtualSizeMB { - self.MinDiskMB = virtualSizeMB + if virtualSizeMB > 0 && image.MinDiskMB < virtualSizeMB { + image.MinDiskMB = virtualSizeMB } return nil }) } } else { - log.Warningf("fail to check image size of %s(%s)", self.Id, self.Name) + log.Warningf("fail to check image size of %s(%s)", image.Id, image.Name) } } else { - if self.Status != api.IMAGE_STATUS_QUEUED { - self.SetStatus(ctx, userCred, api.IMAGE_STATUS_QUEUED, "check inactive") + if image.Status != api.IMAGE_STATUS_QUEUED { + image.SetStatus(ctx, userCred, api.IMAGE_STATUS_QUEUED, "check inactive") } } } - if self.Status == api.IMAGE_STATUS_ACTIVE { - self.StartImagePipeline(ctx, userCred, true) + if image.Status == api.IMAGE_STATUS_ACTIVE { + image.StartImagePipeline(ctx, userCred, true) } } diff --git a/pkg/image/models/storage.go b/pkg/image/models/storage.go index 9543c157ff4..a1e7c02d89d 100644 --- a/pkg/image/models/storage.go +++ b/pkg/image/models/storage.go @@ -159,7 +159,7 @@ func (s *S3Storage) SaveImage(ctx context.Context, imagePath string, progresser if !fileutils2.IsFile(imagePath) { return "", fmt.Errorf("%s not valid file", imagePath) } - return s3.Put(ctx, imagePath, imagePathToName(imagePath), progresser) + return s3.Put(ctx, imagePath, imagePathToName(imagePath), options.Options.S3UploadPartSizeMb, options.Options.S3UploadParallel, progresser) } func (s *S3Storage) CleanTempfile(filePath string) error { diff --git a/pkg/image/options/options.go b/pkg/image/options/options.go index e1a308dcd3f..e7d42ab5c7c 100644 --- a/pkg/image/options/options.go +++ b/pkg/image/options/options.go @@ -51,6 +51,9 @@ type SImageOptions 
struct { S3BucketName string `help:"s3 bucket name" default:"onecloud-images"` S3MountPoint string `help:"s3fs mount point" default:"/opt/cloud/workspace/data/glance/s3images"` S3CheckImageStatus bool `help:"Enable s3 check image status"` + S3SignVersion string `help:"signing version"` + S3UploadPartSizeMb int64 `help:"s3 upload part size in MB, default to 50MB" default:"50"` + S3UploadParallel int `help:"s3 upload parallel count" default:"1"` ImageStreamWorkerCount int `help:"Image stream worker count" default:"10"` } diff --git a/pkg/image/service/service.go b/pkg/image/service/service.go index 77a57006d86..295757416ad 100644 --- a/pkg/image/service/service.go +++ b/pkg/image/service/service.go @@ -208,6 +208,7 @@ func initS3() { options.Options.S3SecretKey, options.Options.S3BucketName, options.Options.S3UseSSL, + options.Options.S3SignVersion, ) if err != nil { log.Fatalf("failed init s3 client %s", err) diff --git a/vendor/modules.txt b/vendor/modules.txt index de3f04815e8..4fd20c8ac10 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1785,7 +1785,7 @@ sigs.k8s.io/structured-merge-diff/v4/value # sigs.k8s.io/yaml v1.2.0 ## explicit; go 1.12 sigs.k8s.io/yaml -# yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75 +# yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09 ## explicit; go 1.21 yunion.io/x/cloudmux/pkg/apis yunion.io/x/cloudmux/pkg/apis/billing @@ -1882,7 +1882,7 @@ yunion.io/x/log/hooks yunion.io/x/ovsdb/cli_util yunion.io/x/ovsdb/schema/ovn_nb yunion.io/x/ovsdb/types -# yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07 +# yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882 ## explicit; go 1.18 yunion.io/x/pkg/appctx yunion.io/x/pkg/errors diff --git a/vendor/yunion.io/x/cloudmux/pkg/cloudprovider/objectstore.go b/vendor/yunion.io/x/cloudmux/pkg/cloudprovider/objectstore.go index 58e5142347c..149250c35c8 100644 --- a/vendor/yunion.io/x/cloudmux/pkg/cloudprovider/objectstore.go +++ b/vendor/yunion.io/x/cloudmux/pkg/cloudprovider/objectstore.go @@ -23,11 +23,13 @@ import ( "sort" "strconv" "strings" + "sync" "time" "yunion.io/x/jsonutils" "yunion.io/x/log" "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/util/streamutils" "yunion.io/x/s3cli" ) @@ -185,6 +187,7 @@ type SListObjectResult struct { IsTruncated bool } +// range start from 0 type SGetObjectRange struct { Start int64 End int64 @@ -492,6 +495,89 @@ func Makedir(ctx context.Context, bucket ICloudBucket, key string) error { } func UploadObject(ctx context.Context, bucket ICloudBucket, key string, blocksz int64, input io.Reader, sizeBytes int64, cannedAcl TBucketACLType, storageClass string, meta http.Header, debug bool) error { + return UploadObjectParallel(ctx, bucket, key, blocksz, newReaderAt(input), sizeBytes, cannedAcl, storageClass, meta, debug, 1) +} + +type sSeqReader struct { + reader io.Reader + offset int64 +} + +func newReaderAt(input io.Reader) io.ReaderAt { + return &sSeqReader{ + reader: input, + offset: 0, + } +} + +func (sr *sSeqReader) ReadAt(p []byte, offset int64) (int, error) { + return sr.reader.Read(p) +} + +type sOffsetReader struct { + readerAt io.ReaderAt + offset int64 +} + +func newReader(input io.ReaderAt, inputOffset int64) io.Reader { + return &sOffsetReader{ + readerAt: input, + offset: inputOffset, + } +} + +func (or *sOffsetReader) Read(p []byte) (int, error) { + n, err := or.readerAt.ReadAt(p, or.offset) + or.offset += int64(n) + return n, err +} + +type uploadPartOfMultipartJob struct { + ctx context.Context + bucket ICloudBucket + key 
string + input io.Reader + sizeBytes int64 + uploadId string + partIndex int + partSize int64 + offset int64 + debug bool + etags []string + errs []error +} + +func uploadPartOfMultipartWorker(wg *sync.WaitGroup, queue chan uploadPartOfMultipartJob) { + defer wg.Done() + for job := range queue { + tag, err := uploadPartOfMultipart(job.ctx, job.bucket, job.key, job.input, job.sizeBytes, job.uploadId, job.partIndex, job.partSize, job.offset, job.debug) + if err != nil { + job.errs = append(job.errs, err) + } else { + job.etags[job.partIndex] = tag + } + } +} + +func uploadPartOfMultipart(ctx context.Context, bucket ICloudBucket, key string, input io.Reader, sizeBytes int64, uploadId string, partIndex int, partSize int64, offset int64, debug bool) (string, error) { + var startAt time.Time + if debug { + startAt = time.Now() + log.Debugf("UploadPart %d %d", partIndex+1, partSize) + } + etag, err := bucket.UploadPart(ctx, key, uploadId, partIndex+1, io.LimitReader(input, partSize), partSize, offset, sizeBytes) + if err != nil { + return "", errors.Wrapf(err, "bucket.UploadPart %d", partIndex) + } + if debug { + duration := time.Since(startAt) + rateMbps := calculateRateMbps(partSize, duration) + log.Debugf("End of uploadPart %d %d takes %f seconds at %fMbps", partIndex+1, partSize, float64(duration)/float64(time.Second), rateMbps) + } + return etag, nil +} + +func UploadObjectParallel(ctx context.Context, bucket ICloudBucket, key string, blocksz int64, input io.ReaderAt, sizeBytes int64, cannedAcl TBucketACLType, storageClass string, meta http.Header, debug bool, parallel int) error { if blocksz <= 0 { blocksz = MAX_PUT_OBJECT_SIZEBYTES } @@ -499,7 +585,7 @@ func UploadObject(ctx context.Context, bucket ICloudBucket, key string, blocksz if debug { log.Debugf("too small, put object in one shot") } - return bucket.PutObject(ctx, key, input, sizeBytes, cannedAcl, storageClass, meta) + return bucket.PutObject(ctx, key, newReader(input, 0), sizeBytes, cannedAcl, storageClass, meta) } partSize := blocksz partCount := sizeBytes / partSize @@ -524,24 +610,52 @@ func UploadObject(ctx context.Context, bucket ICloudBucket, key string, blocksz return errors.Wrap(err, "bucket.NewMultipartUpload") } etags := make([]string, partCount) - offset := int64(0) - for i := 0; i < int(partCount); i += 1 { - if i == int(partCount)-1 { - partSize = sizeBytes - partSize*(partCount-1) + var errs []error + { + if parallel < 1 { + parallel = 1 } - if debug { - log.Debugf("UploadPart %d %d", i+1, partSize) + queue := make(chan uploadPartOfMultipartJob, parallel) + wg := &sync.WaitGroup{} + for i := 0; i < parallel; i++ { + wg.Add(1) + go uploadPartOfMultipartWorker(wg, queue) } - etag, err := bucket.UploadPart(ctx, key, uploadId, i+1, io.LimitReader(input, partSize), partSize, offset, sizeBytes) - if err != nil { - err2 := bucket.AbortMultipartUpload(ctx, key, uploadId) - if err2 != nil { - log.Errorf("bucket.AbortMultipartUpload error %s", err2) + for i := 0; i < int(partCount); i += 1 { + offset := int64(i) * partSize + blockSize := partSize + if i == int(partCount)-1 { + blockSize = sizeBytes - partSize*(partCount-1) } - return errors.Wrap(err, "bucket.UploadPart") + partIndex := i + job := uploadPartOfMultipartJob{ + ctx: ctx, + bucket: bucket, + key: key, + input: newReader(input, offset), + sizeBytes: sizeBytes, + uploadId: uploadId, + partIndex: partIndex, + partSize: blockSize, + offset: offset, + debug: debug, + etags: etags, + errs: errs, + } + queue <- job } - offset += partSize - etags[i] = etag + close(queue) + 
wg.Wait() + } + + if len(errs) > 0 { + // upload part error + err2 := bucket.AbortMultipartUpload(ctx, key, uploadId) + if err2 != nil { + log.Errorf("bucket.AbortMultipartUpload error %s", err2) + errs = append(errs, err2) + } + return errors.Wrap(errors.NewAggregate(errs), "uploadPartOfMultipart") } err = bucket.CompleteMultipartUpload(ctx, key, uploadId, etags) if err != nil { @@ -592,7 +706,61 @@ func MergeMeta(src http.Header, dst http.Header) http.Header { } func CopyObject(ctx context.Context, blocksz int64, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, dstMeta http.Header, debug bool) error { + return CopyObjectParallel(ctx, blocksz, dstBucket, dstKey, srcBucket, srcKey, dstMeta, debug, 1) +} + +type copyPartOfMultipartJob struct { + ctx context.Context + dstBucket ICloudBucket + dstKey string + srcBucket ICloudBucket + srcKey string + rangeOpt *SGetObjectRange + sizeBytes int64 + uploadId string + partIndex int + debug bool + etags []string + errs []error +} + +func copyPartOfMultipartWorker(wg *sync.WaitGroup, queue chan copyPartOfMultipartJob) { + defer wg.Done() + for job := range queue { + tag, err := copyPartOfMultipart(job.ctx, job.dstBucket, job.dstKey, job.srcBucket, job.srcKey, job.rangeOpt, job.sizeBytes, job.uploadId, job.partIndex, job.debug) + if err != nil { + job.errs = append(job.errs, err) + } else { + job.etags[job.partIndex] = tag + } + } +} + +func copyPartOfMultipart(ctx context.Context, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, rangeOpt *SGetObjectRange, sizeBytes int64, uploadId string, partIndex int, debug bool) (string, error) { + partSize := rangeOpt.SizeBytes() + var startAt time.Time + if debug { + startAt = time.Now() + log.Debugf("CopyPart %d %d range: %s (%d)", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes()) + } + srcStream, err := srcBucket.GetObject(ctx, srcKey, rangeOpt) + if err != nil { + return "", errors.Wrapf(err, "srcBucket.GetObject %d", partIndex) + } + defer srcStream.Close() + etag, err := dstBucket.UploadPart(ctx, dstKey, uploadId, partIndex+1, io.LimitReader(srcStream, partSize), partSize, rangeOpt.Start, sizeBytes) + if err != nil { + return "", errors.Wrapf(err, "dstBucket.UploadPart %d", partIndex) + } + if debug { + duration := time.Since(startAt) + rateMbps := calculateRateMbps(partSize, duration) + log.Debugf("End of copyPart %d %d range: %s (%d) takes %d seconds at %fMbps", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes(), duration/time.Second, rateMbps) + } + return etag, nil +} +func CopyObjectParallel(ctx context.Context, blocksz int64, dstBucket ICloudBucket, dstKey string, srcBucket ICloudBucket, srcKey string, dstMeta http.Header, debug bool, parallel int) error { srcObj, err := GetIObject(srcBucket, srcKey) if err != nil { return errors.Wrap(err, "GetIObject") @@ -638,39 +806,59 @@ func CopyObject(ctx context.Context, blocksz int64, dstBucket ICloudBucket, dstK if err != nil { return errors.Wrap(err, "bucket.NewMultipartUpload") } + etags := make([]string, partCount) - offset := int64(0) - for i := 0; i < int(partCount); i += 1 { - start := int64(i) * partSize - if i == int(partCount)-1 { - partSize = sizeBytes - partSize*(partCount-1) + var errs []error + { + if parallel < 1 { + parallel = 1 } - end := start + partSize - 1 - rangeOpt := SGetObjectRange{ - Start: start, - End: end, + queue := make(chan copyPartOfMultipartJob, parallel) + var wg sync.WaitGroup + for i := 0; i < parallel; i++ { + wg.Add(1) + go 
copyPartOfMultipartWorker(&wg, queue) } - if debug { - log.Debugf("UploadPart %d %d range: %s (%d)", i+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes()) - } - srcStream, err := srcBucket.GetObject(ctx, srcKey, &rangeOpt) - if err == nil { - defer srcStream.Close() - var etag string - etag, err = dstBucket.UploadPart(ctx, dstKey, uploadId, i+1, io.LimitReader(srcStream, partSize), partSize, offset, sizeBytes) - if err == nil { - etags[i] = etag - continue + + for i := 0; i < int(partCount); i += 1 { + start := int64(i) * partSize + blockSize := partSize + if i == int(partCount)-1 { + blockSize = sizeBytes - partSize*(partCount-1) } - } - offset += partSize - if err != nil { - err2 := dstBucket.AbortMultipartUpload(ctx, dstKey, uploadId) - if err2 != nil { - log.Errorf("bucket.AbortMultipartUpload error %s", err2) + end := start + blockSize - 1 + rangeOpt := SGetObjectRange{ + Start: start, + End: end, } - return errors.Wrap(err, "bucket.UploadPart") + partIndex := i + job := copyPartOfMultipartJob{ + ctx: ctx, + dstBucket: dstBucket, + dstKey: dstKey, + srcBucket: srcBucket, + srcKey: srcKey, + rangeOpt: &rangeOpt, + sizeBytes: sizeBytes, + uploadId: uploadId, + partIndex: partIndex, + debug: debug, + etags: etags, + errs: errs, + } + queue <- job + } + close(queue) + wg.Wait() + } + if len(errs) > 0 { + // upload part error + err2 := dstBucket.AbortMultipartUpload(ctx, dstKey, uploadId) + if err2 != nil { + log.Errorf("bucket.AbortMultipartUpload error %s", err2) + errs = append(errs, err2) } + return errors.Wrap(errors.NewAggregate(errs), "copyPartOfMultipart") } err = dstBucket.CompleteMultipartUpload(ctx, dstKey, uploadId, etags) if err != nil { @@ -687,17 +875,7 @@ func CopyPart(ctx context.Context, iDstBucket ICloudBucket, dstKey string, uploadId string, partNumber int, iSrcBucket ICloudBucket, srcKey string, rangeOpt *SGetObjectRange, ) (string, error) { - srcReader, err := iSrcBucket.GetObject(ctx, srcKey, rangeOpt) - if err != nil { - return "", errors.Wrap(err, "iSrcBucket.GetObject") - } - defer srcReader.Close() - - etag, err := iDstBucket.UploadPart(ctx, dstKey, uploadId, partNumber, io.LimitReader(srcReader, rangeOpt.SizeBytes()), rangeOpt.SizeBytes(), 0, 0) - if err != nil { - return "", errors.Wrap(err, "iDstBucket.UploadPart") - } - return etag, nil + return copyPartOfMultipart(ctx, iDstBucket, dstKey, iSrcBucket, srcKey, rangeOpt, 0, uploadId, partNumber, false) } func ObjectSetMeta(ctx context.Context, @@ -854,3 +1032,174 @@ func SetBucketTags(ctx context.Context, iBucket ICloudBucket, mangerId string, t } return ret, SetTags(ctx, iBucket, mangerId, tags, true) } + +type sOffsetWriter struct { + writerAt io.WriterAt + offset int64 +} + +func newWriter(output io.WriterAt, outputOffset int64) io.Writer { + return &sOffsetWriter{ + writerAt: output, + offset: outputOffset, + } +} + +func (ow *sOffsetWriter) Write(p []byte) (int, error) { + n, err := ow.writerAt.WriteAt(p, ow.offset) + ow.offset += int64(n) + return n, err +} + +func calculateRateMbps(sizeBytes int64, duration time.Duration) float64 { + return float64(sizeBytes*8*int64(time.Second)) / float64(duration) / 1000 / 1000 +} + +type downloadPartOfMultipartJob struct { + ctx context.Context + bucket ICloudBucket + key string + rangeOpt *SGetObjectRange + output io.Writer + partIndex int + debug bool + segSizes []int64 + errs []error +} + +func downloadPartOfMultipartWorker(wg *sync.WaitGroup, queue chan downloadPartOfMultipartJob) { + defer wg.Done() + for job := range queue { + sz, err := 
downloadPartOfMultipart(job.ctx, job.bucket, job.key, job.rangeOpt, job.output, job.partIndex, job.debug) + if err != nil { + job.errs = append(job.errs, err) + } else { + job.segSizes[job.partIndex] = sz + } + } +} + +func downloadPartOfMultipart(ctx context.Context, bucket ICloudBucket, key string, rangeOpt *SGetObjectRange, output io.Writer, partIndex int, debug bool) (int64, error) { + partSize := rangeOpt.SizeBytes() + var startAt time.Time + if debug { + startAt = time.Now() + log.Debugf("downloadPart %d %d range: %s (%d)", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes()) + } + stream, err := bucket.GetObject(ctx, key, rangeOpt) + if err != nil { + return 0, errors.Wrap(err, "bucket.GetObject") + } + defer stream.Close() + prop, err := streamutils.StreamPipe(stream, output, false, nil) + if err != nil { + return 0, errors.Wrap(err, "StreamPipe") + } + if debug { + duration := time.Since(startAt) + rateMbps := calculateRateMbps(partSize, duration) + log.Debugf("End of downloadPart %d %d range: %s (%d) takes %f seconds at %fMbps", partIndex+1, partSize, rangeOpt.String(), rangeOpt.SizeBytes(), float64(duration)/float64(time.Second), rateMbps) + } + return prop.Size, nil +} + +func DownloadObjectParallel(ctx context.Context, bucket ICloudBucket, key string, rangeOpt *SGetObjectRange, output io.WriterAt, outputOffset int64, blocksz int64, debug bool, parallel int) (int64, error) { + obj, err := GetIObject(bucket, key) + if err != nil { + return 0, errors.Wrap(err, "GetIObject") + } + if blocksz <= 0 { + blocksz = MAX_PUT_OBJECT_SIZEBYTES + } + sizeBytes := obj.GetSizeBytes() + if rangeOpt == nil { + rangeOpt = &SGetObjectRange{ + Start: 0, + End: sizeBytes - 1, + } + } else { + if rangeOpt.End < rangeOpt.Start { + tmp := rangeOpt.Start + rangeOpt.Start = rangeOpt.End + rangeOpt.End = tmp + } + if rangeOpt.End >= sizeBytes { + rangeOpt.End = sizeBytes - 1 + } + if rangeOpt.Start < 0 { + rangeOpt.Start = 0 + } + sizeBytes = rangeOpt.SizeBytes() + } + if sizeBytes < blocksz { + if debug { + log.Debugf("too small, download object in one shot") + } + size, err := downloadPartOfMultipart(ctx, bucket, key, rangeOpt, newWriter(output, outputOffset), 0, true) + if err != nil { + return 0, errors.Wrap(err, "downloadPartOfMultipart") + } + return size, nil + } + partSize := blocksz + partCount := sizeBytes / partSize + if partCount*partSize < sizeBytes { + partCount += 1 + } + if debug { + log.Debugf("multipart download part count %d part size %d", partCount, partSize) + } + + var errs []error + segSizes := make([]int64, partCount) + { + if parallel < 1 { + parallel = 1 + } + queue := make(chan downloadPartOfMultipartJob, parallel) + wg := &sync.WaitGroup{} + for i := 0; i < parallel; i++ { + wg.Add(1) + go downloadPartOfMultipartWorker(wg, queue) + } + for i := 0; i < int(partCount); i += 1 { + dstOffset := outputOffset + int64(i)*partSize + start := rangeOpt.Start + int64(i)*partSize + if i == int(partCount)-1 { + partSize = sizeBytes - partSize*(partCount-1) + } + end := start + partSize - 1 + srcRangeOpt := SGetObjectRange{ + Start: start, + End: end, + } + + partIndex := i + + job := downloadPartOfMultipartJob{ + ctx: ctx, + bucket: bucket, + key: key, + rangeOpt: &srcRangeOpt, + output: newWriter(output, dstOffset), + partIndex: partIndex, + debug: debug, + segSizes: segSizes, + errs: errs, + } + queue <- job + } + close(queue) + wg.Wait() + } + + if len(errs) > 0 { + return 0, errors.Wrap(errors.NewAggregate(errs), "downloadPartOfMultipart") + } + + totalSize := int64(0) + for 
i := range segSizes { + totalSize += segSizes[i] + } + return totalSize, nil +} diff --git a/vendor/yunion.io/x/cloudmux/pkg/multicloud/objectstore/shell.go b/vendor/yunion.io/x/cloudmux/pkg/multicloud/objectstore/shell.go index f0aeacad3ce..8d25c9b72f7 100644 --- a/vendor/yunion.io/x/cloudmux/pkg/multicloud/objectstore/shell.go +++ b/vendor/yunion.io/x/cloudmux/pkg/multicloud/objectstore/shell.go @@ -29,7 +29,6 @@ import ( "yunion.io/x/pkg/util/fileutils" "yunion.io/x/pkg/util/printutils" "yunion.io/x/pkg/util/shellutils" - "yunion.io/x/pkg/util/streamutils" "yunion.io/x/cloudmux/pkg/cloudprovider" ) @@ -275,7 +274,7 @@ func S3Shell() { Demiliter string `help:"delimiter"` Max int `help:"Max count"` } - shellutils.R(&BucketObjectsOptions{}, "bucket-object", "List objects in a bucket", func(cli cloudprovider.ICloudRegion, args *BucketObjectsOptions) error { + objectListFunc := func(cli cloudprovider.ICloudRegion, args *BucketObjectsOptions) error { bucket, err := cli.GetIBucketById(args.BUCKET) if err != nil { return err @@ -292,7 +291,9 @@ func S3Shell() { fmt.Println("Objects:") printutils.PrintGetterList(result.Objects, []string{"key", "size_bytes"}) return nil - }) + } + shellutils.R(&BucketObjectsOptions{}, "bucket-object", "List objects in a bucket (deprecated)", objectListFunc) + shellutils.R(&BucketObjectsOptions{}, "object-list", "List objects in a bucket", objectListFunc) type BucketListObjectsOptions struct { BUCKET string `help:"name of bucket to list objects"` @@ -360,9 +361,11 @@ func S3Shell() { StorageClass string `help:"storage class"` + Parallel int `help:"upload object parts in parallel"` + ObjectHeaderOptions } - shellutils.R(&BucketPutObjectOptions{}, "put-object", "Put object into a bucket", func(cli cloudprovider.ICloudRegion, args *BucketPutObjectOptions) error { + objectPutFunc := func(cli cloudprovider.ICloudRegion, args *BucketPutObjectOptions) error { bucket, err := cli.GetIBucketById(args.BUCKET) if err != nil { return err @@ -401,7 +404,7 @@ func S3Shell() { } } - err = cloudprovider.UploadObject(context.Background(), bucket, key, args.BlockSize*1000*1000, file, fSize, cloudprovider.TBucketACLType(args.Acl), args.StorageClass, meta, true) + err = cloudprovider.UploadObjectParallel(context.Background(), bucket, key, args.BlockSize*1000*1000, file, fSize, cloudprovider.TBucketACLType(args.Acl), args.StorageClass, meta, true, args.Parallel) if err != nil { return err } @@ -440,7 +443,9 @@ func S3Shell() { fmt.Printf("Upload success\n") return nil - }) + } + shellutils.R(&BucketPutObjectOptions{}, "put-object", "Put object into a bucket (deprecated)", objectPutFunc) + shellutils.R(&BucketPutObjectOptions{}, "object-upload", "Upload object into a bucket", objectPutFunc) type BucketDeleteObjectOptions struct { BUCKET string `help:"name of bucket to put object"` @@ -834,6 +839,9 @@ func S3Shell() { return err } uplaods, err := bucket.ListMultipartUploads() + if err != nil { + return err + } printList(uplaods, len(uplaods), 0, len(uplaods), nil) return nil }) @@ -873,6 +881,10 @@ func S3Shell() { Output string `help:"target output, default to stdout"` Start int64 `help:"partial download start"` End int64 `help:"partial download end"` + + BlockSize int64 `help:"blocksz in MB" default:"100"` + + Parallel int `help:"upload object parts in parallel"` } shellutils.R(&BucketObjectDownloadOptions{}, "object-download", "Download", func(cli cloudprovider.ICloudRegion, args *BucketObjectDownloadOptions) error { bucket, err := cli.GetIBucketById(args.BUCKET) @@ -891,12 +903,7 @@ 
func S3Shell() { } rangeOpt = &cloudprovider.SGetObjectRange{Start: args.Start, End: args.End} } - output, err := bucket.GetObject(context.Background(), args.KEY, rangeOpt) - if err != nil { - return err - } - defer output.Close() - var target io.Writer + var target io.WriterAt if len(args.Output) == 0 { target = os.Stdout } else { @@ -907,12 +914,13 @@ func S3Shell() { defer fp.Close() target = fp } - prop, err := streamutils.StreamPipe(output, target, false, nil) + + sz, err := cloudprovider.DownloadObjectParallel(context.Background(), bucket, args.KEY, rangeOpt, target, 0, args.BlockSize*1000*1000, true, args.Parallel) if err != nil { return err } if len(args.Output) > 0 { - fmt.Println("Success:", prop.Size, "written") + fmt.Println("Success:", sz, "written") } return nil }) @@ -938,17 +946,19 @@ func S3Shell() { }) type BucketObjectCopyOptions struct { - SRC string `help:"name of source bucket"` - SRCKEY string `help:"Key of source object"` - DST string `help:"name of destination bucket"` - DSTKEY string `help:"key of destination object"` - Debug bool `help:"show debug info"` - BlockSize int64 `help:"block size in MB"` - Native bool `help:"Use native copy"` + SRC string `help:"name of source bucket"` + SRCKEY string `help:"Key of source object"` + DST string `help:"name of destination bucket"` + DSTKEY string `help:"key of destination object"` + + BlockSize int64 `help:"block size in MB"` + Native bool `help:"Use native copy"` + + Parallel int `help:"copy object parts in parallel"` ObjectHeaderOptions } - shellutils.R(&BucketObjectCopyOptions{}, "object-copy", "Copy object", func(cli cloudprovider.ICloudRegion, args *BucketObjectCopyOptions) error { + objectCopyFunc := func(cli cloudprovider.ICloudRegion, args *BucketObjectCopyOptions) error { ctx := context.Background() dstBucket, err := cli.GetIBucketByName(args.DST) if err != nil { @@ -969,14 +979,15 @@ func S3Shell() { return err } } else { - err = cloudprovider.CopyObject(ctx, args.BlockSize*1000*1000, dstBucket, args.DSTKEY, srcBucket, args.SRCKEY, meta, args.Debug) + err = cloudprovider.CopyObjectParallel(ctx, args.BlockSize*1000*1000, dstBucket, args.DSTKEY, srcBucket, args.SRCKEY, meta, true, args.Parallel) if err != nil { return err } } fmt.Println("Success!") return nil - }) + } + shellutils.R(&BucketObjectCopyOptions{}, "object-copy", "Copy object", objectCopyFunc) type ObjectMetaOptions struct { BUCKET string `help:"bucket name"` diff --git a/vendor/yunion.io/x/pkg/util/qemuimgfmt/fmt.go b/vendor/yunion.io/x/pkg/util/qemuimgfmt/fmt.go index 6425ffbb195..afadd186f70 100644 --- a/vendor/yunion.io/x/pkg/util/qemuimgfmt/fmt.go +++ b/vendor/yunion.io/x/pkg/util/qemuimgfmt/fmt.go @@ -26,10 +26,11 @@ const ( VHD = TImageFormat("vhd") ISO = TImageFormat("iso") RAW = TImageFormat("raw") + TGZ = TImageFormat("tgz") ) var supportedImageFormats = []TImageFormat{ - QCOW2, VMDK, VHD, ISO, RAW, + QCOW2, VMDK, VHD, ISO, RAW, TGZ, } func IsSupportedImageFormat(fmtStr string) bool { @@ -62,6 +63,8 @@ func String2ImageFormat(fmt string) TImageFormat { return ISO case "raw": return RAW + case "tgz", "tar": + return TGZ } // log.Fatalf("unknown image format!!! %s", fmt) return TImageFormat(fmt)
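
Note for reviewers: below is a minimal caller-side sketch of the two new helpers this patch introduces, cloudprovider.UploadObjectParallel and cloudprovider.DownloadObjectParallel. Only the helper signatures and the ACLPrivate/part-size conventions come from the diff above; the wrapper functions, the bucket argument, and the 50MB / 4-worker values are illustrative assumptions, not part of the change. *os.File is used because it satisfies both io.ReaderAt and io.WriterAt, which the parallel code paths require (parallel=1 reproduces the old sequential behavior).

package imagetool

import (
	"context"
	"os"

	"yunion.io/x/cloudmux/pkg/cloudprovider"
)

// uploadImage streams a local image file to object storage in 50MB parts,
// uploading 4 parts concurrently. Part size and worker count are assumed
// example values; this patch defaults the image service to 50MB / 1 worker.
func uploadImage(ctx context.Context, bucket cloudprovider.ICloudBucket, path, key string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	info, err := f.Stat()
	if err != nil {
		return err
	}
	const partSizeMb, parallel = 50, 4
	return cloudprovider.UploadObjectParallel(
		ctx, bucket, key,
		partSizeMb*1000*1000, // part size in bytes, mirroring S3Storage.SaveImage
		f,                    // *os.File satisfies io.ReaderAt for concurrent part reads
		info.Size(),
		cloudprovider.ACLPrivate, "", nil,
		false,    // debug
		parallel, // number of upload workers
	)
}

// downloadImage fetches the whole object back into a local file with the
// matching parallel download helper; a nil range requests the full object.
func downloadImage(ctx context.Context, bucket cloudprovider.ICloudBucket, key, path string) (int64, error) {
	f, err := os.Create(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	// *os.File also satisfies io.WriterAt; offset 0 writes from the start of the file.
	return cloudprovider.DownloadObjectParallel(ctx, bucket, key, nil, f, 0, 50*1000*1000, false, 4)
}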