diff --git a/cluster-autoscaler/FAQ.md b/cluster-autoscaler/FAQ.md
index aca69d619ed1..462ec97cc0dc 100644
--- a/cluster-autoscaler/FAQ.md
+++ b/cluster-autoscaler/FAQ.md
@@ -720,6 +720,7 @@ The following startup parameters are supported for cluster autoscaler:
 | `kubeconfig` | Path to kubeconfig file with authorization and API Server location information | ""
 | `cloud-config` | The path to the cloud provider configuration file. Empty string for no configuration file | ""
 | `namespace` | Namespace in which cluster-autoscaler run | "kube-system"
+| `scale-up-to-node-group-min-size-enabled` | Should CA scale up the node group to the configured min size if needed | false
 | `scale-down-enabled` | Should CA scale down the cluster | true
 | `scale-down-delay-after-add` | How long after scale up that scale down evaluation resumes | 10 minutes
 | `scale-down-delay-after-delete` | How long after node deletion that scale down evaluation resumes, defaults to scan-interval | scan-interval
@@ -933,7 +934,14 @@ Events:
 ```
 ### My cluster is below minimum / above maximum number of nodes, but CA did not fix that! Why?
-Cluster Autoscaler will not scale the cluster beyond these limits, but does not enforce them. If your cluster is below the minimum number of nodes configured for Cluster Autoscaler, it will be scaled up *only* in presence of unschedulable pods.
+Cluster Autoscaler will not scale the cluster beyond these limits, but some other external factors could make this happen. Here are some common scenarios:
+* Existing nodes were deleted from K8s and the cloud provider, which could cause the cluster to fall below the minimum number of nodes.
+* New nodes were added directly to the cloud provider, which could cause the cluster to exceed the maximum number of nodes.
+* Cluster Autoscaler was turned on in the middle of the cluster lifecycle, so the initial number of nodes might already be beyond these limits.
+
+By default, Cluster Autoscaler does not enforce the node group size. If your cluster is below the minimum number of nodes configured for CA, it will be scaled up *only* in presence of unschedulable pods. On the other hand, if your cluster is above the maximum number of nodes configured for CA, it will be scaled down *only* if it has unneeded nodes.
+
+Starting with CA 1.26.0, a new flag `--scale-up-to-node-group-min-size-enabled` was introduced to enforce the node group minimum size. For node groups with fewer nodes than the configured minimum, CA will scale them up to the minimum number of nodes. To enable this feature, set the flag to `true` on the Cluster Autoscaler command line, i.e. `--scale-up-to-node-group-min-size-enabled=true`.
 
 ### What happens in scale-up when I have no more quota in the cloud provider?
 
diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index da0e1a41d96f..b4348bd0d03f 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -96,6 +96,8 @@ type AutoscalingOptions struct {
 	CloudProviderName string
 	// NodeGroups is the list of node groups a.k.a autoscaling targets
 	NodeGroups []string
+	// ScaleUpToNodeGroupMinSizeEnabled is used to allow CA to scale up the node group to the configured min size if needed.
+ ScaleUpToNodeGroupMinSizeEnabled bool // ScaleDownEnabled is used to allow CA to scale down the cluster ScaleDownEnabled bool // ScaleDownDelayAfterAdd sets the duration from the last scale up to the time when CA starts to check scale down options diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go index de1bf552f1c7..c615c4180787 100644 --- a/cluster-autoscaler/core/scale_up.go +++ b/cluster-autoscaler/core/scale_up.go @@ -18,16 +18,15 @@ package core import ( "fmt" - "math" "strings" "time" + "k8s.io/autoscaler/cluster-autoscaler/core/scaleup" "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/context" @@ -45,205 +44,6 @@ import ( klog "k8s.io/klog/v2" ) -type scaleUpResourcesLimits map[string]int64 -type scaleUpResourcesDelta map[string]int64 - -// used as a value in scaleUpResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider -const scaleUpLimitUnknown = math.MaxInt64 - -func computeScaleUpResourcesLeftLimits( - context *context.AutoscalingContext, - processors *ca_processors.AutoscalingProcessors, - nodeGroups []cloudprovider.NodeGroup, - nodeInfos map[string]*schedulerframework.NodeInfo, - nodesFromNotAutoscaledGroups []*apiv1.Node, - resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesLimits, errors.AutoscalerError) { - totalCores, totalMem, errCoresMem := calculateScaleUpCoresMemoryTotal(nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups) - - var totalResources map[string]int64 - var totalResourcesErr error - if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) { - totalResources, totalResourcesErr = calculateScaleUpCustomResourcesTotal(context, processors, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups) - } - - resultScaleUpLimits := make(scaleUpResourcesLimits) - for _, resource := range resourceLimiter.GetResources() { - max := resourceLimiter.GetMax(resource) - - // we put only actual limits into final map. No entry means no limit. 
- if max > 0 { - if (resource == cloudprovider.ResourceNameCores || resource == cloudprovider.ResourceNameMemory) && errCoresMem != nil { - // core resource info missing - no reason to proceed with scale up - return scaleUpResourcesLimits{}, errCoresMem - } - switch { - case resource == cloudprovider.ResourceNameCores: - if errCoresMem != nil { - resultScaleUpLimits[resource] = scaleUpLimitUnknown - } else { - resultScaleUpLimits[resource] = computeBelowMax(totalCores, max) - } - - case resource == cloudprovider.ResourceNameMemory: - if errCoresMem != nil { - resultScaleUpLimits[resource] = scaleUpLimitUnknown - } else { - resultScaleUpLimits[resource] = computeBelowMax(totalMem, max) - } - - case cloudprovider.IsCustomResource(resource): - if totalResourcesErr != nil { - resultScaleUpLimits[resource] = scaleUpLimitUnknown - } else { - resultScaleUpLimits[resource] = computeBelowMax(totalResources[resource], max) - } - - default: - klog.Errorf("Scale up limits defined for unsupported resource '%s'", resource) - } - } - } - - return resultScaleUpLimits, nil -} - -func calculateScaleUpCoresMemoryTotal( - nodeGroups []cloudprovider.NodeGroup, - nodeInfos map[string]*schedulerframework.NodeInfo, - nodesFromNotAutoscaledGroups []*apiv1.Node) (int64, int64, errors.AutoscalerError) { - var coresTotal int64 - var memoryTotal int64 - - for _, nodeGroup := range nodeGroups { - currentSize, err := nodeGroup.TargetSize() - if err != nil { - return 0, 0, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node group size of %v:", nodeGroup.Id()) - } - nodeInfo, found := nodeInfos[nodeGroup.Id()] - if !found { - return 0, 0, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id()) - } - if currentSize > 0 { - nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo) - coresTotal = coresTotal + int64(currentSize)*nodeCPU - memoryTotal = memoryTotal + int64(currentSize)*nodeMemory - } - } - - for _, node := range nodesFromNotAutoscaledGroups { - cores, memory := utils.GetNodeCoresAndMemory(node) - coresTotal += cores - memoryTotal += memory - } - - return coresTotal, memoryTotal, nil -} - -func calculateScaleUpCustomResourcesTotal( - context *context.AutoscalingContext, - processors *ca_processors.AutoscalingProcessors, - nodeGroups []cloudprovider.NodeGroup, - nodeInfos map[string]*schedulerframework.NodeInfo, - nodesFromNotAutoscaledGroups []*apiv1.Node) (map[string]int64, errors.AutoscalerError) { - - result := make(map[string]int64) - for _, nodeGroup := range nodeGroups { - currentSize, err := nodeGroup.TargetSize() - if err != nil { - return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node group size of %v:", nodeGroup.Id()) - } - nodeInfo, found := nodeInfos[nodeGroup.Id()] - if !found { - return nil, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id()) - } - if currentSize > 0 { - resourceTargets, err := processors.CustomResourcesProcessor.GetNodeResourceTargets(context, nodeInfo.Node(), nodeGroup) - if err != nil { - return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node group %v:", nodeGroup.Id()) - } - for _, resourceTarget := range resourceTargets { - if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 { - continue - } - result[resourceTarget.ResourceType] += resourceTarget.ResourceCount * int64(currentSize) - } - } - } - - for _, node := range 
nodesFromNotAutoscaledGroups { - resourceTargets, err := processors.CustomResourcesProcessor.GetNodeResourceTargets(context, node, nil) - if err != nil { - return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node gpus count for node %v:", node.Name) - } - for _, resourceTarget := range resourceTargets { - if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 { - continue - } - result[resourceTarget.ResourceType] += resourceTarget.ResourceCount - } - } - - return result, nil -} - -func computeBelowMax(total int64, max int64) int64 { - if total < max { - return max - total - } - return 0 -} - -func computeScaleUpResourcesDelta(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, - nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup, resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesDelta, errors.AutoscalerError) { - resultScaleUpDelta := make(scaleUpResourcesDelta) - - nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo) - resultScaleUpDelta[cloudprovider.ResourceNameCores] = nodeCPU - resultScaleUpDelta[cloudprovider.ResourceNameMemory] = nodeMemory - - if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) { - resourceTargets, err := processors.CustomResourcesProcessor.GetNodeResourceTargets(context, nodeInfo.Node(), nodeGroup) - if err != nil { - return scaleUpResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target custom resources for node group %v:", nodeGroup.Id()) - } - for _, resourceTarget := range resourceTargets { - resultScaleUpDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount - } - } - - return resultScaleUpDelta, nil -} - -type scaleUpLimitsCheckResult struct { - exceeded bool - exceededResources []string -} - -func scaleUpLimitsNotExceeded() scaleUpLimitsCheckResult { - return scaleUpLimitsCheckResult{false, []string{}} -} - -func (limits *scaleUpResourcesLimits) checkScaleUpDeltaWithinLimits(delta scaleUpResourcesDelta) scaleUpLimitsCheckResult { - exceededResources := sets.NewString() - for resource, resourceDelta := range delta { - resourceLeft, found := (*limits)[resource] - if found { - if (resourceDelta > 0) && (resourceLeft == scaleUpLimitUnknown || resourceDelta > resourceLeft) { - exceededResources.Insert(resource) - } - } - } - if len(exceededResources) > 0 { - return scaleUpLimitsCheckResult{true, exceededResources.List()} - } - - return scaleUpLimitsNotExceeded() -} - -func getNodeInfoCoresAndMemory(nodeInfo *schedulerframework.NodeInfo) (int64, int64) { - return utils.GetNodeCoresAndMemory(nodeInfo.Node()) -} - type skippedReasons struct { message []string } @@ -317,6 +117,57 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG return option, nil } +func isNodeGroupReadyToScaleUp(nodeGroup cloudprovider.NodeGroup, clusterStateRegistry *clusterstate.ClusterStateRegistry, now time.Time) (bool, *skippedReasons) { + // Autoprovisioned node groups without nodes are created later so skip check for them. + if nodeGroup.Exist() && !clusterStateRegistry.IsNodeGroupSafeToScaleUp(nodeGroup, now) { + // Hack that depends on internals of IsNodeGroupSafeToScaleUp. 
+ if !clusterStateRegistry.IsNodeGroupHealthy(nodeGroup.Id()) { + klog.Warningf("Node group %s is not ready for scaleup - unhealthy", nodeGroup.Id()) + return false, notReadyReason + } + klog.Warningf("Node group %s is not ready for scaleup - backoff", nodeGroup.Id()) + return false, backoffReason + } + return true, nil +} + +func isNodeGroupResourceExceeded(resourceManager scaleup.ScaleUpResourceManager, nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo) (bool, *skippedReasons) { + scaleUpResourcesDelta, err := resourceManager.ComputeScaleUpResourcesDelta(nodeInfo, nodeGroup) + if err != nil { + klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err) + return true, notReadyReason + } + + checkResult := resourceManager.CheckScaleUpDeltaWithinLimits(scaleUpResourcesDelta) + if checkResult.Exceeded { + klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.ExceededResources) + for _, resource := range checkResult.ExceededResources { + switch resource { + case cloudprovider.ResourceNameCores: + metrics.RegisterSkippedScaleUpCPU() + case cloudprovider.ResourceNameMemory: + metrics.RegisterSkippedScaleUpMemory() + default: + continue + } + } + return true, maxResourceLimitReached(checkResult.ExceededResources) + } + return false, nil +} + +func getCappedNewNodeCount(context *context.AutoscalingContext, newNodeCount, currentNodeCount int) (int, errors.AutoscalerError) { + if context.MaxNodesTotal > 0 && newNodeCount+currentNodeCount > context.MaxNodesTotal { + klog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal) + newNodeCount = context.MaxNodesTotal - currentNodeCount + context.LogRecorder.Eventf(apiv1.EventTypeWarning, "MaxNodesTotalReached", "Max total nodes in cluster reached: %v", context.MaxNodesTotal) + if newNodeCount < 1 { + return newNodeCount, errors.NewAutoscalerError(errors.TransientError, "max node total count already reached") + } + } + return newNodeCount, nil +} + // ScaleUp tries to scale the cluster up. Return true if it found a way to increase the size, // false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are // ready and in sync with instance groups. 
@@ -329,35 +180,12 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto return &status.ScaleUpStatus{Result: status.ScaleUpNotNeeded}, nil } - now := time.Now() - loggingQuota := klogx.PodsLoggingQuota() - for _, pod := range unschedulablePods { klogx.V(1).UpTo(loggingQuota).Infof("Pod %s/%s is unschedulable", pod.Namespace, pod.Name) } klogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left()) - - nodesFromNotAutoscaledGroups, err := utils.FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider) - if err != nil { - return scaleUpError(&status.ScaleUpStatus{}, err.AddPrefix("failed to filter out nodes which are from not autoscaled groups: ")) - } - - nodeGroups := context.CloudProvider.NodeGroups() - gpuLabel := context.CloudProvider.GPULabel() - availableGPUTypes := context.CloudProvider.GetAvailableGPUTypes() - - resourceLimiter, errCP := context.CloudProvider.GetResourceLimiter() - if errCP != nil { - return scaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError( - errors.CloudProviderError, - errCP)) - } - - scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(context, processors, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter) - if errLimits != nil { - return scaleUpError(&status.ScaleUpStatus{}, errLimits.AddPrefix("Could not compute total resources: ")) - } + podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods) upcomingNodes := make([]*schedulerframework.NodeInfo, 0) for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() { @@ -374,8 +202,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto } klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes)) - expansionOptions := make(map[string]expander.Option, 0) - + nodeGroups := context.CloudProvider.NodeGroups() if processors != nil && processors.NodeGroupListProcessor != nil { var errProc error nodeGroups, nodeInfos, errProc = processors.NodeGroupListProcessor.Process(context, nodeGroups, nodeInfos, unschedulablePods) @@ -384,19 +211,21 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto } } - podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods) + resourceManager, err := scaleup.NewScaleUpResourceManager(context, processors, nodes, nodeInfos) + if err != nil { + return scaleUpError(&status.ScaleUpStatus{}, err) + } + now := time.Now() + gpuLabel := context.CloudProvider.GPULabel() + availableGPUTypes := context.CloudProvider.GetAvailableGPUTypes() + expansionOptions := make(map[string]expander.Option, 0) skippedNodeGroups := map[string]status.Reasons{} + for _, nodeGroup := range nodeGroups { - // Autoprovisioned node groups without nodes are created later so skip check for them. - if nodeGroup.Exist() && !clusterStateRegistry.IsNodeGroupSafeToScaleUp(nodeGroup, now) { - // Hack that depends on internals of IsNodeGroupSafeToScaleUp. 
- if !clusterStateRegistry.IsNodeGroupHealthy(nodeGroup.Id()) { - klog.Warningf("Node group %s is not ready for scaleup - unhealthy", nodeGroup.Id()) - skippedNodeGroups[nodeGroup.Id()] = notReadyReason - } else { - klog.Warningf("Node group %s is not ready for scaleup - backoff", nodeGroup.Id()) - skippedNodeGroups[nodeGroup.Id()] = backoffReason + if readyToScaleUp, skipReason := isNodeGroupReadyToScaleUp(nodeGroup, clusterStateRegistry, now); !readyToScaleUp { + if skipReason != nil { + skippedNodeGroups[nodeGroup.Id()] = skipReason } continue } @@ -420,25 +249,9 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto continue } - scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(context, processors, nodeInfo, nodeGroup, resourceLimiter) - if err != nil { - klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err) - skippedNodeGroups[nodeGroup.Id()] = notReadyReason - continue - } - checkResult := scaleUpResourcesLeft.checkScaleUpDeltaWithinLimits(scaleUpResourcesDelta) - if checkResult.exceeded { - klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.exceededResources) - skippedNodeGroups[nodeGroup.Id()] = maxResourceLimitReached(checkResult.exceededResources) - for _, resource := range checkResult.exceededResources { - switch resource { - case cloudprovider.ResourceNameCores: - metrics.RegisterSkippedScaleUpCPU() - case cloudprovider.ResourceNameMemory: - metrics.RegisterSkippedScaleUpMemory() - default: - continue - } + if exceeded, skipReason := isNodeGroupResourceExceeded(resourceManager, nodeGroup, nodeInfo); exceeded { + if skipReason != nil { + skippedNodeGroups[nodeGroup.Id()] = skipReason } continue } @@ -458,6 +271,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id()) } } + if len(expansionOptions) == 0 { klog.V(1).Info("No expansion options") return &status.ScaleUpStatus{ @@ -481,16 +295,11 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto klog.V(1).Infof("Estimated %d nodes needed in %s", bestOption.NodeCount, bestOption.NodeGroup.Id()) newNodes := bestOption.NodeCount - - if context.MaxNodesTotal > 0 && len(nodes)+newNodes+len(upcomingNodes) > context.MaxNodesTotal { - klog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal) - newNodes = context.MaxNodesTotal - len(nodes) - len(upcomingNodes) - context.LogRecorder.Eventf(apiv1.EventTypeWarning, "MaxNodesTotalReached", "Max total nodes in cluster reached: %v", context.MaxNodesTotal) - if newNodes < 1 { - return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, - errors.NewAutoscalerError(errors.TransientError, "max node total count already reached")) - } + newNodeCount, err := getCappedNewNodeCount(context, newNodes, len(nodes)+len(upcomingNodes)) + if err != nil { + return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, err) } + newNodes = newNodeCount createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0) if !bestOption.NodeGroup.Exist() { @@ -556,8 +365,16 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto "No node info for best expansion option!")) } + // Recreate a new scale up resource manager, because the node group info might be changed in the new node group create scenario above. 
+ resourceManager, err := scaleup.NewScaleUpResourceManager(context, processors, nodes, nodeInfos) + if err != nil { + return scaleUpError( + &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, + err) + } + // apply upper limits for CPU and memory - newNodes, err = applyScaleUpResourcesLimits(context, processors, newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter) + newNodes, err = resourceManager.ApplyScaleUpResourcesLimits(newNodes, bestOption.NodeGroup, nodeInfo) if err != nil { return scaleUpError( &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, @@ -572,6 +389,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, typedErr.AddPrefix("Failed to find matching node groups: ")) } + similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, expansionOptions) for _, ng := range similarNodeGroups { if clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) { @@ -583,6 +401,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto klog.V(2).Infof("Ignoring node group %s when balancing: group is not ready for scaleup", ng.Id()) } } + if len(targetNodeGroups) > 1 { var names = []string{} for _, ng := range targetNodeGroups { @@ -591,6 +410,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto klog.V(1).Infof("Splitting scale-up between %v similar node groups: {%v}", len(targetNodeGroups), strings.Join(names, ", ")) } } + scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups( context, targetNodeGroups, newNodes) if typedErr != nil { @@ -598,6 +418,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, typedErr) } + klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos) for _, info := range scaleUpInfos { typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil), now) @@ -632,6 +453,110 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto }, nil } +// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes than the configured min size. +// The source of truth for the current node group size is the TargetSize queried directly from cloud providers. +// Return the scale up status (ScaleUpNotNeeded, ScaleUpSuccessful or FailedResizeNodeGroups) and errors if any. 
+func ScaleUpToNodeGroupMinSize(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry,
+	nodes []*apiv1.Node, nodeInfos map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) {
+	now := time.Now()
+	nodeGroups := context.CloudProvider.NodeGroups()
+	gpuLabel := context.CloudProvider.GPULabel()
+	availableGPUTypes := context.CloudProvider.GetAvailableGPUTypes()
+	scaleUpInfos := make([]nodegroupset.ScaleUpInfo, 0)
+
+	resourceManager, err := scaleup.NewScaleUpResourceManager(context, processors, nodes, nodeInfos)
+	if err != nil {
+		return scaleUpError(&status.ScaleUpStatus{}, err)
+	}
+
+	for _, ng := range nodeGroups {
+		if !ng.Exist() {
+			klog.Warningf("ScaleUpToNodeGroupMinSize: NodeGroup %s does not exist", ng.Id())
+			continue
+		}
+
+		targetSize, err := ng.TargetSize()
+		if err != nil {
+			klog.Warningf("ScaleUpToNodeGroupMinSize: failed to get target size of node group %s", ng.Id())
+			continue
+		}
+
+		klog.V(4).Infof("ScaleUpToNodeGroupMinSize: NodeGroup %s, TargetSize %d, MinSize %d, MaxSize %d", ng.Id(), targetSize, ng.MinSize(), ng.MaxSize())
+		// Skip node groups that are already at or above their configured min size.
+		if targetSize >= ng.MinSize() {
+			continue
+		}
+
+		if readyToScaleUp, skipReason := isNodeGroupReadyToScaleUp(ng, clusterStateRegistry, now); !readyToScaleUp {
+			klog.Warningf("ScaleUpToNodeGroupMinSize: node group is not ready to scale up: %v", skipReason)
+			continue
+		}
+
+		nodeInfo, found := nodeInfos[ng.Id()]
+		if !found {
+			klog.Warningf("ScaleUpToNodeGroupMinSize: no node info for %s", ng.Id())
+			continue
+		}
+
+		exceeded, skipReason := isNodeGroupResourceExceeded(resourceManager, ng, nodeInfo)
+		if exceeded {
+			klog.Warningf("ScaleUpToNodeGroupMinSize: node group resource exceeded: %v", skipReason)
+			continue
+		}
+
+		newNodeCount := ng.MinSize() - targetSize
+		newNodeCount, err = resourceManager.ApplyScaleUpResourcesLimits(newNodeCount, ng, nodeInfo)
+		if err != nil {
+			klog.Warningf("ScaleUpToNodeGroupMinSize: failed to apply resource limits: %v", err)
+			continue
+		}
+
+		newNodeCount, err = getCappedNewNodeCount(context, newNodeCount, targetSize)
+		if err != nil {
+			klog.Warningf("ScaleUpToNodeGroupMinSize: failed to get capped node count: %v", err)
+			continue
+		}
+
+		info := nodegroupset.ScaleUpInfo{
+			Group:       ng,
+			CurrentSize: targetSize,
+			NewSize:     targetSize + newNodeCount,
+			MaxSize:     ng.MaxSize(),
+		}
+		scaleUpInfos = append(scaleUpInfos, info)
+	}
+
+	if len(scaleUpInfos) == 0 {
+		klog.V(1).Info("ScaleUpToNodeGroupMinSize: scale up not needed")
+		return &status.ScaleUpStatus{Result: status.ScaleUpNotNeeded}, nil
+	}
+
+	klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos)
+	for _, info := range scaleUpInfos {
+		nodeInfo, ok := nodeInfos[info.Group.Id()]
+		if !ok {
+			klog.Warningf("ScaleUpToNodeGroupMinSize: failed to get node info for node group %s", info.Group.Id())
+			continue
+		}
+
+		gpuType := gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil)
+		if err := executeScaleUp(context, clusterStateRegistry, info, gpuType, now); err != nil {
+			return scaleUpError(
+				&status.ScaleUpStatus{
+					FailedResizeNodeGroups: []cloudprovider.NodeGroup{info.Group},
+				},
+				err,
+			)
+		}
+	}
+
+	clusterStateRegistry.Recalculate()
+	return &status.ScaleUpStatus{
+		Result:               status.ScaleUpSuccessful,
+		ScaleUpInfos:         scaleUpInfos,
+		ConsideredNodeGroups: nodeGroups,
+	}, nil
+}
+
 func getRemainingPods(egs []*podEquivalenceGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
remaining := []status.NoScaleUpInfo{} for _, eg := range egs { @@ -717,48 +642,6 @@ func executeScaleUp(context *context.AutoscalingContext, clusterStateRegistry *c return nil } -func applyScaleUpResourcesLimits( - context *context.AutoscalingContext, - processors *ca_processors.AutoscalingProcessors, - newNodes int, - scaleUpResourcesLeft scaleUpResourcesLimits, - nodeInfo *schedulerframework.NodeInfo, - nodeGroup cloudprovider.NodeGroup, - resourceLimiter *cloudprovider.ResourceLimiter) (int, errors.AutoscalerError) { - - delta, err := computeScaleUpResourcesDelta(context, processors, nodeInfo, nodeGroup, resourceLimiter) - if err != nil { - return 0, err - } - - for resource, resourceDelta := range delta { - limit, limitFound := scaleUpResourcesLeft[resource] - if !limitFound { - continue - } - if limit == scaleUpLimitUnknown { - // should never happen - checked before - return 0, errors.NewAutoscalerError( - errors.InternalError, - fmt.Sprintf("limit unknown for resource %s", resource)) - } - if int64(newNodes)*resourceDelta <= limit { - // no capping required - continue - } - - newNodes = int(limit / resourceDelta) - klog.V(1).Infof("Capping scale-up size due to limit for resource %s", resource) - if newNodes < 1 { - // should never happen - checked before - return 0, errors.NewAutoscalerError( - errors.InternalError, - fmt.Sprintf("cannot create any node; max limit for resource %s reached", resource)) - } - } - return newNodes, nil -} - func scaleUpError(s *status.ScaleUpStatus, err errors.AutoscalerError) (*status.ScaleUpStatus, errors.AutoscalerError) { s.ScaleUpError = &err s.Result = status.ScaleUpError diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index 7cad79c43265..c8a3eafe07bc 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -30,6 +30,7 @@ import ( testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/core/scaleup" . "k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/estimator" @@ -937,51 +938,99 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) { assert.True(t, expandedGroupMap["autoprovisioned-T1-2-1"]) } +func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) { + podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) + listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error { + assert.Equal(t, "ng1", nodeGroup) + assert.Equal(t, 1, increase) + return nil + }, nil) + resourceLimiter := cloudprovider.NewResourceLimiter( + map[string]int64{cloudprovider.ResourceNameCores: 0, cloudprovider.ResourceNameMemory: 0}, + map[string]int64{cloudprovider.ResourceNameCores: 48, cloudprovider.ResourceNameMemory: 1000}, + ) + provider.SetResourceLimiter(resourceLimiter) + + // Test cases: + // ng1: current size 1, min size 3, cores limit 48, memory limit 1000 => scale up with 1 new node. + // ng2: current size 1, min size 1, cores limit 48, memory limit 1000 => no scale up. 
+ n1 := BuildTestNode("n1", 16000, 32) + SetNodeReadyState(n1, true, time.Now()) + n2 := BuildTestNode("n2", 16000, 32) + SetNodeReadyState(n2, true, time.Now()) + provider.AddNodeGroup("ng1", 3, 10, 1) + provider.AddNode("ng1", n1) + provider.AddNodeGroup("ng2", 1, 10, 1) + provider.AddNode("ng2", n2) + + options := config.AutoscalingOptions{ + EstimatorName: estimator.BinpackingEstimatorName, + MaxCoresTotal: config.DefaultMaxClusterCores, + MaxMemoryTotal: config.DefaultMaxClusterMemory, + } + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) + assert.NoError(t, err) + + nodes := []*apiv1.Node{n1, n2} + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) + processors := NewTestProcessors() + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff()) + clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) + + scaleUpStatus, err := ScaleUpToNodeGroupMinSize(&context, processors, clusterState, nodes, nodeInfos) + assert.NoError(t, err) + assert.True(t, scaleUpStatus.WasSuccessful()) + assert.Equal(t, 1, len(scaleUpStatus.ScaleUpInfos)) + assert.Equal(t, 2, scaleUpStatus.ScaleUpInfos[0].NewSize) + assert.Equal(t, "ng1", scaleUpStatus.ScaleUpInfos[0].Group.Id()) +} + func TestCheckScaleUpDeltaWithinLimits(t *testing.T) { type testcase struct { - limits scaleUpResourcesLimits - delta scaleUpResourcesDelta + limits scaleup.ScaleUpResourcesLimits + delta scaleup.ScaleUpResourcesDelta exceededResources []string } tests := []testcase{ { - limits: scaleUpResourcesLimits{"a": 10}, - delta: scaleUpResourcesDelta{"a": 10}, + limits: scaleup.ScaleUpResourcesLimits{"a": 10}, + delta: scaleup.ScaleUpResourcesDelta{"a": 10}, exceededResources: []string{}, }, { - limits: scaleUpResourcesLimits{"a": 10}, - delta: scaleUpResourcesDelta{"a": 11}, + limits: scaleup.ScaleUpResourcesLimits{"a": 10}, + delta: scaleup.ScaleUpResourcesDelta{"a": 11}, exceededResources: []string{"a"}, }, { - limits: scaleUpResourcesLimits{"a": 10}, - delta: scaleUpResourcesDelta{"b": 10}, + limits: scaleup.ScaleUpResourcesLimits{"a": 10}, + delta: scaleup.ScaleUpResourcesDelta{"b": 10}, exceededResources: []string{}, }, { - limits: scaleUpResourcesLimits{"a": scaleUpLimitUnknown}, - delta: scaleUpResourcesDelta{"a": 0}, + limits: scaleup.ScaleUpResourcesLimits{"a": scaleup.ScaleUpLimitUnknown}, + delta: scaleup.ScaleUpResourcesDelta{"a": 0}, exceededResources: []string{}, }, { - limits: scaleUpResourcesLimits{"a": scaleUpLimitUnknown}, - delta: scaleUpResourcesDelta{"a": 1}, + limits: scaleup.ScaleUpResourcesLimits{"a": scaleup.ScaleUpLimitUnknown}, + delta: scaleup.ScaleUpResourcesDelta{"a": 1}, exceededResources: []string{"a"}, }, { - limits: scaleUpResourcesLimits{"a": 10, "b": 20, "c": 30}, - delta: scaleUpResourcesDelta{"a": 11, "b": 20, "c": 31}, + limits: scaleup.ScaleUpResourcesLimits{"a": 10, "b": 20, "c": 30}, + delta: scaleup.ScaleUpResourcesDelta{"a": 11, "b": 20, "c": 31}, exceededResources: []string{"a", "c"}, }, } for _, test := range tests { - checkResult := test.limits.checkScaleUpDeltaWithinLimits(test.delta) + checkResult := scaleup.CheckScaleUpDeltaWithinLimits(test.limits, test.delta) if len(test.exceededResources) == 0 { - assert.Equal(t, scaleUpLimitsNotExceeded(), checkResult) + assert.Equal(t, scaleup.ScaleUpLimitsNotExceeded(), checkResult) } else { - assert.Equal(t, 
scaleUpLimitsCheckResult{true, test.exceededResources}, checkResult) + assert.Equal(t, scaleup.ScaleUpLimitsCheckResult{Exceeded: true, ExceededResources: test.exceededResources}, checkResult) } } } diff --git a/cluster-autoscaler/core/scaleup/resource_manager.go b/cluster-autoscaler/core/scaleup/resource_manager.go new file mode 100644 index 000000000000..5964c9dd4094 --- /dev/null +++ b/cluster-autoscaler/core/scaleup/resource_manager.go @@ -0,0 +1,323 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaleup + +import ( + "fmt" + "math" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/core/utils" + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/klog/v2" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" +) + +// ScaleUpLimitUnknown is used as a value in ScaleUpResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider. +const ScaleUpLimitUnknown = math.MaxInt64 + +// ScaleUpResourceManager provides resource checks before scaling up the cluster. +type ScaleUpResourceManager interface { + ComputeScaleUpResourcesDelta(nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup) (ScaleUpResourcesDelta, errors.AutoscalerError) + CheckScaleUpDeltaWithinLimits(delta ScaleUpResourcesDelta) ScaleUpLimitsCheckResult + ApplyScaleUpResourcesLimits(newNodes int, nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo) (int, errors.AutoscalerError) +} + +type scaleUpResourceManager struct { + context *context.AutoscalingContext + processors *ca_processors.AutoscalingProcessors + nodes []*apiv1.Node + nodeInfos map[string]*schedulerframework.NodeInfo + nodeGroups []cloudprovider.NodeGroup + + nodesFromNotAutoscaledGroups []*apiv1.Node + resourceLimiter *cloudprovider.ResourceLimiter + scaleUpResourcesLeft ScaleUpResourcesLimits +} + +// ScaleUpLimitsCheckResult contains the limit check result and the exceeded resources if any. +type ScaleUpLimitsCheckResult struct { + Exceeded bool + ExceededResources []string +} + +// ScaleUpResourcesLimits is a map: the key is resource type and the value is resource limit. +type ScaleUpResourcesLimits map[string]int64 + +// ScaleUpResourcesDelta is a map: the key is resource type and the value is resource delta. +type ScaleUpResourcesDelta map[string]int64 + +// NewScaleUpResourceManager creates an instance of scale up resource manager with provided parameters. 
+func NewScaleUpResourceManager(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, nodes []*apiv1.Node, + nodeInfos map[string]*schedulerframework.NodeInfo) (ScaleUpResourceManager, errors.AutoscalerError) { + nodeGroups := context.CloudProvider.NodeGroups() + manager := &scaleUpResourceManager{ + context: context, + processors: processors, + nodes: nodes, + nodeInfos: nodeInfos, + nodeGroups: nodeGroups, + } + return manager, manager.init() +} + +func (m *scaleUpResourceManager) ComputeScaleUpResourcesDelta(nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup) (ScaleUpResourcesDelta, errors.AutoscalerError) { + resultScaleUpDelta := make(ScaleUpResourcesDelta) + nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo) + resultScaleUpDelta[cloudprovider.ResourceNameCores] = nodeCPU + resultScaleUpDelta[cloudprovider.ResourceNameMemory] = nodeMemory + + if cloudprovider.ContainsCustomResources(m.resourceLimiter.GetResources()) { + resourceTargets, err := m.processors.CustomResourcesProcessor.GetNodeResourceTargets(m.context, nodeInfo.Node(), nodeGroup) + if err != nil { + return ScaleUpResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target custom resources for node group %v:", nodeGroup.Id()) + } + + for _, resourceTarget := range resourceTargets { + resultScaleUpDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount + } + } + + return resultScaleUpDelta, nil +} + +func (m *scaleUpResourceManager) ApplyScaleUpResourcesLimits(newNodes int, nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo) (int, errors.AutoscalerError) { + delta, err := m.ComputeScaleUpResourcesDelta(nodeInfo, nodeGroup) + if err != nil { + return 0, err + } + + for resource, resourceDelta := range delta { + limit, limitFound := m.scaleUpResourcesLeft[resource] + if !limitFound { + continue + } + + if limit == ScaleUpLimitUnknown { + // should never happen - checked before + return 0, errors.NewAutoscalerError( + errors.InternalError, + fmt.Sprintf("limit unknown for resource %s", resource)) + } + + if int64(newNodes)*resourceDelta <= limit { + // no capping required + continue + } + + newNodes = int(limit / resourceDelta) + klog.V(1).Infof("Capping scale-up size due to limit for resource %s", resource) + if newNodes < 1 { + // should never happen - checked before + return 0, errors.NewAutoscalerError( + errors.InternalError, + fmt.Sprintf("cannot create any node; max limit for resource %s reached", resource)) + } + } + + return newNodes, nil +} + +func (m *scaleUpResourceManager) CheckScaleUpDeltaWithinLimits(delta ScaleUpResourcesDelta) ScaleUpLimitsCheckResult { + return CheckScaleUpDeltaWithinLimits(m.scaleUpResourcesLeft, delta) +} + +// CheckScaleUpDeltaWithinLimits compares the resource limit and resource delta, and returns the limit check result. +func CheckScaleUpDeltaWithinLimits(left ScaleUpResourcesLimits, delta ScaleUpResourcesDelta) ScaleUpLimitsCheckResult { + ExceededResources := sets.NewString() + for resource, resourceDelta := range delta { + resourceLeft, found := left[resource] + if found { + if (resourceDelta > 0) && (resourceLeft == ScaleUpLimitUnknown || resourceDelta > resourceLeft) { + ExceededResources.Insert(resource) + } + } + } + if len(ExceededResources) > 0 { + return ScaleUpLimitsCheckResult{true, ExceededResources.List()} + } + + return ScaleUpLimitsNotExceeded() +} + +// ScaleUpLimitsNotExceeded returns a not exceeded limit check result. 
+func ScaleUpLimitsNotExceeded() ScaleUpLimitsCheckResult { + return ScaleUpLimitsCheckResult{false, []string{}} +} + +func (m *scaleUpResourceManager) init() errors.AutoscalerError { + var errca errors.AutoscalerError + var errgo error + + m.nodesFromNotAutoscaledGroups, errca = utils.FilterOutNodesFromNotAutoscaledGroups(m.nodes, m.context.CloudProvider) + if errca != nil { + return errca.AddPrefix("failed to filter out nodes which are from not autoscaled groups: ") + } + + m.resourceLimiter, errgo = m.context.CloudProvider.GetResourceLimiter() + if errgo != nil { + return errors.ToAutoscalerError(errors.CloudProviderError, errgo) + } + + m.scaleUpResourcesLeft, errca = m.computeScaleUpResourcesLeftLimits() + if errca != nil { + return errca.AddPrefix("Could not compute total resources: ") + } + + return nil +} + +func (m *scaleUpResourceManager) computeScaleUpResourcesLeftLimits() (ScaleUpResourcesLimits, errors.AutoscalerError) { + totalCores, totalMem, errCoresMem := m.calculateScaleUpCoresMemoryTotal() + + var totalResources map[string]int64 + var totalResourcesErr error + if cloudprovider.ContainsCustomResources(m.resourceLimiter.GetResources()) { + totalResources, totalResourcesErr = m.calculateScaleUpCustomResourcesTotal() + } + + resultScaleUpLimits := make(ScaleUpResourcesLimits) + for _, resource := range m.resourceLimiter.GetResources() { + max := m.resourceLimiter.GetMax(resource) + + // we put only actual limits into final map. No entry means no limit. + if max > 0 { + if (resource == cloudprovider.ResourceNameCores || resource == cloudprovider.ResourceNameMemory) && errCoresMem != nil { + // core resource info missing - no reason to proceed with scale up + return ScaleUpResourcesLimits{}, errCoresMem + } + + switch { + case resource == cloudprovider.ResourceNameCores: + if errCoresMem != nil { + resultScaleUpLimits[resource] = ScaleUpLimitUnknown + } else { + resultScaleUpLimits[resource] = computeBelowMax(totalCores, max) + } + + case resource == cloudprovider.ResourceNameMemory: + if errCoresMem != nil { + resultScaleUpLimits[resource] = ScaleUpLimitUnknown + } else { + resultScaleUpLimits[resource] = computeBelowMax(totalMem, max) + } + + case cloudprovider.IsCustomResource(resource): + if totalResourcesErr != nil { + resultScaleUpLimits[resource] = ScaleUpLimitUnknown + } else { + resultScaleUpLimits[resource] = computeBelowMax(totalResources[resource], max) + } + + default: + klog.Errorf("Scale up limits defined for unsupported resource '%s'", resource) + } + } + } + + return resultScaleUpLimits, nil +} + +func (m *scaleUpResourceManager) calculateScaleUpCoresMemoryTotal() (int64, int64, errors.AutoscalerError) { + var coresTotal int64 + var memoryTotal int64 + for _, nodeGroup := range m.nodeGroups { + currentSize, err := nodeGroup.TargetSize() + if err != nil { + return 0, 0, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node group size of %v:", nodeGroup.Id()) + } + + nodeInfo, found := m.nodeInfos[nodeGroup.Id()] + if !found { + return 0, 0, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id()) + } + + if currentSize > 0 { + nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo) + coresTotal = coresTotal + int64(currentSize)*nodeCPU + memoryTotal = memoryTotal + int64(currentSize)*nodeMemory + } + } + + for _, node := range m.nodesFromNotAutoscaledGroups { + cores, memory := utils.GetNodeCoresAndMemory(node) + coresTotal += cores + memoryTotal += memory + } + + return coresTotal, 
memoryTotal, nil +} + +func (m *scaleUpResourceManager) calculateScaleUpCustomResourcesTotal() (map[string]int64, errors.AutoscalerError) { + result := make(map[string]int64) + for _, nodeGroup := range m.nodeGroups { + currentSize, err := nodeGroup.TargetSize() + if err != nil { + return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node group size of %v:", nodeGroup.Id()) + } + + nodeInfo, found := m.nodeInfos[nodeGroup.Id()] + if !found { + return nil, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id()) + } + + if currentSize > 0 { + resourceTargets, err := m.processors.CustomResourcesProcessor.GetNodeResourceTargets(m.context, nodeInfo.Node(), nodeGroup) + if err != nil { + return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node group %v:", nodeGroup.Id()) + } + + for _, resourceTarget := range resourceTargets { + if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 { + continue + } + result[resourceTarget.ResourceType] += resourceTarget.ResourceCount * int64(currentSize) + } + } + } + + for _, node := range m.nodesFromNotAutoscaledGroups { + resourceTargets, err := m.processors.CustomResourcesProcessor.GetNodeResourceTargets(m.context, node, nil) + if err != nil { + return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node gpus count for node %v:", node.Name) + } + + for _, resourceTarget := range resourceTargets { + if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 { + continue + } + result[resourceTarget.ResourceType] += resourceTarget.ResourceCount + } + } + + return result, nil +} + +func computeBelowMax(total int64, max int64) int64 { + if total < max { + return max - total + } + return 0 +} + +func getNodeInfoCoresAndMemory(nodeInfo *schedulerframework.NodeInfo) (int64, int64) { + return utils.GetNodeCoresAndMemory(nodeInfo.Node()) +} diff --git a/cluster-autoscaler/core/scaleup/resource_manager_test.go b/cluster-autoscaler/core/scaleup/resource_manager_test.go new file mode 100644 index 000000000000..f2789fb10ac6 --- /dev/null +++ b/cluster-autoscaler/core/scaleup/resource_manager_test.go @@ -0,0 +1,226 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scaleup + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/core/test" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + utils_test "k8s.io/autoscaler/cluster-autoscaler/utils/test" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type testCase struct { + NodeGroupMinSize int + NodeGroupMaxSize int + NodeGroupCurrentSize int + + CloudProviderCpuLimit int + CloudProvicerMemLimit int + + NodeCpuRequest int + NodeMemRequest int + NodeCount int +} + +func TestComputeScaleUpResourcesDelta(t *testing.T) { + testCases := []testCase{ + {3, 10, 1, 32, 64, 8, 16, 1}, + {3, 10, 1, 0, 0, 8, 16, 10}, + } + + expectedResults := []ScaleUpResourcesDelta{ + {cloudprovider.ResourceNameCores: 8, cloudprovider.ResourceNameMemory: 16}, + {cloudprovider.ResourceNameCores: 8, cloudprovider.ResourceNameMemory: 16}, + } + + for index, test := range testCases { + rm, ni, ng, err := initTestCase(t, test) + assert.NoError(t, err) + + delta, err := rm.ComputeScaleUpResourcesDelta(ni, ng) + assert.NoError(t, err) + assert.Equal(t, expectedResults[index], delta) + } +} + +func TestCheckScaleUpDeltaWithinLimits(t *testing.T) { + testCases := []testCase{ + {3, 10, 1, 32, 64, 8, 16, 1}, + {3, 10, 1, 32, 64, 8, 16, 1}, + {3, 10, 1, 32, 64, 8, 16, 1}, + {3, 10, 1, 32, 64, 8, 16, 1}, + {3, 10, 1, 32, 64, 8, 16, 1}, + } + + additionalParams := []ScaleUpResourcesDelta{ + {cloudprovider.ResourceNameCores: 8, cloudprovider.ResourceNameMemory: 16}, + {cloudprovider.ResourceNameCores: 24, cloudprovider.ResourceNameMemory: 48}, + {cloudprovider.ResourceNameCores: 32, cloudprovider.ResourceNameMemory: 64}, + {cloudprovider.ResourceNameCores: 24, cloudprovider.ResourceNameMemory: 64}, + {cloudprovider.ResourceNameCores: 48, cloudprovider.ResourceNameMemory: 48}, + } + + expectedResults := []ScaleUpLimitsCheckResult{ + {Exceeded: false, ExceededResources: []string{}}, + {Exceeded: false, ExceededResources: []string{}}, + {Exceeded: true, ExceededResources: []string{cloudprovider.ResourceNameCores, cloudprovider.ResourceNameMemory}}, + {Exceeded: true, ExceededResources: []string{cloudprovider.ResourceNameMemory}}, + {Exceeded: true, ExceededResources: []string{cloudprovider.ResourceNameCores}}, + } + + for index, test := range testCases { + rm, _, _, err := initTestCase(t, test) + assert.NoError(t, err) + + result := rm.CheckScaleUpDeltaWithinLimits(additionalParams[index]) + assert.Equal(t, expectedResults[index], result) + } +} + +func TestApplyScaleUpResourcesLimits(t *testing.T) { + testCases := []testCase{ + {3, 10, 1, 32, 64, 8, 16, 1}, + {3, 10, 1, 32, 64, 8, 16, 1}, + {3, 10, 1, 32, 64, 8, 16, 1}, + {3, 10, 1, 32, 60, 8, 16, 1}, + } + + additionalParams := []int{ + 2, + 3, + 4, + 4, + } + + expectedResults := []int{ + 2, + 3, + 3, + 2, + } + + for index, test := range testCases { + rm, ni, ng, err := initTestCase(t, test) + assert.NoError(t, err) + + newNodeCount, err := rm.ApplyScaleUpResourcesLimits(additionalParams[index], ng, ni) + assert.NoError(t, err) + assert.Equal(t, expectedResults[index], newNodeCount) + } +} + +func 
TestResourceManagerWithGpuResource(t *testing.T) { + provider := newCloudProvider(t, 320, 640) + context := newContext(t, provider) + processors := test.NewTestProcessors() + + resourceLimiter := cloudprovider.NewResourceLimiter( + map[string]int64{cloudprovider.ResourceNameCores: 0, cloudprovider.ResourceNameMemory: 0, "gpu": 0}, + map[string]int64{cloudprovider.ResourceNameCores: 320, cloudprovider.ResourceNameMemory: 640, "gpu": 16}, + ) + provider.SetResourceLimiter(resourceLimiter) + + n1 := newNode(t, "n1", 8, 16) + utils_test.AddGpusToNode(n1, 4) + n1.Labels[provider.GPULabel()] = "gpu" + provider.AddNodeGroup("ng1", 3, 10, 1) + provider.AddNode("ng1", n1) + ng1, err := provider.NodeGroupForNode(n1) + assert.NoError(t, err) + + nodes := []*corev1.Node{n1} + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) + + rm, err := NewScaleUpResourceManager(&context, processors, nodes, nodeInfos) + assert.NoError(t, err) + + delta, err := rm.ComputeScaleUpResourcesDelta(nodeInfos["ng1"], ng1) + assert.Equal(t, int64(8), delta[cloudprovider.ResourceNameCores]) + assert.Equal(t, int64(16), delta[cloudprovider.ResourceNameMemory]) + assert.Equal(t, int64(4), delta["gpu"]) + + left := rm.CheckScaleUpDeltaWithinLimits(delta) + assert.False(t, left.Exceeded) + assert.Zero(t, len(left.ExceededResources)) + + newNodeCount, err := rm.ApplyScaleUpResourcesLimits(10, ng1, nodeInfos["ng1"]) + assert.Equal(t, 3, newNodeCount) +} + +func newCloudProvider(t *testing.T, cpu, mem int64) *testprovider.TestCloudProvider { + provider := testprovider.NewTestCloudProvider(nil, nil) + assert.NotNil(t, provider) + + resourceLimiter := cloudprovider.NewResourceLimiter( + map[string]int64{cloudprovider.ResourceNameCores: 0, cloudprovider.ResourceNameMemory: 0}, + map[string]int64{cloudprovider.ResourceNameCores: cpu, cloudprovider.ResourceNameMemory: mem}, + ) + provider.SetResourceLimiter(resourceLimiter) + return provider +} + +func newContext(t *testing.T, provider cloudprovider.CloudProvider) context.AutoscalingContext { + podLister := kube_util.NewTestPodLister([]*corev1.Pod{}) + listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + context, err := test.NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, listers, provider, nil, nil) + assert.NoError(t, err) + return context +} + +func newNode(t *testing.T, name string, cpu, mem int64) *corev1.Node { + return utils_test.BuildTestNode(name, cpu*1000, mem) +} + +func initTestCase(t *testing.T, c testCase) (ScaleUpResourceManager, *framework.NodeInfo, cloudprovider.NodeGroup, error) { + provider := newCloudProvider(t, int64(c.CloudProviderCpuLimit), int64(c.CloudProvicerMemLimit)) + context := newContext(t, provider) + processors := test.NewTestProcessors() + + nodes := make([]*corev1.Node, 0) + provider.AddNodeGroup("ng", c.NodeGroupMinSize, c.NodeGroupMaxSize, c.NodeGroupCurrentSize) + for index := 0; index < c.NodeCount; index++ { + name := fmt.Sprintf("n%d", index) + node := newNode(t, name, int64(c.NodeCpuRequest), int64(c.NodeMemRequest)) + provider.AddNode("ng", node) + nodes = append(nodes, node) + } + + assert.Equal(t, 1, len(provider.NodeGroups())) + ng := provider.NodeGroups()[0] + + nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now()) + assert.Equal(t, 1, len(nodeInfos)) + assert.NotNil(t, nodeInfos["ng"]) + ni := 
nodeInfos["ng"] + + rm, err := NewScaleUpResourceManager(&context, processors, nodes, nodeInfos) + assert.NoError(t, err) + return rm, ni, ng, nil +} diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 84b09b65739f..ae9405175b7c 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -440,26 +440,13 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError // finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable) unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime) - if len(unschedulablePodsToHelp) == 0 { - scaleUpStatus.Result = status.ScaleUpNotNeeded - klog.V(1).Info("No unschedulable pods") - } else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal { - scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable - klog.V(1).Info("Max total nodes in cluster reached") - } else if allPodsAreNew(unschedulablePodsToHelp, currentTime) { - // The assumption here is that these pods have been created very recently and probably there - // is more pods to come. In theory we could check the newest pod time but then if pod were created - // slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time. - // We also want to skip a real scale down (just like if the pods were handled). - a.processorCallbacks.DisableScaleDownForLoop() - scaleUpStatus.Result = status.ScaleUpInCooldown - klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more") - } else { + preScaleUp := func() time.Time { scaleUpStart := time.Now() metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart) + return scaleUpStart + } - scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints) - + postScaleUp := func(scaleUpStart time.Time) (bool, errors.AutoscalerError) { metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart) if a.processors != nil && a.processors.ScaleUpStatusProcessor != nil { @@ -469,13 +456,36 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError if typedErr != nil { klog.Errorf("Failed to scale up: %v", typedErr) - return typedErr + return true, typedErr } if scaleUpStatus.Result == status.ScaleUpSuccessful { a.lastScaleUpTime = currentTime // No scale down in this iteration. scaleDownStatus.Result = status.ScaleDownInCooldown - return nil + return true, nil + } + return false, nil + } + + if len(unschedulablePodsToHelp) == 0 { + scaleUpStatus.Result = status.ScaleUpNotNeeded + klog.V(1).Info("No unschedulable pods") + } else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal { + scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable + klog.V(1).Info("Max total nodes in cluster reached") + } else if allPodsAreNew(unschedulablePodsToHelp, currentTime) { + // The assumption here is that these pods have been created very recently and probably there + // is more pods to come. In theory we could check the newest pod time but then if pod were created + // slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time. + // We also want to skip a real scale down (just like if the pods were handled). 
+	if len(unschedulablePodsToHelp) == 0 {
+		scaleUpStatus.Result = status.ScaleUpNotNeeded
+		klog.V(1).Info("No unschedulable pods")
+	} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
+		scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
+		klog.V(1).Info("Max total nodes in cluster reached")
+	} else if allPodsAreNew(unschedulablePodsToHelp, currentTime) {
+		// The assumption here is that these pods have been created very recently and probably there
+		// is more pods to come. In theory we could check the newest pod time but then if pod were created
+		// slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time.
+		// We also want to skip a real scale down (just like if the pods were handled).
+		a.processorCallbacks.DisableScaleDownForLoop()
+		scaleUpStatus.Result = status.ScaleUpInCooldown
+		klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
+	} else {
+		scaleUpStart := preScaleUp()
+		scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints)
+		if exit, err := postScaleUp(scaleUpStart); exit {
+			return err
+		}
 	}
@@ -591,6 +601,15 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 			}
 		}
 	}
+
+	if a.ScaleUpToNodeGroupMinSizeEnabled {
+		scaleUpStart := preScaleUp()
+		scaleUpStatus, typedErr = ScaleUpToNodeGroupMinSize(autoscalingContext, a.processors, a.clusterStateRegistry, readyNodes, nodeInfosForGroups)
+		if exit, err := postScaleUp(scaleUpStart); exit {
+			return err
+		}
+	}
+
 	return nil
 }
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index ad0011e1063c..1798bb266d56 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -154,6 +154,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 	n2 := BuildTestNode("n2", 1000, 1000)
 	SetNodeReadyState(n2, true, time.Now())
 	n3 := BuildTestNode("n3", 1000, 1000)
+	n4 := BuildTestNode("n4", 1000, 1000)
 
 	p1 := BuildTestPod("p1", 600, 100)
 	p1.Spec.NodeName = "n1"
@@ -172,7 +173,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 			return ret
 		},
 		nil, nil,
-		nil, map[string]*schedulerframework.NodeInfo{"ng1": tni, "ng2": tni})
+		nil, map[string]*schedulerframework.NodeInfo{"ng1": tni, "ng2": tni, "ng3": tni})
 	provider.AddNodeGroup("ng1", 1, 10, 1)
 	provider.AddNode("ng1", n1)
 	ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup)
@@ -186,11 +187,12 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 			ScaleDownUnreadyTime:          time.Minute,
 			ScaleDownUtilizationThreshold: 0.5,
 		},
-		EstimatorName:    estimator.BinpackingEstimatorName,
-		ScaleDownEnabled: true,
-		MaxNodesTotal:    1,
-		MaxCoresTotal:    10,
-		MaxMemoryTotal:   100000,
+		EstimatorName:                    estimator.BinpackingEstimatorName,
+		ScaleUpToNodeGroupMinSizeEnabled: true,
+		ScaleDownEnabled:                 true,
+		MaxNodesTotal:                    1,
+		MaxCoresTotal:                    10,
+		MaxMemoryTotal:                   100000,
 	}
 
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
@@ -311,6 +313,22 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
 	assert.NoError(t, err)
 	mock.AssertExpectationsForObjects(t, scheduledPodMock, unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
+
+	// Scale up to node group min size.
+	readyNodeLister.SetNodes([]*apiv1.Node{n4})
+	allNodeLister.SetNodes([]*apiv1.Node{n4})
+	scheduledPodMock.On("List").Return([]*apiv1.Pod{}, nil)
+	unschedulablePodMock.On("List").Return([]*apiv1.Pod{}, nil)
+	daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil)
+	podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil)
+	onScaleUpMock.On("ScaleUp", "ng3", 2).Return(nil).Once() // 2 new nodes are supposed to be scaled up.
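+	// ng3 is registered below with min size 3 but only one node (n4), so the
+	// min-size enforcement pass is expected to request the 2 extra nodes mocked above.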
+
+	provider.AddNodeGroup("ng3", 3, 10, 1)
+	provider.AddNode("ng3", n4)
+
+	err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour))
+	assert.NoError(t, err)
+	mock.AssertExpectationsForObjects(t, onScaleUpMock)
 }
 
 func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) {
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index 7d0e7c3d56c4..29c587b2c6a7 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -86,14 +86,15 @@ func multiStringFlag(name string, usage string) *MultiStringFlag {
 }
 
 var (
-	clusterName            = flag.String("cluster-name", "", "Autoscaled cluster name, if available")
-	address                = flag.String("address", ":8085", "The address to expose prometheus metrics.")
-	kubernetes             = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default")
-	kubeConfigFile         = flag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
-	cloudConfig            = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
-	namespace              = flag.String("namespace", "kube-system", "Namespace in which cluster-autoscaler run.")
-	scaleDownEnabled       = flag.Bool("scale-down-enabled", true, "Should CA scale down the cluster")
-	scaleDownDelayAfterAdd = flag.Duration("scale-down-delay-after-add", 10*time.Minute,
+	clusterName                      = flag.String("cluster-name", "", "Autoscaled cluster name, if available")
+	address                          = flag.String("address", ":8085", "The address to expose prometheus metrics.")
+	kubernetes                       = flag.String("kubernetes", "", "Kubernetes master location. Leave blank for default")
+	kubeConfigFile                   = flag.String("kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
+	cloudConfig                      = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
+	namespace                        = flag.String("namespace", "kube-system", "Namespace in which cluster-autoscaler run.")
+	scaleUpToNodeGroupMinSizeEnabled = flag.Bool("scale-up-to-node-group-min-size-enabled", false, "Should CA scale up the node group to the configured min size if needed.")
+	scaleDownEnabled                 = flag.Bool("scale-down-enabled", true, "Should CA scale down the cluster")
+	scaleDownDelayAfterAdd           = flag.Duration("scale-down-delay-after-add", 10*time.Minute,
 		"How long after scale up that scale down evaluation resumes")
 	scaleDownDelayAfterDelete = flag.Duration("scale-down-delay-after-delete", 0,
 		"How long after node deletion that scale down evaluation resumes, defaults to scanInterval")
@@ -256,6 +257,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		MinMemoryTotal:                   minMemoryTotal,
 		GpuTotal:                         parsedGpuTotal,
 		NodeGroups:                       *nodeGroupsFlag,
+		ScaleUpToNodeGroupMinSizeEnabled: *scaleUpToNodeGroupMinSizeEnabled,
 		ScaleDownDelayAfterAdd:           *scaleDownDelayAfterAdd,
 		ScaleDownDelayAfterDelete:        *scaleDownDelayAfterDelete,
 		ScaleDownDelayAfterFailure:       *scaleDownDelayAfterFailure,