From f482ba9c952b4520f91c272612ee315ed681fed5 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Fri, 16 Aug 2024 17:06:04 +0000 Subject: [PATCH 1/5] Fix test_fork_with_thread Test failed to create a sender thread due to a name clash between the sender thread and the sender callback parameter. The sender thread produces lots of expected errors (of "Connection refused" kind), so we also silence errors from the client library. --- tests/integration/dogstatsd/test_statsd_fork.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/integration/dogstatsd/test_statsd_fork.py b/tests/integration/dogstatsd/test_statsd_fork.py index c856376e0..3a8aa5fd3 100644 --- a/tests/integration/dogstatsd/test_statsd_fork.py +++ b/tests/integration/dogstatsd/test_statsd_fork.py @@ -2,11 +2,13 @@ import itertools import socket import threading +import logging import pytest from datadog.dogstatsd.base import DogStatsd, SUPPORTS_FORKING +logging.getLogger("datadog.dogstatsd").setLevel(logging.FATAL) @pytest.mark.parametrize( "disable_background_sender, disable_buffering", @@ -49,16 +51,16 @@ def sender_a(statsd, running): statsd.gauge("spam", 1) -def sender_b(statsd, signal): +def sender_b(statsd, running): while running[0]: with statsd: statsd.gauge("spam", 1) @pytest.mark.parametrize( - "disable_background_sender, disable_buffering, sender", + "disable_background_sender, disable_buffering, sender_fn", list(itertools.product([True, False], [True, False], [sender_a, sender_b])), ) -def test_fork_with_thread(disable_background_sender, disable_buffering, sender): +def test_fork_with_thread(disable_background_sender, disable_buffering, sender_fn): if not SUPPORTS_FORKING: pytest.skip("os.register_at_fork is required for this test") @@ -71,7 +73,7 @@ def test_fork_with_thread(disable_background_sender, disable_buffering, sender): sender = None try: sender_running = [True] - sender = threading.Thread(target=sender, args=(statsd, sender_running)) + sender = 
threading.Thread(target=sender_fn, args=(statsd, sender_running)) sender.daemon = True sender.start() From 4a6146bd7cf413d4869a09583b02e22e885f1294 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Fri, 16 Aug 2024 17:15:38 +0000 Subject: [PATCH 2/5] Improve responsiveness of test_fork_with_thread time.sleep(0) allows Python to switch to another thread, allowing the main thread to progress through stages quicker, while still being able to catch deadlocks from getaddrinfo or improper locking in fork hooks. --- tests/integration/dogstatsd/test_statsd_fork.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/integration/dogstatsd/test_statsd_fork.py b/tests/integration/dogstatsd/test_statsd_fork.py index 3a8aa5fd3..495b1cb57 100644 --- a/tests/integration/dogstatsd/test_statsd_fork.py +++ b/tests/integration/dogstatsd/test_statsd_fork.py @@ -3,6 +3,7 @@ import socket import threading import logging +import time import pytest @@ -49,12 +50,14 @@ def inner(*args, **kwargs): def sender_a(statsd, running): while running[0]: statsd.gauge("spam", 1) + time.sleep(0) def sender_b(statsd, running): while running[0]: with statsd: statsd.gauge("spam", 1) + time.sleep(0) @pytest.mark.parametrize( "disable_background_sender, disable_buffering, sender_fn", @@ -86,7 +89,7 @@ def test_fork_with_thread(disable_background_sender, disable_buffering, sender_f assert os.WEXITSTATUS(status) == 42 finally: - statsd.stop() if sender: sender_running[0] = False sender.join() + statsd.stop() From 7ed936c2501ebe57a413e02a5eab1e46166c7e6d Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Fri, 16 Aug 2024 17:18:19 +0000 Subject: [PATCH 3/5] Extend test_fork_with_thread coverage Try sending a metric from the child process. This makes sure that all locks are still in a serviceable state after the fork. 
--- tests/integration/dogstatsd/test_statsd_fork.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/dogstatsd/test_statsd_fork.py b/tests/integration/dogstatsd/test_statsd_fork.py index 495b1cb57..49337a827 100644 --- a/tests/integration/dogstatsd/test_statsd_fork.py +++ b/tests/integration/dogstatsd/test_statsd_fork.py @@ -82,6 +82,7 @@ def test_fork_with_thread(disable_background_sender, disable_buffering, sender_f pid = os.fork() if pid == 0: + statsd.gauge("spam", 2) os._exit(42) assert pid > 0 From fb20759b77edfb1f0ce4b77f1a66589139443ce6 Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Thu, 15 Aug 2024 16:39:47 +0000 Subject: [PATCH 4/5] Avoid deadlock in getaddrinfo getaddrinfo may use an internal lock that, in case of a concurrent fork, may be left in locked state and cause child process to deadlock. --- datadog/dogstatsd/base.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 72b3a7901..8b503b6b7 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -1531,8 +1531,15 @@ def pre_fork(self): self._stop_flush_thread() self._stop_sender_thread() + # Prevent concurrent calls to system libraries (notably + # getaddrinfo) which may leave internal locks in a locked + # state and deadlock the child. + self._socket_lock.acquire() + def post_fork_parent(self): """Restore the client state after a fork in the parent process.""" + self._socket_lock.release() + if self._disable_aggregating: self._start_flush_thread( self._flush_interval, @@ -1548,21 +1555,17 @@ def post_fork_parent(self): def post_fork_child(self): """Restore the client state after a fork in the child process.""" + self._socket_lock.release() self._config_lock.release() # Discard the locks that could have been locked at the time # when we forked. This may cause inconsistent internal state, # which we will fix in the next steps. 
- self._socket_lock = Lock() self._buffer_lock = RLock() # Reset the buffer so we don't send metrics from the parent # process. Also makes sure buffer properties are consistent. self._reset_buffer() - # Execute the socket_path setter to reconcile transport and - # payload size properties in respect to socket_path value. - self.socket_path = self.socket_path - self.close_socket() with self._config_lock: if self._disable_aggregating: From 70b3d02461623c07179d92bb1fa84ce616f3e23b Mon Sep 17 00:00:00 2001 From: Vikentiy Fesunov Date: Mon, 19 Aug 2024 13:23:50 +0000 Subject: [PATCH 5/5] Remove obsolete test pre_fork is now mutually exclusive with get_socket() until post_fork is called, so the sequence of events that the test was testing is no longer possible. --- tests/unit/dogstatsd/test_statsd.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/tests/unit/dogstatsd/test_statsd.py b/tests/unit/dogstatsd/test_statsd.py index 350c94843..a27b5bdc4 100644 --- a/tests/unit/dogstatsd/test_statsd.py +++ b/tests/unit/dogstatsd/test_statsd.py @@ -2028,16 +2028,3 @@ def test_max_payload_size(self): self.assertEqual(statsd._max_payload_size, UDP_OPTIMAL_PAYLOAD_LENGTH) statsd.socket_path = "/foo" self.assertEqual(statsd._max_payload_size, UDS_OPTIMAL_PAYLOAD_LENGTH) - - def test_post_fork_locks(self): - def inner(): - statsd = DogStatsd(socket_path=None, port=8125) - # Statsd should survive this sequence of events - statsd.pre_fork() - statsd.get_socket() - statsd.post_fork_parent() - t = Thread(target=inner) - t.daemon = True - t.start() - t.join(timeout=5) - self.assertFalse(t.is_alive())