Skip to content

Commit

Permalink
[statsd] Add ability to toggle statsd.disable_buffering state durin…
Browse files Browse the repository at this point in the history
…g runtime (#700)

* [statsd] Enable toggling of `disable_buffering` flag at runtime

Since `initialize` uses the module-level DogStatsd instance that is only
initialized once, we cannot rely on new instantiation for changes to the
`disable_buffering` flag. This change allows consumers to set
`statsd.disable_buffering = <bool>`, with runtime changes being applied to
the module-level instance. By doing this, we can in future changes allow
`initialize()` to change this flag if needed.

* [init] Add `statsd_disable_buffering` arg to `datadog.initialize()`

Now that the DogStatsd module can handle live toggling of buffering
switches, we expose this argument in the `datadog.initialize()`. While
not terribly useful currently, it will be very valuable if/when
buffering is enabled on the DogStatsd module by default (again).
  • Loading branch information
sgnn7 authored Nov 17, 2021
1 parent 2dd033f commit 6bf4578
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 21 deletions.
7 changes: 7 additions & 0 deletions datadog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def initialize(
api_host=None, # type: Optional[str]
statsd_host=None, # type: Optional[str]
statsd_port=None, # type: Optional[int]
statsd_disable_buffering=True, # type: bool
statsd_use_default_route=False, # type: bool
statsd_socket_path=None, # type: Optional[str]
statsd_namespace=None, # type: Optional[str]
Expand Down Expand Up @@ -71,6 +72,10 @@ def initialize(
:param statsd_port: Port of DogStatsd server or statsd daemon
:type statsd_port: port
:param statsd_disable_buffering: Enable/disable statsd client buffering support
(default: True).
:type statsd_disable_buffering: boolean
:param statsd_use_default_route: Dynamically set the statsd host to the default route
(Useful when running the client in a container)
:type statsd_use_default_route: boolean
Expand Down Expand Up @@ -122,6 +127,8 @@ def initialize(
if statsd_constant_tags:
statsd.constant_tags += statsd_constant_tags

statsd.disable_buffering = statsd_disable_buffering

api._return_raw_response = return_raw_response

# HTTP client and API options
Expand Down
92 changes: 71 additions & 21 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,59 +304,109 @@ def __init__(

self._reset_buffer()

# This lock is used for backwards compatibility to prevent concurrent
# changes to the buffer when the user is managing the buffer themselves
# via deprecated `open_buffer()` and `close_buffer()` functions.
self._manual_buffer_lock = RLock()
# This lock is used for all cases where buffering functionality is
# being toggled (by `open_buffer()`, `close_buffer()`, or
# `self._disable_buffering` calls).
self._buffering_toggle_lock = RLock()

# If buffering is disabled, we bypass the buffer function.
self._send = self._send_to_buffer
if disable_buffering:
log.info("Statsd buffering is disabled")
self._disable_buffering = disable_buffering
if self._disable_buffering:
self._send = self._send_to_server
log.info("Statsd buffering is disabled")

# Start the flush thread if buffering is enabled and the interval is above
# a reasonable range. This both prevents thrashing and allow us to use "0.0"
# as a value for disabling the automatic flush timer as well.
if not disable_buffering and flush_interval >= MIN_FLUSH_INTERVAL:
self._register_flush_thread(flush_interval)
log.debug(
"Statsd flush thread registered with period of %s",
flush_interval,
)
else:
log.info("Statsd periodic buffer flush is disabled")
self._flush_interval = flush_interval
self._flush_thread_stop = threading.Event()
self._start_flush_thread(self._flush_interval)

def disable_telemetry(self):
    """Set this instance's ``_telemetry`` flag to ``False``."""
    self._telemetry = False

def enable_telemetry(self):
    """Set this instance's ``_telemetry`` flag to ``True``."""
    self._telemetry = True

def _register_flush_thread(self, sleep_duration):
def _flush_thread_loop(self, sleep_duration):
while True:
time.sleep(sleep_duration)
# Note: Invocations of this method should be thread-safe
def _start_flush_thread(self, flush_interval):
    """
    Start the daemon thread that periodically calls ``self.flush()``.

    Nothing is started (and an info message is logged) when buffering is
    disabled or when ``flush_interval`` is not above ``MIN_FLUSH_INTERVAL``.

    :param flush_interval: Delay, in seconds, between two automatic flushes
    :type flush_interval: float
    """
    # Decide on the interval that was actually requested rather than on
    # `self._flush_interval`, so the argument and the guard cannot disagree.
    if self._disable_buffering or flush_interval <= MIN_FLUSH_INTERVAL:
        log.info("Statsd periodic buffer flush is disabled")
        return

    def _flush_thread_loop(self, flush_interval):
        # `Event.wait()` is used as an interruptible sleep: it returns True
        # as soon as the stop event is set, so stopping the thread does not
        # have to wait for a full `flush_interval` to elapse.
        while not self._flush_thread_stop.wait(flush_interval):
            self.flush()

    self._flush_thread = threading.Thread(
        name="{}_flush_thread".format(self.__class__.__name__),
        target=_flush_thread_loop,
        args=(self, flush_interval,),
    )
    # Daemon thread: it must never keep the interpreter alive on exit.
    self._flush_thread.daemon = True
    self._flush_thread.start()

    log.debug(
        "Statsd flush thread registered with period of %s",
        flush_interval,
    )

# Note: Invocations of this method should be thread-safe
def _stop_flush_thread(self):
    """
    Flush once, then stop and join the periodic flush thread.

    When no flush thread is registered a warning is logged and nothing else
    happens. If the final ``flush()`` raises, the thread is still stopped
    and joined before the exception propagates to the caller.
    """
    # `_start_flush_thread()` can return before ever assigning
    # `_flush_thread`, so do not assume the attribute exists.
    flush_thread = getattr(self, "_flush_thread", None)
    if not flush_thread:
        log.warning("No statsd flush thread to stop")
        return

    try:
        self.flush()
    finally:
        # Always tear the thread down, even if the flush above failed;
        # otherwise it would keep running with buffering disabled.
        self._flush_thread_stop.set()

        flush_thread.join()
        self._flush_thread = None

        # Re-arm the event so that a later thread can be started again.
        self._flush_thread_stop.clear()

def _dedicated_telemetry_destination(self):
    """
    Return ``True`` when ``telemetry_socket_path`` or ``telemetry_host`` is
    set to a truthy value, ``False`` otherwise.
    """
    if self.telemetry_socket_path:
        return True
    return bool(self.telemetry_host)

# Context manager helper
def __enter__(self):
    """Call ``open_buffer()`` on entering a ``with`` block and return this object."""
    self.open_buffer()
    return self

# Context manager helper
def __exit__(self, exc_type, value, traceback):
    """
    Call ``close_buffer()`` on leaving a ``with`` block.

    The exception details are ignored and ``None`` is returned, so an
    exception raised inside the block is not suppressed.
    """
    self.close_buffer()

@property
def disable_buffering(self):
    """
    Whether buffering is currently disabled.

    The flag is read while holding ``_buffering_toggle_lock``.
    """
    with self._buffering_toggle_lock:
        return self._disable_buffering

@disable_buffering.setter
def disable_buffering(self, is_disabled):
    """
    Switch buffering on or off on this instance.

    Assigning the value the flag already has does nothing. Otherwise
    ``_send`` is re-pointed at the matching sender and the periodic flush
    thread is stopped (buffering disabled) or started (buffering enabled).
    Everything happens while holding ``_buffering_toggle_lock``.

    :param is_disabled: ``True`` to disable buffering, ``False`` to enable it
    :type is_disabled: boolean
    """
    with self._buffering_toggle_lock:
        # Same value as before: nothing to toggle
        if self._disable_buffering == is_disabled:
            return

        self._disable_buffering = is_disabled

        if not is_disabled:
            # Buffering switched on: buffer sends and start the flush thread
            self._send = self._send_to_buffer
            self._start_flush_thread(self._flush_interval)
        else:
            # Buffering switched off: send directly, then flush and stop
            # the background thread
            self._send = self._send_to_server
            self._stop_flush_thread()
            log.info("Statsd buffering is disabled")

@staticmethod
def resolve_host(host, use_default_route):
"""
Expand Down Expand Up @@ -448,7 +498,7 @@ def open_buffer(self, max_buffer_size=None):
Note: This method must be called before close_buffer() matching invocation.
"""

self._manual_buffer_lock.acquire()
self._buffering_toggle_lock.acquire()

# XXX Remove if `disable_buffering` default is changed to False
self._send = self._send_to_buffer
Expand All @@ -471,7 +521,7 @@ def close_buffer(self):
# XXX Remove if `disable_buffering` default is changed to False
self._send = self._send_to_server

self._manual_buffer_lock.release()
self._buffering_toggle_lock.release()

def _reset_buffer(self):
with self._buffer_lock:
Expand Down
124 changes: 124 additions & 0 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ def telemetry_metrics(metrics=1, events=0, service_checks=0, bytes_sent=0, bytes


class TestDogStatsd(unittest.TestCase):
METRIC_TYPE_MAP = {
'gauge': { 'id': 'g' },
'timing': { 'id': 'ms' },
}

def setUp(self):
"""
Set up a default Dogstatsd instance and mock the proc filesystem.
Expand Down Expand Up @@ -144,6 +149,63 @@ def assert_equal_telemetry(self, expected_payload, actual_payload, telemetry=Non

return self.assertEqual(expected_payload, actual_payload)

def send_and_assert(
    self,
    dogstatsd,
    expected_metrics,
    last_telemetry_size=0,
    buffered=False,
):
    """
    Send a chain of metrics, then assert that they arrive in the right
    order and with the expected telemetry values.

    :param dogstatsd: client under test; its ``socket`` is read back with
        ``recv()``
    :param expected_metrics: iterable of ``(metric_type, name, value)``
        tuples, where ``metric_type`` is a key of ``METRIC_TYPE_MAP``
    :param last_telemetry_size: length of the telemetry expected by the
        previous call (0 for the first call)
    :param buffered: whether the metrics are expected to arrive as a single
        buffered packet rather than one packet per metric
    :returns: length of the last expected telemetry, to be passed to the
        next call
    """

    payloads = []
    for kind, name, value in expected_metrics:
        # Build the wire format we expect, then emit the metric itself
        type_id = TestDogStatsd.METRIC_TYPE_MAP[kind]['id']
        payloads.append("{}:{}|{}\n".format(name, value, type_id))
        getattr(dogstatsd, kind)(name, value)

    if buffered:
        # Sanity check: with buffering nothing may have arrived yet
        self.assertIsNone(dogstatsd.socket.recv(2, no_wait=True))

        # All metrics are expected in one combined packet
        metric_count = len(payloads)
        payloads = [''.join(payloads)]
    else:
        metric_count = 1

    for payload in payloads:
        # For all non-initial packets, our current telemetry stats will
        # contain the metadata for the last telemetry packet as well.
        packets_sent = 2 if last_telemetry_size > 0 else 1

        expected_telemetry = telemetry_metrics(
            metrics=metric_count,
            packets_sent=packets_sent,
            bytes_sent=len(payload) + last_telemetry_size
        )
        received = dogstatsd.socket.recv(2, no_wait=not buffered, reset_wait=True)
        self.assert_equal_telemetry(
            payload,
            received,
            telemetry=expected_telemetry,
        )
        last_telemetry_size = len(expected_telemetry)

    return last_telemetry_size

def assert_almost_equal(self, val1, val2, delta):
"""
Calculates a delta between first and second value and ensures
Expand Down Expand Up @@ -1001,6 +1063,68 @@ def test_batching_sequential(self):
)
)

def test_batching_runtime_changes(self):
    """
    Flip ``disable_buffering`` on a live client and check after every
    step that metrics are delivered unbuffered or buffered as expected.
    """
    dogstatsd = DogStatsd(
        disable_buffering=True,
        telemetry_min_flush_interval=0
    )
    dogstatsd.socket = FakeSocket()

    # Each phase is (value assigned to `disable_buffering`, or None to leave
    # it untouched; whether buffered delivery is expected; metrics to send).
    phases = [
        # Send some unbuffered metrics and verify we got them immediately
        (None, False, [
            ('gauge', 'rt.gauge', 123),
            ('timing', 'rt.timer', 123),
        ]),
        # Disable buffering (noop expected) and validate
        (True, False, [
            ('gauge', 'rt.gauge2', 321),
            ('timing', 'rt.timer2', 321),
        ]),
        # Enable buffering and validate
        (False, True, [
            ('gauge', 'buffered.gauge', 12345),
            ('timing', 'buffered.timer', 12345),
        ]),
        # Enable buffering again (another noop change expected)
        (False, True, [
            ('gauge', 'buffered.gauge2', 321),
            ('timing', 'buffered.timer2', 321),
        ]),
        # Flip the toggle to unbuffered functionality one more time and verify
        (True, False, [
            ('gauge', 'rt.gauge3', 333),
            ('timing', 'rt.timer3', 333),
        ]),
    ]

    last_telemetry_size = 0
    for toggle, expect_buffered, metrics in phases:
        if toggle is not None:
            dogstatsd.disable_buffering = toggle

        last_telemetry_size = self.send_and_assert(
            dogstatsd,
            metrics,
            last_telemetry_size=last_telemetry_size,
            buffered=expect_buffered,
        )

def test_threaded_batching(self):
num_threads = 4
threads = []
Expand Down

0 comments on commit 6bf4578

Please sign in to comment.