Skip to content

Commit

Permalink
Merge pull request #1076 from procrastinate-org/fix-cast-psycopg2
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim authored Jun 17, 2024
2 parents 174e439 + 9b8779d commit ac62da7
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 27 deletions.
2 changes: 1 addition & 1 deletion procrastinate/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ WHERE id IN (
ON job.id = event.job_id
ORDER BY job.id, event.at DESC
) AS job
WHERE job.status = ANY(%(statuses)s)
WHERE job.status = ANY(%(statuses)s::procrastinate_job_status[])
AND (%(queue)s::varchar IS NULL OR job.queue_name = %(queue)s)
AND latest_at < NOW() - (%(nb_hours)s || 'HOUR')::INTERVAL
)
Expand Down
31 changes: 31 additions & 0 deletions tests/integration/contrib/aiopg/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from __future__ import annotations

import pytest

from procrastinate.contrib.aiopg import aiopg_connector as aiopg


@pytest.fixture
async def aiopg_connector_factory(connection_params):
connectors = []

async def _(*, open: bool = True, **kwargs):
json_dumps = kwargs.pop("json_dumps", None)
json_loads = kwargs.pop("json_loads", None)
connection_params.update(kwargs)
connector = aiopg.AiopgConnector(
json_dumps=json_dumps, json_loads=json_loads, **connection_params
)
connectors.append(connector)
if open:
await connector.open_async()
return connector

yield _
for connector in connectors:
await connector.close_async()


@pytest.fixture
async def aiopg_connector(aiopg_connector_factory):
return await aiopg_connector_factory()
26 changes: 0 additions & 26 deletions tests/integration/contrib/aiopg/test_aiopg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,6 @@
from procrastinate.contrib.psycopg2 import psycopg2_connector


@pytest.fixture
async def aiopg_connector_factory(connection_params):
connectors = []

async def _(*, open: bool = True, **kwargs):
json_dumps = kwargs.pop("json_dumps", None)
json_loads = kwargs.pop("json_loads", None)
connection_params.update(kwargs)
connector = aiopg.AiopgConnector(
json_dumps=json_dumps, json_loads=json_loads, **connection_params
)
connectors.append(connector)
if open:
await connector.open_async()
return connector

yield _
for connector in connectors:
await connector.close_async()


@pytest.fixture
async def aiopg_connector(aiopg_connector_factory):
return await aiopg_connector_factory()


async def test_adapt_pool_args_on_connect(mocker):
called = []

Expand Down
45 changes: 45 additions & 0 deletions tests/integration/contrib/aiopg/test_aiopg_job_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from __future__ import annotations

import functools

import pytest

from procrastinate import manager


@pytest.fixture
def pg_job_manager(aiopg_connector):
return manager.JobManager(connector=aiopg_connector)


@pytest.fixture
def get_all(aiopg_connector):
async def f(table, *fields):
return await aiopg_connector.execute_query_all_async(
f"SELECT {', '.join(fields)} FROM {table}"
)

return f


@pytest.fixture
def deferred_job_factory(deferred_job_factory, pg_job_manager):
return functools.partial(deferred_job_factory, job_manager=pg_job_manager)


async def test_delete_old_jobs_job_todo(
get_all,
pg_job_manager,
psycopg_connector,
deferred_job_factory,
):
job = await deferred_job_factory(queue="queue_a")

# We fake its started event timestamp
await psycopg_connector.execute_query_async(
f"UPDATE procrastinate_events SET at=at - INTERVAL '2 hours'"
f"WHERE job_id={job.id}"
)

await pg_job_manager.delete_old_jobs(nb_hours=0)
assert len(await get_all("procrastinate_jobs", "id")) == 1

0 comments on commit ac62da7

Please sign in to comment.