diff --git a/docs/howto/advanced/retry.md b/docs/howto/advanced/retry.md index ed805ca62..d8a5ad708 100644 --- a/docs/howto/advanced/retry.md +++ b/docs/howto/advanced/retry.md @@ -11,7 +11,7 @@ app / machine reboots. - Retry 5 times (so 6 attempts total): - ``` + ```python @app.task(retry=5) def flaky_task(): if random.random() > 0.9: @@ -21,7 +21,7 @@ app / machine reboots. - Retry indefinitely: - ``` + ```python @app.task(retry=True) def flaky_task(): if random.random() > 0.9: @@ -42,7 +42,7 @@ Advanced strategies let you: Define your precise strategy using a {py:class}`RetryStrategy` instance: -``` +```python from procrastinate import RetryStrategy @app.task(retry=procrastinate.RetryStrategy( @@ -63,23 +63,38 @@ between retries: ## Implementing your own strategy -- If you want to go for a fully fledged custom retry strategy, you can implement your - own retry strategy (though we recommend always keeping a max_retry): +If you want to go for a fully fledged custom retry strategy, you can implement your +own retry strategy by returning a `RetryDecision` object from the +`get_retry_decision` method. This also allows to (optionally) change the priority, +the queue or the lock of the job. If `None` is returned from `get_retry_decision` +then the job will not be retried. - ``` +The time to wait between retries can be specified with `retry_in` or alternatively +with `retry_at`. This is similar to how `schedule_in` and `schedule_at` are used +when {doc}`scheduling a job in the future `. + + ```python import random + from procrastinate import Job, RetryDecision class RandomRetryStrategy(procrastinate.BaseRetryStrategy): max_attempts = 3 min = 1 max = 10 - def get_schedule_in(self, *, exception:Exception, attempts: int, **kwargs) -> int: - if attempts >= max_attempts: - return None + def get_retry_decision(self, *, exception:Exception, job:Job) -> RetryDecision: + if job.attempts >= max_attempts: + return RetryDecision(should_retry=False) + + wait = random.uniform(self.min, self.max) - return random.uniform(self.min, self.max) + return RetryDecision( + retry_in={"seconds": wait}, # or retry_at (a datetime object) + priority=job.priority + 1, # optional + queue="another_queue", # optional + lock="another_lock", # optional + ) ``` -It's interesting to add a catch-all parameter `**kwargs` to make your strategy more -resilient to possible changes of Procrastinate in the future. +There is also a legacy `get_schedule_in` method that is deprecated an will be +removed in a future version in favor of the above `get_retry_decision` method. diff --git a/docs/reference.rst b/docs/reference.rst index 317ae31c4..dc6302e0f 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -72,7 +72,12 @@ Retry strategies .. autoclass:: procrastinate.RetryStrategy .. autoclass:: procrastinate.BaseRetryStrategy - :members: get_schedule_in + :members: get_retry_decision, get_schedule_in + +.. deprecated:: 2.9 + The `get_schedule_in` method is deprecated. + +.. autoclass:: procrastinate.RetryDecision Exceptions diff --git a/procrastinate/__init__.py b/procrastinate/__init__.py index e120aacd9..d0dbffb78 100644 --- a/procrastinate/__init__.py +++ b/procrastinate/__init__.py @@ -6,7 +6,7 @@ from procrastinate.connector import BaseConnector from procrastinate.job_context import JobContext from procrastinate.psycopg_connector import PsycopgConnector -from procrastinate.retry import BaseRetryStrategy, RetryStrategy +from procrastinate.retry import BaseRetryStrategy, RetryDecision, RetryStrategy from procrastinate.sync_psycopg_connector import SyncPsycopgConnector from procrastinate.utils import MovedElsewhere as _MovedElsewhere @@ -27,6 +27,7 @@ "BaseRetryStrategy", "PsycopgConnector", "SyncPsycopgConnector", + "RetryDecision", "RetryStrategy", ] diff --git a/procrastinate/contrib/django/migrations/0029_add_additional_params_to_retry_job.py b/procrastinate/contrib/django/migrations/0029_add_additional_params_to_retry_job.py new file mode 100644 index 000000000..742c5ab36 --- /dev/null +++ b/procrastinate/contrib/django/migrations/0029_add_additional_params_to_retry_job.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from django.db import migrations + +from .. import migrations_utils + + +class Migration(migrations.Migration): + operations = [ + migrations_utils.RunProcrastinateSQL( + name="02.08.00_01_add_additional_params_to_retry_job.sql" + ), + ] + name = "0029_add_additional_params_to_retry_job" + dependencies = [("procrastinate", "0028_add_cancel_states")] diff --git a/procrastinate/exceptions.py b/procrastinate/exceptions.py index 34b5e36d9..d1ec5d0bb 100644 --- a/procrastinate/exceptions.py +++ b/procrastinate/exceptions.py @@ -1,6 +1,6 @@ from __future__ import annotations -import datetime +from procrastinate.retry import RetryDecision class ProcrastinateException(Exception): @@ -45,8 +45,8 @@ class JobRetry(ProcrastinateException): Job should be retried. """ - def __init__(self, scheduled_at: datetime.datetime): - self.scheduled_at = scheduled_at + def __init__(self, retry_decision: RetryDecision): + self.retry_decision = retry_decision super().__init__() diff --git a/procrastinate/manager.py b/procrastinate/manager.py index 5f26dac28..9657e4528 100644 --- a/procrastinate/manager.py +++ b/procrastinate/manager.py @@ -357,6 +357,9 @@ async def retry_job( self, job: jobs.Job, retry_at: datetime.datetime | None = None, + priority: int | None = None, + queue: str | None = None, + lock: str | None = None, ) -> None: """ Indicates that a job should be retried later. @@ -368,16 +371,32 @@ async def retry_job( If set at present time or in the past, the job may be retried immediately. Otherwise, the job will be retried no sooner than this date & time. Should be timezone-aware (even if UTC). Defaults to present time. + priority : ``Optional[int]`` + If set, the job will be retried with this priority. If not set, the priority + remains unchanged. + queue : ``Optional[int]`` + If set, the job will be retried on this queue. If not set, the queue remains + unchanged. + lock : ``Optional[int]`` + If set, the job will be retried with this lock. If not set, the lock remains + unchanged. """ assert job.id # TODO remove this await self.retry_job_by_id_async( - job_id=job.id, retry_at=retry_at or utils.utcnow() + job_id=job.id, + retry_at=retry_at or utils.utcnow(), + priority=priority, + queue=queue, + lock=lock, ) async def retry_job_by_id_async( self, job_id: int, retry_at: datetime.datetime, + priority: int | None = None, + queue: str | None = None, + lock: str | None = None, ) -> None: """ Indicates that a job should be retried later. @@ -389,17 +408,32 @@ async def retry_job_by_id_async( If set at present time or in the past, the job may be retried immediately. Otherwise, the job will be retried no sooner than this date & time. Should be timezone-aware (even if UTC). + priority : ``Optional[int]`` + If set, the job will be retried with this priority. If not set, the priority + remains unchanged. + queue : ``Optional[int]`` + If set, the job will be retried on this queue. If not set, the queue remains + unchanged. + lock : ``Optional[int]`` + If set, the job will be retried with this lock. If not set, the lock remains + unchanged. """ await self.connector.execute_query_async( query=sql.queries["retry_job"], job_id=job_id, retry_at=retry_at, + new_priority=priority, + new_queue_name=queue, + new_lock=lock, ) def retry_job_by_id( self, job_id: int, retry_at: datetime.datetime, + priority: int | None = None, + queue: str | None = None, + lock: str | None = None, ) -> None: """ Sync version of `retry_job_by_id_async`. @@ -408,6 +442,9 @@ def retry_job_by_id( query=sql.queries["retry_job"], job_id=job_id, retry_at=retry_at, + new_priority=priority, + new_queue_name=queue, + new_lock=lock, ) async def listen_for_jobs( diff --git a/procrastinate/retry.py b/procrastinate/retry.py index 4da675d40..827a614df 100644 --- a/procrastinate/retry.py +++ b/procrastinate/retry.py @@ -6,28 +6,116 @@ from __future__ import annotations import datetime -from typing import Iterable, Union +import warnings +from typing import Iterable, Union, overload import attr -from procrastinate import exceptions, utils +from procrastinate import exceptions, types, utils +from procrastinate.jobs import Job + + +class RetryDecision: + retry_at: datetime.datetime | None = None + priority: int | None = None + queue: str | None = None + lock: str | None = None + + @overload + def __init__( + self, + *, + retry_at: datetime.datetime | None = None, + priority: int | None = None, + queue: str | None = None, + lock: str | None = None, + ) -> None: ... + @overload + def __init__( + self, + *, + retry_in: types.TimeDeltaParams | None = None, + priority: int | None = None, + queue: str | None = None, + lock: str | None = None, + ) -> None: ... + def __init__( + self, + *, + retry_at: datetime.datetime | None = None, + retry_in: types.TimeDeltaParams | None = None, + priority: int | None = None, + queue: str | None = None, + lock: str | None = None, + ) -> None: + """ + Specifies when and how a job should be retried. + + Parameters + ---------- + retry_at : ``Optional[datetime.datetime]`` + If set at present time or in the past, the job may be retried immediately. + Otherwise, the job will be retried no sooner than this date & time. + Should be timezone-aware (even if UTC). Defaults to present time. + retry_in : ``Optional[types.TimeDeltaParams]`` + If set, the job will be retried after this duration. If not set, the job will + be retried immediately. + priority : ``Optional[int]`` + If set, the job will be retried with this priority. If not set, the priority + remains unchanged. + queue : ``Optional[int]`` + If set, the job will be retried on this queue. If not set, the queue remains + unchanged. + lock : ``Optional[int]`` + If set, the job will be retried with this lock. If not set, the lock remains + unchanged. + """ + if retry_at and retry_in is not None: + raise ValueError("Cannot set both retry_at and retry_in") + + if retry_in is not None: + retry_at = utils.datetime_from_timedelta_params(retry_in) + + self.retry_at = retry_at + self.priority = priority + self.queue = queue + self.lock = lock class BaseRetryStrategy: """ If you want to implement your own retry strategy, you can inherit from this class. - Child classes only need to implement `get_schedule_in`. + Child classes only need to implement `get_retry_decision`. """ def get_retry_exception( - self, exception: BaseException, attempts: int + self, exception: BaseException, job: Job ) -> exceptions.JobRetry | None: - schedule_in = self.get_schedule_in(exception=exception, attempts=attempts) - if schedule_in is None: - return None - - schedule_at = utils.utcnow() + datetime.timedelta(seconds=schedule_in) - return exceptions.JobRetry(schedule_at.replace(microsecond=0)) + try: + retry_decision = self.get_retry_decision(exception=exception, job=job) + if retry_decision is None: + return None + + return exceptions.JobRetry(retry_decision=retry_decision) + except NotImplementedError as err: + try: + schedule_in = self.get_schedule_in( + exception=exception, attempts=job.attempts + ) + except NotImplementedError: + raise err from None + + warnings.warn( + "`get_schedule_in` is deprecated, use `get_retry_decision` instead.", + DeprecationWarning, + stacklevel=2, + ) + + if schedule_in is None: + return None + + retry_decision = RetryDecision(retry_in={"seconds": schedule_in}) + return exceptions.JobRetry(retry_decision) def get_schedule_in(self, *, exception: BaseException, attempts: int) -> int | None: """ @@ -43,8 +131,26 @@ def get_schedule_in(self, *, exception: BaseException, attempts: int) -> int | N If a job should not be retried, this function should return None. Otherwise, it should return the duration after which to schedule the new job run, *in seconds*. + + Notes + ----- + This function is deprecated and will be removed in a future version. Use + `get_retry_decision` instead. + """ + raise NotImplementedError + + def get_retry_decision( + self, *, exception: BaseException, job: Job + ) -> RetryDecision | None: + """ + Parameters + ---------- + exception: + The exception raised by the job + job: + The current job """ - raise NotImplementedError() + raise NotImplementedError("Missing implementation of 'get_retry_decision'.") @attr.dataclass(kw_only=True) @@ -84,8 +190,10 @@ class RetryStrategy(BaseRetryStrategy): exponential_wait: int = 0 retry_exceptions: Iterable[type[Exception]] | None = None - def get_schedule_in(self, *, exception: BaseException, attempts: int) -> int | None: - if self.max_attempts and attempts >= self.max_attempts: + def get_retry_decision( + self, *, exception: BaseException, job: Job + ) -> RetryDecision | None: + if self.max_attempts and job.attempts >= self.max_attempts: return None # isinstance's 2nd param must be a tuple, not an arbitrary iterable if self.retry_exceptions and not isinstance( @@ -93,9 +201,10 @@ def get_schedule_in(self, *, exception: BaseException, attempts: int) -> int | N ): return None wait: int = self.wait - wait += self.linear_wait * attempts - wait += self.exponential_wait ** (attempts + 1) - return wait + wait += self.linear_wait * job.attempts + wait += self.exponential_wait ** (job.attempts + 1) + + return RetryDecision(retry_in={"seconds": wait}) RetryValue = Union[bool, int, RetryStrategy] diff --git a/procrastinate/sql/migrations/02.08.00_01_add_additional_params_to_retry_job.sql b/procrastinate/sql/migrations/02.08.00_01_add_additional_params_to_retry_job.sql new file mode 100644 index 000000000..ffaba5aad --- /dev/null +++ b/procrastinate/sql/migrations/02.08.00_01_add_additional_params_to_retry_job.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION procrastinate_retry_job( + job_id bigint, + retry_at timestamp with time zone, + new_priority integer, + new_queue_name character varying, + new_lock character varying +) + RETURNS void + LANGUAGE plpgsql +AS $$ +DECLARE + _job_id bigint; +BEGIN + UPDATE procrastinate_jobs + SET status = 'todo', + attempts = attempts + 1, + scheduled_at = retry_at, + priority = COALESCE(new_priority, priority), + queue_name = COALESCE(new_queue_name, queue_name), + lock = COALESCE(new_lock, lock) + WHERE id = job_id AND status = 'doing' + RETURNING id INTO _job_id; + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "doing" status (job id: %)', job_id; + END IF; +END; +$$; diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index cdff1f889..234217888 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -60,7 +60,7 @@ SELECT status FROM procrastinate_jobs WHERE id = %(job_id)s; -- retry_job -- -- Retry a job, changing it from "doing" to "todo" -SELECT procrastinate_retry_job(%(job_id)s, %(retry_at)s); +SELECT procrastinate_retry_job(%(job_id)s, %(retry_at)s, %(new_priority)s, %(new_queue_name)s, %(new_lock)s); -- listen_queue -- -- In this one, the argument is an identifier, shoud not be escaped the same way diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index d696cea1b..a2fe4d950 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -259,7 +259,13 @@ BEGIN END; $$; -CREATE FUNCTION procrastinate_retry_job(job_id bigint, retry_at timestamp with time zone) +CREATE FUNCTION procrastinate_retry_job( + job_id bigint, + retry_at timestamp with time zone, + new_priority integer, + new_queue_name character varying, + new_lock character varying +) RETURNS void LANGUAGE plpgsql AS $$ @@ -269,7 +275,10 @@ BEGIN UPDATE procrastinate_jobs SET status = 'todo', attempts = attempts + 1, - scheduled_at = retry_at + scheduled_at = retry_at, + priority = COALESCE(new_priority, priority), + queue_name = COALESCE(new_queue_name, queue_name), + lock = COALESCE(new_lock, lock) WHERE id = job_id AND status = 'doing' RETURNING id INTO _job_id; IF _job_id IS NULL THEN @@ -514,3 +523,24 @@ BEGIN RETURN _job_id; END; $$; + +-- procrastinate_retry_job +-- the function without the new_* arguments is kept for compatibility reasons +CREATE FUNCTION procrastinate_retry_job(job_id bigint, retry_at timestamp with time zone) + RETURNS void + LANGUAGE plpgsql +AS $$ +DECLARE + _job_id bigint; +BEGIN + UPDATE procrastinate_jobs + SET status = 'todo', + attempts = attempts + 1, + scheduled_at = retry_at + WHERE id = job_id AND status = 'doing' + RETURNING id INTO _job_id; + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "doing" status (job id: %)', job_id; + END IF; +END; +$$; diff --git a/procrastinate/tasks.py b/procrastinate/tasks.py index e18fff35b..55658a20b 100644 --- a/procrastinate/tasks.py +++ b/procrastinate/tasks.py @@ -17,22 +17,12 @@ P = ParamSpec("P") -class TimeDeltaParams(TypedDict): - weeks: NotRequired[int] - days: NotRequired[int] - hours: NotRequired[int] - minutes: NotRequired[int] - seconds: NotRequired[int] - milliseconds: NotRequired[int] - microseconds: NotRequired[int] - - class ConfigureTaskOptions(TypedDict): lock: NotRequired[str | None] queueing_lock: NotRequired[str | None] task_kwargs: NotRequired[types.JSONDict | None] schedule_at: NotRequired[datetime.datetime | None] - schedule_in: NotRequired[TimeDeltaParams | None] + schedule_in: NotRequired[types.TimeDeltaParams | None] queue: NotRequired[str | None] priority: NotRequired[int | None] @@ -51,7 +41,7 @@ def configure_task( raise ValueError("Cannot set both schedule_at and schedule_in") if schedule_in is not None: - schedule_at = utils.utcnow() + datetime.timedelta(**schedule_in) + schedule_at = utils.datetime_from_timedelta_params(schedule_in) if priority is None: priority = jobs.DEFAULT_PRIORITY @@ -239,6 +229,4 @@ def get_retry_exception( if not self.retry_strategy: return None - return self.retry_strategy.get_retry_exception( - exception=exception, attempts=job.attempts - ) + return self.retry_strategy.get_retry_exception(exception=exception, job=job) diff --git a/procrastinate/testing.py b/procrastinate/testing.py index 6d6631be9..4af80bd2c 100644 --- a/procrastinate/testing.py +++ b/procrastinate/testing.py @@ -244,11 +244,24 @@ def cancel_job_one(self, job_id: int, abort: bool, delete_job: bool) -> dict: def get_job_status_one(self, job_id: int) -> dict: return {"status": self.jobs[job_id]["status"]} - def retry_job_run(self, job_id: int, retry_at: datetime.datetime) -> None: + def retry_job_run( + self, + job_id: int, + retry_at: datetime.datetime, + new_priority: int | None = None, + new_queue_name: str | None = None, + new_lock: str | None = None, + ) -> None: job_row = self.jobs[job_id] job_row["status"] = "todo" job_row["attempts"] += 1 job_row["scheduled_at"] = retry_at + if new_priority is not None: + job_row["priority"] = new_priority + if new_queue_name is not None: + job_row["queue_name"] = new_queue_name + if new_lock is not None: + job_row["lock"] = new_lock self.events[job_id].append({"type": "scheduled", "at": retry_at}) self.events[job_id].append({"type": "deferred_for_retry", "at": utils.utcnow()}) diff --git a/procrastinate/types.py b/procrastinate/types.py index 1604e1620..eb604a4e1 100644 --- a/procrastinate/types.py +++ b/procrastinate/types.py @@ -2,5 +2,17 @@ import typing as t +from typing_extensions import NotRequired + JSONValue = t.Union[str, int, float, bool, None, t.Dict[str, t.Any], t.List[t.Any]] JSONDict = t.Dict[str, JSONValue] + + +class TimeDeltaParams(t.TypedDict): + weeks: NotRequired[int] + days: NotRequired[int] + hours: NotRequired[int] + minutes: NotRequired[int] + seconds: NotRequired[int] + milliseconds: NotRequired[int] + microseconds: NotRequired[int] diff --git a/procrastinate/utils.py b/procrastinate/utils.py index 4bd448130..2d6e7afc2 100644 --- a/procrastinate/utils.py +++ b/procrastinate/utils.py @@ -26,6 +26,7 @@ from asgiref import sync from procrastinate import exceptions +from procrastinate.types import TimeDeltaParams T = TypeVar("T") U = TypeVar("U") @@ -432,3 +433,7 @@ def async_context_decorator(func: Callable) -> Callable: return contextlib2.asynccontextmanager(func) else: return contextlib.asynccontextmanager(func) + + +def datetime_from_timedelta_params(params: TimeDeltaParams) -> datetime.datetime: + return utcnow() + datetime.timedelta(**params) diff --git a/procrastinate/worker.py b/procrastinate/worker.py index f0374faaa..e5809e61a 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -191,7 +191,7 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None: extra=context.log_extra(action="loaded_job_info"), ) - status, retry_at = None, None + status, retry_decision = None, None try: await self.run_job(job=job, worker_id=worker_id) status = jobs.Status.SUCCEEDED @@ -201,7 +201,7 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None: except exceptions.JobError as e: status = jobs.Status.FAILED if e.retry_exception: - retry_at = e.retry_exception.scheduled_at + retry_decision = e.retry_exception.retry_decision if e.critical and e.__cause__: raise e.__cause__ @@ -212,8 +212,14 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None: extra=context.log_extra(action="task_not_found", exception=str(exc)), ) finally: - if retry_at: - await self.job_manager.retry_job(job=job, retry_at=retry_at) + if retry_decision: + await self.job_manager.retry_job( + job=job, + retry_at=retry_decision.retry_at, + priority=retry_decision.priority, + queue=retry_decision.queue, + lock=retry_decision.lock, + ) else: assert status is not None diff --git a/tests/integration/test_manager.py b/tests/integration/test_manager.py index 1da333a62..8c9173f53 100644 --- a/tests/integration/test_manager.py +++ b/tests/integration/test_manager.py @@ -302,6 +302,27 @@ async def test_retry_job(pg_job_manager, fetched_job_factory): assert job2.id == job1.id assert job2.attempts == job1.attempts + 1 + assert job2.priority == job1.priority == 0 + + +async def test_retry_job_with_additional_params(pg_job_manager, fetched_job_factory): + job1 = await fetched_job_factory(queue="queue_a") + + await pg_job_manager.retry_job( + job=job1, + retry_at=datetime.datetime.now(datetime.timezone.utc), + priority=5, + queue="queue_b", + lock="some_lock", + ) + + job2 = await pg_job_manager.fetch_job(queues=None) + + assert job2.id == job1.id + assert job2.attempts == 1 + assert job2.priority == 5 + assert job2.queue == "queue_b" + assert job2.lock == "some_lock" async def test_enum_synced(psycopg_connector): diff --git a/tests/unit/test_manager.py b/tests/unit/test_manager.py index 8b539d111..3d14f2073 100644 --- a/tests/unit/test_manager.py +++ b/tests/unit/test_manager.py @@ -275,10 +275,18 @@ async def test_retry_job(job_manager, job_factory, connector): await job_manager.defer_job_async(job=job) retry_at = conftest.aware_datetime(2000, 1, 1) - await job_manager.retry_job(job=job, retry_at=retry_at) + await job_manager.retry_job( + job=job, retry_at=retry_at, priority=7, queue="some_queue", lock="some_lock" + ) assert connector.queries[-1] == ( "retry_job", - {"job_id": 1, "retry_at": retry_at}, + { + "job_id": 1, + "retry_at": retry_at, + "new_priority": 7, + "new_queue_name": "some_queue", + "new_lock": "some_lock", + }, ) @@ -399,22 +407,38 @@ def dt(): async def test_retry_job_by_id_async(job_manager, connector, job_factory, dt): job = await job_manager.defer_job_async(job=job_factory()) - await job_manager.retry_job_by_id_async(job_id=job.id, retry_at=dt) + await job_manager.retry_job_by_id_async( + job_id=job.id, retry_at=dt, priority=7, queue="some_queue", lock="some_lock" + ) assert connector.queries[-1] == ( "retry_job", - {"job_id": 1, "retry_at": dt}, + { + "job_id": 1, + "retry_at": dt, + "new_priority": 7, + "new_queue_name": "some_queue", + "new_lock": "some_lock", + }, ) def test_retry_job_by_id(job_manager, connector, job_factory, dt): job = job_manager.defer_job(job=job_factory()) - job_manager.retry_job_by_id(job_id=job.id, retry_at=dt) + job_manager.retry_job_by_id( + job_id=job.id, retry_at=dt, priority=7, queue="some_queue", lock="some_lock" + ) assert connector.queries[-1] == ( "retry_job", - {"job_id": 1, "retry_at": dt}, + { + "job_id": 1, + "retry_at": dt, + "new_priority": 7, + "new_queue_name": "some_queue", + "new_lock": "some_lock", + }, ) diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py index 6ed3cfb9f..b01b7a2c7 100644 --- a/tests/unit/test_retry.py +++ b/tests/unit/test_retry.py @@ -4,8 +4,11 @@ import pytest -from procrastinate import exceptions, utils +from procrastinate import BaseRetryStrategy, RetryDecision, exceptions, utils from procrastinate import retry as retry_module +from procrastinate.jobs import Job + +from .. import conftest @pytest.mark.parametrize( @@ -24,8 +27,18 @@ def test_get_retry_strategy(retry, expected_strategy): assert expected_strategy == retry_module.get_retry_strategy(retry) +def test_retry_decision_constructor(): + now = conftest.aware_datetime(2000, 1, 1, tz_offset=1) + with pytest.raises(ValueError) as exc_info: + RetryDecision( + retry_in={"seconds": 42}, + retry_at=now + datetime.timedelta(seconds=42, microseconds=0), + ) + assert str(exc_info.value) == "Cannot set both retry_at and retry_in" + + @pytest.mark.parametrize( - "attempts, wait, linear_wait, exponential_wait, schedule_in", + "attempts, wait, linear_wait, exponential_wait, retry_in", [ # No wait (0, 0.0, 0.0, 0.0, 0.0), @@ -33,10 +46,6 @@ def test_get_retry_strategy(retry, expected_strategy): (1, 5.0, 0.0, 0.0, 5.0), # Constant, last try (9, 5.0, 0.0, 0.0, 5.0), - # Constant, first non-retry - (10, 5.0, 0.0, 0.0, None), - # Constant, other non-retry - (100, 5.0, 0.0, 0.0, None), # Linear (3 * 7) (3, 0.0, 7.0, 0.0, 21.0), # Exponential (2 ** (5+1)) @@ -45,41 +54,152 @@ def test_get_retry_strategy(retry, expected_strategy): (4, 8.0, 3.0, 2.0, 52.0), ], ) -def test_get_schedule_in_time( - attempts, schedule_in, wait, linear_wait, exponential_wait +def test_get_retry_decision( + attempts, wait, linear_wait, exponential_wait, retry_in, mocker ): + now = conftest.aware_datetime(2000, 1, 1, tz_offset=1) + mocker.patch.object(utils, "utcnow", return_value=now) + expected = now + datetime.timedelta(seconds=retry_in, microseconds=0) + strategy = retry_module.RetryStrategy( max_attempts=10, wait=wait, linear_wait=linear_wait, exponential_wait=exponential_wait, ) - assert strategy.get_schedule_in(exception=None, attempts=attempts) == schedule_in + + job_mock = mocker.Mock(attempts=attempts) + retry_decision = strategy.get_retry_decision(exception=Exception(), job=job_mock) + assert isinstance(retry_decision, RetryDecision) + assert retry_decision.retry_at == expected.replace(microsecond=0) @pytest.mark.parametrize( - "exception, expected", + "attempts, wait, linear_wait, exponential_wait", [ - (ValueError(), 0), - (KeyError(), None), + # Constant, first non-retry + (10, 5.0, 0.0, 0.0), + # Constant, other non-retry + (100, 5.0, 0.0, 0.0), ], ) -def test_get_schedule_in_exception(exception, expected): +def test_get_none_retry_decision(attempts, wait, linear_wait, exponential_wait, mocker): + strategy = retry_module.RetryStrategy( + max_attempts=10, + wait=wait, + linear_wait=linear_wait, + exponential_wait=exponential_wait, + ) + job_mock = mocker.Mock(attempts=attempts) + assert strategy.get_retry_decision(exception=Exception(), job=job_mock) is None + + +def test_retry_exception(mocker): strategy = retry_module.RetryStrategy(retry_exceptions=[ValueError]) - assert strategy.get_schedule_in(exception=exception, attempts=0) == expected + job_mock = mocker.Mock(attempts=0) + retry_decision = strategy.get_retry_decision(exception=ValueError(), job=job_mock) + assert isinstance(retry_decision, RetryDecision) + +def test_non_retry_exception(mocker): + strategy = retry_module.RetryStrategy(retry_exceptions=[ValueError]) + job_mock = mocker.Mock(attempts=0) + retry_decision = strategy.get_retry_decision(exception=KeyError(), job=job_mock) + assert retry_decision is None -def test_get_retry_exception_returns_none(): - strategy = retry_module.RetryStrategy(max_attempts=10, wait=5.0) - assert strategy.get_retry_exception(exception=None, attempts=100) is None +def test_get_retry_exception_returns_none(mocker): + strategy = retry_module.RetryStrategy(max_attempts=10, wait=5) + job_mock = mocker.Mock(attempts=100) + assert strategy.get_retry_exception(exception=Exception(), job=job_mock) is None -def test_get_retry_exception_returns(): - strategy = retry_module.RetryStrategy(max_attempts=10, wait=5.0) - now = utils.utcnow() +def test_get_retry_exception_returns(mocker): + now = conftest.aware_datetime(2000, 1, 1, tz_offset=1) + mocker.patch.object(utils, "utcnow", return_value=now) expected = now + datetime.timedelta(seconds=5, microseconds=0) - exc = strategy.get_retry_exception(exception=None, attempts=1) + strategy = retry_module.RetryStrategy(max_attempts=10, wait=5) + + job_mock = mocker.Mock(attempts=1) + exc = strategy.get_retry_exception(exception=Exception(), job=job_mock) assert isinstance(exc, exceptions.JobRetry) - assert exc.scheduled_at == expected.replace(microsecond=0) + assert exc.retry_decision.retry_at == expected.replace(microsecond=0) + + +def test_custom_retry_strategy_returns(mocker): + now = conftest.aware_datetime(2000, 1, 1, tz_offset=1) + mocker.patch.object(utils, "utcnow", return_value=now) + expected = now + datetime.timedelta(seconds=5, microseconds=0) + + class CustomRetryStrategy(BaseRetryStrategy): + def get_retry_decision( + self, *, exception: BaseException, job: Job + ) -> RetryDecision: + return RetryDecision( + retry_in={"seconds": 5}, + priority=7, + queue="some_queue", + lock="some_lock", + ) + + strategy = CustomRetryStrategy() + + job_mock = mocker.Mock(attempts=1) + exc = strategy.get_retry_exception(exception=Exception(), job=job_mock) + assert isinstance(exc, exceptions.JobRetry) + assert exc.retry_decision.retry_at == expected.replace(microsecond=0) + assert exc.retry_decision.priority == 7 + assert exc.retry_decision.queue == "some_queue" + assert exc.retry_decision.lock == "some_lock" + + +def test_custom_retry_strategy_depreciated_returns_none(mocker): + class CustomRetryStrategy(BaseRetryStrategy): + def get_schedule_in( + self, *, exception: BaseException, attempts: int + ) -> int | None: + return None + + strategy = CustomRetryStrategy() + + job_mock = mocker.Mock(attempts=1) + with pytest.warns( + DeprecationWarning, + match="`get_schedule_in` is deprecated, use `get_retry_decision` instead.", + ): + assert strategy.get_retry_exception(exception=Exception(), job=job_mock) is None + + +def test_custom_retry_strategy_depreciated_returns(mocker): + now = conftest.aware_datetime(2000, 1, 1, tz_offset=1) + mocker.patch.object(utils, "utcnow", return_value=now) + expected = now + datetime.timedelta(seconds=5, microseconds=0) + + class CustomRetryStrategy(BaseRetryStrategy): + def get_schedule_in( + self, *, exception: BaseException, attempts: int + ) -> int | None: + return 5 + + strategy = CustomRetryStrategy() + + job_mock = mocker.Mock(attempts=1) + with pytest.warns( + DeprecationWarning, + match="`get_schedule_in` is deprecated, use `get_retry_decision` instead.", + ): + exc = strategy.get_retry_exception(exception=Exception(), job=job_mock) + assert isinstance(exc, exceptions.JobRetry) + assert exc.retry_decision.retry_at == expected.replace(microsecond=0) + + +def test_missing_implementation_of_custom_retry_strategy(mocker): + class CustomRetryStrategy(BaseRetryStrategy): + pass + + strategy = CustomRetryStrategy() + job_mock = mocker.Mock(attempts=1) + with pytest.raises(NotImplementedError) as exc_info: + strategy.get_retry_exception(exception=Exception(), job=job_mock) + assert str(exc_info.value) == "Missing implementation of 'get_retry_decision'." diff --git a/tests/unit/test_tasks.py b/tests/unit/test_tasks.py index a4c98f01a..c6e757651 100644 --- a/tests/unit/test_tasks.py +++ b/tests/unit/test_tasks.py @@ -133,5 +133,5 @@ def test_task_get_retry_exception(app, mocker): exception = ValueError() assert task.get_retry_exception(exception=exception, job=job) is mock.return_value - mock.assert_called_with(exception=exception, attempts=0) - mock.assert_called_with(exception=exception, attempts=0) + mock.assert_called_with(exception=exception, job=job) + mock.assert_called_with(exception=exception, job=job) diff --git a/tests/unit/test_testing.py b/tests/unit/test_testing.py index 6262feb82..d50915330 100644 --- a/tests/unit/test_testing.py +++ b/tests/unit/test_testing.py @@ -383,11 +383,20 @@ def test_retry_job_run(connector): id = job_row["id"] retry_at = conftest.aware_datetime(2000, 1, 1) - connector.retry_job_run(job_id=id, retry_at=retry_at) + connector.retry_job_run( + job_id=id, + retry_at=retry_at, + new_priority=3, + new_queue_name="some_queue", + new_lock="some_lock", + ) assert connector.jobs[id]["attempts"] == 1 assert connector.jobs[id]["status"] == "todo" assert connector.jobs[id]["scheduled_at"] == retry_at + assert connector.jobs[id]["priority"] == 3 + assert connector.jobs[id]["queue_name"] == "some_queue" + assert connector.jobs[id]["lock"] == "some_lock" assert len(connector.events[id]) == 4 diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 257e2c871..04c9c945b 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -5,6 +5,7 @@ import pytest from procrastinate import exceptions, job_context, jobs, tasks, worker +from procrastinate.retry import RetryDecision from .. import conftest @@ -127,10 +128,12 @@ async def test_process_job_retry_failed_job( async def coro(*args, **kwargs): pass - scheduled_at = conftest.aware_datetime(2000, 1, 1) + retry_at = conftest.aware_datetime(2000, 1, 1) test_worker.run_job = mocker.Mock( side_effect=exceptions.JobError( - retry_exception=exceptions.JobRetry(scheduled_at=scheduled_at) + retry_exception=exceptions.JobRetry( + retry_decision=RetryDecision(retry_at=retry_at) + ) ) ) job = job_factory(id=1) @@ -140,7 +143,7 @@ async def coro(*args, **kwargs): test_worker.run_job.assert_called_with(job=job, worker_id=0) assert connector.jobs[1]["status"] == "todo" - assert connector.jobs[1]["scheduled_at"] == scheduled_at + assert connector.jobs[1]["scheduled_at"] == retry_at assert connector.jobs[1]["attempts"] == 1 @@ -174,9 +177,12 @@ async def test_process_job_retry_failed_job_retry_critical( class TestException(BaseException): pass - scheduled_at = conftest.aware_datetime(2000, 1, 1) + retry_at = conftest.aware_datetime(2000, 1, 1) job_exception = exceptions.JobError( - critical=True, retry_exception=exceptions.JobRetry(scheduled_at=scheduled_at) + critical=True, + retry_exception=exceptions.JobRetry( + retry_decision=RetryDecision(retry_at=retry_at) + ), ) job_exception.__cause__ = TestException() @@ -191,7 +197,7 @@ class TestException(BaseException): test_worker.run_job.assert_called_with(job=job, worker_id=0) assert connector.jobs[1]["status"] == "todo" - assert connector.jobs[1]["scheduled_at"] == scheduled_at + assert connector.jobs[1]["scheduled_at"] == retry_at assert connector.jobs[1]["attempts"] == 1