Skip to content

Commit

Permalink
Merge pull request #41 from GMH233/feature/microservice-zc
Browse files Browse the repository at this point in the history
feat: bookinfo部署
  • Loading branch information
sjtuzc954 authored May 27, 2024
2 parents e66c6db + 554f210 commit 1414b32
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 38 deletions.
3 changes: 3 additions & 0 deletions pkg/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,9 @@ type NodeStatus struct {
Address string `json:"address,omitempty"`
}

// ServiceName -> ClusterIP
type SidecarServiceNameMapping map[string]string

type SidecarMapping map[string][]SidecarEndpoints

type SidecarEndpoints struct {
Expand Down
47 changes: 45 additions & 2 deletions pkg/kubeapiserver/app/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ const (
NamespaceSubsetsURL = "/api/v1/namespaces/:namespace/subsets"
SingleSubsetURL = "/api/v1/namespaces/:namespace/subsets/:subsetname"

SidecarMappingURL = "/api/v1/sidecar-mapping"
SidecarMappingURL = "/api/v1/sidecar-mapping"
SidecarServiceNameMappingURL = "/api/v1/sidecar-service-name-mapping"
)

/* NAMESPACE
Expand Down Expand Up @@ -237,6 +238,7 @@ func (ser *kubeApiServer) binder() {

ser.router.GET(SidecarMappingURL, ser.GetSidecarMapping)
ser.router.POST(SidecarMappingURL, ser.SaveSidecarMapping)
ser.router.GET(SidecarServiceNameMappingURL, ser.GetSidecarServiceNameMapping)
}

func (s *kubeApiServer) GetStatsDataHandler(c *gin.Context) {
Expand Down Expand Up @@ -673,6 +675,30 @@ func (ser *kubeApiServer) AddPodHandler(con *gin.Context) {

pod.Status.Phase = v1.PodPending

namespace := con.Param("namespace")
if namespace == "" {
con.JSON(http.StatusBadRequest, gin.H{
"error": "namespace is required",
})
return
}
if pod.Namespace == "" {
if namespace != Default_Namespace {
con.JSON(http.StatusBadRequest, gin.H{
"error": "namespace does not match",
})
return
}
} else {
if pod.Namespace != namespace {
con.JSON(http.StatusBadRequest, gin.H{
"error": "namespace does not match",
})
return
}
}
pod.Namespace = namespace

/* fake store pod to:
1. namespace , store the binding of podname and uid
2. node , only uid
Expand All @@ -687,7 +713,7 @@ func (ser *kubeApiServer) AddPodHandler(con *gin.Context) {
all_pod_keystr := prefix + "/pods/" + string(pod.ObjectMeta.UID)

// namespace里面对应的是podname和uid的映射
namespace_pod_keystr := prefix + "/namespaces/" + Default_Namespace + "/pods/" + pod_name
namespace_pod_keystr := prefix + "/namespaces/" + namespace + "/pods/" + pod_name

// node里面对应的也是podname和uid的映射
// node_pod_keystr := prefix + "/nodes/" + Default_Nodename + "/pods/" + pod_name
Expand Down Expand Up @@ -2705,3 +2731,20 @@ func (s *kubeApiServer) GetSidecarMapping(c *gin.Context) {
Data: &mapping,
})
}

func (s *kubeApiServer) GetSidecarServiceNameMapping(c *gin.Context) {
services, err := s.getAllServicesFromEtcd()
if err != nil {
c.JSON(http.StatusInternalServerError, v1.BaseResponse[v1.SidecarServiceNameMapping]{
Error: err.Error(),
})
return
}
mapping := make(v1.SidecarServiceNameMapping)
for _, svc := range services {
mapping[svc.Name] = svc.Spec.ClusterIP
}
c.JSON(http.StatusOK, v1.BaseResponse[v1.SidecarServiceNameMapping]{
Data: mapping,
})
}
25 changes: 24 additions & 1 deletion pkg/kubeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Client interface {
GetSubsetByName(name, namespace string) (*v1.Subset, error)

AddSidecarMapping(maps v1.SidecarMapping) error

GetSidecarServiceNameMapping() (v1.SidecarServiceNameMapping, error)
}

type client struct {
Expand Down Expand Up @@ -300,7 +302,7 @@ func (c *client) UploadPodMetrics(metrics []*v1.PodRawMetrics) error {

metricsStr, _ := json.Marshal(metrics)

fmt.Printf("upload metrics str: %s\n", string(metricsStr))
// fmt.Printf("upload metrics str: %s\n", string(metricsStr))

req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(metricsStr))
if err != nil {
Expand Down Expand Up @@ -432,3 +434,24 @@ func (c *client) AddSidecarMapping(maps v1.SidecarMapping) error {
}
return nil
}

func (c *client) GetSidecarServiceNameMapping() (v1.SidecarServiceNameMapping, error) {
resp, err := http.Get(fmt.Sprintf("http://%s:8001/api/v1/sidecar-service-name-mapping", c.apiServerIP))
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var baseResponse v1.BaseResponse[v1.SidecarServiceNameMapping]
err = json.Unmarshal(body, &baseResponse)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("get sidecar service name mapping failed, error: %s", baseResponse.Error)
}
return baseResponse.Data, nil
}
1 change: 1 addition & 0 deletions pkg/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (kls *KubeletServer) RunKubelet(ctx context.Context, wg *sync.WaitGroup) {
func (kls *KubeletServer) createAndInitKubelet() (*kubelet.Kubelet, error) {
kl, err := kubelet.NewMainKubelet(kls.nodeName, kls.kubeClient)
if err != nil {
log.Printf("Failed to create kubelet: %v", err)
return nil, err
}
return kl, nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,15 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan types.
case <-syncCh:
// TODO 定时同步Pod信息到metrics collector
allPods, err := kl.runtimeManager.GetAllPods()
log.Printf("allPods: %v\n", allPods)
// log.Printf("allPods: %v\n", allPods)
if err != nil {
log.Printf("Failed to get all pods: %v\n", err)
return true
}
// 由allPods取到所有的status
var podStatusList []*runtime.PodStatus
for _, pod := range allPods {
log.Printf("pod: %v\n", pod)
// log.Printf("pod: %v\n", pod)
podStatus, err := kl.runtimeManager.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
if err != nil {
log.Printf("Failed to get pod %v status: %v\n", pod.Name, err)
Expand Down Expand Up @@ -186,6 +186,7 @@ func (kl *Kubelet) SyncPod(pod *v1.Pod, syncPodType types.SyncPodType, podStatus
log.Printf("Creating pod %v using container manager.\n", pod.Name)
err := kl.runtimeManager.AddPod(pod)
if err != nil {
log.Printf("Failed to create pod %v: %v\n", pod.Name, err)
return
}
log.Printf("Pod %v created.\n", pod.Name)
Expand Down
79 changes: 53 additions & 26 deletions pkg/kubelet/runtime/runtime_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func (rm *runtimeManager) GetAllPods() ([]*Pod, error) {
defer rm.lock.Unlock()
containers, err := rm.getAllContainers()
if err != nil {
panic(err)
// panic(err)
return nil, err
}
var ret []*Pod

Expand Down Expand Up @@ -87,7 +88,8 @@ func (rm *runtimeManager) GetPodStatus(ID v1.UID, PodName string, PodSpace strin
defer rm.lock.Unlock()
containers, err := rm.getPodContainers(PodName)
if err != nil {
panic(err)
//panic(err)
return nil, err
}
podStatus := &PodStatus{
ID: ID,
Expand All @@ -107,7 +109,8 @@ func (rm *runtimeManager) GetPodStatus(ID v1.UID, PodName string, PodSpace strin
func (rm *runtimeManager) getPodContainers(PodName string) ([]*ContainerStatus, error) {
containers, err := rm.getAllContainers()
if err != nil {
panic(err)
//panic(err)
return nil, err
}
var ret []*ContainerStatus
for _, container := range containers {
Expand Down Expand Up @@ -138,7 +141,8 @@ func (rm *runtimeManager) AddPod(pod *v1.Pod) error {
//PauseId, err := rm.CreatePauseContainer(pod.UID, pod.Name, pod.Namespace)
PauseId, err := rm.CreatePauseContainer(pod)
if err != nil {
panic(err)
//panic(err)
return err
}

for _, c := range pod.Spec.InitContainers {
Expand All @@ -156,7 +160,8 @@ func (rm *runtimeManager) AddPod(pod *v1.Pod) error {
}
_, err = rm.createContainer(&container, PauseId, pod.UID, pod.Name, pod.Namespace, volumes)
if err != nil {
panic(err)
//panic(err)
return err
}
}

Expand All @@ -171,19 +176,22 @@ func (rm *runtimeManager) CreatePauseContainer(pod *v1.Pod) (string, error) {
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
//panic(err)
return "", err
}
defer cli.Close()
PauseContainerImage := "registry.aliyuncs.com/google_containers/pause:3.6"
exi, err := rm.checkImages(PauseContainerImage)
if err != nil {
panic(err)
//panic(err)
return "", err
}
if !exi {
//fmt.Println("yes")
reader, err := cli.ImagePull(ctx, PauseContainerImage, image.PullOptions{})
if err != nil {
panic(err)
//panic(err)
return "", err
}
defer reader.Close()
io.Copy(os.Stdout, reader)
Expand All @@ -206,16 +214,19 @@ func (rm *runtimeManager) CreatePauseContainer(pod *v1.Pod) (string, error) {
DNS: []string{rm.nameserverIP},
}, nil, nil, "")
if err != nil {
panic(err)
//panic(err)
return "", err
}

if err := cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
panic(err)
//panic(err)
return "", err
}

ip, err := nw.Attach(resp.ID)
if err != nil {
panic(err)
//panic(err)
return "", err
}
rm.IpMap[PodID] = ip

Expand Down Expand Up @@ -276,10 +287,13 @@ func (rm *runtimeManager) createInitContainer(c *v1.Container, pauseID string) e
return err
}
if !exist {
_, err = cli.ImagePull(context.Background(), c.Image, image.PullOptions{})
readCloser, err := cli.ImagePull(context.Background(), c.Image, image.PullOptions{})
if err != nil {
return err
}
// 读取pull的输出
_, _ = io.ReadAll(readCloser)
_ = readCloser.Close()
}
hostConfig := &container.HostConfig{
NetworkMode: container.NetworkMode("container:" + pauseID),
Expand Down Expand Up @@ -325,19 +339,22 @@ func (rm *runtimeManager) createContainer(ct *v1.Container, PauseId string, PodI
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
//panic(err)
return "", err
}
defer cli.Close()

exi, err := rm.checkImages(repotag)
if err != nil {
panic(err)
//panic(err)
return "", err
}
if !exi {
//fmt.Println("yes")
reader, err := cli.ImagePull(ctx, "docker.io/library/"+repotag, image.PullOptions{})
reader, err := cli.ImagePull(ctx, repotag, image.PullOptions{})
if err != nil {
panic(err)
//panic(err)
return "", err
}
defer reader.Close()
io.Copy(os.Stdout, reader)
Expand Down Expand Up @@ -403,11 +420,13 @@ func (rm *runtimeManager) createContainer(ct *v1.Container, PauseId string, PodI

resp, err := cli.ContainerCreate(ctx, config, hostConfig, nil, nil, "")
if err != nil {
panic(err)
//panic(err)
return "", err
}

if err := cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
panic(err)
//panic(err)
return "", err
}

// statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
Expand All @@ -421,7 +440,8 @@ func (rm *runtimeManager) createContainer(ct *v1.Container, PauseId string, PodI

out, err := cli.ContainerLogs(ctx, resp.ID, container.LogsOptions{ShowStdout: true})
if err != nil {
panic(err)
//panic(err)
return "", err
}

stdcopy.StdCopy(os.Stdout, os.Stderr, out)
Expand All @@ -432,13 +452,15 @@ func (rm *runtimeManager) checkImages(repotag string) (bool, error) {
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
//panic(err)
return false, err
}
defer cli.Close()
images, err := cli.ImageList(ctx, image.ListOptions{})
if err != nil {
//fmt.Println("fail to get images", err)
panic(err)
//panic(err)
return false, err
}
//fmt.Println("Docker Images:")
for _, image := range images {
Expand Down Expand Up @@ -477,13 +499,15 @@ func (rm *runtimeManager) getAllContainers() ([]types.Container, error) {
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
//panic(err)
return nil, err
}
defer cli.Close()

containers, err := cli.ContainerList(ctx, container.ListOptions{All: true})
if err != nil {
panic(err)
//panic(err)
return nil, err
}
var ret []types.Container
for _, container := range containers {
Expand Down Expand Up @@ -610,17 +634,20 @@ func (rm *runtimeManager) deleteContainer(ct types.Container) error {
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
panic(err)
//panic(err)
return err
}
defer cli.Close()
noWaitTimeout := 0
if ct.State == "running" {
if err := cli.ContainerStop(ctx, ct.ID, container.StopOptions{Timeout: &noWaitTimeout}); err != nil {
panic(err)
//panic(err)
return err
}
}
if err := cli.ContainerRemove(ctx, ct.ID, container.RemoveOptions{}); err != nil {
panic(err)
//panic(err)
return err
}

return nil
Expand Down
Loading

0 comments on commit 1414b32

Please sign in to comment.