From dd927019362327936c8a86102a3da4d985a60e59 Mon Sep 17 00:00:00 2001 From: dcgoss Date: Tue, 27 Jun 2017 14:40:54 -0400 Subject: [PATCH 1/9] Switched priority types from strings to integers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of tracking task priority with words like “high”, “low”, etc, this commit changes task priorities to be tracked with positive integers. Lower integers are higher priority. Why? Simplicity and performance. It is much easier for a database to sort numbers than to convert words to numbers and then sort. It also simplifies the “vocabulary”, meaning the specific word used is not relevant - only the magnitude of number. --- api/migrations/0005_change_priority_types.py | 20 ++++++++++++++++++++ api/models.py | 10 +++++----- api/queue.py | 15 ++------------- api/serializers.py | 6 +++--- api/test/test_tasks.py | 11 +++++------ api/test/test_tasks_queue.py | 12 ++++++------ docker-compose.yml | 2 +- 7 files changed, 42 insertions(+), 34 deletions(-) create mode 100644 api/migrations/0005_change_priority_types.py diff --git a/api/migrations/0005_change_priority_types.py b/api/migrations/0005_change_priority_types.py new file mode 100644 index 0000000..6cdbe0b --- /dev/null +++ b/api/migrations/0005_change_priority_types.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.9.8 on 2017-06-26 21:33 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('api', '0004_remove_priority_levels'), + ] + + operations = [ + migrations.AlterField( + model_name='task', + name='priority', + field=models.PositiveSmallIntegerField(choices=[(1, 'Critical'), (2, 'High'), (3, 'Normal'), (4, 'Low')], default=3), + ), + ] diff --git a/api/models.py b/api/models.py index 4c1032b..910d1f4 100644 --- a/api/models.py +++ b/api/models.py @@ -15,10 +15,10 @@ ) PRIORITY_CHOICES = ( - ("critical", "Critical"), - ("high", "High"), - ("normal", "Normal"), - ("low", "Low") + (1, "Critical"), + (2, "High"), + (3, "Normal"), + (4, "Low") ) class TaskDef(models.Model): @@ -50,7 +50,7 @@ class Meta: status = models.CharField(choices=STATUS_CHOICES, max_length=17, default='queued') worker_id = models.CharField(null=True, max_length=255) locked_at = models.DateTimeField(null=True) - priority = models.CharField(choices=PRIORITY_CHOICES, max_length=8, default="normal") + priority = models.PositiveSmallIntegerField(choices=PRIORITY_CHOICES, default=3) unique = models.CharField(null=True, max_length=255) run_at = models.DateTimeField(default=lambda: timezone.now()) started_at = models.DateTimeField(null=True) diff --git a/api/queue.py b/api/queue.py index f2f41b4..faef5f4 100644 --- a/api/queue.py +++ b/api/queue.py @@ -1,5 +1,4 @@ from django.db import connection - from api.models import TaskDef, Task get_task_sql = """ @@ -16,19 +15,9 @@ (NOW() > (locked_at + INTERVAL '1 second' * task_defs.default_timeout))) OR (status = 'failed_retrying' AND attempts < task_defs.max_attempts)) - ORDER BY - CASE WHEN priority = 'critical' - THEN 1 - WHEN priority = 'high' - THEN 2 - WHEN priority = 'normal' - THEN 3 - WHEN priority = 'low' - THEN 4 - END, - run_at - LIMIT %s + ORDER BY priority, run_at FOR UPDATE SKIP LOCKED + LIMIT %s ) UPDATE tasks SET status = 'in_progress', diff --git a/api/serializers.py b/api/serializers.py index da27cca..f396473 100644 --- a/api/serializers.py +++ b/api/serializers.py @@ -56,14 +56,14 @@ def update(self, instance, validated_data): failed_at = validated_data.get('failed_at', None) completed_at = validated_data.get('completed_at', None) - if failed_at == None and completed_at != None: + if failed_at is None and completed_at is not None: instance.status = 'complete' - elif failed_at != None and completed_at == None: + elif failed_at is not None and completed_at is None: if instance.attempts >= instance.task_def.max_attempts: instance.status = 'failed' else: instance.status = 'failed_retrying' - elif failed_at != None and completed_at != None: + elif failed_at is not None and completed_at is not None: raise exceptions.ValidationError('`failed_at` and `completed_at` cannot be both non-null at the same time.') instance.worker_id = validated_data.get('worker_id', instance.priority) diff --git a/api/test/test_tasks.py b/api/test/test_tasks.py index d9d518e..e34458d 100644 --- a/api/test/test_tasks.py +++ b/api/test/test_tasks.py @@ -1,4 +1,4 @@ -from datetime import datetime, timezone +from datetime import datetime from unittest.mock import patch from rest_framework.test import APITestCase, APIClient @@ -37,7 +37,6 @@ def setUp(self): @patch('django.utils.timezone.now') def test_queueing(self, mocked_now): test_datetime = datetime.utcnow().isoformat() + 'Z' - mocked_now.return_value = test_datetime task_post_data = { @@ -58,7 +57,7 @@ def test_queueing(self, mocked_now): ## test fields defaults self.assertEqual(response.data['status'], 'queued') - self.assertEqual(response.data['priority'], 'normal') + self.assertEqual(response.data['priority'], 3) self.assertEqual(response.data['run_at'], test_datetime) def test_queueing_auth(self): @@ -136,13 +135,13 @@ def test_update_task(self): self.assertEqual(create_response.status_code, 201) update = create_response.data - update['priority'] = 'high' + update['priority'] = 2 update_response = client.put('/tasks/' + str(update['id']), update, format='json') self.assertEqual(update_response.status_code, 200) self.assertEqual(list(update_response.data.keys()), task_keys) - self.assertEqual(update_response.data['priority'], 'high') + self.assertEqual(update_response.data['priority'], 2) def test_update_task_auth(self): task_post_data = { @@ -163,7 +162,7 @@ def test_update_task_auth(self): client = APIClient() # clear token update = create_response.data - update['priority'] = 'high' + update['priority'] = 2 update_response = client.put('/tasks/' + str(update['id']), update, format='json') diff --git a/api/test/test_tasks_queue.py b/api/test/test_tasks_queue.py index 91ef5d0..1a3966a 100644 --- a/api/test/test_tasks_queue.py +++ b/api/test/test_tasks_queue.py @@ -9,7 +9,7 @@ class TaskQueueTests(APITestCase): @patch('django.utils.timezone.now') def setUp(self, mocked_now): # now needs to be padded to account for API and db clocks not in perfect sync - test_datetime = (datetime.utcnow() - timedelta(0,3)).isoformat() + 'Z' + test_datetime = (datetime.utcnow() - timedelta(0, 3)).isoformat() + 'Z' mocked_now.return_value = test_datetime @@ -41,10 +41,10 @@ def schedule_task(self, client, run_at=None, priority=None): } } - if run_at != None: + if run_at is not None: task_post_data['run_at'] = run_at - if priority != None: + if priority is not None: task_post_data['priority'] = priority response = client.post('/tasks', task_post_data, format='json') @@ -103,9 +103,9 @@ def test_pull_from_queue_order(self, mocked_now): ## purposely not ordered by the actual expected by pull task1 = self.schedule_task(client, run_at=minus_10_min) - task2 = self.schedule_task(client, priority='high') - task3 = self.schedule_task(client, priority='low') - task4 = self.schedule_task(client, run_at=minus_10_min, priority='high') + task2 = self.schedule_task(client, priority=2) + task3 = self.schedule_task(client, priority=4) + task4 = self.schedule_task(client, run_at=minus_10_min, priority=2) response = client.get('/tasks/queue?tasks=' + self.task_def_name + '&worker_id=foo') self.assertEqual(task4['id'], response.data[0]['id']) diff --git a/docker-compose.yml b/docker-compose.yml index 6eb532d..30c5c93 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,7 +4,7 @@ services: image: postgres task: build: . - command: bash -c "python manage.py migrate && python manage.py runserver 0.0.0.0:8001" + command: bash -c "sleep 5 && python manage.py migrate -v3 --no-input && python manage.py runserver 0.0.0.0:8001" volumes: - .:/code ports: From 545ca8ca07a5559d443614f7b3334f0fb2eaea06 Mon Sep 17 00:00:00 2001 From: dcgoss Date: Wed, 28 Jun 2017 13:38:04 -0400 Subject: [PATCH 2/9] Get or create task-def on task create request MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For now, I believe it is acceptable for the task-service to automatically create a task-def based off of a task creation request if a task-def of the specified name doesn’t already exist. This also helps with ensuring the core-service tests succeed in their current form, as they do not create a task-def prior to creating a task. --- README.md | 2 +- api/models.py | 2 +- api/serializers.py | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index a240ea7..748faa3 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Cognoma task-service -This repository houses the service, accessible by HTTP RESTful API, responsable for manageing backround tasks within the Cognoma application backend. +This repository houses the service, accessible by HTTP RESTful API, responsible for managing background tasks within the Cognoma application backend. ## Getting started diff --git a/api/models.py b/api/models.py index 910d1f4..4010f67 100644 --- a/api/models.py +++ b/api/models.py @@ -31,7 +31,7 @@ class Meta: validators=[ RegexValidator( regex='^[a-z0-9\-_]+$', - message='Task definition name can only contain lowercase alphanumeric charaters, dashes, and underscores.', + message='Task definition name can only contain lowercase alphanumeric characters, dashes, and underscores.', ) ] ) diff --git a/api/serializers.py b/api/serializers.py index f396473..1ddd122 100644 --- a/api/serializers.py +++ b/api/serializers.py @@ -48,7 +48,10 @@ class TaskSerializer(serializers.Serializer): def create(self, validated_data): try: - return Task.objects.create(**validated_data) + # task creation request specifies name of a task-def. + # get_or_create guarantees that a task-def of that name will exist before the task is created + task_def, created = TaskDef.objects.get_or_create(name=validated_data.pop('task_def').name) + return Task.objects.create(task_def=task_def, **validated_data) except IntegrityError: raise UniqueTaskConflict() From 08cfa711c8cff9a74fe45067bde79036cb8ee9c5 Mon Sep 17 00:00:00 2001 From: dcgoss Date: Wed, 28 Jun 2017 15:09:39 -0400 Subject: [PATCH 3/9] Fixed typos --- api/test/test_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/test/test_tasks.py b/api/test/test_tasks.py index e34458d..184521f 100644 --- a/api/test/test_tasks.py +++ b/api/test/test_tasks.py @@ -208,7 +208,7 @@ def test_get_task(self): client = APIClient() client.credentials(HTTP_AUTHORIZATION=self.token) - task_create_response = client.post('/tasks', task_post_data, format='json') + task_create_response = client.post('/tasks/', task_post_data, format='json') self.assertEqual(task_create_response.status_code, 201) From edf6bd0a460dd7d53d6fdbccd07e9a7d169d1b6a Mon Sep 17 00:00:00 2001 From: dcgoss Date: Wed, 28 Jun 2017 15:32:08 -0400 Subject: [PATCH 4/9] Nested task-def creation from task creation Including the nested data of a task-def in the data of task create request will create both the task-def and the task simultaneously. If the task-def already exists, a new one is not created and the old one is used. --- api/serializers.py | 3 ++- api/test/test_tasks.py | 52 +++++++++++++++++++++++++++++------- api/test/test_tasks_queue.py | 4 ++- 3 files changed, 48 insertions(+), 11 deletions(-) diff --git a/api/serializers.py b/api/serializers.py index 1ddd122..ec95c3c 100644 --- a/api/serializers.py +++ b/api/serializers.py @@ -32,6 +32,7 @@ def update(self, instance, validated_data): class TaskSerializer(serializers.Serializer): id = serializers.IntegerField(read_only=True) task_def = serializers.PrimaryKeyRelatedField(required=True, queryset=TaskDef.objects.all()) + task_def = TaskDefSerializer() status = serializers.CharField(read_only=True) worker_id = serializers.CharField(read_only=True, required=False, max_length=255) locked_at = serializers.DateTimeField(read_only=True, format='iso-8601') @@ -50,7 +51,7 @@ def create(self, validated_data): try: # task creation request specifies name of a task-def. # get_or_create guarantees that a task-def of that name will exist before the task is created - task_def, created = TaskDef.objects.get_or_create(name=validated_data.pop('task_def').name) + task_def, created = TaskDef.objects.get_or_create(**validated_data.pop('task_def')) return Task.objects.create(task_def=task_def, **validated_data) except IntegrityError: raise UniqueTaskConflict() diff --git a/api/test/test_tasks.py b/api/test/test_tasks.py index 184521f..707e071 100644 --- a/api/test/test_tasks.py +++ b/api/test/test_tasks.py @@ -40,7 +40,9 @@ def test_queueing(self, mocked_now): mocked_now.return_value = test_datetime task_post_data = { - 'task_def': self.task_def_name, + 'task_def': { + 'name': self.task_def_name + }, 'data': { 'foo': 'bar' } @@ -53,7 +55,7 @@ def test_queueing(self, mocked_now): self.assertEqual(response.status_code, 201) self.assertEqual(list(response.data.keys()), task_keys) - self.assertEqual(response.data['task_def'], self.task_def_name) + self.assertEqual(response.data['task_def']['name'], self.task_def_name) ## test fields defaults self.assertEqual(response.data['status'], 'queued') @@ -62,7 +64,9 @@ def test_queueing(self, mocked_now): def test_queueing_auth(self): task_post_data = { - 'task_def': self.task_def_name, + 'task_def': { + 'name': self.task_def_name + }, 'data': { 'foo': 'bar' } @@ -77,7 +81,9 @@ def test_queueing_auth(self): def test_queue_with_unique(self): task_post_data = { - 'task_def': self.task_def_name, + 'task_def': { + 'name': self.task_def_name + }, 'unique': 'classifer-2343', 'data': { 'foo': 'bar' @@ -96,7 +102,9 @@ def test_queue_with_unique(self): def test_queue_with_unique_conflict(self): task_post_data = { - 'task_def': self.task_def_name, + 'task_def': { + 'name': self.task_def_name + }, 'unique': 'classifer-2343', 'data': { 'foo': 'bar' @@ -120,7 +128,9 @@ def test_queue_with_unique_conflict(self): def test_update_task(self): task_post_data = { - 'task_def': self.task_def_name, + 'task_def': { + 'name': self.task_def_name + }, 'unique': 'classifer-2343', 'data': { 'foo': 'bar' @@ -145,7 +155,9 @@ def test_update_task(self): def test_update_task_auth(self): task_post_data = { - 'task_def': self.task_def_name, + 'task_def': { + 'name': self.task_def_name + }, 'unique': 'classifer-2343', 'data': { 'foo': 'bar' @@ -171,7 +183,9 @@ def test_update_task_auth(self): def test_list_tasks(self): task_post_data = { - 'task_def': self.task_def_name, + 'task_def': { + 'name': self.task_def_name + }, 'data': { 'foo': 'bar' } @@ -198,7 +212,9 @@ def test_list_tasks(self): def test_get_task(self): task_post_data = { - 'task_def': self.task_def_name, + 'task_def': { + 'name': self.task_def_name + }, 'unique': 'classifer-2343', 'data': { 'foo': 'bar' @@ -218,3 +234,21 @@ def test_get_task(self): self.assertEqual(task_response.status_code, 200) self.assertEqual(list(task_response.data.keys()), task_keys) + + def test_create_nonexistent_task_def(self): + task_post_data = { + 'task_def': { + 'name': 'nonexistent-task-def' + }, + 'unique': 'classifer-2343', + 'data': { + 'foo': 'bar' + } + } + + client = APIClient() + client.credentials(HTTP_AUTHORIZATION=self.token) + + task_create_response = client.post('/tasks/', task_post_data, format='json') + + self.assertEqual(task_create_response.status_code, 201) diff --git a/api/test/test_tasks_queue.py b/api/test/test_tasks_queue.py index 1a3966a..ba61818 100644 --- a/api/test/test_tasks_queue.py +++ b/api/test/test_tasks_queue.py @@ -34,7 +34,9 @@ def setUp(self, mocked_now): def schedule_task(self, client, run_at=None, priority=None): self.task_number += 1 task_post_data = { - 'task_def': self.task_def_name, + 'task_def': { + 'name': self.task_def_name + }, 'unique': 'classifier-' + str(self.task_number), 'data': { 'foo': 'bar' From c8bb9dc399be2293701225199d3003533d263368 Mon Sep 17 00:00:00 2001 From: dcgoss Date: Wed, 28 Jun 2017 15:41:28 -0400 Subject: [PATCH 5/9] Ensures a new task-def is created from a specified name --- api/test/test_tasks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/api/test/test_tasks.py b/api/test/test_tasks.py index 707e071..c414190 100644 --- a/api/test/test_tasks.py +++ b/api/test/test_tasks.py @@ -236,9 +236,10 @@ def test_get_task(self): self.assertEqual(list(task_response.data.keys()), task_keys) def test_create_nonexistent_task_def(self): + task_def_name = 'nonexistent-task-def' task_post_data = { 'task_def': { - 'name': 'nonexistent-task-def' + 'name': task_def_name }, 'unique': 'classifer-2343', 'data': { @@ -252,3 +253,4 @@ def test_create_nonexistent_task_def(self): task_create_response = client.post('/tasks/', task_post_data, format='json') self.assertEqual(task_create_response.status_code, 201) + self.assertEqual(task_create_response.data['task_def']['name'], task_def_name) From 532d1f8fc06f3ddca22b399dae9d058b0f933784 Mon Sep 17 00:00:00 2001 From: dcgoss Date: Wed, 5 Jul 2017 17:35:45 -0400 Subject: [PATCH 6/9] Added endpoint for completing task (for ml-worker) This endpoint is called by ml-worker when a task is completed and the completed notebook has been uploaded to core-service. The endpoint updates the status, completed_at, locked_at, and worker_id attributes of the task. --- api/views.py | 24 +++++++++++++++++++++++- task_service/settings.py | 1 + task_service/urls.py | 15 +++++++++------ 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/api/views.py b/api/views.py index 631887f..deecdf3 100644 --- a/api/views.py +++ b/api/views.py @@ -111,7 +111,12 @@ def get(self, request, format=None): if 'tasks' not in request.query_params: raise ParseError('`tasks` query parameter required') - if 'worker_id' not in request.query_params: + if 'worker_id' in request.query_params: + try: + worker_id = str(request.query_params['worker_id']) + except ValueError: + raise ParseError('`worker_id` query parameter must be a string') + else: raise ParseError('`worker_id` query parameter required') if 'limit' in request.query_params: @@ -192,3 +197,20 @@ def post(self, request, id): task.save() return Response(status=204) + +class CompleteTask(APIView): + permission_classes = (TaskServicePermission,) + + def post(self, request, id): + try: + task = Task.objects.get(id=id) + except Task.DoesNotExist: + raise NotFound('Task not found') + + task.status = 'completed' + task.completed_at = datetime.datetime.utcnow() + task.locked_at = None + task.worker_id = None + task.save() + + return Response(data='Task completed', status=200) diff --git a/task_service/settings.py b/task_service/settings.py index 7ae0425..3958bb9 100644 --- a/task_service/settings.py +++ b/task_service/settings.py @@ -56,6 +56,7 @@ # development STATIC_URL = '/static/' +APPEND_SLASH = True # Database # https://docs.djangoproject.com/en/1.9/ref/settings/#databases diff --git a/task_service/urls.py b/task_service/urls.py index 5f8f0c8..c583f60 100644 --- a/task_service/urls.py +++ b/task_service/urls.py @@ -6,11 +6,14 @@ urlpatterns = [ url(r'^task-defs/?$', views.TaskDefList.as_view()), - url(r'^task-defs/(?P[a-z0-9\-_]+)$', views.TaskDefRetrieveUpdate.as_view()), + url(r'^task-defs/(?P[a-z0-9\-_]+)/?$', views.TaskDefRetrieveUpdate.as_view()), + + url(r'^tasks/queue/?$', views.PullQueue.as_view()), + url(r'^tasks/?$', views.TaskList.as_view()), - url(r'^tasks/(?P[0-9]+)$', views.TaskRetrieveUpdate.as_view()), - url(r'^tasks/queue$', views.PullQueue.as_view()), - url(r'^tasks/(?P[0-9]+)/touch$', views.TouchTask.as_view()), - url(r'^tasks/(?P[0-9]+)/release$', views.ReleaseTask.as_view()), - url(r'^tasks/(?P[0-9]+)/dequeue$', views.DequeueTask.as_view()) + url(r'^tasks/(?P[0-9]+)/?$', views.TaskRetrieveUpdate.as_view()), + url(r'^tasks/(?P[0-9]+)/touch/?$', views.TouchTask.as_view()), + url(r'^tasks/(?P[0-9]+)/release/?$', views.ReleaseTask.as_view()), + url(r'^tasks/(?P[0-9]+)/dequeue/?$', views.DequeueTask.as_view()), + url(r'^tasks/(?P[0-9]+)/complete/?$', views.CompleteTask.as_view()), ] + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) From f42c9a1615204299de1f18c4aaa8efd63fc7325b Mon Sep 17 00:00:00 2001 From: dcgoss Date: Thu, 6 Jul 2017 14:41:38 -0400 Subject: [PATCH 7/9] Added endpoint for task failure; updated queue tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I prefer the design pattern where a request hits an endpoint for a specific task and the server does the updating, rather than the client doing the updating and hitting a general purpose “update” endpoint where the client can update any value. I followed this pattern in 532d1f8fc06f3ddca22b399dae9d058b0f933784 and continue it here by adding in an endpoint for “failing” a task. I also updated the queue tests as they used to hit the general purpose “update” endpoint - they now hit the specific complete/fail/release/etc. endpoints. --- api/auth.py | 4 +-- api/serializers.py | 1 + api/test/test_tasks.py | 2 +- api/test/test_tasks_queue.py | 51 ++++++++++-------------------------- api/views.py | 31 ++++++++++++++++------ task_service/urls.py | 1 + 6 files changed, 42 insertions(+), 48 deletions(-) diff --git a/api/auth.py b/api/auth.py index c2d1129..58e8bf4 100644 --- a/api/auth.py +++ b/api/auth.py @@ -46,14 +46,14 @@ def has_permission(self, request, view): if request.method in permissions.SAFE_METHODS: return True - if not request.user: ## the "service" from above + if not request.user: # the "service" from above raise exceptions.NotAuthenticated() return True class QueuePullPermission(permissions.BasePermission): def has_permission(self, request, view): - if not request.user: ## the "service" from above + if not request.user: # the "service" from above raise exceptions.NotAuthenticated() return True diff --git a/api/serializers.py b/api/serializers.py index ec95c3c..40e5f7f 100644 --- a/api/serializers.py +++ b/api/serializers.py @@ -73,6 +73,7 @@ def update(self, instance, validated_data): instance.worker_id = validated_data.get('worker_id', instance.priority) instance.priority = validated_data.get('priority', instance.priority) instance.started_at = validated_data.get('started_at', instance.started_at) + instance.locked_at = validated_data.get('locked_at', instance.locked_at) instance.completed_at = completed_at instance.failed_at = failed_at instance.data = validated_data.get('data', instance.data) diff --git a/api/test/test_tasks.py b/api/test/test_tasks.py index c414190..1de32a8 100644 --- a/api/test/test_tasks.py +++ b/api/test/test_tasks.py @@ -57,7 +57,7 @@ def test_queueing(self, mocked_now): self.assertEqual(list(response.data.keys()), task_keys) self.assertEqual(response.data['task_def']['name'], self.task_def_name) - ## test fields defaults + # test fields defaults self.assertEqual(response.data['status'], 'queued') self.assertEqual(response.data['priority'], 3) self.assertEqual(response.data['run_at'], test_datetime) diff --git a/api/test/test_tasks_queue.py b/api/test/test_tasks_queue.py index ba61818..087c7da 100644 --- a/api/test/test_tasks_queue.py +++ b/api/test/test_tasks_queue.py @@ -189,61 +189,38 @@ def test_dequeue_task(self): self.assertEqual(task_response.data['status'], 'dequeued') - def test_pull_and_complete(self): - client = APIClient() - client.credentials(HTTP_AUTHORIZATION=self.token) - + def pull_and_update(self, client, is_fail): task_response = client.get('/tasks/queue?tasks=' + self.task_def_name + '&worker_id=foo') self.assertEqual(task_response.status_code, 200) self.assertEqual(len(task_response.data), 1) task = task_response.data[0] - task['completed_at'] = (datetime.utcnow() + timedelta(0,600)).isoformat() + 'Z' - - update_response = client.put('/tasks/' + str(task['id']), task, format='json') + if is_fail: + update_response = client.post('/tasks/{id}/fail/'.format(id=task['id']), task, format='json') + else: + update_response = client.post('/tasks/{id}/complete/'.format(id=task['id']), task, format='json') self.assertEqual(update_response.status_code, 200) - self.assertEqual(update_response.data['status'], 'complete') - def test_pull_and_fail(self): + return update_response + + def test_pull_and_complete(self): client = APIClient() client.credentials(HTTP_AUTHORIZATION=self.token) - task_response = client.get('/tasks/queue?tasks=' + self.task_def_name + '&worker_id=foo') - self.assertEqual(task_response.status_code, 200) - self.assertEqual(len(task_response.data), 1) + update_response = self.pull_and_update(client, False) - task = task_response.data[0] + self.assertEqual(update_response.data['status'], 'complete') - task['failed_at'] = (datetime.utcnow() + timedelta(0,600)).isoformat() + 'Z' + def test_pull_and_fail(self): + client = APIClient() + client.credentials(HTTP_AUTHORIZATION=self.token) - update_response = client.put('/tasks/' + str(task['id']), task, format='json') + update_response = self.pull_and_update(client, True) - self.assertEqual(update_response.status_code, 200) self.assertEqual(update_response.data['status'], 'failed') - def pull_and_update(self, client, is_fail): - task_response = client.get('/tasks/queue?tasks=' + self.task_def_name + '&worker_id=foo') - self.assertEqual(task_response.status_code, 200) - self.assertEqual(len(task_response.data), 1) - - task = task_response.data[0] - - update_datetime = (datetime.utcnow() + timedelta(0,600)).isoformat() + 'Z' - if is_fail: - task['failed_at'] = update_datetime - task['completed_at'] = None - else: - task['failed_at'] = None - task['completed_at'] = update_datetime - - update_response = client.put('/tasks/' + str(task['id']), task, format='json') - - self.assertEqual(update_response.status_code, 200) - - return update_response - def test_pull_retry_fail(self): client = APIClient() client.credentials(HTTP_AUTHORIZATION=self.token) diff --git a/api/views.py b/api/views.py index deecdf3..01d2b36 100644 --- a/api/views.py +++ b/api/views.py @@ -164,7 +164,7 @@ def post(self, request, id): task.locked_at = (datetime.datetime.now() + datetime.timedelta(seconds=timeout)).isoformat() + 'Z' task.save() - return Response(status=204) + return Response(data={'message': 'Task touched.'}, status=200) class ReleaseTask(APIView): permission_classes = (TaskServicePermission,) @@ -180,7 +180,7 @@ def post(self, request, id): task.worker_id = None task.save() - return Response(status=204) + return Response(data={'message': 'Task released.'}, status=200) class DequeueTask(APIView): permission_classes = (TaskServicePermission,) @@ -196,7 +196,7 @@ def post(self, request, id): task.worker_id = None task.save() - return Response(status=204) + return Response(data={'message': 'Task dequeued.'}, status=200) class CompleteTask(APIView): permission_classes = (TaskServicePermission,) @@ -206,11 +206,26 @@ def post(self, request, id): task = Task.objects.get(id=id) except Task.DoesNotExist: raise NotFound('Task not found') + task = TaskSerializer(task, data={ + 'completed_at': datetime.datetime.utcnow() + }, partial=True) + task.is_valid(raise_exception=True) + task.save() - task.status = 'completed' - task.completed_at = datetime.datetime.utcnow() - task.locked_at = None - task.worker_id = None + return Response(data=task.data, status=200) + +class FailTask(APIView): + permission_classes = (TaskServicePermission,) + + def post(self, request, id): + try: + task = Task.objects.get(id=id) + except Task.DoesNotExist: + raise NotFound('Task not found') + task = TaskSerializer(task, data={ + 'failed_at': datetime.datetime.utcnow() + }, partial=True) + task.is_valid(raise_exception=True) task.save() - return Response(data='Task completed', status=200) + return Response(data=task.data, status=200) diff --git a/task_service/urls.py b/task_service/urls.py index c583f60..8d4efe1 100644 --- a/task_service/urls.py +++ b/task_service/urls.py @@ -16,4 +16,5 @@ url(r'^tasks/(?P[0-9]+)/release/?$', views.ReleaseTask.as_view()), url(r'^tasks/(?P[0-9]+)/dequeue/?$', views.DequeueTask.as_view()), url(r'^tasks/(?P[0-9]+)/complete/?$', views.CompleteTask.as_view()), + url(r'^tasks/(?P[0-9]+)/fail/?$', views.FailTask.as_view()), ] + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) From 2d23eb0622cc40c4b2e0bd26e2b8445f1a06ec76 Mon Sep 17 00:00:00 2001 From: dcgoss Date: Mon, 10 Jul 2017 13:28:17 -0400 Subject: [PATCH 8/9] Added CircleCI integration Ported from core-service --- .dockerignore | 1 + circle.yml | 29 ++- ecr_push.sh | 3 + ecs_deploy.sh | 544 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 569 insertions(+), 8 deletions(-) create mode 100644 .dockerignore create mode 100755 ecr_push.sh create mode 100755 ecs_deploy.sh diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..f7275bb --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +venv/ diff --git a/circle.yml b/circle.yml index 93de67d..3171058 100644 --- a/circle.yml +++ b/circle.yml @@ -1,9 +1,22 @@ machine: - python: - version: 3.5.1 - environment: - DB_NAME: circle_test - DB_USER: ubuntu - DB_HOST: 127.0.0.1 - services: - - postgresql + python: + version: 3.5.1 + environment: + DB_NAME: circle_test + DB_USER: ubuntu + DB_HOST: 127.0.0.1 + services: + - postgresql + - docker + +dependencies: + post: + - docker build -t $AWS_ECR_ENDPOINT.dkr.ecr.us-east-1.amazonaws.com/cognoma-task-service:$CIRCLE_SHA1 . + +deployment: + production: + branch: master + commands: + - chmod +x ecr_push.sh ecs_deploy.sh + - ./ecr_push.sh + - ./ecs_deploy.sh -t 180 -c $AWS_CLUSTER_NAME -n $AWS_SERVICE_NAME -i $AWS_ECR_ENDPOINT.dkr.ecr.us-east-1.amazonaws.com/cognoma-task-service:$CIRCLE_SHA1 \ No newline at end of file diff --git a/ecr_push.sh b/ecr_push.sh new file mode 100755 index 0000000..77c28ee --- /dev/null +++ b/ecr_push.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash +eval $(aws ecr get-login --region $AWS_DEFAULT_REGION) +docker push $AWS_ECR_ENDPOINT.dkr.ecr.us-east-1.amazonaws.com/cognoma-task-service:$CIRCLE_SHA1 \ No newline at end of file diff --git a/ecs_deploy.sh b/ecs_deploy.sh new file mode 100755 index 0000000..d4d8f94 --- /dev/null +++ b/ecs_deploy.sh @@ -0,0 +1,544 @@ +#!/usr/bin/env bash +# source: https://github.com/silinternational/ecs-deploy + +# Setup default values for variables +CLUSTER=false +SERVICE=false +TASK_DEFINITION=false +MAX_DEFINITIONS=0 +IMAGE=false +MIN=false +MAX=false +TIMEOUT=90 +VERBOSE=false +TAGVAR=false +TAGONLY="" +ENABLE_ROLLBACK=false +AWS_CLI=$(which aws) +AWS_ECS="$AWS_CLI --output json ecs" + +function usage() { + cat < /dev/null 2>&1 || { + echo "Some of the required software is not installed:" + echo " please install $1" >&2; + exit 4; + } +} + +# Check that all required variables/combinations are set +function assertRequiredArgumentsSet() { + + # AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION and AWS_PROFILE can be set as environment variables + if [ -z ${AWS_ACCESS_KEY_ID+x} ]; then unset AWS_ACCESS_KEY_ID; fi + if [ -z ${AWS_SECRET_ACCESS_KEY+x} ]; then unset AWS_SECRET_ACCESS_KEY; fi + if [ -z ${AWS_DEFAULT_REGION+x} ]; + then unset AWS_DEFAULT_REGION + else + AWS_ECS="$AWS_ECS --region $AWS_DEFAULT_REGION" + fi + if [ -z ${AWS_PROFILE+x} ]; + then unset AWS_PROFILE + else + AWS_ECS="$AWS_ECS --profile $AWS_PROFILE" + fi + + if [ $SERVICE == false ] && [ $TASK_DEFINITION == false ]; then + echo "One of SERVICE or TASK DEFINITON is required. You can pass the value using -n / --service-name for a service, or -d / --task-definition for a task" + exit 5 + fi + if [ $SERVICE != false ] && [ $TASK_DEFINITION != false ]; then + echo "Only one of SERVICE or TASK DEFINITON may be specified, but you supplied both" + exit 6 + fi + if [ $SERVICE != false ] && [ $CLUSTER == false ]; then + echo "CLUSTER is required. You can pass the value using -c or --cluster" + exit 7 + fi + if [ $IMAGE == false ]; then + echo "IMAGE is required. You can pass the value using -i or --image" + exit 8 + fi + if ! [[ $MAX_DEFINITIONS =~ ^-?[0-9]+$ ]]; then + echo "MAX_DEFINITIONS must be numeric, or not defined." + exit 9 + fi + +} + +function parseImageName() { + + # Define regex for image name + # This regex will create groups for: + # - domain + # - port + # - repo + # - image + # - tag + # If a group is missing it will be an empty string + if [[ "x$TAGONLY" == "x" ]]; then + imageRegex="^([a-zA-Z0-9\.\-]+):?([0-9]+)?/([a-zA-Z0-9\._\-]+)(/[\/a-zA-Z0-9\._\-]+)?:?([a-zA-Z0-9\._\-]+)?$" + else + imageRegex="^:?([a-zA-Z0-9\._-]+)?$" + fi + + if [[ $IMAGE =~ $imageRegex ]]; then + # Define variables from matching groups + if [[ "x$TAGONLY" == "x" ]]; then + domain=${BASH_REMATCH[1]} + port=${BASH_REMATCH[2]} + repo=${BASH_REMATCH[3]} + img=${BASH_REMATCH[4]/#\//} + tag=${BASH_REMATCH[5]} + + # Validate what we received to make sure we have the pieces needed + if [[ "x$domain" == "x" ]]; then + echo "Image name does not contain a domain or repo as expected. See usage for supported formats." + exit 10; + fi + if [[ "x$repo" == "x" ]]; then + echo "Image name is missing the actual image name. See usage for supported formats." + exit 11; + fi + + # When a match for image is not found, the image name was picked up by the repo group, so reset variables + if [[ "x$img" == "x" ]]; then + img=$repo + repo="" + fi + else + tag=${BASH_REMATCH[1]} + fi + else + # check if using root level repo with format like mariadb or mariadb:latest + rootRepoRegex="^([a-zA-Z0-9\-]+):?([a-zA-Z0-9\.\-]+)?$" + if [[ $IMAGE =~ $rootRepoRegex ]]; then + img=${BASH_REMATCH[1]} + if [[ "x$img" == "x" ]]; then + echo "Invalid image name. See usage for supported formats." + exit 12 + fi + tag=${BASH_REMATCH[2]} + else + echo "Unable to parse image name: $IMAGE, check the format and try again" + exit 13 + fi + fi + + # If tag is missing make sure we can get it from env var, or use latest as default + if [[ "x$tag" == "x" ]]; then + if [[ $TAGVAR == false ]]; then + tag="latest" + else + tag=${!TAGVAR} + if [[ "x$tag" == "x" ]]; then + tag="latest" + fi + fi + fi + + # Reassemble image name + if [[ "x$TAGONLY" == "x" ]]; then + + if [[ ! -z ${domain+undefined-guard} ]]; then + useImage="$domain" + fi + if [[ ! -z ${port} ]]; then + useImage="$useImage:$port" + fi + if [[ ! -z ${repo+undefined-guard} ]]; then + if [[ ! "x$repo" == "x" ]]; then + useImage="$useImage/$repo" + fi + fi + if [[ ! -z ${img+undefined-guard} ]]; then + if [[ "x$useImage" == "x" ]]; then + useImage="$img" + else + useImage="$useImage/$img" + fi + fi + imageWithoutTag="$useImage" + if [[ ! -z ${tag+undefined-guard} ]]; then + useImage="$useImage:$tag" + fi + + else + useImage="$TAGONLY" + fi + + # If in test mode output $useImage + if [ "$BASH_SOURCE" != "$0" ]; then + echo $useImage + fi +} + +function getCurrentTaskDefinition() { + if [ $SERVICE != false ]; then + # Get current task definition name from service + TASK_DEFINITION_ARN=`$AWS_ECS describe-services --services $SERVICE --cluster $CLUSTER | jq -r .services[0].taskDefinition` + TASK_DEFINITION=`$AWS_ECS describe-task-definition --task-def $TASK_DEFINITION_ARN` + fi +} + +function createNewTaskDefJson() { + # Get a JSON representation of the current task definition + # + Update definition to use new image name + # + Filter the def + if [[ "x$TAGONLY" == "x" ]]; then + DEF=$( echo "$TASK_DEFINITION" \ + | sed -e "s|\"image\": *\"${imageWithoutTag}:.*\"|\"image\": \"${useImage}\"|g" \ + | sed -e "s|\"image\": *\"${imageWithoutTag}\"|\"image\": \"${useImage}\"|g" \ + | jq '.taskDefinition' ) + else + DEF=$( echo "$TASK_DEFINITION" \ + | sed -e "s|\(\"image\": *\".*:\)\(.*\)\"|\1${useImage}\"|g" \ + | jq '.taskDefinition' ) + fi + + # Default JQ filter for new task definition + NEW_DEF_JQ_FILTER="family: .family, volumes: .volumes, containerDefinitions: .containerDefinitions" + + # Some options in task definition should only be included in new definition if present in + # current definition. If found in current definition, append to JQ filter. + CONDITIONAL_OPTIONS=(networkMode taskRoleArn placementConstraints) + for i in "${CONDITIONAL_OPTIONS[@]}"; do + re=".*${i}.*" + if [[ "$DEF" =~ $re ]]; then + NEW_DEF_JQ_FILTER="${NEW_DEF_JQ_FILTER}, ${i}: .${i}" + fi + done + + # Build new DEF with jq filter + NEW_DEF=$(echo $DEF | jq "{${NEW_DEF_JQ_FILTER}}") + + # If in test mode output $NEW_DEF + if [ "$BASH_SOURCE" != "$0" ]; then + echo $NEW_DEF + fi +} + +function registerNewTaskDefinition() { + # Register the new task definition, and store its ARN + NEW_TASKDEF=`$AWS_ECS register-task-definition --cli-input-json "$NEW_DEF" | jq -r .taskDefinition.taskDefinitionArn` +} + +function rollback() { + echo "Rolling back to ${TASK_DEFINITION_ARN}" + $AWS_ECS update-service --cluster $CLUSTER --service $SERVICE --task-definition $TASK_DEFINITION_ARN > /dev/null +} + +function updateService() { + UPDATE_SERVICE_SUCCESS="false" + DEPLOYMENT_CONFIG="" + if [ $MAX != false ]; then + DEPLOYMENT_CONFIG=",maximumPercent=$MAX" + fi + if [ $MIN != false ]; then + DEPLOYMENT_CONFIG="$DEPLOYMENT_CONFIG,minimumHealthyPercent=$MIN" + fi + if [ ! -z "$DEPLOYMENT_CONFIG" ]; then + DEPLOYMENT_CONFIG="--deployment-configuration ${DEPLOYMENT_CONFIG:1}" + fi + + DESIRED_COUNT="" + if [ ! -z ${DESIRED+undefined-guard} ]; then + DESIRED_COUNT="--desired-count $DESIRED" + fi + + # Update the service + UPDATE=`$AWS_ECS update-service --cluster $CLUSTER --service $SERVICE $DESIRED_COUNT --task-definition $NEW_TASKDEF $DEPLOYMENT_CONFIG` + + # Only excepts RUNNING state from services whose desired-count > 0 + SERVICE_DESIREDCOUNT=`$AWS_ECS describe-services --cluster $CLUSTER --service $SERVICE | jq '.services[]|.desiredCount'` + if [ $SERVICE_DESIREDCOUNT -gt 0 ]; then + # See if the service is able to come up again + every=10 + i=0 + while [ $i -lt $TIMEOUT ] + do + # Scan the list of running tasks for that service, and see if one of them is the + # new version of the task definition + + RUNNING_TASKS=$($AWS_ECS list-tasks --cluster "$CLUSTER" --service-name "$SERVICE" --desired-status RUNNING \ + | jq -r '.taskArns[]') + + if [[ ! -z $RUNNING_TASKS ]] ; then + RUNNING=$($AWS_ECS describe-tasks --cluster "$CLUSTER" --tasks $RUNNING_TASKS \ + | jq ".tasks[]| if .taskDefinitionArn == \"$NEW_TASKDEF\" then . else empty end|.lastStatus" \ + | grep -e "RUNNING") || : + + if [ "$RUNNING" ]; then + echo "Service updated successfully, new task definition running."; + + if [[ $MAX_DEFINITIONS -gt 0 ]]; then + FAMILY_PREFIX=${TASK_DEFINITION_ARN##*:task-definition/} + FAMILY_PREFIX=${FAMILY_PREFIX%*:[0-9]*} + TASK_REVISIONS=`$AWS_ECS list-task-definitions --family-prefix $FAMILY_PREFIX --status ACTIVE --sort ASC` + NUM_ACTIVE_REVISIONS=$(echo "$TASK_REVISIONS" | jq ".taskDefinitionArns|length") + if [[ $NUM_ACTIVE_REVISIONS -gt $MAX_DEFINITIONS ]]; then + LAST_OUTDATED_INDEX=$(($NUM_ACTIVE_REVISIONS - $MAX_DEFINITIONS - 1)) + for i in $(seq 0 $LAST_OUTDATED_INDEX); do + OUTDATED_REVISION_ARN=$(echo "$TASK_REVISIONS" | jq -r ".taskDefinitionArns[$i]") + + echo "Deregistering outdated task revision: $OUTDATED_REVISION_ARN" + + $AWS_ECS deregister-task-definition --task-definition "$OUTDATED_REVISION_ARN" > /dev/null + done + fi + + fi + UPDATE_SERVICE_SUCCESS="true" + break + fi + fi + + sleep $every + i=$(( $i + $every )) + done + + if [[ "${UPDATE_SERVICE_SUCCESS}" != "true" ]]; then + # Timeout + echo "ERROR: New task definition not running within $TIMEOUT seconds" + if [[ "${ENABLE_ROLLBACK}" != "false" ]]; then + rollback + fi + exit 1 + fi + else + echo "Skipping check for running task definition, as desired-count <= 0" + fi +} + +function waitForGreenDeployment { + DEPLOYMENT_SUCCESS="false" + every=2 + i=0 + echo "Waiting for service deployment to complete..." + while [ $i -lt $TIMEOUT ] + do + NUM_DEPLOYMENTS=$($AWS_ECS describe-services --services $SERVICE --cluster $CLUSTER | jq "[.services[].deployments[]] | length") + + # Wait to see if more than 1 deployment stays running + # If the wait time has passed, we need to roll back + if [ $NUM_DEPLOYMENTS -eq 1 ]; then + echo "Service deployment successful." + DEPLOYMENT_SUCCESS="true" + # Exit the loop. + i=$TIMEOUT + else + sleep $every + i=$(( $i + $every )) + fi + done + + if [[ "${DEPLOYMENT_SUCCESS}" != "true" ]]; then + if [[ "${ENABLE_ROLLBACK}" != "false" ]]; then + rollback + fi + exit 1 + fi +} + +###################################################### +# When not being tested, run application as expected # +###################################################### +if [ "$BASH_SOURCE" == "$0" ]; then + set -o errexit + set -o pipefail + set -u + set -e + # If no args are provided, display usage information + if [ $# == 0 ]; then usage; fi + + # Check for AWS, AWS Command Line Interface + require aws + # Check for jq, Command-line JSON processor + require jq + + # Loop through arguments, two at a time for key and value + while [[ $# -gt 0 ]] + do + key="$1" + + case $key in + -k|--aws-access-key) + AWS_ACCESS_KEY_ID="$2" + shift # past argument + ;; + -s|--aws-secret-key) + AWS_SECRET_ACCESS_KEY="$2" + shift # past argument + ;; + -r|--region) + AWS_DEFAULT_REGION="$2" + shift # past argument + ;; + -p|--profile) + AWS_PROFILE="$2" + shift # past argument + ;; + --aws-instance-profile) + echo "--aws-instance-profile is not yet in use" + AWS_IAM_ROLE=true + ;; + -c|--cluster) + CLUSTER="$2" + shift # past argument + ;; + -n|--service-name) + SERVICE="$2" + shift # past argument + ;; + -d|--task-definition) + TASK_DEFINITION="$2" + shift + ;; + -i|--image) + IMAGE="$2" + shift + ;; + -t|--timeout) + TIMEOUT="$2" + shift + ;; + -m|--min) + MIN="$2" + shift + ;; + -M|--max) + MAX="$2" + shift + ;; + -D|--desired-count) + DESIRED="$2" + shift + ;; + -e|--tag-env-var) + TAGVAR="$2" + shift + ;; + -to|--tag-only) + TAGONLY="$2" + shift + ;; + --max-definitions) + MAX_DEFINITIONS="$2" + shift + ;; + --enable-rollback) + ENABLE_ROLLBACK=true + ;; + -v|--verbose) + VERBOSE=true + ;; + *) + usage + exit 2 + ;; + esac + shift # past argument or value + done + + if [ $VERBOSE == true ]; then + set -x + fi + + # Check that required arguments are provided + assertRequiredArgumentsSet + + # Determine image name + parseImageName + echo "Using image name: $useImage" + + # Get current task definition + getCurrentTaskDefinition + echo "Current task definition: $TASK_DEFINITION_ARN"; + + # create new task definition json + createNewTaskDefJson + + # register new task definition + registerNewTaskDefinition + echo "New task definition: $NEW_TASKDEF"; + + # update service if needed + if [ $SERVICE == false ]; then + echo "Task definition updated successfully" + else + updateService + + waitForGreenDeployment + fi + + exit 0 + +fi +############################# +# End application run logic # +############################# \ No newline at end of file From 939a1e12906bb8de752592e9ede5e2f5980f87ed Mon Sep 17 00:00:00 2001 From: dcgoss Date: Mon, 10 Jul 2017 15:22:51 -0400 Subject: [PATCH 9/9] Fixed task queue test status codes from 204 -> 200 A 204 status code delivers no data in the response. A 200 allows for data to be returned. --- api/test/test_tasks_queue.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/test/test_tasks_queue.py b/api/test/test_tasks_queue.py index 087c7da..d99142b 100644 --- a/api/test/test_tasks_queue.py +++ b/api/test/test_tasks_queue.py @@ -137,7 +137,7 @@ def test_touching_task(self): task = task_response.data[0] touch_response = client.post('/tasks/' + str(task['id']) + '/touch?timeout=300') - self.assertEqual(touch_response.status_code, 204) + self.assertEqual(touch_response.status_code, 200) task_response = client.get('/tasks/' + str(task['id'])) self.assertEqual(task_response.status_code, 200) @@ -162,7 +162,7 @@ def test_release_task(self): self.assertEqual(task['status'], 'in_progress') release_response = client.post('/tasks/' + str(task['id']) + '/release') - self.assertEqual(release_response.status_code, 204) + self.assertEqual(release_response.status_code, 200) task_response = client.get('/tasks/' + str(task['id'])) self.assertEqual(task_response.status_code, 200) @@ -182,7 +182,7 @@ def test_dequeue_task(self): self.assertEqual(task['status'], 'in_progress') dequeue_response = client.post('/tasks/' + str(task['id']) + '/dequeue') - self.assertEqual(dequeue_response.status_code, 204) + self.assertEqual(dequeue_response.status_code, 200) task_response = client.get('/tasks/' + str(task['id'])) self.assertEqual(task_response.status_code, 200)