Skip to content

Commit

Permalink
Merge branch 'enh/runner' into beta
Browse files Browse the repository at this point in the history
  • Loading branch information
SebiWrn committed Jan 10, 2025
2 parents 8ea33d0 + 064ed3b commit 17b1671
Show file tree
Hide file tree
Showing 16 changed files with 184 additions and 153 deletions.
12 changes: 11 additions & 1 deletion api/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"net/http"
"strconv"
)

func configActionRouter(r *gin.Engine, wrapper dao.DaoWrapper) {
Expand Down Expand Up @@ -41,7 +42,16 @@ func (a actionRoutes) getFailedActions(c *gin.Context) {

func (a actionRoutes) getActionById(c *gin.Context) {
ctx := context.Background()
model, err := a.dao.GetActionByID(ctx, c.Param("id"))
id, err := strconv.Atoi(c.Param("id"))
if err != nil {
_ = c.Error(tools.RequestError{
Status: http.StatusBadRequest,
CustomMessage: "Invalid action id",
Err: err,
})
return
}
model, err := a.dao.GetActionByID(ctx, uint(id))
if err != nil {
_ = c.Error(tools.RequestError{
Status: http.StatusNotFound,
Expand Down
103 changes: 60 additions & 43 deletions api/runner_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/TUM-Dev/gocast/model"
"github.com/TUM-Dev/gocast/tools"
"github.com/getsentry/sentry-go"
log "github.com/sirupsen/logrus"
"github.com/tum-dev/gocast/runner/protobuf"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -20,6 +19,7 @@ import (
"net"
"net/http"
"regexp"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -53,32 +53,26 @@ func (g GrpcRunnerServer) Register(ctx context.Context, request *protobuf.Regist
}

func (g GrpcRunnerServer) Heartbeat(ctx context.Context, request *protobuf.HeartbeatRequest) (*protobuf.HeartbeatResponse, error) {
runner := model.Runner{
Hostname: request.Hostname,
Port: int(request.Port),
}

r, err := g.RunnerDao.Get(ctx, runner.Hostname)
r, err := g.RunnerDao.Get(ctx, request.Hostname)
if err != nil {
log.WithError(err).Error("Failed to get runner")
logger.Error("Failed to get runner", "err", err)
return &protobuf.HeartbeatResponse{Ok: false}, err
}

newStats := model.Runner{
Hostname: request.Hostname,
Port: int(request.Port),
LastSeen: time.Now(),
Status: "Alive",
Workload: uint(request.Workload),
CPU: request.CPU,
Memory: request.Memory,
Disk: request.Disk,
Uptime: request.Uptime,
Version: request.Version,
Actions: request.CurrentAction,
}
ctx = context.WithValue(ctx, "newStats", newStats)
log.Info("Updating runner stats ", "runner", r)
newStat := make(map[string]interface{})
newStat["LastSeen"] = time.Now()
newStat["Status"] = "Alive"
newStat["Workload"] = uint(request.Workload)
newStat["CPU"] = request.CPU
newStat["Memory"] = request.Memory
newStat["Disk"] = request.Disk
newStat["Uptime"] = request.Uptime
newStat["Version"] = request.Version
newStat["Actions"] = request.CurrentAction

logger.Info("the actions of this runner", "runner", r, "actions", request.CurrentAction)
ctx = context.WithValue(ctx, "newStats", newStat)
logger.Info("Updating runner stats ", "runner", r)
p, err := r.UpdateStats(dao.DB, ctx)
return &protobuf.HeartbeatResponse{Ok: p}, err
}
Expand Down Expand Up @@ -243,7 +237,7 @@ func (g GrpcRunnerServer) RequestSelfStream(ctx context.Context, request *protob
return nil, err
}
if !(time.Now().After(stream.Start.Add(time.Minute*-30)) && time.Now().Before(stream.End.Add(time.Minute*30))) {
log.WithFields(log.Fields{"streamId": stream.ID}).Warn("Stream rejected, time out of bounds")
logger.Warn("Stream rejected, time out of bounds", "streamID", stream.ID)
return nil, errors.New("stream rejected")
}
ingestServer, err := g.IngestServerDao.GetBestIngestServer()
Expand Down Expand Up @@ -328,7 +322,7 @@ func (g GrpcRunnerServer) NotifyStreamStarted(ctx context.Context, request *prot
logger.Error("Can't set StreamLiveNowTimestamp", "err", err)
}

hlsUrl := fmt.Sprintf("%v:%v/%v", tools.Cfg.Edge.Domain, tools.Cfg.Edge.Port, request.HLSUrl)
hlsUrl := fmt.Sprintf("%v/%v", tools.Cfg.Edge.Domain, request.HLSUrl)

time.Sleep(time.Second * 5)
if !isHLSUrlOk(hlsUrl) {
Expand Down Expand Up @@ -458,7 +452,7 @@ func NotifyForStreams(dao dao.DaoWrapper) func() {
values["source"] = lectureHallForStream.PresIP
err = CreateJob(dao, ctx, values) //presentation
if err != nil {
log.Error("Can't create job", err)
logger.Error("Can't create job", err)
}
break
case 2: //camera
Expand Down Expand Up @@ -508,9 +502,10 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
logger.Error("Can't get running actions", err)
}
for _, action := range activeAction {
if action.End.Before(time.Now().Add(5 * time.Minute)) {
if action.End.Before(time.Now().Add(-5 * time.Minute)) {
action.SetToIgnored()
log.Info("Action ignored, check for progress manually", "action", action.ID)
err = dao.ActionDao.UpdateAction(ctx, &action)
logger.Info("Action ignored, check for progress manually", "action", action.ID)
continue
}
runner, err := action.GetCurrentRunner()
Expand All @@ -523,7 +518,8 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
}
continue
}
if !runner.IsAlive() && !action.IsCompleted() {
hasAction := strings.Contains(runner.Actions, strconv.Itoa(int(action.ID)))
if !runner.IsAlive() && !action.IsCompleted() && hasAction {
action.SetToFailed()
err = dao.ActionDao.UpdateAction(ctx, &action)
if err != nil {
Expand All @@ -539,13 +535,10 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
}
for _, failedAction := range failedActions {
failedAction.SetToRunning()
err = AssignRunnerAction(dao, &failedAction)
err = dao.ActionDao.UpdateAction(ctx, &failedAction)
if err != nil {
return
}
err := AssignRunnerAction(dao, &failedAction)
if err != nil {
logger.Error("Can't assign runner to action", err)
return
}
}

Expand All @@ -556,6 +549,9 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
return
}
for _, job := range jobs {
if job.Actions[0].Status != 3 {
continue
}
action, err := job.GetNextAction()
if err != nil {
logger.Error("Can't get next action", err)
Expand All @@ -565,12 +561,13 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
logger.Error("Can't update job", err)
continue
}
action.SetToRunning()
err = AssignRunnerAction(dao, action)
if err != nil {
logger.Error("Can't assign runner to action", err)
continue
}
action.SetToRunning()

err = dao.ActionDao.UpdateAction(ctx, action)
if err != nil {
return
Expand All @@ -591,15 +588,18 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error {
return err
}
runner, err := getRunnerWithLeastWorkloadForJob(runners, action.Type)
action.AssignRunner(runner)
ctx := context.Background()

err = dao.AssignRunner(ctx, action, &runner)
if err != nil {
logger.Error("Can't unmarshal json", err)
logger.Error("Can't assign action", err)
return err
}
values := map[string]interface{}{}
err = json.Unmarshal([]byte(action.Values), &values)
if err != nil {
logger.Error("Can't unmarshal json", err)
return err
}
for key, value := range values {
//logger.Info("values", "value", value)
ctx = context.WithValue(ctx, key, value)
Expand All @@ -615,7 +615,12 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error {
//TranscodingRequest(ctx, dao, runner)
break
}
action.SetToRunning()
logger.Info("runner counts", "count", len(action.AllRunners))
err = dao.ActionDao.UpdateAction(ctx, action)
if err != nil {
logger.Error("Can't update action", err)
return err
}
return nil
}

Expand All @@ -636,16 +641,28 @@ func CreateJob(dao dao.DaoWrapper, ctx context.Context, values map[string]interf
Status: 3,
Type: "stream",
Values: string(value),
}, model.Action{
End: values["end"].(time.Time),
})
job.Actions = append(job.Actions, actions...)
break
case "transcode":
actions = append(actions, model.Action{
Status: 3,
Type: "transcode",
Values: string(value),
}, model.Action{
End: values["end"].(time.Time),
})
job.Actions = append(job.Actions, actions...)
break
case "upload":
actions = append(actions, model.Action{
Status: 3,
Type: "upload",
Values: string(value),
End: values["end"].(time.Time),
})
job.Actions = append(job.Actions, actions...)
break
}
err = dao.CreateJob(ctx, job)
if err != nil {
Expand All @@ -664,7 +681,7 @@ func (g GrpcRunnerServer) mustEmbedUnimplementedFromRunnerServer() {
func StartGrpcRunnerServer() {
lis, err := net.Listen("tcp", ":50056")
if err != nil {
log.WithError(err).Error("Failed to init grpc server")
logger.Error("Failed to init grpc server", "err", err)
return
}
grpcServer := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
Expand All @@ -678,7 +695,7 @@ func StartGrpcRunnerServer() {
reflection.Register(grpcServer)
go func() {
if err = grpcServer.Serve(lis); err != nil {
log.WithError(err).Errorf("Can't serve grpc")
logger.Error("Can't serve grpc", "err", err)
}
}()
}
3 changes: 1 addition & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ db:
password: example
user: root
edge:
domain: http://localhost
port: 8089
domain: http://localhost:8089
ingestbase: rtmp://ingest.tum.live/
jwtkey: # This is an example key, delete and restart to generate a proper one
|
Expand Down
28 changes: 18 additions & 10 deletions dao/Action.go → dao/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (

type ActionDao interface {
CreateAction(ctx context.Context, action *model.Action) error
CompleteAction(ctx context.Context, actionID string) error
GetActionByID(ctx context.Context, actionID string) (model.Action, error)
GetActionsByJobID(ctx context.Context, jobID string) ([]model.Action, error)
CompleteAction(ctx context.Context, actionID uint) error
GetActionByID(ctx context.Context, actionID uint) (model.Action, error)
GetActionsByJobID(ctx context.Context, jobID uint) ([]model.Action, error)
GetAwaitingActions(ctx context.Context) ([]model.Action, error)
GetRunningActions(ctx context.Context) ([]model.Action, error)
GetAll(ctx context.Context) ([]model.Action, error)
GetAllFailedActions(ctx context.Context) ([]model.Action, error)
UpdateAction(ctx context.Context, action *model.Action) error
GetAllActionOfRunner(ctx context.Context, runnerID string) ([]model.Action, error)
AssignRunner(ctx context.Context, action *model.Action, runner *model.Runner) error
GetAllActionOfRunner(ctx context.Context, runnerID uint) ([]model.Action, error)
}

type actionDao struct {
Expand All @@ -33,17 +34,17 @@ func (d actionDao) CreateAction(ctx context.Context, action *model.Action) error
return d.db.WithContext(ctx).Create(&action).Error
}

func (d actionDao) CompleteAction(ctx context.Context, actionID string) error {
func (d actionDao) CompleteAction(ctx context.Context, actionID uint) error {
return d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", actionID).Update("status", "completed").Error
}

func (d actionDao) GetActionByID(ctx context.Context, actionID string) (model.Action, error) {
func (d actionDao) GetActionByID(ctx context.Context, actionID uint) (model.Action, error) {
var action model.Action
err := d.db.WithContext(ctx).First(&action, "id = ?", actionID).Error
return action, err
}

func (d actionDao) GetActionsByJobID(ctx context.Context, jobID string) ([]model.Action, error) {
func (d actionDao) GetActionsByJobID(ctx context.Context, jobID uint) ([]model.Action, error) {
var actions []model.Action
err := d.db.WithContext(ctx).Find(&actions, "job_id = ?", jobID).Error
return actions, err
Expand All @@ -57,7 +58,7 @@ func (d actionDao) GetAwaitingActions(ctx context.Context) ([]model.Action, erro

func (d actionDao) GetRunningActions(ctx context.Context) ([]model.Action, error) {
var actions []model.Action
err := d.db.WithContext(ctx).Find(&actions, "status = ?", 1).Error
err := d.db.WithContext(ctx).Preload("AllRunners").Find(&actions, "status = ?", 1).Error
return actions, err
}

Expand All @@ -74,10 +75,17 @@ func (d actionDao) GetAllFailedActions(ctx context.Context) ([]model.Action, err
}

func (d actionDao) UpdateAction(ctx context.Context, action *model.Action) error {
return d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", action.ID).Updates(action).Error
err := d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", action.ID).Updates(action).Error
act, _ := d.GetActionByID(ctx, action.ID)
logger.Info("updated action", "action", len(act.AllRunners))
return err
}

func (d actionDao) GetAllActionOfRunner(ctx context.Context, runnerID string) ([]model.Action, error) {
// AssignRunner appends runner to the "AllRunners" association of action,
// using the DAO's database handle bound to ctx. It returns whatever error
// the association append reports.
func (d actionDao) AssignRunner(ctx context.Context, action *model.Action, runner *model.Runner) error {
	// NOTE(review): Model receives &action, i.e. a pointer to the *model.Action
	// parameter; presumably the ORM dereferences nested pointers — confirm.
	allRunners := d.db.WithContext(ctx).Model(&action).Association("AllRunners")
	return allRunners.Append(runner)
}

func (d actionDao) GetAllActionOfRunner(ctx context.Context, runnerID uint) ([]model.Action, error) {
var actions []model.Action
err := d.db.WithContext(ctx).Joins("AllRunners").Where("id = ?", runnerID).Find(&actions).Error
return actions, err
Expand Down
File renamed without changes.
3 changes: 2 additions & 1 deletion go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
Expand All @@ -692,6 +693,7 @@ golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
Expand Down Expand Up @@ -751,7 +753,6 @@ google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQf
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
Expand Down
5 changes: 1 addition & 4 deletions model/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,13 @@ func (a *Action) IsCompleted() bool {
}

// GetCurrentRunner returns a pointer to the last element of a.AllRunners.
// When the slice is empty it returns nil and a "no runner assigned" error.
// The returned pointer aliases the slice element; it is not a copy.
func (a *Action) GetCurrentRunner() (*Runner, error) {
	logger.Info("runner count", "info", a)
	last := len(a.AllRunners) - 1
	if last < 0 {
		return nil, errors.New("no runner assigned")
	}
	return &a.AllRunners[last], nil
}

// AssignRunner appends runner to the end of a.AllRunners. Only the in-memory
// slice is modified here.
func (a *Action) AssignRunner(runner Runner) {
	assigned := append(a.AllRunners, runner)
	a.AllRunners = assigned
}

// GetValues returns the action's Values field unchanged.
func (a *Action) GetValues() string {
	values := a.Values
	return values
}
Expand Down
Loading

0 comments on commit 17b1671

Please sign in to comment.