diff --git a/.github/workflows/changelog.yaml b/.github/workflows/changelog.yaml index 6dd28dcc4..c3454b79c 100644 --- a/.github/workflows/changelog.yaml +++ b/.github/workflows/changelog.yaml @@ -1,4 +1,8 @@ name: "Ensure labels" + +permissions: + pull-requests: read + on: # yamllint disable-line rule:truthy pull_request: types: diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 330c87a2f..387ae5dd1 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -1,5 +1,9 @@ name: "CodeQL" +permissions: + contents: read + checks: write + on: push: branches: [ master ] diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml index 0bd3565ac..efe88ab83 100644 --- a/.github/workflows/labeler.yml +++ b/.github/workflows/labeler.yml @@ -1,4 +1,9 @@ name: "Pull Request Labeler" + +permissions: + contents: read + pull-requests: write + on: - pull_request diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 8008ff16f..1898f83c6 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -1,5 +1,9 @@ name: Build +permissions: + contents: write + pull-requests: write + on: pull_request: release: diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml index c5291bfc5..51f9ad156 100644 --- a/.github/workflows/stale.yml +++ b/.github/workflows/stale.yml @@ -1,6 +1,12 @@ # Configuration for https://github.com/actions/stale name: "Stale issues and pull requests" + +permissions: + contents: write + issues: write + pull-requests: write + on: schedule: - cron: "0 5 * * *" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 36572d945..f86f06175 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,5 +1,8 @@ name: test +permissions: + contents: read + on: push: branches: diff --git a/.github/workflows/test_integration.yml b/.github/workflows/test_integration.yml index cd8d494ad..152c0b500 100644 --- a/.github/workflows/test_integration.yml +++ b/.github/workflows/test_integration.yml @@ -1,5 +1,8 @@ name: Run Integration Tests +permissions: + contents: read + on: # yamllint disable-line rule:truthy pull_request: types: diff --git a/CHANGELOG.md b/CHANGELOG.md index e6dc44945..c9e32dce2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## 0.50.1 / 2024-09-18 + +* [Added] Add the ability for buffering and aggregation to work at the same time. See [#851](https://github.com/DataDog/datadogpy/pull/851). + +## v0.50.0 / 2024-08-20 + +* [Added] Add client side aggregation. See [#844](https://github.com/DataDog/datadogpy/pull/844). +* [Added] Add metric object type. See [#837](https://github.com/DataDog/datadogpy/pull/837). +* [Added] Support passing Unix timestamps to dogstatsd. See [#831](https://github.com/DataDog/datadogpy/pull/831). +* [Fixed] Fix a potential deadlock on fork. See [#836](https://github.com/DataDog/datadogpy/pull/836). +* [Changed] feat(origin detection): send both container ID and Entity ID. See [#828](https://github.com/DataDog/datadogpy/pull/828). + ## 0.49.1 / 2024-03-18 * [Fixed] Fix potential metric loss when open_buffer is combined with disable_buffering=False. See [#820](https://github.com/DataDog/datadogpy/pull/820). diff --git a/RELEASING.md b/RELEASING.md index 2d8c3e9fb..f3e6e16b1 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -21,7 +21,7 @@ This project does not have a strict release schedule. However, we would make a r Our team will trigger the release pipeline. ### Prerequisite -- Install [datadog_checks_dev](https://datadog-checks-base.readthedocs.io/en/latest/datadog_checks_dev.cli.html#installation) using Python 3. +- Install [datadog_checks_dev](https://datadoghq.dev/integrations-core/setup/#ddev) using Python 3. - Setup PyPI, see the internal documentation for more details ### Update Changelog and version diff --git a/datadog/__init__.py b/datadog/__init__.py index 8a04d8155..dec93813a 100644 --- a/datadog/__init__.py +++ b/datadog/__init__.py @@ -37,9 +37,9 @@ def initialize( api_host=None, # type: Optional[str] statsd_host=None, # type: Optional[str] statsd_port=None, # type: Optional[int] - statsd_disable_aggregator=True, # type: bool + statsd_disable_aggregation=True, # type: bool statsd_disable_buffering=True, # type: bool - statsd_aggregation_flush_interval=2, # type: float + statsd_aggregation_flush_interval=0.3, # type: float statsd_use_default_route=False, # type: bool statsd_socket_path=None, # type: Optional[str] statsd_namespace=None, # type: Optional[str] @@ -78,12 +78,13 @@ def initialize( (default: True). :type statsd_disable_buffering: boolean - :param statsd_disable_aggregator: Enable/disable statsd client aggregation support + :param statsd_disable_aggregation: Enable/disable statsd client aggregation support (default: True). - :type statsd_disable_aggregator: boolean + :type statsd_disable_aggregation: boolean - :param statsd_aggregation_flush_interval: Sets the flush interval for aggregation - (default: 2 seconds) + :param statsd_aggregation_flush_interval: If aggregation is enabled, set the flush interval for + aggregation/buffering + (default: 0.3 seconds) :type statsd_aggregation_flush_interval: float :param statsd_use_default_route: Dynamically set the statsd host to the default route @@ -138,7 +139,7 @@ def initialize( if statsd_constant_tags: statsd.constant_tags += statsd_constant_tags - if statsd_disable_aggregator: + if statsd_disable_aggregation: statsd.disable_aggregation() else: statsd.enable_aggregation(statsd_aggregation_flush_interval) diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 8b503b6b7..690922c4c 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -50,11 +50,9 @@ DEFAULT_PORT = 8125 # Buffering-related values (in seconds) -DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3 +DEFAULT_FLUSH_INTERVAL = 0.3 MIN_FLUSH_INTERVAL = 0.0001 -# Aggregation-related values (in seconds) -DEFAULT_AGGREGATION_FLUSH_INTERVAL = 2 # Env var to enable/disable sending the container ID field ORIGIN_DETECTION_ENABLED = "DD_ORIGIN_DETECTION_ENABLED" @@ -147,8 +145,8 @@ def __init__( host=DEFAULT_HOST, # type: Text port=DEFAULT_PORT, # type: int max_buffer_size=None, # type: None - flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, # type: float - disable_aggregating=True, # type: bool + flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float + disable_aggregation=True, # type: bool disable_buffering=True, # type: bool namespace=None, # type: Optional[Text] constant_tags=None, # type: Optional[List[str]] @@ -238,8 +236,8 @@ def __init__( it overrides the default value. :type flush_interval: float - :disable_aggregating: If true, metrics (Count, Gauge, Set) are no longered aggregated by the client - :type disable_aggregating: bool + :disable_aggregation: If true, metrics (Count, Gauge, Set) are no longered aggregated by the client + :type disable_aggregation: bool :disable_buffering: If set, metrics are no longered buffered by the client and all data is sent synchronously to the server @@ -447,7 +445,7 @@ def __init__( self._config_lock = RLock() self._disable_buffering = disable_buffering - self._disable_aggregating = disable_aggregating + self._disable_aggregation = disable_aggregation self._flush_interval = flush_interval self._flush_thread = None @@ -455,26 +453,16 @@ def __init__( self.aggregator = Aggregator() # Indicates if the process is about to fork, so we shouldn't start any new threads yet. self._forking = False - # Currently, we do not allow both aggregation and buffering, we may revisit this in the future - if self._disable_buffering and self._disable_aggregating: - self._send = self._send_to_server - log.debug("Statsd buffering and aggregation is disabled") - elif self._disable_aggregating: - # Start the flush thread if buffering is enabled and the interval is above - # a reasonable range. This both prevents thrashing and allow us to use "0.0" - # as a value for disabling the automatic flush timer as well. + + if not self._disable_buffering: self._send = self._send_to_buffer - self._start_flush_thread( - self._flush_interval, - self.flush_buffered_metrics, - ) else: self._send = self._send_to_server - self._disable_buffering = True - self._start_flush_thread( - self._flush_interval, - self.flush_aggregated_metrics, - ) + + if not self._disable_aggregation or not self._disable_buffering: + self._start_flush_thread() + else: + log.debug("Statsd buffering and aggregation is disabled") self._queue = None self._sender_thread = None @@ -551,30 +539,14 @@ def enable_telemetry(self): self._telemetry = True # Note: Invocations of this method should be thread-safe - def _start_flush_thread( - self, - flush_interval, - flush_function, - ): - if (self._disable_buffering or not self._disable_aggregating) and flush_function == self.flush_buffered_metrics: - log.debug("Statsd periodic buffer flush is disabled") - return - if ( - self._disable_aggregating - and flush_function == self.flush_aggregated_metrics - ): - log.debug("Statsd periodic aggregating flush is disabled") + def _start_flush_thread(self): + if self._disable_aggregation and self.disable_buffering: + log.debug("Statsd periodic buffer and aggregation flush is disabled") return - flush_type = "" - if self._disable_buffering: - flush_type = "aggregation" - else: - flush_type = "buffering" - - if flush_interval <= MIN_FLUSH_INTERVAL: + if self._flush_interval <= MIN_FLUSH_INTERVAL: log.debug( - "the set flush interval for %s is less then the minimum", flush_type + "the set flush interval is less then the minimum" ) return @@ -587,19 +559,20 @@ def _start_flush_thread( def _flush_thread_loop(self, flush_interval): while not self._flush_thread_stop.is_set(): time.sleep(flush_interval) - flush_function() - + if not self._disable_aggregation: + self.flush_aggregated_metrics() + if not self._disable_buffering: + self.flush_buffered_metrics() self._flush_thread = threading.Thread( name="{}_flush_thread".format(self.__class__.__name__), target=_flush_thread_loop, - args=(self, flush_interval,), + args=(self, self._flush_interval,), ) self._flush_thread.daemon = True self._flush_thread.start() log.debug( - "Statsd %s flush thread registered with period of %s", - flush_type, - flush_interval, + "Statsd flush thread registered with period of %s", + self._flush_interval, ) # Note: Invocations of this method should be thread-safe @@ -607,10 +580,10 @@ def _stop_flush_thread(self): if not self._flush_thread: return try: - if self._disable_aggregating: - self.flush_buffered_metrics() - else: + if not self._disable_aggregation: self.flush_aggregated_metrics() + if not self.disable_buffering: + self.flush_buffered_metrics() finally: pass @@ -645,43 +618,40 @@ def disable_buffering(self, is_disabled): self._disable_buffering = is_disabled - # If buffering has been disabled, flush and kill the background thread + # If buffering (and aggregation) has been disabled, flush and kill the background thread # otherwise start up the flushing thread and enable the buffering. if is_disabled: self._send = self._send_to_server - self._stop_flush_thread() + if self._disable_aggregation and self.disable_buffering: + self._stop_flush_thread() log.debug("Statsd buffering is disabled") else: self._send = self._send_to_buffer - self._start_flush_thread( - self._flush_interval, - self.flush_buffered_metrics, - ) + self._start_flush_thread() def disable_aggregation(self): with self._config_lock: # If the toggle didn't change anything, this method is a noop - if self._disable_aggregating: + if self._disable_aggregation: return - self._disable_aggregating = True + self._disable_aggregation = True - # If aggregation has been disabled, flush and kill the background thread + # If aggregation and buffering has been disabled, flush and kill the background thread # otherwise start up the flushing thread and enable aggregation. - self._stop_flush_thread() + if self._disable_aggregation and self.disable_buffering: + self._stop_flush_thread() log.debug("Statsd aggregation is disabled") - def enable_aggregation(self, aggregation_flush_interval=DEFAULT_AGGREGATION_FLUSH_INTERVAL): + def enable_aggregation(self, flush_interval=DEFAULT_FLUSH_INTERVAL): with self._config_lock: - if not self._disable_aggregating: + if not self._disable_aggregation: return - self._disable_aggregating = False - self._flush_interval = aggregation_flush_interval - self._send = self._send_to_server - self._start_flush_thread( - self._flush_interval, - self.flush_aggregated_metrics, - ) + self._disable_aggregation = False + self._flush_interval = flush_interval + if self._disable_buffering: + self._send = self._send_to_server + self._start_flush_thread() @staticmethod def resolve_host(host, use_default_route): @@ -867,7 +837,7 @@ def gauge( >>> statsd.gauge("users.online", 123) >>> statsd.gauge("active.connections", 1001, tags=["protocol:http"]) """ - if self._disable_aggregating: + if self._disable_aggregation: self._report(metric, "g", value, tags, sample_rate) else: self.aggregator.gauge(metric, value, tags, sample_rate) @@ -890,7 +860,7 @@ def gauge_with_timestamp( >>> statsd.gauge("users.online", 123, 1713804588) >>> statsd.gauge("active.connections", 1001, 1713804588, tags=["protocol:http"]) """ - if self._disable_aggregating: + if self._disable_aggregation: self._report(metric, "g", value, tags, sample_rate, timestamp) else: self.aggregator.gauge(metric, value, tags, sample_rate, timestamp) @@ -908,7 +878,7 @@ def count( >>> statsd.count("page.views", 123) """ - if self._disable_aggregating: + if self._disable_aggregation: self._report(metric, "c", value, tags, sample_rate) else: self.aggregator.count(metric, value, tags, sample_rate) @@ -930,7 +900,7 @@ def count_with_timestamp( >>> statsd.count("files.transferred", 124, timestamp=1713804588) """ - if self._disable_aggregating: + if self._disable_aggregation: self._report(metric, "c", value, tags, sample_rate, timestamp) else: self.aggregator.count(metric, value, tags, sample_rate, timestamp) @@ -949,7 +919,7 @@ def increment( >>> statsd.increment("page.views") >>> statsd.increment("files.transferred", 124) """ - if self._disable_aggregating: + if self._disable_aggregation: self._report(metric, "c", value, tags, sample_rate) else: self.aggregator.count(metric, value, tags, sample_rate) @@ -969,7 +939,7 @@ def decrement( >>> statsd.decrement("active.connections", 2) """ metric_value = -value if value else value - if self._disable_aggregating: + if self._disable_aggregation: self._report(metric, "c", metric_value, tags, sample_rate) else: self.aggregator.count(metric, metric_value, tags, sample_rate) @@ -1080,7 +1050,7 @@ def set(self, metric, value, tags=None, sample_rate=None): >>> statsd.set("visitors.uniques", 999) """ - if self._disable_aggregating: + if self._disable_aggregation: self._report(metric, "s", value, tags, sample_rate) else: self.aggregator.set(metric, value, tags, sample_rate) @@ -1539,17 +1509,7 @@ def pre_fork(self): def post_fork_parent(self): """Restore the client state after a fork in the parent process.""" self._socket_lock.release() - - if self._disable_aggregating: - self._start_flush_thread( - self._flush_interval, - self.flush_buffered_metrics, - ) - else: - self._start_flush_thread( - self._flush_interval, - self.flush_aggregated_metrics, - ) + self._start_flush_thread() self._start_sender_thread() self._config_lock.release() @@ -1568,16 +1528,7 @@ def post_fork_child(self): self._reset_buffer() with self._config_lock: - if self._disable_aggregating: - self._start_flush_thread( - self._flush_interval, - self.flush_buffered_metrics, - ) - else: - self._start_flush_thread( - self._flush_interval, - self.flush_aggregated_metrics, - ) + self._start_flush_thread() self._start_sender_thread() def stop(self): @@ -1590,9 +1541,9 @@ def stop(self): self.disable_background_sender() self._disable_buffering = True - self._disable_aggregating = True - self.flush_buffered_metrics() + self._disable_aggregation = True self.flush_aggregated_metrics() + self.flush_buffered_metrics() self.close_socket() diff --git a/datadog/version.py b/datadog/version.py index d4c710a5b..0b5aa08f1 100644 --- a/datadog/version.py +++ b/datadog/version.py @@ -1 +1 @@ -__version__ = "0.49.2.dev" +__version__ = "0.50.2.dev" diff --git a/tests/unit/dogstatsd/test_statsd.py b/tests/unit/dogstatsd/test_statsd.py index a27b5bdc4..8949f6431 100644 --- a/tests/unit/dogstatsd/test_statsd.py +++ b/tests/unit/dogstatsd/test_statsd.py @@ -29,7 +29,7 @@ # Datadog libraries from datadog import initialize, statsd from datadog import __version__ as version -from datadog.dogstatsd.base import DEFAULT_BUFFERING_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH +from datadog.dogstatsd.base import DEFAULT_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH from datadog.dogstatsd.context import TimedContextManagerDecorator from datadog.util.compat import is_higher_py35, is_p3k from tests.util.contextmanagers import preserve_environment_variable, EnvVars @@ -41,7 +41,7 @@ class FakeSocket(object): FLUSH_GRACE_PERIOD = 0.2 - def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL): + def __init__(self, flush_interval=DEFAULT_FLUSH_INTERVAL): self.payloads = deque() self._flush_interval = flush_interval @@ -1087,6 +1087,34 @@ def test_flush_interval(self): 'page.views:1|c\n', fake_socket.recv(2, no_wait=True) ) + + def test_aggregation_buffering_simultaneously(self): + dogstatsd = DogStatsd(disable_buffering=False, disable_aggregation=False, telemetry_min_flush_interval=0) + fake_socket = FakeSocket() + dogstatsd.socket = fake_socket + for _ in range(10): + dogstatsd.increment('test.aggregation_and_buffering') + self.assertIsNone(fake_socket.recv(no_wait=True)) + dogstatsd.flush_aggregated_metrics() + dogstatsd.flush_buffered_metrics() + self.assert_equal_telemetry('test.aggregation_and_buffering:10|c\n', fake_socket.recv(2)) + + def test_aggregation_buffering_simultaneously_with_interval(self): + dogstatsd = DogStatsd(disable_buffering=False, disable_aggregation=False, flush_interval=1, telemetry_min_flush_interval=0) + fake_socket = FakeSocket() + dogstatsd.socket = fake_socket + for _ in range(10): + dogstatsd.increment('test.aggregation_and_buffering_with_interval') + self.assertIsNone(fake_socket.recv(no_wait=True)) + + time.sleep(0.3) + self.assertIsNone(fake_socket.recv(no_wait=True)) + + time.sleep(1) + self.assert_equal_telemetry( + 'test.aggregation_and_buffering_with_interval:10|c\n', + fake_socket.recv(2, no_wait=True) + ) def test_disable_buffering(self): dogstatsd = DogStatsd(disable_buffering=True, telemetry_min_flush_interval=0) @@ -1111,7 +1139,7 @@ def test_flush_disable(self): dogstatsd.increment('page.views') self.assertIsNone(fake_socket.recv(no_wait=True)) - time.sleep(DEFAULT_BUFFERING_FLUSH_INTERVAL) + time.sleep(DEFAULT_FLUSH_INTERVAL) self.assertIsNone(fake_socket.recv(no_wait=True)) time.sleep(0.3)