From 2a993c99797b98644c970a0d601bae30d6101f3e Mon Sep 17 00:00:00 2001 From: Dima Tisnek Date: Thu, 16 Jan 2025 09:28:18 +0900 Subject: [PATCH] comform --- ops/tracing/_fixme.py | 132 +++++++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 61 deletions(-) diff --git a/ops/tracing/_fixme.py b/ops/tracing/_fixme.py index 30de2e677..1abaf1408 100644 --- a/ops/tracing/_fixme.py +++ b/ops/tracing/_fixme.py @@ -1,16 +1,14 @@ # Copyright 2022 Canonical Ltd. # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this +# file except in compliance with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. from __future__ import annotations import os @@ -251,7 +249,8 @@ def _remove_stale_otel_sdk_packages(): This only has an effect if executed on an upgrade-charm event. """ - # all imports are local to keep this function standalone, side-effect-free, and easy to revert later + # all imports are local to keep this function standalone, side-effect-free, and easy + # to revert later import os if os.getenv('JUJU_DISPATCH_PATH') != 'hooks/upgrade-charm': @@ -274,7 +273,8 @@ def _remove_stale_otel_sdk_packages(): otel_logger.debug(f'Found {len(otel_distributions)} opentelemetry distributions') - # If we have multiple distributions with the same name, remove any that have 0 associated files + # If we have multiple distributions with the same name, remove any that have 0 + # associated files for name, distributions_ in otel_distributions.items(): if len(distributions_) <= 1: continue @@ -289,8 +289,8 @@ def _remove_stale_otel_sdk_packages(): otel_logger.debug('Successfully applied _remove_stale_otel_sdk_packages patch. ') -# apply hacky patch to remove stale opentelemetry sdk packages on upgrade-charm. -# it could be trouble if someone ever decides to implement their own tracer parallel to +# apply hacky patch to remove stale opentelemetry sdk packages on upgrade-charm. it +# could be trouble if someone ever decides to implement their own tracer parallel to # ours and before the charm has inited. We assume they won't. _remove_stale_otel_sdk_packages() @@ -351,9 +351,9 @@ def _remove_stale_otel_sdk_packages(): CHARM_TRACING_ENABLED = 'CHARM_TRACING_ENABLED' BUFFER_DEFAULT_CACHE_FILE_NAME = '.charm_tracing_buffer.raw' -# we store the buffer as raw otlp-native protobuf (bytes) since it's hard to serialize/deserialize it in -# any portable format. Json dumping is supported, but loading isn't. -# cfr: https://github.com/open-telemetry/opentelemetry-python/issues/1003 +# we store the buffer as raw otlp-native protobuf (bytes) since it's hard to +# serialize/deserialize it in any portable format. Json dumping is supported, but +# loading isn't. cfr: https://github.com/open-telemetry/opentelemetry-python/issues/1003 BUFFER_DEFAULT_CACHE_FILE_SIZE_LIMIT_MiB = 10 _BUFFER_CACHE_FILE_SIZE_LIMIT_MiB_MIN = 10 @@ -421,12 +421,13 @@ def _save(self, spans: Sequence[ReadableSpan], replace: bool = False): new = self._serialize(spans) try: - # if the buffer exceeds the size limit, we start dropping old spans until it does + # if the buffer exceeds the size limit, we start dropping old spans until it + # does while len(new + self._SPANSEP.join(old)) > (self._max_buffer_size_mib * _MiB_TO_B): if not old: - # if we've already dropped all spans and still we can't get under the - # size limit, we can't save this span + # if we've already dropped all spans and still we can't get under + # the size limit, we can't save this span logger.error( f'span exceeds total buffer size limit ({self._max_buffer_size_mib}MiB); ' f'buffering FAILED' @@ -523,14 +524,16 @@ class _OTLPSpanExporter(OTLPSpanExporter): it fails a bit faster. """ - # The issue we're trying to solve is that the model takes AGES to settle if e.g. tls is misconfigured, - # as every hook of a charm_tracing-instrumented charm takes about a minute to exit, as the charm can't - # flush the traces and keeps retrying for 'too long' + # The issue we're trying to solve is that the model takes AGES to settle if e.g. tls + # is misconfigured, as every hook of a charm_tracing-instrumented charm takes about + # a minute to exit, as the charm can't flush the traces and keeps retrying for 'too + # long' _MAX_RETRY_TIMEOUT = 4 - # we give the exporter 4 seconds in total to succeed pushing the traces to tempo - # if it fails, we'll be caching the data in the buffer and flush it the next time, so there's no data loss risk. - # this means 2/3 retries (hard to guess from the implementation) and up to ~7 seconds total wait + # we give the exporter 4 seconds in total to succeed pushing the traces to tempo if + # it fails, we'll be caching the data in the buffer and flush it the next time, so + # there's no data loss risk. this means 2/3 retries (hard to guess from the + # implementation) and up to ~7 seconds total wait class _BufferedExporter(InMemorySpanExporter): @@ -543,7 +546,8 @@ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: return super().export(spans) def force_flush(self, timeout_millis: int = 0) -> bool: - # parent implementation is fake, so the timeout_millis arg is not doing anything. + # parent implementation is fake, so the timeout_millis arg is not doing + # anything. result = super().force_flush(timeout_millis) self._buffer.save(self.get_finished_spans()) return result @@ -593,8 +597,9 @@ def _get_tracer() -> Optional[Tracer]: try: return tracer.get() except LookupError: - # fallback: this course-corrects for a user error where charm_tracing symbols are imported - # from different paths (typically charms.tempo_coordinator_k8s... and lib.charms.tempo_coordinator_k8s...) + # fallback: this course-corrects for a user error where charm_tracing symbols + # are imported from different paths (typically charms.tempo_coordinator_k8s... + # and lib.charms.tempo_coordinator_k8s...) try: ctx: Context = copy_context() if context_tracer := _get_tracer_from_context(ctx): @@ -722,12 +727,13 @@ def _setup_root_span_initializer( @functools.wraps(original_init) def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): - # we're using 'self' here because this is charm init code, makes sense to read what's below - # from the perspective of the charm. Self.unit.name... + # we're using 'self' here because this is charm init code, makes sense to read + # what's below from the perspective of the charm. Self.unit.name... original_init(self, framework, *args, **kwargs) - # we call this from inside the init context instead of, say, _autoinstrument, because we want it to - # be checked on a per-charm-instantiation basis, not on a per-type-declaration one. + # we call this from inside the init context instead of, say, _autoinstrument, + # because we want it to be checked on a per-charm-instantiation basis, not on a + # per-type-declaration one. if not is_enabled(): # this will only happen during unittesting, hopefully, so it's fine to log a # bit more verbosely @@ -735,14 +741,15 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): return original_event_context = framework._event_context - # default service name isn't just app name because it could conflict with the workload service name + # default service name isn't just app name because it could conflict with the + # workload service name _service_name = service_name or f'{self.app.name}-charm' unit_name = self.unit.name resource = Resource.create( attributes={ - # FIXME is it possible to detect these values early? - # FIXME do we need parity on these very fields? + # FIXME is it possible to detect these values early? FIXME do we need + # parity on these very fields? 'service.name': _service_name, # ahem? 'compose_service': _service_name, # double ahem? 'charm_type': type(self).__name__, # Charm class name, available later @@ -755,15 +762,16 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): ) provider = TracerProvider(resource=resource) - # if anything goes wrong with retrieving the endpoint, we let the exception bubble up. + # if anything goes wrong with retrieving the endpoint, we let the exception + # bubble up. tracing_endpoint = _get_tracing_endpoint(tracing_endpoint_attr, self, charm_type) buffer_only = False # whether we're only exporting to buffer, or also to the otlp exporter. if not tracing_endpoint: - # tracing is off if tracing_endpoint is None - # however we can buffer things until tracing comes online + # tracing is off if tracing_endpoint is None however we can buffer things + # until tracing comes online buffer_only = True server_cert: Optional[Union[str, Path]] = ( @@ -796,9 +804,9 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): else: dev_logger.debug('buffering mode: FALLBACK') - # in principle, we have the right configuration to be pushing traces, - # but if we fail for whatever reason, we will put everything in the buffer - # and retry the next time + # in principle, we have the right configuration to be pushing traces, but if + # we fail for whatever reason, we will put everything in the buffer and + # retry the next time otlp_exporter = _OTLPSpanExporter( endpoint=tracing_endpoint, certificate_file=str(Path(server_cert).absolute()) if server_cert else None, @@ -821,13 +829,14 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): root_span_name = f'{unit_name}: {event_name} event' span = _tracer.start_span(root_span_name, attributes={'juju.dispatch_path': dispatch_path}) - # all these shenanigans are to work around the fact that the opentelemetry tracing API is built - # on the assumption that spans will be used as contextmanagers. - # Since we don't (as we need to close the span on framework.commit), - # we need to manually set the root span as current. + # all these shenanigans are to work around the fact that the opentelemetry + # tracing API is built on the assumption that spans will be used as + # contextmanagers. Since we don't (as we need to close the span on + # framework.commit), we need to manually set the root span as current. ctx = set_span_in_context(span) - # log a trace id, so we can pick it up from the logs (and jhack) to look it up in tempo. + # log a trace id, so we can pick it up from the logs (and jhack) to look it up + # in tempo. root_trace_id = hex(span.get_span_context().trace_id)[2:] # strip 0x prefix logger.debug(f'Starting root trace with id={root_trace_id!r}.') @@ -857,9 +866,9 @@ def wrap_close(): flush_successful = tp.force_flush(timeout_millis=1000) # don't block for too long if buffer_only: - # if we're in buffer_only mode, it means we couldn't even set up the exporter for - # tempo as we're missing some data. - # so attempting to flush the buffer doesn't make sense + # if we're in buffer_only mode, it means we couldn't even set up the + # exporter for tempo as we're missing some data. so attempting to flush + # the buffer doesn't make sense dev_logger.debug('tracing backend unavailable: all spans pushed to buffer') else: @@ -874,7 +883,8 @@ def wrap_close(): # the backend has accepted the spans generated during this event, if not previous_spans_buffered: - # if the buffer was empty to begin with, any spans we collected now can be discarded + # if the buffer was empty to begin with, any spans we collected + # now can be discarded buffer.drop() dev_logger.debug('buffer dropped: this trace has been sent already') else: @@ -887,9 +897,9 @@ def wrap_close(): # TODO is this even possible? dev_logger.debug('buffer flush OK; empty: nothing to flush') else: - # this situation is pretty weird, I'm not even sure it can happen, - # because it would mean that we did manage - # to push traces directly to the tempo exporter (flush_successful), + # this situation is pretty weird, I'm not even sure it can + # happen, because it would mean that we did manage to push + # traces directly to the tempo exporter (flush_successful), # but the buffer flush failed to push to the same exporter! logger.error('buffer flush FAILED') @@ -1054,20 +1064,20 @@ def trace_type(cls: _T) -> _T: dev_logger.debug(f'skipping {method} (dunder)') continue - # the span title in the general case should be: - # method call: MyCharmWrappedMethods.b - # if the method has a name (functools.wrapped or regular method), let - # _trace_callable use its default algorithm to determine what name to give the span. + # the span title in the general case should be: method call: + # MyCharmWrappedMethods.b if the method has a name (functools.wrapped or regular + # method), let _trace_callable use its default algorithm to determine what name + # to give the span. trace_method_name = None try: qualname_c0 = method.__qualname__.split('.')[0] if not hasattr(cls, method.__name__): # if the callable doesn't have a __name__ (probably a decorated method), - # it probably has a bad qualname too (such as my_decorator..wrapper) - # which is not great for finding out what the trace is about. - # So we use the method name instead and add a reference to the decorator name. - # Result: - # method call: @my_decorator(MyCharmWrappedMethods.b) + # it probably has a bad qualname too (such as + # my_decorator..wrapper) which is not great for finding out what + # the trace is about. So we use the method name instead and add a + # reference to the decorator name. Result: method call: + # @my_decorator(MyCharmWrappedMethods.b) trace_method_name = f'@{qualname_c0}({cls.__name__}.{name})' except Exception: # noqa: S110 pass