Skip to content

Commit

Permalink
Merge pull request #1300 from procrastinate-org/fix-stalled
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim authored Jan 26, 2025
2 parents dee445b + 83ee245 commit 6ba830b
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
6 changes: 4 additions & 2 deletions procrastinate/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at,

-- select_stalled_jobs --
-- Get running jobs whose most recent start happened more than a given time ago
SELECT job.id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts, max(event.at) started_at
SELECT job.id, status, task_name, priority, lock, queueing_lock,
args, scheduled_at, queue_name, attempts,
MAX(event.at) AS started_at
FROM procrastinate_jobs job
JOIN procrastinate_events event
ON event.job_id = job.id
WHERE event.type = 'started'
AND job.status = 'doing'
AND event.at < NOW() - (%(nb_seconds)s || 'SECOND')::INTERVAL
AND (%(queue)s::varchar IS NULL OR job.queue_name = %(queue)s)
AND (%(task_name)s::varchar IS NULL OR job.task_name = %(task_name)s)
GROUP BY job.id
HAVING MAX(event.at) < NOW() - (%(nb_seconds)s || 'SECOND')::INTERVAL

-- delete_old_jobs --
-- Delete jobs that have been in a final state for longer than nb_hours
Expand Down
82 changes: 80 additions & 2 deletions tests/integration/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def test_fetch_job_no_result(
{"nb_seconds": 1800, "task_name": "task_1"},
],
)
async def test_get_stalled_jobs_yes(
async def test_get_stalled_jobs__yes(
pg_job_manager, fetched_job_factory, psycopg_connector, filter_args
):
job = await fetched_job_factory(queue="queue_a", task_name="task_1")
Expand All @@ -146,7 +146,7 @@ async def test_get_stalled_jobs_yes(
{"nb_seconds": 1800, "task_name": "task_2"},
],
)
async def test_get_stalled_jobs_no(
async def test_get_stalled_jobs__no(
pg_job_manager, fetched_job_factory, psycopg_connector, filter_args
):
job = await fetched_job_factory(queue="queue_a", task_name="task_1")
Expand All @@ -161,6 +161,84 @@ async def test_get_stalled_jobs_no(
assert result == []


async def test_get_stalled_jobs__retries__no(
    pg_job_manager, fetched_job_factory, psycopg_connector
):
    """A retried job whose latest attempt just started is not stalled.

    The event history is rewritten so that an earlier attempt started one
    hour ago (well past the 1800 seconds threshold), while the 'started'
    event left by ``fetched_job_factory`` is kept untouched. Only the most
    recent start should matter, so no stalled job is expected.
    """
    job = await fetched_job_factory(queue="queue_a", task_name="task_1")
    job_id = job.id

    # Fake a previous attempt: push the initial deferral one hour back, then
    # add an old 'started' event followed by the retry deferral.
    backdate_deferral = (
        "UPDATE procrastinate_events SET at=now() - INTERVAL '1 hour'"
        f"WHERE job_id={job_id} AND type='deferred'"
    )
    add_first_start = (
        "INSERT INTO procrastinate_events (job_id, type, at) VALUES"
        f"({job_id}, 'started', now() - INTERVAL '1 hour')"
    )
    add_retry_deferral = (
        "INSERT INTO procrastinate_events (job_id, type, at) VALUES"
        f"({job_id}, 'deferred_for_retry', now() - INTERVAL '59 minutes')"
    )
    for query in (backdate_deferral, add_first_start, add_retry_deferral):
        await psycopg_connector.execute_query_async(query)

    rows = await psycopg_connector.execute_query_all_async(
        "SELECT at, type FROM procrastinate_events "
        f"WHERE job_id={job_id} ORDER BY at ASC"
    )
    event_types = [row["type"] for row in rows]

    # Sanity check: one hour ago the job was deferred and started, it failed
    # and was deferred for retry, and its second attempt started just now.
    expected_types = ["deferred", "started", "deferred_for_retry", "started"]
    assert event_types == expected_types

    # The latest start is recent, so the job must not be reported as stalled.
    stalled = await pg_job_manager.get_stalled_jobs(nb_seconds=1800)
    assert stalled == []


async def test_get_stalled_jobs__retries__yes(
    pg_job_manager, fetched_job_factory, psycopg_connector
):
    """A retried job whose latest attempt started too long ago is stalled.

    The event history is rewritten so that a first attempt started one hour
    ago and the current attempt started 40 minutes ago. Both are older than
    the 1800 seconds threshold, so the job is expected in the result.
    """
    job = await fetched_job_factory(queue="queue_a", task_name="task_1")

    # We fake previous tries: the initial deferral is pushed one hour back...
    await psycopg_connector.execute_query_async(
        "UPDATE procrastinate_events SET at=now() - INTERVAL '1 hour'"
        f"WHERE job_id={job.id} AND type='deferred'"
    )
    # ...the existing 'started' event becomes the current attempt, 40 minutes
    # old. This must run before the insert below, otherwise the inserted
    # 'started' event would be rewritten as well.
    await psycopg_connector.execute_query_async(
        "UPDATE procrastinate_events SET at=now() - INTERVAL '40 minutes'"
        f"WHERE job_id={job.id} AND type='started'"
    )
    # ...and a first attempt is added: started one hour ago, then deferred
    # for retry a minute later.
    await psycopg_connector.execute_query_async(
        "INSERT INTO procrastinate_events (job_id, type, at) VALUES"
        f"({job.id}, 'started', now() - INTERVAL '1 hour')"
    )
    await psycopg_connector.execute_query_async(
        "INSERT INTO procrastinate_events (job_id, type, at) VALUES"
        f"({job.id}, 'deferred_for_retry', now() - INTERVAL '59 minutes')"
    )
    events = await psycopg_connector.execute_query_all_async(
        "SELECT at, type FROM procrastinate_events "
        f"WHERE job_id={job.id} ORDER BY at ASC"
    )

    # Sanity checks: We're in the situation where 1h ago, the job has been
    # deferred, started, it failed so it was retried, and it started again
    # 40 minutes ago.
    assert [e["type"] for e in events] == [
        "deferred",
        "started",
        "deferred_for_retry",
        "started",
    ]

    # The latest start is older than the threshold: the job should be
    # considered stalled.
    result = await pg_job_manager.get_stalled_jobs(nb_seconds=1800)
    assert result == [job]


async def test_delete_old_jobs_job_todo(
get_all,
pg_job_manager,
Expand Down

0 comments on commit 6ba830b

Please sign in to comment.