Fix node state in the Pending phase
vincentportella committed Aug 22, 2024
1 parent 65b577a commit 00de0bb
Showing 13 changed files with 672 additions and 171 deletions.
17 changes: 11 additions & 6 deletions pkg/cloudprovider/aws/aws.go
@@ -114,9 +114,10 @@ func (p *provider) GetNodeGroups(names []string) (cloudprovider.NodeGroups, error) {
}

// InstancesExist returns a list of the instances that exist
func (p *provider) InstancesExist(providerIDs []string) (validProviderIDs []string, err error) {
instanceIDSet := map[string]string{}
instanceIDs := []string{}
func (p *provider) InstancesExist(providerIDs []string) (map[string]interface{}, error) {
validProviderIDs := make(map[string]interface{})
instanceIDSet := make(map[string]string)
instanceIDs := make([]string, 0)

for _, providerID := range providerIDs {
instanceID, err := providerIDToInstanceID(providerID)
@@ -140,8 +141,12 @@ func (p *provider) InstancesExist(providerIDs []string) (validProviderIDs []string, err error) {

for _, reservation := range output.Reservations {
for _, instance := range reservation.Instances {
if *instance.State.Name == ec2.InstanceStateNameTerminated {
continue
}

if providerID, ok := instanceIDSet[aws.StringValue(instance.InstanceId)]; ok {
validProviderIDs = append(validProviderIDs, providerID)
validProviderIDs[providerID] = nil
}
}
}
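
Since InstancesExist now returns a set-like map rather than a slice, callers check membership with a single map lookup. A minimal usage sketch, adapted from the filtering logic that previously lived in findAllNodesForCycle (the cloudProvider and kubeNodes variable names are illustrative, not from this commit):

nodeProviderIDs := make([]string, 0, len(kubeNodes))

for _, node := range kubeNodes {
    nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID)
}

existingProviderIDs, err := cloudProvider.InstancesExist(nodeProviderIDs)
if err != nil {
    return err
}

// Membership is now an O(1) map lookup instead of a slice scan.
for providerID := range kubeNodes {
    if _, exists := existingProviderIDs[providerID]; !exists {
        delete(kubeNodes, providerID)
    }
}
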
@@ -190,7 +195,7 @@ func (a *autoscalingGroups) ReadyInstances() map[string]cloudprovider.Instance {
instances := make(map[string]cloudprovider.Instance)
for _, group := range a.groups {
for _, i := range group.Instances {
if aws.StringValue(i.LifecycleState) != "InService" {
if aws.StringValue(i.LifecycleState) != autoscaling.LifecycleStateInService {
continue
}
providerID, err := instanceIDToProviderID(aws.StringValue(i.InstanceId), *i.AvailabilityZone)
@@ -214,7 +219,7 @@ func (a *autoscalingGroups) NotReadyInstances() map[string]cloudprovider.Instance {
instances := make(map[string]cloudprovider.Instance)
for _, group := range a.groups {
for _, i := range group.Instances {
if aws.StringValue(i.LifecycleState) != "InService" {
if aws.StringValue(i.LifecycleState) != autoscaling.LifecycleStateInService {
providerID, err := instanceIDToProviderID(aws.StringValue(i.InstanceId), aws.StringValue(i.AvailabilityZone))
if err != nil {
a.logger.Info("[NotReadyInstances] skip instance which failed instanceID to providerID conversion: %v", aws.StringValue(i.InstanceId))
29 changes: 28 additions & 1 deletion pkg/cloudprovider/aws/fake/fake.go
@@ -55,6 +55,7 @@ func generateAutoscalingInstance(instance *Instance) *autoscaling.Instance {
autoscalingInstance := &autoscaling.Instance{
InstanceId: aws.String(instance.InstanceID),
AvailabilityZone: aws.String(defaultAvailabilityZone),
LifecycleState: aws.String(autoscaling.LifecycleStateInService),
}

return autoscalingInstance
@@ -76,7 +77,13 @@ func (m *Autoscaling) DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error) {
continue
}

if _, exists := asgNameLookup[instance.AutoscalingGroupName]; !exists {
if instance.State != ec2.InstanceStateNameRunning {
continue
}

// Skip the instance unless its ASG name matches one of the names from the
// input. If the input is empty then match all ASGs.
if _, exists := asgNameLookup[instance.AutoscalingGroupName]; !exists && len(asgNameLookup) > 0 {
continue
}

@@ -121,6 +128,16 @@ func (m *Autoscaling) AttachInstances(input *autoscaling.AttachInstancesInput) (*autoscaling.AttachInstancesOutput, error) {
return &autoscaling.AttachInstancesOutput{}, nil
}

func (m *Autoscaling) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) {
for _, instanceId := range input.InstanceIds {
if instance, exists := m.Instances[*instanceId]; exists {
instance.AutoscalingGroupName = ""
}
}

return &autoscaling.DetachInstancesOutput{}, nil
}

// *************** EC2 *************** //

func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
@@ -150,3 +167,13 @@ func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
},
}, nil
}

func (m *Ec2) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) {
for _, instanceId := range input.InstanceIds {
if instance, exists := m.Instances[*instanceId]; exists {
instance.State = ec2.InstanceStateNameTerminated
}
}

return &ec2.TerminateInstancesOutput{}, nil
}
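
Together, the new DetachInstances and TerminateInstances fakes let tests simulate the broken node states the Pending phase now has to repair. A hypothetical snippet showing how a test might drive them (the instance ID and the fake variable names are assumptions, not taken from this commit):

// Detach the instance from its ASG in the autoscaling fake...
_, _ = autoscalingFake.DetachInstances(&autoscaling.DetachInstancesInput{
    InstanceIds: aws.StringSlice([]string{"i-0123456789abcdef0"}),
})

// ...or mark it terminated in the EC2 fake.
_, _ = ec2Fake.TerminateInstances(&ec2.TerminateInstancesInput{
    InstanceIds: aws.StringSlice([]string{"i-0123456789abcdef0"}),
})

// Subsequent DescribeAutoScalingGroups / DescribeInstances calls then report
// the instance as detached or terminated, exercising the Pending-phase fixes.
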
2 changes: 1 addition & 1 deletion pkg/cloudprovider/interface.go
@@ -3,7 +3,7 @@ package cloudprovider
// CloudProvider provides an interface to interact with a cloud provider, e.g. AWS, GCP etc.
type CloudProvider interface {
Name() string
InstancesExist([]string) ([]string, error)
InstancesExist([]string) (map[string]interface{}, error)
GetNodeGroups([]string) (NodeGroups, error)
TerminateInstance(string) error
}
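
A common way to catch signature drift like this at compile time is a blank-identifier assertion in the implementing package. This is a sketch of that pattern only, not code from this commit:

// Fails to compile if *provider no longer satisfies cloudprovider.CloudProvider
// after the InstancesExist signature change.
var _ cloudprovider.CloudProvider = (*provider)(nil)
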
62 changes: 27 additions & 35 deletions pkg/controller/cyclenoderequest/transitioner/node.go
@@ -3,17 +3,16 @@ package transitioner
import (
"fmt"

"github.com/pkg/errors"

corev1 "k8s.io/api/core/v1"

v1 "github.com/atlassian-labs/cyclops/pkg/apis/atlassian/v1"
"github.com/atlassian-labs/cyclops/pkg/cloudprovider"
)

// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops.
// A label is used to determine whether nodes have been touched by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
// listNodes lists nodes matching the node selector. By default lists nodes that have also
// not been touched by Cyclops. A label is used to determine whether nodes have been touched
// by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listNodes(includeInProgress bool) (map[string]corev1.Node, error) {
nodes := make(map[string]corev1.Node)

// Get the nodes
@@ -36,13 +35,32 @@ func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
}
}

// Only add "Ready" nodes
nodes[node.Spec.ProviderID] = node
}

return nodes, nil
}

// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops.
// A label is used to determine whether nodes have been touched by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
nodes, err := t.listNodes(includeInProgress)
if err != nil {
return nil, err
}

for providerID, node := range nodes {
nodeReady := false

for _, cond := range node.Status.Conditions {
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
nodes[node.Spec.ProviderID] = node
break
nodeReady = true
}
}

if !nodeReady {
delete(nodes, providerID)
}
}

return nodes, nil
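
Because the rendered diff interleaves removed and added lines, the new readiness filter in listReadyNodes is easier to follow written out in full. This is a reconstruction of the added code above, not new behaviour:

for providerID, node := range nodes {
    nodeReady := false

    for _, cond := range node.Status.Conditions {
        if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
            nodeReady = true
            break
        }
    }

    // Drop nodes that are not Ready so callers only see ready candidates.
    if !nodeReady {
        delete(nodes, providerID)
    }
}
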
@@ -146,7 +164,7 @@ func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes map[st
// specified in the CNR. These are two separate sets and the contents of one does not affect the
// contents of the other.
func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[string]corev1.Node, cloudProviderInstances map[string]cloudprovider.Instance, err error) {
kubeNodes, err = t.listReadyNodes(true)
kubeNodes, err = t.listNodes(true)
if err != nil {
return kubeNodes, cloudProviderInstances, err
}
@@ -155,32 +173,6 @@ func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[string]corev1.Node, cloudProviderInstances map[string]cloudprovider.Instance, err error) {
return kubeNodes, cloudProviderInstances, fmt.Errorf("no nodes matched selector")
}

// Only retain nodes which still exist inside cloud provider
var nodeProviderIDs []string

for _, node := range kubeNodes {
nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID)
}

existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs)
if err != nil {
return kubeNodes, cloudProviderInstances, errors.Wrap(err, "failed to check instances that exist from cloud provider")
}

existingKubeNodes := make(map[string]corev1.Node)

for _, validProviderID := range existingProviderIDs {
if node, found := kubeNodes[validProviderID]; found {
existingKubeNodes[node.Spec.ProviderID] = node
}
}

kubeNodes = existingKubeNodes

if len(kubeNodes) == 0 {
return kubeNodes, cloudProviderInstances, fmt.Errorf("no existing nodes in cloud provider matched selector")
}

nodeGroupNames := t.cycleNodeRequest.GetNodeGroupNames()

// Describe the node group for the request
14 changes: 7 additions & 7 deletions pkg/controller/cyclenoderequest/transitioner/test_helpers.go
@@ -12,13 +12,13 @@ type Option func(t *Transitioner)

func WithCloudProviderInstances(nodes []*mock.Node) Option {
return func(t *Transitioner) {
t.cloudProviderInstances = append(t.cloudProviderInstances, nodes...)
t.CloudProviderInstances = append(t.CloudProviderInstances, nodes...)
}
}

func WithKubeNodes(nodes []*mock.Node) Option {
return func(t *Transitioner) {
t.kubeNodes = append(t.kubeNodes, nodes...)
t.KubeNodes = append(t.KubeNodes, nodes...)
}
}

@@ -28,23 +28,23 @@ type Transitioner struct {
*CycleNodeRequestTransitioner
*mock.Client

cloudProviderInstances []*mock.Node
kubeNodes []*mock.Node
CloudProviderInstances []*mock.Node
KubeNodes []*mock.Node
}

func NewFakeTransitioner(cnr *v1.CycleNodeRequest, opts ...Option) *Transitioner {
t := &Transitioner{
// By default there are no nodes and each test will
// override these as needed
cloudProviderInstances: make([]*mock.Node, 0),
kubeNodes: make([]*mock.Node, 0),
CloudProviderInstances: make([]*mock.Node, 0),
KubeNodes: make([]*mock.Node, 0),
}

for _, opt := range opts {
opt(t)
}

t.Client = mock.NewClient(t.kubeNodes, t.cloudProviderInstances, cnr)
t.Client = mock.NewClient(t.KubeNodes, t.CloudProviderInstances, cnr)

rm := &controller.ResourceManager{
Client: t.K8sClient,
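
Exporting CloudProviderInstances and KubeNodes lets tests construct the fake transitioner and then inspect what it was seeded with. A hypothetical usage sketch (the cnr, kubeNodes and cloudInstances values are assumed to be built by mock helpers not shown in this excerpt):

fakeTransitioner := NewFakeTransitioner(cnr,
    WithKubeNodes(kubeNodes),
    WithCloudProviderInstances(cloudInstances),
)

// The exported fields are now reachable from tests in other packages, e.g. to
// assert how many kube nodes and cloud provider instances the fake started with.
_ = fakeTransitioner.KubeNodes
_ = fakeTransitioner.CloudProviderInstances
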
104 changes: 46 additions & 58 deletions pkg/controller/cyclenoderequest/transitioner/transitions.go
@@ -61,21 +61,13 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result, error) {
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
// Start the equilibrium wait timer, if this times out as the set of nodes in kube and
// the cloud provider is not considered valid, then transition to the Healing phase as
// cycling should not proceed.
timedOut, err := t.equilibriumWaitTimedOut()
if err != nil {
// Start the equilibrium wait timer. If this times out then the set of nodes in kube and
// the cloud provider is not considered valid, so transition to the Healing phase as cycling
// should not proceed.
if err := t.errorIfEquilibriumTimeoutReached(); err != nil {
return t.transitionToHealing(err)
}

if timedOut {
return t.transitionToHealing(fmt.Errorf(
"node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v",
nodeEquilibriumWaitLimit,
))
}

// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")

@@ -95,50 +87,47 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
// The nodes in the cloud provider can either not exist or be detached from one of the nodegroups
// and this will be determined when dealt with. This is an XOR condition on the two initial sets
// of nodes.
nodesNotInCloudProvider, nodesNotInKube := findProblemNodes(kubeNodes, nodeGroupInstances)

// Do some sanity checking before we start filtering things
// Check the instance count of the node group matches the number of nodes found in Kubernetes
if len(nodesNotInCloudProvider) > 0 || len(nodesNotInKube) > 0 {
var offendingNodesInfo string

if len(nodesNotInCloudProvider) > 0 {
providerIDs := make([]string, 0)

for providerID := range nodesNotInCloudProvider {
providerIDs = append(providerIDs,
fmt.Sprintf("id %q", providerID),
)
}

offendingNodesInfo += "nodes not in node group: "
offendingNodesInfo += strings.Join(providerIDs, ",")
nodesNotInCloudProviderNodegroup, instancesNotInKube := findProblemNodes(kubeNodes, nodeGroupInstances)

// If the node state isn't correct then go through and attempt to fix it. The steps in this block
// attempt to fix the node state and then requeue the Pending phase to re-check. It is entirely
// possible that the node state changes during these steps and cannot be fixed in a single pass.
// Hopefully after a few runs the state can be fixed.
if len(nodesNotInCloudProviderNodegroup) > 0 || len(instancesNotInKube) > 0 {
t.logProblemNodes(nodesNotInCloudProviderNodegroup, instancesNotInKube)

// Try to fix the case where one or more instances match the node selector for the
// nodegroup in kube but are not attached to the nodegroup in the cloud provider, by
// re-attaching them.
if err := t.reattachAnyDetachedInstances(nodesNotInCloudProviderNodegroup); err != nil {
return t.transitionToHealing(err)
}

if len(nodesNotInKube) > 0 {
if offendingNodesInfo != "" {
offendingNodesInfo += ";"
}

providerIDs := make([]string, 0)

for providerID, node := range nodesNotInKube {
providerIDs = append(providerIDs,
fmt.Sprintf("id %q in %q", providerID, node.NodeGroupName()),
)
}

offendingNodesInfo += "nodes not inside cluster: "
offendingNodesInfo += strings.Join(providerIDs, ",")
// Try to fix the case where one or more kube node objects have no matching running
// instance in the cloud provider. This could be because of the finalizer that
// was added during a previous failed cycle.
if err := t.deleteAnyOrphanedKubeNodes(nodesNotInCloudProviderNodegroup); err != nil {
return t.transitionToHealing(err)
}

t.rm.LogEvent(t.cycleNodeRequest, "NodeCountMismatch",
"node group: %v, kube: %v. %v",
len(validNodeGroupInstances), len(validKubeNodes), offendingNodesInfo)
// After working through these attempts, requeue to run through the Pending phase from the
// beginning to check the full state of nodes again. If there are any problem nodes we should
// not proceed and keep requeuing until the state is fixed or the timeout has been reached.
return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
}

valid, err := t.validateInstanceState(validNodeGroupInstances)
if err != nil {
return t.transitionToHealing(err)
}

if !valid {
t.rm.Logger.Info("instance state not valid, requeuing")
return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
}

t.rm.Logger.Info("instance state valid, proceeding")

// make a list of the nodes to terminate
if len(t.cycleNodeRequest.Spec.NodeNames) > 0 {
// If specific node names are provided, check they actually exist in the node group
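
transitionPending above also calls t.validateInstanceState, another helper added in a file not rendered here. One plausible shape, assuming the NodeGroups interface exposes ReadyInstances as the AWS implementation above does, is to require every selected instance to currently be "InService"; this is purely illustrative:

func (t *CycleNodeRequestTransitioner) validateInstanceState(
    validNodeGroupInstances map[string]cloudprovider.Instance,
) (bool, error) {
    nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames())
    if err != nil {
        return false, err
    }

    readyInstances := nodeGroups.ReadyInstances()

    // Every instance selected for cycling must currently be reported InService.
    for providerID := range validNodeGroupInstances {
        if _, ok := readyInstances[providerID]; !ok {
            return false, nil
        }
    }

    return true, nil
}
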
@@ -281,6 +270,14 @@ func (t *CycleNodeRequestTransitioner) transitionInitialised() (reconcile.Result, error) {
return t.transitionToHealing(err)
}

t.rm.Logger.Info("Adding annotation to node", "node", node.Name)

// Add the nodegroup annotation to the node before detaching it
if err := t.rm.AddNodegroupAnnotationToNode(node.Name, node.NodeGroupName); err != nil {
t.rm.LogEvent(t.cycleNodeRequest, "AddAnnotationToNodeError", err.Error())
return t.transitionToHealing(err)
}

alreadyDetaching, err := nodeGroups.DetachInstance(node.ProviderID)

if alreadyDetaching {
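
The AddNodegroupAnnotationToNode helper used above is added elsewhere in this commit. The intent is to record on the node object which nodegroup the instance is being detached from, so a later Pending pass can re-attach an orphaned instance to the correct group. A minimal sketch of the annotation step; the annotation key and helper shape are assumptions, not the committed code:

// Hypothetical annotation key; the real key lives in a file not shown here.
const nodeGroupNameAnnotationKey = "cyclops.atlassian.com/nodegroup-name"

func addNodegroupAnnotation(node *corev1.Node, nodeGroupName string) {
    if node.Annotations == nil {
        node.Annotations = map[string]string{}
    }

    node.Annotations[nodeGroupNameAnnotationKey] = nodeGroupName
}
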
@@ -345,16 +342,7 @@ func (t *CycleNodeRequestTransitioner) transitionScalingUp() (reconcile.Result, error) {

// Increase the kubeNode count requirement by the number of nodes which are observed to have been removed prematurely
for _, node := range t.cycleNodeRequest.Status.CurrentNodes {
var instanceFound bool = false

for _, kubeNode := range kubeNodes {
if node.Name == kubeNode.Name {
instanceFound = true
break
}
}

if !instanceFound {
if _, instanceFound := kubeNodes[node.ProviderID]; !instanceFound {
nodesToRemove = append(nodesToRemove, node)
numKubeNodesReady++
}
