From d7cfe5da514fac2f934a616626ff05c1d5a61732 Mon Sep 17 00:00:00 2001 From: Dima Tisnek Date: Tue, 14 Jan 2025 16:15:48 +0900 Subject: [PATCH 1/6] feat: ops[tracing] --- docs/requirements.txt | 72 +++++++- dont-merge/fake-charm.py | 61 +++++++ dont-merge/metadata.yaml | 1 + dont-merge/otel-collector-config.yaml | 25 +++ dont-merge/readme.md | 114 ++++++++++++ dont-merge/send-traces.py | 63 +++++++ ops/__init__.py | 2 + ops/_main.py | 21 ++- ops/_private/yaml.py | 5 + ops/_tracing/__init__.py | 71 ++++++++ ops/_tracing/buffer.py | 244 ++++++++++++++++++++++++++ ops/_tracing/export.py | 198 +++++++++++++++++++++ ops/_tracing/hacks.py | 71 ++++++++ ops/charm.py | 11 ++ ops/framework.py | 53 ++++-- ops/log.py | 36 +++- ops/model.py | 57 +++++- ops/pebble.py | 41 +++++ ops/storage.py | 1 + pyproject.toml | 7 + test/test_tracing_buffer.py | 19 ++ test/test_tracing_export.py | 19 ++ tox.ini | 7 +- 23 files changed, 1179 insertions(+), 20 deletions(-) create mode 100755 dont-merge/fake-charm.py create mode 100644 dont-merge/metadata.yaml create mode 100644 dont-merge/otel-collector-config.yaml create mode 100644 dont-merge/readme.md create mode 100644 dont-merge/send-traces.py create mode 100644 ops/_tracing/__init__.py create mode 100644 ops/_tracing/buffer.py create mode 100644 ops/_tracing/export.py create mode 100644 ops/_tracing/hacks.py create mode 100644 test/test_tracing_buffer.py create mode 100644 test/test_tracing_export.py diff --git a/docs/requirements.txt b/docs/requirements.txt index ec9ab06a4..1c180c11a 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -2,7 +2,7 @@ # This file is autogenerated by pip-compile with Python 3.11 # by the following command: # -# pip-compile --extra=docs --output-file=docs/requirements.txt pyproject.toml +# pip-compile --extra=docs,tracing --output-file=docs/requirements.txt pyproject.toml # alabaster==1.0.0 # via sphinx @@ -29,6 +29,11 @@ click==8.1.8 # via uvicorn colorama==0.4.6 # via sphinx-autobuild +deprecated==1.2.15 + # via + # opentelemetry-api + # opentelemetry-exporter-otlp-proto-http + # opentelemetry-semantic-conventions docutils==0.21.2 # via # canonical-sphinx-extensions @@ -41,6 +46,8 @@ gitdb==4.0.12 # via gitpython gitpython==3.1.44 # via canonical-sphinx-extensions +googleapis-common-protos==1.66.0 + # via opentelemetry-exporter-otlp-proto-http h11==0.14.0 # via uvicorn html5lib==1.1 @@ -51,6 +58,10 @@ idna==3.10 # requests imagesize==1.4.1 # via sphinx +importlib-metadata==8.5.0 + # via + # opentelemetry-api + # ops (pyproject.toml) jinja2==3.1.5 # via # myst-parser @@ -73,8 +84,53 @@ mdurl==0.1.2 # via markdown-it-py myst-parser==4.0.0 # via ops (pyproject.toml) +opentelemetry-api==1.29.0 + # via + # opentelemetry-exporter-otlp-proto-http + # opentelemetry-instrumentation + # opentelemetry-instrumentation-dbapi + # opentelemetry-instrumentation-sqlite3 + # opentelemetry-instrumentation-urllib + # opentelemetry-sdk + # opentelemetry-semantic-conventions + # ops (pyproject.toml) +opentelemetry-exporter-otlp-proto-common==1.29.0 + # via opentelemetry-exporter-otlp-proto-http +opentelemetry-exporter-otlp-proto-http==1.29.0 + # via ops (pyproject.toml) +opentelemetry-instrumentation==0.50b0 + # via + # opentelemetry-instrumentation-dbapi + # opentelemetry-instrumentation-sqlite3 + # opentelemetry-instrumentation-urllib +opentelemetry-instrumentation-dbapi==0.50b0 + # via opentelemetry-instrumentation-sqlite3 +opentelemetry-instrumentation-sqlite3==0.50b0 + # via ops (pyproject.toml) 
+opentelemetry-instrumentation-urllib==0.50b0 + # via ops (pyproject.toml) +opentelemetry-proto==1.29.0 + # via + # opentelemetry-exporter-otlp-proto-common + # opentelemetry-exporter-otlp-proto-http +opentelemetry-sdk==1.29.0 + # via opentelemetry-exporter-otlp-proto-http +opentelemetry-semantic-conventions==0.50b0 + # via + # opentelemetry-instrumentation + # opentelemetry-instrumentation-dbapi + # opentelemetry-instrumentation-urllib + # opentelemetry-sdk +opentelemetry-util-http==0.50b0 + # via opentelemetry-instrumentation-urllib packaging==24.2 - # via sphinx + # via + # opentelemetry-instrumentation + # sphinx +protobuf==5.29.3 + # via + # googleapis-common-protos + # opentelemetry-proto pygments==2.19.1 # via # furo @@ -90,6 +146,7 @@ pyyaml==6.0.2 requests==2.32.3 # via # canonical-sphinx-extensions + # opentelemetry-exporter-otlp-proto-http # sphinx six==1.17.0 # via html5lib @@ -148,7 +205,9 @@ sphinxext-opengraph==0.9.1 starlette==0.45.2 # via sphinx-autobuild typing-extensions==4.12.2 - # via anyio + # via + # anyio + # opentelemetry-sdk uc-micro-py==1.0.3 # via linkify-it-py urllib3==2.3.0 @@ -165,4 +224,11 @@ websocket-client==1.8.0 # via ops (pyproject.toml) websockets==14.1 # via sphinx-autobuild +wrapt==1.17.2 + # via + # deprecated + # opentelemetry-instrumentation + # opentelemetry-instrumentation-dbapi +zipp==3.21.0 + # via importlib-metadata ./testing/ diff --git a/dont-merge/fake-charm.py b/dont-merge/fake-charm.py new file mode 100755 index 000000000..8c1514726 --- /dev/null +++ b/dont-merge/fake-charm.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# Copyright 2025 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
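+# A throwaway sample charm used to exercise the tracing plumbing by hand;
+# see dont-merge/readme.md for one way to run it against a local collector.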
+"""FIXME dummy_load docstring.""" + +from __future__ import annotations + +import time + +import opentelemetry.trace + +import ops + +tracer = opentelemetry.trace.get_tracer(__name__) + + +class FakeCharm(ops.CharmBase): + """Dummy docstring.""" + + def __init__(self, framework: ops.Framework): + """Dummy docstring.""" + super().__init__(framework) + self.framework.observe(self.on.start, self._on_start) + self.framework.observe(self.on.collect_app_status, self._on_collect_app_status) + self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status) + + def _on_start(self, event: ops.StartEvent) -> None: + """Dummy docstring.""" + ops.set_tracing_destination(url='http://localhost:4318/v1/traces') + self.dummy_load(event, 0.0025) + + def _on_collect_app_status(self, event: ops.CollectStatusEvent) -> None: + """Dummy docstring.""" + self.dummy_load(event) + event.add_status(ops.ActiveStatus('app seems ready')) + + def _on_collect_unit_status(self, event: ops.CollectStatusEvent) -> None: + """Dummy docstring.""" + self.dummy_load(event) + event.add_status(ops.ActiveStatus('unit ready')) + + @tracer.start_as_current_span('FakeCharm.dummy_load') # type: ignore + def dummy_load(self, event: ops.EventBase, duration: float = 0.001) -> None: + """Dummy docstring.""" + print(event) + time.sleep(duration) + + +if __name__ == '__main__': + ops.main(FakeCharm) diff --git a/dont-merge/metadata.yaml b/dont-merge/metadata.yaml new file mode 100644 index 000000000..baad1ae37 --- /dev/null +++ b/dont-merge/metadata.yaml @@ -0,0 +1 @@ +name: testmetest diff --git a/dont-merge/otel-collector-config.yaml b/dont-merge/otel-collector-config.yaml new file mode 100644 index 000000000..5d5b0abaa --- /dev/null +++ b/dont-merge/otel-collector-config.yaml @@ -0,0 +1,25 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: "[::]:4317" + http: + endpoint: "[::]:4318" + +processors: + batch: + +exporters: + debug: + verbosity: detailed + jaeger: + endpoint: jaeger:14250 + tls: + insecure: true + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [debug] diff --git a/dont-merge/readme.md b/dont-merge/readme.md new file mode 100644 index 000000000..9acd4196d --- /dev/null +++ b/dont-merge/readme.md @@ -0,0 +1,114 @@ +### Usage + +Recommended for traces of moderate and high complexity: + +```command +dima@colima-ahh /c/operator (feat-otel)> docker run --rm --name jaeger \ + -p 16686:16686 \ + -p 4317:4317 \ + -p 4318:4318 \ + -p 5778:5778 \ + -p 9411:9411 \ + jaegertracing/jaeger:2.2.0 +``` + +After which, you should be able to: +- open http://192.168.107.4:16686/ in your browser +- select the correct **Service** (`testapp-charm` at current branch state) +- click Search at the bottom of the form + +Note: the `jaeger` container keeps traces in memory, and your Service can't be selected +until it has sent some data to `jaeger`. + +Alternatively, text-based: + +```command +dima@colima-ahh /c/operator (feat-otel)> docker run -it --rm \ + -v (pwd)/dont-merge/otel-collector-config.yaml:/etc/otel-collector-config.yaml \ + -p 4317:4317 \ + -p 4318:4318 \ + otel/opentelemetry-collector:latest \ + --config=/etc/otel-collector-config.yaml +``` + +and then + +```command +dima@colima-ahh /c/operator (feat-otel)> uv venv --seed .ahh-venv +Using CPython 3.13.0 +Creating virtual environment with seed packages at: .ahh-venv + +dima@colima-ahh /c/operator (feat-otel)> . 
.ahh-venv/bin/activate.fish +(.ahh-venv) dima@colima-ahh /c/operator (feat-otel)> + +(.ahh-venv) dima@colima-ahh /c/operator (feat-otel)> uv pip install -e .[tracing] -U +Using Python 3.13.0 environment at .ahh-venv +Resolved 21 packages in 907ms +Prepared 18 packages in 72ms +... + +(.ahh-venv) dima@colima-ahh /c/operator (feat-otel)> python dont-merge/send-traces.py +Span created and exported to the collector! +``` + +### Hacking + +Or, trying to run code outside of a charm. + +Somehow I'm not getting anything, because the `juju-log` hook tool is missing. + +Let's fix that. + +```command +> ln -s (which echo) juju-log +``` + +Generate some tracing data: + +```command +(venv) > JUJU_UNIT_NAME=testapp/42 JUJU_CHARM_DIR=dont-merge/ PATH=$PATH:. JUJU_VERSION=3.5.4 ./dont-merge/start +``` + +OTEL collector debug output would look like this: + +``` +2025-01-15T08:46:23.229Z info Traces {"kind": "exporter", "data_type": "traces", "name": "debug", "resource spans": 1, "spans": 1} +2025-01-15T08:46:23.229Z info ResourceSpans #0 +Resource SchemaURL: +Resource attributes: + -> telemetry.sdk.language: Str(python) + -> telemetry.sdk.name: Str(opentelemetry) + -> telemetry.sdk.version: Str(1.29.0) + -> service.name: Str(testapp-charm) + -> compose_service: Str(testapp-charm) + -> charm_type: Str(CharmBase) + -> juju_unit: Str(testapp/42) + -> juju_application: Str(testapp) + -> juju_model: Str() + -> juju_model_uuid: Str() +ScopeSpans #0 +ScopeSpans SchemaURL: +InstrumentationScope ops +Span #0 + Trace ID : 8c3f292c89f29c59f1b37fe59ba0abbc + Parent ID : + ID : e0253a03ef694a4f + Name : ops.main + Kind : Internal + Start time : 2025-01-15 08:46:23.175916835 +0000 UTC + End time : 2025-01-15 08:46:23.182329655 +0000 UTC + Status code : Error + Status message : RuntimeError: command not found: is-leader +Events: +SpanEvent #0 + -> Name: exception + -> Timestamp: 2025-01-15 08:46:23.182316071 +0000 UTC + -> DroppedAttributesCount: 0 + -> Attributes:: + -> exception.type: Str(RuntimeError) + -> exception.message: Str(command not found: is-leader) + -> exception.stacktrace: Str(Traceback (most recent call last): + ... + -> exception.escaped: Str(False) + {"kind": "exporter", "data_type": "traces", "name": "debug"} +``` diff --git a/dont-merge/send-traces.py b/dont-merge/send-traces.py new file mode 100644 index 000000000..8a2061a88 --- /dev/null +++ b/dont-merge/send-traces.py @@ -0,0 +1,63 @@ +# Copyright 2025 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""FIXME dummy docstring.""" + +from __future__ import annotations + +import logging + +import opentelemetry.trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +# The default ProxyTracer allows tracers to be declared ahead of time like loggers +logger = logging.getLogger(__name__) +tracer = opentelemetry.trace.get_tracer(__name__) + +# 1. 
Create a tracer provider with a "service.name" resource attribute +opentelemetry.trace.set_tracer_provider( + TracerProvider(resource=Resource.create({'service.name': 'example-service'})) +) + +# 2. Configure the OTLP HTTP exporter (defaults to protobuf format) +otlp_exporter = OTLPSpanExporter( + endpoint='http://localhost:4318/v1/traces' + # If you needed headers or auth, you could add them like: + # headers={"Authorization": "Bearer "}, +) + +# 3. Create a span processor (BatchSpanProcessor recommended for production) +span_processor = BatchSpanProcessor(otlp_exporter) +opentelemetry.trace.get_tracer_provider().add_span_processor(span_processor) # type: ignore + + +@tracer.start_as_current_span('some label') # type: ignore +def main(foo: int = 42): + """Do something.""" + # can't add attributes to a decorator, if needed use the below instead + # + # with tracer.start_as_current_span("some label") as span: + # span.set_attribute('foo', 'bar') + # span.add_event('sample_event', {'event_attr': 123}) + + logger.info('Span created and will be exported to the collector soon!') + + +if __name__ == '__main__': + logging.basicConfig(level='INFO') + main() + # from typing_extensions import reveal_type + # reveal_type(main) diff --git a/ops/__init__.py b/ops/__init__.py index 4cda2bca5..3d954b286 100644 --- a/ops/__init__.py +++ b/ops/__init__.py @@ -54,6 +54,7 @@ # that those symbols are part of the public API, so we have to add __all__. __all__ = [ # noqa: RUF022 `__all__` is not sorted '__version__', + 'set_tracing_destination', 'main', 'pebble', # From charm.py @@ -333,6 +334,7 @@ # NOTE: don't import testing or Harness here, as that's a test-time concern # rather than a runtime concern. +from ._tracing import set_tracing_destination from .version import version as __version__ diff --git a/ops/_main.py b/ops/_main.py index d82fdc597..2406430c0 100644 --- a/ops/_main.py +++ b/ops/_main.py @@ -23,6 +23,9 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast +import opentelemetry.trace + +import ops._tracing import ops.charm import ops.framework import ops.model @@ -35,6 +38,7 @@ logger = logging.getLogger() +tracer = opentelemetry.trace.get_tracer(__name__) def _exe_path(path: Path) -> Optional[Path]: @@ -212,6 +216,8 @@ class _Dispatcher: """ + event_name: str + def __init__(self, charm_dir: Path, juju_context: _JujuContext): self._juju_context = juju_context self._charm_dir = charm_dir @@ -268,7 +274,9 @@ def run_any_legacy_hook(self): argv[0] = str(dispatch_path) logger.info('Running legacy %s.', self._dispatch_path) try: - subprocess.run(argv, check=True) + with tracer.start_as_current_span('ops.run_legacy_hook') as span: # type: ignore + span.set_attribute('argv', ' '.join(argv)) # type: ignore + subprocess.run(argv, check=True) except subprocess.CalledProcessError as e: logger.warning('Legacy %s exited with status %d.', self._dispatch_path, e.returncode) raise _Abort(e.returncode) from e @@ -552,9 +560,16 @@ def main(charm_class: Type[ops.charm.CharmBase], use_juju_for_storage: Optional[ See `ops.main() <#ops-main-entry-point>`_ for details. 
""" + ops._tracing.setup_tracing(charm_class.__name__) + + # opentelemetry-api types are broken + # https://github.com/open-telemetry/opentelemetry-python/issues/3836 try: - manager = _Manager(charm_class, use_juju_for_storage=use_juju_for_storage) + with tracer.start_as_current_span('ops.main'): # type: ignore + manager = _Manager(charm_class, use_juju_for_storage=use_juju_for_storage) - manager.run() + manager.run() except _Abort as e: sys.exit(e.exit_code) + finally: + ops._tracing.shutdown_tracing() diff --git a/ops/_private/yaml.py b/ops/_private/yaml.py index f3ceadb27..84f5b80a8 100644 --- a/ops/_private/yaml.py +++ b/ops/_private/yaml.py @@ -16,18 +16,23 @@ from typing import Any, Optional, TextIO, Union +import opentelemetry.trace import yaml +tracer = opentelemetry.trace.get_tracer(__name__) + # Use C speedups if available _safe_loader = getattr(yaml, 'CSafeLoader', yaml.SafeLoader) _safe_dumper = getattr(yaml, 'CSafeDumper', yaml.SafeDumper) +@tracer.start_as_current_span('ops.yaml.safe_load') # type: ignore def safe_load(stream: Union[str, TextIO]) -> Any: """Same as yaml.safe_load, but use fast C loader if available.""" return yaml.load(stream, Loader=_safe_loader) # noqa: S506 +@tracer.start_as_current_span('ops.yaml.safe_dump') # type: ignore def safe_dump(data: Any, stream: Optional[TextIO] = None) -> str: """Same as yaml.safe_dump, but use fast C dumper if available.""" return yaml.dump(data, stream=stream, Dumper=_safe_dumper) # type: ignore diff --git a/ops/_tracing/__init__.py b/ops/_tracing/__init__.py new file mode 100644 index 000000000..b5bd52b14 --- /dev/null +++ b/ops/_tracing/__init__.py @@ -0,0 +1,71 @@ +# Copyright 2025 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The tracing facility of the Operator Framework. + +TODO: quick start, usage example. +""" + +from __future__ import annotations + +import logging + +import opentelemetry.trace + +from ops._tracing import hacks + +# FIXME must this hack be run before OTEL packages are imported? +hacks.remove_stale_otel_sdk_packages() +tracer = opentelemetry.trace.get_tracer(__name__) + + +try: + from . import export +except ImportError: + export = None + + +def setup_tracing(charm_class_name: str) -> None: + """Setup tracing for this "dispatch" of the charm code.""" + if not export: + return + export.setup_tracing(charm_class_name) + + +@tracer.start_as_current_span('ops.set_tracing_destination') # type: ignore +def set_tracing_destination( + *, + url: str | None, + ca: str | None = None, +) -> None: + """Configure the destination service for tracing data. + + Args: + url: The URL of the "collector", the destination for tracing data. + Example: 'http://localhost:4318/v1/traces' + ca: The PEM-formatted server certificate authority list. + This argument is in effect only if the ``url`` is an HTTPS URL. 
+ """ + if not export: + return + export.set_tracing_destination(url=url, ca=ca) + + +def shutdown_tracing() -> None: + """Send out as much as possible, if possible.""" + if not export: + return + try: + export.shutdown_tracing() + except Exception: + logging.exception('failed to flush tracing') diff --git a/ops/_tracing/buffer.py b/ops/_tracing/buffer.py new file mode 100644 index 000000000..53d81027e --- /dev/null +++ b/ops/_tracing/buffer.py @@ -0,0 +1,244 @@ +# Copyright 2025 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this +# file except in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. +"""Buffer for tracing data.""" + +from __future__ import annotations + +import contextlib +import functools +import logging +import sqlite3 +from typing import Callable + +from typing_extensions import ParamSpec, TypeVar +from typing_extensions import reveal_type as reveal_type # FIXME + +# Approximate safety limit for the database file size +BUFFER_SIZE = 40 * 1024 * 1024 + +# Default priority for tracing data. +# Dispatch invocation that doesn't result in any event being observed +# by charm or its charm lib produces data at this priority. +DEFAULT_PRIORITY = 10 + +# Higher priority for data from invocation with observed events. +OBSERVED_PRIORITY = 50 + +# Must have a short timeout when terminating +# May want to have a longer timeout otherwise +DB_TIMEOUT = 5 +LONG_DB_TIMEOUT = 3600 + +# Must use isolation_level=None for consistency between Python 3.8 and 3.12 +# Can't use the STRICT keyword for tables, requires sqlite 3.37.0 +# Can't use the octet_length() either, requires sqlite 3.43.0 +# +# Ubuntu 20.04 Python 3.8.2 Sqlite 3.31.1 Adds UPSERT, window functions +# Ubuntu 22.04 Python 3.10.x Sqlite 3.37.2 Adds STRICT tables, JSON ops +# Ubuntu 24.04 Python 3.12.x Sqlite 3.45.2 Adds math functions + +logger = logging.getLogger(__name__) + +P = ParamSpec('P') +R = TypeVar('R') + + +def retry(f: Callable[P, R]) -> Callable[P, R]: + @functools.wraps(f) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + exc: sqlite3.Error | None = None + + for _ in range(3): + try: + return f(*args, **kwargs) + except sqlite3.Error as e: # noqa: PERF203 + exc = e + continue + else: + assert exc + raise exc + + return wrapper + + +class Buffer: + """Buffer for tracing data. + + Access buffer attributes is effectively protected by an sqlite transaction. + """ + + ids: set[int] + """tracing data ids buffered during this dispatch invocation.""" + observed = False + """Marks that data from this dispatch invocation has been marked observed.""" + + def __init__(self, path: str): + self.path = path + self.ids = set() + self._set_db_schema() + + @retry + def _set_db_schema(self): + # NOTE: measure the cost of this vs two-level approach: + # - check table and index in read-only mode + # - if needed, update the DSL + # NOTE: ops storage sets u+rw, go-rw permissions + # should we follow suit? 
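+        # (Following suit would roughly amount to os.chmod(self.path, 0o600)
+        # once the file exists; left as an open question for now.)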
+ with self.tx(timeout=LONG_DB_TIMEOUT) as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS tracing ( + -- effectively auto-incremented + id INTEGER PRIMARY KEY, + -- observed events are more important + priority INTEGER NOT NULL, + -- Protobuf-formatted tracing data + data BLOB NOT NULL + ) + """ + ) + conn.execute( + """ + CREATE INDEX IF NOT EXISTS tracing_priority_id + ON tracing + (priority, id) + """ + ) + + @contextlib.contextmanager + def tx(self, *, timeout: float = DB_TIMEOUT, readonly: bool = False): + """Thread-safe transaction context manager.""" + with sqlite3.connect(self.path, isolation_level=None, timeout=timeout) as conn: + mode = 'DEFERRED' if readonly else 'IMMEDIATE' + conn.execute(f'BEGIN {mode}') + try: + yield conn + except: + conn.execute('ROLLBACK') + raise + else: + conn.execute('COMMIT') + + # TODO: + # add some retry mechanism... + # when database is written to from two threads, + # either we'd get an exaception on BEGIN (immediate mode) + # or we'd get an exception on COMMIT (deferred mode) + # either way, a retry will be needed sooner or later. + + @retry + def mark_observed(self): + if self.observed: + return + + with self.tx(timeout=LONG_DB_TIMEOUT) as conn: + conn.execute( + f""" + UPDATE tracing + SET priority = ? + WHERE id IN ({','.join(('?',) * len(self.ids))}) + """, # noqa: S608 + (OBSERVED_PRIORITY, *tuple(self.ids)), + ) + self.observed = True + self.ids.clear() + + @retry + def pump(self, chunk: bytes | None = None) -> tuple[int, bytes] | None: + """Pump the buffer queue. + + Accepts an optional new data chunk. + Removes old, boring data if needed. + Returns the oldest important record. + """ + # NOTE: discussion about transaction type: + # - this may be a read-only transaction (no data to save, read out one record) + # - or a read transaction later upgraded to write (check space, then delete some) + # currently I've made `self.tx()` return a write transaction always + # which is safer, but may incur a filesystem modification cost. + with self.tx(readonly=not chunk) as conn: + if chunk: + # Ensure that there's enough space in the buffer + chunklen = (len(chunk) + 4095) // 4096 * 4096 + stored: int | None = conn.execute( + """ + SELECT sum((length(data)+4095)/4096*4096) + FROM tracing + """ + ).fetchone()[0] + # TODO: expose `stored` in metrics, one day + excess = (stored or 0) + chunklen - BUFFER_SIZE + logging.debug(f'{excess=}') + + if excess > 0: + # Drop lower-priority, older data + cursor = conn.execute( + """ + SELECT id, (length(data)+4095)/4096*4096 + FROM tracing + ORDER BY priority ASC, id ASC + """ + ) + + collected_ids: set[int] = set() + collected_size: int = 0 + for id_, size in cursor: + collected_ids.add(id_) + collected_size += size + if collected_size > excess: + break + + assert collected_ids + logging.debug(f'{len(collected_ids)=}') + conn.execute( + f""" + DELETE FROM tracing + WHERE id IN ({','.join(('?',) * len(collected_ids))}) + """, # noqa: S608 + tuple(collected_ids), + ) + + # Store the new tracing data + priority = OBSERVED_PRIORITY if self.observed else DEFAULT_PRIORITY + cursor = conn.execute( + """ + INSERT INTO tracing (priority, data) + VALUES (?, ?) 
+ """, + (priority, chunk), + ) + + assert cursor.lastrowid is not None + if not self.observed: + self.ids.add(cursor.lastrowid) + + # Return oldest important data + return conn.execute( + """ + SELECT id, data + FROM tracing + ORDER BY priority DESC, id ASC + LIMIT 1 + """ + ).fetchone() + + @retry + def remove(self, id_: int): + with self.tx() as conn: + conn.execute( + """ + DELETE FROM tracing + WHERE id = ? + """, + (id_,), + ) + self.ids -= {id_} diff --git a/ops/_tracing/export.py b/ops/_tracing/export.py new file mode 100644 index 000000000..0a2783459 --- /dev/null +++ b/ops/_tracing/export.py @@ -0,0 +1,198 @@ +# Copyright 2022 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this +# file except in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. +"""FIXME Docstring.""" + +from __future__ import annotations + +import contextlib +import logging +import os +import time +from typing import Sequence + +from opentelemetry.exporter.otlp.proto.common._internal import trace_encoder +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.sqlite3 import SQLite3Instrumentor # type: ignore +from opentelemetry.instrumentation.urllib import URLLibInstrumentor # type: ignore +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter, SpanExportResult +from opentelemetry.trace import get_tracer_provider, set_tracer_provider + +import ops +import ops._tracing.buffer +import ops.jujucontext +import ops.log + +logger = logging.getLogger(__name__) +# Trace `sqlite3` usage by ops storage component +SQLite3Instrumentor().instrument() +# Trace `urllib` usage when talking to Pebble +URLLibInstrumentor().instrument() + +_OTLP_SPAN_EXPORTER_TIMEOUT = 1 # seconds +"""How much to give OTLP span exporter has to push traces to the backend.""" + +SENDOUT_FACTOR = 2 +"""How much buffered chunks to send out for each incoming chunk.""" + +# FIXME: this creates a separate file next to the CHARM_STATE_FILE +# We could stuff both kinds of data into the same file, I guess? +BUFFER_FILE = '.tracing-data.db' +# Currently ops.storage keeps one long transaction open for the duration of the +# the dispatch, which means we can't use the same file from another thread. +# BUFFER_FILE = '.unit-state.db' + + +_exporter: ProxySpanExporter | None = None + + +class ProxySpanExporter(SpanExporter): + real_exporter: SpanExporter | None = None + + def __init__(self, buffer_path: str): + self.buffer = ops._tracing.buffer.Buffer(buffer_path) + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + """Export a batch of telemetry data. + + Note: to avoid data loops or recursion, this function cannot be instrumented. + """ + logging.debug(f'FIXME export {len(spans)=}') + with suppress_juju_log_handler(): + # Note: + # this is called in a helper thread, which is daemonic, + # the MainThread will wait at most 10s for this thread. 
+ # Margins: + # - 1s safety margin + # - 1s for buffered data time overhang + # - 2s for live data + deadline = time.monotonic() + 6 + + assert spans # the BatchSpanProcessor won't call us if there's no data + # TODO: this will change in the JSON experiment + data: bytes = trace_encoder.encode_spans(spans).SerializePartialToString() + rv = self.buffer.pump(data) + logging.debug('FIXME saved') + assert rv + self.do_export(*rv) + + for _ in range(SENDOUT_FACTOR - 1): + if time.monotonic() > deadline: + break + if not (rv := self.buffer.pump()): + break + self.do_export(*rv) + + return SpanExportResult.SUCCESS + + def do_export(self, buffered_id: int, data: bytes) -> None: + """Export buffered data and remove it from the buffer on success.""" + logging.debug(f'FIXME asked {buffered_id=} {len(data)=}') + # TODO: this will change in the JSON experiment + if self.real_exporter and self.real_exporter._export(data).ok: # type: ignore + logging.debug('FIXME removing') + self.buffer.remove(buffered_id) + + def shutdown(self) -> None: + """Shut down the exporter.""" + if self.real_exporter: + self.real_exporter.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """No-op, as the real exporter doesn't buffer.""" + return True + + def set_real_exporter(self, exporter: SpanExporter) -> None: + self.real_exporter = exporter + + +@contextlib.contextmanager +def suppress_juju_log_handler(): + handlers = [h for h in logging.root.handlers if isinstance(h, ops.log.JujuLogHandler)] + if not handlers: + yield + return + + juju_log_handler = handlers[0] + token = juju_log_handler.drop.set(True) + try: + yield + finally: + juju_log_handler.drop.reset(token) + + +def setup_tracing(charm_class_name: str) -> None: + global _exporter + # FIXME would it be better to pass Juju context explicitly? + juju_context = ops.jujucontext._JujuContext.from_dict(os.environ) + app_name = '' if juju_context.unit_name is None else juju_context.unit_name.split('/')[0] + service_name = f'{app_name}-charm' # only one COS charm sets custom value + + resource = Resource.create( + attributes={ + 'service.name': service_name, + 'compose_service': service_name, # FIXME why is this copy needed? 
+ 'charm_type': charm_class_name, + # juju topology + 'juju_unit': juju_context.unit_name, + 'juju_application': app_name, + 'juju_model': juju_context.model_name, + 'juju_model_uuid': juju_context.model_uuid, + } + ) + provider = TracerProvider(resource=resource) + + # How + + buffer_path = str(juju_context.charm_dir / BUFFER_FILE) + _exporter = ProxySpanExporter(buffer_path) + span_processor = BatchSpanProcessor(_exporter) + provider.add_span_processor(span_processor) + logging.debug('FIXME setup_tracing') + set_tracer_provider(provider) + + +# FIXME make it very cheap to call this method a second time with same arguments +def set_tracing_destination( + *, + url: str | None, + ca: str | None, +) -> None: + # FIXME needs a threading.Lock + # or access to underlying BatchXXX lock + # + # - check if settings are exactly same, do nothing in that case + # - replace current exported with a new exporter + if ca is not None and not ca.startswith('/'): + raise ValueError(f'{ca=} must be an absolute path') + assert _exporter + + # real exporter, hardcoded for now + real_exporter = OTLPSpanExporter(url, timeout=1) + # This is actually the max delay value in the sequence 1, 2, ..., MAX + # Set to 1 to disable sending live data (buffered data is still eventually sent) + # Set to 2 (or more) to enable sending live data (after buffered) + # + # _MAX_RETRY_TIMEOUT = 2 with timeout=1 means: + # - 1 attempt to send live, 1s sleep in the worst case + # _MAX_RETRY_TIMEOUT = 3 or 4 with timeout=1 means: + # - 1st attempt, 1s sleep, 2nd attempt, 1s sleep in the worst case + real_exporter._MAX_RETRY_TIMEOUT = 2 # type: ignore + _exporter.set_real_exporter(real_exporter) + _exporter.buffer.mark_observed() + + +def shutdown_tracing() -> None: + """Shutdown tracing, which is expected to flush the buffered data out.""" + logging.debug('FIXME shutdown tracing') + get_tracer_provider().shutdown() # type: ignore diff --git a/ops/_tracing/hacks.py b/ops/_tracing/hacks.py new file mode 100644 index 000000000..dddc0c053 --- /dev/null +++ b/ops/_tracing/hacks.py @@ -0,0 +1,71 @@ +# Copyright 2025 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this +# file except in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. +"""Workarounds for various Juju bugs.""" + +from __future__ import annotations + +import logging +import os +import shutil +from collections import defaultdict +from typing import Any + +import opentelemetry.trace +from importlib_metadata import distributions # type: ignore + +logger = logging.getLogger(__name__) +tracer = opentelemetry.trace.get_tracer(__name__) + + +# FIXME must this hack be run before OTEL packages are imported? +# must test this +@tracer.start_as_current_span('ops.remove_stale_otel_sdk_packages') # type: ignore +def remove_stale_otel_sdk_packages() -> None: + """Remove stale opentelemetry sdk packages from the charm's Python venv. + + Charmcraft doesn't record empty directories in the charm (zip) file. + Juju creates directories on demand when a contained file is unpacked. + Juju removes what it has installed before the upgrade is unpacked. 
+ Juju prior to 3.5.4 left unrecorded, stale directories. + + See https://github.com/canonical/grafana-agent-operator/issues/146 + and https://bugs.launchpad.net/juju/+bug/2058335 + + This only has an effect if executed on an upgrade-charm event. + """ + if os.getenv('JUJU_DISPATCH_PATH') != 'hooks/upgrade-charm': + return + + logger.debug('Applying _remove_stale_otel_sdk_packages patch on charm upgrade') + # group by name all distributions starting with "opentelemetry_" + otel_distributions: dict[str, list[Any]] = defaultdict(list) + for distribution in distributions(): + name = distribution._normalized_name + if name.startswith('opentelemetry_'): + otel_distributions[name].append(distribution) + + logger.debug(f'Found {len(otel_distributions)} opentelemetry distributions') + + # If we have multiple distributions with the same name, remove any that have 0 + # associated files + for name, distributions_ in otel_distributions.items(): + if len(distributions_) <= 1: + continue + + logger.debug(f'Package {name} has multiple ({len(distributions_)}) distributions.') + for distribution in distributions_: + if not distribution.files: # Not None or empty list + path = distribution._path + logger.info(f'Removing empty distribution of {name} at {path}.') + shutil.rmtree(path) + + logger.debug('Successfully applied _remove_stale_otel_sdk_packages patch. ') diff --git a/ops/charm.py b/ops/charm.py index 98bb8b1ac..3f137eb58 100644 --- a/ops/charm.py +++ b/ops/charm.py @@ -36,6 +36,8 @@ cast, ) +import opentelemetry.trace + from ops import model from ops._private import yaml from ops.framework import ( @@ -90,6 +92,7 @@ class _ContainerBaseDict(TypedDict): logger = logging.getLogger(__name__) +tracer = opentelemetry.trace.get_tracer(__name__) class HookEvent(EventBase): @@ -1330,6 +1333,9 @@ def __init__(self, framework: ops.Framework): @property def on(self) -> CharmEvents: ... # noqa + # FIXME not sure if this is needed + # may help to exclude `super().__init__(...)` from `UserCharm.__init__` + @tracer.start_as_current_span('ops.CharmBase') # type: ignore def __init__(self, framework: Framework): super().__init__(framework, None) @@ -1385,6 +1391,8 @@ def config(self) -> model.ConfigData: return self.model.config +# FIXME may or may not be useful +@tracer.start_as_current_span('ops.charm._evaluate_status') # type: ignore def _evaluate_status(charm: CharmBase): # pyright: ignore[reportUnusedFunction] """Trigger collect-status events and evaluate and set the highest-priority status. @@ -1557,6 +1565,7 @@ def __init__( } @staticmethod + @tracer.start_as_current_span('ops.CharmMeta.from_charm_root') # type: ignore def from_charm_root(charm_root: Union[pathlib.Path, str]): """Initialise CharmMeta from the path to a charm repository root folder.""" _charm_root = pathlib.Path(charm_root) @@ -1916,6 +1925,8 @@ class ContainerMeta: resource is specified. """ + # FIXME is this ever needed? maybe if there are really many mounts? 
+ @tracer.start_as_current_span('ops.ContainerMeta') # type: ignore def __init__(self, name: str, raw: Dict[str, Any]): self.name = name self._mounts: Dict[str, ContainerStorageMeta] = {} diff --git a/ops/framework.py b/ops/framework.py index 1d497704c..529d1c3c2 100644 --- a/ops/framework.py +++ b/ops/framework.py @@ -47,6 +47,8 @@ Union, ) +import opentelemetry.trace + from ops import charm from ops.model import Model, _ModelBackend from ops.storage import JujuStorage, NoSnapshotError, SQLiteStorage @@ -83,6 +85,7 @@ class _StoredObject(Protocol): _ObjectType = TypeVar('_ObjectType', bound='Object') logger = logging.getLogger(__name__) +tracer = opentelemetry.trace.get_tracer(__name__) class Handle: @@ -600,6 +603,7 @@ class Framework(Object): @property def on(self) -> 'FrameworkEvents': ... # noqa + @tracer.start_as_current_span('ops.Framework') # type: ignore def __init__( self, storage: Union[SQLiteStorage, JujuStorage], @@ -693,6 +697,7 @@ def _forget(self, obj: 'Serializable'): """Stop tracking the given object. See also _track.""" self._objects.pop(obj.handle.path, None) + @tracer.start_as_current_span('ops.Framework.commit') # type: ignore def commit(self) -> None: """Save changes to the underlying backends.""" # Give a chance for objects to persist data they want to before a commit is made. @@ -866,6 +871,7 @@ def _event_is_in_storage( return True return False + @tracer.start_as_current_span('ops.Framework._emit') # type: ignore def _emit(self, event: EventBase): """See BoundEvent.emit for the public way to call this.""" saved = False @@ -907,6 +913,7 @@ def _emit(self, event: EventBase): if saved: self._reemit(event_path) + @tracer.start_as_current_span('ops.Framework.reemit') # type: ignore def reemit(self) -> None: """Reemit previously deferred events to the observers that deferred them. @@ -986,16 +993,42 @@ def _reemit(self, single_event_path: Optional[str] = None): if custom_handler: event_is_from_juju = isinstance(event, charm.HookEvent) event_is_action = isinstance(event, charm.ActionEvent) - with self._event_context(event_handle.kind): - if ( - event_is_from_juju or event_is_action - ) and self._juju_debug_at.intersection({'all', 'hook'}): - # Present the welcome message and run under PDB. - self._show_debug_code_message() - pdb.runcall(custom_handler, event) - else: - # Regular call to the registered method. - custom_handler(event) + synthetic = not event_is_from_juju and not event_is_action + event_module = event.__class__.__module__ + if event_module.startswith('ops.'): + # ops.charm.Events are re-exported through ops + event_module = 'ops' + event_class = f'{event_module}.{event.__class__.__qualname__}' + # verbatim_name for ops and custom events, and kebab-hook-names for Juju events + # NOTE: this is not the whole story, consider: + # * some_relation-relation-broken + # * workload_container_one-pebble-check-recovered + event_name = ( + event_handle.kind if synthetic else event_handle.kind.replace('_', '-') + ) + invocation = f'{event_name}: {observer_path}.{method_name}({event_class})' + with tracer.start_as_current_span(invocation) as span: # type: ignore + span.set_attribute('deferred', single_event_path is None) # type: ignore + span.set_attribute('synthetic', synthetic) # type: ignore + span.set_attribute('event_class', event_class) # type: ignore + # FIXME: perhaps name is superfluous given the event class? 
+ span.set_attribute('event_name', event_name) # type: ignore + span.set_attribute('handler', f'{observer_path}.{method_name}') # type: ignore + # I don't think this exposes event attributes + # Example: + # FIXME: for interesting events, may want to add interesting attributes: + # e.g. relation joined: endpoint name? + # or secret blah: secret uuid? + # FIXME: consider if it makes sense to move the span ctx manager + # inside self._event_context... + with self._event_context(event_handle.kind): + if not synthetic and self._juju_debug_at.intersection({'all', 'hook'}): + # Present the welcome message and run under PDB. + self._show_debug_code_message() + pdb.runcall(custom_handler, event) + else: + # Regular call to the registered method. + custom_handler(event) else: logger.warning( diff --git a/ops/log.py b/ops/log.py index f31c4bfab..b59b442e6 100644 --- a/ops/log.py +++ b/ops/log.py @@ -14,6 +14,7 @@ """Interface to emit messages to the Juju logging system.""" +import contextvars import logging import sys import types @@ -26,9 +27,31 @@ class JujuLogHandler(logging.Handler): """A handler for sending logs and warnings to Juju via juju-log.""" + drop: contextvars.ContextVar[bool] + """When set to True, drop any record we're asked to emit, because: + - either we're already logging here and the record is recursive, + - or we're exporting tracing data and the record stems from that. + + # FIXME suggest a better name for this attribute + # + # Typical code path: + # logging -> this logger -> juju-log hook tool -> error -> + # logging [recursion] + # + # or + # + # helper thread -> export -> real export -> requests -> urllib3 -> log.debug(...) + # + # and additionally + # shutdown_tracing -> ... -> start_as_new_span -> if shutdown: logger.warning(...) + # + # FIXME: decision to be made if we want to capture export errors + """ + def __init__(self, model_backend: _ModelBackend, level: int = logging.DEBUG): super().__init__(level) self.model_backend = model_backend + self.drop = contextvars.ContextVar('drop', default=False) def emit(self, record: logging.LogRecord): """Send the specified logging record to the Juju backend. @@ -36,7 +59,14 @@ def emit(self, record: logging.LogRecord): This method is not used directly by the ops library, but by :class:`logging.Handler` itself as part of the logging machinery. """ - self.model_backend.juju_log(record.levelname, self.format(record)) + if self.drop.get(): + return + + token = self.drop.set(True) + try: + self.model_backend.juju_log(record.levelname, self.format(record)) + finally: + self.drop.reset(token) def setup_root_logging( @@ -60,6 +90,10 @@ def setup_root_logging( logger = logging.getLogger() logger.setLevel(logging.DEBUG) logger.addHandler(JujuLogHandler(model_backend)) + # FIXME temporary for debug, don't merge + # NOTE: figure out why I sometimes need this and other times I don't + # logger.addHandler(logging.StreamHandler(stream=sys.stderr)) + # logger.handlers[-1].setLevel(logging.NOTSET) def custom_showwarning( message: typing.Union[Warning, str], diff --git a/ops/model.py b/ops/model.py index a86c2db2b..b939f25c2 100644 --- a/ops/model.py +++ b/ops/model.py @@ -57,6 +57,8 @@ get_args, ) +import opentelemetry.trace + import ops import ops.pebble as pebble from ops._private import timeconv, yaml @@ -111,6 +113,7 @@ logger = logging.getLogger(__name__) +tracer = opentelemetry.trace.get_tracer(__name__) MAX_LOG_LINE_LEN = 131071 # Max length of strings to pass to subshell. 
@@ -122,6 +125,7 @@ class Model: as ``self.model`` from any class that derives from :class:`Object`. """ + @tracer.start_as_current_span('ops.Model') # type: ignore def __init__( self, meta: 'ops.charm.CharmMeta', @@ -218,6 +222,7 @@ def uuid(self) -> str: """ return self._backend.model_uuid + @tracer.start_as_current_span('ops.Model.get_unit') # type: ignore def get_unit(self, unit_name: str) -> 'Unit': """Get an arbitrary unit by name. @@ -228,6 +233,7 @@ def get_unit(self, unit_name: str) -> 'Unit': """ return self._cache.get(Unit, unit_name) + @tracer.start_as_current_span('ops.Model.get_app') # type: ignore def get_app(self, app_name: str) -> 'Application': """Get an application by name. @@ -238,6 +244,7 @@ def get_app(self, app_name: str) -> 'Application': """ return self._cache.get(Application, app_name) + @tracer.start_as_current_span('ops.Model.get_relation') # type: ignore def get_relation( self, relation_name: str, relation_id: Optional[int] = None ) -> Optional['Relation']: @@ -258,6 +265,7 @@ def get_relation( """ return self.relations._get_unique(relation_name, relation_id) + @tracer.start_as_current_span('ops.Model.get_binding') # type: ignore def get_binding(self, binding_key: Union[str, 'Relation']) -> Optional['Binding']: """Get a network space binding. @@ -272,6 +280,7 @@ def get_binding(self, binding_key: Union[str, 'Relation']) -> Optional['Binding' """ return self._bindings.get(binding_key) + @tracer.start_as_current_span('ops.Model.get_secret') # type: ignore def get_secret(self, *, id: Optional[str] = None, label: Optional[str] = None) -> 'Secret': """Get the :class:`Secret` with the given ID or label. @@ -314,6 +323,7 @@ def get_secret(self, *, id: Optional[str] = None, label: Optional[str] = None) - _secret_set_cache=self._cache._secret_set_cache, ) + @tracer.start_as_current_span('ops.Model.get_cloud_spec') # type: ignore def get_cloud_spec(self) -> 'CloudSpec': """Get details of the cloud in which the model is deployed. @@ -391,6 +401,7 @@ def _invalidate(self): self._status = None @property + @tracer.start_as_current_span('ops.Application.status') # type: ignore def status(self) -> 'StatusBase': """Used to report or read the status of the overall application. @@ -429,6 +440,7 @@ def status(self) -> 'StatusBase': return self._status @status.setter + @tracer.start_as_current_span('ops.Application.status = ...') # type: ignore def status(self, value: 'StatusBase'): if not isinstance(value, StatusBase): raise InvalidStatusError( @@ -449,6 +461,7 @@ def status(self, value: 'StatusBase'): self._status = value + @tracer.start_as_current_span('ops.Application.planned_units') # type: ignore def planned_units(self) -> int: """Get the number of units that Juju has "planned" for this application. @@ -474,6 +487,7 @@ def planned_units(self) -> int: def __repr__(self): return f'<{type(self).__module__}.{type(self).__name__} {self.name}>' + @tracer.start_as_current_span('ops.Application.add_secret') # type: ignore def add_secret( self, content: Dict[str, str], @@ -577,6 +591,7 @@ def _invalidate(self): self._status = None @property + @tracer.start_as_current_span('ops.Unit.status') # type: ignore def status(self) -> 'StatusBase': """Used to report or read the status of a specific unit. 
@@ -609,6 +624,7 @@ def status(self) -> 'StatusBase': return self._status @status.setter + @tracer.start_as_current_span('ops.Unit.status = ...') # type: ignore def status(self, value: 'StatusBase'): if not isinstance(value, StatusBase): raise InvalidStatusError(f'invalid value provided for unit {self} status: {value}') @@ -626,6 +642,7 @@ def status(self, value: 'StatusBase'): def __repr__(self): return f'<{type(self).__module__}.{type(self).__name__} {self.name}>' + @tracer.start_as_current_span('ops.Unit.is_leader') # type: ignore def is_leader(self) -> bool: """Return whether this unit is the leader of its application. @@ -643,6 +660,7 @@ def is_leader(self) -> bool: f'leadership status of remote units ({self}) is not visible to other applications' ) + @tracer.start_as_current_span('ops.Unit.set_workload_version') # type: ignore def set_workload_version(self, version: str) -> None: """Record the version of the software running as the workload. @@ -677,6 +695,7 @@ def get_container(self, container_name: str) -> 'Container': except KeyError: raise ModelError(f'container {container_name!r} not found') from None + @tracer.start_as_current_span('ops.Unit.add_secret') # type: ignore def add_secret( self, content: Dict[str, str], @@ -710,6 +729,7 @@ def add_secret( _secret_set_cache=self._cache._secret_set_cache, ) + @tracer.start_as_current_span('ops.Unit.open_port') # type: ignore def open_port( self, protocol: typing.Literal['tcp', 'udp', 'icmp'], port: Optional[int] = None ) -> None: @@ -738,6 +758,7 @@ def open_port( """ self._backend.open_port(protocol.lower(), port) + @tracer.start_as_current_span('ops.Unit.close_port') # type: ignore def close_port( self, protocol: typing.Literal['tcp', 'udp', 'icmp'], port: Optional[int] = None ) -> None: @@ -767,10 +788,12 @@ def close_port( """ self._backend.close_port(protocol.lower(), port) + @tracer.start_as_current_span('ops.Unit.opened_ports') # type: ignore def opened_ports(self) -> Set['Port']: """Return a list of opened ports for this unit.""" return self._backend.opened_ports() + @tracer.start_as_current_span('ops.Unit.set_ports') # type: ignore def set_ports(self, *ports: Union[int, 'Port']) -> None: """Set the open ports for this unit, closing any others that are open. @@ -803,6 +826,7 @@ def set_ports(self, *ports: Union[int, 'Port']) -> None: for protocol, port in desired - existing: self._backend.open_port(protocol, port) + @tracer.start_as_current_span('ops.Unit.reboot') # type: ignore def reboot(self, now: bool = False) -> None: """Reboot the host machine. @@ -2339,6 +2363,7 @@ def __init__( pebble_client = backend.get_pebble(socket_path) self._pebble: pebble.Client = pebble_client + @tracer.start_as_current_span('ops.Container.can_connect') # type: ignore def can_connect(self) -> bool: """Report whether the Pebble API is reachable in the container. 
@@ -2372,14 +2397,17 @@ def can_connect(self) -> bool:
             return False
         return True
 
+    @tracer.start_as_current_span('ops.Container.autostart')  # type: ignore
     def autostart(self) -> None:
         """Autostart all services marked as ``startup: enabled``."""
         self._pebble.autostart_services()
 
+    @tracer.start_as_current_span('ops.Container.replan')  # type: ignore
     def replan(self) -> None:
         """Replan all services: restart changed services and start startup-enabled services."""
         self._pebble.replan_services()
 
+    @tracer.start_as_current_span('ops.Container.start')  # type: ignore
     def start(self, *service_names: str):
         """Start given service(s) by name."""
         if not service_names:
@@ -2387,6 +2415,7 @@ def start(self, *service_names: str):
 
         self._pebble.start_services(service_names)
 
+    @tracer.start_as_current_span('ops.Container.restart')  # type: ignore
     def restart(self, *service_names: str):
         """Restart the given service(s) by name.
 
@@ -2409,6 +2438,7 @@ def restart(self, *service_names: str):
             self._pebble.stop_services(stop)
         self._pebble.start_services(service_names)
 
+    @tracer.start_as_current_span('ops.Container.stop')  # type: ignore
     def stop(self, *service_names: str):
         """Stop given service(s) by name."""
         if not service_names:
@@ -2416,6 +2446,7 @@ def stop(self, *service_names: str):
 
         self._pebble.stop_services(service_names)
 
+    @tracer.start_as_current_span('ops.Container.add_layer')  # type: ignore
     def add_layer(
         self,
         label: str,
@@ -2438,6 +2469,7 @@ def add_layer(
         """
         self._pebble.add_layer(label, layer, combine=combine)
 
+    @tracer.start_as_current_span('ops.Container.get_plan')  # type: ignore
     def get_plan(self) -> pebble.Plan:
         """Get the combined Pebble configuration.
 
@@ -2447,6 +2479,7 @@ def get_plan(self) -> pebble.Plan:
         """
         return self._pebble.get_plan()
 
+    @tracer.start_as_current_span('ops.Container.get_services')  # type: ignore
     def get_services(self, *service_names: str) -> Mapping[str, 'pebble.ServiceInfo']:
         """Fetch and return a mapping of status information indexed by service name.
 
@@ -2457,6 +2490,7 @@ def get_services(self, *service_names: str) -> Mapping[str, 'pebble.ServiceInfo'
         services = self._pebble.get_services(names)
         return ServiceInfoMapping(services)
 
+    @tracer.start_as_current_span('ops.Container.get_service')  # type: ignore
     def get_service(self, service_name: str) -> pebble.ServiceInfo:
         """Get status information for a single named service.
 
@@ -2470,6 +2504,7 @@ def get_service(self, service_name: str) -> pebble.ServiceInfo:
             raise RuntimeError(f'expected 1 service, got {len(services)}')
         return services[service_name]
 
+    @tracer.start_as_current_span('ops.Container.get_checks')  # type: ignore
     def get_checks(
         self, *check_names: str, level: Optional[pebble.CheckLevel] = None
     ) -> 'CheckInfoMapping':
@@ -2484,6 +2519,7 @@ def get_checks(
         checks = self._pebble.get_checks(names=check_names or None, level=level)
         return CheckInfoMapping(checks)
 
+    @tracer.start_as_current_span('ops.Container.get_check')  # type: ignore
     def get_check(self, check_name: str) -> pebble.CheckInfo:
         """Get check information for a single named check.
 
@@ -2503,6 +2539,7 @@ def pull(self, path: Union[str, PurePath], *, encoding: None) -> BinaryIO: ...
 
     @typing.overload
     def pull(self, path: Union[str, PurePath], *, encoding: str = 'utf-8') -> TextIO: ...
+ @tracer.start_as_current_span('ops.Container.pull') # type: ignore def pull( self, path: Union[str, PurePath], *, encoding: Optional[str] = 'utf-8' ) -> Union[BinaryIO, TextIO]: @@ -2524,6 +2561,7 @@ def pull( """ return self._pebble.pull(str(path), encoding=encoding) + @tracer.start_as_current_span('ops.Container.push') # type: ignore def push( self, path: Union[str, PurePath], @@ -2573,6 +2611,7 @@ def push( group=group, ) + @tracer.start_as_current_span('ops.Container.list_files') # type: ignore def list_files( self, path: Union[str, PurePath], *, pattern: Optional[str] = None, itself: bool = False ) -> List[pebble.FileInfo]: @@ -2591,6 +2630,7 @@ def list_files( """ return self._pebble.list_files(str(path), pattern=pattern, itself=itself) + @tracer.start_as_current_span('ops.Container.push_path') # type: ignore def push_path( self, source_path: Union[str, Path, Iterable[Union[str, Path]]], @@ -2678,6 +2718,7 @@ def local_list(source_path: Path) -> List[pebble.FileInfo]: if errors: raise MultiPushPullError('failed to push one or more files', errors) + @tracer.start_as_current_span('ops.Container.pull_path') # type: ignore def pull_path( self, source_path: Union[str, PurePath, Iterable[Union[str, PurePath]]], @@ -2748,7 +2789,9 @@ def pull_path( dstpath.parent.mkdir(parents=True, exist_ok=True) with self.pull(info.path, encoding=None) as src: with dstpath.open(mode='wb') as dst: - shutil.copyfileobj(src, dst) + with tracer.start_as_current_span('shutil.copyfileobj') as span: # type: ignore + span.set_attribute('dstpath', str(dstpath)) # type: ignore + shutil.copyfileobj(src, dst) except (OSError, pebble.Error) as err: errors.append((str(source_path), err)) if errors: @@ -2846,6 +2889,7 @@ def _build_destpath( path_suffix = os.path.relpath(str(file_path), prefix) return dest_dir / path_suffix + @tracer.start_as_current_span('ops.Container.exists') # type: ignore def exists(self, path: Union[str, PurePath]) -> bool: """Report whether a path exists on the container filesystem.""" try: @@ -2856,6 +2900,7 @@ def exists(self, path: Union[str, PurePath]) -> bool: raise err return True + @tracer.start_as_current_span('ops.Container.isdir') # type: ignore def isdir(self, path: Union[str, PurePath]) -> bool: """Report whether a directory exists at the given path on the container filesystem.""" try: @@ -2866,6 +2911,7 @@ def isdir(self, path: Union[str, PurePath]) -> bool: raise err return files[0].type == pebble.FileType.DIRECTORY + @tracer.start_as_current_span('ops.Container.make_dir') # type: ignore def make_dir( self, path: Union[str, PurePath], @@ -2901,6 +2947,7 @@ def make_dir( group=group, ) + @tracer.start_as_current_span('ops.Container.remove_path') # type: ignore def remove_path(self, path: Union[str, PurePath], *, recursive: bool = False): """Remove a file or directory on the remote system. @@ -2959,6 +3006,7 @@ def exec( combine_stderr: bool = False, ) -> pebble.ExecProcess[bytes]: ... + @tracer.start_as_current_span('ops.Container.exec') # type: ignore def exec( self, command: List[str], @@ -3011,6 +3059,7 @@ def exec( combine_stderr=combine_stderr, ) + @tracer.start_as_current_span('ops.Container.send_signal') # type: ignore def send_signal(self, sig: Union[int, str], *service_names: str): """Send the given signal to one or more services. 
@@ -3028,6 +3077,7 @@ def send_signal(self, sig: Union[int, str], *service_names: str): self._pebble.send_signal(sig, service_names) + @tracer.start_as_current_span('ops.Container.get_notice') # type: ignore def get_notice(self, id: str) -> pebble.Notice: """Get details about a single notice by ID. @@ -3043,6 +3093,7 @@ def get_notice(self, id: str) -> pebble.Notice: raise ModelError(f'notice {id!r} not found') from e raise + @tracer.start_as_current_span('ops.Container.get_notices') # type: ignore def get_notices( self, *, @@ -3471,7 +3522,9 @@ def pod_spec_set( args.extend(['--k8s-resources', str(k8s_res_path)]) self._run('pod-spec-set', *args) finally: - shutil.rmtree(str(tmpdir)) + with tracer.start_as_current_span('shutil.rmtree') as span: # type: ignore + span.set_attribute('tmpdir', str(tmpdir)) # type: ignore + shutil.rmtree(str(tmpdir)) def status_get(self, *, is_app: bool = False) -> '_StatusDict': """Get a status of a unit or an application. diff --git a/ops/pebble.py b/ops/pebble.py index f17e1cf94..7089f8602 100644 --- a/ops/pebble.py +++ b/ops/pebble.py @@ -79,6 +79,7 @@ Union, ) +import opentelemetry.trace import websocket from ops._private import timeconv, yaml @@ -343,6 +344,7 @@ def recv(self) -> Union[str, bytes]: ... logger = logging.getLogger(__name__) +tracer = opentelemetry.trace.get_tracer(__name__) class _NotProvidedFlag: @@ -1697,6 +1699,7 @@ def __del__(self): msg = 'ExecProcess instance garbage collected without call to wait() or wait_output()' warnings.warn(msg, ResourceWarning) + @tracer.start_as_current_span('ops.pebble.ExecProcess.wait') # type: ignore def wait(self): """Wait for the process to finish. @@ -1746,6 +1749,7 @@ def _wait(self) -> int: exit_code = change.tasks[0].data.get('exit-code', -1) return exit_code + @tracer.start_as_current_span('ops.pebble.ExecProcess.wait_output') # type: ignore def wait_output(self) -> Tuple[AnyStr, Optional[AnyStr]]: """Wait for the process to finish and return tuple of (stdout, stderr). @@ -1787,6 +1791,7 @@ def wait_output(self) -> Tuple[AnyStr, Optional[AnyStr]]: return (out_value, err_value) + @tracer.start_as_current_span('ops.pebble.ExecProcess.send_signal') # type: ignore def send_signal(self, sig: Union[int, str]): """Send the given signal to the running process. 
@@ -1796,6 +1801,7 @@ def send_signal(self, sig: Union[int, str]): """ if isinstance(sig, int): sig = signal.Signals(sig).name + opentelemetry.trace.get_current_span().set_attribute('signal', sig) payload = { 'command': 'signal', 'signal': {'name': sig}, @@ -2068,23 +2074,27 @@ def _request_raw( return response + @tracer.start_as_current_span('ops.pebble.Client.get_system_info') # type: ignore def get_system_info(self) -> SystemInfo: """Get system info.""" resp = self._request('GET', '/v1/system-info') return SystemInfo.from_dict(resp['result']) + @tracer.start_as_current_span('ops.pebble.Client.get_warnings') # type: ignore def get_warnings(self, select: WarningState = WarningState.PENDING) -> List[Warning]: """Get list of warnings in given state (pending or all).""" query = {'select': select.value} resp = self._request('GET', '/v1/warnings', query) return [Warning.from_dict(w) for w in resp['result']] + @tracer.start_as_current_span('ops.pebble.Client.ack_warnings') # type: ignore def ack_warnings(self, timestamp: datetime.datetime) -> int: """Acknowledge warnings up to given timestamp, return number acknowledged.""" body = {'action': 'okay', 'timestamp': timestamp.isoformat()} resp = self._request('POST', '/v1/warnings', body=body) return resp['result'] + @tracer.start_as_current_span('ops.pebble.Client.get_changes') # type: ignore def get_changes( self, select: ChangeState = ChangeState.IN_PROGRESS, @@ -2097,17 +2107,20 @@ def get_changes( resp = self._request('GET', '/v1/changes', query) return [Change.from_dict(c) for c in resp['result']] + @tracer.start_as_current_span('ops.pebble.Client.get_change') # type: ignore def get_change(self, change_id: ChangeID) -> Change: """Get single change by ID.""" resp = self._request('GET', f'/v1/changes/{change_id}') return Change.from_dict(resp['result']) + @tracer.start_as_current_span('ops.pebble.Client.abort_change') # type: ignore def abort_change(self, change_id: ChangeID) -> Change: """Abort change with given ID.""" body = {'action': 'abort'} resp = self._request('POST', f'/v1/changes/{change_id}', body=body) return Change.from_dict(resp['result']) + @tracer.start_as_current_span('ops.pebble.Client.autostart_services') # type: ignore def autostart_services(self, timeout: float = 30.0, delay: float = 0.1) -> ChangeID: """Start the startup-enabled services and wait (poll) for them to be started. @@ -2125,6 +2138,7 @@ def autostart_services(self, timeout: float = 30.0, delay: float = 0.1) -> Chang """ return self._services_action('autostart', [], timeout, delay) + @tracer.start_as_current_span('ops.pebble.Client.replan_services') # type: ignore def replan_services(self, timeout: float = 30.0, delay: float = 0.1) -> ChangeID: """Replan by (re)starting changed and startup-enabled services and wait for them to start. 
@@ -2143,6 +2157,7 @@ def replan_services(self, timeout: float = 30.0, delay: float = 0.1) -> ChangeID """ return self._services_action('replan', [], timeout, delay) + @tracer.start_as_current_span('ops.pebble.Client.start_services') # type: ignore def start_services( self, services: Iterable[str], @@ -2167,6 +2182,7 @@ def start_services( """ return self._services_action('start', services, timeout, delay) + @tracer.start_as_current_span('ops.pebble.Client.stop_services') # type: ignore def stop_services( self, services: Iterable[str], @@ -2191,6 +2207,7 @@ def stop_services( """ return self._services_action('stop', services, timeout, delay) + @tracer.start_as_current_span('ops.pebble.Client.restart_services') # type: ignore def restart_services( self, services: Iterable[str], @@ -2231,6 +2248,7 @@ def _services_action( ) services = list(services) + opentelemetry.trace.get_current_span().set_attribute('services', services) for s in services: if not isinstance(s, str): raise TypeError(f'service names must be str, not {type(s).__name__}') @@ -2244,6 +2262,7 @@ def _services_action( raise ChangeError(change.err, change) return change_id + @tracer.start_as_current_span('ops.pebble.Client.wait_change') # type: ignore def wait_change( self, change_id: ChangeID, @@ -2335,6 +2354,7 @@ def _wait_change_using_polling( raise TimeoutError(f'timed out waiting for change {change_id} ({timeout} seconds)') + @tracer.start_as_current_span('ops.pebble.Client.add_layer') # type: ignore def add_layer(self, label: str, layer: Union[str, LayerDict, Layer], *, combine: bool = False): """Dynamically add a new layer onto the Pebble configuration layers. @@ -2345,6 +2365,7 @@ def add_layer(self, label: str, layer: Union[str, LayerDict, Layer], *, combine: """ if not isinstance(label, str): raise TypeError(f'label must be a str, not {type(label).__name__}') + opentelemetry.trace.get_current_span().set_attribute('label', label) if isinstance(layer, str): layer_yaml = layer @@ -2366,11 +2387,13 @@ def add_layer(self, label: str, layer: Union[str, LayerDict, Layer], *, combine: } self._request('POST', '/v1/layers', body=body) + @tracer.start_as_current_span('ops.pebble.Client.get_plan') # type: ignore def get_plan(self) -> Plan: """Get the Pebble plan (contains combined layer configuration).""" resp = self._request('GET', '/v1/plan', {'format': 'yaml'}) return Plan(resp['result']) + @tracer.start_as_current_span('ops.pebble.Client.get_services') # type: ignore def get_services(self, names: Optional[Iterable[str]] = None) -> List[ServiceInfo]: """Get the service status for the configured services. @@ -2389,6 +2412,7 @@ def pull(self, path: str, *, encoding: None) -> BinaryIO: ... @typing.overload def pull(self, path: str, *, encoding: str = 'utf-8') -> TextIO: ... + @tracer.start_as_current_span('ops.pebble.Client.pull') # type: ignore def pull(self, path: str, *, encoding: Optional[str] = 'utf-8') -> Union[BinaryIO, TextIO]: """Read a file's content from the remote system. 
@@ -2456,6 +2480,7 @@ def _raise_on_path_error(resp: _FilesResponse, path: str): if error: raise PathError(error['kind'], error['message']) + @tracer.start_as_current_span('ops.pebble.Client.push') # type: ignore def push( self, path: str, @@ -2585,6 +2610,7 @@ def generator() -> Generator[bytes, None, None]: return generator(), content_type + @tracer.start_as_current_span('ops.pebble.Client.list_files') # type: ignore def list_files( self, path: str, *, pattern: Optional[str] = None, itself: bool = False ) -> List[FileInfo]: @@ -2617,6 +2643,7 @@ def list_files( result: List[_FileInfoDict] = resp['result'] or [] # in case it's null instead of [] return [FileInfo.from_dict(d) for d in result] + @tracer.start_as_current_span('ops.pebble.Client.make_dir') # type: ignore def make_dir( self, path: str, @@ -2657,6 +2684,7 @@ def make_dir( resp = self._request('POST', '/v1/files', None, body) self._raise_on_path_error(typing.cast('_FilesResponse', resp), path) + @tracer.start_as_current_span('ops.pebble.Client.remove_path') # type: ignore def remove_path(self, path: str, *, recursive: bool = False): """Remove a file or directory on the remote system. @@ -2723,6 +2751,10 @@ def exec( combine_stderr: bool = False, ) -> ExecProcess[bytes]: ... + # FIXME would be kinda nice to log the command + # but what if the command contains some password? + # maybe log the first element of command? + @tracer.start_as_current_span('ops.pebble.Client.exec') # type: ignore def exec( self, command: List[str], @@ -3002,6 +3034,7 @@ def _websocket_url(self, task_id: str, websocket_id: str) -> str: url = f'{base_url}/v1/tasks/{task_id}/websocket/{websocket_id}' return url + @tracer.start_as_current_span('ops.pebble.Client.send_signal') # type: ignore def send_signal(self, sig: Union[int, str], services: Iterable[str]): """Send the given signal to the list of services named. @@ -3018,18 +3051,23 @@ def send_signal(self, sig: Union[int, str], services: Iterable[str]): raise TypeError( f'services must be of type Iterable[str], not {type(services).__name__}' ) + services = list(services) for s in services: if not isinstance(s, str): raise TypeError(f'service names must be str, not {type(s).__name__}') if isinstance(sig, int): sig = signal.Signals(sig).name + + opentelemetry.trace.get_current_span().set_attribute('signal', sig) + opentelemetry.trace.get_current_span().set_attribute('services', services) body = { 'signal': sig, 'services': services, } self._request('POST', '/v1/signals', body=body) + @tracer.start_as_current_span('ops.pebble.Client.get_checks') # type: ignore def get_checks( self, level: Optional[CheckLevel] = None, names: Optional[Iterable[str]] = None ) -> List[CheckInfo]: @@ -3052,6 +3090,7 @@ def get_checks( resp = self._request('GET', '/v1/checks', query) return [CheckInfo.from_dict(info) for info in resp['result']] + @tracer.start_as_current_span('ops.pebble.Client.notify') # type: ignore def notify( self, type: NoticeType, @@ -3084,6 +3123,7 @@ def notify( resp = self._request('POST', '/v1/notices', body=body) return resp['result']['id'] + @tracer.start_as_current_span('ops.pebble.Client.get_notice') # type: ignore def get_notice(self, id: str) -> Notice: """Get details about a single notice by ID. 
@@ -3093,6 +3133,7 @@ def get_notice(self, id: str) -> Notice: resp = self._request('GET', f'/v1/notices/{id}') return Notice.from_dict(resp['result']) + @tracer.start_as_current_span('ops.pebble.Client.get_notices') # type: ignore def get_notices( self, *, diff --git a/ops/storage.py b/ops/storage.py index f9ba0fcc9..23f730a63 100644 --- a/ops/storage.py +++ b/ops/storage.py @@ -90,6 +90,7 @@ def _setup(self): # not until the transaction ends. self._db.execute('PRAGMA locking_mode=EXCLUSIVE') c = self._db.execute('BEGIN') + # FIXME why don't I see this in sqlite3 instrumentation? c.execute("SELECT count(name) FROM sqlite_master WHERE type='table' AND name='snapshot'") if c.fetchone()[0] == 0: # Keep in mind what might happen if the process dies somewhere below. diff --git a/pyproject.toml b/pyproject.toml index ff6611ff7..d0627e1f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,8 @@ classifiers = [ dependencies = [ "PyYAML==6.*", "websocket-client==1.*", + "opentelemetry-api~=1.21", # TODO: I've no idea what the lowest version may be + "importlib-metadata", ] dynamic = ["version"] @@ -40,6 +42,11 @@ docs = [ testing = [ "ops-scenario>=7.0.5,<8", ] +tracing = [ + "opentelemetry-exporter-otlp-proto-http~=1.29", # TODO: copied from the charm lib, no idea about the version range + "opentelemetry-instrumentation-urllib~=0.50b0", + "opentelemetry-instrumentation-sqlite3~=0.50b0", +] # Empty for now, because Harness is bundled with the base install, but allow # specifying the extra to ease transition later. harness = [] diff --git a/test/test_tracing_buffer.py b/test/test_tracing_buffer.py new file mode 100644 index 000000000..cbc2ee974 --- /dev/null +++ b/test/test_tracing_buffer.py @@ -0,0 +1,19 @@ +# Copyright 2025 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + + +def test_mark_observed(): ... +def test_mark_observed_no_ids(): ... +def test_mark_observed_missing_ids(): ... diff --git a/test/test_tracing_export.py b/test/test_tracing_export.py new file mode 100644 index 000000000..88a8121a9 --- /dev/null +++ b/test/test_tracing_export.py @@ -0,0 +1,19 @@ +# Copyright 2025 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + + +def test_set_tracing_destination_again(): ... +def test_set_tracing_destination_none(): ... +def test_set_tracing_destination(): ... 
diff --git a/tox.ini b/tox.ini index b550a6d66..009db5e6e 100644 --- a/tox.ini +++ b/tox.ini @@ -41,7 +41,7 @@ passenv = description = Compile the requirements.txt file for docs deps = pip-tools commands = - pip-compile --extra=docs -o docs/requirements.txt pyproject.toml + pip-compile --extra=docs,tracing -o docs/requirements.txt pyproject.toml python -c 'open("docs/requirements.txt", "a").write("./testing/\n")' [testenv:docs] @@ -83,6 +83,7 @@ allowlist_externals = cp deps = PyYAML==6.* websocket-client==1.* + opentelemetry-api~=1.21 pyright==1.1.385 pytest~=7.2 typing_extensions~=4.2 @@ -99,6 +100,7 @@ passenv = deps = PyYAML==6.* websocket-client==1.* + opentelemetry-api~=1.21 pytest~=7.2 pytest-xdist~=3.6 typing_extensions~=4.2 @@ -120,6 +122,7 @@ passenv = deps = PyYAML==6.* websocket-client==1.* + opentelemetry-api~=1.21 coverage[toml]~=7.0 pytest~=7.2 typing_extensions~=4.2 @@ -144,6 +147,7 @@ passenv = deps = PyYAML==6.* websocket-client==1.* + opentelemetry-api~=1.21 pytest~=7.2 pytest-benchmark~=5.0 typing_extensions~=4.2 @@ -163,6 +167,7 @@ setenv = deps = PyYAML==6.* websocket-client==1.* + opentelemetry-api~=1.21 coverage[toml]~=7.0 pytest~=7.2 typing_extensions~=4.2 From 10308d8e70c5a020b8a13bc9f0cbf18b30b6cca8 Mon Sep 17 00:00:00 2001 From: Dima Tisnek Date: Thu, 30 Jan 2025 17:54:05 +0900 Subject: [PATCH 2/6] chore: events as OTEL events; wip custom events --- dont-merge/fake-charm.py | 31 ++++++++++++++++++++++++++++++- ops/charm.py | 7 ------- ops/framework.py | 9 +++++---- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/dont-merge/fake-charm.py b/dont-merge/fake-charm.py index 8c1514726..5e6427484 100755 --- a/dont-merge/fake-charm.py +++ b/dont-merge/fake-charm.py @@ -25,6 +25,31 @@ tracer = opentelemetry.trace.get_tracer(__name__) +class DatabaseReadyEvent(ops.charm.EventBase): + """Event representing that the database is ready.""" + + +class DatabaseRequirerEvents(ops.framework.ObjectEvents): + """Container for Database Requirer events.""" + + ready = ops.charm.EventSource(DatabaseReadyEvent) + + +class DatabaseRequirer(ops.framework.Object): + """Dummy docstring.""" + + on = DatabaseRequirerEvents() # type: ignore + + def __init__(self, charm: ops.CharmBase): + """Dummy docstring.""" + super().__init__(charm, 'foo') + self.framework.observe(charm.on.start, self._on_db_changed) + + def _on_db_changed(self, event: ops.StartEvent) -> None: + """Dummy docstring.""" + self.on.ready.emit() + + class FakeCharm(ops.CharmBase): """Dummy docstring.""" @@ -33,13 +58,17 @@ def __init__(self, framework: ops.Framework): super().__init__(framework) self.framework.observe(self.on.start, self._on_start) self.framework.observe(self.on.collect_app_status, self._on_collect_app_status) - self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status) + self.db_requirer = DatabaseRequirer(self) + self.framework.observe(self.db_requirer.on.ready, self._on_db_ready) def _on_start(self, event: ops.StartEvent) -> None: """Dummy docstring.""" ops.set_tracing_destination(url='http://localhost:4318/v1/traces') self.dummy_load(event, 0.0025) + def _on_db_ready(self, event: DatabaseReadyEvent) -> None: + self.dummy_load(event, 0.001) + def _on_collect_app_status(self, event: ops.CollectStatusEvent) -> None: """Dummy docstring.""" self.dummy_load(event) diff --git a/ops/charm.py b/ops/charm.py index 3f137eb58..300fb453b 100644 --- a/ops/charm.py +++ b/ops/charm.py @@ -1333,8 +1333,6 @@ def __init__(self, framework: ops.Framework): @property def on(self) -> 
CharmEvents: ... # noqa - # FIXME not sure if this is needed - # may help to exclude `super().__init__(...)` from `UserCharm.__init__` @tracer.start_as_current_span('ops.CharmBase') # type: ignore def __init__(self, framework: Framework): super().__init__(framework, None) @@ -1391,8 +1389,6 @@ def config(self) -> model.ConfigData: return self.model.config -# FIXME may or may not be useful -@tracer.start_as_current_span('ops.charm._evaluate_status') # type: ignore def _evaluate_status(charm: CharmBase): # pyright: ignore[reportUnusedFunction] """Trigger collect-status events and evaluate and set the highest-priority status. @@ -1565,7 +1561,6 @@ def __init__( } @staticmethod - @tracer.start_as_current_span('ops.CharmMeta.from_charm_root') # type: ignore def from_charm_root(charm_root: Union[pathlib.Path, str]): """Initialise CharmMeta from the path to a charm repository root folder.""" _charm_root = pathlib.Path(charm_root) @@ -1925,8 +1920,6 @@ class ContainerMeta: resource is specified. """ - # FIXME is this ever needed? maybe if there are really many mounts? - @tracer.start_as_current_span('ops.ContainerMeta') # type: ignore def __init__(self, name: str, raw: Dict[str, Any]): self.name = name self._mounts: Dict[str, ContainerStorageMeta] = {} diff --git a/ops/framework.py b/ops/framework.py index 529d1c3c2..7afdb2e83 100644 --- a/ops/framework.py +++ b/ops/framework.py @@ -603,7 +603,6 @@ class Framework(Object): @property def on(self) -> 'FrameworkEvents': ... # noqa - @tracer.start_as_current_span('ops.Framework') # type: ignore def __init__( self, storage: Union[SQLiteStorage, JujuStorage], @@ -697,7 +696,6 @@ def _forget(self, obj: 'Serializable'): """Stop tracking the given object. See also _track.""" self._objects.pop(obj.handle.path, None) - @tracer.start_as_current_span('ops.Framework.commit') # type: ignore def commit(self) -> None: """Save changes to the underlying backends.""" # Give a chance for objects to persist data they want to before a commit is made. @@ -871,12 +869,16 @@ def _event_is_in_storage( return True return False - @tracer.start_as_current_span('ops.Framework._emit') # type: ignore def _emit(self, event: EventBase): """See BoundEvent.emit for the public way to call this.""" saved = False event_path = event.handle.path event_kind = event.handle.kind + ops_event = event.__class__.__module__.startswith('ops.') + opentelemetry.trace.get_current_span().add_event( + f'{"ops." if ops_event else ""}{event.__class__.__name__}', + attributes={'deferred': event.deferred, 'kind': event_kind}, + ) parent = event.handle.parent assert isinstance(parent, Handle), 'event handle must have a parent' parent_path = parent.path @@ -913,7 +915,6 @@ def _emit(self, event: EventBase): if saved: self._reemit(event_path) - @tracer.start_as_current_span('ops.Framework.reemit') # type: ignore def reemit(self) -> None: """Reemit previously deferred events to the observers that deferred them. From f7b57d28f3fd91c3b7a34348de703108bba07f5d Mon Sep 17 00:00:00 2001 From: Dima Tisnek Date: Fri, 31 Jan 2025 15:53:58 +0900 Subject: [PATCH 3/6] wip --- ops/_tracing/export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ops/_tracing/export.py b/ops/_tracing/export.py index 0a2783459..10df73e19 100644 --- a/ops/_tracing/export.py +++ b/ops/_tracing/export.py @@ -67,7 +67,6 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: Note: to avoid data loops or recursion, this function cannot be instrumented. 
""" - logging.debug(f'FIXME export {len(spans)=}') with suppress_juju_log_handler(): # Note: # this is called in a helper thread, which is daemonic, @@ -81,6 +80,7 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: assert spans # the BatchSpanProcessor won't call us if there's no data # TODO: this will change in the JSON experiment data: bytes = trace_encoder.encode_spans(spans).SerializePartialToString() + logging.debug(f'FIXME export {len(spans)=} {len(data)=}') rv = self.buffer.pump(data) logging.debug('FIXME saved') assert rv From 1ac5fb99c968351b527c9dc0d0290d061fcc407e Mon Sep 17 00:00:00 2001 From: Dima Tisnek Date: Fri, 31 Jan 2025 16:31:09 +0900 Subject: [PATCH 4/6] discussion: if we're goiong to instrument yaml, let's log byte size --- ops/_private/yaml.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/ops/_private/yaml.py b/ops/_private/yaml.py index 84f5b80a8..294bfe2f0 100644 --- a/ops/_private/yaml.py +++ b/ops/_private/yaml.py @@ -29,10 +29,18 @@ @tracer.start_as_current_span('ops.yaml.safe_load') # type: ignore def safe_load(stream: Union[str, TextIO]) -> Any: """Same as yaml.safe_load, but use fast C loader if available.""" + if not isinstance(stream, TextIO): + opentelemetry.trace.get_current_span().set_attribute("len", len(stream)) + opentelemetry.trace.get_current_span().set_attribute("stream", isinstance(stream, TextIO)) return yaml.load(stream, Loader=_safe_loader) # noqa: S506 @tracer.start_as_current_span('ops.yaml.safe_dump') # type: ignore def safe_dump(data: Any, stream: Optional[TextIO] = None) -> str: """Same as yaml.safe_dump, but use fast C dumper if available.""" - return yaml.dump(data, stream=stream, Dumper=_safe_dumper) # type: ignore + rv = yaml.dump(data, stream=stream, Dumper=_safe_dumper) + if stream is None: + assert rv is not None + opentelemetry.trace.get_current_span().set_attribute("len", len(rv)) + opentelemetry.trace.get_current_span().set_attribute("stream", stream is not None) + return rv # type: ignore From 397a3ebb63dbeaf92e6184dddba38770b305074c Mon Sep 17 00:00:00 2001 From: Dima Tisnek Date: Fri, 31 Jan 2025 17:12:54 +0900 Subject: [PATCH 5/6] instrument mappings --- ops/model.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/ops/model.py b/ops/model.py index b939f25c2..3ad3721b8 100644 --- a/ops/model.py +++ b/ops/model.py @@ -885,6 +885,7 @@ class _GenericLazyMapping(Mapping[str, _LazyValueType], ABC): _lazy_data: Optional[Dict[str, _LazyValueType]] = None @abstractmethod + # FIXME instrument here, or individual access? def _load(self) -> Dict[str, _LazyValueType]: raise NotImplementedError() @@ -898,20 +899,27 @@ def _data(self) -> Dict[str, _LazyValueType]: def _invalidate(self): self._lazy_data = None + # FIXME isntrument all these? 
     def __contains__(self, key: str) -> bool:
-        return key in self._data
+        with tracer.start_as_current_span(f'x in ops.{self.__class__.__name__}'):  # type: ignore
+            return key in self._data
 
     def __len__(self) -> int:
-        return len(self._data)
+        with tracer.start_as_current_span(f'len(ops.{self.__class__.__name__})'):  # type: ignore
+            return len(self._data)
 
     def __iter__(self):
-        return iter(self._data)
+        with tracer.start_as_current_span(f'for x in ops.{self.__class__.__name__}'):  # type: ignore
+            return iter(self._data)
 
     def __getitem__(self, key: str) -> _LazyValueType:
-        return self._data[key]
+        # FIXME code path also covers .get()
+        with tracer.start_as_current_span(f'ops.{self.__class__.__name__}[x]'):  # type: ignore
+            return self._data[key]
 
     def __repr__(self) -> str:
-        return repr(self._data)
+        with tracer.start_as_current_span(f'repr(ops.{self.__class__.__name__})'):  # type: ignore
+            return repr(self._data)
 
 
 class LazyMapping(_GenericLazyMapping[str]):
@@ -952,6 +960,8 @@ def __len__(self):
     def __iter__(self) -> Iterable[str]:
         return iter(self._data)
 
+    # FIXME: called by base class for .get() too
+    @tracer.start_as_current_span('ops.RelationMapping[x]')  # type: ignore
     def __getitem__(self, relation_name: str) -> List['Relation']:
         is_peer = relation_name in self._peers
         relation_list: Optional[List[Relation]] = self._data[relation_name]
@@ -1020,6 +1030,7 @@ def __init__(self, backend: '_ModelBackend'):
         self._backend = backend
         self._data: _BindingDictType = {}
 
+    # FIXME check
     def get(self, binding_key: Union[str, 'Relation']) -> 'Binding':
         """Get a specific Binding for an endpoint/relation.
 
@@ -1068,6 +1079,7 @@ def _network_get(self, name: str, relation_id: Optional[int] = None) -> 'Network
         return Network(self._backend.network_get(name, relation_id))
 
     @property
+    # FIXME check
     def network(self) -> 'Network':
         """The network information for this binding."""
         if self._network is None:
@@ -1782,6 +1794,7 @@ class RelationData(Mapping[Union['Unit', 'Application'], 'RelationDataContent'])
     :attr:`Relation.data`
     """
 
+    # FIXME check
     def __init__(self, relation: Relation, our_unit: Unit, backend: '_ModelBackend'):
         self.relation = weakref.proxy(relation)
         self._data: Dict[Union[Unit, Application], RelationDataContent] = {
@@ -1833,6 +1846,7 @@ def _hook_is_running(self) -> bool:
         # unrestricted, allowing test code to read/write databags at will.
         return bool(self._backend._hook_is_running)
 
+    # FIXME instrument this or LazyMapping?
     def _load(self) -> '_RelationDataContent_Raw':
         """Load the data from the current entity / relation."""
         try:
@@ -2187,6 +2201,7 @@ class Pod:
     def __init__(self, backend: '_ModelBackend'):
         self._backend = backend
 
+    @tracer.start_as_current_span('ops.Pod.set_spec')  # type: ignore
     def set_spec(self, spec: 'K8sSpec', k8s_resources: Optional['K8sSpec'] = None):
         """Set the specification for pods that Juju should start in kubernetes.
 
@@ -2219,6 +2234,8 @@ def __len__(self):
     def __iter__(self):
         return iter(self._storage_map)
 
+    # FIXME: called by base class for .get() too
+    @tracer.start_as_current_span('ops.StorageMapping[x]')  # type: ignore
     def __getitem__(self, storage_name: str) -> List['Storage']:
         if storage_name not in self._storage_map:
             meant = ', or '.join(repr(k) for k in self._storage_map)
@@ -2231,6 +2248,8 @@ def __getitem__(self, storage_name: str) -> List['Storage']:
             storage_list.append(storage)
         return storage_list
 
+    # FIXME doesn't seem used by any charm?
+    @tracer.start_as_current_span('ops.StorageMapping.request')  # type: ignore
     def request(self, storage_name: str, count: int = 1):
         """Requests new storage instances of a given name.
 

From cb82876cbf111d1c63e3723b10a20d97cf29c118 Mon Sep 17 00:00:00 2001
From: Dima Tisnek
Date: Fri, 31 Jan 2025 20:26:51 +0900
Subject: [PATCH 6/6] add ops.Resources

---
 ops/model.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/ops/model.py b/ops/model.py
index 3ad3721b8..065820354 100644
--- a/ops/model.py
+++ b/ops/model.py
@@ -222,6 +222,7 @@ def uuid(self) -> str:
         """
         return self._backend.model_uuid
 
+    # FIXME maybe skip this?
     @tracer.start_as_current_span('ops.Model.get_unit')  # type: ignore
     def get_unit(self, unit_name: str) -> 'Unit':
         """Get an arbitrary unit by name.
@@ -2171,6 +2172,7 @@ def __init__(self, names: Iterable[str], backend: '_ModelBackend'):
         self._backend = backend
         self._paths: Dict[str, Optional[Path]] = {name: None for name in names}
 
+    @tracer.start_as_current_span('ops.Resources.fetch')  # type: ignore
     def fetch(self, name: str) -> Path:
         """Fetch the resource from the controller or store.