Skip to content

Commit

Permalink
feat(log): better log outputs when dispatching to downstreams (#644)
Browse files Browse the repository at this point in the history
  • Loading branch information
woorui authored Oct 20, 2023
1 parent cf56978 commit 82e17ff
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 143 deletions.
72 changes: 32 additions & 40 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
// Client is the abstraction of a YoMo-Client. a YoMo-Client can be
// Source, Upstream Zipper or StreamFunction.
type Client struct {
zipperAddr string
name string // name of the client
clientID string // id of the client
clientType ClientType // type of the client
processor func(*frame.DataFrame) // function to invoke when data arrived
receiver func(*frame.BackflowFrame) // function to invoke when data is processed
errorfn func(error) // function to invoke when error occured
opts *clientOptions
logger *slog.Logger
Logger *slog.Logger
tracerProvider oteltrace.TracerProvider

// ctx and ctxCancel manage the lifecycle of client.
Expand All @@ -39,30 +40,27 @@ type Client struct {
}

// NewClient creates a new YoMo-Client.
func NewClient(appName string, clientType ClientType, opts ...ClientOption) *Client {
func NewClient(appName, zipperAddr string, clientType ClientType, opts ...ClientOption) *Client {
option := defaultClientOption()

for _, o := range opts {
o(option)
}
clientID := id.New()

logger := option.logger.With("component", clientType.String(), "client_id", clientID, "client_name", appName)

if option.credential != nil {
logger.Info("use credential", "credential_name", option.credential.Name())
}
logger := option.logger

ctx, ctxCancel := context.WithCancelCause(context.Background())

return &Client{
zipperAddr: zipperAddr,
name: appName,
clientID: clientID,
processor: func(df *frame.DataFrame) { logger.Warn("the processor has not been set") },
receiver: func(bf *frame.BackflowFrame) { logger.Warn("the receiver has not been set") },
clientType: clientType,
opts: option,
logger: logger,
Logger: logger,
tracerProvider: option.tracerProvider,
errorfn: func(err error) { logger.Error("client err", "err", err) },
writeFrameChan: make(chan frame.Frame),
Expand All @@ -85,8 +83,8 @@ func newConnectResult(conn quic.Connection, fs *FrameStream, err error) *connect
}
}

func (c *Client) connect(ctx context.Context, addr string) *connectResult {
conn, err := quic.DialAddr(ctx, addr, c.opts.tlsConfig, c.opts.quicConfig)
func (c *Client) connect(ctx context.Context) *connectResult {
conn, err := quic.DialAddr(ctx, c.zipperAddr, c.opts.tlsConfig, c.opts.quicConfig)
if err != nil {
return newConnectResult(conn, nil, err)
}
Expand All @@ -98,6 +96,10 @@ func (c *Client) connect(ctx context.Context, addr string) *connectResult {

fs := NewFrameStream(stream, y3codec.Codec(), y3codec.PacketReadWriter())

if credential := c.opts.credential; credential != nil {
c.Logger.Info("use credential", "credential_name", credential.Name())
}

hf := &frame.HandshakeFrame{
Name: c.name,
ID: c.clientID,
Expand Down Expand Up @@ -130,7 +132,7 @@ func (c *Client) connect(ctx context.Context, addr string) *connectResult {
}
}

func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Connection, fs *FrameStream) {
func (c *Client) runBackground(ctx context.Context, conn quic.Connection, fs *FrameStream) {
reconnection := make(chan struct{})

go c.handleReadFrames(fs, reconnection)
Expand All @@ -149,12 +151,12 @@ func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Conne
}
case <-reconnection:
reconnect:
cr := c.connect(ctx, addr)
cr := c.connect(ctx)
if err := cr.err; err != nil {
if errors.As(err, new(ErrAuthenticateFailed)) {
return
}
c.logger.Error("reconnect to zipper error", "err", cr.err)
c.Logger.Error("reconnect to zipper error", "err", cr.err)
time.Sleep(time.Second)
goto reconnect
}
Expand All @@ -165,27 +167,21 @@ func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Conne
}

// Connect connect client to server.
func (c *Client) Connect(ctx context.Context, addr string) error {
if c.clientType == ClientTypeStreamFunction && len(c.opts.observeDataTags) == 0 {
return errors.New("yomo: streamFunction cannot observe data because the required tag has not been set")
}

c.logger = c.logger.With("zipper_addr", addr)

func (c *Client) Connect(ctx context.Context) error {
connect:
result := c.connect(ctx, addr)
result := c.connect(ctx)
if result.err != nil {
if c.opts.connectUntilSucceed {
c.logger.Error("failed to connect to zipper, trying to reconnect", "err", result.err)
c.Logger.Error("failed to connect to zipper, trying to reconnect", "err", result.err)
time.Sleep(time.Second)
goto connect
}
c.logger.Error("can not connect to zipper", "err", result.err)
c.Logger.Error("can not connect to zipper", "err", result.err)
return result.err
}
c.logger.Info("connected to zipper")
c.Logger.Info("connected to zipper")

go c.runBackground(ctx, addr, result.conn, result.fs)
go c.runBackground(ctx, result.conn, result.fs)

return nil
}
Expand Down Expand Up @@ -217,7 +213,7 @@ func (c *Client) nonBlockWriteFrame(f frame.Frame) error {
return nil
default:
err := errors.New("yomo: client has lost connection")
c.logger.Debug("failed to write frame", "frame_type", f.Type().String(), "error", err)
c.Logger.Debug("failed to write frame", "frame_type", f.Type().String(), "error", err)
return err
}
}
Expand Down Expand Up @@ -275,7 +271,7 @@ func (c *Client) handleReadFrames(fs *FrameStream, reconnection chan struct{}) {
buf = buf[:runtime.Stack(buf, false)]

perr := fmt.Errorf("%v", e)
c.logger.Error("stream panic", "err", perr)
c.Logger.Error("stream panic", "err", perr)
c.errorfn(fmt.Errorf("yomo: stream panic: %v\n%s", perr, buf))
}
}()
Expand All @@ -287,14 +283,14 @@ func (c *Client) handleReadFrames(fs *FrameStream, reconnection chan struct{}) {
func (c *Client) handleFrame(f frame.Frame) {
switch ff := f.(type) {
case *frame.RejectedFrame:
c.logger.Error("rejected error", "err", ff.Message)
c.Logger.Error("rejected error", "err", ff.Message)
_ = c.Close()
case *frame.DataFrame:
c.processor(ff)
case *frame.BackflowFrame:
c.receiver(ff)
default:
c.logger.Warn("received unexpected frame", "frame_type", f.Type().String())
c.Logger.Warn("received unexpected frame", "frame_type", f.Type().String())
}
}

Expand All @@ -313,15 +309,10 @@ func (c *Client) SetObserveDataTags(tag ...frame.Tag) {
c.opts.observeDataTags = tag
}

// Logger get client's logger instance, you can customize this using `yomo.WithLogger`
func (c *Client) Logger() *slog.Logger {
return c.logger
}

// SetErrorHandler set error handler
func (c *Client) SetErrorHandler(fn func(err error)) {
c.errorfn = fn
c.logger.Debug("the error handler has been set")
c.Logger.Debug("the error handler has been set")
}

// ClientID returns the ID of client.
Expand All @@ -330,13 +321,14 @@ func (c *Client) ClientID() string { return c.clientID }
// Name returns the name of client.
func (c *Client) Name() string { return c.name }

// FrameWriterConnection represents a frame writer that can connect to an addr.
type FrameWriterConnection interface {
// Downstream represents a frame writer that can connect to an addr.
type Downstream interface {
frame.Writer
ClientID() string
Name() string
ID() string
LocalName() string
RemoteName() string
Close() error
Connect(context.Context, string) error
Connect(context.Context) error
}

// TracerProvider returns the tracer provider of client.
Expand Down
48 changes: 26 additions & 22 deletions core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/yomorun/yomo/core/router"
"github.com/yomorun/yomo/core/ylog"
"github.com/yomorun/yomo/pkg/frame-codec/y3codec"
"github.com/yomorun/yomo/pkg/id"
)

const testaddr = "127.0.0.1:19999"
Expand All @@ -29,8 +28,8 @@ var discardingLogger = ylog.NewFromConfig(ylog.Config{Output: "/dev/null", Error
func TestClientDialNothing(t *testing.T) {
ctx := context.Background()

client := NewClient("source", ClientTypeSource, WithLogger(discardingLogger))
err := client.Connect(ctx, testaddr)
client := NewClient("source", testaddr, ClientTypeSource, WithLogger(discardingLogger))
err := client.Connect(ctx)

qerr := net.ErrClosed
assert.ErrorAs(t, err, &qerr, "dial must timeout")
Expand Down Expand Up @@ -60,22 +59,23 @@ func TestFrameRoundTrip(t *testing.T) {
server.SetBeforeHandlers(ht.beforeHandler)
server.SetAfterHandlers(ht.afterHandler)

recorder := newFrameWriterRecorder("mockClient")
server.AddDownstreamServer("mockAddr", recorder)
recorder := newFrameWriterRecorder("mockID", "mockClientLocal", "mockClientRemote")
server.AddDownstreamServer(recorder)

assert.Equal(t, server.Downstreams()["mockAddr"], recorder.ClientID())
assert.Equal(t, server.Downstreams()["mockID"], recorder.ID())

go func() {
err := server.ListenAndServe(ctx, testaddr)
fmt.Println(err)
}()

illegalTokenSource := NewClient("source", ClientTypeSource, WithCredential("token:error-token"), WithLogger(discardingLogger))
err := illegalTokenSource.Connect(ctx, testaddr)
illegalTokenSource := NewClient("source", testaddr, ClientTypeSource, WithCredential("token:error-token"), WithLogger(discardingLogger))
err := illegalTokenSource.Connect(ctx)
assert.Equal(t, "authentication failed: client credential name is token", err.Error())

source := NewClient(
"source",
testaddr,
ClientTypeSource,
WithCredential("token:auth-token"),
WithClientQuicConfig(DefalutQuicConfig),
Expand All @@ -90,10 +90,10 @@ func TestFrameRoundTrip(t *testing.T) {
assert.Equal(t, string(backflow), string(bf.Carriage))
})

err = source.Connect(ctx, testaddr)
err = source.Connect(ctx)
assert.NoError(t, err, "source connect must be success")
closeEarlySfn := createTestStreamFunction("close-early-sfn", observedTag)
closeEarlySfn.Connect(ctx, testaddr)
closeEarlySfn := createTestStreamFunction("close-early-sfn", testaddr, observedTag)
closeEarlySfn.Connect(ctx)
assert.Equal(t, nil, err)

// test close early.
Expand All @@ -104,7 +104,7 @@ func TestFrameRoundTrip(t *testing.T) {
assert.True(t, exited, "close-early-sfn should exited")

// sfn to zipper.
sfn := createTestStreamFunction("sfn-1", observedTag)
sfn := createTestStreamFunction("sfn-1", testaddr, observedTag)
sfn.SetDataFrameObserver(func(bf *frame.DataFrame) {
assert.Equal(t, string(payload), string(bf.Payload))

Expand All @@ -123,7 +123,7 @@ func TestFrameRoundTrip(t *testing.T) {
}
})

err = sfn.Connect(ctx, testaddr)
err = sfn.Connect(ctx)
assert.NoError(t, err, "sfn connect should replace the old sfn stream")

exited = checkClientExited(sfn, time.Second)
Expand Down Expand Up @@ -211,9 +211,10 @@ func (a *hookTester) afterHandler(ctx *Context) error {
return nil
}

func createTestStreamFunction(name string, observedTag frame.Tag) *Client {
func createTestStreamFunction(name string, zipperAddr string, observedTag frame.Tag) *Client {
sfn := NewClient(
name,
zipperAddr,
ClientTypeStreamFunction,
WithCredential("token:auth-token"),
WithLogger(discardingLogger),
Expand All @@ -226,27 +227,30 @@ func createTestStreamFunction(name string, observedTag frame.Tag) *Client {
// frameWriterRecorder frames be writen.
type frameWriterRecorder struct {
id string
name string
localName string
remoteName string
codec frame.Codec
packetReader frame.PacketReadWriter
mu sync.Mutex
buf *bytes.Buffer
}

func newFrameWriterRecorder(name string) *frameWriterRecorder {
func newFrameWriterRecorder(id, localName, remoteName string) *frameWriterRecorder {
return &frameWriterRecorder{
id: id.New(),
name: name,
id: id,
localName: localName,
remoteName: remoteName,
codec: y3codec.Codec(),
packetReader: y3codec.PacketReadWriter(),
buf: new(bytes.Buffer),
}
}

func (w *frameWriterRecorder) ClientID() string { return w.id }
func (w *frameWriterRecorder) Name() string { return w.name }
func (w *frameWriterRecorder) Close() error { return nil }
func (w *frameWriterRecorder) Connect(_ context.Context, _ string) error { return nil }
func (w *frameWriterRecorder) ID() string { return w.id }
func (w *frameWriterRecorder) LocalName() string { return w.localName }
func (w *frameWriterRecorder) RemoteName() string { return w.remoteName }
func (w *frameWriterRecorder) Close() error { return nil }
func (w *frameWriterRecorder) Connect(_ context.Context) error { return nil }

func (w *frameWriterRecorder) WriteFrame(f frame.Frame) error {
w.mu.Lock()
Expand Down
15 changes: 0 additions & 15 deletions core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package core

import (
"github.com/yomorun/yomo/core/metadata"
"golang.org/x/exp/slog"
)

const (
Expand Down Expand Up @@ -68,17 +67,3 @@ func SetTracedToMetadata(m metadata.M, traced bool) {
}
m.Set(MetaTraced, tracedString)
}

// MetadataSlogAttr returns slog.Attr from metadata.
func MetadataSlogAttr(md metadata.M) slog.Attr {
kvStrings := make([]any, len(md)*2)
i := 0
for k, v := range md {
kvStrings[i] = k
i++
kvStrings[i] = v
i++
}

return slog.Group("metadata", kvStrings...)
}
Loading

1 comment on commit 82e17ff

@vercel
Copy link

@vercel vercel bot commented on 82e17ff Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

yomo – ./

yomo.run
yomo-yomorun.vercel.app
yomo.vercel.app
www.yomo.run
yomo-git-master-yomorun.vercel.app

Please sign in to comment.