Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
joozero authored Feb 12, 2024
2 parents a2d21d9 + 730a775 commit 4759b56
Show file tree
Hide file tree
Showing 22 changed files with 1,266 additions and 17 deletions.
195 changes: 195 additions & 0 deletions application-code/ecs-target-setter/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import boto3
import os
import logging
from botocore.config import Config
from datetime import datetime
from datetime import timedelta
from datetime import timezone

# --- Module-level setup: logging, retry config, AWS clients, settings ---

# Create logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Retry up to 10 times in 'standard' mode so transient throttling on the
# CloudWatch / Application Auto Scaling APIs does not fail the invocation.
config = Config(
    retries={
        'max_attempts': 10,
        'mode': 'standard'
    }
)

# Define session and clients.
# NOTE(review): region is hard-coded to us-west-2 — consider reading it from
# the environment so this Lambda is portable across regions.
session = boto3.Session()
cloudwatch = session.client('cloudwatch', config=config, region_name='us-west-2')
# FIX: create this client from the shared session (the original used a bare
# boto3.client() call here, inconsistent with the cloudwatch client above).
appautoscaling = session.client('application-autoscaling', config=config, region_name='us-west-2')

# Read environment variables. All are required: a missing key raises
# KeyError during Lambda init, which fails loudly rather than at runtime.
ecs_sqs_app_scaling_policy_name = os.environ['scaling_policy_name']
desiredLatency = int(os.environ['desired_latency'])            # target end-to-end latency (seconds)
defaultMsgProcDuration = int(os.environ['default_msg_proc_duration'])  # fallback per-message duration (seconds)

queueName = os.environ['queue_name']
appMetricName = os.environ['app_metric_name']
bpiMetricName = os.environ['bpi_metric_name']
metricType = os.environ['metric_type']
metricNamespace = os.environ['metric_namespace']


def publishMetricValue(metricValue):
    """Publish a single BPI data point to CloudWatch under the configured
    namespace, dimensioned by metric type and queue name."""
    datum = {
        'MetricName': bpiMetricName,
        'Value': metricValue,
        'Dimensions': [
            {'Name': 'Type', 'Value': metricType},
            {'Name': 'QueueName', 'Value': queueName},
        ],
        # 1-second resolution marks this as a high-resolution metric.
        'StorageResolution': 1,
    }
    cloudwatch.put_metric_data(Namespace=metricNamespace, MetricData=[datum])

def getMetricValue(metricNamespace, metricName):
    """Return the average message-processing duration (seconds) observed over
    the last 24 hours, or defaultMsgProcDuration when no data points exist.

    Args:
        metricNamespace: CloudWatch namespace to query.
        metricName: name of the processing-duration metric to query.
    """
    # FIX: the original ignored the metricName parameter and used the global
    # appMetricName. The only caller passes appMetricName, so behavior is
    # unchanged for the existing call site, but the function now honors its
    # own signature.
    query = {
        'Id': 'query_123',
        'MetricStat': {
            'Metric': {
                'Namespace': metricNamespace,
                'MetricName': metricName,
                'Dimensions': [
                    {'Name': 'Type', 'Value': metricType},
                    {'Name': 'QueueName', 'Value': queueName},
                ],
            },
            'Period': 1,
            'Stat': 'Average',
        }
    }

    # Look back a full 24 hours so sparse traffic still yields data points.
    response = cloudwatch.get_metric_data(
        MetricDataQueries=[query],
        StartTime=datetime.now(timezone.utc) - timedelta(seconds=86400),
        EndTime=datetime.now(timezone.utc),
    )

    values = response.get('MetricDataResults')[0].get('Values')
    if not values:
        # No data points -> fall back to the configured default duration.
        msgProcessingDuration = defaultMsgProcDuration
    else:
        # FIX: the original computed this average and then immediately
        # overwrote it with Values[0], discarding the computation. Return
        # the average, which matches the 'Stat': 'Average' query intent.
        total = sum(values)
        count = len(values)
        msgProcessingDuration = total / count
        print("count={} total={} msgProcessingDuration={}".format(count, total, msgProcessingDuration))

    # Return
    return msgProcessingDuration



def lambda_handler(event, context):
    """Recompute the backlog-per-instance (BPI) target and update the ECS
    service's target-tracking scaling policy in place.

    Intended to run on a schedule; ``event`` and ``context`` are unused.
    """

    # Get cloudwatch metric for msg processing duration
    msgProcessingDuration=getMetricValue(metricNamespace, appMetricName)
    print('Most recent message processing duration is {}'.format(msgProcessingDuration))

    # Calculate new target BPI: messages one task can absorb while still
    # meeting the desired latency.
    # NOTE(review): raises ZeroDivisionError if msgProcessingDuration is 0,
    # and int() truncation can yield a target of 0 — confirm upstream values
    # are always > 0.
    newTargetBPI =int(desiredLatency / msgProcessingDuration)
    print('New Target BPI is {}'.format(newTargetBPI))

    # Get application auto scaling policy of ECS

    response = appautoscaling.describe_scaling_policies(PolicyNames=[ecs_sqs_app_scaling_policy_name], ServiceNamespace='ecs')
    policies =response.get('ScalingPolicies')
    policy=policies[0]  # assumes the name matched exactly one policy — IndexError otherwise


    # Get target tracking config and update target value (mutated in place,
    # then sent back wholesale via put_scaling_policy below).
    TargetTrackingConfig=policy.get('TargetTrackingScalingPolicyConfiguration')
    TargetTrackingConfig['TargetValue'] = newTargetBPI
    TargetTrackingConfig['ScaleOutCooldown'] = 240
    TargetTrackingConfig['ScaleInCooldown'] = 240

    # Customized metric math: BPI = visible queue depth (m1) / running task
    # count (m2); only the m3 expression returns data to the scaler.
    customMetric = {
        'Metrics': [
            {
                'Id': 'm1',
                'Label': 'Get the queue size (the number of messages waiting to be processed)',
                'MetricStat': {
                    'Metric': {
                        'Dimensions': [
                            {
                                'Name': 'QueueName',
                                'Value': queueName
                            },
                        ],
                        'MetricName': 'ApproximateNumberOfMessagesVisible',
                        'Namespace': 'AWS/SQS'
                    },
                    'Stat': 'Average'
                },
                'ReturnData': False
            },
            {
                'Id': 'm2',
                'Label': 'Get the ECS running task count (the number of currently running tasks)',
                'MetricStat': {
                    'Metric': {
                        'Dimensions': [
                            {
                                # NOTE(review): cluster and service names are
                                # hard-coded — consider environment variables.
                                'Name': 'ClusterName',
                                'Value': 'core-infra'
                            },
                            {
                                'Name': 'ServiceName',
                                'Value': 'ecsdemo-queue-proc3'
                            },
                        ],
                        'MetricName': 'RunningTaskCount',
                        'Namespace': 'ECS/ContainerInsights'
                    },
                    'Stat': 'Average'
                },
                'ReturnData': False
            },
            {
                'Id': 'm3',
                'Label': 'Calculate the backlog per instance',
                'Expression': 'm1 / m2',
                'ReturnData': True
            },
        ]
    }


    TargetTrackingConfig['CustomizedMetricSpecification'] = customMetric
    # Update scaling policy of ECS with the new target and custom metric.
    appautoscaling.put_scaling_policy(
        ServiceNamespace='ecs',
        ResourceId=policy.get('ResourceId'),
        ScalableDimension=policy.get('ScalableDimension'),
        PolicyName=policy.get('PolicyName'),
        PolicyType=policy.get('PolicyType'),
        TargetTrackingScalingPolicyConfiguration=TargetTrackingConfig
    )
    print('Scaling policy of ECS has been successfully updated!')

    # Publish new target BPI so it can be graphed alongside the inputs.
    publishMetricValue(newTargetBPI)
11 changes: 11 additions & 0 deletions application-code/ecsdemo-queue-proc/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Lightweight Python base image pulled from the AWS public ECR mirror
# (avoids Docker Hub rate limits from within AWS builds).
FROM public.ecr.aws/docker/library/python:alpine3.16

# Scratch directory used by the consumer at runtime.
RUN mkdir -p /tmp/ecsproc/

WORKDIR /app

COPY ./src /app

# --no-cache-dir keeps the pip wheel cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt

# FIX: exec form instead of shell form so the Python process runs as PID 1
# and receives SIGTERM directly when ECS stops the task (shell form wraps
# it in /bin/sh, which does not forward signals).
ENTRYPOINT ["python", "app.py"]
104 changes: 104 additions & 0 deletions application-code/ecsdemo-queue-proc/src/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@

# Import modules
import boto3
import os
import json
import time
import logging
from botocore.exceptions import ClientError
from botocore.config import Config
import datetime

# Create logger (stdout so logs land in the container log driver)
#logging.basicConfig(filename='consumer.log', level=logging.INFO)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Retry configuration shared by all AWS clients: up to 10 attempts in
# 'standard' retry mode to ride out transient throttling.
config = Config(
    retries = {
        'max_attempts': 10,
        'mode': 'standard'
    }
)

# Define session and resources.
# NOTE(review): the SQS resource pins us-west-2 but the CloudWatch client
# falls back to the default region resolution — confirm both should target
# the same region.
session = boto3.Session()
sqs = session.resource('sqs', config=config, region_name='us-west-2')
cloudwatch = session.client('cloudwatch', config=config)

# Required environment variables; a missing key raises KeyError at startup.
queue_name = os.environ['queue_name']
app_metric_name = os.environ['app_metric_name']
metric_type = os.environ['metric_type']
metric_namespace = os.environ['metric_namespace']


def publishMetricValue(metricValue):
    """Emit one processing-duration data point to CloudWatch, dimensioned
    by metric type and queue name."""
    now = datetime.datetime.now()
    logger.info('Time {} publishMetricValue with metric_namespace {} app_metric_name {} metricValue {} metric_type {} queue_name {}'.format(
        now, metric_namespace, app_metric_name, metricValue, metric_type, queue_name))
    datum = {
        'MetricName': app_metric_name,
        'Value': metricValue,
        'Dimensions': [
            {'Name': 'Type', 'Value': metric_type},
            {'Name': 'QueueName', 'Value': queue_name},
        ],
        # 1-second resolution marks this as a high-resolution metric.
        'StorageResolution': 1,
    }
    cloudwatch.put_metric_data(Namespace=metric_namespace, MetricData=[datum])

# Entry point: long-running SQS consumer. Each message carries a 'duration'
# field; the consumer sleeps that long to simulate work, deletes the message,
# and reports the duration to CloudWatch.
if __name__=="__main__":

    # Initialize variables
    logger.info('Environment queue_name {} app_metric_name {} metric_type {} metric_namespace {}'.format(queue_name, app_metric_name, metric_type, metric_namespace))
    logger.info('Calling get_queue_by_name....')
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    batchSize = 1     # max messages fetched per receive call
    queueWaitTime= 5  # long-poll wait in seconds

    # start continuous loop
    logger.info('Starting queue consumer process....')
    while True:

        try:

            # Read messages from queue (long poll; empty list on timeout)
            logger.info('Polling messages from the processing queue')
            messages = queue.receive_messages(AttributeNames=['All'], MaxNumberOfMessages=batchSize, WaitTimeSeconds=queueWaitTime)
            if not messages: continue


            logger.info('-- Received {} messages'.format(len(messages)))

            # Process messages: sleep for the embedded duration to simulate work
            for message in messages:
                now = datetime.datetime.now()
                messageBody = json.loads(message.body)
                # NOTE(review): .get returns None if 'duration' is missing,
                # making time.sleep raise TypeError (caught by the generic
                # handler below) — confirm producers always set 'duration'.
                processingDuration = messageBody.get('duration')
                logger.info('Time {} Processing message_id {} messageBody {}...'.format(now, message.message_id, messageBody))
                time.sleep(processingDuration)

                # Delete the message only after successful processing
                message.delete()
                now = datetime.datetime.now()

                # Report message duration to cloudwatch
                publishMetricValue(processingDuration)

        except ClientError as error:
            # AWS service errors: log and keep consuming.
            logger.error('SQS Service Exception - Code: {}, Message: {}'.format(error.response['Error']['Code'],error.response['Error']['Message']))
            continue

        except Exception as e:
            # Anything else (bad JSON, missing duration, ...): log and keep going.
            logger.error('Unexpected error - {}'.format(e))
2 changes: 2 additions & 0 deletions application-code/ecsdemo-queue-proc/src/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
boto3==1.24.58
Pillow==10.2.0
22 changes: 22 additions & 0 deletions application-code/ecsdemo-queue-proc/templates/buildspec.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# CodeBuild spec: build the queue-consumer image, push it to ECR, and emit
# imagedefinitions.json for the downstream ECS deploy stage.
version: 0.2

phases:
  pre_build:
    commands:
      # Registry host is everything before the last '/' of the repo URL.
      - REPOSITORY=${REPO_URL%/*}
      - aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $REPOSITORY
      # Tag with the short commit hash; fall back to 'latest' if unset.
      - COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION | cut -c 1-7)
      - IMAGE_TAG=${COMMIT_HASH:=latest}
  build:
    commands:
      - docker build -t $REPO_URL $FOLDER_PATH
  post_build:
    commands:
      - docker tag $REPO_URL $REPO_URL:$IMAGE_TAG
      - docker push $REPO_URL:$IMAGE_TAG
      - echo Writing image definitions file...
      # Container-name -> image-URI mapping consumed by the ECS deploy action.
      - printf '[{"name":"%s","imageUri":"%s"}]' $CONTAINER_NAME $REPO_URL:$IMAGE_TAG > imagedefinitions.json

artifacts:
  files:
    - '**/*'
49 changes: 49 additions & 0 deletions application-code/message-producer/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@

import boto3
import os
import struct
import json
import logging
from botocore.config import Config

# Create logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Retry configuration: up to 10 attempts in 'standard' retry mode.
config = Config(
    retries = {
        'max_attempts': 10,
        'mode': 'standard'
    }
)

# Define session and resources (no explicit region: boto3's default
# region resolution applies).
session = boto3.Session()
sqs = session.resource('sqs', config=config)

# Read environment variables (required; a missing key fails Lambda init)
queue_name = os.environ['queue_name']
default_msg_proc_duration = int(os.environ['default_msg_proc_duration'])  # seconds the consumer sleeps per message
number_of_messages = int(os.environ['number_of_messages'])  # messages sent per invocation


def lambda_handler(event, context):
    """Send number_of_messages test messages to the SQS FIFO queue.

    Each message carries a random 16-bit id and the configured processing
    duration. ``event`` and ``context`` are unused.
    """

    # Get the queue
    queue = sqs.get_queue_by_name(QueueName=queue_name)

    # Send N messages
    for i in range(number_of_messages):

        # Build message body with a random unsigned 16-bit id.
        randomNumber = struct.unpack('H', os.urandom(2))[0]
        messageBody = {"id": randomNumber, "duration": default_msg_proc_duration}
        print('Sending message id: {}'.format(randomNumber))

        # FIX: the original deduplication id was "<id>:<id>" (the same random
        # number twice), so two messages drawing the same 16-bit value within
        # the FIFO queue's 5-minute dedup window were silently dropped.
        # Including the loop index makes the id unique per message in this
        # invocation. MessageGroupId still partitions ordering by message id.
        queue.send_message(
            MessageBody=json.dumps(messageBody),
            MessageGroupId=str(messageBody['id']),
            MessageDeduplicationId='{}:{}'.format(randomNumber, i),
        )
Binary file added docs/SQSTestMessageProducer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/cicd_codepipeline.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/ecs-github-token.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/ecs-sqs-scaling-arch.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/ecs_auto_scaling.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/ecs_service_tasks_1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/ecs_target_tracking_metric.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 4759b56

Please sign in to comment.