Skip to content

Commit

Permalink
feat(velero): adding support for parallel backup and restore (#111)
Browse files Browse the repository at this point in the history
When backup is going on, then we can start restore operation. This is true other way also.
The problem is for both we are using the same port number to start the server.
So at a time only once server will be running so only one operation is supported.

This PR add two ports one for backup and one for restore.
They both will be using different ports to start the server and can run parallelly.
Also added different port for cstor and ZFS-LocalPV, so that they both can run parallely.

Signed-off-by: Pawan <[email protected]>
  • Loading branch information
pawanpraka1 authored Sep 9, 2020
1 parent 5ed3848 commit d1645b4
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 27 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/111-pawanpraka1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
adding support for parallel backup and restore
8 changes: 4 additions & 4 deletions pkg/clouduploader/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
// Upload will perform upload operation for given file.
// It will create a TCP server through which client can
// connect and upload data to cloud blob storage file
func (c *Conn) Upload(file string, fileSize int64) bool {
func (c *Conn) Upload(file string, fileSize int64, port int) bool {
c.Log.Infof("Uploading snapshot to '%s' with provider{%s} to bucket{%s}", file, c.provider, c.bucketname)

c.file = file
Expand All @@ -44,7 +44,7 @@ func (c *Conn) Upload(file string, fileSize int64) bool {
Log: c.Log,
cl: c,
}
err := s.Run(OpBackup)
err := s.Run(OpBackup, port)
if err != nil {
c.Log.Errorf("Failed to upload snapshot to bucket: %s", err.Error())
if c.bucket.Delete(c.ctx, file) != nil {
Expand All @@ -71,13 +71,13 @@ func (c *Conn) Delete(file string) bool {
// Download will perform restore operation for given file.
// It will create a TCP server through which client can
// connect and download data from cloud blob storage file
func (c *Conn) Download(file string) bool {
func (c *Conn) Download(file string, port int) bool {
c.file = file
s := &Server{
Log: c.Log,
cl: c,
}
err := s.Run(OpRestore)
err := s.Run(OpRestore, port)
if err != nil {
c.Log.Errorf("Failed to receive snapshot from bucket: %s", err.Error())
return false
Expand Down
9 changes: 3 additions & 6 deletions pkg/clouduploader/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ const (
// MaxClient defines max number of connection a server can accept
MaxClient = 10

// RecieverPort defines port number on which server should listen for new connection
RecieverPort = 9000

// ReadBufferLen defines max number of bytes should be read from wire
ReadBufferLen = 32 * 1024

Expand Down Expand Up @@ -244,7 +241,7 @@ func (s *Server) handleWrite(event syscall.EpollEvent) error {
}

// Run will start TCP server
func (s *Server) Run(opType ServerOperation) error {
func (s *Server) Run(opType ServerOperation, port int) error {
var event syscall.EpollEvent
var events [MaxEpollEvents]syscall.EpollEvent

Expand All @@ -264,11 +261,11 @@ func (s *Server) Run(opType ServerOperation) error {
return err
}

addr := syscall.SockaddrInet4{Port: RecieverPort}
addr := syscall.SockaddrInet4{Port: port}
copy(addr.Addr[:], net.ParseIP("0.0.0.0").To4())

if err = syscall.Bind(fd, &addr); err != nil {
s.Log.Errorf("Failed to bind server to port {%v} : %s", RecieverPort, err.Error())
s.Log.Errorf("Failed to bind server to port {%v} : %s", port, err.Error())
return err
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/cstor/api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,13 @@ func (p *Plugin) sendBackupRequest(vol *Volume) (*v1alpha1.CStorBackup, error) {

scheduleName := p.getScheduleName(vol.backupName) // This will be backup/schedule name

serverAddr := p.cstorServerAddr + ":" + strconv.Itoa(CstorBackupPort)

bkpSpec := &v1alpha1.CStorBackupSpec{
BackupName: scheduleName,
VolumeName: vol.volname,
SnapName: vol.backupName,
BackupDest: p.cstorServerAddr,
BackupDest: serverAddr,
LocalSnap: p.local,
}

Expand Down Expand Up @@ -213,7 +215,8 @@ func (p *Plugin) sendBackupRequest(vol *Volume) (*v1alpha1.CStorBackup, error) {
func (p *Plugin) sendRestoreRequest(vol *Volume) (*v1alpha1.CStorRestore, error) {
var url string

restoreSrc := p.cstorServerAddr
restoreSrc := p.cstorServerAddr + ":" + strconv.Itoa(CstorRestorePort)

if p.local {
restoreSrc = vol.srcVolname
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/cstor/cstor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package cstor

import (
"net"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -64,6 +63,12 @@ const (

// SnapshotIDIdentifier is a word to generate snapshotID from volume name and backup name
SnapshotIDIdentifier = "-velero-bkp-"

// port to connect for restoring the data
CstorRestorePort = 9000

// port to connect for backup
CstorBackupPort = 9001
)

// Plugin defines snapshot plugin for CStor volume
Expand Down Expand Up @@ -180,7 +185,7 @@ func (p *Plugin) getServerAddress() string {
if ok && !networkIP.IP.IsLoopback() && networkIP.IP.To4() != nil {
ip := networkIP.IP.String()
p.Log.Infof("Ip address of velero-plugin server: %s", ip)
return ip + ":" + strconv.Itoa(cloud.RecieverPort)
return ip
}
}
return ""
Expand Down Expand Up @@ -426,7 +431,7 @@ func (p *Plugin) CreateSnapshot(volumeID, volumeAZ string, tags map[string]strin

go p.checkBackupStatus(bkp, vol.isCSIVolume)

ok = p.cl.Upload(filename, size)
ok = p.cl.Upload(filename, size, CstorBackupPort)
if !ok {
return "", errors.New("failed to upload snapshot")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cstor/pv_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (p *Plugin) restoreVolumeFromCloud(vol *Volume) error {

go p.checkRestoreStatus(restore, vol)

ret := p.cl.Download(filename)
ret := p.cl.Download(filename, CstorRestorePort)
if !ret {
return errors.New("failed to restore snapshot")
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/zfs/plugin/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (p *Plugin) getPrevSnap(volname, schdname string) (string, error) {
return "", nil
}

func (p *Plugin) createBackup(vol *apis.ZFSVolume, schdname, snapname string) (string, error) {
func (p *Plugin) createBackup(vol *apis.ZFSVolume, schdname, snapname string, port int) (string, error) {
bkpname := utils.GenerateSnapshotID(vol.Name, snapname)

p.Log.Debugf("zfs: creating ZFSBackup vol = %s bkp = %s schd = %s", vol.Name, bkpname, schdname)
Expand All @@ -123,6 +123,8 @@ func (p *Plugin) createBackup(vol *apis.ZFSVolume, schdname, snapname string) (s
}
}

serverAddr := p.remoteAddr + ":" + strconv.Itoa(port)

bkp, err := bkpbuilder.NewBuilder().
WithName(bkpname).
WithLabels(labels).
Expand All @@ -131,7 +133,7 @@ func (p *Plugin) createBackup(vol *apis.ZFSVolume, schdname, snapname string) (s
WithSnap(snapname).
WithNode(vol.Spec.OwnerNodeID).
WithStatus(apis.BKPZFSStatusInit).
WithRemote(p.remoteAddr).
WithRemote(serverAddr).
Build()

if err != nil {
Expand Down Expand Up @@ -218,15 +220,15 @@ func (p *Plugin) doBackup(volumeID string, snapname string, schdname string) (st
}

// TODO(pawan) should wait for upload server to be up
bkpname, err := p.createBackup(vol, schdname, snapname)
bkpname, err := p.createBackup(vol, schdname, snapname, ZFSBackupPort)
if err != nil {
return "", err
}

go p.checkBackupStatus(bkpname)

p.Log.Debugf("zfs: uploading Snapshot %s file %s", snapname, filename)
ok := p.cl.Upload(filename, size)
ok := p.cl.Upload(filename, size, ZFSBackupPort)
if !ok {
p.deleteBackup(bkpname)
return "", errors.New("zfs: error in uploading snapshot")
Expand Down
10 changes: 6 additions & 4 deletions pkg/zfs/plugin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package plugin

import (
"encoding/json"
"strconv"
"time"

"github.com/openebs/velero-plugin/pkg/velero"
Expand Down Expand Up @@ -227,21 +228,22 @@ func (p *Plugin) cleanupRestore(oldvol, newvol, rname string) error {

// restoreVolume returns restored vol name and a boolean value indication if we need
// to restore the volume. If Volume is already restored, we don't need to restore it.
func (p *Plugin) restoreVolume(rname, volname, bkpname string) (string, error) {
func (p *Plugin) restoreVolume(rname, volname, bkpname string, port int) (string, error) {
zv, err := p.restoreZFSVolume(volname, bkpname)
if err != nil {
p.Log.Errorf("zfs: restore ZFSVolume failed vol %s bkp %s err %v", volname, bkpname, err)
return "", err
}

node := zv.Spec.OwnerNodeID
serverAddr := p.remoteAddr + ":" + strconv.Itoa(port)

rstr, err := restorebuilder.NewBuilder().
WithName(rname).
WithVolume(zv.Name).
WithNode(node).
WithStatus(apis.RSTZFSStatusInit).
WithRemote(p.remoteAddr).
WithRemote(serverAddr).
Build()

if err != nil {
Expand All @@ -263,15 +265,15 @@ func (p *Plugin) doRestore(snapshotID string) (string, error) {
return "", errors.Errorf("zfs: Error creating remote file name for restore")
}

newvol, err := p.restoreVolume(snapshotID, volname, bkpname)
newvol, err := p.restoreVolume(snapshotID, volname, bkpname, ZFSRestorePort)
if err != nil {
p.Log.Errorf("zfs: restoreVolume failed vol %s snap %s err: %v", volname, bkpname, err)
return "", err
}

go p.checkRestoreStatus(snapshotID)

ret := p.cl.Download(filename)
ret := p.cl.Download(filename, ZFSRestorePort)
if !ret {
p.cleanupRestore(volname, newvol, snapshotID)
return "", errors.New("zfs: failed to restore snapshot")
Expand Down
6 changes: 6 additions & 0 deletions pkg/zfs/plugin/zfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ const (
ZfsDriverName = "zfs.csi.openebs.io"

backupStatusInterval = 5

// port to connect for restoring the data
ZFSRestorePort = 9010

// port to connect for backup
ZFSBackupPort = 9011
)

// Plugin is a plugin for containing state for the blockstore
Expand Down
4 changes: 1 addition & 3 deletions pkg/zfs/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ package utils

import (
"net"
"strconv"
"strings"
"time"

"github.com/gofrs/uuid"
cloud "github.com/openebs/velero-plugin/pkg/clouduploader"
"github.com/pkg/errors"
)

Expand All @@ -45,7 +43,7 @@ func GetServerAddress() (string, error) {
networkIP, ok := netInterfaceAddress.(*net.IPNet)
if ok && !networkIP.IP.IsLoopback() && networkIP.IP.To4() != nil {
ip := networkIP.IP.String()
return ip + ":" + strconv.Itoa(cloud.RecieverPort), nil
return ip, nil
}
}
return "", errors.New("error: fetching the interface")
Expand Down

0 comments on commit d1645b4

Please sign in to comment.