Skip to content

Commit

Permalink
fix: image support s3 parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Qiu Jian committed Jan 29, 2025
1 parent 0c4892b commit a3345e4
Show file tree
Hide file tree
Showing 13 changed files with 497 additions and 125 deletions.
2 changes: 1 addition & 1 deletion cmd/climc/shell/compute/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func init() {
ID string `help:"ID or name of host"`
IMAGE string `help:"ID or name of image"`
Force bool `help:"Force refresh cache, even if the image exists in cache"`
Format string `help:"image format" choices:"iso|vmdk|qcow2|vhd"`
Format string `help:"image format" choices:"iso|vmdk|qcow2|vhd|tgz"`
}
R(&HostCacheImageActionOptions{}, "host-cache-image", "Ask a host to cache a image", func(s *mcclient.ClientSession, args *HostCacheImageActionOptions) error {
params := jsonutils.NewDict()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ require (
k8s.io/cri-api v0.22.17
k8s.io/klog/v2 v2.20.0
moul.io/http2curl/v2 v2.3.0
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09
yunion.io/x/executor v0.0.0-20241205080005-48f5b1212256
yunion.io/x/jsonutils v1.0.1-0.20240930100528-1671a2d0d22f
yunion.io/x/log v1.0.1-0.20240305175729-7cf2d6cd5a91
yunion.io/x/ovsdb v0.0.0-20230306173834-f164f413a900
yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07
yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882
yunion.io/x/s3cli v0.0.0-20241221171442-1c11599d28e1
yunion.io/x/sqlchemy v1.1.3-0.20240926163039-d41512b264e1
yunion.io/x/structarg v0.0.0-20231017124457-df4d5009457c
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1376,8 +1376,8 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75 h1:3GfPFiOXGcNlbMZbLg2TTov7VdRx2FS+FzrHQsmj64o=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75/go.mod h1:KQ/jWx7bZlmjCE711KEWuvHW/dzpdr/UTlBjjutkj0Y=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09 h1:xfZ2R9kuaqbxASpc1ohWHMswEQwaAUExoVg8gob7Iqo=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09/go.mod h1:KQ/jWx7bZlmjCE711KEWuvHW/dzpdr/UTlBjjutkj0Y=
yunion.io/x/executor v0.0.0-20241205080005-48f5b1212256 h1:kLKQ6zbgPDQflRwoHFAjxNChcbhXIFgsUVLkJwiXu/8=
yunion.io/x/executor v0.0.0-20241205080005-48f5b1212256/go.mod h1:Uxuou9WQIeJXNpy7t2fPLL0BYLvLiMvGQwY7Qc6aSws=
yunion.io/x/jsonutils v0.0.0-20190625054549-a964e1e8a051/go.mod h1:4N0/RVzsYL3kH3WE/H1BjUQdFiWu50JGCFQuuy+Z634=
Expand All @@ -1391,8 +1391,8 @@ yunion.io/x/ovsdb v0.0.0-20230306173834-f164f413a900 h1:Hu/4ERvoWaN6aiFs4h4/yvVB
yunion.io/x/ovsdb v0.0.0-20230306173834-f164f413a900/go.mod h1:0vLkNEhlmA64HViPBAnSTUMrx5QP1CLsxXmxDKQ80tc=
yunion.io/x/pkg v0.0.0-20190620104149-945c25821dbf/go.mod h1:t6rEGG2sQ4J7DhFxSZVOTjNd0YO/KlfWQyK1W4tog+E=
yunion.io/x/pkg v0.0.0-20190628082551-f4033ba2ea30/go.mod h1:t6rEGG2sQ4J7DhFxSZVOTjNd0YO/KlfWQyK1W4tog+E=
yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07 h1:XZsL8+YpgVBHMcRwdpZ4cOfSZCc+KsgUCTdV3KeK8FI=
yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07/go.mod h1:0Bwxqd9MA3ACi119/l02FprY/o9gHahmYC2bsSbnVpM=
yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882 h1:Nl4qngZ4mrVe2xsMVrCHLe9t+nY/KpaBXO9a0sTbg4M=
yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882/go.mod h1:0Bwxqd9MA3ACi119/l02FprY/o9gHahmYC2bsSbnVpM=
yunion.io/x/s3cli v0.0.0-20241221171442-1c11599d28e1 h1:1KJ3YYinydPHpDEQRXdr/T8SYcKZ5Er+m489H+PnaQ4=
yunion.io/x/s3cli v0.0.0-20241221171442-1c11599d28e1/go.mod h1:0iFKpOs1y4lbCxeOmq3Xx/0AcQoewVPwj62eRluioEo=
yunion.io/x/sqlchemy v1.1.3-0.20240926163039-d41512b264e1 h1:HWPqY1I5JSmM6Sks6FyK9hnq/MjL7FDghM6M8DXHob0=
Expand Down
1 change: 0 additions & 1 deletion pkg/hostman/storageman/diskhandlers/diskhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ func perfetchImageCache(ctx context.Context, w http.ResponseWriter, r *http.Requ

func deleteImageCache(ctx context.Context, w http.ResponseWriter, r *http.Request) {
performImageCache(ctx, w, r, "delete")

}

func getDiskStatus(ctx context.Context, w http.ResponseWriter, r *http.Request) {
Expand Down
19 changes: 10 additions & 9 deletions pkg/image/drivers/s3/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"os"

"yunion.io/x/cloudmux/pkg/cloudprovider"
"yunion.io/x/cloudmux/pkg/multicloud"
"yunion.io/x/cloudmux/pkg/multicloud/objectstore"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
Expand Down Expand Up @@ -54,11 +53,14 @@ func (c *S3Client) getBucket() (cloudprovider.ICloudBucket, error) {
return c.osc.GetIBucketByName(c.bucket)
}

func Init(endpoint, accessKey, secretKey, bucket string, useSSL bool) error {
func Init(endpoint, accessKey, secretKey, bucket string, useSSL bool, signVer string) error {
if client != nil {
return nil
}
cfg := objectstore.NewObjectStoreClientConfig(endpoint, accessKey, secretKey)
if len(signVer) > 0 {
cfg.SignVersion(objectstore.S3SignVersion(signVer))
}
minioClient, err := objectstore.NewObjectStoreClient(cfg)
if err != nil {
return errors.Wrap(err, "new minio client")
Expand Down Expand Up @@ -88,29 +90,28 @@ func ensureBucket() error {
return nil
}

func PutStream(ctx context.Context, file io.Reader, fSize int64, objName string, progresser func(saved int64)) (string, error) {
func PutStream(ctx context.Context, file io.ReaderAt, fSize int64, objName string, partSizeMb int64, parallel int, progresser func(saved int64)) (string, error) {
if client == nil {
return "", ErrClientNotInit
}
bucket, err := client.getBucket()
if err != nil {
return "", errors.Wrap(err, "client.getBucket")
}
const blockSizeMB = 100
pFile := multicloud.NewProgress(fSize, 100, file, func(ratio float32) {
/* pFile := multicloud.NewProgress(fSize, 100, file, func(ratio float32) {
if progresser != nil {
progresser(int64(float64(ratio) * float64(fSize)))
}
})
err = cloudprovider.UploadObject(ctx, bucket, objName, blockSizeMB*1000*1000, pFile, fSize, cloudprovider.ACLPrivate, "", nil, false)
}) */
err = cloudprovider.UploadObjectParallel(ctx, bucket, objName, partSizeMb*1000*1000, file, fSize, cloudprovider.ACLPrivate, "", nil, false, parallel)
if err != nil {
return "", errors.Wrap(err, "cloudprovider.UploadObject")
}
log.Debugf("put object %s size %d", objName, fSize)
return client.Location(objName), nil
}

func Put(ctx context.Context, filePath, objName string, progresser func(int64)) (string, error) {
func Put(ctx context.Context, filePath, objName string, partSizeMb int64, parallel int, progresser func(int64)) (string, error) {
finfo, err := os.Stat(filePath)
if err != nil {
return "", errors.Wrap(err, "os.Stat")
Expand All @@ -121,7 +122,7 @@ func Put(ctx context.Context, filePath, objName string, progresser func(int64))
return "", errors.Wrap(err, "os.Open")
}
defer file.Close()
return PutStream(ctx, file, fSize, objName, progresser)
return PutStream(ctx, file, fSize, objName, partSizeMb, parallel, progresser)
}

func Get(ctx context.Context, fileName string) (int64, io.ReadCloser, error) {
Expand Down
59 changes: 32 additions & 27 deletions pkg/image/models/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ func (self *SImage) CustomizedGetDetailsBody(ctx context.Context, userCred mccli

if self.IsGuestImage.IsFalse() {
formatStr := jsonutils.GetAnyString(query, []string{"format", "disk_format"})
if len(formatStr) > 0 && formatStr != api.IMAGE_DISK_FORMAT_TGZ {
if len(formatStr) > 0 {
subImages := ImageSubformatManager.GetAllSubImages(self.Id)
if len(subImages) == 1 {
// ignore format field
formatStr = subImages[0].Format
}
subimg := ImageSubformatManager.FetchSubImage(self.Id, formatStr)
if subimg != nil {
if strings.HasPrefix(subimg.Location, api.LocalFilePrefix) {
Expand Down Expand Up @@ -648,15 +653,15 @@ func (self *SImage) PostCreate(ctx context.Context, userCred mcclient.TokenCrede

// After image probe and customization, image size and checksum changed
// will recalculate checksum in the end
func (self *SImage) StartImagePipeline(
func (image *SImage) StartImagePipeline(
ctx context.Context, userCred mcclient.TokenCredential, skipProbe bool,
) error {
data := jsonutils.NewDict()
if skipProbe {
data.Set("skip_probe", jsonutils.JSONTrue)
}
task, err := taskman.TaskManager.NewTask(
ctx, "ImagePipelineTask", self, userCred, data, "", "", nil)
ctx, "ImagePipelineTask", image, userCred, data, "", "", nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1421,67 +1426,67 @@ func (self *SImage) IsIso() bool {
return self.DiskFormat == string(api.ImageTypeISO)
}

func (self *SImage) isActive(useFast bool, noChecksum bool) bool {
active, reason := isActive(self.GetLocalLocation(), self.Size, self.Checksum, self.FastHash, useFast, noChecksum)
func (image *SImage) isActive(useFast bool, noChecksum bool) bool {
active, reason := isActive(image.GetLocalLocation(), image.Size, image.Checksum, image.FastHash, useFast, noChecksum)
if active || reason != FileChecksumMismatch {
return active
}
data := jsonutils.NewDict()
data.Set("name", jsonutils.NewString(self.Name))
data.Set("name", jsonutils.NewString(image.Name))
notifyclient.SystemExceptionNotifyWithResult(context.TODO(), noapi.ActionChecksumTest, noapi.TOPIC_RESOURCE_IMAGE, noapi.ResultFailed, data)
return false
}

func (self *SImage) DoCheckStatus(ctx context.Context, userCred mcclient.TokenCredential, useFast bool) {
if utils.IsInStringArray(self.Status, api.ImageDeadStatus) {
func (image *SImage) DoCheckStatus(ctx context.Context, userCred mcclient.TokenCredential, useFast bool) {
if utils.IsInStringArray(image.Status, api.ImageDeadStatus) {
return
}
if IsCheckStatusEnabled(self) {
if self.isActive(useFast, true) {
if self.Status != api.IMAGE_STATUS_ACTIVE {
self.SetStatus(ctx, userCred, api.IMAGE_STATUS_ACTIVE, "check active")
if IsCheckStatusEnabled(image) {
if image.isActive(useFast, true) {
if image.Status != api.IMAGE_STATUS_ACTIVE {
image.SetStatus(ctx, userCred, api.IMAGE_STATUS_ACTIVE, "check active")
}
if len(self.FastHash) == 0 {
fastHash, err := fileutils2.FastCheckSum(self.GetLocalLocation())
if len(image.FastHash) == 0 {
fastHash, err := fileutils2.FastCheckSum(image.GetLocalLocation())
if err != nil {
log.Errorf("DoCheckStatus fileutils2.FastChecksum fail %s", err)
} else {
_, err := db.Update(self, func() error {
self.FastHash = fastHash
_, err := db.Update(image, func() error {
image.FastHash = fastHash
return nil
})
if err != nil {
log.Errorf("DoCheckStatus save FastHash fail %s", err)
}
}
}
img, err := qemuimg.NewQemuImage(self.GetLocalLocation())
img, err := qemuimg.NewQemuImage(image.GetLocalLocation())
if err == nil {
format := string(img.Format)
virtualSizeMB := int32(img.SizeBytes / 1024 / 1024)
if (len(format) > 0 && self.DiskFormat != format) || (virtualSizeMB > 0 && self.MinDiskMB != virtualSizeMB) {
db.Update(self, func() error {
if (len(format) > 0 && image.DiskFormat != format) || (virtualSizeMB > 0 && image.MinDiskMB != virtualSizeMB) {
db.Update(image, func() error {
if len(format) > 0 {
self.DiskFormat = format
image.DiskFormat = format
}
if virtualSizeMB > 0 && self.MinDiskMB < virtualSizeMB {
self.MinDiskMB = virtualSizeMB
if virtualSizeMB > 0 && image.MinDiskMB < virtualSizeMB {
image.MinDiskMB = virtualSizeMB
}
return nil
})
}
} else {
log.Warningf("fail to check image size of %s(%s)", self.Id, self.Name)
log.Warningf("fail to check image size of %s(%s)", image.Id, image.Name)
}
} else {
if self.Status != api.IMAGE_STATUS_QUEUED {
self.SetStatus(ctx, userCred, api.IMAGE_STATUS_QUEUED, "check inactive")
if image.Status != api.IMAGE_STATUS_QUEUED {
image.SetStatus(ctx, userCred, api.IMAGE_STATUS_QUEUED, "check inactive")
}
}
}

if self.Status == api.IMAGE_STATUS_ACTIVE {
self.StartImagePipeline(ctx, userCred, true)
if image.Status == api.IMAGE_STATUS_ACTIVE {
image.StartImagePipeline(ctx, userCred, true)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/image/models/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *S3Storage) SaveImage(ctx context.Context, imagePath string, progresser
if !fileutils2.IsFile(imagePath) {
return "", fmt.Errorf("%s not valid file", imagePath)
}
return s3.Put(ctx, imagePath, imagePathToName(imagePath), progresser)
return s3.Put(ctx, imagePath, imagePathToName(imagePath), options.Options.S3UploadPartSizeMb, options.Options.S3UploadParallel, progresser)
}

func (s *S3Storage) CleanTempfile(filePath string) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/image/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type SImageOptions struct {
S3BucketName string `help:"s3 bucket name" default:"onecloud-images"`
S3MountPoint string `help:"s3fs mount point" default:"/opt/cloud/workspace/data/glance/s3images"`
S3CheckImageStatus bool `help:"Enable s3 check image status"`
S3SignVersion string `help:"signing version"`
S3UploadPartSizeMb int64 `help:"s3 upload part size in MB, default to 50MB" default:"50"`
S3UploadParallel int `help:"s3 upload parallel count" default:"4"`

ImageStreamWorkerCount int `help:"Image stream worker count" default:"10"`
}
Expand Down
1 change: 1 addition & 0 deletions pkg/image/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func initS3() {
options.Options.S3SecretKey,
options.Options.S3BucketName,
options.Options.S3UseSSL,
options.Options.S3SignVersion,
)
if err != nil {
log.Fatalf("failed init s3 client %s", err)
Expand Down
4 changes: 2 additions & 2 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1785,7 +1785,7 @@ sigs.k8s.io/structured-merge-diff/v4/value
# sigs.k8s.io/yaml v1.2.0
## explicit; go 1.12
sigs.k8s.io/yaml
# yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75
# yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09
## explicit; go 1.21
yunion.io/x/cloudmux/pkg/apis
yunion.io/x/cloudmux/pkg/apis/billing
Expand Down Expand Up @@ -1882,7 +1882,7 @@ yunion.io/x/log/hooks
yunion.io/x/ovsdb/cli_util
yunion.io/x/ovsdb/schema/ovn_nb
yunion.io/x/ovsdb/types
# yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07
# yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882
## explicit; go 1.18
yunion.io/x/pkg/appctx
yunion.io/x/pkg/errors
Expand Down
Loading

0 comments on commit a3345e4

Please sign in to comment.