Merge pull request #1283 from procrastinate-org/add_job_context_task
ewjoachim authored Jan 12, 2025
2 parents f9a6f44 + 6a390a4 commit a5eebcd
Showing 4 changed files with 99 additions and 52 deletions.
115 changes: 66 additions & 49 deletions docs/howto/advanced/retry.md
@@ -9,36 +9,36 @@ app / machine reboots.

## Simple strategies

- Retry 5 times (so 6 attempts total):

```python
@app.task(retry=5)
def flaky_task():
if random.random() > 0.9:
raise Exception("Who could have seen this coming?")
print("Hello world")
```

- Retry indefinitely:

```python
@app.task(retry=True)
def flaky_task():
if random.random() > 0.9:
raise Exception("Who could have seen this coming?")
print("Hello world")
```

## Advanced strategies

Advanced strategies let you:

- define a maximum number of retries (if you don't, jobs will be retried indefinitely
  until they pass)
- define the retry delay, with constant, linear and exponential backoff options (if
  you don't, jobs will be retried immediately)
- define the exception types you want to retry on (if you don't, jobs will be retried
  on any type of exception)

Define your precise strategy using a {py:class}`RetryStrategy` instance:

@@ -57,9 +57,9 @@ def my_other_task():
{py:class}`RetryStrategy` takes 3 parameters related to how long it will wait
between retries:

- `wait=5` to wait 5 seconds before each retry
- `linear_wait=5` to wait 5 seconds then 10 then 15 and so on
- `exponential_wait=5` to wait 5 seconds then 25 then 125 and so on
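
The full usage example is collapsed in this diff view. As a rough sketch of how these options combine with `@app.task` (assuming the parameter names `max_attempts`, `wait` and `retry_exceptions`, which should be checked against your Procrastinate version):

```python
import procrastinate

# `app` is the procrastinate.App instance assumed by the surrounding how-to.
# Retry up to 10 times, wait 5 seconds between attempts, and only retry on
# the listed exception types.
@app.task(
    retry=procrastinate.RetryStrategy(
        max_attempts=10,
        wait=5,
        retry_exceptions={ConnectionError, TimeoutError},
    )
)
def my_other_task():
    print("Hello world")
```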

## Implementing your own strategy

@@ -73,28 +73,45 @@ The time to wait between retries can be specified with `retry_in` or alternatively
with `retry_at`. This is similar to how `schedule_in` and `schedule_at` are used
when {doc}`scheduling a job in the future <schedule>`.

```python
import random

import procrastinate
from procrastinate import Job, RetryDecision

class RandomRetryStrategy(procrastinate.BaseRetryStrategy):
    max_attempts = 3
    min = 1
    max = 10

    def get_retry_decision(self, *, exception: Exception, job: Job) -> RetryDecision:
        if job.attempts >= self.max_attempts:
            return RetryDecision(should_retry=False)

        wait = random.uniform(self.min, self.max)

        return RetryDecision(
            retry_in={"seconds": wait},  # or retry_at (a datetime object)
            priority=job.priority + 1,  # optional
            queue="another_queue",  # optional
            lock="another_lock",  # optional
        )
```

There is also a legacy `get_schedule_in` method that is deprecated and will be
removed in a future version in favor of the above `get_retry_decision` method.

## Knowing whether a job is on its last attempt

By using `pass_context=True` and introspecting the task's retry strategy,
you can know whether a currently executing job is on its last attempt:

```python
@app.task(retry=10, pass_context=True)
def my_task(job_context: procrastinate.JobContext) -> None:
    job = job_context.job
    task = job_context.task
    if task.retry.get_retry_decision(exception=Exception(), job=job) is None:
        print("Warning: last attempt!")

    if random.random() < 0.9:
        raise Exception
```
4 changes: 2 additions & 2 deletions docs/reference.rst
@@ -32,7 +32,7 @@ When tasks are created with argument ``pass_context``, they are provided a
`JobContext` argument:

.. autoclass:: procrastinate.JobContext
    :members: app, worker_name, worker_queues, job
    :members: app, worker_name, worker_queues, job, task, should_abort

Blueprints
----------
@@ -80,7 +80,7 @@ Exceptions
.. automodule:: procrastinate.exceptions
    :members: ProcrastinateException, LoadFromPathError,
              ConnectorException, AlreadyEnqueued, AppNotOpen, TaskNotFound,
              UnboundTaskError
              UnboundTaskError, JobAborted

Job statuses
------------
16 changes: 15 additions & 1 deletion procrastinate/job_context.py
@@ -8,7 +8,7 @@
import attr

from procrastinate import app as app_module
from procrastinate import jobs, utils
from procrastinate import jobs, tasks, utils


@attr.dataclass(kw_only=True)
@@ -64,9 +64,16 @@ class JobContext:

    additional_context: dict = attr.ib(factory=dict)

    #: Callable returning the reason the job should be aborted (or None if it
    #: should not be aborted)
    abort_reason: Callable[[], AbortReason | None]

    def should_abort(self) -> bool:
        """
        Returns True if the job should be aborted: in that case, the job should
        stop processing as soon as possible and raise
        :py:class:`~exceptions.JobAborted`
        """
        return bool(self.abort_reason())

    def evolve(self, **update: Any) -> JobContext:
@@ -75,3 +82,10 @@ def evolve(self, **update: Any) -> JobContext:
    @property
    def queues_display(self) -> str:
        return utils.queues_display(self.worker_queues)

    @property
    def task(self) -> tasks.Task:
        """
        The :py:class:`~tasks.Task` associated to the job
        """
        return self.app.tasks[self.job.task_name]
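
Taken together, the new `task` property and the `should_abort` helper let a context-aware task look up its own `Task` object and cooperate with abort requests by raising `JobAborted`. A minimal sketch, not part of this diff (the in-memory connector and the polling loop are illustrative assumptions):

```python
import time

import procrastinate
from procrastinate import exceptions, testing

# Illustrative app; any configured connector works the same way.
app = procrastinate.App(connector=testing.InMemoryConnector())

@app.task(pass_context=True)
def long_running(job_context: procrastinate.JobContext) -> None:
    # The new `task` property resolves the Task object from the app's registry.
    print(f"Running task {job_context.task.name}")

    for _ in range(100):
        # Periodically check whether an abort was requested...
        if job_context.should_abort():
            # ...and raise JobAborted so the worker records the job as aborted.
            raise exceptions.JobAborted
        time.sleep(1)
```

The `task` lookup goes through `app.tasks[job.task_name]`, which is exactly what the new unit test below asserts.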
16 changes: 16 additions & 0 deletions tests/unit/test_job_context.py
@@ -55,3 +55,19 @@ def test_evolve(app: App, job_factory):
        abort_reason=lambda: None,
    )
    assert context.evolve(worker_name="b").worker_name == "b"


def test_task(app: App, job_factory):
    @app.task(name="my_task")
    def my_task(a, b):
        return a + b

    job = job_factory(task_name="my_task")
    context = job_context.JobContext(
        start_timestamp=time.time(),
        app=app,
        job=job,
        worker_name="a",
        abort_reason=lambda: None,
    )
    assert context.task == my_task
