Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core-service & ml-workers integration #12

Closed
wants to merge 11 commits into from
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
venv/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Cognoma task-service

This repository houses the service, accessible by HTTP RESTful API, responsable for manageing backround tasks within the Cognoma application backend.
This repository houses the service, accessible by HTTP RESTful API, responsible for managing background tasks within the Cognoma application backend.

## Getting started

Expand Down
4 changes: 2 additions & 2 deletions api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ def has_permission(self, request, view):
if request.method in permissions.SAFE_METHODS:
return True

if not request.user: ## the "service" from above
if not request.user: # the "service" from above
raise exceptions.NotAuthenticated()

return True

class QueuePullPermission(permissions.BasePermission):
def has_permission(self, request, view):
if not request.user: ## the "service" from above
if not request.user: # the "service" from above
raise exceptions.NotAuthenticated()

return True
20 changes: 20 additions & 0 deletions api/migrations/0005_change_priority_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.8 on 2017-06-26 21:33
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('api', '0004_remove_priority_levels'),
]

operations = [
migrations.AlterField(
model_name='task',
name='priority',
field=models.PositiveSmallIntegerField(choices=[(1, 'Critical'), (2, 'High'), (3, 'Normal'), (4, 'Low')], default=3),
),
]
12 changes: 6 additions & 6 deletions api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
)

PRIORITY_CHOICES = (
("critical", "Critical"),
("high", "High"),
("normal", "Normal"),
("low", "Low")
(1, "Critical"),
(2, "High"),
(3, "Normal"),
(4, "Low")
)

class TaskDef(models.Model):
Expand All @@ -31,7 +31,7 @@ class Meta:
validators=[
RegexValidator(
regex='^[a-z0-9\-_]+$',
message='Task definition name can only contain lowercase alphanumeric charaters, dashes, and underscores.',
message='Task definition name can only contain lowercase alphanumeric characters, dashes, and underscores.',
)
]
)
Expand All @@ -50,7 +50,7 @@ class Meta:
status = models.CharField(choices=STATUS_CHOICES, max_length=17, default='queued')
worker_id = models.CharField(null=True, max_length=255)
locked_at = models.DateTimeField(null=True)
priority = models.CharField(choices=PRIORITY_CHOICES, max_length=8, default="normal")
priority = models.PositiveSmallIntegerField(choices=PRIORITY_CHOICES, default=3)
unique = models.CharField(null=True, max_length=255)
run_at = models.DateTimeField(default=lambda: timezone.now())
started_at = models.DateTimeField(null=True)
Expand Down
15 changes: 2 additions & 13 deletions api/queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from django.db import connection

from api.models import TaskDef, Task

get_task_sql = """
Expand All @@ -16,19 +15,9 @@
(NOW() > (locked_at + INTERVAL '1 second' * task_defs.default_timeout))) OR
(status = 'failed_retrying' AND
attempts < task_defs.max_attempts))
ORDER BY
CASE WHEN priority = 'critical'
THEN 1
WHEN priority = 'high'
THEN 2
WHEN priority = 'normal'
THEN 3
WHEN priority = 'low'
THEN 4
END,
run_at
LIMIT %s
ORDER BY priority, run_at
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep ints make much more sense!

FOR UPDATE SKIP LOCKED
LIMIT %s
)
UPDATE tasks SET
status = 'in_progress',
Expand Down
13 changes: 9 additions & 4 deletions api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def update(self, instance, validated_data):
class TaskSerializer(serializers.Serializer):
id = serializers.IntegerField(read_only=True)
task_def = serializers.PrimaryKeyRelatedField(required=True, queryset=TaskDef.objects.all())
task_def = TaskDefSerializer()
status = serializers.CharField(read_only=True)
worker_id = serializers.CharField(read_only=True, required=False, max_length=255)
locked_at = serializers.DateTimeField(read_only=True, format='iso-8601')
Expand All @@ -48,27 +49,31 @@ class TaskSerializer(serializers.Serializer):

def create(self, validated_data):
try:
return Task.objects.create(**validated_data)
# task creation request specifies name of a task-def.
# get_or_create guarantees that a task-def of that name will exist before the task is created
task_def, created = TaskDef.objects.get_or_create(**validated_data.pop('task_def'))
return Task.objects.create(task_def=task_def, **validated_data)
except IntegrityError:
raise UniqueTaskConflict()

def update(self, instance, validated_data):
failed_at = validated_data.get('failed_at', None)
completed_at = validated_data.get('completed_at', None)

if failed_at == None and completed_at != None:
if failed_at is None and completed_at is not None:
instance.status = 'complete'
elif failed_at != None and completed_at == None:
elif failed_at is not None and completed_at is None:
if instance.attempts >= instance.task_def.max_attempts:
instance.status = 'failed'
else:
instance.status = 'failed_retrying'
elif failed_at != None and completed_at != None:
elif failed_at is not None and completed_at is not None:
raise exceptions.ValidationError('`failed_at` and `completed_at` cannot be both non-null at the same time.')

instance.worker_id = validated_data.get('worker_id', instance.priority)
instance.priority = validated_data.get('priority', instance.priority)
instance.started_at = validated_data.get('started_at', instance.started_at)
instance.locked_at = validated_data.get('locked_at', instance.locked_at)
instance.completed_at = completed_at
instance.failed_at = failed_at
instance.data = validated_data.get('data', instance.data)
Expand Down
69 changes: 52 additions & 17 deletions api/test/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timezone
from datetime import datetime
from unittest.mock import patch

from rest_framework.test import APITestCase, APIClient
Expand Down Expand Up @@ -37,11 +37,12 @@ def setUp(self):
@patch('django.utils.timezone.now')
def test_queueing(self, mocked_now):
test_datetime = datetime.utcnow().isoformat() + 'Z'

mocked_now.return_value = test_datetime

task_post_data = {
'task_def': self.task_def_name,
'task_def': {
'name': self.task_def_name
},
'data': {
'foo': 'bar'
}
Expand All @@ -54,16 +55,18 @@ def test_queueing(self, mocked_now):

self.assertEqual(response.status_code, 201)
self.assertEqual(list(response.data.keys()), task_keys)
self.assertEqual(response.data['task_def'], self.task_def_name)
self.assertEqual(response.data['task_def']['name'], self.task_def_name)

## test fields defaults
# test fields defaults
self.assertEqual(response.data['status'], 'queued')
self.assertEqual(response.data['priority'], 'normal')
self.assertEqual(response.data['priority'], 3)
self.assertEqual(response.data['run_at'], test_datetime)

def test_queueing_auth(self):
task_post_data = {
'task_def': self.task_def_name,
'task_def': {
'name': self.task_def_name
},
'data': {
'foo': 'bar'
}
Expand All @@ -78,7 +81,9 @@ def test_queueing_auth(self):

def test_queue_with_unique(self):
task_post_data = {
'task_def': self.task_def_name,
'task_def': {
'name': self.task_def_name
},
'unique': 'classifer-2343',
'data': {
'foo': 'bar'
Expand All @@ -97,7 +102,9 @@ def test_queue_with_unique(self):

def test_queue_with_unique_conflict(self):
task_post_data = {
'task_def': self.task_def_name,
'task_def': {
'name': self.task_def_name
},
'unique': 'classifer-2343',
'data': {
'foo': 'bar'
Expand All @@ -121,7 +128,9 @@ def test_queue_with_unique_conflict(self):

def test_update_task(self):
task_post_data = {
'task_def': self.task_def_name,
'task_def': {
'name': self.task_def_name
},
'unique': 'classifer-2343',
'data': {
'foo': 'bar'
Expand All @@ -136,17 +145,19 @@ def test_update_task(self):
self.assertEqual(create_response.status_code, 201)

update = create_response.data
update['priority'] = 'high'
update['priority'] = 2

update_response = client.put('/tasks/' + str(update['id']), update, format='json')

self.assertEqual(update_response.status_code, 200)
self.assertEqual(list(update_response.data.keys()), task_keys)
self.assertEqual(update_response.data['priority'], 'high')
self.assertEqual(update_response.data['priority'], 2)

def test_update_task_auth(self):
task_post_data = {
'task_def': self.task_def_name,
'task_def': {
'name': self.task_def_name
},
'unique': 'classifer-2343',
'data': {
'foo': 'bar'
Expand All @@ -163,7 +174,7 @@ def test_update_task_auth(self):
client = APIClient() # clear token

update = create_response.data
update['priority'] = 'high'
update['priority'] = 2

update_response = client.put('/tasks/' + str(update['id']), update, format='json')

Expand All @@ -172,7 +183,9 @@ def test_update_task_auth(self):

def test_list_tasks(self):
task_post_data = {
'task_def': self.task_def_name,
'task_def': {
'name': self.task_def_name
},
'data': {
'foo': 'bar'
}
Expand All @@ -199,7 +212,9 @@ def test_list_tasks(self):

def test_get_task(self):
task_post_data = {
'task_def': self.task_def_name,
'task_def': {
'name': self.task_def_name
},
'unique': 'classifer-2343',
'data': {
'foo': 'bar'
Expand All @@ -209,7 +224,7 @@ def test_get_task(self):
client = APIClient()
client.credentials(HTTP_AUTHORIZATION=self.token)

task_create_response = client.post('/tasks', task_post_data, format='json')
task_create_response = client.post('/tasks/', task_post_data, format='json')

self.assertEqual(task_create_response.status_code, 201)

Expand All @@ -219,3 +234,23 @@ def test_get_task(self):

self.assertEqual(task_response.status_code, 200)
self.assertEqual(list(task_response.data.keys()), task_keys)

def test_create_nonexistent_task_def(self):
task_def_name = 'nonexistent-task-def'
task_post_data = {
'task_def': {
'name': task_def_name
},
'unique': 'classifer-2343',
'data': {
'foo': 'bar'
}
}

client = APIClient()
client.credentials(HTTP_AUTHORIZATION=self.token)

task_create_response = client.post('/tasks/', task_post_data, format='json')

self.assertEqual(task_create_response.status_code, 201)
self.assertEqual(task_create_response.data['task_def']['name'], task_def_name)
Loading