Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert background job queue time / latency to nanoseconds #784

Merged
merged 4 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions src/scout_apm/bottle.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@

import scout_apm.core
from scout_apm.core.config import scout_config
from scout_apm.core.queue_time import track_request_queue_time
from scout_apm.core.tracked_request import TrackedRequest
from scout_apm.core.web_requests import (
create_filtered_path,
ignore_path,
track_request_queue_time,
)
from scout_apm.core.web_requests import create_filtered_path, ignore_path


class ScoutPlugin(object):
Expand Down
13 changes: 4 additions & 9 deletions src/scout_apm/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from celery.signals import before_task_publish, task_failure, task_postrun, task_prerun

from scout_apm.core.queue_time import track_job_queue_time

try:
import django
from django.views.debug import SafeExceptionReporterFilter
Expand Down Expand Up @@ -34,15 +36,8 @@ def task_prerun_callback(task=None, **kwargs):
tracked_request = TrackedRequest.instance()
tracked_request.is_real_request = True

start = getattr(task.request, "scout_task_start", None)
if start is not None:
now = datetime_to_timestamp(dt.datetime.utcnow())
try:
queue_time = now - start
except TypeError:
pass
else:
tracked_request.tag("queue_time", queue_time)
start_time_header = getattr(task.request, "scout_task_start", None)
track_job_queue_time(start_time_header, tracked_request)

task_id = getattr(task.request, "id", None)
if task_id:
Expand Down
99 changes: 99 additions & 0 deletions src/scout_apm/core/queue_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# coding=utf-8

import datetime as dt
import logging
import time
import typing

from scout_apm.compat import datetime_to_timestamp
from scout_apm.core.tracked_request import TrackedRequest

logger = logging.getLogger(__name__)

# Cutoff epoch is used for determining ambiguous timestamp boundaries: it is
# 1 January of the year ten years before the current one, as returned by
# time.mktime, expressed in seconds and then scaled to each finer unit.
CUTOFF_EPOCH_S = time.mktime((dt.date.today().year - 10, 1, 1, 0, 0, 0, 0, 0, 0))
CUTOFF_EPOCH_MS = CUTOFF_EPOCH_S * 1e3
CUTOFF_EPOCH_US = CUTOFF_EPOCH_S * 1e6
CUTOFF_EPOCH_NS = CUTOFF_EPOCH_S * 1e9


def _convert_ambiguous_timestamp_to_ns(timestamp: float) -> float:
    """
    Normalise a timestamp of unknown unit to nanoseconds.

    The unit (nanoseconds, microseconds, milliseconds or seconds) is guessed
    by comparing the value against the cutoff epoch expressed in each unit,
    largest unit first. Return 0.0 for values that do not exceed the cutoff
    in seconds, i.e. more than roughly 10 years in the past.
    """
    if timestamp > CUTOFF_EPOCH_NS:
        # Already in nanoseconds; hand it back untouched.
        return timestamp
    if timestamp > CUTOFF_EPOCH_US:
        return timestamp * 1e3
    if timestamp > CUTOFF_EPOCH_MS:
        return timestamp * 1e6
    if timestamp > CUTOFF_EPOCH_S:
        return timestamp * 1e9
    # Every comparison above was false (this includes nan).
    return 0.0


def track_request_queue_time(
    header_value: typing.Any, tracked_request: TrackedRequest
) -> bool:
    """
    Parse a queue time header and tag the tracked request with the queue time.

    The value may carry a leading "t=" and must otherwise start with a digit
    and parse as a float. It is read as an epoch timestamp of ambiguous unit,
    normalised to nanoseconds, and subtracted from the tracked request's start
    time; the result is stored under the "scout.queue_time_ns" tag.

    Returns:
        bool: Whether we succeeded in marking queue time. Used for testing.
    """
    value = header_value[2:] if header_value.startswith("t=") else header_value

    # An empty slice is not a digit, so empty headers are rejected here too,
    # alongside negatives, nan, inf, etc.
    if not value[:1].isdigit():
        return False

    try:
        ambiguous_timestamp = float(value)
    except ValueError:
        return False

    queued_at_ns = _convert_ambiguous_timestamp_to_ns(ambiguous_timestamp)
    if queued_at_ns == 0.0:
        # The converter reports unusably old timestamps as 0.0.
        return False

    request_start_ns = datetime_to_timestamp(tracked_request.start_time) * 1e9
    if queued_at_ns > request_start_ns:
        # Ignore if in the future
        return False

    tracked_request.tag(
        "scout.queue_time_ns", int(request_start_ns - queued_at_ns)
    )
    return True


def track_job_queue_time(
    header_value: typing.Any, tracked_request: TrackedRequest
) -> bool:
    """
    Attempt to parse a queue/latency time header and store the result in the request.

    The header is expected to be a numeric start timestamp since the epoch in
    seconds, milliseconds, microseconds or nanoseconds. It is normalised to
    nanoseconds and subtracted from the current UTC time; the result is stored
    under the "scout.job_queue_time_ns" tag.

    Nothing is tagged, and False is returned, when the header is missing, is
    not a number, is older than the ambiguous-timestamp cutoff, or lies in
    the future (which also covers infinity).

    Returns:
        bool: Whether we succeeded in marking queue time for the job. Used for testing.
    """
    if header_value is None:
        return False

    now_ns = datetime_to_timestamp(dt.datetime.utcnow()) * 1e9
    try:
        # typing.cast is a no-op at runtime: non-numeric values (e.g. str)
        # are rejected by the TypeError raised when they are compared or
        # subtracted below.
        ambiguous_float_start = typing.cast(float, header_value)
        start_ns = _convert_ambiguous_timestamp_to_ns(ambiguous_float_start)
        # 0.0 is the converter's marker for an unusably old timestamp; using
        # it as a start time would report a latency of decades. A start in
        # the future would yield a negative latency (or an OverflowError
        # from int() for infinity), so ignore it as the request tracker does.
        if start_ns == 0.0 or start_ns > now_ns:
            logger.debug("Ignoring job queue time header: %r", header_value)
            return False
        queue_time_ns = int(now_ns - start_ns)
    except TypeError:
        logger.debug("Invalid job queue time header: %r", header_value)
        return False

    tracked_request.tag("scout.job_queue_time_ns", queue_time_ns)
    return True
64 changes: 2 additions & 62 deletions src/scout_apm/core/web_requests.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# coding=utf-8

import datetime as dt
import time

from scout_apm.compat import datetime_to_timestamp, parse_qsl, urlencode
from scout_apm.compat import parse_qsl, urlencode
from scout_apm.core.config import scout_config
from scout_apm.core.queue_time import track_request_queue_time

# Originally derived from:
# 1. Rails:
Expand Down Expand Up @@ -103,64 +101,6 @@ def ignore_path(path):
return False


def track_request_queue_time(header_value, tracked_request):
if header_value.startswith("t="):
header_value = header_value[2:]

try:
first_char = header_value[0]
except IndexError:
return False

if not first_char.isdigit(): # filter out negatives, nan, inf, etc.
return False

try:
ambiguous_start_timestamp = float(header_value)
except ValueError:
return False

start_timestamp_ns = convert_ambiguous_timestamp_to_ns(ambiguous_start_timestamp)
if start_timestamp_ns == 0.0:
return False

tr_start_timestamp_ns = datetime_to_timestamp(tracked_request.start_time) * 1e9

# Ignore if in the future
if start_timestamp_ns > tr_start_timestamp_ns:
return False

queue_time_ns = int(tr_start_timestamp_ns - start_timestamp_ns)
tracked_request.tag("scout.queue_time_ns", queue_time_ns)
return True


# Cutoff epoch is used for determining ambiguous timestamp boundaries
CUTOFF_EPOCH_S = time.mktime((dt.date.today().year - 10, 1, 1, 0, 0, 0, 0, 0, 0))
CUTOFF_EPOCH_MS = CUTOFF_EPOCH_S * 1000.0
CUTOFF_EPOCH_US = CUTOFF_EPOCH_S * 1000000.0
CUTOFF_EPOCH_NS = CUTOFF_EPOCH_S * 1000000000.0


def convert_ambiguous_timestamp_to_ns(timestamp):
"""
Convert an ambiguous float timestamp that could be in nanoseconds,
microseconds, milliseconds, or seconds to nanoseconds. Return 0.0 for
values in the more than 10 years ago.
"""
if timestamp > CUTOFF_EPOCH_NS:
converted_timestamp = timestamp
elif timestamp > CUTOFF_EPOCH_US:
converted_timestamp = timestamp * 1000.0
elif timestamp > CUTOFF_EPOCH_MS:
converted_timestamp = timestamp * 1000000.0
elif timestamp > CUTOFF_EPOCH_S:
converted_timestamp = timestamp * 1000000000.0
else:
return 0.0
return converted_timestamp


def asgi_track_request_data(scope, tracked_request):
"""
Track request data from an ASGI HTTP or Websocket scope.
Expand Down
7 changes: 2 additions & 5 deletions src/scout_apm/django/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
from django.urls import get_urlconf

from scout_apm.core.config import scout_config
from scout_apm.core.queue_time import track_request_queue_time
from scout_apm.core.tracked_request import TrackedRequest
from scout_apm.core.web_requests import (
create_filtered_path,
ignore_path,
track_request_queue_time,
)
from scout_apm.core.web_requests import create_filtered_path, ignore_path
from scout_apm.django.request import get_controller_name


Expand Down
7 changes: 2 additions & 5 deletions src/scout_apm/falcon.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@

from scout_apm.api import install
from scout_apm.core.config import scout_config
from scout_apm.core.queue_time import track_request_queue_time
from scout_apm.core.tracked_request import TrackedRequest
from scout_apm.core.web_requests import (
create_filtered_path,
ignore_path,
track_request_queue_time,
)
from scout_apm.core.web_requests import create_filtered_path, ignore_path

logger = logging.getLogger(__name__)

Expand Down
5 changes: 3 additions & 2 deletions tests/integration/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ def test_hello_worker(celery_app, celery_worker, tracked_requests):
assert tracked_request.tags["priority"] == 0
assert tracked_request.tags["routing_key"] == "celery"
assert tracked_request.tags["queue"] == "unknown"
sixty_seconds = 60_000_000_000
assert (
0.0 <= tracked_request.tags["queue_time"] < 60.0
0.0 <= tracked_request.tags["scout.job_queue_time_ns"] < sixty_seconds
) # Assume test took <60 seconds
assert tracked_request.active_spans == []
assert len(tracked_request.complete_spans) == 1
Expand All @@ -272,7 +273,7 @@ def test_hello_worker_header_preset(celery_app, celery_worker, tracked_requests)
assert len(tracked_request.complete_spans) == 1
span = tracked_request.complete_spans[0]
assert span.operation == "Job/tests.integration.test_celery.hello"
assert "queue_time" not in span.tags
assert "scout.job_queue_time_ns" not in span.tags


@skip_unless_celery_4_plus
Expand Down
93 changes: 93 additions & 0 deletions tests/unit/core/test_queue_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# coding=utf-8

import datetime as dt
import time

import pytest

from scout_apm.compat import datetime_to_timestamp
from scout_apm.core.queue_time import (
CUTOFF_EPOCH_S,
_convert_ambiguous_timestamp_to_ns,
track_job_queue_time,
track_request_queue_time,
)


@pytest.mark.parametrize("with_t", [True, False])
def test_track_request_queue_time_valid(with_t, tracked_request):
    """A start about two seconds ago is accepted with or without "t="."""
    queue_start = int(datetime_to_timestamp(dt.datetime.utcnow())) - 2
    prefix = "t=" if with_t else ""
    header_value = prefix + str(queue_start)

    assert track_request_queue_time(header_value, tracked_request) is True

    queue_time_ns = tracked_request.tags["scout.queue_time_ns"]
    assert isinstance(queue_time_ns, int)
    assert queue_time_ns > 0


@pytest.mark.parametrize(
    "header_value",
    [
        # empty header
        "",
        # first character not a digit
        "t=X",
        # raises ValueError on float() conversion
        "t=0.3f",
        # one hour in future
        str(datetime_to_timestamp(dt.datetime.utcnow()) + 3600.0),
        # before ambig cutoff
        str(datetime_to_timestamp(dt.datetime(2009, 1, 1))),
    ],
)
def test_track_request_queue_time_invalid(header_value, tracked_request):
    """Unusable header values are rejected and leave no queue time tag."""
    assert track_request_queue_time(header_value, tracked_request) is False
    assert "scout.queue_time_ns" not in tracked_request.tags


def test_track_job_queue_time_valid(tracked_request):
    """A numeric start two seconds in the past produces a positive latency tag."""
    # Job headers are plain numbers with no "t=" prefix, so unlike the request
    # test there is nothing to parametrize here.
    queue_start = datetime_to_timestamp(dt.datetime.utcnow()) - 2.0
    result = track_job_queue_time(queue_start, tracked_request)

    assert result is True
    queue_time_ns = tracked_request.tags["scout.job_queue_time_ns"]
    assert isinstance(queue_time_ns, int) and queue_time_ns > 0


@pytest.mark.parametrize(
    "header_value",
    [
        # Every case is a str: comparing it with the float cutoffs raises the
        # TypeError that track_job_queue_time turns into a False result, so
        # these are rejected for their type, whatever number they spell out.
        "",
        "123",
        # text of a timestamp one hour in future
        str(datetime_to_timestamp(dt.datetime.utcnow()) + 3600.0),
        # text of a timestamp before ambig cutoff
        str(datetime_to_timestamp(dt.datetime(2009, 1, 1))),
    ],
)
def test_track_job_queue_time_invalid(header_value, tracked_request):
    """Non-numeric job headers are rejected and leave no job queue time tag."""
    assert track_job_queue_time(header_value, tracked_request) is False
    assert "scout.job_queue_time_ns" not in tracked_request.tags


# Fixed reference instant (1 June 2019, built via time.mktime) in seconds.
ref_time_s = time.mktime((2019, 6, 1, 0, 0, 0, 0, 0, 0))


@pytest.mark.parametrize(
    ("given", "expected"),
    [
        # The same instant given in seconds, milliseconds and microseconds.
        (ref_time_s, ref_time_s * 1e9),
        (ref_time_s * 1e3, ref_time_s * 1e9),
        (ref_time_s * 1e6, ref_time_s * 1e9),
        # Just past the cutoff is still read as seconds.
        (CUTOFF_EPOCH_S + 10, (CUTOFF_EPOCH_S + 10) * 1e9),
        # At or below the cutoff collapses to 0.0.
        (0.0, 0.0),
        (1000.0, 0.0),
        # Non-finite input: +inf passes through, -inf and nan become 0.0.
        (float("inf"), float("inf")),
        (float("-inf"), 0.0),
        (float("nan"), 0.0),
    ],
)
def test_convert_ambiguous_timestamp_to_ns(given, expected):
    """Each input is normalised to the expected nanosecond value."""
    converted = _convert_ambiguous_timestamp_to_ns(given)
    assert converted == expected
Loading
Loading