Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add S3 minimum part size defined by the user #17171

Merged
merged 8 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3
github.com/aws/smithy-go v1.22.0
github.com/bndr/gotabulate v1.1.2
github.com/dustin/go-humanize v1.0.1
github.com/gammazero/deque v0.2.1
github.com/google/safehtml v0.1.0
github.com/hashicorp/go-version v1.7.0
Expand Down Expand Up @@ -153,7 +154,6 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4 // indirect
github.com/ebitengine/purego v0.8.1 // indirect
github.com/envoyproxy/go-control-plane v0.13.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ Flags:
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--restart_before_backup Perform a mysqld clean/full restart after applying binlogs, but before taking the backup. Only makes sense to work around xtrabackup bugs.
--s3_backup_aws_endpoint string endpoint of the S3 backend (region must be provided).
--s3_backup_aws_min_partsize int Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size. (default 5242880)
--s3_backup_aws_region string AWS region to use. (default "us-east-1")
--s3_backup_aws_retries int AWS request retries. (default -1)
--s3_backup_force_path_style force the s3 path style.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Flags:
--purge_logs_interval duration how often try to remove old logs (default 1h0m0s)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--s3_backup_aws_endpoint string endpoint of the S3 backend (region must be provided).
--s3_backup_aws_min_partsize int Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size. (default 5242880)
--s3_backup_aws_region string AWS region to use. (default "us-east-1")
--s3_backup_aws_retries int AWS request retries. (default -1)
--s3_backup_force_path_style force the s3 path style.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ Flags:
--restore_from_backup_ts string (init restore parameter) if set, restore the latest backup taken at or before this timestamp. Example: '2021-04-29.133050'
--retain_online_ddl_tables duration How long should vttablet keep an old migrated table before purging it (default 24h0m0s)
--s3_backup_aws_endpoint string endpoint of the S3 backend (region must be provided).
--s3_backup_aws_min_partsize int Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size. (default 5242880)
--s3_backup_aws_region string AWS region to use. (default "us-east-1")
--s3_backup_aws_retries int AWS request retries. (default -1)
--s3_backup_force_path_style force the s3 path style.
Expand Down
39 changes: 33 additions & 6 deletions go/vt/mysqlctl/s3backupstorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3/types"
transport "github.com/aws/smithy-go/endpoints"
"github.com/aws/smithy-go/middleware"
"github.com/dustin/go-humanize"
"github.com/spf13/pflag"

errorsbackup "vitess.io/vitess/go/vt/mysqlctl/errors"
Expand All @@ -57,6 +58,11 @@ import (
"vitess.io/vitess/go/vt/servenv"
)

const (
sseCustomerPrefix = "sse_c:"
MaxPartSize = 1024 * 1024 * 1024 * 5 // 5GiB - limited by AWS https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
frouioui marked this conversation as resolved.
Show resolved Hide resolved
)

var (
// AWS API region
region string
Expand Down Expand Up @@ -86,6 +92,11 @@ var (

// path component delimiter
delimiter = "/"

// minimum part size
minPartSize int64

ErrPartSize = errors.New("minimum S3 part size must be between 5MiB and 5GiB")
)

func registerFlags(fs *pflag.FlagSet) {
Expand All @@ -98,6 +109,7 @@ func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&tlsSkipVerifyCert, "s3_backup_tls_skip_verify_cert", false, "skip the 'certificate is valid' check for SSL connections.")
fs.StringVar(&requiredLogLevel, "s3_backup_log_level", "LogOff", "determine the S3 loglevel to use from LogOff, LogDebug, LogDebugWithSigning, LogDebugWithHTTPBody, LogDebugWithRequestRetries, LogDebugWithRequestErrors.")
fs.StringVar(&sse, "s3_backup_server_side_encryption", "", "server-side encryption algorithm (e.g., AES256, aws:kms, sse_c:/path/to/key/file).")
fs.Int64Var(&minPartSize, "s3_backup_aws_min_partsize", manager.MinUploadPartSize, "Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it only needed on that struct so we can load the definitions? in any case we can discuss in planetscale/vitess-operator#645

}

func init() {
Expand All @@ -111,8 +123,6 @@ type logNameToLogLevel map[string]aws.ClientLogMode

var logNameMap logNameToLogLevel

const sseCustomerPrefix = "sse_c:"

type endpointResolver struct {
r s3.EndpointResolverV2
endpoint *string
Expand Down Expand Up @@ -166,7 +176,12 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}

partSizeBytes := calculateUploadPartSize(filesize)
partSizeBytes, err := calculateUploadPartSize(filesize)
if err != nil {
return nil, err
}

bh.bs.params.Logger.Infof("Using S3 upload part size: %s", humanize.IBytes(uint64(partSizeBytes)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's worth adding a project dependency on humanize for this and the error message below. The flag is in bytes, so I think it would actually be better to log the bytes here as well to confirm that the expected/flag value is used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was already an indirect dependency, so I thought I would use it :)

I can remove it tomorrow since the preference is to avoid it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you're right! Sorry, for some reason I thought we added it to go.mod here. No reason not to use it here then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, I will leave it then and just update the comment as mentioned below!


reader, writer := io.Pipe()
bh.handleAddFile(ctx, filename, partSizeBytes, reader, func(err error) {
Expand Down Expand Up @@ -213,9 +228,11 @@ func (bh *S3BackupHandle) handleAddFile(ctx context.Context, filename string, pa
}()
}

func calculateUploadPartSize(filesize int64) int64 {
// this is a helper to calculate the part size, taking into consideration the minimum part size
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, but the function comment should start with the function name. Some linters will complain about this.

// passed in by an operator.
func calculateUploadPartSize(filesize int64) (partSizeBytes int64, err error) {
// Calculate s3 upload part size using the source filesize
partSizeBytes := manager.DefaultUploadPartSize
partSizeBytes = manager.DefaultUploadPartSize
if filesize > 0 {
minimumPartSize := float64(filesize) / float64(manager.MaxUploadParts)
// Round up to ensure large enough partsize
Expand All @@ -224,7 +241,17 @@ func calculateUploadPartSize(filesize int64) int64 {
partSizeBytes = calculatedPartSizeBytes
}
}
return partSizeBytes

if minPartSize != 0 && partSizeBytes < minPartSize {
if minPartSize > 1024*1024*1024*5 || minPartSize < 1024*1024*5 { // 5GiB and 5MiB respectively
mattlord marked this conversation as resolved.
Show resolved Hide resolved
return 0, fmt.Errorf("%w, currently set to %s",
ErrPartSize, humanize.IBytes(uint64(minPartSize)),
)
}
partSizeBytes = int64(minPartSize)
}

return
}

// EndBackup is part of the backupstorage.BackupHandle interface.
Expand Down
12 changes: 10 additions & 2 deletions go/vt/mysqlctl/s3backupstorage/s3_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ func FailFirstWrite(s3bh *S3BackupHandle, ctx context.Context, filename string,
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}

partSizeBytes := calculateUploadPartSize(filesize)
partSizeBytes, err := calculateUploadPartSize(filesize)
if err != nil {
return nil, err
}

reader, writer := io.Pipe()
r := io.Reader(reader)

Expand All @@ -181,7 +185,11 @@ func FailAllWrites(s3bh *S3BackupHandle, ctx context.Context, filename string, f
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}

partSizeBytes := calculateUploadPartSize(filesize)
partSizeBytes, err := calculateUploadPartSize(filesize)
if err != nil {
return nil, err
}

reader, writer := io.Pipe()
r := &failReadPipeReader{PipeReader: reader}

Expand Down
65 changes: 65 additions & 0 deletions go/vt/mysqlctl/s3backupstorage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,68 @@ func TestWithParams(t *testing.T) {
assert.NotNil(t, s3.transport.DialContext)
assert.NotNil(t, s3.transport.Proxy)
}

func TestCalculateUploadPartSize(t *testing.T) {
originalMinimum := minPartSize
defer func() { minPartSize = originalMinimum }()

tests := []struct {
name string
filesize int64
minimumPartSize int64
want int64
err error
}{
{
name: "minimum - 10 MiB",
filesize: 1024 * 1024 * 10, // 10 MiB
minimumPartSize: 1024 * 1024 * 5, // 5 MiB
want: 1024 * 1024 * 5, // 5 MiB,
err: nil,
},
{
name: "below minimum - 10 MiB",
filesize: 1024 * 1024 * 10, // 10 MiB
minimumPartSize: 1024 * 1024 * 8, // 8 MiB
want: 1024 * 1024 * 8, // 8 MiB,
err: nil,
},
{
name: "above minimum - 1 TiB",
filesize: 1024 * 1024 * 1024 * 1024, // 1 TiB
minimumPartSize: 1024 * 1024 * 5, // 5 MiB
want: 109951163, // ~104 MiB
err: nil,
},
{
name: "below minimum - 1 TiB",
filesize: 1024 * 1024 * 1024 * 1024, // 1 TiB
minimumPartSize: 1024 * 1024 * 200, // 200 MiB
want: 1024 * 1024 * 200, // 200 MiB
err: nil,
},
{
name: "below S3 limits - 5 MiB",
filesize: 1024 * 1024 * 3, // 3 MiB
minimumPartSize: 1024 * 1024 * 4, // 4 MiB
want: 1024 * 1024 * 5, // 5 MiB - should always return the minimum
err: nil,
},
{
name: "above S3 limits - 5 GiB",
filesize: 1024 * 1024 * 1024 * 1024, // 1 TiB
minimumPartSize: 1024 * 1024 * 1024 * 6, // 6 GiB
want: 0,
err: ErrPartSize,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
minPartSize = tt.minimumPartSize
partSize, err := calculateUploadPartSize(tt.filesize)
require.ErrorIs(t, err, tt.err)
require.Equal(t, tt.want, partSize)
})
}
}
Loading