From 4a6d9e810ec9420431d7cc02b5d31a026712d90b Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 25 Jan 2025 17:50:31 +0100 Subject: [PATCH 1/3] Add failing test --- tests/integration/test_manager.py | 40 +++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_manager.py b/tests/integration/test_manager.py index 39cf07699..ce2a58cdd 100644 --- a/tests/integration/test_manager.py +++ b/tests/integration/test_manager.py @@ -123,7 +123,7 @@ async def test_fetch_job_no_result( {"nb_seconds": 1800, "task_name": "task_1"}, ], ) -async def test_get_stalled_jobs_yes( +async def test_get_stalled_jobs__yes( pg_job_manager, fetched_job_factory, psycopg_connector, filter_args ): job = await fetched_job_factory(queue="queue_a", task_name="task_1") @@ -146,7 +146,7 @@ async def test_get_stalled_jobs_yes( {"nb_seconds": 1800, "task_name": "task_2"}, ], ) -async def test_get_stalled_jobs_no( +async def test_get_stalled_jobs__no( pg_job_manager, fetched_job_factory, psycopg_connector, filter_args ): job = await fetched_job_factory(queue="queue_a", task_name="task_1") @@ -161,6 +161,42 @@ async def test_get_stalled_jobs_no( assert result == [] +async def test_get_stalled_jobs__retries( + pg_job_manager, fetched_job_factory, psycopg_connector +): + job = await fetched_job_factory(queue="queue_a", task_name="task_1") + + # We fake previous tries + await psycopg_connector.execute_query_async( + f"UPDATE procrastinate_events SET at=now() - INTERVAL '1 hour'" + f"WHERE job_id={job.id} AND type='deferred'" + ) + await psycopg_connector.execute_query_async( + f"INSERT INTO procrastinate_events (job_id, type, at) VALUES" + f"({job.id}, 'started', now() - INTERVAL '1 hour')" + ) + await psycopg_connector.execute_query_async( + f"INSERT INTO procrastinate_events (job_id, type, at) VALUES" + f"({job.id}, 'deferred_for_retry', now() - INTERVAL '59 minutes')" + ) + events = await psycopg_connector.execute_query_all_async( + 
f"SELECT at, type FROM procrastinate_events " + f"WHERE job_id={job.id} ORDER BY at ASC" + ) + # Sanity checks: We're in the situation where 1h ago, the job has been deferred, + # started, it failed so it was retried, and it just started again now. + assert [e["type"] for e in events] == [ + "deferred", + "started", + "deferred_for_retry", + "started", + ] + + # It should not be considered stalled + result = await pg_job_manager.get_stalled_jobs(nb_seconds=1800) + assert result == [] + + async def test_delete_old_jobs_job_todo( get_all, pg_job_manager, From 95f9aab9a4f676e2c608f4934601c352d12f0c18 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 25 Jan 2025 17:54:08 +0100 Subject: [PATCH 2/3] Improve readability --- procrastinate/sql/queries.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index ddb1b92c2..414504d41 100644 --- a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -19,7 +19,9 @@ SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, -- select_stalled_jobs -- -- Get running jobs that started more than a given time ago -SELECT job.id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts, max(event.at) started_at +SELECT job.id, status, task_name, priority, lock, queueing_lock, + args, scheduled_at, queue_name, attempts, + MAX(event.at) AS started_at FROM procrastinate_jobs job JOIN procrastinate_events event ON event.job_id = job.id From 83ee2459ae69aa4a29c21e66c93eed563a685eb2 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Sat, 25 Jan 2025 18:08:18 +0100 Subject: [PATCH 3/3] Fix select_stalled_jobs --- procrastinate/sql/queries.sql | 2 +- tests/integration/test_manager.py | 44 ++++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/procrastinate/sql/queries.sql b/procrastinate/sql/queries.sql index 414504d41..6d69f77f8 100644 --- 
a/procrastinate/sql/queries.sql +++ b/procrastinate/sql/queries.sql @@ -27,10 +27,10 @@ SELECT job.id, status, task_name, priority, lock, queueing_lock, ON event.job_id = job.id WHERE event.type = 'started' AND job.status = 'doing' - AND event.at < NOW() - (%(nb_seconds)s || 'SECOND')::INTERVAL AND (%(queue)s::varchar IS NULL OR job.queue_name = %(queue)s) AND (%(task_name)s::varchar IS NULL OR job.task_name = %(task_name)s) GROUP BY job.id + HAVING MAX(event.at) < NOW() - (%(nb_seconds)s || 'SECOND')::INTERVAL -- delete_old_jobs -- -- Delete jobs that have been in a final state for longer than nb_hours diff --git a/tests/integration/test_manager.py b/tests/integration/test_manager.py index ce2a58cdd..8b396221b 100644 --- a/tests/integration/test_manager.py +++ b/tests/integration/test_manager.py @@ -161,7 +161,7 @@ async def test_get_stalled_jobs__no( assert result == [] -async def test_get_stalled_jobs__retries( +async def test_get_stalled_jobs__retries__no( pg_job_manager, fetched_job_factory, psycopg_connector ): job = await fetched_job_factory(queue="queue_a", task_name="task_1") @@ -183,6 +183,7 @@ async def test_get_stalled_jobs__retries( f"SELECT at, type FROM procrastinate_events " f"WHERE job_id={job.id} ORDER BY at ASC" ) + # Sanity checks: We're in the situation where 1h ago, the job has been deferred, # started, it failed so it was retried, and it just started again now. 
assert [e["type"] for e in events] == [ @@ -197,6 +198,47 @@ async def test_get_stalled_jobs__retries( assert result == [] +async def test_get_stalled_jobs__retries__yes( + pg_job_manager, fetched_job_factory, psycopg_connector +): + job = await fetched_job_factory(queue="queue_a", task_name="task_1") + + # We fake previous tries + await psycopg_connector.execute_query_async( + f"UPDATE procrastinate_events SET at=now() - INTERVAL '1 hour'" + f"WHERE job_id={job.id} AND type='deferred'" + ) + await psycopg_connector.execute_query_async( + f"UPDATE procrastinate_events SET at=now() - INTERVAL '40 minutes'" + f"WHERE job_id={job.id} AND type='started'" + ) + await psycopg_connector.execute_query_async( + f"INSERT INTO procrastinate_events (job_id, type, at) VALUES" + f"({job.id}, 'started', now() - INTERVAL '1 hour')" + ) + await psycopg_connector.execute_query_async( + f"INSERT INTO procrastinate_events (job_id, type, at) VALUES" + f"({job.id}, 'deferred_for_retry', now() - INTERVAL '59 minutes')" + ) + events = await psycopg_connector.execute_query_all_async( + f"SELECT at, type FROM procrastinate_events " + f"WHERE job_id={job.id} ORDER BY at ASC" + ) + + # Sanity checks: We're in the situation where 1h ago, the job has been deferred, + # started, it failed so it was retried, and it started again 40 minutes ago. + assert [e["type"] for e in events] == [ + "deferred", + "started", + "deferred_for_retry", + "started", + ] + + # It should be considered stalled + result = await pg_job_manager.get_stalled_jobs(nb_seconds=1800) + assert result == [job] + + async def test_delete_old_jobs_job_todo( get_all, pg_job_manager,