Skip to content

Commit

Permalink
refactor: separate the logic of distributed tracing (#736)
Browse files Browse the repository at this point in the history
# Description

Omit the `WithTracerProvider()` options as manual injection of the
tracer provider in the code is unnecessary. The tracing functionality
will be activated upon setting the 'OTEL_EXPORTER_OTLP_ENDPOINT'
environment variable.

# Changes

Before:

```golang
// create trace provider manully
tp, shutdown, e := trace.NewTracerProvider("yomo-sfn")
if e == nil {
  log.Println("[sfn] 🛰 trace enabled")
}
defer shutdown(context.Background())

sfn := yomo.NewStreamFunction(
  name,
  addr,
  yomo.WithCredential(credential),
  yomo.WithTracerProvider(tp),  // add trace provider to opts
)
```

After:

```golang
sfn := yomo.NewStreamFunction(
  name,
  addr,
  yomo.WithCredential(credential),
)
```
run: 

```sh
OTEL_EXPORTER_OTLP_ENDPOINT=https://opentracing.acme.com:7234 yomo run sfn.go
```

# TODO

update related docs.
  • Loading branch information
woorui authored Mar 5, 2024
1 parent 7e9636a commit b5317dc
Show file tree
Hide file tree
Showing 29 changed files with 254 additions and 476 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ jobs:
run: go build $(go list ./... | grep -v /example)

- name: Upload coverage
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v3
with:
file: coverage.txt
9 changes: 1 addition & 8 deletions cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/yomorun/yomo/core/router"
pkgconfig "github.com/yomorun/yomo/pkg/config"
"github.com/yomorun/yomo/pkg/log"
"github.com/yomorun/yomo/pkg/trace"

"github.com/yomorun/yomo/pkg/bridge/ai"
"github.com/yomorun/yomo/pkg/bridge/ai/provider/azopenai"
Expand All @@ -51,16 +50,10 @@ var serveCmd = &cobra.Command{
return
}
ctx := context.Background()
// trace
tp, shutdown, err := trace.NewTracerProvider("yomo-zipper")
if err == nil {
log.InfoStatusEvent(os.Stdout, "[zipper] 🛰 trace enabled")
}
defer shutdown(ctx)
// listening address.
listenAddr := fmt.Sprintf("%s:%d", conf.Host, conf.Port)

options := []yomo.ZipperOption{yomo.WithZipperTracerProvider(tp)}
options := []yomo.ZipperOption{}
tokenString := ""
if _, ok := conf.Auth["type"]; ok {
if tokenString, ok = conf.Auth["token"]; ok {
Expand Down
11 changes: 1 addition & 10 deletions cli/serverless/deno/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package deno

import (
"context"
"encoding/binary"
"errors"
"io"
Expand All @@ -15,7 +14,6 @@ import (
"github.com/yomorun/yomo"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/pkg/file"
"github.com/yomorun/yomo/pkg/trace"
"github.com/yomorun/yomo/serverless"
)

Expand Down Expand Up @@ -90,17 +88,10 @@ func runDeno(jsPath string, socketPath string, errCh chan<- error) {
}

func startSfn(name string, zipperAddr string, credential string, observed []frame.Tag, conn net.Conn, errCh chan<- error) (yomo.StreamFunction, error) {
// trace
tp, shutdown, err := trace.NewTracerProvider("yomo-sfn")
if err == nil {
log.Println("[sfn] 🛰 trace enabled")
}
defer shutdown(context.Background())
sfn := yomo.NewStreamFunction(
name,
zipperAddr,
yomo.WithSfnCredential(credential),
yomo.WithSfnTracerProvider(tp),
)

// init
Expand Down Expand Up @@ -168,7 +159,7 @@ func startSfn(name string, zipperAddr string, credential string, observed []fram
},
)

err = sfn.Connect()
err := sfn.Connect()
if err != nil {
return nil, err
}
Expand Down
7 changes: 0 additions & 7 deletions cli/serverless/golang/templates/main.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,10 @@ func main() {
}

func runSFN(name string, addr string, credential string) (closeFn func() error, err error) {
// trace
tp, shutdown, e := trace.NewTracerProvider("yomo-sfn")
if e == nil {
log.Println("[sfn] 🛰 trace enabled")
}
defer shutdown(context.Background())
sfn := yomo.NewStreamFunction(
name,
addr,
yomo.WithCredential(credential),
yomo.WithTracerProvider(tp),
)
closeFn = sfn.Close

Expand Down
6 changes: 0 additions & 6 deletions cli/serverless/golang/templates/main_rx.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,11 @@ func main() {
}

func runSFN(name string, addr string, credential string) (closeFn func() error, err error) {
// trace
tp, shutdown, e := trace.NewTracerProvider("yomo-sfn")
if e == nil {
log.Println("[sfn] 🛰 trace enabled")
}
defer shutdown(context.Background())
sfn := yomo.NewStreamFunction(
name,
addr,
yomo.WithCredential(credential),
yomo.WithTracerProvider(tp),
)
closeFn = sfn.Close

Expand Down
12 changes: 1 addition & 11 deletions cli/serverless/wasm/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
package wasm

import (
"context"
"log"
"os"
"sync"

"github.com/yomorun/yomo"
cli "github.com/yomorun/yomo/cli/serverless"
pkglog "github.com/yomorun/yomo/pkg/log"
"github.com/yomorun/yomo/pkg/trace"
"github.com/yomorun/yomo/serverless"
)

Expand Down Expand Up @@ -53,21 +51,13 @@ func (s *wasmServerless) Build(clean bool) error {

// Run the wasm serverless function
func (s *wasmServerless) Run(verbose bool) error {
// trace
tp, shutdown, err := trace.NewTracerProvider("yomo-sfn")
if err == nil {
pkglog.InfoStatusEvent(os.Stdout, "[sfn] 🛰 trace enabled")
}
defer shutdown(context.Background())

sfn := yomo.NewStreamFunction(
s.name,
s.zipperAddr,
yomo.WithSfnCredential(s.credential),
yomo.WithSfnTracerProvider(tp),
)
// init
err = sfn.Init(func() error {
err := sfn.Init(func() error {
return s.runtime.RunInit()
})
if err != nil {
Expand Down
53 changes: 19 additions & 34 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"runtime"
"time"

Expand All @@ -16,24 +15,22 @@ import (
"github.com/yomorun/yomo/pkg/frame-codec/y3codec"
"github.com/yomorun/yomo/pkg/id"
yquic "github.com/yomorun/yomo/pkg/listener/quic"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"
)

// Client is the abstraction of a YoMo-Client. a YoMo-Client can be
// Source, Upstream Zipper or StreamFunction.
type Client struct {
zipperAddr string
name string // name of the client
clientID string // id of the client
reconnCounter uint // counter for reconnection
clientType ClientType // type of the client
processor func(*frame.DataFrame) // function to invoke when data arrived
errorfn func(error) // function to invoke when error occured
wantedTarget string
opts *clientOptions
Logger *slog.Logger
tracerProvider oteltrace.TracerProvider
zipperAddr string
name string // name of the client
clientID string // id of the client
reconnCounter uint // counter for reconnection
clientType ClientType // type of the client
processor func(*frame.DataFrame) // function to invoke when data arrived
errorfn func(error) // function to invoke when error occured
wantedTarget string
opts *clientOptions
Logger *slog.Logger

// ctx and ctxCancel manage the lifecycle of client.
ctx context.Context
Expand Down Expand Up @@ -68,16 +65,15 @@ func NewClient(appName, zipperAddr string, clientType ClientType, opts ...Client
ctx, ctxCancel := context.WithCancelCause(context.Background())

return &Client{
zipperAddr: zipperAddr,
name: appName,
clientID: clientID,
processor: func(df *frame.DataFrame) { logger.Warn("the processor has not been set") },
clientType: clientType,
opts: option,
Logger: logger,
tracerProvider: option.tracerProvider,
ctx: ctx,
ctxCancel: ctxCancel,
zipperAddr: zipperAddr,
name: appName,
clientID: clientID,
processor: func(df *frame.DataFrame) { logger.Warn("the processor has not been set") },
clientType: clientType,
opts: option,
Logger: logger,
ctx: ctx,
ctxCancel: ctxCancel,

done: make(chan struct{}),
wrCh: make(chan frame.Frame),
Expand Down Expand Up @@ -429,14 +425,3 @@ type Downstream interface {
Close() error
Connect(context.Context) error
}

// TracerProvider returns the tracer provider of client.
func (c *Client) TracerProvider() oteltrace.TracerProvider {
if c.tracerProvider == nil {
return nil
}
if reflect.ValueOf(c.tracerProvider).IsNil() {
return nil
}
return c.tracerProvider
}
9 changes: 0 additions & 9 deletions core/client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/ylog"
pkgtls "github.com/yomorun/yomo/pkg/tls"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"
)

Expand All @@ -32,7 +31,6 @@ type clientOptions struct {
reconnect bool
nonBlockWrite bool
logger *slog.Logger
tracerProvider trace.TracerProvider
// ai function
aiFunctionInputModel any
aiFunctionDescription string
Expand Down Expand Up @@ -107,13 +105,6 @@ func WithLogger(logger *slog.Logger) ClientOption {
}
}

// WithTracerProvider sets tracer provider for the client.
func WithTracerProvider(tp trace.TracerProvider) ClientOption {
return func(o *clientOptions) {
o.tracerProvider = tp
}
}

// WithAIFunctionDefinition sets AI function definition for the client.
func WithAIFunctionDefinition(description string, inputModel any) ClientOption {
return func(o *clientOptions) {
Expand Down
4 changes: 2 additions & 2 deletions core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestFrameRoundTrip(t *testing.T) {
exited = checkClientExited(sfn2, time.Second)
assert.False(t, exited, "sfn stream should not exited")

sfnMd := NewMetadata(source.clientID, "tid", "trace-id", "span-id", false)
sfnMd := NewMetadata(source.clientID, "tid")

sfnMetaBytes, _ := sfnMd.Encode()

Expand All @@ -186,7 +186,7 @@ func TestFrameRoundTrip(t *testing.T) {
assert.ElementsMatch(t, nameList, []string{"source", "sfn-1", "sfn-2"})

md := metadata.New(
NewMetadata(source.clientID, "tid", "trace-id", "span-id", false),
NewMetadata(source.clientID, "tid"),
metadata.M{
"foo": "bar",
},
Expand Down
Loading

0 comments on commit b5317dc

Please sign in to comment.