Skip to content

Commit

Permalink
fix(controller): prevent zfs volume cr deletion if snapshot exists
Browse files Browse the repository at this point in the history
Signed-off-by: sinhaashish <[email protected]>
  • Loading branch information
sinhaashish committed Jan 29, 2025
1 parent aa590cf commit 014e5a4
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 120 deletions.
9 changes: 9 additions & 0 deletions pkg/builder/volbuilder/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ func (b *Builder) WithVolBlockSize(bs string) *Builder {
return b
}

// WithAnnotation sets the annotation of ZFSVolume
func (b *Builder) WithAnnotation() *Builder {
if b.volume.Object.Annotations == nil {
b.volume.Object.Annotations = make(map[string]string)
}
b.volume.Object.Annotations["openebs.zv.isEligibleForDeletion"] = "true"
return b
}

// WithVolumeType sets if ZFSVolume needs to be thin provisioned
func (b *Builder) WithVolumeType(vtype string) *Builder {
b.volume.Object.Spec.VolumeType = vtype
Expand Down
123 changes: 112 additions & 11 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,14 +503,14 @@ func (cs *controller) DeleteVolume(
defer unlock()

// verify if the volume has already been deleted
vol, err := zfs.GetVolume(volumeID)
vol, err := zfs.GetZFSVolume(volumeID)
if vol != nil && vol.DeletionTimestamp != nil {
goto deleteResponse
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}

if err != nil {
if k8serror.IsNotFound(err) {
goto deleteResponse
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}
return nil, errors.Wrapf(
err,
Expand All @@ -524,19 +524,43 @@ func (cs *controller) DeleteVolume(
return nil, status.Error(codes.Internal, "can not delete, volume creation is in progress")
}

// Delete the corresponding ZV CR
err = zfs.DeleteVolume(volumeID)
// Fetch the list of snapshot for the given volume
snapList, err := zfs.GetSnapshotForVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
return nil, status.Errorf(
codes.NotFound,
"failed to handle delete volume request for {%s}, "+
"validation failed checking for snapshots. Error: %s",
req.VolumeId,
err.Error(),
)
}

// Delete the corresponding ZV CR only if there are no snapshots present for the volume
if len(snapList.Items) == 0 {
err = zfs.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}
} else {
// add annotation to the volume to indicate that it is elegible for deletion
err = zfs.MarkForDeletion(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}

}

sendEventOrIgnore("", volumeID, vol.Spec.Capacity, analytics.VolumeDeprovision)

deleteResponse:
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}

Expand Down Expand Up @@ -815,8 +839,50 @@ func (cs *controller) DeleteSnapshot(
// should succeed when an invalid snapshot id is used
return &csi.DeleteSnapshotResponse{}, nil
}
volumeID := snapshotID[0]
unlock := cs.volumeLock.LockVolumeWithSnapshot(snapshotID[0], snapshotID[1])
defer unlock()

// verify if the snapshot has already been deleted
snap, err := zfs.GetZFSSnapshot(snapshotID[1])
if snap != nil && snap.DeletionTimestamp != nil {
return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}

if err != nil {
if k8serror.IsNotFound(err) {
return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}
return nil, errors.Wrapf(
err,
"failed to get snapshot for {%s}",
snapshotID[1],
)
}

// Fetch the list of snapshot for the given volume
snapList, err := zfs.GetSnapshotForVolume(volumeID)
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for snapshot list for volume. Error: %s",
volumeID,
err.Error(),
)
}

eligibeForDeltion, err := zfs.IsVolumeEligibleForDeletion(volumeID)
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"failed to handle delete snapshot request for {%s}, "+
"validation failed checking for eligine for deletion. Error: %s",
volumeID,
err.Error(),
)
}

if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {
return nil, status.Errorf(
codes.Internal,
Expand All @@ -825,7 +891,21 @@ func (cs *controller) DeleteSnapshot(
err.Error(),
)
}
return &csi.DeleteSnapshotResponse{}, nil

// Delete the corresponding ZV CR only if this is the last snapshot
// for the volume and the corresponding pvc is deleted
if len(snapList.Items) == 1 && eligibeForDeltion {
err = zfs.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}
}

return csipayload.NewDeleteSnapshotResponseBuilder().Build(), nil
}

// ListSnapshots lists all snapshots for the
Expand Down Expand Up @@ -1128,3 +1208,24 @@ func LabelIndexFunc(label string) cache.IndexFunc {
return vs, nil
}
}

// func getClient() (*kube.Client, error) {
// cfg, err := k8sapi.Config().Get()
// if err != nil {
// return nil, status.Errorf(
// codes.Internal,
// "failed to build kubeconfig: %v",
// err,
// )
// }

// clientset, err := kubernetes.NewForConfig(cfg)
// if err != nil {
// return nil, status.Errorf(
// codes.Internal,
// "failed to build k8s clientset: %v",
// err,
// )
// }
// return clientset, nil
// }
20 changes: 20 additions & 0 deletions pkg/response/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,23 @@ func NewDeleteVolumeResponseBuilder() *DeleteVolumeResponseBuilder {
func (b *DeleteVolumeResponseBuilder) Build() *csi.DeleteVolumeResponse {
return b.response
}

// DeleteSnapshotResponseBuilder helps building an
// instance of csi DeleteSnapshotResponse
type DeleteSnapshotResponseBuilder struct {
response *csi.DeleteSnapshotResponse
}

// NewDeleteSnapshotResponseBuilder returns a new
// instance of DeleteSnapshotResponseBuilder
func NewDeleteSnapshotResponseBuilder() *DeleteSnapshotResponseBuilder {
return &DeleteSnapshotResponseBuilder{
response: &csi.DeleteSnapshotResponse{},
}
}

// Build returns the constructed instance
// of csi DeleteSnapshotResponse
func (b *DeleteSnapshotResponseBuilder) Build() *csi.DeleteSnapshotResponse {
return b.response
}
105 changes: 99 additions & 6 deletions pkg/zfs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
"github.com/openebs/zfs-localpv/pkg/builder/restorebuilder"
"github.com/openebs/zfs-localpv/pkg/builder/snapbuilder"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -213,12 +216,12 @@ func DeleteSnapshot(snapname string) (err error) {
return
}

// GetVolume the corresponding ZFSVolume CR
func GetVolume(volumeID string) (*apis.ZFSVolume, error) {
return volbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).
Get(volumeID, metav1.GetOptions{})
}
// // GetVolume the corresponding ZFSVolume CR
// func GetVolume(volumeID string) (*apis.ZFSVolume, error) {
// return volbuilder.NewKubeclient().
// WithNamespace(OpenEBSNamespace).
// Get(volumeID, metav1.GetOptions{})
// }

// DeleteVolume deletes the corresponding ZFSVol CR
func DeleteVolume(volumeID string) (err error) {
Expand Down Expand Up @@ -251,6 +254,18 @@ func GetZFSVolume(volumeID string) (*apis.ZFSVolume, error) {
return vol, err
}

// UpdateZFSVolumeAnnotation updtates the ZFSVolume CR with the given annotation
func UpdateZFSVolumeAnnotation(vol *apis.ZFSVolume) error {
newVol, err := volbuilder.BuildFrom(vol).
WithAnnotation().Build()

if err != nil {
return err
}
_, err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(newVol)
return err
}

// GetZFSVolumeState returns ZFSVolume OwnerNode and State for
// the given volume. CreateVolume request may call it again and
// again until volume is "Ready".
Expand Down Expand Up @@ -441,3 +456,81 @@ func IsVolumeReady(vol *apis.ZFSVolume) bool {

return false
}

// GetSnapshotForVolume fetches all the snapshots for the given volume
func GetSnapshotForVolume(volumeID string) (*apis.ZFSSnapshotList, error) {
listOptions := metav1.ListOptions{
LabelSelector: ZFSVolKey + "=" + volumeID,
}
snapList, err := snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions)
return snapList, err
}

// MarkForDeletion marks the volume for deletion by adding the annotation
func MarkForDeletion(volumeName string) error {
zv, err := GetZFSVolume(volumeName)
if err != nil {
klog.Errorf("failed to get ZV %s: %v", volumeName, err)
return err
}

err = UpdateZFSVolumeAnnotation(zv)
if err != nil {
klog.Errorf("failed to update ZV %s: %v", volumeName, err)
return err
}
return nil
}

// Function to check if the volume can be deleted or not
func IsVolumeEligibleForDeletion(volumeName string) (bool, error) {
cfg, err := k8sapi.Config().Get()
if err != nil {
return false, status.Errorf(
codes.Internal,
"failed to build kubeconfig: %v",
err,
)
}

clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return false, status.Errorf(
codes.Internal,
"failed to build k8s clientset: %v",
err,
)
}

pv, err := clientset.CoreV1().PersistentVolumes().Get(context.TODO(), volumeName, metav1.GetOptions{})
// If the PV is not found, then assuming the PVC is not present
// and thus the ZV is eligible for deletion
if err != nil {
return true, nil
}
// If the PV exist and does not have the calim reference
// then assuming the PVC is not present and thus the ZV is eligible for deletion
if pv.Spec.ClaimRef == nil {
return true, nil
}

// If the PV exist and has the reclaim policy as retain
// then the ZV is not eligible for deletion
if pv.Spec.PersistentVolumeReclaimPolicy == "Retain" {
return false, nil
}

zfsVol, err := GetZFSVolume(volumeName)
if err != nil {
return false, status.Errorf(
codes.Internal,
"failed to get ZFSVolume %s: %v",
volumeName,
err,
)
}
if zfsVol.Annotations["openebs.zv.isEligibleForDeletion"] == "true" {
return true, nil
}
return false, nil
}
Loading

0 comments on commit 014e5a4

Please sign in to comment.