Skip to content

Commit

Permalink
Merge pull request #894 from procrastinate-org/install_signal_handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim authored Jan 17, 2024
2 parents 65d51ce + 44d4044 commit 2a365ae
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 7 deletions.
67 changes: 67 additions & 0 deletions docs/howto/worker.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,70 @@ Naming the worker is optional.

On the other hand, `App.run_worker_async` needs to run while the app is open.
The CLI takes care of opening the app.

... Inside an application
~~~~~~~~~~~~~~~~~~~~~~~~~

When running the worker inside a bigger application, you may want to use
``install_signal_handlers=False`` so that the worker doesn't interfere with
your application's signal handlers.

.. note::

When you run the worker as a task, at any point, you can call ``task.cancel()``
to request the worker to gracefully stop at the next opportunity.
You may then wait for it to actually stop using ``await task`` if you're
ready to wait indefinitely, or ``asyncio.wait_for(task, timeout)`` if you
want to set a timeout.

Here is an example FastAPI application that does this:

.. code-block:: python
import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from procrastinate import App, PsycopgConnector
logging.basicConfig(level=logging.DEBUG)
task_queue = App(connector=PsycopgConnector())
@task_queue.task
async def sleep(length):
await asyncio.sleep(length)
@asynccontextmanager
async def lifespan(app: FastAPI):
async with task_queue.open_async():
worker = asyncio.create_task(
task_queue.run_worker_async(install_signal_handlers=False)
)
# Set to 100 to test the ungraceful shutdown
await sleep.defer_async(length=5)
print("STARTUP")
yield
print("SHUTDOWN")
worker.cancel()
try:
await asyncio.wait_for(worker, timeout=10)
except asyncio.TimeoutError:
print("Ungraceful shutdown")
except asyncio.CancelledError:
print("Graceful shutdown")
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def root():
return {"Hello": "World"}
13 changes: 12 additions & 1 deletion procrastinate/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,21 @@ async def run_worker_async(self, **kwargs) -> None:
additional_context: ``Optional[Dict[str, Any]]``
If set extend the context received by the tasks when ``pass_context`` is set
to ``True`` in the task definition.
install_signal_handlers: ``bool``
If ``True``, the worker will install signal handlers to gracefully stop the
worker. Use ``False`` if you want to handle signals yourself (e.g. if you
run the work as an async task in a bigger application)
(defaults to ``True``)
"""
self.perform_import_paths()
worker = self._worker(**kwargs)
await worker.run()
task = asyncio.create_task(worker.run())
try:
await asyncio.shield(task)
except asyncio.CancelledError:
worker.stop()
await task
raise

def run_worker(self, **kwargs) -> None:
"""
Expand Down
8 changes: 7 additions & 1 deletion procrastinate/worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import contextlib
import functools
import inspect
import logging
Expand Down Expand Up @@ -40,6 +41,7 @@ def __init__(
listen_notify: bool = True,
delete_jobs: str | DeleteJobCondition = DeleteJobCondition.NEVER.value,
additional_context: dict[str, Any] | None = None,
install_signal_handlers: bool = True,
):
self.app = app
self.queues = queues
Expand All @@ -56,6 +58,7 @@ def __init__(
)

self.job_manager = self.app.job_manager
self.install_signal_handlers = install_signal_handlers

if name:
self.logger = logger.getChild(name)
Expand Down Expand Up @@ -115,8 +118,11 @@ async def run(self) -> None:
action="start_worker", queues=self.queues
),
)
context = contextlib.nullcontext()
if self.install_signal_handlers:
context = signals.on_stop(self.stop)

with signals.on_stop(self.stop):
with context:
side_coros = [self.periodic_deferrer()]
if self.wait and self.listen_notify:
side_coros.append(self.listener())
Expand Down
17 changes: 17 additions & 0 deletions tests/integration/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import contextlib
import signal

import pytest

Expand Down Expand Up @@ -89,3 +90,19 @@ async def test_run_no_listen_notify(app):
finally:
running_worker.stop()
await asyncio.wait_for(task, timeout=0.5)


async def test_run_no_signal_handlers(app, kill_own_pid):
running_worker = worker.Worker(
app=app, queues=["some_queue"], install_signal_handlers=False
)

task = asyncio.ensure_future(running_worker.run())
try:
with pytest.raises(KeyboardInterrupt):
await asyncio.sleep(0.01)
# Test that handlers are NOT installed
kill_own_pid(signal=signal.SIGINT)
finally:
running_worker.stop()
await asyncio.wait_for(task, timeout=0.5)
48 changes: 43 additions & 5 deletions tests/unit/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,50 @@ def test_app_worker(app, mocker):
)


def test_app_run_worker(app, mocker):
run = mocker.patch("procrastinate.worker.Worker.run", return_value=asyncio.Future())
run.return_value.set_result(None)
app.run_worker(queues=["yay"])
def test_app_run_worker(app):
result = []

run.assert_called_once_with()
@app.task
def my_task(a):
result.append(a)

my_task.defer(a=1)

app.run_worker(wait=False)

assert result == [1]


async def test_app_run_worker_async(app):
result = []

@app.task
async def my_task(a):
result.append(a)

await my_task.defer_async(a=1)

await app.run_worker_async(wait=False)

assert result == [1]


async def test_app_run_worker_async_cancel(app):
result = []

@app.task
async def my_task(a):
await asyncio.sleep(0.05)
result.append(a)

task = asyncio.create_task(app.run_worker_async())
await my_task.defer_async(a=1)
await asyncio.sleep(0.01)
task.cancel()
with pytest.raises(asyncio.CancelledError):
await asyncio.wait_for(task, timeout=0.1)

assert result == [1]


def test_from_path(mocker):
Expand Down

0 comments on commit 2a365ae

Please sign in to comment.