Skip to content

Commit

Permalink
实现ws的链路追踪
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Sep 16, 2024
1 parent 7a3e83c commit f2e270d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
17 changes: 14 additions & 3 deletions websocket/baseContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package websocket

import (
ctx "context"
"encoding/json"
"errors"
"fmt"
"github.com/farseer-go/fs/container"
"github.com/farseer-go/fs/exception"
"github.com/farseer-go/fs/fastReflect"
"github.com/farseer-go/fs/flog"
"github.com/farseer-go/fs/parse"
"github.com/farseer-go/fs/trace"
"github.com/farseer-go/webapi/context"
"github.com/timandy/routine"
Expand Down Expand Up @@ -57,7 +59,7 @@ func (receiver *BaseContext) ReceiverMessageFunc(d time.Duration, f func(message
func() {
var err error
// 创建链路追踪上下文
trackContext := container.Resolve[trace.IManager]().EntryWebSocket(receiver.HttpContext.URI.Host, receiver.HttpContext.URI.Url, receiver.HttpContext.ContentType, receiver.HttpContext.Header.ToMap(), receiver.HttpContext.URI.GetRealIp())
trackContext := container.Resolve[trace.IManager]().EntryWebSocket(receiver.HttpContext.URI.Host, receiver.HttpContext.URI.Url, receiver.HttpContext.Header.ToMap(), receiver.HttpContext.URI.GetRealIp())
defer func() {
trackContext.End(err)
}()
Expand Down Expand Up @@ -86,18 +88,27 @@ func (receiver *BaseContext) ReceiverMessageFunc(d time.Duration, f func(message
// Send 发送消息,如果msg不是go的基础类型,则会自动序列化成json
func (receiver *BaseContext) Send(msg any) error {
var err error
var message string
// 基础类型不需要进行序列化
if fastReflect.PointerOf(msg).Type == fastReflect.GoBasicType {
err = websocket.Message.Send(receiver.HttpContext.WebsocketConn, msg)
message = parse.ToString(message)
} else {
// 其余类型,一律使用json
err = websocket.JSON.Send(receiver.HttpContext.WebsocketConn, msg)
marshal, _ := json.Marshal(msg)
message = string(marshal)
}
err = websocket.Message.Send(receiver.HttpContext.WebsocketConn, message)

if err != nil {
receiver.errorIsClose(err)
flog.Warningf("路由:%s 发送数据时失败:%s", receiver.HttpContext.Route.RouteUrl, err.Error())
}

// 如果使用了链路追踪,则记录异常
if traceContext := trace.CurTraceContext.Get(); traceContext != nil {
traceContext.SetResponseBody(message)
traceContext.Error(err)
}
return err
}

Expand Down
2 changes: 1 addition & 1 deletion websocket/webSocketContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (receiver *Context[T]) ReceiverFunc(d time.Duration, f func(message *T)) {
func() {
var err error
// 创建链路追踪上下文
trackContext := container.Resolve[trace.IManager]().EntryWebSocket(receiver.HttpContext.URI.Host, receiver.HttpContext.URI.Url, receiver.HttpContext.ContentType, receiver.HttpContext.Header.ToMap(), receiver.HttpContext.URI.GetRealIp())
trackContext := container.Resolve[trace.IManager]().EntryWebSocket(receiver.HttpContext.URI.Host, receiver.HttpContext.URI.Url, receiver.HttpContext.Header.ToMap(), receiver.HttpContext.URI.GetRealIp())
defer func() {
trackContext.End(err)
}()
Expand Down

0 comments on commit f2e270d

Please sign in to comment.