Skip to content

Commit

Permalink
Merge pull request #58 from CloudOS-Group3/feature/gpu
Browse files Browse the repository at this point in the history
Feature/gpu
  • Loading branch information
Doris-xm authored Jun 9, 2024
2 parents b7087aa + f1cdca2 commit d0b7718
Show file tree
Hide file tree
Showing 22 changed files with 913 additions and 5 deletions.
23 changes: 23 additions & 0 deletions pkg/api/gpuJob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package api

// GPUJob is the API object describing a GPU job: its identity, the arguments
// it was submitted with, where its source files live, and its execution state.
type GPUJob struct {
	// Metadata: name, namespace, uuid
	Metadata ObjectMeta `json:"metadata,omitempty" yaml:"metadata,omitempty"`

	// Args: free-form string key/value arguments supplied with the job.
	Args map[string]string `json:"args,omitempty" yaml:"args,omitempty"`

	// SourcePath: the path of the source file
	SourcePath string `json:"sourcePath,omitempty" yaml:"sourcePath,omitempty"`

	// Result: the result of the job
	Result string `json:"result,omitempty" yaml:"result,omitempty"`

	// Status: Created, Running, Ended
	Status string `json:"status,omitempty" yaml:"status,omitempty"`

	// StartTime: the start time of the job, kept as a formatted string
	StartTime string `json:"startTime,omitempty" yaml:"startTime,omitempty"`

	// EndTime: the end time of the job, kept as a formatted string
	EndTime string `json:"endTime,omitempty" yaml:"endTime,omitempty"`
}
80 changes: 80 additions & 0 deletions pkg/apiserver/handlers/gpujob_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package handlers

import (
"encoding/json"
"github.com/gin-gonic/gin"
"minik8s/pkg/api"
"minik8s/pkg/api/msg_type"
"minik8s/pkg/config"
"minik8s/pkg/gpu"
"minik8s/util/log"
"minik8s/util/stringutil"
"net/http"
"time"
)

// AddGpuFunc handles the request that submits a new GPU job.
//
// It binds the request body to an api.GPUJob, assigns it an identifier from
// stringutil.GenerateRandomString(5), builds and pushes the job image, stores
// and publishes the pod that runs that image, and finally persists the job in
// etcd under config.GPUjobPath + "<name>-<uuid>".
//
// A malformed body or a failed image build is answered with 400. A pod message
// or job that cannot be serialized is answered with 500, so the caller is
// never left with an implicit success for a job that was not recorded.
func AddGpuFunc(context *gin.Context) {
	log.Info("Add gpu spec")
	var gpujob api.GPUJob
	if err := context.ShouldBind(&gpujob); err != nil {
		context.JSON(http.StatusBadRequest, gin.H{
			"status": "wrong",
		})
		return
	}
	gpujob.Metadata.UUID = stringutil.GenerateRandomString(5)

	// Step1: Build image
	if err := gpu.CreateGpuImage(&gpujob); err != nil {
		log.Error("Error create image: %s", err.Error())
		context.JSON(http.StatusBadRequest, gin.H{
			"status": "wrong",
		})
		return
	}

	// Step2: Create pod to run the image
	pod := gpu.CreateGPUPod(&gpujob)
	msg := msg_type.PodMsg{
		Opt:    msg_type.Add,
		NewPod: *pod,
	}
	// Serialize the message before storing the pod, so a serialization failure
	// cannot leave a stored pod that was never announced.
	msg_json, err := json.Marshal(msg)
	if err != nil {
		log.Error("Error marshal pod message: %s", err.Error())
		context.JSON(http.StatusInternalServerError, gin.H{
			"status": "wrong",
		})
		return
	}
	etcdClient.PutPod(*pod)
	publisher.Publish(msg_type.PodTopic, string(msg_json))

	gpujob.StartTime = time.Now().Format("2006-01-02 15:04:05")

	// Step3: Save gpujob to etcd, we save as <name>-<uuid>
	URL := config.GPUjobPath + gpujob.Metadata.Name + "-" + gpujob.Metadata.UUID
	gpuByteArr, err := json.Marshal(gpujob)
	if err != nil {
		log.Error("Error marshal gpu spec: %s", err.Error())
		context.JSON(http.StatusInternalServerError, gin.H{
			"status": "wrong",
		})
		return
	}
	etcdClient.PutEtcdPair(URL, string(gpuByteArr))
}

// GetAllGpuJobs answers with every etcd entry stored under config.GPUjobPath,
// converted by stringutil.EtcdResEntryToJSON and returned in the "data" field
// of a 200 response.
func GetAllGpuJobs(context *gin.Context) {
	entries := etcdClient.PrefixGet(config.GPUjobPath)

	payload := gin.H{
		"data": stringutil.EtcdResEntryToJSON(entries),
	}
	context.JSON(http.StatusOK, payload)
}

// GetGpuJobsByName answers with the etcd entries whose key starts with
// config.GPUjobPath followed by the ":name" route parameter. The entries are
// converted by stringutil.EtcdResEntryToJSON and returned in the "data" field
// of a 200 response.
//
// NOTE(review): the lookup goes through PrefixGet, so a name that is itself a
// prefix of another job's name presumably returns that job's entries as well;
// confirm whether callers rely on this.
func GetGpuJobsByName(context *gin.Context) {
	keyPrefix := config.GPUjobPath + context.Param("name")
	entries := etcdClient.PrefixGet(keyPrefix)

	payload := gin.H{
		"data": stringutil.EtcdResEntryToJSON(entries),
	}
	context.JSON(http.StatusOK, payload)
}
49 changes: 49 additions & 0 deletions pkg/apiserver/handlers/jobhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import (
"minik8s/pkg/api"
msg "minik8s/pkg/api/msg_type"
"minik8s/pkg/config"
"minik8s/pkg/gpu"
"minik8s/util/httputil"
"minik8s/util/log"
"minik8s/util/stringutil"
"net/http"
"os/exec"
"strings"
"time"
)

func GetJobs(context *gin.Context) {
Expand Down Expand Up @@ -197,3 +200,49 @@ func JobResultHandler(context *gin.Context) {
msg_json, _ := json.Marshal(message)
publisher.Publish(msg.JobTopic, string(msg_json))
}

// GpuResultHandler handles the result report for a GPU job.
//
// The request body is decoded into an api.JobResult whose UUID field holds the
// job key "<job_name>-<uuid>". The stored job is loaded from etcd, marked as
// ended, given its result and end time, and written back. If the pod that ran
// the job is still registered, the job's source directory is copied back from
// the node the pod ran on, and the pod is then deleted and a delete message
// published.
//
// An unreadable or malformed body is answered with 400, a job that cannot be
// loaded with 404, and a job that cannot be re-serialized with 500. In none of
// these cases is anything written to etcd, so an unknown key can no longer
// create an empty "ended" job record.
func GpuResultHandler(context *gin.Context) {
	log.Info("received gpu result request")

	body, err := ioutil.ReadAll(context.Request.Body)
	if err != nil {
		log.Error("Error read gpu result body: %s", err.Error())
		context.JSON(http.StatusBadRequest, gin.H{
			"status": "wrong",
		})
		return
	}
	var newResult api.JobResult
	if err := json.Unmarshal(body, &newResult); err != nil {
		log.Error("Error unmarshal gpu result: %s", err.Error())
		context.JSON(http.StatusBadRequest, gin.H{
			"status": "wrong",
		})
		return
	}
	log.Info("job result info: %+v", newResult)

	// Here UUID is <job_name>-<uuid>
	URL := config.GPUjobPath + newResult.UUID
	jobJson := etcdClient.GetEtcdPair(URL)
	var job api.GPUJob
	if err := json.Unmarshal([]byte(jobJson), &job); err != nil {
		// Nothing decodable is stored under this key; do not fabricate a job.
		log.Error("Error load gpu job %s: %s", newResult.UUID, err.Error())
		context.JSON(http.StatusNotFound, gin.H{
			"status": "wrong",
		})
		return
	}
	log.Info("job info: %+v", job)

	job.Status = api.JOB_ENDED
	if newResult.Error == "" {
		job.Result = newResult.Result
		log.Info("job result info: %+v", job.Result)
	} else {
		// On a reported error the result field points at the source path.
		job.Result = job.SourcePath
	}
	job.EndTime = time.Now().Format("2006-01-02 15:04:05")

	jobByteArr, err := json.Marshal(job)
	if err != nil {
		log.Error("Error marshal gpu job: %s", err.Error())
		context.JSON(http.StatusInternalServerError, gin.H{
			"status": "wrong",
		})
		return
	}
	log.Info("job result: %v", job)
	etcdClient.PutEtcdPair(URL, string(jobByteArr))

	// get pod, pod name is <job_name>-<uuid>
	pod, success := etcdClient.GetPod(gpu.GPUNamespace, newResult.UUID)
	if !success {
		// No pod registered under that name: nothing to copy back or clean up.
		return
	}

	// The node-to-host mapping is hard-coded.
	// NOTE(review): confirm the host part of both scp sources below.
	var remote string
	switch pod.Spec.NodeName {
	case "node1":
		remote = "[email protected]:"
	case "node2":
		remote = "[email protected]:"
	}
	if remote != "" {
		// Best effort: a failed copy is logged but does not block pod cleanup.
		output, err := exec.Command("scp", "-r", remote+job.SourcePath, job.SourcePath).CombinedOutput()
		if err != nil {
			log.Error("Error copy gpu job output: %s: %s", err.Error(), string(output))
		}
	}

	etcdClient.DeletePod(gpu.GPUNamespace, pod.Metadata.Name)
	message := msg.PodMsg{
		Opt:    msg.Delete,
		OldPod: pod,
	}
	msg_json, err := json.Marshal(message)
	if err != nil {
		log.Error("Error marshal pod message: %s", err.Error())
		return
	}
	publisher.Publish(msg.PodTopic, string(msg_json))
}
5 changes: 5 additions & 0 deletions pkg/apiserver/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,10 @@ func (server *apiServer) bind() {
server.router.DELETE(config.JobURL, handlers.DeleteJob)
server.router.PUT(config.JobURL, handlers.UpdateJob)
server.router.POST(config.JobResultURL, handlers.JobResultHandler)
server.router.POST(config.GpuJobResultURL, handlers.GpuResultHandler)

server.router.POST(config.GPUJobURL, handlers.AddGpuFunc)
server.router.GET(config.GPUJobsURL, handlers.GetAllGpuJobs)
server.router.GET(config.GPUJobURL, handlers.GetGpuJobsByName)

}
3 changes: 3 additions & 0 deletions pkg/config/etcdconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,7 @@ const (

// usage: /registry/pvc/<name>
ETcdPVCPath = "/registry/pvc/"

// usage: /registry/gpujob/<name>-<uuid>
GPUjobPath = "/registry/gpujob/"
)
6 changes: 5 additions & 1 deletion pkg/config/urlconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ const (
TriggerResultURL = "/api/v1/triggers/result/:uuid"
TriggerResultsURL = "/api/v1/triggers/result"

JobResultURL = "/result"
JobResultURL = "/result"
GpuJobResultURL = "/gpu_result"

GPUJobsURL = "/api/v1/gpujobs"
GPUJobURL = "/api/v1/gpujob/:name"
)

// const used to send and parse url
Expand Down
97 changes: 97 additions & 0 deletions pkg/gpu/gpu_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package gpu

import (
"github.com/google/uuid"
"minik8s/pkg/api"
"minik8s/pkg/config"
"minik8s/pkg/serverless/function/function_util"
"minik8s/util/log"
"minik8s/util/stringutil"
"os/exec"
)

// GPUNamespace is the namespace assigned to GPU job pods; it is also passed to
// function_util.GetImageName when naming GPU job images.
const GPUNamespace = "GPUJob"

// GetGPUPodName returns the pod name for a GPU job, formed from the job's
// metadata as "<name>-<uuid>".
func GetGPUPodName(gpu_config *api.GPUJob) string {
	meta := gpu_config.Metadata
	return meta.Name + "-" + meta.UUID
}

// CreateGPUPod builds the pod object that runs a GPU job.
//
// The pod lives in GPUNamespace, is named by GetGPUPodName and carries a fresh
// UUID. It has a single container that pulls the job image from the registry
// at config.Remotehost:function_util.RegistryPort and mounts the job's
// SourcePath from the host at /app/src.
func CreateGPUPod(gpu_config *api.GPUJob) *api.Pod {
	log.Info("Create gpu pod")

	const volumeName = "shared-data"

	registry := config.Remotehost + ":" + function_util.RegistryPort
	image := registry + "/" + function_util.GetImageName(gpu_config.Metadata.Name, GPUNamespace)

	meta := api.ObjectMeta{
		Name:      GetGPUPodName(gpu_config),
		NameSpace: GPUNamespace,
		UUID:      uuid.NewString(),
	}

	// The container name gets a random suffix appended to the job name.
	worker := api.Container{
		Name:            gpu_config.Metadata.Name + stringutil.GenerateRandomString(4),
		Image:           image,
		ImagePullPolicy: api.PullFromRegistry,
		VolumeMounts: []api.VolumeMount{
			{Name: volumeName, MountPath: "/app/src"},
		},
	}

	sourceVolume := api.Volume{
		Name:     volumeName,
		HostPath: gpu_config.SourcePath,
	}

	pod := &api.Pod{
		Metadata: meta,
		Spec: api.PodSpec{
			Containers: []api.Container{worker},
			Volumes:    []api.Volume{sourceVolume},
		},
	}
	log.Debug("%+v\n", pod)
	return pod
}

// CreateGpuImage builds the container image for a GPU job and publishes it to
// the registry at config.Remotehost:function_util.RegistryPort.
//
// It runs three docker commands in sequence (build, tag, push) and returns
// the error of the first command that fails, or nil when all three succeed.
// The job's name, UUID and the "partition", "N", "ntasks-per-node",
// "cpus-per-task" and "gres" entries of Args are passed to the build as
// build arguments.
func CreateGpuImage(gpu_config *api.GPUJob) error {
	localImage := function_util.GetImageName(gpu_config.Metadata.Name, GPUNamespace)
	remoteImage := config.Remotehost + ":" + function_util.RegistryPort + "/" + localImage

	// Step 1: Build Image
	// docker build --build-arg SOURCE_DIR=/path/to/source -t my-python-app .
	buildArgs := []string{
		"build",
		"--build-arg", "job_name=" + gpu_config.Metadata.Name + "-" + gpu_config.Metadata.UUID,
		"--build-arg", "partition=" + gpu_config.Args["partition"],
		"--build-arg", "N=" + gpu_config.Args["N"],
		"--build-arg", "ntasks_per_node=" + gpu_config.Args["ntasks-per-node"],
		"--build-arg", "cpus_per_task=" + gpu_config.Args["cpus-per-task"],
		"--build-arg", "gres=" + gpu_config.Args["gres"],
		"-t", localImage,
		"/root/minik8s/pkg/gpu/image/",
	}
	buildOut, err := exec.Command("docker", buildArgs...).CombinedOutput()
	log.Info("output: %s", string(buildOut))
	if err != nil {
		log.Error("failed to run build: %s", string(buildOut))
		return err
	}

	// Step 2: Tag Image
	// docker tag myimage:latest localhost:5000/myimage:latest
	tagCmd := exec.Command("docker", "tag", localImage, remoteImage)
	log.Info("cmd: %s", tagCmd.String())
	if tagOut, err := tagCmd.CombinedOutput(); err != nil {
		log.Error("failed to run tag: %s", string(tagOut))
		return err
	}

	// Step 3: Push Image
	// docker push localhost:5000/myimage:latest
	pushOut, err := exec.Command("docker", "push", remoteImage).CombinedOutput()
	log.Info("output: %s", string(pushOut))
	if err != nil {
		log.Error("failed to run push: %s", string(pushOut))
		return err
	}
	return nil
}
36 changes: 36 additions & 0 deletions pkg/gpu/image/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Image that runs gpu_server.py for a single GPU job. The job parameters are
# passed as build arguments and exposed to the server as environment variables.
FROM python:3.11-slim-bookworm

ENV PIP_DEFAULT_TIMEOUT=100 \
    # Allow statements and log messages to immediately appear
    PYTHONUNBUFFERED=1 \
    # disable a pip version check to reduce run-time & log-spam
    PIP_DISABLE_PIP_VERSION_CHECK=1 \
    # cache is useless in docker image, so disable to reduce image size
    PIP_NO_CACHE_DIR=1

WORKDIR /app

# Install dependencies before anything job-specific is introduced, so this
# layer can be reused from the build cache across jobs instead of being
# rebuilt whenever a build argument changes.
RUN pip install requests paramiko

COPY gpu_server.py /app/

# Declare build arguments
ARG job_name
ARG partition
ARG N
ARG ntasks_per_node
ARG cpus_per_task
ARG gres

# Set environment variables from build arguments
ENV JOB_NAME=${job_name}
ENV PARTITION=${partition}
ENV N=${N}
ENV NTASKS_PER_NODE=${ntasks_per_node}
ENV CPUS_PER_TASK=${cpus_per_task}
ENV GRES=${gres}

CMD ["python", "gpu_server.py"]

Loading

0 comments on commit d0b7718

Please sign in to comment.