Merge pull request #753 from JiscSD/OC-925
OC-925: Configure ECS task to run on a schedule
finlay-jisc authored Jan 20, 2025
2 parents 99f732b + 6524964 commit ddb50cf
Showing 12 changed files with 159 additions and 77 deletions.
13 changes: 2 additions & 11 deletions api/serverless-config-default.yml
@@ -490,22 +490,13 @@ functions:
method: GET
cors: true
# Integrations
incrementalAriIngestHttp:
handler: dist/src/components/integration/routes.incrementalAriIngest
timeout: 900
triggerARIIngest:
handler: dist/src/components/integration/routes.triggerARIIngest
events:
- http:
path: ${self:custom.versions.v1}/integrations/ari/incremental
method: POST
cors: true
# Commented out - for the time being ARI ingests will just be manually triggered.
# incrementalAriIngestScheduled:
# handler: dist/src/components/integration/controller.incrementalAriIngest
# timeout: 900
# events:
# - schedule:
# rate: cron(0 5 ? * TUE *) # Every Tuesday at 5 a.m.
# enabled: ${self:custom.scheduledAriIngestEnabled.${opt:stage}, false}

package:
defaultPatterns:
9 changes: 1 addition & 8 deletions api/serverless-config-deploy.yml
@@ -3,11 +3,4 @@ functions:
generatePDFsFromQueue:
handler: dist/src/components/sqs/handler.generatePDFs
events:
- sqs: 'arn:aws:sqs:${aws:region}:${aws:accountId}:science-octopus-pdf-queue-${self:provider.stage}'
triggerECSTask:
handler: dist/src/components/integration/routes.triggerECSTask
events:
- http:
path: ${self:custom.versions.v1}/integrations/simple-ecs-task
method: POST
cors: true
- sqs: 'arn:aws:sqs:${aws:region}:${aws:accountId}:science-octopus-pdf-queue-${self:provider.stage}'
4 changes: 3 additions & 1 deletion api/src/components/integration/README.md
@@ -19,6 +19,8 @@ They should also always be owned by an organisational user account. That is, a u

On deployed environments, integrations are run in containers on AWS Elastic Container Service. These containers are defined in the infrastructure code (see [Dockerfile](../../../../infra/docker/ariImportRunner/Dockerfile)), so they can be built and tested locally from the `infra/docker/ariImportRunner` directory with `docker compose up` (see [compose.yml](../../../../infra/docker/ariImportRunner/compose.yml)).

These task containers may be triggered using an API key protected API endpoint that in turn triggers the task to spin up (e.g. the `triggerARIIngest` endpoint), or automatically at a specified time by an Eventbridge scheduler (see this in the [infra code](../../../../infra/modules/ecs/schedule.tf)).
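
For illustration, here is a minimal sketch of calling that trigger endpoint over HTTP. The base URL and version prefix are placeholder assumptions; the path, method, and query parameters mirror the `triggerARIIngest` route and the `TriggerAriIngestQueryParams` type in this change.

```typescript
// Hedged sketch: trigger the ARI ingest task via the API-key-protected endpoint.
// The base URL is a placeholder; the route and query parameters follow the
// triggerARIIngest function and TriggerAriIngestQueryParams type in this PR.
const API_BASE_URL = 'https://api.example.org/v1'; // assumption, not the real host

async function triggerAriIngest(apiKey: string, dryRun = false): Promise<void> {
    const params = new URLSearchParams({ apiKey });

    if (dryRun) {
        params.set('dryRun', 'true');
    }

    const response = await fetch(`${API_BASE_URL}/integrations/ari/incremental?${params.toString()}`, {
        method: 'POST'
    });

    // On deployed stages the handler responds 200 once the ECS task has been
    // started (or explains why the ingest was not run).
    console.log(response.status, await response.json());
}
```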

They can also be run ad hoc on the local environment via npm scripts, for example (from the `api` directory):

`npm run ariImport -- dryRun=true allDepartments=true full=false`
@@ -42,7 +44,7 @@ On import, ARIs go through a handling flow:

- If no publication exists with the ARI's question ID in its `externalId` field, it is created as a new publication.
- If a publication does exist with the ARI's question ID in its `externalId` field, it is compared to the existing publication for changes.
- If changes are found, the existing publication is reversioned with those changes applied.
- If changes are found, the existing publication is updated with those changes applied. Note that this is not a reversioning - ARI publications always have only one version.
- If no changes are found, no action is taken.
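
A minimal sketch of this handling flow follows; the types and helper functions are illustrative stand-ins for the real service code, with a single-field comparison standing in for the full change detection.

```typescript
// Illustrative sketch of the ARI handling flow; these types and helpers are
// stand-ins, not the actual Octopus service API.
interface MappedAri {
    questionId: string;
    title: string;
}

interface Publication {
    externalId: string;
    title: string;
}

interface AriHandlers {
    findByExternalId: (externalId: string) => Promise<Publication | null>;
    create: (ari: MappedAri) => Promise<void>;
    update: (existing: Publication, ari: MappedAri) => Promise<void>;
}

async function handleAri(ari: MappedAri, handlers: AriHandlers): Promise<'created' | 'updated' | 'unchanged'> {
    // Look for an existing publication whose externalId matches the ARI's question ID.
    const existing = await handlers.findByExternalId(ari.questionId);

    if (!existing) {
        await handlers.create(ari);
        return 'created';
    }

    // Compare the incoming ARI to the existing publication (a real comparison
    // would cover every mapped field, not just the title).
    if (existing.title !== ari.title) {
        // Not a reversioning: ARI publications only ever have one version, so the
        // existing publication is updated in place with the changes applied.
        await handlers.update(existing, ari);
        return 'updated';
    }

    return 'unchanged';
}
```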

#### How ARI data is mapped to octopus data
5 changes: 3 additions & 2 deletions api/src/components/integration/__tests__/ari.test.ts
@@ -428,9 +428,10 @@ describe('ARI import processes', () => {
.post('/integrations/ari/incremental')
.query({ apiKey: process.env.TRIGGER_ARI_INGEST_API_KEY });

expect(triggerImport.status).toEqual(202);
// May not seem right but the service task has technically been triggered, hence 200.
expect(triggerImport.status).toEqual(200);
expect(triggerImport.body).toMatchObject({
message: 'Cancelling ingest. Either an import is already in progress or the last import failed.'
message: 'Did not run ingest. Either an import is already in progress or the last import failed.'
});
});
});
40 changes: 3 additions & 37 deletions api/src/components/integration/controller.ts
@@ -1,45 +1,11 @@
import * as I from 'interface';
import * as ingestLogService from 'ingestLog/service';
import * as integrationService from 'integration/service';
import * as response from 'lib/response';

export const incrementalAriIngest = async (
event: I.APIRequest | I.EventBridgeEvent<'Scheduled Event', string>
export const triggerAriIngest = async (
event: I.APIRequest<undefined, I.TriggerAriIngestQueryParams, undefined>
): Promise<I.JSONResponse> => {
// Check if a process is currently running.
const lastLog = await ingestLogService.getMostRecentLog('ARI', true);
const triggeredByHttp = event && 'headers' in event;
const dryRun = triggeredByHttp ? !!event.queryStringParameters?.dryRun : false;
const dryRunMessages: string[] = [];

if (lastLog && !lastLog.end) {
if (dryRun) {
dryRunMessages.push(
'This run would have been cancelled because another run is currently in progress. However, the run has still been simulated.'
);
} else {
return response.json(202, {
message: 'Cancelling ingest. Either an import is already in progress or the last import failed.'
});
}
}

try {
const ingestResult = await integrationService.incrementalAriIngest(dryRun, 'email');

return response.json(
200,
dryRunMessages.length ? { messages: [...dryRunMessages, ingestResult] } : ingestResult
);
} catch (error) {
console.log(error);

return response.json(500, { message: 'Unknown server error.' });
}
};

export const triggerECSTask = async (): Promise<I.JSONResponse> => {
const triggerTaskOutput = await integrationService.triggerECSTask();
const triggerTaskOutput = await integrationService.triggerAriIngest(event.queryStringParameters.dryRun);

return response.json(200, { message: triggerTaskOutput });
};
8 changes: 2 additions & 6 deletions api/src/components/integration/routes.ts
@@ -7,11 +7,7 @@ import * as middleware from 'middleware';

const triggerAriIngestApiKey = Helpers.checkEnvVariable('TRIGGER_ARI_INGEST_API_KEY');

export const incrementalAriIngest = middy(integrationController.incrementalAriIngest)
export const triggerARIIngest = middy(integrationController.triggerAriIngest)
.use(middleware.doNotWaitForEmptyEventLoop({ runOnError: true, runOnBefore: true, runOnAfter: true }))
.use(middleware.authentication(false, false, triggerAriIngestApiKey))
.use(middleware.validator(integrationSchema.incrementalAriIngestHttp, 'queryStringParameters'));

export const triggerECSTask = middy(integrationController.triggerECSTask)
.use(middleware.doNotWaitForEmptyEventLoop({ runOnError: true, runOnBefore: true, runOnAfter: true }))
.use(middleware.authentication(false, false, triggerAriIngestApiKey));
.use(middleware.validator(integrationSchema.triggerAriIngest, 'queryStringParameters'));
2 changes: 1 addition & 1 deletion api/src/components/integration/schema/index.ts
@@ -1 +1 @@
export { default as incrementalAriIngestHttp } from './incrementalAriIngestHttp';
export { default as triggerAriIngest } from './triggerAriIngest';
5 changes: 3 additions & 2 deletions api/src/components/integration/schema/{incrementalAriIngestHttp.ts → triggerAriIngest.ts}
@@ -1,13 +1,14 @@
import * as I from 'interface';

const incrementalAriIngestHttpSchema: I.Schema = {
const incrementalAriIngestHttpSchema: I.JSONSchemaType<I.TriggerAriIngestQueryParams> = {
type: 'object',
properties: {
apiKey: {
type: 'string'
},
dryRun: {
type: 'boolean'
type: 'boolean',
nullable: true
}
},
additionalProperties: false,
38 changes: 29 additions & 9 deletions api/src/components/integration/service.ts
@@ -14,6 +14,19 @@ import * as I from 'interface';
* recent successful ingest (if this start time is available).
*/
export const incrementalAriIngest = async (dryRun: boolean, reportFormat: I.IngestReportFormat): Promise<string> => {
// Check if a process is currently running.
const lastLog = await ingestLogService.getMostRecentLog('ARI', true);

if (lastLog && !lastLog.end) {
if (dryRun) {
console.log(
'This run would have been cancelled because another run is currently in progress. However, the run has still been simulated.'
);
} else {
return 'Did not run ingest. Either an import is already in progress or the last import failed.';
}
}

const start = new Date();
const MAX_UNCHANGED_STREAK = 5;
    // Get start time of most recent successful run to help us know when to stop.
@@ -155,13 +168,20 @@ export const incrementalAriIngest = async (dryRun: boolean, reportFormat: I.Inge
return `${preamble} ${writeCount} publication${writeCount !== 1 ? 's' : ''}.`;
};

export const triggerECSTask = async (): Promise<string> => {
await ecs.runFargateTask({
clusterArn: Helpers.checkEnvVariable('ECS_CLUSTER_ARN'),
securityGroups: [Helpers.checkEnvVariable('ECS_TASK_SECURITY_GROUP_ID')],
subnetIds: Helpers.checkEnvVariable('PRIVATE_SUBNET_IDS').split(','),
taskDefinitionId: Helpers.checkEnvVariable('ECS_TASK_DEFINITION_ID')
});

return 'Done';
export const triggerAriIngest = async (dryRun?: boolean): Promise<string> => {
if (process.env.STAGE !== 'local') {
// If not local, trigger task to run in ECS.
await ecs.runFargateTask({
clusterArn: Helpers.checkEnvVariable('ECS_CLUSTER_ARN'),
...(dryRun && { commandOverride: ['npm', 'run', 'ariImport', '--', 'dryRun=true', 'reportFormat=email'] }),
securityGroups: [Helpers.checkEnvVariable('ECS_TASK_SECURITY_GROUP_ID')],
subnetIds: Helpers.checkEnvVariable('PRIVATE_SUBNET_IDS').split(','),
taskDefinitionId: Helpers.checkEnvVariable('ECS_TASK_DEFINITION_ID')
});

return 'Task triggered.';
} else {
// If local, just run the ingest directly.
return await incrementalAriIngest(!!dryRun, 'file');
}
};
10 changes: 10 additions & 0 deletions api/src/lib/ecs.ts
@@ -4,6 +4,7 @@ const client = new ECSClient();

export const runFargateTask = async (config: {
clusterArn: string;
commandOverride?: string[];
securityGroups: string[];
subnetIds: string[];
taskDefinitionId: string;
@@ -17,6 +18,15 @@ export const runFargateTask = async (config: {
subnets: config.subnetIds
}
},
...(config.commandOverride && {
overrides: {
containerOverrides: [
{
command: config.commandOverride
}
]
}
}),
taskDefinition: config.taskDefinitionId
};
const command = new RunTaskCommand(input);
5 changes: 5 additions & 0 deletions api/src/lib/interface.ts
@@ -1079,3 +1079,8 @@ export interface HandledARI {
}

export type IngestReportFormat = 'email' | 'file';

export interface TriggerAriIngestQueryParams {
apiKey: string;
dryRun?: boolean;
}
97 changes: 97 additions & 0 deletions infra/modules/ecs/schedule.tf
@@ -0,0 +1,97 @@
# Code adapted from https://medium.com/@igorkachmaryk/using-terraform-to-setup-aws-eventbridge-scheduler-and-a-scheduled-ecs-task-1208ae077360
resource "aws_iam_role" "scheduler" {
name = "cron-scheduler-role-${var.environment}-${var.project_name}"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = {
Service = ["scheduler.amazonaws.com"]
}
Action = "sts:AssumeRole"
}
]
})
}

resource "aws_iam_role_policy_attachment" "scheduler" {
policy_arn = aws_iam_policy.scheduler.arn
role = aws_iam_role.scheduler.name
}

resource "aws_iam_policy" "scheduler" {
name = "cron-scheduler-policy-${var.environment}-${var.project_name}"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow",
Action = [
"ecs:RunTask"
]
# Replace revision number with *
Resource = ["${trimsuffix(aws_ecs_task_definition.ari-import.arn, ":${aws_ecs_task_definition.ari-import.revision}")}:*"]
},
{
Effect = "Allow",
Action = [
"iam:PassRole"
]
Resource = [aws_iam_role.ecs-task-role.arn, aws_iam_role.ecs-task-exec-role.arn]
},
{
Action = [
"sqs:SendMessage"
],
Effect = "Allow",
Resource = [aws_sqs_queue.scheduler-dlq.arn]
}
]
})
}

resource "aws_scheduler_schedule" "ari_import_cron" {
name = "ari-import-schedule-int"

flexible_time_window {
mode = "OFF"
}

schedule_expression = "cron(0 5 ? * TUE *)" # Run every Tuesday at 5AM

target {
arn = aws_ecs_cluster.ecs.arn
role_arn = aws_iam_role.scheduler.arn

# On prod, override container command to do a dry run instead of a real one.
# The output will be checked before manually triggering a real run using the API.
input = terraform.workspace == "prod" ? jsonencode({
containerOverrides = [
{
command = ["npm", "run", "ariImport", "--", "dryRun=true", "reportFormat=email"]
name = "ari-import"
}
]
}) : null

dead_letter_config {
arn = aws_sqs_queue.scheduler-dlq.arn
}

ecs_parameters {
# Trimming the revision suffix here so that schedule always uses latest revision
task_definition_arn = trimsuffix(aws_ecs_task_definition.ari-import.arn, ":${aws_ecs_task_definition.ari-import.revision}")
launch_type = "FARGATE"

network_configuration {
security_groups = [aws_security_group.ari-import-task-sg.id]
subnets = var.private_subnet_ids
}
}
}
}

resource "aws_sqs_queue" "scheduler-dlq" {
name = "scheduler-dlq-${var.environment}-${var.project_name}"
}
