From f8be9f2090264375ed6077f6ede00b5c17e5afd0 Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Fri, 15 Nov 2024 14:26:19 -0500 Subject: [PATCH] AMLII-2166 - Add UDS Streams support to the DogStatsD client Includes full support for the unix://, unixstream://, and unixgram:// socket_path prefixes utilized by DD_DOGSTATSD_URL in preparation to support that feature. Autodetects SOCK_DGRAM vs SOCK_STREAM for users currently providing a raw socket path. --- datadog/dogstatsd/base.py | 46 ++++++++++++++--- .../dogstatsd/test_statsd_sender.py | 51 +++++++++++++++++-- 2 files changed, 87 insertions(+), 10 deletions(-) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 28a62d896..2194f9844 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -49,6 +49,11 @@ DEFAULT_HOST = "localhost" DEFAULT_PORT = 8125 +# Socket prefixes +UNIX_ADDRESS_SCHEME = "unix://" +UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://" +UNIX_ADDRESS_STREAM_SCHEME = "unixstream://" + # Buffering-related values (in seconds) DEFAULT_FLUSH_INTERVAL = 0.3 MIN_FLUSH_INTERVAL = 0.0001 @@ -731,11 +736,40 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE): @classmethod def _get_uds_socket(cls, socket_path, timeout): - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - sock.settimeout(timeout) - cls._ensure_min_send_buffer_size(sock) - sock.connect(socket_path) - return sock + valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM] + if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME): + valid_socket_kinds = [socket.SOCK_DGRAM] + socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):] + elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME): + valid_socket_kinds = [socket.SOCK_STREAM] + socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):] + elif socket_path.startswith(UNIX_ADDRESS_SCHEME): + socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):] + + last_error = ValueError("Invalid socket path") + for socket_kind in valid_socket_kinds: + try: + # This will fail in python2.7 + sk_name = socket_kind.name + except AttributeError: + sk_name = socket_kind + + try: + sock = socket.socket(socket.AF_UNIX, socket_kind) + sock.settimeout(timeout) + cls._ensure_min_send_buffer_size(sock) + sock.connect(socket_path) + log.debug("Connected to socket %s with kind %s", socket_path, sk_name) + return sock + except Exception as e: + if sock is not None: + sock.close() + log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e) + if e.errno == errno.EPROTOTYPE: + last_error = e + continue + raise e + raise last_error @classmethod def _get_udp_socket(cls, host, port, timeout): @@ -1253,7 +1287,7 @@ def _xmit_packet(self, packet, is_telemetry): ) self.close_socket() except Exception as exc: - print("Unexpected error: %s", exc) + print("Unexpected error: ", exc) log.error("Unexpected error: %s", str(exc)) if not is_telemetry and self._telemetry: diff --git a/tests/integration/dogstatsd/test_statsd_sender.py b/tests/integration/dogstatsd/test_statsd_sender.py index 55710c173..23d5c075b 100644 --- a/tests/integration/dogstatsd/test_statsd_sender.py +++ b/tests/integration/dogstatsd/test_statsd_sender.py @@ -1,18 +1,23 @@ +from contextlib import closing import itertools +import os +import shutil import socket +import tempfile from threading import Thread +import uuid import pytest from datadog.dogstatsd.base import DogStatsd @pytest.mark.parametrize( - "disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop", - list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])), + "disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind", + list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])), ) -def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop): +def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind): # Test basic sender operation with an assortment of options - foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0) + foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0) statsd = DogStatsd( telemetry_min_flush_interval=0, disable_background_sender=disable_background_sender, @@ -101,3 +106,41 @@ def test_buffering_with_context(): bar.settimeout(5) msg = bar.recv(8192) assert msg == b"first:1|c\n" + +@pytest.fixture() +def socket_dir(): + tempdir = tempfile.mkdtemp() + yield tempdir + shutil.rmtree(tempdir) + +@pytest.mark.parametrize( + "socket_prefix, socket_kind, success", + [ + ("", socket.SOCK_DGRAM, True), + ("", socket.SOCK_STREAM, True), + ("unix://", socket.SOCK_DGRAM, True), + ("unix://", socket.SOCK_STREAM, True), + ("unixstream://", socket.SOCK_DGRAM, False), + ("unixstream://", socket.SOCK_STREAM, True), + ("unixgram://", socket.SOCK_DGRAM, True), + ("unixgram://", socket.SOCK_STREAM, False) + ] +) +def test_socket_connection(socket_dir, socket_prefix, socket_kind, success): + socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock") + listener_socket = socket.socket(socket.AF_UNIX, socket_kind) + listener_socket.bind(socket_path) + + if socket_kind == socket.SOCK_STREAM: + listener_socket.listen(1) + + with closing(listener_socket): + statsd = DogStatsd( + socket_path = socket_prefix + socket_path + ) + + if success: + assert statsd.get_socket() is not None + else: + with pytest.raises(socket.error): + statsd.get_socket()