-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcdk_api_gateway_sqs_lambda.py
284 lines (258 loc) · 10.7 KB
/
cdk_api_gateway_sqs_lambda.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# Built-in imports
import os
# External imports
from aws_cdk import (
Stack,
Duration,
CfnOutput,
aws_apigateway,
aws_sqs,
aws_lambda,
aws_iam,
aws_lambda_event_sources,
aws_cloudwatch,
aws_cloudwatch_actions,
aws_sns,
aws_sns_subscriptions,
)
from constructs import Construct
class ApiGatewaySqsLambdaStack(Stack):
"""
Class to create the infrastructure on AWS.
"""
def __init__(
self,
scope: Construct,
construct_id: str,
name_prefix: str,
main_resources_name: str,
deployment_environment: str,
**kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Input parameters
self.construct_id = construct_id
self.name_prefix = name_prefix
self.main_resources_name = main_resources_name
self.deployment_environment = deployment_environment
# Additional configurations
self.api_stage_deployment_version = "v1"
self.sns_notifications_email = "[email protected]"
self.powertools_layer = aws_lambda.LayerVersion.from_layer_version_arn(
self,
id="Lambda-Powertools",
layer_version_arn=f"arn:aws:lambda:{self.region}:017000801446:layer:AWSLambdaPowertoolsPythonV2:31"
)
# Main methods for the deployment
self.create_queues()
self.create_alarm_and_notifications_for_dlq()
self.create_lambda()
self.configure_sqs_event_source_for_lambda()
self.create_api_gateway()
self.create_api_gateway_to_sqs_role()
self.create_api_gateway_integration_proxy_to_sqs()
self.create_api_gateway_resource_and_route()
# Create CloudFormation outputs
self.generate_cloudformation_outputs()
def create_queues(self):
"""
Create the SQS queue.
"""
# Configure DLQ for the queue (for non-processed messages)
self.dead_letter_queue = aws_sqs.Queue(
self,
id="DeadLetterQueue",
queue_name="{}{}-queue-dlq".format(self.name_prefix, self.main_resources_name),
retention_period=Duration.days(7),
)
# Main queue for the solution
self.queue = aws_sqs.Queue(
self,
id="Queue",
queue_name="{}{}-queue".format(self.name_prefix, self.main_resources_name),
retention_period=Duration.days(5),
visibility_timeout=Duration.seconds(60),
dead_letter_queue=aws_sqs.DeadLetterQueue(
max_receive_count=3,
queue=self.dead_letter_queue
),
)
def create_alarm_and_notifications_for_dlq(self):
"""
Create the alarm and SNS notifications for the SQS Dead Letter Queue
(DLQ) based on messages on the queue.
"""
# Create SNS topic and email subscription for it
self.sns_topic_dlq = aws_sns.Topic(
self,
id="DLQTopic",
topic_name="{}{}-topic".format(self.name_prefix, self.main_resources_name),
display_name="{}{}-topic".format(self.name_prefix, self.main_resources_name),
)
self.sns_topic_dlq.add_subscription(
aws_sns_subscriptions.EmailSubscription(self.sns_notifications_email)
)
# Create DLQ alarm for 1+ messages in queue
self.dead_letter_queue_alarm = aws_cloudwatch.Alarm(
self,
id="DLQAlarm",
alarm_name="{}{}-alarm".format(self.name_prefix, self.main_resources_name),
alarm_description="Messages on DLQ for {} solution".format(self.main_resources_name),
metric=self.dead_letter_queue.metric("ApproximateNumberOfMessagesVisible"),
threshold=0.5,
evaluation_periods=1,
actions_enabled=True,
)
# Configure CW Alarm action with the SNS topic
self.sns_action_alarm = aws_cloudwatch_actions.SnsAction(self.sns_topic_dlq)
self.dead_letter_queue_alarm.add_alarm_action(self.sns_action_alarm)
def create_lambda(self):
"""
Create the Lambda Function.
"""
# Get relative path for folder that contains Lambda function source
# ! Note--> we must obtain parent dirs to create path (that"s why there is "os.path.dirname()")
PATH_TO_FUNCTION_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"lambda",
"src",
)
self.lambda_function = aws_lambda.Function(
self,
id="Lambda",
function_name="{}{}-lambda".format(self.name_prefix, self.main_resources_name),
runtime=aws_lambda.Runtime.PYTHON_3_9,
handler="lambda_function.handler",
code=aws_lambda.Code.from_asset(PATH_TO_FUNCTION_FOLDER),
timeout=Duration.seconds(30),
memory_size=128,
environment={
"ENVIRONMENT": self.deployment_environment,
"OWNER": "Santiago Garcia Arango",
"LOG_LEVEL": "DEBUG",
},
layers=[self.powertools_layer],
tracing=aws_lambda.Tracing.ACTIVE,
)
self.lambda_function.role.add_managed_policy(
aws_iam.ManagedPolicy.from_aws_managed_policy_name("AWSXrayWriteOnlyAccess")
)
def configure_sqs_event_source_for_lambda(self):
"""
Configure the SQS as the event source of the Lambda Function.
"""
self.sqs_event_source = aws_lambda_event_sources.SqsEventSource(
self.queue,
enabled=True,
batch_size=5,
report_batch_item_failures=True, # Necessary for processing batches from SQS
)
self.lambda_function.add_event_source(self.sqs_event_source)
def create_api_gateway(self):
"""
Create the API Gateway.
"""
self.api_gateway = aws_apigateway.RestApi(
self,
id="ApiGateway",
rest_api_name="{}{}-api".format(self.name_prefix, self.main_resources_name),
description="API Gateway for {} solution".format(self.main_resources_name),
deploy=True,
deploy_options=aws_apigateway.StageOptions(
stage_name=self.api_stage_deployment_version,
description="{} deployment".format(self.api_stage_deployment_version),
logging_level=aws_apigateway.MethodLoggingLevel.INFO,
tracing_enabled=True, # Relevant config for X-ray tracing for complete traces
),
default_cors_preflight_options=aws_apigateway.CorsOptions(
allow_credentials=False,
allow_headers=["*"],
allow_methods=["POST"],
allow_origins=["*"],
),
)
def create_api_gateway_to_sqs_role(self):
"""
Create the IAM Role for the API Gateway to SQS actions.
"""
self.api_gateway_sqs_integration_role = aws_iam.Role(
self,
id="ApiGatewayRole",
assumed_by=aws_iam.ServicePrincipal("apigateway.amazonaws.com"),
)
# Allow CW operations to API Gateway Role
self.api_gateway_sqs_integration_role.add_managed_policy(
aws_iam.ManagedPolicy.from_aws_managed_policy_name(
managed_policy_name="service-role/AmazonAPIGatewayPushToCloudWatchLogs",
),
)
# Allow API Gateway Role to "sqs:sendMessage*" to queue
self.queue.grant_send_messages(self.api_gateway_sqs_integration_role)
# Allow API Gateway Role to send traces to X-Ray
self.api_gateway_sqs_integration_role.add_managed_policy(
aws_iam.ManagedPolicy.from_aws_managed_policy_name("AWSXrayWriteOnlyAccess")
)
def create_api_gateway_integration_proxy_to_sqs(self):
"""
Create the API Gateway integration proxy to SQS.
"""
# Note 1: The "request_templates" allows us to proxy the request payload to SQS structure
# Note 2: the "integration_responses" are important, otherwise we get error:
# ... <Execution failed due to configuration error>
self.api_gateway_integration = aws_apigateway.AwsIntegration(
service="sqs",
path="{}/{}".format(os.environ.get("CDK_DEFAULT_ACCOUNT"), self.queue.queue_name),
integration_http_method="POST",
options=aws_apigateway.IntegrationOptions(
credentials_role=self.api_gateway_sqs_integration_role,
passthrough_behavior=aws_apigateway.PassthroughBehavior.WHEN_NO_MATCH,
request_templates={
"application/json": "Action=SendMessage&MessageBody=$input.body",
},
request_parameters={
"integration.request.header.Content-Type": "'application/x-www-form-urlencoded'",
},
integration_responses=[
aws_apigateway.IntegrationResponse(status_code="200"),
aws_apigateway.IntegrationResponse(status_code="400"),
aws_apigateway.IntegrationResponse(status_code="500"),
],
),
)
def create_api_gateway_resource_and_route(self):
"""
Create the API Gateway "/message" resource and route for the POST requests,
which are messages that will be integrated via aws_proxy to the SQS.
"""
# Create the "<api_endpoint>/message" resource (path)
self.send_message_api_gateway_resource = self.api_gateway.root.add_resource(
path_part="message",
)
# Add to the "<api_endpoint>/message" resource the POST method and integration
# Note: the "method_responses" are important, otherwise we get error:
# ... <Execution failed due to configuration error>
self.send_message_api_gateway_resource.add_method(
http_method="POST",
integration=self.api_gateway_integration,
operation_name="SendMessage",
method_responses=[
aws_apigateway.MethodResponse(status_code="200"),
aws_apigateway.MethodResponse(status_code="400"),
aws_apigateway.MethodResponse(status_code="500"),
],
)
def generate_cloudformation_outputs(self):
"""
Method to add the relevant CloudFormation outputs.
"""
CfnOutput(
self,
"DeploymentEnvironment",
value=self.deployment_environment,
description="Deployment environment",
)
CfnOutput(
self,
"ApiEndpoint",
value=self.api_gateway.url_for_path("/message"),
description="API Gateway endpoint",
)