Kafka shim #11

Open · wants to merge 17 commits into master
12 changes: 10 additions & 2 deletions blueox/__init__.py
@@ -30,20 +30,28 @@
from .errors import Error
from .logger import LogHandler
from .timer import timeit
from .recorders import kafka

log = logging.getLogger(__name__)


def configure(host, port, recorder=None):
"""Initialize blueox

This instructs the blueox system where to send its logging data. If blueox is not configured, log data will
be silently dropped.

    Currently we support logging through the network (with the configured host and port) to a blueoxd instance,
    to the specified recorder function, or, when BLUEOX_OVERRIDE_KAFKA_RECORDER=1, directly to Kafka.
"""
override_kafka_recorder = os.getenv('BLUEOX_OVERRIDE_KAFKA_RECORDER', 0)

if int(override_kafka_recorder) == 1:
log.info("Kafka override set, using kafka recorder")
host = ports.default_kafka_host()
kafka.init(host)
_context_mod._recorder_function = kafka.send
elif recorder:
_context_mod._recorder_function = recorder
elif host and port:
network.init(host, port)
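For reviewers who want to try the override end to end, here is a minimal sketch. It leans on `default_configure()` (exercised in the tests below) and assumes `Context` is exported at the package level as in current blueox; the host value is a placeholder.

```python
import os

# Sketch only: enable the Kafka recorder via the environment override
# before blueox is configured. The host value is a placeholder.
os.environ['BLUEOX_OVERRIDE_KAFKA_RECORDER'] = '1'
os.environ['BLUEOX_KAFKA_HOST'] = 'kafka.internal:9092'

import blueox

blueox.default_configure()  # configure() sees the override and installs kafka.send

ctx = blueox.Context('request.example')
with ctx:
    ctx.set('foo', True)  # recorded data is published to the 'events' topic when the context closes
```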
10 changes: 10 additions & 0 deletions blueox/ports.py
@@ -41,3 +41,13 @@ def default_control_host(host=None):
def default_collect_host(host=None):
default_host = os.environ.get(ENV_VAR_COLLECT_HOST, DEFAULT_HOST)
return _default_host(host, default_host, DEFAULT_COLLECT_PORT)


# For consistency, we'll abstract kafka connections in the same way
ENV_VAR_KAFKA_HOST = 'BLUEOX_KAFKA_HOST'
DEFAULT_KAFKA_PORT = 9092


def default_kafka_host(host=None):
default_host = os.environ.get(ENV_VAR_KAFKA_HOST, DEFAULT_HOST)
return _default_host(host, default_host, DEFAULT_KAFKA_PORT)
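As with the other `default_*_host` helpers, `BLUEOX_KAFKA_HOST` may be a bare host or a `host:port` pair, and the default port is only appended when none is given. A quick sketch of the expected resolution (mirroring the tests further down; host names are placeholders):

```python
import os
from blueox import ports

ports.default_kafka_host()                        # nothing set -> '127.0.0.1:9092'

os.environ['BLUEOX_KAFKA_HOST'] = 'kafka.internal'
ports.default_kafka_host()                        # -> 'kafka.internal:9092'

os.environ['BLUEOX_KAFKA_HOST'] = 'kafka.internal:9002'
ports.default_kafka_host()                        # explicit port wins -> 'kafka.internal:9002'
```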
Empty file added blueox/recorders/__init__.py
Empty file.
102 changes: 102 additions & 0 deletions blueox/recorders/kafka.py
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
"""
blueox.recorders.kafka
~~~~~~~~~~~~~~~~~~~~~~

This module provides the interface into Kafka

:copyright: (c) 2018 by Aaron Biller
:license: ISC, see LICENSE for more details.

"""
from __future__ import absolute_import

import atexit
import logging
import msgpack
import threading

from kafka import KafkaProducer

from blueox import utils

log = logging.getLogger(__name__)

# If we have pending outgoing messages, this is how long we'll wait after
# being told to exit.
LINGER_SHUTDOWN_MSECS = 2000


threadLocal = threading.local()

# Context can be shared between threads
_kafka_hosts = None


def init(host):
global _kafka_hosts

_kafka_hosts = host


def _thread_connect():
if _kafka_hosts and not getattr(threadLocal, 'kp', None):
threadLocal.kp = KafkaProducer(bootstrap_servers=_kafka_hosts)


def _serialize_context(context):
context_dict = context.to_dict()
for key in ('host', 'type'):
if len(context_dict.get(key, "")) > 64:
raise ValueError("Value too long: %r" % key)

context_dict = {
k: v.encode('utf-8') if isinstance(v, unicode)
else v for k, v in context_dict.items()
}

try:
context_data = msgpack.packb(context_dict)
except TypeError:
try:
# If we fail to serialize our context, we can try again with an
# enhanced packer (it's slower though)
context_data = msgpack.packb(context_dict,
default=utils.msgpack_encode_default)
except TypeError:
log.exception("Serialization failure (not fatal, dropping data)")

# One last try after dropping the body
context_dict['body'] = None
context_data = msgpack.packb(context_dict)

return context_data


def send(context):
_thread_connect()

try:
context_data = _serialize_context(context)
except Exception:
log.exception("Failed to serialize context")
return

if _kafka_hosts and threadLocal.kp is not None:
try:
log.debug("Sending msg")
threadLocal.kp.send('events', context_data)

Inline review comments on the send call:
This is actually going to get really slow, back up blueox, and cause an outage.
You need a non-blocking way to send these. Also, I worry a little about the thread-local approach, because Python and threading never play well together.

Author:
There are also a number of kwargs that can configure this better and keep things from backing up (a quick sketch follows the list):

  • acks (0, 1, 'all') –
    The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:

    0: Producer will not wait for any acknowledgment from the server.

    The message will immediately be added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1.

    1: Wait for leader to write the record to its local log only.

    Broker will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

    all: Wait for the full set of in-sync replicas to write the record.

    This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

    If unset, defaults to acks=1.

  • retries (int) – Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max_in_flight_requests_per_connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Default: 0.

  • max_block_ms (int) – Number of milliseconds to block during send() and partitions_for(). These methods can be blocked either because the buffer is full or metadata unavailable. Blocking in the user-supplied serializers or partitioner will not be counted against this timeout. Default: 60000.

  • retry_backoff_ms (int) – Milliseconds to backoff when retrying on errors. Default: 100.

  • request_timeout_ms (int) – Client request timeout in milliseconds. Default: 30000.
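If we stay on kafka-python, a rough sketch of how those knobs could be wired into `_thread_connect`, plus an errback on the future that `send()` already returns (values are illustrative, not a recommendation):

```python
# Sketch only: tune the producer so a slow or unreachable broker can't back up callers.
threadLocal.kp = KafkaProducer(
    bootstrap_servers=_kafka_hosts,
    acks=0,                  # fire-and-forget; don't wait for the leader
    retries=0,               # drop on transient errors rather than re-queue
    max_block_ms=100,        # bail out fast if the buffer is full or metadata is missing
    request_timeout_ms=5000,
)

# send() is already asynchronous and returns a future; an errback keeps failures
# visible in the logs without ever blocking the caller on the broker.
future = threadLocal.kp.send('events', context_data)
future.add_errback(lambda exc: log.warning("kafka publish failed: %r", exc))
```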

@pulltab (Aug 31, 2018):
My $.02: let's not tightly couple postal-main to Kafka. kafka-python uses a thread under the hood to send events, and things can behave unexpectedly in the presence of multiprocessing and monkey patching (which we may do some day, who knows).

Proposal:

  1. We configure a separate source / sink in cernan / cernan-events for blueox events:

    • Kafka bootstrapping is done async.
    • We can tune the blueox sources/sinks to discard (or not) in the presence of Kafka slowness as we see fit.
    • Financial events are already slated to use cernan, so we are properly entrenched already.
  2. Using pycernan forces us to use avro for blueox like we are going to do with everything else.

  3. RUST!

Author:
Works for me. Takes the burden of async off of blueox/postal, and I was considering the avro route anyway.


re: cernan, I was under the impression that we're migrating to fluentbit for log transport?


fluentbit is just replacing cernan when it comes to consuming on disk logs, no?

Now we get into a meaty discussion around whether or not blueox emits logs or events.

except Exception:
log.exception("Failed during publish to kafka.")
else:
log.info("Skipping sending event %s", context.name)


def close():
if getattr(threadLocal, 'kp', None):
threadLocal.kp.flush()
threadLocal.kp.close(timeout=LINGER_SHUTDOWN_MSECS)
threadLocal.kp = None


atexit.register(close)
1 change: 1 addition & 0 deletions requirements.txt
@@ -4,3 +4,4 @@ pyflakes
tornado==3.2
boto
yapf
kafka-python
28 changes: 27 additions & 1 deletion tests/ports_test.py
@@ -1,5 +1,8 @@
import os
from testify import (
TestCase,
assert_equal,
teardown)

from blueox import ports

@@ -71,3 +74,26 @@ def test_env_port(self):
os.environ['BLUEOX_HOST'] = 'master:123'
host = ports.default_collect_host()
assert_equal(host, "master:123")


class DefaultKafkaHost(TestCase):
@teardown
def clear_env(self):
try:
del os.environ['BLUEOX_KAFKA_HOST']
except KeyError:
pass

    def test_empty(self):
host = ports.default_kafka_host()
assert_equal(host, '127.0.0.1:9092')

def test_env(self):
os.environ['BLUEOX_KAFKA_HOST'] = 'local.svc.team-me.aws.jk8s'
host = ports.default_kafka_host()
assert_equal(host, 'local.svc.team-me.aws.jk8s:9092')

def test_env_port(self):
os.environ['BLUEOX_KAFKA_HOST'] = 'local.svc.team-me.aws.jk8s:9002'
host = ports.default_kafka_host()
assert_equal(host, 'local.svc.team-me.aws.jk8s:9002')
Empty file added tests/recorders/__init__.py
Empty file.
120 changes: 120 additions & 0 deletions tests/recorders/kafka_test.py
@@ -0,0 +1,120 @@
import os
import random
import decimal
import datetime

import msgpack
from testify import (
TestCase,
setup,
teardown,
assert_equal)

from blueox import default_configure
from blueox import utils
from blueox import network
from blueox import context
from blueox.recorders import kafka


class MockKafkaProducer(object):
last_topic = None
last_data = None
close_timeout = None

def __call__(self, bootstrap_servers=None):
self.bootstrap_servers = bootstrap_servers
return self

def send(self, topic, data):
self.last_topic = topic
self.last_data = data

def flush(self):
pass

def close(self, timeout=None):
self.close_timeout = timeout


class KafkaOverrideTestCase(TestCase):
@teardown
def clear_env(self):
try:
del os.environ['BLUEOX_OVERRIDE_KAFKA_RECORDER']
except KeyError:
pass

def test_configure_no_override(self):
default_configure()
assert_equal(context._recorder_function, network.send)

def test_configure_override(self):
os.environ['BLUEOX_OVERRIDE_KAFKA_RECORDER'] = '1'
default_configure()
assert_equal(context._recorder_function, kafka.send)


class KafkaSendTestCase(TestCase):
@setup
def build_context(self):
self.context = context.Context('test', 1)

@setup
def init_kafka(self):
self.port = random.randint(30000, 40000)
kafka.init('127.0.0.1:{}'.format(self.port))

@setup
def configure_kafka(self):
context._recorder_function = kafka.send
self.kp = MockKafkaProducer()
kafka.KafkaProducer = self.kp

@teardown
def unconfigure_kafka(self):
context._recorder_function = None

def test(self):
with self.context:
self.context.set('foo', True)
self.context.set('bar.baz', 10.0)

data = msgpack.unpackb(self.kp.last_data)
assert_equal(self.kp.last_topic, 'events')
assert_equal(data['id'], 1)
assert_equal(data['type'], 'test')
assert_equal(utils.get_deep(data['body'], "bar.baz"), 10.0)

kafka.close()
assert_equal(self.kp.close_timeout, kafka.LINGER_SHUTDOWN_MSECS)


class SerializeContextTestCase(TestCase):
@setup
def build_context(self):
self.context = context.Context('test', 1)

def test_types(self):
with self.context:
self.context.set('decimal_value', decimal.Decimal("6.66"))
self.context.set('date_value', datetime.date(2013, 12, 10))
self.context.set('datetime_value', datetime.datetime(2013, 12, 10, 12, 12, 12))

context_data = kafka._serialize_context(self.context)
data = msgpack.unpackb(context_data)
assert_equal(data['body']['decimal_value'], "6.66")
assert_equal(data['body']['date_value'], "2013-12-10")
assert_equal(
datetime.datetime.fromtimestamp(float(data['body']['datetime_value'])),
datetime.datetime(2013, 12, 10, 12, 12, 12))

def test_exception(self):
with self.context:
self.context.set('value', Exception('hello'))

context_data = kafka._serialize_context(self.context)
data = msgpack.unpackb(context_data)

# The serialization should fail, but that just means we don't have any data.
assert_equal(data['body'], None)