Skip to content

Commit

Permalink
Merge pull request #1102 from procrastinate-org/testing
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim authored Jul 6, 2024
2 parents 36b9262 + dab00f0 commit ca46449
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 38 deletions.
105 changes: 78 additions & 27 deletions docs/howto/django/tests.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

## Unit tests

If you want to be able to run your code that uses Procrastinate in a unit test,
you can use the {py:class}`procrastinate.testing.InMemoryConnector`.
There are different kind of tests you might want to do with Procrastinate in
a Django project. One such test is a unit test where you want fast tests that
don't hit the database.

For this, you can use the {py:class}`procrastinate.testing.InMemoryConnector`.

Here's an example pytest fixture you can write in your `conftest.py`:

Expand All @@ -12,44 +15,87 @@ import pytest
from procrastinate import testing
from procrastinate.contrib.django import procrastinate_app

from mypackage.procrastinate import my_task

@pytest.fixture
def in_memory_app(monkeypatch):
app = procrastinate_app.current_app.with_connector(
testing.InMemoryConnector()
)
monkeypatch.setattr(procrastinate_app, "current_app", app)
return app
def app():
in_memory = testing.InMemoryConnector()

# Replace the connector in the current app
# Note that this fixture gives you the app back for convenience, but it's
# the same instance as you'd get with `procrastinate.contrib.django.app`.
with procrastinate_app.current_app.replace_connector(in_memory) as app_with_connector:
yield app_with_connector

def test_my_task(app):
# Run the task
my_task.defer(a=1, b=2)

# Access all the existing jobs
jobs = app.connector.jobs
assert len(jobs) == 1

# Run the jobs
app.run_worker(wait=False)
assert task_side_effect() == "done"

# Reset the in-memory pseudo-database. This usually isn't necessary if
# you make small scoped tests as you'll use a new app fixture for each test
# but it might come in handy.
app.connector.reset()
```

:::{note}
`procrastinate.contrib.django.procrastinate_app.current_app` is not exactly
_documented_ but whatever instance this variable points to is what
will be returned as `procrastinate.contrib.django.app`. It's probably a bad
idea to use this outside of tests.
idea to manipulate this outside of tests.
:::

## Integration tests

You can use Procrastinate normally with `procrastinate.contrib.django.app`
in your tests, though registering new tasks or loading tasks from blueprints
within your tests might lead to test isolation issues. If you need to
do that, you may want to use the trick described above to have a dedicated
app for each relevant test.
Another kind of test, maybe more frequent within Django, would be an
integration test. In this kind of test you would use the database. You can use
Procrastinate normally with `procrastinate.contrib.django.app` in your tests.

You can use the procrastinate models to check if the jobs have been created
as expected.

```python
from procrastinate.contrib.django.models import ProcrastinateJob

def test_my_task(app):
from mypackage.procrastinate import my_task


def test_my_task():
# Run the task
app.defer("my_task", args=(1, 2))
my_task.defer(a=1, b=2)

# Check the job has been created
assert ProcrastinateJob.objects.filter(task_name="my_task").count() == 1
```

:::{note}
Registering a new task on the django procrastinate app within a test will
make this task available even after the test has run. You probably want to
avoid this, for example by creating a new app within each of those tests:

```python
from procrastinate.contrib.django import app

def test_my_task():
new_app = procrastinate_app.ProcrastinateApp(app.connector)

@new_app.task
def my_task(a, b):
return a + b

my_task.defer(a=1, b=2)
...
```

:::

In addition, you can also run a worker in your integration tests. Whether you
use `pytest-django` or Django's `TestCase` subclasses, this requires some
additonal configuration.
Expand All @@ -59,26 +105,29 @@ additonal configuration.
[`TransactionTestCase`]. With `TestCase`, you can't test code within a
transaction with `select_for_update()`. If you use `pytest-django`, use
the equivalent `@pytest.mark.django_db(transaction=True)`.
3. Lastly, setup the worker using `wait=False` and
`install_signal_handlers=False`. `wait=False` means that when all tasks are
executed, the call will return. `install_signal_handlers=False` is optional
and just here to keep the worker from changing the signal callbacks set by
your test runner.
3. Lastly, there are useful arguments to pass to `run_worker`:
- `wait=False` to exit the worker as soon as all jobs are done
- `install_signal_handlers=False` to avoid messing with signals in your
test runner
- `listen_notify=False` to avoid running the listen/notify coroutine which
is probably not needed in tests

[`TransactionTestCase`]: https://docs.djangoproject.com/en/5.0/topics/testing/tools/#transactiontestcase

```python
from procrastinate.contrib.django import app
from django.test import TransactionTestCase

from mypackage.procrastinate import my_task

class TestingTaskClass(TransactionTestCase):
def test_task(self):
# Run tasks
app.defer("my_task", args=(1, 2))
my_task.defer(a=1, b=2)

# Start worker
app = app.with_connector(app.connector.get_worker_connector())
app.run_worker(wait=False, install_signal_handlers=False, listen_notify=True)
app.run_worker(wait=False, install_signal_handlers=False, listen_notify=False)

# Check task has been executed
assert ProcrastinateJob.objects.filter(task_name="my_task").status == "succeeded"
Expand All @@ -87,14 +136,16 @@ class TestingTaskClass(TransactionTestCase):
```python
from procrastinate.contrib.django import app

from mypackage.procrastinate import my_task

@pytest.mark.django_db(transaction=True)
def test_task():
# Run tasks
app.defer("my_task", args=(1, 2))
my_task.defer(a=1, b=2)

# Start worker
app = app.with_connector(app.connector.get_worker_connector())
app.run_worker(wait=False, install_signal_handlers=False, listen_notify=True)
app.run_worker(wait=False, install_signal_handlers=False, listen_notify=False)

# Check task has been executed
assert ProcrastinateJob.objects.filter(task_name="my_task").status == "succeeded"
Expand All @@ -104,13 +155,13 @@ def test_task():
def worker(transactional_db):
def _():
app = app.with_connector(app.connector.get_worker_connector())
app.run_worker(wait=False, install_signal_handlers=False, listen_notify=True)
app.run_worker(wait=False, install_signal_handlers=False, listen_notify=False)
return app
return _

def test_task(worker):
# Run tasks
app.defer("my_task", args=(1, 2))
my_task.defer(a=1, b=2)

# Start worker
worker()
Expand Down
35 changes: 27 additions & 8 deletions docs/howto/production/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,34 @@ controlled way.

To use it, you can do:

```
testing_app = normal_app.with_connector(procrastinate.testing.InMemoryConnector())
```python
from procrastinate import testing
from mypackage.procrastinate import my_app, my_task

@pytest.fixture
def app():
in_memory = testing.InMemoryConnector()

# Replace the connector in the current app
# Note that this fixture gives you the app back for covenience,
# but it's the same instance as `my_app`.
with my_app.replace_connector(in_memory) as app_with_connector:
yield app_with_connector


def test_my_task(app):
my_task.defer(...)

# Run the jobs your tests created, then stop the worker
app.run_worker(wait=False)
# Access all the existing jobs
jobs = app.connector.jobs
assert len(jobs) == 1

# See the jobs created:
print(app.connector.jobs)
# Run the jobs
app.run_worker(wait=False)
assert task_side_effect() == "done"

# Reset the "in-memory pseudo-database" between tests:
app.connector.reset()
# Reset the in-memory pseudo-database. This usually isn't necessary if
# you make small scoped tests as you'll use a new app fixture for each test
# but it might come in handy.
app.connector.reset()
```
34 changes: 33 additions & 1 deletion procrastinate/app.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import asyncio
import contextlib
import functools
import logging
from typing import TYPE_CHECKING, Any, Iterable
from typing import TYPE_CHECKING, Any, Iterable, Iterator

from procrastinate import blueprints, exceptions, jobs, manager, schema, utils
from procrastinate import connector as connector_module
Expand Down Expand Up @@ -105,6 +106,11 @@ def with_connector(self, connector: connector_module.BaseConnector) -> App:
(and its original connector) will be used, even when the new app's
methods are used.
Parameters
----------
connector :
The new connector to use.
Returns
-------
`App`
Expand All @@ -119,6 +125,32 @@ def with_connector(self, connector: connector_module.BaseConnector) -> App:
app.periodic_registry = self.periodic_registry
return app

@contextlib.contextmanager
def replace_connector(
self, connector: connector_module.BaseConnector
) -> Iterator[App]:
"""
Replace the connector of the app while in the context block, then restore it.
Parameters
----------
connector :
The new connector to use.
Returns
-------
`App`
A new compatible app.
"""
old_connector = self.connector
self.connector = connector
self.job_manager.connector = connector
try:
yield self
finally:
self.connector = old_connector
self.job_manager.connector = old_connector

def _register_builtin_tasks(self) -> None:
from procrastinate import builtin_tasks

Expand Down
1 change: 1 addition & 0 deletions procrastinate/contrib/django/procrastinate_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class FutureApp(blueprints.Blueprint):
"run_worker",
"schema_manager",
"with_connector",
"replace_connector",
"will_configure_task",
]
)
Expand Down
4 changes: 2 additions & 2 deletions procrastinate/signals.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations

import asyncio
import contextlib
import logging
import signal
import threading
from contextlib import contextmanager
from typing import Any, Callable

logger = logging.getLogger(__name__)
Expand All @@ -23,7 +23,7 @@
# one. And hope that there was no previously set async handler.


@contextmanager
@contextlib.contextmanager
def on_stop(callback: Callable[[], None]):
if threading.current_thread() is not threading.main_thread():
logger.warning(
Expand Down
15 changes: 15 additions & 0 deletions tests/unit/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,18 @@ def bar():
assert app.worker_defaults == new_app.worker_defaults
assert app.import_paths == new_app.import_paths
assert app.periodic_registry is new_app.periodic_registry


def test_replace_connector(app):
@app.task(name="foo")
def foo():
pass

foo.defer()
assert len(app.connector.jobs) == 1

new_connector = testing.InMemoryConnector()
with app.replace_connector(new_connector):
assert len(app.connector.jobs) == 0

assert len(app.connector.jobs) == 1

0 comments on commit ca46449

Please sign in to comment.