-
Notifications
You must be signed in to change notification settings - Fork 268
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add/lifecycle heartbeat #1116
base: main
Are you sure you want to change the base?
Add/lifecycle heartbeat #1116
Changes from all commits
d991814
0e4b686
1fbd7cb
d0f1ef4
df0696f
d7f8e07
a6cfd89
64e9cff
d3047a0
559adc3
7012bab
bc79eb7
75400a9
bbddcfa
029fdf7
56b3f55
7221ed2
4bcb916
4ff40d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,11 +15,14 @@ package sqsevent | |
|
||
import ( | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/aws/aws-node-termination-handler/pkg/monitor" | ||
"github.com/aws/aws-node-termination-handler/pkg/node" | ||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/awserr" | ||
"github.com/aws/aws-sdk-go/service/autoscaling" | ||
"github.com/aws/aws-sdk-go/service/sqs" | ||
"github.com/rs/zerolog/log" | ||
|
@@ -95,26 +98,117 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, m | |
Description: fmt.Sprintf("ASG Lifecycle Termination event received. Instance will be interrupted at %s \n", event.getTime()), | ||
} | ||
|
||
stopHeartbeatCh := make(chan struct{}) | ||
|
||
interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, _ node.Node) error { | ||
|
||
_, err = m.continueLifecycleAction(lifecycleDetail) | ||
if err != nil { | ||
return fmt.Errorf("continuing ASG termination lifecycle: %w", err) | ||
} | ||
log.Info().Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).Str("instanceID", lifecycleDetail.EC2InstanceID).Msg("Completed ASG Lifecycle Hook") | ||
|
||
close(stopHeartbeatCh) | ||
return m.deleteMessage(message) | ||
} | ||
|
||
interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { | ||
nthConfig := n.GetNthConfig() | ||
if nthConfig.HeartbeatInterval != -1 && nthConfig.HeartbeatUntil != -1 { | ||
go m.checkHeartbeatTimeout(nthConfig.HeartbeatInterval, lifecycleDetail) | ||
go m.SendHeartbeats(nthConfig.HeartbeatInterval, nthConfig.HeartbeatUntil, lifecycleDetail, stopHeartbeatCh) | ||
} | ||
|
||
err := n.TaintASGLifecycleTermination(interruptionEvent.NodeName, interruptionEvent.EventID) | ||
if err != nil { | ||
log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ASGLifecycleTerminationTaint, interruptionEvent.EventID) | ||
log.Err(err).Msgf("unable to taint node with taint %s:%s", node.ASGLifecycleTerminationTaint, interruptionEvent.EventID) | ||
} | ||
return nil | ||
} | ||
|
||
return &interruptionEvent, nil | ||
} | ||
|
||
// Compare the heartbeatInterval with the heartbeat timeout and warn if (heartbeatInterval >= heartbeat timeout) | ||
func (m SQSMonitor) checkHeartbeatTimeout(heartbeatInterval int, lifecycleDetail *LifecycleDetail) { | ||
input := &autoscaling.DescribeLifecycleHooksInput{ | ||
AutoScalingGroupName: aws.String(lifecycleDetail.AutoScalingGroupName), | ||
LifecycleHookNames: []*string{aws.String(lifecycleDetail.LifecycleHookName)}, | ||
} | ||
|
||
lifecyclehook, err := m.ASG.DescribeLifecycleHooks(input) | ||
if err != nil { | ||
log.Err(err).Msg("failed to describe lifecycle hook") | ||
return | ||
} | ||
|
||
if len(lifecyclehook.LifecycleHooks) == 0 { | ||
log.Warn(). | ||
Str("asgName", lifecycleDetail.AutoScalingGroupName). | ||
Str("lifecycleHookName", lifecycleDetail.LifecycleHookName). | ||
Msg("Tried to check heartbeat timeout, but no lifecycle hook found from ASG") | ||
return | ||
} | ||
|
||
heartbeatTimeout := int(*lifecyclehook.LifecycleHooks[0].HeartbeatTimeout) | ||
|
||
if heartbeatInterval >= heartbeatTimeout { | ||
log.Warn().Msgf("Heartbeat interval (%d seconds) is equal to or greater than the heartbeat timeout (%d seconds) for the lifecycle hook %s. The node would likely be terminated before the heartbeat is sent", heartbeatInterval, heartbeatTimeout, *lifecyclehook.LifecycleHooks[0].LifecycleHookName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we also add the ASG name (lifecycleDetail.AutoScalingGroupName) in this log warn? Just to help with debugging? |
||
} | ||
} | ||
|
||
// Issue lifecycle heartbeats to reset the heartbeat timeout timer in ASG | ||
func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, lifecycleDetail *LifecycleDetail, stopCh <-chan struct{}) { | ||
ticker := time.NewTicker(time.Duration(heartbeatInterval) * time.Second) | ||
defer ticker.Stop() | ||
timeout := time.After(time.Duration(heartbeatUntil) * time.Second) | ||
|
||
for { | ||
select { | ||
case <-stopCh: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the purpose of stopCh that isn't already covered by timeout? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh actually it makes sense. forgot it's being closed by post drain task |
||
return | ||
case <-ticker.C: | ||
err := m.recordLifecycleActionHeartbeat(lifecycleDetail) | ||
if err != nil { | ||
log.Err(err).Msg("invalid heartbeat target, stopping heartbeat") | ||
return | ||
} | ||
case <-timeout: | ||
log.Info().Msg("Heartbeat deadline exceeded, stopping heartbeat") | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (m SQSMonitor) recordLifecycleActionHeartbeat(lifecycleDetail *LifecycleDetail) error { | ||
input := &autoscaling.RecordLifecycleActionHeartbeatInput{ | ||
AutoScalingGroupName: aws.String(lifecycleDetail.AutoScalingGroupName), | ||
LifecycleHookName: aws.String(lifecycleDetail.LifecycleHookName), | ||
LifecycleActionToken: aws.String(lifecycleDetail.LifecycleActionToken), | ||
InstanceId: aws.String(lifecycleDetail.EC2InstanceID), | ||
} | ||
|
||
log.Info().Str("asgName", lifecycleDetail.AutoScalingGroupName). | ||
Str("lifecycleHookName", lifecycleDetail.LifecycleHookName). | ||
Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken). | ||
Str("instanceID", lifecycleDetail.EC2InstanceID). | ||
Msg("Sending lifecycle heartbeat") | ||
|
||
// Stop the heartbeat if the target is invalid | ||
_, err := m.ASG.RecordLifecycleActionHeartbeat(input) | ||
if err != nil { | ||
var awsErr awserr.Error | ||
log.Warn().Err(err).Msg("Failed to send lifecycle heartbeat") | ||
if errors.As(err, &awsErr) && awsErr.Code() == "ValidationError" { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
log.Info().Msg("Successfully sent lifecycle heartbeat") | ||
return nil | ||
} | ||
|
||
func (m SQSMonitor) deleteMessage(message *sqs.Message) error { | ||
errs := m.deleteMessages([]*sqs.Message{message}) | ||
if errs != nil { | ||
|
@@ -123,7 +217,7 @@ func (m SQSMonitor) deleteMessage(message *sqs.Message) error { | |
return nil | ||
} | ||
|
||
// Continues the lifecycle hook, thereby indicating that a successful action occurred
func (m SQSMonitor) continueLifecycleAction(lifecycleDetail *LifecycleDetail) (*autoscaling.CompleteLifecycleActionOutput, error) { | ||
return m.completeLifecycleAction(&autoscaling.CompleteLifecycleActionInput{ | ||
AutoScalingGroupName: &lifecycleDetail.AutoScalingGroupName, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this section is important enough to be described in the "How to use" section, which currently seems sparse.