Skip to content

Commit

Permalink
Surface memberlist logs on adequate level (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr authored Nov 28, 2024
1 parent 23100d7 commit 10866ab
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 5 deletions.
59 changes: 59 additions & 0 deletions logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package ckit

import (
"bytes"
"io"
golog "log"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

var (
errPrefix = []byte("[ERR]")
warnPrefix = []byte("[WARN]")
infoPrefix = []byte("[INFO]")
debugPrefix = []byte("[DEBUG]")
memberListPrefix = []byte("memberlist: ")
)

// memberListOutputLogger will do best-effort classification of the logging level that memberlist uses in its log lines
// and use the corresponding level when logging with logger. It will drop redundant `[<level>] memberlist:` parts.
// This helps us surface only desired log messages from memberlist. If classification fails, info level is used as
// a fallback. See tests for detailed behaviour.
type memberListOutputLogger struct {
logger log.Logger
}

var _ io.Writer = (*memberListOutputLogger)(nil)

func newMemberListLogger(logger log.Logger) *golog.Logger {
return golog.New(&memberListOutputLogger{logger: logger}, "", 0)
}

func (m *memberListOutputLogger) Write(p []byte) (int, error) {
var err error

sanitizeFn := func(dropPrefix []byte, msg []byte) []byte {
noLevel := bytes.TrimSpace(bytes.TrimPrefix(msg, dropPrefix))
return bytes.TrimPrefix(noLevel, memberListPrefix)
}

switch {
case bytes.HasPrefix(p, errPrefix):
err = level.Error(m.logger).Log("msg", sanitizeFn(errPrefix, p))
case bytes.HasPrefix(p, warnPrefix):
err = level.Warn(m.logger).Log("msg", sanitizeFn(warnPrefix, p))
case bytes.HasPrefix(p, infoPrefix):
err = level.Info(m.logger).Log("msg", sanitizeFn(infoPrefix, p))
case bytes.HasPrefix(p, debugPrefix):
err = level.Debug(m.logger).Log("msg", sanitizeFn(debugPrefix, p))
default:
err = level.Info(m.logger).Log("msg", sanitizeFn(nil, p))
}

if err != nil {
return 0, err
}
return len(p), nil
}
86 changes: 86 additions & 0 deletions logging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package ckit

import (
"fmt"
golog "log"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

func TestLogging(t *testing.T) {
cases := []struct {
name string
message string
expected string
}{
{
name: "debug level",
message: "[DEBUG] memberlist: Stream connection from=127.0.0.1:56631\n",
expected: `level="debug", msg="Stream connection from=127.0.0.1:56631"`,
},
{
name: "info level",
message: "[INFO] memberlist: Suspect node-a has failed, no acks received\n",
expected: `level="info", msg="Suspect node-a has failed, no acks received"`,
},
{
name: "warn level",
message: "[WARN] memberlist: Refuting a dead message (from: node-b)\n",
expected: `level="warn", msg="Refuting a dead message (from: node-b)"`,
},
{
name: "error level",
message: "[ERR] memberlist: Failed fallback TCP ping: io: read/write on closed pipe\n",
expected: `level="error", msg="Failed fallback TCP ping: io: read/write on closed pipe"`,
},
{
name: "error without memberlist",
message: "[ERR] Failed to shutdown transport: test\n",
expected: `level="error", msg="Failed to shutdown transport: test"`,
},
{
name: "default level",
message: "a message without level\n",
expected: `level="info", msg="a message without level"`,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
capture := newCaptureLogger()
logger := newMemberListLogger(capture)
logger.Println(c.message)
require.Len(t, capture.lines, 1)
require.Equal(t, c.expected, capture.lines[0])
})
}
}

func newCaptureLogger() *captureLogger {
c := &captureLogger{
lines: make([]string, 0),
}
c.adapter = golog.New(c, "", 0)
return c
}

type captureLogger struct {
lines []string
adapter *golog.Logger
}

func (c *captureLogger) Write(p []byte) (n int, err error) {
c.lines = append(c.lines, string(p))
return len(p), nil
}

func (c *captureLogger) Log(keyvals ...interface{}) error {
lineParts := make([]string, 0)
for i := 0; i < len(keyvals); i += 2 {
lineParts = append(lineParts, fmt.Sprintf("%v=%q", keyvals[i], keyvals[i+1]))
}
c.lines = append(c.lines, strings.Join(lineParts, ", "))
return nil
}
12 changes: 7 additions & 5 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/ckit/internal/gossiphttp"
"github.com/grafana/ckit/internal/lamport"
"github.com/grafana/ckit/internal/messages"
"github.com/grafana/ckit/internal/queue"
"github.com/grafana/ckit/peer"
"github.com/grafana/ckit/shard"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
"github.com/prometheus/client_golang/prometheus"
)

var (
Expand Down Expand Up @@ -178,11 +179,12 @@ func NewNode(cli *http.Client, cfg Config) (*Node, error) {
mlc.Transport = httpTransport
mlc.AdvertiseAddr = advertiseIP.String()
mlc.AdvertisePort = advertisePort
mlc.LogOutput = io.Discard
mlc.Label = cfg.Label

if cfg.Log != nil {
mlc.LogOutput = log.NewStdlibAdapter(level.Debug(log.With(cfg.Log, "component", "memberlist")))
mlc.Logger = newMemberListLogger(log.With(cfg.Log, "subsystem", "memberlist"))
} else {
mlc.LogOutput = io.Discard
}

n := &Node{
Expand Down

0 comments on commit 10866ab

Please sign in to comment.