From ee4f1b73dc74bf4fc48dc9590e24aa169b873ecc Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 3 Jun 2024 21:22:27 +0000 Subject: [PATCH 01/16] Allow to specify a job priority --- procrastinate/contrib/django/models.py | 1 + .../contrib/django/static_migrations.py | 5 +- procrastinate/jobs.py | 5 + procrastinate/manager.py | 1 + .../02.00.03_01_add_job_priority.sql | 122 ++++++++++++++++++ procrastinate/sql/queries.sql | 7 +- procrastinate/sql/schema.sql | 11 +- procrastinate/tasks.py | 12 +- procrastinate/testing.py | 3 + .../integration/contrib/django/test_models.py | 2 + tests/integration/test_cli.py | 4 + tests/integration/test_psycopg_connector.py | 4 +- tests/unit/test_app.py | 11 +- tests/unit/test_jobs.py | 2 + tests/unit/test_manager.py | 1 + tests/unit/test_shell.py | 32 +++-- tests/unit/test_tasks.py | 1 + tests/unit/test_testing.py | 15 +++ 18 files changed, 213 insertions(+), 26 deletions(-) create mode 100644 procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql diff --git a/procrastinate/contrib/django/models.py b/procrastinate/contrib/django/models.py index 4a402fe2d..24bedbb31 100644 --- a/procrastinate/contrib/django/models.py +++ b/procrastinate/contrib/django/models.py @@ -68,6 +68,7 @@ class ProcrastinateJob(ProcrastinateReadOnlyModelMixin, models.Model): id = models.BigAutoField(primary_key=True) queue_name = models.CharField(max_length=128) task_name = models.CharField(max_length=128) + priority = models.IntegerField() lock = models.TextField(unique=True, blank=True, null=True) args = models.JSONField() status = models.CharField(max_length=32, choices=[(e, e) for e in STATUSES]) diff --git a/procrastinate/contrib/django/static_migrations.py b/procrastinate/contrib/django/static_migrations.py index 21c591dbc..0e3d24057 100644 --- a/procrastinate/contrib/django/static_migrations.py +++ b/procrastinate/contrib/django/static_migrations.py @@ -11,7 +11,7 @@ class Migration(migrations.Migration): initial = True dependencies = [ - ("procrastinate", "0024_job_id_bigint"), + ("procrastinate", "0025_add_job_priority"), ] operations = [ @@ -51,6 +51,7 @@ class Migration(migrations.Migration): ("id", models.BigAutoField(primary_key=True, serialize=False)), ("queue_name", models.CharField(max_length=128)), ("task_name", models.CharField(max_length=128)), + ("priority", models.IntegerField()), ("lock", models.TextField(blank=True, null=True, unique=True)), ("args", models.JSONField()), ( @@ -94,4 +95,4 @@ class Migration(migrations.Migration): ] -static_migrations["0025_add_models"] = Migration +static_migrations["0026_add_models"] = Migration diff --git a/procrastinate/jobs.py b/procrastinate/jobs.py index 8249c89d7..4136610f2 100644 --- a/procrastinate/jobs.py +++ b/procrastinate/jobs.py @@ -17,6 +17,7 @@ DEFAULT_QUEUE = "default" +DEFAULT_PRIORITY = 0 cached_property = getattr(functools, "cached_property", property) @@ -51,6 +52,8 @@ class Job: Internal id uniquely identifying the job. status : Status of the job. + priority : + Priority of the job. queue : Queue name the job will be run in. lock : @@ -70,6 +73,7 @@ class Job: id: int | None = None status: str | None = None queue: str + priority: int = 0 lock: str | None queueing_lock: str | None task_name: str @@ -84,6 +88,7 @@ def from_row(cls, row: dict[str, Any]) -> Job: return cls( id=row["id"], status=row["status"], + priority=row["priority"], lock=row["lock"], queueing_lock=row["queueing_lock"], task_name=row["task_name"], diff --git a/procrastinate/manager.py b/procrastinate/manager.py index e69152c6d..618967e38 100644 --- a/procrastinate/manager.py +++ b/procrastinate/manager.py @@ -64,6 +64,7 @@ def _defer_job_query_kwargs(self, job: jobs.Job) -> dict[str, Any]: "query": sql.queries["defer_job"], "task_name": job.task_name, "queue": job.queue, + "priority": job.priority, "lock": job.lock, "queueing_lock": job.queueing_lock, "args": job.task_kwargs, diff --git a/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql b/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql new file mode 100644 index 000000000..6df46dbb1 --- /dev/null +++ b/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql @@ -0,0 +1,122 @@ +ALTER TABLE procrastinate_jobs ADD COLUMN priority integer DEFAULT 0 NOT NULL; + +DROP FUNCTION IF EXISTS procrastinate_defer_job(character varying, character varying, text, text, jsonb, timestamp with time zone); +CREATE OR REPLACE FUNCTION procrastinate_defer_job( + queue_name character varying, + task_name character varying, + priority integer, + lock text, + queueing_lock text, + args jsonb, + scheduled_at timestamp with time zone +) + RETURNS bigint + LANGUAGE plpgsql +AS $$ +DECLARE + job_id bigint; +BEGIN + INSERT INTO procrastinate_jobs (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at) + VALUES (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at) + RETURNING id INTO job_id; + + RETURN job_id; +END; +$$; + +DROP FUNCTION IF EXISTS procrastinate_defer_periodic_job(character varying, character varying, character varying, character varying, character varying, bigint, jsonb); +CREATE OR REPLACE FUNCTION procrastinate_defer_periodic_job( + _queue_name character varying, + _lock character varying, + _queueing_lock character varying, + _task_name character varying, + _periodic_id character varying, + _defer_timestamp bigint, + _args jsonb +) + RETURNS bigint + LANGUAGE plpgsql +AS $$ +DECLARE + _job_id bigint; + _defer_id bigint; +BEGIN + + INSERT + INTO procrastinate_periodic_defers (task_name, periodic_id, defer_timestamp) + VALUES (_task_name, _periodic_id, _defer_timestamp) + ON CONFLICT DO NOTHING + RETURNING id into _defer_id; + + IF _defer_id IS NULL THEN + RETURN NULL; + END IF; + + UPDATE procrastinate_periodic_defers + SET job_id = procrastinate_defer_job( + _queue_name, + _task_name, + 0, + _lock, + _queueing_lock, + _args, + NULL + ) + WHERE id = _defer_id + RETURNING job_id INTO _job_id; + + DELETE + FROM procrastinate_periodic_defers + USING ( + SELECT id + FROM procrastinate_periodic_defers + WHERE procrastinate_periodic_defers.task_name = _task_name + AND procrastinate_periodic_defers.periodic_id = _periodic_id + AND procrastinate_periodic_defers.defer_timestamp < _defer_timestamp + ORDER BY id + FOR UPDATE + ) to_delete + WHERE procrastinate_periodic_defers.id = to_delete.id; + + RETURN _job_id; +END; +$$; + +DROP FUNCTION IF EXISTS procrastinate_fetch_job(character varying[]); +CREATE OR REPLACE FUNCTION procrastinate_fetch_job( + target_queue_names character varying[] +) + RETURNS procrastinate_jobs + LANGUAGE plpgsql +AS $$ +DECLARE + found_jobs procrastinate_jobs; +BEGIN + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + WHERE + -- reject the job if its lock has earlier jobs + NOT EXISTS ( + SELECT 1 + FROM procrastinate_jobs AS earlier_jobs + WHERE + jobs.lock IS NOT NULL + AND earlier_jobs.lock = jobs.lock + AND earlier_jobs.status IN ('todo', 'doing') + AND earlier_jobs.id < jobs.id) + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + + RETURN found_jobs; +END; +$$; diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index 2e74f7f4a..d724fd544 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -5,7 +5,7 @@ -- defer_job -- -- Create and enqueue a job -SELECT procrastinate_defer_job(%(queue)s, %(task_name)s, %(lock)s, %(queueing_lock)s, %(args)s, %(scheduled_at)s) AS id; +SELECT procrastinate_defer_job(%(queue)s, %(task_name)s, %(priority)s, %(lock)s, %(queueing_lock)s, %(args)s, %(scheduled_at)s) AS id; -- defer_periodic_job -- -- Create a periodic job if it doesn't already exist, and delete periodic metadata @@ -14,12 +14,12 @@ SELECT procrastinate_defer_periodic_job(%(queue)s, %(lock)s, %(queueing_lock)s, -- fetch_job -- -- Get the first awaiting job -SELECT id, status, task_name, lock, queueing_lock, args, scheduled_at, queue_name, attempts +SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts FROM procrastinate_fetch_job(%(queues)s); -- select_stalled_jobs -- -- Get running jobs that started more than a given time ago -SELECT job.id, status, task_name, lock, queueing_lock, args, scheduled_at, queue_name, attempts, max(event.at) started_at +SELECT job.id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts, max(event.at) started_at FROM procrastinate_jobs job JOIN procrastinate_events event ON event.job_id = job.id @@ -71,6 +71,7 @@ SELECT count(*) AS count, status FROM procrastinate_jobs GROUP BY status; SELECT id, queue_name, task_name, + priority, lock, queueing_lock, args, diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index b7dcc929b..ddd1db71c 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -27,6 +27,7 @@ CREATE TABLE procrastinate_jobs ( id bigserial PRIMARY KEY, queue_name character varying(128) NOT NULL, task_name character varying(128) NOT NULL, + priority integer DEFAULT 0 NOT NULL, lock text, queueing_lock text, args jsonb DEFAULT '{}' NOT NULL, @@ -51,7 +52,7 @@ CREATE TABLE procrastinate_events ( at timestamp with time zone DEFAULT NOW() NULL ); --- Contraints & Indices +-- Constraints & Indices -- this prevents from having several jobs with the same queueing lock in the "todo" state CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (queueing_lock) WHERE status = 'todo'; @@ -71,6 +72,7 @@ CREATE INDEX procrastinate_periodic_defers_job_id_fkey ON procrastinate_periodic CREATE FUNCTION procrastinate_defer_job( queue_name character varying, task_name character varying, + priority integer, lock text, queueing_lock text, args jsonb, @@ -82,8 +84,8 @@ AS $$ DECLARE job_id bigint; BEGIN - INSERT INTO procrastinate_jobs (queue_name, task_name, lock, queueing_lock, args, scheduled_at) - VALUES (queue_name, task_name, lock, queueing_lock, args, scheduled_at) + INSERT INTO procrastinate_jobs (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at) + VALUES (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at) RETURNING id INTO job_id; RETURN job_id; @@ -121,6 +123,7 @@ BEGIN SET job_id = procrastinate_defer_job( _queue_name, _task_name, + 0, _lock, _queueing_lock, _args, @@ -171,7 +174,7 @@ BEGIN AND jobs.status = 'todo' AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) - ORDER BY jobs.id ASC LIMIT 1 + ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 FOR UPDATE OF jobs SKIP LOCKED ) UPDATE procrastinate_jobs diff --git a/procrastinate/tasks.py b/procrastinate/tasks.py index 4c52467a9..0b74237c7 100644 --- a/procrastinate/tasks.py +++ b/procrastinate/tasks.py @@ -34,6 +34,7 @@ class ConfigureTaskOptions(TypedDict): schedule_at: NotRequired[datetime.datetime | None] schedule_in: NotRequired[TimeDeltaParams | None] queue: NotRequired[str | None] + priority: NotRequired[int | None] def configure_task( @@ -44,6 +45,7 @@ def configure_task( ) -> jobs.JobDeferrer: schedule_at = options.get("schedule_at") schedule_in = options.get("schedule_in") + priority = options.get("priority") if schedule_at and schedule_in is not None: raise ValueError("Cannot set both schedule_at and schedule_in") @@ -51,6 +53,9 @@ def configure_task( if schedule_in is not None: schedule_at = utils.utcnow() + datetime.timedelta(**schedule_in) + if priority is None: + priority = jobs.DEFAULT_PRIORITY + task_kwargs = options.get("task_kwargs") or {} return jobs.JobDeferrer( job=jobs.Job( @@ -61,6 +66,7 @@ def configure_task( queue=options.get("queue") or jobs.DEFAULT_QUEUE, task_kwargs=task_kwargs, scheduled_at=schedule_at, + priority=priority, ), job_manager=job_manager, ) @@ -179,9 +185,11 @@ def configure(self, **options: Unpack[ConfigureTaskOptions]) -> jobs.JobDeferrer Converted to schedule_at internally. See `python timedelta documentation `__ (incompatible with schedule_at) - queue : By setting a queue on the job launch, you override the task default queue + priority : + Set the priority of the job as an integer. Jobs with higher priority + are run first. The default priority is 0. Returns ------- @@ -201,6 +209,7 @@ def configure(self, **options: Unpack[ConfigureTaskOptions]) -> jobs.JobDeferrer schedule_at = options.get("schedule_at") schedule_in = options.get("schedule_in") queue = options.get("queue") + priority = options.get("priority") app = cast(app_module.App, self.blueprint) return configure_task( @@ -214,6 +223,7 @@ def configure(self, **options: Unpack[ConfigureTaskOptions]) -> jobs.JobDeferrer schedule_at=schedule_at, schedule_in=schedule_in, queue=queue if queue is not None else self.queue, + priority=priority, ) def get_retry_exception( diff --git a/procrastinate/testing.py b/procrastinate/testing.py index 668171ac8..d4b9a66b0 100644 --- a/procrastinate/testing.py +++ b/procrastinate/testing.py @@ -108,6 +108,7 @@ async def close_async(self) -> None: def defer_job_one( self, task_name: str, + priority: int, lock: str | None, queueing_lock: str | None, args: types.JSONDict, @@ -130,6 +131,7 @@ def defer_job_one( "id": id, "queue_name": queue, "task_name": task_name, + "priority": priority, "lock": lock, "queueing_lock": queueing_lock, "args": args, @@ -166,6 +168,7 @@ def defer_periodic_job_one( return self.defer_job_one( task_name=task_name, queue=queue, + priority=0, lock=lock, queueing_lock=queueing_lock, args=args, diff --git a/tests/integration/contrib/django/test_models.py b/tests/integration/contrib/django/test_models.py index 3e073862c..f3fe8ba4a 100644 --- a/tests/integration/contrib/django/test_models.py +++ b/tests/integration/contrib/django/test_models.py @@ -20,6 +20,7 @@ def test_procrastinate_job(db): "id": job_id, "queue_name": "default", "task_name": "test_task", + "priority": 0, "lock": None, "args": {"a": 1, "b": 2}, "status": "todo", @@ -39,6 +40,7 @@ def test_procrastinate_job__create__with_setting(db, settings): assert models.ProcrastinateJob.objects.create( task_name="test_task", queue_name="foo", + priority=0, lock="bar", args={"a": 1, "b": 2}, status="todo", diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py index ae1b0f400..2b90008f1 100644 --- a/tests/integration/test_cli.py +++ b/tests/integration/test_cli.py @@ -166,6 +166,7 @@ def mytask(a): "scheduled_at": None, "status": "todo", "task_name": "hello", + "priority": 0, } } @@ -195,6 +196,7 @@ def mytask(a): ), "status": "todo", "task_name": "hello", + "priority": 0, } } @@ -223,6 +225,7 @@ def mytask(a): "queue_name": "default", "status": "todo", "task_name": "hello", + "priority": 0, } assert ( now + datetime.timedelta(seconds=9) @@ -286,6 +289,7 @@ async def test_defer_unknown(entrypoint, cli_app, connector): "scheduled_at": None, "status": "todo", "task_name": "hello", + "priority": 0, } } diff --git a/tests/integration/test_psycopg_connector.py b/tests/integration/test_psycopg_connector.py index 4ef4c47a2..ae7be376a 100644 --- a/tests/integration/test_psycopg_connector.py +++ b/tests/integration/test_psycopg_connector.py @@ -100,13 +100,13 @@ async def test_execute_query(psycopg_connector): async def test_wrap_exceptions(psycopg_connector): await psycopg_connector.execute_query_async( """SELECT procrastinate_defer_job( - 'queue', 'foo', NULL, 'lock', '{}', NULL + 'queue', 'foo', 0, NULL, 'lock', '{}', NULL ) AS id;""" ) with pytest.raises(exceptions.UniqueViolation): await psycopg_connector.execute_query_async( """SELECT procrastinate_defer_job( - 'queue', 'foo', NULL, 'lock', '{}', NULL + 'queue', 'foo', 0, NULL, 'lock', '{}', NULL ) AS id;""" ) diff --git a/tests/unit/test_app.py b/tests/unit/test_app.py index 129c5c184..18f1cf9e6 100644 --- a/tests/unit/test_app.py +++ b/tests/unit/test_app.py @@ -117,6 +117,7 @@ def test_app_configure_task(app: app_module.App): queue="marsupilami", lock="sher", schedule_at=scheduled_at, + priority=7, task_kwargs={"a": 1}, ).job @@ -124,6 +125,7 @@ def test_app_configure_task(app: app_module.App): assert job.queue == "marsupilami" assert job.lock == "sher" assert job.scheduled_at == scheduled_at + assert job.priority == 7 assert job.task_kwargs == {"a": 1} @@ -134,17 +136,22 @@ def my_name(a): scheduled_at = conftest.aware_datetime(2000, 1, 1) job = app.configure_task( - name="my_name", lock="sher", schedule_at=scheduled_at, task_kwargs={"a": 1} + name="my_name", + lock="sher", + schedule_at=scheduled_at, + priority=7, + task_kwargs={"a": 1}, ).job assert job.task_name == "my_name" assert job.queue == "bla" assert job.lock == "sher" assert job.scheduled_at == scheduled_at + assert job.priority == 7 assert job.task_kwargs == {"a": 1} -def test_app_configure_task_unkown_not_allowed(app: app_module.App): +def test_app_configure_task_unknown_not_allowed(app: app_module.App): with pytest.raises(exceptions.TaskNotFound): app.configure_task(name="my_name", allow_unknown=False) diff --git a/tests/unit/test_jobs.py b/tests/unit/test_jobs.py index cdca08ba6..fc12bf5d6 100644 --- a/tests/unit/test_jobs.py +++ b/tests/unit/test_jobs.py @@ -33,6 +33,7 @@ def test_job_get_context(job_factory, scheduled_at, context_scheduled_at): "id": 12, "status": "doing", "queue": "marsupilami", + "priority": 0, "lock": "sher", "queueing_lock": "houba", "task_name": "mytask", @@ -72,6 +73,7 @@ async def test_job_deferrer_defer_async(job_factory, job_manager, connector): "lock": "sher", "queueing_lock": "houba", "queue_name": "marsupilami", + "priority": 0, "scheduled_at": None, "status": "todo", "task_name": "mytask", diff --git a/tests/unit/test_manager.py b/tests/unit/test_manager.py index c2f29f16e..4c7fb48d7 100644 --- a/tests/unit/test_manager.py +++ b/tests/unit/test_manager.py @@ -27,6 +27,7 @@ async def test_manager_defer_job(job_manager, job_factory, connector): "lock": "sher", "queueing_lock": None, "queue_name": "marsupilami", + "priority": 0, "scheduled_at": None, "status": "todo", "task_name": "bla", diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py index 9a9a1e397..9f9c68a0b 100644 --- a/tests/unit/test_shell.py +++ b/tests/unit/test_shell.py @@ -24,6 +24,7 @@ def test_EOF(shell): def test_list_jobs(shell, connector, capsys): connector.defer_job_one( "task1", + 0, "lock1", "queueing_lock1", {}, @@ -32,6 +33,7 @@ def test_list_jobs(shell, connector, capsys): ) connector.defer_job_one( "task2", + 0, "lock2", "queueing_lock2", {}, @@ -63,6 +65,7 @@ def test_list_jobs(shell, connector, capsys): def test_list_jobs_filters(shell, connector, capsys): connector.defer_job_one( "task1", + 0, "lock1", "queueing_lock1", {}, @@ -71,6 +74,7 @@ def test_list_jobs_filters(shell, connector, capsys): ) connector.defer_job_one( "task2", + 0, "lock2", "queueing_lock2", {}, @@ -101,6 +105,7 @@ def test_list_jobs_filters(shell, connector, capsys): def test_list_jobs_details(shell, connector, capsys): connector.defer_job_one( "task1", + 0, "lock1", "queueing_lock1", {"x": 11}, @@ -109,6 +114,7 @@ def test_list_jobs_details(shell, connector, capsys): ) connector.defer_job_one( "task2", + 0, "lock2", "queueing_lock2", {"y": 22}, @@ -133,8 +139,8 @@ def test_list_jobs_empty(shell, connector, capsys): def test_list_queues(shell, connector, capsys): - connector.defer_job_one("task1", "lock1", "queueing_lock1", {}, 0, "queue1") - connector.defer_job_one("task2", "lock2", "queueing_lock2", {}, 0, "queue2") + connector.defer_job_one("task1", 0, "lock1", "queueing_lock1", {}, 0, "queue1") + connector.defer_job_one("task2", 0, "lock2", "queueing_lock2", {}, 0, "queue2") shell.do_list_queues("") captured = capsys.readouterr() @@ -151,8 +157,8 @@ def test_list_queues(shell, connector, capsys): def test_list_queues_filters(shell, connector, capsys): - connector.defer_job_one("task1", "lock1", "queueing_lock1", {}, 0, "queue1") - connector.defer_job_one("task2", "lock2", "queueing_lock2", {}, 0, "queue2") + connector.defer_job_one("task1", 0, "lock1", "queueing_lock1", {}, 0, "queue1") + connector.defer_job_one("task2", 0, "lock2", "queueing_lock2", {}, 0, "queue2") shell.do_list_queues("queue=queue2 task=task2 lock=lock2 status=todo") captured = capsys.readouterr() @@ -179,8 +185,8 @@ def test_list_queues_empty(shell, connector, capsys): def test_list_tasks(shell, connector, capsys): - connector.defer_job_one("task1", "lock1", "queueing_lock1", {}, 0, "queue1") - connector.defer_job_one("task2", "lock2", "queueing_lock2", {}, 0, "queue2") + connector.defer_job_one("task1", 0, "lock1", "queueing_lock1", {}, 0, "queue1") + connector.defer_job_one("task2", 0, "lock2", "queueing_lock2", {}, 0, "queue2") shell.do_list_tasks("") captured = capsys.readouterr() @@ -197,8 +203,8 @@ def test_list_tasks(shell, connector, capsys): def test_list_tasks_filters(shell, connector, capsys): - connector.defer_job_one("task1", "lock1", "queueing_lock1", {}, 0, "queue1") - connector.defer_job_one("task2", "lock2", "queueing_lock2", {}, 0, "queue2") + connector.defer_job_one("task1", 0, "lock1", "queueing_lock1", {}, 0, "queue1") + connector.defer_job_one("task2", 0, "lock2", "queueing_lock2", {}, 0, "queue2") shell.do_list_tasks("queue=queue2 task=task2 lock=lock2 status=todo") captured = capsys.readouterr() @@ -225,8 +231,8 @@ def test_list_tasks_empty(shell, connector, capsys): def test_list_locks(shell, connector, capsys): - connector.defer_job_one("task1", "lock1", "queueing_lock1", {}, 0, "queue1") - connector.defer_job_one("task2", "lock2", "queueing_lock2", {}, 0, "queue2") + connector.defer_job_one("task1", 0, "lock1", "queueing_lock1", {}, 0, "queue1") + connector.defer_job_one("task2", 0, "lock2", "queueing_lock2", {}, 0, "queue2") shell.do_list_locks("") captured = capsys.readouterr() @@ -243,8 +249,8 @@ def test_list_locks(shell, connector, capsys): def test_list_locks_filters(shell, connector, capsys): - connector.defer_job_one("task1", "lock1", "queueing_lock1", {}, 0, "queue1") - connector.defer_job_one("task2", "lock2", "queueing_lock2", {}, 0, "queue2") + connector.defer_job_one("task1", 0, "lock1", "queueing_lock1", {}, 0, "queue1") + connector.defer_job_one("task2", 0, "lock2", "queueing_lock2", {}, 0, "queue2") shell.do_list_locks("queue=queue2 task=task2 lock=lock2 status=todo") captured = capsys.readouterr() @@ -273,6 +279,7 @@ def test_list_locks_empty(shell, connector, capsys): def test_retry(shell, connector, capsys): connector.defer_job_one( "task", + 0, "lock", "queueing_lock", {}, @@ -293,6 +300,7 @@ def test_retry(shell, connector, capsys): def test_cancel(shell, connector, capsys): connector.defer_job_one( "task", + 0, "lock", "queueing_lock", {}, diff --git a/tests/unit/test_tasks.py b/tests/unit/test_tasks.py index c78a3cb23..34fcf807a 100644 --- a/tests/unit/test_tasks.py +++ b/tests/unit/test_tasks.py @@ -30,6 +30,7 @@ async def test_task_defer_async(app: App, connector): 1: { "id": 1, "queue_name": "queue", + "priority": 0, "task_name": "tests.unit.test_tasks.task_func", "lock": lock, "queueing_lock": None, diff --git a/tests/unit/test_testing.py b/tests/unit/test_testing.py index 0fa2f83b9..e05792bda 100644 --- a/tests/unit/test_testing.py +++ b/tests/unit/test_testing.py @@ -60,6 +60,7 @@ def test_make_dynamic_query(connector): def test_defer_job_one(connector): job = connector.defer_job_one( task_name="mytask", + priority=0, lock="sher", queueing_lock="houba", args={"a": "b"}, @@ -71,6 +72,7 @@ def test_defer_job_one(connector): 1: { "id": 1, "queue_name": "marsupilami", + "priority": 0, "task_name": "mytask", "lock": "sher", "queueing_lock": "houba", @@ -86,6 +88,7 @@ def test_defer_job_one(connector): def test_defer_job_one_multiple_times(connector): connector.defer_job_one( task_name="mytask", + priority=0, lock=None, queueing_lock=None, args={}, @@ -94,6 +97,7 @@ def test_defer_job_one_multiple_times(connector): ) connector.defer_job_one( task_name="mytask", + priority=0, lock=None, queueing_lock=None, args={}, @@ -108,6 +112,7 @@ def test_defer_same_job_with_queueing_lock_second_time_after_first_one_succeeded ): job_data = { "task_name": "mytask", + "priority": 0, "lock": None, "queueing_lock": "some-lock", "args": {}, @@ -240,6 +245,7 @@ def test_fetch_job_one(connector): # This one will be selected, then skipped the second time because it's processing connector.defer_job_one( task_name="mytask", + priority=0, args={}, queue="marsupilami", scheduled_at=None, @@ -250,6 +256,7 @@ def test_fetch_job_one(connector): # This one because it's the wrong queue connector.defer_job_one( task_name="mytask", + priority=0, args={}, queue="other_queue", scheduled_at=None, @@ -259,6 +266,7 @@ def test_fetch_job_one(connector): # This one because of the scheduled_at connector.defer_job_one( task_name="mytask", + priority=0, args={}, queue="marsupilami", scheduled_at=conftest.aware_datetime(2100, 1, 1), @@ -268,6 +276,7 @@ def test_fetch_job_one(connector): # This one because of the lock connector.defer_job_one( task_name="mytask", + priority=0, args={}, queue="marsupilami", scheduled_at=None, @@ -277,6 +286,7 @@ def test_fetch_job_one(connector): # We're taking this one. connector.defer_job_one( task_name="mytask", + priority=0, args={}, queue="marsupilami", scheduled_at=None, @@ -292,6 +302,7 @@ def test_fetch_job_one_none_lock(connector): """Testing that 2 jobs with locks "None" don't block one another""" connector.defer_job_one( task_name="mytask", + priority=0, args={}, queue="default", scheduled_at=None, @@ -300,6 +311,7 @@ def test_fetch_job_one_none_lock(connector): ) connector.defer_job_one( task_name="mytask", + priority=0, args={}, queue="default", scheduled_at=None, @@ -314,6 +326,7 @@ def test_fetch_job_one_none_lock(connector): def test_finish_job_run(connector): connector.defer_job_one( task_name="mytask", + priority=0, args={}, queue="marsupilami", scheduled_at=None, @@ -332,6 +345,7 @@ def test_finish_job_run(connector): def test_retry_job_run(connector): connector.defer_job_one( task_name="mytask", + priority=0, args={}, queue="marsupilami", scheduled_at=None, @@ -367,6 +381,7 @@ async def test_defer_no_notify(connector): await connector.listen_notify(event=event, channels="some_other_channel") connector.defer_job_one( task_name="foo", + priority=0, lock="bar", args={}, scheduled_at=None, From da927d6dbd03fcb4c55f216a36c8cdc4b612fc35 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 3 Jun 2024 22:04:51 +0000 Subject: [PATCH 02/16] Allow to specify priority from the command line --- procrastinate/cli.py | 9 +++++++++ tests/unit/test_cli.py | 7 +++++++ 2 files changed, 16 insertions(+) diff --git a/procrastinate/cli.py b/procrastinate/cli.py index deadb5241..d1a3c41f9 100644 --- a/procrastinate/cli.py +++ b/procrastinate/cli.py @@ -408,6 +408,15 @@ def configure_defer_parser(subparsers: argparse._SubParsersAction): help="Number of seconds after which to launch the job", envvar="DEFER_IN", ) + add_argument( + defer_parser, + "--priority", + default=argparse.SUPPRESS, + dest="priority", + type=int, + help="Job priority. Defaults to 0", + envvar="DEFER_PRIORITY", + ) add_argument( defer_parser, "--unknown", diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 55851a18c..939940fa3 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -73,6 +73,13 @@ def test_main(mocker): "schedule_in": {"seconds": 3600}, }, ), + ( + ["defer", "x", "--priority", "5"], + { + "command": "defer", + "priority": 5, + }, + ), ( ["schema"], { From 56db611c66a2023e1185161346087f5957d4cc10 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Tue, 4 Jun 2024 18:09:39 +0000 Subject: [PATCH 03/16] Add priority to shell print job --- procrastinate/shell.py | 2 +- tests/acceptance/test_shell.py | 2 +- tests/unit/test_shell.py | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/procrastinate/shell.py b/procrastinate/shell.py index a34f7b7d9..ac6eb4994 100644 --- a/procrastinate/shell.py +++ b/procrastinate/shell.py @@ -18,7 +18,7 @@ def print_job(job: jobs.Job, details: bool = False) -> None: msg += f"- [{job_dict['status']}]" if details: msg += ( - f" (attempts={job_dict['attempts']}, " + f" (attempts={job_dict['attempts']}, priority={job_dict['priority']}, " f"scheduled_at={job_dict['scheduled_at']}, args={job_dict['task_kwargs']}, " f"lock={job_dict['lock']})" ) diff --git a/tests/acceptance/test_shell.py b/tests/acceptance/test_shell.py index 6da19bbe9..9ac5d8dca 100644 --- a/tests/acceptance/test_shell.py +++ b/tests/acceptance/test_shell.py @@ -87,7 +87,7 @@ async def test_shell(read, write, defer): await write("list_jobs queue=other details") assert await read() == [ - "#3 ns:tests.acceptance.app.sum_task on other - [failed] (attempts=0, scheduled_at=None, args={'a': 1, 'b': 2}, lock=lock)", + "#3 ns:tests.acceptance.app.sum_task on other - [failed] (attempts=0, priority=0, scheduled_at=None, args={'a': 1, 'b': 2}, lock=lock)", ] await write("list_queues") diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py index 9f9c68a0b..cb7a5146d 100644 --- a/tests/unit/test_shell.py +++ b/tests/unit/test_shell.py @@ -105,7 +105,7 @@ def test_list_jobs_filters(shell, connector, capsys): def test_list_jobs_details(shell, connector, capsys): connector.defer_job_one( "task1", - 0, + 5, "lock1", "queueing_lock1", {"x": 11}, @@ -114,7 +114,7 @@ def test_list_jobs_details(shell, connector, capsys): ) connector.defer_job_one( "task2", - 0, + 7, "lock2", "queueing_lock2", {"y": 22}, @@ -125,9 +125,9 @@ def test_list_jobs_details(shell, connector, capsys): shell.do_list_jobs("details") captured = capsys.readouterr() assert captured.out.splitlines() == [ - "#1 task1 on queue1 - [todo] (attempts=0, scheduled_at=1000-01-01 " + "#1 task1 on queue1 - [todo] (attempts=0, priority=5, scheduled_at=1000-01-01 " "00:00:00+00:00, args={'x': 11}, lock=lock1)", - "#2 task2 on queue2 - [todo] (attempts=0, scheduled_at=2000-01-01 " + "#2 task2 on queue2 - [todo] (attempts=0, priority=7, scheduled_at=2000-01-01 " "00:00:00+00:00, args={'y': 22}, lock=lock2)", ] From b9f6dd0f2ea10b94b903e03ea071ade4e14ec7d7 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Tue, 4 Jun 2024 18:14:52 +0000 Subject: [PATCH 04/16] Add more tests --- procrastinate/testing.py | 17 ++++++++++++----- tests/acceptance/test_nominal.py | 13 +++++++++++++ tests/integration/test_cli.py | 25 +++++++++++++++++++++++++ tests/unit/test_cli.py | 4 ++++ tests/unit/test_jobs.py | 6 ++++-- tests/unit/test_manager.py | 8 ++++++-- tests/unit/test_tasks.py | 6 ++++++ tests/unit/test_testing.py | 31 +++++++++++++++++++++++++++++-- 8 files changed, 99 insertions(+), 11 deletions(-) diff --git a/procrastinate/testing.py b/procrastinate/testing.py index d4b9a66b0..9466197a8 100644 --- a/procrastinate/testing.py +++ b/procrastinate/testing.py @@ -192,17 +192,24 @@ def finished_jobs(self) -> list[JobRow]: def fetch_job_one(self, queues: Iterable[str] | None) -> dict: # Creating a copy of the iterable so that we can modify it while we iterate - for job in self.jobs.values(): + filtered_jobs = [ + job + for job in self.jobs.values() if ( job["status"] == "todo" and (queues is None or job["queue_name"] in queues) and (not job["scheduled_at"] or job["scheduled_at"] <= utils.utcnow()) and job["lock"] not in self.current_locks - ): - job["status"] = "doing" - self.events[job["id"]].append({"type": "started", "at": utils.utcnow()}) + ) + ] + + filtered_jobs.sort(key=lambda job: job["priority"], reverse=True) - return job + if filtered_jobs: + job = filtered_jobs[0] + job["status"] = "doing" + self.events[job["id"]].append({"type": "started", "at": utils.utcnow()}) + return job return {"id": None} diff --git a/tests/acceptance/test_nominal.py b/tests/acceptance/test_nominal.py index 149436c9e..c4ccb927a 100644 --- a/tests/acceptance/test_nominal.py +++ b/tests/acceptance/test_nominal.py @@ -88,6 +88,19 @@ def test_nominal(defer, worker, app): assert stderr.count(expected_log) == 2 +@pytest.mark.parametrize("app", ["app", "app_aiopg"]) +def test_priority(defer, worker, app): + defer("sum_task", ["--priority", "5"], a=5, b=7) + defer("sum_task", ["--priority", "7"], a=1, b=3) + defer("sum_task", ["--priority", "6"], a=2, b=6) + + stdout, stderr = worker(app=app) + print(stdout, stderr) + + assert stdout.splitlines() == ["4", "8", "12"] + assert stderr.startswith("DEBUG:procrastinate.") + + def test_lock(defer, running_worker): """ In this test, we launch 2 workers in two parallel threads, and ask them diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py index 2b90008f1..b54817382 100644 --- a/tests/integration/test_cli.py +++ b/tests/integration/test_cli.py @@ -171,6 +171,31 @@ def mytask(a): } +async def test_defer_priority(entrypoint, cli_app, connector): + @cli_app.task(name="hello") + def mytask(a): + pass + + result = await entrypoint("""defer --lock=sherlock --priority=5 hello {"a":1}""") + + assert "Launching a job: hello(a=1)\n" in result.stderr + assert result.exit_code == 0 + assert connector.jobs == { + 1: { + "args": {"a": 1}, + "attempts": 0, + "id": 1, + "lock": "sherlock", + "queueing_lock": None, + "queue_name": "default", + "scheduled_at": None, + "status": "todo", + "task_name": "hello", + "priority": 5, + } + } + + async def test_defer_at(entrypoint, cli_app, connector): @cli_app.task(name="hello") def mytask(a): diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 939940fa3..296df7f4a 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -176,6 +176,10 @@ async def healthchecks(app): ["defer", "--at", "2023-01-01", "--in", "12"], "argument --in: not allowed with argument --at", ), + ( + ["defer", "--priority", "foo"], + "argument --priority: invalid int value: 'foo'", + ), ], ) def test_parser__error(input, error, capsys): diff --git a/tests/unit/test_jobs.py b/tests/unit/test_jobs.py index fc12bf5d6..6c776772b 100644 --- a/tests/unit/test_jobs.py +++ b/tests/unit/test_jobs.py @@ -21,6 +21,7 @@ def test_job_get_context(job_factory, scheduled_at, context_scheduled_at): id=12, status="doing", queue="marsupilami", + priority=5, lock="sher", queueing_lock="houba", task_name="mytask", @@ -33,7 +34,7 @@ def test_job_get_context(job_factory, scheduled_at, context_scheduled_at): "id": 12, "status": "doing", "queue": "marsupilami", - "priority": 0, + "priority": 5, "lock": "sher", "queueing_lock": "houba", "task_name": "mytask", @@ -55,6 +56,7 @@ async def test_job_deferrer_defer_async(job_factory, job_manager, connector): job = job_factory( queue="marsupilami", lock="sher", + priority=5, queueing_lock="houba", task_name="mytask", task_kwargs={"a": "b"}, @@ -73,7 +75,7 @@ async def test_job_deferrer_defer_async(job_factory, job_manager, connector): "lock": "sher", "queueing_lock": "houba", "queue_name": "marsupilami", - "priority": 0, + "priority": 5, "scheduled_at": None, "status": "todo", "task_name": "mytask", diff --git a/tests/unit/test_manager.py b/tests/unit/test_manager.py index 4c7fb48d7..2d0230547 100644 --- a/tests/unit/test_manager.py +++ b/tests/unit/test_manager.py @@ -13,7 +13,11 @@ async def test_manager_defer_job(job_manager, job_factory, connector): job = await job_manager.defer_job_async( job=job_factory( - task_kwargs={"a": "b"}, queue="marsupilami", task_name="bla", lock="sher" + task_kwargs={"a": "b"}, + queue="marsupilami", + task_name="bla", + priority=5, + lock="sher", ) ) @@ -27,7 +31,7 @@ async def test_manager_defer_job(job_manager, job_factory, connector): "lock": "sher", "queueing_lock": None, "queue_name": "marsupilami", - "priority": 0, + "priority": 5, "scheduled_at": None, "status": "todo", "task_name": "bla", diff --git a/tests/unit/test_tasks.py b/tests/unit/test_tasks.py index 34fcf807a..06474659f 100644 --- a/tests/unit/test_tasks.py +++ b/tests/unit/test_tasks.py @@ -51,6 +51,12 @@ def test_configure_task(job_manager): assert job.task_kwargs == {"yay": "ho"} +def test_configure_task_priority(job_manager): + job = tasks.configure_task(name="my_name", job_manager=job_manager, priority=7).job + + assert job.priority == 7 + + def test_configure_task_schedule_at(job_manager): job = tasks.configure_task( name="my_name", diff --git a/tests/unit/test_testing.py b/tests/unit/test_testing.py index e05792bda..6262feb82 100644 --- a/tests/unit/test_testing.py +++ b/tests/unit/test_testing.py @@ -60,7 +60,7 @@ def test_make_dynamic_query(connector): def test_defer_job_one(connector): job = connector.defer_job_one( task_name="mytask", - priority=0, + priority=5, lock="sher", queueing_lock="houba", args={"a": "b"}, @@ -72,7 +72,7 @@ def test_defer_job_one(connector): 1: { "id": 1, "queue_name": "marsupilami", - "priority": 0, + "priority": 5, "task_name": "mytask", "lock": "sher", "queueing_lock": "houba", @@ -298,6 +298,33 @@ def test_fetch_job_one(connector): assert connector.fetch_job_one(queues=["marsupilami"])["id"] == 5 +def test_fetch_job_one_prioritized(connector): + # This one will be selected second as it has a lower priority + connector.defer_job_one( + task_name="mytask", + priority=5, + args={}, + queue="marsupilami", + scheduled_at=None, + lock=None, + queueing_lock=None, + ) + + # This one will be selected first as it has a higher priority + connector.defer_job_one( + task_name="mytask", + priority=7, + args={}, + queue="marsupilami", + scheduled_at=None, + lock=None, + queueing_lock=None, + ) + + assert connector.fetch_job_one(queues=None)["id"] == 2 + assert connector.fetch_job_one(queues=None)["id"] == 1 + + def test_fetch_job_one_none_lock(connector): """Testing that 2 jobs with locks "None" don't block one another""" connector.defer_job_one( From 1e68a44397adec063270970a45fadf5914c2df8a Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Tue, 4 Jun 2024 19:41:48 +0000 Subject: [PATCH 05/16] Add some documentation --- docs/howto/advanced.md | 1 + docs/howto/advanced/priorities.md | 29 +++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 docs/howto/advanced/priorities.md diff --git a/docs/howto/advanced.md b/docs/howto/advanced.md index a07561eef..4ea0325f0 100644 --- a/docs/howto/advanced.md +++ b/docs/howto/advanced.md @@ -7,6 +7,7 @@ This section introduces most of Procrastinate's advanced features. advanced/context advanced/locks advanced/schedule +advanced/priorities advanced/queueing_locks advanced/cron advanced/retry diff --git a/docs/howto/advanced/priorities.md b/docs/howto/advanced/priorities.md new file mode 100644 index 000000000..cd18dfb75 --- /dev/null +++ b/docs/howto/advanced/priorities.md @@ -0,0 +1,29 @@ +# Provide a job priority + +We can assign an optional priority to a job. Jobs with higher priority will be +preferred by a worker. Priority is represented as an integer, where a higher +number indicates a higher priority. If no priority is specified, it defaults +to 0. + +## From the code + +Launch a task with a priority of 5: + +```python +my_task.configure(priority=5).defer() +``` + +## From the command line + +```console +$ procrastinate defer --priority=5 path.to.my_task +``` + +:::{warning} +If your setup involves a continuous influx of jobs where workers are +perpetually busy (i.e., jobs are always queuing and workers are never idle), +using priorities could lead to excessive delays in job execution. For example, +if you have a job assigned a priority of -1 while all other jobs have a +priority of 0, the lower priority job might experience significant delays. In +a continuously busy system, this job could potentially take months to execute. +::: From b533dd7ce024cdb646c79053eda3146fff9cea7d Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Tue, 4 Jun 2024 19:56:16 +0000 Subject: [PATCH 06/16] Add an index to improve ordering --- procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql | 2 ++ procrastinate/sql/schema.sql | 2 ++ 2 files changed, 4 insertions(+) diff --git a/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql b/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql index 6df46dbb1..41c67ef5a 100644 --- a/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql +++ b/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql @@ -1,5 +1,7 @@ ALTER TABLE procrastinate_jobs ADD COLUMN priority integer DEFAULT 0 NOT NULL; +CREATE UNIQUE INDEX idx_procrastinate_jobs_priority_id ON procrastinate_jobs (priority, id) WHERE status = 'todo'; + DROP FUNCTION IF EXISTS procrastinate_defer_job(character varying, character varying, text, text, jsonb, timestamp with time zone); CREATE OR REPLACE FUNCTION procrastinate_defer_job( queue_name character varying, diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index ddd1db71c..70002a41b 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -58,6 +58,8 @@ CREATE TABLE procrastinate_events ( CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (queueing_lock) WHERE status = 'todo'; -- this prevents from having several jobs with the same lock in the "doing" state CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing'; +-- improve ordering by priority and id when fetching a job to process +CREATE UNIQUE INDEX idx_procrastinate_jobs_priority_id ON procrastinate_jobs (priority, id) WHERE status = 'todo'; CREATE INDEX procrastinate_jobs_queue_name_idx ON procrastinate_jobs(queue_name); CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status]); From 20357a3bb508fd15dd161eeafce0f1f93cf6b5e0 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Wed, 5 Jun 2024 17:54:03 +0200 Subject: [PATCH 07/16] Be more specific about the allowed integer --- docs/howto/advanced/priorities.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/howto/advanced/priorities.md b/docs/howto/advanced/priorities.md index cd18dfb75..f4414e28a 100644 --- a/docs/howto/advanced/priorities.md +++ b/docs/howto/advanced/priorities.md @@ -1,9 +1,9 @@ # Provide a job priority We can assign an optional priority to a job. Jobs with higher priority will be -preferred by a worker. Priority is represented as an integer, where a higher -number indicates a higher priority. If no priority is specified, it defaults -to 0. +preferred by a worker. Priority is represented as an (positive or negative) +integer, where a larger number indicates a higher priority. If no priority is +specified, it defaults to 0. ## From the code From 9207e25aca21e912aef72a3358f4e60588015d02 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sat, 8 Jun 2024 16:02:39 +0000 Subject: [PATCH 08/16] Revert changes in static migrations --- procrastinate/contrib/django/static_migrations.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/procrastinate/contrib/django/static_migrations.py b/procrastinate/contrib/django/static_migrations.py index 0e3d24057..21c591dbc 100644 --- a/procrastinate/contrib/django/static_migrations.py +++ b/procrastinate/contrib/django/static_migrations.py @@ -11,7 +11,7 @@ class Migration(migrations.Migration): initial = True dependencies = [ - ("procrastinate", "0025_add_job_priority"), + ("procrastinate", "0024_job_id_bigint"), ] operations = [ @@ -51,7 +51,6 @@ class Migration(migrations.Migration): ("id", models.BigAutoField(primary_key=True, serialize=False)), ("queue_name", models.CharField(max_length=128)), ("task_name", models.CharField(max_length=128)), - ("priority", models.IntegerField()), ("lock", models.TextField(blank=True, null=True, unique=True)), ("args", models.JSONField()), ( @@ -95,4 +94,4 @@ class Migration(migrations.Migration): ] -static_migrations["0026_add_models"] = Migration +static_migrations["0025_add_models"] = Migration From ca603274018315d51972be5a48a8f109a62e3fa9 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sat, 8 Jun 2024 16:31:49 +0000 Subject: [PATCH 09/16] Add job priority migration to Django --- .../django/migrations/0026_add_job_priority.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 procrastinate/contrib/django/migrations/0026_add_job_priority.py diff --git a/procrastinate/contrib/django/migrations/0026_add_job_priority.py b/procrastinate/contrib/django/migrations/0026_add_job_priority.py new file mode 100644 index 000000000..76d57bf95 --- /dev/null +++ b/procrastinate/contrib/django/migrations/0026_add_job_priority.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from django.db import migrations + +from .. import migrations_utils + + +class Migration(migrations.Migration): + operations = [ + migrations_utils.RunProcrastinateSQL(name="02.00.03_01_add_job_priority.sql") + ] + name = "0026_add_job_priority" + dependencies = [("procrastinate", "0025_add_models")] From 8878678363a183d7edc57b17e9e4af661aed616f Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sat, 8 Jun 2024 17:56:50 +0000 Subject: [PATCH 10/16] Improve documentation, help and docstring --- docs/howto/advanced/priorities.md | 6 ++++++ procrastinate/cli.py | 3 ++- procrastinate/tasks.py | 3 ++- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/docs/howto/advanced/priorities.md b/docs/howto/advanced/priorities.md index f4414e28a..3f5a6398f 100644 --- a/docs/howto/advanced/priorities.md +++ b/docs/howto/advanced/priorities.md @@ -5,6 +5,12 @@ preferred by a worker. Priority is represented as an (positive or negative) integer, where a larger number indicates a higher priority. If no priority is specified, it defaults to 0. +Priority is used as a way to order available jobs. If a procrastinate worker +requests a job, and there is a high-priority job scheduled that is blocked by a +lock, and a low-priority job that is available, the worker will take +the low-priority job. Procrastinate will never wait for a high-priority job to +become available if there are lower-priority jobs already available. + ## From the code Launch a task with a priority of 5: diff --git a/procrastinate/cli.py b/procrastinate/cli.py index d1a3c41f9..a4c33b875 100644 --- a/procrastinate/cli.py +++ b/procrastinate/cli.py @@ -414,7 +414,8 @@ def configure_defer_parser(subparsers: argparse._SubParsersAction): default=argparse.SUPPRESS, dest="priority", type=int, - help="Job priority. Defaults to 0", + help="Job priority. May be positive or negative (higher values indicate jobs " + "that should execute first). Defaults to 0", envvar="DEFER_PRIORITY", ) add_argument( diff --git a/procrastinate/tasks.py b/procrastinate/tasks.py index 0b74237c7..5e0a25d7a 100644 --- a/procrastinate/tasks.py +++ b/procrastinate/tasks.py @@ -189,7 +189,8 @@ def configure(self, **options: Unpack[ConfigureTaskOptions]) -> jobs.JobDeferrer By setting a queue on the job launch, you override the task default queue priority : Set the priority of the job as an integer. Jobs with higher priority - are run first. The default priority is 0. + are run first. Priority can be positive or negative. The default priority + is 0. Returns ------- From 9cee9a5bc0c5c1b1f4b1440ad115329016ac233c Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sat, 8 Jun 2024 18:20:12 +0000 Subject: [PATCH 11/16] Add priority field to Django model migration --- .../contrib/django/migrations/0026_add_job_priority.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/procrastinate/contrib/django/migrations/0026_add_job_priority.py b/procrastinate/contrib/django/migrations/0026_add_job_priority.py index 76d57bf95..5af536aaf 100644 --- a/procrastinate/contrib/django/migrations/0026_add_job_priority.py +++ b/procrastinate/contrib/django/migrations/0026_add_job_priority.py @@ -1,13 +1,14 @@ from __future__ import annotations -from django.db import migrations +from django.db import migrations, models from .. import migrations_utils class Migration(migrations.Migration): operations = [ - migrations_utils.RunProcrastinateSQL(name="02.00.03_01_add_job_priority.sql") + migrations_utils.RunProcrastinateSQL(name="02.00.03_01_add_job_priority.sql"), + migrations.AddField("procrastinatejob", "priority", models.IntegerField()), ] name = "0026_add_job_priority" dependencies = [("procrastinate", "0025_add_models")] From 2fb4a689708d5d1883a255d0bd56ebf6f2387c44 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sat, 8 Jun 2024 18:34:16 +0000 Subject: [PATCH 12/16] Refactor in-memory connector --- procrastinate/testing.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/procrastinate/testing.py b/procrastinate/testing.py index 9466197a8..662072f2c 100644 --- a/procrastinate/testing.py +++ b/procrastinate/testing.py @@ -203,15 +203,15 @@ def fetch_job_one(self, queues: Iterable[str] | None) -> dict: ) ] - filtered_jobs.sort(key=lambda job: job["priority"], reverse=True) + filtered_jobs.sort(key=lambda job: (-job["priority"], job["id"])) - if filtered_jobs: - job = filtered_jobs[0] - job["status"] = "doing" - self.events[job["id"]].append({"type": "started", "at": utils.utcnow()}) - return job + if not filtered_jobs: + return {"id": None} - return {"id": None} + job = filtered_jobs[0] + job["status"] = "doing" + self.events[job["id"]].append({"type": "started", "at": utils.utcnow()}) + return job def finish_job_run(self, job_id: int, status: str, delete_job: bool) -> None: if delete_job: From d629d49c8a47708edc516269a92492f7d3d6e217 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sun, 9 Jun 2024 11:08:27 +0000 Subject: [PATCH 13/16] Reuse DEFAULT_PRIORITY constant --- procrastinate/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/procrastinate/jobs.py b/procrastinate/jobs.py index 4136610f2..3955713a7 100644 --- a/procrastinate/jobs.py +++ b/procrastinate/jobs.py @@ -73,7 +73,7 @@ class Job: id: int | None = None status: str | None = None queue: str - priority: int = 0 + priority: int = DEFAULT_PRIORITY lock: str | None queueing_lock: str | None task_name: str From fb49c8bb74aa58fc3ff90fdb41920de698b8d7ce Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Fri, 14 Jun 2024 22:36:01 +0000 Subject: [PATCH 14/16] Remove unused index --- procrastinate/sql/schema.sql | 2 -- 1 file changed, 2 deletions(-) diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index 70002a41b..ddd1db71c 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -58,8 +58,6 @@ CREATE TABLE procrastinate_events ( CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (queueing_lock) WHERE status = 'todo'; -- this prevents from having several jobs with the same lock in the "doing" state CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing'; --- improve ordering by priority and id when fetching a job to process -CREATE UNIQUE INDEX idx_procrastinate_jobs_priority_id ON procrastinate_jobs (priority, id) WHERE status = 'todo'; CREATE INDEX procrastinate_jobs_queue_name_idx ON procrastinate_jobs(queue_name); CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status]); From 4c515f0c504b1300276a90a28561eed96cd1ef43 Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Sun, 16 Jun 2024 17:08:47 +0000 Subject: [PATCH 15/16] Remove unused index from migration --- procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql | 2 -- 1 file changed, 2 deletions(-) diff --git a/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql b/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql index 41c67ef5a..6df46dbb1 100644 --- a/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql +++ b/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql @@ -1,7 +1,5 @@ ALTER TABLE procrastinate_jobs ADD COLUMN priority integer DEFAULT 0 NOT NULL; -CREATE UNIQUE INDEX idx_procrastinate_jobs_priority_id ON procrastinate_jobs (priority, id) WHERE status = 'todo'; - DROP FUNCTION IF EXISTS procrastinate_defer_job(character varying, character varying, text, text, jsonb, timestamp with time zone); CREATE OR REPLACE FUNCTION procrastinate_defer_job( queue_name character varying, From 19e5467d57ff35139c0f6b4d0724211607782c6a Mon Sep 17 00:00:00 2001 From: Kai Schlamp Date: Mon, 17 Jun 2024 15:58:04 +0000 Subject: [PATCH 16/16] Keep old procrastinate_defer_job function for compatibility reasons --- .../02.00.03_01_add_job_priority.sql | 1 - procrastinate/sql/schema.sql | 25 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql b/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql index 6df46dbb1..2cfb2859d 100644 --- a/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql +++ b/procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql @@ -1,6 +1,5 @@ ALTER TABLE procrastinate_jobs ADD COLUMN priority integer DEFAULT 0 NOT NULL; -DROP FUNCTION IF EXISTS procrastinate_defer_job(character varying, character varying, text, text, jsonb, timestamp with time zone); CREATE OR REPLACE FUNCTION procrastinate_defer_job( queue_name character varying, task_name character varying, diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index ddd1db71c..6530819b1 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -351,6 +351,31 @@ CREATE TRIGGER procrastinate_trigger_delete_jobs -- Old versions of functions, for backwards compatibility (to be removed -- after 2.0.0) + +-- procrastinate_defer_job +-- the function without the priority argument is kept for compatibility reasons +CREATE FUNCTION procrastinate_defer_job( + queue_name character varying, + task_name character varying, + lock text, + queueing_lock text, + args jsonb, + scheduled_at timestamp with time zone +) + RETURNS bigint + LANGUAGE plpgsql +AS $$ +DECLARE + job_id bigint; +BEGIN + INSERT INTO procrastinate_jobs (queue_name, task_name, lock, queueing_lock, args, scheduled_at) + VALUES (queue_name, task_name, lock, queueing_lock, args, scheduled_at) + RETURNING id INTO job_id; + + RETURN job_id; +END; +$$; + -- procrastinate_finish_job CREATE OR REPLACE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status, next_scheduled_at timestamp with time zone, delete_job boolean) RETURNS void