From 6bf457859e23da8847291536e54b00e3b7023d5b Mon Sep 17 00:00:00 2001 From: Srdjan Grubor Date: Wed, 17 Nov 2021 07:23:41 -0600 Subject: [PATCH] [statsd] Add ability to toggle `statsd.disable_buffering` state during runtime (#700) * [statsd] Enable toggling of `disable_buffering` flag at runtime Since `initialize` uses the module-level DogStatsd instance that is only initialized once, we cannot rely on new instantiation for changes to the `disable_buffering` flag. This change allows the consumers to call `statsd.disable_buffering = <bool>` with runtime changes being applied to the module-level instance. By doing this, we can in future changes allow `initialize()` to change this flag if needed. * [init] Add `statsd_disable_buffering` arg to `datadog.initialize()` Now that the DogStatsd module can handle live toggling of buffering switches, we expose this argument in the `datadog.initialize()`. While not terribly useful currently, it will be very valuable if/when buffering is enabled on the DogStatsd module by default (again). --- datadog/__init__.py | 7 ++ datadog/dogstatsd/base.py | 92 ++++++++++++++++----- tests/unit/dogstatsd/test_statsd.py | 124 ++++++++++++++++++++++++++++ 3 files changed, 202 insertions(+), 21 deletions(-) diff --git a/datadog/__init__.py b/datadog/__init__.py index 45e0dd532..b801cedc6 100644 --- a/datadog/__init__.py +++ b/datadog/__init__.py @@ -37,6 +37,7 @@ def initialize( api_host=None, # type: Optional[str] statsd_host=None, # type: Optional[str] statsd_port=None, # type: Optional[int] + statsd_disable_buffering=True, # type: bool statsd_use_default_route=False, # type: bool statsd_socket_path=None, # type: Optional[str] statsd_namespace=None, # type: Optional[str] @@ -71,6 +72,10 @@ def initialize( :param statsd_port: Port of DogStatsd server or statsd daemon :type statsd_port: port + :param statsd_disable_buffering: Enable/disable statsd client buffering support + (default: True). 
+ :type statsd_disable_buffering: boolean + :param statsd_use_default_route: Dynamically set the statsd host to the default route (Useful when running the client in a container) :type statsd_use_default_route: boolean @@ -122,6 +127,8 @@ def initialize( if statsd_constant_tags: statsd.constant_tags += statsd_constant_tags + statsd.disable_buffering = statsd_disable_buffering + api._return_raw_response = return_raw_response # HTTP client and API options diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index d06560905..218ecaa58 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -304,28 +304,24 @@ def __init__( self._reset_buffer() - # This lock is used for backwards compatibility to prevent concurrent - # changes to the buffer when the user is managing the buffer themselves - # via deprecated `open_buffer()` and `close_buffer()` functions. - self._manual_buffer_lock = RLock() + # This lock is used for all cases where buffering functionality is + # being toggled (by `open_buffer()`, `close_buffer()`, or + # `self._disable_buffering` calls). + self._buffering_toggle_lock = RLock() # If buffering is disabled, we bypass the buffer function. self._send = self._send_to_buffer - if disable_buffering: - log.info("Statsd buffering is disabled") + self._disable_buffering = disable_buffering + if self._disable_buffering: self._send = self._send_to_server + log.info("Statsd buffering is disabled") # Start the flush thread if buffering is enabled and the interval is above # a reasonable range. This both prevents thrashing and allow us to use "0.0" # as a value for disabling the automatic flush timer as well. 
- if not disable_buffering and flush_interval >= MIN_FLUSH_INTERVAL: - self._register_flush_thread(flush_interval) - log.debug( - "Statsd flush thread registered with period of %s", - flush_interval, - ) - else: - log.info("Statsd periodic buffer flush is disabled") + self._flush_interval = flush_interval + self._flush_thread_stop = threading.Event() + self._start_flush_thread(self._flush_interval) def disable_telemetry(self): self._telemetry = False @@ -333,30 +329,84 @@ def disable_telemetry(self): def enable_telemetry(self): self._telemetry = True - def _register_flush_thread(self, sleep_duration): - def _flush_thread_loop(self, sleep_duration): - while True: - time.sleep(sleep_duration) + # Note: Invocations of this method should be thread-safe + def _start_flush_thread(self, flush_interval): + if self._disable_buffering or self._flush_interval <= MIN_FLUSH_INTERVAL: + log.info("Statsd periodic buffer flush is disabled") + return + + def _flush_thread_loop(self, flush_interval): + while not self._flush_thread_stop.is_set(): + time.sleep(flush_interval) self.flush() self._flush_thread = threading.Thread( name="{}_flush_thread".format(self.__class__.__name__), target=_flush_thread_loop, - args=(self, sleep_duration,), + args=(self, flush_interval,), ) self._flush_thread.daemon = True self._flush_thread.start() + log.debug( + "Statsd flush thread registered with period of %s", + self._flush_interval, + ) + + # Note: Invocations of this method should be thread-safe + def _stop_flush_thread(self): + if not self._flush_thread: + log.warning("No statsd flush thread to stop") + return + + try: + self.flush() + finally: + pass + + self._flush_thread_stop.set() + + self._flush_thread.join() + self._flush_thread = None + + self._flush_thread_stop.clear() + def _dedicated_telemetry_destination(self): return bool(self.telemetry_socket_path or self.telemetry_host) + # Context manager helper def __enter__(self): self.open_buffer() return self + # Context manager helper def 
__exit__(self, exc_type, value, traceback): self.close_buffer() + @property + def disable_buffering(self): + with self._buffering_toggle_lock: + return self._disable_buffering + + @disable_buffering.setter + def disable_buffering(self, is_disabled): + with self._buffering_toggle_lock: + # If the toggle didn't change anything, this method is a noop + if self._disable_buffering == is_disabled: + return + + self._disable_buffering = is_disabled + + # If buffering has been disabled, flush and kill the background thread + # otherwise start up the flushing thread and enable the buffering. + if is_disabled: + self._send = self._send_to_server + self._stop_flush_thread() + log.info("Statsd buffering is disabled") + else: + self._send = self._send_to_buffer + self._start_flush_thread(self._flush_interval) + @staticmethod def resolve_host(host, use_default_route): """ @@ -448,7 +498,7 @@ def open_buffer(self, max_buffer_size=None): Note: This method must be called before close_buffer() matching invocation. """ - self._manual_buffer_lock.acquire() + self._buffering_toggle_lock.acquire() # XXX Remove if `disable_buffering` default is changed to False self._send = self._send_to_buffer @@ -471,7 +521,7 @@ def close_buffer(self): # XXX Remove if `disable_buffering` default is changed to False self._send = self._send_to_server - self._manual_buffer_lock.release() + self._buffering_toggle_lock.release() def _reset_buffer(self): with self._buffer_lock: diff --git a/tests/unit/dogstatsd/test_statsd.py b/tests/unit/dogstatsd/test_statsd.py index 830b9bcbc..834ce1cb3 100644 --- a/tests/unit/dogstatsd/test_statsd.py +++ b/tests/unit/dogstatsd/test_statsd.py @@ -113,6 +113,11 @@ def telemetry_metrics(metrics=1, events=0, service_checks=0, bytes_sent=0, bytes class TestDogStatsd(unittest.TestCase): + METRIC_TYPE_MAP = { + 'gauge': { 'id': 'g' }, + 'timing': { 'id': 'ms' }, + } + def setUp(self): """ Set up a default Dogstatsd instance and mock the proc filesystem. 
@@ -144,6 +149,63 @@ def assert_equal_telemetry(self, expected_payload, actual_payload, telemetry=Non return self.assertEqual(expected_payload, actual_payload) + def send_and_assert( + self, + dogstatsd, + expected_metrics, + last_telemetry_size=0, + buffered=False, + ): + """ + Sends and then asserts that a chain of metrics arrive in the right order + and with expected telemetry values. + """ + + expected_messages = [] + for metric_type, metric_name, metric_value in expected_metrics: + # Construct the expected message data + metric_type_id = TestDogStatsd.METRIC_TYPE_MAP[metric_type]['id'] + expected_messages.append( + "{}:{}|{}\n".format(metric_name, metric_value, metric_type_id) + ) + + # Send the value + getattr(dogstatsd, metric_type)(metric_name, metric_value) + + # Sanity check + if buffered: + # Ensure that packets didn't arrive immediately if we are expecting + # buffering behavior + self.assertIsNone(dogstatsd.socket.recv(2, no_wait=True)) + + metrics = 1 + if buffered: + metrics = len(expected_messages) + + if buffered: + expected_messages = [ ''.join(expected_messages) ] + + for message in expected_messages: + packets_sent = 1 + # For all non-initial packets, our current telemetry stats will + # contain the metadata for the last telemetry packet as well. 
+ if last_telemetry_size > 0: + packets_sent += 1 + + expected_metrics=telemetry_metrics( + metrics=metrics, + packets_sent=packets_sent, + bytes_sent=len(message) + last_telemetry_size + ) + self.assert_equal_telemetry( + message, + dogstatsd.socket.recv(2, no_wait=not buffered, reset_wait=True), + telemetry=expected_metrics, + ) + last_telemetry_size = len(expected_metrics) + + return last_telemetry_size + def assert_almost_equal(self, val1, val2, delta): """ Calculates a delta between first and second value and ensures @@ -1001,6 +1063,68 @@ def test_batching_sequential(self): ) ) + def test_batching_runtime_changes(self): + dogstatsd = DogStatsd( + disable_buffering=True, + telemetry_min_flush_interval=0 + ) + dogstatsd.socket = FakeSocket() + + # Send some unbuffered metrics and verify we got it immediately + last_telemetry_size = self.send_and_assert( + dogstatsd, + [ + ('gauge', 'rt.gauge', 123), + ('timing', 'rt.timer', 123), + ], + ) + + # Disable buffering (noop expected) and validate + dogstatsd.disable_buffering = True + last_telemetry_size = self.send_and_assert( + dogstatsd, + [ + ('gauge', 'rt.gauge2', 321), + ('timing', 'rt.timer2', 321), + ], + last_telemetry_size = last_telemetry_size, + ) + + # Enable buffering and validate + dogstatsd.disable_buffering = False + last_telemetry_size = self.send_and_assert( + dogstatsd, + [ + ('gauge', 'buffered.gauge', 12345), + ('timing', 'buffered.timer', 12345), + ], + last_telemetry_size = last_telemetry_size, + buffered=True, + ) + + # Enable buffering again (another noop change expected) + dogstatsd.disable_buffering = False + last_telemetry_size = self.send_and_assert( + dogstatsd, + [ + ('gauge', 'buffered.gauge2', 321), + ('timing', 'buffered.timer2', 321), + ], + last_telemetry_size = last_telemetry_size, + buffered=True, + ) + + # Flip the toggle to unbuffered functionality one more time and verify + dogstatsd.disable_buffering = True + last_telemetry_size = self.send_and_assert( + dogstatsd, + [ + 
('gauge', 'rt.gauge3', 333), + ('timing', 'rt.timer3', 333), + ], + last_telemetry_size = last_telemetry_size, + ) + def test_threaded_batching(self): num_threads = 4 threads = []