From 82e17ff4e230cd57212dd5e2719e8a6374c5c7fe Mon Sep 17 00:00:00 2001 From: wurui Date: Fri, 20 Oct 2023 23:33:43 +0800 Subject: [PATCH] feat(log): better log outputs when dispatching to downstreams (#644) --- core/client.go | 72 +++++++++++++++++++----------------------- core/client_test.go | 48 +++++++++++++++------------- core/metadata.go | 15 --------- core/metadata_test.go | 27 ---------------- core/server.go | 24 ++++++++------ core/server_options.go | 3 +- sfn.go | 49 ++++++++++++++++++---------- source.go | 21 ++++++++---- zipper.go | 27 ++++++++++++++-- 9 files changed, 143 insertions(+), 143 deletions(-) diff --git a/core/client.go b/core/client.go index 4a673f78a..8d18cef3f 100644 --- a/core/client.go +++ b/core/client.go @@ -21,6 +21,7 @@ import ( // Client is the abstraction of a YoMo-Client. a YoMo-Client can be // Source, Upstream Zipper or StreamFunction. type Client struct { + zipperAddr string name string // name of the client clientID string // id of the client clientType ClientType // type of the client @@ -28,7 +29,7 @@ type Client struct { receiver func(*frame.BackflowFrame) // function to invoke when data is processed errorfn func(error) // function to invoke when error occured opts *clientOptions - logger *slog.Logger + Logger *slog.Logger tracerProvider oteltrace.TracerProvider // ctx and ctxCancel manage the lifecycle of client. @@ -39,7 +40,7 @@ type Client struct { } // NewClient creates a new YoMo-Client. -func NewClient(appName string, clientType ClientType, opts ...ClientOption) *Client { +func NewClient(appName, zipperAddr string, clientType ClientType, opts ...ClientOption) *Client { option := defaultClientOption() for _, o := range opts { @@ -47,22 +48,19 @@ func NewClient(appName string, clientType ClientType, opts ...ClientOption) *Cli } clientID := id.New() - logger := option.logger.With("component", clientType.String(), "client_id", clientID, "client_name", appName) - - if option.credential != nil { - logger.Info("use credential", "credential_name", option.credential.Name()) - } + logger := option.logger ctx, ctxCancel := context.WithCancelCause(context.Background()) return &Client{ + zipperAddr: zipperAddr, name: appName, clientID: clientID, processor: func(df *frame.DataFrame) { logger.Warn("the processor has not been set") }, receiver: func(bf *frame.BackflowFrame) { logger.Warn("the receiver has not been set") }, clientType: clientType, opts: option, - logger: logger, + Logger: logger, tracerProvider: option.tracerProvider, errorfn: func(err error) { logger.Error("client err", "err", err) }, writeFrameChan: make(chan frame.Frame), @@ -85,8 +83,8 @@ func newConnectResult(conn quic.Connection, fs *FrameStream, err error) *connect } } -func (c *Client) connect(ctx context.Context, addr string) *connectResult { - conn, err := quic.DialAddr(ctx, addr, c.opts.tlsConfig, c.opts.quicConfig) +func (c *Client) connect(ctx context.Context) *connectResult { + conn, err := quic.DialAddr(ctx, c.zipperAddr, c.opts.tlsConfig, c.opts.quicConfig) if err != nil { return newConnectResult(conn, nil, err) } @@ -98,6 +96,10 @@ func (c *Client) connect(ctx context.Context, addr string) *connectResult { fs := NewFrameStream(stream, y3codec.Codec(), y3codec.PacketReadWriter()) + if credential := c.opts.credential; credential != nil { + c.Logger.Info("use credential", "credential_name", credential.Name()) + } + hf := &frame.HandshakeFrame{ Name: c.name, ID: c.clientID, @@ -130,7 +132,7 @@ func (c *Client) connect(ctx context.Context, addr string) *connectResult { } } -func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Connection, fs *FrameStream) { +func (c *Client) runBackground(ctx context.Context, conn quic.Connection, fs *FrameStream) { reconnection := make(chan struct{}) go c.handleReadFrames(fs, reconnection) @@ -149,12 +151,12 @@ func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Conne } case <-reconnection: reconnect: - cr := c.connect(ctx, addr) + cr := c.connect(ctx) if err := cr.err; err != nil { if errors.As(err, new(ErrAuthenticateFailed)) { return } - c.logger.Error("reconnect to zipper error", "err", cr.err) + c.Logger.Error("reconnect to zipper error", "err", cr.err) time.Sleep(time.Second) goto reconnect } @@ -165,27 +167,21 @@ func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Conne } // Connect connect client to server. -func (c *Client) Connect(ctx context.Context, addr string) error { - if c.clientType == ClientTypeStreamFunction && len(c.opts.observeDataTags) == 0 { - return errors.New("yomo: streamFunction cannot observe data because the required tag has not been set") - } - - c.logger = c.logger.With("zipper_addr", addr) - +func (c *Client) Connect(ctx context.Context) error { connect: - result := c.connect(ctx, addr) + result := c.connect(ctx) if result.err != nil { if c.opts.connectUntilSucceed { - c.logger.Error("failed to connect to zipper, trying to reconnect", "err", result.err) + c.Logger.Error("failed to connect to zipper, trying to reconnect", "err", result.err) time.Sleep(time.Second) goto connect } - c.logger.Error("can not connect to zipper", "err", result.err) + c.Logger.Error("can not connect to zipper", "err", result.err) return result.err } - c.logger.Info("connected to zipper") + c.Logger.Info("connected to zipper") - go c.runBackground(ctx, addr, result.conn, result.fs) + go c.runBackground(ctx, result.conn, result.fs) return nil } @@ -217,7 +213,7 @@ func (c *Client) nonBlockWriteFrame(f frame.Frame) error { return nil default: err := errors.New("yomo: client has lost connection") - c.logger.Debug("failed to write frame", "frame_type", f.Type().String(), "error", err) + c.Logger.Debug("failed to write frame", "frame_type", f.Type().String(), "error", err) return err } } @@ -275,7 +271,7 @@ func (c *Client) handleReadFrames(fs *FrameStream, reconnection chan struct{}) { buf = buf[:runtime.Stack(buf, false)] perr := fmt.Errorf("%v", e) - c.logger.Error("stream panic", "err", perr) + c.Logger.Error("stream panic", "err", perr) c.errorfn(fmt.Errorf("yomo: stream panic: %v\n%s", perr, buf)) } }() @@ -287,14 +283,14 @@ func (c *Client) handleReadFrames(fs *FrameStream, reconnection chan struct{}) { func (c *Client) handleFrame(f frame.Frame) { switch ff := f.(type) { case *frame.RejectedFrame: - c.logger.Error("rejected error", "err", ff.Message) + c.Logger.Error("rejected error", "err", ff.Message) _ = c.Close() case *frame.DataFrame: c.processor(ff) case *frame.BackflowFrame: c.receiver(ff) default: - c.logger.Warn("received unexpected frame", "frame_type", f.Type().String()) + c.Logger.Warn("received unexpected frame", "frame_type", f.Type().String()) } } @@ -313,15 +309,10 @@ func (c *Client) SetObserveDataTags(tag ...frame.Tag) { c.opts.observeDataTags = tag } -// Logger get client's logger instance, you can customize this using `yomo.WithLogger` -func (c *Client) Logger() *slog.Logger { - return c.logger -} - // SetErrorHandler set error handler func (c *Client) SetErrorHandler(fn func(err error)) { c.errorfn = fn - c.logger.Debug("the error handler has been set") + c.Logger.Debug("the error handler has been set") } // ClientID returns the ID of client. @@ -330,13 +321,14 @@ func (c *Client) ClientID() string { return c.clientID } // Name returns the name of client. func (c *Client) Name() string { return c.name } -// FrameWriterConnection represents a frame writer that can connect to an addr. -type FrameWriterConnection interface { +// Downstream represents a frame writer that can connect to an addr. +type Downstream interface { frame.Writer - ClientID() string - Name() string + ID() string + LocalName() string + RemoteName() string Close() error - Connect(context.Context, string) error + Connect(context.Context) error } // TracerProvider returns the tracer provider of client. diff --git a/core/client_test.go b/core/client_test.go index 7eee0c4ee..22b660cdf 100644 --- a/core/client_test.go +++ b/core/client_test.go @@ -16,7 +16,6 @@ import ( "github.com/yomorun/yomo/core/router" "github.com/yomorun/yomo/core/ylog" "github.com/yomorun/yomo/pkg/frame-codec/y3codec" - "github.com/yomorun/yomo/pkg/id" ) const testaddr = "127.0.0.1:19999" @@ -29,8 +28,8 @@ var discardingLogger = ylog.NewFromConfig(ylog.Config{Output: "/dev/null", Error func TestClientDialNothing(t *testing.T) { ctx := context.Background() - client := NewClient("source", ClientTypeSource, WithLogger(discardingLogger)) - err := client.Connect(ctx, testaddr) + client := NewClient("source", testaddr, ClientTypeSource, WithLogger(discardingLogger)) + err := client.Connect(ctx) qerr := net.ErrClosed assert.ErrorAs(t, err, &qerr, "dial must timeout") @@ -60,22 +59,23 @@ func TestFrameRoundTrip(t *testing.T) { server.SetBeforeHandlers(ht.beforeHandler) server.SetAfterHandlers(ht.afterHandler) - recorder := newFrameWriterRecorder("mockClient") - server.AddDownstreamServer("mockAddr", recorder) + recorder := newFrameWriterRecorder("mockID", "mockClientLocal", "mockClientRemote") + server.AddDownstreamServer(recorder) - assert.Equal(t, server.Downstreams()["mockAddr"], recorder.ClientID()) + assert.Equal(t, server.Downstreams()["mockID"], recorder.ID()) go func() { err := server.ListenAndServe(ctx, testaddr) fmt.Println(err) }() - illegalTokenSource := NewClient("source", ClientTypeSource, WithCredential("token:error-token"), WithLogger(discardingLogger)) - err := illegalTokenSource.Connect(ctx, testaddr) + illegalTokenSource := NewClient("source", testaddr, ClientTypeSource, WithCredential("token:error-token"), WithLogger(discardingLogger)) + err := illegalTokenSource.Connect(ctx) assert.Equal(t, "authentication failed: client credential name is token", err.Error()) source := NewClient( "source", + testaddr, ClientTypeSource, WithCredential("token:auth-token"), WithClientQuicConfig(DefalutQuicConfig), @@ -90,10 +90,10 @@ func TestFrameRoundTrip(t *testing.T) { assert.Equal(t, string(backflow), string(bf.Carriage)) }) - err = source.Connect(ctx, testaddr) + err = source.Connect(ctx) assert.NoError(t, err, "source connect must be success") - closeEarlySfn := createTestStreamFunction("close-early-sfn", observedTag) - closeEarlySfn.Connect(ctx, testaddr) + closeEarlySfn := createTestStreamFunction("close-early-sfn", testaddr, observedTag) + closeEarlySfn.Connect(ctx) assert.Equal(t, nil, err) // test close early. @@ -104,7 +104,7 @@ func TestFrameRoundTrip(t *testing.T) { assert.True(t, exited, "close-early-sfn should exited") // sfn to zipper. - sfn := createTestStreamFunction("sfn-1", observedTag) + sfn := createTestStreamFunction("sfn-1", testaddr, observedTag) sfn.SetDataFrameObserver(func(bf *frame.DataFrame) { assert.Equal(t, string(payload), string(bf.Payload)) @@ -123,7 +123,7 @@ func TestFrameRoundTrip(t *testing.T) { } }) - err = sfn.Connect(ctx, testaddr) + err = sfn.Connect(ctx) assert.NoError(t, err, "sfn connect should replace the old sfn stream") exited = checkClientExited(sfn, time.Second) @@ -211,9 +211,10 @@ func (a *hookTester) afterHandler(ctx *Context) error { return nil } -func createTestStreamFunction(name string, observedTag frame.Tag) *Client { +func createTestStreamFunction(name string, zipperAddr string, observedTag frame.Tag) *Client { sfn := NewClient( name, + zipperAddr, ClientTypeStreamFunction, WithCredential("token:auth-token"), WithLogger(discardingLogger), @@ -226,27 +227,30 @@ func createTestStreamFunction(name string, observedTag frame.Tag) *Client { // frameWriterRecorder frames be writen. type frameWriterRecorder struct { id string - name string + localName string + remoteName string codec frame.Codec packetReader frame.PacketReadWriter mu sync.Mutex buf *bytes.Buffer } -func newFrameWriterRecorder(name string) *frameWriterRecorder { +func newFrameWriterRecorder(id, localName, remoteName string) *frameWriterRecorder { return &frameWriterRecorder{ - id: id.New(), - name: name, + id: id, + localName: localName, + remoteName: remoteName, codec: y3codec.Codec(), packetReader: y3codec.PacketReadWriter(), buf: new(bytes.Buffer), } } -func (w *frameWriterRecorder) ClientID() string { return w.id } -func (w *frameWriterRecorder) Name() string { return w.name } -func (w *frameWriterRecorder) Close() error { return nil } -func (w *frameWriterRecorder) Connect(_ context.Context, _ string) error { return nil } +func (w *frameWriterRecorder) ID() string { return w.id } +func (w *frameWriterRecorder) LocalName() string { return w.localName } +func (w *frameWriterRecorder) RemoteName() string { return w.remoteName } +func (w *frameWriterRecorder) Close() error { return nil } +func (w *frameWriterRecorder) Connect(_ context.Context) error { return nil } func (w *frameWriterRecorder) WriteFrame(f frame.Frame) error { w.mu.Lock() diff --git a/core/metadata.go b/core/metadata.go index a60b6a958..ce99886f0 100644 --- a/core/metadata.go +++ b/core/metadata.go @@ -2,7 +2,6 @@ package core import ( "github.com/yomorun/yomo/core/metadata" - "golang.org/x/exp/slog" ) const ( @@ -68,17 +67,3 @@ func SetTracedToMetadata(m metadata.M, traced bool) { } m.Set(MetaTraced, tracedString) } - -// MetadataSlogAttr returns slog.Attr from metadata. -func MetadataSlogAttr(md metadata.M) slog.Attr { - kvStrings := make([]any, len(md)*2) - i := 0 - for k, v := range md { - kvStrings[i] = k - i++ - kvStrings[i] = v - i++ - } - - return slog.Group("metadata", kvStrings...) -} diff --git a/core/metadata_test.go b/core/metadata_test.go index c39f93dcb..9037e4891 100644 --- a/core/metadata_test.go +++ b/core/metadata_test.go @@ -1,12 +1,9 @@ package core import ( - "bytes" "testing" "github.com/stretchr/testify/assert" - "github.com/yomorun/yomo/core/metadata" - "golang.org/x/exp/slog" ) func TestMetadata(t *testing.T) { @@ -26,27 +23,3 @@ func TestMetadata(t *testing.T) { SetTracedToMetadata(md, false) assert.Equal(t, false, GetTracedFromMetadata(md)) } - -func TestMetadataSlogAttr(t *testing.T) { - md := metadata.New(map[string]string{ - "aaaa": "bbbb", - }) - - buf := bytes.NewBuffer(nil) - - logger := slog.New(slog.NewTextHandler(buf, &slog.HandlerOptions{ - AddSource: false, - Level: slog.LevelDebug, - // display time attr. - ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { - if a.Key == "time" { - return slog.Attr{} - } - return a - }, - })) - - logger.Debug("test metadata", MetadataSlogAttr(md)) - - assert.Equal(t, "level=DEBUG msg=\"test metadata\" metadata.aaaa=bbbb\n", buf.String()) -} diff --git a/core/server.go b/core/server.go index 9b28b57ba..d34b1f0a0 100644 --- a/core/server.go +++ b/core/server.go @@ -45,7 +45,7 @@ type Server struct { codec frame.Codec packetReadWriter frame.PacketReadWriter counterOfDataFrame int64 - downstreams map[string]FrameWriterConnection + downstreams map[string]Downstream mu sync.Mutex opts *serverOptions startHandlers []FrameHandler @@ -73,7 +73,7 @@ func NewServer(name string, opts ...ServerOption) *Server { ctx: ctx, ctxCancel: ctxCancel, name: name, - downstreams: make(map[string]FrameWriterConnection), + downstreams: make(map[string]Downstream), logger: logger, tracerProvider: options.tracerProvider, codec: y3codec.Codec(), @@ -96,8 +96,8 @@ func (s *Server) ListenAndServe(ctx context.Context, addr string) error { } // connect to all downstreams. - for addr, client := range s.downstreams { - go client.Connect(ctx, addr) + for _, client := range s.downstreams { + go client.Connect(ctx) } return s.Serve(ctx, conn) @@ -149,7 +149,7 @@ func (s *Server) handleConnection(qconn quic.Connection, fs *FrameStream, logger } logger = logger.With("conn_id", conn.ID(), "conn_name", conn.Name()) - logger.Info("client connected", "remote_addr", qconn.RemoteAddr().String(), "client_type", conn.ClientType().String()) + logger.Info("new client connected", "remote_addr", qconn.RemoteAddr().String(), "client_type", conn.ClientType().String()) c := newContext(conn, route, logger) @@ -300,7 +300,7 @@ func (s *Server) Close() error { return nil } -func closeServer(downstreams map[string]FrameWriterConnection, connector *Connector, listener *quic.Listener, router router.Router) error { +func closeServer(downstreams map[string]Downstream, connector *Connector, listener *quic.Listener, router router.Router) error { for _, ds := range downstreams { ds.Close() } @@ -482,7 +482,7 @@ func (s *Server) Downstreams() map[string]string { snapshotOfDownstream := make(map[string]string, len(s.downstreams)) for addr, client := range s.downstreams { - snapshotOfDownstream[addr] = client.ClientID() + snapshotOfDownstream[addr] = client.ID() } return snapshotOfDownstream } @@ -497,9 +497,9 @@ func (s *Server) ConfigRouter(router router.Router) { // AddDownstreamServer add a downstream server to this server. all the DataFrames will be // dispatch to all the downstreams. -func (s *Server) AddDownstreamServer(addr string, c FrameWriterConnection) { +func (s *Server) AddDownstreamServer(c Downstream) { s.mu.Lock() - s.downstreams[addr] = c + s.downstreams[c.ID()] = c s.mu.Unlock() } @@ -524,7 +524,11 @@ func (s *Server) dispatchToDownstreams(c *Context) { dataFrame.Metadata = mdBytes for _, ds := range s.downstreams { - c.Logger.Info("dispatching to downstream", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "data_length", len(dataFrame.Payload), "downstream_id", ds.ClientID()) + c.Logger.Info( + "dispatching to downstream", + "tid", tid, "sid", sid, "tag", dataFrame.Tag, "data_length", len(dataFrame.Payload), + "downstream_id", ds.ID(), "downstream_name", ds.LocalName(), + ) _ = ds.WriteFrame(dataFrame) } } diff --git a/core/server_options.go b/core/server_options.go index 63e570455..18d565f27 100644 --- a/core/server_options.go +++ b/core/server_options.go @@ -27,8 +27,7 @@ var DefalutQuicConfig = &quic.Config{ // ServerOption is the option for server. type ServerOption func(*serverOptions) -// ServerOptions are the options for YoMo server. -// TODO: quic alpn function. +// serverOptions are the options for YoMo server. type serverOptions struct { quicConfig *quic.Config tlsConfig *tls.Config diff --git a/sfn.go b/sfn.go index 26b35e1b8..364ef38b4 100644 --- a/sfn.go +++ b/sfn.go @@ -2,6 +2,7 @@ package yomo import ( "context" + "errors" "github.com/yomorun/yomo/core" "github.com/yomorun/yomo/core/frame" @@ -39,7 +40,16 @@ func NewStreamFunction(name, zipperAddr string, opts ...SfnOption) StreamFunctio for k, v := range opts { clientOpts[k] = core.ClientOption(v) } - client := core.NewClient(name, core.ClientTypeStreamFunction, clientOpts...) + + client := core.NewClient(name, zipperAddr, core.ClientTypeStreamFunction, clientOpts...) + + client.Logger = client.Logger.With( + "component", core.ClientTypeStreamFunction.String(), + "sfn_id", client.ClientID(), + "sfn_name", client.Name(), + "zipper_addr", zipperAddr, + ) + sfn := &streamFunction{ name: name, zipperAddr: zipperAddr, @@ -67,30 +77,35 @@ type streamFunction struct { // SetObserveDataTags set the data tag list that will be observed. // Deprecated: use yomo.WithObserveDataTags instead func (s *streamFunction) SetObserveDataTags(tag ...uint32) { + s.observeDataTags = tag s.client.SetObserveDataTags(tag...) - s.client.Logger().Debug("set sfn observe data tasg", "tags", s.observeDataTags) + s.client.Logger.Debug("set sfn observe data tasg", "tags", s.observeDataTags) } // SetHandler set the handler function, which accept the raw bytes data and return the tag & response. func (s *streamFunction) SetHandler(fn core.AsyncHandler) error { s.fn = fn - s.client.Logger().Debug("set async handler") + s.client.Logger.Debug("set async handler") return nil } func (s *streamFunction) SetPipeHandler(fn core.PipeHandler) error { s.pfn = fn - s.client.Logger().Debug("set pipe handler") + s.client.Logger.Debug("set pipe handler") return nil } // Connect create a connection to the zipper, when data arrvied, the data will be passed to the // handler which setted by SetHandler method. func (s *streamFunction) Connect() error { - s.client.Logger().Debug("sfn connecting to zipper ...") + if len(s.observeDataTags) == 0 { + return errors.New("streamFunction cannot observe data because the required tag has not been set") + } + + s.client.Logger.Debug("sfn connecting to zipper ...") // notify underlying network operations, when data with tag we observed arrived, invoke the func s.client.SetDataFrameObserver(func(data *frame.DataFrame) { - s.client.Logger().Debug("received data frame") + s.client.Logger.Debug("received data frame") s.onDataFrame(data) }) @@ -108,19 +123,19 @@ func (s *streamFunction) Connect() error { for { data := <-s.pOut if data != nil { - s.client.Logger().Debug("pipe fn send", "payload_frame", data) + s.client.Logger.Debug("pipe fn send", "payload_frame", data) md, err := metadata.Decode(data.Metadata) if err != nil { - s.client.Logger().Error("sfn decode metadata error", "err", err) + s.client.Logger.Error("sfn decode metadata error", "err", err) break } - newMd, deferFunc := ExtendTraceMetadata(md, s.client.ClientID(), s.client.Name(), s.client.TracerProvider(), s.client.Logger()) + newMd, deferFunc := ExtendTraceMetadata(md, s.client.ClientID(), s.client.Name(), s.client.TracerProvider(), s.client.Logger) defer deferFunc() newMetadata, err := newMd.Encode() if err != nil { - s.client.Logger().Error("sfn encode metadata error", "err", err) + s.client.Logger.Error("sfn encode metadata error", "err", err) break } data.Metadata = newMetadata @@ -137,7 +152,7 @@ func (s *streamFunction) Connect() error { }() } - err := s.client.Connect(context.Background(), s.zipperAddr) + err := s.client.Connect(context.Background()) return err } @@ -153,7 +168,7 @@ func (s *streamFunction) Close() error { if s.client != nil { if err := s.client.Close(); err != nil { - s.client.Logger().Error("failed to close sfn", "err", err) + s.client.Logger.Error("failed to close sfn", "err", err) return err } } @@ -174,16 +189,16 @@ func (s *streamFunction) onDataFrame(dataFrame *frame.DataFrame) { go func(tp oteltrace.TracerProvider, dataFrame *frame.DataFrame) { md, err := metadata.Decode(dataFrame.Metadata) if err != nil { - s.client.Logger().Error("sfn decode metadata error", "err", err) + s.client.Logger.Error("sfn decode metadata error", "err", err) return } - newMd, deferFunc := ExtendTraceMetadata(md, s.client.ClientID(), s.client.Name(), s.client.TracerProvider(), s.client.Logger()) + newMd, deferFunc := ExtendTraceMetadata(md, s.client.ClientID(), s.client.Name(), s.client.TracerProvider(), s.client.Logger) defer deferFunc() newMetadata, err := newMd.Encode() if err != nil { - s.client.Logger().Error("sfn encode metadata error", "err", err) + s.client.Logger.Error("sfn encode metadata error", "err", err) return } dataFrame.Metadata = newMetadata @@ -193,10 +208,10 @@ func (s *streamFunction) onDataFrame(dataFrame *frame.DataFrame) { }(tp, dataFrame) } else if s.pfn != nil { data := dataFrame.Payload - s.client.Logger().Debug("pipe sfn receive", "data_len", len(data), "data", data) + s.client.Logger.Debug("pipe sfn receive", "data_len", len(data), "data", data) s.pIn <- data } else { - s.client.Logger().Warn("sfn does not have a handler") + s.client.Logger.Warn("sfn does not have a handler") } } diff --git a/source.go b/source.go index 72e8d3b19..61718f807 100644 --- a/source.go +++ b/source.go @@ -43,7 +43,14 @@ func NewSource(name, zipperAddr string, opts ...SourceOption) Source { clientOpts[k] = core.ClientOption(v) } - client := core.NewClient(name, core.ClientTypeSource, clientOpts...) + client := core.NewClient(name, zipperAddr, core.ClientTypeSource, clientOpts...) + + client.Logger = client.Logger.With( + "component", core.ClientTypeSource.String(), + "source_id", client.ClientID(), + "source_name", client.Name(), + "zipper_addr", zipperAddr, + ) return &yomoSource{ name: name, @@ -55,10 +62,10 @@ func NewSource(name, zipperAddr string, opts ...SourceOption) Source { // Close will close the connection to YoMo-Zipper. func (s *yomoSource) Close() error { if err := s.client.Close(); err != nil { - s.client.Logger().Error("failed to close the source", "err", err) + s.client.Logger.Error("failed to close the source", "err", err) return err } - s.client.Logger().Debug("the source is closed") + s.client.Logger.Debug("the source is closed") return nil } @@ -71,13 +78,13 @@ func (s *yomoSource) Connect() error { } }) - err := s.client.Connect(context.Background(), s.zipperAddr) + err := s.client.Connect(context.Background()) return err } // Write writes data with specified tag. func (s *yomoSource) Write(tag uint32, data []byte) error { - md, deferFunc := TraceMetadata(s.client.ClientID(), s.name, s.client.TracerProvider(), s.client.Logger()) + md, deferFunc := TraceMetadata(s.client.ClientID(), s.name, s.client.TracerProvider(), s.client.Logger) defer deferFunc() mdBytes, err := md.Encode() @@ -90,7 +97,7 @@ func (s *yomoSource) Write(tag uint32, data []byte) error { Metadata: mdBytes, Payload: data, } - s.client.Logger().Debug("source write", "tag", tag, "data", data) + s.client.Logger.Debug("source write", "tag", tag, "data", data) return s.client.WriteFrame(f) } @@ -102,7 +109,7 @@ func (s *yomoSource) SetErrorHandler(fn func(err error)) { // [Experimental] SetReceiveHandler set the observe handler function func (s *yomoSource) SetReceiveHandler(fn func(uint32, []byte)) { s.fn = fn - s.client.Logger().Info("receive hander set for the source") + s.client.Logger.Info("receive hander set for the source") } // TraceMetadata generates source trace metadata. diff --git a/zipper.go b/zipper.go index 823fc85d1..3b3cfd3c8 100644 --- a/zipper.go +++ b/zipper.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/yomorun/yomo/core" + "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/core/router" "github.com/yomorun/yomo/pkg/config" "golang.org/x/exp/slog" @@ -65,16 +66,24 @@ func NewZipper(name string, meshConfig map[string]config.Downstream, options ... for downstreamName, meshConf := range meshConfig { addr := fmt.Sprintf("%s:%d", meshConf.Host, meshConf.Port) + dsLogger := server.Logger().With("downstream_name", downstreamName, "downstream_addr", addr) + clientOptions := append( opts.clientOption, core.WithCredential(meshConf.Credential), core.WithNonBlockWrite(), core.WithConnectUntilSucceed(), + core.WithLogger(dsLogger), ) - downstream := core.NewClient(name, core.ClientTypeUpstreamZipper, clientOptions...) - server.Logger().Info("add downstream", "name", downstreamName, "addr", addr, "downstream_id", downstream.ClientID()) - server.AddDownstreamServer(addr, downstream) + downstream := &downstream{ + localName: downstreamName, + client: core.NewClient(name, addr, core.ClientTypeUpstreamZipper, clientOptions...), + } + + server.Logger().Info("add downstream", "downstream_id", downstream.ID(), "downstream_name", downstream.LocalName(), "downstream_addr", addr) + + server.AddDownstreamServer(downstream) } server.ConfigRouter(router.Default()) @@ -96,3 +105,15 @@ func statsToLogger(server *core.Server) { "data_frame_received_num", server.StatsCounter(), ) } + +type downstream struct { + localName string + client *core.Client +} + +func (d *downstream) Close() error { return d.client.Close() } +func (d *downstream) Connect(ctx context.Context) error { return d.client.Connect(ctx) } +func (d *downstream) ID() string { return d.client.ClientID() } +func (d *downstream) LocalName() string { return d.localName } +func (d *downstream) RemoteName() string { return d.client.Name() } +func (d *downstream) WriteFrame(f frame.Frame) error { return d.client.WriteFrame(f) }