feat: adding garbage collection logic for network interfaces and refactoring gc functions slightly
Bryce-Soghigian committed Jan 8, 2025
1 parent 198818a commit 7fbc86a
Showing 2 changed files with 93 additions and 21 deletions.
110 changes: 92 additions & 18 deletions pkg/controllers/nodeclaim/garbagecollection/controller.go
@@ -19,8 +19,10 @@ package garbagecollection
import (
"context"
"fmt"
"sync"
"time"

"github.com/Azure/karpenter-provider-azure/pkg/providers/instance"
"github.com/awslabs/operatorpkg/singleton"

// "github.com/Azure/karpenter-provider-azure/pkg/cloudprovider"
@@ -42,9 +44,10 @@
)

type Controller struct {
kubeClient client.Client
cloudProvider corecloudprovider.CloudProvider
successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller
kubeClient client.Client
cloudProvider corecloudprovider.CloudProvider
instanceProvider instance.Provider
successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller
}

func NewController(kubeClient client.Client, cloudProvider corecloudprovider.CloudProvider) *Controller {
@@ -58,39 +61,110 @@ func NewController(kubeClient client.Client, cloudProvider corecloudprovider.CloudProvider) *Controller {
func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "instance.garbagecollection")

// We LIST VMs on the CloudProvider BEFORE we grab NodeClaims/Nodes on the cluster so that, if
// LISTing instances takes a long time, our information is more up to date by the time we get to the NodeClaim and Node LISTs.
// This works since our CloudProvider instances are deleted based on whether the NodeClaim exists or not, not vice versa
// Perform VM garbage collection
if err := c.gcVMs(ctx); err != nil {
return reconcile.Result{}, fmt.Errorf("VM garbage collection failed: %w", err)
}

// Perform NIC garbage collection
if err := c.gcNics(ctx); err != nil {
return reconcile.Result{}, fmt.Errorf("NIC garbage collection failed: %w", err)
}

c.successfulCount++
return reconcile.Result{
RequeueAfter: lo.Ternary(c.successfulCount <= 20, time.Second*10, time.Minute*2),
}, nil
}
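
Reconcile now just sequences the two phases: VM garbage collection first, then NIC garbage collection against a freshly listed set of NodeClaims. An error in either phase short-circuits the reconcile before successfulCount is incremented, so the controller stays on its aggressive schedule until it gets a clean pass. A minimal sketch of that requeue schedule, with requeueAfter as an illustrative helper name (not part of the commit):

    // requeueAfter mirrors the backoff above: poll every 10 seconds for the
    // first 20 successful reconciles (the controller's warm-up window), then
    // settle into a 2-minute steady-state cadence.
    func requeueAfter(successfulCount uint64) time.Duration {
        return lo.Ternary(successfulCount <= 20, 10*time.Second, 2*time.Minute)
    }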

// gcVMs handles the garbage collection of virtual machines.
func (c *Controller) gcVMs(ctx context.Context) error {
// List VMs from the CloudProvider
retrieved, err := c.cloudProvider.List(ctx)
if err != nil {
return reconcile.Result{}, fmt.Errorf("listing cloudprovider VMs, %w", err)
return fmt.Errorf("listing cloudprovider VMs: %w", err)
}

// Filter out VMs that are marked for deletion
managedRetrieved := lo.Filter(retrieved, func(nc *karpv1.NodeClaim, _ int) bool {
return nc.DeletionTimestamp.IsZero()
})

// List NodeClaims and Nodes from the cluster
nodeClaimList := &karpv1.NodeClaimList{}
if err = c.kubeClient.List(ctx, nodeClaimList); err != nil {
return reconcile.Result{}, err
if err := c.kubeClient.List(ctx, nodeClaimList); err != nil {
return fmt.Errorf("listing NodeClaims: %w", err)
}

nodeList := &v1.NodeList{}
if err := c.kubeClient.List(ctx, nodeList); err != nil {
return reconcile.Result{}, err
return fmt.Errorf("listing Nodes: %w", err)
}

resolvedProviderIDs := sets.New[string](lo.FilterMap(nodeClaimList.Items, func(n karpv1.NodeClaim, _ int) (string, bool) {
return n.Status.ProviderID, n.Status.ProviderID != ""
})...)
errs := make([]error, len(retrieved))

errs := make([]error, len(managedRetrieved))

workqueue.ParallelizeUntil(ctx, 100, len(managedRetrieved), func(i int) {
if !resolvedProviderIDs.Has(managedRetrieved[i].Status.ProviderID) &&
time.Since(managedRetrieved[i].CreationTimestamp.Time) > time.Minute*5 {
errs[i] = c.garbageCollect(ctx, managedRetrieved[i], nodeList)
vm := managedRetrieved[i]
if !resolvedProviderIDs.Has(vm.Status.ProviderID) &&
time.Since(vm.CreationTimestamp.Time) > 5*time.Minute {
errs[i] = c.garbageCollect(ctx, vm, nodeList)
}
})
if err = multierr.Combine(errs...); err != nil {
return reconcile.Result{}, err

if combinedErr := multierr.Combine(errs...); combinedErr != nil {
return combinedErr
}
c.successfulCount++
return reconcile.Result{RequeueAfter: lo.Ternary(c.successfulCount <= 20, time.Second*10, time.Minute*2)}, nil

return nil
}
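
The orphan test above has two prongs: no NodeClaim in the cluster resolved to the VM's provider ID, and the VM is at least five minutes old, which gives freshly created instances time to register before they become eligible for collection. Pulled out as a standalone predicate (isOrphaned is a hypothetical name, not part of the commit):

    // isOrphaned reports whether a cloud-provider VM should be garbage
    // collected: no cluster NodeClaim resolved to its provider ID, and it is
    // old enough that registration should have completed by now.
    func isOrphaned(vm *karpv1.NodeClaim, resolvedProviderIDs sets.Set[string]) bool {
        return !resolvedProviderIDs.Has(vm.Status.ProviderID) &&
            time.Since(vm.CreationTimestamp.Time) > 5*time.Minute
    }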

// gcNics handles the garbage collection of network interfaces.
func (c *Controller) gcNics(ctx context.Context) error {
// Refresh the list of NodeClaims after VM GC
nodeClaimList := &karpv1.NodeClaimList{}
if err := c.kubeClient.List(ctx, nodeClaimList); err != nil {
return fmt.Errorf("listing NodeClaims for NIC GC: %w", err)
}

// Normalize NodeClaim names to match NIC naming conventions
nodeClaimNames := sets.New[string]()
for _, nodeClaim := range nodeClaimList.Items {
// Prefix the NodeClaim name with "aks-" to match the AKS NIC naming convention
nodeClaimNames.Insert(fmt.Sprintf("aks-%s", nodeClaim.Name))
}

// List all NICs from the instance provider; this call returns only the network interfaces that belong to Karpenter
nics, err := c.instanceProvider.ListNics(ctx)
if err != nil {
return fmt.Errorf("listing NICs: %w", err)
}

// Initialize a slice to collect errors from goroutines
var gcErrors []error
var mu sync.Mutex

// Parallelize the garbage collection process for NICs
workqueue.ParallelizeUntil(ctx, 100, len(nics), func(i int) {
nicName := lo.FromPtr(nics[i].Name)
if !nodeClaimNames.Has(nicName) {
if err := c.instanceProvider.Delete(ctx, nicName); err != nil {
mu.Lock()
gcErrors = append(gcErrors, fmt.Errorf("deleting NIC %s: %w", nicName, err))
mu.Unlock()
}
}
})

// Combine all errors into one
if len(gcErrors) > 0 {
return multierr.Combine(gcErrors...)
}

return nil
}
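
Note the difference in error collection: gcVMs writes into a pre-sized slice indexed by item, while gcNics guards an append with a mutex. Both are safe under ParallelizeUntil; the indexed style avoids the lock entirely. A sketch of gcNics' loop in that style for comparison (gcNicsIndexed is a hypothetical name, not part of the commit):

    // gcNicsIndexed deletes unreferenced NICs, collecting errors in a
    // pre-sized slice so no mutex is needed.
    func (c *Controller) gcNicsIndexed(ctx context.Context, nics []*armnetwork.Interface, nodeClaimNames sets.Set[string]) error {
        errs := make([]error, len(nics))
        workqueue.ParallelizeUntil(ctx, 100, len(nics), func(i int) {
            nicName := lo.FromPtr(nics[i].Name)
            if nodeClaimNames.Has(nicName) {
                return // still referenced by a NodeClaim; keep it
            }
            if err := c.instanceProvider.Delete(ctx, nicName); err != nil {
                errs[i] = fmt.Errorf("deleting NIC %s: %w", nicName, err)
            }
        })
        return multierr.Combine(errs...)
    }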

func (c *Controller) garbageCollect(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeList *v1.NodeList) error {
4 changes: 1 addition & 3 deletions pkg/providers/instance/instance.go
@@ -54,7 +54,6 @@ import (

var (
NodePoolTagKey = strings.ReplaceAll(karpv1.NodePoolLabelKey, "/", "_")
listQuery string

CapacityTypeToPriority = map[string]string{
karpv1.CapacityTypeSpot: string(compute.Spot),
@@ -84,7 +83,7 @@ type Provider interface {
// CreateTags(context.Context, string, map[string]string) error
Update(context.Context, string, armcompute.VirtualMachineUpdate) error
GetNic(context.Context, string, string) (*armnetwork.Interface, error)
ListNics(context.Context) []*armnetwork.Interface
ListNics(context.Context) ([]*armnetwork.Interface, error)
}
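
Changing ListNics to return ([]*armnetwork.Interface, error) lets Azure API failures propagate to callers (gcNics above wraps and returns them) instead of being swallowed inside the provider. As a rough illustration, a pager-based implementation could look like the following; the interfacesClient and resourceGroup fields are assumptions, and the real provider may well list NICs through a resource-graph query instead:

    func (p *DefaultProvider) ListNics(ctx context.Context) ([]*armnetwork.Interface, error) {
        var nics []*armnetwork.Interface
        pager := p.interfacesClient.NewListPager(p.resourceGroup, nil)
        for pager.More() {
            page, err := pager.NextPage(ctx)
            if err != nil {
                return nil, fmt.Errorf("listing network interfaces: %w", err)
            }
            nics = append(nics, page.Value...)
        }
        return nics, nil
    }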

// assert that DefaultProvider implements Provider interface
@@ -113,7 +112,6 @@ func NewDefaultProvider(
subscriptionID string,
provisionMode string,
) *DefaultProvider {
listQuery = GetListQueryBuilder(resourceGroup).String()
return &DefaultProvider{
azClient: azClient,
instanceTypeProvider: instanceTypeProvider,
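
Removing the package-level listQuery variable also removes hidden mutable state: it was assigned inside NewDefaultProvider, so constructing a second provider (in tests, for example) would silently overwrite the query used by the first. With the global gone, the query can simply be built where it is needed; a sketch, assuming the provider keeps resourceGroup as a field and GetListQueryBuilder is unchanged:

    func (p *DefaultProvider) vmListQuery() string {
        return GetListQueryBuilder(p.resourceGroup).String()
    }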
