From 7fbc86af428921ed544b31195553a7e30063b374 Mon Sep 17 00:00:00 2001 From: Bryce Soghigian Date: Wed, 8 Jan 2025 07:53:45 -0800 Subject: [PATCH] feat: adding garbage collection logic for network interfaces and refactoring gc functions slightly --- .../nodeclaim/garbagecollection/controller.go | 110 +++++++++++++++--- pkg/providers/instance/instance.go | 4 +- 2 files changed, 93 insertions(+), 21 deletions(-) diff --git a/pkg/controllers/nodeclaim/garbagecollection/controller.go b/pkg/controllers/nodeclaim/garbagecollection/controller.go index 033dc31f3..207e0426e 100644 --- a/pkg/controllers/nodeclaim/garbagecollection/controller.go +++ b/pkg/controllers/nodeclaim/garbagecollection/controller.go @@ -19,8 +19,10 @@ package garbagecollection import ( "context" "fmt" + "sync" "time" + "github.com/Azure/karpenter-provider-azure/pkg/providers/instance" "github.com/awslabs/operatorpkg/singleton" // "github.com/Azure/karpenter-provider-azure/pkg/cloudprovider" @@ -42,9 +44,10 @@ import ( ) type Controller struct { - kubeClient client.Client - cloudProvider corecloudprovider.CloudProvider - successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller + kubeClient client.Client + cloudProvider corecloudprovider.CloudProvider + instanceProvider instance.Provider + successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller } func NewController(kubeClient client.Client, cloudProvider corecloudprovider.CloudProvider) *Controller { @@ -58,39 +61,110 @@ func NewController(kubeClient client.Client, cloudProvider corecloudprovider.Clo func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) { ctx = injection.WithControllerName(ctx, "instance.garbagecollection") - // We LIST VMs on the CloudProvider BEFORE we grab NodeClaims/Nodes on the cluster so that we make sure that, if - // LISTing instances takes a long time, our information is more updated by the time we get to nodeclaim and Node LIST - // This works since our CloudProvider instances are deleted based on whether the NodeClaim exists or not, not vice-versa + // Perform VM garbage collection + if err := c.gcVMs(ctx); err != nil { + return reconcile.Result{}, fmt.Errorf("VM garbage collection failed: %w", err) + } + + // Perform NIC garbage collection + if err := c.gcNics(ctx); err != nil { + return reconcile.Result{}, fmt.Errorf("NIC garbage collection failed: %w", err) + } + + c.successfulCount++ + return reconcile.Result{ + RequeueAfter: lo.Ternary(c.successfulCount <= 20, time.Second*10, time.Minute*2), + }, nil +} + +// gcVMs handles the garbage collection of virtual machines. +func (c *Controller) gcVMs(ctx context.Context) error { + // List VMs from the CloudProvider retrieved, err := c.cloudProvider.List(ctx) if err != nil { - return reconcile.Result{}, fmt.Errorf("listing cloudprovider VMs, %w", err) + return fmt.Errorf("listing cloudprovider VMs: %w", err) } + + // Filter out VMs that are marked for deletion managedRetrieved := lo.Filter(retrieved, func(nc *karpv1.NodeClaim, _ int) bool { return nc.DeletionTimestamp.IsZero() }) + + // List NodeClaims and Nodes from the cluster nodeClaimList := &karpv1.NodeClaimList{} - if err = c.kubeClient.List(ctx, nodeClaimList); err != nil { - return reconcile.Result{}, err + if err := c.kubeClient.List(ctx, nodeClaimList); err != nil { + return fmt.Errorf("listing NodeClaims: %w", err) } + nodeList := &v1.NodeList{} if err := c.kubeClient.List(ctx, nodeList); err != nil { - return reconcile.Result{}, err + return fmt.Errorf("listing Nodes: %w", err) } + resolvedProviderIDs := sets.New[string](lo.FilterMap(nodeClaimList.Items, func(n karpv1.NodeClaim, _ int) (string, bool) { return n.Status.ProviderID, n.Status.ProviderID != "" })...) - errs := make([]error, len(retrieved)) + + errs := make([]error, len(managedRetrieved)) + workqueue.ParallelizeUntil(ctx, 100, len(managedRetrieved), func(i int) { - if !resolvedProviderIDs.Has(managedRetrieved[i].Status.ProviderID) && - time.Since(managedRetrieved[i].CreationTimestamp.Time) > time.Minute*5 { - errs[i] = c.garbageCollect(ctx, managedRetrieved[i], nodeList) + vm := managedRetrieved[i] + if !resolvedProviderIDs.Has(vm.Status.ProviderID) && + time.Since(vm.CreationTimestamp.Time) > 5*time.Minute { + errs[i] = c.garbageCollect(ctx, vm, nodeList) } }) - if err = multierr.Combine(errs...); err != nil { - return reconcile.Result{}, err + + if combinedErr := multierr.Combine(errs...); combinedErr != nil { + return combinedErr } - c.successfulCount++ - return reconcile.Result{RequeueAfter: lo.Ternary(c.successfulCount <= 20, time.Second*10, time.Minute*2)}, nil + + return nil +} + +// gcNics handles the garbage collection of network interfaces. +func (c *Controller) gcNics(ctx context.Context) error { + // Refresh the list of NodeClaims after VM GC + nodeClaimList := &karpv1.NodeClaimList{} + if err := c.kubeClient.List(ctx, nodeClaimList); err != nil { + return fmt.Errorf("listing NodeClaims for NIC GC: %w", err) + } + + // Normalize NodeClaim names to match NIC naming conventions + nodeClaimNames := sets.New[string]() + for _, nodeClaim := range nodeClaimList.Items { + // Adjust the prefix as per the aks naming convention + nodeClaimNames.Insert(fmt.Sprintf("aks-%s", nodeClaim.Name)) + } + + // List all NICs from the instance provider, this List call will give us network interfaces that belong to karpenter + nics, err := c.instanceProvider.ListNics(ctx) + if err != nil { + return fmt.Errorf("listing NICs: %w", err) + } + + // Initialize a slice to collect errors from goroutines + var gcErrors []error + var mu sync.Mutex + + // Parallelize the garbage collection process for NICs + workqueue.ParallelizeUntil(ctx, 100, len(nics), func(i int) { + nicName := lo.FromPtr(nics[i].Name) + if !nodeClaimNames.Has(nicName) { + if err := c.instanceProvider.Delete(ctx, nicName); err != nil { + mu.Lock() + gcErrors = append(gcErrors, fmt.Errorf("deleting NIC %s: %w", nicName, err)) + mu.Unlock() + } + } + }) + + // Combine all errors into one + if len(gcErrors) > 0 { + return multierr.Combine(gcErrors...) + } + + return nil } func (c *Controller) garbageCollect(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeList *v1.NodeList) error { diff --git a/pkg/providers/instance/instance.go b/pkg/providers/instance/instance.go index 6ce31561c..0adfb3a2d 100644 --- a/pkg/providers/instance/instance.go +++ b/pkg/providers/instance/instance.go @@ -54,7 +54,6 @@ import ( var ( NodePoolTagKey = strings.ReplaceAll(karpv1.NodePoolLabelKey, "/", "_") - listQuery string CapacityTypeToPriority = map[string]string{ karpv1.CapacityTypeSpot: string(compute.Spot), @@ -84,7 +83,7 @@ type Provider interface { // CreateTags(context.Context, string, map[string]string) error Update(context.Context, string, armcompute.VirtualMachineUpdate) error GetNic(context.Context, string, string) (*armnetwork.Interface, error) - ListNics(context.Context) []*armnetwork.Interface + ListNics(context.Context) ([]*armnetwork.Interface, error) } // assert that DefaultProvider implements Provider interface @@ -113,7 +112,6 @@ func NewDefaultProvider( subscriptionID string, provisionMode string, ) *DefaultProvider { - listQuery = GetListQueryBuilder(resourceGroup).String() return &DefaultProvider{ azClient: azClient, instanceTypeProvider: instanceTypeProvider,