diff --git a/api/action.go b/api/action.go index 406e496b4..25c5859fd 100644 --- a/api/action.go +++ b/api/action.go @@ -7,6 +7,7 @@ import ( "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" "net/http" + "strconv" ) func configActionRouter(r *gin.Engine, wrapper dao.DaoWrapper) { @@ -41,7 +42,16 @@ func (a actionRoutes) getFailedActions(c *gin.Context) { func (a actionRoutes) getActionById(c *gin.Context) { ctx := context.Background() - model, err := a.dao.GetActionByID(ctx, c.Param("id")) + id, err := strconv.Atoi(c.Param("id")) + if err != nil { + _ = c.Error(tools.RequestError{ + Status: http.StatusBadRequest, + CustomMessage: "Invalid action id", + Err: err, + }) + return + } + model, err := a.dao.GetActionByID(ctx, uint(id)) if err != nil { _ = c.Error(tools.RequestError{ Status: http.StatusNotFound, diff --git a/api/runner_grpc.go b/api/runner_grpc.go index a883e1636..3c29a94e2 100644 --- a/api/runner_grpc.go +++ b/api/runner_grpc.go @@ -9,7 +9,6 @@ import ( "github.com/TUM-Dev/gocast/model" "github.com/TUM-Dev/gocast/tools" "github.com/getsentry/sentry-go" - log "github.com/sirupsen/logrus" "github.com/tum-dev/gocast/runner/protobuf" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -20,6 +19,7 @@ import ( "net" "net/http" "regexp" + "strconv" "strings" "time" ) @@ -53,32 +53,26 @@ func (g GrpcRunnerServer) Register(ctx context.Context, request *protobuf.Regist } func (g GrpcRunnerServer) Heartbeat(ctx context.Context, request *protobuf.HeartbeatRequest) (*protobuf.HeartbeatResponse, error) { - runner := model.Runner{ - Hostname: request.Hostname, - Port: int(request.Port), - } - - r, err := g.RunnerDao.Get(ctx, runner.Hostname) + r, err := g.RunnerDao.Get(ctx, request.Hostname) if err != nil { - log.WithError(err).Error("Failed to get runner") + logger.Error("Failed to get runner", "err", err) return &protobuf.HeartbeatResponse{Ok: false}, err } - newStats := model.Runner{ - Hostname: request.Hostname, - Port: int(request.Port), - LastSeen: time.Now(), - Status: "Alive", - Workload: uint(request.Workload), - CPU: request.CPU, - Memory: request.Memory, - Disk: request.Disk, - Uptime: request.Uptime, - Version: request.Version, - Actions: request.CurrentAction, - } - ctx = context.WithValue(ctx, "newStats", newStats) - log.Info("Updating runner stats ", "runner", r) + newStat := make(map[string]interface{}) + newStat["LastSeen"] = time.Now() + newStat["Status"] = "Alive" + newStat["Workload"] = uint(request.Workload) + newStat["CPU"] = request.CPU + newStat["Memory"] = request.Memory + newStat["Disk"] = request.Disk + newStat["Uptime"] = request.Uptime + newStat["Version"] = request.Version + newStat["Actions"] = request.CurrentAction + + logger.Info("the actions of this runner", "runner", r, "actions", request.CurrentAction) + ctx = context.WithValue(ctx, "newStats", newStat) + logger.Info("Updating runner stats ", "runner", r) p, err := r.UpdateStats(dao.DB, ctx) return &protobuf.HeartbeatResponse{Ok: p}, err } @@ -243,7 +237,7 @@ func (g GrpcRunnerServer) RequestSelfStream(ctx context.Context, request *protob return nil, err } if !(time.Now().After(stream.Start.Add(time.Minute*-30)) && time.Now().Before(stream.End.Add(time.Minute*30))) { - log.WithFields(log.Fields{"streamId": stream.ID}).Warn("Stream rejected, time out of bounds") + logger.Warn("Stream rejected, time out of bounds", "streamID", stream.ID) return nil, errors.New("stream rejected") } ingestServer, err := g.IngestServerDao.GetBestIngestServer() @@ -458,7 +452,7 @@ func NotifyForStreams(dao dao.DaoWrapper) func() { values["source"] = lectureHallForStream.PresIP err = CreateJob(dao, ctx, values) //presentation if err != nil { - log.Error("Can't create job", err) + logger.Error("Can't create job", err) } break case 2: //camera @@ -511,7 +505,7 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() { if action.End.Before(time.Now().Add(-5 * time.Minute)) { action.SetToIgnored() err = dao.ActionDao.UpdateAction(ctx, &action) - log.Info("Action ignored, check for progress manually", "action", action.ID) + logger.Info("Action ignored, check for progress manually", "action", action.ID) continue } runner, err := action.GetCurrentRunner() @@ -524,7 +518,8 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() { } continue } - if !runner.IsAlive() && !action.IsCompleted() { + hasAction := strings.Contains(runner.Actions, strconv.Itoa(int(action.ID))) + if !runner.IsAlive() && !action.IsCompleted() && hasAction { action.SetToFailed() err = dao.ActionDao.UpdateAction(ctx, &action) if err != nil { @@ -540,13 +535,10 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() { } for _, failedAction := range failedActions { failedAction.SetToRunning() - err = AssignRunnerAction(dao, &failedAction) - err = dao.ActionDao.UpdateAction(ctx, &failedAction) - if err != nil { - return - } + err := AssignRunnerAction(dao, &failedAction) if err != nil { logger.Error("Can't assign runner to action", err) + return } } @@ -569,12 +561,13 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() { logger.Error("Can't update job", err) continue } + action.SetToRunning() err = AssignRunnerAction(dao, action) if err != nil { logger.Error("Can't assign runner to action", err) continue } - action.SetToRunning() + err = dao.ActionDao.UpdateAction(ctx, action) if err != nil { return @@ -595,15 +588,18 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error { return err } runner, err := getRunnerWithLeastWorkloadForJob(runners, action.Type) - action.AssignRunner(runner) ctx := context.Background() - + err = dao.AssignRunner(ctx, action, &runner) if err != nil { - logger.Error("Can't unmarshal json", err) + logger.Error("Can't assign action", err) return err } values := map[string]interface{}{} err = json.Unmarshal([]byte(action.Values), &values) + if err != nil { + logger.Error("Can't unmarshal json", err) + return err + } for key, value := range values { //logger.Info("values", "value", value) ctx = context.WithValue(ctx, key, value) @@ -619,13 +615,12 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error { //TranscodingRequest(ctx, dao, runner) break } - //action.SetToRunning() + logger.Info("runner counts", "count", len(action.AllRunners)) err = dao.ActionDao.UpdateAction(ctx, action) if err != nil { - logger.Error("couldn't update action!", "error", err) + logger.Error("Can't update action", err) return err } - logger.Info("runner counts", "count", len(action.AllRunners)) return nil } @@ -686,7 +681,7 @@ func (g GrpcRunnerServer) mustEmbedUnimplementedFromRunnerServer() { func StartGrpcRunnerServer() { lis, err := net.Listen("tcp", ":50056") if err != nil { - log.WithError(err).Error("Failed to init grpc server") + logger.Error("Failed to init grpc server", "err", err) return } grpcServer := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{ @@ -700,7 +695,7 @@ func StartGrpcRunnerServer() { reflection.Register(grpcServer) go func() { if err = grpcServer.Serve(lis); err != nil { - log.WithError(err).Errorf("Can't serve grpc") + logger.Error("Can't serve grpc", "err", err) } }() } diff --git a/dao/Action.go b/dao/action.go similarity index 69% rename from dao/Action.go rename to dao/action.go index 912921890..06caa11ea 100644 --- a/dao/Action.go +++ b/dao/action.go @@ -10,15 +10,16 @@ import ( type ActionDao interface { CreateAction(ctx context.Context, action *model.Action) error - CompleteAction(ctx context.Context, actionID string) error - GetActionByID(ctx context.Context, actionID string) (model.Action, error) - GetActionsByJobID(ctx context.Context, jobID string) ([]model.Action, error) + CompleteAction(ctx context.Context, actionID uint) error + GetActionByID(ctx context.Context, actionID uint) (model.Action, error) + GetActionsByJobID(ctx context.Context, jobID uint) ([]model.Action, error) GetAwaitingActions(ctx context.Context) ([]model.Action, error) GetRunningActions(ctx context.Context) ([]model.Action, error) GetAll(ctx context.Context) ([]model.Action, error) GetAllFailedActions(ctx context.Context) ([]model.Action, error) UpdateAction(ctx context.Context, action *model.Action) error - GetAllActionOfRunner(ctx context.Context, runnerID string) ([]model.Action, error) + AssignRunner(ctx context.Context, action *model.Action, runner *model.Runner) error + GetAllActionOfRunner(ctx context.Context, runnerID uint) ([]model.Action, error) } type actionDao struct { @@ -33,17 +34,17 @@ func (d actionDao) CreateAction(ctx context.Context, action *model.Action) error return d.db.WithContext(ctx).Create(&action).Error } -func (d actionDao) CompleteAction(ctx context.Context, actionID string) error { +func (d actionDao) CompleteAction(ctx context.Context, actionID uint) error { return d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", actionID).Update("status", "completed").Error } -func (d actionDao) GetActionByID(ctx context.Context, actionID string) (model.Action, error) { +func (d actionDao) GetActionByID(ctx context.Context, actionID uint) (model.Action, error) { var action model.Action err := d.db.WithContext(ctx).First(&action, "id = ?", actionID).Error return action, err } -func (d actionDao) GetActionsByJobID(ctx context.Context, jobID string) ([]model.Action, error) { +func (d actionDao) GetActionsByJobID(ctx context.Context, jobID uint) ([]model.Action, error) { var actions []model.Action err := d.db.WithContext(ctx).Find(&actions, "job_id = ?", jobID).Error return actions, err @@ -57,7 +58,7 @@ func (d actionDao) GetAwaitingActions(ctx context.Context) ([]model.Action, erro func (d actionDao) GetRunningActions(ctx context.Context) ([]model.Action, error) { var actions []model.Action - err := d.db.WithContext(ctx).Find(&actions, "status = ?", 1).Error + err := d.db.WithContext(ctx).Preload("AllRunners").Find(&actions, "status = ?", 1).Error return actions, err } @@ -74,10 +75,17 @@ func (d actionDao) GetAllFailedActions(ctx context.Context) ([]model.Action, err } func (d actionDao) UpdateAction(ctx context.Context, action *model.Action) error { - return d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", action.ID).Updates(action).Error + err := d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", action.ID).Updates(action).Error + act, _ := d.GetActionByID(ctx, action.ID) + logger.Info("updated action", "action", len(act.AllRunners)) + return err } -func (d actionDao) GetAllActionOfRunner(ctx context.Context, runnerID string) ([]model.Action, error) { +func (d actionDao) AssignRunner(ctx context.Context, action *model.Action, runner *model.Runner) error { + return d.db.WithContext(ctx).Model(&action).Association("AllRunners").Append(runner) +} + +func (d actionDao) GetAllActionOfRunner(ctx context.Context, runnerID uint) ([]model.Action, error) { var actions []model.Action err := d.db.WithContext(ctx).Joins("AllRunners").Where("id = ?", runnerID).Find(&actions).Error return actions, err diff --git a/dao/Jobs.go b/dao/jobs.go similarity index 100% rename from dao/Jobs.go rename to dao/jobs.go diff --git a/go.work.sum b/go.work.sum index e89e0d21a..2e6e52e7b 100644 --- a/go.work.sum +++ b/go.work.sum @@ -695,6 +695,7 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2 h1:IRJeR9r1pYWsHKTRe/IInb7lYvbBVIqOgsX/u0mbOWY= @@ -704,6 +705,7 @@ golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= @@ -766,7 +768,6 @@ google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQf google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/model/action.go b/model/action.go index e77eb0055..39a3302ca 100644 --- a/model/action.go +++ b/model/action.go @@ -71,11 +71,6 @@ func (a *Action) GetCurrentRunner() (*Runner, error) { return &a.AllRunners[len(a.AllRunners)-1], nil } -func (a *Action) AssignRunner(runner Runner) { - a.AllRunners = append(a.AllRunners, runner) - logger.Info("runner count", "count", len(a.AllRunners)) -} - func (a *Action) GetValues() string { return a.Values } diff --git a/model/runner.go b/model/runner.go index ab592e7db..8e79feda4 100644 --- a/model/runner.go +++ b/model/runner.go @@ -38,17 +38,17 @@ func (r *Runner) BeforeCreate(tx *gorm.DB) (err error) { // UpdateStats SendHeartbeat updates the last seen time of the runner and gives runner stats func (r *Runner) UpdateStats(tx *gorm.DB, ctx context.Context) (bool, error) { - newStats := ctx.Value("newStats").(Runner) + newStats := ctx.Value("newStats").(map[string]interface{}) + logger.Info("updating stats", "newStats", newStats) err := tx.WithContext(ctx).Model(&r).Updates(newStats).Error if err != nil { return false, err } - return true, nil } func (r *Runner) IsAlive() bool { - r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -2)) + r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -1)) if r.Alive { r.Status = "Alive" } else { diff --git a/runner/ServerHandler.go b/runner/ServerHandler.go index f0035a7bb..f9bf05e3c 100644 --- a/runner/ServerHandler.go +++ b/runner/ServerHandler.go @@ -39,7 +39,7 @@ func (r *Runner) RegisterWithGocast(retries int) { go func() { for { r.ReadDiagnostics(5) - time.Sleep(time.Minute) + time.Sleep(time.Second * 30) } }() } diff --git a/runner/actions/stream.go b/runner/actions/stream.go index 7a5c1938a..3af65b337 100644 --- a/runner/actions/stream.go +++ b/runner/actions/stream.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/tum-dev/gocast/runner/protobuf" + "io" "log/slog" "os" "os/exec" @@ -77,19 +78,19 @@ func (a *ActionProvider) StreamAction() *Action { } } - src := "" + opt := "" if strings.HasPrefix(source, "rtsp") { - src += "-rtsp_transport tcp" + opt += "-rtsp_transport tcp" } else if strings.HasPrefix(source, "rtmp") { - src += "-rw_timeout 5000000" // timeout selfstream s after 5 seconds of no data + opt += "-rw_timeout 5000000" // timeout selfstream s after 5 seconds of no data } else { - src += "-re" // read input at native framerate, e.g. when streaming a file in realtime + opt += "-re" // read input at native framerate, e.g. when streaming a file in realtime } log.Info("streaming", "source", source, "end", time.Until(end).Seconds()) //changing the end variable from a date to a duration and adding the duration to the current time - cmd := fmt.Sprintf(a.Cmd.Stream, src, time.Until(end).Seconds(), source, filename, filepath.Join(a.GetLiveDir(courseID, streamID, version), end.Format("15-04-05")), livePlaylist) + cmd := fmt.Sprintf(a.Cmd.Stream, opt, time.Until(end).Seconds(), source, filepath.Join(a.GetLiveDir(courseID, streamID, version), end.Format("15-04-05")), livePlaylist) c := exec.CommandContext(ctx, "ffmpeg", strings.Split(cmd, " ")...) c.Stderr = os.Stderr @@ -124,6 +125,33 @@ func (a *ActionProvider) StreamAction() *Action { time.Sleep(5 * time.Second) // little backoff to prevent dossing source continue } + //move the files into the ceph storage + file, err := os.Open(filename) + if err != nil { + log.Warn("streamAction: failed to open file, move file to dest manually", "err", err) + return ctx, err + } + defer file.Close() + + copy, err := os.Create(filepath.Join(a.GetMassDir(courseID, streamID, version), end.Format("15-04-05")+".ts")) + if err != nil { + log.Warn("streamAction: failed to create file, move file to dest manually", "err", err) + return ctx, err + } + defer copy.Close() + + _, err = io.Copy(copy, file) + if err != nil { + log.Warn("streamAction: failed to copy file, move file to dest manually", "err", err) + return ctx, err + } + err = os.Remove(filename) + if err != nil { + log.Warn("streamAction: failed to remove file, move file to dest manually", "err", err) + return ctx, err + } + //end file moving + log.Info("stream finished. now sending notification") resp = a.Server.NotifyStreamEnded(ctx, &protobuf.StreamEnded{ RunnerID: hostname, diff --git a/runner/cmd.yaml b/runner/cmd.yaml index 5d2aa34e7..6e1d5d301 100644 --- a/runner/cmd.yaml +++ b/runner/cmd.yaml @@ -1,4 +1,6 @@ -stream: '-y -hide_banner -loglevel quiet -nostats %v -t %.0f -i %v -c:v copy -c:a copy -f mpegts %v -c:v libx264 -preset veryfast -tune zerolatency -maxrate 2500k -bufsize 3000k -g 60 -r 30 -x264-params keyint=60:scenecut=0 -c:a aac -ar 44100 -b:a 128k -f hls -hls_time 2 -hls_list_size 3600 -hls_playlist_type event -hls_flags append_list -hls_segment_filename %v/%%05d.ts %v' +stream: '-y -hide_banner -loglevel quiet -nostats %s -t %.0f -i %s -c:v copy -c:a copy -f hls -hls_time 2 -hls_list_size 3600 -hls_playlist_type event -hls_flags append_list -hls_segment_filename %s/%%05d.ts %s' + +selfstream: '-preset veryfast -tune zerolatency -maxrate 2500k -bufsize 3000k -g 60 -r 30 -x264-params keyint=60:scenecut=0 -ar 44100 -b:a 128k' SeparateAudioFast: "-i %v -vn -c:a copy %v" SeparateAudio: "-i %v -vn %v" diff --git a/runner/handlers.go b/runner/handlers.go index f4329751f..803a156bd 100644 --- a/runner/handlers.go +++ b/runner/handlers.go @@ -24,7 +24,6 @@ func contextFromTranscodingReq(req *protobuf.TranscodingRequest, ctx context.Con } func (r *Runner) RequestStream(ctx context.Context, req *protobuf.StreamRequest) (*protobuf.StreamResponse, error) { - r.ReadDiagnostics(5) ctx = context.Background() ctx = contextFromStreamReq(req, ctx) ctx = context.WithValue(ctx, "URL", "")