Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: image support s3 parallel #22034

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/climc/shell/compute/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func init() {
ID string `help:"ID or name of host"`
IMAGE string `help:"ID or name of image"`
Force bool `help:"Force refresh cache, even if the image exists in cache"`
Format string `help:"image format" choices:"iso|vmdk|qcow2|vhd"`
Format string `help:"image format" choices:"iso|vmdk|qcow2|vhd|tgz"`
}
R(&HostCacheImageActionOptions{}, "host-cache-image", "Ask a host to cache a image", func(s *mcclient.ClientSession, args *HostCacheImageActionOptions) error {
params := jsonutils.NewDict()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ require (
k8s.io/cri-api v0.22.17
k8s.io/klog/v2 v2.20.0
moul.io/http2curl/v2 v2.3.0
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09
yunion.io/x/executor v0.0.0-20241205080005-48f5b1212256
yunion.io/x/jsonutils v1.0.1-0.20240930100528-1671a2d0d22f
yunion.io/x/log v1.0.1-0.20240305175729-7cf2d6cd5a91
yunion.io/x/ovsdb v0.0.0-20230306173834-f164f413a900
yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07
yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882
yunion.io/x/s3cli v0.0.0-20241221171442-1c11599d28e1
yunion.io/x/sqlchemy v1.1.3-0.20240926163039-d41512b264e1
yunion.io/x/structarg v0.0.0-20231017124457-df4d5009457c
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1376,8 +1376,8 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75 h1:3GfPFiOXGcNlbMZbLg2TTov7VdRx2FS+FzrHQsmj64o=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75/go.mod h1:KQ/jWx7bZlmjCE711KEWuvHW/dzpdr/UTlBjjutkj0Y=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09 h1:xfZ2R9kuaqbxASpc1ohWHMswEQwaAUExoVg8gob7Iqo=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09/go.mod h1:KQ/jWx7bZlmjCE711KEWuvHW/dzpdr/UTlBjjutkj0Y=
yunion.io/x/executor v0.0.0-20241205080005-48f5b1212256 h1:kLKQ6zbgPDQflRwoHFAjxNChcbhXIFgsUVLkJwiXu/8=
yunion.io/x/executor v0.0.0-20241205080005-48f5b1212256/go.mod h1:Uxuou9WQIeJXNpy7t2fPLL0BYLvLiMvGQwY7Qc6aSws=
yunion.io/x/jsonutils v0.0.0-20190625054549-a964e1e8a051/go.mod h1:4N0/RVzsYL3kH3WE/H1BjUQdFiWu50JGCFQuuy+Z634=
Expand All @@ -1391,8 +1391,8 @@ yunion.io/x/ovsdb v0.0.0-20230306173834-f164f413a900 h1:Hu/4ERvoWaN6aiFs4h4/yvVB
yunion.io/x/ovsdb v0.0.0-20230306173834-f164f413a900/go.mod h1:0vLkNEhlmA64HViPBAnSTUMrx5QP1CLsxXmxDKQ80tc=
yunion.io/x/pkg v0.0.0-20190620104149-945c25821dbf/go.mod h1:t6rEGG2sQ4J7DhFxSZVOTjNd0YO/KlfWQyK1W4tog+E=
yunion.io/x/pkg v0.0.0-20190628082551-f4033ba2ea30/go.mod h1:t6rEGG2sQ4J7DhFxSZVOTjNd0YO/KlfWQyK1W4tog+E=
yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07 h1:XZsL8+YpgVBHMcRwdpZ4cOfSZCc+KsgUCTdV3KeK8FI=
yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07/go.mod h1:0Bwxqd9MA3ACi119/l02FprY/o9gHahmYC2bsSbnVpM=
yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882 h1:Nl4qngZ4mrVe2xsMVrCHLe9t+nY/KpaBXO9a0sTbg4M=
yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882/go.mod h1:0Bwxqd9MA3ACi119/l02FprY/o9gHahmYC2bsSbnVpM=
yunion.io/x/s3cli v0.0.0-20241221171442-1c11599d28e1 h1:1KJ3YYinydPHpDEQRXdr/T8SYcKZ5Er+m489H+PnaQ4=
yunion.io/x/s3cli v0.0.0-20241221171442-1c11599d28e1/go.mod h1:0iFKpOs1y4lbCxeOmq3Xx/0AcQoewVPwj62eRluioEo=
yunion.io/x/sqlchemy v1.1.3-0.20240926163039-d41512b264e1 h1:HWPqY1I5JSmM6Sks6FyK9hnq/MjL7FDghM6M8DXHob0=
Expand Down
1 change: 0 additions & 1 deletion pkg/hostman/storageman/diskhandlers/diskhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ func perfetchImageCache(ctx context.Context, w http.ResponseWriter, r *http.Requ

func deleteImageCache(ctx context.Context, w http.ResponseWriter, r *http.Request) {
performImageCache(ctx, w, r, "delete")

}

func getDiskStatus(ctx context.Context, w http.ResponseWriter, r *http.Request) {
Expand Down
19 changes: 10 additions & 9 deletions pkg/image/drivers/s3/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"os"

"yunion.io/x/cloudmux/pkg/cloudprovider"
"yunion.io/x/cloudmux/pkg/multicloud"
"yunion.io/x/cloudmux/pkg/multicloud/objectstore"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
Expand Down Expand Up @@ -54,11 +53,14 @@ func (c *S3Client) getBucket() (cloudprovider.ICloudBucket, error) {
return c.osc.GetIBucketByName(c.bucket)
}

func Init(endpoint, accessKey, secretKey, bucket string, useSSL bool) error {
func Init(endpoint, accessKey, secretKey, bucket string, useSSL bool, signVer string) error {
if client != nil {
return nil
}
cfg := objectstore.NewObjectStoreClientConfig(endpoint, accessKey, secretKey)
if len(signVer) > 0 {
cfg.SignVersion(objectstore.S3SignVersion(signVer))
}
minioClient, err := objectstore.NewObjectStoreClient(cfg)
if err != nil {
return errors.Wrap(err, "new minio client")
Expand Down Expand Up @@ -88,29 +90,28 @@ func ensureBucket() error {
return nil
}

func PutStream(ctx context.Context, file io.Reader, fSize int64, objName string, progresser func(saved int64)) (string, error) {
func PutStream(ctx context.Context, file io.ReaderAt, fSize int64, objName string, partSizeMb int64, parallel int, progresser func(saved int64)) (string, error) {
if client == nil {
return "", ErrClientNotInit
}
bucket, err := client.getBucket()
if err != nil {
return "", errors.Wrap(err, "client.getBucket")
}
const blockSizeMB = 100
pFile := multicloud.NewProgress(fSize, 100, file, func(ratio float32) {
/* pFile := multicloud.NewProgress(fSize, 100, file, func(ratio float32) {
if progresser != nil {
progresser(int64(float64(ratio) * float64(fSize)))
}
})
err = cloudprovider.UploadObject(ctx, bucket, objName, blockSizeMB*1000*1000, pFile, fSize, cloudprovider.ACLPrivate, "", nil, false)
}) */
err = cloudprovider.UploadObjectParallel(ctx, bucket, objName, partSizeMb*1000*1000, file, fSize, cloudprovider.ACLPrivate, "", nil, false, parallel)
if err != nil {
return "", errors.Wrap(err, "cloudprovider.UploadObject")
}
log.Debugf("put object %s size %d", objName, fSize)
return client.Location(objName), nil
}

func Put(ctx context.Context, filePath, objName string, progresser func(int64)) (string, error) {
func Put(ctx context.Context, filePath, objName string, partSizeMb int64, parallel int, progresser func(int64)) (string, error) {
finfo, err := os.Stat(filePath)
if err != nil {
return "", errors.Wrap(err, "os.Stat")
Expand All @@ -121,7 +122,7 @@ func Put(ctx context.Context, filePath, objName string, progresser func(int64))
return "", errors.Wrap(err, "os.Open")
}
defer file.Close()
return PutStream(ctx, file, fSize, objName, progresser)
return PutStream(ctx, file, fSize, objName, partSizeMb, parallel, progresser)
}

func Get(ctx context.Context, fileName string) (int64, io.ReadCloser, error) {
Expand Down
59 changes: 32 additions & 27 deletions pkg/image/models/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ func (self *SImage) CustomizedGetDetailsBody(ctx context.Context, userCred mccli

if self.IsGuestImage.IsFalse() {
formatStr := jsonutils.GetAnyString(query, []string{"format", "disk_format"})
if len(formatStr) > 0 && formatStr != api.IMAGE_DISK_FORMAT_TGZ {
if len(formatStr) > 0 {
subImages := ImageSubformatManager.GetAllSubImages(self.Id)
if len(subImages) == 1 {
// ignore format field
formatStr = subImages[0].Format
}
subimg := ImageSubformatManager.FetchSubImage(self.Id, formatStr)
if subimg != nil {
if strings.HasPrefix(subimg.Location, api.LocalFilePrefix) {
Expand Down Expand Up @@ -648,15 +653,15 @@ func (self *SImage) PostCreate(ctx context.Context, userCred mcclient.TokenCrede

// After image probe and customization, image size and checksum changed
// will recalculate checksum in the end
func (self *SImage) StartImagePipeline(
func (image *SImage) StartImagePipeline(
ctx context.Context, userCred mcclient.TokenCredential, skipProbe bool,
) error {
data := jsonutils.NewDict()
if skipProbe {
data.Set("skip_probe", jsonutils.JSONTrue)
}
task, err := taskman.TaskManager.NewTask(
ctx, "ImagePipelineTask", self, userCred, data, "", "", nil)
ctx, "ImagePipelineTask", image, userCred, data, "", "", nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1421,67 +1426,67 @@ func (self *SImage) IsIso() bool {
return self.DiskFormat == string(api.ImageTypeISO)
}

func (self *SImage) isActive(useFast bool, noChecksum bool) bool {
active, reason := isActive(self.GetLocalLocation(), self.Size, self.Checksum, self.FastHash, useFast, noChecksum)
func (image *SImage) isActive(useFast bool, noChecksum bool) bool {
active, reason := isActive(image.GetLocalLocation(), image.Size, image.Checksum, image.FastHash, useFast, noChecksum)
if active || reason != FileChecksumMismatch {
return active
}
data := jsonutils.NewDict()
data.Set("name", jsonutils.NewString(self.Name))
data.Set("name", jsonutils.NewString(image.Name))
notifyclient.SystemExceptionNotifyWithResult(context.TODO(), noapi.ActionChecksumTest, noapi.TOPIC_RESOURCE_IMAGE, noapi.ResultFailed, data)
return false
}

func (self *SImage) DoCheckStatus(ctx context.Context, userCred mcclient.TokenCredential, useFast bool) {
if utils.IsInStringArray(self.Status, api.ImageDeadStatus) {
func (image *SImage) DoCheckStatus(ctx context.Context, userCred mcclient.TokenCredential, useFast bool) {
if utils.IsInStringArray(image.Status, api.ImageDeadStatus) {
return
}
if IsCheckStatusEnabled(self) {
if self.isActive(useFast, true) {
if self.Status != api.IMAGE_STATUS_ACTIVE {
self.SetStatus(ctx, userCred, api.IMAGE_STATUS_ACTIVE, "check active")
if IsCheckStatusEnabled(image) {
if image.isActive(useFast, true) {
if image.Status != api.IMAGE_STATUS_ACTIVE {
image.SetStatus(ctx, userCred, api.IMAGE_STATUS_ACTIVE, "check active")
}
if len(self.FastHash) == 0 {
fastHash, err := fileutils2.FastCheckSum(self.GetLocalLocation())
if len(image.FastHash) == 0 {
fastHash, err := fileutils2.FastCheckSum(image.GetLocalLocation())
if err != nil {
log.Errorf("DoCheckStatus fileutils2.FastChecksum fail %s", err)
} else {
_, err := db.Update(self, func() error {
self.FastHash = fastHash
_, err := db.Update(image, func() error {
image.FastHash = fastHash
return nil
})
if err != nil {
log.Errorf("DoCheckStatus save FastHash fail %s", err)
}
}
}
img, err := qemuimg.NewQemuImage(self.GetLocalLocation())
img, err := qemuimg.NewQemuImage(image.GetLocalLocation())
if err == nil {
format := string(img.Format)
virtualSizeMB := int32(img.SizeBytes / 1024 / 1024)
if (len(format) > 0 && self.DiskFormat != format) || (virtualSizeMB > 0 && self.MinDiskMB != virtualSizeMB) {
db.Update(self, func() error {
if (len(format) > 0 && image.DiskFormat != format) || (virtualSizeMB > 0 && image.MinDiskMB != virtualSizeMB) {
db.Update(image, func() error {
if len(format) > 0 {
self.DiskFormat = format
image.DiskFormat = format
}
if virtualSizeMB > 0 && self.MinDiskMB < virtualSizeMB {
self.MinDiskMB = virtualSizeMB
if virtualSizeMB > 0 && image.MinDiskMB < virtualSizeMB {
image.MinDiskMB = virtualSizeMB
}
return nil
})
}
} else {
log.Warningf("fail to check image size of %s(%s)", self.Id, self.Name)
log.Warningf("fail to check image size of %s(%s)", image.Id, image.Name)
}
} else {
if self.Status != api.IMAGE_STATUS_QUEUED {
self.SetStatus(ctx, userCred, api.IMAGE_STATUS_QUEUED, "check inactive")
if image.Status != api.IMAGE_STATUS_QUEUED {
image.SetStatus(ctx, userCred, api.IMAGE_STATUS_QUEUED, "check inactive")
}
}
}

if self.Status == api.IMAGE_STATUS_ACTIVE {
self.StartImagePipeline(ctx, userCred, true)
if image.Status == api.IMAGE_STATUS_ACTIVE {
image.StartImagePipeline(ctx, userCred, true)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/image/models/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *S3Storage) SaveImage(ctx context.Context, imagePath string, progresser
if !fileutils2.IsFile(imagePath) {
return "", fmt.Errorf("%s not valid file", imagePath)
}
return s3.Put(ctx, imagePath, imagePathToName(imagePath), progresser)
return s3.Put(ctx, imagePath, imagePathToName(imagePath), options.Options.S3UploadPartSizeMb, options.Options.S3UploadParallel, progresser)
}

func (s *S3Storage) CleanTempfile(filePath string) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/image/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type SImageOptions struct {
S3BucketName string `help:"s3 bucket name" default:"onecloud-images"`
S3MountPoint string `help:"s3fs mount point" default:"/opt/cloud/workspace/data/glance/s3images"`
S3CheckImageStatus bool `help:"Enable s3 check image status"`
S3SignVersion string `help:"signing version"`
S3UploadPartSizeMb int64 `help:"s3 upload part size in MB, default to 50MB" default:"50"`
S3UploadParallel int `help:"s3 upload parallel count" default:"4"`

ImageStreamWorkerCount int `help:"Image stream worker count" default:"10"`
}
Expand Down
1 change: 1 addition & 0 deletions pkg/image/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func initS3() {
options.Options.S3SecretKey,
options.Options.S3BucketName,
options.Options.S3UseSSL,
options.Options.S3SignVersion,
)
if err != nil {
log.Fatalf("failed init s3 client %s", err)
Expand Down
4 changes: 2 additions & 2 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1785,7 +1785,7 @@ sigs.k8s.io/structured-merge-diff/v4/value
# sigs.k8s.io/yaml v1.2.0
## explicit; go 1.12
sigs.k8s.io/yaml
# yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250113015514-eb7eb3112a75
# yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20250128155957-fbf9c82c5f09
## explicit; go 1.21
yunion.io/x/cloudmux/pkg/apis
yunion.io/x/cloudmux/pkg/apis/billing
Expand Down Expand Up @@ -1882,7 +1882,7 @@ yunion.io/x/log/hooks
yunion.io/x/ovsdb/cli_util
yunion.io/x/ovsdb/schema/ovn_nb
yunion.io/x/ovsdb/types
# yunion.io/x/pkg v1.10.4-0.20250123070256-9247ce856f07
# yunion.io/x/pkg v1.10.4-0.20250128110515-2cde6f625882
## explicit; go 1.18
yunion.io/x/pkg/appctx
yunion.io/x/pkg/errors
Expand Down
Loading