Support scaling down a particular node with graceful termination
liuxintong committed Dec 3, 2022
1 parent 6000d68 commit b68ca85
Showing 15 changed files with 492 additions and 48 deletions.
18 changes: 18 additions & 0 deletions cluster-autoscaler/FAQ.md
@@ -32,6 +32,7 @@ this document:
* [How can I see all the events from Cluster Autoscaler?](#how-can-i-see-all-events-from-cluster-autoscaler)
* [How can I scale my cluster to just 1 node?](#how-can-i-scale-my-cluster-to-just-1-node)
* [How can I scale a node group to 0?](#how-can-i-scale-a-node-group-to-0)
* [How can I request Cluster Autoscaler to scale down a particular node?](#how-can-i-request-cluster-autoscaler-to-scale-down-a-particular-node)
* [How can I prevent Cluster Autoscaler from scaling down a particular node?](#how-can-i-prevent-cluster-autoscaler-from-scaling-down-a-particular-node)
* [How can I prevent Cluster Autoscaler from scaling down non-empty nodes?](#how-can-i-prevent-cluster-autoscaler-from-scaling-down-non-empty-nodes)
* [How can I modify Cluster Autoscaler reaction time?](#how-can-i-modify-cluster-autoscaler-reaction-time)
@@ -311,6 +312,23 @@ For example, for a node label of `foo=bar`, you would tag the ASG with:
}
```

### How can I request Cluster Autoscaler to scale down a particular node?

Starting with CA 1.26.0, CA will scale down a node if it has the annotation requesting scale-down.
* The annotation key is `cluster-autoscaler.kubernetes.io/scale-down-requested`.
* The annotation value is a number: the maximum graceful termination period, in seconds, for the pods hosted on the node.

For example, the annotation can be added to or removed from a node using kubectl:

```
kubectl annotate node <nodename> cluster-autoscaler.kubernetes.io/scale-down-requested=30
kubectl annotate node <nodename> cluster-autoscaler.kubernetes.io/scale-down-requested-
```

If the annotation value is not a valid number, the global max graceful termination time is used instead. Please refer to [Does CA respect GracefulTermination in scale-down](#does-ca-respect-gracefultermination-in-scale-down).

Note that CA doesn't support scaling down nodes that run pods without a PDB in the kube-system namespace. Please refer to [How to set PDBs to enable CA to move kube-system pods](#how-to-set-pdbs-to-enable-ca-to-move-kube-system-pods).

### How can I prevent Cluster Autoscaler from scaling down a particular node?

From CA 1.0, a node will be excluded from scale-down if it has the
29 changes: 26 additions & 3 deletions cluster-autoscaler/core/scale_up.go
@@ -18,11 +18,13 @@ package core

import (
"fmt"
"reflect"
"strings"
"time"

"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
ca_utils "k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"

appsv1 "k8s.io/api/apps/v1"
@@ -445,13 +447,26 @@ func ScaleUpToNodeGroupMinSize(context *context.AutoscalingContext, processors *
nodeGroups := context.CloudProvider.NodeGroups()
gpuLabel := context.CloudProvider.GPULabel()
availableGPUTypes := context.CloudProvider.GetAvailableGPUTypes()
groupsWithNodes := make(map[cloudprovider.NodeGroup][]*apiv1.Node)
scaleUpInfos := make([]nodegroupset.ScaleUpInfo, 0)

resourcesLeft, err := resourceManager.ResourcesLeft(context, nodeInfos, nodes)
if err != nil {
return scaleUpError(&status.ScaleUpStatus{}, err.AddPrefix("could not compute total resources: "))
}

for _, node := range nodes {
ng, err := context.CloudProvider.NodeGroupForNode(node)
if err != nil || ng == nil || reflect.ValueOf(ng).IsNil() {
klog.Warningf("ScaleUpToNodeGroupMinSize: failed to get the node group of node %s: %w", node.Name, err)
continue
}
if len(groupsWithNodes[ng]) == 0 {
groupsWithNodes[ng] = make([]*apiv1.Node, 0)
}
groupsWithNodes[ng] = append(groupsWithNodes[ng], node)
}

for _, ng := range nodeGroups {
if !ng.Exist() {
klog.Warningf("ScaleUpToNodeGroupMinSize: NodeGroup %s does not exist", ng.Id())
@@ -464,8 +479,16 @@
continue
}

klog.V(4).Infof("ScaleUpToNodeGroupMinSize: NodeGroup %s, TargetSize %d, MinSize %d, MaxSize %d", ng.Id(), targetSize, ng.MinSize(), ng.MaxSize())
if targetSize >= ng.MinSize() {
scaleDownRequestedCount := 0
for _, node := range groupsWithNodes[ng] {
if ca_utils.HasScaleDownRequestedAnnotation(node) {
klog.V(4).Infof("ScaleUpToNodeGroupMinSize: node %s in node group %s has scale-down-requested annotation", node, ng.Id())
scaleDownRequestedCount++
}
}

klog.V(4).Infof("ScaleUpToNodeGroupMinSize: NodeGroup %s, TargetSize %d, MinSize %d, MaxSize %d, ScaleDownRequested %d", ng.Id(), targetSize, ng.MinSize(), ng.MaxSize(), scaleDownRequestedCount)
if targetSize >= ng.MinSize()+scaleDownRequestedCount {
continue
}

@@ -486,7 +509,7 @@ func ScaleUpToNodeGroupMinSize(context *context.AutoscalingContext, processors *
continue
}

- newNodeCount := ng.MinSize() - targetSize
newNodeCount := ng.MinSize() + scaleDownRequestedCount - targetSize
newNodeCount, err = resourceManager.ApplyResourcesLimits(context, newNodeCount, resourcesLeft, nodeInfo, ng)
if err != nil {
klog.Warning("ScaleUpToNodeGroupMinSize: failed to apply resource limits: %v", err)
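The hunks above are the heart of the scale-up change: a node annotated for scale-down is treated as already gone, so its node group is topped back up to its minimum size before eviction starts. A minimal sketch of that accounting (an illustrative helper, assuming only what the hunks show; `nodesToAdd` is not a name from the commit):

```go
// nodesToAdd returns how many nodes a group still needs so that it holds
// at least minSize nodes after every scale-down-requested node is removed.
// Resource limits are applied to the result afterwards, as in the hunk above.
func nodesToAdd(targetSize, minSize, scaleDownRequested int) int {
	want := minSize + scaleDownRequested
	if targetSize >= want {
		return 0 // group is already large enough; the loop above continues
	}
	return want - targetSize
}
```

For example, a group with MinSize 1, TargetSize 1, and one annotated node needs one replacement node, which is exactly the ng3 scenario in the test below.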
22 changes: 14 additions & 8 deletions cluster-autoscaler/core/scale_up_test.go
@@ -38,6 +38,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
ca_util "k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -947,27 +948,31 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
assert.Equal(t, "ng1", nodeGroup)
assert.Equal(t, 1, increase)
return nil
}, nil)
resourceLimiter := cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 0, cloudprovider.ResourceNameMemory: 0},
- map[string]int64{cloudprovider.ResourceNameCores: 48, cloudprovider.ResourceNameMemory: 1000},
map[string]int64{cloudprovider.ResourceNameCores: 64, cloudprovider.ResourceNameMemory: 1000},
)
provider.SetResourceLimiter(resourceLimiter)

// Test cases:
- // ng1: current size 1, min size 3, cores limit 48, memory limit 1000 => scale up with 1 new node.
- // ng2: current size 1, min size 1, cores limit 48, memory limit 1000 => no scale up.
// ng1: current size 1, min size 3, cores limit 64, memory limit 1000 => scale up with 1 new node.
// ng2: current size 1, min size 1, cores limit 64, memory limit 1000 => no scale up.
// ng3: current size 1, min size 1, cores limit 64, memory limit 1000, 1 node scale-down-requested => scale up with 1 new node.
n1 := BuildTestNode("n1", 16000, 32)
SetNodeReadyState(n1, true, time.Now())
n2 := BuildTestNode("n2", 16000, 32)
SetNodeReadyState(n2, true, time.Now())
n3 := BuildTestNode("n3", 16000, 32)
SetNodeReadyState(n3, true, time.Now())
n3.Annotations = map[string]string{ca_util.ScaleDownRequestedKey: "30"}
provider.AddNodeGroup("ng1", 3, 10, 1)
provider.AddNode("ng1", n1)
provider.AddNodeGroup("ng2", 1, 10, 1)
provider.AddNode("ng2", n2)
provider.AddNodeGroup("ng3", 1, 10, 1)
provider.AddNode("ng3", n3)

options := config.AutoscalingOptions{
EstimatorName: estimator.BinpackingEstimatorName,
@@ -977,7 +982,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)

- nodes := []*apiv1.Node{n1, n2}
nodes := []*apiv1.Node{n1, n2, n3}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
processors := NewTestProcessors(&context)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
@@ -987,9 +992,10 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
scaleUpStatus, err := ScaleUpToNodeGroupMinSize(&context, processors, clusterState, resourceManager, nodes, nodeInfos)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
- assert.Equal(t, 1, len(scaleUpStatus.ScaleUpInfos))
assert.Equal(t, 2, len(scaleUpStatus.ScaleUpInfos))
assert.Equal(t, 2, scaleUpStatus.ScaleUpInfos[0].NewSize)
assert.Equal(t, "ng1", scaleUpStatus.ScaleUpInfos[0].Group.Id())
assert.Equal(t, 2, scaleUpStatus.ScaleUpInfos[1].NewSize)
assert.ElementsMatch(t, []string{"ng1", "ng3"}, []string{scaleUpStatus.ScaleUpInfos[0].Group.Id(), scaleUpStatus.ScaleUpInfos[1].Group.Id()})
}

func TestCheckDeltaWithinLimits(t *testing.T) {
22 changes: 18 additions & 4 deletions cluster-autoscaler/core/scaledown/actuation/drain.go
@@ -33,6 +33,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
@@ -95,14 +96,14 @@ func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1
for _, pod := range pods {
evictionResults[pod.Name] = status.PodEvictionResult{Pod: pod, TimedOut: true, Err: nil}
go func(podToEvict *apiv1.Pod) {
- confirmations <- evictPod(ctx, podToEvict, false, retryUntil, e.EvictionRetryTime, e.evictionRegister)
confirmations <- evictPod(ctx, node, podToEvict, false, retryUntil, e.EvictionRetryTime, e.evictionRegister)
}(pod)
}

// Perform eviction of DaemonSet pods. We don't want to raise an error if a DaemonSet pod wasn't evicted properly
for _, daemonSetPod := range daemonSetPods {
go func(podToEvict *apiv1.Pod) {
- daemonSetConfirmations <- evictPod(ctx, podToEvict, true, retryUntil, e.EvictionRetryTime, e.evictionRegister)
daemonSetConfirmations <- evictPod(ctx, node, podToEvict, true, retryUntil, e.EvictionRetryTime, e.evictionRegister)
}(daemonSetPod)

}
@@ -189,7 +190,7 @@ func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *
// Perform eviction of DaemonSet pods
for _, daemonSetPod := range daemonSetPods {
go func(podToEvict *apiv1.Pod) {
- dsEviction <- evictPod(ctx, podToEvict, true, timeNow.Add(e.DsEvictionEmptyNodeTimeout), e.DsEvictionRetryTime, e.evictionRegister)
dsEviction <- evictPod(ctx, nodeInfo.Node(), podToEvict, true, timeNow.Add(e.DsEvictionEmptyNodeTimeout), e.DsEvictionRetryTime, e.evictionRegister)
}(daemonSetPod)
}
// Wait for creating eviction of DaemonSet pods
@@ -212,7 +213,7 @@ func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *
return nil
}

- func evictPod(ctx *acontext.AutoscalingContext, podToEvict *apiv1.Pod, isDaemonSetPod bool, retryUntil time.Time, waitBetweenRetries time.Duration, evictionRegister evictionRegister) status.PodEvictionResult {
func evictPod(ctx *acontext.AutoscalingContext, node *apiv1.Node, podToEvict *apiv1.Pod, isDaemonSetPod bool, retryUntil time.Time, waitBetweenRetries time.Duration, evictionRegister evictionRegister) status.PodEvictionResult {
ctx.Recorder.Eventf(podToEvict, apiv1.EventTypeNormal, "ScaleDown", "deleting pod for node scale down")

maxTermination := int64(apiv1.DefaultTerminationGracePeriodSeconds)
@@ -224,6 +225,19 @@ func evictPod(ctx *acontext.AutoscalingContext, podToEvict *apiv1.Pod, isDaemonS
}
}

if utils.HasScaleDownRequestedAnnotation(node) {
nodeMaxTermination, err := utils.GetNodeMaxGracefulTerminationSec(node)
if err != nil {
klog.Warningf("Failed to get the max graceful termination seconds of node %s, error: %v", node.Name, err)
} else {
if nodeMaxTermination < maxTermination {
klog.V(4).Infof("Overriding max graceful termination seconds of pod %s/%s from %d to %d, because node %s has scale-down-requested annotation",
podToEvict.Namespace, podToEvict.Name, maxTermination, nodeMaxTermination, node.Name)
maxTermination = nodeMaxTermination
}
}
}

var lastError error
for first := true; first || time.Now().Before(retryUntil); time.Sleep(waitBetweenRetries) {
first = false
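`HasScaleDownRequestedAnnotation` and `GetNodeMaxGracefulTerminationSec` come from `cluster-autoscaler/utils`, presumably one of the changed files that is still collapsed in this view. Below is a plausible reconstruction from the call sites and tests; the constant name and annotation key are confirmed by the tests and the FAQ, while the function bodies are assumptions:

```go
package utils

import (
	"fmt"
	"strconv"

	apiv1 "k8s.io/api/core/v1"
)

// ScaleDownRequestedKey is the node annotation that requests scale-down.
const ScaleDownRequestedKey = "cluster-autoscaler.kubernetes.io/scale-down-requested"

// HasScaleDownRequestedAnnotation reports whether the node carries the
// scale-down-requested annotation, regardless of its value.
func HasScaleDownRequestedAnnotation(node *apiv1.Node) bool {
	_, found := node.Annotations[ScaleDownRequestedKey]
	return found
}

// GetNodeMaxGracefulTerminationSec parses the annotation value as the maximum
// graceful termination period, in seconds, for pods hosted on the node. A
// non-numeric value yields an error, in which case evictPod above keeps the
// global MaxGracefulTerminationSec, matching the FAQ's fallback behavior.
func GetNodeMaxGracefulTerminationSec(node *apiv1.Node) (int64, error) {
	value, found := node.Annotations[ScaleDownRequestedKey]
	if !found {
		return 0, fmt.Errorf("node %s has no %s annotation", node.Name, ScaleDownRequestedKey)
	}
	return strconv.ParseInt(value, 10, 64)
}
```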
41 changes: 41 additions & 0 deletions cluster-autoscaler/core/scaledown/actuation/drain_test.go
@@ -41,6 +41,7 @@ import (
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
ca_util "k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -236,6 +237,46 @@ func TestDrainNodeWithPods(t *testing.T) {
assert.Equal(t, p2.Name, deleted[2])
}

func TestDrainNodeWithMaxGracefulTerminationSecOverride(t *testing.T) {
deletedPods := make(chan string, 10)
fakeClient := &fake.Clientset{}

p1 := BuildTestPod("p1", 100, 0)
n1 := BuildTestNode("n1", 1000, 1000)
n1.Annotations = map[string]string{ca_util.ScaleDownRequestedKey: "5"}
SetNodeReadyState(n1, true, time.Time{})

fakeClient.Fake.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
})
fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
createAction := action.(core.CreateAction)
if createAction == nil {
return false, nil, nil
}
eviction := createAction.GetObject().(*policyv1beta1.Eviction)
if eviction == nil {
return false, nil, nil
}
assert.Equal(t, int64(5), *eviction.DeleteOptions.GracePeriodSeconds)
deletedPods <- eviction.Name
return true, nil, nil
})

options := config.AutoscalingOptions{
MaxGracefulTerminationSec: 20,
MaxPodEvictionTime: 5 * time.Second,
}
ctx, err := NewScaleTestAutoscalingContext(options, fakeClient, nil, nil, nil, nil)
assert.NoError(t, err)

evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
_, err = evictor.DrainNodeWithPods(&ctx, n1, []*apiv1.Pod{p1}, []*apiv1.Pod{})
assert.NoError(t, err)
deleted := <-deletedPods
assert.Equal(t, p1.Name, deleted)
}
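The reactor above asserts that the chosen grace period lands in the `Eviction` object's `DeleteOptions`. The body of `evictPod`'s retry loop is collapsed in this view; each attempt presumably issues an eviction roughly like the sketch below, using the v1beta1 eviction API that the fake clientset's reactor intercepts (`tryEvict` is an illustrative name, not the commit's code):

```go
package actuation

import (
	"context"

	apiv1 "k8s.io/api/core/v1"
	policyv1beta1 "k8s.io/api/policy/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// tryEvict performs a single eviction attempt with an explicit grace period.
// The call posts to the pods/eviction subresource, which the fake clientset
// reports as a ("create", "pods") action, matching the reactor above.
func tryEvict(client kubernetes.Interface, pod *apiv1.Pod, gracePeriodSeconds int64) error {
	eviction := &policyv1beta1.Eviction{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: pod.Namespace,
			Name:      pod.Name,
		},
		DeleteOptions: &metav1.DeleteOptions{
			GracePeriodSeconds: &gracePeriodSeconds,
		},
	}
	return client.CoreV1().Pods(pod.Namespace).Evict(context.TODO(), eviction)
}
```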

func TestDrainNodeWithPodsWithRescheduled(t *testing.T) {
deletedPods := make(chan string, 10)
fakeClient := &fake.Clientset{}
16 changes: 11 additions & 5 deletions cluster-autoscaler/core/scaledown/eligibility/eligibility.go
@@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/klogx"

@@ -118,11 +119,6 @@ func (c *Checker) unremovableReasonAndNodeUtilization(context *context.Autoscali
return simulator.ScaleDownDisabledAnnotation, nil
}

- utilInfo, err := utilization.Calculate(nodeInfo, context.IgnoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, context.CloudProvider.GPULabel(), timestamp)
- if err != nil {
- 	klog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err)
- }

nodeGroup, err := context.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Warning("Node group not found for node %v: %v", node.Name, err)
@@ -135,6 +131,16 @@
return simulator.NotAutoscaled, nil
}

utilInfo, err := utilization.Calculate(nodeInfo, context.IgnoreDaemonSetsUtilization, context.IgnoreMirrorPodsUtilization, context.CloudProvider.GPULabel(), timestamp)
if err != nil {
klog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err)
}

if utils.HasScaleDownRequestedAnnotation(node) {
klog.V(1).Infof("Node %s is removable (%s utilization %f), because it has scale-down-requested annotation", node.Name, utilInfo.ResourceName, utilInfo.Utilization)
return simulator.NoReason, &utilInfo
}

underutilized, err := c.isNodeBelowUtilizationThreshold(context, node, nodeGroup, utilInfo)
if err != nil {
klog.Warningf("Failed to check utilization thresholds for %s: %v", node.Name, err)
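With this change the eligibility checks run in a fixed order, and the scale-down-requested annotation short-circuits the utilization threshold entirely. A condensed sketch of that order (an illustrative boolean distillation, not the function's real signature):

```go
// scaleDownEligible condenses the decision order of
// unremovableReasonAndNodeUtilization for a candidate node.
func scaleDownEligible(scaleDownDisabled, autoscaled, scaleDownRequested, belowThreshold bool) bool {
	if scaleDownDisabled {
		return false // explicit scale-down-disabled annotation always wins
	}
	if !autoscaled {
		return false // node does not belong to an autoscaled node group
	}
	if scaleDownRequested {
		return true // annotation makes the node removable regardless of load
	}
	return belowThreshold // otherwise the utilization threshold decides
}
```

Note the ordering consequence: a node carrying both annotations stays, because scale-down-disabled is checked first.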
10 changes: 10 additions & 0 deletions cluster-autoscaler/core/scaledown/eligibility/eligibility_test.go
@@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"

Expand All @@ -50,6 +51,10 @@ func TestFilterOutUnremovable(t *testing.T) {
noScaleDownNode.Annotations = map[string]string{ScaleDownDisabledKey: "true"}
SetNodeReadyState(noScaleDownNode, true, time.Time{})

requestScaleDownNode := BuildTestNode("requestScaleDown", 1000, 10)
requestScaleDownNode.Annotations = map[string]string{utils.ScaleDownRequestedKey: "true"}
SetNodeReadyState(requestScaleDownNode, true, time.Time{})

bigPod := BuildTestPod("bigPod", 600, 0)
bigPod.Spec.NodeName = "regular"

@@ -77,6 +82,11 @@
nodes: []*apiv1.Node{noScaleDownNode, regularNode},
want: []string{"regular"},
},
{
desc: "marked request scale down stays",
nodes: []*apiv1.Node{requestScaleDownNode},
want: []string{"requestScaleDown"},
},
{
desc: "highly utilized node is filtered out",
nodes: []*apiv1.Node{regularNode},