From d9918140319e9327e6a8ba947e3e4c6b11ac6a12 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Mon, 30 Dec 2024 18:44:58 -0800 Subject: [PATCH 01/29] add lifecycle heartbeat --- .../templates/deployment.yaml | 4 ++ pkg/config/config.go | 35 ++++++++++++- pkg/monitor/sqsevent/asg-lifecycle-event.go | 51 +++++++++++++++++++ pkg/node/node.go | 4 ++ 4 files changed, 93 insertions(+), 1 deletion(-) diff --git a/config/helm/aws-node-termination-handler/templates/deployment.yaml b/config/helm/aws-node-termination-handler/templates/deployment.yaml index 2d7a8896..7c043fec 100644 --- a/config/helm/aws-node-termination-handler/templates/deployment.yaml +++ b/config/helm/aws-node-termination-handler/templates/deployment.yaml @@ -168,6 +168,10 @@ spec: value: {{ .Values.deleteSqsMsgIfNodeNotFound | quote }} - name: WORKERS value: {{ .Values.workers | quote }} + - name: HEARTBEAT_INTERVAL + value: {{ .Values.heartbeatInterval | quote }} + - name: HEARTBEAT_UNTIL + value: {{ .Values.heartbeatUntil | quote }} {{- with .Values.extraEnv }} {{- toYaml . | nindent 12 }} {{- end }} diff --git a/pkg/config/config.go b/pkg/config/config.go index 05fabdca..284c1d9a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -112,6 +112,9 @@ const ( queueURLConfigKey = "QUEUE_URL" completeLifecycleActionDelaySecondsKey = "COMPLETE_LIFECYCLE_ACTION_DELAY_SECONDS" deleteSqsMsgIfNodeNotFoundKey = "DELETE_SQS_MSG_IF_NODE_NOT_FOUND" + // heartbeat + heartbeatIntervalKey = "HEARTBEAT_INTERVAL" + heartbeatUntilKey = "HEARTBEAT_UNTIL" ) // Config arguments set via CLI, environment variables, or defaults @@ -166,6 +169,8 @@ type Config struct { CompleteLifecycleActionDelaySeconds int DeleteSqsMsgIfNodeNotFound bool UseAPIServerCacheToListPods bool + HeartbeatInterval int + HeartbeatUntil int } // ParseCliArgs parses cli arguments and uses environment variables as fallback values @@ -230,6 +235,8 @@ func ParseCliArgs() (config Config, err error) { flag.IntVar(&config.CompleteLifecycleActionDelaySeconds, "complete-lifecycle-action-delay-seconds", getIntEnv(completeLifecycleActionDelaySecondsKey, -1), "Delay completing the Autoscaling lifecycle action after a node has been drained.") flag.BoolVar(&config.DeleteSqsMsgIfNodeNotFound, "delete-sqs-msg-if-node-not-found", getBoolEnv(deleteSqsMsgIfNodeNotFoundKey, false), "If true, delete SQS Messages from the SQS Queue if the targeted node(s) are not found.") flag.BoolVar(&config.UseAPIServerCacheToListPods, "use-apiserver-cache", getBoolEnv(useAPIServerCache, false), "If true, leverage the k8s apiserver's index on pod's spec.nodeName to list pods on a node, instead of doing an etcd quorum read.") + flag.IntVar(&config.HeartbeatInterval, "heartbeat-interval", getIntEnv(heartbeatIntervalKey, -1), "The time period between consecutive heartbeat signals. It is measured in seconds. Minimum 30 seconds and maximum 3600 seconds.") + flag.IntVar(&config.HeartbeatUntil, "heartbeat-until", getIntEnv(heartbeatUntilKey, -1), "The length of time over which the heartbeat signals are sent out. It is measured in seconds. 
Minimum 60 seconds and maximum 172800 seconds.") flag.Parse() if isConfigProvided("pod-termination-grace-period", podTerminationGracePeriodConfigKey) && isConfigProvided("grace-period", gracePeriodConfigKey) { @@ -274,6 +281,26 @@ func ParseCliArgs() (config Config, err error) { panic("You must provide a node-name to the CLI or NODE_NAME environment variable.") } + // heartbeat value boundary and compability check + if config.HeartbeatInterval != -1 && (config.HeartbeatInterval < 30 || config.HeartbeatInterval > 3600) { + return config, fmt.Errorf("invalid heartbeat-interval passed: %d Should be between 30 and 3600 seconds", config.HeartbeatInterval) + } + if config.HeartbeatUntil != -1 && (config.HeartbeatUntil < 60 || config.HeartbeatUntil > 172800) { + return config, fmt.Errorf("invalid heartbeat-until passed: %d Should be between 30 and 172800 seconds", config.HeartbeatUntil) + } + if config.EnableSQSTerminationDraining { + if config.HeartbeatInterval != -1 && config.HeartbeatUntil == -1 { + config.HeartbeatUntil = 172800 + log.Info().Msgf("Since heartbeat-until is not set, defaulting to %d seconds", config.HeartbeatUntil) + } else if config.HeartbeatInterval == -1 && config.HeartbeatUntil != -1 { + return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval must be set to activate heartbeat") + } + } else { + if config.HeartbeatInterval != -1 || config.HeartbeatUntil != -1 { + return config, fmt.Errorf("heartbeat is only supported for Queue Processor mode") + } + } + // client-go expects these to be set in env vars os.Setenv(kubernetesServiceHostConfigKey, config.KubernetesServiceHost) os.Setenv(kubernetesServicePortConfigKey, config.KubernetesServicePort) @@ -332,6 +359,8 @@ func (c Config) PrintJsonConfigArgs() { Str("ManagedTag", c.ManagedTag). Bool("use_provider_id", c.UseProviderId). Bool("use_apiserver_cache", c.UseAPIServerCacheToListPods). + Int("heartbeat_interval", c.HeartbeatInterval). + Int("heartbeat_until", c.HeartbeatUntil). Msg("aws-node-termination-handler arguments") } @@ -383,7 +412,9 @@ func (c Config) PrintHumanConfigArgs() { "\tmanaged-tag: %s,\n"+ "\tuse-provider-id: %t,\n"+ "\taws-endpoint: %s,\n"+ - "\tuse-apiserver-cache: %t,\n", + "\tuse-apiserver-cache: %t,\n"+ + "\theartbeat-interval: %d,\n"+ + "\theartbeat-until: %d\n", c.DryRun, c.NodeName, c.PodName, @@ -424,6 +455,8 @@ func (c Config) PrintHumanConfigArgs() { c.UseProviderId, c.AWSEndpoint, c.UseAPIServerCacheToListPods, + c.HeartbeatInterval, + c.HeartbeatUntil, ) } diff --git a/pkg/monitor/sqsevent/asg-lifecycle-event.go b/pkg/monitor/sqsevent/asg-lifecycle-event.go index dede29cc..f35634bb 100644 --- a/pkg/monitor/sqsevent/asg-lifecycle-event.go +++ b/pkg/monitor/sqsevent/asg-lifecycle-event.go @@ -16,6 +16,7 @@ package sqsevent import ( "encoding/json" "fmt" + "time" "github.com/aws/aws-node-termination-handler/pkg/monitor" "github.com/aws/aws-node-termination-handler/pkg/node" @@ -94,16 +95,24 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, m Description: fmt.Sprintf("ASG Lifecycle Termination event received. 
Instance will be interrupted at %s \n", event.getTime()), } + stopHeartbeatCh := make(chan struct{}) + interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, _ node.Node) error { + _, err = m.continueLifecycleAction(lifecycleDetail) if err != nil { return fmt.Errorf("continuing ASG termination lifecycle: %w", err) } log.Info().Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).Str("instanceID", lifecycleDetail.EC2InstanceID).Msg("Completed ASG Lifecycle Hook") + + close(stopHeartbeatCh) return m.deleteMessage(message) } interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { + nthConfig := n.GetNthConfig() + go m.SendHeartbeats(nthConfig.HeartbeatInterval, nthConfig.HeartbeatUntil, lifecycleDetail, stopHeartbeatCh) + err := n.TaintASGLifecycleTermination(interruptionEvent.NodeName, interruptionEvent.EventID) if err != nil { log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ASGLifecycleTerminationTaint, interruptionEvent.EventID) @@ -114,6 +123,48 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, m return &interruptionEvent, nil } +func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, lifecycleDetail *LifecycleDetail, stopCh <-chan struct{}) { + ticker := time.NewTicker(time.Duration(heartbeatInterval) * time.Second) + defer ticker.Stop() + timeout := time.After(time.Duration(heartbeatUntil) * time.Second) + + for { + select { + case <-stopCh: + return + case <-ticker.C: + if err := m.recordLifecycleActionHeartbeat(lifecycleDetail); err != nil { + log.Err(err).Msg("Unable to send lifecycle heartbeat") + } + case <-timeout: + log.Info().Msg("Heartbeat deadline exceeded, stopping heartbeat") + return + } + } +} + +func (m SQSMonitor) recordLifecycleActionHeartbeat(LifecycleDetail *LifecycleDetail) error { + input := &autoscaling.RecordLifecycleActionHeartbeatInput{ + AutoScalingGroupName: aws.String(LifecycleDetail.AutoScalingGroupName), + LifecycleHookName: aws.String(LifecycleDetail.LifecycleHookName), + LifecycleActionToken: aws.String(LifecycleDetail.LifecycleActionToken), + InstanceId: aws.String(LifecycleDetail.EC2InstanceID), + } + + _, err := m.ASG.RecordLifecycleActionHeartbeat(input) + if err != nil { + return err + } + + log.Info().Str("asgName", LifecycleDetail.AutoScalingGroupName). + Str("lifecycleHookName", LifecycleDetail.LifecycleHookName). + Str("lifecycleActionToken", LifecycleDetail.LifecycleActionToken). + Str("instanceID", LifecycleDetail.EC2InstanceID). 
+ Msg("Successfully sent lifecycle heartbeat") + + return nil +} + func (m SQSMonitor) deleteMessage(message *sqs.Message) error { errs := m.deleteMessages([]*sqs.Message{message}) if errs != nil { diff --git a/pkg/node/node.go b/pkg/node/node.go index 7e323d13..204c5de6 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -280,6 +280,10 @@ func (n Node) MarkForUncordonAfterReboot(nodeName string) error { return nil } +func (n Node) GetNthConfig() config.Config { + return n.nthConfig +} + // addLabel will add a label to the node given a label key and value // Specifying true for the skipExisting parameter will skip adding the label if it already exists func (n Node) addLabel(nodeName string, key string, value string, skipExisting bool) error { From 0e4b68650b2a3d01b5c8d9fc7fd6317d5181997f Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Mon, 30 Dec 2024 18:45:21 -0800 Subject: [PATCH 02/29] Lifecycle heartbeat unit test --- pkg/monitor/sqsevent/sqs-monitor_test.go | 90 +++++++++++++++++++++++- pkg/test/aws-mocks.go | 25 +++++-- 2 files changed, 107 insertions(+), 8 deletions(-) diff --git a/pkg/monitor/sqsevent/sqs-monitor_test.go b/pkg/monitor/sqsevent/sqs-monitor_test.go index 8e827377..6a492257 100644 --- a/pkg/monitor/sqsevent/sqs-monitor_test.go +++ b/pkg/monitor/sqsevent/sqs-monitor_test.go @@ -18,6 +18,7 @@ import ( "fmt" "strings" "testing" + "time" "github.com/aws/aws-node-termination-handler/pkg/monitor" "github.com/aws/aws-node-termination-handler/pkg/monitor/sqsevent" @@ -276,7 +277,6 @@ func TestMonitor_AsgDirectToSqsSuccess(t *testing.T) { default: h.Ok(t, fmt.Errorf("Expected an event to be generated")) } - } func TestMonitor_AsgDirectToSqsTestNotification(t *testing.T) { @@ -520,7 +520,6 @@ func TestMonitor_DrainTasksASGFailure(t *testing.T) { default: h.Ok(t, fmt.Errorf("Expected to get an event with a failing post drain task")) } - } func TestMonitor_Failure(t *testing.T) { @@ -908,6 +907,93 @@ func TestMonitor_InstanceNotManaged(t *testing.T) { } } +func TestSendHeartbeats_EarlyClosure(t *testing.T) { + asgMock := h.MockedASG{ + RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, + RecordLifecycleActionHeartbeatErr: nil, + } + + sqsMonitor := sqsevent.SQSMonitor{ + ASG: asgMock, + } + + lifecycleDetail := sqsevent.LifecycleDetail{ + LifecycleHookName: "test-hook", + AutoScalingGroupName: "test-asg", + LifecycleActionToken: "test-token", + EC2InstanceID: "XXXXXXXXXXXX", + } + + stopHeartbeatCh := make(chan struct{}) + + go func() { + time.Sleep(3500 * time.Millisecond) + close(stopHeartbeatCh) + }() + + sqsMonitor.SendHeartbeats(1, 5, &lifecycleDetail, stopHeartbeatCh) + + h.Assert(t, h.HeartbeatCallCount == 3, "3 Heartbeat Expected, got %d", h.HeartbeatCallCount) +} + +func TestSendHeartbeats_NormalClosure(t *testing.T) { + asgMock := h.MockedASG{ + RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, + RecordLifecycleActionHeartbeatErr: nil, + } + + sqsMonitor := sqsevent.SQSMonitor{ + ASG: asgMock, + } + + lifecycleDetail := sqsevent.LifecycleDetail{ + LifecycleHookName: "test-hook", + AutoScalingGroupName: "test-asg", + LifecycleActionToken: "test-token", + EC2InstanceID: "XXXXXXXXXXXX", + } + + stopHeartbeatCh := make(chan struct{}) + + go func() { + time.Sleep(10 * time.Second) + close(stopHeartbeatCh) + }() + + sqsMonitor.SendHeartbeats(1, 5, &lifecycleDetail, stopHeartbeatCh) + + h.Assert(t, h.HeartbeatCallCount == 5, "5 Heartbeat Expected, got %d", h.HeartbeatCallCount) +} + +func 
TestSendHeartbeats_ErrorASG(t *testing.T) { + asgMock := h.MockedASG{ + RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, + RecordLifecycleActionHeartbeatErr: awserr.NewRequestFailure(aws.ErrThrottling, 400, "bad-request"), + } + + sqsMonitor := sqsevent.SQSMonitor{ + ASG: asgMock, + } + + lifecycleDetail := sqsevent.LifecycleDetail{ + LifecycleHookName: "test-hook", + AutoScalingGroupName: "test-asg", + LifecycleActionToken: "test-token", + EC2InstanceID: "XXXXXXXXXXXX", + } + + stopHeartbeatCh := make(chan struct{}) + + go func() { + time.Sleep(10 * time.Second) + close(stopHeartbeatCh) + }() + + sqsMonitor.SendHeartbeats(1, 6, &lifecycleDetail, stopHeartbeatCh) + + h.Assert(t, h.HeartbeatCallCount == 6, "6 Heartbeat Expected, got %d", h.HeartbeatCallCount) +} + // AWS Mock Helpers specific to sqs-monitor tests func getDescribeInstancesResp(privateDNSName string, withASGTag bool, withManagedTag bool) ec2.DescribeInstancesOutput { diff --git a/pkg/test/aws-mocks.go b/pkg/test/aws-mocks.go index 8d5c8ae5..b00ae365 100644 --- a/pkg/test/aws-mocks.go +++ b/pkg/test/aws-mocks.go @@ -56,12 +56,14 @@ func (m MockedEC2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.De // MockedASG mocks the autoscaling API type MockedASG struct { autoscalingiface.AutoScalingAPI - CompleteLifecycleActionResp autoscaling.CompleteLifecycleActionOutput - CompleteLifecycleActionErr error - DescribeAutoScalingInstancesResp autoscaling.DescribeAutoScalingInstancesOutput - DescribeAutoScalingInstancesErr error - DescribeTagsPagesResp autoscaling.DescribeTagsOutput - DescribeTagsPagesErr error + CompleteLifecycleActionResp autoscaling.CompleteLifecycleActionOutput + CompleteLifecycleActionErr error + DescribeAutoScalingInstancesResp autoscaling.DescribeAutoScalingInstancesOutput + DescribeAutoScalingInstancesErr error + DescribeTagsPagesResp autoscaling.DescribeTagsOutput + DescribeTagsPagesErr error + RecordLifecycleActionHeartbeatResp autoscaling.RecordLifecycleActionHeartbeatOutput + RecordLifecycleActionHeartbeatErr error } // CompleteLifecycleAction mocks the autoscaling.CompleteLifecycleAction API call @@ -81,3 +83,14 @@ func (m MockedASG) DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn de fn(&m.DescribeTagsPagesResp, true) return m.DescribeTagsPagesErr } + +var HeartbeatCallCount int + +// RecordLifecycleActionHeartbeat mocks the autoscaling.RecordLifecycleActionHeartbeat API call +func (m MockedASG) RecordLifecycleActionHeartbeat(input *autoscaling.RecordLifecycleActionHeartbeatInput) (*autoscaling.RecordLifecycleActionHeartbeatOutput, error) { + HeartbeatCallCount++ + if m.RecordLifecycleActionHeartbeatErr != nil && HeartbeatCallCount%2 == 1 { + return &m.RecordLifecycleActionHeartbeatResp, m.RecordLifecycleActionHeartbeatErr + } + return &m.RecordLifecycleActionHeartbeatResp, nil +} From 1fbd7cbad9feebb39f21107beba9149bf6954067 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Thu, 2 Jan 2025 14:39:43 -0800 Subject: [PATCH 03/29] Refactor heartbeat logging statements --- pkg/monitor/sqsevent/asg-lifecycle-event.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/monitor/sqsevent/asg-lifecycle-event.go b/pkg/monitor/sqsevent/asg-lifecycle-event.go index f35634bb..9d2d1f12 100644 --- a/pkg/monitor/sqsevent/asg-lifecycle-event.go +++ b/pkg/monitor/sqsevent/asg-lifecycle-event.go @@ -133,9 +133,7 @@ func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, li case <-stopCh: return case 
<-ticker.C: - if err := m.recordLifecycleActionHeartbeat(lifecycleDetail); err != nil { - log.Err(err).Msg("Unable to send lifecycle heartbeat") - } + m.recordLifecycleActionHeartbeat(lifecycleDetail) case <-timeout: log.Info().Msg("Heartbeat deadline exceeded, stopping heartbeat") return @@ -143,7 +141,7 @@ func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, li } } -func (m SQSMonitor) recordLifecycleActionHeartbeat(LifecycleDetail *LifecycleDetail) error { +func (m SQSMonitor) recordLifecycleActionHeartbeat(LifecycleDetail *LifecycleDetail) { input := &autoscaling.RecordLifecycleActionHeartbeatInput{ AutoScalingGroupName: aws.String(LifecycleDetail.AutoScalingGroupName), LifecycleHookName: aws.String(LifecycleDetail.LifecycleHookName), @@ -151,18 +149,19 @@ func (m SQSMonitor) recordLifecycleActionHeartbeat(LifecycleDetail *LifecycleDet InstanceId: aws.String(LifecycleDetail.EC2InstanceID), } - _, err := m.ASG.RecordLifecycleActionHeartbeat(input) - if err != nil { - return err - } - log.Info().Str("asgName", LifecycleDetail.AutoScalingGroupName). Str("lifecycleHookName", LifecycleDetail.LifecycleHookName). Str("lifecycleActionToken", LifecycleDetail.LifecycleActionToken). Str("instanceID", LifecycleDetail.EC2InstanceID). - Msg("Successfully sent lifecycle heartbeat") + Msg("Sending lifecycle heartbeat") - return nil + _, err := m.ASG.RecordLifecycleActionHeartbeat(input) + if err != nil { + log.Err(err).Msg("Failed to send lifecycle heartbeat") + return + } + + log.Info().Msg("Successfully sent lifecycle heartbeat") } func (m SQSMonitor) deleteMessage(message *sqs.Message) error { From d0f1ef4dd7d49d4565f37a5b1a375d4fe29e76da Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Thu, 2 Jan 2025 14:40:07 -0800 Subject: [PATCH 04/29] Heartbeat e2e test --- test/e2e/asg-lifecycle-sqs-heartbeat-test | 222 ++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100755 test/e2e/asg-lifecycle-sqs-heartbeat-test diff --git a/test/e2e/asg-lifecycle-sqs-heartbeat-test b/test/e2e/asg-lifecycle-sqs-heartbeat-test new file mode 100755 index 00000000..ea2e2ead --- /dev/null +++ b/test/e2e/asg-lifecycle-sqs-heartbeat-test @@ -0,0 +1,222 @@ +#!/bin/bash +set -euo pipefail + +# Available env vars: +# $TMP_DIR +# $CLUSTER_NAME +# $KUBECONFIG +# $NODE_TERMINATION_HANDLER_DOCKER_REPO +# $NODE_TERMINATION_HANDLER_DOCKER_TAG +# $WEBHOOK_DOCKER_REPO +# $WEBHOOK_DOCKER_TAG +# $AEMM_URL +# $AEMM_VERSION + + +function fail_and_exit { + echo "❌ ASG Lifecycle SQS Heartbeat Test failed $CLUSTER_NAME ❌" + exit "${1:-1}" +} + +echo "Starting ASG Lifecycle SQS Heartbeat Test for Node Termination Handler" +START_TIME=$(date -u +"%Y-%m-%dT%TZ") + +SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )" + +common_helm_args=() + +localstack_helm_args=( + upgrade + --install + --namespace default + "$CLUSTER_NAME-localstack" + "$SCRIPTPATH/../../config/helm/localstack/" + --set nodeSelector."${NTH_CONTROL_LABEL}" + --set defaultRegion="${AWS_REGION}" + --wait +) + +set -x +helm "${localstack_helm_args[@]}" +set +x + +sleep 10 + +RUN_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test},{Key=aws-node-termination-handler/managed,Value=blah}]'" +localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \ + -o go-template --template '{{range .items}}{{.metadata.name}} 
{{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \ + | awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }') +echo "🥑 Using localstack pod ${localstack_pod}" +run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "${RUN_INSTANCE_CMD}") +private_dns_name=$(echo "${run_instances_resp}" | jq -r '.Instances[] .PrivateDnsName') +instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId') +echo "🥑 Started mock EC2 instance ($instance_id) w/ private DNS name: ${private_dns_name}" +set -x +CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}" +queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "${CREATE_SQS_CMD}" | jq -r .QueueUrl) +set +x + +echo "🥑 Created SQS Queue ${queue_url}" + +anth_helm_args=( + upgrade + --install + --namespace kube-system + "$CLUSTER_NAME-acth" + "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/" +# --set completeLifecycleActionDelaySeconds=-1 + --set heartbeatInterval=30 + --set heartbeatUntil=100 + --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO" + --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG" + --set nodeSelector."${NTH_CONTROL_LABEL}" + --set tolerations[0].operator=Exists + --set awsAccessKeyID=foo + --set awsSecretAccessKey=bar + --set awsRegion="${AWS_REGION}" + --set awsEndpoint="http://localstack.default" + --set checkTagBeforeDraining=false + --set enableSqsTerminationDraining=true + --set queueURL="${queue_url}" + --wait +) +[[ -n "${NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY-}" ]] && + anth_helm_args+=(--set image.pullPolicy="$NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY") +[[ ${#common_helm_args[@]} -gt 0 ]] && + anth_helm_args+=("${common_helm_args[@]}") + +set -x +helm "${anth_helm_args[@]}" +set +x + +emtp_helm_args=( + upgrade + --install + --namespace default + "$CLUSTER_NAME-emtp" + "$SCRIPTPATH/../../config/helm/webhook-test-proxy/" + --set webhookTestProxy.image.repository="$WEBHOOK_DOCKER_REPO" + --set webhookTestProxy.image.tag="$WEBHOOK_DOCKER_TAG" + --wait +) +[[ -n "${WEBHOOK_DOCKER_PULL_POLICY-}" ]] && + emtp_helm_args+=(--set webhookTestProxy.image.pullPolicy="$WEBHOOK_DOCKER_PULL_POLICY") +[[ ${#common_helm_args[@]} -gt 0 ]] && + emtp_helm_args+=("${common_helm_args[@]}") + +set -x +helm "${emtp_helm_args[@]}" +set +x + +TAINT_CHECK_CYCLES=15 +TAINT_CHECK_SLEEP=15 + +DEPLOYED=0 + +for i in $(seq 1 $TAINT_CHECK_CYCLES); do + if [[ $(kubectl get deployments regular-pod-test -o jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then + echo "✅ Verified regular-pod-test pod was scheduled and started!" + DEPLOYED=1 + break + fi + echo "Setup Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds" + sleep $TAINT_CHECK_SLEEP +done + +if [[ $DEPLOYED -eq 0 ]]; then + echo "❌ regular-pod-test pod deployment failed" + fail_and_exit 2 +fi + +ASG_TERMINATE_EVENT=$(cat < /dev/null; then + echo "✅ Verified the worker node was cordoned!" + cordoned=1 + fi + + if [[ $cordoned -eq 1 && $(kubectl get deployments regular-pod-test -o=jsonpath='{.status.unavailableReplicas}') -eq 1 ]]; then + echo "✅ Verified the regular-pod-test pod was evicted!" + echo "✅ ASG Lifecycle SQS Test Passed $CLUSTER_NAME! 
✅" + exit 0 + fi + echo "Assertion Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds" + sleep $TAINT_CHECK_SLEEP +done + +if [[ $cordoned -eq 0 ]]; then + echo "❌ Worker node was not cordoned" +else + echo "❌ regular-pod-test was not evicted" +fi + +fail_and_exit 1 From d7f8e07f40fea42728c24847900f0d25fa1f69cd Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Mon, 6 Jan 2025 11:29:19 -0800 Subject: [PATCH 05/29] Remove error handling for using heartbeat and imds together --- pkg/config/config.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 284c1d9a..8381f80e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -286,19 +286,13 @@ func ParseCliArgs() (config Config, err error) { return config, fmt.Errorf("invalid heartbeat-interval passed: %d Should be between 30 and 3600 seconds", config.HeartbeatInterval) } if config.HeartbeatUntil != -1 && (config.HeartbeatUntil < 60 || config.HeartbeatUntil > 172800) { - return config, fmt.Errorf("invalid heartbeat-until passed: %d Should be between 30 and 172800 seconds", config.HeartbeatUntil) + return config, fmt.Errorf("invalid heartbeat-until passed: %d Should be between 60 and 172800 seconds", config.HeartbeatUntil) } - if config.EnableSQSTerminationDraining { - if config.HeartbeatInterval != -1 && config.HeartbeatUntil == -1 { - config.HeartbeatUntil = 172800 - log.Info().Msgf("Since heartbeat-until is not set, defaulting to %d seconds", config.HeartbeatUntil) - } else if config.HeartbeatInterval == -1 && config.HeartbeatUntil != -1 { - return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval must be set to activate heartbeat") - } - } else { - if config.HeartbeatInterval != -1 || config.HeartbeatUntil != -1 { - return config, fmt.Errorf("heartbeat is only supported for Queue Processor mode") - } + if config.HeartbeatInterval != -1 && config.HeartbeatUntil == -1 { + config.HeartbeatUntil = 172800 + log.Info().Msgf("Since heartbeat-until is not set, defaulting to %d seconds", config.HeartbeatUntil) + } else if config.HeartbeatInterval == -1 && config.HeartbeatUntil != -1 { + return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval must be set to activate heartbeat") } // client-go expects these to be set in env vars From a6cfd89f95f40226efab78678a227618279eb786 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Mon, 6 Jan 2025 18:26:43 -0800 Subject: [PATCH 06/29] add e2e test for lifecycle heartbeat --- test/e2e/asg-lifecycle-sqs-heartbeat-test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/asg-lifecycle-sqs-heartbeat-test b/test/e2e/asg-lifecycle-sqs-heartbeat-test index ea2e2ead..be5202a7 100755 --- a/test/e2e/asg-lifecycle-sqs-heartbeat-test +++ b/test/e2e/asg-lifecycle-sqs-heartbeat-test @@ -64,7 +64,7 @@ anth_helm_args=( --namespace kube-system "$CLUSTER_NAME-acth" "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/" -# --set completeLifecycleActionDelaySeconds=-1 + --set completeLifecycleActionDelaySeconds=120 --set heartbeatInterval=30 --set heartbeatUntil=100 --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO" From 64e9cffb2298acae7a89e7fec7a4bf6e2d9503bf Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Mon, 6 Jan 2025 18:28:42 -0800 Subject: [PATCH 07/29] Add check heartbeat timeout and compare to heartbeat interval --- pkg/monitor/sqsevent/asg-lifecycle-event.go | 56 ++++++++++++++++----- 1 file changed, 44 insertions(+), 12 
deletions(-) diff --git a/pkg/monitor/sqsevent/asg-lifecycle-event.go b/pkg/monitor/sqsevent/asg-lifecycle-event.go index 3c3aaec8..109fe107 100644 --- a/pkg/monitor/sqsevent/asg-lifecycle-event.go +++ b/pkg/monitor/sqsevent/asg-lifecycle-event.go @@ -112,11 +112,14 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, m interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { nthConfig := n.GetNthConfig() - go m.SendHeartbeats(nthConfig.HeartbeatInterval, nthConfig.HeartbeatUntil, lifecycleDetail, stopHeartbeatCh) + if nthConfig.HeartbeatInterval != -1 && nthConfig.HeartbeatUntil != -1 { + go m.checkHeartbeatTimeout(nthConfig.HeartbeatInterval, lifecycleDetail) + go m.SendHeartbeats(nthConfig.HeartbeatInterval, nthConfig.HeartbeatUntil, lifecycleDetail, stopHeartbeatCh) + } err := n.TaintASGLifecycleTermination(interruptionEvent.NodeName, interruptionEvent.EventID) if err != nil { - log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ASGLifecycleTerminationTaint, interruptionEvent.EventID) + log.Err(err).Msgf("unable to taint node with taint %s:%s", node.ASGLifecycleTerminationTaint, interruptionEvent.EventID) } return nil } @@ -124,6 +127,35 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, m return &interruptionEvent, nil } +// Compare the heartbeatInterval with the heartbeat timeout and warn if (heartbeatInterval >= heartbeat timeout) +func (m SQSMonitor) checkHeartbeatTimeout(heartbeatInterval int, lifecycleDetail *LifecycleDetail) { + input := &autoscaling.DescribeLifecycleHooksInput{ + AutoScalingGroupName: aws.String(lifecycleDetail.AutoScalingGroupName), + LifecycleHookNames: []*string{aws.String(lifecycleDetail.LifecycleHookName)}, + } + + lifecyclehook, err := m.ASG.DescribeLifecycleHooks(input) + if err != nil { + log.Err(err).Msg("failed to describe lifecycle hook") + return + } + + if len(lifecyclehook.LifecycleHooks) == 0 { + log.Warn(). + Str("asgName", lifecycleDetail.AutoScalingGroupName). + Str("lifecycleHookName", lifecycleDetail.LifecycleHookName). + Msg("Tried to check heartbeat timeout, but no lifecycle hook found from ASG") + return + } + + heartbeatTimeout := int(*lifecyclehook.LifecycleHooks[0].HeartbeatTimeout) + + if heartbeatInterval >= heartbeatTimeout { + log.Warn().Msgf("Heartbeat interval (%d seconds) is equal to or greater than the heartbeat timeout (%d seconds) for the lifecycle hook %s. 
The node would likely be terminated before the heartbeat is sent", heartbeatInterval, heartbeatTimeout, *lifecyclehook.LifecycleHooks[0].LifecycleHookName) + } +} + +// Issue lifecycle heartbeats to reset the heartbeat timeout timer in ASG func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, lifecycleDetail *LifecycleDetail, stopCh <-chan struct{}) { ticker := time.NewTicker(time.Duration(heartbeatInterval) * time.Second) defer ticker.Stop() @@ -142,18 +174,18 @@ func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, li } } -func (m SQSMonitor) recordLifecycleActionHeartbeat(LifecycleDetail *LifecycleDetail) { +func (m SQSMonitor) recordLifecycleActionHeartbeat(lifecycleDetail *LifecycleDetail) { input := &autoscaling.RecordLifecycleActionHeartbeatInput{ - AutoScalingGroupName: aws.String(LifecycleDetail.AutoScalingGroupName), - LifecycleHookName: aws.String(LifecycleDetail.LifecycleHookName), - LifecycleActionToken: aws.String(LifecycleDetail.LifecycleActionToken), - InstanceId: aws.String(LifecycleDetail.EC2InstanceID), + AutoScalingGroupName: aws.String(lifecycleDetail.AutoScalingGroupName), + LifecycleHookName: aws.String(lifecycleDetail.LifecycleHookName), + LifecycleActionToken: aws.String(lifecycleDetail.LifecycleActionToken), + InstanceId: aws.String(lifecycleDetail.EC2InstanceID), } - log.Info().Str("asgName", LifecycleDetail.AutoScalingGroupName). - Str("lifecycleHookName", LifecycleDetail.LifecycleHookName). - Str("lifecycleActionToken", LifecycleDetail.LifecycleActionToken). - Str("instanceID", LifecycleDetail.EC2InstanceID). + log.Info().Str("asgName", lifecycleDetail.AutoScalingGroupName). + Str("lifecycleHookName", lifecycleDetail.LifecycleHookName). + Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken). + Str("instanceID", lifecycleDetail.EC2InstanceID). 
Msg("Sending lifecycle heartbeat") _, err := m.ASG.RecordLifecycleActionHeartbeat(input) @@ -173,7 +205,7 @@ func (m SQSMonitor) deleteMessage(message *sqs.Message) error { return nil } -// Continues the lifecycle hook thereby indicating a successful action occured +// Continues the lifecycle hook thereby indicating a successful action occurred func (m SQSMonitor) continueLifecycleAction(lifecycleDetail *LifecycleDetail) (*autoscaling.CompleteLifecycleActionOutput, error) { return m.completeLifecycleAction(&autoscaling.CompleteLifecycleActionInput{ AutoScalingGroupName: &lifecycleDetail.AutoScalingGroupName, From d3047a06229f498886385038a6d727f9df910f1e Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Mon, 6 Jan 2025 19:10:01 -0800 Subject: [PATCH 08/29] Add error handling for using heartbeat and imds together --- pkg/config/config.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 8381f80e..67e09b33 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -288,11 +288,17 @@ func ParseCliArgs() (config Config, err error) { if config.HeartbeatUntil != -1 && (config.HeartbeatUntil < 60 || config.HeartbeatUntil > 172800) { return config, fmt.Errorf("invalid heartbeat-until passed: %d Should be between 60 and 172800 seconds", config.HeartbeatUntil) } - if config.HeartbeatInterval != -1 && config.HeartbeatUntil == -1 { - config.HeartbeatUntil = 172800 - log.Info().Msgf("Since heartbeat-until is not set, defaulting to %d seconds", config.HeartbeatUntil) - } else if config.HeartbeatInterval == -1 && config.HeartbeatUntil != -1 { - return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval must be set to activate heartbeat") + if config.EnableSQSTerminationDraining { + if config.HeartbeatInterval != -1 && config.HeartbeatUntil == -1 { + config.HeartbeatUntil = 172800 + log.Info().Msgf("Since heartbeat-until is not set, defaulting to %d seconds", config.HeartbeatUntil) + } else if config.HeartbeatInterval == -1 && config.HeartbeatUntil != -1 { + return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval must be set to activate heartbeat") + } + } else { + if config.HeartbeatInterval != -1 || config.HeartbeatUntil != -1 { + return config, fmt.Errorf("heartbeat is only supported for Queue Processor mode") + } } // client-go expects these to be set in env vars From 559adc34ec20721b2508e70740ea878582ee0ac9 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Tue, 7 Jan 2025 12:56:16 -0800 Subject: [PATCH 09/29] fix config error message --- pkg/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 67e09b33..350467cb 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -297,7 +297,7 @@ func ParseCliArgs() (config Config, err error) { } } else { if config.HeartbeatInterval != -1 || config.HeartbeatUntil != -1 { - return config, fmt.Errorf("heartbeat is only supported for Queue Processor mode") + return config, fmt.Errorf("currently using IMDS mode. 
Heartbeat is only supported for Queue Processor mode") } } From 7012babaa04597fbefe0661ac6d785510665066a Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Tue, 7 Jan 2025 13:37:00 -0800 Subject: [PATCH 10/29] update error message for heartbeat config --- pkg/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 350467cb..7b85ae98 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -293,7 +293,7 @@ func ParseCliArgs() (config Config, err error) { config.HeartbeatUntil = 172800 log.Info().Msgf("Since heartbeat-until is not set, defaulting to %d seconds", config.HeartbeatUntil) } else if config.HeartbeatInterval == -1 && config.HeartbeatUntil != -1 { - return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval must be set to activate heartbeat") + return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval is required when heartbeat-until is set") } } else { if config.HeartbeatInterval != -1 || config.HeartbeatUntil != -1 { From bc79eb7dcb926c720930ee53c8f1adfc4ac3585b Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Tue, 7 Jan 2025 19:15:33 -0800 Subject: [PATCH 11/29] Fix heartbeat flag explanation --- pkg/config/config.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 7b85ae98..e87066cb 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -235,8 +235,8 @@ func ParseCliArgs() (config Config, err error) { flag.IntVar(&config.CompleteLifecycleActionDelaySeconds, "complete-lifecycle-action-delay-seconds", getIntEnv(completeLifecycleActionDelaySecondsKey, -1), "Delay completing the Autoscaling lifecycle action after a node has been drained.") flag.BoolVar(&config.DeleteSqsMsgIfNodeNotFound, "delete-sqs-msg-if-node-not-found", getBoolEnv(deleteSqsMsgIfNodeNotFoundKey, false), "If true, delete SQS Messages from the SQS Queue if the targeted node(s) are not found.") flag.BoolVar(&config.UseAPIServerCacheToListPods, "use-apiserver-cache", getBoolEnv(useAPIServerCache, false), "If true, leverage the k8s apiserver's index on pod's spec.nodeName to list pods on a node, instead of doing an etcd quorum read.") - flag.IntVar(&config.HeartbeatInterval, "heartbeat-interval", getIntEnv(heartbeatIntervalKey, -1), "The time period between consecutive heartbeat signals. It is measured in seconds. Minimum 30 seconds and maximum 3600 seconds.") - flag.IntVar(&config.HeartbeatUntil, "heartbeat-until", getIntEnv(heartbeatUntilKey, -1), "The length of time over which the heartbeat signals are sent out. It is measured in seconds. Minimum 60 seconds and maximum 172800 seconds.") + flag.IntVar(&config.HeartbeatInterval, "heartbeat-interval", getIntEnv(heartbeatIntervalKey, -1), "The time period in seconds between consecutive heartbeat signals. Valid range: 30-3600 seconds (30 seconds to 1 hour).") + flag.IntVar(&config.HeartbeatUntil, "heartbeat-until", getIntEnv(heartbeatUntilKey, -1), "The duration in seconds over which heartbeat signals are sent. 
Valid range: 60-172800 seconds (1 minute to 48 hours).") flag.Parse() if isConfigProvided("pod-termination-grace-period", podTerminationGracePeriodConfigKey) && isConfigProvided("grace-period", gracePeriodConfigKey) { @@ -282,13 +282,13 @@ func ParseCliArgs() (config Config, err error) { } // heartbeat value boundary and compability check - if config.HeartbeatInterval != -1 && (config.HeartbeatInterval < 30 || config.HeartbeatInterval > 3600) { - return config, fmt.Errorf("invalid heartbeat-interval passed: %d Should be between 30 and 3600 seconds", config.HeartbeatInterval) - } - if config.HeartbeatUntil != -1 && (config.HeartbeatUntil < 60 || config.HeartbeatUntil > 172800) { - return config, fmt.Errorf("invalid heartbeat-until passed: %d Should be between 60 and 172800 seconds", config.HeartbeatUntil) - } if config.EnableSQSTerminationDraining { + if config.HeartbeatInterval != -1 && (config.HeartbeatInterval < 30 || config.HeartbeatInterval > 3600) { + return config, fmt.Errorf("invalid heartbeat-interval passed: %d Should be between 30 and 3600 seconds", config.HeartbeatInterval) + } + if config.HeartbeatUntil != -1 && (config.HeartbeatUntil < 60 || config.HeartbeatUntil > 172800) { + return config, fmt.Errorf("invalid heartbeat-until passed: %d Should be between 60 and 172800 seconds", config.HeartbeatUntil) + } if config.HeartbeatInterval != -1 && config.HeartbeatUntil == -1 { config.HeartbeatUntil = 172800 log.Info().Msgf("Since heartbeat-until is not set, defaulting to %d seconds", config.HeartbeatUntil) From 75400a9712911a192364498fcd176737c6dbc1a6 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Tue, 7 Jan 2025 19:16:13 -0800 Subject: [PATCH 12/29] Update readme for new heartbeat feature --- README.md | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/README.md b/README.md index 9ba2c0ca..d7e6c035 100644 --- a/README.md +++ b/README.md @@ -612,6 +612,72 @@ extraScrapeConfigs: | In IMDS mode, metrics can be collected as follows: - Use a `podMonitor` custom resource with the Prometheus Operator to collect metrics. +## Issuing Lifecycle Heartbeats + +You can set NTH to send heartbeats to ASG in Queue Processor mode. This allows for a much longer grace period (up to 48 hours) for termination than the maximum heartbeat timeout of two hours. + +### How it works + +- When NTH receives an ASG lifecycle termination event, it starts sending heartbeats to ASG to renew the heartbeat timeout associated with the ASG's termination lifecycle hook. +- The heartbeat timeout acts as a timer that starts when the termination event begins. +- Before the timeout reaches zero, the termination process is halted at the `Terminating:Wait` stage. +- Previously, NTH couldn't issue heartbeats, limiting the maximum time for preventing termination to the maximum heartbeat timeout (7200 seconds). +- Now, the graceful termination duration can be extended up to 48 hours, limited by the global timeout. + +### How to use + +- Specify values for `Heartbeat Interval` (required) and `Heartbeat Until` (optional). 
+
+### Configurations
+#### Heartbeat Interval
+- Time period between consecutive heartbeat signals (in seconds)
+- Range: 30 to 3600 seconds (30 seconds to 1 hour)
+- Flag for custom resource definition by *.yaml / helm: `heartbeatInterval`
+- CLI flag: `heartbeat-interval`
+
+#### Heartbeat Until
+- Duration over which heartbeat signals are sent (in seconds)
+- Range: 60 to 172800 seconds (1 minute to 48 hours)
+- Flag for custom resource definition by *.yaml / helm: `heartbeatUntil`
+- CLI flag: `heartbeat-Until`
+
+#### Example Case
+
+- Heartbeat Interval: 1000 seconds
+- Heartbeat Until: 4500 seconds
+- Heartbeat Timeout: 3000 seconds
+
+| Time (s) | Event | Heartbeat Timeout (HT) | Heartbeat Until (HU) | Action |
+|----------|-------------|------------------|----------------------|--------|
+| 0 | Start | 3000 | 4500 | Termination Event Received |
+| 1000 | HB1 Issued | 2000 -> 3000 | 3500 | Send Heartbeat |
+| 2000 | HB2 Issued | 2000 -> 3000 | 2500 | Send Heartbeat |
+| 3000 | HB3 Issued | 2000 -> 3000 | 1500 | Send Heartbeat |
+| 4000 | HB4 Issued | 2000 -> 3000 | 500 | Send Heartbeat |
+| 4500 | HU Expires | 2500 | 0 | Stop Heartbeats |
+| 7000 | Termination | - | - | Instance Terminates |
+
+Note: The instance can terminate earlier if its pods finish draining and are ready for termination.
+
+### Example Helm Command
+
+```sh
+helm upgrade --install aws-node-termination-handler \
+  --namespace kube-system \
+  --set enableSqsTerminationDraining=true \
+  --set heartbeatInterval=1000 \
+  --set heartbeatUntil=4500 \
+  # other inputs...
+```
+
+### Important Notes
+
+- A lifecycle hook for instance termination is required for this feature. Longer grace periods are achieved by renewing the heartbeat timeout of the ASG's lifecycle hook. Instances terminate instantly without a hook.
+
+- Issuing lifecycle heartbeats is only supported in Queue Processor mode. Setting `enableSqsTerminationDraining=false` and specifying heartbeat flags is prevented in Helm. Directly editing deployment settings to do this will cause NTH to fail.
+
+- The heartbeat interval should be sufficiently smaller than the heartbeat timeout. There's a time gap between instance start and NTH start. Setting the interval just slightly smaller than or equal to the timeout causes the heartbeat timeout to expire before the heartbeat is issued. Provide enough buffer for NTH to finish initializing.
+
 ## Communication
 * If you've run into a bug or have a new feature request, please open an [issue](https://github.com/aws/aws-node-termination-handler/issues/new).
 * You can also chat with us in the [Kubernetes Slack](https://kubernetes.slack.com) in the `#provider-aws` channel

From bbddcfa58d7ea8604117b76c7cf0168439e1ab55 Mon Sep 17 00:00:00 2001
From: Heeyoung Jung
Date: Tue, 7 Jan 2025 19:20:35 -0800
Subject: [PATCH 13/29] Fix readme for heartbeat section

---
 README.md | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index d7e6c035..f558c8b6 100644
--- a/README.md
+++ b/README.md
@@ -629,23 +629,23 @@ You can set NTH to send heartbeats to ASG in Queue Processor mode. This allows f
 - Specify values for `Heartbeat Interval` (required) and `Heartbeat Until` (optional).
### Configurations -#### Heartbeat Interval +#### `Heartbeat Interval` - Time period between consecutive heartbeat signals (in seconds) - Range: 30 to 3600 seconds (30 seconds to 1 hour) - Flag for custom resource definition by *.yaml / helm: `heartbeatInterval` - CLI flag: `heartbeat-interval` -#### Heartbeat Until +#### `Heartbeat Until` - Duration over which heartbeat signals are sent (in seconds) - Range: 60 to 172800 seconds (1 minute to 48 hours) - Flag for custom resource definition by *.yaml / helm: `heartbeatUntil` -- CLI flag: `heartbeat-Until` +- CLI flag: `heartbeat-until` #### Example Case -- Heartbeat Interval: 1000 seconds -- Heartbeat Until: 4500 seconds -- Heartbeat Timeout: 3000 seconds +- `Heartbeat Interval`: 1000 seconds +- `Heartbeat Until`: 4500 seconds +- `Heartbeat Timeout`: 3000 seconds | Time (s) | Event | Heartbeat Timeout (HT) | Heartbeat Until (HU) | Action | |----------|-------------|------------------|----------------------|--------| @@ -673,9 +673,7 @@ helm upgrade --install aws-node-termination-handler \ ### Important Notes - A lifecycle hook for instance termination is required for this feature. Longer grace periods are achieved by renewing the heartbeat timeout of the ASG's lifecycle hook. Instances terminate instantly without a hook. - - Issuing lifecycle heartbeats is only supported in Queue Processor mode. Setting `enableSqsTerminationDraining=false` and specifying heartbeat flags is prevented in Helm. Directly editing deployment settings to do this will cause NTH to fail. - - The heartbeat interval should be sufficiently smaller than the heartbeat timeout. There's a time gap between instance start and NTH start. Setting the interval just slightly smaller than or equal to the timeout causes the heartbeat timeout to expire before the heartbeat is issued. Provide enough buffer for NTH to finish initializing. ## Communication From 029fdf7a11d4107153ec6fca7db92d4536cad1d4 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Thu, 9 Jan 2025 16:20:16 -0800 Subject: [PATCH 14/29] Update readme on the concurrency of heartbeat --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index f558c8b6..9302521b 100644 --- a/README.md +++ b/README.md @@ -675,6 +675,7 @@ helm upgrade --install aws-node-termination-handler \ - A lifecycle hook for instance termination is required for this feature. Longer grace periods are achieved by renewing the heartbeat timeout of the ASG's lifecycle hook. Instances terminate instantly without a hook. - Issuing lifecycle heartbeats is only supported in Queue Processor mode. Setting `enableSqsTerminationDraining=false` and specifying heartbeat flags is prevented in Helm. Directly editing deployment settings to do this will cause NTH to fail. - The heartbeat interval should be sufficiently smaller than the heartbeat timeout. There's a time gap between instance start and NTH start. Setting the interval just slightly smaller than or equal to the timeout causes the heartbeat timeout to expire before the heartbeat is issued. Provide enough buffer for NTH to finish initializing. +- Issuing heartbeats is part of the termination process. The maximum number of instances that NTH can handle termination concurrently is limited by the number of workers. This implies that heartbeats can only be issued for up to the number of instances specified by the `workers` flag simultaneously. 
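+
+For illustration, a minimal sketch of a Queue Processor install that raises the worker count so heartbeats can be issued for more nodes concurrently (the values below are placeholders, not recommendations):
+
+```sh
+helm upgrade --install aws-node-termination-handler \
+  --namespace kube-system \
+  --set enableSqsTerminationDraining=true \
+  --set heartbeatInterval=300 \
+  --set heartbeatUntil=7200 \
+  --set workers=20
+```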
## Communication * If you've run into a bug or have a new feature request, please open an [issue](https://github.com/aws/aws-node-termination-handler/issues/new). From 56b3f5549434347f82ae1dc7cec644d3c9db0c6f Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Thu, 9 Jan 2025 17:40:42 -0800 Subject: [PATCH 15/29] fix: stop heartbeat when target is invalid --- pkg/monitor/sqsevent/asg-lifecycle-event.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/monitor/sqsevent/asg-lifecycle-event.go b/pkg/monitor/sqsevent/asg-lifecycle-event.go index 109fe107..74d19a84 100644 --- a/pkg/monitor/sqsevent/asg-lifecycle-event.go +++ b/pkg/monitor/sqsevent/asg-lifecycle-event.go @@ -15,12 +15,14 @@ package sqsevent import ( "encoding/json" + "errors" "fmt" "time" "github.com/aws/aws-node-termination-handler/pkg/monitor" "github.com/aws/aws-node-termination-handler/pkg/node" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/aws/aws-sdk-go/service/sqs" "github.com/rs/zerolog/log" @@ -166,7 +168,11 @@ func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, li case <-stopCh: return case <-ticker.C: - m.recordLifecycleActionHeartbeat(lifecycleDetail) + err := m.recordLifecycleActionHeartbeat(lifecycleDetail) + if err != nil { + log.Err(err).Msg("invalid heartbeat target, stopping heartbeat") + return + } case <-timeout: log.Info().Msg("Heartbeat deadline exceeded, stopping heartbeat") return @@ -174,7 +180,7 @@ func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, li } } -func (m SQSMonitor) recordLifecycleActionHeartbeat(lifecycleDetail *LifecycleDetail) { +func (m SQSMonitor) recordLifecycleActionHeartbeat(lifecycleDetail *LifecycleDetail) error { input := &autoscaling.RecordLifecycleActionHeartbeatInput{ AutoScalingGroupName: aws.String(lifecycleDetail.AutoScalingGroupName), LifecycleHookName: aws.String(lifecycleDetail.LifecycleHookName), @@ -188,13 +194,19 @@ func (m SQSMonitor) recordLifecycleActionHeartbeat(lifecycleDetail *LifecycleDet Str("instanceID", lifecycleDetail.EC2InstanceID). 
Msg("Sending lifecycle heartbeat") + // Stop the heartbeat if the target is invalid _, err := m.ASG.RecordLifecycleActionHeartbeat(input) if err != nil { - log.Err(err).Msg("Failed to send lifecycle heartbeat") - return + var awsErr awserr.Error + log.Warn().Err(err).Msg("Failed to send lifecycle heartbeat") + if errors.As(err, &awsErr) && awsErr.Code() == "ValidationError" { + return err + } + return nil } log.Info().Msg("Successfully sent lifecycle heartbeat") + return nil } func (m SQSMonitor) deleteMessage(message *sqs.Message) error { From 7221ed2c83c33b2a0a6fccb994b8e231ba899f78 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Thu, 9 Jan 2025 17:59:54 -0800 Subject: [PATCH 16/29] Added heartbeat test for handling invalid lifecycle action --- pkg/monitor/sqsevent/sqs-monitor_test.go | 31 +++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/pkg/monitor/sqsevent/sqs-monitor_test.go b/pkg/monitor/sqsevent/sqs-monitor_test.go index 751749ca..ccc9a285 100644 --- a/pkg/monitor/sqsevent/sqs-monitor_test.go +++ b/pkg/monitor/sqsevent/sqs-monitor_test.go @@ -965,7 +965,7 @@ func TestSendHeartbeats_NormalClosure(t *testing.T) { h.Assert(t, h.HeartbeatCallCount == 5, "5 Heartbeat Expected, got %d", h.HeartbeatCallCount) } -func TestSendHeartbeats_ErrorASG(t *testing.T) { +func TestSendHeartbeats_ErrThrottlingASG(t *testing.T) { asgMock := h.MockedASG{ RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, RecordLifecycleActionHeartbeatErr: awserr.NewRequestFailure(aws.ErrThrottling, 400, "bad-request"), @@ -994,6 +994,35 @@ func TestSendHeartbeats_ErrorASG(t *testing.T) { h.Assert(t, h.HeartbeatCallCount == 6, "6 Heartbeat Expected, got %d", h.HeartbeatCallCount) } +func TestSendHeartbeats_ErrInvalidTarget(t *testing.T) { + asgMock := h.MockedASG{ + RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, + RecordLifecycleActionHeartbeatErr: awserr.NewRequestFailure(aws.ErrValidation, 400, "bad-request"), + } + + sqsMonitor := sqsevent.SQSMonitor{ + ASG: asgMock, + } + + lifecycleDetail := sqsevent.LifecycleDetail{ + LifecycleHookName: "test-hook", + AutoScalingGroupName: "test-asg", + LifecycleActionToken: "test-token", + EC2InstanceID: "XXXXXXXXXXXX", + } + + stopHeartbeatCh := make(chan struct{}) + + go func() { + time.Sleep(10 * time.Second) + close(stopHeartbeatCh) + }() + + sqsMonitor.SendHeartbeats(1, 6, &lifecycleDetail, stopHeartbeatCh) + + h.Assert(t, h.HeartbeatCallCount == 1, "1 Heartbeat Expected, got %d", h.HeartbeatCallCount) +} + // AWS Mock Helpers specific to sqs-monitor tests func getDescribeInstancesResp(privateDNSName string, withASGTag bool, withManagedTag bool) ec2.DescribeInstancesOutput { From 4bcb9163734afff6b4f9b4b0eb88e4fcdf49b59b Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Fri, 10 Jan 2025 11:28:05 -0800 Subject: [PATCH 17/29] incorporated unsupoorted error types for unit testing --- pkg/monitor/sqsevent/sqs-monitor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/monitor/sqsevent/sqs-monitor_test.go b/pkg/monitor/sqsevent/sqs-monitor_test.go index ccc9a285..90dbf406 100644 --- a/pkg/monitor/sqsevent/sqs-monitor_test.go +++ b/pkg/monitor/sqsevent/sqs-monitor_test.go @@ -968,7 +968,7 @@ func TestSendHeartbeats_NormalClosure(t *testing.T) { func TestSendHeartbeats_ErrThrottlingASG(t *testing.T) { asgMock := h.MockedASG{ RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, - 
RecordLifecycleActionHeartbeatErr: awserr.NewRequestFailure(aws.ErrThrottling, 400, "bad-request"), + RecordLifecycleActionHeartbeatErr: awserr.New("ThrottlingException", "Rate exceeded", nil), } sqsMonitor := sqsevent.SQSMonitor{ @@ -997,7 +997,7 @@ func TestSendHeartbeats_ErrThrottlingASG(t *testing.T) { func TestSendHeartbeats_ErrInvalidTarget(t *testing.T) { asgMock := h.MockedASG{ RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, - RecordLifecycleActionHeartbeatErr: awserr.NewRequestFailure(aws.ErrValidation, 400, "bad-request"), + RecordLifecycleActionHeartbeatErr: awserr.New("ValidationError", "No active Lifecycle Action found", nil), } sqsMonitor := sqsevent.SQSMonitor{ From 4ff40d9aa4ea7b2445a2a28a6eb5eb653d74b2e0 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Fri, 10 Jan 2025 11:38:37 -0800 Subject: [PATCH 18/29] fix unit-test: reset heartbeatCallCount each test --- pkg/monitor/sqsevent/sqs-monitor_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/monitor/sqsevent/sqs-monitor_test.go b/pkg/monitor/sqsevent/sqs-monitor_test.go index 90dbf406..28d337d0 100644 --- a/pkg/monitor/sqsevent/sqs-monitor_test.go +++ b/pkg/monitor/sqsevent/sqs-monitor_test.go @@ -908,6 +908,8 @@ func TestMonitor_InstanceNotManaged(t *testing.T) { } func TestSendHeartbeats_EarlyClosure(t *testing.T) { + h.HeartbeatCallCount = 0 + asgMock := h.MockedASG{ RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, RecordLifecycleActionHeartbeatErr: nil, @@ -937,6 +939,8 @@ func TestSendHeartbeats_EarlyClosure(t *testing.T) { } func TestSendHeartbeats_NormalClosure(t *testing.T) { + h.HeartbeatCallCount = 0 + asgMock := h.MockedASG{ RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, RecordLifecycleActionHeartbeatErr: nil, @@ -966,6 +970,8 @@ func TestSendHeartbeats_NormalClosure(t *testing.T) { } func TestSendHeartbeats_ErrThrottlingASG(t *testing.T) { + h.HeartbeatCallCount = 0 + asgMock := h.MockedASG{ RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, RecordLifecycleActionHeartbeatErr: awserr.New("ThrottlingException", "Rate exceeded", nil), @@ -995,6 +1001,8 @@ func TestSendHeartbeats_ErrThrottlingASG(t *testing.T) { } func TestSendHeartbeats_ErrInvalidTarget(t *testing.T) { + h.HeartbeatCallCount = 0 + asgMock := h.MockedASG{ RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, RecordLifecycleActionHeartbeatErr: awserr.New("ValidationError", "No active Lifecycle Action found", nil), From 265828df10716ee4619424fcfae1bb4b131034ab Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Fri, 17 Jan 2025 13:13:28 -0800 Subject: [PATCH 19/29] use helper function to reduce repetitive code in heartbeat unit test --- pkg/monitor/sqsevent/sqs-monitor_test.go | 140 +++++++++-------------- pkg/test/aws-mocks.go | 4 + 2 files changed, 59 insertions(+), 85 deletions(-) diff --git a/pkg/monitor/sqsevent/sqs-monitor_test.go b/pkg/monitor/sqsevent/sqs-monitor_test.go index 28d337d0..def86e4a 100644 --- a/pkg/monitor/sqsevent/sqs-monitor_test.go +++ b/pkg/monitor/sqsevent/sqs-monitor_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/aws/aws-node-termination-handler/pkg/config" "github.com/aws/aws-node-termination-handler/pkg/monitor" "github.com/aws/aws-node-termination-handler/pkg/monitor/sqsevent" "github.com/aws/aws-node-termination-handler/pkg/node" @@ -907,132 +908,101 @@ func TestMonitor_InstanceNotManaged(t 
*testing.T) { } } -func TestSendHeartbeats_EarlyClosure(t *testing.T) { - h.HeartbeatCallCount = 0 +func TestHeartbeat_closeHeartbeatCh(t *testing.T) { + msg, err := getSQSMessageFromEvent(asgLifecycleEvent) + h.Ok(t, err) + sqsMock := h.MockedSQS{ + ReceiveMessageResp: sqs.ReceiveMessageOutput{Messages: []*sqs.Message{&msg}}, + ReceiveMessageErr: nil, + DeleteMessageResp: sqs.DeleteMessageOutput{}, + } + dnsNodeName := "ip-10-0-0-157.us-east-2.compute.internal" + ec2Mock := h.MockedEC2{ + DescribeInstancesResp: getDescribeInstancesResp(dnsNodeName, true, true), + } asgMock := h.MockedASG{ - RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, - RecordLifecycleActionHeartbeatErr: nil, + CompleteLifecycleActionResp: autoscaling.CompleteLifecycleActionOutput{}, } + drainChan := make(chan monitor.InterruptionEvent, 1) sqsMonitor := sqsevent.SQSMonitor{ - ASG: asgMock, + SQS: sqsMock, + EC2: ec2Mock, + ManagedTag: "aws-node-termination-handler/managed", + ASG: asgMock, + CheckIfManaged: true, + QueueURL: "https://test-queue", + InterruptionChan: drainChan, + BeforeCompleteLifecycleAction: func() { + time.Sleep(3500 * time.Millisecond) + }, } - lifecycleDetail := sqsevent.LifecycleDetail{ - LifecycleHookName: "test-hook", - AutoScalingGroupName: "test-asg", - LifecycleActionToken: "test-token", - EC2InstanceID: "XXXXXXXXXXXX", - } + err = sqsMonitor.Monitor() + h.Ok(t, err) - stopHeartbeatCh := make(chan struct{}) + nthConfig := &config.Config{ + HeartbeatInterval: 1, + HeartbeatUntil: 5, + } + h.HeartbeatCallCount = 0 - go func() { - time.Sleep(3500 * time.Millisecond) - close(stopHeartbeatCh) - }() + testNode, _ := node.New(*nthConfig, nil) - sqsMonitor.SendHeartbeats(1, 5, &lifecycleDetail, stopHeartbeatCh) + result := <-drainChan + h.Assert(t, result.PreDrainTask != nil, "PreDrainTask should have been set") + err = result.PreDrainTask(result, *testNode) + h.Ok(t, err) + h.Assert(t, result.PostDrainTask != nil, "PostDrainTask should have been set") + err = result.PostDrainTask(result, *testNode) + h.Ok(t, err) + h.Assert(t, h.HeartbeatCallCount == 3, "3 Heartbeat Expected, got %d", h.HeartbeatCallCount) +} +func TestSendHeartbeats_EarlyClosure(t *testing.T) { + heartbeatTestHelper(nil, 3500, 1, 5) h.Assert(t, h.HeartbeatCallCount == 3, "3 Heartbeat Expected, got %d", h.HeartbeatCallCount) } func TestSendHeartbeats_NormalClosure(t *testing.T) { - h.HeartbeatCallCount = 0 - - asgMock := h.MockedASG{ - RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, - RecordLifecycleActionHeartbeatErr: nil, - } - - sqsMonitor := sqsevent.SQSMonitor{ - ASG: asgMock, - } - - lifecycleDetail := sqsevent.LifecycleDetail{ - LifecycleHookName: "test-hook", - AutoScalingGroupName: "test-asg", - LifecycleActionToken: "test-token", - EC2InstanceID: "XXXXXXXXXXXX", - } - - stopHeartbeatCh := make(chan struct{}) - - go func() { - time.Sleep(10 * time.Second) - close(stopHeartbeatCh) - }() - - sqsMonitor.SendHeartbeats(1, 5, &lifecycleDetail, stopHeartbeatCh) - + heartbeatTestHelper(nil, 10000, 1, 5) h.Assert(t, h.HeartbeatCallCount == 5, "5 Heartbeat Expected, got %d", h.HeartbeatCallCount) } func TestSendHeartbeats_ErrThrottlingASG(t *testing.T) { - h.HeartbeatCallCount = 0 - - asgMock := h.MockedASG{ - RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, - RecordLifecycleActionHeartbeatErr: awserr.New("ThrottlingException", "Rate exceeded", nil), - } - - sqsMonitor := sqsevent.SQSMonitor{ - ASG: asgMock, - } - - 
lifecycleDetail := sqsevent.LifecycleDetail{ - LifecycleHookName: "test-hook", - AutoScalingGroupName: "test-asg", - LifecycleActionToken: "test-token", - EC2InstanceID: "XXXXXXXXXXXX", - } - - stopHeartbeatCh := make(chan struct{}) - - go func() { - time.Sleep(10 * time.Second) - close(stopHeartbeatCh) - }() - - sqsMonitor.SendHeartbeats(1, 6, &lifecycleDetail, stopHeartbeatCh) - + heartbeatTestHelper(awserr.New("ThrottlingException", "Rate exceeded", nil), 10000, 1, 6) h.Assert(t, h.HeartbeatCallCount == 6, "6 Heartbeat Expected, got %d", h.HeartbeatCallCount) } func TestSendHeartbeats_ErrInvalidTarget(t *testing.T) { + heartbeatTestHelper(awserr.New("ValidationError", "No active Lifecycle Action found", nil), 10000, 1, 6) + h.Assert(t, h.HeartbeatCallCount == 1, "1 Heartbeat Expected, got %d", h.HeartbeatCallCount) +} + +// AWS Mock Helpers specific to sqs-monitor tests +func heartbeatTestHelper(err error, sleepMilliSeconds int, heartbeatInterval int, heartbeatUntil int) { h.HeartbeatCallCount = 0 asgMock := h.MockedASG{ RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, - RecordLifecycleActionHeartbeatErr: awserr.New("ValidationError", "No active Lifecycle Action found", nil), + RecordLifecycleActionHeartbeatErr: err, } sqsMonitor := sqsevent.SQSMonitor{ ASG: asgMock, } - lifecycleDetail := sqsevent.LifecycleDetail{ - LifecycleHookName: "test-hook", - AutoScalingGroupName: "test-asg", - LifecycleActionToken: "test-token", - EC2InstanceID: "XXXXXXXXXXXX", - } - stopHeartbeatCh := make(chan struct{}) go func() { - time.Sleep(10 * time.Second) + time.Sleep(time.Duration(sleepMilliSeconds) * time.Millisecond) close(stopHeartbeatCh) }() - sqsMonitor.SendHeartbeats(1, 6, &lifecycleDetail, stopHeartbeatCh) - - h.Assert(t, h.HeartbeatCallCount == 1, "1 Heartbeat Expected, got %d", h.HeartbeatCallCount) + sqsMonitor.SendHeartbeats(heartbeatInterval, heartbeatUntil, &asgLifecycleEventFromSQS, stopHeartbeatCh) } -// AWS Mock Helpers specific to sqs-monitor tests - func getDescribeInstancesResp(privateDNSName string, withASGTag bool, withManagedTag bool) ec2.DescribeInstancesOutput { tags := []*ec2.Tag{} if withASGTag { diff --git a/pkg/test/aws-mocks.go b/pkg/test/aws-mocks.go index b00ae365..0697c6ad 100644 --- a/pkg/test/aws-mocks.go +++ b/pkg/test/aws-mocks.go @@ -94,3 +94,7 @@ func (m MockedASG) RecordLifecycleActionHeartbeat(input *autoscaling.RecordLifec } return &m.RecordLifecycleActionHeartbeatResp, nil } + +func (m MockedASG) DescribeLifecycleHooks(input *autoscaling.DescribeLifecycleHooksInput) (*autoscaling.DescribeLifecycleHooksOutput, error) { + return &autoscaling.DescribeLifecycleHooksOutput{}, nil +} From 044fc3a87653d9cf9fda56712d23f729c7fa33f3 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Wed, 22 Jan 2025 14:25:23 -0800 Subject: [PATCH 20/29] Update readme. Moved heartbeat under Queue Processor --- README.md | 141 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 74 insertions(+), 67 deletions(-) diff --git a/README.md b/README.md index 3bf089f5..b60a6dfc 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,78 @@ When using the ASG Lifecycle Hooks, ASG first sends the lifecycle action notific #### Queue Processor with Instance State Change Events When using the EC2 Console or EC2 API to terminate the instance, a state-change notification is sent and the instance termination is started. EC2 does not wait for a "continue" signal before beginning to terminate the instance. 
When you terminate an EC2 instance, it should trigger a graceful operating system shutdown, which sends a SIGTERM to the kubelet; the kubelet in turn starts shutting down pods by propagating that SIGTERM to the containers on the node. If the containers do not shut down within the kubelet's `podTerminationGracePeriod` (k8s default is 30s), it sends a SIGKILL to forcefully terminate them. Setting `podTerminationGracePeriod` to a maximum of 90 seconds (in practice a bit less than that) delays the termination of pods, which helps with a graceful shutdown.
 
+#### Issuing Lifecycle Heartbeats
+
+You can set NTH to send heartbeats to ASG in Queue Processor mode. This allows for a much longer grace period (up to 48 hours) for termination than the maximum heartbeat timeout of two hours.
+
+##### How it works
+
+- When NTH receives an ASG lifecycle termination event, it starts sending heartbeats to ASG to renew the heartbeat timeout associated with the ASG's termination lifecycle hook.
+- The heartbeat timeout acts as a timer that starts when the termination event begins.
+- Before the timeout reaches zero, the termination process is halted at the `Terminating:Wait` stage.
+- Previously, NTH couldn't issue heartbeats, limiting the maximum time for preventing termination to the maximum heartbeat timeout (7200 seconds).
+- Now, the graceful termination duration can be extended up to 48 hours, limited by the global timeout.
+
+##### How to use
+
+- Configure a termination lifecycle hook on the ASG (required). Set the heartbeat timeout value to be longer than the `Heartbeat Interval`. Each heartbeat signal resets this timeout, extending the duration that an instance remains in the `Terminating:Wait` state. Without this lifecycle hook, the instance will terminate immediately when a termination event occurs (see the sketch below).
+- Configure `Heartbeat Interval` (required) and `Heartbeat Until` (optional). NTH operates normally without heartbeats if neither value is set. If only the interval is specified, `Heartbeat Until` defaults to 172800 seconds (48 hours) and heartbeats will be sent. Providing both values enables NTH to run with heartbeats. `Heartbeat Until` must be provided with a valid `Heartbeat Interval`, otherwise NTH will fail to start. Any invalid values (wrong type or out of range) will also prevent NTH from starting.
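+
+The hook can be created via the console, the AWS CLI, or any SDK. As a rough illustration, here is a minimal aws-sdk-go sketch of such a termination hook; the ASG name, hook name, and 3000-second timeout are placeholder values for illustration, not NTH defaults:
+
+```go
+package main
+
+import (
+	"log"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/autoscaling"
+)
+
+func main() {
+	svc := autoscaling.New(session.Must(session.NewSession()))
+	// Register a termination lifecycle hook. Every heartbeat NTH sends resets
+	// the HeartbeatTimeout timer while the instance is in Terminating:Wait.
+	_, err := svc.PutLifecycleHook(&autoscaling.PutLifecycleHookInput{
+		AutoScalingGroupName: aws.String("my-asg"),              // placeholder
+		LifecycleHookName:    aws.String("my-termination-hook"), // placeholder
+		LifecycleTransition:  aws.String("autoscaling:EC2_INSTANCE_TERMINATING"),
+		HeartbeatTimeout:     aws.Int64(3000),
+		DefaultResult:        aws.String("CONTINUE"),
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+```
+
+`DefaultResult` controls what happens if the timeout expires without a completion signal.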
+
+##### Configurations
+###### `Heartbeat Interval`
+- Time period between consecutive heartbeat signals (in seconds)
+- Specifying this value triggers heartbeat
+- Range: 30 to 3600 seconds (30 seconds to 1 hour)
+- Flag for custom resource definition by *.yaml / helm: `heartbeatInterval`
+- CLI flag: `heartbeat-interval`
+- Required: O
+- Default value: X
+
+###### `Heartbeat Until`
+- Duration over which heartbeat signals are sent (in seconds)
+- Must be provided with a valid `Heartbeat Interval`
+- Range: 60 to 172800 seconds (1 minute to 48 hours)
+- Flag for custom resource definition by *.yaml / helm: `heartbeatUntil`
+- CLI flag: `heartbeat-until`
+- Required: X
+- Default value: 172800 (48 hours)
+
+###### Example Case
+
+- `Heartbeat Interval`: 1000 seconds
+- `Heartbeat Until`: 4500 seconds
+- `Heartbeat Timeout`: 3000 seconds
+
+| Time (s) | Event | Heartbeat Timeout (HT) | Heartbeat Until (HU) | Action |
+|----------|-------------|------------------|----------------------|--------|
+| 0 | Start | 3000 | 4500 | Termination Event Received |
+| 1000 | HB1 Issued | 2000 -> 3000 | 3500 | Send Heartbeat |
+| 2000 | HB2 Issued | 2000 -> 3000 | 2500 | Send Heartbeat |
+| 3000 | HB3 Issued | 2000 -> 3000 | 1500 | Send Heartbeat |
+| 4000 | HB4 Issued | 2000 -> 3000 | 500 | Send Heartbeat |
+| 4500 | HU Expires | 2500 | 0 | Stop Heartbeats |
+| 7000 | Termination | - | - | Instance Terminates |
+
+Note: The instance can terminate earlier if its pods finish draining and are ready for termination.
+
+##### Example Helm Command
+
+```sh
+helm upgrade --install aws-node-termination-handler \
+  --namespace kube-system \
+  --set enableSqsTerminationDraining=true \
+  --set heartbeatInterval=1000 \
+  --set heartbeatUntil=4500 \
+  # ...plus any other desired values
+```
+
+##### Important Notes
+
+- Be aware of the global timeout. Instances cannot remain in a wait state indefinitely: the global timeout is 48 hours or 100 times the heartbeat timeout, whichever is smaller. This is the maximum amount of time that you can keep an instance in the `Terminating:Wait` state.
+- Lifecycle heartbeats are only supported in Queue Processor mode. Setting `enableSqsTerminationDraining=false` and specifying heartbeat flags is prevented in Helm. Directly editing deployment settings to bypass this will cause NTH to fail.
+- The heartbeat interval should be sufficiently shorter than the heartbeat timeout. Because there is a time gap between instance startup and NTH initialization, setting the interval equal to or only slightly smaller than the timeout can cause the heartbeat timeout to expire before the first heartbeat is issued. Provide adequate buffer time for NTH to complete initialization.
+- Issuing heartbeats is part of the termination process, so the number of instances whose termination NTH can handle concurrently is limited by the number of workers. This means heartbeats can be issued simultaneously for at most the number of instances specified by the `workers` flag.
+
 ### Which one should I use?
| Feature | IMDS Processor | Queue Processor | | :-------------------------------------------: | :------------: | :-------------: | @@ -91,6 +163,7 @@ When using the EC2 Console or EC2 API to terminate the instance, a state-change | ASG Termination Lifecycle State Change | ✅ | ❌ | | AZ Rebalance Recommendation | ❌ | ✅ | | Instance State Change Events | ❌ | ✅ | +| Issue Lifecycle Heartbeats | ❌ | ✅ | ### Kubernetes Compatibility @@ -617,71 +690,6 @@ extraScrapeConfigs: | In IMDS mode, metrics can be collected as follows: - Use a `podMonitor` custom resource with the Prometheus Operator to collect metrics. -## Issuing Lifecycle Heartbeats - -You can set NTH to send heartbeats to ASG in Queue Processor mode. This allows for a much longer grace period (up to 48 hours) for termination than the maximum heartbeat timeout of two hours. - -### How it works - -- When NTH receives an ASG lifecycle termination event, it starts sending heartbeats to ASG to renew the heartbeat timeout associated with the ASG's termination lifecycle hook. -- The heartbeat timeout acts as a timer that starts when the termination event begins. -- Before the timeout reaches zero, the termination process is halted at the `Terminating:Wait` stage. -- Previously, NTH couldn't issue heartbeats, limiting the maximum time for preventing termination to the maximum heartbeat timeout (7200 seconds). -- Now, the graceful termination duration can be extended up to 48 hours, limited by the global timeout. - -### How to use - -- Specify values for `Heartbeat Interval` (required) and `Heartbeat Until` (optional). - -### Configurations -#### `Heartbeat Interval` -- Time period between consecutive heartbeat signals (in seconds) -- Range: 30 to 3600 seconds (30 seconds to 1 hour) -- Flag for custom resource definition by *.yaml / helm: `heartbeatInterval` -- CLI flag: `heartbeat-interval` - -#### `Heartbeat Until` -- Duration over which heartbeat signals are sent (in seconds) -- Range: 60 to 172800 seconds (1 minute to 48 hours) -- Flag for custom resource definition by *.yaml / helm: `heartbeatUntil` -- CLI flag: `heartbeat-until` - -#### Example Case - -- `Heartbeat Interval`: 1000 seconds -- `Heartbeat Until`: 4500 seconds -- `Heartbeat Timeout`: 3000 seconds - -| Time (s) | Event | Heartbeat Timeout (HT) | Heartbeat Until (HU) | Action | -|----------|-------------|------------------|----------------------|--------| -| 0 | Start | 3000 | 4500 | Termination Event Received | -| 1000 | HB1 Issued | 2000 -> 3000 | 3500 | Send Heartbeat | -| 2000 | HB2 Issued | 2000 -> 3000 | 2500 | Send Heartbeat | -| 3000 | HB3 Issued | 2000 -> 3000 | 1500 | Send Heartbeat | -| 4000 | HB4 Issued | 2000 -> 3000 | 500 | Send Heartbeat | -| 4500 | HB Expires | 2500 | 0 | Stop Heartbeats | -| 7000 | Termination | - | - | Instance Terminates | - -Note: The instance can terminate earlier if its pods finish draining and are ready for termination. - -### Example Helm Command - -```sh -helm upgrade --install aws-node-termination-handler \ - --namespace kube-system \ - --set enableSqsTerminationDraining=true \ - --set heartbeatInterval=1000 \ - --set heartbeatUntil=4500 \ - // other inputs.. -``` - -### Important Notes - -- A lifecycle hook for instance termination is required for this feature. Longer grace periods are achieved by renewing the heartbeat timeout of the ASG's lifecycle hook. Instances terminate instantly without a hook. -- Issuing lifecycle heartbeats is only supported in Queue Processor mode. 
Setting `enableSqsTerminationDraining=false` and specifying heartbeat flags is prevented in Helm. Directly editing deployment settings to do this will cause NTH to fail.
-- The heartbeat interval should be sufficiently smaller than the heartbeat timeout. There's a time gap between instance start and NTH start. Setting the interval just slightly smaller than or equal to the timeout causes the heartbeat timeout to expire before the heartbeat is issued. Provide enough buffer for NTH to finish initializing.
-- Issuing heartbeats is part of the termination process. The maximum number of instances that NTH can handle termination concurrently is limited by the number of workers. This implies that heartbeats can only be issued for up to the number of instances specified by the `workers` flag simultaneously.
-
 ## Communication
 * If you've run into a bug or have a new feature request, please open an [issue](https://github.com/aws/aws-node-termination-handler/issues/new).
 * You can also chat with us in the [Kubernetes Slack](https://kubernetes.slack.com) in the `#provider-aws` channel
@@ -691,5 +699,4 @@ helm upgrade --install aws-node-termination-handler \
 Contributions are welcome! Please read our [guidelines](https://github.com/aws/aws-node-termination-handler/blob/main/CONTRIBUTING.md) and our [Code of Conduct](https://github.com/aws/aws-node-termination-handler/blob/main/CODE_OF_CONDUCT.md)
 
 ## License
-This project is licensed under the Apache-2.0 License.
-
+This project is licensed under the Apache-2.0 License.
\ No newline at end of file

From 2732775d4cd859c33c5967178d6cd2c3108678f4 Mon Sep 17 00:00:00 2001
From: Heeyoung Jung
Date: Wed, 22 Jan 2025 14:26:32 -0800
Subject: [PATCH 21/29] Fix config.go for better readability and check until < interval

---
 pkg/config/config.go | 35 ++++++++++++++++++-----------------
 1 file changed, 18 insertions(+), 17 deletions(-)

diff --git a/pkg/config/config.go b/pkg/config/config.go
index e87066cb..6e926bf4 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -282,23 +282,24 @@ func ParseCliArgs() (config Config, err error) {
 	}
 
 	// heartbeat value boundary and compatibility check
-	if config.EnableSQSTerminationDraining {
-		if config.HeartbeatInterval != -1 && (config.HeartbeatInterval < 30 || config.HeartbeatInterval > 3600) {
-			return config, fmt.Errorf("invalid heartbeat-interval passed: %d Should be between 30 and 3600 seconds", config.HeartbeatInterval)
-		}
-		if config.HeartbeatUntil != -1 && (config.HeartbeatUntil < 60 || config.HeartbeatUntil > 172800) {
-			return config, fmt.Errorf("invalid heartbeat-until passed: %d Should be between 60 and 172800 seconds", config.HeartbeatUntil)
-		}
-		if config.HeartbeatInterval != -1 && config.HeartbeatUntil == -1 {
-			config.HeartbeatUntil = 172800
-			log.Info().Msgf("Since heartbeat-until is not set, defaulting to %d seconds", config.HeartbeatUntil)
-		} else if config.HeartbeatInterval == -1 && config.HeartbeatUntil != -1 {
-			return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval is required when heartbeat-until is set")
-		}
-	} else {
-		if config.HeartbeatInterval != -1 || config.HeartbeatUntil != -1 {
-			return config, fmt.Errorf("currently using IMDS mode. Heartbeat is only supported for Queue Processor mode")
-		}
+	if !config.EnableSQSTerminationDraining && (config.HeartbeatInterval != -1 || config.HeartbeatUntil != -1) {
+		return config, fmt.Errorf("currently using IMDS mode. Heartbeat is only supported for Queue Processor mode")
+	}
+	if config.HeartbeatInterval != -1 && (config.HeartbeatInterval < 30 || config.HeartbeatInterval > 3600) {
+		return config, fmt.Errorf("invalid heartbeat-interval passed: %d Should be between 30 and 3600 seconds", config.HeartbeatInterval)
+	}
+	if config.HeartbeatUntil != -1 && (config.HeartbeatUntil < 60 || config.HeartbeatUntil > 172800) {
+		return config, fmt.Errorf("invalid heartbeat-until passed: %d Should be between 60 and 172800 seconds", config.HeartbeatUntil)
+	}
+	if config.HeartbeatInterval == -1 && config.HeartbeatUntil != -1 {
+		return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval is required when heartbeat-until is set")
+	}
+	if config.HeartbeatInterval != -1 && config.HeartbeatUntil == -1 {
+		config.HeartbeatUntil = 172800
+		log.Info().Msgf("Since heartbeat-until is not set, defaulting to %d seconds", config.HeartbeatUntil)
+	}
+	if config.HeartbeatInterval != -1 && config.HeartbeatUntil != -1 && config.HeartbeatInterval > config.HeartbeatUntil {
+		return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval should be less than or equal to heartbeat-until")
 	}
 
 	// client-go expects these to be set in env vars

From 04929762fbc9c9d191a0da15d111564aef529173 Mon Sep 17 00:00:00 2001
From: Heeyoung Jung
Date: Wed, 22 Jan 2025 14:27:04 -0800
Subject: [PATCH 22/29] Update heartbeat to have better logging

---
 pkg/monitor/sqsevent/asg-lifecycle-event.go | 42 ++++++++++++++++-----
 1 file changed, 32 insertions(+), 10 deletions(-)

diff --git a/pkg/monitor/sqsevent/asg-lifecycle-event.go b/pkg/monitor/sqsevent/asg-lifecycle-event.go
index 74d19a84..d318f7e6 100644
--- a/pkg/monitor/sqsevent/asg-lifecycle-event.go
+++ b/pkg/monitor/sqsevent/asg-lifecycle-event.go
@@ -153,7 +153,15 @@ func (m SQSMonitor) checkHeartbeatTimeout(heartbeatInterval int, lifecycleDetail
 	heartbeatTimeout := int(*lifecyclehook.LifecycleHooks[0].HeartbeatTimeout)
 
 	if heartbeatInterval >= heartbeatTimeout {
-		log.Warn().Msgf("Heartbeat interval (%d seconds) is equal to or greater than the heartbeat timeout (%d seconds) for the lifecycle hook %s. The node would likely be terminated before the heartbeat is sent", heartbeatInterval, heartbeatTimeout, *lifecyclehook.LifecycleHooks[0].LifecycleHookName)
+		log.Warn().Msgf(
+			"Heartbeat interval (%d seconds) is equal to or greater than "+
+				"the heartbeat timeout (%d seconds) for the lifecycle hook %s attached to ASG %s. "+
+				"The node would likely be terminated before the heartbeat is sent",
+			heartbeatInterval,
+			heartbeatTimeout,
+			*lifecyclehook.LifecycleHooks[0].LifecycleHookName,
+			*lifecyclehook.LifecycleHooks[0].AutoScalingGroupName,
+		)
 	}
@@ -166,6 +174,11 @@ func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, li
 	for {
 		select {
 		case <-stopCh:
+			log.Info().Str("asgName", lifecycleDetail.AutoScalingGroupName).
+				Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).
+				Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken).
+				Str("instanceID", lifecycleDetail.EC2InstanceID).
+				Msg("Successfully cordoned and drained the node, stopping heartbeat")
 			return
 		case <-ticker.C:
 			err := m.recordLifecycleActionHeartbeat(lifecycleDetail)
@@ -174,7 +187,11 @@ func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, li
 				return
 			}
 		case <-timeout:
-			log.Info().Msg("Heartbeat deadline exceeded, stopping heartbeat")
+			log.Info().Str("asgName", lifecycleDetail.AutoScalingGroupName).
+ Str("lifecycleHookName", lifecycleDetail.LifecycleHookName). + Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken). + Str("instanceID", lifecycleDetail.EC2InstanceID). + Msg("Heartbeat deadline exceeded, stopping heartbeat") return } } @@ -188,24 +205,29 @@ func (m SQSMonitor) recordLifecycleActionHeartbeat(lifecycleDetail *LifecycleDet InstanceId: aws.String(lifecycleDetail.EC2InstanceID), } - log.Info().Str("asgName", lifecycleDetail.AutoScalingGroupName). - Str("lifecycleHookName", lifecycleDetail.LifecycleHookName). - Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken). - Str("instanceID", lifecycleDetail.EC2InstanceID). - Msg("Sending lifecycle heartbeat") - // Stop the heartbeat if the target is invalid _, err := m.ASG.RecordLifecycleActionHeartbeat(input) if err != nil { var awsErr awserr.Error - log.Warn().Err(err).Msg("Failed to send lifecycle heartbeat") + log.Warn(). + Str("asgName", lifecycleDetail.AutoScalingGroupName). + Str("lifecycleHookName", lifecycleDetail.LifecycleHookName). + Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken). + Str("instanceID", lifecycleDetail.EC2InstanceID). + Err(err). + Msg("Failed to send lifecycle heartbeat") if errors.As(err, &awsErr) && awsErr.Code() == "ValidationError" { return err } return nil } - log.Info().Msg("Successfully sent lifecycle heartbeat") + log.Info(). + Str("asgName", lifecycleDetail.AutoScalingGroupName). + Str("lifecycleHookName", lifecycleDetail.LifecycleHookName). + Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken). + Str("instanceID", lifecycleDetail.EC2InstanceID). + Msg("Successfully sent lifecycle heartbeat") return nil } From 1631bb601efd504877fa2ee1824d4445b5c857a4 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Wed, 22 Jan 2025 14:27:36 -0800 Subject: [PATCH 23/29] Update unit test to cover whole process of heartbeat start and closure --- pkg/monitor/sqsevent/sqs-monitor_test.go | 115 +++++++++++------------ pkg/test/aws-mocks.go | 14 ++- 2 files changed, 67 insertions(+), 62 deletions(-) diff --git a/pkg/monitor/sqsevent/sqs-monitor_test.go b/pkg/monitor/sqsevent/sqs-monitor_test.go index def86e4a..1884dddc 100644 --- a/pkg/monitor/sqsevent/sqs-monitor_test.go +++ b/pkg/monitor/sqsevent/sqs-monitor_test.go @@ -908,99 +908,92 @@ func TestMonitor_InstanceNotManaged(t *testing.T) { } } -func TestHeartbeat_closeHeartbeatCh(t *testing.T) { - msg, err := getSQSMessageFromEvent(asgLifecycleEvent) +func TestSendHeartbeats_EarlyClosure(t *testing.T) { + err := heartbeatTestHelper(nil, 3500, 1, 5) h.Ok(t, err) + h.Assert(t, h.HeartbeatCallCount == 3, "3 Heartbeat Expected, got %d", h.HeartbeatCallCount) +} + +func TestSendHeartbeats_HeartbeatUntilExpire(t *testing.T) { + err := heartbeatTestHelper(nil, 8000, 1, 5) + h.Ok(t, err) + h.Assert(t, h.HeartbeatCallCount == 5, "5 Heartbeat Expected, got %d", h.HeartbeatCallCount) +} + +func TestSendHeartbeats_ErrThrottlingASG(t *testing.T) { + RecordLifecycleActionHeartbeatErr := awserr.New("Throttling", "Rate exceeded", nil) + err := heartbeatTestHelper(RecordLifecycleActionHeartbeatErr, 8000, 1, 6) + h.Ok(t, err) + h.Assert(t, h.HeartbeatCallCount == 6, "6 Heartbeat Expected, got %d", h.HeartbeatCallCount) +} + +func TestSendHeartbeats_ErrInvalidTarget(t *testing.T) { + RecordLifecycleActionHeartbeatErr := awserr.New("ValidationError", "No active Lifecycle Action found", nil) + err := heartbeatTestHelper(RecordLifecycleActionHeartbeatErr, 6000, 1, 4) + h.Ok(t, err) + h.Assert(t, h.HeartbeatCallCount == 1, "1 
Heartbeat Expected, got %d", h.HeartbeatCallCount) +} + +func heartbeatTestHelper(RecordLifecycleActionHeartbeatErr error, sleepMilliSeconds int, heartbeatInterval int, heartbeatUntil int) error { + h.HeartbeatCallCount = 0 + + msg, err := getSQSMessageFromEvent(asgLifecycleEvent) + if err != nil { + return err + } sqsMock := h.MockedSQS{ ReceiveMessageResp: sqs.ReceiveMessageOutput{Messages: []*sqs.Message{&msg}}, - ReceiveMessageErr: nil, - DeleteMessageResp: sqs.DeleteMessageOutput{}, } dnsNodeName := "ip-10-0-0-157.us-east-2.compute.internal" ec2Mock := h.MockedEC2{ DescribeInstancesResp: getDescribeInstancesResp(dnsNodeName, true, true), } asgMock := h.MockedASG{ - CompleteLifecycleActionResp: autoscaling.CompleteLifecycleActionOutput{}, + CompleteLifecycleActionResp: autoscaling.CompleteLifecycleActionOutput{}, + RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, + RecordLifecycleActionHeartbeatErr: RecordLifecycleActionHeartbeatErr, + HeartbeatTimeout: 30, } - drainChan := make(chan monitor.InterruptionEvent, 1) + drainChan := make(chan monitor.InterruptionEvent, 1) sqsMonitor := sqsevent.SQSMonitor{ SQS: sqsMock, EC2: ec2Mock, - ManagedTag: "aws-node-termination-handler/managed", ASG: asgMock, - CheckIfManaged: true, - QueueURL: "https://test-queue", InterruptionChan: drainChan, BeforeCompleteLifecycleAction: func() { - time.Sleep(3500 * time.Millisecond) + time.Sleep(time.Duration(sleepMilliSeconds) * time.Millisecond) }, } - err = sqsMonitor.Monitor() - h.Ok(t, err) + if err := sqsMonitor.Monitor(); err != nil { + return err + } nthConfig := &config.Config{ - HeartbeatInterval: 1, - HeartbeatUntil: 5, + HeartbeatInterval: heartbeatInterval, + HeartbeatUntil: heartbeatUntil, } - h.HeartbeatCallCount = 0 testNode, _ := node.New(*nthConfig, nil) - result := <-drainChan - h.Assert(t, result.PreDrainTask != nil, "PreDrainTask should have been set") - err = result.PreDrainTask(result, *testNode) - h.Ok(t, err) - h.Assert(t, result.PostDrainTask != nil, "PostDrainTask should have been set") - err = result.PostDrainTask(result, *testNode) - h.Ok(t, err) - h.Assert(t, h.HeartbeatCallCount == 3, "3 Heartbeat Expected, got %d", h.HeartbeatCallCount) -} -func TestSendHeartbeats_EarlyClosure(t *testing.T) { - heartbeatTestHelper(nil, 3500, 1, 5) - h.Assert(t, h.HeartbeatCallCount == 3, "3 Heartbeat Expected, got %d", h.HeartbeatCallCount) -} - -func TestSendHeartbeats_NormalClosure(t *testing.T) { - heartbeatTestHelper(nil, 10000, 1, 5) - h.Assert(t, h.HeartbeatCallCount == 5, "5 Heartbeat Expected, got %d", h.HeartbeatCallCount) -} - -func TestSendHeartbeats_ErrThrottlingASG(t *testing.T) { - heartbeatTestHelper(awserr.New("ThrottlingException", "Rate exceeded", nil), 10000, 1, 6) - h.Assert(t, h.HeartbeatCallCount == 6, "6 Heartbeat Expected, got %d", h.HeartbeatCallCount) -} - -func TestSendHeartbeats_ErrInvalidTarget(t *testing.T) { - heartbeatTestHelper(awserr.New("ValidationError", "No active Lifecycle Action found", nil), 10000, 1, 6) - h.Assert(t, h.HeartbeatCallCount == 1, "1 Heartbeat Expected, got %d", h.HeartbeatCallCount) -} - -// AWS Mock Helpers specific to sqs-monitor tests -func heartbeatTestHelper(err error, sleepMilliSeconds int, heartbeatInterval int, heartbeatUntil int) { - h.HeartbeatCallCount = 0 - - asgMock := h.MockedASG{ - RecordLifecycleActionHeartbeatResp: autoscaling.RecordLifecycleActionHeartbeatOutput{}, - RecordLifecycleActionHeartbeatErr: err, + if result.PreDrainTask == nil { + return fmt.Errorf("PreDrainTask should have 
been set") } - - sqsMonitor := sqsevent.SQSMonitor{ - ASG: asgMock, + if err := result.PreDrainTask(result, *testNode); err != nil { + return err } - stopHeartbeatCh := make(chan struct{}) - - go func() { - time.Sleep(time.Duration(sleepMilliSeconds) * time.Millisecond) - close(stopHeartbeatCh) - }() + if result.PostDrainTask == nil { + return fmt.Errorf("PostDrainTask should have been set") + } + if err := result.PostDrainTask(result, *testNode); err != nil { + return err + } - sqsMonitor.SendHeartbeats(heartbeatInterval, heartbeatUntil, &asgLifecycleEventFromSQS, stopHeartbeatCh) + return nil } func getDescribeInstancesResp(privateDNSName string, withASGTag bool, withManagedTag bool) ec2.DescribeInstancesOutput { diff --git a/pkg/test/aws-mocks.go b/pkg/test/aws-mocks.go index 0697c6ad..79626687 100644 --- a/pkg/test/aws-mocks.go +++ b/pkg/test/aws-mocks.go @@ -14,6 +14,7 @@ package test import ( + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface" "github.com/aws/aws-sdk-go/service/ec2" @@ -64,6 +65,9 @@ type MockedASG struct { DescribeTagsPagesErr error RecordLifecycleActionHeartbeatResp autoscaling.RecordLifecycleActionHeartbeatOutput RecordLifecycleActionHeartbeatErr error + HeartbeatTimeout int + AutoScalingGroupName string + LifecycleHookName string } // CompleteLifecycleAction mocks the autoscaling.CompleteLifecycleAction API call @@ -96,5 +100,13 @@ func (m MockedASG) RecordLifecycleActionHeartbeat(input *autoscaling.RecordLifec } func (m MockedASG) DescribeLifecycleHooks(input *autoscaling.DescribeLifecycleHooksInput) (*autoscaling.DescribeLifecycleHooksOutput, error) { - return &autoscaling.DescribeLifecycleHooksOutput{}, nil + return &autoscaling.DescribeLifecycleHooksOutput{ + LifecycleHooks: []*autoscaling.LifecycleHook{ + { + AutoScalingGroupName: &m.AutoScalingGroupName, + LifecycleHookName: &m.LifecycleHookName, + HeartbeatTimeout: aws.Int64(int64(m.HeartbeatTimeout)), + }, + }, + }, nil } From b41751d2f759fc6f6ae82268af586bbd91e7ebf1 Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Wed, 22 Jan 2025 14:28:57 -0800 Subject: [PATCH 24/29] Update heartbeat e2e test. 
Auto-value calculations for future modification --- test/e2e/asg-lifecycle-sqs-heartbeat-test | 30 ++++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/test/e2e/asg-lifecycle-sqs-heartbeat-test b/test/e2e/asg-lifecycle-sqs-heartbeat-test index be5202a7..45baf78c 100755 --- a/test/e2e/asg-lifecycle-sqs-heartbeat-test +++ b/test/e2e/asg-lifecycle-sqs-heartbeat-test @@ -58,15 +58,20 @@ set +x echo "🥑 Created SQS Queue ${queue_url}" +# arguments specific to heartbeat testing +COMPLETE_LIFECYCLE_ACTION_DELAY_SECONDS=120 +HEARTBEAT_INTERVAL=30 +HEARTBEAT_UNTIL=100 + anth_helm_args=( upgrade --install --namespace kube-system "$CLUSTER_NAME-acth" "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/" - --set completeLifecycleActionDelaySeconds=120 - --set heartbeatInterval=30 - --set heartbeatUntil=100 + --set completeLifecycleActionDelaySeconds=$COMPLETE_LIFECYCLE_ACTION_DELAY_SECONDS + --set heartbeatInterval=$HEARTBEAT_INTERVAL + --set heartbeatUntil=$HEARTBEAT_UNTIL --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO" --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG" --set nodeSelector."${NTH_CONTROL_LABEL}" @@ -154,7 +159,7 @@ EOF ASG_TERMINATE_EVENT_ONE_LINE=$(echo "${ASG_TERMINATE_EVENT}" | tr -d '\n' |sed 's/\"/\\"/g') SEND_SQS_CMD="awslocal sqs send-message --queue-url ${queue_url} --message-body \"${ASG_TERMINATE_EVENT_ONE_LINE}\" --region ${AWS_REGION}" kubectl exec -i "${localstack_pod}" -- bash -c "$SEND_SQS_CMD" -echo "✅ Sent Spot Interruption Event to SQS queue: ${queue_url}" +echo "✅ Sent ASG Termination Event to SQS queue: ${queue_url}" NTH_POD=$(kubectl get pods -n kube-system -l app.kubernetes.io/name=aws-node-termination-handler -o jsonpath="{.items[0].metadata.name}") @@ -162,13 +167,14 @@ HEARTBEAT_COUNT=0 LAST_HEARTBEAT_LOG='' CURRENT_HEARTBEAT_LOG='' FOUND_HEARTBEAT_END_LOG=0 -HEARTBEAT_CHECK_CYCLES=8 -HEARTBEAT_CHECK_SLEEP=15 +HEARTBEAT_CHECK_CYCLES=$((($HEARTBEAT_UNTIL/$HEARTBEAT_INTERVAL+1)*2)) +HEARTBEAT_CHECK_SLEEP=$(($HEARTBEAT_INTERVAL/2)) +TARGET_HEARTBEAT_COUNT=$((($HEARTBEAT_UNTIL-5)/$HEARTBEAT_INTERVAL)) -# Localstack does not support RecordLifecycleActionHeartbeat. We currently can only check upto issuing signals. +# Localstack does not support RecordLifecycleActionHeartbeat. We currently can only check up to issuing signals. for i in $(seq 1 $HEARTBEAT_CHECK_CYCLES); do FULL_LOG=$(kubectl logs -n kube-system "${NTH_POD}") - CURRENT_HEARTBEAT_LOG=$(echo "${FULL_LOG}" | grep "Sending lifecycle heartbeat" || true) + CURRENT_HEARTBEAT_LOG=$(echo "${FULL_LOG}" | grep "Failed to send lifecycle heartbeat" || true) if [[ "$CURRENT_HEARTBEAT_LOG" != "$LAST_HEARTBEAT_LOG" ]]; then LAST_HEARTBEAT_LOG=$CURRENT_HEARTBEAT_LOG (( HEARTBEAT_COUNT+=1 )) @@ -176,20 +182,20 @@ for i in $(seq 1 $HEARTBEAT_CHECK_CYCLES); do if [[ $FOUND_HEARTBEAT_END_LOG -eq 0 ]] && kubectl logs -n kube-system "${NTH_POD}" | grep -q "Heartbeat deadline exceeded, stopping heartbeat"; then FOUND_HEARTBEAT_END_LOG=1 fi - if [[ $HEARTBEAT_COUNT -eq 3 && $FOUND_HEARTBEAT_END_LOG -eq 1 ]]; then + if [[ $HEARTBEAT_COUNT -eq $TARGET_HEARTBEAT_COUNT && $FOUND_HEARTBEAT_END_LOG -eq 1 ]]; then break fi echo "Heartbeat Loop $i/$HEARTBEAT_CHECK_CYCLES, sleeping for $HEARTBEAT_CHECK_SLEEP seconds" sleep $HEARTBEAT_CHECK_SLEEP done -if [[ $HEARTBEAT_COUNT -eq 3 && $FOUND_HEARTBEAT_END_LOG -eq 1 ]]; then +if [[ $HEARTBEAT_COUNT -eq $TARGET_HEARTBEAT_COUNT && $FOUND_HEARTBEAT_END_LOG -eq 1 ]]; then echo "✅ Verified the heartbeat was sent correct!" 
else
   if [[ $FOUND_HEARTBEAT_END_LOG -eq 0 ]]; then
     echo "❌ Heartbeat was not closed"
   fi
-  if [[ $HEARTBEAT_COUNT -ne 3 ]]; then
-    echo "❌ Heartbeat was sent $HEARTBEAT_COUNT out of 3 times"
+  if [[ $HEARTBEAT_COUNT -ne $TARGET_HEARTBEAT_COUNT ]]; then
+    echo "❌ Heartbeat was sent $HEARTBEAT_COUNT out of $TARGET_HEARTBEAT_COUNT times"
   fi
   fail_and_exit 3
@@ -206,7 +212,7 @@ for i in $(seq 1 $TAINT_CHECK_CYCLES); do
 
   if [[ $cordoned -eq 1 && $(kubectl get deployments regular-pod-test -o=jsonpath='{.status.unavailableReplicas}') -eq 1 ]]; then
     echo "✅ Verified the regular-pod-test pod was evicted!"
-    echo "✅ ASG Lifecycle SQS Test Passed $CLUSTER_NAME! ✅"
+    echo "✅ ASG Lifecycle SQS Test Passed with Heartbeat $CLUSTER_NAME! ✅"
     exit 0
   fi
   echo "Assertion Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds"

From 9e3fe771709e2ea9870c8b9bff6a563537147863 Mon Sep 17 00:00:00 2001
From: Heeyoung Jung
Date: Wed, 22 Jan 2025 14:49:37 -0800
Subject: [PATCH 25/29] Add inline comment for heartbeatUntil default behavior

---
 pkg/monitor/sqsevent/asg-lifecycle-event.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pkg/monitor/sqsevent/asg-lifecycle-event.go b/pkg/monitor/sqsevent/asg-lifecycle-event.go
index d318f7e6..a442b824 100644
--- a/pkg/monitor/sqsevent/asg-lifecycle-event.go
+++ b/pkg/monitor/sqsevent/asg-lifecycle-event.go
@@ -114,6 +114,7 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, m
 
 	interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
 		nthConfig := n.GetNthConfig()
+		// If only HeartbeatInterval is set, HeartbeatUntil will default to 172800.
 		if nthConfig.HeartbeatInterval != -1 && nthConfig.HeartbeatUntil != -1 {
 			go m.checkHeartbeatTimeout(nthConfig.HeartbeatInterval, lifecycleDetail)
 			go m.SendHeartbeats(nthConfig.HeartbeatInterval, nthConfig.HeartbeatUntil, lifecycleDetail, stopHeartbeatCh)

From dbdeec1817808363c1bb4a8841e81bc6ca1e31e5 Mon Sep 17 00:00:00 2001
From: Heeyoung Jung
Date: Wed, 22 Jan 2025 15:10:20 -0800
Subject: [PATCH 26/29] Fixed e2e variables to have double quotes

---
 test/e2e/asg-lifecycle-sqs-heartbeat-test | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/test/e2e/asg-lifecycle-sqs-heartbeat-test b/test/e2e/asg-lifecycle-sqs-heartbeat-test
index 45baf78c..0dff00b9 100755
--- a/test/e2e/asg-lifecycle-sqs-heartbeat-test
+++ b/test/e2e/asg-lifecycle-sqs-heartbeat-test
@@ -69,9 +69,9 @@ anth_helm_args=(
   --namespace kube-system
   "$CLUSTER_NAME-acth"
   "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/"
-  --set completeLifecycleActionDelaySeconds=$COMPLETE_LIFECYCLE_ACTION_DELAY_SECONDS
-  --set heartbeatInterval=$HEARTBEAT_INTERVAL
-  --set heartbeatUntil=$HEARTBEAT_UNTIL
+  --set completeLifecycleActionDelaySeconds="$COMPLETE_LIFECYCLE_ACTION_DELAY_SECONDS"
+  --set heartbeatInterval="$HEARTBEAT_INTERVAL"
+  --set heartbeatUntil="$HEARTBEAT_UNTIL"
   --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO"
   --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG"
   --set nodeSelector."${NTH_CONTROL_LABEL}"

From 80b88a48d0342d42489d762dd96a16920f5d8ebf Mon Sep 17 00:00:00 2001
From: Heeyoung Jung
Date: Thu, 23 Jan 2025 13:09:44 -0800
Subject: [PATCH 27/29] fix readme for heartbeat

---
 README.md | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index b60a6dfc..1f63695e 100644
--- a/README.md
+++ b/README.md
@@ -83,38 +83,35 @@ When using the EC2 Console or EC2 API to terminate the instance, a state-change
 
 #### Issuing Lifecycle Heartbeats
 
-You can set NTH to send heartbeats to ASG in Queue Processor mode. This allows for a much longer grace period (up to 48 hours) for termination than the maximum heartbeat timeout of two hours.
+You can set NTH to send heartbeats to ASG in Queue Processor mode. This allows for a much longer grace period (up to 48 hours) for termination than the maximum heartbeat timeout of two hours. The feature is useful when pods require a long time to drain or when you need a shorter heartbeat timeout with a longer grace period.
 
 ##### How it works
 
 - When NTH receives an ASG lifecycle termination event, it starts sending heartbeats to ASG to renew the heartbeat timeout associated with the ASG's termination lifecycle hook.
 - The heartbeat timeout acts as a timer that starts when the termination event begins.
 - Before the timeout reaches zero, the termination process is halted at the `Terminating:Wait` stage.
-- Previously, NTH couldn't issue heartbeats, limiting the maximum time for preventing termination to the maximum heartbeat timeout (7200 seconds).
-- Now, the graceful termination duration can be extended up to 48 hours, limited by the global timeout.
+- By issuing heartbeats, the graceful termination duration can be extended up to 48 hours, limited by the global timeout.
 
 ##### How to use
 
 - Configure a termination lifecycle hook on the ASG (required). Set the heartbeat timeout value to be longer than the `Heartbeat Interval`. Each heartbeat signal resets this timeout, extending the duration that an instance remains in the `Terminating:Wait` state. Without this lifecycle hook, the instance will terminate immediately when a termination event occurs (see the sketch below).
-- Configure `Heartbeat Interval` (required) and `Heartbeat Until` (optional). NTH operates normally without heartbeats if neither value is set. If only the interval is specified, `Heartbeat Until` defaults to 172800 seconds (48 hours) and heartbeats will be sent. Providing both values enables NTH to run with heartbeats. `Heartbeat Until` must be provided with a valid `Heartbeat Interval`, otherwise NTH will fail to start. Any invalid values (wrong type or out of range) will also prevent NTH from starting.
+- Configure `Heartbeat Interval` (required) and `Heartbeat Until` (optional). NTH operates normally without heartbeats if neither value is set. If only the interval is specified, `Heartbeat Until` defaults to 172800 seconds (48 hours) and heartbeats will be sent. `Heartbeat Until` must be provided with a valid `Heartbeat Interval`, otherwise NTH will fail to start. Any invalid values (wrong type or out of range) will also prevent NTH from starting.
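+
+For example (hypothetical values, for illustration only): with `heartbeatInterval=300` and no `heartbeatUntil`, NTH sends one heartbeat every 300 seconds for the default 172800 seconds, i.e. at most 172800 / 300 = 576 heartbeats per terminating instance.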
##### Configurations -###### `Heartbeat Interval` +###### `Heartbeat Interval` (Required) - Time period between consecutive heartbeat signals (in seconds) - Specifying this value triggers heartbeat - Range: 30 to 3600 seconds (30 seconds to 1 hour) - Flag for custom resource definition by *.yaml / helm: `heartbeatInterval` - CLI flag: `heartbeat-interval` -- Required: O - Default value: X -###### `Heartbeat Until` +###### `Heartbeat Until` (Optional) - Duration over which heartbeat signals are sent (in seconds) - Must be provided with a valid `Heartbeat Interval` - Range: 60 to 172800 seconds (1 minute to 48 hours) - Flag for custom resource definition by *.yaml / helm: `heartbeatUntil` - CLI flag: `heartbeat-until` -- Required: X - Default value: 172800 (48 hours) ###### Example Case From 9c54964ef0e9880f14df673c7dda7d3d64865e0f Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Thu, 23 Jan 2025 13:10:15 -0800 Subject: [PATCH 28/29] Added new flags in config test --- pkg/config/config_test.go | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index c411b9fd..8b7e2399 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -37,7 +37,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) { t.Setenv("ENABLE_SCHEDULED_EVENT_DRAINING", "true") t.Setenv("ENABLE_SPOT_INTERRUPTION_DRAINING", "false") t.Setenv("ENABLE_ASG_LIFECYCLE_DRAINING", "false") - t.Setenv("ENABLE_SQS_TERMINATION_DRAINING", "false") + t.Setenv("ENABLE_SQS_TERMINATION_DRAINING", "true") t.Setenv("ENABLE_REBALANCE_MONITORING", "true") t.Setenv("ENABLE_REBALANCE_DRAINING", "true") t.Setenv("GRACE_PERIOD", "12345") @@ -54,6 +54,8 @@ func TestParseCliArgsEnvSuccess(t *testing.T) { t.Setenv("METADATA_TRIES", "100") t.Setenv("CORDON_ONLY", "false") t.Setenv("USE_APISERVER_CACHE", "true") + t.Setenv("HEARTBEAT_INTERVAL", "30") + t.Setenv("HEARTBEAT_UNTIL", "60") nthConfig, err := config.ParseCliArgs() h.Ok(t, err) @@ -64,7 +66,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) { h.Equals(t, true, nthConfig.EnableScheduledEventDraining) h.Equals(t, false, nthConfig.EnableSpotInterruptionDraining) h.Equals(t, false, nthConfig.EnableASGLifecycleDraining) - h.Equals(t, false, nthConfig.EnableSQSTerminationDraining) + h.Equals(t, true, nthConfig.EnableSQSTerminationDraining) h.Equals(t, true, nthConfig.EnableRebalanceMonitoring) h.Equals(t, true, nthConfig.EnableRebalanceDraining) h.Equals(t, false, nthConfig.IgnoreDaemonSets) @@ -80,6 +82,8 @@ func TestParseCliArgsEnvSuccess(t *testing.T) { h.Equals(t, 100, nthConfig.MetadataTries) h.Equals(t, false, nthConfig.CordonOnly) h.Equals(t, true, nthConfig.UseAPIServerCacheToListPods) + h.Equals(t, 30, nthConfig.HeartbeatInterval) + h.Equals(t, 60, nthConfig.HeartbeatUntil) // Check that env vars were set value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST") @@ -101,7 +105,7 @@ func TestParseCliArgsSuccess(t *testing.T) { "--enable-scheduled-event-draining=true", "--enable-spot-interruption-draining=false", "--enable-asg-lifecycle-draining=false", - "--enable-sqs-termination-draining=false", + "--enable-sqs-termination-draining=true", "--enable-rebalance-monitoring=true", "--enable-rebalance-draining=true", "--ignore-daemon-sets=false", @@ -117,6 +121,8 @@ func TestParseCliArgsSuccess(t *testing.T) { "--metadata-tries=100", "--cordon-only=false", "--use-apiserver-cache=true", + "--heartbeat-interval=30", + "--heartbeat-until=60", } nthConfig, err := config.ParseCliArgs() h.Ok(t, err) @@ 
-128,7 +134,7 @@ func TestParseCliArgsSuccess(t *testing.T) { h.Equals(t, true, nthConfig.EnableScheduledEventDraining) h.Equals(t, false, nthConfig.EnableSpotInterruptionDraining) h.Equals(t, false, nthConfig.EnableASGLifecycleDraining) - h.Equals(t, false, nthConfig.EnableSQSTerminationDraining) + h.Equals(t, true, nthConfig.EnableSQSTerminationDraining) h.Equals(t, true, nthConfig.EnableRebalanceMonitoring) h.Equals(t, true, nthConfig.EnableRebalanceDraining) h.Equals(t, false, nthConfig.IgnoreDaemonSets) @@ -145,6 +151,8 @@ func TestParseCliArgsSuccess(t *testing.T) { h.Equals(t, false, nthConfig.CordonOnly) h.Equals(t, false, nthConfig.EnablePrometheus) h.Equals(t, true, nthConfig.UseAPIServerCacheToListPods) + h.Equals(t, 30, nthConfig.HeartbeatInterval) + h.Equals(t, 60, nthConfig.HeartbeatUntil) // Check that env vars were set value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST") @@ -176,6 +184,9 @@ func TestParseCliArgsOverrides(t *testing.T) { t.Setenv("WEBHOOK_TEMPLATE", "no") t.Setenv("METADATA_TRIES", "100") t.Setenv("CORDON_ONLY", "true") + t.Setenv("HEARTBEAT_INTERVAL", "3601") + t.Setenv("HEARTBEAT_UNTIL", "172801") + os.Args = []string{ "cmd", "--use-provider-id=false", @@ -201,6 +212,8 @@ func TestParseCliArgsOverrides(t *testing.T) { "--cordon-only=false", "--enable-prometheus-server=true", "--prometheus-server-port=2112", + "--heartbeat-interval=3600", + "--heartbeat-until=172800", } nthConfig, err := config.ParseCliArgs() h.Ok(t, err) @@ -229,6 +242,8 @@ func TestParseCliArgsOverrides(t *testing.T) { h.Equals(t, false, nthConfig.CordonOnly) h.Equals(t, true, nthConfig.EnablePrometheus) h.Equals(t, 2112, nthConfig.PrometheusPort) + h.Equals(t, 3600, nthConfig.HeartbeatInterval) + h.Equals(t, 172800, nthConfig.HeartbeatUntil) // Check that env vars were set value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST") From 56ea41dd8089561e00f46e637ca4166c4d8ad8fd Mon Sep 17 00:00:00 2001 From: Heeyoung Jung Date: Thu, 23 Jan 2025 13:10:36 -0800 Subject: [PATCH 29/29] Fixed typo in heartbeat e2e test --- test/e2e/asg-lifecycle-sqs-heartbeat-test | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/e2e/asg-lifecycle-sqs-heartbeat-test b/test/e2e/asg-lifecycle-sqs-heartbeat-test index 0dff00b9..6b0afc2e 100755 --- a/test/e2e/asg-lifecycle-sqs-heartbeat-test +++ b/test/e2e/asg-lifecycle-sqs-heartbeat-test @@ -133,7 +133,7 @@ if [[ $DEPLOYED -eq 0 ]]; then fail_and_exit 2 fi -ASG_TERMINATE_EVENT=$(cat <