Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: logger with tid & improve open tracing in metadata #651

Merged
merged 5 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestFrameRoundTrip(t *testing.T) {
exited = checkClientExited(sfn, time.Second)
assert.False(t, exited, "sfn stream should not exited")

sfnMetaBytes, _ := NewDefaultMetadata(source.clientID, "tid", "sid", false).Encode()
sfnMetaBytes, _ := NewMetadata(source.clientID, "tid", "trace-id", "span-id", false).Encode()

err = sfn.WriteFrame(&frame.DataFrame{Tag: backflowTag, Metadata: sfnMetaBytes, Payload: backflow})
assert.NoError(t, err)
Expand All @@ -142,7 +142,7 @@ func TestFrameRoundTrip(t *testing.T) {
assert.ElementsMatch(t, nameList, []string{"source", "sfn-1"})

md := metadata.New(
NewDefaultMetadata(source.clientID, "tid", "sid", false),
NewMetadata(source.clientID, "tid", "trace-id", "span-id", false),
metadata.M{
"foo": "bar",
},
Expand Down
3 changes: 3 additions & 0 deletions core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func (c *Context) WithFrame(f frame.Frame) error {
c.Frame = df
c.FrameMetadata = fmd

// log with tid
c.Logger = c.BaseLogger.With("tid", GetTIDFromMetadata(fmd))

return nil
}

Expand Down
165 changes: 123 additions & 42 deletions core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,31 @@

import (
"github.com/yomorun/yomo/core/metadata"
"github.com/yomorun/yomo/pkg/id"
"github.com/yomorun/yomo/pkg/trace"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"
)

const (
// the keys for yomo working.
MetadataSourceIDKey = "yomo-source-id"
MetadataTIDKey = "yomo-tid"
MetadataSIDKey = "yomo-sid"
MetaTraced = "yomo-traced"

// the keys for tracing.
MetadataTraceIDKey = "yomo-trace-id"
MetadataSpanIDKey = "yomo-span-id"
MetaTracedKey = "yomo-traced"
)

// NewDefaultMetadata returns a default metadata.
func NewDefaultMetadata(sourceID string, tid string, sid string, traced bool) metadata.M {
tracedString := "false"
if traced {
tracedString = "true"
}
// NewMetadata returns metadata for yomo working.
func NewMetadata(sourceID, tid string, traceID string, spanID string, traced bool) metadata.M {
return metadata.M{
MetadataSourceIDKey: sourceID,
MetadataTIDKey: tid,
MetadataSIDKey: sid,
MetaTraced: tracedString,
MetadataTraceIDKey: traceID,
MetadataSpanIDKey: spanID,
MetaTracedKey: tracedString(traced),
}
}

Expand All @@ -38,47 +42,124 @@
return tid
}

// GetSIDFromMetadata gets SID from metadata.
func GetSIDFromMetadata(m metadata.M) string {
sid, _ := m.Get(MetadataSIDKey)
return sid
}

// GetTracedFromMetadata gets traced from metadata.
func GetTracedFromMetadata(m metadata.M) bool {
traced, _ := m.Get(MetaTraced)
return traced == "true"
tracedString, _ := m.Get(MetaTracedKey)
return tracedString == "true"
}

// SetTIDToMetadata sets tid to metadata.
func SetTIDToMetadata(m metadata.M, tid string) {
m.Set(MetadataTIDKey, tid)
}
// SourceMetadata generates source metadata with trace information.
func SourceMetadata(
sourceID, tid string,
spanName string, // the span name usually is the source name.
tp oteltrace.TracerProvider, logger *slog.Logger,
) (metadata.M, func()) {
var (
traceID string
spanID string
traced bool
endFn = func() {}

Check warning on line 61 in core/metadata.go

View check run for this annotation

Codecov / codecov/patch

core/metadata.go#L56-L61

Added lines #L56 - L61 were not covered by tests
)
if tp != nil {
span, err := trace.NewSpan(tp, "Source", spanName, "", "")
if err != nil {
logger.Debug("trace error", "tracer_name", "Source", "span_name", spanName, "err", err)
} else {
endFn = func() { span.End() }
traceID = span.SpanContext().TraceID().String()
spanID = span.SpanContext().SpanID().String()
traced = true

Check warning on line 71 in core/metadata.go

View check run for this annotation

Codecov / codecov/patch

core/metadata.go#L63-L71

Added lines #L63 - L71 were not covered by tests
}
}
if traceID == "" {
logger.Debug("create new traceID", "tracer_name", "Source", "span_name", spanName, "trace_id", traceID)
traceID = id.NewTraceID()
}
if spanID == "" {
logger.Debug("create new spanID", "tracer_name", "Source", "span_name", spanName, "span_id", spanID)
spanID = id.NewSpanID()
}
logger.Debug(
"trace metadata",
"tracer_name", "Source", "span_name", spanName,
"trace_id", traceID, "span_id", spanID, "traced", traced,
)
md := NewMetadata(sourceID, id.New(), traceID, spanID, traced)

Check warning on line 87 in core/metadata.go

View check run for this annotation

Codecov / codecov/patch

core/metadata.go#L74-L87

Added lines #L74 - L87 were not covered by tests

// SetSIDToMetadata sets sid to metadata.
func SetSIDToMetadata(m metadata.M, sid string) {
m.Set(MetadataSIDKey, sid)
return md, endFn

Check warning on line 89 in core/metadata.go

View check run for this annotation

Codecov / codecov/patch

core/metadata.go#L89

Added line #L89 was not covered by tests
}

// SetTracedToMetadata sets traced to metadata.
func SetTracedToMetadata(m metadata.M, traced bool) {
tracedString := "false"
if traced {
tracedString = "true"
// ExtendTraceMetadata extends source metadata with trace information.
func ExtendTraceMetadata(
md metadata.M,
tracerName string, // the tracer name is `StreamFunction` or `Zipper`.
spanName string, // the span name usually is the sfn name.
tp oteltrace.TracerProvider, logger *slog.Logger,
) (metadata.M, func()) {
var (
traceID, _ = md.Get(MetadataTraceIDKey)
spanID, _ = md.Get(MetadataSpanIDKey)
parentTraced = GetTracedFromMetadata(md)
endFn = func() {}
)
traced := false
if tp != nil {
var span oteltrace.Span
var err error
// set parent span, if not traced, use empty string
if parentTraced {
span, err = trace.NewSpan(tp, string(tracerName), spanName, traceID, spanID)
} else {
span, err = trace.NewSpan(tp, string(tracerName), spanName, "", "")
}
if err != nil {
logger.Debug("trace error", "tracer_name", tracerName, "span_name", spanName, "err", err)
} else {
endFn = func() { span.End() }
traceID = span.SpanContext().TraceID().String()
spanID = span.SpanContext().SpanID().String()
traced = true

Check warning on line 121 in core/metadata.go

View check run for this annotation

Codecov / codecov/patch

core/metadata.go#L107-L121

Added lines #L107 - L121 were not covered by tests
}
}
m.Set(MetaTraced, tracedString)
}
if traceID == "" {
logger.Debug("create new traceID", "tracer_name", tracerName, "span_name", spanName, "trace_id", traceID)
traceID = id.NewTraceID()
}

Check warning on line 127 in core/metadata.go

View check run for this annotation

Codecov / codecov/patch

core/metadata.go#L125-L127

Added lines #L125 - L127 were not covered by tests
if spanID == "" {
logger.Debug("create new spanID", "tracer_name", tracerName, "span_name", spanName, "span_id", spanID)
spanID = id.NewSpanID()
}

Check warning on line 131 in core/metadata.go

View check run for this annotation

Codecov / codecov/patch

core/metadata.go#L129-L131

Added lines #L129 - L131 were not covered by tests
logger.Debug(
"trace metadata",
"tracer_name", tracerName, "span_name", spanName,
"trace_id", traceID, "span_id", spanID, "traced", traced, "parent_traced", parentTraced,
)

// MetadataSlogAttr returns slog.Attr from metadata.
func MetadataSlogAttr(md metadata.M) slog.Attr {
kvStrings := make([]any, len(md)*2)
i := 0
for k, v := range md {
kvStrings[i] = k
i++
kvStrings[i] = v
i++
if tracerName == "Zipper" {
traced = traced || parentTraced
}

return slog.Group("metadata", kvStrings...)
// reallocate metadata with new TraceID and SpanID
md.Set(MetadataTraceIDKey, traceID)
md.Set(MetadataSpanIDKey, spanID)
md.Set(MetaTracedKey, tracedString(traced))

return md, endFn
}

// SfnTraceMetadata extends metadata for StreamFunction.
func SfnTraceMetadata(md metadata.M, sfnName string, tp oteltrace.TracerProvider, logger *slog.Logger) (metadata.M, func()) {
return ExtendTraceMetadata(md, "StreamFunction", sfnName, tp, logger)

Check warning on line 152 in core/metadata.go

View check run for this annotation

Codecov / codecov/patch

core/metadata.go#L151-L152

Added lines #L151 - L152 were not covered by tests
}

// ZipperTraceMetadata extends metadata for Zipper.
func ZipperTraceMetadata(md metadata.M, tp oteltrace.TracerProvider, logger *slog.Logger) (metadata.M, func()) {
return ExtendTraceMetadata(md, "Zipper", "zipper endpoint", tp, logger)
}

func tracedString(traced bool) string {
if traced {
return "true"
}
return "false"
}
41 changes: 2 additions & 39 deletions core/metadata_test.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,15 @@
package core

import (
"bytes"
"testing"

"github.com/stretchr/testify/assert"
"github.com/yomorun/yomo/core/metadata"
"golang.org/x/exp/slog"
)

func TestMetadata(t *testing.T) {
md := NewDefaultMetadata("source", "xxxxxxx", "sssssss", true)
md := NewMetadata("source", "tid", "traceID", "spanID", true)

assert.Equal(t, "source", GetSourceIDFromMetadata(md))
assert.Equal(t, "xxxxxxx", GetTIDFromMetadata(md))
assert.Equal(t, "sssssss", GetSIDFromMetadata(md))
assert.Equal(t, "tid", GetTIDFromMetadata(md))
assert.Equal(t, true, GetTracedFromMetadata(md))

SetTIDToMetadata(md, "ccccccc")
assert.Equal(t, "ccccccc", GetTIDFromMetadata(md))

SetSIDToMetadata(md, "aaaaaaa")
assert.Equal(t, "aaaaaaa", GetSIDFromMetadata(md))

SetTracedToMetadata(md, false)
assert.Equal(t, false, GetTracedFromMetadata(md))
}

func TestMetadataSlogAttr(t *testing.T) {
md := metadata.New(map[string]string{
"aaaa": "bbbb",
})

buf := bytes.NewBuffer(nil)

logger := slog.New(slog.NewTextHandler(buf, &slog.HandlerOptions{
AddSource: false,
Level: slog.LevelDebug,
// display time attr.
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
if a.Key == "time" {
return slog.Attr{}
}
return a
},
}))

logger.Debug("test metadata", MetadataSlogAttr(md))

assert.Equal(t, "level=DEBUG msg=\"test metadata\" metadata.aaaa=bbbb\n", buf.String())
}
61 changes: 11 additions & 50 deletions core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
// authentication implements, Currently, only token authentication is implemented
_ "github.com/yomorun/yomo/pkg/auth"
"github.com/yomorun/yomo/pkg/frame-codec/y3codec"
"github.com/yomorun/yomo/pkg/id"
pkgtls "github.com/yomorun/yomo/pkg/tls"
"github.com/yomorun/yomo/pkg/trace"
oteltrace "go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -345,51 +343,18 @@
// counter +1
atomic.AddInt64(&s.counterOfDataFrame, 1)

from := c.Connection
tid := GetTIDFromMetadata(c.FrameMetadata)
sid := GetSIDFromMetadata(c.FrameMetadata)
parentTraced := GetTracedFromMetadata(c.FrameMetadata)
traced := false
// trace
tp := s.TracerProvider()
if tp != nil {
// create span
var span oteltrace.Span
var err error
// set parent span, if not traced, use empty string
if parentTraced {
span, err = trace.NewSpan(tp, "zipper", "handle DataFrame", tid, sid)
} else {
span, err = trace.NewSpan(tp, "zipper", "handle DataFrame", "", "")
}
if err != nil {
s.logger.Error("zipper trace error", "err", err)
} else {
defer span.End()
tid = span.SpanContext().TraceID().String()
sid = span.SpanContext().SpanID().String()
traced = true
}
}
if tid == "" {
s.logger.Debug("zipper create new tid")
tid = id.TID()
}
if sid == "" {
s.logger.Debug("zipper create new sid")
sid = id.SID()
}
// reallocate metadata with new TID and SID
SetTIDToMetadata(c.FrameMetadata, tid)
SetSIDToMetadata(c.FrameMetadata, sid)
SetTracedToMetadata(c.FrameMetadata, traced || parentTraced)
md, err := c.FrameMetadata.Encode()
md, endFn := ZipperTraceMetadata(c.FrameMetadata, s.TracerProvider(), c.Logger)
defer endFn()

c.FrameMetadata = md

mdBytes, err := c.FrameMetadata.Encode()
if err != nil {
s.logger.Error("encode metadata error", "err", err)
c.Logger.Error("encode metadata error", "err", err)

Check warning on line 353 in core/server.go

View check run for this annotation

Codecov / codecov/patch

core/server.go#L353

Added line #L353 was not covered by tests
return err
}
dataFrame.Metadata = md
s.logger.Debug("zipper metadata", "tid", tid, "sid", sid, "parentTraced", parentTraced, "traced", traced, "frome_stream_name", from.Name())
dataFrame.Metadata = mdBytes

// route
route := s.router.Route(c.FrameMetadata)
if route == nil {
Expand All @@ -415,7 +380,7 @@
continue
}

c.Logger.Info("data routing", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "data_length", data_length, "to_id", toID, "to_name", stream.Name())
c.Logger.Info("data routing", "tag", dataFrame.Tag, "data_length", data_length, "to_id", toID, "to_name", stream.Name())

// write data frame to stream
if err := stream.WriteFrame(dataFrame); err != nil {
Expand Down Expand Up @@ -512,10 +477,6 @@
return
}

var (
tid = GetTIDFromMetadata(c.FrameMetadata)
sid = GetSIDFromMetadata(c.FrameMetadata)
)
mdBytes, err := c.FrameMetadata.Encode()
if err != nil {
c.Logger.Error("failed to dispatch to downstream", "err", err)
Expand All @@ -524,7 +485,7 @@
dataFrame.Metadata = mdBytes

for _, ds := range s.downstreams {
c.Logger.Info("dispatching to downstream", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "data_length", len(dataFrame.Payload), "downstream_id", ds.ClientID())
c.Logger.Info("dispatching to downstream", "tag", dataFrame.Tag, "data_length", len(dataFrame.Payload), "downstream_id", ds.ClientID())
_ = ds.WriteFrame(dataFrame)
}
}
Expand Down
Loading