Skip to content

Commit

Permalink
fix(controller): prevent zfs volume cr deletion if snapshot exists
Browse files Browse the repository at this point in the history
Signed-off-by: sinhaashish <[email protected]>
  • Loading branch information
sinhaashish committed Jan 24, 2025
1 parent a48fcdb commit 2b0b689
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 105 deletions.
173 changes: 164 additions & 9 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/status"
k8serror "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -32,6 +33,8 @@ import (
csipayload "github.com/openebs/zfs-localpv/pkg/response"
"github.com/openebs/zfs-localpv/pkg/version"
"github.com/openebs/zfs-localpv/pkg/zfs"

apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1"
)

// size constants
Expand Down Expand Up @@ -502,6 +505,18 @@ func (cs *controller) DeleteVolume(
unlock := cs.volumeLock.LockVolume(volumeID)
defer unlock()

// Fetch the list of snapshot for the given volume
snapList, err := zfs.GetSnapshotForVolume(volumeID)
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"failed to handle delete volume request for {%s}, "+
"validation failed checking for snapshots. Error: %s",
req.VolumeId,
err.Error(),
)
}

// verify if the volume has already been deleted
vol, err := zfs.GetVolume(volumeID)
if vol != nil && vol.DeletionTimestamp != nil {
Expand All @@ -524,14 +539,16 @@ func (cs *controller) DeleteVolume(
return nil, status.Error(codes.Internal, "can not delete, volume creation is in progress")
}

// Delete the corresponding ZV CR
err = zfs.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
// Delete the corresponding ZV CR only if there are no snapshots present for the volume
if len(snapList.Items) == 0 {
err = zfs.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}
}

sendEventOrIgnore("", volumeID, vol.Spec.Capacity, analytics.VolumeDeprovision)
Expand Down Expand Up @@ -806,6 +823,8 @@ func (cs *controller) DeleteSnapshot(
return nil, status.Errorf(codes.InvalidArgument, "DeleteSnapshot: empty snapshotID")
}

var snapList *apis.ZFSSnapshotList
var isCorrespondingPvcPresent bool
klog.Infof("DeleteSnapshot request for %s", req.SnapshotId)

// snapshodID is formed as <volname>@<snapname>
Expand All @@ -815,8 +834,27 @@ func (cs *controller) DeleteSnapshot(
// should succeed when an invalid snapshot id is used
return &csi.DeleteSnapshotResponse{}, nil
}
volumeId := snapshotID[0]
unlock := cs.volumeLock.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1])
defer unlock()

// verify if the volume has already been deleted
snap, err := zfs.GetZFSSnapshot(snapshotID[1])
if snap != nil && snap.DeletionTimestamp != nil {
goto deleteSnapshotResponse
}

if err != nil {
if k8serror.IsNotFound(err) {
goto deleteSnapshotResponse
}
return nil, errors.Wrapf(
err,
"failed to get snapshot for {%s}",
snapshotID[1],
)
}

if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {
return nil, status.Errorf(
codes.Internal,
Expand All @@ -825,7 +863,59 @@ func (cs *controller) DeleteSnapshot(
err.Error(),
)
}
return &csi.DeleteSnapshotResponse{}, nil

if err := waitForSnapshotDelete(snapshotID[1]); err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to verify DeleteSnapshot for %s, {%s}",
req.SnapshotId,
err.Error(),
)
}

isCorrespondingPvcPresent, err = isCorrespondingPVCPresent(volumeId)
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for snapshots. Error: %s",
volumeId,
err.Error(),
)
}

klog.Infof("DeleteSnapshot request for $$$$$$$$$$$$$$ %+v", isCorrespondingPvcPresent)

// Fetch the list of snapshot for the given volume
snapList, err = zfs.GetSnapshotForVolume(volumeId)
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for snapshots. Error: %s",
volumeId,
err.Error(),
)
}

klog.Infof("DeleteSnapshot request for $$$$$$$$$$$$$$ %+v", snapList)
klog.Infof("Number of snapshot before %d", len(snapList.Items))

// Delete the corresponding ZV CR only if there are no snapshots present
// for the volume and the parent pvc is deleted

if len(snapList.Items) == 0 && !isCorrespondingPvcPresent {
err = zfs.DeleteVolume(volumeId)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeId,
)
}
}
deleteSnapshotResponse:
return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}

// ListSnapshots lists all snapshots for the
Expand Down Expand Up @@ -1128,3 +1218,68 @@ func LabelIndexFunc(label string) cache.IndexFunc {
return vs, nil
}
}

func waitForSnapshotDelete(snapname string) error {
for {
snap, err := zfs.GetZFSSnapshot(snapname)
klog.Infof("##########SNAP############## %+v", snap)
klog.Infof("$$$$$$$$$$$$$$ Error $$$$$$$$$$$$$$ %+v", err)
if err != nil {
if k8serror.IsNotFound(err) {
return nil
}
return status.Errorf(codes.Internal,
"zfs: destroy wait failed, not able to get the snap %s %s", snapname, err.Error())
}
time.Sleep(time.Second)
klog.Infof("waiting for snap to be destroyed %s", snapname)
}
}

// Function to check if a PVC exists for a given volume across all namespaces
func isCorrespondingPVCPresent(volumeName string) (bool, error) {

cfg, err := k8sapi.Config().Get()
if err != nil {
return false, status.Errorf(
codes.Internal,
"failed to build kubeconfig: %v",
err,
)
}

clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return false, status.Errorf(
codes.Internal,
"failed to build k8s clientset: %v",
err,
)
}
isPVCPresent := true

pv, err := clientset.CoreV1().PersistentVolumes().Get(context.TODO(), volumeName, metav1.GetOptions{})
// if the PV is not found, then the PVC is not present
if err != nil {
isPVCPresent = false
}

if pv.Spec.ClaimRef != nil {

pvcName := pv.Spec.ClaimRef.Name
pvcNameSpace := pv.Spec.ClaimRef.Namespace
klog.Infof("pvcName Hrithik roshaan %s", pvcName)
klog.Infof("pvcNameSpace Kangana %s", pvcNameSpace)

// Check if the PVC exists
pvc, err := clientset.CoreV1().PersistentVolumeClaims(pvcNameSpace).Get(context.TODO(), pvcName, metav1.GetOptions{})
klog.Infof("pvc Details %s", pvc)
klog.Infof("pvc error %s", err)
if err != nil {
isPVCPresent = false
fmt.Printf("PVC '%s' in namespace '%s' not found: %v\n", pvcName, pvcNameSpace, err)
}
}

return isPVCPresent, nil
}
20 changes: 20 additions & 0 deletions pkg/response/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,23 @@ func NewDeleteVolumeResponseBuilder() *DeleteVolumeResponseBuilder {
func (b *DeleteVolumeResponseBuilder) Build() *csi.DeleteVolumeResponse {
return b.response
}

// DeleteSnapshotResponseBuilder helps building an
// instance of csi DeleteSnapshotResponse
type DeleteSnapshotResponseBuilder struct {
response *csi.DeleteSnapshotResponse
}

// NewDeleteSnapshotResponseBuilder returns a new
// instance of DeleteSnapshotResponseBuilder
func NewDeleteSnapshotResponseBuilder() *DeleteSnapshotResponseBuilder {
return &DeleteSnapshotResponseBuilder{
response: &csi.DeleteSnapshotResponse{},
}
}

// Build returns the constructed instance
// of csi DeleteSnapshotResponse
func (b *DeleteSnapshotResponseBuilder) Build() *csi.DeleteSnapshotResponse {
return b.response
}
9 changes: 9 additions & 0 deletions pkg/zfs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,3 +441,12 @@ func IsVolumeReady(vol *apis.ZFSVolume) bool {

return false
}

// GetSnapshotForVolume fetches all the snapshots for the given volume
func GetSnapshotForVolume(volumeID string) (*apis.ZFSSnapshotList, error) {
listOptions := metav1.ListOptions{
LabelSelector: ZFSVolKey + "=" + volumeID,
}
snapList, err := snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions)
return snapList, err
}
68 changes: 39 additions & 29 deletions tests/provision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
var _ = Describe("[zfspv] TEST VOLUME PROVISIONING", func() {
Context("App is deployed with zfs driver", func() {
It("Running zfs volume Creation Test", volumeCreationTest)
It("Running zfs volume Creation Test with custom node id", Label("custom-node-id"), volumeCreationTest)
//It("Running zfs volume Creation Test with custom node id", Label("custom-node-id"), volumeCreationTest)
})
})

Expand All @@ -41,7 +41,7 @@ func exhaustiveVolumeTests(parameters map[string]string) {
snapshotAndCloneCreate()
// btrfs does not support online resize
if fstype != "btrfs" {
By("Resizing the PVC", resizeAndVerifyPVC)
By("Resizing the PVC", func() { resizeAndVerifyPVC(pvcNameFS) })
}
snapshotAndCloneCleanUp()
cleanUp()
Expand All @@ -51,62 +51,72 @@ func exhaustiveVolumeTests(parameters map[string]string) {
func create(parameters map[string]string) {
By("####### Creating the storage class : " + parameters["fstype"] + " #######")
createFstypeStorageClass(parameters)
By("creating and verifying PVC bound status", createAndVerifyPVC)
By("Creating and deploying app pod", createDeployVerifyApp)
By("creating and verifying PVC bound status", func() { createAndVerifyPVC(pvcNameFS) })
By("Creating and deploying app pod", func() { createDeployVerifyApp(appNameFS, pvcNameFS) })
By("verifying ZFSVolume object", VerifyZFSVolume)
By("verifying storage class parameters")
VerifyStorageClassParams(parameters)
}

// Creates the snapshot/clone resources
func snapshotAndCloneCreate() {
createSnapshot(pvcName, snapName)
verifySnapshotCreated(snapName)
createClone(clonePvcName, snapName, scObj.Name)
By("Creating and deploying clone app pod", createDeployVerifyCloneApp)
createSnapshot(pvcNameFS, snapNameFS)
verifySnapshotCreated(snapNameFS)
createClone(clonePvcNameFS, snapNameFS, scObj.Name)
By("Creating and deploying clone app pod", func() { createDeployVerifyCloneApp(cloneAppNameFS, clonePvcNameFS) })
}

// Removes the snapshot/clone resources
func snapshotAndCloneCleanUp() {
deleteAppDeployment(cloneAppName)
deletePVC(clonePvcName)
deleteSnapshot(pvcName, snapName)
deleteAppDeployment(cloneAppNameFS)
deletePVC(clonePvcNameFS)
deleteSnapshot(pvcNameFS, snapNameFS)
}

// Removes the resources
func cleanUp() {
deleteAppDeployment(appName)
deletePVC(pvcName)
deleteAppDeployment(appNameFS)
deletePVC(pvcNameFS)
By("Deleting storage class", deleteStorageClass)
}

func blockVolCreationTest() {
By("Creating default storage class", createStorageClass)
By("creating and verifying PVC bound status", createAndVerifyBlockPVC)
By("creating and verifying PVC bound status", func() { createAndVerifyPVC(pvcNameBlock) })

By("Creating and deploying app pod", createDeployVerifyBlockApp)
By("Creating and deploying app pod", func() { createDeployVerifyApp(appNameBlock, pvcNameBlock) })
By("verifying ZFSVolume object", VerifyZFSVolume)
By("verifying ZFSVolume property change", VerifyZFSVolumePropEdit)
By("Deleting application deployment")
//By("verifying ZFSVolume property change", VerifyZFSVolumePropEdit)

createSnapshot(pvcName, snapName)
verifySnapshotCreated(snapName)
createClone(clonePvcName, snapName, scObj.Name)
By("Creating and deploying clone app pod", createDeployVerifyCloneApp)
createSnapshot(pvcNameBlock, snapNameBlock)
verifySnapshotCreated(snapNameBlock)
createClone(clonePvcNameBlock, snapNameBlock, scObj.Name)
By("Creating and deploying clone app pod", func() { createDeployVerifyCloneApp(cloneAppNameBlock, clonePvcNameBlock) })

By("Deleting clone and main application deployment")
deleteAppDeployment(cloneAppName)
deleteAppDeployment(appName)
By("Deleting main application deployment")
deleteAppDeployment(appNameBlock)

By("Deleting snapshot, main pvc and clone pvc")
deletePVC(clonePvcName)
deleteSnapshot(pvcName, snapName)
deletePVC(pvcName)
zvName := getZVName(pvcNameBlock)
By("Deleting main pvc")
deletePVC(pvcNameBlock)

By("Verifying ZFSVolume object after pvc deletion when snapshot is present", VerifyZFSVolume)

By("Deleting clone application deployment")
deleteAppDeployment(cloneAppNameBlock)

By("Deleting snapshot and clone pvc")

deletePVC(clonePvcNameBlock)
By("Verifying that ZV is present after pvc deletion ", func() { isZVPresentConsistently(zvName) })
deleteSnapshot(pvcNameBlock, snapNameBlock)
//deleteZV(zvName)
By("Verifying that ZV is deleted after snapshot deletion ", func() { IsZVDeletedEventually(zvName) })

By("Deleting storage class", deleteStorageClass)
}

func volumeCreationTest() {
By("Running volume creation test", fsVolCreationTest)
//By("Running volume creation test", fsVolCreationTest)
By("Running block volume creation test", blockVolCreationTest)
}
Loading

0 comments on commit 2b0b689

Please sign in to comment.