From 6b9c374626133a71e5ceca99f094f9146449c713 Mon Sep 17 00:00:00 2001 From: Ashley Heath Date: Thu, 14 Dec 2023 16:34:24 +0000 Subject: [PATCH 1/6] Ensure jobs are scheduled for retry regardless of exception type raised --- procrastinate/retry.py | 6 +++--- procrastinate/tasks.py | 2 +- procrastinate/worker.py | 26 +++++++++++++++++++++++--- tests/unit/test_worker.py | 26 ++++++++++++++++++++++++++ 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/procrastinate/retry.py b/procrastinate/retry.py index 55e58cbb7..10c050d5f 100644 --- a/procrastinate/retry.py +++ b/procrastinate/retry.py @@ -17,7 +17,7 @@ class BaseRetryStrategy: """ def get_retry_exception( - self, exception: Exception, attempts: int + self, exception: BaseException, attempts: int ) -> Optional[exceptions.JobRetry]: schedule_in = self.get_schedule_in(exception=exception, attempts=attempts) if schedule_in is None: @@ -26,7 +26,7 @@ def get_retry_exception( schedule_at = utils.utcnow() + datetime.timedelta(seconds=schedule_in) return exceptions.JobRetry(schedule_at.replace(microsecond=0)) - def get_schedule_in(self, *, exception: Exception, attempts: int) -> Optional[int]: + def get_schedule_in(self, *, exception: BaseException, attempts: int) -> Optional[int]: """ Parameters ---------- @@ -81,7 +81,7 @@ class RetryStrategy(BaseRetryStrategy): exponential_wait: int = 0 retry_exceptions: Optional[Iterable[Type[Exception]]] = None - def get_schedule_in(self, *, exception: Exception, attempts: int) -> Optional[int]: + def get_schedule_in(self, *, exception: BaseException, attempts: int) -> Optional[int]: if self.max_attempts and attempts >= self.max_attempts: return None # isinstance's 2nd param must be a tuple, not an arbitrary iterable diff --git a/procrastinate/tasks.py b/procrastinate/tasks.py index 26431d7fb..fe1dda68e 100644 --- a/procrastinate/tasks.py +++ b/procrastinate/tasks.py @@ -197,7 +197,7 @@ def configure( ) def get_retry_exception( - self, exception: Exception, job: jobs.Job + self, exception: BaseException, job: jobs.Job ) -> Optional[exceptions.JobRetry]: if not self.retry_strategy: return None diff --git a/procrastinate/worker.py b/procrastinate/worker.py index 75c7f3c04..1205f9c67 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -5,6 +5,7 @@ from typing import Any, Dict, Iterable, Optional, Union from procrastinate import app, exceptions, job_context, jobs, signals, tasks, utils +from procrastinate.exceptions import ProcrastinateException logger = logging.getLogger(__name__) @@ -164,14 +165,24 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None: extra=context.log_extra(action="loaded_job_info"), ) + def find_exception_to_re_raise(ex: ProcrastinateException) -> Optional[BaseException]: + # If the job raises a BaseException that is _not_ an Exception + # (e.g. a CancelledError, SystemExit, etc.) then we want to persist the + # outcome of the job before propagating the exception further up the + # call stack. + return ex.__cause__ if not isinstance(e.__cause__, Exception) else None + status, retry_at = None, None + exception_to_re_raise = None try: await self.run_job(job=job, worker_id=worker_id) status = jobs.Status.SUCCEEDED except exceptions.JobRetry as e: retry_at = e.scheduled_at - except exceptions.JobError: + exception_to_re_raise = find_exception_to_re_raise(e) + except exceptions.JobError as e: status = jobs.Status.FAILED + exception_to_re_raise = find_exception_to_re_raise(e) except exceptions.TaskNotFound as exc: status = jobs.Status.FAILED self.logger.exception( @@ -201,6 +212,9 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None: # Remove job information from the current context self.context_for_worker(worker_id=worker_id, reset=True) + if exception_to_re_raise is not None: + raise exception_to_re_raise + def find_task(self, task_name: str) -> tasks.Task: try: return self.app.tasks[task_name] @@ -221,10 +235,16 @@ async def run_job(self, job: jobs.Job, worker_id: int) -> None: f"Starting job {job.call_string}", extra=context.log_extra(action="start_job"), ) - exc_info: Union[bool, Exception] job_args = [] if task.pass_context: job_args.append(context) + + # Initialise logging variables + task_result = None + log_title = "Error" + log_action = "job_error" + log_level = logging.ERROR + exc_info: Union[bool, BaseException] = False try: task_result = task(*job_args, **job.task_kwargs) if asyncio.iscoroutine(task_result): @@ -236,7 +256,7 @@ async def run_job(self, job: jobs.Job, worker_id: int) -> None: extra=context.log_extra(action="concurrent_sync_task"), ) - except Exception as e: + except BaseException as e: task_result = None log_title = "Error" log_action = "job_error" diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index cdb6cf847..c572ea507 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -137,6 +137,32 @@ async def coro(*args, **kwargs): test_worker.run_job.assert_called_with(job=job, worker_id=0) assert connector.jobs[1]["status"] == "todo" assert connector.jobs[1]["scheduled_at"] == scheduled_at + assert connector.jobs[1]["attempts"] == 1 + + +async def test_process_job_retry_failed_job_re_raise_base_exception( + mocker, test_worker, job_factory, connector +): + class TestException(BaseException): + pass + + scheduled_at = conftest.aware_datetime(2000, 1, 1) + job_exception = exceptions.JobRetry(scheduled_at=scheduled_at) + job_exception.__cause__ = TestException() + + test_worker.run_job = mocker.Mock(side_effect=job_exception) + job = job_factory(id=1) + await test_worker.job_manager.defer_job_async(job) + + # Exceptions that extend BaseException should be re-raised after the failed job + # is scheduled for retry (if retry is applicable). + with pytest.raises(TestException): + await test_worker.process_job(job=job, worker_id=0) + + test_worker.run_job.assert_called_with(job=job, worker_id=0) + assert connector.jobs[1]["status"] == "todo" + assert connector.jobs[1]["scheduled_at"] == scheduled_at + assert connector.jobs[1]["attempts"] == 1 async def test_run_job(app): From e8011e9dc896e6e0493b2dd7846b4e8621699d06 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 14 Dec 2023 16:38:47 +0000 Subject: [PATCH 2/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- procrastinate/retry.py | 8 ++++++-- procrastinate/worker.py | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/procrastinate/retry.py b/procrastinate/retry.py index 10c050d5f..689e40b74 100644 --- a/procrastinate/retry.py +++ b/procrastinate/retry.py @@ -26,7 +26,9 @@ def get_retry_exception( schedule_at = utils.utcnow() + datetime.timedelta(seconds=schedule_in) return exceptions.JobRetry(schedule_at.replace(microsecond=0)) - def get_schedule_in(self, *, exception: BaseException, attempts: int) -> Optional[int]: + def get_schedule_in( + self, *, exception: BaseException, attempts: int + ) -> Optional[int]: """ Parameters ---------- @@ -81,7 +83,9 @@ class RetryStrategy(BaseRetryStrategy): exponential_wait: int = 0 retry_exceptions: Optional[Iterable[Type[Exception]]] = None - def get_schedule_in(self, *, exception: BaseException, attempts: int) -> Optional[int]: + def get_schedule_in( + self, *, exception: BaseException, attempts: int + ) -> Optional[int]: if self.max_attempts and attempts >= self.max_attempts: return None # isinstance's 2nd param must be a tuple, not an arbitrary iterable diff --git a/procrastinate/worker.py b/procrastinate/worker.py index 1205f9c67..bd95611c7 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -165,7 +165,9 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None: extra=context.log_extra(action="loaded_job_info"), ) - def find_exception_to_re_raise(ex: ProcrastinateException) -> Optional[BaseException]: + def find_exception_to_re_raise( + ex: ProcrastinateException, + ) -> Optional[BaseException]: # If the job raises a BaseException that is _not_ an Exception # (e.g. a CancelledError, SystemExit, etc.) then we want to persist the # outcome of the job before propagating the exception further up the From 49c43ae03e32b6a9e21454f756adc6d50bb61851 Mon Sep 17 00:00:00 2001 From: Ashley Heath Date: Thu, 14 Dec 2023 16:40:39 +0000 Subject: [PATCH 3/6] Fix typo --- procrastinate/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/procrastinate/worker.py b/procrastinate/worker.py index bd95611c7..e3d712563 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -172,7 +172,7 @@ def find_exception_to_re_raise( # (e.g. a CancelledError, SystemExit, etc.) then we want to persist the # outcome of the job before propagating the exception further up the # call stack. - return ex.__cause__ if not isinstance(e.__cause__, Exception) else None + return ex.__cause__ if not isinstance(ex.__cause__, Exception) else None status, retry_at = None, None exception_to_re_raise = None From 9f5e5be8af0ba1105ecc32e2865e77cf96de6729 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 16 Dec 2023 17:53:22 +0100 Subject: [PATCH 4/6] Redesign suggestion --- procrastinate/exceptions.py | 20 ++++++++++++++------ procrastinate/worker.py | 28 +++++++++------------------- 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/procrastinate/exceptions.py b/procrastinate/exceptions.py index f03489a30..8d003bb74 100644 --- a/procrastinate/exceptions.py +++ b/procrastinate/exceptions.py @@ -1,4 +1,5 @@ import datetime +from typing import Optional class ProcrastinateException(Exception): @@ -32,12 +33,6 @@ class TaskAlreadyRegistered(ProcrastinateException): """ -class JobError(ProcrastinateException): - """ - Job ended with an exception. - """ - - class LoadFromPathError(ImportError, ProcrastinateException): """ App was not found at the provided path, or the loaded object is not an App. @@ -54,6 +49,19 @@ def __init__(self, scheduled_at: datetime.datetime): super().__init__() +class JobError(ProcrastinateException): + """ + Job ended with an exception. + """ + + def __init__( + self, *args, retry_exception: Optional[JobRetry] = None, critical: bool = False + ): + super().__init__(*args) + self.retry_exception = retry_exception + self.critical = critical + + class AppNotOpen(ProcrastinateException): """ App was not open. Procrastinate App needs to be opened using: diff --git a/procrastinate/worker.py b/procrastinate/worker.py index e3d712563..8216717ea 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -165,26 +165,17 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None: extra=context.log_extra(action="loaded_job_info"), ) - def find_exception_to_re_raise( - ex: ProcrastinateException, - ) -> Optional[BaseException]: - # If the job raises a BaseException that is _not_ an Exception - # (e.g. a CancelledError, SystemExit, etc.) then we want to persist the - # outcome of the job before propagating the exception further up the - # call stack. - return ex.__cause__ if not isinstance(ex.__cause__, Exception) else None - status, retry_at = None, None - exception_to_re_raise = None try: await self.run_job(job=job, worker_id=worker_id) status = jobs.Status.SUCCEEDED - except exceptions.JobRetry as e: - retry_at = e.scheduled_at - exception_to_re_raise = find_exception_to_re_raise(e) except exceptions.JobError as e: status = jobs.Status.FAILED - exception_to_re_raise = find_exception_to_re_raise(e) + if e.retry_exception: + retry_at = e.retry_exception.scheduled_at + if e.critical and e.__cause__: + raise e.__cause__ + except exceptions.TaskNotFound as exc: status = jobs.Status.FAILED self.logger.exception( @@ -214,9 +205,6 @@ def find_exception_to_re_raise( # Remove job information from the current context self.context_for_worker(worker_id=worker_id, reset=True) - if exception_to_re_raise is not None: - raise exception_to_re_raise - def find_task(self, task_name: str) -> tasks.Task: try: return self.app.tasks[task_name] @@ -264,14 +252,16 @@ async def run_job(self, job: jobs.Job, worker_id: int) -> None: log_action = "job_error" log_level = logging.ERROR exc_info = e + critical = not isinstance(e, Exception) retry_exception = task.get_retry_exception(exception=e, job=job) if retry_exception: log_title = "Error, to retry" log_action = "job_error_retry" log_level = logging.INFO - raise retry_exception from e - raise exceptions.JobError() from e + raise exceptions.JobError( + retry_exception=retry_exception, critical=critical + ) from e else: log_title = "Success" From b4131904eacd54b525c475c688a473025530becc Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 16 Dec 2023 18:29:04 +0100 Subject: [PATCH 5/6] Add and fix tests --- tests/unit/test_worker.py | 76 +++++++++++++++++++++++++++++++++------ 1 file changed, 66 insertions(+), 10 deletions(-) diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index c572ea507..e7fbc7d44 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -127,7 +127,9 @@ async def coro(*args, **kwargs): scheduled_at = conftest.aware_datetime(2000, 1, 1) test_worker.run_job = mocker.Mock( - side_effect=exceptions.JobRetry(scheduled_at=scheduled_at) + side_effect=exceptions.JobError( + retry_exception=exceptions.JobRetry(scheduled_at=scheduled_at) + ) ) job = job_factory(id=1) await test_worker.job_manager.defer_job_async(job) @@ -140,14 +142,40 @@ async def coro(*args, **kwargs): assert connector.jobs[1]["attempts"] == 1 -async def test_process_job_retry_failed_job_re_raise_base_exception( +async def test_process_job_retry_failed_job_critical( + mocker, test_worker, job_factory, connector +): + class TestException(BaseException): + pass + + job_exception = exceptions.JobError(critical=True) + job_exception.__cause__ = TestException() + + test_worker.run_job = mocker.Mock(side_effect=job_exception) + job = job_factory(id=1) + await test_worker.job_manager.defer_job_async(job) + + # Exceptions that extend BaseException should be re-raised after the failed job + # is scheduled for retry (if retry is applicable). + with pytest.raises(TestException): + await test_worker.process_job(job=job, worker_id=0) + + test_worker.run_job.assert_called_with(job=job, worker_id=0) + assert connector.jobs[1]["status"] == "failed" + assert connector.jobs[1]["scheduled_at"] is None + assert connector.jobs[1]["attempts"] == 1 + + +async def test_process_job_retry_failed_job_retry_critical( mocker, test_worker, job_factory, connector ): class TestException(BaseException): pass scheduled_at = conftest.aware_datetime(2000, 1, 1) - job_exception = exceptions.JobRetry(scheduled_at=scheduled_at) + job_exception = exceptions.JobError( + critical=True, retry_exception=exceptions.JobRetry(scheduled_at=scheduled_at) + ) job_exception.__cause__ = TestException() test_worker.run_job = mocker.Mock(side_effect=job_exception) @@ -274,11 +302,11 @@ def task(): async def test_run_job_error(app, caplog): caplog.set_level("INFO") - def job(a, b): # pylint: disable=unused-argument + def job_func(a, b): # pylint: disable=unused-argument raise ValueError("nope") - task = tasks.Task(job, blueprint=app, queue="yay", name="job") - task.func = job + task = tasks.Task(job_func, blueprint=app, queue="yay", name="job") + task.func = job_func app.tasks = {"job": task} @@ -306,14 +334,40 @@ def job(a, b): # pylint: disable=unused-argument ) +async def test_run_job_critical_error(app, caplog): + caplog.set_level("INFO") + + def job_func(a, b): # pylint: disable=unused-argument + raise BaseException("nope") + + task = tasks.Task(job_func, blueprint=app, queue="yay", name="job") + task.func = job_func + + app.tasks = {"job": task} + + job = jobs.Job( + id=16, + task_kwargs={"a": 9, "b": 3}, + lock="sherlock", + queueing_lock="houba", + task_name="job", + queue="yay", + ) + test_worker = worker.Worker(app, queues=["yay"]) + with pytest.raises(exceptions.JobError) as exc_info: + await test_worker.run_job(job=job, worker_id=3) + + assert exc_info.value.critical is True + + async def test_run_job_retry(app, caplog): caplog.set_level("INFO") - def job(a, b): # pylint: disable=unused-argument + def job_func(a, b): # pylint: disable=unused-argument raise ValueError("nope") - task = tasks.Task(job, blueprint=app, queue="yay", name="job", retry=True) - task.func = job + task = tasks.Task(job_func, blueprint=app, queue="yay", name="job", retry=True) + task.func = job_func app.tasks = {"job": task} @@ -326,9 +380,11 @@ def job(a, b): # pylint: disable=unused-argument queue="yay", ) test_worker = worker.Worker(app, queues=["yay"]) - with pytest.raises(exceptions.JobRetry): + with pytest.raises(exceptions.JobError) as exc_info: await test_worker.run_job(job=job, worker_id=3) + assert isinstance(exc_info.value.retry_exception, exceptions.JobRetry) + assert ( len( [ From 32f4db8a1d2662d34c9fd1e2ad2daec597a1109c Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Thu, 21 Dec 2023 08:46:56 +0100 Subject: [PATCH 6/6] Fix lint --- procrastinate/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/procrastinate/worker.py b/procrastinate/worker.py index 8216717ea..c99bac8ea 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -5,7 +5,6 @@ from typing import Any, Dict, Iterable, Optional, Union from procrastinate import app, exceptions, job_context, jobs, signals, tasks, utils -from procrastinate.exceptions import ProcrastinateException logger = logging.getLogger(__name__)