Skip to content

Commit

Permalink
AMLII-2166 - Add UDS Streams support to the DogStatsD client
Browse files Browse the repository at this point in the history
Includes full support for the unix://, unixstream://, and unixgram:// socket_path prefixes
utilized by DD_DOGSTATSD_URL in preparation to support that feature.

Autodetects SOCK_DGRAM vs SOCK_STREAM for users currently providing a raw socket path.
  • Loading branch information
ddrthall committed Nov 15, 2024
1 parent ab20e29 commit e62632e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 10 deletions.
40 changes: 34 additions & 6 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8125

# Socket prefixes
UNIX_ADDRESS_SCHEME = "unix://"
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"

# Buffering-related values (in seconds)
DEFAULT_FLUSH_INTERVAL = 0.3
MIN_FLUSH_INTERVAL = 0.0001
Expand Down Expand Up @@ -731,11 +736,34 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):

@classmethod
def _get_uds_socket(cls, socket_path, timeout):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.settimeout(timeout)
cls._ensure_min_send_buffer_size(sock)
sock.connect(socket_path)
return sock
valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
valid_socket_kinds = [socket.SOCK_DGRAM]
socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
valid_socket_kinds = [socket.SOCK_STREAM]
socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]

last_error = ValueError("Invalid socket path")
for socket_kind in valid_socket_kinds:
try:
sock = socket.socket(socket.AF_UNIX, socket_kind)
sock.settimeout(timeout)
cls._ensure_min_send_buffer_size(sock)
sock.connect(socket_path)
log.debug("Connected to socket %s with kind %s", socket_path, socket_kind.name)
return sock
except Exception as e:
if sock is not None:
sock.close()
log.debug("Failed to connect to %s with kind %s: %s", socket_path, socket_kind.name, e)
if e.errno == errno.EPROTOTYPE:
last_error = e
continue
raise e
raise last_error

@classmethod
def _get_udp_socket(cls, host, port, timeout):
Expand Down Expand Up @@ -1253,7 +1281,7 @@ def _xmit_packet(self, packet, is_telemetry):
)
self.close_socket()
except Exception as exc:
print("Unexpected error: %s", exc)
print("Unexpected error: ", exc)
log.error("Unexpected error: %s", str(exc))

if not is_telemetry and self._telemetry:
Expand Down
51 changes: 47 additions & 4 deletions tests/integration/dogstatsd/test_statsd_sender.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import itertools
import os
import shutil
import socket
import tempfile
from threading import Thread
import uuid

import pytest

from datadog.dogstatsd.base import DogStatsd

@pytest.mark.parametrize(
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
)
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
# Test basic sender operation with an assortment of options
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
statsd = DogStatsd(
telemetry_min_flush_interval=0,
disable_background_sender=disable_background_sender,
Expand Down Expand Up @@ -101,3 +105,42 @@ def test_buffering_with_context():
bar.settimeout(5)
msg = bar.recv(8192)
assert msg == b"first:1|c\n"

@pytest.fixture()
def socket_dir():
tempdir = tempfile.mkdtemp()
yield tempdir
shutil.rmtree(tempdir)

@pytest.mark.parametrize(
"socket_prefix, socket_kind, success",
[
("", socket.SOCK_DGRAM, True),
("", socket.SOCK_STREAM, True),
("unix://", socket.SOCK_DGRAM, True),
("unix://", socket.SOCK_STREAM, True),
("unixstream://", socket.SOCK_DGRAM, False),
("unixstream://", socket.SOCK_STREAM, True),
("unixgram://", socket.SOCK_DGRAM, True),
("unixgram://", socket.SOCK_STREAM, False)
]
)
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")
listener_socket = socket.socket(socket.AF_UNIX, socket_kind)
listener_socket.bind(socket_path)

if socket_kind == socket.SOCK_STREAM:
listener_socket.listen(1)

statsd = DogStatsd(
socket_path = socket_prefix + socket_path
)

if success:
assert statsd.get_socket() is not None
else:
with pytest.raises(OSError):
statsd.get_socket()

listener_socket.close()

0 comments on commit e62632e

Please sign in to comment.