Skip named nodes that don't exist
vincentportella committed Jul 19, 2024
1 parent 78c3d43 commit 4898f24
Showing 4 changed files with 88 additions and 70 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/cyclenoderequest/transitioner/checks.go
@@ -189,7 +189,7 @@ func (t *CycleNodeRequestTransitioner) performHealthCheck(node v1.CycleNodeReque

// performInitialHealthChecks on the nodes selected to be terminated before cycling begin. If any health
// check fails return an error to prevent cycling from starting
func (t *CycleNodeRequestTransitioner) performInitialHealthChecks(kubeNodes []corev1.Node) error {
func (t *CycleNodeRequestTransitioner) performInitialHealthChecks(kubeNodes map[string]corev1.Node) error {
// Build a set of ready nodes from which to check below
readyNodesSet := make(map[string]v1.CycleNodeRequestNode)

@@ -241,7 +241,7 @@ func (t *CycleNodeRequestTransitioner) performInitialHealthChecks(kubeNodes []co

// performCyclingHealthChecks before terminating an instance selected for termination. Cycling pauses
// until all health checks pass for the new instance before terminating the old one
func (t *CycleNodeRequestTransitioner) performCyclingHealthChecks(kubeNodes []corev1.Node) (bool, error) {
func (t *CycleNodeRequestTransitioner) performCyclingHealthChecks(kubeNodes map[string]corev1.Node) (bool, error) {
var allHealthChecksPassed bool = true

// Find new instances attached to the nodegroup and perform health checks on them
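
Both health-check entry points in checks.go now take the ready nodes as a map keyed by cloud provider ID instead of a slice. A minimal sketch of how a caller could build that shape, assuming the key is node.Spec.ProviderID as in listReadyNodes below; buildNodeMap is an illustrative name, not a helper added by this commit.

package transitioner

import corev1 "k8s.io/api/core/v1"

// buildNodeMap shows the map shape performInitialHealthChecks and
// performCyclingHealthChecks now expect: nodes keyed by provider ID.
// Illustrative only; not part of this change.
func buildNodeMap(nodes []corev1.Node) map[string]corev1.Node {
	out := make(map[string]corev1.Node, len(nodes))
	for _, node := range nodes {
		out[node.Spec.ProviderID] = node
	}
	return out
}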
87 changes: 48 additions & 39 deletions pkg/controller/cyclenoderequest/transitioner/node.go
@@ -11,12 +11,15 @@ import (

// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops.
// A label is used to determine whether nodes have been touched by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (nodes []corev1.Node, err error) {
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
nodes := make(map[string]corev1.Node)

// Get the nodes
selector, err := t.cycleNodeRequest.NodeLabelSelector()
if err != nil {
return nodes, err
}

nodeList, err := t.rm.ListNodes(selector)
if err != nil {
return nodes, err
@@ -30,14 +30,16 @@ func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (n
continue
}
}

// Only add "Ready" nodes
for _, cond := range node.Status.Conditions {
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
nodes = append(nodes, node)
nodes[node.Spec.ProviderID] = node
break
}
}
}

return nodes, nil
}

@@ -56,29 +56,34 @@ func (t *CycleNodeRequestTransitioner) getNodesToTerminate(numNodes int64) (node
}

for _, kubeNode := range kubeNodes {
// Skip nodes that are already being worked on so we don't duplicate our work
if value, ok := kubeNode.Labels[cycleNodeLabel]; ok && value == t.cycleNodeRequest.Name {
numNodesInProgress++
}
}

for _, nodeToTerminate := range t.cycleNodeRequest.Status.NodesToTerminate {
kubeNode, found := kubeNodes[nodeToTerminate.ProviderID]

if !found {
continue
}

for _, nodeToTerminate := range t.cycleNodeRequest.Status.NodesToTerminate {
// Add nodes that need to be terminated but have not yet been actioned
if kubeNode.Name == nodeToTerminate.Name && kubeNode.Spec.ProviderID == nodeToTerminate.ProviderID {
nodes = append(nodes, &kubeNode)
// Skip nodes that are already being worked on so we don't duplicate our work
if value, ok := kubeNode.Labels[cycleNodeLabel]; ok && value == t.cycleNodeRequest.Name {
continue
}

for i := 0; i < len(t.cycleNodeRequest.Status.NodesAvailable); i++ {
if kubeNode.Name == t.cycleNodeRequest.Status.NodesAvailable[i].Name {
// Remove nodes from available if they are also scheduled for termination
// Slice syntax removes this node at `i` from the array
t.cycleNodeRequest.Status.NodesAvailable = append(
t.cycleNodeRequest.Status.NodesAvailable[:i],
t.cycleNodeRequest.Status.NodesAvailable[i+1:]...,
)
// Add nodes that need to be terminated but have not yet been actioned
nodes = append(nodes, &kubeNode)

break
}
}
for i := 0; i < len(t.cycleNodeRequest.Status.NodesAvailable); i++ {
if kubeNode.Name == t.cycleNodeRequest.Status.NodesAvailable[i].Name {
// Remove nodes from available if they are also scheduled for termination
// Slice syntax removes this node at `i` from the array
t.cycleNodeRequest.Status.NodesAvailable = append(
t.cycleNodeRequest.Status.NodesAvailable[:i],
t.cycleNodeRequest.Status.NodesAvailable[i+1:]...,
)

break
}
@@ -94,33 +104,32 @@ }
}

// addNamedNodesToTerminate adds the named nodes for this CycleNodeRequest to the list of nodes to terminate.
// Returns an error if any named node does not exist in the node group for this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes []corev1.Node, nodeGroupInstances map[string]cloudprovider.Instance) error {
for _, namedNode := range t.cycleNodeRequest.Spec.NodeNames {
foundNode := false
for _, kubeNode := range kubeNodes {
if kubeNode.Name == namedNode {
foundNode = true
// Skips any named node that does not exist in the node group for this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes map[string]corev1.Node, nodeGroupInstances map[string]cloudprovider.Instance) {
kubeNodesMap := make(map[string]corev1.Node)

t.cycleNodeRequest.Status.NodesAvailable = append(
t.cycleNodeRequest.Status.NodesAvailable,
newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()),
)
for _, node := range kubeNodes {
kubeNodesMap[node.Name] = node
}

t.cycleNodeRequest.Status.NodesToTerminate = append(
t.cycleNodeRequest.Status.NodesToTerminate,
newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()),
)
for _, namedNode := range t.cycleNodeRequest.Spec.NodeNames {
kubeNode, found := kubeNodesMap[namedNode]

break
}
if !found {
t.rm.Logger.Info("could not find node by name, skipping", "nodeName", namedNode)
continue
}

if !foundNode {
return fmt.Errorf("could not find node by name: %v", namedNode)
}
t.cycleNodeRequest.Status.NodesAvailable = append(
t.cycleNodeRequest.Status.NodesAvailable,
newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()),
)

t.cycleNodeRequest.Status.NodesToTerminate = append(
t.cycleNodeRequest.Status.NodesToTerminate,
newCycleNodeRequestNode(&kubeNode, nodeGroupInstances[kubeNode.Spec.ProviderID].NodeGroupName()),
)
}
return nil
}

// newCycleNodeRequestNode converts a corev1.Node to a v1.CycleNodeRequestNode. This is done multiple
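
The behavioural change in node.go is in addNamedNodesToTerminate: a named node that no longer exists in the cluster is now logged and skipped rather than failing the whole CycleNodeRequest with an error. A self-contained sketch of that lookup-and-skip pattern; the plain string map, node names, and provider IDs are made up for illustration.

package main

import "log"

// pickNamedNodes stands in for the new skip behaviour: names missing from the
// index are logged and ignored instead of aborting the selection.
func pickNamedNodes(nodesByName map[string]string, names []string) []string {
	var providerIDs []string
	for _, name := range names {
		providerID, found := nodesByName[name]
		if !found {
			log.Printf("could not find node by name, skipping: %q", name)
			continue
		}
		providerIDs = append(providerIDs, providerID)
	}
	return providerIDs
}

func main() {
	nodesByName := map[string]string{"ip-10-0-0-1": "aws:///us-west-2a/i-0abc123"}
	// Only the existing node is selected; the unknown name is skipped with a log line.
	log.Println(pickNamedNodes(nodesByName, []string{"ip-10-0-0-1", "ip-10-0-0-2"}))
}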
45 changes: 24 additions & 21 deletions pkg/controller/cyclenoderequest/transitioner/transitions.go
@@ -64,13 +64,11 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result,
func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")

kubeNodes, err := t.listReadyNodes(true)
if err != nil {
return t.transitionToHealing(err)
}
if len(kubeNodes) == 0 {
return t.transitionToHealing(fmt.Errorf("no nodes matched selector"))
}

// Only retain nodes which still exist inside cloud provider
var nodeProviderIDs []string
@@ -83,48 +81,49 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er
if err != nil {
return t.transitionToHealing(errors.Wrap(err, "failed to check instances that exist from cloud provider"))
}
var existingKubeNodes []corev1.Node

for _, node := range kubeNodes {
for _, validProviderID := range existingProviderIDs {
if node.Spec.ProviderID == validProviderID {
existingKubeNodes = append(existingKubeNodes, node)
break
}
existingKubeNodes := make(map[string]corev1.Node)

for _, validProviderID := range existingProviderIDs {
if node, found := kubeNodes[validProviderID]; found {
existingKubeNodes[node.Spec.ProviderID] = node
}
}

kubeNodes = existingKubeNodes

if len(kubeNodes) == 0 {
return t.transitionToHealing(fmt.Errorf("no existing nodes in cloud provider matched selector"))
}

// Describe the node group for the request
t.rm.LogEvent(t.cycleNodeRequest, "FetchingNodeGroup", "Fetching node group: %v", t.cycleNodeRequest.GetNodeGroupNames())

nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames())
if err != nil {
return t.transitionToHealing(err)
}

// get instances inside cloud provider node groups
nodeGroupInstances := nodeGroups.Instances()

// Do some sanity checking before we start filtering things
// Check the instance count of the node group matches the number of nodes found in Kubernetes
if len(kubeNodes) != len(nodeGroupInstances) {
nodesNotInCPNodeGroup, nodesNotInKube := findOffendingNodes(kubeNodes, nodeGroupInstances)
var offendingNodesInfo string

nodesNotInCPNodeGroup, nodesNotInKube := findOffendingNodes(kubeNodes, nodeGroupInstances)

if len(nodesNotInCPNodeGroup) > 0 {
offendingNodesInfo += "nodes not in node group: "
offendingNodesInfo += strings.Join(nodesNotInCPNodeGroup, ",")
}

if len(nodesNotInKube) > 0 {
if offendingNodesInfo != "" {
offendingNodesInfo += ";"
}

offendingNodesInfo += "nodes not inside cluster: "
offendingNodesInfo += strings.Join(nodesNotInKube, ",")
}

t.rm.LogEvent(t.cycleNodeRequest, "NodeCountMismatch",
"node group: %v, kube: %v. %v",
len(nodeGroupInstances), len(kubeNodes), offendingNodesInfo)
@@ -134,26 +133,28 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er
if err != nil {
return t.transitionToHealing(err)
}

if timedOut {
err := fmt.Errorf(
"node count mismatch, number of kubernetes of nodes does not match number of cloud provider instances after %v",
nodeEquilibriumWaitLimit)
"node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v",
nodeEquilibriumWaitLimit,
)

return t.transitionToHealing(err)
}

return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
}

// make a list of the nodes to terminate
if len(t.cycleNodeRequest.Spec.NodeNames) > 0 {
// If specific node names are provided, check they actually exist in the node group
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Adding named nodes to NodesToTerminate")
err := t.addNamedNodesToTerminate(kubeNodes, nodeGroupInstances)
if err != nil {
return t.transitionToHealing(err)
}
t.addNamedNodesToTerminate(kubeNodes, nodeGroupInstances)
} else {
// Otherwise just add all the nodes in the node group
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Adding all node group nodes to NodesToTerminate")

for _, kubeNode := range kubeNodes {
// Check to ensure the kubeNode object maps to an existing node in the ASG
// If this isn't the case, this is a phantom node. Fail the cnr to be safe.
@@ -205,7 +206,9 @@ func (t *CycleNodeRequestTransitioner) transitionInitialised() (reconcile.Result
// The maximum nodes we can select are bounded by our concurrency. We take into account the number
// of nodes we are already working on, and only introduce up to our concurrency cap more nodes in this step.
maxNodesToSelect := t.cycleNodeRequest.Spec.CycleSettings.Concurrency - t.cycleNodeRequest.Status.ActiveChildren

t.rm.Logger.Info("Selecting nodes to terminate", "numNodes", maxNodesToSelect)

nodes, numNodesInProgress, err := t.getNodesToTerminate(maxNodesToSelect)
if err != nil {
return t.transitionToHealing(err)
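
With kubeNodes keyed by provider ID, transitionPending can drop nodes that no longer exist in the cloud provider with a single map lookup per ID, replacing the previous nested loop over a slice. A minimal sketch of that filter, extracted here for clarity; filterExistingNodes is an illustrative name, the commit inlines this logic in transitionPending.

package transitioner

import corev1 "k8s.io/api/core/v1"

// filterExistingNodes keeps only the nodes whose provider IDs the cloud
// provider still reports as existing. Illustrative only.
func filterExistingNodes(kubeNodes map[string]corev1.Node, existingProviderIDs []string) map[string]corev1.Node {
	existing := make(map[string]corev1.Node, len(existingProviderIDs))
	for _, providerID := range existingProviderIDs {
		if node, found := kubeNodes[providerID]; found {
			existing[providerID] = node
		}
	}
	return existing
}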
22 changes: 14 additions & 8 deletions pkg/controller/cyclenoderequest/transitioner/util.go
@@ -242,6 +242,7 @@ func (t *CycleNodeRequestTransitioner) checkIfTransitioning(numNodesToCycle, num
transition, err := t.transitionObject(v1.CycleNodeRequestWaitingTermination)
return true, transition, err
}

// otherwise, we have finished everything, so transition to Successful
transition, err := t.transitionToSuccessful()
return true, transition, err
@@ -252,21 +253,26 @@ func (t *CycleNodeRequestTransitioner) checkIfTransitioning(numNodesToCycle, num

// findOffendingNodes finds the offending nodes information which cause number of nodes mismatch between
// cloud provider node group and nodes inside kubernetes cluster using label selector
func findOffendingNodes(kubeNodes []corev1.Node, cloudProviderNodes map[string]cloudprovider.Instance) ([]string, []string) {
kubeNodesMap := make(map[string]corev1.Node)
func findOffendingNodes(kubeNodes map[string]corev1.Node, cloudProviderNodes map[string]cloudprovider.Instance) ([]string, []string) {
var nodesNotInCPNodeGroup []string
var nodesNotInKube []string

for _, kubeNode := range kubeNodes {
kubeNodesMap[kubeNode.Spec.ProviderID] = kubeNode
if _, ok := cloudProviderNodes[kubeNode.Spec.ProviderID]; !ok {
nodesNotInCPNodeGroup = append(nodesNotInCPNodeGroup, fmt.Sprintf("id %q", kubeNode.Spec.ProviderID))
nodesNotInCPNodeGroup = append(nodesNotInCPNodeGroup,
fmt.Sprintf("id %q", kubeNode.Spec.ProviderID),
)
}
}

for cpNode := range cloudProviderNodes {
if _, ok := kubeNodesMap[cpNode]; !ok {
nodesNotInKube = append(nodesNotInKube, fmt.Sprintf("id %q in %q",
cpNode,
cloudProviderNodes[cpNode].NodeGroupName()))
if _, ok := kubeNodes[cpNode]; !ok {
nodesNotInKube = append(nodesNotInKube,
fmt.Sprintf("id %q in %q",
cpNode,
cloudProviderNodes[cpNode].NodeGroupName(),
),
)
}
}

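
findOffendingNodes now receives the provider-ID-keyed map directly, so it no longer builds its own index before comparing against the cloud provider's instances; the comparison is a symmetric difference over two maps keyed by provider ID. A standalone sketch of that pattern, with plain strings standing in for corev1.Node and cloudprovider.Instance:

package main

import "fmt"

// symmetricDiff mirrors the shape of findOffendingNodes: keys present only in
// kube are "not in the cloud provider node group", keys present only in cloud
// are "not inside the cluster". Plain strings stand in for the real types.
func symmetricDiff(kube, cloud map[string]string) (onlyInKube, onlyInCloud []string) {
	for providerID := range kube {
		if _, ok := cloud[providerID]; !ok {
			onlyInKube = append(onlyInKube, providerID)
		}
	}
	for providerID := range cloud {
		if _, ok := kube[providerID]; !ok {
			onlyInCloud = append(onlyInCloud, providerID)
		}
	}
	return onlyInKube, onlyInCloud
}

func main() {
	kube := map[string]string{"id-1": "node-1", "id-2": "node-2"}
	cloud := map[string]string{"id-2": "node-2", "id-3": "node-3"}
	fmt.Println(symmetricDiff(kube, cloud)) // [id-1] [id-3]
}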
