Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow selecting which celery worker(s) to ping #370

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,26 @@ Using `django.settings` you may exert more fine-grained control over the behavio
- Number
- `3`
- Specifies the maximum total time for a task to complete and return a result, including queue time.


Celery-Ping Health Check
-------------------

Using `django.settings` you may exert more fine-grained control over the behavior of the celery-ping health check

.. list-table:: Additional Settings
:widths: 25 10 10 55
:header-rows: 1

* - Name
- Type
- Default
- Description
* - `HEALTHCHECK_CELERY_PING_TIMEOUT`
- Number
- `1`
- Specifies the maximum total time (in seconds) for which "pong" responses are awaited.
* - `HEALTHCHECK_CELERY_PING_DESTINATION`
- List of Strings
- `None`
- Specifies the list of workers which will receive the "ping" request.
16 changes: 13 additions & 3 deletions health_check/contrib/celery_ping/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ class CeleryPingHealthCheck(BaseHealthCheckBackend):

def check_status(self):
timeout = getattr(settings, "HEALTHCHECK_CELERY_PING_TIMEOUT", 1)
destination = getattr(settings, "HEALTHCHECK_CELERY_PING_DESTINATION", None)

try:
ping_result = app.control.ping(timeout=timeout)
ping_result = app.control.ping(destination=destination, timeout=timeout)
except IOError as e:
self.add_error(ServiceUnavailable("IOError"), e)
except NotImplementedError as exc:
Expand All @@ -30,9 +31,9 @@ def check_status(self):
ServiceUnavailable("Celery workers unavailable"),
)
else:
self._check_ping_result(ping_result)
self._check_ping_result(ping_result, destination)

def _check_ping_result(self, ping_result):
def _check_ping_result(self, ping_result, destination):
active_workers = []

for result in ping_result:
Expand All @@ -46,6 +47,15 @@ def _check_ping_result(self, ping_result):
continue
active_workers.append(worker)

if destination:
inactive_workers = set(destination) - set(active_workers)
if inactive_workers:
self.add_error(
ServiceUnavailable(
f"Celery workers {inactive_workers} did not respond"
)
)

if not self.errors:
self._check_active_queues(active_workers)

Expand Down
54 changes: 52 additions & 2 deletions tests/test_celery_ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import pytest
from django.apps import apps
from django.conf import settings

from health_check.contrib.celery_ping.apps import HealthCheckConfig
from health_check.contrib.celery_ping.backends import CeleryPingHealthCheck
Expand All @@ -20,7 +19,9 @@ class TestCeleryPingHealthCheck:
def health_check(self):
return CeleryPingHealthCheck()

def test_check_status_doesnt_add_errors_when_ping_successful(self, health_check):
def test_check_status_doesnt_add_errors_when_ping_successful(
self, health_check, settings
):
celery_worker = "celery@4cc150a7b49b"

with patch(
Expand Down Expand Up @@ -59,6 +60,7 @@ def test_check_status_reports_errors_if_ping_responses_are_incorrect(
def test_check_status_adds_errors_when_ping_successfull_but_not_all_defined_queues_have_consumers(
self,
health_check,
settings,
):
celery_worker = "celery@4cc150a7b49b"
queues = list(settings.CELERY_QUEUES)
Expand Down Expand Up @@ -123,6 +125,54 @@ def test_check_status_add_error_when_ping_result_failed(
assert len(health_check.errors) == 1
assert "workers unavailable" in health_check.errors[0].message.lower()

def test_check_status_reports_errors_if_ping_responses_are_missing(
self,
health_check,
settings,
):
settings.HEALTHCHECK_CELERY_PING_DESTINATION = [
"celery1@4cc150a7b49b",
"celery2@4cc150a7b49b",
]
with patch(
self.CELERY_APP_CONTROL_PING,
return_value=[
{"celery1@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
],
):
health_check.check_status()

assert len(health_check.errors) == 1

def test_check_status_reports_destinations(
self,
health_check,
settings,
):
settings.HEALTHCHECK_CELERY_PING_DESTINATION = [
"celery1@4cc150a7b49b",
"celery2@4cc150a7b49b",
]
with patch(
self.CELERY_APP_CONTROL_PING,
return_value=[
{"celery1@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
{"celery2@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
{"celery3@4cc150a7b49b": CeleryPingHealthCheck.CORRECT_PING_RESPONSE},
],
), patch(
self.CELERY_APP_CONTROL_INSPECT_ACTIVE_QUEUES,
return_value={
celery_worker: [
{"name": queue.name} for queue in settings.CELERY_QUEUES
]
for celery_worker in ("celery1@4cc150a7b49b", "celery2@4cc150a7b49b", "celery3@4cc150a7b49b")
},
):
health_check.check_status()

assert not health_check.errors


class TestCeleryPingHealthCheckApps:
def test_apps(self):
Expand Down