Skip to content

Commit

Permalink
[statsd] Improve tag normalization speed (#672)
Browse files Browse the repository at this point in the history
* [tests] Add ability to print profiling stats in statsd benchmark test

This change allows us to see exactly where statsd is spending the
majority of its time when processing metrics. The profiling can be
activated via the `BENCHMARK_PROFILING` environment variable.

* [statsd] Improve performance of tag normalization

Previously, we passed a raw pattern string to `re.sub` in the hot path,
which incurs a pattern-cache lookup on every call. Now we call `sub` on
the precompiled regex object directly, yielding much better performance.
  • Loading branch information
sgnn7 authored Jun 30, 2021
1 parent c4df82e commit 1a90bea
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 4 deletions.
2 changes: 1 addition & 1 deletion datadog/util/format.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ def force_to_epoch_seconds(epoch_sec_or_dt):


def normalize_tags(tag_list):
    """Return a copy of ``tag_list`` with invalid tag characters replaced.

    Every character matched by ``TAG_INVALID_CHARS_RE`` is substituted with
    ``TAG_INVALID_CHARS_SUBS``. The compiled pattern's ``sub`` method is used
    (rather than ``re.sub`` with a raw pattern) because this runs in the
    metric hot path and avoids the per-call pattern-cache lookup.
    """
    return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
47 changes: 45 additions & 2 deletions tests/performance/test_statsd_throughput.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
# Copyright 2015-Present Datadog, Inc

# stdlib
import cProfile
import io
import logging
import os
import pstats
import random
import sys
import threading
Expand All @@ -22,6 +25,7 @@

# datadog
from datadog.dogstatsd.base import DogStatsd
from datadog.util.compat import is_p3k

# test utils
from tests.util.fake_statsd_server import FakeServer
Expand Down Expand Up @@ -121,6 +125,7 @@ def setUp(self):
os.getenv("BENCHMARK_NUM_THREADS", str(self.DEFAULT_NUM_THREADS))
)
self.num_runs = int(os.getenv("BENCHMARK_NUM_RUNS", str(self.DEFAULT_NUM_RUNS)))
self.profiling_enabled = os.getenv("BENCHMARK_PROFILING", "false") in ["1", "true", "True", "Y", "yes", "Yes"]
self.transport = os.getenv(
"BENCHMARK_TRANSPORT", str(self.DEFAULT_TRANSPORT)
).upper()
Expand All @@ -142,11 +147,12 @@ def one_line_warning(message, category, filename, lineno, *_):
# pylint: disable=too-many-locals
def test_statsd_performance(self):
print(
"Starting: {} run(s), {} threads, {} points/thread via {}...".format(
"Starting: {} run(s), {} threads, {} points/thread via {} (profiling: {})...".format(
self.num_runs,
self.num_threads,
self.num_datapoints,
self.transport,
str(self.profiling_enabled).lower(),
)
)

Expand Down Expand Up @@ -250,6 +256,7 @@ def _execute_test_run(self, server, metrics_order, num_threads, num_datapoints):
start_signal,
metrics_order[thread_idx],
latency_results,
self.profiling_enabled,
),
)
thread.daemon = True
Expand Down Expand Up @@ -311,11 +318,19 @@ def _execute_test_run(self, server, metrics_order, num_threads, num_datapoints):

@staticmethod
def _thread_runner(
statsd_instance, start_event, thread_metrics_order, latency_results
statsd_instance,
start_event,
thread_metrics_order,
latency_results,
profiling_enabled,
):
# We wait for a global signal to start running our events
start_event.wait(5)

if profiling_enabled:
profiler = cProfile.Profile()
profiler.enable()

duration = 0.0
for metric_idx, metric in enumerate(thread_metrics_order):
start_time = timeit.default_timer()
Expand All @@ -328,3 +343,31 @@ def _thread_runner(
statsd_instance.flush()

latency_results.put(duration)

if profiling_enabled:
TestDogStatsdThroughput.print_profiling_stats(profiler)


@staticmethod
def print_profiling_stats(profiler, sort_by='cumulative'):
    """
    Stop the given profiler and print its accumulated statistics to stdout.

    `sort_by` selects the ordering of the report; valid keys include
    'tottime', 'pcalls', 'ncalls', 'cumulative' and the other options
    listed in https://github.com/python/cpython/blob/3.9/Lib/pstats.py#L37-L45
    """
    profiler.disable()

    # pstats writes text on py3 and bytes on py2, so pick a matching buffer.
    stream = io.StringIO() if is_p3k() else io.BytesIO()

    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats(sort_by)
    stats.print_stats()

    print(stream.getvalue())
22 changes: 21 additions & 1 deletion tests/unit/util/test_format.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# coding: utf8
# Unless explicitly stated otherwise all files in this repository are licensed under the BSD-3-Clause License.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2015-Present Datadog, Inc
import unittest

import pytest

from datadog.util.format import construct_url
from datadog.util.format import construct_url, normalize_tags


class TestConstructURL:
Expand All @@ -30,3 +33,20 @@ class TestConstructURL:
@pytest.mark.parametrize("host,api_version,path,expected", test_data)
def test_construct_url(self, host, api_version, path, expected):
assert construct_url(host, api_version, path) == expected

class TestNormalizeTags:
    """
    Tests for the format module's `normalize_tags` functionality.

    Each case pairs a list of input tags with the expected normalized
    output: invalid characters are replaced with underscores, while
    letters (including unicode letters) and digits pass through unchanged.
    """
    test_data = [
        (['this is a tag'], ['this_is_a_tag']),
        (['abc!@#$%^&*()0987654321{}}{'], ['abc__________0987654321____']),
        (['abc!@#', '^%$#3456#'], ['abc___', '____3456_']),
        # Already-valid tags are returned untouched.
        (['multiple', 'tags', 'included'], ['multiple', 'tags', 'included']),
        ([u'абвгдежзийкл', u'абв', 'test123'], [u'абвгдежзийкл', u'абв', 'test123']),
        # Emoji are invalid and replaced; CJK/Cyrillic letters are kept.
        ([u'абвгд西😃ежзийкл', u'аб😃西в', u'a😃😃b'], [u'абвгд西_ежзийкл', u'аб_西в', u'a__b']),
    ]

    @pytest.mark.parametrize("original_tags,expected_tags", test_data)
    def test_normalize_tags(self, original_tags, expected_tags):
        assert normalize_tags(original_tags) == expected_tags

0 comments on commit 1a90bea

Please sign in to comment.