Skip to content

Commit

Permalink
feat: add cron handler for sfn (#713)
Browse files Browse the repository at this point in the history
# Description

Adding `SetCronHandler` api for sfn, call it like below:

```go
sfn.SetCronHandler("@every 1s", func(ctx serverless.CronContext) {
    ctx.Write(0x22, []byte("message from cron sfn"))
    ctx.WriteWithTarget(0x23, []byte("message from cron sfn"),  "target-id")
})
```
  • Loading branch information
woorui authored Feb 4, 2024
1 parent 752305d commit 3a44b2d
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 10 deletions.
5 changes: 4 additions & 1 deletion core/handler_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"github.com/yomorun/yomo/serverless"
)

// AsyncHandler is the request-response mode (asnyc)
// CronHandler is the cron mode.
type CronHandler func(ctx serverless.CronContext)

// AsyncHandler is the request-response mode (asnyc).
type AsyncHandler func(ctx serverless.Context)

// PipeHandler is the bidirectional stream mode (blocking).
Expand Down
29 changes: 26 additions & 3 deletions core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,33 @@ func SetMetadataTarget(m metadata.M, target string) {
m.Set(metadata.TargetKey, target)
}

// SourceMetadata generates source metadata with trace information.
func SourceMetadata(
// InitialSourceMetadata generates initial source metadata with trace information.
// the span name typically corresponds to the source's name.
func InitialSourceMetadata(
sourceID, tid string,
spanName string, // the span name usually is the source name.
spanName string,
tp oteltrace.TracerProvider, logger *slog.Logger,
) (metadata.M, func()) {
return initialMetadata(sourceID, tid, "Source", spanName, tp, logger)
}

// InitialSfnMetadata generates initial sfn metadata with trace information.
// the span name typically corresponds to the sfn's name.
func InitialSfnMetadata(
sourceID, tid string,
spanName string,
tp oteltrace.TracerProvider, logger *slog.Logger,
) (metadata.M, func()) {
return initialMetadata(sourceID, tid, "StreamFunction", spanName, tp, logger)
}

// initialMetadata builds a metadata with trace information.
// the tracer name is `Source` or `StreamFunction`.
// span name typically corresponds to the source's name or sfn's name.
func initialMetadata(
sourceID, tid string,
tracerName string,
spanName string,
tp oteltrace.TracerProvider, logger *slog.Logger,
) (metadata.M, func()) {
var (
Expand Down
7 changes: 6 additions & 1 deletion core/serverless/context_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import (
"github.com/yomorun/yomo/serverless"
)

// HTTP is the interface for HTTP request, but it is not implemented in the server side
// HTTP is the interface of Context for HTTP request, but it is not implemented in the server side
func (c *Context) HTTP() serverless.HTTP {
return nil
}

// HTTP is the interface of CronContext for HTTP request, but it is not implemented in the server side
func (c *CronContext) HTTP() serverless.HTTP {
return nil
}
65 changes: 65 additions & 0 deletions core/serverless/cron_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package serverless

import (
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
)

// CronContext sfn cron handler context
type CronContext struct {
writer frame.Writer
md metadata.M
mdBytes []byte
}

// NewCronContext creates a new serverless CronContext
func NewCronContext(writer frame.Writer, md metadata.M) *CronContext {
mdBytes, _ := md.Encode()

return &CronContext{
writer: writer,
md: md,
mdBytes: mdBytes,
}
}

// Write writes the data to next sfn instance.
func (c *CronContext) Write(tag uint32, data []byte) error {
if data == nil {
return nil
}

dataFrame := &frame.DataFrame{
Tag: tag,
Metadata: c.mdBytes,
Payload: data,
}

return c.writer.WriteFrame(dataFrame)
}

// WriteWithTarget writes the data to next sfn instance with specified target.
func (c *CronContext) WriteWithTarget(tag uint32, data []byte, target string) error {
if data == nil {
return nil
}

if target == "" {
return c.Write(tag, data)
}

c.md.Set(metadata.TargetKey, target)

mdBytes, err := c.md.Encode()
if err != nil {
return err
}

dataFrame := &frame.DataFrame{
Tag: tag,
Metadata: mdBytes,
Payload: data,
}

return c.writer.WriteFrame(dataFrame)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/matoous/go-nanoid/v2 v2.0.0
github.com/quic-go/quic-go v0.40.1
github.com/reactivex/rxgo/v2 v2.5.0
github.com/robfig/cron/v3 v3.0.1
github.com/second-state/WasmEdge-go v0.13.4
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ github.com/quic-go/quic-go v0.40.1 h1:X3AGzUNFs0jVuO3esAGnTfvdgvL4fq655WaOi1snv1
github.com/quic-go/quic-go v0.40.1/go.mod h1:PeN7kuVJ4xZbxSv/4OX6S1USOX8MJvydwpTx31vx60c=
github.com/reactivex/rxgo/v2 v2.5.0 h1:FhPgHwX9vKdNQB2gq9EPt+EKk9QrrzoeztGbEEnZam4=
github.com/reactivex/rxgo/v2 v2.5.0/go.mod h1:bs4fVZxcb5ZckLIOeIeVH942yunJLWDABWGbrHAW+qU=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down
10 changes: 10 additions & 0 deletions serverless/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ type Context interface {
WriteWithTarget(tag uint32, data []byte, target string) error
}

// CronContext sfn corn handler context
type CronContext interface {
// Write writes data
Write(tag uint32, data []byte) error
// HTTP http interface
HTTP() HTTP
// WriteWithTarget writes data to sfn instance with specified target
WriteWithTarget(tag uint32, data []byte, target string) error
}

// HTTP http interface
type HTTP interface {
Send(req *HTTPRequest) (*HTTPResponse, error)
Expand Down
47 changes: 45 additions & 2 deletions sfn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"context"
"errors"

"github.com/robfig/cron/v3"

"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
"github.com/yomorun/yomo/core/serverless"
"github.com/yomorun/yomo/pkg/id"
oteltrace "go.opentelemetry.io/otel/trace"
)

Expand All @@ -26,6 +29,13 @@ type StreamFunction interface {
SetErrorHandler(fn func(err error))
// SetPipeHandler set the pipe handler function
SetPipeHandler(fn core.PipeHandler) error
// SetCronHandler set the cron handler function.
// Examples:
// sfn.SetCronHandler("0 30 * * * *", func(ctx serverless.CronContext) {})
// sfn.SetCronHandler("@hourly", func(ctx serverless.CronContext) {})
// sfn.SetCronHandler("@every 1h30m", func(ctx serverless.CronContext) {})
// more spec style see: https://pkg.go.dev/github.com/robfig/cron#hdr-Usage
SetCronHandler(spec string, fn core.CronHandler) error
// Connect create a connection to the zipper
Connect() error
// Close will close the connection
Expand Down Expand Up @@ -71,6 +81,9 @@ type streamFunction struct {
fn core.AsyncHandler // user's function which will be invoked when data arrived
pfn core.PipeHandler
pIn chan []byte
cronSpec string
cronFn core.CronHandler
cron *cron.Cron
pOut chan *frame.DataFrame
}

Expand All @@ -92,16 +105,42 @@ func (s *streamFunction) SetHandler(fn core.AsyncHandler) error {
return nil
}

func (s *streamFunction) SetCronHandler(cronSpec string, fn core.CronHandler) error {
s.cronSpec = cronSpec
s.cronFn = fn
s.client.Logger.Debug("set cron handler")
return nil
}

func (s *streamFunction) SetPipeHandler(fn core.PipeHandler) error {
s.pfn = fn
s.client.Logger.Debug("set pipe handler")
return nil
}

// Connect create a connection to the zipper, when data arrvied, the data will be passed to the
// handler which setted by SetHandler method.
// handler set by SetHandler method.
func (s *streamFunction) Connect() error {
if len(s.observeDataTags) == 0 {
hasCron := s.cronFn != nil && s.cronSpec != ""
if hasCron {
s.cron = cron.New()
s.cron.AddFunc(s.cronSpec, func() {
md, deferFunc := core.InitialSfnMetadata(
s.client.ClientID(),
id.New(),
s.name,
s.client.TracerProvider(),
s.client.Logger,
)
defer deferFunc()

cronCtx := serverless.NewCronContext(s.client, md)
s.cronFn(cronCtx)
})
s.cron.Start()
}

if len(s.observeDataTags) == 0 && !hasCron {
return errors.New("streamFunction cannot observe data because the required tag has not been set")
}

Expand Down Expand Up @@ -169,6 +208,10 @@ func (s *streamFunction) Close() error {
close(s.pOut)
}

if s.cron != nil {
s.cron.Stop()
}

if s.client != nil {
if err := s.client.Close(); err != nil {
s.client.Logger.Error("failed to close sfn", "err", err)
Expand Down
28 changes: 27 additions & 1 deletion sfn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ func TestSfnWantedTarget(t *testing.T) {
sfn.SetHandler(func(ctx serverless.Context) {
t.Logf("unittest handler sfn receive <- (%d)", len(ctx.Data()))
assert.Equal(t, uint32(0x22), ctx.Tag())
assert.Contains(t, []string{"message from source", "message from sfn"}, string(ctx.Data()))
assert.Contains(t, []string{
"message from source",
"message from sfn",
"message from cron sfn",
}, string(ctx.Data()))
})

err := sfn.Connect()
Expand All @@ -89,3 +93,25 @@ func TestSfnInit(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, int64(1), total)
}

func TestSfnCron(t *testing.T) {
t.Parallel()

sfn := NewStreamFunction("sfn-cron", "localhost:9000", WithSfnCredential("token:<CREDENTIAL>"))

time.AfterFunc(time.Second, func() {
sfn.Close()
})

// set cron handler
sfn.SetCronHandler("@every 200ms", func(ctx serverless.CronContext) {
t.Log("unittest cron sfn, time reached")
ctx.Write(0x22, []byte("message from cron sfn"))
ctx.WriteWithTarget(0x22, []byte("message from cron sfn"), mockTargetString)
})

err := sfn.Connect()
assert.Nil(t, err)

sfn.Wait()
}
4 changes: 2 additions & 2 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *yomoSource) Connect() error {

// Write writes data with specified tag.
func (s *yomoSource) Write(tag uint32, data []byte) error {
md, deferFunc := core.SourceMetadata(s.client.ClientID(), id.New(), s.name, s.client.TracerProvider(), s.client.Logger)
md, deferFunc := core.InitialSourceMetadata(s.client.ClientID(), id.New(), s.name, s.client.TracerProvider(), s.client.Logger)
defer deferFunc()

mdBytes, err := md.Encode()
Expand All @@ -93,7 +93,7 @@ func (s *yomoSource) WriteWithTarget(tag uint32, data []byte, target string) err
if data == nil {
return nil
}
md, deferFunc := core.SourceMetadata(s.client.ClientID(), id.New(), s.name, s.client.TracerProvider(), s.client.Logger)
md, deferFunc := core.InitialSourceMetadata(s.client.ClientID(), id.New(), s.name, s.client.TracerProvider(), s.client.Logger)
defer deferFunc()

if target != "" {
Expand Down

0 comments on commit 3a44b2d

Please sign in to comment.