From 1a90bead0d07ad36c8e70be507ce26f966b9f10e Mon Sep 17 00:00:00 2001
From: Srdjan Grubor
Date: Wed, 30 Jun 2021 02:46:41 -0500
Subject: [PATCH] [statsd] Improve tag normalization speed (#672)

* [tests] Add ability to print profiling stats in statsd benchmark test

This change allows us to see exactly where statsd spends the majority of
its time when processing metrics. Profiling can be activated via the
`BENCHMARK_PROFILING` environment variable.

* [statsd] Improve performance of tag normalization

Previously, we passed a raw string pattern to `re.sub` in the hot path,
which incurs a regex-cache lookup (and compilation on a cache miss) on
every call. We now call `sub` on the precompiled regex instead, which
yields much better performance.
---
 datadog/util/format.py                      |  2 +-
 tests/performance/test_statsd_throughput.py | 47 ++++++++++++++++++++-
 tests/unit/util/test_format.py              | 22 +++++++++-
 3 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/datadog/util/format.py b/datadog/util/format.py
index b325d9690..a29edb231 100644
--- a/datadog/util/format.py
+++ b/datadog/util/format.py
@@ -30,4 +30,4 @@ def force_to_epoch_seconds(epoch_sec_or_dt):
 
 
 def normalize_tags(tag_list):
-    return [re.sub(TAG_INVALID_CHARS_RE, TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
+    return [TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list]
diff --git a/tests/performance/test_statsd_throughput.py b/tests/performance/test_statsd_throughput.py
index ef548c47d..59c45411c 100644
--- a/tests/performance/test_statsd_throughput.py
+++ b/tests/performance/test_statsd_throughput.py
@@ -6,8 +6,11 @@
 # Copyright 2015-Present Datadog, Inc
 
 # stdlib
+import cProfile
+import io
 import logging
 import os
+import pstats
 import random
 import sys
 import threading
@@ -22,6 +25,7 @@
 
 # datadog
 from datadog.dogstatsd.base import DogStatsd
+from datadog.util.compat import is_p3k
 
 # test utils
 from tests.util.fake_statsd_server import FakeServer
@@ -121,6 +125,7 @@ def setUp(self):
             os.getenv("BENCHMARK_NUM_THREADS", str(self.DEFAULT_NUM_THREADS))
         )
         self.num_runs = int(os.getenv("BENCHMARK_NUM_RUNS", str(self.DEFAULT_NUM_RUNS)))
+        self.profiling_enabled = os.getenv("BENCHMARK_PROFILING", "false") in ["1", "true", "True", "Y", "yes", "Yes"]
         self.transport = os.getenv(
             "BENCHMARK_TRANSPORT", str(self.DEFAULT_TRANSPORT)
         ).upper()
@@ -142,11 +147,12 @@ def one_line_warning(message, category, filename, lineno, *_):
     # pylint: disable=too-many-locals
     def test_statsd_performance(self):
         print(
-            "Starting: {} run(s), {} threads, {} points/thread via {}...".format(
+            "Starting: {} run(s), {} threads, {} points/thread via {} (profiling: {})...".format(
                 self.num_runs,
                 self.num_threads,
                 self.num_datapoints,
                 self.transport,
+                str(self.profiling_enabled).lower(),
             )
         )
 
@@ -250,6 +256,7 @@ def _execute_test_run(self, server, metrics_order, num_threads, num_datapoints):
                     start_signal,
                     metrics_order[thread_idx],
                     latency_results,
+                    self.profiling_enabled,
                 ),
             )
             thread.daemon = True
@@ -311,11 +318,19 @@ def _execute_test_run(self, server, metrics_order, num_threads, num_datapoints):
 
     @staticmethod
     def _thread_runner(
-        statsd_instance, start_event, thread_metrics_order, latency_results
+        statsd_instance,
+        start_event,
+        thread_metrics_order,
+        latency_results,
+        profiling_enabled,
     ):
         # We wait for a global signal to start running our events
         start_event.wait(5)
 
+        if profiling_enabled:
+            profiler = cProfile.Profile()
+            profiler.enable()
+
         duration = 0.0
         for metric_idx, metric in enumerate(thread_metrics_order):
             start_time = timeit.default_timer()
@@ -328,3 +343,31 @@ def _thread_runner(
             statsd_instance.flush()
 
         latency_results.put(duration)
+
+        if profiling_enabled:
+            TestDogStatsdThroughput.print_profiling_stats(profiler)
+
+
+    @staticmethod
+    def print_profiling_stats(profiler, sort_by='cumulative'):
+        """
+        Prints profiling results for the thread that finished its run. Sort options
+        include 'tottime', 'pcalls', 'ncalls', 'cumulative', etc.; see
+        https://github.com/python/cpython/blob/3.9/Lib/pstats.py#L37-L45 for the
+        full list.
+        """
+
+        profiler.disable()
+
+        if is_p3k():
+            output_stream = io.StringIO()
+        else:
+            output_stream = io.BytesIO()
+
+        profiling_stats = pstats.Stats(
+            profiler,
+            stream=output_stream,
+        ).sort_stats(sort_by)
+
+        profiling_stats.print_stats()
+        print(output_stream.getvalue())
diff --git a/tests/unit/util/test_format.py b/tests/unit/util/test_format.py
index 554718a6d..1d3d7cb36 100644
--- a/tests/unit/util/test_format.py
+++ b/tests/unit/util/test_format.py
@@ -1,9 +1,12 @@
+# coding: utf8
 # Unless explicitly stated otherwise all files in this repository are licensed under the BSD-3-Clause License.
 # This product includes software developed at Datadog (https://www.datadoghq.com/).
 # Copyright 2015-Present Datadog, Inc
+import unittest
+
 import pytest
 
-from datadog.util.format import construct_url
+from datadog.util.format import construct_url, normalize_tags
 
 
 class TestConstructURL:
@@ -30,3 +33,20 @@ class TestConstructURL:
     @pytest.mark.parametrize("host,api_version,path,expected", test_data)
     def test_construct_url(self, host, api_version, path, expected):
         assert construct_url(host, api_version, path) == expected
+
+class TestNormalizeTags:
+    """
+    Test of the format's `normalize_tags` functionality
+    """
+    test_data = [
+        (['this is a tag'], ['this_is_a_tag']),
+        (['abc!@#$%^&*()0987654321{}}{'], ['abc__________0987654321____']),
+        (['abc!@#', '^%$#3456#'], ['abc___', '____3456_']),
+        (['multiple', 'tags', 'included'], ['multiple', 'tags', 'included']),
+        ([u'абвгдежзийкл', u'абв', 'test123'], [u'абвгдежзийкл', u'абв', 'test123']),
+        ([u'абвгд西😃ежзийкл', u'аб😃西в', u'a😃😃b'], [u'абвгд西_ежзийкл', u'аб_西в', u'a__b']),
+    ]
+
+    @pytest.mark.parametrize("original_tags,expected_tags", test_data)
+    def test_normalize_tags(self, original_tags, expected_tags):
+        assert normalize_tags(original_tags) == expected_tags
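
Note (not part of the patch above): the following is a minimal, standalone sketch of the optimization the commit message describes, comparing a raw string pattern passed to `re.sub` against a precompiled pattern object, timed with `timeit`. The pattern, substitution character, and file name are illustrative assumptions; the real `TAG_INVALID_CHARS_RE` and `TAG_INVALID_CHARS_SUBS` live in `datadog/util/format.py` and are not shown in this diff.

# normalize_bench.py -- illustrative sketch only, not part of the patch.
# The pattern and substitution below are assumed stand-ins for
# TAG_INVALID_CHARS_RE / TAG_INVALID_CHARS_SUBS, whose definitions
# are not included in this diff.
import re
import timeit

INVALID_CHARS_PATTERN = r"[^\w\-:./]"  # assumed example pattern
INVALID_CHARS_SUB = "_"
INVALID_CHARS_RE = re.compile(INVALID_CHARS_PATTERN)

TAGS = ["env:prod", "team:metrics!", "host name with spaces"] * 100


def normalize_with_string_pattern(tag_list):
    # String pattern: re.sub performs a regex-cache lookup on every call.
    return [re.sub(INVALID_CHARS_PATTERN, INVALID_CHARS_SUB, tag) for tag in tag_list]


def normalize_with_precompiled(tag_list):
    # Precompiled pattern object: the per-call cache lookup is skipped.
    return [INVALID_CHARS_RE.sub(INVALID_CHARS_SUB, tag) for tag in tag_list]


if __name__ == "__main__":
    for fn in (normalize_with_string_pattern, normalize_with_precompiled):
        elapsed = timeit.timeit(lambda: fn(TAGS), number=1000)
        print("{:35s} {:.3f}s".format(fn.__name__, elapsed))

On CPython the saving comes from skipping the per-call regex-cache lookup; it is small for a single call but adds up in a hot path that normalizes every tag of every metric, which is what the benchmark test above is meant to expose.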