comform
dimaqq committed Jan 16, 2025
1 parent 0a081c8 commit 2a993c9
Showing 1 changed file with 71 additions and 61 deletions.
132 changes: 71 additions & 61 deletions ops/tracing/_fixme.py
@@ -1,16 +1,14 @@
# Copyright 2022 Canonical Ltd.
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing, software distributed under
+# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific language
+# governing permissions and limitations under the License.
from __future__ import annotations

import os
@@ -251,7 +249,8 @@ def _remove_stale_otel_sdk_packages():
This only has an effect if executed on an upgrade-charm event.
"""
-# all imports are local to keep this function standalone, side-effect-free, and easy to revert later
+# all imports are local to keep this function standalone, side-effect-free, and easy
+# to revert later
import os

if os.getenv('JUJU_DISPATCH_PATH') != 'hooks/upgrade-charm':
@@ -274,7 +273,8 @@ def _remove_stale_otel_sdk_packages():

otel_logger.debug(f'Found {len(otel_distributions)} opentelemetry distributions')

-# If we have multiple distributions with the same name, remove any that have 0 associated files
+# If we have multiple distributions with the same name, remove any that have 0
+# associated files
for name, distributions_ in otel_distributions.items():
if len(distributions_) <= 1:
continue
@@ -289,8 +289,8 @@ def _remove_stale_otel_sdk_packages():
otel_logger.debug('Successfully applied _remove_stale_otel_sdk_packages patch. ')


-# apply hacky patch to remove stale opentelemetry sdk packages on upgrade-charm.
-# it could be trouble if someone ever decides to implement their own tracer parallel to
+# apply hacky patch to remove stale opentelemetry sdk packages on upgrade-charm. it
+# could be trouble if someone ever decides to implement their own tracer parallel to
# ours and before the charm has inited. We assume they won't.
_remove_stale_otel_sdk_packages()
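
A minimal, self-contained sketch of the stale-distribution check the comments above describe; find_stale_otel_distributions is a hypothetical name, not a helper from this module.

    from collections import defaultdict
    from importlib.metadata import distributions

    def find_stale_otel_distributions():
        """Group opentelemetry distributions by name; report duplicates with no files."""
        otel_distributions = defaultdict(list)
        for distribution in distributions():
            name = distribution.metadata.get('Name', '')
            if name.startswith('opentelemetry'):
                otel_distributions[name].append(distribution)
        stale = []
        for dists in otel_distributions.values():
            if len(dists) <= 1:
                continue  # a single distribution under a name is healthy
            # a duplicate with 0 associated files is a leftover from a charm upgrade
            stale.extend(d for d in dists if not d.files)
        return stale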

@@ -351,9 +351,9 @@ def _remove_stale_otel_sdk_packages():

CHARM_TRACING_ENABLED = 'CHARM_TRACING_ENABLED'
BUFFER_DEFAULT_CACHE_FILE_NAME = '.charm_tracing_buffer.raw'
-# we store the buffer as raw otlp-native protobuf (bytes) since it's hard to serialize/deserialize it in
-# any portable format. Json dumping is supported, but loading isn't.
-# cfr: https://github.com/open-telemetry/opentelemetry-python/issues/1003
+# we store the buffer as raw otlp-native protobuf (bytes) since it's hard to
+# serialize/deserialize it in any portable format. Json dumping is supported, but
+# loading isn't. cfr: https://github.com/open-telemetry/opentelemetry-python/issues/1003

BUFFER_DEFAULT_CACHE_FILE_SIZE_LIMIT_MiB = 10
_BUFFER_CACHE_FILE_SIZE_LIMIT_MiB_MIN = 10
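
A sketch of such a serializer, assuming the proto-common encoder that ships with recent opentelemetry-exporter-otlp releases (not necessarily this module's exact code):

    from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans

    def serialize_spans(spans) -> bytes:
        # encode to an OTLP ExportTraceServiceRequest and dump wire-format bytes;
        # unlike the json dump, these can be parsed back by the protobuf layer
        return encode_spans(spans).SerializeToString()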
Expand Down Expand Up @@ -421,12 +421,13 @@ def _save(self, spans: Sequence[ReadableSpan], replace: bool = False):
new = self._serialize(spans)

try:
-# if the buffer exceeds the size limit, we start dropping old spans until it does
+# if the buffer exceeds the size limit, we start dropping old spans until it no
+# longer does

while len(new + self._SPANSEP.join(old)) > (self._max_buffer_size_mib * _MiB_TO_B):
if not old:
-# if we've already dropped all spans and still we can't get under the
-# size limit, we can't save this span
+# if we've already dropped all spans and still we can't get under
+# the size limit, we can't save this span
logger.error(
f'span exceeds total buffer size limit ({self._max_buffer_size_mib}MiB); '
f'buffering FAILED'
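
The drop-oldest policy this hunk describes, as a standalone toy (the separator and names are illustrative, not the class's actual attributes):

    _MiB_TO_B = 2**20
    _SPANSEP = b'\n'

    def fit_under_limit(old, new, max_buffer_size_mib):
        """Drop oldest records until `new` plus the rest fits; None if it never can."""
        while len(new + _SPANSEP.join(old)) > max_buffer_size_mib * _MiB_TO_B:
            if not old:
                return None  # the new payload alone exceeds the limit: buffering fails
            old = old[1:]  # drop the oldest record and re-check
        return old
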
@@ -523,14 +524,16 @@ class _OTLPSpanExporter(OTLPSpanExporter):
it fails a bit faster.
"""

-# The issue we're trying to solve is that the model takes AGES to settle if e.g. tls is misconfigured,
-# as every hook of a charm_tracing-instrumented charm takes about a minute to exit, as the charm can't
-# flush the traces and keeps retrying for 'too long'
+# The issue we're trying to solve is that the model takes AGES to settle if e.g. tls
+# is misconfigured, as every hook of a charm_tracing-instrumented charm takes about
+# a minute to exit, as the charm can't flush the traces and keeps retrying for 'too
+# long'

_MAX_RETRY_TIMEOUT = 4
-# we give the exporter 4 seconds in total to succeed pushing the traces to tempo
-# if it fails, we'll be caching the data in the buffer and flush it the next time, so there's no data loss risk.
-# this means 2/3 retries (hard to guess from the implementation) and up to ~7 seconds total wait
+# we give the exporter 4 seconds in total to succeed pushing the traces to tempo; if
+# it fails, we'll be caching the data in the buffer and flush it the next time, so
+# there's no data loss risk. this means 2/3 retries (hard to guess from the
+# implementation) and up to ~7 seconds total wait

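The mechanism the subclass relies on, in isolation: the OTLP HTTP exporter's backoff loop consults a private class attribute, so overriding it shrinks the total retry budget. _MAX_RETRY_TIMEOUT is an internal, version-dependent knob of opentelemetry-exporter-otlp; a sketch, not a stable API:

    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

    class FastFailOTLPSpanExporter(OTLPSpanExporter):
        # cap the exponential backoff at ~4s instead of the much larger library
        # default, so a dead or misconfigured endpoint fails the export quickly
        _MAX_RETRY_TIMEOUT = 4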

class _BufferedExporter(InMemorySpanExporter):
@@ -543,7 +546,8 @@ def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult:
return super().export(spans)

def force_flush(self, timeout_millis: int = 0) -> bool:
-# parent implementation is fake, so the timeout_millis arg is not doing anything.
+# parent implementation is fake, so the timeout_millis arg is not doing
+# anything.
result = super().force_flush(timeout_millis)
self._buffer.save(self.get_finished_spans())
return result
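
A stripped-down version of this pattern against the public InMemorySpanExporter API; the buffer argument is a stand-in for this module's real buffer object:

    from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

    class BufferedExporter(InMemorySpanExporter):
        def __init__(self, buffer):
            super().__init__()
            self._buffer = buffer

        def force_flush(self, timeout_millis: int = 0) -> bool:
            # the parent's force_flush is a stub that always returns True and
            # ignores timeout_millis; we piggyback on it to persist the spans
            result = super().force_flush(timeout_millis)
            self._buffer.save(self.get_finished_spans())
            return result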
@@ -593,8 +597,9 @@ def _get_tracer() -> Optional[Tracer]:
try:
return tracer.get()
except LookupError:
-# fallback: this course-corrects for a user error where charm_tracing symbols are imported
-# from different paths (typically charms.tempo_coordinator_k8s... and lib.charms.tempo_coordinator_k8s...)
+# fallback: this course-corrects for a user error where charm_tracing symbols
+# are imported from different paths (typically charms.tempo_coordinator_k8s...
+# and lib.charms.tempo_coordinator_k8s...)
try:
ctx: Context = copy_context()
if context_tracer := _get_tracer_from_context(ctx):
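
How the fallback works: when the direct ContextVar lookup raises LookupError, a copy of the current Context is scanned for a tracer var. A minimal illustration; matching by var name stands in for the real _get_tracer_from_context:

    from contextvars import ContextVar, copy_context

    tracer = ContextVar('tracer')

    def get_tracer_from_context(ctx):
        # a Context iterates over its ContextVar keys; match by name, because with
        # duplicated import paths 'our' ContextVar object may not be the one set
        for var in ctx:
            if var.name == 'tracer' and ctx[var] is not None:
                return var
        return None

    found = get_tracer_from_context(copy_context())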
@@ -722,27 +727,29 @@ def _setup_root_span_initializer(

@functools.wraps(original_init)
def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs):
-# we're using 'self' here because this is charm init code, makes sense to read what's below
-# from the perspective of the charm. Self.unit.name...
+# we're using 'self' here because this is charm init code, so it makes sense to
+# read what's below from the perspective of the charm. Self.unit.name...

original_init(self, framework, *args, **kwargs)
-# we call this from inside the init context instead of, say, _autoinstrument, because we want it to
-# be checked on a per-charm-instantiation basis, not on a per-type-declaration one.
+# we call this from inside the init context instead of, say, _autoinstrument,
+# because we want it to be checked on a per-charm-instantiation basis, not on a
+# per-type-declaration one.
if not is_enabled():
# this will only happen during unittesting, hopefully, so it's fine to log a
# bit more verbosely
logger.info('Tracing DISABLED: skipping root span initialization')
return

original_event_context = framework._event_context
-# default service name isn't just app name because it could conflict with the workload service name
+# default service name isn't just app name because it could conflict with the
+# workload service name
_service_name = service_name or f'{self.app.name}-charm'

unit_name = self.unit.name
resource = Resource.create(
attributes={
-# FIXME is it possible to detect these values early?
-# FIXME do we need parity on these very fields?
+# FIXME is it possible to detect these values early? FIXME do we need
+# parity on these very fields?
'service.name': _service_name, # ahem?
'compose_service': _service_name, # double ahem?
'charm_type': type(self).__name__, # Charm class name, available later
@@ -755,15 +762,16 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs):
)
provider = TracerProvider(resource=resource)

-# if anything goes wrong with retrieving the endpoint, we let the exception bubble up.
+# if anything goes wrong with retrieving the endpoint, we let the exception
+# bubble up.
tracing_endpoint = _get_tracing_endpoint(tracing_endpoint_attr, self, charm_type)

buffer_only = False
# whether we're only exporting to buffer, or also to the otlp exporter.

if not tracing_endpoint:
-# tracing is off if tracing_endpoint is None
-# however we can buffer things until tracing comes online
+# tracing is off if tracing_endpoint is None; however we can buffer things
+# until tracing comes online
buffer_only = True

server_cert: Optional[Union[str, Path]] = (
@@ -796,9 +804,9 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs):

else:
dev_logger.debug('buffering mode: FALLBACK')
-# in principle, we have the right configuration to be pushing traces,
-# but if we fail for whatever reason, we will put everything in the buffer
-# and retry the next time
+# in principle, we have the right configuration to be pushing traces, but if
+# we fail for whatever reason, we will put everything in the buffer and
+# retry the next time
otlp_exporter = _OTLPSpanExporter(
endpoint=tracing_endpoint,
certificate_file=str(Path(server_cert).absolute()) if server_cert else None,
@@ -821,13 +829,14 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs):
root_span_name = f'{unit_name}: {event_name} event'
span = _tracer.start_span(root_span_name, attributes={'juju.dispatch_path': dispatch_path})

-# all these shenanigans are to work around the fact that the opentelemetry tracing API is built
-# on the assumption that spans will be used as contextmanagers.
-# Since we don't (as we need to close the span on framework.commit),
-# we need to manually set the root span as current.
+# all these shenanigans are to work around the fact that the opentelemetry
+# tracing API is built on the assumption that spans will be used as
+# contextmanagers. Since we don't (as we need to close the span on
+# framework.commit), we need to manually set the root span as current.
ctx = set_span_in_context(span)

-# log a trace id, so we can pick it up from the logs (and jhack) to look it up in tempo.
+# log a trace id, so we can pick it up from the logs (and jhack) to look it up
+# in tempo.
root_trace_id = hex(span.get_span_context().trace_id)[2:] # strip 0x prefix
logger.debug(f'Starting root trace with id={root_trace_id!r}.')

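The manual activation described above, reduced to its core (real opentelemetry API; the span name is made up):

    from opentelemetry import context, trace
    from opentelemetry.trace import set_span_in_context

    tracer = trace.get_tracer(__name__)
    span = tracer.start_span('my-unit/0: start event')  # no `with`: we end it ourselves
    token = context.attach(set_span_in_context(span))
    try:
        with tracer.start_as_current_span('child'):  # nests under the root span
            pass
    finally:
        context.detach(token)
        span.end()  # the real code defers this until framework.commit
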
@@ -857,9 +866,9 @@ def wrap_close():
flush_successful = tp.force_flush(timeout_millis=1000) # don't block for too long

if buffer_only:
-# if we're in buffer_only mode, it means we couldn't even set up the exporter for
-# tempo as we're missing some data.
-# so attempting to flush the buffer doesn't make sense
+# if we're in buffer_only mode, it means we couldn't even set up the
+# exporter for tempo as we're missing some data. so attempting to flush
+# the buffer doesn't make sense
dev_logger.debug('tracing backend unavailable: all spans pushed to buffer')

else:
@@ -874,7 +883,8 @@ def wrap_close():

# the backend has accepted the spans generated during this event,
if not previous_spans_buffered:
-# if the buffer was empty to begin with, any spans we collected now can be discarded
+# if the buffer was empty to begin with, any spans we collected
+# now can be discarded
buffer.drop()
dev_logger.debug('buffer dropped: this trace has been sent already')
else:
@@ -887,9 +897,9 @@ def wrap_close():
# TODO is this even possible?
dev_logger.debug('buffer flush OK; empty: nothing to flush')
else:
-# this situation is pretty weird, I'm not even sure it can happen,
-# because it would mean that we did manage
-# to push traces directly to the tempo exporter (flush_successful),
+# this situation is pretty weird, I'm not even sure it can
+# happen, because it would mean that we did manage to push
+# traces directly to the tempo exporter (flush_successful),
# but the buffer flush failed to push to the same exporter!
logger.error('buffer flush FAILED')

@@ -1054,20 +1064,20 @@ def trace_type(cls: _T) -> _T:
dev_logger.debug(f'skipping {method} (dunder)')
continue

-# the span title in the general case should be:
-# method call: MyCharmWrappedMethods.b
-# if the method has a name (functools.wrapped or regular method), let
-# _trace_callable use its default algorithm to determine what name to give the span.
+# the span title in the general case should be 'method call:
+# MyCharmWrappedMethods.b'. if the method has a name (functools.wrapped or regular
+# method), let _trace_callable use its default algorithm to determine what name
+# to give the span.
trace_method_name = None
try:
qualname_c0 = method.__qualname__.split('.')[0]
if not hasattr(cls, method.__name__):
# if the callable doesn't have a __name__ (probably a decorated method),
-# it probably has a bad qualname too (such as my_decorator.<locals>.wrapper)
-# which is not great for finding out what the trace is about.
-# So we use the method name instead and add a reference to the decorator name.
-# Result:
-# method call: @my_decorator(MyCharmWrappedMethods.b)
+# it probably has a bad qualname too (such as
+# my_decorator.<locals>.wrapper) which is not great for finding out what
+# the trace is about. So we use the method name instead and add a
+# reference to the decorator name. Result: method call:
+# @my_decorator(MyCharmWrappedMethods.b)
trace_method_name = f'@{qualname_c0}({cls.__name__}.{name})'
except Exception: # noqa: S110
pass
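
The qualname edge case handled here, demonstrated standalone:

    def my_decorator(fn):
        def wrapper(*args, **kwargs):  # deliberately not functools.wraps-ed
            return fn(*args, **kwargs)
        return wrapper

    class MyCharmWrappedMethods:
        @my_decorator
        def b(self):
            return 'b'

    method = MyCharmWrappedMethods.b
    assert method.__name__ == 'wrapper'  # not an attribute that exists on the class
    assert not hasattr(MyCharmWrappedMethods, method.__name__)
    assert method.__qualname__ == 'my_decorator.<locals>.wrapper'
    # so the span title falls back to the decorator-qualified form:
    print(f"@{method.__qualname__.split('.')[0]}(MyCharmWrappedMethods.b)")
    # -> @my_decorator(MyCharmWrappedMethods.b)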
