From 6ebf28c7d76f37a48ddf7d141501df7c8af0a4ba Mon Sep 17 00:00:00 2001 From: "C.C" Date: Mon, 5 Feb 2024 02:21:46 -0800 Subject: [PATCH] feat: add example for Cron and Target (#715) ## Example 10: Target property and Cron feature ### Steps to run the example #### 1. Start Zipper Server ```bash yomo serve -c ../config.yaml ``` #### 2. Start `sfn-1-executor` This stateful serverless function will emit data every 2 seconds. ```bash go run sfn-1-executor/main.go ``` #### 3. Start two instances of `sfn-2-sink` to consume the data First one start with `USERID=alice`, this instance will consume the data with `target` property set to `alice`. ```bash USERID=alice go run sfn-2-sink/main.go ``` Second one start with `USERID=bob`, this instance will consume the data with `target` property set to `bob`. ```bash USERID=bob go run sfn-2-sink/main.go ``` image --- example/a-target-and-cron/README.md | 31 +++++++++++++ .../a-target-and-cron/sfn-1-executor/main.go | 44 ++++++++++++++++++ example/a-target-and-cron/sfn-2-sink/main.go | 45 +++++++++++++++++++ 3 files changed, 120 insertions(+) create mode 100644 example/a-target-and-cron/README.md create mode 100644 example/a-target-and-cron/sfn-1-executor/main.go create mode 100644 example/a-target-and-cron/sfn-2-sink/main.go diff --git a/example/a-target-and-cron/README.md b/example/a-target-and-cron/README.md new file mode 100644 index 000000000..3f5004479 --- /dev/null +++ b/example/a-target-and-cron/README.md @@ -0,0 +1,31 @@ +## Example 10: Target property and Cron feature + +### Steps to run the example + +#### 1. Start Zipper Server + +```bash +yomo serve -c ../config.yaml +``` + +#### 2. Start `sfn-1-executor` + +This stateful serverless function will emit data every 2 seconds. + +```bash +go run sfn-1-executor/main.go +``` + +#### 3. Start two instances of `sfn-2-sink` to consume the data + +First one start with `USERID=alice`, this instance will consume the data with `target` property set to `alice`. + +```bash +USERID=alice go run sfn-2-sink/main.go +``` + +Second one start with `USERID=bob`, this instance will consume the data with `target` property set to `bob`. + +```bash +USERID=bob go run sfn-2-sink/main.go +``` diff --git a/example/a-target-and-cron/sfn-1-executor/main.go b/example/a-target-and-cron/sfn-1-executor/main.go new file mode 100644 index 000000000..0678290bf --- /dev/null +++ b/example/a-target-and-cron/sfn-1-executor/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" + "log/slog" + "os" + + "github.com/yomorun/yomo" + "github.com/yomorun/yomo/serverless" +) + +var i = 0 +var target = "bob" + +func main() { + sfn := yomo.NewStreamFunction( + "fn1", + "localhost:9000", + ) + defer sfn.Close() + + sfn.SetCronHandler("@every 1s", func(ctx serverless.CronContext) { + if i%2 == 0 { + target = "alice" + } else { + target = "bob" + } + // ctx.Write(0x33, []byte("message from cron sfn")) + ctx.WriteWithTarget(0x33, []byte(fmt.Sprintf("message from cron sfn %d", i)), target) + i++ + }) + // start + err := sfn.Connect() + if err != nil { + slog.Error("[sfn] connect", "err", err) + os.Exit(1) + } + // set the error handler function when server error occurs + sfn.SetErrorHandler(func(err error) { + slog.Error("[sfn] receive server error", "err", err) + }) + + sfn.Wait() +} diff --git a/example/a-target-and-cron/sfn-2-sink/main.go b/example/a-target-and-cron/sfn-2-sink/main.go new file mode 100644 index 000000000..0fca6e6f9 --- /dev/null +++ b/example/a-target-and-cron/sfn-2-sink/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "os" + + "github.com/yomorun/yomo" + "github.com/yomorun/yomo/serverless" + "golang.org/x/exp/slog" +) + +var instanceID = "bob" + +func main() { + if v := os.Getenv("USERID"); v != "" { + instanceID = v + } + sfn := yomo.NewStreamFunction( + "sink", + "localhost:9000", + ) + sfn.SetObserveDataTags(0x33) + sfn.SetWantedTarget(instanceID) + + // set handler + sfn.SetHandler(handler) + // start + err := sfn.Connect() + if err != nil { + slog.Error("[sfn] connect", "err", err) + os.Exit(1) + } + defer sfn.Close() + + // set the error handler function when server error occurs + sfn.SetErrorHandler(func(err error) { + slog.Error("[sfn] receive server error", "err", err) + }) + + sfn.Wait() +} + +func handler(ctx serverless.Context) { + data := string(ctx.Data()) + slog.Info("Received", "uid", instanceID, "tag", ctx.Tag(), "data", data) +}