Skip to content

Commit

Permalink
Merge pull request #1070 from openradx/job-priority
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim authored Jun 18, 2024
2 parents ac62da7 + 19e5467 commit a118e70
Show file tree
Hide file tree
Showing 25 changed files with 401 additions and 35 deletions.
1 change: 1 addition & 0 deletions docs/howto/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This section introduces most of Procrastinate's advanced features.
advanced/context
advanced/locks
advanced/schedule
advanced/priorities
advanced/queueing_locks
advanced/cron
advanced/retry
Expand Down
35 changes: 35 additions & 0 deletions docs/howto/advanced/priorities.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Provide a job priority

We can assign an optional priority to a job. Jobs with higher priority will be
preferred by a worker. Priority is represented as a (positive or negative)
integer, where a larger number indicates a higher priority. If no priority is
specified, it defaults to 0.

Priority is used as a way to order available jobs. If a procrastinate worker
requests a job, and there is a high-priority job scheduled that is blocked by a
lock, and a low-priority job that is available, the worker will take
the low-priority job. Procrastinate will never wait for a high-priority job to
become available if there are lower-priority jobs already available.

## From the code

Launch a task with a priority of 5:

```python
my_task.configure(priority=5).defer()
```

## From the command line

```console
$ procrastinate defer --priority=5 path.to.my_task
```

:::{warning}
If your setup involves a continuous influx of jobs where workers are
perpetually busy (i.e., jobs are always queuing and workers are never idle),
using priorities could lead to excessive delays in job execution. For example,
if you have a job assigned a priority of -1 while all other jobs have a
priority of 0, the lower priority job might experience significant delays. In
a continuously busy system, this job could potentially take months to execute.
:::
10 changes: 10 additions & 0 deletions procrastinate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,16 @@ def configure_defer_parser(subparsers: argparse._SubParsersAction):
help="Number of seconds after which to launch the job",
envvar="DEFER_IN",
)
add_argument(
defer_parser,
"--priority",
default=argparse.SUPPRESS,
dest="priority",
type=int,
help="Job priority. May be positive or negative (higher values indicate jobs "
"that should execute first). Defaults to 0",
envvar="DEFER_PRIORITY",
)
add_argument(
defer_parser,
"--unknown",
Expand Down
14 changes: 14 additions & 0 deletions procrastinate/contrib/django/migrations/0026_add_job_priority.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from __future__ import annotations

from django.db import migrations, models

from .. import migrations_utils


class Migration(migrations.Migration):
    """Introduce the ``priority`` field on procrastinate jobs.

    Two operations run in order: the ``02.00.03_01_add_job_priority.sql``
    script, then an ``AddField`` declaring ``priority`` as an
    ``IntegerField`` on the ``procrastinatejob`` model.
    """

    name = "0026_add_job_priority"
    dependencies = [("procrastinate", "0025_add_models")]
    operations = [
        # Run the SQL script before declaring the field on the model.
        migrations_utils.RunProcrastinateSQL(
            name="02.00.03_01_add_job_priority.sql",
        ),
        migrations.AddField(
            model_name="procrastinatejob",
            name="priority",
            field=models.IntegerField(),
        ),
    ]
1 change: 1 addition & 0 deletions procrastinate/contrib/django/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class ProcrastinateJob(ProcrastinateReadOnlyModelMixin, models.Model):
id = models.BigAutoField(primary_key=True)
queue_name = models.CharField(max_length=128)
task_name = models.CharField(max_length=128)
priority = models.IntegerField()
lock = models.TextField(unique=True, blank=True, null=True)
args = models.JSONField()
status = models.CharField(max_length=32, choices=[(e, e) for e in STATUSES])
Expand Down
5 changes: 5 additions & 0 deletions procrastinate/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


DEFAULT_QUEUE = "default"
DEFAULT_PRIORITY = 0

cached_property = getattr(functools, "cached_property", property)

Expand Down Expand Up @@ -51,6 +52,8 @@ class Job:
Internal id uniquely identifying the job.
status :
Status of the job.
priority :
Priority of the job.
queue :
Queue name the job will be run in.
lock :
Expand All @@ -70,6 +73,7 @@ class Job:
id: int | None = None
status: str | None = None
queue: str
priority: int = DEFAULT_PRIORITY
lock: str | None
queueing_lock: str | None
task_name: str
Expand All @@ -84,6 +88,7 @@ def from_row(cls, row: dict[str, Any]) -> Job:
return cls(
id=row["id"],
status=row["status"],
priority=row["priority"],
lock=row["lock"],
queueing_lock=row["queueing_lock"],
task_name=row["task_name"],
Expand Down
1 change: 1 addition & 0 deletions procrastinate/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def _defer_job_query_kwargs(self, job: jobs.Job) -> dict[str, Any]:
"query": sql.queries["defer_job"],
"task_name": job.task_name,
"queue": job.queue,
"priority": job.priority,
"lock": job.lock,
"queueing_lock": job.queueing_lock,
"args": job.task_kwargs,
Expand Down
2 changes: 1 addition & 1 deletion procrastinate/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def print_job(job: jobs.Job, details: bool = False) -> None:
msg += f"- [{job_dict['status']}]"
if details:
msg += (
f" (attempts={job_dict['attempts']}, "
f" (attempts={job_dict['attempts']}, priority={job_dict['priority']}, "
f"scheduled_at={job_dict['scheduled_at']}, args={job_dict['task_kwargs']}, "
f"lock={job_dict['lock']})"
)
Expand Down
121 changes: 121 additions & 0 deletions procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
-- Add the job priority column. It is NOT NULL with a default of 0, so
-- inserts that do not mention priority get priority 0.
ALTER TABLE procrastinate_jobs ADD COLUMN priority integer DEFAULT 0 NOT NULL;

-- procrastinate_defer_job (priority-aware signature)
-- Inserts one row into procrastinate_jobs from the given arguments,
-- including the priority, and returns the id of the inserted row.
CREATE OR REPLACE FUNCTION procrastinate_defer_job(
    queue_name character varying,
    task_name character varying,
    priority integer,
    lock text,
    queueing_lock text,
    args jsonb,
    scheduled_at timestamp with time zone
)
    RETURNS bigint
    LANGUAGE plpgsql
AS $$
DECLARE
    new_job_id bigint;
BEGIN
    INSERT INTO procrastinate_jobs (
        queue_name,
        task_name,
        priority,
        lock,
        queueing_lock,
        args,
        scheduled_at
    )
    VALUES (
        queue_name,
        task_name,
        priority,
        lock,
        queueing_lock,
        args,
        scheduled_at
    )
    RETURNING id INTO new_job_id;

    RETURN new_job_id;
END;
$$;

-- procrastinate_defer_periodic_job
-- Defers the job for one occurrence of a periodic task and returns its id.
--
-- Steps, in order:
--   1. Insert a (task_name, periodic_id, defer_timestamp) row into
--      procrastinate_periodic_defers with ON CONFLICT DO NOTHING. When the
--      insert is skipped, no id comes back and the function returns NULL
--      without creating a job. (The constraint that triggers the conflict is
--      defined outside this script.)
--   2. Create the job through procrastinate_defer_job, using the fixed
--      priority 0 and a NULL scheduled_at, and store the new job id on the
--      row inserted in step 1.
--   3. Delete the procrastinate_periodic_defers rows for the same task_name
--      and periodic_id whose defer_timestamp is older than this one. The rows
--      to delete are selected in id order and locked with FOR UPDATE.
--
-- The existing function is dropped before being recreated with the same
-- argument types.
DROP FUNCTION IF EXISTS procrastinate_defer_periodic_job(character varying, character varying, character varying, character varying, character varying, bigint, jsonb);
CREATE OR REPLACE FUNCTION procrastinate_defer_periodic_job(
_queue_name character varying,
_lock character varying,
_queueing_lock character varying,
_task_name character varying,
_periodic_id character varying,
_defer_timestamp bigint,
_args jsonb
)
RETURNS bigint
LANGUAGE plpgsql
AS $$
DECLARE
_job_id bigint;
_defer_id bigint;
BEGIN

INSERT
INTO procrastinate_periodic_defers (task_name, periodic_id, defer_timestamp)
VALUES (_task_name, _periodic_id, _defer_timestamp)
ON CONFLICT DO NOTHING
RETURNING id into _defer_id;

IF _defer_id IS NULL THEN
RETURN NULL;
END IF;

UPDATE procrastinate_periodic_defers
SET job_id = procrastinate_defer_job(
_queue_name,
_task_name,
0,
_lock,
_queueing_lock,
_args,
NULL
)
WHERE id = _defer_id
RETURNING job_id INTO _job_id;

DELETE
FROM procrastinate_periodic_defers
USING (
SELECT id
FROM procrastinate_periodic_defers
WHERE procrastinate_periodic_defers.task_name = _task_name
AND procrastinate_periodic_defers.periodic_id = _periodic_id
AND procrastinate_periodic_defers.defer_timestamp < _defer_timestamp
ORDER BY id
FOR UPDATE
) to_delete
WHERE procrastinate_periodic_defers.id = to_delete.id;

RETURN _job_id;
END;
$$;

-- procrastinate_fetch_job
-- Picks one available job, marks it as 'doing' and returns its row.
--
-- A job is a candidate when all of the following hold:
--   * its status is 'todo';
--   * target_queue_names is NULL, or the job's queue_name is in that array;
--   * scheduled_at is NULL or not in the future;
--   * if it has a lock, no job with the same lock and a smaller id is still
--     in 'todo' or 'doing'.
-- Candidates are ordered by priority (highest first), then by id (lowest
-- first), and only the first one is taken. The candidate row is selected
-- with FOR UPDATE ... SKIP LOCKED, so rows already locked by another
-- transaction are skipped rather than waited on.
--
-- NOTE(review): the function is dropped before being recreated, presumably
-- because its return type (the procrastinate_jobs row type) gained the
-- priority column -- confirm.
DROP FUNCTION IF EXISTS procrastinate_fetch_job(character varying[]);
CREATE OR REPLACE FUNCTION procrastinate_fetch_job(
target_queue_names character varying[]
)
RETURNS procrastinate_jobs
LANGUAGE plpgsql
AS $$
DECLARE
found_jobs procrastinate_jobs;
BEGIN
WITH candidate AS (
SELECT jobs.*
FROM procrastinate_jobs AS jobs
WHERE
-- reject the job if its lock has earlier jobs
NOT EXISTS (
SELECT 1
FROM procrastinate_jobs AS earlier_jobs
WHERE
jobs.lock IS NOT NULL
AND earlier_jobs.lock = jobs.lock
AND earlier_jobs.status IN ('todo', 'doing')
AND earlier_jobs.id < jobs.id)
AND jobs.status = 'todo'
AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names ))
AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1
FOR UPDATE OF jobs SKIP LOCKED
)
UPDATE procrastinate_jobs
SET status = 'doing'
FROM candidate
WHERE procrastinate_jobs.id = candidate.id
RETURNING procrastinate_jobs.* INTO found_jobs;

RETURN found_jobs;
END;
$$;
7 changes: 4 additions & 3 deletions procrastinate/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

-- defer_job --
-- Create and enqueue a job
SELECT procrastinate_defer_job(%(queue)s, %(task_name)s, %(lock)s, %(queueing_lock)s, %(args)s, %(scheduled_at)s) AS id;
SELECT procrastinate_defer_job(%(queue)s, %(task_name)s, %(priority)s, %(lock)s, %(queueing_lock)s, %(args)s, %(scheduled_at)s) AS id;

-- defer_periodic_job --
-- Create a periodic job if it doesn't already exist, and delete periodic metadata
Expand All @@ -14,12 +14,12 @@ SELECT procrastinate_defer_periodic_job(%(queue)s, %(lock)s, %(queueing_lock)s,

-- fetch_job --
-- Get the first awaiting job
SELECT id, status, task_name, lock, queueing_lock, args, scheduled_at, queue_name, attempts
SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts
FROM procrastinate_fetch_job(%(queues)s);

-- select_stalled_jobs --
-- Get running jobs that started more than a given time ago
SELECT job.id, status, task_name, lock, queueing_lock, args, scheduled_at, queue_name, attempts, max(event.at) started_at
SELECT job.id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts, max(event.at) started_at
FROM procrastinate_jobs job
JOIN procrastinate_events event
ON event.job_id = job.id
Expand Down Expand Up @@ -71,6 +71,7 @@ SELECT count(*) AS count, status FROM procrastinate_jobs GROUP BY status;
SELECT id,
queue_name,
task_name,
priority,
lock,
queueing_lock,
args,
Expand Down
36 changes: 32 additions & 4 deletions procrastinate/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ CREATE TABLE procrastinate_jobs (
id bigserial PRIMARY KEY,
queue_name character varying(128) NOT NULL,
task_name character varying(128) NOT NULL,
priority integer DEFAULT 0 NOT NULL,
lock text,
queueing_lock text,
args jsonb DEFAULT '{}' NOT NULL,
Expand All @@ -51,7 +52,7 @@ CREATE TABLE procrastinate_events (
at timestamp with time zone DEFAULT NOW() NULL
);

-- Contraints & Indices
-- Constraints & Indices

-- this prevents from having several jobs with the same queueing lock in the "todo" state
CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (queueing_lock) WHERE status = 'todo';
Expand All @@ -71,6 +72,7 @@ CREATE INDEX procrastinate_periodic_defers_job_id_fkey ON procrastinate_periodic
CREATE FUNCTION procrastinate_defer_job(
queue_name character varying,
task_name character varying,
priority integer,
lock text,
queueing_lock text,
args jsonb,
Expand All @@ -82,8 +84,8 @@ AS $$
DECLARE
job_id bigint;
BEGIN
INSERT INTO procrastinate_jobs (queue_name, task_name, lock, queueing_lock, args, scheduled_at)
VALUES (queue_name, task_name, lock, queueing_lock, args, scheduled_at)
INSERT INTO procrastinate_jobs (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)
VALUES (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at)
RETURNING id INTO job_id;

RETURN job_id;
Expand Down Expand Up @@ -121,6 +123,7 @@ BEGIN
SET job_id = procrastinate_defer_job(
_queue_name,
_task_name,
0,
_lock,
_queueing_lock,
_args,
Expand Down Expand Up @@ -171,7 +174,7 @@ BEGIN
AND jobs.status = 'todo'
AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names ))
AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
ORDER BY jobs.id ASC LIMIT 1
ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1
FOR UPDATE OF jobs SKIP LOCKED
)
UPDATE procrastinate_jobs
Expand Down Expand Up @@ -348,6 +351,31 @@ CREATE TRIGGER procrastinate_trigger_delete_jobs

-- Old versions of functions, for backwards compatibility (to be removed
-- after 2.0.0)

-- procrastinate_defer_job (legacy signature)
-- Overload without the priority argument, kept for compatibility reasons.
-- The insert does not mention priority, so the column default applies.
CREATE FUNCTION procrastinate_defer_job(
    queue_name character varying,
    task_name character varying,
    lock text,
    queueing_lock text,
    args jsonb,
    scheduled_at timestamp with time zone
)
    RETURNS bigint
    LANGUAGE plpgsql
AS $$
DECLARE
    new_job_id bigint;
BEGIN
    INSERT INTO procrastinate_jobs (
        queue_name,
        task_name,
        lock,
        queueing_lock,
        args,
        scheduled_at
    )
    VALUES (
        queue_name,
        task_name,
        lock,
        queueing_lock,
        args,
        scheduled_at
    )
    RETURNING id INTO new_job_id;

    RETURN new_job_id;
END;
$$;

-- procrastinate_finish_job
CREATE OR REPLACE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status, next_scheduled_at timestamp with time zone, delete_job boolean)
RETURNS void
Expand Down
Loading

0 comments on commit a118e70

Please sign in to comment.