
add elasticdump direct S3 option
JamesClonk committed Apr 23, 2021
1 parent dd545b1 commit 53551ec
Showing 18 changed files with 199 additions and 109 deletions.
3 changes: 3 additions & 0 deletions .env
@@ -14,6 +14,9 @@ export BACKMAN_CONFIG='{
"encryption_key":"a_super_strong_example_key"
},
"services": {
"my-elasticsearch": {
"direct_s3": false
},
"my_mysql_db": {
"force_import": true
},
7 changes: 5 additions & 2 deletions README.md
@@ -92,9 +92,12 @@ Possible JSON properties:
- `services.<service-instance>.timeout`: optional, backman will abort a running backup/restore if the timeout is exceeded
- `services.<service-instance>.retention.days`: optional, specifies the maximum number of days backman will keep backups on S3 for this service instance
- `services.<service-instance>.retention.files`: optional, specifies the maximum number of files backman will keep on S3 for this service instance
- `services.<service-instance>.direct_s3`: optional / Elasticsearch-specific, bypasses backman's internal backup stream and encryption entirely, streaming directly from/to S3 via elasticdump (see the example configuration sketch after this list)
- `services.<service-instance>.disable_column_statistics`: optional / MySQL-specific, allows disabling the export of column statistics. Set to `true` to avoid issues with pre-8.0 versions of MySQL
- `services.<service-instance>.force_import`: optional / MySQL-specific. Set to `true` to use the `--force` flag for mysql, ignoring any errors that might occur while importing backups
- `services.<service-instance>.local_backup_path`: optional / PostgreSQL-specific, path where to store backup files locally first before uploading them. Otherwise streams directly to S3 if not specified
- `services.<service-instance>.backup_options`: optional, allows specifying additional parameters and flags for the service backup executable
- `services.<service-instance>.restore_options`: optional, allows specifying additional parameters and flags for the service restore executable
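
As a minimal sketch of how these properties fit together, a `services` block enabling `direct_s3` for an Elasticsearch instance might look like this (the service name and the schedule, timeout, and retention values are illustrative only, not taken from a real deployment):

```json
{
  "services": {
    "my-elasticsearch": {
      "schedule": "0 0 4 * * *",
      "timeout": "1h",
      "direct_s3": true,
      "retention": {
        "days": 30,
        "files": 10
      }
    }
  }
}
```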

Note: Use of `s3.encryption_key` is not backward compatible! Backups generated without it, or with a different encryption key, can no longer be downloaded or restored.

3 changes: 3 additions & 0 deletions config.json
@@ -11,6 +11,9 @@
"bucket_name": "my-database-backups"
},
"services": {
"my-elasticsearch": {
"direct_s3": true
},
"my_postgres_db": {
"schedule": "0 0 2,18 * * *",
"timeout": "2h",
4 changes: 4 additions & 0 deletions config/config.go
@@ -44,6 +44,7 @@ type ServiceConfig struct {
Days int
Files int
}
DirectS3 bool `json:"direct_s3"`
DisableColumnStatistics bool `json:"disable_column_statistics"`
ForceImport bool `json:"force_import"`
LocalBackupPath string `json:"local_backup_path"`
@@ -163,6 +164,9 @@ func Get() *Config {
if serviceConfig.Retention.Files > 0 {
mergedServiceConfig.Retention.Files = serviceConfig.Retention.Files
}
if serviceConfig.DirectS3 {
mergedServiceConfig.DirectS3 = serviceConfig.DirectS3
}
if serviceConfig.DisableColumnStatistics {
mergedServiceConfig.DisableColumnStatistics = serviceConfig.DisableColumnStatistics
}
5 changes: 4 additions & 1 deletion manifest.yml
@@ -23,7 +23,7 @@ applications:

# ### push either as docker image
docker:
image: jamesclonk/backman:1.27.2 # choose version from https://hub.docker.com/r/jamesclonk/backman/tags, or 'latest'
image: jamesclonk/backman:1.28.0 # choose version from https://hub.docker.com/r/jamesclonk/backman/tags, or 'latest'
# ### or as buildpack/src
# buildpacks:
# - https://github.com/cloudfoundry/apt-buildpack
@@ -62,6 +62,9 @@ applications:
"files": 500
}
},
"my-elasticsearch": {
"direct_s3": true
},
"my_mysql_db": {
"timeout": "35m",
"disable_column_statistics": true
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "backman",
"dependencies": {
"elasticdump": "6.7.5"
"elasticdump": "6.69.2"
}
}
6 changes: 6 additions & 0 deletions s3/client.go
@@ -16,6 +16,9 @@ import (
type Client struct {
Client *minio.Client
BucketName string
Endpoint string
AccessKey string
SecretKey string
}

func New(app *cfenv.App) *Client {
@@ -95,5 +98,8 @@ func New(app *cfenv.App) *Client {
return &Client{
Client: minioClient,
BucketName: bucketName,
Endpoint: endpoint,
AccessKey: accessKeyID,
SecretKey: secretAccessKey,
}
}
156 changes: 94 additions & 62 deletions service/elasticsearch/backup.go
@@ -43,86 +43,118 @@ func Backup(ctx context.Context, s3 *s3.Client, service util.Service, binding *c

u, _ := url.Parse(host)
connectstring := fmt.Sprintf("%s://%s:%s@%s", u.Scheme, username, password, u.Host)
objectPath := fmt.Sprintf("%s/%s/%s", service.Label, service.Name, filename)

// prepare elasticdump command
var command []string
command = append(command, "elasticdump")
command = append(command, "--quiet")
command = append(command, fmt.Sprintf("--input=%s", connectstring))
command = append(command, "--output=$")

log.Debugf("executing elasticsearch backup command: %v", strings.Join(command, " "))
cmd := exec.CommandContext(ctx, command[0], command[1:]...)
// stream to stdout (default behaviour) or directly onto s3 (new behaviour)?
if service.DirectS3 {
command = append(command, fmt.Sprintf("--output=s3://%s/%s", s3.BucketName, objectPath))
command = append(command, "--s3Endpoint")
command = append(command, s3.Endpoint)
command = append(command, "--s3AccessKeyId")
command = append(command, s3.AccessKey)
command = append(command, "--s3SecretAccessKey")
command = append(command, s3.SecretKey)
command = append(command, "--s3Compress")
command = append(command, service.BackupOptions...)

log.Debugf("executing elasticsearch direct S3 backup command: %v", strings.Join(command, " "))
cmd := exec.CommandContext(ctx, command[0], command[1:]...)

if err := cmd.Run(); err != nil {
log.Errorf("could not run elasticdump: %v", err)
state.BackupFailure(service, filename)

// capture stdout to pass to gzipping buffer
outPipe, err := cmd.StdoutPipe()
if err != nil {
log.Errorf("could not get stdout pipe for elasticdump: %v", err)
state.BackupFailure(service, filename)
return err
}
defer outPipe.Close()

var uploadWait sync.WaitGroup
uploadCtx, uploadCancel := context.WithCancel(context.Background()) // allows upload to be cancelable, in case backup times out
defer uploadCancel() // cancel upload in case Backup() exits before uploadWait is done

// start upload in background, streaming output onto S3
uploadWait.Add(1)
go func() {
defer uploadWait.Done()

// gzipping stdout
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
gw.Name = strings.TrimSuffix(filename, ".gz")
gw.ModTime = time.Now()
go func() {
_, _ = io.Copy(gw, bufio.NewReader(outPipe))
if err := gw.Flush(); err != nil {
log.Errorf("%v", err)
// check for timeout error
if ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("elasticdump: timeout: %v", ctx.Err())
}
if err := gw.Close(); err != nil {
log.Errorf("%v", err)
}
if err := pw.Close(); err != nil {
log.Errorf("%v", err)
}
}()
return fmt.Errorf("elasticdump: %v", err)
}
time.Sleep(1 * time.Second)
state.BackupSuccess(service, filename)

} else {
command = append(command, "--output=$")
command = append(command, service.BackupOptions...)

objectPath := fmt.Sprintf("%s/%s/%s", service.Label, service.Name, filename)
err = s3.UploadWithContext(uploadCtx, objectPath, pr, -1)
log.Debugf("executing elasticsearch backup command: %v", strings.Join(command, " "))
cmd := exec.CommandContext(ctx, command[0], command[1:]...)

// capture stdout to pass to gzipping buffer
outPipe, err := cmd.StdoutPipe()
if err != nil {
log.Errorf("could not upload service backup [%s] to S3: %v", service.Name, err)
log.Errorf("could not get stdout pipe for elasticdump: %v", err)
state.BackupFailure(service, filename)
return err
}
}()
time.Sleep(2 * time.Second) // wait for upload goroutine to be ready
defer outPipe.Close()

// capture and read stderr in case an error occurs
var errBuf bytes.Buffer
cmd.Stderr = &errBuf
var uploadWait sync.WaitGroup
uploadCtx, uploadCancel := context.WithCancel(context.Background()) // allows upload to be cancelable, in case backup times out
defer uploadCancel() // cancel upload in case Backup() exits before uploadWait is done

if err := cmd.Start(); err != nil {
log.Errorf("could not run elasticdump: %v", err)
state.BackupFailure(service, filename)
return err
}
// start upload in background, streaming output onto S3
uploadWait.Add(1)
go func() {
defer uploadWait.Done()

// gzipping stdout
pr, pw := io.Pipe()
gw := gzip.NewWriter(pw)
gw.Name = strings.TrimSuffix(filename, ".gz")
gw.ModTime = time.Now()
go func() {
_, _ = io.Copy(gw, bufio.NewReader(outPipe))
if err := gw.Flush(); err != nil {
log.Errorf("%v", err)
}
if err := gw.Close(); err != nil {
log.Errorf("%v", err)
}
if err := pw.Close(); err != nil {
log.Errorf("%v", err)
}
}()

err = s3.UploadWithContext(uploadCtx, objectPath, pr, -1)
if err != nil {
log.Errorf("could not upload service backup [%s] to S3: %v", service.Name, err)
state.BackupFailure(service, filename)
}
}()
time.Sleep(2 * time.Second) // wait for upload goroutine to be ready

// capture and read stderr in case an error occurs
var errBuf bytes.Buffer
cmd.Stderr = &errBuf

if err := cmd.Wait(); err != nil {
state.BackupFailure(service, filename)
// check for timeout error
if ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("elasticdump: timeout: %v", ctx.Err())
if err := cmd.Start(); err != nil {
log.Errorf("could not run elasticdump: %v", err)
state.BackupFailure(service, filename)
return err
}

log.Errorln(strings.TrimRight(errBuf.String(), "\r\n"))
return fmt.Errorf("elasticdump: %v", err)
}
if err := cmd.Wait(); err != nil {
state.BackupFailure(service, filename)
// check for timeout error
if ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("elasticdump: timeout: %v", ctx.Err())
}

uploadWait.Wait() // wait for upload to have finished
if err == nil {
state.BackupSuccess(service, filename)
log.Errorln(strings.TrimRight(errBuf.String(), "\r\n"))
return fmt.Errorf("elasticdump: %v", err)
}

uploadWait.Wait() // wait for upload to have finished
if err == nil {
state.BackupSuccess(service, filename)
}
}
return err
return nil
}
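
For readability, here is a sketch of the new direct-S3 branch of `Backup()` as reconstructed from the added lines in the hunk above; treat it as an approximation under that assumption, not the verbatim committed file:

```go
// stream to stdout (default behaviour) or directly onto s3 (new behaviour)?
if service.DirectS3 {
	// hand the S3 target and credentials straight to elasticdump, bypassing
	// backman's own gzip/encryption/upload pipeline entirely
	command = append(command, fmt.Sprintf("--output=s3://%s/%s", s3.BucketName, objectPath))
	command = append(command, "--s3Endpoint", s3.Endpoint)
	command = append(command, "--s3AccessKeyId", s3.AccessKey)
	command = append(command, "--s3SecretAccessKey", s3.SecretKey)
	command = append(command, "--s3Compress")
	command = append(command, service.BackupOptions...)

	log.Debugf("executing elasticsearch direct S3 backup command: %v", strings.Join(command, " "))
	cmd := exec.CommandContext(ctx, command[0], command[1:]...)

	if err := cmd.Run(); err != nil {
		log.Errorf("could not run elasticdump: %v", err)
		state.BackupFailure(service, filename)
		// check for timeout error
		if ctx.Err() == context.DeadlineExceeded {
			return fmt.Errorf("elasticdump: timeout: %v", ctx.Err())
		}
		return fmt.Errorf("elasticdump: %v", err)
	}
	time.Sleep(1 * time.Second)
	state.BackupSuccess(service, filename)

} else {
	// previous behaviour: read elasticdump's stdout, gzip it and upload it
	// to S3 through backman's own S3 client (see the full hunk above)
	command = append(command, "--output=$")
	command = append(command, service.BackupOptions...)
	// ...
}
return nil
```

Because elasticdump writes the data to S3 itself in this mode, backman's gzip/encryption/upload pipeline (and therefore `s3.encryption_key`) is not involved for these backups.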