Light Pending phase refactor to help with future work to deal with problem nodes #86

Merged: 5 commits, Aug 20, 2024
10 changes: 10 additions & 0 deletions pkg/cloudprovider/aws/fake/fake.go
@@ -111,6 +111,16 @@ func (m *Autoscaling) DescribeAutoScalingGroups(input *autoscaling.DescribeAutoS
}, nil
}

func (m *Autoscaling) AttachInstances(input *autoscaling.AttachInstancesInput) (*autoscaling.AttachInstancesOutput, error) {
for _, instanceId := range input.InstanceIds {
if instance, exists := m.Instances[*instanceId]; exists {
instance.AutoscalingGroupName = *input.AutoScalingGroupName
}
}

return &autoscaling.AttachInstancesOutput{}, nil
}

// *************** EC2 *************** //

func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
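One detail worth noting about the fake's AttachInstances above: the assignment to instance.AutoscalingGroupName only persists if the lookup in m.Instances yields a pointer, which the mutation pattern suggests it does (an assumption, not something this hunk shows). A minimal standalone Go sketch of that distinction, using a hypothetical instance type rather than the fake package's own:

package main

import "fmt"

// instance stands in for the fake's per-instance record; the real type in
// pkg/cloudprovider/aws/fake may differ.
type instance struct {
	AutoscalingGroupName string
}

func main() {
	// Map of pointers: mutating the looked-up value persists in the map,
	// mirroring how the fake records the ASG an instance was attached to.
	byPtr := map[string]*instance{"i-123": {}}
	if inst, exists := byPtr["i-123"]; exists {
		inst.AutoscalingGroupName = "nodegroup-a"
	}
	fmt.Println(byPtr["i-123"].AutoscalingGroupName) // "nodegroup-a"

	// Map of values: the lookup yields a copy, so the same assignment would
	// be silently discarded.
	byVal := map[string]instance{"i-123": {}}
	if inst, exists := byVal["i-123"]; exists {
		inst.AutoscalingGroupName = "nodegroup-a"
	}
	fmt.Println(byVal["i-123"].AutoscalingGroupName) // ""
}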
58 changes: 58 additions & 0 deletions pkg/controller/cyclenoderequest/transitioner/node.go
@@ -3,6 +3,8 @@ package transitioner
import (
"fmt"

"github.com/pkg/errors"

corev1 "k8s.io/api/core/v1"

v1 "github.com/atlassian-labs/cyclops/pkg/apis/atlassian/v1"
@@ -140,6 +142,62 @@ func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes map[st
return nil
}

// Find all the nodes in kube and the cloud provider that match the node selector and nodegroups
// specified in the CNR. These are two separate sets and the contents of one do not affect the
// contents of the other.
func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[string]corev1.Node, cloudProviderInstances map[string]cloudprovider.Instance, err error) {
kubeNodes, err = t.listReadyNodes(true)
if err != nil {
return kubeNodes, cloudProviderInstances, err
}

if len(kubeNodes) == 0 {
return kubeNodes, cloudProviderInstances, fmt.Errorf("no nodes matched selector")
}

// Only retain nodes which still exist inside cloud provider
var nodeProviderIDs []string

for _, node := range kubeNodes {
nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID)
}

existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs)
if err != nil {
return kubeNodes, cloudProviderInstances, errors.Wrap(err, "failed to check instances that exist from cloud provider")
}

existingKubeNodes := make(map[string]corev1.Node)

for _, validProviderID := range existingProviderIDs {
if node, found := kubeNodes[validProviderID]; found {
existingKubeNodes[node.Spec.ProviderID] = node
}
}

kubeNodes = existingKubeNodes

if len(kubeNodes) == 0 {
return kubeNodes, cloudProviderInstances, fmt.Errorf("no existing nodes in cloud provider matched selector")
}

nodeGroupNames := t.cycleNodeRequest.GetNodeGroupNames()

// Describe the node group for the request
t.rm.LogEvent(t.cycleNodeRequest, "FetchingNodeGroup", "Fetching node group: %v", nodeGroupNames)

if len(nodeGroupNames) == 0 {
return kubeNodes, cloudProviderInstances, fmt.Errorf("must have at least one nodegroup name defined")
}

nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(nodeGroupNames)
if err != nil {
return kubeNodes, cloudProviderInstances, err
}

return kubeNodes, nodeGroups.Instances(), nil
}

// newCycleNodeRequestNode converts a corev1.Node to a v1.CycleNodeRequestNode. This is done multiple
// times across the code; this function standardises the process
func newCycleNodeRequestNode(kubeNode *corev1.Node, nodeGroupName string) v1.CycleNodeRequestNode {
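The filtering step inside findAllNodesForCycle above (keeping only the kube nodes whose provider IDs the cloud provider confirms still exist) is a rebuild of the map from a whitelist of keys. A minimal standalone sketch of the same pattern, using plain strings instead of the repo's corev1.Node values and made-up provider IDs:

package main

import "fmt"

// retainExisting keeps only the entries of nodes whose keys appear in
// existingIDs, mirroring the InstancesExist filtering in findAllNodesForCycle.
func retainExisting(nodes map[string]string, existingIDs []string) map[string]string {
	existing := make(map[string]string)

	for _, id := range existingIDs {
		if node, found := nodes[id]; found {
			existing[id] = node
		}
	}

	return existing
}

func main() {
	nodes := map[string]string{
		"aws:///us-east-1a/i-111": "node-a",
		"aws:///us-east-1a/i-222": "node-b", // pretend this instance was terminated
	}

	fmt.Println(retainExisting(nodes, []string{"aws:///us-east-1a/i-111"}))
	// map[aws:///us-east-1a/i-111:node-a]
}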
112 changes: 41 additions & 71 deletions pkg/controller/cyclenoderequest/transitioner/transitions.go
@@ -6,7 +6,6 @@ import (
"strings"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -62,72 +61,51 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result,
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")

kubeNodes, err := t.listReadyNodes(true)
// Start the equilibrium wait timer. If it times out while the set of nodes in kube and
// the cloud provider is still not considered valid, transition to the Healing phase, as
// cycling should not proceed.
timedOut, err := t.equilibriumWaitTimedOut()
if err != nil {
return t.transitionToHealing(err)
}

if len(kubeNodes) == 0 {
return t.transitionToHealing(fmt.Errorf("no nodes matched selector"))
}

// Only retain nodes which still exist inside cloud provider
var nodeProviderIDs []string

for _, node := range kubeNodes {
nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID)
}

existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs)
if err != nil {
return t.transitionToHealing(errors.Wrap(err, "failed to check instances that exist from cloud provider"))
}

existingKubeNodes := make(map[string]corev1.Node)

for _, validProviderID := range existingProviderIDs {
if node, found := kubeNodes[validProviderID]; found {
existingKubeNodes[node.Spec.ProviderID] = node
}
}

kubeNodes = existingKubeNodes

if len(kubeNodes) == 0 {
return t.transitionToHealing(fmt.Errorf("no existing nodes in cloud provider matched selector"))
if timedOut {
return t.transitionToHealing(fmt.Errorf(
"node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v",
nodeEquilibriumWaitLimit,
))
}

nodeGroupNames := t.cycleNodeRequest.GetNodeGroupNames()

// Describe the node group for the request
t.rm.LogEvent(t.cycleNodeRequest, "FetchingNodeGroup", "Fetching node group: %v", nodeGroupNames)

if len(nodeGroupNames) == 0 {
return t.transitionToHealing(fmt.Errorf("must have at least one nodegroup name defined"))
}
// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")

nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(nodeGroupNames)
// Find all the nodes in kube and the cloud provider nodegroups selected by the CNR. These
// should be all the nodes in each, regardless of whether they exist in both.
kubeNodes, nodeGroupInstances, err := t.findAllNodesForCycle()
if err != nil {
return t.transitionToHealing(err)
}

// get instances inside cloud provider node groups
nodeGroupInstances := nodeGroups.Instances()
// Find all the nodes that exist in both kube and the cloud provider nodegroups. This is
// the valid set of nodes and can be worked on. This is an AND condition on the two initial
// sets of nodes.
validKubeNodes, validNodeGroupInstances := findValidNodes(kubeNodes, nodeGroupInstances)

// Find all the nodes that exist in either kube or the cloud provider nodegroups, but not both.
// The nodes in the cloud provider can either not exist or be detached from one of the nodegroups;
// which case applies is determined when they are dealt with. This is an XOR condition on the two initial sets
// of nodes.
nodesNotInCloudProvider, nodesNotInKube := findProblemNodes(kubeNodes, nodeGroupInstances)

// Do some sanity checking before we start filtering things
// Check the instance count of the node group matches the number of nodes found in Kubernetes
if len(kubeNodes) != len(nodeGroupInstances) {
if len(nodesNotInCloudProvider) > 0 || len(nodesNotInKube) > 0 {
var offendingNodesInfo string

nodesNotInCPNodeGroup, nodesNotInKube := findOffendingNodes(kubeNodes, nodeGroupInstances)

if len(nodesNotInCPNodeGroup) > 0 {
if len(nodesNotInCloudProvider) > 0 {
providerIDs := make([]string, 0)

for providerID := range nodesNotInCPNodeGroup {
for providerID := range nodesNotInCloudProvider {
providerIDs = append(providerIDs,
fmt.Sprintf("id %q", providerID),
)
@@ -156,22 +134,7 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er

t.rm.LogEvent(t.cycleNodeRequest, "NodeCountMismatch",
"node group: %v, kube: %v. %v",
len(nodeGroupInstances), len(kubeNodes), offendingNodesInfo)

// If it doesn't, then retry for a while in case something just scaled the node group
timedOut, err := t.equilibriumWaitTimedOut()
if err != nil {
return t.transitionToHealing(err)
}

if timedOut {
err := fmt.Errorf(
"node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v",
nodeEquilibriumWaitLimit,
)

return t.transitionToHealing(err)
}
len(validNodeGroupInstances), len(validKubeNodes), offendingNodesInfo)

return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
}
@@ -180,18 +143,18 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er
if len(t.cycleNodeRequest.Spec.NodeNames) > 0 {
// If specific node names are provided, check they actually exist in the node group
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Adding named nodes to NodesToTerminate")
err := t.addNamedNodesToTerminate(kubeNodes, nodeGroupInstances)
err := t.addNamedNodesToTerminate(validKubeNodes, validNodeGroupInstances)
if err != nil {
return t.transitionToHealing(err)
}
} else {
// Otherwise just add all the nodes in the node group
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Adding all node group nodes to NodesToTerminate")

for _, kubeNode := range kubeNodes {
for _, kubeNode := range validKubeNodes {
// Check to ensure the kubeNode object maps to an existing node in the ASG.
// If this isn't the case, this is a phantom node. Fail the CNR to be safe.
nodeGroupName, ok := nodeGroupInstances[kubeNode.Spec.ProviderID]
nodeGroupName, ok := validNodeGroupInstances[kubeNode.Spec.ProviderID]
if !ok {
return t.transitionToHealing(fmt.Errorf("kubeNode %s not found in the list of instances in the ASG", kubeNode.Name))
}
@@ -209,7 +172,7 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er
}

if len(t.cycleNodeRequest.Spec.HealthChecks) > 0 {
if err = t.performInitialHealthChecks(kubeNodes); err != nil {
if err = t.performInitialHealthChecks(validKubeNodes); err != nil {
return t.transitionToHealing(err)
}
}
@@ -595,9 +558,16 @@ func (t *CycleNodeRequestTransitioner) transitionHealing() (reconcile.Result, er

// un-cordon after attach as well
t.rm.LogEvent(t.cycleNodeRequest, "UncordoningNodes", "Uncordoning nodes in node group: %v", node.Name)
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {

err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
return k8s.UncordonNode(node.Name, t.rm.RawClient)
}); err != nil {
})

if apierrors.IsNotFound(err) {
continue
}

if err != nil {
return t.transitionToFailed(err)
}
}
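The findValidNodes and findProblemNodes helpers that the reworked transitionPending calls are not shown in these hunks. As a hedged sketch only (the real implementations live elsewhere in this PR and may differ), helpers with that shape boil down to an intersection and a symmetric difference over the two provider-ID-keyed maps returned by findAllNodesForCycle, using the same corev1 and cloudprovider imports as node.go:

// Sketch of the AND case: nodes present in both kube and the selected cloud
// provider nodegroups, keyed by provider ID.
func findValidNodes(kubeNodes map[string]corev1.Node, nodeGroupInstances map[string]cloudprovider.Instance) (map[string]corev1.Node, map[string]cloudprovider.Instance) {
	validKubeNodes := make(map[string]corev1.Node)
	validNodeGroupInstances := make(map[string]cloudprovider.Instance)

	for providerID, node := range kubeNodes {
		if instance, found := nodeGroupInstances[providerID]; found {
			validKubeNodes[providerID] = node
			validNodeGroupInstances[providerID] = instance
		}
	}

	return validKubeNodes, validNodeGroupInstances
}

// Sketch of the XOR case: nodes present in exactly one of the two sets, i.e.
// the problem nodes reported in the NodeCountMismatch event.
func findProblemNodes(kubeNodes map[string]corev1.Node, nodeGroupInstances map[string]cloudprovider.Instance) (map[string]corev1.Node, map[string]cloudprovider.Instance) {
	nodesNotInCloudProvider := make(map[string]corev1.Node)
	nodesNotInKube := make(map[string]cloudprovider.Instance)

	for providerID, node := range kubeNodes {
		if _, found := nodeGroupInstances[providerID]; !found {
			nodesNotInCloudProvider[providerID] = node
		}
	}

	for providerID, instance := range nodeGroupInstances {
		if _, found := kubeNodes[providerID]; !found {
			nodesNotInKube[providerID] = instance
		}
	}

	return nodesNotInCloudProvider, nodesNotInKube
}

Keying both maps by provider ID is what lets the Pending phase report exactly which side each problem node is missing from before cycling proceeds.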