Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Loki Batch emitter #30

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
python-logging-loki
===================
# python-logging-loki

[![PyPI version](https://img.shields.io/pypi/v/python-logging-loki.svg)](https://pypi.org/project/python-logging-loki/)
[![Python version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-blue.svg)](https://www.python.org/)
Expand All @@ -9,43 +8,42 @@ python-logging-loki
Python logging handler for Loki.
https://grafana.com/loki

Installation
============
# Installation

```bash
pip install python-logging-loki
```

Usage
=====
# Usage

```python
import logging
import logging_loki


handler = logging_loki.LokiHandler(
url="https://my-loki-instance/loki/api/v1/push",
url="https://my-loki-instance/loki/api/v1/push",
tags={"application": "my-app"},
auth=("username", "password"),
version="1",
)

logger = logging.getLogger("my-logger")
logger.addHandler(handler)
logger.error(
"Something happened",
"Something happened",
extra={"tags": {"service": "my-service"}},
)
```

Example above will send `Something happened` message along with these labels:

- Default labels from handler
- Message level as `serverity`
- Logger's name as `logger`
- Logger's name as `logger`
- Labels from `tags` item of `extra` dict

The given example is blocking (i.e. each call will wait for the message to be sent).
But you can use the built-in `QueueHandler` and` QueueListener` to send messages in a separate thread.
But you can use the built-in `QueueHandler` and` QueueListener` to send messages in a separate thread.

```python
import logging.handlers
Expand All @@ -56,10 +54,9 @@ from multiprocessing import Queue
queue = Queue(-1)
handler = logging.handlers.QueueHandler(queue)
handler_loki = logging_loki.LokiHandler(
url="https://my-loki-instance/loki/api/v1/push",
url="https://my-loki-instance/loki/api/v1/push",
tags={"application": "my-app"},
auth=("username", "password"),
version="1",
)
logging.handlers.QueueListener(queue, handler_loki)

Expand All @@ -78,10 +75,9 @@ from multiprocessing import Queue

handler = logging_loki.LokiQueueHandler(
Queue(-1),
url="https://my-loki-instance/loki/api/v1/push",
url="https://my-loki-instance/loki/api/v1/push",
tags={"application": "my-app"},
auth=("username", "password"),
version="1",
)

logger = logging.getLogger("my-logger")
Expand Down
2 changes: 1 addition & 1 deletion logging_loki/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
from logging_loki.handlers import LokiQueueHandler

__all__ = ["LokiHandler", "LokiQueueHandler"]
__version__ = "0.3.1"
__version__ = "0.4.0"
name = "logging_loki"
3 changes: 3 additions & 0 deletions logging_loki/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import os

BATCH_EXPORT_MIN_SIZE = int(os.getenv("BATCH_EXPORT_MIN_SIZE", 10))
57 changes: 33 additions & 24 deletions logging_loki/emitter.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
# -*- coding: utf-8 -*-

import abc
import collections
import copy
import functools
import logging
import time
from logging.config import ConvertingDict
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Any, Dict, Optional, Tuple

import requests
import rfc3339

from logging_loki import const
from logging_loki.config import BATCH_EXPORT_MIN_SIZE

BasicAuth = Optional[Tuple[str, str]]

Expand Down Expand Up @@ -53,8 +50,11 @@ def __call__(self, record: logging.LogRecord, line: str):
"""Send log record to Loki."""
payload = self.build_payload(record, line)
resp = self.session.post(self.url, json=payload)
# TODO: Enqueue logs instead of raise an error that lose the logs
if resp.status_code != self.success_response_code:
raise ValueError("Unexpected Loki API response status code: {0}".format(resp.status_code))
raise ValueError(
"Unexpected Loki API response status code: {0}".format(resp.status_code)
)

@abc.abstractmethod
def build_payload(self, record: logging.LogRecord, line) -> dict:
Expand Down Expand Up @@ -105,31 +105,40 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:
return tags


class LokiEmitterV0(LokiEmitter):
"""Emitter for Loki < 0.4.0."""

class LokiSimpleEmitter(LokiEmitter):
def build_payload(self, record: logging.LogRecord, line) -> dict:
"""Build JSON payload with a log entry."""
labels = self.build_labels(record)
ts = rfc3339.format_microsecond(record.created)
labels = self.build_tags(record)
ns = 1e9
ts = str(int(time.time() * ns))
stream = {
"labels": labels,
"entries": [{"ts": ts, "line": line}],
"stream": labels,
"values": [[ts, line]],
}
return {"streams": [stream]}

def build_labels(self, record: logging.LogRecord) -> str:
"""Return Loki labels string."""
labels: List[str] = []
for label_name, label_value in self.build_tags(record).items():
cleared_name = self.format_label(str(label_name))
cleared_value = str(label_value).replace('"', r"\"")
labels.append('{0}="{1}"'.format(cleared_name, cleared_value))
return "{{{0}}}".format(",".join(labels))

class LokiBatchEmitter(LokiEmitter):
buffer = collections.deque([])

class LokiEmitterV1(LokiEmitter):
"""Emitter for Loki >= 0.4.0."""
def __call__(self, record: logging.LogRecord, line: str):
"""Send log record to Loki."""
payload = self.build_payload(record, line)
if len(self.buffer) < BATCH_EXPORT_MIN_SIZE:
self.buffer.appendleft(payload["streams"][0])
else:
resp = self.session.post(
self.url,
json={
"streams": [self.buffer.pop() for _ in range(BATCH_EXPORT_MIN_SIZE)]
},
)
if resp.status_code != self.success_response_code:
raise ValueError(
"Unexpected Loki API response status code: {0}".format(
resp.status_code
)
)

def build_payload(self, record: logging.LogRecord, line) -> dict:
"""Build JSON payload with a log entry."""
Expand Down
22 changes: 2 additions & 20 deletions logging_loki/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,12 @@ class LokiHandler(logging.Handler):
`Loki API <https://github.com/grafana/loki/blob/master/docs/api.md>`_
"""

emitters: Dict[str, Type[emitter.LokiEmitter]] = {
"0": emitter.LokiEmitterV0,
"1": emitter.LokiEmitterV1,
}

def __init__(
self,
url: str,
tags: Optional[dict] = None,
auth: Optional[emitter.BasicAuth] = None,
version: Optional[str] = None,
emitter: emitter.LokiEmitter = emitter.LokiSimpleEmitter,
):
"""
Create new Loki logging handler.
Expand All @@ -54,20 +49,7 @@ def __init__(

"""
super().__init__()

if version is None and const.emitter_ver == "0":
msg = (
"Loki /api/prom/push endpoint is in the depreciation process starting from version 0.4.0.",
"Explicitly set the emitter version to '0' if you want to use the old endpoint.",
"Or specify '1' if you have Loki version> = 0.4.0.",
"When the old API is removed from Loki, the handler will use the new version by default.",
)
warnings.warn(" ".join(msg), DeprecationWarning)

version = version or const.emitter_ver
if version not in self.emitters:
raise ValueError("Unknown emitter version: {0}".format(version))
self.emitter = self.emitters[version](url, tags, auth)
self.emitter = emitter(url, tags, auth)

def handleError(self, record): # noqa: N802
"""Close emitter and let default handler take actions on error."""
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setuptools.setup(
name="python-logging-loki",
version="0.3.1",
version="0.4.0",
description="Python logging handler for Grafana Loki.",
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
Loading