Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid potential deadlock in getaddrinfo #848

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1531,8 +1531,15 @@ def pre_fork(self):
self._stop_flush_thread()
self._stop_sender_thread()

# Prevent concurrent calls to system libraries (notably
# getaddrinfo) which may leave internal locks in a locked
# state and deadlock the child.
self._socket_lock.acquire()

def post_fork_parent(self):
"""Restore the client state after a fork in the parent process."""
self._socket_lock.release()

if self._disable_aggregating:
self._start_flush_thread(
self._flush_interval,
Expand All @@ -1548,21 +1555,17 @@ def post_fork_parent(self):

def post_fork_child(self):
"""Restore the client state after a fork in the child process."""
self._socket_lock.release()
self._config_lock.release()

# Discard the locks that could have been locked at the time
# when we forked. This may cause inconsistent internal state,
# which we will fix in the next steps.
self._socket_lock = Lock()
self._buffer_lock = RLock()

# Reset the buffer so we don't send metrics from the parent
# process. Also makes sure buffer properties are consistent.
self._reset_buffer()
# Execute the socket_path setter to reconcile transport and
# payload size properties in respect to socket_path value.
self.socket_path = self.socket_path
self.close_socket()

with self._config_lock:
if self._disable_aggregating:
Expand Down
16 changes: 11 additions & 5 deletions tests/integration/dogstatsd/test_statsd_fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
import itertools
import socket
import threading
import logging
import time

import pytest

from datadog.dogstatsd.base import DogStatsd, SUPPORTS_FORKING

logging.getLogger("datadog.dogstatsd").setLevel(logging.FATAL)

@pytest.mark.parametrize(
"disable_background_sender, disable_buffering",
Expand Down Expand Up @@ -47,18 +50,20 @@ def inner(*args, **kwargs):
def sender_a(statsd, running):
while running[0]:
statsd.gauge("spam", 1)
time.sleep(0)


def sender_b(statsd, signal):
def sender_b(statsd, running):
while running[0]:
with statsd:
statsd.gauge("spam", 1)
time.sleep(0)

@pytest.mark.parametrize(
"disable_background_sender, disable_buffering, sender",
"disable_background_sender, disable_buffering, sender_fn",
list(itertools.product([True, False], [True, False], [sender_a, sender_b])),
)
def test_fork_with_thread(disable_background_sender, disable_buffering, sender):
def test_fork_with_thread(disable_background_sender, disable_buffering, sender_fn):
if not SUPPORTS_FORKING:
pytest.skip("os.register_at_fork is required for this test")

Expand All @@ -71,20 +76,21 @@ def test_fork_with_thread(disable_background_sender, disable_buffering, sender):
sender = None
try:
sender_running = [True]
sender = threading.Thread(target=sender, args=(statsd, sender_running))
sender = threading.Thread(target=sender_fn, args=(statsd, sender_running))
sender.daemon = True
sender.start()

pid = os.fork()
if pid == 0:
statsd.gauge("spam", 2)
os._exit(42)

assert pid > 0
(_, status) = os.waitpid(pid, 0)

assert os.WEXITSTATUS(status) == 42
finally:
statsd.stop()
if sender:
sender_running[0] = False
sender.join()
statsd.stop()
13 changes: 0 additions & 13 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2028,16 +2028,3 @@ def test_max_payload_size(self):
self.assertEqual(statsd._max_payload_size, UDP_OPTIMAL_PAYLOAD_LENGTH)
statsd.socket_path = "/foo"
self.assertEqual(statsd._max_payload_size, UDS_OPTIMAL_PAYLOAD_LENGTH)

def test_post_fork_locks(self):
def inner():
statsd = DogStatsd(socket_path=None, port=8125)
# Statsd should survive this sequence of events
statsd.pre_fork()
statsd.get_socket()
statsd.post_fork_parent()
t = Thread(target=inner)
t.daemon = True
t.start()
t.join(timeout=5)
self.assertFalse(t.is_alive())
Loading