From 05580df4c43aa0b2d4664d74328f4f742af98f72 Mon Sep 17 00:00:00 2001
From: Gguidini
Date: Fri, 19 Apr 2024 17:31:20 +0200
Subject: [PATCH 1/2] Add task group to metrics

We have task health metrics (success, failure, ...), but we also have a
lot of tasks, and the graphs in Grafana don't read well with every task
shown individually. In particular I'd like to have general success /
failure rates per task group.

"task_group" is the task_config_group (part of the task name). It can be
used to see, at any given time, the general health of the group as a
whole.
---
 tasks/base.py                 | 60 +++++++++++++++++--------
 tasks/tests/unit/test_base.py | 85 ++++++++++++++++++++++-------------
 2 files changed, 95 insertions(+), 50 deletions(-)
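As an illustration of how the new label is meant to be consumed, here is a
minimal standalone sketch (the counter name is invented for the example;
the task names are borrowed from the tests in this series, and the group
derivation mirrors the `name.split(".")[-2]` rule used in the diff):

```python
from prometheus_client import CollectorRegistry, Counter, generate_latest

registry = CollectorRegistry()

# Example-only counter with the same label shape as the patched metrics:
# every sample carries both the full task name and its task_group.
EXAMPLE_RUN_COUNTER = Counter(
    "example_task_counts_runs",
    "Number of times this task was run",
    ["task", "task_group"],
    registry=registry,
)

task_names = (
    "app.task.test.SampleTask",
    "app.tasks.test_results.TestResultsProcessor",
)
for task_name in task_names:
    # Same rule as the patch: the group is the second-to-last
    # dot-separated component of the task name.
    task_group = task_name.split(".")[-2]
    EXAMPLE_RUN_COUNTER.labels(task=task_name, task_group=task_group).inc()

# Every exported series now carries a task_group label, so a dashboard can
# aggregate by group instead of listing every individual task.
print(generate_latest(registry).decode())
```

Since each task name maps to exactly one group, the extra label does not
create new time series; it only makes group-level aggregation possible.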
diff --git a/tasks/base.py b/tasks/base.py
index c40020558..af25e236a 100644
--- a/tasks/base.py
+++ b/tasks/base.py
@@ -26,12 +26,12 @@
 REQUEST_TIMEOUT_COUNTER = Counter(
     "worker_task_counts_timeouts",
     "Number of times a task experienced any kind of timeout",
-    ["task"],
+    ["task", "task_group"],
 )
 REQUEST_HARD_TIMEOUT_COUNTER = Counter(
     "worker_task_counts_hard_timeouts",
     "Number of times a task experienced a hard timeout",
-    ["task"],
+    ["task", "task_group"],
 )
 
 
@@ -42,49 +42,58 @@ def metrics_prefix(self):
 
     def on_timeout(self, soft: bool, timeout: int):
         res = super().on_timeout(soft, timeout)
+        task_group = (
+            self.name.split(".")[-2] if self.name is not None else "unknown_group"
+        )
         if not soft:
-            REQUEST_HARD_TIMEOUT_COUNTER.labels(task=self.name).inc()
+            REQUEST_HARD_TIMEOUT_COUNTER.labels(
+                task=self.name, task_group=task_group
+            ).inc()
             metrics.incr(f"{self.metrics_prefix}.hardtimeout")
-        REQUEST_TIMEOUT_COUNTER.labels(task=self.name).inc()
+        REQUEST_TIMEOUT_COUNTER.labels(task=self.name, task_group=task_group).inc()
         metrics.incr(f"{self.metrics_prefix}.timeout")
         return res
 
 
 # Task reliability metrics
 TASK_RUN_COUNTER = Counter(
-    "worker_task_counts_runs", "Number of times this task was run", ["task"]
+    "worker_task_counts_runs",
+    "Number of times this task was run",
+    ["task", "task_group"],
 )
 TASK_RETRY_COUNTER = Counter(
-    "worker_task_counts_retries", "Number of times this task was retried", ["task"]
+    "worker_task_counts_retries",
+    "Number of times this task was retried",
+    ["task", "task_group"],
 )
 TASK_SUCCESS_COUNTER = Counter(
     "worker_task_counts_successes",
     "Number of times this task completed without error",
-    ["task"],
+    ["task", "task_group"],
 )
 TASK_FAILURE_COUNTER = Counter(
     "worker_task_counts_failures",
     "Number of times this task failed with an exception",
-    ["task"],
+    ["task", "task_group"],
 )
 
 # Task runtime metrics
 TASK_FULL_RUNTIME = Histogram(
     "worker_task_timers_full_runtime_seconds",
     "Total runtime in seconds of this task including db commits and error handling",
-    ["task"],
+    ["task", "task_group"],
     buckets=[0.05, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 180, 300, 600, 900],
 )
 TASK_CORE_RUNTIME = Histogram(
     "worker_task_timers_core_runtime_seconds",
     "Runtime in seconds of this task's main logic, not including db commits or error handling",
-    ["task"],
+    ["task", "task_group"],
     buckets=[0.05, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 180, 300, 600, 900],
 )
 TASK_TIME_IN_QUEUE = Histogram(
     "worker_tasks_timers_time_in_queue_seconds",
     "Time in {TODO} spent waiting in the queue before being run",
-    ["task", "queue"],
+    ["task", "queue", "task_group"],
     buckets=[
         0.01,
         0.05,
@@ -114,20 +123,33 @@ def on_timeout(self, soft: bool, timeout: int):
 class BaseCodecovTask(celery_app.Task):
     Request = BaseCodecovRequest
 
-    def __init_subclass__(cls, name=None):
+    def __init_subclass__(cls, name="unknown_task"):
         cls.name = name
+        # All task names follow the format `app.[cron|task].<task_group>.<task_name>`
+        task_group = name.split(".")[-2] if name != "unknown_task" else "unknown_group"
+        cls.task_group = task_group
 
         cls.metrics_prefix = f"worker.task.{name}"
 
         # Task reliability metrics
-        cls.task_run_counter = TASK_RUN_COUNTER.labels(task=name)
-        cls.task_retry_counter = TASK_RETRY_COUNTER.labels(task=name)
-        cls.task_success_counter = TASK_SUCCESS_COUNTER.labels(task=name)
-        cls.task_failure_counter = TASK_FAILURE_COUNTER.labels(task=name)
+        cls.task_run_counter = TASK_RUN_COUNTER.labels(task=name, task_group=task_group)
+        cls.task_retry_counter = TASK_RETRY_COUNTER.labels(
+            task=name, task_group=task_group
+        )
+        cls.task_success_counter = TASK_SUCCESS_COUNTER.labels(
+            task=name, task_group=task_group
+        )
+        cls.task_failure_counter = TASK_FAILURE_COUNTER.labels(
+            task=name, task_group=task_group
+        )
 
         # Task runtime metrics
-        cls.task_full_runtime = TASK_FULL_RUNTIME.labels(task=name)
-        cls.task_core_runtime = TASK_CORE_RUNTIME.labels(task=name)
+        cls.task_full_runtime = TASK_FULL_RUNTIME.labels(
+            task=name, task_group=task_group
+        )
+        cls.task_core_runtime = TASK_CORE_RUNTIME.labels(
+            task=name, task_group=task_group
+        )
 
     @property
     def hard_time_limit_task(self):
@@ -236,7 +258,7 @@ def _emit_queue_metrics(self):
 
         queue_name = self.request.get("delivery_info", {}).get("routing_key", None)
         time_in_queue_timer = TASK_TIME_IN_QUEUE.labels(
-            task=self.name, queue=queue_name
+            task=self.name, queue=queue_name, task_group=self.task_group
         )  # TODO is None a valid label value
         time_in_queue_timer.observe(delta.total_seconds())
 
diff --git a/tasks/tests/unit/test_base.py b/tasks/tests/unit/test_base.py
index f62d4dc92..ca027f542 100644
--- a/tasks/tests/unit/test_base.py
+++ b/tasks/tests/unit/test_base.py
@@ -40,13 +40,13 @@ def now(cls):
         return datetime.fromisoformat("2023-06-13T10:01:01.000123")
 
 
-class SampleTask(BaseCodecovTask, name="test.SampleTask"):
+class SampleTask(BaseCodecovTask, name="app.task.test.SampleTask"):
     def run_impl(self, dbsession):
         return {"unusual": "return", "value": ["There"]}
 
 
 class SampleTaskWithArbitraryError(
-    BaseCodecovTask, name="test.SampleTaskWithArbitraryError"
+    BaseCodecovTask, name="app.task.test.SampleTaskWithArbitraryError"
 ):
     def __init__(self, error):
         self.error = error
@@ -60,7 +60,7 @@ def retry(self):
 
 
 class SampleTaskWithArbitraryPostgresError(
-    BaseCodecovTask, name="test.SampleTaskWithArbitraryPostgresError"
+    BaseCodecovTask, name="app.task.test.SampleTaskWithArbitraryPostgresError"
 ):
     def __init__(self, error):
         self.error = error
@@ -73,17 +73,19 @@ def retry(self):
         raise Retry()
 
 
-class SampleTaskWithSoftTimeout(BaseCodecovTask, name="test.SampleTaskWithSoftTimeout"):
+class SampleTaskWithSoftTimeout(
+    BaseCodecovTask, name="app.task.test.SampleTaskWithSoftTimeout"
+):
     def run_impl(self, dbsession):
         raise SoftTimeLimitExceeded()
 
 
-class FailureSampleTask(BaseCodecovTask, name="test.FailureSampleTask"):
+class FailureSampleTask(BaseCodecovTask, name="app.task.test.FailureSampleTask"):
     def run_impl(self, *args, **kwargs):
         raise Exception("Whhhhyyyyyyy")
 
 
-class RetrySampleTask(BaseCodecovTask, name="test.RetrySampleTask"):
+class RetrySampleTask(BaseCodecovTask, name="app.task.test.RetrySampleTask"):
     def run(self, *args, **kwargs):
         self.retry()
 
@@ -121,7 +123,7 @@ def test_sample_run(self, mock_simple_metric, mocker, dbsession):
         mocked_metrics.timing.assert_has_calls(
             [
                 call(
"worker.task.test.SampleTask.time_in_queue", + f"worker.task.{task_instance.name}.time_in_queue", timedelta(seconds=61, microseconds=123), ), call( @@ -129,7 +131,7 @@ def test_sample_run(self, mock_simple_metric, mocker, dbsession): timedelta(seconds=61, microseconds=123), ), call( - "worker.task.test.SampleTask.my-queue.time_in_queue", + f"worker.task.{task_instance.name}.my-queue.time_in_queue", timedelta(seconds=61, microseconds=123), ), ] @@ -137,14 +139,18 @@ def test_sample_run(self, mock_simple_metric, mocker, dbsession): assert ( REGISTRY.get_sample_value( "worker_tasks_timers_time_in_queue_seconds_sum", - labels={"task": SampleTask.name, "queue": "my-queue"}, + labels={ + "task": SampleTask.name, + "task_group": SampleTask.task_group, + "queue": "my-queue", + }, ) == 61.000123 ) mock_simple_metric.assert_has_calls( [ - call("worker.task.test.SampleTask.core_runtime", ANY), - call("worker.task.test.SampleTask.full_runtime", ANY), + call(f"worker.task.{task_instance.name}.core_runtime", ANY), + call(f"worker.task.{task_instance.name}.full_runtime", ANY), ] ) @@ -330,7 +336,7 @@ def test_run_sqlalchemy_error_rollback(self, mocker, dbsession, celery_app): @pytest.mark.django_db(databases={"default", "timeseries"}) class TestBaseCodecovTaskHooks(object): def test_sample_task_success(self, celery_app, mocker): - class SampleTask(BaseCodecovTask, name="test.SampleTask"): + class SampleTask(BaseCodecovTask, name="app.task.test.SampleTask"): def run_impl(self, dbsession): return {"unusual": "return", "value": ["There"]} @@ -339,22 +345,26 @@ def run_impl(self, dbsession): task = celery_app.tasks[DTask.name] prom_run_counter_before = REGISTRY.get_sample_value( - "worker_task_counts_runs_total", labels={"task": DTask.name} + "worker_task_counts_runs_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) prom_success_counter_before = REGISTRY.get_sample_value( - "worker_task_counts_successes_total", labels={"task": DTask.name} + "worker_task_counts_successes_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) k = task.apply() prom_run_counter_after = REGISTRY.get_sample_value( - "worker_task_counts_runs_total", labels={"task": DTask.name} + "worker_task_counts_runs_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) prom_success_counter_after = REGISTRY.get_sample_value( - "worker_task_counts_successes_total", labels={"task": DTask.name} + "worker_task_counts_successes_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) res = k.get() assert res == {"unusual": "return", "value": ["There"]} - mock_metrics.assert_called_with("worker.task.test.SampleTask.successes") + mock_metrics.assert_called_with(f"worker.task.{DTask.name}.successes") assert prom_run_counter_after - prom_run_counter_before == 1 assert prom_success_counter_after - prom_success_counter_before == 1 @@ -366,19 +376,24 @@ def run_impl(self, *args, **kwargs): mock_metrics = mocker.patch("tasks.base.metrics.incr") DTask = celery_app.register_task(FailureSampleTask()) task = celery_app.tasks[DTask.name] + assert task.task_group == "test" with pytest.raises(Exception) as exc: prom_run_counter_before = REGISTRY.get_sample_value( - "worker_task_counts_runs_total", labels={"task": DTask.name} + "worker_task_counts_runs_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) prom_failure_counter_before = REGISTRY.get_sample_value( - "worker_task_counts_failures_total", labels={"task": DTask.name} + "worker_task_counts_failures_total", + 
labels={"task": DTask.name, "task_group": DTask.task_group}, ) task.apply().get() prom_run_counter_after = REGISTRY.get_sample_value( - "worker_task_counts_runs_total", labels={"task": DTask.name} + "worker_task_counts_runs_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) prom_failure_counter_after = REGISTRY.get_sample_value( - "worker_task_counts_failures_total", labels={"task": DTask.name} + "worker_task_counts_failures_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) assert prom_run_counter_after - prom_run_counter_before == 1 assert prom_failure_counter_after - prom_failure_counter_before == 1 @@ -394,13 +409,15 @@ def test_sample_task_retry(self, celery_app, mocker): mock_metrics = mocker.patch("tasks.base.metrics.incr") task = RetrySampleTask() prom_retry_counter_before = REGISTRY.get_sample_value( - "worker_task_counts_retries_total", labels={"task": task.name} + "worker_task_counts_retries_total", + labels={"task": task.name, "task_group": task.task_group}, ) task.on_retry("exc", "task_id", "args", "kwargs", "einfo") prom_retry_counter_after = REGISTRY.get_sample_value( - "worker_task_counts_retries_total", labels={"task": task.name} + "worker_task_counts_retries_total", + labels={"task": task.name, "task_group": task.task_group}, ) - mock_metrics.assert_called_with("worker.task.test.RetrySampleTask.retries") + mock_metrics.assert_called_with(f"worker.task.{task.name}.retries") assert prom_retry_counter_after - prom_retry_counter_before == 1 @@ -441,15 +458,17 @@ class SampleTask(BaseCodecovTask, name="test.SampleTask"): request = self.xRequest(mocker, DTask.name, celery_app) prom_timeout_counter_before = ( REGISTRY.get_sample_value( - "worker_task_counts_timeouts_total", labels={"task": DTask.name} + "worker_task_counts_timeouts_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) or 0 ) request.on_timeout(True, 10) prom_timeout_counter_after = REGISTRY.get_sample_value( - "worker_task_counts_timeouts_total", labels={"task": DTask.name} + "worker_task_counts_timeouts_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) - mock_metrics.assert_called_with("worker.task.test.SampleTask.timeout") + mock_metrics.assert_called_with(f"worker.task.{DTask.name}.timeout") assert prom_timeout_counter_after - prom_timeout_counter_before == 1 def test_sample_task_hard_timeout(self, celery_app, mocker): @@ -461,22 +480,26 @@ class SampleTask(BaseCodecovTask, name="test.SampleTask"): request = self.xRequest(mocker, DTask.name, celery_app) prom_timeout_counter_before = ( REGISTRY.get_sample_value( - "worker_task_counts_timeouts_total", labels={"task": DTask.name} + "worker_task_counts_timeouts_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) or 0 ) prom_hard_timeout_counter_before = ( REGISTRY.get_sample_value( - "worker_task_counts_hard_timeouts_total", labels={"task": DTask.name} + "worker_task_counts_hard_timeouts_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) or 0 ) request.on_timeout(False, 10) prom_timeout_counter_after = REGISTRY.get_sample_value( - "worker_task_counts_timeouts_total", labels={"task": DTask.name} + "worker_task_counts_timeouts_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) prom_hard_timeout_counter_after = REGISTRY.get_sample_value( - "worker_task_counts_hard_timeouts_total", labels={"task": DTask.name} + "worker_task_counts_hard_timeouts_total", + labels={"task": DTask.name, "task_group": DTask.task_group}, ) 
mock_metrics.assert_any_call("worker.task.test.SampleTask.hardtimeout") mock_metrics.assert_any_call("worker.task.test.SampleTask.timeout") From c58156e9fddd1413fee7f6e4392c6c7f629a060f Mon Sep 17 00:00:00 2001 From: Gguidini Date: Mon, 22 Apr 2024 09:31:07 +0200 Subject: [PATCH 2/2] move task_group to aux function thanks @joseph-sentry for the suggestion. Also avoids index errors by checking that the task name includes at least one '.' --- tasks/base.py | 15 +++++++++++---- tasks/tests/unit/test_base.py | 18 ++++++++++++++++-- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/tasks/base.py b/tasks/base.py index af25e236a..00af56a67 100644 --- a/tasks/base.py +++ b/tasks/base.py @@ -1,5 +1,6 @@ import logging from datetime import datetime +from typing import Optional from celery.exceptions import SoftTimeLimitExceeded from celery.worker.request import Request @@ -35,6 +36,14 @@ ) +def _get_task_group_from_name(task_name: Optional[str]) -> Optional[str]: + if task_name is None or ("." not in task_name): + return None + # We checked that '.' is part of task_name + # So there will be at least 2 items in the array + return task_name.split(".")[-2] + + class BaseCodecovRequest(Request): @property def metrics_prefix(self): @@ -42,9 +51,7 @@ def metrics_prefix(self): def on_timeout(self, soft: bool, timeout: int): res = super().on_timeout(soft, timeout) - task_group = ( - self.name.split(".")[-2] if self.name is not None else "unknown_group" - ) + task_group = _get_task_group_from_name(self.name) if not soft: REQUEST_HARD_TIMEOUT_COUNTER.labels( task=self.name, task_group=task_group @@ -126,7 +133,7 @@ class BaseCodecovTask(celery_app.Task): def __init_subclass__(cls, name="unknown_task"): cls.name = name # All task names follow the format `app.[cron|task]..` - task_group = name.split(".")[-2] if name != "unknown_task" else "unknown_group" + task_group = _get_task_group_from_name(name) cls.task_group = task_group cls.metrics_prefix = f"worker.task.{name}" diff --git a/tasks/tests/unit/test_base.py b/tasks/tests/unit/test_base.py index ca027f542..cbfadfce5 100644 --- a/tasks/tests/unit/test_base.py +++ b/tasks/tests/unit/test_base.py @@ -20,12 +20,27 @@ ) from database.tests.factories.core import OwnerFactory, RepositoryFactory -from tasks.base import BaseCodecovRequest, BaseCodecovTask +from tasks.base import BaseCodecovRequest, BaseCodecovTask, _get_task_group_from_name from tasks.base import celery_app as base_celery_app here = Path(__file__) +@pytest.mark.parametrize( + "task_name,output", + [ + (None, None), + ("task_not_following_expected_pattern", None), + ("task_group.task_name", "task_group"), + ("app.task.task_group.task_name", "task_group"), + ("app.cron.daily.task_name", "daily"), + (f"app.tasks.test_results.TestResultsProcessor", "test_results"), + ], +) +def test__get_task_group_from_name(task_name, output): + assert _get_task_group_from_name(task_name) == output + + class MockDateTime(datetime): """ `@pytest.mark.freeze_time()` is convenient but will freeze time for @@ -422,7 +437,6 @@ def test_sample_task_retry(self, celery_app, mocker): class TestBaseCodecovRequest(object): - """ All in all, this is a really weird class