From 10866ab8108e6a51651919c84e1df80148ad89f8 Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Thu, 28 Nov 2024 14:51:06 +0000 Subject: [PATCH] Surface memberlist logs on adequate level (#68) --- logging.go | 59 +++++++++++++++++++++++++++++++++ logging_test.go | 86 +++++++++++++++++++++++++++++++++++++++++++++++++ node.go | 12 ++++--- 3 files changed, 152 insertions(+), 5 deletions(-) create mode 100644 logging.go create mode 100644 logging_test.go diff --git a/logging.go b/logging.go new file mode 100644 index 0000000..9738e1e --- /dev/null +++ b/logging.go @@ -0,0 +1,59 @@ +package ckit + +import ( + "bytes" + "io" + golog "log" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" +) + +var ( + errPrefix = []byte("[ERR]") + warnPrefix = []byte("[WARN]") + infoPrefix = []byte("[INFO]") + debugPrefix = []byte("[DEBUG]") + memberListPrefix = []byte("memberlist: ") +) + +// memberListOutputLogger will do best-effort classification of the logging level that memberlist uses in its log lines +// and use the corresponding level when logging with logger. It will drop redundant `[] memberlist:` parts. +// This helps us surface only desired log messages from memberlist. If classification fails, info level is used as +// a fallback. See tests for detailed behaviour. +type memberListOutputLogger struct { + logger log.Logger +} + +var _ io.Writer = (*memberListOutputLogger)(nil) + +func newMemberListLogger(logger log.Logger) *golog.Logger { + return golog.New(&memberListOutputLogger{logger: logger}, "", 0) +} + +func (m *memberListOutputLogger) Write(p []byte) (int, error) { + var err error + + sanitizeFn := func(dropPrefix []byte, msg []byte) []byte { + noLevel := bytes.TrimSpace(bytes.TrimPrefix(msg, dropPrefix)) + return bytes.TrimPrefix(noLevel, memberListPrefix) + } + + switch { + case bytes.HasPrefix(p, errPrefix): + err = level.Error(m.logger).Log("msg", sanitizeFn(errPrefix, p)) + case bytes.HasPrefix(p, warnPrefix): + err = level.Warn(m.logger).Log("msg", sanitizeFn(warnPrefix, p)) + case bytes.HasPrefix(p, infoPrefix): + err = level.Info(m.logger).Log("msg", sanitizeFn(infoPrefix, p)) + case bytes.HasPrefix(p, debugPrefix): + err = level.Debug(m.logger).Log("msg", sanitizeFn(debugPrefix, p)) + default: + err = level.Info(m.logger).Log("msg", sanitizeFn(nil, p)) + } + + if err != nil { + return 0, err + } + return len(p), nil +} diff --git a/logging_test.go b/logging_test.go new file mode 100644 index 0000000..053cc30 --- /dev/null +++ b/logging_test.go @@ -0,0 +1,86 @@ +package ckit + +import ( + "fmt" + golog "log" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLogging(t *testing.T) { + cases := []struct { + name string + message string + expected string + }{ + { + name: "debug level", + message: "[DEBUG] memberlist: Stream connection from=127.0.0.1:56631\n", + expected: `level="debug", msg="Stream connection from=127.0.0.1:56631"`, + }, + { + name: "info level", + message: "[INFO] memberlist: Suspect node-a has failed, no acks received\n", + expected: `level="info", msg="Suspect node-a has failed, no acks received"`, + }, + { + name: "warn level", + message: "[WARN] memberlist: Refuting a dead message (from: node-b)\n", + expected: `level="warn", msg="Refuting a dead message (from: node-b)"`, + }, + { + name: "error level", + message: "[ERR] memberlist: Failed fallback TCP ping: io: read/write on closed pipe\n", + expected: `level="error", msg="Failed fallback TCP ping: io: read/write on closed pipe"`, + }, + { + name: "error without memberlist", + message: "[ERR] Failed to shutdown transport: test\n", + expected: `level="error", msg="Failed to shutdown transport: test"`, + }, + { + name: "default level", + message: "a message without level\n", + expected: `level="info", msg="a message without level"`, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + capture := newCaptureLogger() + logger := newMemberListLogger(capture) + logger.Println(c.message) + require.Len(t, capture.lines, 1) + require.Equal(t, c.expected, capture.lines[0]) + }) + } +} + +func newCaptureLogger() *captureLogger { + c := &captureLogger{ + lines: make([]string, 0), + } + c.adapter = golog.New(c, "", 0) + return c +} + +type captureLogger struct { + lines []string + adapter *golog.Logger +} + +func (c *captureLogger) Write(p []byte) (n int, err error) { + c.lines = append(c.lines, string(p)) + return len(p), nil +} + +func (c *captureLogger) Log(keyvals ...interface{}) error { + lineParts := make([]string, 0) + for i := 0; i < len(keyvals); i += 2 { + lineParts = append(lineParts, fmt.Sprintf("%v=%q", keyvals[i], keyvals[i+1])) + } + c.lines = append(c.lines, strings.Join(lineParts, ", ")) + return nil +} diff --git a/node.go b/node.go index 95a2fce..9404ed8 100644 --- a/node.go +++ b/node.go @@ -14,15 +14,16 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/memberlist" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/ckit/internal/gossiphttp" "github.com/grafana/ckit/internal/lamport" "github.com/grafana/ckit/internal/messages" "github.com/grafana/ckit/internal/queue" "github.com/grafana/ckit/peer" "github.com/grafana/ckit/shard" - "github.com/hashicorp/go-msgpack/codec" - "github.com/hashicorp/memberlist" - "github.com/prometheus/client_golang/prometheus" ) var ( @@ -178,11 +179,12 @@ func NewNode(cli *http.Client, cfg Config) (*Node, error) { mlc.Transport = httpTransport mlc.AdvertiseAddr = advertiseIP.String() mlc.AdvertisePort = advertisePort - mlc.LogOutput = io.Discard mlc.Label = cfg.Label if cfg.Log != nil { - mlc.LogOutput = log.NewStdlibAdapter(level.Debug(log.With(cfg.Log, "component", "memberlist"))) + mlc.Logger = newMemberListLogger(log.With(cfg.Log, "subsystem", "memberlist")) + } else { + mlc.LogOutput = io.Discard } n := &Node{