From 51015b82d2bce623e8a8d24e36293784e8ecca19 Mon Sep 17 00:00:00 2001
From: Kai Schlamp
Date: Sun, 23 Jun 2024 23:47:12 +0000
Subject: [PATCH 01/11] Allow to cancel or abort a job

---
 docs/howto/advanced.md | 1 +
 docs/howto/advanced/cancellation.md | 71 ++++++++++
 .../migrations/0028_add_cancel_states.py | 47 +++++++
 procrastinate/contrib/django/models.py | 5 +
 procrastinate/exceptions.py | 6 +
 procrastinate/job_context.py | 18 +++
 procrastinate/jobs.py | 3 +
 procrastinate/manager.py | 129 +++++++++++++++++-
 procrastinate/shell.py | 22 +--
 .../02.06.00_01_add_cancel_states.sql | 115 ++++++++++++++++
 procrastinate/sql/queries.sql | 8 ++
 procrastinate/sql/schema.sql | 67 +++++++--
 procrastinate/testing.py | 20 +++
 procrastinate/worker.py | 11 ++
 tests/acceptance/test_async.py | 85 ++++++++++++
 tests/acceptance/test_shell.py | 32 +++--
 tests/acceptance/test_sync.py | 26 ++++
 tests/integration/test_manager.py | 10 +-
 tests/unit/test_job_context.py | 42 +++++-
 tests/unit/test_manager.py | 115 ++++++++++++++++
 tests/unit/test_shell.py | 20 +--
 tests/unit/test_worker.py | 36 +++++
 22 files changed, 838 insertions(+), 51 deletions(-)
 create mode 100644 docs/howto/advanced/cancellation.md
 create mode 100644 procrastinate/contrib/django/migrations/0028_add_cancel_states.py
 create mode 100644 procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql

diff --git a/docs/howto/advanced.md b/docs/howto/advanced.md
index 4ea0325f0..aee8516a6 100644
--- a/docs/howto/advanced.md
+++ b/docs/howto/advanced.md
@@ -8,6 +8,7 @@ advanced/context
 advanced/locks
 advanced/schedule
 advanced/priorities
+advanced/cancellation
 advanced/queueing_locks
 advanced/cron
 advanced/retry
diff --git a/docs/howto/advanced/cancellation.md b/docs/howto/advanced/cancellation.md
new file mode 100644
index 000000000..42cbdbaa6
--- /dev/null
+++ b/docs/howto/advanced/cancellation.md
@@ -0,0 +1,71 @@
+# Cancel a job
+
+We can cancel a job that has not yet been processed by a worker. We can also
+mark a job that is currently being processed for abortion, but this request
+has to be handled by the task itself.
+
+## Cancel a job (that is not being processed yet)
+
+```python
+# by using the sync method
+app.job_manager.cancel_job_by_id(33)
+# or by using the async method
+await app.job_manager.cancel_job_by_id_async(33)
+```
+
+## Delete the cancelled job
+
+A cancelled job can also be deleted from the database.
+
+```python
+# by using the sync method
+app.job_manager.cancel_job_by_id(33, delete_job=True)
+# or by using the async method
+await app.job_manager.cancel_job_by_id_async(33, delete_job=True)
+```
+
+## Mark a currently being processed job for abortion
+
+If a worker has not picked up the job yet, the command below behaves like the
+command without the `abort` option. But if the job is already being processed,
+the `abort` option marks it for abortion (see below for how to handle this
+request).
+
+```python
+# by using the sync method
+app.job_manager.cancel_job_by_id(33, abort=True)
+# or by using the async method
+await app.job_manager.cancel_job_by_id_async(33, abort=True)
+```
+
+## Handle an abortion request inside the task
+
+In our task, we can check (for example, periodically) if the task should be
+aborted. If we want to respect that request (we don't have to), we raise a
+`JobAborted` error.
+
+```python
+@app.task(pass_context=True)
+def my_task(context):
+    for i in range(100):
+        if context.should_abort():
+            raise exceptions.JobAborted
+        do_something_expensive()
+```
+
+There is also an async API:
+
+```python
+@app.task(pass_context=True)
+async def my_task(context):
+    for i in range(100):
+        if await context.should_abort_async():
+            raise exceptions.JobAborted
+        do_something_expensive()
+```
+
+:::{warning}
+`context.should_abort()` and `context.should_abort_async()` poll the
+database, so calling them too often might flood it. Make sure to check only
+occasionally and not from too many parallel tasks.
+:::
diff --git a/procrastinate/contrib/django/migrations/0028_add_cancel_states.py b/procrastinate/contrib/django/migrations/0028_add_cancel_states.py
new file mode 100644
index 000000000..324309d52
--- /dev/null
+++ b/procrastinate/contrib/django/migrations/0028_add_cancel_states.py
@@ -0,0 +1,47 @@
+from __future__ import annotations
+
+from django.db import migrations, models
+
+from .. import migrations_utils
+
+
+class Migration(migrations.Migration):
+    operations = [
+        migrations_utils.RunProcrastinateSQL(name="02.06.00_01_add_cancel_states.sql"),
+        migrations.AlterField(
+            "procrastinatejob",
+            "status",
+            models.CharField(
+                choices=[
+                    ("todo", "todo"),
+                    ("doing", "doing"),
+                    ("succeeded", "succeeded"),
+                    ("failed", "failed"),
+                    ("cancelled", "cancelled"),
+                    ("aborting", "aborting"),
+                    ("aborted", "aborted"),
+                ],
+                max_length=32,
+            ),
+        ),
+        migrations.AlterField(
+            "procrastinateevent",
+            "type",
+            models.CharField(
+                choices=[
+                    ("deferred", "deferred"),
+                    ("started", "started"),
+                    ("deferred_for_retry", "deferred_for_retry"),
+                    ("failed", "failed"),
+                    ("succeeded", "succeeded"),
+                    ("cancelled", "cancelled"),
+                    ("request_to_abort", "request_to_abort"),
+                    ("aborted", "aborted"),
+                    ("scheduled", "scheduled"),
+                ],
+                max_length=32,
+            ),
+        ),
+    ]
+    name = "0028_add_cancel_states"
+    dependencies = [("procrastinate", "0027_add_periodic_job_priority")]
diff --git a/procrastinate/contrib/django/models.py b/procrastinate/contrib/django/models.py
index 24bedbb31..fa5141695 100644
--- a/procrastinate/contrib/django/models.py
+++ b/procrastinate/contrib/django/models.py
@@ -64,6 +64,9 @@ class ProcrastinateJob(ProcrastinateReadOnlyModelMixin, models.Model):
         "doing",
         "succeeded",
         "failed",
+        "cancelled",
+        "aborting",
+        "aborted",
     )
     id = models.BigAutoField(primary_key=True)
     queue_name = models.CharField(max_length=128)
@@ -91,6 +94,8 @@ class ProcrastinateEvent(ProcrastinateReadOnlyModelMixin, models.Model):
         "failed",
         "succeeded",
         "cancelled",
+        "request_to_abort",
+        "aborted",
         "scheduled",
     )
     id = models.BigAutoField(primary_key=True)
diff --git a/procrastinate/exceptions.py b/procrastinate/exceptions.py
index 1118e14a6..b992e2206 100644
--- a/procrastinate/exceptions.py
+++ b/procrastinate/exceptions.py
@@ -63,6 +63,12 @@ def __init__(
         self.critical = critical


+class JobAborted(ProcrastinateException):
+    """
+    Job was aborted (usually as reaction to an abortion request).
+    """
+
+
 class AppNotOpen(ProcrastinateException):
     """
     App was not open.
Procrastinate App needs to be opened using: diff --git a/procrastinate/job_context.py b/procrastinate/job_context.py index b453064b0..a35fe5582 100644 --- a/procrastinate/job_context.py +++ b/procrastinate/job_context.py @@ -101,3 +101,21 @@ def job_description(self, current_timestamp: float) -> str: message += "no current job" return message + + def should_abort(self) -> bool: + assert self.app + assert self.job + assert self.job.id + + job_id = self.job.id + status = self.app.job_manager.get_job_status(job_id) + return status == jobs.Status.ABORTING + + async def should_abort_async(self) -> bool: + assert self.app + assert self.job + assert self.job.id + + job_id = self.job.id + status = await self.app.job_manager.get_job_status_async(job_id) + return status == jobs.Status.ABORTING diff --git a/procrastinate/jobs.py b/procrastinate/jobs.py index 3955713a7..21731da08 100644 --- a/procrastinate/jobs.py +++ b/procrastinate/jobs.py @@ -38,6 +38,9 @@ class Status(Enum): DOING = "doing" #: A worker is running the job SUCCEEDED = "succeeded" #: The job ended successfully FAILED = "failed" #: The job ended with an error + CANCELLED = "cancelled" #: The job was canceled + ABORTING = "aborting" #: The job is requested to be aborted + ABORTED = "aborted" #: The job was aborted @attr.dataclass(frozen=True, kw_only=True) diff --git a/procrastinate/manager.py b/procrastinate/manager.py index 59339a28b..220888806 100644 --- a/procrastinate/manager.py +++ b/procrastinate/manager.py @@ -215,13 +215,13 @@ async def finish_job( delete_job: bool, ) -> None: """ - Set a job to its final state (``succeeded`` or ``failed``). + Set a job to its final state (``succeeded``, ``failed`` or ``aborted``). Parameters ---------- job : `jobs.Job` status : `jobs.Status` - ``succeeded`` or ``failed`` + ``succeeded``, ``failed`` or ``aborted`` """ assert job.id # TODO remove this await self.finish_job_by_id_async( @@ -241,6 +241,104 @@ async def finish_job_by_id_async( delete_job=delete_job, ) + def cancel_job_by_id( + self, job_id: int, abort: bool = False, delete_job: bool = False + ) -> bool: + """ + Cancel a job by id. + + Parameters + ---------- + job_id : ``int`` + The id of the job to cancel + abort : ``bool`` + If True, a job in ``doing`` state will be marked as ``aborting``, but the task + itself has to respect the abortion request. If False, only jobs in ``todo`` state + will be set to ``cancelled`` and won't be processed by a worker anymore. + delete_job : ``bool`` + If True, the job will be deleted from the database after being cancelled. Does + not affect the jobs that should be aborted. + + Returns + ------- + ``bool`` + True if the job to be cancelled was found, False otherwise. + """ + result = self.connector.get_sync_connector().execute_query_one( + query=sql.queries["cancel_job"], + job_id=job_id, + abort=abort, + delete_job=delete_job, + ) + return result["id"] is not None + + async def cancel_job_by_id_async( + self, job_id: int, abort: bool = False, delete_job=False + ) -> bool: + """ + Cancel a job by id. + + Parameters + ---------- + job_id : ``int`` + The id of the job to cancel + abort : ``bool`` + If True, a job in ``doing`` state will be marked as ``aborting``, but the task + itself has to respect the abortion request. If False, only jobs in ``todo`` state + will be set to ``cancelled`` and won't be processed by a worker anymore. + delete_job : ``bool`` + If True, the job will be deleted from the database after being cancelled. Does + not affect the jobs that should be aborted. 
+ + Returns + ------- + ``bool`` + True if the job to be cancelled was found, False otherwise. + """ + result = await self.connector.execute_query_one_async( + query=sql.queries["cancel_job"], + job_id=job_id, + abort=abort, + delete_job=delete_job, + ) + return result["id"] is not None + + def get_job_status(self, job_id: int) -> jobs.Status: + """ + Get the status of a job by id. + + Parameters + ---------- + job_id : ``int`` + The id of the job to get the status of + + Returns + ------- + `jobs.Status` + """ + result = self.connector.get_sync_connector().execute_query_one( + query=sql.queries["get_job_status"], job_id=job_id + ) + return jobs.Status(result["status"]) + + async def get_job_status_async(self, job_id: int) -> jobs.Status: + """ + Get the status of a job by id. + + Parameters + ---------- + job_id : ``int`` + The id of the job to get the status of + + Returns + ------- + `jobs.Status` + """ + result = await self.connector.execute_query_one_async( + query=sql.queries["get_job_status"], job_id=job_id + ) + return jobs.Status(result["status"]) + async def retry_job( self, job: jobs.Job, @@ -435,7 +533,8 @@ async def list_queues_async( ------- ``List[Dict[str, Any]]`` A list of dictionaries representing queues stats (``name``, ``jobs_count``, - ``todo``, ``doing``, ``succeeded``, ``failed``). + ``todo``, ``doing``, ``succeeded``, ``failed``, ``cancelled``, ``aborting``, + ``aborted``). """ return [ { @@ -445,6 +544,9 @@ async def list_queues_async( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } for row in await self.connector.execute_query_all_async( query=sql.queries["list_queues"], @@ -473,6 +575,9 @@ def list_queues( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } for row in self.connector.get_sync_connector().execute_query_all( query=sql.queries["list_queues"], @@ -508,7 +613,8 @@ async def list_tasks_async( ------- ``List[Dict[str, Any]]`` A list of dictionaries representing tasks stats (``name``, ``jobs_count``, - ``todo``, ``doing``, ``succeeded``, ``failed``). + ``todo``, ``doing``, ``succeeded``, ``failed``, ``cancelled``, ``aborting``, + ``aborted``). 
""" return [ { @@ -518,6 +624,9 @@ async def list_tasks_async( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } for row in await self.connector.execute_query_all_async( query=sql.queries["list_tasks"], @@ -546,6 +655,9 @@ def list_tasks( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } for row in self.connector.get_sync_connector().execute_query_all( query=sql.queries["list_tasks"], @@ -581,7 +693,8 @@ async def list_locks_async( ------- ``List[Dict[str, Any]]`` A list of dictionaries representing locks stats (``name``, ``jobs_count``, - ``todo``, ``doing``, ``succeeded``, ``failed``). + ``todo``, ``doing``, ``succeeded``, ``failed``, ``cancelled``, ``aborting``, + ``aborted``). """ result = [] for row in await self.connector.execute_query_all_async( @@ -599,6 +712,9 @@ async def list_locks_async( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } ) return result @@ -629,6 +745,9 @@ def list_locks( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } ) return result diff --git a/procrastinate/shell.py b/procrastinate/shell.py index ac6eb4994..c5940b7ed 100644 --- a/procrastinate/shell.py +++ b/procrastinate/shell.py @@ -87,7 +87,10 @@ def do_list_queues(self, arg: str) -> None: f"todo: {queue['todo']}, " f"doing: {queue['doing']}, " f"succeeded: {queue['succeeded']}, " - f"failed: {queue['failed']})" + f"failed: {queue['failed']}, " + f"cancelled: {queue['cancelled']}, " + f"aborting: {queue['aborting']}, " + f"aborted: {queue['aborted']})" ) def do_list_tasks(self, arg: str) -> None: @@ -107,7 +110,10 @@ def do_list_tasks(self, arg: str) -> None: f"todo: {task['todo']}, " f"doing: {task['doing']}, " f"succeeded: {task['succeeded']}, " - f"failed: {task['failed']})" + f"failed: {task['failed']}, " + f"cancelled: {task['cancelled']}, " + f"aborting: {task['aborting']}, " + f"aborted: {task['aborted']})" ) def do_list_locks(self, arg: str) -> None: @@ -127,7 +133,10 @@ def do_list_locks(self, arg: str) -> None: f"todo: {lock['todo']}, " f"doing: {lock['doing']}, " f"succeeded: {lock['succeeded']}, " - f"failed: {lock['failed']})" + f"failed: {lock['failed']}, " + f"cancelled: {lock['cancelled']}, " + f"aborting: {lock['aborting']}, " + f"aborted: {lock['aborted']})" ) def do_retry(self, arg: str) -> None: @@ -159,12 +168,7 @@ def do_cancel(self, arg: str) -> None: Example: cancel 3 """ job_id = int(arg) - self.async_to_sync( - self.job_manager.finish_job_by_id_async, - job_id=job_id, - status=jobs.Status.FAILED, - delete_job=False, - ) + self.job_manager.cancel_job_by_id(job_id=job_id) (job,) = self.async_to_sync(self.job_manager.list_jobs_async, id=job_id) print_job(job) diff --git 
a/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql b/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql new file mode 100644 index 000000000..e3a6940a5 --- /dev/null +++ b/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql @@ -0,0 +1,115 @@ +ALTER TYPE procrastinate_job_status ADD VALUE 'cancelled'; +ALTER TYPE procrastinate_job_status ADD VALUE 'aborting'; +ALTER TYPE procrastinate_job_status ADD VALUE 'aborted'; + +ALTER TYPE procrastinate_job_event_type ADD VALUE 'request_to_abort' BEFORE 'scheduled'; +ALTER TYPE procrastinate_job_event_type ADD VALUE 'aborted' BEFORE 'scheduled'; + +CREATE FUNCTION procrastinate_cancel_job(job_id bigint, abort boolean, delete_job boolean) + RETURNS bigint + LANGUAGE plpgsql +AS $$ +DECLARE + _job_id bigint; +BEGIN + IF delete_job THEN + DELETE FROM procrastinate_jobs + WHERE id = job_id AND status = 'todo' + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN + IF abort THEN + UPDATE procrastinate_jobs + SET status = CASE + WHEN status = 'todo' THEN 'cancelled'::procrastinate_job_status + WHEN status = 'doing' THEN 'aborting'::procrastinate_job_status + END + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = 'cancelled'::procrastinate_job_status + WHERE id = job_id AND status = 'todo' + RETURNING id INTO _job_id; + END IF; + END IF; + RETURN _job_id; +END; +$$; + +CREATE OR REPLACE FUNCTION procrastinate_finish_job(job_id bigint, end_status procrastinate_job_status, delete_job boolean) + RETURNS void + LANGUAGE plpgsql +AS $$ +DECLARE + _job_id bigint; +BEGIN + IF end_status NOT IN ('succeeded', 'failed', 'aborted') THEN + RAISE 'End status should be either "succeeded", "failed" or "aborted" (job id: %)', job_id; + END IF; + IF delete_job THEN + DELETE FROM procrastinate_jobs + WHERE id = job_id AND status IN ('todo', 'doing', 'aborting') + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = end_status, + attempts = + CASE + WHEN status = 'doing' THEN attempts + 1 + ELSE attempts + END + WHERE id = job_id AND status IN ('todo', 'doing', 'aborting') + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "doing", "todo" or "aborting" status (job id: %)', job_id; + END IF; +END; +$$; + +CREATE OR REPLACE FUNCTION procrastinate_trigger_status_events_procedure_update() + RETURNS trigger + LANGUAGE plpgsql +AS $$ +BEGIN + WITH t AS ( + SELECT CASE + WHEN OLD.status = 'todo'::procrastinate_job_status + AND NEW.status = 'doing'::procrastinate_job_status + THEN 'started'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'todo'::procrastinate_job_status + THEN 'deferred_for_retry'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'failed'::procrastinate_job_status + THEN 'failed'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'succeeded'::procrastinate_job_status + THEN 'succeeded'::procrastinate_job_event_type + WHEN OLD.status = 'todo'::procrastinate_job_status + AND ( + NEW.status = 'cancelled'::procrastinate_job_status + OR NEW.status = 'failed'::procrastinate_job_status + OR NEW.status = 'succeeded'::procrastinate_job_status + ) + THEN 'cancelled'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'aborting'::procrastinate_job_status + THEN 
'request_to_abort'::procrastinate_job_event_type + WHEN ( + OLD.status = 'doing'::procrastinate_job_status + OR OLD.status = 'aborting'::procrastinate_job_status + ) + AND NEW.status = 'aborted'::procrastinate_job_status + THEN 'aborted'::procrastinate_job_event_type + ELSE NULL + END as event_type + ) + INSERT INTO procrastinate_events(job_id, type) + SELECT NEW.id, t.event_type + FROM t + WHERE t.event_type IS NOT NULL; + RETURN NEW; +END; +$$; diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index 62dece1d6..cdff1f889 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -50,6 +50,14 @@ WHERE id IN ( -- Finish a job, changing it from "doing" to "succeeded" or "failed" SELECT procrastinate_finish_job(%(job_id)s, %(status)s, %(delete_job)s); +-- cancel_job -- +-- Cancel a job, changing it from "todo" to "cancelled" or from "doing" to "aborting" +SELECT procrastinate_cancel_job(%(job_id)s, %(abort)s, %(delete_job)s) AS id; + +-- get_job_status -- +-- Get the status of a job +SELECT status FROM procrastinate_jobs WHERE id = %(job_id)s; + -- retry_job -- -- Retry a job, changing it from "doing" to "todo" SELECT procrastinate_retry_job(%(job_id)s, %(retry_at)s); diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index c9970c80c..6535efb9c 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -8,16 +8,21 @@ CREATE TYPE procrastinate_job_status AS ENUM ( 'todo', -- The job is queued 'doing', -- The job has been fetched by a worker 'succeeded', -- The job ended successfully - 'failed' -- The job ended with an error + 'failed', -- The job ended with an error + 'cancelled', -- The job was cancelled + 'aborting', -- The job was requested to abort + 'aborted' -- The job was aborted ); CREATE TYPE procrastinate_job_event_type AS ENUM ( 'deferred', -- Job created, in todo 'started', -- todo -> doing 'deferred_for_retry', -- doing -> todo - 'failed', -- doing -> failed - 'succeeded', -- doing -> succeeded - 'cancelled', -- todo -> failed or succeeded + 'failed', -- doing or aborting -> failed + 'succeeded', -- doing or aborting -> succeeded + 'cancelled', -- todo -> cancelled + 'request_to_abort', -- doing -> aborting + 'aborted', -- doing or aborting -> aborted 'scheduled' -- not an event transition, but recording when a task is scheduled for ); @@ -198,12 +203,12 @@ AS $$ DECLARE _job_id bigint; BEGIN - IF end_status NOT IN ('succeeded', 'failed') THEN - RAISE 'End status should be either "succeeded" or "failed" (job id: %)', job_id; + IF end_status NOT IN ('succeeded', 'failed', 'aborted') THEN + RAISE 'End status should be either "succeeded", "failed" or "aborted" (job id: %)', job_id; END IF; IF delete_job THEN DELETE FROM procrastinate_jobs - WHERE id = job_id AND status IN ('todo', 'doing') + WHERE id = job_id AND status IN ('todo', 'doing', 'aborting') RETURNING id INTO _job_id; ELSE UPDATE procrastinate_jobs @@ -213,15 +218,47 @@ BEGIN WHEN status = 'doing' THEN attempts + 1 ELSE attempts END - WHERE id = job_id AND status IN ('todo', 'doing') + WHERE id = job_id AND status IN ('todo', 'doing', 'aborting') RETURNING id INTO _job_id; END IF; IF _job_id IS NULL THEN - RAISE 'Job was not found or not in "doing" or "todo" status (job id: %)', job_id; + RAISE 'Job was not found or not in "doing", "todo" or "aborting" status (job id: %)', job_id; END IF; END; $$; +CREATE FUNCTION procrastinate_cancel_job(job_id bigint, abort boolean, delete_job boolean) + RETURNS bigint + LANGUAGE plpgsql +AS $$ 
+DECLARE + _job_id bigint; +BEGIN + IF delete_job THEN + DELETE FROM procrastinate_jobs + WHERE id = job_id AND status = 'todo' + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN + IF abort THEN + UPDATE procrastinate_jobs + SET status = CASE + WHEN status = 'todo' THEN 'cancelled'::procrastinate_job_status + WHEN status = 'doing' THEN 'aborting'::procrastinate_job_status + END + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = 'cancelled'::procrastinate_job_status + WHERE id = job_id AND status = 'todo' + RETURNING id INTO _job_id; + END IF; + END IF; + RETURN _job_id; +END; +$$; + CREATE FUNCTION procrastinate_retry_job(job_id bigint, retry_at timestamp with time zone) RETURNS void LANGUAGE plpgsql @@ -284,10 +321,20 @@ BEGIN THEN 'succeeded'::procrastinate_job_event_type WHEN OLD.status = 'todo'::procrastinate_job_status AND ( - NEW.status = 'failed'::procrastinate_job_status + NEW.status = 'cancelled'::procrastinate_job_status + OR NEW.status = 'failed'::procrastinate_job_status OR NEW.status = 'succeeded'::procrastinate_job_status ) THEN 'cancelled'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'aborting'::procrastinate_job_status + THEN 'request_to_abort'::procrastinate_job_event_type + WHEN ( + OLD.status = 'doing'::procrastinate_job_status + OR OLD.status = 'aborting'::procrastinate_job_status + ) + AND NEW.status = 'aborted'::procrastinate_job_status + THEN 'aborted'::procrastinate_job_event_type ELSE NULL END as event_type ) diff --git a/procrastinate/testing.py b/procrastinate/testing.py index a747f258c..6d6631be9 100644 --- a/procrastinate/testing.py +++ b/procrastinate/testing.py @@ -224,6 +224,26 @@ def finish_job_run(self, job_id: int, status: str, delete_job: bool) -> None: job_row["attempts"] += 1 self.events[job_id].append({"type": status, "at": utils.utcnow()}) + def cancel_job_one(self, job_id: int, abort: bool, delete_job: bool) -> dict: + job_row = self.jobs[job_id] + + if job_row["status"] == "todo": + if delete_job: + self.jobs.pop(job_id) + return {"id": job_id} + + job_row["status"] = "cancelled" + return {"id": job_id} + + if abort and job_row["status"] == "doing": + job_row["status"] = "aborting" + return {"id": job_id} + + return {"id": None} + + def get_job_status_one(self, job_id: int) -> dict: + return {"status": self.jobs[job_id]["status"]} + def retry_job_run(self, job_id: int, retry_at: datetime.datetime) -> None: job_row = self.jobs[job_id] job_row["status"] = "todo" diff --git a/procrastinate/worker.py b/procrastinate/worker.py index 574df84e3..f0374faaa 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -195,6 +195,9 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None: try: await self.run_job(job=job, worker_id=worker_id) status = jobs.Status.SUCCEEDED + except exceptions.JobAborted: + status = jobs.Status.ABORTED + except exceptions.JobError as e: status = jobs.Status.FAILED if e.retry_exception: @@ -281,6 +284,14 @@ async def run_job(self, job: jobs.Job, worker_id: int) -> None: if inspect.isawaitable(task_result): task_result = await task_result + except exceptions.JobAborted as e: + task_result = None + log_title = "Aborted" + log_action = "job_aborted" + log_level = logging.INFO + exc_info = e + raise + except BaseException as e: task_result = None log_title = "Error" diff --git a/tests/acceptance/test_async.py b/tests/acceptance/test_async.py index 
bd07518fb..727c0df77 100644 --- a/tests/acceptance/test_async.py +++ b/tests/acceptance/test_async.py @@ -1,9 +1,14 @@ from __future__ import annotations +import asyncio +import time + import pytest from procrastinate import app as app_module from procrastinate.contrib import aiopg +from procrastinate.exceptions import JobAborted +from procrastinate.jobs import Status @pytest.fixture(params=["psycopg_connector", "aiopg_connector"]) @@ -39,3 +44,83 @@ async def product_task(a, b): assert sum_results == [3, 7, 11] assert product_results == [12] + + +async def test_cancel(async_app): + sum_results = [] + + @async_app.task(queue="default", name="sum_task") + async def sum_task(a, b): + sum_results.append(a + b) + + job_id = await sum_task.defer_async(a=1, b=2) + await sum_task.defer_async(a=3, b=4) + + await async_app.job_manager.cancel_job_by_id_async(job_id) + + status = await async_app.job_manager.get_job_status_async(job_id) + assert status == Status.CANCELLED + + jobs = await async_app.job_manager.list_jobs_async() + assert len(jobs) == 2 + + await async_app.run_worker_async(queues=["default"], wait=False) + + assert sum_results == [7] + + +async def test_cancel_with_delete(async_app): + sum_results = [] + + @async_app.task(queue="default", name="sum_task") + async def sum_task(a, b): + sum_results.append(a + b) + + job_id = await sum_task.defer_async(a=1, b=2) + await sum_task.defer_async(a=3, b=4) + + await async_app.job_manager.cancel_job_by_id_async(job_id, delete_job=True) + + jobs = await async_app.job_manager.list_jobs_async() + assert len(jobs) == 1 + + await async_app.run_worker_async(queues=["default"], wait=False) + + assert sum_results == [7] + + +async def test_abort(async_app): + @async_app.task(queue="default", name="task1", pass_context=True) + async def task1(context): + while True: + await asyncio.sleep(1) + if await context.should_abort_async(): + raise JobAborted + + @async_app.task(queue="default", name="task2", pass_context=True) + def task2(context): + while True: + time.sleep(1) + if context.should_abort(): + raise JobAborted + + job1_id = await task1.defer_async() + job2_id = await task2.defer_async() + + worker_task = asyncio.create_task( + async_app.run_worker_async(queues=["default"], wait=False) + ) + + await asyncio.sleep(1) + await async_app.job_manager.cancel_job_by_id_async(job1_id, abort=True) + + await asyncio.sleep(1) + await async_app.job_manager.cancel_job_by_id_async(job2_id, abort=True) + + await worker_task + + status = await async_app.job_manager.get_job_status_async(job1_id) + assert status == Status.ABORTED + + status = await async_app.job_manager.get_job_status_async(job2_id) + assert status == Status.ABORTED diff --git a/tests/acceptance/test_shell.py b/tests/acceptance/test_shell.py index 9ac5d8dca..1194c3283 100644 --- a/tests/acceptance/test_shell.py +++ b/tests/acceptance/test_shell.py @@ -67,44 +67,48 @@ async def test_shell(read, write, defer): defer("increment_task", ["--lock=b"], a=5) await write("cancel 2") - assert await read() == ["#2 ns:tests.acceptance.app.sum_task on default - [failed]"] + assert await read() == [ + "#2 ns:tests.acceptance.app.sum_task on default - [cancelled]" + ] await write("cancel 3") - assert await read() == ["#3 ns:tests.acceptance.app.sum_task on other - [failed]"] + assert await read() == [ + "#3 ns:tests.acceptance.app.sum_task on other - [cancelled]" + ] await write("cancel 4") assert await read() == [ - "#4 tests.acceptance.app.increment_task on default - [failed]" + "#4 
tests.acceptance.app.increment_task on default - [cancelled]" ] await write("list_jobs") assert await read() == [ "#1 ns:tests.acceptance.app.sum_task on default - [todo]", - "#2 ns:tests.acceptance.app.sum_task on default - [failed]", - "#3 ns:tests.acceptance.app.sum_task on other - [failed]", - "#4 tests.acceptance.app.increment_task on default - [failed]", + "#2 ns:tests.acceptance.app.sum_task on default - [cancelled]", + "#3 ns:tests.acceptance.app.sum_task on other - [cancelled]", + "#4 tests.acceptance.app.increment_task on default - [cancelled]", ] await write("list_jobs queue=other details") assert await read() == [ - "#3 ns:tests.acceptance.app.sum_task on other - [failed] (attempts=0, priority=0, scheduled_at=None, args={'a': 1, 'b': 2}, lock=lock)", + "#3 ns:tests.acceptance.app.sum_task on other - [cancelled] (attempts=0, priority=0, scheduled_at=None, args={'a': 1, 'b': 2}, lock=lock)", ] await write("list_queues") assert await read() == [ - "default: 3 jobs (todo: 1, doing: 0, succeeded: 0, failed: 2)", - "other: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 1)", + "default: 3 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 2, aborting: 0, aborted: 0)", + "other: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 0, cancelled: 1, aborting: 0, aborted: 0)", ] await write("list_tasks") assert await read() == [ - "ns:tests.acceptance.app.sum_task: 3 jobs (todo: 1, doing: 0, succeeded: 0, failed: 2)", - "tests.acceptance.app.increment_task: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 1)", + "ns:tests.acceptance.app.sum_task: 3 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 2, aborting: 0, aborted: 0)", + "tests.acceptance.app.increment_task: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 0, cancelled: 1, aborting: 0, aborted: 0)", ] await write("list_locks") assert await read() == [ - "a: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", - "b: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 1)", - "lock: 2 jobs (todo: 0, doing: 0, succeeded: 0, failed: 2)", + "a: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", + "b: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 0, cancelled: 1, aborting: 0, aborted: 0)", + "lock: 2 jobs (todo: 0, doing: 0, succeeded: 0, failed: 0, cancelled: 2, aborting: 0, aborted: 0)", ] diff --git a/tests/acceptance/test_sync.py b/tests/acceptance/test_sync.py index fd4b3fc83..756b4dcd5 100644 --- a/tests/acceptance/test_sync.py +++ b/tests/acceptance/test_sync.py @@ -4,6 +4,7 @@ import procrastinate from procrastinate.contrib import psycopg2 +from procrastinate.jobs import Status @pytest.fixture(params=["sync_psycopg_connector", "psycopg2_connector"]) @@ -51,3 +52,28 @@ async def product_task(a, b): assert sum_results == [3, 7, 11] assert product_results == [12] + + +async def test_cancel(sync_app, async_app): + sum_results = [] + + @sync_app.task(queue="default", name="sum_task") + def sum_task(a, b): + sum_results.append(a + b) + + job_id = sum_task.defer(a=1, b=2) + sum_task.defer(a=3, b=4) + + sync_app.job_manager.cancel_job_by_id(job_id) + + status = sync_app.job_manager.get_job_status(job_id) + assert status == Status.CANCELLED + + jobs = sync_app.job_manager.list_jobs() + assert len(jobs) == 2 + + # We need to run the async app to execute the tasks + async_app.tasks = sync_app.tasks + await async_app.run_worker_async(queues=["default"], wait=False) + + assert sum_results == [7] diff --git a/tests/integration/test_manager.py b/tests/integration/test_manager.py 
index 6fcd7c29d..a0a2dfc2f 100644 --- a/tests/integration/test_manager.py +++ b/tests/integration/test_manager.py @@ -261,7 +261,7 @@ async def test_finish_job_wrong_initial_status( await pg_job_manager.finish_job( job=job, status=jobs.Status.FAILED, delete_job=delete_job ) - assert 'Job was not found or not in "doing" or "todo" status' in str( + assert 'Job was not found or not in "doing", "todo" or "aborting" status' in str( excinfo.value.__cause__ ) @@ -276,7 +276,7 @@ async def test_finish_job_wrong_end_status( await pg_job_manager.finish_job( job=job, status=jobs.Status.TODO, delete_job=delete_job ) - assert 'End status should be either "succeeded" or "failed"' in str( + assert 'End status should be either "succeeded", "failed" or "aborted"' in str( excinfo.value.__cause__ ) @@ -452,6 +452,9 @@ async def test_list_queues_dict(fixture_jobs, pg_job_manager): "doing": 0, "succeeded": 0, "failed": 1, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } @@ -479,6 +482,9 @@ async def test_list_tasks_dict(fixture_jobs, pg_job_manager): "doing": 1, "succeeded": 0, "failed": 1, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } diff --git a/tests/unit/test_job_context.py b/tests/unit/test_job_context.py index 54fc7d1ff..77a1af891 100644 --- a/tests/unit/test_job_context.py +++ b/tests/unit/test_job_context.py @@ -2,7 +2,7 @@ import pytest -from procrastinate import job_context +from procrastinate import job_context, jobs @pytest.mark.parametrize( @@ -104,3 +104,43 @@ def test_job_description_job_time(job_factory): job_result=job_context.JobResult(start_timestamp=20.0), ).job_description(current_timestamp=30.0) assert descr == "worker 2: some_task[12](a='b') (started 10.000 s ago)" + + +def test_should_abort(app, job_factory): + job = job_factory(id=12) + app.job_manager.get_job_status = lambda job_id: jobs.Status.ABORTING + context = job_context.JobContext(app=app, job=job) + + assert context.should_abort() is True + + +def test_should_not_abort(app, job_factory): + job = job_factory(id=12) + app.job_manager.get_job_status = lambda job_id: jobs.Status.DOING + context = job_context.JobContext(app=app, job=job) + + assert context.should_abort() is False + + +async def test_should_abort_async(app, job_factory): + job = job_factory(id=12) + + async def get_job_status_async(job_id): + return jobs.Status.ABORTING + + app.job_manager.get_job_status_async = get_job_status_async + context = job_context.JobContext(app=app, job=job) + + assert await context.should_abort_async() is True + + +async def test_should_not_abort_async(app, job_factory): + job = job_factory(id=12) + + async def get_job_status_async(job_id): + return jobs.Status.DOING + + app.job_manager.get_job_status_async = get_job_status_async + context = job_context.JobContext(app=app, job=job) + + assert await context.should_abort_async() is False diff --git a/tests/unit/test_manager.py b/tests/unit/test_manager.py index 2d0230547..8b539d111 100644 --- a/tests/unit/test_manager.py +++ b/tests/unit/test_manager.py @@ -173,6 +173,103 @@ async def test_finish_job_with_deletion(job_manager, job_factory, connector): assert 1 not in connector.jobs +def test_cancel_todo_job(job_manager, job_factory, connector): + job = job_factory(id=1) + job_manager.defer_job(job=job) + + cancelled = job_manager.cancel_job_by_id(job_id=1) + assert cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": False, "delete_job": False}, + ) + assert connector.jobs[1]["status"] == "cancelled" + + +def 
test_delete_cancelled_todo_job(job_manager, job_factory, connector): + job = job_factory(id=1) + job_manager.defer_job(job=job) + + cancelled = job_manager.cancel_job_by_id(job_id=1, delete_job=True) + assert cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": False, "delete_job": True}, + ) + assert len(connector.jobs) == 0 + + +async def test_cancel_todo_job_async(job_manager, job_factory, connector): + job = job_factory(id=1) + await job_manager.defer_job_async(job=job) + + cancelled = await job_manager.cancel_job_by_id_async(job_id=1) + assert cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": False, "delete_job": False}, + ) + assert connector.jobs[1]["status"] == "cancelled" + + +async def test_delete_cancelled_todo_job_async(job_manager, job_factory, connector): + job = job_factory(id=1) + await job_manager.defer_job_async(job=job) + + cancelled = await job_manager.cancel_job_by_id_async(job_id=1, delete_job=True) + assert cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": False, "delete_job": True}, + ) + assert len(connector.jobs) == 0 + + +async def test_cancel_doing_job(job_manager, job_factory, connector): + job = job_factory(id=1) + await job_manager.defer_job_async(job=job) + await job_manager.fetch_job(queues=None) + + cancelled = job_manager.cancel_job_by_id(job_id=1) + assert not cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": False, "delete_job": False}, + ) + assert connector.jobs[1]["status"] == "doing" + + +async def test_abort_doing_job(job_manager, job_factory, connector): + job = job_factory(id=1) + await job_manager.defer_job_async(job=job) + await job_manager.fetch_job(queues=None) + + cancelled = job_manager.cancel_job_by_id(job_id=1, abort=True) + assert cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": True, "delete_job": False}, + ) + assert connector.jobs[1]["status"] == "aborting" + + +def test_get_job_status(job_manager, job_factory, connector): + job = job_factory(id=1) + job_manager.defer_job(job=job) + + assert job_manager.get_job_status(job_id=1) == jobs.Status.TODO + + +async def test_get_job_status_async(job_manager, job_factory, connector): + job = job_factory(id=1) + await job_manager.defer_job_async(job=job) + + assert await job_manager.get_job_status_async(job_id=1) == jobs.Status.TODO + + await job_manager.fetch_job(queues=None) + assert await job_manager.get_job_status_async(job_id=1) == jobs.Status.DOING + + async def test_retry_job(job_manager, job_factory, connector): job = job_factory(id=1) await job_manager.defer_job_async(job=job) @@ -344,6 +441,9 @@ async def test_list_queues_async(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] @@ -359,6 +459,9 @@ def test_list_queues_(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] @@ -374,6 +477,9 @@ async def test_list_tasks_async(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] @@ -389,6 +495,9 @@ def test_list_tasks(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] @@ -404,6 +513,9 @@ async def test_list_locks_async(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + 
"aborted": 0, } ] @@ -419,6 +531,9 @@ def test_list_locks(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py index cb7a5146d..3e2db2cdb 100644 --- a/tests/unit/test_shell.py +++ b/tests/unit/test_shell.py @@ -145,8 +145,8 @@ def test_list_queues(shell, connector, capsys): shell.do_list_queues("") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "queue1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", - "queue2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "queue1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", + "queue2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -163,7 +163,7 @@ def test_list_queues_filters(shell, connector, capsys): shell.do_list_queues("queue=queue2 task=task2 lock=lock2 status=todo") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "queue2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "queue2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -191,8 +191,8 @@ def test_list_tasks(shell, connector, capsys): shell.do_list_tasks("") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "task1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", - "task2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "task1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", + "task2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -209,7 +209,7 @@ def test_list_tasks_filters(shell, connector, capsys): shell.do_list_tasks("queue=queue2 task=task2 lock=lock2 status=todo") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "task2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "task2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -237,8 +237,8 @@ def test_list_locks(shell, connector, capsys): shell.do_list_locks("") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "lock1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", - "lock2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "lock1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", + "lock2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -255,7 +255,7 @@ def test_list_locks_filters(shell, connector, capsys): shell.do_list_locks("queue=queue2 task=task2 lock=lock2 status=todo") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "lock2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "lock2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -314,4 +314,4 @@ def test_cancel(shell, connector, capsys): shell.do_cancel("1") captured = capsys.readouterr() - assert captured.out.strip() == "#1 task on queue - [failed]" + assert captured.out.strip() == "#1 task on queue - [cancelled]" diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 0d5b2d9e3..257e2c871 100644 --- 
a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -52,6 +52,7 @@ async def mock(worker_id): "side_effect, status", [ (None, "succeeded"), + (exceptions.JobAborted(), "aborted"), (exceptions.JobError(), "failed"), (exceptions.TaskNotFound(), "failed"), ], @@ -324,6 +325,41 @@ def task(): assert all([name == record_worker_name for name in worker_names]) +async def test_run_job_aborted(app, caplog): + caplog.set_level("INFO") + + def job_func(a, b): # pylint: disable=unused-argument + raise exceptions.JobAborted() + + task = tasks.Task(job_func, blueprint=app, queue="yay", name="job") + task.func = job_func + + app.tasks = {"job": task} + + job = jobs.Job( + id=16, + task_kwargs={"a": 9, "b": 3}, + lock="sherlock", + queueing_lock="houba", + task_name="job", + queue="yay", + ) + test_worker = worker.Worker(app, queues=["yay"]) + with pytest.raises(exceptions.JobAborted): + await test_worker.run_job(job=job, worker_id=3) + + assert ( + len( + [ + r + for r in caplog.records + if r.levelname == "INFO" and "Aborted" in r.message + ] + ) + == 1 + ) + + async def test_run_job_error(app, caplog): caplog.set_level("INFO") From 7f66e398700e2c1f7d0272f75b1187cab25e8fba Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 24 Jun 2024 21:24:30 +0000 Subject: [PATCH 02/11] Rename event (to abort_requested) --- .../contrib/django/migrations/0028_add_cancel_states.py | 2 +- procrastinate/contrib/django/models.py | 2 +- .../sql/migrations/02.06.00_01_add_cancel_states.sql | 4 ++-- procrastinate/sql/schema.sql | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/procrastinate/contrib/django/migrations/0028_add_cancel_states.py b/procrastinate/contrib/django/migrations/0028_add_cancel_states.py index 324309d52..935a66731 100644 --- a/procrastinate/contrib/django/migrations/0028_add_cancel_states.py +++ b/procrastinate/contrib/django/migrations/0028_add_cancel_states.py @@ -35,7 +35,7 @@ class Migration(migrations.Migration): ("failed", "failed"), ("succeeded", "succeeded"), ("cancelled", "cancelled"), - ("request_to_abort", "request_to_abort"), + ("abort_requested", "abort_requested"), ("aborted", "aborted"), ("scheduled", "scheduled"), ], diff --git a/procrastinate/contrib/django/models.py b/procrastinate/contrib/django/models.py index fa5141695..3dfeb1494 100644 --- a/procrastinate/contrib/django/models.py +++ b/procrastinate/contrib/django/models.py @@ -94,7 +94,7 @@ class ProcrastinateEvent(ProcrastinateReadOnlyModelMixin, models.Model): "failed", "succeeded", "cancelled", - "request_to_abort", + "abort_requested", "aborted", "scheduled", ) diff --git a/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql b/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql index e3a6940a5..d4522f159 100644 --- a/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql +++ b/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql @@ -2,7 +2,7 @@ ALTER TYPE procrastinate_job_status ADD VALUE 'cancelled'; ALTER TYPE procrastinate_job_status ADD VALUE 'aborting'; ALTER TYPE procrastinate_job_status ADD VALUE 'aborted'; -ALTER TYPE procrastinate_job_event_type ADD VALUE 'request_to_abort' BEFORE 'scheduled'; +ALTER TYPE procrastinate_job_event_type ADD VALUE 'abort_requested' BEFORE 'scheduled'; ALTER TYPE procrastinate_job_event_type ADD VALUE 'aborted' BEFORE 'scheduled'; CREATE FUNCTION procrastinate_cancel_job(job_id bigint, abort boolean, delete_job boolean) @@ -96,7 +96,7 @@ BEGIN THEN 'cancelled'::procrastinate_job_event_type WHEN OLD.status = 
'doing'::procrastinate_job_status AND NEW.status = 'aborting'::procrastinate_job_status - THEN 'request_to_abort'::procrastinate_job_event_type + THEN 'abort_requested'::procrastinate_job_event_type WHEN ( OLD.status = 'doing'::procrastinate_job_status OR OLD.status = 'aborting'::procrastinate_job_status diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index 6535efb9c..16bb9d02b 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -21,7 +21,7 @@ CREATE TYPE procrastinate_job_event_type AS ENUM ( 'failed', -- doing or aborting -> failed 'succeeded', -- doing or aborting -> succeeded 'cancelled', -- todo -> cancelled - 'request_to_abort', -- doing -> aborting + 'abort_requested', -- doing -> aborting 'aborted', -- doing or aborting -> aborted 'scheduled' -- not an event transition, but recording when a task is scheduled for ); @@ -328,7 +328,7 @@ BEGIN THEN 'cancelled'::procrastinate_job_event_type WHEN OLD.status = 'doing'::procrastinate_job_status AND NEW.status = 'aborting'::procrastinate_job_status - THEN 'request_to_abort'::procrastinate_job_event_type + THEN 'abort_requested'::procrastinate_job_event_type WHEN ( OLD.status = 'doing'::procrastinate_job_status OR OLD.status = 'aborting'::procrastinate_job_status From af9c491a78ce848fd51bb88fce5c58a409888bb5 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 24 Jun 2024 21:26:10 +0000 Subject: [PATCH 03/11] Shorten message of JobAborted --- procrastinate/exceptions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/procrastinate/exceptions.py b/procrastinate/exceptions.py index b992e2206..34b5e36d9 100644 --- a/procrastinate/exceptions.py +++ b/procrastinate/exceptions.py @@ -65,7 +65,7 @@ def __init__( class JobAborted(ProcrastinateException): """ - Job was aborted (usually as reaction to an abortion request). + Job was aborted. """ From 532fbaca50d30dffc13a8171470ba210db620ccb Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 24 Jun 2024 21:46:50 +0000 Subject: [PATCH 04/11] Be more clear about the return value of cancel_job_by_id (also add some assertions) --- procrastinate/manager.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/procrastinate/manager.py b/procrastinate/manager.py index 220888806..9b12b81bb 100644 --- a/procrastinate/manager.py +++ b/procrastinate/manager.py @@ -253,8 +253,8 @@ def cancel_job_by_id( The id of the job to cancel abort : ``bool`` If True, a job in ``doing`` state will be marked as ``aborting``, but the task - itself has to respect the abortion request. If False, only jobs in ``todo`` state - will be set to ``cancelled`` and won't be processed by a worker anymore. + itself has to respect the abortion request. If False, only jobs in ``todo`` + state will be set to ``cancelled`` and won't be processed by a worker anymore. delete_job : ``bool`` If True, the job will be deleted from the database after being cancelled. Does not affect the jobs that should be aborted. @@ -262,7 +262,8 @@ def cancel_job_by_id( Returns ------- ``bool`` - True if the job to be cancelled was found, False otherwise. + True if the job to be cancelled (or to mark for abortion) was found. False if + no job with that ID or in a state to be cancelled was found. 
""" result = self.connector.get_sync_connector().execute_query_one( query=sql.queries["cancel_job"], @@ -270,7 +271,12 @@ def cancel_job_by_id( abort=abort, delete_job=delete_job, ) - return result["id"] is not None + + if result["id"] is None: + return False + + assert result["id"] == job_id + return True async def cancel_job_by_id_async( self, job_id: int, abort: bool = False, delete_job=False @@ -284,8 +290,8 @@ async def cancel_job_by_id_async( The id of the job to cancel abort : ``bool`` If True, a job in ``doing`` state will be marked as ``aborting``, but the task - itself has to respect the abortion request. If False, only jobs in ``todo`` state - will be set to ``cancelled`` and won't be processed by a worker anymore. + itself has to respect the abortion request. If False, only jobs in ``todo`` + state will be set to ``cancelled`` and won't be processed by a worker anymore. delete_job : ``bool`` If True, the job will be deleted from the database after being cancelled. Does not affect the jobs that should be aborted. @@ -293,7 +299,8 @@ async def cancel_job_by_id_async( Returns ------- ``bool`` - True if the job to be cancelled was found, False otherwise. + True if the job to be cancelled (or to mark for abortion) was found. False if + no job with that ID or in a state to be cancelled was found. """ result = await self.connector.execute_query_one_async( query=sql.queries["cancel_job"], @@ -301,7 +308,12 @@ async def cancel_job_by_id_async( abort=abort, delete_job=delete_job, ) - return result["id"] is not None + + if result["id"] is None: + return False + + assert result["id"] == job_id + return True def get_job_status(self, job_id: int) -> jobs.Status: """ From 505d039af63ba113beb6c9b2ee29c7804e633d0f Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 24 Jun 2024 21:57:52 +0000 Subject: [PATCH 05/11] Speed up acceptance test with shorter sleep time --- tests/acceptance/test_async.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/acceptance/test_async.py b/tests/acceptance/test_async.py index 727c0df77..3b43cee21 100644 --- a/tests/acceptance/test_async.py +++ b/tests/acceptance/test_async.py @@ -93,14 +93,14 @@ async def test_abort(async_app): @async_app.task(queue="default", name="task1", pass_context=True) async def task1(context): while True: - await asyncio.sleep(1) + await asyncio.sleep(0.1) if await context.should_abort_async(): raise JobAborted @async_app.task(queue="default", name="task2", pass_context=True) def task2(context): while True: - time.sleep(1) + time.sleep(0.1) if context.should_abort(): raise JobAborted @@ -111,10 +111,10 @@ def task2(context): async_app.run_worker_async(queues=["default"], wait=False) ) - await asyncio.sleep(1) + await asyncio.sleep(0.1) await async_app.job_manager.cancel_job_by_id_async(job1_id, abort=True) - await asyncio.sleep(1) + await asyncio.sleep(0.1) await async_app.job_manager.cancel_job_by_id_async(job2_id, abort=True) await worker_task From d34223c64da1c1ef1c82073bd729896702032657 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 24 Jun 2024 22:05:22 +0000 Subject: [PATCH 06/11] Add documentation about message of JobAborted --- docs/howto/advanced/cancellation.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/howto/advanced/cancellation.md b/docs/howto/advanced/cancellation.md index 42cbdbaa6..9310ae486 100644 --- a/docs/howto/advanced/cancellation.md +++ b/docs/howto/advanced/cancellation.md @@ -42,7 +42,8 @@ await app.job_manager.cancel_job_by_id_async(33, 
abort=True) In our task, we can check (for example, periodically) if the task should be aborted. If we want to respect that request (we don't have to), we raise a -`JobAborted` error. +`JobAborted` error. Any message passed to `JobAborted` (e.g. +`raise JobAborted("custom message")`) will end up in the logs. ```python @app.task(pass_context=True) From 729dbbeb58531580718c4335f160f28ffbdce1d4 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 24 Jun 2024 22:15:27 +0000 Subject: [PATCH 07/11] Use simple form of CASE in SQL --- .../sql/migrations/02.06.00_01_add_cancel_states.sql | 6 +++--- procrastinate/sql/schema.sql | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql b/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql index d4522f159..0541471d9 100644 --- a/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql +++ b/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql @@ -20,9 +20,9 @@ BEGIN IF _job_id IS NULL THEN IF abort THEN UPDATE procrastinate_jobs - SET status = CASE - WHEN status = 'todo' THEN 'cancelled'::procrastinate_job_status - WHEN status = 'doing' THEN 'aborting'::procrastinate_job_status + SET status = CASE status + WHEN 'todo' THEN 'cancelled'::procrastinate_job_status + WHEN 'doing' THEN 'aborting'::procrastinate_job_status END WHERE id = job_id AND status IN ('todo', 'doing') RETURNING id INTO _job_id; diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index 16bb9d02b..30f733256 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -242,9 +242,9 @@ BEGIN IF _job_id IS NULL THEN IF abort THEN UPDATE procrastinate_jobs - SET status = CASE - WHEN status = 'todo' THEN 'cancelled'::procrastinate_job_status - WHEN status = 'doing' THEN 'aborting'::procrastinate_job_status + SET status = CASE status + WHEN 'todo' THEN 'cancelled'::procrastinate_job_status + WHEN 'doing' THEN 'aborting'::procrastinate_job_status END WHERE id = job_id AND status IN ('todo', 'doing') RETURNING id INTO _job_id; From 9ca9a082c8e19e4dcee44833f6e31e41aa147c21 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 24 Jun 2024 23:53:54 +0000 Subject: [PATCH 08/11] Simulate normal job cycle to test job context --- tests/unit/test_job_context.py | 42 ++++++++-------------------------- 1 file changed, 10 insertions(+), 32 deletions(-) diff --git a/tests/unit/test_job_context.py b/tests/unit/test_job_context.py index 77a1af891..0d3a5fe0d 100644 --- a/tests/unit/test_job_context.py +++ b/tests/unit/test_job_context.py @@ -2,7 +2,7 @@ import pytest -from procrastinate import job_context, jobs +from procrastinate import job_context @pytest.mark.parametrize( @@ -106,41 +106,19 @@ def test_job_description_job_time(job_factory): assert descr == "worker 2: some_task[12](a='b') (started 10.000 s ago)" -def test_should_abort(app, job_factory): - job = job_factory(id=12) - app.job_manager.get_job_status = lambda job_id: jobs.Status.ABORTING +async def test_should_abort(app, job_factory): + await app.job_manager.defer_job_async(job=job_factory()) + job = await app.job_manager.fetch_job(queues=None) + await app.job_manager.cancel_job_by_id_async(job.id, abort=True) context = job_context.JobContext(app=app, job=job) - assert context.should_abort() is True - - -def test_should_not_abort(app, job_factory): - job = job_factory(id=12) - app.job_manager.get_job_status = lambda job_id: jobs.Status.DOING - context = job_context.JobContext(app=app, job=job) - - 
assert context.should_abort() is False - - -async def test_should_abort_async(app, job_factory): - job = job_factory(id=12) - - async def get_job_status_async(job_id): - return jobs.Status.ABORTING - - app.job_manager.get_job_status_async = get_job_status_async - context = job_context.JobContext(app=app, job=job) - assert await context.should_abort_async() is True -async def test_should_not_abort_async(app, job_factory): - job = job_factory(id=12) - - async def get_job_status_async(job_id): - return jobs.Status.DOING - - app.job_manager.get_job_status_async = get_job_status_async +async def test_should_not_abort(app, job_factory): + await app.job_manager.defer_job_async(job=job_factory()) + job = await app.job_manager.fetch_job(queues=None) + await app.job_manager.cancel_job_by_id_async(job.id) context = job_context.JobContext(app=app, job=job) - + assert context.should_abort() is False assert await context.should_abort_async() is False From 5bc37b65031d3120ee567c416878d9b5cf8584b1 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Tue, 25 Jun 2024 00:09:50 +0000 Subject: [PATCH 09/11] Test result of cancel method --- tests/acceptance/test_async.py | 29 +++++++++++++++++++++++++---- tests/acceptance/test_sync.py | 20 +++++++++++++++++++- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/tests/acceptance/test_async.py b/tests/acceptance/test_async.py index 3b43cee21..ba6618e10 100644 --- a/tests/acceptance/test_async.py +++ b/tests/acceptance/test_async.py @@ -56,7 +56,8 @@ async def sum_task(a, b): job_id = await sum_task.defer_async(a=1, b=2) await sum_task.defer_async(a=3, b=4) - await async_app.job_manager.cancel_job_by_id_async(job_id) + result = await async_app.job_manager.cancel_job_by_id_async(job_id) + assert result is True status = await async_app.job_manager.get_job_status_async(job_id) assert status == Status.CANCELLED @@ -79,7 +80,8 @@ async def sum_task(a, b): job_id = await sum_task.defer_async(a=1, b=2) await sum_task.defer_async(a=3, b=4) - await async_app.job_manager.cancel_job_by_id_async(job_id, delete_job=True) + result = await async_app.job_manager.cancel_job_by_id_async(job_id, delete_job=True) + assert result is True jobs = await async_app.job_manager.list_jobs_async() assert len(jobs) == 1 @@ -89,6 +91,23 @@ async def sum_task(a, b): assert sum_results == [7] +async def test_no_job_to_cancel_found(async_app): + @async_app.task(queue="default", name="example_task") + def example_task(): + pass + + job_id = await example_task.defer_async() + + result = await async_app.job_manager.cancel_job_by_id_async(job_id + 1) + assert result is False + + status = await async_app.job_manager.get_job_status_async(job_id) + assert status == Status.TODO + + jobs = await async_app.job_manager.list_jobs_async() + assert len(jobs) == 1 + + async def test_abort(async_app): @async_app.task(queue="default", name="task1", pass_context=True) async def task1(context): @@ -112,10 +131,12 @@ def task2(context): ) await asyncio.sleep(0.1) - await async_app.job_manager.cancel_job_by_id_async(job1_id, abort=True) + result = await async_app.job_manager.cancel_job_by_id_async(job1_id, abort=True) + assert result is True await asyncio.sleep(0.1) - await async_app.job_manager.cancel_job_by_id_async(job2_id, abort=True) + result = await async_app.job_manager.cancel_job_by_id_async(job2_id, abort=True) + assert result is True await worker_task diff --git a/tests/acceptance/test_sync.py b/tests/acceptance/test_sync.py index 756b4dcd5..f856db2b5 100644 --- a/tests/acceptance/test_sync.py +++ 
b/tests/acceptance/test_sync.py @@ -64,7 +64,8 @@ def sum_task(a, b): job_id = sum_task.defer(a=1, b=2) sum_task.defer(a=3, b=4) - sync_app.job_manager.cancel_job_by_id(job_id) + result = sync_app.job_manager.cancel_job_by_id(job_id) + assert result is True status = sync_app.job_manager.get_job_status(job_id) assert status == Status.CANCELLED @@ -77,3 +78,20 @@ def sum_task(a, b): await async_app.run_worker_async(queues=["default"], wait=False) assert sum_results == [7] + + +def test_no_job_to_cancel_found(sync_app): + @sync_app.task(queue="default", name="example_task") + def example_task(): + pass + + job_id = example_task.defer() + + result = sync_app.job_manager.cancel_job_by_id(job_id + 1) + assert result is False + + status = sync_app.job_manager.get_job_status(job_id) + assert status == Status.TODO + + jobs = sync_app.job_manager.list_jobs() + assert len(jobs) == 1 From 270bc651679e612e8d8d048fe96620f6716f3a28 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Tue, 25 Jun 2024 10:30:04 +0000 Subject: [PATCH 10/11] Respect lock when job is aborting --- .../02.06.00_01_add_cancel_states.sql | 38 +++++++++++++++++++ procrastinate/sql/schema.sql | 2 +- tests/integration/test_manager.py | 10 +++++ 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql b/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql index 0541471d9..72974be6f 100644 --- a/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql +++ b/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql @@ -37,6 +37,44 @@ BEGIN END; $$; +CREATE OR REPLACE FUNCTION procrastinate_fetch_job( + target_queue_names character varying[] +) + RETURNS procrastinate_jobs + LANGUAGE plpgsql +AS $$ +DECLARE + found_jobs procrastinate_jobs; +BEGIN + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + WHERE + -- reject the job if its lock has earlier jobs + NOT EXISTS ( + SELECT 1 + FROM procrastinate_jobs AS earlier_jobs + WHERE + jobs.lock IS NOT NULL + AND earlier_jobs.lock = jobs.lock + AND earlier_jobs.status IN ('todo', 'doing', 'aborting') + AND earlier_jobs.id < jobs.id) + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + + RETURN found_jobs; +END; +$$; + CREATE OR REPLACE FUNCTION procrastinate_finish_job(job_id bigint, end_status procrastinate_job_status, delete_job boolean) RETURNS void LANGUAGE plpgsql diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index 30f733256..d696cea1b 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -175,7 +175,7 @@ BEGIN WHERE jobs.lock IS NOT NULL AND earlier_jobs.lock = jobs.lock - AND earlier_jobs.status IN ('todo', 'doing') + AND earlier_jobs.status IN ('todo', 'doing', 'aborting') AND earlier_jobs.id < jobs.id) AND jobs.status = 'todo' AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) diff --git a/tests/integration/test_manager.py b/tests/integration/test_manager.py index a0a2dfc2f..1da333a62 100644 --- a/tests/integration/test_manager.py +++ b/tests/integration/test_manager.py @@ -79,6 +79,16 @@ async def 
test_fetch_job_not_fetching_locked_job( assert await pg_job_manager.fetch_job(queues=None) is None +async def test_fetch_job_respect_lock_aborting_job( + pg_job_manager, deferred_job_factory, fetched_job_factory +): + job = await fetched_job_factory(lock="lock_1") + await deferred_job_factory(lock="lock_1") + + await pg_job_manager.cancel_job_by_id_async(job.id, abort=True) + assert await pg_job_manager.fetch_job(queues=None) is None + + async def test_fetch_job_spacial_case_none_lock( pg_job_manager, deferred_job_factory, fetched_job_factory ): From 3502acb511e81c6597dc4ef392ce2cc65dceca12 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Tue, 25 Jun 2024 10:58:27 +0000 Subject: [PATCH 11/11] Improve docstring of cancel method and replace canceled with cancelled in some other docs --- docs/howto/advanced/cancellation.md | 2 +- procrastinate/jobs.py | 2 +- procrastinate/manager.py | 12 +++++++----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/docs/howto/advanced/cancellation.md b/docs/howto/advanced/cancellation.md index 9310ae486..c8bddfe2c 100644 --- a/docs/howto/advanced/cancellation.md +++ b/docs/howto/advanced/cancellation.md @@ -13,7 +13,7 @@ app.job_manager.cancel_job_by_id(33) await app.job_manager.cancel_job_by_id_async(33) ``` -## Delete the canceled job +## Delete the cancelled job A cancelled job can also be deleted from the database. diff --git a/procrastinate/jobs.py b/procrastinate/jobs.py index 21731da08..bcd55a6d5 100644 --- a/procrastinate/jobs.py +++ b/procrastinate/jobs.py @@ -38,7 +38,7 @@ class Status(Enum): DOING = "doing" #: A worker is running the job SUCCEEDED = "succeeded" #: The job ended successfully FAILED = "failed" #: The job ended with an error - CANCELLED = "cancelled" #: The job was canceled + CANCELLED = "cancelled" #: The job was cancelled ABORTING = "aborting" #: The job is requested to be aborted ABORTED = "aborted" #: The job was aborted diff --git a/procrastinate/manager.py b/procrastinate/manager.py index 9b12b81bb..5f26dac28 100644 --- a/procrastinate/manager.py +++ b/procrastinate/manager.py @@ -262,8 +262,9 @@ def cancel_job_by_id( Returns ------- ``bool`` - True if the job to be cancelled (or to mark for abortion) was found. False if - no job with that ID or in a state to be cancelled was found. + If True, the job was cancelled (or its abortion was requested). If False, + nothing was done: either there is no job with this id or it's not in a state + where it may be cancelled (i.e. `todo` or `doing`) """ result = self.connector.get_sync_connector().execute_query_one( query=sql.queries["cancel_job"], @@ -299,8 +300,9 @@ async def cancel_job_by_id_async( Returns ------- ``bool`` - True if the job to be cancelled (or to mark for abortion) was found. False if - no job with that ID or in a state to be cancelled was found. + If True, the job was cancelled (or its abortion was requested). If False, + nothing was done: either there is no job with this id or it's not in a state + where it may be cancelled (i.e. `todo` or `doing`) """ result = await self.connector.execute_query_one_async( query=sql.queries["cancel_job"], @@ -416,7 +418,7 @@ async def listen_for_jobs( defer operation is seen. This coroutine either returns ``None`` upon calling if it cannot start - listening or does not return and needs to be canceled to end. + listening or does not return and needs to be cancelled to end. Parameters ----------
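
As a usage sketch (not part of the patch itself), the new boolean return value can be combined with `get_job_status_async` to report what actually happened to a job. The `app` object and the `request_stop` helper name below are assumptions for illustration; only `cancel_job_by_id_async` and `get_job_status_async` come from the API touched above.

```python
async def request_stop(app, job_id: int) -> bool:
    # Cancel the job if it is still 'todo', or mark it 'aborting' if a worker
    # is already processing it ('doing').
    stopped = await app.job_manager.cancel_job_by_id_async(job_id, abort=True)
    if stopped:
        # The row was updated: the status is now 'cancelled' or 'aborting'. An
        # 'aborting' job keeps running until the task raises JobAborted itself.
        status = await app.job_manager.get_job_status_async(job_id)
        print(f"job {job_id} is now {status.value}")
    else:
        # Nothing was updated: unknown id, or the job already reached a final
        # state (succeeded, failed, cancelled or aborted).
        print(f"job {job_id} was not in a cancellable state")
    return stopped
```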