Skip to content

Commit

Permalink
feat: serverless event trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
illyaks committed May 26, 2024
1 parent 5e7f323 commit 4f72108
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 8 deletions.
7 changes: 4 additions & 3 deletions pkg/api/serverless/trigger.go → pkg/api/trigger.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package serverless
package api

type Trigger struct {
APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"`
Expand All @@ -7,6 +7,7 @@ type Trigger struct {
}

type TriggerSpec struct {
Type string `json:"type,omitempty" yaml:"type,omitempty"`
Function string `json:"function,omitempty" yaml:"function,omitempty"`
Type string `json:"type,omitempty" yaml:"type,omitempty"`
FunctionNamespace string `json:"functionNamespace,omitempty" yaml:"functionNamespace,omitempty"`
FunctionName string `json:"functionName,omitempty" yaml:"functionName,omitempty"`
}
21 changes: 16 additions & 5 deletions pkg/apiserver/handlers/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package handlers

import (
"context"
"encoding/json"
"minik8s/pkg/api"
"minik8s/pkg/api/msg_type"
"minik8s/pkg/config"
"minik8s/pkg/etcd"
"minik8s/pkg/kafka"
"strings"
"sync"
)

Expand All @@ -13,14 +17,21 @@ var etcdClient etcd.Store

func WatchHandler(key string, value string) {
// "/trigger/function_namespace/function_name"
function := key[9:]
// TODO: get function namespace and name, generate URL
URL := ""
str := etcdClient.GetEtcdPair(URL)
str := key[9:]
strList := strings.Split(str, "/")
functionNamespace := strList[0]
functionName := strList[1]
URL := config.FunctionPath + functionNamespace + "/" + functionName
str = etcdClient.GetEtcdPair(URL)
if str == "" {
return
}
// TODO: check function and send message
var function api.Function
_ = json.Unmarshal([]byte(str), &function)
if function.Trigger.Event == true {
jsonString, _ := json.Marshal(function)
publisher.Publish(msg_type.TriggerTopic, string(jsonString))
}
}

func init() {
Expand Down
87 changes: 87 additions & 0 deletions pkg/apiserver/handlers/triggerhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package handlers

import (
"encoding/json"
"github.com/gin-gonic/gin"
"minik8s/pkg/api"
"minik8s/pkg/config"
"minik8s/util/log"
"minik8s/util/stringutil"
"net/http"
)

func AddTrigger(context *gin.Context) {
// Add function
log.Info("Add trigger")
var trigger api.Trigger
if err := context.ShouldBind(&trigger); err != nil {
context.JSON(http.StatusBadRequest, gin.H{
"status": "wrong",
})
return
}

funcURL := config.FunctionPath + trigger.Spec.FunctionNamespace + "/" + trigger.Spec.FunctionNamespace
str := etcdClient.GetEtcdPair(funcURL)
if str == "" {
context.JSON(http.StatusBadRequest, gin.H{
"status": "unknown function",
})
return
}

var function api.Function
_ = json.Unmarshal([]byte(str), &function)
function.Trigger.Event = true
byteArr, _ := json.Marshal(function)
etcdClient.PutEtcdPair(funcURL, string(byteArr))
TriggerURL := config.EtcdTriggerPath + trigger.Spec.FunctionNamespace + "/" + trigger.Spec.FunctionNamespace
byteArr, _ = json.Marshal(trigger)
etcdClient.PutEtcdPair(TriggerURL, string(byteArr))

}

func GetTriggers(context *gin.Context) {
// Get function
log.Info("Get triggers")
URL := config.EtcdTriggerPath
triggers := etcdClient.PrefixGet(URL)

log.Debug("get all nodes are: %+v", triggers)
jsonString := stringutil.EtcdResEntryToJSON(triggers)
context.JSON(http.StatusOK, gin.H{
"data": jsonString,
})
}

func DeleteTrigger(context *gin.Context) {
// Delete function
log.Info("Delete function")
name := context.Param(config.NameParam)
namespace := context.Param(config.NamespaceParam)
TriggerURL := config.EtcdTriggerPath + namespace + "/" + name
str := etcdClient.GetEtcdPair(TriggerURL)
if str == "" {
context.JSON(http.StatusBadRequest, gin.H{
"status": "wrong",
})
return
}
var trigger api.Trigger
_ = json.Unmarshal([]byte(str), &trigger)
funcURL := config.FunctionPath + trigger.Spec.FunctionNamespace + "/" + trigger.Spec.FunctionNamespace
str = etcdClient.GetEtcdPair(funcURL)
if str == "" {
context.JSON(http.StatusBadRequest, gin.H{
"status": "unknown function",
})
return
}

var function api.Function
_ = json.Unmarshal([]byte(str), &function)
function.Trigger.Event = false
byteArr, _ := json.Marshal(function)
etcdClient.PutEtcdPair(funcURL, string(byteArr))
etcdClient.DeleteEtcdPair(TriggerURL)
}
4 changes: 4 additions & 0 deletions pkg/apiserver/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,8 @@ func (server *apiServer) bind() {
server.router.PUT(config.FunctionURL, handlers.UpdateFunction)
server.router.POST(config.FunctionURL, handlers.AddFunction)
server.router.DELETE(config.FunctionURL, handlers.DeleteFunction)

server.router.GET(config.TriggersURL, handlers.GetTriggers)
server.router.POST(config.TriggersURL, handlers.AddTrigger)
server.router.DELETE(config.TriggerURL, handlers.DeleteTrigger)
}
3 changes: 3 additions & 0 deletions pkg/config/urlconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (
FunctionsAllURL = "/api/v1/functions"
FunctionsURL = "/api/v1/namespaces/:namespace/functions"
FunctionURL = "/api/v1/namespaces/:namespace/functions/:name"

TriggersURL = "/api/v1/triggers"
TriggerURL = "/api/v1/namespace/:namespace/trigger/:name"
)

// const used to send and parse url
Expand Down
24 changes: 24 additions & 0 deletions pkg/kubectl/cmd/applycmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func applyCmdHandler(cmd *cobra.Command, args []string) {
applyDNSHandler(content)
case "Function":
applyFunctionHandler(content)
case "Trigger":
applyTriggerHandler(content)
default:
log.Warn("Unknown resource kind")
}
Expand Down Expand Up @@ -264,3 +266,25 @@ func applyDNSHandler(content []byte) {
}
log.Info("apply DNS successed")
}

func applyTriggerHandler(content []byte) {
log.Info("creating or updating trigger")
trigger := &api.Trigger{}
err := yaml.Unmarshal(content, trigger)
if err != nil {
log.Error("Error yaml unmarshal trigger")
return
}
byteArr, err := json.Marshal(*trigger)
if err != nil {
log.Error("Error json marshal trigger")
return
}
URL := config.GetUrlPrefix() + config.TriggersURL
err = httputil.Post(URL, byteArr)
if err != nil {
log.Error("Error http post: %s", err.Error())
return
}
log.Info("apply trigger successed")
}
30 changes: 30 additions & 0 deletions pkg/kubectl/cmd/deletecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,24 @@ func DeleteCmd() *cobra.Command {
Run: deleteFunctionCmdHandler,
}

deleteTriggerCmd := &cobra.Command{
Use: "trigger [function name]",
Short: "delete trigger",
Run: deleteTriggerCmdHandler,
}

deletePodCmd.Flags().StringP("namespace", "n", "default", "specify the namespace of the resource")
deleteServiceCmd.Flags().StringP("namespace", "n", "default", "specify the namespace of the resource")
deleteFunctionCmd.Flags().StringP("namespace", "n", "default", "specify the namespace of the resource")
deleteTriggerCmd.Flags().StringP("namespace", "n", "default", "specify the namespace of the resource")

deleteCmd.AddCommand(deletePodCmd)
deleteCmd.AddCommand(deleteDeploymentCmd)
deleteCmd.AddCommand(deleteServiceCmd)
deleteCmd.AddCommand(deleteHPACmd)
deleteCmd.AddCommand(deleteDNSCmd)
deleteCmd.AddCommand(deleteFunctionCmd)
deleteCmd.AddCommand(deleteTriggerCmd)

return deleteCmd
}
Expand Down Expand Up @@ -169,3 +177,25 @@ func deleteDNSCmdHandler(cmd *cobra.Command, args []string) {
return
}
}

func deleteTriggerCmdHandler(cmd *cobra.Command, args []string) {
if len(args) == 0 {
log.Error("function name is required")
return
}
name := args[0]
namespace, err := cmd.Flags().GetString("namespace")
if err != nil {
log.Error("Error getting flags: %s", err)
return
}
path := strings.Replace(config.TriggerURL, config.NamespacePlaceholder, namespace, -1)
path = strings.Replace(path, config.NamePlaceholder, name, -1)
URL := config.GetUrlPrefix() + path
err = httputil.Delete(URL)
if err != nil {
log.Error("error http post: %s", err.Error())
return
}
log.Info("function name: %s, namespace: %s", name, namespace)
}
20 changes: 20 additions & 0 deletions pkg/kubectl/cmd/getcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func GetCmd() *cobra.Command {
Run: getNodeCmdHandler,
}

getTriggerCmd := &cobra.Command{
Use: "trigger",
Short: "get trigger",
Run: getTriggerCmdHandler,
}

getServiceCmd := &cobra.Command{
Use: "service",
Short: "get service",
Expand Down Expand Up @@ -65,6 +71,7 @@ func GetCmd() *cobra.Command {
getCmd.AddCommand(getNodeCmd)
getCmd.AddCommand(getDeploymentCmd)
getCmd.AddCommand(getServiceCmd)
getCmd.AddCommand(getTriggerCmd)

return getCmd
}
Expand Down Expand Up @@ -219,6 +226,19 @@ func getNodeCmdHandler(cmd *cobra.Command, args []string) {
}
}

func getTriggerCmdHandler(cmd *cobra.Command, args []string) {
log.Debug("the length of args is: %v", len(args))

matchTriggers := []api.Trigger{}
log.Debug("getting all triggers")
URL := config.GetUrlPrefix() + config.TriggersURL
err := httputil.Get(URL, &matchTriggers, "data")
if err != nil {
log.Error("error getting all triggers: %s", err.Error())
return
}
}

func getHPACmdHandler(cmd *cobra.Command, args []string) {
log.Debug("the length of args is: %v", len(args))

Expand Down

0 comments on commit 4f72108

Please sign in to comment.