From 2bbf4a38e6301f0a4c23e4150d1fcb24fd587431 Mon Sep 17 00:00:00 2001 From: Andrew Johnston Date: Wed, 14 Dec 2022 11:22:17 -0900 Subject: [PATCH 1/3] draft of limiting number of concurrent step function executions --- lib/dynamo/dynamo/jobs.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/dynamo/dynamo/jobs.py b/lib/dynamo/dynamo/jobs.py index 72ad7e609..b8a604390 100644 --- a/lib/dynamo/dynamo/jobs.py +++ b/lib/dynamo/dynamo/jobs.py @@ -152,7 +152,7 @@ def update_job(job): ) -def get_jobs_waiting_for_execution(limit: int) -> list[dict]: +def get_jobs_waiting_for_execution(limit: int, max_scanned: int = 50000) -> list[dict]: table = DYNAMODB_RESOURCE.Table(environ['JOBS_TABLE_NAME']) params = { @@ -162,8 +162,9 @@ def get_jobs_waiting_for_execution(limit: int) -> list[dict]: } response = table.query(**params) jobs = response['Items'] + scanned = response['ScannedCount'] - while 'LastEvaluatedKey' in response and len(jobs) < limit: + while 'LastEvaluatedKey' in response and len(jobs) < limit and scanned < max_scanned: params['ExclusiveStartKey'] = response['LastEvaluatedKey'] response = table.query(**params) jobs.extend(response['Items']) From 5968db15e0e6806d24a870d9f1441221ab6cd1cd Mon Sep 17 00:00:00 2001 From: Andrew Johnston Date: Wed, 14 Dec 2022 11:23:08 -0900 Subject: [PATCH 2/3] increment scanned_count --- lib/dynamo/dynamo/jobs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/dynamo/dynamo/jobs.py b/lib/dynamo/dynamo/jobs.py index b8a604390..3a5c9aa7d 100644 --- a/lib/dynamo/dynamo/jobs.py +++ b/lib/dynamo/dynamo/jobs.py @@ -168,5 +168,6 @@ def get_jobs_waiting_for_execution(limit: int, max_scanned: int = 50000) -> list params['ExclusiveStartKey'] = response['LastEvaluatedKey'] response = table.query(**params) jobs.extend(response['Items']) + scanned += response['ScannedCount'] return jobs[:limit] From 2f34e7f6bf1d3532208e82c582a3076016616a75 Mon Sep 17 00:00:00 2001 From: Andrew Johnston Date: Wed, 14 Dec 2022 13:52:55 -0900 Subject: [PATCH 3/3] Update lib/dynamo/dynamo/jobs.py Co-authored-by: Jake Herrmann --- lib/dynamo/dynamo/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/dynamo/dynamo/jobs.py b/lib/dynamo/dynamo/jobs.py index 3a5c9aa7d..19cd37192 100644 --- a/lib/dynamo/dynamo/jobs.py +++ b/lib/dynamo/dynamo/jobs.py @@ -152,7 +152,7 @@ def update_job(job): ) -def get_jobs_waiting_for_execution(limit: int, max_scanned: int = 50000) -> list[dict]: +def get_jobs_waiting_for_execution(limit: int, max_scanned: int = 50_000) -> list[dict]: table = DYNAMODB_RESOURCE.Table(environ['JOBS_TABLE_NAME']) params = {