Skip to content

Commit

Permalink
Merge pull request #1081 from openradx/cancel-job-feature
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim authored Jun 25, 2024
2 parents 934b007 + 3502acb commit 8facc1a
Show file tree
Hide file tree
Showing 22 changed files with 919 additions and 52 deletions.
1 change: 1 addition & 0 deletions docs/howto/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ advanced/context
advanced/locks
advanced/schedule
advanced/priorities
advanced/cancellation
advanced/queueing_locks
advanced/cron
advanced/retry
Expand Down
72 changes: 72 additions & 0 deletions docs/howto/advanced/cancellation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Cancel a job

We can cancel a job that has not yet been processed by a worker. We can also
mark a job that is currently being processed for abortion, but this request
has to be handled by the task itself.

## Cancel a job (that is not being processed yet)

```python
# by using the sync method
app.job_manager.cancel_job_by_id(33)
# or by using the async method
await app.job_manager.cancel_job_by_id_async(33)
```

## Delete the cancelled job

A cancelled job can also be deleted from the database.

```python
# by using the sync method
app.job_manager.cancel_job_by_id(33, delete_job=True)
# or by using the async method
await app.job_manager.cancel_job_by_id_async(33, delete_job=True)
```

## Mark a currently being processed job for abortion

If a worker has not picked up the job yet, the below command behaves like the
command without the `abort` option. But if a job is already in the middle of
being processed, the `abort` option marks this job for abortion (see below
how to handle this request).

```python
# by using the sync method
app.job_manager.cancel_job_by_id(33, abort=True)
# or by using the async method
await app.job_manager.cancel_job_by_id_async(33, abort=True)
```

## Handle a abortion request inside the task

In our task, we can check (for example, periodically) if the task should be
aborted. If we want to respect that request (we don't have to), we raise a
`JobAborted` error. Any message passed to `JobAborted` (e.g.
`raise JobAborted("custom message")`) will end up in the logs.

```python
@app.task(pass_context=True)
def my_task(context):
for i in range(100):
if context.should_abort():
raise exceptions.JobAborted
do_something_expensive()
```

There is also an async API

```python
@app.task(pass_context=True)
async def my_task(context):
for i in range(100):
if await context.should_abort_async():
raise exceptions.JobAborted
do_something_expensive()
```

:::{warning}
`context.should_abort()` and `context.should_abort_async()` does poll the
database and might flood the database. Ensure you do it only sometimes and
not from too many parallel tasks.
:::
47 changes: 47 additions & 0 deletions procrastinate/contrib/django/migrations/0028_add_cancel_states.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from __future__ import annotations

from django.db import migrations, models

from .. import migrations_utils


class Migration(migrations.Migration):
operations = [
migrations_utils.RunProcrastinateSQL(name="02.06.00_01_add_cancel_states.sql"),
migrations.AlterField(
"procrastinatejob",
"status",
models.CharField(
choices=[
("todo", "todo"),
("doing", "doing"),
("succeeded", "succeeded"),
("failed", "failed"),
("cancelled", "cancelled"),
("aborting", "aborting"),
("aborted", "aborted"),
],
max_length=32,
),
),
migrations.AlterField(
"procrastinateevent",
"type",
models.CharField(
choices=[
("deferred", "deferred"),
("started", "started"),
("deferred_for_retry", "deferred_for_retry"),
("failed", "failed"),
("succeeded", "succeeded"),
("cancelled", "cancelled"),
("abort_requested", "abort_requested"),
("aborted", "aborted"),
("scheduled", "scheduled"),
],
max_length=32,
),
),
]
name = "0028_add_cancel_states"
dependencies = [("procrastinate", "0027_add_periodic_job_priority")]
5 changes: 5 additions & 0 deletions procrastinate/contrib/django/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class ProcrastinateJob(ProcrastinateReadOnlyModelMixin, models.Model):
"doing",
"succeeded",
"failed",
"cancelled",
"aborting",
"aborted",
)
id = models.BigAutoField(primary_key=True)
queue_name = models.CharField(max_length=128)
Expand Down Expand Up @@ -91,6 +94,8 @@ class ProcrastinateEvent(ProcrastinateReadOnlyModelMixin, models.Model):
"failed",
"succeeded",
"cancelled",
"abort_requested",
"aborted",
"scheduled",
)
id = models.BigAutoField(primary_key=True)
Expand Down
6 changes: 6 additions & 0 deletions procrastinate/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ def __init__(
self.critical = critical


class JobAborted(ProcrastinateException):
"""
Job was aborted.
"""


class AppNotOpen(ProcrastinateException):
"""
App was not open. Procrastinate App needs to be opened using:
Expand Down
18 changes: 18 additions & 0 deletions procrastinate/job_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,21 @@ def job_description(self, current_timestamp: float) -> str:
message += "no current job"

return message

def should_abort(self) -> bool:
assert self.app
assert self.job
assert self.job.id

job_id = self.job.id
status = self.app.job_manager.get_job_status(job_id)
return status == jobs.Status.ABORTING

async def should_abort_async(self) -> bool:
assert self.app
assert self.job
assert self.job.id

job_id = self.job.id
status = await self.app.job_manager.get_job_status_async(job_id)
return status == jobs.Status.ABORTING
3 changes: 3 additions & 0 deletions procrastinate/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class Status(Enum):
DOING = "doing" #: A worker is running the job
SUCCEEDED = "succeeded" #: The job ended successfully
FAILED = "failed" #: The job ended with an error
CANCELLED = "cancelled" #: The job was cancelled
ABORTING = "aborting" #: The job is requested to be aborted
ABORTED = "aborted" #: The job was aborted


@attr.dataclass(frozen=True, kw_only=True)
Expand Down
Loading

0 comments on commit 8facc1a

Please sign in to comment.