Skip to content

Commit

Permalink
Merge pull request #1113 from openradx/retry-decision
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim authored Jul 20, 2024
2 parents 559ea81 + 190e66c commit c6c203a
Show file tree
Hide file tree
Showing 21 changed files with 537 additions and 94 deletions.
39 changes: 27 additions & 12 deletions docs/howto/advanced/retry.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ app / machine reboots.

- Retry 5 times (so 6 attempts total):

```
```python
@app.task(retry=5)
def flaky_task():
if random.random() > 0.9:
Expand All @@ -21,7 +21,7 @@ app / machine reboots.

- Retry indefinitely:

```
```python
@app.task(retry=True)
def flaky_task():
if random.random() > 0.9:
Expand All @@ -42,7 +42,7 @@ Advanced strategies let you:

Define your precise strategy using a {py:class}`RetryStrategy` instance:

```
```python
from procrastinate import RetryStrategy

@app.task(retry=procrastinate.RetryStrategy(
Expand All @@ -63,23 +63,38 @@ between retries:

## Implementing your own strategy

- If you want to go for a fully fledged custom retry strategy, you can implement your
own retry strategy (though we recommend always keeping a max_retry):
If you want to go for a fully fledged custom retry strategy, you can implement your
own retry strategy by returning a `RetryDecision` object from the
`get_retry_decision` method. This also allows to (optionally) change the priority,
the queue or the lock of the job. If `None` is returned from `get_retry_decision`
then the job will not be retried.

```
The time to wait between retries can be specified with `retry_in` or alternatively
with `retry_at`. This is similar to how `schedule_in` and `schedule_at` are used
when {doc}`scheduling a job in the future <schedule>`.

```python
import random
from procrastinate import Job, RetryDecision

class RandomRetryStrategy(procrastinate.BaseRetryStrategy):
max_attempts = 3
min = 1
max = 10

def get_schedule_in(self, *, exception:Exception, attempts: int, **kwargs) -> int:
if attempts >= max_attempts:
return None
def get_retry_decision(self, *, exception:Exception, job:Job) -> RetryDecision:
if job.attempts >= max_attempts:
return RetryDecision(should_retry=False)

wait = random.uniform(self.min, self.max)

return random.uniform(self.min, self.max)
return RetryDecision(
retry_in={"seconds": wait}, # or retry_at (a datetime object)
priority=job.priority + 1, # optional
queue="another_queue", # optional
lock="another_lock", # optional
)
```

It's interesting to add a catch-all parameter `**kwargs` to make your strategy more
resilient to possible changes of Procrastinate in the future.
There is also a legacy `get_schedule_in` method that is deprecated an will be
removed in a future version in favor of the above `get_retry_decision` method.
7 changes: 6 additions & 1 deletion docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ Retry strategies
.. autoclass:: procrastinate.RetryStrategy

.. autoclass:: procrastinate.BaseRetryStrategy
:members: get_schedule_in
:members: get_retry_decision, get_schedule_in

.. deprecated:: 2.9
The `get_schedule_in` method is deprecated.

.. autoclass:: procrastinate.RetryDecision


Exceptions
Expand Down
3 changes: 2 additions & 1 deletion procrastinate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from procrastinate.connector import BaseConnector
from procrastinate.job_context import JobContext
from procrastinate.psycopg_connector import PsycopgConnector
from procrastinate.retry import BaseRetryStrategy, RetryStrategy
from procrastinate.retry import BaseRetryStrategy, RetryDecision, RetryStrategy
from procrastinate.sync_psycopg_connector import SyncPsycopgConnector
from procrastinate.utils import MovedElsewhere as _MovedElsewhere

Expand All @@ -27,6 +27,7 @@
"BaseRetryStrategy",
"PsycopgConnector",
"SyncPsycopgConnector",
"RetryDecision",
"RetryStrategy",
]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations

from django.db import migrations

from .. import migrations_utils


class Migration(migrations.Migration):
operations = [
migrations_utils.RunProcrastinateSQL(
name="02.08.00_01_add_additional_params_to_retry_job.sql"
),
]
name = "0029_add_additional_params_to_retry_job"
dependencies = [("procrastinate", "0028_add_cancel_states")]
6 changes: 3 additions & 3 deletions procrastinate/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

import datetime
from procrastinate.retry import RetryDecision


class ProcrastinateException(Exception):
Expand Down Expand Up @@ -45,8 +45,8 @@ class JobRetry(ProcrastinateException):
Job should be retried.
"""

def __init__(self, scheduled_at: datetime.datetime):
self.scheduled_at = scheduled_at
def __init__(self, retry_decision: RetryDecision):
self.retry_decision = retry_decision
super().__init__()


Expand Down
39 changes: 38 additions & 1 deletion procrastinate/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ async def retry_job(
self,
job: jobs.Job,
retry_at: datetime.datetime | None = None,
priority: int | None = None,
queue: str | None = None,
lock: str | None = None,
) -> None:
"""
Indicates that a job should be retried later.
Expand All @@ -368,16 +371,32 @@ async def retry_job(
If set at present time or in the past, the job may be retried immediately.
Otherwise, the job will be retried no sooner than this date & time.
Should be timezone-aware (even if UTC). Defaults to present time.
priority : ``Optional[int]``
If set, the job will be retried with this priority. If not set, the priority
remains unchanged.
queue : ``Optional[int]``
If set, the job will be retried on this queue. If not set, the queue remains
unchanged.
lock : ``Optional[int]``
If set, the job will be retried with this lock. If not set, the lock remains
unchanged.
"""
assert job.id # TODO remove this
await self.retry_job_by_id_async(
job_id=job.id, retry_at=retry_at or utils.utcnow()
job_id=job.id,
retry_at=retry_at or utils.utcnow(),
priority=priority,
queue=queue,
lock=lock,
)

async def retry_job_by_id_async(
self,
job_id: int,
retry_at: datetime.datetime,
priority: int | None = None,
queue: str | None = None,
lock: str | None = None,
) -> None:
"""
Indicates that a job should be retried later.
Expand All @@ -389,17 +408,32 @@ async def retry_job_by_id_async(
If set at present time or in the past, the job may be retried immediately.
Otherwise, the job will be retried no sooner than this date & time.
Should be timezone-aware (even if UTC).
priority : ``Optional[int]``
If set, the job will be retried with this priority. If not set, the priority
remains unchanged.
queue : ``Optional[int]``
If set, the job will be retried on this queue. If not set, the queue remains
unchanged.
lock : ``Optional[int]``
If set, the job will be retried with this lock. If not set, the lock remains
unchanged.
"""
await self.connector.execute_query_async(
query=sql.queries["retry_job"],
job_id=job_id,
retry_at=retry_at,
new_priority=priority,
new_queue_name=queue,
new_lock=lock,
)

def retry_job_by_id(
self,
job_id: int,
retry_at: datetime.datetime,
priority: int | None = None,
queue: str | None = None,
lock: str | None = None,
) -> None:
"""
Sync version of `retry_job_by_id_async`.
Expand All @@ -408,6 +442,9 @@ def retry_job_by_id(
query=sql.queries["retry_job"],
job_id=job_id,
retry_at=retry_at,
new_priority=priority,
new_queue_name=queue,
new_lock=lock,
)

async def listen_for_jobs(
Expand Down
Loading

0 comments on commit c6c203a

Please sign in to comment.