Skip to content

Commit

Permalink
Feature: support to skip the validation of indirectly related resourc…
Browse files Browse the repository at this point in the history
…es involved in webhook mutating

Signed-off-by: jiuyu <[email protected]>
  • Loading branch information
jiuyu committed Jan 14, 2025
1 parent 4e199c8 commit 9f831d1
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 73 deletions.
9 changes: 5 additions & 4 deletions pkg/application/inject/fuse/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,10 @@ func (s *Injector) inject(in runtime.Object, runtimeInfos map[string]base.Runtim
Log: s.log,
Specs: podSpecs,
Options: common.FuseSidecarInjectOption{
EnableCacheDir: utils.InjectCacheDirEnabled(podSpecs.MetaObj.Labels),
EnableUnprivilegedSidecar: utils.FuseSidecarUnprivileged(podSpecs.MetaObj.Labels),
SkipSidecarPostStartInject: utils.SkipSidecarPostStartInject(podSpecs.MetaObj.Labels),
EnableCacheDir: utils.InjectCacheDirEnabled(podSpecs.MetaObj.Labels),
EnableUnprivilegedSidecar: utils.FuseSidecarUnprivileged(podSpecs.MetaObj.Labels),
SkipSidecarPostStartInject: utils.SkipSidecarPostStartInject(podSpecs.MetaObj.Labels),
SkipIndirectlyRelatedResourceValidation: utils.SkipIndirectlyRelatedResourceValidationEnable(podSpecs.MetaObj.Annotations),
},
ExtraArgs: mutator.FindExtraArgsFromMetadata(podSpecs.MetaObj, platform),
}
Expand Down Expand Up @@ -258,5 +259,5 @@ func (s *Injector) shouldInject(pod common.FluidObject) (should bool, err error)
}

func (s *Injector) getServerlessPlatformFromMeta(metaObj metav1.ObjectMeta) string {
return utils.GetServerlessPlatfrom(metaObj.Labels)
return utils.GetServerlessPlatform(metaObj.Labels)
}
2 changes: 1 addition & 1 deletion pkg/application/inject/fuse/mutator/mutator_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewDefaultMutator(args MutatorBuildArgs) Mutator {
var _ Mutator = &DefaultMutator{}

func (mutator *DefaultMutator) MutateWithRuntimeInfo(pvcName string, runtimeInfo base.RuntimeInfoInterface, nameSuffix string) error {
template, err := runtimeInfo.GetFuseContainerTemplate()
template, err := runtimeInfo.GetFuseContainerTemplate(mutator.options.SkipIndirectlyRelatedResourceValidation)
if err != nil {
return errors.Wrapf(err, "failed to get fuse container template for runtime \"%s/%s\"", runtimeInfo.GetNamespace(), runtimeInfo.GetName())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewUnprivilegedMutator(opts MutatorBuildArgs) Mutator {
}

func (mutator *UnprivilegedMutator) MutateWithRuntimeInfo(pvcName string, runtimeInfo base.RuntimeInfoInterface, nameSuffix string) error {
template, err := runtimeInfo.GetFuseContainerTemplate()
template, err := runtimeInfo.GetFuseContainerTemplate(mutator.options.SkipIndirectlyRelatedResourceValidation)
if err != nil {
return errors.Wrapf(err, "failed to get fuse container template for runtime \"%s/%s\"", runtimeInfo.GetNamespace(), runtimeInfo.GetName())
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,7 @@ const (
K8sZoneLabelKey = "topology.kubernetes.io/zone"
K8sRegionLabelKey = "topology.kubernetes.io/region"
)

const (
SkipResourceValidationAnnotationKey = "sidecar.fluid.io/skip-resource-validation"
)
7 changes: 4 additions & 3 deletions pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ type FuseMountInfo struct {

// FuseSidecarInjectOption are options for webhook to inject fuse sidecar containers
type FuseSidecarInjectOption struct {
EnableCacheDir bool
EnableUnprivilegedSidecar bool
SkipSidecarPostStartInject bool
EnableCacheDir bool
EnableUnprivilegedSidecar bool
SkipSidecarPostStartInject bool
SkipIndirectlyRelatedResourceValidation bool
}

func (f FuseSidecarInjectOption) String() string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/base/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type RuntimeInfoInterface interface {

IsDeprecatedPVName() bool

GetFuseContainerTemplate() (template *common.FuseInjectionTemplate, err error)
GetFuseContainerTemplate(skipRelatedResourceCheck bool) (template *common.FuseInjectionTemplate, err error)

SetClient(client client.Client)

Expand Down
45 changes: 41 additions & 4 deletions pkg/ddc/base/runtime_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ import (
"fmt"
"time"

"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"

"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
)

// GetFuseContainerTemplate collects the fuse container spec from the runtime's fuse daemonSet spec. The function summarizes fuse related information into
// the template and returns it. The template then can be freely modified according to need of the serverless platform.
func (info *RuntimeInfo) GetFuseContainerTemplate() (template *common.FuseInjectionTemplate, err error) {
func (info *RuntimeInfo) GetFuseContainerTemplate(skipIndirectlyRelatedResourceValidation bool) (template *common.FuseInjectionTemplate, err error) {
if utils.IsTimeTrackerDebugEnabled() {
defer utils.TimeTrack(time.Now(), "RuntimeInfo.GetFuseContainerTemplate",
"runtime.name", info.name, "runtime.namespace", info.namespace)
Expand All @@ -52,7 +53,11 @@ func (info *RuntimeInfo) GetFuseContainerTemplate() (template *common.FuseInject

template.FuseContainer.Name = common.FuseContainerName

hostMountPath, mountType, subPath, err := kubeclient.GetMountInfoFromVolumeClaim(info.client, info.name, info.namespace)
hostMountPath, mountType, subPath, err := info.getMountInfo(skipIndirectlyRelatedResourceValidation)
if err != nil {
return template, errors.Wrapf(err, "failed to get mount info")
}

if err != nil {
return template, errors.Wrapf(err, "failed get mount info from PVC \"%s/%s\"", info.namespace, info.name)
}
Expand Down Expand Up @@ -87,3 +92,35 @@ func (info *RuntimeInfo) getFuseDaemonset() (ds *appsv1.DaemonSet, err error) {
}
return kubeclient.GetDaemonset(info.client, fuseName, info.GetNamespace())
}

func (info *RuntimeInfo) getMountInfo(skipUnnecessaryResourceValidation bool) (path, mountType, subpath string, err error) {
var pv *corev1.PersistentVolume

pvName := info.GetPersistentVolumeName()

if !skipUnnecessaryResourceValidation {
var pvc *corev1.PersistentVolumeClaim
pvc, err = kubeclient.GetPersistentVolumeClaim(info.client, info.name, info.namespace)
if err != nil {
err = errors.Wrapf(err, "failed to get persistent volume claim %s/%s", info.namespace, info.name)
return
}
pvName = pvc.Spec.VolumeName
}

pv, err = kubeclient.GetPersistentVolume(info.client, pvName)
if err != nil {
err = errors.Wrapf(err, "cannot find pvc \"%s/%s\"'s bounded PV", info.namespace, info.name)
return
}

if pv.Spec.CSI != nil && len(pv.Spec.CSI.VolumeAttributes) > 0 {
path = pv.Spec.CSI.VolumeAttributes[common.VolumeAttrFluidPath]
mountType = pv.Spec.CSI.VolumeAttributes[common.VolumeAttrMountType]
subpath = pv.Spec.CSI.VolumeAttributes[common.VolumeAttrFluidSubPath]
} else {
err = fmt.Errorf("the pv %s is not created by fluid")

Check failure on line 122 in pkg/ddc/base/runtime_helper.go

View workflow job for this annotation

GitHub Actions / staticcheck

Printf format %s reads arg #1, but call has only 0 args (SA5009)
}

return
}
6 changes: 5 additions & 1 deletion pkg/utils/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const (
PlatformUnprivileged = "Unprivileged"
)

func GetServerlessPlatfrom(infos map[string]string) (platform string) {
func GetServerlessPlatform(infos map[string]string) (platform string) {
if matchedKey(infos, ServerlessPlatformKey) {
return infos[ServerlessPlatformKey]
}
Expand Down Expand Up @@ -129,6 +129,10 @@ func serverlessPlatformMatched(infos map[string]string) (match bool) {
return matchedKey(infos, ServerlessPlatformKey)
}

func SkipIndirectlyRelatedResourceValidationEnable(infos map[string]string) (match bool) {
return enabled(infos, common.SkipResourceValidationAnnotationKey)
}

// enabled checks if the given name has a value of "true"
func enabled(infos map[string]string, name string) (match bool) {
return matchedValue(infos, name, common.True)
Expand Down
31 changes: 0 additions & 31 deletions pkg/utils/kubeclient/volume_claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ package kubeclient

import (
"context"
"fmt"

"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -37,30 +33,3 @@ func GetPersistentVolumeClaim(client client.Client, name, namespace string) (pvc
pvc)
return
}

// GetMountInfoFromVolumeClaim gets the mountPath and type for CSI plugin
func GetMountInfoFromVolumeClaim(client client.Client, name, namespace string) (path string, mountType string, subpath string, err error) {
pvc, err := GetPersistentVolumeClaim(client, name, namespace)
if err != nil {
err = errors.Wrapf(err, "failed to get persistent volume claim")
return
}

pv, err := GetPersistentVolume(client, pvc.Spec.VolumeName)
if err != nil {
err = errors.Wrapf(err, "cannot find pvc \"%s/%s\"'s bounded PV", pvc.Namespace, pvc.Name)
return
}

if pv.Spec.CSI != nil && len(pv.Spec.CSI.VolumeAttributes) > 0 {
path = pv.Spec.CSI.VolumeAttributes[common.VolumeAttrFluidPath]
mountType = pv.Spec.CSI.VolumeAttributes[common.VolumeAttrMountType]
subpath = pv.Spec.CSI.VolumeAttributes[common.VolumeAttrFluidSubPath]
} else {
err = fmt.Errorf("the pvc %s in %s is not created by fluid",
name,
namespace)
}

return
}
3 changes: 2 additions & 1 deletion pkg/webhook/handler/mutating/mutating_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ func (a *FluidMutatingHandler) MutatePod(pod *corev1.Pod) (err error) {
var setupLog = ctrl.Log.WithName("AddScheduleInfoToPod")
setupLog.V(1).Info("start to add schedule info", "Pod", pod.Name, "Namespace", pod.Namespace)
pvcNames := kubeclient.GetPVCNamesFromPod(pod)
runtimeInfos, err := webhookutils.CollectRuntimeInfosFromPVCs(a.Client, pvcNames, pod.Namespace, setupLog)
runtimeInfos, err := webhookutils.CollectRuntimeInfosFromPVCs(a.Client, pvcNames, pod.Namespace, setupLog,
utils.SkipIndirectlyRelatedResourceValidationEnable(pod.Annotations))
if err != nil {
setupLog.Error(err, "failed to collect runtime infos from PVCs", "pvcNames", pvcNames)
return errors.Wrapf(err, "failed to collect runtime infos from PVCs %v", pvcNames)
Expand Down
3 changes: 2 additions & 1 deletion pkg/webhook/plugins/fusesidecar/fuse_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func (p *FuseSidecar) Mutate(pod *corev1.Pod, runtimeInfos map[string]base.Runti
}
out.DeepCopyInto(pod)
pvcNames := kubeclient.GetPVCNamesFromPod(pod)
runtimeInfos, err = webhookutils.CollectRuntimeInfosFromPVCs(p.client, pvcNames, pod.Namespace, p.log)
runtimeInfos, err = webhookutils.CollectRuntimeInfosFromPVCs(p.client, pvcNames, pod.Namespace, p.log,
utils.SkipIndirectlyRelatedResourceValidationEnable(pod.Annotations))
if err != nil {
return shouldStop, errors.Wrapf(err, "failed to collect runtime infos from PVCs %v", pvcNames)
}
Expand Down
46 changes: 21 additions & 25 deletions pkg/webhook/utils/runtime_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func CollectRuntimeInfosFromPVCs(client client.Client, pvcNames []string, namespace string, setupLog logr.Logger) (runtimeInfos map[string]base.RuntimeInfoInterface, err error) {
func CollectRuntimeInfosFromPVCs(client client.Client, pvcNames []string, namespace string, setupLog logr.Logger, skipRelatedResourceCheckEnable bool) (runtimeInfos map[string]base.RuntimeInfoInterface, err error) {
if utils.IsTimeTrackerDebugEnabled() {
defer utils.TimeTrack(time.Now(), "CreateUpdatePodForSchedulingHandler.checkIfDatasetPVCs",
"pvc.names", pvcNames, "pvc.namespace", namespace)
Expand Down Expand Up @@ -50,20 +50,7 @@ func CollectRuntimeInfosFromPVCs(client client.Client, pvcNames []string, namesp
}
isDatasetPVC = kubeclient.CheckIfPVCIsDataset(pvc)
if isDatasetPVC {
// isReferringPVC, referringName, referringNamespace := kubeclient.GetReferringDatasetPVCInfo(pvc)
// if isReferringPVC {
// pvc, err = kubeclient.GetPersistentVolumeClaim(a.Client, referringName, referringNamespace)
// if err != nil {
// setupLog.Error(err,
// "unable to get referring pvc, get failure",
// "name", referringName,
// "namespace", referringNamespace)
// return
// }
// }

runtimeInfo, err = buildRuntimeInfoInternal(client, pvc, setupLog)
// runtimeInfo, err = base.GetRuntimeInfo(a.Client, pvcName, namespace)
runtimeInfo, err = buildRuntimeInfoInternal(client, pvc, setupLog, skipRelatedResourceCheckEnable)
if err != nil {
err = errors.Wrapf(err, "failed to build runtime info for PVC \"%v/%v\"", namespace, pvcName)
return
Expand All @@ -88,7 +75,7 @@ func CollectRuntimeInfosFromPVCs(client client.Client, pvcNames []string, namesp

func buildRuntimeInfoInternal(client client.Client,
pvc *corev1.PersistentVolumeClaim,
log logr.Logger) (runtimeInfo base.RuntimeInfoInterface, err error) {
log logr.Logger, skipRelatedResourceCheckEnable bool) (runtimeInfo base.RuntimeInfoInterface, err error) {
if utils.IsTimeTrackerDebugEnabled() {
defer utils.TimeTrack(time.Now(), "mutating.buildRuntimeInfoInternalByPVC",
"pvc.name", pvc.GetName(), "pvc.namespace", pvc.GetNamespace())
Expand All @@ -102,7 +89,23 @@ func buildRuntimeInfoInternal(client client.Client,
pvcName = datasetName
}

dataset, err := utils.GetDataset(client, pvcName, namespace)
if !skipRelatedResourceCheckEnable {
if err = checkDatasetBound(client, pvcName, namespace); err != nil {
return
}
}

runtimeInfo, err = base.GetRuntimeInfo(client, pvcName, namespace)
if err != nil {
log.Error(err, "unable to get runtimeInfo, get failure", "runtime", pvc.GetName(), "namespace", namespace)
return
}
runtimeInfo.SetDeprecatedNodeLabel(false)
return
}

func checkDatasetBound(client client.Client, name, namespace string) (err error) {
dataset, err := utils.GetDataset(client, name, namespace)
if err != nil {
return
}
Expand All @@ -116,12 +119,5 @@ func buildRuntimeInfoInternal(client client.Client,
err = fmt.Errorf("dataset \"%s/%s\" not bound", dataset.Namespace, dataset.Name)
return
}

runtimeInfo, err = base.GetRuntimeInfo(client, pvcName, namespace)
if err != nil {
log.Error(err, "unable to get runtimeInfo, get failure", "runtime", pvc.GetName(), "namespace", namespace)
return
}
runtimeInfo.SetDeprecatedNodeLabel(false)
return
return nil
}

0 comments on commit 9f831d1

Please sign in to comment.