From 4898f242278e73ee387d084ebbdb6c3a12601556 Mon Sep 17 00:00:00 2001 From: vportella Date: Fri, 19 Jul 2024 15:25:06 +1000 Subject: [PATCH] Skip named nodes that don't exist --- .../cyclenoderequest/transitioner/checks.go | 4 +- .../cyclenoderequest/transitioner/node.go | 87 ++++++++++--------- .../transitioner/transitions.go | 45 +++++----- .../cyclenoderequest/transitioner/util.go | 22 +++-- 4 files changed, 88 insertions(+), 70 deletions(-) diff --git a/pkg/controller/cyclenoderequest/transitioner/checks.go b/pkg/controller/cyclenoderequest/transitioner/checks.go index e2e303a..a4df2bc 100644 --- a/pkg/controller/cyclenoderequest/transitioner/checks.go +++ b/pkg/controller/cyclenoderequest/transitioner/checks.go @@ -189,7 +189,7 @@ func (t *CycleNodeRequestTransitioner) performHealthCheck(node v1.CycleNodeReque // performInitialHealthChecks on the nodes selected to be terminated before cycling begin. If any health // check fails return an error to prevent cycling from starting -func (t *CycleNodeRequestTransitioner) performInitialHealthChecks(kubeNodes []corev1.Node) error { +func (t *CycleNodeRequestTransitioner) performInitialHealthChecks(kubeNodes map[string]corev1.Node) error { // Build a set of ready nodes from which to check below readyNodesSet := make(map[string]v1.CycleNodeRequestNode) @@ -241,7 +241,7 @@ func (t *CycleNodeRequestTransitioner) performInitialHealthChecks(kubeNodes []co // performCyclingHealthChecks before terminating an instance selected for termination. Cycling pauses // until all health checks pass for the new instance before terminating the old one -func (t *CycleNodeRequestTransitioner) performCyclingHealthChecks(kubeNodes []corev1.Node) (bool, error) { +func (t *CycleNodeRequestTransitioner) performCyclingHealthChecks(kubeNodes map[string]corev1.Node) (bool, error) { var allHealthChecksPassed bool = true // Find new instsances attached to the nodegroup and perform health checks on them diff --git a/pkg/controller/cyclenoderequest/transitioner/node.go b/pkg/controller/cyclenoderequest/transitioner/node.go index 046148a..b22f119 100644 --- a/pkg/controller/cyclenoderequest/transitioner/node.go +++ b/pkg/controller/cyclenoderequest/transitioner/node.go @@ -11,12 +11,15 @@ import ( // listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops. // A label is used to determine whether nodes have been touched by this CycleNodeRequest. 
-func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (nodes []corev1.Node, err error) { +func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) { + nodes := make(map[string]corev1.Node) + // Get the nodes selector, err := t.cycleNodeRequest.NodeLabelSelector() if err != nil { return nodes, err } + nodeList, err := t.rm.ListNodes(selector) if err != nil { return nodes, err @@ -30,14 +33,16 @@ func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (n continue } } + // Only add "Ready" nodes for _, cond := range node.Status.Conditions { if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue { - nodes = append(nodes, node) + nodes[node.Spec.ProviderID] = node break } } } + return nodes, nil } @@ -56,29 +61,34 @@ func (t *CycleNodeRequestTransitioner) getNodesToTerminate(numNodes int64) (node } for _, kubeNode := range kubeNodes { - // Skip nodes that are already being worked on so we don't duplicate our work if value, ok := kubeNode.Labels[cycleNodeLabel]; ok && value == t.cycleNodeRequest.Name { numNodesInProgress++ + } + } + + for _, nodeToTerminate := range t.cycleNodeRequest.Status.NodesToTerminate { + kubeNode, found := kubeNodes[nodeToTerminate.ProviderID] + + if !found { continue } - for _, nodeToTerminate := range t.cycleNodeRequest.Status.NodesToTerminate { - // Add nodes that need to be terminated but have not yet been actioned - if kubeNode.Name == nodeToTerminate.Name && kubeNode.Spec.ProviderID == nodeToTerminate.ProviderID { - nodes = append(nodes, &kubeNode) + // Skip nodes that are already being worked on so we don't duplicate our work + if value, ok := kubeNode.Labels[cycleNodeLabel]; ok && value == t.cycleNodeRequest.Name { + continue + } - for i := 0; i < len(t.cycleNodeRequest.Status.NodesAvailable); i++ { - if kubeNode.Name == t.cycleNodeRequest.Status.NodesAvailable[i].Name { - // Remove nodes from available if they are also scheduled for termination - // Slice syntax removes this node at `i` from the array - t.cycleNodeRequest.Status.NodesAvailable = append( - t.cycleNodeRequest.Status.NodesAvailable[:i], - t.cycleNodeRequest.Status.NodesAvailable[i+1:]..., - ) + // Add nodes that need to be terminated but have not yet been actioned + nodes = append(nodes, &kubeNode) - break - } - } + for i := 0; i < len(t.cycleNodeRequest.Status.NodesAvailable); i++ { + if kubeNode.Name == t.cycleNodeRequest.Status.NodesAvailable[i].Name { + // Remove nodes from available if they are also scheduled for termination + // Slice syntax removes this node at `i` from the array + t.cycleNodeRequest.Status.NodesAvailable = append( + t.cycleNodeRequest.Status.NodesAvailable[:i], + t.cycleNodeRequest.Status.NodesAvailable[i+1:]..., + ) break } @@ -94,33 +104,32 @@ func (t *CycleNodeRequestTransitioner) getNodesToTerminate(numNodes int64) (node } // addNamedNodesToTerminate adds the named nodes for this CycleNodeRequest to the list of nodes to terminate. -// Returns an error if any named node does not exist in the node group for this CycleNodeRequest. -func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes []corev1.Node, nodeGroupInstances map[string]cloudprovider.Instance) error { - for _, namedNode := range t.cycleNodeRequest.Spec.NodeNames { - foundNode := false - for _, kubeNode := range kubeNodes { - if kubeNode.Name == namedNode { - foundNode = true +// Skips any named node that does not exist in the node group for this CycleNodeRequest. 
+func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes map[string]corev1.Node, nodeGroupInstances map[string]cloudprovider.Instance) { + kubeNodesMap := make(map[string]corev1.Node) - t.cycleNodeRequest.Status.NodesAvailable = append( - t.cycleNodeRequest.Status.NodesAvailable, - newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()), - ) + for _, node := range kubeNodes { + kubeNodesMap[node.Name] = node + } - t.cycleNodeRequest.Status.NodesToTerminate = append( - t.cycleNodeRequest.Status.NodesToTerminate, - newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()), - ) + for _, namedNode := range t.cycleNodeRequest.Spec.NodeNames { + kubeNode, found := kubeNodesMap[namedNode] - break - } + if !found { + t.rm.Logger.Info("could not find node by name, skipping", "nodeName", namedNode) + continue } - if !foundNode { - return fmt.Errorf("could not find node by name: %v", namedNode) - } + t.cycleNodeRequest.Status.NodesAvailable = append( + t.cycleNodeRequest.Status.NodesAvailable, + newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()), + ) + + t.cycleNodeRequest.Status.NodesToTerminate = append( + t.cycleNodeRequest.Status.NodesToTerminate, + newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()), + ) } - return nil } // newCycleNodeRequestNode converts a corev1.Node to a v1.CycleNodeRequestNode. This is done multiple diff --git a/pkg/controller/cyclenoderequest/transitioner/transitions.go b/pkg/controller/cyclenoderequest/transitioner/transitions.go index f56a455..3ec6af6 100644 --- a/pkg/controller/cyclenoderequest/transitioner/transitions.go +++ b/pkg/controller/cyclenoderequest/transitioner/transitions.go @@ -64,13 +64,11 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result, func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) { // Fetch the node names for the cycleNodeRequest, using the label selector provided t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector") + kubeNodes, err := t.listReadyNodes(true) if err != nil { return t.transitionToHealing(err) } - if len(kubeNodes) == 0 { - return t.transitionToHealing(fmt.Errorf("no nodes matched selector")) - } // Only retain nodes which still exist inside cloud provider var nodeProviderIDs []string @@ -83,25 +81,20 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er if err != nil { return t.transitionToHealing(errors.Wrap(err, "failed to check instances that exist from cloud provider")) } - var existingKubeNodes []corev1.Node - for _, node := range kubeNodes { - for _, validProviderID := range existingProviderIDs { - if node.Spec.ProviderID == validProviderID { - existingKubeNodes = append(existingKubeNodes, node) - break - } + existingKubeNodes := make(map[string]corev1.Node) + + for _, validProviderID := range existingProviderIDs { + if node, found := kubeNodes[validProviderID]; found { + existingKubeNodes[node.Spec.ProviderID] = node } } kubeNodes = existingKubeNodes - if len(kubeNodes) == 0 { - return t.transitionToHealing(fmt.Errorf("no existing nodes in cloud provider matched selector")) - } - // Describe the node group for the request t.rm.LogEvent(t.cycleNodeRequest, "FetchingNodeGroup", "Fetching node group: %v", t.cycleNodeRequest.GetNodeGroupNames()) + nodeGroups, err := 
t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames()) if err != nil { return t.transitionToHealing(err) @@ -109,22 +102,28 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er // get instances inside cloud provider node groups nodeGroupInstances := nodeGroups.Instances() + // Do some sanity checking before we start filtering things // Check the instance count of the node group matches the number of nodes found in Kubernetes if len(kubeNodes) != len(nodeGroupInstances) { - nodesNotInCPNodeGroup, nodesNotInKube := findOffendingNodes(kubeNodes, nodeGroupInstances) var offendingNodesInfo string + + nodesNotInCPNodeGroup, nodesNotInKube := findOffendingNodes(kubeNodes, nodeGroupInstances) + if len(nodesNotInCPNodeGroup) > 0 { offendingNodesInfo += "nodes not in node group: " offendingNodesInfo += strings.Join(nodesNotInCPNodeGroup, ",") } + if len(nodesNotInKube) > 0 { if offendingNodesInfo != "" { offendingNodesInfo += ";" } + offendingNodesInfo += "nodes not inside cluster: " offendingNodesInfo += strings.Join(nodesNotInKube, ",") } + t.rm.LogEvent(t.cycleNodeRequest, "NodeCountMismatch", "node group: %v, kube: %v. %v", len(nodeGroupInstances), len(kubeNodes), offendingNodesInfo) @@ -134,12 +133,16 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er if err != nil { return t.transitionToHealing(err) } + if timedOut { err := fmt.Errorf( - "node count mismatch, number of kubernetes of nodes does not match number of cloud provider instances after %v", - nodeEquilibriumWaitLimit) + "node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v", + nodeEquilibriumWaitLimit, + ) + return t.transitionToHealing(err) } + return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil } @@ -147,13 +150,11 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er if len(t.cycleNodeRequest.Spec.NodeNames) > 0 { // If specific node names are provided, check they actually exist in the node group t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Adding named nodes to NodesToTerminate") - err := t.addNamedNodesToTerminate(kubeNodes, nodeGroupInstances) - if err != nil { - return t.transitionToHealing(err) - } + t.addNamedNodesToTerminate(kubeNodes, nodeGroupInstances) } else { // Otherwise just add all the nodes in the node group t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Adding all node group nodes to NodesToTerminate") + for _, kubeNode := range kubeNodes { // Check to ensure the kubeNode object maps to an existing node in the ASG // If this isn't the case, this is a phantom node. Fail the cnr to be safe. @@ -205,7 +206,9 @@ func (t *CycleNodeRequestTransitioner) transitionInitialised() (reconcile.Result // The maximum nodes we can select are bounded by our concurrency. We take into account the number // of nodes we are already working on, and only introduce up to our concurrency cap more nodes in this step. 
maxNodesToSelect := t.cycleNodeRequest.Spec.CycleSettings.Concurrency - t.cycleNodeRequest.Status.ActiveChildren + t.rm.Logger.Info("Selecting nodes to terminate", "numNodes", maxNodesToSelect) + nodes, numNodesInProgress, err := t.getNodesToTerminate(maxNodesToSelect) if err != nil { return t.transitionToHealing(err) diff --git a/pkg/controller/cyclenoderequest/transitioner/util.go b/pkg/controller/cyclenoderequest/transitioner/util.go index aef2815..1564f9f 100644 --- a/pkg/controller/cyclenoderequest/transitioner/util.go +++ b/pkg/controller/cyclenoderequest/transitioner/util.go @@ -242,6 +242,7 @@ func (t *CycleNodeRequestTransitioner) checkIfTransitioning(numNodesToCycle, num transition, err := t.transitionObject(v1.CycleNodeRequestWaitingTermination) return true, transition, err } + // otherwise, we have finished everything, so transition to Successful transition, err := t.transitionToSuccessful() return true, transition, err @@ -252,21 +253,26 @@ func (t *CycleNodeRequestTransitioner) checkIfTransitioning(numNodesToCycle, num // findOffendingNodes finds the offending nodes information which cause number of nodes mismatch between // cloud provider node group and nodes inside kubernetes cluster using label selector -func findOffendingNodes(kubeNodes []corev1.Node, cloudProviderNodes map[string]cloudprovider.Instance) ([]string, []string) { - kubeNodesMap := make(map[string]corev1.Node) +func findOffendingNodes(kubeNodes map[string]corev1.Node, cloudProviderNodes map[string]cloudprovider.Instance) ([]string, []string) { var nodesNotInCPNodeGroup []string var nodesNotInKube []string + for _, kubeNode := range kubeNodes { - kubeNodesMap[kubeNode.Spec.ProviderID] = kubeNode if _, ok := cloudProviderNodes[kubeNode.Spec.ProviderID]; !ok { - nodesNotInCPNodeGroup = append(nodesNotInCPNodeGroup, fmt.Sprintf("id %q", kubeNode.Spec.ProviderID)) + nodesNotInCPNodeGroup = append(nodesNotInCPNodeGroup, + fmt.Sprintf("id %q", kubeNode.Spec.ProviderID), + ) } } + for cpNode := range cloudProviderNodes { - if _, ok := kubeNodesMap[cpNode]; !ok { - nodesNotInKube = append(nodesNotInKube, fmt.Sprintf("id %q in %q", - cpNode, - cloudProviderNodes[cpNode].NodeGroupName())) + if _, ok := kubeNodes[cpNode]; !ok { + nodesNotInKube = append(nodesNotInKube, + fmt.Sprintf("id %q in %q", + cpNode, + cloudProviderNodes[cpNode].NodeGroupName(), + ), + ) } }
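The headline change is in addNamedNodesToTerminate: a named node that cannot be found in the cluster is now logged and skipped instead of failing the whole CycleNodeRequest with an error. Below is a minimal, self-contained sketch of that selection behaviour; Node, selectNamedNodes and the example provider IDs are simplified stand-ins invented for illustration, not the real corev1.Node or Cyclops helpers.

package main

import "fmt"

// Node is a simplified stand-in for corev1.Node.
type Node struct {
	Name       string
	ProviderID string
}

// selectNamedNodes mirrors the new addNamedNodesToTerminate behaviour: the
// provider-ID keyed node map is re-indexed by name, and a requested name that
// is missing is logged and skipped rather than turned into an error.
func selectNamedNodes(nodeNames []string, kubeNodes map[string]Node) []Node {
	byName := make(map[string]Node, len(kubeNodes))
	for _, node := range kubeNodes {
		byName[node.Name] = node
	}

	var selected []Node
	for _, name := range nodeNames {
		node, found := byName[name]
		if !found {
			fmt.Println("could not find node by name, skipping:", name)
			continue
		}
		selected = append(selected, node)
	}

	return selected
}

func main() {
	kubeNodes := map[string]Node{
		"aws:///us-west-2a/i-0aaa": {Name: "node-a", ProviderID: "aws:///us-west-2a/i-0aaa"},
		"aws:///us-west-2b/i-0bbb": {Name: "node-b", ProviderID: "aws:///us-west-2b/i-0bbb"},
	}

	// "node-c" does not exist; before this patch the whole request failed here.
	for _, node := range selectNamedNodes([]string{"node-a", "node-c"}, kubeNodes) {
		fmt.Println("selected:", node.Name)
	}
}

Re-indexing the map by name keeps each per-name lookup O(1) while the rest of the transitioner continues to key nodes by provider ID.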
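That selection only works because listReadyNodes now returns map[string]corev1.Node keyed by Spec.ProviderID rather than a slice. A sketch of the reshaped ready-node filter, assuming only the standard k8s.io/api/core/v1 types (readyNodesByProviderID is a hypothetical name for this example):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// readyNodesByProviderID mirrors the reshaped listReadyNodes: only nodes with
// a Ready=True condition are kept, and the result is keyed by Spec.ProviderID
// so later stages can look instances up directly.
func readyNodesByProviderID(nodeList []corev1.Node) map[string]corev1.Node {
	nodes := make(map[string]corev1.Node)

	for _, node := range nodeList {
		for _, cond := range node.Status.Conditions {
			if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
				nodes[node.Spec.ProviderID] = node
				break
			}
		}
	}

	return nodes
}

func main() {
	var node corev1.Node
	node.Name = "node-a"
	node.Spec.ProviderID = "aws:///us-west-2a/i-0aaa"
	node.Status.Conditions = []corev1.NodeCondition{
		{Type: corev1.NodeReady, Status: corev1.ConditionTrue},
	}

	fmt.Println(len(readyNodesByProviderID([]corev1.Node{node}))) // 1
}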
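The same provider-ID keying drives the remaining changes in transitionPending and findOffendingNodes: retain only the kubernetes nodes that still exist in the cloud provider, then report the mismatch in both directions without an intermediate map. A stand-alone sketch with plain strings in place of corev1.Node and cloudprovider.Instance (filterExistingNodes and offendingNodes are hypothetical names used only here):

package main

import "fmt"

// filterExistingNodes keeps only the nodes whose provider ID is also known to
// the cloud provider, mirroring the map-based filtering in transitionPending.
func filterExistingNodes(kubeNodes map[string]string, existingProviderIDs []string) map[string]string {
	existing := make(map[string]string)
	for _, providerID := range existingProviderIDs {
		if node, found := kubeNodes[providerID]; found {
			existing[providerID] = node
		}
	}
	return existing
}

// offendingNodes reports provider IDs present on only one side, mirroring the
// reworked findOffendingNodes, which now takes the provider-ID keyed map directly.
func offendingNodes(kubeNodes, cloudProviderNodes map[string]string) (notInNodeGroup, notInKube []string) {
	for providerID := range kubeNodes {
		if _, ok := cloudProviderNodes[providerID]; !ok {
			notInNodeGroup = append(notInNodeGroup, fmt.Sprintf("id %q", providerID))
		}
	}
	for providerID, nodeGroup := range cloudProviderNodes {
		if _, ok := kubeNodes[providerID]; !ok {
			notInKube = append(notInKube, fmt.Sprintf("id %q in %q", providerID, nodeGroup))
		}
	}
	return notInNodeGroup, notInKube
}

func main() {
	kubeNodes := map[string]string{"i-0aaa": "node-a", "i-0bbb": "node-b"}
	cloudProviderNodes := map[string]string{"i-0bbb": "nodegroup-1", "i-0ccc": "nodegroup-1"}

	fmt.Println("still in cloud provider:", filterExistingNodes(kubeNodes, []string{"i-0bbb", "i-0ccc"}))

	notInNodeGroup, notInKube := offendingNodes(kubeNodes, cloudProviderNodes)
	fmt.Println("nodes not in node group:", notInNodeGroup) // [id "i-0aaa"]
	fmt.Println("nodes not inside cluster:", notInKube)     // [id "i-0ccc" in "nodegroup-1"]
}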