'
+@app.cli.command("etl", help="Run ETL job")
+@click.argument("input")
+def etl(input):
+ # input should be something like "etl/input/somefile.ext"
+ assert input.startswith("etl/input/")
+ output = input.replace("/input/", "/output/")
+ data = storage.download_file(input)
+ storage.upload_file(output, data)
+
+
if __name__ == "__main__":
main()
diff --git a/app/storage.py b/app/storage.py
index 02ea0691..58d0da60 100644
--- a/app/storage.py
+++ b/app/storage.py
@@ -1,3 +1,4 @@
+import io
import logging
import os
@@ -13,6 +14,31 @@ def create_upload_url(path):
# Manually specify signature version 4 which is required since the bucket is encrypted with KMS.
# By default presigned URLs use signature version 2 to be backwards compatible
s3_client = boto3.client("s3", config=Config(signature_version="s3v4"))
- logger.info("Generating presigned POST URL")
+ logger.info("Generating presigned POST URL for path %s", path)
response = s3_client.generate_presigned_post(bucket_name, path)
return response["url"], response["fields"]
+
+
+def download_file(path):
+ bucket_name = os.environ.get("BUCKET_NAME")
+
+ s3_client = boto3.client("s3", config=Config(signature_version="s3v4"))
+ logger.info("Downloading file %s", path)
+ response = s3_client.get_object(
+ Bucket=bucket_name,
+ Key=path,
+ )
+ body = response["Body"]
+ return io.BytesIO(body.read())
+
+
+def upload_file(path, data):
+ bucket_name = os.environ.get("BUCKET_NAME")
+
+ s3_client = boto3.client("s3", config=Config(signature_version="s3v4"))
+ logger.info("Uploading file to path %s", path)
+ s3_client.put_object(
+ Bucket=bucket_name,
+ Key=path,
+ Body=data,
+ )
diff --git a/docs/feature-flags.md b/docs/feature-flags.md
index c9c40280..63bf6a48 100644
--- a/docs/feature-flags.md
+++ b/docs/feature-flags.md
@@ -8,7 +8,7 @@ This project leverages [Amazon CloudWatch Evidently](https://docs.aws.amazon.com
## Creating feature flags
-The list of feature flags for an application is defined in the `feature_flags` property in its app-config module (in `/infra/[app_name]/app-config/main.tf`). To create a new feature flag, add a new string to that list. To remove a feature flag, remove the feature flag from the list. The set of feature flags will be updated on the next terraform apply of the service layer, or during the next deploy of the application.
+The list of feature flags for an application is defined in the `feature_flags` property in its app-config module (in `/infra/[app_name]/app-config/feature-flags.tf`). To create a new feature flag, add a new string to that list. To remove a feature flag, remove the feature flag from the list. The set of feature flags will be updated on the next terraform apply of the service layer, or during the next deploy of the application.
## Querying feature flags in the application
diff --git a/docs/infra/background-jobs.md b/docs/infra/background-jobs.md
new file mode 100644
index 00000000..9455948d
--- /dev/null
+++ b/docs/infra/background-jobs.md
@@ -0,0 +1,16 @@
+# Background jobs
+
+The application may have background jobs that support the application. Types of background jobs include:
+
+* Jobs that occur on a fixed schedule (e.g. every hour or every night) — This type of job is useful for ETL jobs that can't be event-driven, such as ETL jobs that ingest source files from an SFTP server or from an S3 bucket managed by another team that we have little control or influence over. **This functionality has not yet been implemented**
+* Jobs that trigger on an event (e.g. when a file is uploaded to the document storage service). This type of job can be processed by two types of tasks:
+ * Tasks that spin up on demand to process the job — This type of task is appropriate for low-frequency ETL jobs **This is the currently the only type that's supported**
+ * Worker tasks that are running continuously, waiting for jobs to enter a queue that the worker then processes — This type of task is ideal for high frequency, low-latency jobs such as processing user uploads or submitting claims to an unreliable or high-latency legacy system **This functionality has not yet been implemented**
+
+## Job configuration
+
+Background jobs for the application are configured via the application's `env-config` module. The current infrastructure supports jobs that spin up on demand tasks when a file is uploaded to the document storage service. These are configured in the `file_upload_jobs` configuration.
+
+## How it works
+
+File upload jobs use AWS EventBridge to listen to "Object Created" events when files are uploaded to S3. An event rule is created for each job configuration, and each event rule has a single event target that targets the application's ECS cluster. The task uses the same container image that the service uses, and the task's configuration is the same as the service's configuration with the exception of the entrypoint, which is specified by the job configuration's `task_command` setting, which can reference the bucket and path of the file that triggered the event by using the template values `` and ``.
diff --git a/infra/README.md b/infra/README.md
index 30005fc0..49903e10 100644
--- a/infra/README.md
+++ b/infra/README.md
@@ -67,6 +67,7 @@ To set up this project for the first time (aka it has never been deployed to the
1. [Set up application build repository](/docs/infra/set-up-app-build-repository.md)
2. [Set up application database](/docs/infra/set-up-database.md)
3. [Set up application environment](/docs/infra/set-up-app-env.md)
+ 4. [Set up background jobs](/docs/infra/background-jobs.md)
### 🆕 New developer
diff --git a/infra/app/app-config/dev.tf b/infra/app/app-config/dev.tf
index 22a5ceac..51dd733e 100644
--- a/infra/app/app-config/dev.tf
+++ b/infra/app/app-config/dev.tf
@@ -1,5 +1,6 @@
module "dev_config" {
source = "./env-config"
+ project_name = local.project_name
app_name = local.app_name
default_region = module.project_config.default_region
environment = "dev"
diff --git a/infra/app/app-config/env-config/file_upload_jobs.tf b/infra/app/app-config/env-config/file_upload_jobs.tf
new file mode 100644
index 00000000..ef61a89f
--- /dev/null
+++ b/infra/app/app-config/env-config/file_upload_jobs.tf
@@ -0,0 +1,14 @@
+locals {
+ # Configuration for default jobs to run in every environment.
+ # See description of `file_upload_jobs` variable in the service module (infra/modules/service/variables.tf)
+ # for the structure of this configuration object.
+ # One difference is that `source_bucket` is optional here. If `source_bucket` is not
+ # specified, then the source bucket will be set to the storage bucket's name
+ file_upload_jobs = {
+ # Example job configuration
+ # etl = {
+ # path_prefix = "etl/input",
+ # task_command = ["python", "-m", "flask", "--app", "app.py", "etl", ""]
+ # }
+ }
+}
diff --git a/infra/app/app-config/env-config/main.tf b/infra/app/app-config/env-config/main.tf
new file mode 100644
index 00000000..3eebe7a2
--- /dev/null
+++ b/infra/app/app-config/env-config/main.tf
@@ -0,0 +1,8 @@
+locals {
+ # The prefix key/value pair is used for Terraform Workspaces, which is useful for projects with multiple infrastructure developers.
+ # By default, Terraform creates a workspace named “default.” If a non-default workspace is not created this prefix will equal “default”,
+ # if you choose not to use workspaces set this value to "dev"
+ prefix = terraform.workspace == "default" ? "" : "${terraform.workspace}-"
+
+ bucket_name = "${local.prefix}${var.project_name}-${var.app_name}-${var.environment}"
+}
diff --git a/infra/app/app-config/env-config/outputs.tf b/infra/app/app-config/env-config/outputs.tf
index 09005a1f..b9897d1b 100644
--- a/infra/app/app-config/env-config/outputs.tf
+++ b/infra/app/app-config/env-config/outputs.tf
@@ -16,10 +16,24 @@ output "network_name" {
output "service_config" {
value = {
+ service_name = "${local.prefix}${var.app_name}-${var.environment}"
region = var.default_region
cpu = var.service_cpu
memory = var.service_memory
desired_instance_count = var.service_desired_instance_count
+
+ file_upload_jobs = {
+ for job_name, job_config in local.file_upload_jobs :
+ # For job configs that don't define a source_bucket, add the source_bucket config property
+ job_name => merge({ source_bucket = local.bucket_name }, job_config)
+ }
+ }
+}
+
+output "storage_config" {
+ value = {
+ # Include project name in bucket name since buckets need to be globally unique across AWS
+ bucket_name = local.bucket_name
}
}
diff --git a/infra/app/app-config/env-config/variables.tf b/infra/app/app-config/env-config/variables.tf
index 712cea1b..add56599 100644
--- a/infra/app/app-config/env-config/variables.tf
+++ b/infra/app/app-config/env-config/variables.tf
@@ -1,3 +1,7 @@
+variable "project_name" {
+ type = string
+}
+
variable "app_name" {
type = string
}
diff --git a/infra/app/app-config/feature-flags.tf b/infra/app/app-config/feature-flags.tf
new file mode 100644
index 00000000..6224b253
--- /dev/null
+++ b/infra/app/app-config/feature-flags.tf
@@ -0,0 +1,3 @@
+locals {
+ feature_flags = ["foo", "bar"]
+}
diff --git a/infra/app/app-config/main.tf b/infra/app/app-config/main.tf
index 93fd896d..75b63807 100644
--- a/infra/app/app-config/main.tf
+++ b/infra/app/app-config/main.tf
@@ -21,8 +21,6 @@ locals {
has_incident_management_service = false
- feature_flags = ["foo", "bar"]
-
environment_configs = {
dev = module.dev_config
staging = module.staging_config
diff --git a/infra/app/app-config/prod.tf b/infra/app/app-config/prod.tf
index 4a85e934..a8790aa7 100644
--- a/infra/app/app-config/prod.tf
+++ b/infra/app/app-config/prod.tf
@@ -1,5 +1,6 @@
module "prod_config" {
source = "./env-config"
+ project_name = local.project_name
app_name = local.app_name
default_region = module.project_config.default_region
environment = "prod"
diff --git a/infra/app/app-config/staging.tf b/infra/app/app-config/staging.tf
index 59e6dde0..9a77c34f 100644
--- a/infra/app/app-config/staging.tf
+++ b/infra/app/app-config/staging.tf
@@ -1,5 +1,6 @@
module "staging_config" {
source = "./env-config"
+ project_name = local.project_name
app_name = local.app_name
default_region = module.project_config.default_region
environment = "staging"
diff --git a/infra/app/database/main.tf b/infra/app/database/main.tf
index d881a35d..49d120b1 100644
--- a/infra/app/database/main.tf
+++ b/infra/app/database/main.tf
@@ -15,8 +15,8 @@ data "aws_subnets" "database" {
locals {
# The prefix key/value pair is used for Terraform Workspaces, which is useful for projects with multiple infrastructure developers.
- # By default, Terraform creates a workspace named “default.” If a non-default workspace is not created this prefix will equal “default”,
- # if you choose not to use workspaces set this value to "dev"
+ # By default, Terraform creates a workspace named “default.” If a non-default workspace is not created this prefix will equal “default”,
+ # if you choose not to use workspaces set this value to "dev"
prefix = terraform.workspace == "default" ? "" : "${terraform.workspace}-"
# Add environment specific tags
diff --git a/infra/app/service/main.tf b/infra/app/service/main.tf
index 98760d80..2f27a5d4 100644
--- a/infra/app/service/main.tf
+++ b/infra/app/service/main.tf
@@ -22,27 +22,18 @@ data "aws_subnets" "private" {
}
locals {
- # The prefix key/value pair is used for Terraform Workspaces, which is useful for projects with multiple infrastructure developers.
- # By default, Terraform creates a workspace named “default.” If a non-default workspace is not created this prefix will equal “default”,
- # if you choose not to use workspaces set this value to "dev"
- prefix = terraform.workspace == "default" ? "" : "${terraform.workspace}-"
-
# Add environment specific tags
tags = merge(module.project_config.default_tags, {
environment = var.environment_name
description = "Application resources created in ${var.environment_name} environment"
})
- service_name = "${local.prefix}${module.app_config.app_name}-${var.environment_name}"
-
is_temporary = startswith(terraform.workspace, "t-")
- # Include project name in bucket name since buckets need to be globally unique across AWS
- bucket_name = "${local.prefix}${module.project_config.project_name}-${module.app_config.app_name}-${var.environment_name}"
-
environment_config = module.app_config.environment_configs[var.environment_name]
service_config = local.environment_config.service_config
database_config = local.environment_config.database_config
+ storage_config = local.environment_config.storage_config
incident_management_service_integration_config = local.environment_config.incident_management_service_integration
}
@@ -112,7 +103,7 @@ data "aws_security_groups" "aws_services" {
module "service" {
source = "../../modules/service"
- service_name = local.service_name
+ service_name = local.service_config.service_name
image_repository_name = module.app_config.image_repository_name
image_tag = local.image_tag
vpc_id = data.aws_vpc.network.id
@@ -125,7 +116,7 @@ module "service" {
aws_services_security_group_id = data.aws_security_groups.aws_services.ids[0]
- is_temporary = local.is_temporary
+ file_upload_jobs = local.service_config.file_upload_jobs
db_vars = module.app_config.has_database ? {
security_group_ids = data.aws_rds_cluster.db_cluster[0].vpc_security_group_ids
@@ -142,12 +133,14 @@ module "service" {
extra_environment_variables = [
{ name : "FEATURE_FLAGS_PROJECT", value : module.feature_flags.evidently_project_name },
- { name : "BUCKET_NAME", value : local.bucket_name }
+ { name : "BUCKET_NAME", value : local.storage_config.bucket_name }
]
extra_policies = {
feature_flags_access = module.feature_flags.access_policy_arn,
storage_access = module.storage.access_policy_arn
}
+
+ is_temporary = local.is_temporary
}
module "monitoring" {
@@ -156,18 +149,18 @@ module "monitoring" {
#email_alerts_subscription_list = ["email1@email.com", "email2@email.com"]
# Module takes service and ALB names to link all alerts with corresponding targets
- service_name = local.service_name
+ service_name = local.service_config.service_name
load_balancer_arn_suffix = module.service.load_balancer_arn_suffix
incident_management_service_integration_url = module.app_config.has_incident_management_service ? data.aws_ssm_parameter.incident_management_service_integration_url[0].value : null
}
module "feature_flags" {
source = "../../modules/feature-flags"
- service_name = local.service_name
+ service_name = local.service_config.service_name
feature_flags = module.app_config.feature_flags
}
module "storage" {
source = "../../modules/storage"
- name = local.bucket_name
+ name = local.storage_config.bucket_name
}
diff --git a/infra/app/service/outputs.tf b/infra/app/service/outputs.tf
index fe319eab..58711aaf 100644
--- a/infra/app/service/outputs.tf
+++ b/infra/app/service/outputs.tf
@@ -8,7 +8,7 @@ output "service_cluster_name" {
}
output "service_name" {
- value = local.service_name
+ value = local.service_config.service_name
}
output "application_log_group" {
diff --git a/infra/modules/service/jobs.tf b/infra/modules/service/jobs.tf
new file mode 100644
index 00000000..0d0bef71
--- /dev/null
+++ b/infra/modules/service/jobs.tf
@@ -0,0 +1,106 @@
+#-----------------
+# Background Jobs
+#-----------------
+# CloudWatch Event Rules and CloudWatch Event Targets that define event-based
+# triggers for background jobs, such as jobs that trigger when a file is
+# uploaded to an S3 bucket or jobs that trigger on a specified "cron" schedule.
+#
+# For each job configuration, there is a single event rule and an associated
+# event target
+#
+
+# Event rules that trigger whenever an object is created in S3
+# for a particular source bucket and object key prefix
+resource "aws_cloudwatch_event_rule" "file_upload_jobs" {
+ for_each = var.file_upload_jobs
+
+ name = "${local.cluster_name}-${each.key}"
+ description = "File uploaded to bucket ${each.value.source_bucket} with path prefix ${each.value.path_prefix}"
+
+ event_pattern = jsonencode({
+ source = ["aws.s3"],
+ detail-type = ["Object Created"],
+ detail = {
+ bucket = {
+ name = [each.value.source_bucket]
+ },
+ object = {
+ key = [{
+ prefix = each.value.path_prefix
+ }]
+ }
+ }
+ })
+}
+
+# Event target for each event rule that specifies what task command to run
+
+resource "aws_cloudwatch_event_target" "document_upload_jobs" {
+ for_each = var.file_upload_jobs
+
+ target_id = "${local.cluster_name}-${each.key}"
+ rule = aws_cloudwatch_event_rule.file_upload_jobs[each.key].name
+ arn = aws_ecs_cluster.cluster.arn
+ role_arn = aws_iam_role.events.arn
+
+ ecs_target {
+ task_definition_arn = aws_ecs_task_definition.app.arn
+ launch_type = "FARGATE"
+
+ # Configuring Network Configuration is required when the task definition uses the awsvpc network mode.
+ network_configuration {
+ subnets = var.private_subnet_ids
+ security_groups = [aws_security_group.app.id]
+ }
+ }
+
+ input_transformer {
+ input_paths = {
+ bucket_name = "$.detail.bucket.name",
+ object_key = "$.detail.object.key",
+ }
+
+ # When triggering the ECS task, override the command to run in the container to the
+ # command specified by the file_upload_job config. To do this define an input_template
+ # that transforms the input S3 event:
+ # {
+ # detail: {
+ # bucket: { name: "mybucket" },
+ # object: { key: "uploaded/file/path" }
+ # }
+ # }
+ # to match the Amazon ECS RunTask TaskOverride structure:
+ # {
+ # containerOverrides: [{
+ # name: "container_name",
+ # command: ["command", "to", "run"]
+ # }]
+ # }
+ # (see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html#targets-specifics-ecs-task
+ # and https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_TaskOverride.html)
+ #
+ # The task command can optionally use the bucket name or the object key in the command
+ # by including the placeholder values "" or "", e.g.
+ # {
+ # containerOverrides: [{
+ # name: "container_name",
+ # command: ["process_file.sh", "--bucket", "", "--object", ""]
+ # }]
+ # }
+ #
+ # Since jsonencode will cause the string "" to turn into
+ # "U+003Cbucket_nameU+003E" and "" to turn into "U+003Cobject_keyU+003E",
+ # we need to replace the unicode characters U+003C and U+003E with < and > to reverse
+ # the encoding.
+ # (see https://developer.hashicorp.com/terraform/language/functions/jsonencode and
+ # https://github.com/hashicorp/terraform/pull/18871)
+ input_template = replace(replace(jsonencode({
+ containerOverrides = [
+ {
+ name = local.container_name,
+ command = each.value.task_command
+ }
+ ]
+ }), "\\u003c", "<"), "\\u003e", ">")
+ }
+}
diff --git a/infra/modules/service/main.tf b/infra/modules/service/main.tf
index 5e7ecf2f..e86c0c5a 100644
--- a/infra/modules/service/main.tf
+++ b/infra/modules/service/main.tf
@@ -7,6 +7,7 @@ data "aws_ecr_repository" "app" {
locals {
alb_name = var.service_name
cluster_name = var.service_name
+ container_name = var.service_name
log_group_name = "service/${var.service_name}"
log_stream_prefix = var.service_name
task_executor_role_name = "${var.service_name}-task-executor"
@@ -68,7 +69,7 @@ resource "aws_ecs_task_definition" "app" {
container_definitions = jsonencode([
{
- name = var.service_name,
+ name = local.container_name,
image = local.image_url,
memory = var.memory,
cpu = var.cpu,
diff --git a/infra/modules/service/task-scheduler-role.tf b/infra/modules/service/task-scheduler-role.tf
new file mode 100644
index 00000000..d0b4393c
--- /dev/null
+++ b/infra/modules/service/task-scheduler-role.tf
@@ -0,0 +1,57 @@
+#---------------------
+# Task Scheduler Role
+#---------------------
+# Role and policy used by AWS EventBridge to trigger jobs from events
+#
+
+# Role that EventBridge will assume
+# The role allows EventBridge to run tasks on the ECS cluster
+resource "aws_iam_role" "events" {
+ name = "${local.cluster_name}-events"
+ managed_policy_arns = [aws_iam_policy.run_task.arn]
+ assume_role_policy = data.aws_iam_policy_document.events_assume_role.json
+}
+
+data "aws_iam_policy_document" "events_assume_role" {
+ statement {
+ effect = "Allow"
+ actions = ["sts:AssumeRole"]
+ principals {
+ type = "Service"
+ identifiers = ["events.amazonaws.com"]
+ }
+ }
+}
+
+# Policy that allows running tasks on the ECS cluster
+resource "aws_iam_policy" "run_task" {
+ name = "${var.service_name}-run-access"
+ policy = data.aws_iam_policy_document.run_task.json
+}
+
+data "aws_iam_policy_document" "run_task" {
+ statement {
+ effect = "Allow"
+ actions = ["ecs:RunTask"]
+ resources = ["${aws_ecs_task_definition.app.arn_without_revision}:*"]
+ condition {
+ test = "ArnLike"
+ variable = "ecs:cluster"
+ values = [aws_ecs_cluster.cluster.arn]
+ }
+ }
+
+ statement {
+ effect = "Allow"
+ actions = ["iam:PassRole"]
+ resources = [
+ aws_iam_role.task_executor.arn,
+ aws_iam_role.app_service.arn,
+ ]
+ condition {
+ test = "StringLike"
+ variable = "iam:PassedToService"
+ values = ["ecs-tasks.amazonaws.com"]
+ }
+ }
+}
diff --git a/infra/modules/service/variables.tf b/infra/modules/service/variables.tf
index 6034ec75..0efcd36f 100644
--- a/infra/modules/service/variables.tf
+++ b/infra/modules/service/variables.tf
@@ -90,6 +90,35 @@ variable "extra_policies" {
default = {}
}
+variable "file_upload_jobs" {
+ type = map(object({
+ source_bucket = string
+ path_prefix = string
+ task_command = list(string)
+ }))
+
+ description = <`
+ and ``. For example if task_command is:
+
+ ["python", "etl.py", ""]
+
+ Then if an object was uploaded to s3://somebucket/path/to/file.txt, the
+ task will execute the command:
+
+ python etl.py path/to/file.txt
+ EOT
+ default = {}
+}
+
variable "is_temporary" {
description = "Whether the service is meant to be spun up temporarily (e.g. for automated infra tests). This is used to disable deletion protection for the load balancer."
type = bool
diff --git a/infra/modules/storage/access-control.tf b/infra/modules/storage/access-control.tf
index 6cba9bb5..cda3065d 100644
--- a/infra/modules/storage/access-control.tf
+++ b/infra/modules/storage/access-control.tf
@@ -51,7 +51,7 @@ data "aws_iam_policy_document" "storage_access" {
resources = ["arn:aws:s3:::${var.name}/*"]
}
statement {
- actions = ["kms:GenerateDataKey"]
+ actions = ["kms:GenerateDataKey", "kms:Decrypt"]
effect = "Allow"
resources = [aws_kms_key.storage.arn]
}