Skip to content

Commit

Permalink
feat: support route dataframe to dedicate sfn instance (#704)
Browse files Browse the repository at this point in the history
Adding `WriteWithTarget` api for `Source`, call it like:

```go
source.WriteWithTarget(0x33, []byte("hello handler"), "target-id-1")
```

And sfn will receive the payload that be written by source like this:

```go
sfn.SetWantedTarget("target-id-1")

sfn.SetHandler(func(ctx serverless.Context) {
  fmt.Println(ctx.Data())
  ctx.WriteWithTarget(0x34, []byte("hello next"), "target-id-2")
})
```

And the sfn that has called `sfn.SetWantedTarget("target-id-2")` will
receive data from the above sfn.

The route rules is:

1. find all sfn in tag.
2. if the sfn do not called `SetWantedTarget`, send data to the sfn.
3. if the sfn called `SetWantedTarget`, send the data only when
`dataFrame.Metadata.Target == sfn.WantedTarget`.

TODO: cli/wasm support.

---------

Co-authored-by: venjiang <[email protected]>
  • Loading branch information
woorui and venjiang authored Feb 2, 2024
1 parent c9ed644 commit 752305d
Show file tree
Hide file tree
Showing 19 changed files with 275 additions and 84 deletions.
6 changes: 3 additions & 3 deletions cli/serverless/golang/templates/init_test.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ func TestHandler(t *testing.T) {
name string
ctx *mock.MockContext
// want is the expected data and tag that be written by ctx.Write
want []mock.DataAndTag
want []mock.WriteRecord
}{
{
name: "upper",
ctx: mock.NewMockContext([]byte("hello"), 0x33),
want: []mock.DataAndTag{
want: []mock.WriteRecord{
{Data: []byte("HELLO"), Tag: 0x34},
},
},
Expand All @@ -26,7 +26,7 @@ func TestHandler(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Handler(tt.ctx)
got := tt.ctx.RecordWritten()
got := tt.ctx.RecordsWritten()

if !reflect.DeepEqual(got, tt.want) {
t.Errorf("TestHandler got: %v, want: %v", got, tt.want)
Expand Down
7 changes: 7 additions & 0 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Client struct {
clientType ClientType // type of the client
processor func(*frame.DataFrame) // function to invoke when data arrived
errorfn func(error) // function to invoke when error occured
wantedTarget string
opts *clientOptions
Logger *slog.Logger
tracerProvider oteltrace.TracerProvider
Expand Down Expand Up @@ -81,6 +82,11 @@ func NewClient(appName, zipperAddr string, clientType ClientType, opts ...Client
}
}

// SetWantedTarget set the wanted target string.
func (c *Client) SetWantedTarget(target string) {
c.wantedTarget = target
}

// Connect connect client to server.
func (c *Client) Connect(ctx context.Context) error {
CONNECT:
Expand Down Expand Up @@ -179,6 +185,7 @@ func (c *Client) connect(ctx context.Context, addr string) (frame.Conn, error) {
AuthName: c.opts.credential.Name(),
AuthPayload: c.opts.credential.Payload(),
Version: Version,
WantedTarget: c.wantedTarget,
}

if err := conn.WriteFrame(hf); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions core/frame/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type HandshakeFrame struct {
AuthPayload string
// Version is used by the source/sfn to communicate their spec version to the server.
Version string
// WantedTarget represents the target that accepts the data frames that carrying the same target.
WantedTarget string
}

// Type returns the type of HandshakeFrame.
Expand Down
46 changes: 17 additions & 29 deletions core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,46 +8,34 @@ import (
"golang.org/x/exp/slog"
)

const (
// the keys for yomo working.
MetadataSourceIDKey = "yomo-source-id"
MetadataTIDKey = "yomo-tid"

// the keys for tracing.
MetadataTraceIDKey = "yomo-trace-id"
MetadataSpanIDKey = "yomo-span-id"
MetaTracedKey = "yomo-traced"
)

// NewMetadata returns metadata for yomo working.
func NewMetadata(sourceID, tid string, traceID string, spanID string, traced bool) metadata.M {
return metadata.M{
MetadataSourceIDKey: sourceID,
MetadataTIDKey: tid,
MetadataTraceIDKey: traceID,
MetadataSpanIDKey: spanID,
MetaTracedKey: tracedString(traced),
metadata.SourceIDKey: sourceID,
metadata.TIDKey: tid,
metadata.TraceIDKey: traceID,
metadata.SpanIDKey: spanID,
metadata.TracedKey: tracedString(traced),
}
}

// GetSourceIDFromMetadata gets sourceID from metadata.
func GetSourceIDFromMetadata(m metadata.M) string {
sourceID, _ := m.Get(MetadataSourceIDKey)
return sourceID
}

// GetTIDFromMetadata gets TID from metadata.
func GetTIDFromMetadata(m metadata.M) string {
tid, _ := m.Get(MetadataTIDKey)
tid, _ := m.Get(metadata.TIDKey)
return tid
}

// GetTracedFromMetadata gets traced from metadata.
func GetTracedFromMetadata(m metadata.M) bool {
tracedString, _ := m.Get(MetaTracedKey)
tracedString, _ := m.Get(metadata.TracedKey)
return tracedString == "true"
}

// SetMetadataTarget sets target in metadata.
func SetMetadataTarget(m metadata.M, target string) {
m.Set(metadata.TargetKey, target)
}

// SourceMetadata generates source metadata with trace information.
func SourceMetadata(
sourceID, tid string,
Expand Down Expand Up @@ -97,8 +85,8 @@ func ExtendTraceMetadata(
tp oteltrace.TracerProvider, logger *slog.Logger,
) (metadata.M, func()) {
var (
traceID, _ = md.Get(MetadataTraceIDKey)
spanID, _ = md.Get(MetadataSpanIDKey)
traceID, _ = md.Get(metadata.TraceIDKey)
spanID, _ = md.Get(metadata.SpanIDKey)
parentTraced = GetTracedFromMetadata(md)
endFn = func() {}
)
Expand Down Expand Up @@ -140,9 +128,9 @@ func ExtendTraceMetadata(
}

// reallocate metadata with new TraceID and SpanID
md.Set(MetadataTraceIDKey, traceID)
md.Set(MetadataSpanIDKey, spanID)
md.Set(MetaTracedKey, tracedString(traced))
md.Set(metadata.TraceIDKey, traceID)
md.Set(metadata.SpanIDKey, spanID)
md.Set(metadata.TracedKey, tracedString(traced))

return md, endFn
}
Expand Down
16 changes: 16 additions & 0 deletions core/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,19 @@ func (m M) Encode() ([]byte, error) {
}
return msgpack.Marshal(m)
}

// yomo reserved metadata keys.
const (
// the keys for yomo working.
SourceIDKey = "yomo-source-id"
TIDKey = "yomo-tid"

// the keys for tracing.
TraceIDKey = "yomo-trace-id"
SpanIDKey = "yomo-span-id"
TracedKey = "yomo-traced"

// the keys for target system working.
TargetKey = "yomo-target"
WantedTargetKey = "yomo-wanted-target"
)
7 changes: 6 additions & 1 deletion core/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/yomorun/yomo/core/metadata"
)

func TestMetadata(t *testing.T) {
md := NewMetadata("source", "tid", "traceID", "spanID", true)

assert.Equal(t, "source", GetSourceIDFromMetadata(md))
SetMetadataTarget(md, "target")
v, ok := md.Get(metadata.TargetKey)
assert.True(t, ok)
assert.Equal(t, "target", v)

assert.Equal(t, "tid", GetTIDFromMetadata(md))
assert.Equal(t, true, GetTracedFromMetadata(md))
}
32 changes: 28 additions & 4 deletions core/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type defaultRouter struct {
// mu protects data.
mu sync.RWMutex

// targets stores the mapping between connID and the target string that conn wanted.
targets map[string]string

// data stores tag and connID connection.
// The key is frame tag, The value is connID connection.
data map[frame.Tag]map[string]struct{}
Expand All @@ -34,15 +37,21 @@ type defaultRouter struct {
// It routes data according to observed tag or connID.
func Default() *defaultRouter {
return &defaultRouter{
data: make(map[frame.Tag]map[string]struct{}),
targets: make(map[string]string),
data: make(map[frame.Tag]map[string]struct{}),
}
}

func (r *defaultRouter) Add(connID string, ObserveDataTags []uint32, md metadata.M) error {
func (r *defaultRouter) Add(connID string, observeDataTags []uint32, md metadata.M) error {
r.mu.Lock()
defer r.mu.Unlock()

for _, tag := range ObserveDataTags {
target, ok := md.Get(metadata.WantedTargetKey)
if ok {
r.targets[connID] = target
}

for _, tag := range observeDataTags {
conns := r.data[tag]
if conns == nil {
conns = map[string]struct{}{}
Expand All @@ -58,19 +67,30 @@ func (r *defaultRouter) Route(dataTag uint32, md metadata.M) []string {
r.mu.RLock()
defer r.mu.RUnlock()

target, existed := md.Get(metadata.TargetKey)

var connID []string
if conns, ok := r.data[dataTag]; ok {
for k := range conns {
connID = append(connID, k)
if existed {
if wt, ok := r.targets[k]; ok && wt == target {
connID = append(connID, k)
}
} else {
connID = append(connID, k)
}
}
}

return connID
}

func (r *defaultRouter) Remove(connID string) {
r.mu.Lock()
defer r.mu.Unlock()

delete(r.targets, connID)

for _, conns := range r.data {
delete(conns, connID)
}
Expand All @@ -80,6 +100,10 @@ func (r *defaultRouter) Release() {
r.mu.Lock()
defer r.mu.Unlock()

for key := range r.targets {
delete(r.targets, key)
}

for key := range r.data {
delete(r.data, key)
}
Expand Down
24 changes: 24 additions & 0 deletions core/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,27 @@ func TestRouter(t *testing.T) {
ids = router.Route(1, nil)
assert.Equal(t, []string(nil), ids)
}

func TestTargetRouter(t *testing.T) {
router := Default()

err := router.Add("conn-1", []uint32{1}, metadata.M{metadata.WantedTargetKey: "target-1"})
assert.NoError(t, err)

err = router.Add("conn-2", []uint32{1}, metadata.M{metadata.WantedTargetKey: "target-1"})
assert.NoError(t, err)

err = router.Add("conn-3", []uint32{1}, metadata.M{metadata.WantedTargetKey: "target-2"})
assert.NoError(t, err)

ids := router.Route(1, metadata.M{metadata.TargetKey: "target-1"})
assert.ElementsMatch(t, []string{"conn-1", "conn-2"}, ids)

ids = router.Route(1, metadata.M{})
assert.ElementsMatch(t, []string{"conn-1", "conn-2", "conn-3"}, ids)

router.Release()

ids = router.Route(1, nil)
assert.Equal(t, []string(nil), ids)
}
3 changes: 3 additions & 0 deletions core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ func (s *Server) authenticate(hf *frame.HandshakeFrame) (metadata.M, error) {
}

func (s *Server) createConnection(hf *frame.HandshakeFrame, md metadata.M, fconn frame.Conn) (*Connection, error) {
if hf.WantedTarget != "" {
md.Set(metadata.WantedTargetKey, hf.WantedTarget)
}
conn := newConnection(
hf.Name,
hf.ID,
Expand Down
49 changes: 41 additions & 8 deletions core/serverless/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,35 @@ package serverless

import (
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
)

// Context sfn handler context
type Context struct {
writer frame.Writer
dataFrame *frame.DataFrame
writer frame.Writer
tag uint32
md metadata.M
data []byte
}

// NewContext creates a new serverless Context
func NewContext(writer frame.Writer, dataFrame *frame.DataFrame) *Context {
func NewContext(writer frame.Writer, tag uint32, md metadata.M, data []byte) *Context {
return &Context{
writer: writer,
dataFrame: dataFrame,
writer: writer,
tag: tag,
md: md,
data: data,
}
}

// Tag returns the tag of the data frame
func (c *Context) Tag() uint32 {
return c.dataFrame.Tag
return c.tag
}

// Data returns the data of the data frame
func (c *Context) Data() []byte {
return c.dataFrame.Payload
return c.data
}

// Write writes the data
Expand All @@ -35,9 +40,37 @@ func (c *Context) Write(tag uint32, data []byte) error {
return nil
}

mdBytes, err := c.md.Encode()
if err != nil {
return err
}

dataFrame := &frame.DataFrame{
Tag: tag,
Metadata: mdBytes,
Payload: data,
}

return c.writer.WriteFrame(dataFrame)
}

func (c *Context) WriteWithTarget(tag uint32, data []byte, target string) error {
if data == nil {
return nil
}

if target != "" {
c.md.Set(metadata.TargetKey, target)
}

mdBytes, err := c.md.Encode()
if err != nil {
return err
}

dataFrame := &frame.DataFrame{
Tag: tag,
Metadata: c.dataFrame.Metadata,
Metadata: mdBytes,
Payload: data,
}

Expand Down
Loading

0 comments on commit 752305d

Please sign in to comment.