Add jobs that trigger on file upload (#527)
* Refactor service name and bucket name to env-config module
* Add file_upload_jobs config to env-config module
* Update service module to add event rule and target to trigger jobs
based on file uploads
* Add role and policy that EventBridge will use to run ECS tasks
* Document background job functionality

Example app changes

* Add etl command to example Flask app
* Add etl job to file_upload_jobs config for platform-test

CI changes

* Bump terraform version in ci-infra-service workflow

Fixes

* Fixed storage access permissions to have kms:Decrypt

Unrelated changes

* Moved feature flags config to separate file in same module

## Context

Many applications need a basic ETL system that ingests files that are
uploaded to a bucket. This change adds that functionality.

---------

Co-authored-by: Daphne Gold <[email protected]>
lorenyu and daphnegold authored Feb 6, 2024
1 parent c803535 commit 164e7e4
Showing 23 changed files with 312 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-infra-service.yml
@@ -32,7 +32,7 @@ jobs:

- uses: hashicorp/setup-terraform@v2
with:
terraform_version: 1.2.1
terraform_version: 1.4.6
terraform_wrapper: false

- uses: actions/setup-go@v3
15 changes: 13 additions & 2 deletions app/app.py
@@ -2,11 +2,12 @@
import os
from datetime import datetime

import click
from flask import Flask

import storage
from db import get_db_connection
from feature_flags import is_feature_enabled
from storage import create_upload_url

logging.basicConfig()
logger = logging.getLogger()
@@ -57,7 +57,7 @@ def feature_flags():
@app.route("/document-upload")
def document_upload():
path = f"uploads/{datetime.now().date()}/${{filename}}"
upload_url, fields = create_upload_url(path)
upload_url, fields = storage.create_upload_url(path)
additional_fields = "".join(
[
f'<input type="hidden" name="{name}" value="{value}">'
@@ -68,5 +69,15 @@ def document_upload():
return f'<form method="post" action="{upload_url}" enctype="multipart/form-data">{additional_fields}<input type="file" name="file"><input type="submit"></form>'


@app.cli.command("etl", help="Run ETL job")
@click.argument("input")
def etl(input):
# input should be something like "etl/input/somefile.ext"
assert input.startswith("etl/input/")
output = input.replace("/input/", "/output/")
data = storage.download_file(input)
storage.upload_file(output, data)


if __name__ == "__main__":
main()
28 changes: 27 additions & 1 deletion app/storage.py
@@ -1,3 +1,4 @@
import io
import logging
import os

@@ -13,6 +14,31 @@ def create_upload_url(path):
# Manually specify signature version 4 which is required since the bucket is encrypted with KMS.
# By default presigned URLs use signature version 2 to be backwards compatible
s3_client = boto3.client("s3", config=Config(signature_version="s3v4"))
logger.info("Generating presigned POST URL")
logger.info("Generating presigned POST URL for path %s", path)
response = s3_client.generate_presigned_post(bucket_name, path)
return response["url"], response["fields"]


def download_file(path):
bucket_name = os.environ.get("BUCKET_NAME")

s3_client = boto3.client("s3", config=Config(signature_version="s3v4"))
logger.info("Downloading file %s", path)
response = s3_client.get_object(
Bucket=bucket_name,
Key=path,
)
body = response["Body"]
return io.BytesIO(body.read())


def upload_file(path, data):
bucket_name = os.environ.get("BUCKET_NAME")

s3_client = boto3.client("s3", config=Config(signature_version="s3v4"))
logger.info("Uploading file to path %s", path)
s3_client.put_object(
Bucket=bucket_name,
Key=path,
Body=data,
)
2 changes: 1 addition & 1 deletion docs/feature-flags.md
@@ -8,7 +8,7 @@ This project leverages [Amazon CloudWatch Evidently](https://docs.aws.amazon.com

## Creating feature flags

The list of feature flags for an application is defined in the `feature_flags` property in its app-config module (in `/infra/[app_name]/app-config/main.tf`). To create a new feature flag, add a new string to that list. To remove a feature flag, remove the feature flag from the list. The set of feature flags will be updated on the next terraform apply of the service layer, or during the next deploy of the application.
The list of feature flags for an application is defined in the `feature_flags` property in its app-config module (in `/infra/[app_name]/app-config/feature-flags.tf`). To create a new feature flag, add a new string to that list. To remove a feature flag, remove the feature flag from the list. The set of feature flags will be updated on the next terraform apply of the service layer, or during the next deploy of the application.

## Querying feature flags in the application

16 changes: 16 additions & 0 deletions docs/infra/background-jobs.md
@@ -0,0 +1,16 @@
# Background jobs

The application may have background jobs that support the application. Types of background jobs include:

* Jobs that occur on a fixed schedule (e.g. every hour or every night) — This type of job is useful for ETL jobs that can't be event-driven, such as jobs that ingest source files from an SFTP server or from an S3 bucket managed by another team over which we have little control or influence. **This functionality has not yet been implemented**
* Jobs that trigger on an event (e.g. when a file is uploaded to the document storage service). This type of job can be processed by two types of tasks:
  * Tasks that spin up on demand to process the job — This type of task is appropriate for low-frequency ETL jobs. **This is currently the only type that's supported**
  * Worker tasks that run continuously, waiting for jobs to enter a queue that the worker then processes — This type of task is ideal for high-frequency, low-latency jobs such as processing user uploads or submitting claims to an unreliable or high-latency legacy system. **This functionality has not yet been implemented**

## Job configuration

Background jobs for the application are configured via the application's `env-config` module. The current infrastructure supports jobs that spin up on-demand tasks when a file is uploaded to the document storage service. These are configured in the `file_upload_jobs` configuration, as illustrated in the sketch below.
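For illustration, an entry in that configuration might look like the following sketch, which mirrors the commented example added in `infra/app/app-config/env-config/file_upload_jobs.tf`; the `etl` job name, `path_prefix`, and `task_command` values are illustrative, not defaults:

```hcl
locals {
  file_upload_jobs = {
    # Illustrative job: run the example app's "etl" Flask CLI command whenever a
    # file lands under the etl/input/ prefix of the storage bucket.
    etl = {
      path_prefix  = "etl/input",
      task_command = ["python", "-m", "flask", "--app", "app.py", "etl", "<object_key>"]
    }
  }
}
```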

## How it works

File upload jobs use AWS EventBridge to listen for "Object Created" events that are emitted when files are uploaded to S3. An event rule is created for each job configuration, and each event rule has a single event target that points at the application's ECS cluster. The task uses the same container image as the service, and the task's configuration matches the service's configuration except for the entrypoint, which is specified by the job configuration's `task_command` setting. The `task_command` can reference the bucket and path of the file that triggered the event via the template values `<bucket_name>` and `<object_key>`.
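The service module's internals are not shown in this diff, but the wiring conceptually resembles the following Terraform sketch. The resource names, ARNs, container name, and network settings below are assumptions made for illustration, not the module's actual implementation:

```hcl
# Sketch only: shows how an EventBridge rule and ECS target could connect an
# S3 "Object Created" event to an on-demand task. All names and ARNs are placeholders.
resource "aws_cloudwatch_event_rule" "file_upload_etl" {
  name = "example-app-dev-etl-file-upload" # hypothetical rule name

  event_pattern = jsonencode({
    "source"      = ["aws.s3"],
    "detail-type" = ["Object Created"],
    "detail" = {
      "bucket" = { "name" = ["example-app-dev"] },         # the job's source_bucket
      "object" = { "key" = [{ "prefix" = "etl/input/" }] } # the job's path_prefix
    }
  })
}

resource "aws_cloudwatch_event_target" "run_etl_task" {
  rule     = aws_cloudwatch_event_rule.file_upload_etl.name
  arn      = "arn:aws:ecs:us-east-1:123456789012:cluster/example-cluster"       # application's ECS cluster (placeholder)
  role_arn = "arn:aws:iam::123456789012:role/example-eventbridge-run-task-role" # role EventBridge assumes to run the task (placeholder)

  ecs_target {
    task_definition_arn = "arn:aws:ecs:us-east-1:123456789012:task-definition/example-app-dev" # placeholder
    launch_type         = "FARGATE"

    network_configuration {
      subnets = ["subnet-0123456789abcdef0"] # private subnets (placeholder)
    }
  }

  # Substitute the uploaded object's key into the job's task_command; this is how
  # <object_key> gets its value at run time (<bucket_name> could be mapped the same way).
  input_transformer {
    input_paths = {
      bucket_name = "$.detail.bucket.name",
      object_key  = "$.detail.object.key"
    }
    input_template = <<-EOT
      {"containerOverrides": [{"name": "example-app", "command": ["python", "-m", "flask", "--app", "app.py", "etl", "<object_key>"]}]}
    EOT
  }
}
```

The role referenced by `role_arn` corresponds to the role and policy added in this change that allow EventBridge to run ECS tasks.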
1 change: 1 addition & 0 deletions infra/README.md
@@ -67,6 +67,7 @@ To set up this project for the first time (aka it has never been deployed to the
1. [Set up application build repository](/docs/infra/set-up-app-build-repository.md)
2. [Set up application database](/docs/infra/set-up-database.md)
3. [Set up application environment](/docs/infra/set-up-app-env.md)
4. [Set up background jobs](/docs/infra/background-jobs.md)

### 🆕 New developer

1 change: 1 addition & 0 deletions infra/app/app-config/dev.tf
@@ -1,5 +1,6 @@
module "dev_config" {
source = "./env-config"
project_name = local.project_name
app_name = local.app_name
default_region = module.project_config.default_region
environment = "dev"
14 changes: 14 additions & 0 deletions infra/app/app-config/env-config/file_upload_jobs.tf
@@ -0,0 +1,14 @@
locals {
# Configuration for default jobs to run in every environment.
# See description of `file_upload_jobs` variable in the service module (infra/modules/service/variables.tf)
# for the structure of this configuration object.
# One difference is that `source_bucket` is optional here. If `source_bucket` is not
# specified, then the source bucket will be set to the storage bucket's name
file_upload_jobs = {
# Example job configuration
# etl = {
# path_prefix = "etl/input",
# task_command = ["python", "-m", "flask", "--app", "app.py", "etl", "<object_key>"]
# }
}
}
8 changes: 8 additions & 0 deletions infra/app/app-config/env-config/main.tf
@@ -0,0 +1,8 @@
locals {
# The prefix key/value pair is used for Terraform Workspaces, which is useful for projects with multiple infrastructure developers.
# By default, Terraform creates a workspace named “default.” If a non-default workspace is not created this prefix will equal “default”,
# if you choose not to use workspaces set this value to "dev"
prefix = terraform.workspace == "default" ? "" : "${terraform.workspace}-"

bucket_name = "${local.prefix}${var.project_name}-${var.app_name}-${var.environment}"
}
14 changes: 14 additions & 0 deletions infra/app/app-config/env-config/outputs.tf
@@ -16,10 +16,24 @@ output "network_name" {

output "service_config" {
value = {
service_name = "${local.prefix}${var.app_name}-${var.environment}"
region = var.default_region
cpu = var.service_cpu
memory = var.service_memory
desired_instance_count = var.service_desired_instance_count

file_upload_jobs = {
for job_name, job_config in local.file_upload_jobs :
# For job configs that don't define a source_bucket, add the source_bucket config property
job_name => merge({ source_bucket = local.bucket_name }, job_config)
}
}
}

output "storage_config" {
value = {
# Include project name in bucket name since buckets need to be globally unique across AWS
bucket_name = local.bucket_name
}
}

4 changes: 4 additions & 0 deletions infra/app/app-config/env-config/variables.tf
@@ -1,3 +1,7 @@
variable "project_name" {
type = string
}

variable "app_name" {
type = string
}
3 changes: 3 additions & 0 deletions infra/app/app-config/feature-flags.tf
@@ -0,0 +1,3 @@
locals {
feature_flags = ["foo", "bar"]
}
2 changes: 0 additions & 2 deletions infra/app/app-config/main.tf
@@ -21,8 +21,6 @@ locals {

has_incident_management_service = false

feature_flags = ["foo", "bar"]

environment_configs = {
dev = module.dev_config
staging = module.staging_config
1 change: 1 addition & 0 deletions infra/app/app-config/prod.tf
@@ -1,5 +1,6 @@
module "prod_config" {
source = "./env-config"
project_name = local.project_name
app_name = local.app_name
default_region = module.project_config.default_region
environment = "prod"
1 change: 1 addition & 0 deletions infra/app/app-config/staging.tf
@@ -1,5 +1,6 @@
module "staging_config" {
source = "./env-config"
project_name = local.project_name
app_name = local.app_name
default_region = module.project_config.default_region
environment = "staging"
4 changes: 2 additions & 2 deletions infra/app/database/main.tf
@@ -15,8 +15,8 @@ data "aws_subnets" "database" {

locals {
# The prefix key/value pair is used for Terraform Workspaces, which is useful for projects with multiple infrastructure developers.
# By default, Terraform creates a workspace named “default.” If a non-default workspace is not created this prefix will equal “default”,
# if you choose not to use workspaces set this value to "dev"
# By default, Terraform creates a workspace named “default.” If a non-default workspace is not created this prefix will equal “default”,
# if you choose not to use workspaces set this value to "dev"
prefix = terraform.workspace == "default" ? "" : "${terraform.workspace}-"

# Add environment specific tags
25 changes: 9 additions & 16 deletions infra/app/service/main.tf
@@ -22,27 +22,18 @@ data "aws_subnets" "private" {
}

locals {
# The prefix key/value pair is used for Terraform Workspaces, which is useful for projects with multiple infrastructure developers.
# By default, Terraform creates a workspace named “default.” If a non-default workspace is not created this prefix will equal “default”,
# if you choose not to use workspaces set this value to "dev"
prefix = terraform.workspace == "default" ? "" : "${terraform.workspace}-"

# Add environment specific tags
tags = merge(module.project_config.default_tags, {
environment = var.environment_name
description = "Application resources created in ${var.environment_name} environment"
})

service_name = "${local.prefix}${module.app_config.app_name}-${var.environment_name}"

is_temporary = startswith(terraform.workspace, "t-")

# Include project name in bucket name since buckets need to be globally unique across AWS
bucket_name = "${local.prefix}${module.project_config.project_name}-${module.app_config.app_name}-${var.environment_name}"

environment_config = module.app_config.environment_configs[var.environment_name]
service_config = local.environment_config.service_config
database_config = local.environment_config.database_config
storage_config = local.environment_config.storage_config
incident_management_service_integration_config = local.environment_config.incident_management_service_integration
}

@@ -112,7 +103,7 @@ data "aws_security_groups" "aws_services" {

module "service" {
source = "../../modules/service"
service_name = local.service_name
service_name = local.service_config.service_name
image_repository_name = module.app_config.image_repository_name
image_tag = local.image_tag
vpc_id = data.aws_vpc.network.id
@@ -125,7 +116,7 @@ module "service" {

aws_services_security_group_id = data.aws_security_groups.aws_services.ids[0]

is_temporary = local.is_temporary
file_upload_jobs = local.service_config.file_upload_jobs

db_vars = module.app_config.has_database ? {
security_group_ids = data.aws_rds_cluster.db_cluster[0].vpc_security_group_ids
@@ -142,12 +133,14 @@

extra_environment_variables = [
{ name : "FEATURE_FLAGS_PROJECT", value : module.feature_flags.evidently_project_name },
{ name : "BUCKET_NAME", value : local.bucket_name }
{ name : "BUCKET_NAME", value : local.storage_config.bucket_name }
]
extra_policies = {
feature_flags_access = module.feature_flags.access_policy_arn,
storage_access = module.storage.access_policy_arn
}

is_temporary = local.is_temporary
}

module "monitoring" {
@@ -156,18 +149,18 @@
#email_alerts_subscription_list = ["[email protected]", "[email protected]"]

# Module takes service and ALB names to link all alerts with corresponding targets
service_name = local.service_name
service_name = local.service_config.service_name
load_balancer_arn_suffix = module.service.load_balancer_arn_suffix
incident_management_service_integration_url = module.app_config.has_incident_management_service ? data.aws_ssm_parameter.incident_management_service_integration_url[0].value : null
}

module "feature_flags" {
source = "../../modules/feature-flags"
service_name = local.service_name
service_name = local.service_config.service_name
feature_flags = module.app_config.feature_flags
}

module "storage" {
source = "../../modules/storage"
name = local.bucket_name
name = local.storage_config.bucket_name
}
2 changes: 1 addition & 1 deletion infra/app/service/outputs.tf
@@ -8,7 +8,7 @@ output "service_cluster_name" {
}

output "service_name" {
value = local.service_name
value = local.service_config.service_name
}

output "application_log_group" {