From 2ac9679c5016aa7c2c9ae9f04388286e61d644f8 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Mon, 29 Apr 2024 11:03:12 +0200 Subject: [PATCH 01/13] charm logging POC --- lib/charms/loki_k8s/v0/charm_logging.py | 317 ++++++++++++++++++++++++ requirements.txt | 4 + 2 files changed, 321 insertions(+) create mode 100644 lib/charms/loki_k8s/v0/charm_logging.py diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py new file mode 100644 index 000000000..04f180879 --- /dev/null +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""This charm library contains utilities to automatically forward your charm logs to a loki-push-api endpoint. + +(yes! charm code, not workload code!) + +This means that, if your charm is related to, for example, COS' Loki charm (or a Grafana Agent), +you will be able to inspect in real time from the Grafana dashboard the logs emitted by your charm. + +To start using this library, you need to do two things: +1) decorate your charm class with + +`@log_charm(loki_push_api_endpoint="my_logging_endpoint")` + +2) add to your charm a "my_logging_endpoint" (you can name this attribute whatever you like) **property** +that returns an http/https endpoint url. If you are using the `LogProxyConsumer` as +`self.logging = LogProxyConsumer(self, ...)`, the implementation could be: + +``` + @property + def my_logging_endpoint(self) -> List[str]: + '''Loki push API endpoints for charm logging''' + return self.logging.loki_endpoints: +``` + +The ``log_charm`` decorator will take these endpoints and set up the root logger (as in python's +logging module root logger) to forward all logs to these loki endpoints. + +3) If you were passing a certificate using `server_cert`, you need to change it to provide an *absolute* path to +the certificate file. +""" + +import functools +import inspect +import logging +import os +from contextlib import contextmanager +from pathlib import Path +from typing import ( + Any, + Callable, + Generator, + Optional, + Sequence, + Type, + TypeVar, + Union, + cast, List, +) + +import logging_loki +from ops.charm import CharmBase +from ops.framework import Framework + + +# The unique Charmhub library identifier, never change it +LIBID = "52ee6051f4e54aedaa60aa04134d1a6d" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + +PYDEPS = ["python-logging-loki=0.3.1"] + +logger = logging.getLogger("charm_logging") + +_GetterType = Union[Callable[[CharmBase], Optional[str]], property] + +CHARM_LOGGING_ENABLED = "CHARM_LOGGING_ENABLED" + + +def is_enabled() -> bool: + """Whether charm logging is enabled.""" + return os.getenv(CHARM_LOGGING_ENABLED, "1") == "1" + + +class CharmLoggingError(Exception): + """Base class for all exceptions raised by this module.""" + + +class InvalidEndpointError(CharmLoggingError): + """Raised if an endpoint is invalid.""" + + +@contextmanager +def charm_logging_disabled(): + """Contextmanager to temporarily disable charm logging. + + For usage in tests. + """ + previous = os.getenv(CHARM_LOGGING_ENABLED, "1") + os.environ[CHARM_LOGGING_ENABLED] = "0" + yield + os.environ[CHARM_LOGGING_ENABLED] = previous + + + +_C = TypeVar("_C", bound=Type[CharmBase]) +_T = TypeVar("_T", bound=type) +_F = TypeVar("_F", bound=Type[Callable]) + + +def _get_logging_endpoints(logging_endpoints_getter, self, charm): + if isinstance(logging_endpoints_getter, property): + logging_endpoints = logging_endpoints_getter.__get__(self) + else: # method or callable + logging_endpoints = logging_endpoints_getter(self) + + if logging_endpoints is None: + logger.debug( + f"{charm}.{logging_endpoints_getter} returned None; quietly disabling " + f"charm_logging for the run." + ) + return + + errors = [] + logging_endpoints = tuple(logging_endpoints) + sanitized_logging_endponts = [] + for endpoint in logging_endpoints: + if isinstance(endpoint, str): + sanitized_logging_endponts.append(endpoint) + else: + errors.append(endpoint) + + if errors: + if sanitized_logging_endponts: + logger.error(f"{charm}.{logging_endpoints_getter} returned some invalid endpoint strings: {errors}") + else: + logger.error( + f"{charm}.{logging_endpoints_getter} should return an iterable of Loki push-api (compatible) endpoints (strings); " + f"got {errors} instead." + ) + + return sanitized_logging_endponts + + +def _get_server_cert(server_cert_getter, self, charm): + if isinstance(server_cert_getter, property): + server_cert = server_cert_getter.__get__(self) + else: # method or callable + server_cert = server_cert_getter(self) + + if server_cert is None: + logger.warning( + f"{charm}.{server_cert_getter} returned None; sending logs over INSECURE connection." + ) + return + elif not Path(server_cert).is_absolute(): + raise ValueError( + f"{charm}.{server_cert_getter} should return a valid tls cert absolute path (string | Path)); " + f"got {server_cert} instead." + ) + return server_cert + + +def _setup_root_logger_initializer( + charm: Type[CharmBase], + logging_endpoints_getter: _GetterType, + server_cert_getter: Optional[_GetterType], + service_name: Optional[str] = None, +): + """Patch the charm's initializer.""" + original_init = charm.__init__ + + @functools.wraps(original_init) + def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): + original_init(self, framework, *args, **kwargs) + + if not is_enabled(): + logger.info("Charm logging DISABLED by env: skipping root logger initialization") + return + + try: + logging_endpoints = _get_logging_endpoints(logging_endpoints_getter, self, charm) + except Exception: + # if anything goes wrong with retrieving the endpoint, we go on with logging disabled. + # better than breaking the charm. + logger.exception( + f"exception retrieving the logging " + f"endpoint from {charm}.{logging_endpoints_getter}; " + f"proceeding with charm_logging DISABLED. " + ) + return + + if not logging_endpoints: + return + + juju_topology = { + "juju_unit": self.unit.name, + "juju_application": self.app.name, + "juju_model": self.model.name, + "juju_model_uuid": self.model.uuid, + "service_name" : service_name or self.app.name, + "charm_type_name" : type(self).__name__, + "dispatch_path": os.getenv("JUJU_DISPATCH_PATH", ""), + } + server_cert: Optional[Union[str, Path]] = ( + _get_server_cert(server_cert_getter, self, charm) if server_cert_getter else None + ) + + if server_cert: + # todo figure out how to use cert with this + logger.warning("server-cert authentication not implemented.") + + root_logger = logging.getLogger() + + for url in logging_endpoints: + handler = logging_loki.LokiHandler( + url=url, + tags=juju_topology, + # auth=("username", "password"), + version="1", + ) + + root_logger.addHandler(handler) + root_logger.debug( + "Initialized charm logger", + extra={"tags": {"endpoint": url}}, + ) + return + + charm.__init__ = wrap_init + + +def log_charm( + logging_endpoints: str, + server_cert: Optional[str] = None, + service_name: Optional[str] = None, +): + """Set up the root logger to forward any charm logs to one or more Loki push API endpoints. + + Usage: + >>> from charms.loki_k8s.v0.charm_logging import log_charm + >>> from charms.loki_k8s.v1.loki_push_api import LogProxyConsumer + >>> from ops import CharmBase + >>> + >>> @log_charm( + >>> logging_endpoints="loki_push_api_urls", + >>> ) + >>> class MyCharm(CharmBase): + >>> + >>> def __init__(self, framework: Framework): + >>> ... + >>> self.logging = LogProxyConsumer(self, ...) + >>> + >>> @property + >>> def loki_push_api_urls(self) -> Optional[List[str]]: + >>> return [endpoint['url'] for endpoint in self.logging.loki_endpoints] + >>> + :param server_cert: method or property on the charm type that returns an + optional absolute path to a tls certificate to be used when sending traces to a remote server. + If it returns None, an _insecure_ connection will be used. + :param logging_endpoints: name of a property on the charm type that returns a sequence + of (fully resolvable) Loki push API urls. If None, charm logging will be effectively disabled. + Else, the root logger will be set up to forward all logs to those endpoints. + :param service_name: service name tag to attach to all logs generated by this charm. + Defaults to the juju application name this charm is deployed under. + """ + + def _decorator(charm_type: Type[CharmBase]): + """Autoinstrument the wrapped charmbase type.""" + _autoinstrument( + charm_type, + logging_endpoints_getter=getattr(charm_type, logging_endpoints), + server_cert_getter=getattr(charm_type, server_cert) if server_cert else None, + service_name=service_name, + ) + return charm_type + + return _decorator + + +def _autoinstrument( + charm_type: Type[CharmBase], + logging_endpoints_getter: _GetterType, + server_cert_getter: Optional[_GetterType] = None, + service_name: Optional[str] = None, +) -> Type[CharmBase]: + """Set up logging on this charm class. + + Use this function to get out-of-the-box traces for all events emitted on this charm and all + method calls on instances of this class. + + Usage: + + >>> from charms.loki_k8s.v0.charm_logging import _autoinstrument + >>> from ops.main import main + >>> _autoinstrument( + >>> MyCharm, + >>> logging_endpoints_getter=MyCharm.get_loki_endpoints, + >>> service_name="MyCharm", + >>> ) + >>> main(MyCharm) + + :param charm_type: the CharmBase subclass to autoinstrument. + :param server_cert_getter: method or property on the charm type that returns an + optional absolute path to a tls certificate to be used when sending traces to a remote server. + If it returns None, an _insecure_ connection will be used. + :param logging_endpoints_getter: name of a property on the charm type that returns a sequence + of (fully resolvable) Loki push API urls. If None, charm logging will be effectively disabled. + Else, the root logger will be set up to forward all logs to those endpoints. + :param service_name: service name tag to attach to all logs generated by this charm. + Defaults to the juju application name this charm is deployed under. + """ + logger.info(f"instrumenting {charm_type}") + _setup_root_logger_initializer( + charm_type, + logging_endpoints_getter, + server_cert_getter=server_cert_getter, + service_name=service_name, + ) + return charm_type diff --git a/requirements.txt b/requirements.txt index cdbc96519..1d77e4a68 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,7 @@ lightkube-models # Cryptography # Deps: tls_certificates cryptography + + +# deps: charm_logging +python-logging-loki==0.3.1 \ No newline at end of file From 63019fdf478f5171ffba4b0a75f3cbb6bcda4d4a Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Wed, 1 May 2024 10:05:51 +0200 Subject: [PATCH 02/13] test for logging --- lib/charms/loki_k8s/v0/charm_logging.py | 21 ++++----- requirements.txt | 7 ++- src/charm.py | 10 +++++ tests/scenario/conftest.py | 29 ++++++++++++ tests/scenario/test_charm_logging.py | 60 +++++++++++++++++++++++++ 5 files changed, 112 insertions(+), 15 deletions(-) create mode 100644 tests/scenario/conftest.py create mode 100644 tests/scenario/test_charm_logging.py diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py index 04f180879..6c9d59ec4 100644 --- a/lib/charms/loki_k8s/v0/charm_logging.py +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -33,28 +33,22 @@ def my_logging_endpoint(self) -> List[str]: """ import functools -import inspect import logging import os from contextlib import contextmanager from pathlib import Path from typing import ( - Any, Callable, - Generator, Optional, - Sequence, Type, TypeVar, Union, - cast, List, ) import logging_loki from ops.charm import CharmBase from ops.framework import Framework - # The unique Charmhub library identifier, never change it LIBID = "52ee6051f4e54aedaa60aa04134d1a6d" @@ -99,7 +93,6 @@ def charm_logging_disabled(): os.environ[CHARM_LOGGING_ENABLED] = previous - _C = TypeVar("_C", bound=Type[CharmBase]) _T = TypeVar("_T", bound=type) _F = TypeVar("_F", bound=Type[Callable]) @@ -116,7 +109,7 @@ def _get_logging_endpoints(logging_endpoints_getter, self, charm): f"{charm}.{logging_endpoints_getter} returned None; quietly disabling " f"charm_logging for the run." ) - return + return None errors = [] logging_endpoints = tuple(logging_endpoints) @@ -129,7 +122,9 @@ def _get_logging_endpoints(logging_endpoints_getter, self, charm): if errors: if sanitized_logging_endponts: - logger.error(f"{charm}.{logging_endpoints_getter} returned some invalid endpoint strings: {errors}") + logger.error( + f"{charm}.{logging_endpoints_getter} returned some invalid endpoint strings: {errors}" + ) else: logger.error( f"{charm}.{logging_endpoints_getter} should return an iterable of Loki push-api (compatible) endpoints (strings); " @@ -149,8 +144,8 @@ def _get_server_cert(server_cert_getter, self, charm): logger.warning( f"{charm}.{server_cert_getter} returned None; sending logs over INSECURE connection." ) - return - elif not Path(server_cert).is_absolute(): + return None + if not Path(server_cert).is_absolute(): raise ValueError( f"{charm}.{server_cert_getter} should return a valid tls cert absolute path (string | Path)); " f"got {server_cert} instead." @@ -195,8 +190,8 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): "juju_application": self.app.name, "juju_model": self.model.name, "juju_model_uuid": self.model.uuid, - "service_name" : service_name or self.app.name, - "charm_type_name" : type(self).__name__, + "service_name": service_name or self.app.name, + "charm_type_name": type(self).__name__, "dispatch_path": os.getenv("JUJU_DISPATCH_PATH", ""), } server_cert: Optional[Union[str, Path]] = ( diff --git a/requirements.txt b/requirements.txt index 1d77e4a68..e0814eebe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,9 @@ lightkube-models # Deps: tls_certificates cryptography - # deps: charm_logging -python-logging-loki==0.3.1 \ No newline at end of file +python-logging-loki==0.3.1 + +# deps: tracing, charm_tracing +pydantic +opentelemetry-exporter-otlp-proto-http==1.21.0 \ No newline at end of file diff --git a/src/charm.py b/src/charm.py index 0eee2198b..1666f745a 100755 --- a/src/charm.py +++ b/src/charm.py @@ -24,11 +24,13 @@ from urllib.error import HTTPError, URLError from urllib.parse import urlparse +import ops import yaml from charms.alertmanager_k8s.v1.alertmanager_dispatch import AlertmanagerConsumer from charms.catalogue_k8s.v1.catalogue import CatalogueConsumer, CatalogueItem from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider from charms.grafana_k8s.v0.grafana_source import GrafanaSourceProvider +from charms.loki_k8s.v0.charm_logging import log_charm from charms.loki_k8s.v0.loki_push_api import ( LokiPushApiAlertRulesChanged, LokiPushApiProvider, @@ -106,6 +108,7 @@ def to_status(tpl: Tuple[str, str]) -> StatusBase: MetricsEndpointProvider, ], ) +@log_charm(logging_endpoints="logging_endpoints", server_cert="server_cert_path") class LokiOperatorCharm(CharmBase): """Charm the service.""" @@ -717,6 +720,13 @@ def tracing_endpoint(self) -> Optional[str]: """Tempo endpoint for charm tracing.""" return self.tracing.otlp_http_endpoint() + @property + def logging_endpoints(self) -> Optional[List[str]]: + """Loki endpoint for charm logging.""" + if self._loki_container.get_service(self._name).current is ops.pebble.ServiceStatus.ACTIVE: + return [self.loki_provider._endpoint(self.loki_provider._url)["url"]] + return [] + @property def server_cert_path(self) -> Optional[str]: """Server certificate path for TLS tracing.""" diff --git a/tests/scenario/conftest.py b/tests/scenario/conftest.py new file mode 100644 index 000000000..964ba705b --- /dev/null +++ b/tests/scenario/conftest.py @@ -0,0 +1,29 @@ +from unittest.mock import PropertyMock, patch + +import pytest +import scenario +from charm import LokiOperatorCharm + + +def tautology(*_, **__) -> bool: + return True + + +@pytest.fixture +def loki_charm(): + with ( + patch.multiple( + "charm.KubernetesComputeResourcesPatch", + _namespace=PropertyMock("test-namespace"), + _patch=PropertyMock(tautology), + is_ready=PropertyMock(tautology), + ), + patch("socket.getfqdn", new=lambda *args: "fqdn"), + patch("lightkube.core.client.GenericSyncClient"), + ): + yield LokiOperatorCharm + + +@pytest.fixture +def context(loki_charm): + return scenario.Context(loki_charm) diff --git a/tests/scenario/test_charm_logging.py b/tests/scenario/test_charm_logging.py new file mode 100644 index 000000000..12a20565f --- /dev/null +++ b/tests/scenario/test_charm_logging.py @@ -0,0 +1,60 @@ +import logging +from unittest.mock import patch + +import ops.pebble +import pytest +import scenario + + +@pytest.fixture +def loki_handler_mock(): + with patch("logging_loki.LokiHandler.handle") as h: + yield h + + +def test_no_endpoints_on_loki_not_ready(context, loki_handler_mock): + state = scenario.State( + containers=[ + scenario.Container( + "loki", + can_connect=True, + layers={"loki": ops.pebble.Layer({"services": {"loki": {}}})}, + service_status={"loki": ops.pebble.ServiceStatus.INACTIVE}, + exec_mock={("update-ca-certificates", "--fresh"): scenario.ExecOutput()}, + ) + ] + ) + + with context.manager("update-status", state) as mgr: + charm = mgr.charm + assert charm.logging_endpoints == [] + logging.getLogger("foo").debug("bar") + + loki_handler_mock.assert_not_called() + + +def test_endpoints_on_loki_ready(context, loki_handler_mock): + state = scenario.State( + containers=[ + scenario.Container( + "loki", + can_connect=True, + layers={"loki": ops.pebble.Layer({"services": {"loki": {}}})}, + service_status={"loki": ops.pebble.ServiceStatus.ACTIVE}, + exec_mock={("update-ca-certificates", "--fresh"): scenario.ExecOutput()}, + ) + ] + ) + + with context.manager("update-status", state) as mgr: + charm = mgr.charm + assert charm.logging_endpoints == ["http://fqdn:80/loki/api/v1/push"] + logging.getLogger("foo").debug("bar") + + loki_handler_mock.assert_called() + + for call in loki_handler_mock.call_args_list: + record = call.args[0] + if record.filename == __name__ + ".py": # log emitted by this module + assert record.msg == "bar" + assert record.name == "foo" From b0d8fff0430e26275a724fe9d6d0147a1744879d Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Thu, 2 May 2024 12:36:53 +0200 Subject: [PATCH 03/13] progress --- lib/charms/loki_k8s/v0/charm_logging.py | 171 ++++++++++++++++++++++-- requirements.txt | 3 - src/charm.py | 5 +- 3 files changed, 162 insertions(+), 17 deletions(-) diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py index 6c9d59ec4..2c4c18b72 100644 --- a/lib/charms/loki_k8s/v0/charm_logging.py +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -31,24 +31,31 @@ def my_logging_endpoint(self) -> List[str]: 3) If you were passing a certificate using `server_cert`, you need to change it to provide an *absolute* path to the certificate file. """ - +import copy import functools import logging import os +import string +import time from contextlib import contextmanager +from logging.config import ConvertingDict from pathlib import Path from typing import ( Callable, Optional, Type, TypeVar, - Union, + Union, Tuple, Dict, Any, ) -import logging_loki +import requests from ops.charm import CharmBase from ops.framework import Framework +# prevent infinite recursion because on failure urllib3 will push more logs +# https://github.com/GreyZmeem/python-logging-loki/issues/18 +logging.getLogger('urllib3').setLevel(logging.INFO) + # The unique Charmhub library identifier, never change it LIBID = "52ee6051f4e54aedaa60aa04134d1a6d" @@ -59,15 +66,159 @@ def my_logging_endpoint(self) -> List[str]: # to 0 if you are raising the major API version LIBPATCH = 1 -PYDEPS = ["python-logging-loki=0.3.1"] +PYDEPS = [] logger = logging.getLogger("charm_logging") - _GetterType = Union[Callable[[CharmBase], Optional[str]], property] - CHARM_LOGGING_ENABLED = "CHARM_LOGGING_ENABLED" +# from https://github.com/GreyZmeem/python-logging-loki, which seems to be dead +class LokiEmitter: + """Base Loki emitter class.""" + + #: Success HTTP status code from Loki API. + success_response_code: int = 204 + + #: Label name indicating logging level. + level_tag: str = "severity" + #: Label name indicating logger name. + logger_tag: str = "logger" + + #: String contains chars that can be used in label names in LogQL. + label_allowed_chars: str = "".join((string.ascii_letters, string.digits, "_")) + #: A list of pairs of characters to replace in the label name. + label_replace_with: Tuple[Tuple[str, str], ...] = ( + ("'", ""), + ('"', ""), + (" ", "_"), + (".", "_"), + ("-", "_"), + ) + + def __init__(self, url: str, tags: Optional[dict] = None, cert: Optional[str] = None): + """ + Create new Loki emitter. + + Arguments: + url: Endpoint used to send log entries to Loki (e.g. `https://my-loki-instance/loki/api/v1/push`). + tags: Default tags added to every log record. + auth: Optional tuple with username and password for basic HTTP authentication. + + """ + #: Tags that will be added to all records handled by this handler. + self.tags = tags or {} + #: Loki JSON push endpoint (e.g `http://127.0.0.1/loki/api/v1/push`) + self.url = url + #: Optional cert for TLS auth + self.cert = cert + + self._session: Optional[requests.Session] = None + + def __call__(self, record: logging.LogRecord, line: str): + """Send log record to Loki.""" + payload = self.build_payload(record, line) + resp = self.session.post(self.url, json=payload, timeout=5) + if resp.status_code != self.success_response_code: + raise ValueError("Unexpected Loki API response status code: {0}".format(resp.status_code)) + + def build_payload(self, record: logging.LogRecord, line) -> dict: + """Build JSON payload with a log entry.""" + labels = self.build_tags(record) + ns = 1e9 + ts = str(int(time.time() * ns)) + stream = { + "stream": labels, + "values": [[ts, line]], + } + return {"streams": [stream]} + + @property + def session(self) -> requests.Session: + """Create HTTP session.""" + if self._session is None: + self._session = requests.Session() + self._session.cert = self.cert or None + return self._session + + def close(self): + """Close HTTP session.""" + if self._session is not None: + self._session.close() + self._session = None + + @functools.lru_cache(256) + def format_label(self, label: str) -> str: + """ + Build label to match prometheus format. + + `Label format `_ + """ + for char_from, char_to in self.label_replace_with: + label = label.replace(char_from, char_to) + return "".join(char for char in label if char in self.label_allowed_chars) + + def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]: + """Return tags that must be send to Loki with a log record.""" + tags = dict(self.tags) if isinstance(self.tags, ConvertingDict) else self.tags + tags = copy.deepcopy(tags) + tags[self.level_tag] = record.levelname.lower() + tags[self.logger_tag] = record.name + + extra_tags = getattr(record, "tags", {}) + if not isinstance(extra_tags, dict): + return tags + + for tag_name, tag_value in extra_tags.items(): + cleared_name = self.format_label(tag_name) + if cleared_name: + tags[cleared_name] = tag_value + + return tags + + +class LokiHandler(logging.Handler): + """ + Log handler that sends log records to Loki. + + `Loki API `_ + """ + + def __init__( + self, + url: str, + tags: Optional[dict] = None, + # username, password tuple + cert: Optional[str] = None, + ): + """ + Create new Loki logging handler. + + Arguments: + url: Endpoint used to send log entries to Loki (e.g. `https://my-loki-instance/loki/api/v1/push`). + tags: Default tags added to every log record. + + # FIXME: Session expects a .pem file it says + cert: Optional absolute path to cert file for TLS auth. + + """ + super().__init__() + self.emitter = LokiEmitter(url, tags, cert) + + def handleError(self, record): # noqa: N802 + """Close emitter and let default handler take actions on error.""" + self.emitter.close() + super().handleError(record) + + def emit(self, record: logging.LogRecord): + """Send log record to Loki.""" + # noinspection PyBroadException + try: + self.emitter(record, self.format(record)) + except Exception: + self.handleError(record) + + def is_enabled() -> bool: """Whether charm logging is enabled.""" return os.getenv(CHARM_LOGGING_ENABLED, "1") == "1" @@ -198,18 +349,14 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): _get_server_cert(server_cert_getter, self, charm) if server_cert_getter else None ) - if server_cert: - # todo figure out how to use cert with this - logger.warning("server-cert authentication not implemented.") - root_logger = logging.getLogger() for url in logging_endpoints: - handler = logging_loki.LokiHandler( + handler = LokiHandler( url=url, tags=juju_topology, + cert=server_cert # auth=("username", "password"), - version="1", ) root_logger.addHandler(handler) diff --git a/requirements.txt b/requirements.txt index e0814eebe..af719587a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,9 +13,6 @@ lightkube-models # Deps: tls_certificates cryptography -# deps: charm_logging -python-logging-loki==0.3.1 - # deps: tracing, charm_tracing pydantic opentelemetry-exporter-otlp-proto-http==1.21.0 \ No newline at end of file diff --git a/src/charm.py b/src/charm.py index 1666f745a..52a5d6a52 100755 --- a/src/charm.py +++ b/src/charm.py @@ -540,6 +540,7 @@ def _update_cert(self): ) # Repeat for the charm container. We need it there for loki client requests. + # (and charm tracing and logging TLS) ca_cert_path.parent.mkdir(exist_ok=True, parents=True) ca_cert_path.write_text(self.server_cert.ca_cert) # pyright: ignore else: @@ -724,13 +725,13 @@ def tracing_endpoint(self) -> Optional[str]: def logging_endpoints(self) -> Optional[List[str]]: """Loki endpoint for charm logging.""" if self._loki_container.get_service(self._name).current is ops.pebble.ServiceStatus.ACTIVE: - return [self.loki_provider._endpoint(self.loki_provider._url)["url"]] + return [self._internal_url] return [] @property def server_cert_path(self) -> Optional[str]: """Server certificate path for TLS tracing.""" - return CERT_FILE + return self._ca_cert_path if __name__ == "__main__": From 283a6e1eba9b777f960ca36766604af440090fa7 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Thu, 2 May 2024 13:06:33 +0200 Subject: [PATCH 04/13] tls working --- lib/charms/loki_k8s/v0/charm_logging.py | 8 +++++--- src/charm.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py index 2c4c18b72..b808f4c54 100644 --- a/lib/charms/loki_k8s/v0/charm_logging.py +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -135,10 +135,12 @@ def build_payload(self, record: logging.LogRecord, line) -> dict: @property def session(self) -> requests.Session: - """Create HTTP session.""" + """Create HTTP(s) session.""" if self._session is None: self._session = requests.Session() - self._session.cert = self.cert or None + # very unclear why we don't need to use 'Session.cert' for this, but... + # See: https://requests.readthedocs.io/en/latest/user/advanced/#ssl-cert-verification + self._session.verify = self.cert or None return self._session def close(self): @@ -184,7 +186,7 @@ class LokiHandler(logging.Handler): `Loki API `_ """ - def __init__( + def __init__( self, url: str, tags: Optional[dict] = None, diff --git a/src/charm.py b/src/charm.py index 52a5d6a52..d10fc45c3 100755 --- a/src/charm.py +++ b/src/charm.py @@ -725,7 +725,7 @@ def tracing_endpoint(self) -> Optional[str]: def logging_endpoints(self) -> Optional[List[str]]: """Loki endpoint for charm logging.""" if self._loki_container.get_service(self._name).current is ops.pebble.ServiceStatus.ACTIVE: - return [self._internal_url] + return [self._internal_url + "/loki/api/v1/push"] return [] @property From 883da53e953df4825c4dfc77fb0149daf77f295f Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Thu, 2 May 2024 13:29:58 +0200 Subject: [PATCH 05/13] grafana source works with internal url --- src/charm.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/charm.py b/src/charm.py index d10fc45c3..e3c10f768 100755 --- a/src/charm.py +++ b/src/charm.py @@ -185,7 +185,7 @@ def __init__(self, *args): self.server_cert.on.cert_changed, ], source_type="loki", - source_url=self._external_url, + source_url=self.internal_url, ) self.metrics_provider = MetricsEndpointProvider( @@ -372,6 +372,11 @@ def hostname(self) -> str: """Unit's hostname.""" return socket.getfqdn() + @property + def internal_url(self): + scheme = "https" if self.server_cert.server_cert else "http" + return f"{scheme}://{self.hostname}:{self._port}" + @property def _external_url(self) -> str: """Return the external hostname to be passed to ingress via the relation.""" @@ -385,8 +390,7 @@ def _external_url(self) -> str: # are routable virtually exclusively inside the cluster (as they rely) # on the cluster's DNS service, while the ip address is _sometimes_ # routable from the outside, e.g., when deploying on MicroK8s on Linux. - scheme = "https" if self.server_cert.server_cert else "http" - return f"{scheme}://{self.hostname}:{self._port}" + return self.internal_url @property def scrape_jobs(self) -> List[Dict[str, Any]]: @@ -501,7 +505,8 @@ def _configure(self): # noqa: C901 scheme="https" if self._certs_on_disk else "http", port=self._port ) self.metrics_provider.update_scrape_job_spec(self.scrape_jobs) - self.grafana_source_provider.update_source(source_url=self._external_url) + # self.grafana_source_provider.update_source(source_url=self._external_url) + # self.grafana_source_provider.update_source(source_url=self.hostname) self.loki_provider.update_endpoint(url=self._external_url) self.catalogue.update_item(item=self._catalogue_item) From 233fab4d423bb9efc70dff234fc53342e98e408e Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Thu, 2 May 2024 13:30:41 +0200 Subject: [PATCH 06/13] fmt --- lib/charms/loki_k8s/v0/charm_logging.py | 29 +++++++++++++------------ src/charm.py | 1 + 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py index b808f4c54..03a930596 100644 --- a/lib/charms/loki_k8s/v0/charm_logging.py +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -41,11 +41,14 @@ def my_logging_endpoint(self) -> List[str]: from logging.config import ConvertingDict from pathlib import Path from typing import ( + Any, Callable, + Dict, Optional, + Tuple, Type, TypeVar, - Union, Tuple, Dict, Any, + Union, ) import requests @@ -54,7 +57,7 @@ def my_logging_endpoint(self) -> List[str]: # prevent infinite recursion because on failure urllib3 will push more logs # https://github.com/GreyZmeem/python-logging-loki/issues/18 -logging.getLogger('urllib3').setLevel(logging.INFO) +logging.getLogger("urllib3").setLevel(logging.INFO) # The unique Charmhub library identifier, never change it LIBID = "52ee6051f4e54aedaa60aa04134d1a6d" @@ -97,13 +100,12 @@ class LokiEmitter: ) def __init__(self, url: str, tags: Optional[dict] = None, cert: Optional[str] = None): - """ - Create new Loki emitter. + """Create new Loki emitter. Arguments: url: Endpoint used to send log entries to Loki (e.g. `https://my-loki-instance/loki/api/v1/push`). tags: Default tags added to every log record. - auth: Optional tuple with username and password for basic HTTP authentication. + cert: Absolute path to a ca cert for TLS authentication. """ #: Tags that will be added to all records handled by this handler. @@ -120,7 +122,9 @@ def __call__(self, record: logging.LogRecord, line: str): payload = self.build_payload(record, line) resp = self.session.post(self.url, json=payload, timeout=5) if resp.status_code != self.success_response_code: - raise ValueError("Unexpected Loki API response status code: {0}".format(resp.status_code)) + raise ValueError( + "Unexpected Loki API response status code: {0}".format(resp.status_code) + ) def build_payload(self, record: logging.LogRecord, line) -> dict: """Build JSON payload with a log entry.""" @@ -151,8 +155,7 @@ def close(self): @functools.lru_cache(256) def format_label(self, label: str) -> str: - """ - Build label to match prometheus format. + """Build label to match prometheus format. `Label format `_ """ @@ -180,21 +183,19 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]: class LokiHandler(logging.Handler): - """ - Log handler that sends log records to Loki. + """Log handler that sends log records to Loki. `Loki API `_ """ - def __init__( + def __init__( self, url: str, tags: Optional[dict] = None, # username, password tuple cert: Optional[str] = None, ): - """ - Create new Loki logging handler. + """Create new Loki logging handler. Arguments: url: Endpoint used to send log entries to Loki (e.g. `https://my-loki-instance/loki/api/v1/push`). @@ -357,7 +358,7 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): handler = LokiHandler( url=url, tags=juju_topology, - cert=server_cert + cert=server_cert, # auth=("username", "password"), ) diff --git a/src/charm.py b/src/charm.py index e3c10f768..3fc673b5c 100755 --- a/src/charm.py +++ b/src/charm.py @@ -374,6 +374,7 @@ def hostname(self) -> str: @property def internal_url(self): + """Fqdn plus appropriate scheme and server port.""" scheme = "https" if self.server_cert.server_cert else "http" return f"{scheme}://{self.hostname}:{self._port}" From 7e1b76813c42780e13570e61942d11cb028a1897 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Thu, 2 May 2024 13:35:29 +0200 Subject: [PATCH 07/13] static fix --- lib/charms/loki_k8s/v0/charm_logging.py | 8 ++++---- requirements.txt | 5 ++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py index 03a930596..2ab6c9221 100644 --- a/lib/charms/loki_k8s/v0/charm_logging.py +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -48,7 +48,7 @@ def my_logging_endpoint(self) -> List[str]: Tuple, Type, TypeVar, - Union, + Union, cast, ) import requests @@ -166,7 +166,7 @@ def format_label(self, label: str) -> str: def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]: """Return tags that must be send to Loki with a log record.""" tags = dict(self.tags) if isinstance(self.tags, ConvertingDict) else self.tags - tags = copy.deepcopy(tags) + tags = cast(Dict[str, Any], copy.deepcopy(tags)) tags[self.level_tag] = record.levelname.lower() tags[self.logger_tag] = record.name @@ -185,7 +185,7 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]: class LokiHandler(logging.Handler): """Log handler that sends log records to Loki. - `Loki API `_ + `Loki API ter/docs/api.md>` """ def __init__( @@ -358,7 +358,7 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): handler = LokiHandler( url=url, tags=juju_topology, - cert=server_cert, + cert=str(server_cert) if server_cert else None, # auth=("username", "password"), ) diff --git a/requirements.txt b/requirements.txt index af719587a..99c9381cc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,7 @@ cryptography # deps: tracing, charm_tracing pydantic -opentelemetry-exporter-otlp-proto-http==1.21.0 \ No newline at end of file +opentelemetry-exporter-otlp-proto-http==1.21.0 + +# deps: charm_logging +requests \ No newline at end of file From 585d475192bda7c3faa62c51ad3771f31f06a56e Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Thu, 2 May 2024 13:38:17 +0200 Subject: [PATCH 08/13] fmt --- lib/charms/loki_k8s/v0/charm_logging.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py index 2ab6c9221..c61c03157 100644 --- a/lib/charms/loki_k8s/v0/charm_logging.py +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -48,7 +48,8 @@ def my_logging_endpoint(self) -> List[str]: Tuple, Type, TypeVar, - Union, cast, + Union, + cast, ) import requests From f25e367416a624ae1aa6f4fc5088680c69addfc2 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Thu, 2 May 2024 15:43:14 +0200 Subject: [PATCH 09/13] fixed itest --- tests/integration/test_log_proxy_send_logs.py | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_log_proxy_send_logs.py b/tests/integration/test_log_proxy_send_logs.py index 51d49ead3..c3be574b1 100644 --- a/tests/integration/test_log_proxy_send_logs.py +++ b/tests/integration/test_log_proxy_send_logs.py @@ -62,13 +62,20 @@ async def test_check_both_containers_send_logs(ops_test, loki_charm, log_proxy_t series = await loki_endpoint_request(ops_test, loki_app_name, "loki/api/v1/series", 0) data_series = json.loads(series)["data"] - assert len(data_series) == 3 + found = 0 for data in data_series: - assert data["container"] in ["workload-a", "workload-b"] - assert data["juju_application"] == tester_app_name - assert data["filename"] in [ - "/tmp/worload-a-1.log", - "/tmp/worload-a-2.log", - "/tmp/worload-b.log", - ] + if ( + data["container"] in ["workload-a", "workload-b"] + and data["juju_application"] == tester_app_name + and data["filename"] + in [ + "/tmp/worload-a-1.log", + "/tmp/worload-a-2.log", + "/tmp/worload-b.log", + ] + ): + found += 1 + + # there might be more data series (charm logging). + assert found == 3 From 0108e0a03b68570b773d0c3fb64fb97e5dc8e1b1 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Thu, 2 May 2024 17:05:05 +0200 Subject: [PATCH 10/13] rolled back grafana datasource fix --- src/charm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/charm.py b/src/charm.py index 3fc673b5c..9fe353480 100755 --- a/src/charm.py +++ b/src/charm.py @@ -185,7 +185,7 @@ def __init__(self, *args): self.server_cert.on.cert_changed, ], source_type="loki", - source_url=self.internal_url, + source_url=self._external_url, ) self.metrics_provider = MetricsEndpointProvider( From d9bb7dac2f0ae058845562491f492812a21c100e Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Tue, 21 May 2024 15:30:02 +0200 Subject: [PATCH 11/13] pr comments --- lib/charms/loki_k8s/v0/charm_logging.py | 47 +++++++++++++++---------- src/charm.py | 5 +-- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py index c61c03157..ab158d90f 100644 --- a/lib/charms/loki_k8s/v0/charm_logging.py +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -28,8 +28,22 @@ def my_logging_endpoint(self) -> List[str]: The ``log_charm`` decorator will take these endpoints and set up the root logger (as in python's logging module root logger) to forward all logs to these loki endpoints. -3) If you were passing a certificate using `server_cert`, you need to change it to provide an *absolute* path to -the certificate file. +## TLS support +If your charm integrates with a tls provider which is also trusted by Loki, you can configure TLS by +passing to charm logging a `server_cert` parameter. + +``` +@log_charm(loki_push_api_endpoint="my_logging_endpoint", server_cert="my_server_cert") +class MyCharm(...): + ... + + @property + def my_server_cert(self) -> Optional[str]: + '''Absolute path to a server crt if TLS is enabled.''' + if self.tls_is_enabled(): + return "/path/to/my/server_cert.crt" +``` + """ import copy import functools @@ -53,6 +67,7 @@ def my_logging_endpoint(self) -> List[str]: ) import requests +from cosl import JujuTopology from ops.charm import CharmBase from ops.framework import Framework @@ -70,7 +85,7 @@ def my_logging_endpoint(self) -> List[str]: # to 0 if you are raising the major API version LIBPATCH = 1 -PYDEPS = [] +PYDEPS = ["cosl"] logger = logging.getLogger("charm_logging") _GetterType = Union[Callable[[CharmBase], Optional[str]], property] @@ -273,18 +288,14 @@ def _get_logging_endpoints(logging_endpoints_getter, self, charm): if isinstance(endpoint, str): sanitized_logging_endponts.append(endpoint) else: - errors.append(endpoint) + errors.append(f"invalid endpoint: expected string, got {endpoint!r}") if errors: - if sanitized_logging_endponts: - logger.error( - f"{charm}.{logging_endpoints_getter} returned some invalid endpoint strings: {errors}" - ) - else: - logger.error( - f"{charm}.{logging_endpoints_getter} should return an iterable of Loki push-api (compatible) endpoints (strings); " - f"got {errors} instead." - ) + logger.error( + f"{charm}.{logging_endpoints_getter} should return an iterable of Loki push-api " + "(-compatible) endpoints (strings); " + f"ERRORS: {errors}" + ) return sanitized_logging_endponts @@ -340,11 +351,9 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): if not logging_endpoints: return - juju_topology = { - "juju_unit": self.unit.name, - "juju_application": self.app.name, - "juju_model": self.model.name, - "juju_model_uuid": self.model.uuid, + juju_topology = JujuTopology.from_charm(self) + tags = { + **juju_topology.as_dict(), "service_name": service_name or self.app.name, "charm_type_name": type(self).__name__, "dispatch_path": os.getenv("JUJU_DISPATCH_PATH", ""), @@ -358,7 +367,7 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): for url in logging_endpoints: handler = LokiHandler( url=url, - tags=juju_topology, + tags=tags, cert=str(server_cert) if server_cert else None, # auth=("username", "password"), ) diff --git a/src/charm.py b/src/charm.py index 9fe353480..6bf73eca7 100755 --- a/src/charm.py +++ b/src/charm.py @@ -115,6 +115,7 @@ class LokiOperatorCharm(CharmBase): _stored = StoredState() _port = HTTP_LISTEN_PORT _name = "loki" + _loki_push_api_endpoint = "/loki/api/v1/push" _ca_cert_path = "/usr/local/share/ca-certificates/cos-ca.crt" def __init__(self, *args): @@ -206,7 +207,7 @@ def __init__(self, *args): address=external_url.hostname or self.hostname, port=external_url.port or 443 if self._certs_on_disk else 80, scheme=external_url.scheme, - path=f"{external_url.path}/loki/api/v1/push", + path=f"{external_url.path}{self._loki_push_api_endpoint}", ) self.dashboard_provider = GrafanaDashboardProvider(self) @@ -731,7 +732,7 @@ def tracing_endpoint(self) -> Optional[str]: def logging_endpoints(self) -> Optional[List[str]]: """Loki endpoint for charm logging.""" if self._loki_container.get_service(self._name).current is ops.pebble.ServiceStatus.ACTIVE: - return [self._internal_url + "/loki/api/v1/push"] + return [self._internal_url + self._loki_push_api_endpoint] return [] @property From c89e89b4c53eb51da1fe7c3c1daa1cc97ce5759a Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Tue, 28 May 2024 11:26:14 +0200 Subject: [PATCH 12/13] tested self-logging --- lib/charms/loki_k8s/v0/charm_logging.py | 88 ++-- lib/charms/loki_k8s/v1/loki_push_api.py | 15 +- .../observability_libs/v1/cert_handler.py | 443 ++++++++++++------ .../prometheus_k8s/v0/prometheus_scrape.py | 6 +- lib/charms/tempo_k8s/v1/charm_tracing.py | 7 +- lib/charms/tempo_k8s/v1/tracing.py | 4 +- .../v3/tls_certificates.py | 173 +++++-- requirements.txt | 3 - src/charm.py | 7 +- 9 files changed, 508 insertions(+), 238 deletions(-) diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py index ab158d90f..dffd69ba4 100644 --- a/lib/charms/loki_k8s/v0/charm_logging.py +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -47,10 +47,12 @@ def my_server_cert(self) -> Optional[str]: """ import copy import functools +import json import logging import os import string import time +import urllib.error from contextlib import contextmanager from logging.config import ConvertingDict from pathlib import Path @@ -65,8 +67,8 @@ def my_server_cert(self) -> Optional[str]: Union, cast, ) +from urllib import request, parse -import requests from cosl import JujuTopology from ops.charm import CharmBase from ops.framework import Framework @@ -131,13 +133,24 @@ def __init__(self, url: str, tags: Optional[dict] = None, cert: Optional[str] = #: Optional cert for TLS auth self.cert = cert - self._session: Optional[requests.Session] = None - def __call__(self, record: logging.LogRecord, line: str): """Send log record to Loki.""" payload = self.build_payload(record, line) - resp = self.session.post(self.url, json=payload, timeout=5) - if resp.status_code != self.success_response_code: + req = request.Request(self.url, method='POST') + req.add_header('Content-Type', 'application/json; charset=utf-8') + jsondata_encoded = json.dumps(payload).encode("utf-8") + + try: + resp = request.urlopen( + req, + jsondata_encoded, + capath=self.cert + ) + except urllib.error.HTTPError as e: + logger.error(f"error pushing logs to {self.url}: {e.status, e.reason}") + return + + if resp.getcode() != self.success_response_code: raise ValueError( "Unexpected Loki API response status code: {0}".format(resp.status_code) ) @@ -153,22 +166,6 @@ def build_payload(self, record: logging.LogRecord, line) -> dict: } return {"streams": [stream]} - @property - def session(self) -> requests.Session: - """Create HTTP(s) session.""" - if self._session is None: - self._session = requests.Session() - # very unclear why we don't need to use 'Session.cert' for this, but... - # See: https://requests.readthedocs.io/en/latest/user/advanced/#ssl-cert-verification - self._session.verify = self.cert or None - return self._session - - def close(self): - """Close HTTP session.""" - if self._session is not None: - self._session.close() - self._session = None - @functools.lru_cache(256) def format_label(self, label: str) -> str: """Build label to match prometheus format. @@ -205,11 +202,11 @@ class LokiHandler(logging.Handler): """ def __init__( - self, - url: str, - tags: Optional[dict] = None, - # username, password tuple - cert: Optional[str] = None, + self, + url: str, + tags: Optional[dict] = None, + # username, password tuple + cert: Optional[str] = None, ): """Create new Loki logging handler. @@ -224,11 +221,6 @@ def __init__( super().__init__() self.emitter = LokiEmitter(url, tags, cert) - def handleError(self, record): # noqa: N802 - """Close emitter and let default handler take actions on error.""" - self.emitter.close() - super().handleError(record) - def emit(self, record: logging.LogRecord): """Send log record to Loki.""" # noinspection PyBroadException @@ -311,21 +303,27 @@ def _get_server_cert(server_cert_getter, self, charm): f"{charm}.{server_cert_getter} returned None; sending logs over INSECURE connection." ) return None + if not Path(server_cert).is_absolute(): raise ValueError( f"{charm}.{server_cert_getter} should return a valid tls cert absolute path (string | Path)); " f"got {server_cert} instead." ) + + if not Path(server_cert).exists(): + logger.warning(f"cert not found at {server_cert}: sending logs over INSECURE connection") + return None + return server_cert def _setup_root_logger_initializer( - charm: Type[CharmBase], - logging_endpoints_getter: _GetterType, - server_cert_getter: Optional[_GetterType], - service_name: Optional[str] = None, + charm: Type[CharmBase], + logging_endpoints_getter: _GetterType, + server_cert_getter: Optional[_GetterType], + service_name: Optional[str] = None, ): - """Patch the charm's initializer.""" + """Patch the charm's initializer and inject a call to set up root logging.""" original_init = charm.__init__ @functools.wraps(original_init) @@ -383,9 +381,9 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): def log_charm( - logging_endpoints: str, - server_cert: Optional[str] = None, - service_name: Optional[str] = None, + logging_endpoints: str, + server_cert: Optional[str] = None, + service_name: Optional[str] = None, ): """Set up the root logger to forward any charm logs to one or more Loki push API endpoints. @@ -431,15 +429,15 @@ def _decorator(charm_type: Type[CharmBase]): def _autoinstrument( - charm_type: Type[CharmBase], - logging_endpoints_getter: _GetterType, - server_cert_getter: Optional[_GetterType] = None, - service_name: Optional[str] = None, + charm_type: Type[CharmBase], + logging_endpoints_getter: _GetterType, + server_cert_getter: Optional[_GetterType] = None, + service_name: Optional[str] = None, ) -> Type[CharmBase]: """Set up logging on this charm class. - Use this function to get out-of-the-box traces for all events emitted on this charm and all - method calls on instances of this class. + Use this function to setup automatic log forwarding for all logs emitted throughout executions of + this charm. Usage: diff --git a/lib/charms/loki_k8s/v1/loki_push_api.py b/lib/charms/loki_k8s/v1/loki_push_api.py index 1aa092edc..4b3bcfee8 100644 --- a/lib/charms/loki_k8s/v1/loki_push_api.py +++ b/lib/charms/loki_k8s/v1/loki_push_api.py @@ -519,7 +519,7 @@ def _alert_rules_error(self, event): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 9 +LIBPATCH = 10 PYDEPS = ["cosl"] @@ -535,17 +535,20 @@ def _alert_rules_error(self, event): # update all sha256 sums in PROMTAIL_BINARIES. To support a new architecture # you only need to add a new key value pair for the architecture in PROMTAIL_BINARIES. PROMTAIL_VERSION = "v2.9.7" +PROMTAIL_ARM_BINARY = { + "filename": "promtail-static-arm64", + "zipsha": "c083fdb45e5c794103f974eeb426489b4142438d9e10d0ae272b2aff886e249b", + "binsha": "4cd055c477a301c0bdfdbcea514e6e93f6df5d57425ce10ffc77f3e16fec1ddf", +} + PROMTAIL_BINARIES = { "amd64": { "filename": "promtail-static-amd64", "zipsha": "6873cbdabf23062aeefed6de5f00ff382710332af3ab90a48c253ea17e08f465", "binsha": "28da9b99f81296fe297831f3bc9d92aea43b4a92826b8ff04ba433b8cb92fb50", }, - "arm64": { - "filename": "promtail-static-arm64", - "zipsha": "c083fdb45e5c794103f974eeb426489b4142438d9e10d0ae272b2aff886e249b", - "binsha": "4cd055c477a301c0bdfdbcea514e6e93f6df5d57425ce10ffc77f3e16fec1ddf", - }, + "arm64": PROMTAIL_ARM_BINARY, + "aarch64": PROMTAIL_ARM_BINARY, } # Paths in `charm` container diff --git a/lib/charms/observability_libs/v1/cert_handler.py b/lib/charms/observability_libs/v1/cert_handler.py index 79458e096..c482662f0 100644 --- a/lib/charms/observability_libs/v1/cert_handler.py +++ b/lib/charms/observability_libs/v1/cert_handler.py @@ -31,10 +31,12 @@ Since this library uses [Juju Secrets](https://juju.is/docs/juju/secret) it requires Juju >= 3.0.3. """ +import abc import ipaddress +import json import socket from itertools import filterfalse -from typing import List, Optional, Union +from typing import Dict, List, Optional, Union try: from charms.tls_certificates_interface.v3.tls_certificates import ( # type: ignore @@ -42,13 +44,14 @@ CertificateAvailableEvent, CertificateExpiringEvent, CertificateInvalidatedEvent, + ProviderCertificate, TLSCertificatesRequiresV3, generate_csr, generate_private_key, ) except ImportError as e: raise ImportError( - "failed to import charms.tls_certificates_interface.v2.tls_certificates; " + "failed to import charms.tls_certificates_interface.v3.tls_certificates; " "Either the library itself is missing (please get it through charmcraft fetch-lib) " "or one of its dependencies is unmet." ) from e @@ -58,14 +61,15 @@ from ops.charm import CharmBase, RelationBrokenEvent from ops.framework import EventBase, EventSource, Object, ObjectEvents from ops.jujuversion import JujuVersion -from ops.model import SecretNotFoundError +from ops.model import Relation, Secret, SecretNotFoundError logger = logging.getLogger(__name__) - LIBID = "b5cd5cd580f3428fa5f59a8876dcbe6a" LIBAPI = 1 -LIBPATCH = 5 +LIBPATCH = 8 + +VAULT_SECRET_LABEL = "cert-handler-private-vault" def is_ip_address(value: str) -> bool: @@ -87,6 +91,178 @@ class CertHandlerEvents(ObjectEvents): cert_changed = EventSource(CertChanged) +class _VaultBackend(abc.ABC): + """Base class for a single secret manager. + + Assumptions: + - A single secret (label) is managed by a single instance. + - Secret is per-unit (not per-app, i.e. may differ from unit to unit). + """ + + def store(self, contents: Dict[str, str], clear: bool = False): ... + + def get_value(self, key: str) -> Optional[str]: ... + + def retrieve(self) -> Dict[str, str]: ... + + def clear(self): ... + + +class _RelationVaultBackend(_VaultBackend): + """Relation backend for Vault. + + Use it to store data in a relation databag. + Assumes that a single relation exists and its data is readable. + If not, it will raise RuntimeErrors as soon as you try to read/write. + It will store the data, in plaintext (json-dumped) nested under a configurable + key in the **unit databag** of this relation. + + Typically, you'll use this with peer relations. + + Note: it is assumed that this object has exclusive access to the data, even though in practice it does not. + Modifying relation data yourself would go unnoticed and disrupt consistency. + """ + + _NEST_UNDER = "lib.charms.observability_libs.v1.cert_handler::vault" + # This key needs to be relation-unique. If someone ever creates multiple Vault(_RelationVaultBackend) + # instances backed by the same (peer) relation, they'll need to set different _NEST_UNDERs + # for each _RelationVaultBackend instance or they'll be fighting over it. + + def __init__(self, charm: CharmBase, relation_name: str): + self.charm = charm + self.relation_name = relation_name + + def _check_ready(self): + relation = self.charm.model.get_relation(self.relation_name) + if not relation or not relation.data: + # if something goes wrong here, the peer-backed vault is not ready to operate + # it can be because you are trying to use it too soon, i.e. before the (peer) + # relation has been created (or has joined). + raise RuntimeError("Relation backend not ready.") + + @property + def _relation(self) -> Optional[Relation]: + self._check_ready() + return self.charm.model.get_relation(self.relation_name) + + @property + def _databag(self): + self._check_ready() + # _check_ready verifies that there is a relation + return self._relation.data[self.charm.unit] # type: ignore + + def _read(self) -> Dict[str, str]: + value = self._databag.get(self._NEST_UNDER) + if value: + return json.loads(value) + return {} + + def _write(self, value: Dict[str, str]): + if not all(isinstance(x, str) for x in value.values()): + # the caller has to take care of encoding + raise TypeError("You can only store strings in Vault.") + + self._databag[self._NEST_UNDER] = json.dumps(value) + + def store(self, contents: Dict[str, str], clear: bool = False): + """Create a new revision by updating the previous one with ``contents``.""" + current = self._read() + + if clear: + current.clear() + + current.update(contents) + self._write(current) + + def get_value(self, key: str) -> Optional[str]: + """Like retrieve, but single-value.""" + return self._read().get(key) + + def retrieve(self): + """Return the full vault content.""" + return self._read() + + def clear(self): + del self._databag[self._NEST_UNDER] + + +class _SecretVaultBackend(_VaultBackend): + """Relation backend for Vault. + + Use it to store data in a Juju secret. + Assumes that Juju supports secrets. + If not, it will raise some exception as soon as you try to read/write. + + Note: it is assumed that this object has exclusive access to the data, even though in practice it does not. + Modifying secret's data yourself would go unnoticed and disrupt consistency. + """ + + _uninitialized_key = "uninitialized-secret-key" + + def __init__(self, charm: CharmBase, secret_label: str): + self.charm = charm + self.secret_label = secret_label # needs to be charm-unique. + + @property + def _secret(self) -> Secret: + try: + # we are owners, so we don't need to grant it to ourselves + return self.charm.model.get_secret(label=self.secret_label) + except SecretNotFoundError: + # we need to set SOME contents when we're creating the secret, so we do it. + return self.charm.unit.add_secret( + {self._uninitialized_key: "42"}, label=self.secret_label + ) + + def store(self, contents: Dict[str, str], clear: bool = False): + """Create a new revision by updating the previous one with ``contents``.""" + secret = self._secret + current = secret.get_content(refresh=True) + + if clear: + current.clear() + elif current.get(self._uninitialized_key): + # is this the first revision? clean up the mock contents we created instants ago. + del current[self._uninitialized_key] + + current.update(contents) + secret.set_content(current) + + def get_value(self, key: str) -> Optional[str]: + """Like retrieve, but single-value.""" + return self._secret.get_content(refresh=True).get(key) + + def retrieve(self): + """Return the full vault content.""" + return self._secret.get_content(refresh=True) + + def clear(self): + self._secret.remove_all_revisions() + + +class Vault: + """Simple application secret wrapper for local usage.""" + + def __init__(self, backend: _VaultBackend): + self._backend = backend + + def store(self, contents: Dict[str, str], clear: bool = False): + """Store these contents in the vault overriding whatever is there.""" + self._backend.store(contents, clear=clear) + + def get_value(self, key: str): + """Like retrieve, but single-value.""" + return self._backend.get_value(key) + + def retrieve(self) -> Dict[str, str]: + """Return the full vault content.""" + return self._backend.retrieve() + + def clear(self): + """Clear the vault.""" + self._backend.clear() + + class CertHandler(Object): """A wrapper for the requirer side of the TLS Certificates charm library.""" @@ -114,9 +290,8 @@ def __init__( sans: DNS names. If none are given, use FQDN. """ super().__init__(charm, key) - self._check_juju_supports_secrets() - self.charm = charm + # We need to sanitize the unit name, otherwise route53 complains: # "urn:ietf:params:acme:error:malformed" :: Domain name contains an invalid character self.cert_subject = charm.unit.name.replace("/", "-") if not cert_subject else cert_subject @@ -126,6 +301,17 @@ def __init__( self.sans_ip = list(filter(is_ip_address, sans)) self.sans_dns = list(filterfalse(is_ip_address, sans)) + if self._check_juju_supports_secrets(): + vault_backend = _SecretVaultBackend(charm, secret_label=VAULT_SECRET_LABEL) + + # TODO: gracefully handle situations where the + # secret is gone because the admin has removed it manually + # self.framework.observe(self.charm.on.secret_remove, self._rotate_csr) + + else: + vault_backend = _RelationVaultBackend(charm, relation_name="peers") + self.vault = Vault(vault_backend) + self.certificates_relation_name = certificates_relation_name self.certificates = TLSCertificatesRequiresV3(self.charm, self.certificates_relation_name) @@ -157,6 +343,51 @@ def __init__( self.charm.on[self.certificates_relation_name].relation_broken, # pyright: ignore self._on_certificates_relation_broken, ) + self.framework.observe( + self.charm.on.upgrade_charm, # pyright: ignore + self._on_upgrade_charm, + ) + + def _on_upgrade_charm(self, _): + has_privkey = self.vault.get_value("private-key") + + self._migrate_vault() + + # If we already have a csr, but the pre-migration vault has no privkey stored, + # the csr must have been signed with a privkey that is now outdated and utterly lost. + # So we throw away the csr and generate a new one (and a new privkey along with it). + if not has_privkey and self._csr: + logger.debug("CSR and privkey out of sync after charm upgrade. Renewing CSR.") + # this will call `self.private_key` which will generate a new privkey. + self._generate_csr(renew=True) + + def _migrate_vault(self): + peer_backend = _RelationVaultBackend(self.charm, relation_name="peers") + + if self._check_juju_supports_secrets(): + # we are on recent juju + if self.vault.retrieve(): + # we already were on recent juju: nothing to migrate + logger.debug( + "Private key is already stored as a juju secret. Skipping private key migration." + ) + return + + # we used to be on old juju: our secret stuff is in peer data + if contents := peer_backend.retrieve(): + logger.debug( + "Private key found in relation data. " + "Migrating private key to a juju secret." + ) + # move over to secret-backed storage + self.vault.store(contents) + + # clear the peer storage + peer_backend.clear() + return + + # if we are downgrading, i.e. from juju with secrets to juju without, + # we have lost all that was in the secrets backend. @property def enabled(self) -> bool: @@ -185,30 +416,17 @@ def enabled(self) -> bool: return True def _on_certificates_relation_joined(self, _) -> None: - self._generate_privkey() + # this will only generate a csr if we don't have one already self._generate_csr() - def _generate_privkey(self): - # Generate priv key unless done already - # TODO figure out how to go about key rotation. - - if not (relation := self.charm.model.get_relation(self.certificates_relation_name)): - return - - if not self.private_key: - private_key = generate_private_key() - secret = self.charm.unit.add_secret({"private-key": private_key.decode()}) - secret.grant(relation) - relation.data[self.charm.unit]["private-key-secret-id"] = secret.id # pyright: ignore - def _on_config_changed(self, _): - relation = self.charm.model.get_relation(self.certificates_relation_name) - - if not relation: - return + # this will only generate a csr if we don't have one already + self._generate_csr() - self._generate_privkey() - self._generate_csr(renew=True) + @property + def relation(self): + """The "certificates" relation.""" + return self.charm.model.get_relation(self.certificates_relation_name) def _generate_csr( self, overwrite: bool = False, renew: bool = False, clear_cert: bool = False @@ -222,7 +440,7 @@ def _generate_csr( This method intentionally does not emit any events, leave it for caller's responsibility. """ # if we are in a relation-broken hook, we might not have a relation to publish the csr to. - if not self.charm.model.get_relation(self.certificates_relation_name): + if not self.relation: logger.warning( f"No {self.certificates_relation_name!r} relation found. " f"Cannot generate csr." ) @@ -231,12 +449,6 @@ def _generate_csr( # In case we already have a csr, do not overwrite it by default. if overwrite or renew or not self._csr: private_key = self.private_key - if private_key is None: - # FIXME: raise this in a less nested scope by - # generating privkey and csr in the same method. - raise RuntimeError( - "private key unset. call _generate_privkey() before you call this method." - ) csr = generate_csr( private_key=private_key.encode(), subject=self.cert_subject, @@ -258,119 +470,73 @@ def _generate_csr( ) self.certificates.request_certificate_creation(certificate_signing_request=csr) - # Note: CSR is being replaced with a new one, so until we get the new cert, we'd have - # a mismatch between the CSR and the cert. - # For some reason the csr contains a trailing '\n'. TODO figure out why - self._csr = csr.decode().strip() - if clear_cert: - try: - secret = self.model.get_secret(label="ca-certificate-chain") - secret.remove_all_revisions() - except SecretNotFoundError: - logger.debug("Secret with label: 'ca-certificate-chain' not found") + self.vault.clear() def _on_certificate_available(self, event: CertificateAvailableEvent) -> None: - """Get the certificate from the event and store it in a peer relation. - - Note: assuming "limit: 1" in metadata - """ - event_csr = ( - event.certificate_signing_request.strip() - if event.certificate_signing_request - else None - ) - if event_csr == self._csr: - content = { - "ca-cert": event.ca, - "server-cert": event.certificate, - "chain": event.chain_as_pem(), - "csr": event_csr, - } - try: - secret = self.model.get_secret(label="ca-certificate-chain") - except SecretNotFoundError: - if not ( - relation := self.charm.model.get_relation(self.certificates_relation_name) - ): - logger.error("Relation %s not found", self.certificates_relation_name) - return - - secret = self.charm.unit.add_secret(content, label="ca-certificate-chain") - secret.grant(relation) - relation.data[self.charm.unit]["secret-id"] = secret.id # pyright: ignore - self.on.cert_changed.emit() # pyright: ignore - - def _retrieve_secret_id(self, secret_id_name: str) -> Optional[str]: - if not (relation := self.charm.model.get_relation(self.certificates_relation_name)): - return None - - if not (secret_id := relation.data[self.charm.unit].get(secret_id_name)): - return None - - return secret_id - - def _retrieve_from_secret(self, value: str, secret_id_name: str) -> Optional[str]: - if not (secret_id := self._retrieve_secret_id(secret_id_name)): - return None - - if not (secret := self.model.get_secret(id=secret_id)): - return None - - content = secret.get_content() - return content.get(value) + """Emit cert-changed.""" + self.on.cert_changed.emit() # pyright: ignore @property - def private_key(self) -> Optional[str]: - """Private key.""" - return self._retrieve_from_secret("private-key", "private-key-secret-id") + def private_key(self) -> str: + """Private key. - @property - def private_key_secret_id(self) -> Optional[str]: - """ID of the Juju Secret for the Private key.""" - return self._retrieve_secret_id("private-key-secret-id") + BEWARE: if the vault misbehaves, the backing secret is removed, the peer relation dies + or whatever, we might be calling generate_private_key() again and cause a desync + with the CSR because it's going to be signed with an outdated key we have no way of retrieving. + The caller needs to ensure that if the vault backend gets reset, then so does the csr. + + TODO: we could consider adding a way to verify if the csr was signed by our privkey, + and do that on collect_unit_status as a consistency check + """ + private_key = self.vault.get_value("private-key") + if private_key is None: + private_key = generate_private_key().decode() + self.vault.store({"private-key": private_key}) + return private_key @property def _csr(self) -> Optional[str]: - return self._retrieve_from_secret("csr", "csr-secret-id") + csrs = self.certificates.get_requirer_csrs() + if not csrs: + return None - @_csr.setter - def _csr(self, value: str): - if not (relation := self.charm.model.get_relation(self.certificates_relation_name)): - return + # in principle we only ever need one cert. + # we might want to complicate this a bit once we get into cert rotations: during the rotation, we may need to + # keep the old one around for a little while. If there's multiple certs, at the moment we're + # ignoring all but the last one. + if len(csrs) > 1: + logger.warning( + "Multiple CSRs found in `certificates` relation. " + "cert_handler is not ready to expect it." + ) - if not (secret_id := relation.data[self.charm.unit].get("csr-secret-id", None)): - secret = self.charm.unit.add_secret({"csr": value}) - secret.grant(relation) - relation.data[self.charm.unit]["csr-secret-id"] = secret.id # pyright: ignore - return + return csrs[-1].csr - secret = self.model.get_secret(id=secret_id) - secret.set_content({"csr": value}) + def get_cert(self) -> Optional[ProviderCertificate]: + """Get the certificate from relation data.""" + all_certs = self.certificates.get_provider_certificates() + # search for the cert matching our csr. + matching_cert = [c for c in all_certs if c.csr == self._csr] + return matching_cert[0] if matching_cert else None @property def ca_cert(self) -> Optional[str]: """CA Certificate.""" - return self._retrieve_from_secret("ca-cert", "secret-id") - - @property - def ca_server_cert_secret_id(self) -> Optional[str]: - """CA server cert secret id.""" - return self._retrieve_secret_id("secret-id") + cert = self.get_cert() + return cert.ca if cert else None @property def server_cert(self) -> Optional[str]: """Server Certificate.""" - return self._retrieve_from_secret("server-cert", "secret-id") - - @property - def _chain(self) -> Optional[str]: - return self._retrieve_from_secret("chain", "secret-id") + cert = self.get_cert() + return cert.certificate if cert else None @property def chain(self) -> Optional[str]: - """Return the ca chain.""" - return self._chain + """Return the ca chain bundled as a single PEM string.""" + cert = self.get_cert() + return cert.chain_as_pem() if cert else None def _on_certificate_expiring( self, event: Union[CertificateExpiringEvent, CertificateInvalidatedEvent] @@ -378,6 +544,7 @@ def _on_certificate_expiring( """Generate a new CSR and request certificate renewal.""" if event.certificate == self.server_cert: self._generate_csr(renew=True) + # FIXME why are we not emitting cert_changed here? def _certificate_revoked(self, event) -> None: """Remove the certificate and generate a new CSR.""" @@ -388,13 +555,11 @@ def _certificate_revoked(self, event) -> None: def _on_certificate_invalidated(self, event: CertificateInvalidatedEvent) -> None: """Deal with certificate revocation and expiration.""" - if event.certificate != self.server_cert: - return - - # if event.reason in ("revoked", "expired"): - # Currently, the reason does not matter to us because the action is the same. - self._generate_csr(overwrite=True, clear_cert=True) - self.on.cert_changed.emit() # pyright: ignore + if event.certificate == self.server_cert: + # if event.reason in ("revoked", "expired"): + # Currently, the reason does not matter to us because the action is the same. + self._generate_csr(overwrite=True, clear_cert=True) + self.on.cert_changed.emit() # pyright: ignore def _on_all_certificates_invalidated(self, _: AllCertificatesInvalidatedEvent) -> None: # Do what you want with this information, probably remove all certificates @@ -403,18 +568,14 @@ def _on_all_certificates_invalidated(self, _: AllCertificatesInvalidatedEvent) - self.on.cert_changed.emit() # pyright: ignore def _on_certificates_relation_broken(self, _: RelationBrokenEvent) -> None: - """Clear the certificates data when removing the relation.""" - try: - secret = self.model.get_secret(label="csr-secret-id") - secret.remove_all_revisions() - except SecretNotFoundError: - logger.debug("Secret 'csr-scret-id' not found") + """Clear all secrets data when removing the relation.""" + self.vault.clear() self.on.cert_changed.emit() # pyright: ignore - def _check_juju_supports_secrets(self) -> None: + def _check_juju_supports_secrets(self) -> bool: version = JujuVersion.from_environ() - if not JujuVersion(version=str(version)).has_secrets: msg = f"Juju version {version} does not supports Secrets. Juju >= 3.0.3 is needed" - logger.error(msg) - raise RuntimeError(msg) + logger.debug(msg) + return False + return True diff --git a/lib/charms/prometheus_k8s/v0/prometheus_scrape.py b/lib/charms/prometheus_k8s/v0/prometheus_scrape.py index be9676861..e3d35c6f3 100644 --- a/lib/charms/prometheus_k8s/v0/prometheus_scrape.py +++ b/lib/charms/prometheus_k8s/v0/prometheus_scrape.py @@ -178,7 +178,7 @@ def __init__(self, *args): - `scrape_timeout` - `proxy_url` - `relabel_configs` -- `metrics_relabel_configs` +- `metric_relabel_configs` - `sample_limit` - `label_limit` - `label_name_length_limit` @@ -362,7 +362,7 @@ def _on_scrape_targets_changed(self, event): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 46 +LIBPATCH = 47 PYDEPS = ["cosl"] @@ -377,7 +377,7 @@ def _on_scrape_targets_changed(self, event): "scrape_timeout", "proxy_url", "relabel_configs", - "metrics_relabel_configs", + "metric_relabel_configs", "sample_limit", "label_limit", "label_name_length_limit", diff --git a/lib/charms/tempo_k8s/v1/charm_tracing.py b/lib/charms/tempo_k8s/v1/charm_tracing.py index 39ebcd460..7c118856a 100644 --- a/lib/charms/tempo_k8s/v1/charm_tracing.py +++ b/lib/charms/tempo_k8s/v1/charm_tracing.py @@ -126,14 +126,15 @@ def my_tracing_endpoint(self) -> Optional[str]: from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import Span, TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.trace import INVALID_SPAN, Tracer -from opentelemetry.trace import get_current_span as otlp_get_current_span from opentelemetry.trace import ( + INVALID_SPAN, + Tracer, get_tracer, get_tracer_provider, set_span_in_context, set_tracer_provider, ) +from opentelemetry.trace import get_current_span as otlp_get_current_span from ops.charm import CharmBase from ops.framework import Framework @@ -146,7 +147,7 @@ def my_tracing_endpoint(self) -> Optional[str]: # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 5 +LIBPATCH = 6 PYDEPS = ["opentelemetry-exporter-otlp-proto-http==1.21.0"] diff --git a/lib/charms/tempo_k8s/v1/tracing.py b/lib/charms/tempo_k8s/v1/tracing.py index 2b09ee755..8a0385284 100644 --- a/lib/charms/tempo_k8s/v1/tracing.py +++ b/lib/charms/tempo_k8s/v1/tracing.py @@ -93,7 +93,7 @@ def __init__(self, *args): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 5 +LIBPATCH = 6 PYDEPS = ["pydantic>=2"] @@ -361,6 +361,8 @@ def __init__( Args: charm: a `CharmBase` instance that manages this instance of the Tempo service. + host: hostname. + ingesters: list of ingester protocols that are enabled on this endpoint. relation_name: an optional string name of the relation between `charm` and the Tempo charmed service. The default is "tracing". diff --git a/lib/charms/tls_certificates_interface/v3/tls_certificates.py b/lib/charms/tls_certificates_interface/v3/tls_certificates.py index cbdd80d19..2e45475a5 100644 --- a/lib/charms/tls_certificates_interface/v3/tls_certificates.py +++ b/lib/charms/tls_certificates_interface/v3/tls_certificates.py @@ -111,6 +111,7 @@ def _on_certificate_request(self, event: CertificateCreationRequestEvent) -> Non ca=ca_certificate, chain=[ca_certificate, certificate], relation_id=event.relation_id, + recommended_expiry_notification_time=720, ) def _on_certificate_revocation_request(self, event: CertificateRevocationRequestEvent) -> None: @@ -316,7 +317,7 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 10 +LIBPATCH = 14 PYDEPS = ["cryptography", "jsonschema"] @@ -453,11 +454,35 @@ class ProviderCertificate: ca: str chain: List[str] revoked: bool + expiry_time: datetime + expiry_notification_time: Optional[datetime] = None def chain_as_pem(self) -> str: """Return full certificate chain as a PEM string.""" return "\n\n".join(reversed(self.chain)) + def to_json(self) -> str: + """Return the object as a JSON string. + + Returns: + str: JSON representation of the object + """ + return json.dumps( + { + "relation_id": self.relation_id, + "application_name": self.application_name, + "csr": self.csr, + "certificate": self.certificate, + "ca": self.ca, + "chain": self.chain, + "revoked": self.revoked, + "expiry_time": self.expiry_time.isoformat(), + "expiry_notification_time": self.expiry_notification_time.isoformat() + if self.expiry_notification_time + else None, + } + ) + class CertificateAvailableEvent(EventBase): """Charm Event triggered when a TLS certificate is available.""" @@ -682,21 +707,49 @@ def _get_closest_future_time( ) -def _get_certificate_expiry_time(certificate: str) -> Optional[datetime]: - """Extract expiry time from a certificate string. +def calculate_expiry_notification_time( + validity_start_time: datetime, + expiry_time: datetime, + provider_recommended_notification_time: Optional[int], + requirer_recommended_notification_time: Optional[int], +) -> datetime: + """Calculate a reasonable time to notify the user about the certificate expiry. + + It takes into account the time recommended by the provider and by the requirer. + Time recommended by the provider is preferred, + then time recommended by the requirer, + then dynamically calculated time. Args: - certificate (str): x509 certificate as a string + validity_start_time: Certificate validity time + expiry_time: Certificate expiry time + provider_recommended_notification_time: + Time in hours prior to expiry to notify the user. + Recommended by the provider. + requirer_recommended_notification_time: + Time in hours prior to expiry to notify the user. + Recommended by the requirer. Returns: - Optional[datetime]: Expiry datetime or None + datetime: Time to notify the user about the certificate expiry. """ - try: - certificate_object = x509.load_pem_x509_certificate(data=certificate.encode()) - return certificate_object.not_valid_after_utc - except ValueError: - logger.warning("Could not load certificate.") - return None + if provider_recommended_notification_time is not None: + provider_recommended_notification_time = abs(provider_recommended_notification_time) + provider_recommendation_time_delta = ( + expiry_time - timedelta(hours=provider_recommended_notification_time) + ) + if validity_start_time < provider_recommendation_time_delta: + return provider_recommendation_time_delta + + if requirer_recommended_notification_time is not None: + requirer_recommended_notification_time = abs(requirer_recommended_notification_time) + requirer_recommendation_time_delta = ( + expiry_time - timedelta(hours=requirer_recommended_notification_time) + ) + if validity_start_time < requirer_recommendation_time_delta: + return requirer_recommendation_time_delta + calculated_hours = (expiry_time - validity_start_time).total_seconds() / (3600 * 3) + return expiry_time - timedelta(hours=calculated_hours) def generate_ca( @@ -965,6 +1018,8 @@ def generate_csr( # noqa: C901 organization: Optional[str] = None, email_address: Optional[str] = None, country_name: Optional[str] = None, + state_or_province_name: Optional[str] = None, + locality_name: Optional[str] = None, private_key_password: Optional[bytes] = None, sans: Optional[List[str]] = None, sans_oid: Optional[List[str]] = None, @@ -983,6 +1038,8 @@ def generate_csr( # noqa: C901 organization (str): Name of organization. email_address (str): Email address. country_name (str): Country Name. + state_or_province_name (str): State or Province Name. + locality_name (str): Locality Name. private_key_password (bytes): Private key password sans (list): Use sans_dns - this will be deprecated in a future release List of DNS subject alternative names (keeping it for now for backward compatibility) @@ -1008,6 +1065,12 @@ def generate_csr( # noqa: C901 subject_name.append(x509.NameAttribute(x509.NameOID.EMAIL_ADDRESS, email_address)) if country_name: subject_name.append(x509.NameAttribute(x509.NameOID.COUNTRY_NAME, country_name)) + if state_or_province_name: + subject_name.append( + x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, state_or_province_name) + ) + if locality_name: + subject_name.append(x509.NameAttribute(x509.NameOID.LOCALITY_NAME, locality_name)) csr = x509.CertificateSigningRequestBuilder(subject_name=x509.Name(subject_name)) _sans: List[x509.GeneralName] = [] @@ -1135,6 +1198,7 @@ def _add_certificate( certificate_signing_request: str, ca: str, chain: List[str], + recommended_expiry_notification_time: Optional[int] = None, ) -> None: """Add certificate to relation data. @@ -1144,6 +1208,8 @@ def _add_certificate( certificate_signing_request (str): Certificate Signing Request ca (str): CA Certificate chain (list): CA Chain + recommended_expiry_notification_time (int): + Time in hours before the certificate expires to notify the user. Returns: None @@ -1161,6 +1227,7 @@ def _add_certificate( "certificate_signing_request": certificate_signing_request, "ca": ca, "chain": chain, + "recommended_expiry_notification_time": recommended_expiry_notification_time, } provider_relation_data = self._load_app_relation_data(relation) provider_certificates = provider_relation_data.get("certificates", []) @@ -1227,6 +1294,7 @@ def set_relation_certificate( ca: str, chain: List[str], relation_id: int, + recommended_expiry_notification_time: Optional[int] = None, ) -> None: """Add certificates to relation data. @@ -1236,6 +1304,8 @@ def set_relation_certificate( ca (str): CA Certificate chain (list): CA Chain relation_id (int): Juju relation ID + recommended_expiry_notification_time (int): + Recommended time in hours before the certificate expires to notify the user. Returns: None @@ -1257,6 +1327,7 @@ def set_relation_certificate( certificate_signing_request=certificate_signing_request.strip(), ca=ca.strip(), chain=[cert.strip() for cert in chain], + recommended_expiry_notification_time=recommended_expiry_notification_time, ) def remove_certificate(self, certificate: str) -> None: @@ -1310,6 +1381,13 @@ def get_provider_certificates( provider_relation_data = self._load_app_relation_data(relation) provider_certificates = provider_relation_data.get("certificates", []) for certificate in provider_certificates: + try: + certificate_object = x509.load_pem_x509_certificate( + data=certificate["certificate"].encode() + ) + except ValueError as e: + logger.error("Could not load certificate - Skipping: %s", e) + continue provider_certificate = ProviderCertificate( relation_id=relation.id, application_name=relation.app.name, @@ -1318,6 +1396,10 @@ def get_provider_certificates( ca=certificate["ca"], chain=certificate["chain"], revoked=certificate.get("revoked", False), + expiry_time=certificate_object.not_valid_after_utc, + expiry_notification_time=certificate.get( + "recommended_expiry_notification_time" + ), ) certificates.append(provider_certificate) return certificates @@ -1475,15 +1557,17 @@ def __init__( self, charm: CharmBase, relationship_name: str, - expiry_notification_time: int = 168, + expiry_notification_time: Optional[int] = None, ): """Generate/use private key and observes relation changed event. Args: charm: Charm object relationship_name: Juju relation name - expiry_notification_time (int): Time difference between now and expiry (in hours). - Used to trigger the CertificateExpiring event. Default: 7 days. + expiry_notification_time (int): Number of hours prior to certificate expiry. + Used to trigger the CertificateExpiring event. + This value is used as a recommendation only, + The actual value is calculated taking into account the provider's recommendation. """ super().__init__(charm, relationship_name) if not JujuVersion.from_environ().has_secrets: @@ -1544,9 +1628,25 @@ def get_provider_certificates(self) -> List[ProviderCertificate]: if not certificate: logger.warning("No certificate found in relation data - Skipping") continue + try: + certificate_object = x509.load_pem_x509_certificate(data=certificate.encode()) + except ValueError as e: + logger.error("Could not load certificate - Skipping: %s", e) + continue ca = provider_certificate_dict.get("ca") chain = provider_certificate_dict.get("chain", []) csr = provider_certificate_dict.get("certificate_signing_request") + recommended_expiry_notification_time = provider_certificate_dict.get( + "recommended_expiry_notification_time" + ) + expiry_time = certificate_object.not_valid_after_utc + validity_start_time = certificate_object.not_valid_before_utc + expiry_notification_time = calculate_expiry_notification_time( + validity_start_time=validity_start_time, + expiry_time=expiry_time, + provider_recommended_notification_time=recommended_expiry_notification_time, + requirer_recommended_notification_time=self.expiry_notification_time, + ) if not csr: logger.warning("No CSR found in relation data - Skipping") continue @@ -1559,6 +1659,8 @@ def get_provider_certificates(self) -> List[ProviderCertificate]: ca=ca, chain=chain, revoked=revoked, + expiry_time=expiry_time, + expiry_notification_time=expiry_notification_time, ) provider_certificates.append(provider_certificate) return provider_certificates @@ -1708,13 +1810,9 @@ def get_expiring_certificates(self) -> List[ProviderCertificate]: expiring_certificates: List[ProviderCertificate] = [] for requirer_csr in self.get_certificate_signing_requests(fulfilled_only=True): if cert := self._find_certificate_in_relation_data(requirer_csr.csr): - expiry_time = _get_certificate_expiry_time(cert.certificate) - if not expiry_time: + if not cert.expiry_time or not cert.expiry_notification_time: continue - expiry_notification_time = expiry_time - timedelta( - hours=self.expiry_notification_time - ) - if datetime.now(timezone.utc) > expiry_notification_time: + if datetime.now(timezone.utc) > cert.expiry_notification_time: expiring_certificates.append(cert) return expiring_certificates @@ -1776,6 +1874,9 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: if certificate.csr in requirer_csrs: if certificate.revoked: with suppress(SecretNotFoundError): + logger.debug( + "Removing secret with label %s", f"{LIBID}-{certificate.csr}" + ) secret = self.model.get_secret(label=f"{LIBID}-{certificate.csr}") secret.remove_all_revisions() self.on.certificate_invalidated.emit( @@ -1787,16 +1888,22 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: ) else: try: + logger.debug( + "Setting secret with label %s", f"{LIBID}-{certificate.csr}" + ) secret = self.model.get_secret(label=f"{LIBID}-{certificate.csr}") secret.set_content({"certificate": certificate.certificate}) secret.set_info( - expire=self._get_next_secret_expiry_time(certificate.certificate), + expire=self._get_next_secret_expiry_time(certificate), ) except SecretNotFoundError: + logger.debug( + "Creating new secret with label %s", f"{LIBID}-{certificate.csr}" + ) secret = self.charm.unit.add_secret( {"certificate": certificate.certificate}, label=f"{LIBID}-{certificate.csr}", - expire=self._get_next_secret_expiry_time(certificate.certificate), + expire=self._get_next_secret_expiry_time(certificate), ) self.on.certificate_available.emit( certificate_signing_request=certificate.csr, @@ -1805,7 +1912,7 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: chain=certificate.chain, ) - def _get_next_secret_expiry_time(self, certificate: str) -> Optional[datetime]: + def _get_next_secret_expiry_time(self, certificate: ProviderCertificate) -> Optional[datetime]: """Return the expiry time or expiry notification time. Extracts the expiry time from the provided certificate, calculates the @@ -1813,17 +1920,18 @@ def _get_next_secret_expiry_time(self, certificate: str) -> Optional[datetime]: the future. Args: - certificate: x509 certificate + certificate: ProviderCertificate object Returns: Optional[datetime]: None if the certificate expiry time cannot be read, next expiry time otherwise. """ - expiry_time = _get_certificate_expiry_time(certificate) - if not expiry_time: + if not certificate.expiry_time or not certificate.expiry_notification_time: return None - expiry_notification_time = expiry_time - timedelta(hours=self.expiry_notification_time) - return _get_closest_future_time(expiry_notification_time, expiry_time) + return _get_closest_future_time( + certificate.expiry_notification_time, + certificate.expiry_time, + ) def _on_relation_broken(self, event: RelationBrokenEvent) -> None: """Handle Relation Broken Event. @@ -1864,20 +1972,19 @@ def _on_secret_expired(self, event: SecretExpiredEvent) -> None: event.secret.remove_all_revisions() return - expiry_time = _get_certificate_expiry_time(provider_certificate.certificate) - if not expiry_time: + if not provider_certificate.expiry_time: # A secret expired but matching certificate is invalid. Cleaning up event.secret.remove_all_revisions() return - if datetime.now(timezone.utc) < expiry_time: + if datetime.now(timezone.utc) < provider_certificate.expiry_time: logger.warning("Certificate almost expired") self.on.certificate_expiring.emit( certificate=provider_certificate.certificate, - expiry=expiry_time.isoformat(), + expiry=provider_certificate.expiry_time.isoformat(), ) event.secret.set_info( - expire=_get_certificate_expiry_time(provider_certificate.certificate), + expire=provider_certificate.expiry_time, ) else: logger.warning("Certificate is expired") diff --git a/requirements.txt b/requirements.txt index 99c9381cc..fb2c88777 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,3 @@ cryptography # deps: tracing, charm_tracing pydantic opentelemetry-exporter-otlp-proto-http==1.21.0 - -# deps: charm_logging -requests \ No newline at end of file diff --git a/src/charm.py b/src/charm.py index 6bf73eca7..a53047c71 100755 --- a/src/charm.py +++ b/src/charm.py @@ -116,6 +116,7 @@ class LokiOperatorCharm(CharmBase): _port = HTTP_LISTEN_PORT _name = "loki" _loki_push_api_endpoint = "/loki/api/v1/push" + _loki_rules_endpoint = "/loki/api/v1/rules" _ca_cert_path = "/usr/local/share/ca-certificates/cos-ca.crt" def __init__(self, *args): @@ -659,7 +660,7 @@ def _check_alert_rules(self): ssl_context = ssl.create_default_context( cafile=self._ca_cert_path if Path(self._ca_cert_path).exists() else None, ) - url = f"{self._internal_url}/loki/api/v1/rules" + url = f"{self._internal_url}{self._loki_rules_endpoint}" try: logger.debug(f"Checking loki alert rules via {url}.") urllib.request.urlopen(url, timeout=2.0, context=ssl_context) @@ -732,13 +733,13 @@ def tracing_endpoint(self) -> Optional[str]: def logging_endpoints(self) -> Optional[List[str]]: """Loki endpoint for charm logging.""" if self._loki_container.get_service(self._name).current is ops.pebble.ServiceStatus.ACTIVE: - return [self._internal_url + self._loki_push_api_endpoint] + return ["http://localhost:3100" + self._loki_push_api_endpoint] return [] @property def server_cert_path(self) -> Optional[str]: """Server certificate path for TLS tracing.""" - return self._ca_cert_path + return self._ca_cert_path if Path(self._ca_cert_path).exists() else None if __name__ == "__main__": From 87591e4682dda339c771d858466bd824ddc1f3b9 Mon Sep 17 00:00:00 2001 From: Pietro Pasotti Date: Tue, 28 May 2024 14:08:09 +0200 Subject: [PATCH 13/13] simple hardcoded ds integration --- lib/charms/loki_k8s/v0/charm_logging.py | 26 ++++++++++++++++++++++--- src/charm.py | 23 +++++++++++++++++++++- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/lib/charms/loki_k8s/v0/charm_logging.py b/lib/charms/loki_k8s/v0/charm_logging.py index dffd69ba4..5135f795e 100644 --- a/lib/charms/loki_k8s/v0/charm_logging.py +++ b/lib/charms/loki_k8s/v0/charm_logging.py @@ -117,17 +117,21 @@ class LokiEmitter: ("-", "_"), ) - def __init__(self, url: str, tags: Optional[dict] = None, cert: Optional[str] = None): + def __init__(self, url: str, tags: Optional[dict] = None, fields: Optional[dict] = None, cert: Optional[str] = None): """Create new Loki emitter. Arguments: url: Endpoint used to send log entries to Loki (e.g. `https://my-loki-instance/loki/api/v1/push`). tags: Default tags added to every log record. + fields: Default fields added to every log record. cert: Absolute path to a ca cert for TLS authentication. """ #: Tags that will be added to all records handled by this handler. self.tags = tags or {} + # fields will be prepended to all records handled by this handler + self.prefix = " ".join(f"{field}={value}" for field, value in fields.items()) + " " if fields else "" + #: Loki JSON push endpoint (e.g `http://127.0.0.1/loki/api/v1/push`) self.url = url #: Optional cert for TLS auth @@ -135,7 +139,7 @@ def __init__(self, url: str, tags: Optional[dict] = None, cert: Optional[str] = def __call__(self, record: logging.LogRecord, line: str): """Send log record to Loki.""" - payload = self.build_payload(record, line) + payload = self.build_payload(record, self.prefix + line) req = request.Request(self.url, method='POST') req.add_header('Content-Type', 'application/json; charset=utf-8') jsondata_encoded = json.dumps(payload).encode("utf-8") @@ -164,6 +168,7 @@ def build_payload(self, record: logging.LogRecord, line) -> dict: "stream": labels, "values": [[ts, line]], } + print(stream) return {"streams": [stream]} @functools.lru_cache(256) @@ -205,6 +210,7 @@ def __init__( self, url: str, tags: Optional[dict] = None, + fields: Optional[dict] = None, # username, password tuple cert: Optional[str] = None, ): @@ -219,7 +225,7 @@ def __init__( """ super().__init__() - self.emitter = LokiEmitter(url, tags, cert) + self.emitter = LokiEmitter(url, tags, fields, cert) def emit(self, record: logging.LogRecord): """Send log record to Loki.""" @@ -356,6 +362,19 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): "charm_type_name": type(self).__name__, "dispatch_path": os.getenv("JUJU_DISPATCH_PATH", ""), } + fields = {} + try: + from charms.tempo_k8s.v1.charm_tracing import get_current_span + span = get_current_span() + if span: + logger.debug('root span found') + + fields['trace_id'] = str(hex(span.get_span_context().trace_id)[2:]) # strip 0x prefix + fields['span_id'] = str(hex(span.get_span_context().span_id)[2:]) # strip 0x prefix + + except ModuleNotFoundError: + logger.debug("no tracing library found: will not attach trace ids to logs") + server_cert: Optional[Union[str, Path]] = ( _get_server_cert(server_cert_getter, self, charm) if server_cert_getter else None ) @@ -366,6 +385,7 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): handler = LokiHandler( url=url, tags=tags, + fields=fields, cert=str(server_cert) if server_cert else None, # auth=("username", "password"), ) diff --git a/src/charm.py b/src/charm.py index 4b9120137..0c6f6db1f 100755 --- a/src/charm.py +++ b/src/charm.py @@ -98,6 +98,8 @@ def to_status(tpl: Tuple[str, str]) -> StatusBase: return StatusBase.from_name(name, message) +# call log_charm second, so that trace_charm has a chance to set up the root span +@log_charm(logging_endpoints="logging_endpoints", server_cert="server_ca_cert_path") @trace_charm( tracing_endpoint="tracing_endpoint", server_cert="server_ca_cert_path", @@ -110,7 +112,6 @@ def to_status(tpl: Tuple[str, str]) -> StatusBase: MetricsEndpointProvider, ], ) -@log_charm(logging_endpoints="logging_endpoints", server_cert="server_cert_path") class LokiOperatorCharm(CharmBase): """Charm the service.""" @@ -191,6 +192,7 @@ def __init__(self, *args): ], source_type="loki", source_url=self._external_url, + extra_fields=self._extra_ds_fields ) self.metrics_provider = MetricsEndpointProvider( @@ -391,6 +393,25 @@ def internal_url(self): return f"{scheme}://{self.hostname}:{self._port}" @property + def _extra_ds_fields(self) -> Dict: + try: + # super duper ugly, but we have no way to exchange datasource IDs atm. + tempo_datasource_uid = f"juju_{self.model.name}_{self.model.uuid}_{self.tracing.relations[0].app.name}_0" + except Exception: + logger.error("failed to construct extra ds fields") + return {} + + return { + "derivedFields": [ + { + "datasourceUid": tempo_datasource_uid, + "matcherRegex": r"traceId=(\w+)", + "url": "$${__value.raw}", + "name": "trace_id" + } + ] + } + @property def _external_url(self) -> str: """Return the external hostname to be passed to ingress via the relation.""" if ingress_url := self.ingress_per_unit.url: