Skip to content

Commit

Permalink
work on day
Browse files Browse the repository at this point in the history
  • Loading branch information
DawinYurtseven committed Jan 10, 2025
1 parent 5be048f commit 4f01471
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 69 deletions.
12 changes: 11 additions & 1 deletion api/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"net/http"
"strconv"
)

func configActionRouter(r *gin.Engine, wrapper dao.DaoWrapper) {
Expand Down Expand Up @@ -41,7 +42,16 @@ func (a actionRoutes) getFailedActions(c *gin.Context) {

func (a actionRoutes) getActionById(c *gin.Context) {
ctx := context.Background()
model, err := a.dao.GetActionByID(ctx, c.Param("id"))
id, err := strconv.Atoi(c.Param("id"))
if err != nil {
_ = c.Error(tools.RequestError{
Status: http.StatusBadRequest,
CustomMessage: "Invalid action id",
Err: err,
})
return
}
model, err := a.dao.GetActionByID(ctx, uint(id))
if err != nil {
_ = c.Error(tools.RequestError{
Status: http.StatusNotFound,
Expand Down
77 changes: 36 additions & 41 deletions api/runner_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/TUM-Dev/gocast/model"
"github.com/TUM-Dev/gocast/tools"
"github.com/getsentry/sentry-go"
log "github.com/sirupsen/logrus"
"github.com/tum-dev/gocast/runner/protobuf"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -20,6 +19,7 @@ import (
"net"
"net/http"
"regexp"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -53,32 +53,26 @@ func (g GrpcRunnerServer) Register(ctx context.Context, request *protobuf.Regist
}

func (g GrpcRunnerServer) Heartbeat(ctx context.Context, request *protobuf.HeartbeatRequest) (*protobuf.HeartbeatResponse, error) {
runner := model.Runner{
Hostname: request.Hostname,
Port: int(request.Port),
}

r, err := g.RunnerDao.Get(ctx, runner.Hostname)
r, err := g.RunnerDao.Get(ctx, request.Hostname)
if err != nil {
log.WithError(err).Error("Failed to get runner")
logger.Error("Failed to get runner", "err", err)
return &protobuf.HeartbeatResponse{Ok: false}, err
}

newStats := model.Runner{
Hostname: request.Hostname,
Port: int(request.Port),
LastSeen: time.Now(),
Status: "Alive",
Workload: uint(request.Workload),
CPU: request.CPU,
Memory: request.Memory,
Disk: request.Disk,
Uptime: request.Uptime,
Version: request.Version,
Actions: request.CurrentAction,
}
ctx = context.WithValue(ctx, "newStats", newStats)
log.Info("Updating runner stats ", "runner", r)
newStat := make(map[string]interface{})
newStat["LastSeen"] = time.Now()
newStat["Status"] = "Alive"
newStat["Workload"] = uint(request.Workload)
newStat["CPU"] = request.CPU
newStat["Memory"] = request.Memory
newStat["Disk"] = request.Disk
newStat["Uptime"] = request.Uptime
newStat["Version"] = request.Version
newStat["Actions"] = request.CurrentAction

logger.Info("the actions of this runner", "runner", r, "actions", request.CurrentAction)
ctx = context.WithValue(ctx, "newStats", newStat)
logger.Info("Updating runner stats ", "runner", r)
p, err := r.UpdateStats(dao.DB, ctx)
return &protobuf.HeartbeatResponse{Ok: p}, err
}
Expand Down Expand Up @@ -243,7 +237,7 @@ func (g GrpcRunnerServer) RequestSelfStream(ctx context.Context, request *protob
return nil, err
}
if !(time.Now().After(stream.Start.Add(time.Minute*-30)) && time.Now().Before(stream.End.Add(time.Minute*30))) {
log.WithFields(log.Fields{"streamId": stream.ID}).Warn("Stream rejected, time out of bounds")
logger.Warn("Stream rejected, time out of bounds", "streamID", stream.ID)
return nil, errors.New("stream rejected")
}
ingestServer, err := g.IngestServerDao.GetBestIngestServer()
Expand Down Expand Up @@ -458,7 +452,7 @@ func NotifyForStreams(dao dao.DaoWrapper) func() {
values["source"] = lectureHallForStream.PresIP
err = CreateJob(dao, ctx, values) //presentation
if err != nil {
log.Error("Can't create job", err)
logger.Error("Can't create job", err)
}
break
case 2: //camera
Expand Down Expand Up @@ -511,7 +505,7 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
if action.End.Before(time.Now().Add(-5 * time.Minute)) {
action.SetToIgnored()
err = dao.ActionDao.UpdateAction(ctx, &action)
log.Info("Action ignored, check for progress manually", "action", action.ID)
logger.Info("Action ignored, check for progress manually", "action", action.ID)
continue
}
runner, err := action.GetCurrentRunner()
Expand All @@ -524,7 +518,8 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
}
continue
}
if !runner.IsAlive() && !action.IsCompleted() {
hasAction := strings.Contains(runner.Actions, strconv.Itoa(int(action.ID)))
if !runner.IsAlive() && !action.IsCompleted() && hasAction {
action.SetToFailed()
err = dao.ActionDao.UpdateAction(ctx, &action)
if err != nil {
Expand All @@ -540,13 +535,10 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
}
for _, failedAction := range failedActions {
failedAction.SetToRunning()
err = AssignRunnerAction(dao, &failedAction)
err = dao.ActionDao.UpdateAction(ctx, &failedAction)
if err != nil {
return
}
err := AssignRunnerAction(dao, &failedAction)
if err != nil {
logger.Error("Can't assign runner to action", err)
return
}
}

Expand All @@ -569,12 +561,13 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
logger.Error("Can't update job", err)
continue
}
action.SetToRunning()
err = AssignRunnerAction(dao, action)
if err != nil {
logger.Error("Can't assign runner to action", err)
continue
}
action.SetToRunning()

err = dao.ActionDao.UpdateAction(ctx, action)
if err != nil {
return
Expand All @@ -595,15 +588,18 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error {
return err
}
runner, err := getRunnerWithLeastWorkloadForJob(runners, action.Type)
action.AssignRunner(runner)
ctx := context.Background()

err = dao.AssignRunner(ctx, action, &runner)
if err != nil {
logger.Error("Can't unmarshal json", err)
logger.Error("Can't assign action", err)
return err
}
values := map[string]interface{}{}
err = json.Unmarshal([]byte(action.Values), &values)
if err != nil {
logger.Error("Can't unmarshal json", err)
return err
}
for key, value := range values {
//logger.Info("values", "value", value)
ctx = context.WithValue(ctx, key, value)
Expand All @@ -619,13 +615,12 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error {
//TranscodingRequest(ctx, dao, runner)
break
}
//action.SetToRunning()
logger.Info("runner counts", "count", len(action.AllRunners))
err = dao.ActionDao.UpdateAction(ctx, action)
if err != nil {
logger.Error("couldn't update action!", "error", err)
logger.Error("Can't update action", err)
return err
}
logger.Info("runner counts", "count", len(action.AllRunners))
return nil
}

Expand Down Expand Up @@ -686,7 +681,7 @@ func (g GrpcRunnerServer) mustEmbedUnimplementedFromRunnerServer() {
func StartGrpcRunnerServer() {
lis, err := net.Listen("tcp", ":50056")
if err != nil {
log.WithError(err).Error("Failed to init grpc server")
logger.Error("Failed to init grpc server", "err", err)
return
}
grpcServer := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
Expand All @@ -700,7 +695,7 @@ func StartGrpcRunnerServer() {
reflection.Register(grpcServer)
go func() {
if err = grpcServer.Serve(lis); err != nil {
log.WithError(err).Errorf("Can't serve grpc")
logger.Error("Can't serve grpc", "err", err)
}
}()
}
28 changes: 18 additions & 10 deletions dao/Action.go → dao/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (

type ActionDao interface {
CreateAction(ctx context.Context, action *model.Action) error
CompleteAction(ctx context.Context, actionID string) error
GetActionByID(ctx context.Context, actionID string) (model.Action, error)
GetActionsByJobID(ctx context.Context, jobID string) ([]model.Action, error)
CompleteAction(ctx context.Context, actionID uint) error
GetActionByID(ctx context.Context, actionID uint) (model.Action, error)
GetActionsByJobID(ctx context.Context, jobID uint) ([]model.Action, error)
GetAwaitingActions(ctx context.Context) ([]model.Action, error)
GetRunningActions(ctx context.Context) ([]model.Action, error)
GetAll(ctx context.Context) ([]model.Action, error)
GetAllFailedActions(ctx context.Context) ([]model.Action, error)
UpdateAction(ctx context.Context, action *model.Action) error
GetAllActionOfRunner(ctx context.Context, runnerID string) ([]model.Action, error)
AssignRunner(ctx context.Context, action *model.Action, runner *model.Runner) error
GetAllActionOfRunner(ctx context.Context, runnerID uint) ([]model.Action, error)
}

type actionDao struct {
Expand All @@ -33,17 +34,17 @@ func (d actionDao) CreateAction(ctx context.Context, action *model.Action) error
return d.db.WithContext(ctx).Create(&action).Error
}

func (d actionDao) CompleteAction(ctx context.Context, actionID string) error {
func (d actionDao) CompleteAction(ctx context.Context, actionID uint) error {
return d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", actionID).Update("status", "completed").Error
}

func (d actionDao) GetActionByID(ctx context.Context, actionID string) (model.Action, error) {
func (d actionDao) GetActionByID(ctx context.Context, actionID uint) (model.Action, error) {
var action model.Action
err := d.db.WithContext(ctx).First(&action, "id = ?", actionID).Error
return action, err
}

func (d actionDao) GetActionsByJobID(ctx context.Context, jobID string) ([]model.Action, error) {
func (d actionDao) GetActionsByJobID(ctx context.Context, jobID uint) ([]model.Action, error) {
var actions []model.Action
err := d.db.WithContext(ctx).Find(&actions, "job_id = ?", jobID).Error
return actions, err
Expand All @@ -57,7 +58,7 @@ func (d actionDao) GetAwaitingActions(ctx context.Context) ([]model.Action, erro

func (d actionDao) GetRunningActions(ctx context.Context) ([]model.Action, error) {
var actions []model.Action
err := d.db.WithContext(ctx).Find(&actions, "status = ?", 1).Error
err := d.db.WithContext(ctx).Preload("AllRunners").Find(&actions, "status = ?", 1).Error
return actions, err
}

Expand All @@ -74,10 +75,17 @@ func (d actionDao) GetAllFailedActions(ctx context.Context) ([]model.Action, err
}

func (d actionDao) UpdateAction(ctx context.Context, action *model.Action) error {
return d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", action.ID).Updates(action).Error
err := d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", action.ID).Updates(action).Error
act, _ := d.GetActionByID(ctx, action.ID)
logger.Info("updated action", "action", len(act.AllRunners))
return err
}

func (d actionDao) GetAllActionOfRunner(ctx context.Context, runnerID string) ([]model.Action, error) {
func (d actionDao) AssignRunner(ctx context.Context, action *model.Action, runner *model.Runner) error {
return d.db.WithContext(ctx).Model(&action).Association("AllRunners").Append(runner)
}

func (d actionDao) GetAllActionOfRunner(ctx context.Context, runnerID uint) ([]model.Action, error) {
var actions []model.Action
err := d.db.WithContext(ctx).Joins("AllRunners").Where("id = ?", runnerID).Find(&actions).Error
return actions, err
Expand Down
File renamed without changes.
3 changes: 2 additions & 1 deletion go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2 h1:IRJeR9r1pYWsHKTRe/IInb7lYvbBVIqOgsX/u0mbOWY=
Expand All @@ -704,6 +705,7 @@ golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
Expand Down Expand Up @@ -766,7 +768,6 @@ google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQf
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
Expand Down
5 changes: 0 additions & 5 deletions model/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ func (a *Action) GetCurrentRunner() (*Runner, error) {
return &a.AllRunners[len(a.AllRunners)-1], nil
}

func (a *Action) AssignRunner(runner Runner) {
a.AllRunners = append(a.AllRunners, runner)
logger.Info("runner count", "count", len(a.AllRunners))
}

func (a *Action) GetValues() string {
return a.Values
}
Expand Down
6 changes: 3 additions & 3 deletions model/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ func (r *Runner) BeforeCreate(tx *gorm.DB) (err error) {

// UpdateStats SendHeartbeat updates the last seen time of the runner and gives runner stats
func (r *Runner) UpdateStats(tx *gorm.DB, ctx context.Context) (bool, error) {
newStats := ctx.Value("newStats").(Runner)
newStats := ctx.Value("newStats").(map[string]interface{})
logger.Info("updating stats", "newStats", newStats)
err := tx.WithContext(ctx).Model(&r).Updates(newStats).Error
if err != nil {
return false, err
}

return true, nil
}

func (r *Runner) IsAlive() bool {
r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -2))
r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -1))
if r.Alive {
r.Status = "Alive"
} else {
Expand Down
2 changes: 1 addition & 1 deletion runner/ServerHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (r *Runner) RegisterWithGocast(retries int) {
go func() {
for {
r.ReadDiagnostics(5)
time.Sleep(time.Minute)
time.Sleep(time.Second * 30)
}
}()
}
Expand Down
Loading

0 comments on commit 4f01471

Please sign in to comment.