Skip to content

Commit

Permalink
Merge pull request #45 from GMH233/feature/fk-spec-change
Browse files Browse the repository at this point in the history
Feature/fk spec change
  • Loading branch information
sjtuzc954 authored May 31, 2024
2 parents 296fd2d + 7704972 commit 5f0237a
Show file tree
Hide file tree
Showing 11 changed files with 617 additions and 9 deletions.
51 changes: 49 additions & 2 deletions cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,56 @@
package main

import "minikubernetes/pkg/kubelet/app"
import (
"encoding/json"
"fmt"
v1 "minikubernetes/pkg/api/v1"
"minikubernetes/pkg/kubectl/utils"
"minikubernetes/pkg/kubelet/app"
"net"
"os"
)

func usage() {
fmt.Println("usage: kubelet -j|--join <apiServerIP> [-c|--config <configFile>]")
os.Exit(1)
}

func main() {
kubeletServer, err := app.NewKubeletServer("10.119.12.123")
if len(os.Args) != 3 && len(os.Args) != 5 {
usage()
}
if os.Args[1] != "-j" && os.Args[1] != "--join" {
usage()
}
ip := os.Args[2]
if net.ParseIP(ip) == nil {
fmt.Printf("invalid ip: %s\n", ip)
os.Exit(1)
}
var node v1.Node
if len(os.Args) == 5 {
if os.Args[3] != "-c" && os.Args[3] != "--config" {
usage()
}
filename := os.Args[4]
yamlBytes, err := os.ReadFile(filename)
if err != nil {
fmt.Printf("failed to read file %s: %v\n", filename, err)
os.Exit(1)
}
jsonBytes, err := utils.YAML2JSON(yamlBytes)
if err != nil {
fmt.Printf("failed to parse yaml file\n")
os.Exit(1)
}
err = json.Unmarshal(jsonBytes, &node)
if err != nil {
fmt.Printf("failed to parse yaml file\n")
os.Exit(1)
}
}
kubeletServer, err := app.NewKubeletServer(ip, &node)
// kubeletServer, err := app.NewKubeletServer("10.119.12.123")
if err != nil {
return
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ func main() {
break
case "Random_Policy":
break
case "NodeAffinity_Policy":
break
default:
fmt.Println("Invalid policy")
os.Exit(1)
Expand Down
26 changes: 26 additions & 0 deletions pkg/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,3 +488,29 @@ type SubsetSpec struct {
// pod名
Pods []string `json:"pods,omitempty"`
}

type RollingUpdate struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata,omitempty"`
Spec RollingUpdateSpec `json:"spec,omitempty"`
Status RollingUpdateStatus `json:"status,omitempty"`
}

type RollingUpdateSpec struct {
ServiceRef string `json:"serviceRef,omitempty"`
Port int32 `json:"port,omitempty"`
MinimumAlive int32 `json:"minimumAlive,omitempty"`
Interval int32 `json:"interval,omitempty"`
}

type RollingUpdatePhase string

const (
RollingUpdatePending RollingUpdatePhase = "Pending"
RollingUpdateRunning RollingUpdatePhase = "Running"
RollingUpdateFinished RollingUpdatePhase = "Finished"
)

type RollingUpdateStatus struct {
Phase RollingUpdatePhase `json:"phase,omitempty"`
}
191 changes: 191 additions & 0 deletions pkg/kubeapiserver/app/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ const (

SidecarMappingURL = "/api/v1/sidecar-mapping"
SidecarServiceNameMappingURL = "/api/v1/sidecar-service-name-mapping"

AllRollingUpdateURL = "/api/v1/rollingupdates"
NamespaceRollingUpdateURL = "/api/v1/namespaces/:namespace/rollingupdates"
SingleRollingUpdateURL = "/api/v1/namespaces/:namespace/rollingupdates/:rollingupdatename"
)

/* NAMESPACE
Expand Down Expand Up @@ -239,6 +243,11 @@ func (ser *kubeApiServer) binder() {
ser.router.GET(SidecarMappingURL, ser.GetSidecarMapping)
ser.router.POST(SidecarMappingURL, ser.SaveSidecarMapping)
ser.router.GET(SidecarServiceNameMappingURL, ser.GetSidecarServiceNameMapping)

ser.router.GET(AllRollingUpdateURL, ser.GetAllRollingUpdatesHandler)
ser.router.POST(NamespaceRollingUpdateURL, ser.AddRollingUpdateHandler)
ser.router.POST(SingleRollingUpdateURL, ser.UpdateRollingUpdateStatusHandler)
ser.router.DELETE(SingleRollingUpdateURL, ser.DeleteRollingUpdateHandler)
}

func (s *kubeApiServer) GetStatsDataHandler(c *gin.Context) {
Expand Down Expand Up @@ -1696,6 +1705,14 @@ func (s *kubeApiServer) DeleteDNSHandler(c *gin.Context) {
func (s *kubeApiServer) RegisterNodeHandler(c *gin.Context) {
s.lock.Lock()
defer s.lock.Unlock()
var n v1.Node
err := c.ShouldBind(&n)
if err != nil {
c.JSON(http.StatusBadRequest, v1.BaseResponse[*v1.Node]{
Error: "invalid node json",
})
return
}
address := c.Query("address")
if net.ParseIP(address) == nil {
c.JSON(http.StatusBadRequest, v1.BaseResponse[*v1.Node]{
Expand Down Expand Up @@ -1740,6 +1757,7 @@ func (s *kubeApiServer) RegisterNodeHandler(c *gin.Context) {
Namespace: Default_Namespace,
UID: v1.UID(uuid.NewUUID()),
CreationTimestamp: timestamp.NewTimestamp(),
Labels: n.Labels,
},
Status: v1.NodeStatus{
Address: address,
Expand Down Expand Up @@ -2748,3 +2766,176 @@ func (s *kubeApiServer) GetSidecarServiceNameMapping(c *gin.Context) {
Data: mapping,
})
}

func (s *kubeApiServer) AddRollingUpdateHandler(c *gin.Context) {
s.lock.Lock()
defer s.lock.Unlock()
var ru v1.RollingUpdate
err := c.ShouldBind(&ru)
if err != nil {
c.JSON(http.StatusBadRequest, v1.BaseResponse[*v1.RollingUpdate]{
Error: "invalid rolling update json",
})
return
}
namespace := c.Param("namespace")
if namespace == "" {
c.JSON(http.StatusBadRequest, v1.BaseResponse[*v1.RollingUpdate]{
Error: "namespace is required",
})
return
}
namespaceKey := fmt.Sprintf("/registry/namespaces/%s/rollingupdates/%s", namespace, ru.Name)
uid, err := s.store_cli.Get(namespaceKey)
if err == nil && uid != "" {
c.JSON(http.StatusConflict, v1.BaseResponse[*v1.RollingUpdate]{
Error: fmt.Sprintf("rolling update %s/%s already exists", namespace, ru.Name),
})
return
}
ru.Namespace = namespace
ru.CreationTimestamp = timestamp.NewTimestamp()
ru.UID = v1.UID(uuid.NewUUID())
ru.Status.Phase = v1.RollingUpdatePending
allKey := fmt.Sprintf("/registry/rollingupdates/%s", ru.UID)
ruJson, _ := json.Marshal(ru)
err = s.store_cli.Set(allKey, string(ruJson))
if err != nil {
c.JSON(http.StatusInternalServerError, v1.BaseResponse[*v1.RollingUpdate]{
Error: "error in writing rolling update to etcd",
})
return
}
err = s.store_cli.Set(namespaceKey, string(ru.UID))
if err != nil {
c.JSON(http.StatusInternalServerError, v1.BaseResponse[*v1.RollingUpdate]{
Error: "error in writing rolling update to etcd",
})
return
}
c.JSON(http.StatusCreated, v1.BaseResponse[*v1.RollingUpdate]{
Data: &ru,
})
}

func (s *kubeApiServer) GetAllRollingUpdatesHandler(c *gin.Context) {
s.lock.Lock()
defer s.lock.Unlock()
allKey := "/registry/rollingupdates"
res, err := s.store_cli.GetSubKeysValues(allKey)
if err != nil {
c.JSON(http.StatusInternalServerError, v1.BaseResponse[[]*v1.RollingUpdate]{
Error: "error in reading from etcd",
})
return
}
rus := make([]*v1.RollingUpdate, 0)
for _, v := range res {
var ru v1.RollingUpdate
err = json.Unmarshal([]byte(v), &ru)
if err != nil {
c.JSON(http.StatusInternalServerError, v1.BaseResponse[[]*v1.RollingUpdate]{
Error: "error in json unmarshal",
})
return
}
rus = append(rus, &ru)
}
c.JSON(http.StatusOK, v1.BaseResponse[[]*v1.RollingUpdate]{
Data: rus,
})
}

func (s *kubeApiServer) UpdateRollingUpdateStatusHandler(c *gin.Context) {
s.lock.Lock()
defer s.lock.Unlock()
var ruStatus v1.RollingUpdateStatus
err := c.ShouldBind(&ruStatus)
if err != nil {
c.JSON(http.StatusBadRequest, v1.BaseResponse[*v1.RollingUpdateStatus]{
Error: "invalid rolling update status json",
})
return
}
namespace := c.Param("namespace")
ruName := c.Param("rollingupdatename")
if namespace == "" || ruName == "" {
c.JSON(http.StatusBadRequest, v1.BaseResponse[*v1.RollingUpdateStatus]{
Error: "namespace and rolling update name cannot be empty",
})
return
}
namespaceKey := fmt.Sprintf("/registry/namespaces/%s/rollingupdates/%s", namespace, ruName)
uid, err := s.store_cli.Get(namespaceKey)
if err != nil || uid == "" {
c.JSON(http.StatusNotFound, v1.BaseResponse[*v1.RollingUpdateStatus]{
Error: fmt.Sprintf("rolling update %s/%s not found", namespace, ruName),
})
return
}
allKey := fmt.Sprintf("/registry/rollingupdates/%s", uid)
ruJson, err := s.store_cli.Get(allKey)
if err != nil || ruJson == "" {
c.JSON(http.StatusInternalServerError, v1.BaseResponse[*v1.RollingUpdateStatus]{
Error: "error in reading rolling update from etcd",
})
return
}
var ru v1.RollingUpdate
_ = json.Unmarshal([]byte(ruJson), &ru)

ru.Status = ruStatus
newRuJson, _ := json.Marshal(ru)
err = s.store_cli.Set(allKey, string(newRuJson))
if err != nil {
c.JSON(http.StatusInternalServerError, v1.BaseResponse[*v1.RollingUpdateStatus]{
Error: "error in writing rolling update to etcd",
})
return
}
c.JSON(http.StatusOK, v1.BaseResponse[*v1.RollingUpdateStatus]{
Data: &ruStatus,
})
}

func (s *kubeApiServer) DeleteRollingUpdateHandler(c *gin.Context) {
s.lock.Lock()
defer s.lock.Unlock()
namespace := c.Param("namespace")
ruName := c.Param("rollingupdatename")
if namespace == "" || ruName == "" {
c.JSON(http.StatusBadRequest, v1.BaseResponse[*v1.RollingUpdate]{
Error: "namespace and rolling update name cannot be empty",
})
return
}
namespaceKey := fmt.Sprintf("/registry/namespaces/%s/rollingupdates/%s", namespace, ruName)
uid, err := s.store_cli.Get(namespaceKey)
if err != nil || uid == "" {
c.JSON(http.StatusNotFound, v1.BaseResponse[*v1.RollingUpdate]{
Error: fmt.Sprintf("rolling update %s/%s not found", namespace, ruName),
})
return
}
allKey := fmt.Sprintf("/registry/rollingupdates/%s", uid)
ruJson, _ := s.store_cli.Get(allKey)
var ru v1.RollingUpdate
_ = json.Unmarshal([]byte(ruJson), &ru)
err = s.store_cli.Delete(namespaceKey)
if err != nil {
c.JSON(http.StatusInternalServerError, v1.BaseResponse[*v1.RollingUpdate]{
Error: "error in deleting rolling update from etcd",
})
return
}
err = s.store_cli.Delete(allKey)
if err != nil {
c.JSON(http.StatusInternalServerError, v1.BaseResponse[*v1.RollingUpdate]{
Error: "error in deleting rolling update from etcd",
})
return
}
c.JSON(http.StatusOK, v1.BaseResponse[*v1.RollingUpdate]{
Data: &ru,
})
}
Loading

0 comments on commit 5f0237a

Please sign in to comment.