diff --git a/docs/howto/advanced.md b/docs/howto/advanced.md index 4ea0325f0..aee8516a6 100644 --- a/docs/howto/advanced.md +++ b/docs/howto/advanced.md @@ -8,6 +8,7 @@ advanced/context advanced/locks advanced/schedule advanced/priorities +advanced/cancellation advanced/queueing_locks advanced/cron advanced/retry diff --git a/docs/howto/advanced/cancellation.md b/docs/howto/advanced/cancellation.md new file mode 100644 index 000000000..c8bddfe2c --- /dev/null +++ b/docs/howto/advanced/cancellation.md @@ -0,0 +1,72 @@ +# Cancel a job + +We can cancel a job that has not yet been processed by a worker. We can also +mark a job that is currently being processed for abortion, but this request +has to be handled by the task itself. + +## Cancel a job (that is not being processed yet) + +```python +# by using the sync method +app.job_manager.cancel_job_by_id(33) +# or by using the async method +await app.job_manager.cancel_job_by_id_async(33) +``` + +## Delete the cancelled job + +A cancelled job can also be deleted from the database. + +```python +# by using the sync method +app.job_manager.cancel_job_by_id(33, delete_job=True) +# or by using the async method +await app.job_manager.cancel_job_by_id_async(33, delete_job=True) +``` + +## Mark a currently being processed job for abortion + +If a worker has not picked up the job yet, the below command behaves like the +command without the `abort` option. But if a job is already in the middle of +being processed, the `abort` option marks this job for abortion (see below +how to handle this request). + +```python +# by using the sync method +app.job_manager.cancel_job_by_id(33, abort=True) +# or by using the async method +await app.job_manager.cancel_job_by_id_async(33, abort=True) +``` + +## Handle a abortion request inside the task + +In our task, we can check (for example, periodically) if the task should be +aborted. If we want to respect that request (we don't have to), we raise a +`JobAborted` error. Any message passed to `JobAborted` (e.g. +`raise JobAborted("custom message")`) will end up in the logs. + +```python +@app.task(pass_context=True) +def my_task(context): + for i in range(100): + if context.should_abort(): + raise exceptions.JobAborted + do_something_expensive() +``` + +There is also an async API + +```python +@app.task(pass_context=True) +async def my_task(context): + for i in range(100): + if await context.should_abort_async(): + raise exceptions.JobAborted + do_something_expensive() +``` + +:::{warning} +`context.should_abort()` and `context.should_abort_async()` does poll the +database and might flood the database. Ensure you do it only sometimes and +not from too many parallel tasks. +::: diff --git a/procrastinate/contrib/django/migrations/0028_add_cancel_states.py b/procrastinate/contrib/django/migrations/0028_add_cancel_states.py new file mode 100644 index 000000000..935a66731 --- /dev/null +++ b/procrastinate/contrib/django/migrations/0028_add_cancel_states.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from django.db import migrations, models + +from .. import migrations_utils + + +class Migration(migrations.Migration): + operations = [ + migrations_utils.RunProcrastinateSQL(name="02.06.00_01_add_cancel_states.sql"), + migrations.AlterField( + "procrastinatejob", + "status", + models.CharField( + choices=[ + ("todo", "todo"), + ("doing", "doing"), + ("succeeded", "succeeded"), + ("failed", "failed"), + ("cancelled", "cancelled"), + ("aborting", "aborting"), + ("aborted", "aborted"), + ], + max_length=32, + ), + ), + migrations.AlterField( + "procrastinateevent", + "type", + models.CharField( + choices=[ + ("deferred", "deferred"), + ("started", "started"), + ("deferred_for_retry", "deferred_for_retry"), + ("failed", "failed"), + ("succeeded", "succeeded"), + ("cancelled", "cancelled"), + ("abort_requested", "abort_requested"), + ("aborted", "aborted"), + ("scheduled", "scheduled"), + ], + max_length=32, + ), + ), + ] + name = "0028_add_cancel_states" + dependencies = [("procrastinate", "0027_add_periodic_job_priority")] diff --git a/procrastinate/contrib/django/models.py b/procrastinate/contrib/django/models.py index 24bedbb31..3dfeb1494 100644 --- a/procrastinate/contrib/django/models.py +++ b/procrastinate/contrib/django/models.py @@ -64,6 +64,9 @@ class ProcrastinateJob(ProcrastinateReadOnlyModelMixin, models.Model): "doing", "succeeded", "failed", + "cancelled", + "aborting", + "aborted", ) id = models.BigAutoField(primary_key=True) queue_name = models.CharField(max_length=128) @@ -91,6 +94,8 @@ class ProcrastinateEvent(ProcrastinateReadOnlyModelMixin, models.Model): "failed", "succeeded", "cancelled", + "abort_requested", + "aborted", "scheduled", ) id = models.BigAutoField(primary_key=True) diff --git a/procrastinate/exceptions.py b/procrastinate/exceptions.py index 1118e14a6..34b5e36d9 100644 --- a/procrastinate/exceptions.py +++ b/procrastinate/exceptions.py @@ -63,6 +63,12 @@ def __init__( self.critical = critical +class JobAborted(ProcrastinateException): + """ + Job was aborted. + """ + + class AppNotOpen(ProcrastinateException): """ App was not open. Procrastinate App needs to be opened using: diff --git a/procrastinate/job_context.py b/procrastinate/job_context.py index b453064b0..a35fe5582 100644 --- a/procrastinate/job_context.py +++ b/procrastinate/job_context.py @@ -101,3 +101,21 @@ def job_description(self, current_timestamp: float) -> str: message += "no current job" return message + + def should_abort(self) -> bool: + assert self.app + assert self.job + assert self.job.id + + job_id = self.job.id + status = self.app.job_manager.get_job_status(job_id) + return status == jobs.Status.ABORTING + + async def should_abort_async(self) -> bool: + assert self.app + assert self.job + assert self.job.id + + job_id = self.job.id + status = await self.app.job_manager.get_job_status_async(job_id) + return status == jobs.Status.ABORTING diff --git a/procrastinate/jobs.py b/procrastinate/jobs.py index 3955713a7..bcd55a6d5 100644 --- a/procrastinate/jobs.py +++ b/procrastinate/jobs.py @@ -38,6 +38,9 @@ class Status(Enum): DOING = "doing" #: A worker is running the job SUCCEEDED = "succeeded" #: The job ended successfully FAILED = "failed" #: The job ended with an error + CANCELLED = "cancelled" #: The job was cancelled + ABORTING = "aborting" #: The job is requested to be aborted + ABORTED = "aborted" #: The job was aborted @attr.dataclass(frozen=True, kw_only=True) diff --git a/procrastinate/manager.py b/procrastinate/manager.py index 59339a28b..5f26dac28 100644 --- a/procrastinate/manager.py +++ b/procrastinate/manager.py @@ -215,13 +215,13 @@ async def finish_job( delete_job: bool, ) -> None: """ - Set a job to its final state (``succeeded`` or ``failed``). + Set a job to its final state (``succeeded``, ``failed`` or ``aborted``). Parameters ---------- job : `jobs.Job` status : `jobs.Status` - ``succeeded`` or ``failed`` + ``succeeded``, ``failed`` or ``aborted`` """ assert job.id # TODO remove this await self.finish_job_by_id_async( @@ -241,6 +241,118 @@ async def finish_job_by_id_async( delete_job=delete_job, ) + def cancel_job_by_id( + self, job_id: int, abort: bool = False, delete_job: bool = False + ) -> bool: + """ + Cancel a job by id. + + Parameters + ---------- + job_id : ``int`` + The id of the job to cancel + abort : ``bool`` + If True, a job in ``doing`` state will be marked as ``aborting``, but the task + itself has to respect the abortion request. If False, only jobs in ``todo`` + state will be set to ``cancelled`` and won't be processed by a worker anymore. + delete_job : ``bool`` + If True, the job will be deleted from the database after being cancelled. Does + not affect the jobs that should be aborted. + + Returns + ------- + ``bool`` + If True, the job was cancelled (or its abortion was requested). If False, + nothing was done: either there is no job with this id or it's not in a state + where it may be cancelled (i.e. `todo` or `doing`) + """ + result = self.connector.get_sync_connector().execute_query_one( + query=sql.queries["cancel_job"], + job_id=job_id, + abort=abort, + delete_job=delete_job, + ) + + if result["id"] is None: + return False + + assert result["id"] == job_id + return True + + async def cancel_job_by_id_async( + self, job_id: int, abort: bool = False, delete_job=False + ) -> bool: + """ + Cancel a job by id. + + Parameters + ---------- + job_id : ``int`` + The id of the job to cancel + abort : ``bool`` + If True, a job in ``doing`` state will be marked as ``aborting``, but the task + itself has to respect the abortion request. If False, only jobs in ``todo`` + state will be set to ``cancelled`` and won't be processed by a worker anymore. + delete_job : ``bool`` + If True, the job will be deleted from the database after being cancelled. Does + not affect the jobs that should be aborted. + + Returns + ------- + ``bool`` + If True, the job was cancelled (or its abortion was requested). If False, + nothing was done: either there is no job with this id or it's not in a state + where it may be cancelled (i.e. `todo` or `doing`) + """ + result = await self.connector.execute_query_one_async( + query=sql.queries["cancel_job"], + job_id=job_id, + abort=abort, + delete_job=delete_job, + ) + + if result["id"] is None: + return False + + assert result["id"] == job_id + return True + + def get_job_status(self, job_id: int) -> jobs.Status: + """ + Get the status of a job by id. + + Parameters + ---------- + job_id : ``int`` + The id of the job to get the status of + + Returns + ------- + `jobs.Status` + """ + result = self.connector.get_sync_connector().execute_query_one( + query=sql.queries["get_job_status"], job_id=job_id + ) + return jobs.Status(result["status"]) + + async def get_job_status_async(self, job_id: int) -> jobs.Status: + """ + Get the status of a job by id. + + Parameters + ---------- + job_id : ``int`` + The id of the job to get the status of + + Returns + ------- + `jobs.Status` + """ + result = await self.connector.execute_query_one_async( + query=sql.queries["get_job_status"], job_id=job_id + ) + return jobs.Status(result["status"]) + async def retry_job( self, job: jobs.Job, @@ -306,7 +418,7 @@ async def listen_for_jobs( defer operation is seen. This coroutine either returns ``None`` upon calling if it cannot start - listening or does not return and needs to be canceled to end. + listening or does not return and needs to be cancelled to end. Parameters ---------- @@ -435,7 +547,8 @@ async def list_queues_async( ------- ``List[Dict[str, Any]]`` A list of dictionaries representing queues stats (``name``, ``jobs_count``, - ``todo``, ``doing``, ``succeeded``, ``failed``). + ``todo``, ``doing``, ``succeeded``, ``failed``, ``cancelled``, ``aborting``, + ``aborted``). """ return [ { @@ -445,6 +558,9 @@ async def list_queues_async( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } for row in await self.connector.execute_query_all_async( query=sql.queries["list_queues"], @@ -473,6 +589,9 @@ def list_queues( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } for row in self.connector.get_sync_connector().execute_query_all( query=sql.queries["list_queues"], @@ -508,7 +627,8 @@ async def list_tasks_async( ------- ``List[Dict[str, Any]]`` A list of dictionaries representing tasks stats (``name``, ``jobs_count``, - ``todo``, ``doing``, ``succeeded``, ``failed``). + ``todo``, ``doing``, ``succeeded``, ``failed``, ``cancelled``, ``aborting``, + ``aborted``). """ return [ { @@ -518,6 +638,9 @@ async def list_tasks_async( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } for row in await self.connector.execute_query_all_async( query=sql.queries["list_tasks"], @@ -546,6 +669,9 @@ def list_tasks( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } for row in self.connector.get_sync_connector().execute_query_all( query=sql.queries["list_tasks"], @@ -581,7 +707,8 @@ async def list_locks_async( ------- ``List[Dict[str, Any]]`` A list of dictionaries representing locks stats (``name``, ``jobs_count``, - ``todo``, ``doing``, ``succeeded``, ``failed``). + ``todo``, ``doing``, ``succeeded``, ``failed``, ``cancelled``, ``aborting``, + ``aborted``). """ result = [] for row in await self.connector.execute_query_all_async( @@ -599,6 +726,9 @@ async def list_locks_async( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } ) return result @@ -629,6 +759,9 @@ def list_locks( "doing": row["stats"].get("doing", 0), "succeeded": row["stats"].get("succeeded", 0), "failed": row["stats"].get("failed", 0), + "cancelled": row["stats"].get("cancelled", 0), + "aborting": row["stats"].get("aborting", 0), + "aborted": row["stats"].get("aborted", 0), } ) return result diff --git a/procrastinate/shell.py b/procrastinate/shell.py index ac6eb4994..c5940b7ed 100644 --- a/procrastinate/shell.py +++ b/procrastinate/shell.py @@ -87,7 +87,10 @@ def do_list_queues(self, arg: str) -> None: f"todo: {queue['todo']}, " f"doing: {queue['doing']}, " f"succeeded: {queue['succeeded']}, " - f"failed: {queue['failed']})" + f"failed: {queue['failed']}, " + f"cancelled: {queue['cancelled']}, " + f"aborting: {queue['aborting']}, " + f"aborted: {queue['aborted']})" ) def do_list_tasks(self, arg: str) -> None: @@ -107,7 +110,10 @@ def do_list_tasks(self, arg: str) -> None: f"todo: {task['todo']}, " f"doing: {task['doing']}, " f"succeeded: {task['succeeded']}, " - f"failed: {task['failed']})" + f"failed: {task['failed']}, " + f"cancelled: {task['cancelled']}, " + f"aborting: {task['aborting']}, " + f"aborted: {task['aborted']})" ) def do_list_locks(self, arg: str) -> None: @@ -127,7 +133,10 @@ def do_list_locks(self, arg: str) -> None: f"todo: {lock['todo']}, " f"doing: {lock['doing']}, " f"succeeded: {lock['succeeded']}, " - f"failed: {lock['failed']})" + f"failed: {lock['failed']}, " + f"cancelled: {lock['cancelled']}, " + f"aborting: {lock['aborting']}, " + f"aborted: {lock['aborted']})" ) def do_retry(self, arg: str) -> None: @@ -159,12 +168,7 @@ def do_cancel(self, arg: str) -> None: Example: cancel 3 """ job_id = int(arg) - self.async_to_sync( - self.job_manager.finish_job_by_id_async, - job_id=job_id, - status=jobs.Status.FAILED, - delete_job=False, - ) + self.job_manager.cancel_job_by_id(job_id=job_id) (job,) = self.async_to_sync(self.job_manager.list_jobs_async, id=job_id) print_job(job) diff --git a/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql b/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql new file mode 100644 index 000000000..72974be6f --- /dev/null +++ b/procrastinate/sql/migrations/02.06.00_01_add_cancel_states.sql @@ -0,0 +1,153 @@ +ALTER TYPE procrastinate_job_status ADD VALUE 'cancelled'; +ALTER TYPE procrastinate_job_status ADD VALUE 'aborting'; +ALTER TYPE procrastinate_job_status ADD VALUE 'aborted'; + +ALTER TYPE procrastinate_job_event_type ADD VALUE 'abort_requested' BEFORE 'scheduled'; +ALTER TYPE procrastinate_job_event_type ADD VALUE 'aborted' BEFORE 'scheduled'; + +CREATE FUNCTION procrastinate_cancel_job(job_id bigint, abort boolean, delete_job boolean) + RETURNS bigint + LANGUAGE plpgsql +AS $$ +DECLARE + _job_id bigint; +BEGIN + IF delete_job THEN + DELETE FROM procrastinate_jobs + WHERE id = job_id AND status = 'todo' + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN + IF abort THEN + UPDATE procrastinate_jobs + SET status = CASE status + WHEN 'todo' THEN 'cancelled'::procrastinate_job_status + WHEN 'doing' THEN 'aborting'::procrastinate_job_status + END + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = 'cancelled'::procrastinate_job_status + WHERE id = job_id AND status = 'todo' + RETURNING id INTO _job_id; + END IF; + END IF; + RETURN _job_id; +END; +$$; + +CREATE OR REPLACE FUNCTION procrastinate_fetch_job( + target_queue_names character varying[] +) + RETURNS procrastinate_jobs + LANGUAGE plpgsql +AS $$ +DECLARE + found_jobs procrastinate_jobs; +BEGIN + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + WHERE + -- reject the job if its lock has earlier jobs + NOT EXISTS ( + SELECT 1 + FROM procrastinate_jobs AS earlier_jobs + WHERE + jobs.lock IS NOT NULL + AND earlier_jobs.lock = jobs.lock + AND earlier_jobs.status IN ('todo', 'doing', 'aborting') + AND earlier_jobs.id < jobs.id) + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + + RETURN found_jobs; +END; +$$; + +CREATE OR REPLACE FUNCTION procrastinate_finish_job(job_id bigint, end_status procrastinate_job_status, delete_job boolean) + RETURNS void + LANGUAGE plpgsql +AS $$ +DECLARE + _job_id bigint; +BEGIN + IF end_status NOT IN ('succeeded', 'failed', 'aborted') THEN + RAISE 'End status should be either "succeeded", "failed" or "aborted" (job id: %)', job_id; + END IF; + IF delete_job THEN + DELETE FROM procrastinate_jobs + WHERE id = job_id AND status IN ('todo', 'doing', 'aborting') + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = end_status, + attempts = + CASE + WHEN status = 'doing' THEN attempts + 1 + ELSE attempts + END + WHERE id = job_id AND status IN ('todo', 'doing', 'aborting') + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN + RAISE 'Job was not found or not in "doing", "todo" or "aborting" status (job id: %)', job_id; + END IF; +END; +$$; + +CREATE OR REPLACE FUNCTION procrastinate_trigger_status_events_procedure_update() + RETURNS trigger + LANGUAGE plpgsql +AS $$ +BEGIN + WITH t AS ( + SELECT CASE + WHEN OLD.status = 'todo'::procrastinate_job_status + AND NEW.status = 'doing'::procrastinate_job_status + THEN 'started'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'todo'::procrastinate_job_status + THEN 'deferred_for_retry'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'failed'::procrastinate_job_status + THEN 'failed'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'succeeded'::procrastinate_job_status + THEN 'succeeded'::procrastinate_job_event_type + WHEN OLD.status = 'todo'::procrastinate_job_status + AND ( + NEW.status = 'cancelled'::procrastinate_job_status + OR NEW.status = 'failed'::procrastinate_job_status + OR NEW.status = 'succeeded'::procrastinate_job_status + ) + THEN 'cancelled'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'aborting'::procrastinate_job_status + THEN 'abort_requested'::procrastinate_job_event_type + WHEN ( + OLD.status = 'doing'::procrastinate_job_status + OR OLD.status = 'aborting'::procrastinate_job_status + ) + AND NEW.status = 'aborted'::procrastinate_job_status + THEN 'aborted'::procrastinate_job_event_type + ELSE NULL + END as event_type + ) + INSERT INTO procrastinate_events(job_id, type) + SELECT NEW.id, t.event_type + FROM t + WHERE t.event_type IS NOT NULL; + RETURN NEW; +END; +$$; diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index 62dece1d6..cdff1f889 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -50,6 +50,14 @@ WHERE id IN ( -- Finish a job, changing it from "doing" to "succeeded" or "failed" SELECT procrastinate_finish_job(%(job_id)s, %(status)s, %(delete_job)s); +-- cancel_job -- +-- Cancel a job, changing it from "todo" to "cancelled" or from "doing" to "aborting" +SELECT procrastinate_cancel_job(%(job_id)s, %(abort)s, %(delete_job)s) AS id; + +-- get_job_status -- +-- Get the status of a job +SELECT status FROM procrastinate_jobs WHERE id = %(job_id)s; + -- retry_job -- -- Retry a job, changing it from "doing" to "todo" SELECT procrastinate_retry_job(%(job_id)s, %(retry_at)s); diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index c9970c80c..d696cea1b 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -8,16 +8,21 @@ CREATE TYPE procrastinate_job_status AS ENUM ( 'todo', -- The job is queued 'doing', -- The job has been fetched by a worker 'succeeded', -- The job ended successfully - 'failed' -- The job ended with an error + 'failed', -- The job ended with an error + 'cancelled', -- The job was cancelled + 'aborting', -- The job was requested to abort + 'aborted' -- The job was aborted ); CREATE TYPE procrastinate_job_event_type AS ENUM ( 'deferred', -- Job created, in todo 'started', -- todo -> doing 'deferred_for_retry', -- doing -> todo - 'failed', -- doing -> failed - 'succeeded', -- doing -> succeeded - 'cancelled', -- todo -> failed or succeeded + 'failed', -- doing or aborting -> failed + 'succeeded', -- doing or aborting -> succeeded + 'cancelled', -- todo -> cancelled + 'abort_requested', -- doing -> aborting + 'aborted', -- doing or aborting -> aborted 'scheduled' -- not an event transition, but recording when a task is scheduled for ); @@ -170,7 +175,7 @@ BEGIN WHERE jobs.lock IS NOT NULL AND earlier_jobs.lock = jobs.lock - AND earlier_jobs.status IN ('todo', 'doing') + AND earlier_jobs.status IN ('todo', 'doing', 'aborting') AND earlier_jobs.id < jobs.id) AND jobs.status = 'todo' AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) @@ -198,12 +203,12 @@ AS $$ DECLARE _job_id bigint; BEGIN - IF end_status NOT IN ('succeeded', 'failed') THEN - RAISE 'End status should be either "succeeded" or "failed" (job id: %)', job_id; + IF end_status NOT IN ('succeeded', 'failed', 'aborted') THEN + RAISE 'End status should be either "succeeded", "failed" or "aborted" (job id: %)', job_id; END IF; IF delete_job THEN DELETE FROM procrastinate_jobs - WHERE id = job_id AND status IN ('todo', 'doing') + WHERE id = job_id AND status IN ('todo', 'doing', 'aborting') RETURNING id INTO _job_id; ELSE UPDATE procrastinate_jobs @@ -213,15 +218,47 @@ BEGIN WHEN status = 'doing' THEN attempts + 1 ELSE attempts END - WHERE id = job_id AND status IN ('todo', 'doing') + WHERE id = job_id AND status IN ('todo', 'doing', 'aborting') RETURNING id INTO _job_id; END IF; IF _job_id IS NULL THEN - RAISE 'Job was not found or not in "doing" or "todo" status (job id: %)', job_id; + RAISE 'Job was not found or not in "doing", "todo" or "aborting" status (job id: %)', job_id; END IF; END; $$; +CREATE FUNCTION procrastinate_cancel_job(job_id bigint, abort boolean, delete_job boolean) + RETURNS bigint + LANGUAGE plpgsql +AS $$ +DECLARE + _job_id bigint; +BEGIN + IF delete_job THEN + DELETE FROM procrastinate_jobs + WHERE id = job_id AND status = 'todo' + RETURNING id INTO _job_id; + END IF; + IF _job_id IS NULL THEN + IF abort THEN + UPDATE procrastinate_jobs + SET status = CASE status + WHEN 'todo' THEN 'cancelled'::procrastinate_job_status + WHEN 'doing' THEN 'aborting'::procrastinate_job_status + END + WHERE id = job_id AND status IN ('todo', 'doing') + RETURNING id INTO _job_id; + ELSE + UPDATE procrastinate_jobs + SET status = 'cancelled'::procrastinate_job_status + WHERE id = job_id AND status = 'todo' + RETURNING id INTO _job_id; + END IF; + END IF; + RETURN _job_id; +END; +$$; + CREATE FUNCTION procrastinate_retry_job(job_id bigint, retry_at timestamp with time zone) RETURNS void LANGUAGE plpgsql @@ -284,10 +321,20 @@ BEGIN THEN 'succeeded'::procrastinate_job_event_type WHEN OLD.status = 'todo'::procrastinate_job_status AND ( - NEW.status = 'failed'::procrastinate_job_status + NEW.status = 'cancelled'::procrastinate_job_status + OR NEW.status = 'failed'::procrastinate_job_status OR NEW.status = 'succeeded'::procrastinate_job_status ) THEN 'cancelled'::procrastinate_job_event_type + WHEN OLD.status = 'doing'::procrastinate_job_status + AND NEW.status = 'aborting'::procrastinate_job_status + THEN 'abort_requested'::procrastinate_job_event_type + WHEN ( + OLD.status = 'doing'::procrastinate_job_status + OR OLD.status = 'aborting'::procrastinate_job_status + ) + AND NEW.status = 'aborted'::procrastinate_job_status + THEN 'aborted'::procrastinate_job_event_type ELSE NULL END as event_type ) diff --git a/procrastinate/testing.py b/procrastinate/testing.py index a747f258c..6d6631be9 100644 --- a/procrastinate/testing.py +++ b/procrastinate/testing.py @@ -224,6 +224,26 @@ def finish_job_run(self, job_id: int, status: str, delete_job: bool) -> None: job_row["attempts"] += 1 self.events[job_id].append({"type": status, "at": utils.utcnow()}) + def cancel_job_one(self, job_id: int, abort: bool, delete_job: bool) -> dict: + job_row = self.jobs[job_id] + + if job_row["status"] == "todo": + if delete_job: + self.jobs.pop(job_id) + return {"id": job_id} + + job_row["status"] = "cancelled" + return {"id": job_id} + + if abort and job_row["status"] == "doing": + job_row["status"] = "aborting" + return {"id": job_id} + + return {"id": None} + + def get_job_status_one(self, job_id: int) -> dict: + return {"status": self.jobs[job_id]["status"]} + def retry_job_run(self, job_id: int, retry_at: datetime.datetime) -> None: job_row = self.jobs[job_id] job_row["status"] = "todo" diff --git a/procrastinate/worker.py b/procrastinate/worker.py index 574df84e3..f0374faaa 100644 --- a/procrastinate/worker.py +++ b/procrastinate/worker.py @@ -195,6 +195,9 @@ async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None: try: await self.run_job(job=job, worker_id=worker_id) status = jobs.Status.SUCCEEDED + except exceptions.JobAborted: + status = jobs.Status.ABORTED + except exceptions.JobError as e: status = jobs.Status.FAILED if e.retry_exception: @@ -281,6 +284,14 @@ async def run_job(self, job: jobs.Job, worker_id: int) -> None: if inspect.isawaitable(task_result): task_result = await task_result + except exceptions.JobAborted as e: + task_result = None + log_title = "Aborted" + log_action = "job_aborted" + log_level = logging.INFO + exc_info = e + raise + except BaseException as e: task_result = None log_title = "Error" diff --git a/tests/acceptance/test_async.py b/tests/acceptance/test_async.py index bd07518fb..ba6618e10 100644 --- a/tests/acceptance/test_async.py +++ b/tests/acceptance/test_async.py @@ -1,9 +1,14 @@ from __future__ import annotations +import asyncio +import time + import pytest from procrastinate import app as app_module from procrastinate.contrib import aiopg +from procrastinate.exceptions import JobAborted +from procrastinate.jobs import Status @pytest.fixture(params=["psycopg_connector", "aiopg_connector"]) @@ -39,3 +44,104 @@ async def product_task(a, b): assert sum_results == [3, 7, 11] assert product_results == [12] + + +async def test_cancel(async_app): + sum_results = [] + + @async_app.task(queue="default", name="sum_task") + async def sum_task(a, b): + sum_results.append(a + b) + + job_id = await sum_task.defer_async(a=1, b=2) + await sum_task.defer_async(a=3, b=4) + + result = await async_app.job_manager.cancel_job_by_id_async(job_id) + assert result is True + + status = await async_app.job_manager.get_job_status_async(job_id) + assert status == Status.CANCELLED + + jobs = await async_app.job_manager.list_jobs_async() + assert len(jobs) == 2 + + await async_app.run_worker_async(queues=["default"], wait=False) + + assert sum_results == [7] + + +async def test_cancel_with_delete(async_app): + sum_results = [] + + @async_app.task(queue="default", name="sum_task") + async def sum_task(a, b): + sum_results.append(a + b) + + job_id = await sum_task.defer_async(a=1, b=2) + await sum_task.defer_async(a=3, b=4) + + result = await async_app.job_manager.cancel_job_by_id_async(job_id, delete_job=True) + assert result is True + + jobs = await async_app.job_manager.list_jobs_async() + assert len(jobs) == 1 + + await async_app.run_worker_async(queues=["default"], wait=False) + + assert sum_results == [7] + + +async def test_no_job_to_cancel_found(async_app): + @async_app.task(queue="default", name="example_task") + def example_task(): + pass + + job_id = await example_task.defer_async() + + result = await async_app.job_manager.cancel_job_by_id_async(job_id + 1) + assert result is False + + status = await async_app.job_manager.get_job_status_async(job_id) + assert status == Status.TODO + + jobs = await async_app.job_manager.list_jobs_async() + assert len(jobs) == 1 + + +async def test_abort(async_app): + @async_app.task(queue="default", name="task1", pass_context=True) + async def task1(context): + while True: + await asyncio.sleep(0.1) + if await context.should_abort_async(): + raise JobAborted + + @async_app.task(queue="default", name="task2", pass_context=True) + def task2(context): + while True: + time.sleep(0.1) + if context.should_abort(): + raise JobAborted + + job1_id = await task1.defer_async() + job2_id = await task2.defer_async() + + worker_task = asyncio.create_task( + async_app.run_worker_async(queues=["default"], wait=False) + ) + + await asyncio.sleep(0.1) + result = await async_app.job_manager.cancel_job_by_id_async(job1_id, abort=True) + assert result is True + + await asyncio.sleep(0.1) + result = await async_app.job_manager.cancel_job_by_id_async(job2_id, abort=True) + assert result is True + + await worker_task + + status = await async_app.job_manager.get_job_status_async(job1_id) + assert status == Status.ABORTED + + status = await async_app.job_manager.get_job_status_async(job2_id) + assert status == Status.ABORTED diff --git a/tests/acceptance/test_shell.py b/tests/acceptance/test_shell.py index 9ac5d8dca..1194c3283 100644 --- a/tests/acceptance/test_shell.py +++ b/tests/acceptance/test_shell.py @@ -67,44 +67,48 @@ async def test_shell(read, write, defer): defer("increment_task", ["--lock=b"], a=5) await write("cancel 2") - assert await read() == ["#2 ns:tests.acceptance.app.sum_task on default - [failed]"] + assert await read() == [ + "#2 ns:tests.acceptance.app.sum_task on default - [cancelled]" + ] await write("cancel 3") - assert await read() == ["#3 ns:tests.acceptance.app.sum_task on other - [failed]"] + assert await read() == [ + "#3 ns:tests.acceptance.app.sum_task on other - [cancelled]" + ] await write("cancel 4") assert await read() == [ - "#4 tests.acceptance.app.increment_task on default - [failed]" + "#4 tests.acceptance.app.increment_task on default - [cancelled]" ] await write("list_jobs") assert await read() == [ "#1 ns:tests.acceptance.app.sum_task on default - [todo]", - "#2 ns:tests.acceptance.app.sum_task on default - [failed]", - "#3 ns:tests.acceptance.app.sum_task on other - [failed]", - "#4 tests.acceptance.app.increment_task on default - [failed]", + "#2 ns:tests.acceptance.app.sum_task on default - [cancelled]", + "#3 ns:tests.acceptance.app.sum_task on other - [cancelled]", + "#4 tests.acceptance.app.increment_task on default - [cancelled]", ] await write("list_jobs queue=other details") assert await read() == [ - "#3 ns:tests.acceptance.app.sum_task on other - [failed] (attempts=0, priority=0, scheduled_at=None, args={'a': 1, 'b': 2}, lock=lock)", + "#3 ns:tests.acceptance.app.sum_task on other - [cancelled] (attempts=0, priority=0, scheduled_at=None, args={'a': 1, 'b': 2}, lock=lock)", ] await write("list_queues") assert await read() == [ - "default: 3 jobs (todo: 1, doing: 0, succeeded: 0, failed: 2)", - "other: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 1)", + "default: 3 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 2, aborting: 0, aborted: 0)", + "other: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 0, cancelled: 1, aborting: 0, aborted: 0)", ] await write("list_tasks") assert await read() == [ - "ns:tests.acceptance.app.sum_task: 3 jobs (todo: 1, doing: 0, succeeded: 0, failed: 2)", - "tests.acceptance.app.increment_task: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 1)", + "ns:tests.acceptance.app.sum_task: 3 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 2, aborting: 0, aborted: 0)", + "tests.acceptance.app.increment_task: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 0, cancelled: 1, aborting: 0, aborted: 0)", ] await write("list_locks") assert await read() == [ - "a: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", - "b: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 1)", - "lock: 2 jobs (todo: 0, doing: 0, succeeded: 0, failed: 2)", + "a: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", + "b: 1 jobs (todo: 0, doing: 0, succeeded: 0, failed: 0, cancelled: 1, aborting: 0, aborted: 0)", + "lock: 2 jobs (todo: 0, doing: 0, succeeded: 0, failed: 0, cancelled: 2, aborting: 0, aborted: 0)", ] diff --git a/tests/acceptance/test_sync.py b/tests/acceptance/test_sync.py index fd4b3fc83..f856db2b5 100644 --- a/tests/acceptance/test_sync.py +++ b/tests/acceptance/test_sync.py @@ -4,6 +4,7 @@ import procrastinate from procrastinate.contrib import psycopg2 +from procrastinate.jobs import Status @pytest.fixture(params=["sync_psycopg_connector", "psycopg2_connector"]) @@ -51,3 +52,46 @@ async def product_task(a, b): assert sum_results == [3, 7, 11] assert product_results == [12] + + +async def test_cancel(sync_app, async_app): + sum_results = [] + + @sync_app.task(queue="default", name="sum_task") + def sum_task(a, b): + sum_results.append(a + b) + + job_id = sum_task.defer(a=1, b=2) + sum_task.defer(a=3, b=4) + + result = sync_app.job_manager.cancel_job_by_id(job_id) + assert result is True + + status = sync_app.job_manager.get_job_status(job_id) + assert status == Status.CANCELLED + + jobs = sync_app.job_manager.list_jobs() + assert len(jobs) == 2 + + # We need to run the async app to execute the tasks + async_app.tasks = sync_app.tasks + await async_app.run_worker_async(queues=["default"], wait=False) + + assert sum_results == [7] + + +def test_no_job_to_cancel_found(sync_app): + @sync_app.task(queue="default", name="example_task") + def example_task(): + pass + + job_id = example_task.defer() + + result = sync_app.job_manager.cancel_job_by_id(job_id + 1) + assert result is False + + status = sync_app.job_manager.get_job_status(job_id) + assert status == Status.TODO + + jobs = sync_app.job_manager.list_jobs() + assert len(jobs) == 1 diff --git a/tests/integration/test_manager.py b/tests/integration/test_manager.py index 6fcd7c29d..1da333a62 100644 --- a/tests/integration/test_manager.py +++ b/tests/integration/test_manager.py @@ -79,6 +79,16 @@ async def test_fetch_job_not_fetching_locked_job( assert await pg_job_manager.fetch_job(queues=None) is None +async def test_fetch_job_respect_lock_aborting_job( + pg_job_manager, deferred_job_factory, fetched_job_factory +): + job = await fetched_job_factory(lock="lock_1") + await deferred_job_factory(lock="lock_1") + + await pg_job_manager.cancel_job_by_id_async(job.id, abort=True) + assert await pg_job_manager.fetch_job(queues=None) is None + + async def test_fetch_job_spacial_case_none_lock( pg_job_manager, deferred_job_factory, fetched_job_factory ): @@ -261,7 +271,7 @@ async def test_finish_job_wrong_initial_status( await pg_job_manager.finish_job( job=job, status=jobs.Status.FAILED, delete_job=delete_job ) - assert 'Job was not found or not in "doing" or "todo" status' in str( + assert 'Job was not found or not in "doing", "todo" or "aborting" status' in str( excinfo.value.__cause__ ) @@ -276,7 +286,7 @@ async def test_finish_job_wrong_end_status( await pg_job_manager.finish_job( job=job, status=jobs.Status.TODO, delete_job=delete_job ) - assert 'End status should be either "succeeded" or "failed"' in str( + assert 'End status should be either "succeeded", "failed" or "aborted"' in str( excinfo.value.__cause__ ) @@ -452,6 +462,9 @@ async def test_list_queues_dict(fixture_jobs, pg_job_manager): "doing": 0, "succeeded": 0, "failed": 1, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } @@ -479,6 +492,9 @@ async def test_list_tasks_dict(fixture_jobs, pg_job_manager): "doing": 1, "succeeded": 0, "failed": 1, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } diff --git a/tests/unit/test_job_context.py b/tests/unit/test_job_context.py index 54fc7d1ff..0d3a5fe0d 100644 --- a/tests/unit/test_job_context.py +++ b/tests/unit/test_job_context.py @@ -104,3 +104,21 @@ def test_job_description_job_time(job_factory): job_result=job_context.JobResult(start_timestamp=20.0), ).job_description(current_timestamp=30.0) assert descr == "worker 2: some_task[12](a='b') (started 10.000 s ago)" + + +async def test_should_abort(app, job_factory): + await app.job_manager.defer_job_async(job=job_factory()) + job = await app.job_manager.fetch_job(queues=None) + await app.job_manager.cancel_job_by_id_async(job.id, abort=True) + context = job_context.JobContext(app=app, job=job) + assert context.should_abort() is True + assert await context.should_abort_async() is True + + +async def test_should_not_abort(app, job_factory): + await app.job_manager.defer_job_async(job=job_factory()) + job = await app.job_manager.fetch_job(queues=None) + await app.job_manager.cancel_job_by_id_async(job.id) + context = job_context.JobContext(app=app, job=job) + assert context.should_abort() is False + assert await context.should_abort_async() is False diff --git a/tests/unit/test_manager.py b/tests/unit/test_manager.py index 2d0230547..8b539d111 100644 --- a/tests/unit/test_manager.py +++ b/tests/unit/test_manager.py @@ -173,6 +173,103 @@ async def test_finish_job_with_deletion(job_manager, job_factory, connector): assert 1 not in connector.jobs +def test_cancel_todo_job(job_manager, job_factory, connector): + job = job_factory(id=1) + job_manager.defer_job(job=job) + + cancelled = job_manager.cancel_job_by_id(job_id=1) + assert cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": False, "delete_job": False}, + ) + assert connector.jobs[1]["status"] == "cancelled" + + +def test_delete_cancelled_todo_job(job_manager, job_factory, connector): + job = job_factory(id=1) + job_manager.defer_job(job=job) + + cancelled = job_manager.cancel_job_by_id(job_id=1, delete_job=True) + assert cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": False, "delete_job": True}, + ) + assert len(connector.jobs) == 0 + + +async def test_cancel_todo_job_async(job_manager, job_factory, connector): + job = job_factory(id=1) + await job_manager.defer_job_async(job=job) + + cancelled = await job_manager.cancel_job_by_id_async(job_id=1) + assert cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": False, "delete_job": False}, + ) + assert connector.jobs[1]["status"] == "cancelled" + + +async def test_delete_cancelled_todo_job_async(job_manager, job_factory, connector): + job = job_factory(id=1) + await job_manager.defer_job_async(job=job) + + cancelled = await job_manager.cancel_job_by_id_async(job_id=1, delete_job=True) + assert cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": False, "delete_job": True}, + ) + assert len(connector.jobs) == 0 + + +async def test_cancel_doing_job(job_manager, job_factory, connector): + job = job_factory(id=1) + await job_manager.defer_job_async(job=job) + await job_manager.fetch_job(queues=None) + + cancelled = job_manager.cancel_job_by_id(job_id=1) + assert not cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": False, "delete_job": False}, + ) + assert connector.jobs[1]["status"] == "doing" + + +async def test_abort_doing_job(job_manager, job_factory, connector): + job = job_factory(id=1) + await job_manager.defer_job_async(job=job) + await job_manager.fetch_job(queues=None) + + cancelled = job_manager.cancel_job_by_id(job_id=1, abort=True) + assert cancelled + assert connector.queries[-1] == ( + "cancel_job", + {"job_id": 1, "abort": True, "delete_job": False}, + ) + assert connector.jobs[1]["status"] == "aborting" + + +def test_get_job_status(job_manager, job_factory, connector): + job = job_factory(id=1) + job_manager.defer_job(job=job) + + assert job_manager.get_job_status(job_id=1) == jobs.Status.TODO + + +async def test_get_job_status_async(job_manager, job_factory, connector): + job = job_factory(id=1) + await job_manager.defer_job_async(job=job) + + assert await job_manager.get_job_status_async(job_id=1) == jobs.Status.TODO + + await job_manager.fetch_job(queues=None) + assert await job_manager.get_job_status_async(job_id=1) == jobs.Status.DOING + + async def test_retry_job(job_manager, job_factory, connector): job = job_factory(id=1) await job_manager.defer_job_async(job=job) @@ -344,6 +441,9 @@ async def test_list_queues_async(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] @@ -359,6 +459,9 @@ def test_list_queues_(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] @@ -374,6 +477,9 @@ async def test_list_tasks_async(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] @@ -389,6 +495,9 @@ def test_list_tasks(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] @@ -404,6 +513,9 @@ async def test_list_locks_async(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] @@ -419,6 +531,9 @@ def test_list_locks(job_manager, job_factory): "doing": 0, "succeeded": 0, "failed": 0, + "cancelled": 0, + "aborting": 0, + "aborted": 0, } ] diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py index cb7a5146d..3e2db2cdb 100644 --- a/tests/unit/test_shell.py +++ b/tests/unit/test_shell.py @@ -145,8 +145,8 @@ def test_list_queues(shell, connector, capsys): shell.do_list_queues("") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "queue1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", - "queue2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "queue1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", + "queue2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -163,7 +163,7 @@ def test_list_queues_filters(shell, connector, capsys): shell.do_list_queues("queue=queue2 task=task2 lock=lock2 status=todo") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "queue2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "queue2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -191,8 +191,8 @@ def test_list_tasks(shell, connector, capsys): shell.do_list_tasks("") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "task1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", - "task2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "task1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", + "task2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -209,7 +209,7 @@ def test_list_tasks_filters(shell, connector, capsys): shell.do_list_tasks("queue=queue2 task=task2 lock=lock2 status=todo") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "task2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "task2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -237,8 +237,8 @@ def test_list_locks(shell, connector, capsys): shell.do_list_locks("") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "lock1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", - "lock2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "lock1: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", + "lock2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -255,7 +255,7 @@ def test_list_locks_filters(shell, connector, capsys): shell.do_list_locks("queue=queue2 task=task2 lock=lock2 status=todo") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "lock2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0)", + "lock2: 1 jobs (todo: 1, doing: 0, succeeded: 0, failed: 0, cancelled: 0, aborting: 0, aborted: 0)", ] assert connector.queries == [ ( @@ -314,4 +314,4 @@ def test_cancel(shell, connector, capsys): shell.do_cancel("1") captured = capsys.readouterr() - assert captured.out.strip() == "#1 task on queue - [failed]" + assert captured.out.strip() == "#1 task on queue - [cancelled]" diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 0d5b2d9e3..257e2c871 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -52,6 +52,7 @@ async def mock(worker_id): "side_effect, status", [ (None, "succeeded"), + (exceptions.JobAborted(), "aborted"), (exceptions.JobError(), "failed"), (exceptions.TaskNotFound(), "failed"), ], @@ -324,6 +325,41 @@ def task(): assert all([name == record_worker_name for name in worker_names]) +async def test_run_job_aborted(app, caplog): + caplog.set_level("INFO") + + def job_func(a, b): # pylint: disable=unused-argument + raise exceptions.JobAborted() + + task = tasks.Task(job_func, blueprint=app, queue="yay", name="job") + task.func = job_func + + app.tasks = {"job": task} + + job = jobs.Job( + id=16, + task_kwargs={"a": 9, "b": 3}, + lock="sherlock", + queueing_lock="houba", + task_name="job", + queue="yay", + ) + test_worker = worker.Worker(app, queues=["yay"]) + with pytest.raises(exceptions.JobAborted): + await test_worker.run_job(job=job, worker_id=3) + + assert ( + len( + [ + r + for r in caplog.records + if r.levelname == "INFO" and "Aborted" in r.message + ] + ) + == 1 + ) + + async def test_run_job_error(app, caplog): caplog.set_level("INFO")