Skip to content

Commit

Permalink
feat: client cannot write tag that within reserved range (#815)
Browse files Browse the repository at this point in the history
  • Loading branch information
woorui authored May 16, 2024
1 parent cabde84 commit 29d215a
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 10 deletions.
2 changes: 1 addition & 1 deletion ai/function_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// ReducerTag is the observed tag of the reducer
var ReducerTag uint32 = 0x61
var ReducerTag uint32 = 0xE001

// FunctionCall describes the data structure when invoking the sfn function
type FunctionCall struct {
Expand Down
2 changes: 2 additions & 0 deletions core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func TestConnectTo(t *testing.T) {
ClientTypeSource,
WithLogger(discardingLogger),
)
assert.Equal(t, "source", source.Name())
assert.Equal(t, source.clientID, source.ClientID())

_ = source.Connect(context.TODO())

Expand Down
12 changes: 12 additions & 0 deletions core/frame/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package frame

import (
"context"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -174,6 +175,17 @@ type Writer interface {
WriteFrame(Frame) error
}

// ErrReservedTag is returned when write a reserved tag.
var ErrReservedTag = errors.New("[0xF000, 0xFFFF] is reserved; please do not write within this range")

// IsReservedTag returns error when write a reserved tag.
func IsReservedTag(tag Tag) error {
if tag >= 0xF000 && tag <= 0xFFFF {
return ErrReservedTag
}
return nil
}

// Listener accepts Conns.
type Listener interface {
// Accept accepts Conns.
Expand Down
8 changes: 6 additions & 2 deletions core/serverless/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ func (c *Context) Write(tag uint32, data []byte) error {
if data == nil {
return nil
}

if err := frame.IsReservedTag(tag); err != nil {
return err
}
mdBytes, err := c.md.Encode()
if err != nil {
return err
Expand All @@ -63,7 +65,9 @@ func (c *Context) WriteWithTarget(tag uint32, data []byte, target string) error
if data == nil {
return nil
}

if err := frame.IsReservedTag(tag); err != nil {
return err
}
if target != "" {
c.md.Set(metadata.TargetKey, target)
}
Expand Down
8 changes: 6 additions & 2 deletions core/serverless/cron_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ func (c *CronContext) Write(tag uint32, data []byte) error {
if data == nil {
return nil
}

if err := frame.IsReservedTag(tag); err != nil {
return err
}
dataFrame := &frame.DataFrame{
Tag: tag,
Metadata: c.mdBytes,
Expand All @@ -43,7 +45,9 @@ func (c *CronContext) WriteWithTarget(tag uint32, data []byte, target string) er
if data == nil {
return nil
}

if err := frame.IsReservedTag(tag); err != nil {
return err
}
if target == "" {
return c.Write(tag, data)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/quic-go/quic-go v0.43.1
github.com/reactivex/rxgo/v2 v2.5.0
github.com/robfig/cron/v3 v3.0.1
github.com/sashabaranov/go-openai v1.23.1
github.com/sashabaranov/go-openai v1.24.0
github.com/second-state/WasmEdge-go v0.13.4
github.com/shirou/gopsutil/v3 v3.24.2
github.com/spf13/cobra v1.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke
github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/sashabaranov/go-openai v1.23.1 h1:b2IsEG9+BdJ3f6G3gGu9Lon2Mw/C0aYqME3YzwBHcls=
github.com/sashabaranov/go-openai v1.23.1/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg=
github.com/sashabaranov/go-openai v1.24.0 h1:4H4Pg8Bl2RH/YSnU8DYumZbuHnnkfioor/dtNlB20D4=
github.com/sashabaranov/go-openai v1.24.0/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg=
github.com/second-state/WasmEdge-go v0.13.4 h1:NHfJC+aayUW93ydAzlcX7Jx1WDRpI24KvY5SAbeTyvY=
github.com/second-state/WasmEdge-go v0.13.4/go.mod h1:HyBf9hVj1sRAjklsjc1Yvs9b5RcmthPG9z99dY78TKg=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
Expand Down
7 changes: 5 additions & 2 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func (s *yomoSource) Connect() error {

// Write writes data with specified tag.
func (s *yomoSource) Write(tag uint32, data []byte) error {
if err := frame.IsReservedTag(tag); err != nil {
return err
}
md := core.NewMetadata(s.client.ClientID(), id.New())
// add trace
tracer := trace.NewTracer("Source")
Expand Down Expand Up @@ -98,8 +101,8 @@ func (s *yomoSource) Write(tag uint32, data []byte) error {

// WritePayload writes `yomo.Payload` with specified tag.
func (s *yomoSource) WriteWithTarget(tag uint32, data []byte, target string) error {
if data == nil {
return nil
if err := frame.IsReservedTag(tag); err != nil {
return err
}
md := core.NewMetadata(s.client.ClientID(), id.New())
// add trace
Expand Down
7 changes: 7 additions & 0 deletions source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/ylog"
)

Expand Down Expand Up @@ -35,6 +36,9 @@ func TestSource(t *testing.T) {
err := source.Connect()
assert.Nil(t, err)

err = source.Write(0xF001, []byte("reserved tag"))
assert.Equal(t, frame.ErrReservedTag, err)

// send data to zipper from source
err = source.Write(0x23, []byte("pipe test"))
assert.Nil(t, err)
Expand All @@ -43,6 +47,9 @@ func TestSource(t *testing.T) {
err = source.Write(0x21, []byte("test"))
assert.Nil(t, err)

err = source.WriteWithTarget(0xF002, []byte("reserved tag"), mockTargetString)
assert.Equal(t, frame.ErrReservedTag, err)

err = source.WriteWithTarget(0x22, []byte("message from source"), mockTargetString)
assert.Nil(t, err)

Expand Down

0 comments on commit 29d215a

Please sign in to comment.