From fb4d939623981e255ad9f62c03df62cbb9d5b933 Mon Sep 17 00:00:00 2001 From: robertdavidsmith <34475852+robertdavidsmith@users.noreply.github.com> Date: Mon, 11 Nov 2024 09:21:49 +0000 Subject: [PATCH] Scheduler: tidy, more to internaltypes.ResourceList (#4038) * Scheduler: tidy, more Signed-off-by: Robert Smith * fix Signed-off-by: Robert Smith * more to internaltypes Signed-off-by: Robert Smith --------- Signed-off-by: Robert Smith --- .../scheduler/scheduling/context/queue.go | 30 ++++++------- .../scheduling/context/scheduling.go | 43 ++++++++----------- 2 files changed, 33 insertions(+), 40 deletions(-) diff --git a/internal/scheduler/scheduling/context/queue.go b/internal/scheduler/scheduling/context/queue.go index b4707029f3b..918028a99f8 100644 --- a/internal/scheduler/scheduling/context/queue.go +++ b/internal/scheduler/scheduling/context/queue.go @@ -14,7 +14,6 @@ import ( armadaslices "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) // QueueSchedulingContext captures the decisions made by the scheduler during one invocation @@ -48,9 +47,9 @@ type QueueSchedulingContext struct { // Includes jobs scheduled during this invocation of the scheduler. AllocatedByPriorityClass map[string]internaltypes.ResourceList // Resources assigned to this queue during this scheduling cycle. - ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string] + ScheduledResourcesByPriorityClass map[string]internaltypes.ResourceList // Resources evicted from this queue during this scheduling cycle. - EvictedResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string] + EvictedResourcesByPriorityClass map[string]internaltypes.ResourceList // Job scheduling contexts associated with successful scheduling attempts. SuccessfulJobSchedulingContexts map[string]*JobSchedulingContext // Job scheduling contexts associated with unsuccessful scheduling attempts. @@ -82,10 +81,10 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string { fmt.Fprintf(w, "Time:\t%s\n", qctx.Created) fmt.Fprintf(w, "Queue:\t%s\n", qctx.Queue) } - fmt.Fprintf(w, "Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriorityClass.AggregateByResource().CompactString()) - fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriorityClass.String()) - fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriorityClass.AggregateByResource().CompactString()) - fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriorityClass.String()) + fmt.Fprintf(w, "Scheduled resources:\t%s\n", internaltypes.RlMapSumValues(qctx.ScheduledResourcesByPriorityClass).String()) + fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", internaltypes.RlMapToString(qctx.ScheduledResourcesByPriorityClass)) + fmt.Fprintf(w, "Preempted resources:\t%s\n", internaltypes.RlMapSumValues(qctx.EvictedResourcesByPriorityClass).String()) + fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", internaltypes.RlMapToString(qctx.EvictedResourcesByPriorityClass)) if verbosity >= 0 { fmt.Fprintf(w, "Total allocated resources after scheduling:\t%s\n", qctx.Allocated.String()) for pc, res := range qctx.AllocatedByPriorityClass { @@ -171,17 +170,18 @@ func (qctx *QueueSchedulingContext) addJobSchedulingContext(jctx *JobSchedulingC // Always update ResourcesByPriority. // Since ResourcesByPriority is used to order queues by fraction of fair share. pcName := jctx.Job.PriorityClassName() - qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Add(jctx.Job.AllResourceRequirements()) - qctx.Allocated = qctx.Allocated.Add(jctx.Job.AllResourceRequirements()) + rl := jctx.Job.AllResourceRequirements() + qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Add(rl) + qctx.Allocated = qctx.Allocated.Add(rl) // Only if the job is not evicted, update ScheduledResourcesByPriority. // Since ScheduledResourcesByPriority is used to control per-round scheduling constraints. if evictedInThisRound { delete(qctx.EvictedJobsById, jctx.JobId) - qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) + qctx.EvictedResourcesByPriorityClass[pcName] = qctx.EvictedResourcesByPriorityClass[pcName].Subtract(rl) } else { qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx - qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) + qctx.ScheduledResourcesByPriorityClass[pcName] = qctx.ScheduledResourcesByPriorityClass[pcName].Add(rl) } } else { qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx @@ -197,16 +197,16 @@ func (qctx *QueueSchedulingContext) evictJob(job *jobdb.Job) (bool, error) { if _, ok := qctx.EvictedJobsById[jobId]; ok { return false, errors.Errorf("failed evicting job %s from queue: job already marked evicted", jobId) } - rl := job.ResourceRequirements().Requests + pcName := job.PriorityClassName() + rl := job.AllResourceRequirements() _, scheduledInThisRound := qctx.SuccessfulJobSchedulingContexts[jobId] if scheduledInThisRound { - qctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.PriorityClassName(), rl) + qctx.ScheduledResourcesByPriorityClass[pcName] = qctx.ScheduledResourcesByPriorityClass[pcName].Subtract(rl) delete(qctx.SuccessfulJobSchedulingContexts, jobId) } else { - qctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.PriorityClassName(), rl) + qctx.EvictedResourcesByPriorityClass[pcName] = qctx.EvictedResourcesByPriorityClass[pcName].Add(rl) qctx.EvictedJobsById[jobId] = true } - pcName := job.PriorityClassName() qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Subtract(job.AllResourceRequirements()) qctx.Allocated = qctx.Allocated.Subtract(job.AllResourceRequirements()) diff --git a/internal/scheduler/scheduling/context/scheduling.go b/internal/scheduler/scheduling/context/scheduling.go index 0feb303ba73..284f543cfd9 100644 --- a/internal/scheduler/scheduling/context/scheduling.go +++ b/internal/scheduler/scheduling/context/scheduling.go @@ -42,11 +42,9 @@ type SchedulingContext struct { // Allocated resources across all clusters in this pool Allocated internaltypes.ResourceList // Resources assigned across all queues during this scheduling cycle. - ScheduledResources internaltypes.ResourceList - ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string] + ScheduledResources internaltypes.ResourceList // Resources evicted across all queues during this scheduling cycle. - EvictedResources schedulerobjects.ResourceList - EvictedResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string] + EvictedResources internaltypes.ResourceList // Total number of successfully scheduled jobs. NumScheduledJobs int // Total number of successfully scheduled gangs. @@ -71,17 +69,16 @@ func NewSchedulingContext( totalResources internaltypes.ResourceList, ) *SchedulingContext { return &SchedulingContext{ - Started: time.Now(), - Pool: pool, - FairnessCostProvider: fairnessCostProvider, - Limiter: limiter, - QueueSchedulingContexts: make(map[string]*QueueSchedulingContext), - TotalResources: totalResources, - ScheduledResources: internaltypes.ResourceList{}, - ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), - EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), - SchedulingKeyGenerator: schedulerobjects.NewSchedulingKeyGenerator(), - UnfeasibleSchedulingKeys: make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext), + Started: time.Now(), + Pool: pool, + FairnessCostProvider: fairnessCostProvider, + Limiter: limiter, + QueueSchedulingContexts: make(map[string]*QueueSchedulingContext), + TotalResources: totalResources, + ScheduledResources: internaltypes.ResourceList{}, + EvictedResources: internaltypes.ResourceList{}, + SchedulingKeyGenerator: schedulerobjects.NewSchedulingKeyGenerator(), + UnfeasibleSchedulingKeys: make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext), } } @@ -125,8 +122,8 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext( Demand: demand, CappedDemand: cappedDemand, AllocatedByPriorityClass: initialAllocatedByPriorityClass, - ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), - EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), + ScheduledResourcesByPriorityClass: make(map[string]internaltypes.ResourceList), + EvictedResourcesByPriorityClass: make(map[string]internaltypes.ResourceList), SuccessfulJobSchedulingContexts: make(map[string]*JobSchedulingContext), UnsuccessfulJobSchedulingContexts: make(map[string]*JobSchedulingContext), EvictedJobsById: make(map[string]bool), @@ -221,7 +218,7 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string { fmt.Fprintf(w, "Termination reason:\t%s\n", sctx.TerminationReason) fmt.Fprintf(w, "Total capacity:\t%s\n", sctx.TotalResources.String()) fmt.Fprintf(w, "Scheduled resources:\t%s\n", sctx.ScheduledResources.String()) - fmt.Fprintf(w, "Preempted resources:\t%s\n", sctx.EvictedResources.CompactString()) + fmt.Fprintf(w, "Preempted resources:\t%s\n", sctx.EvictedResources.String()) fmt.Fprintf(w, "Number of gangs scheduled:\t%d\n", sctx.NumScheduledGangs) fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", sctx.NumScheduledJobs) fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", sctx.NumEvictedJobs) @@ -293,12 +290,10 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex } if jctx.IsSuccessful() { if evictedInThisRound { - sctx.EvictedResources.SubV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) - sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) + sctx.EvictedResources = sctx.EvictedResources.Subtract(jctx.Job.AllResourceRequirements()) sctx.NumEvictedJobs-- } else { sctx.ScheduledResources = sctx.ScheduledResources.Add(jctx.Job.AllResourceRequirements()) - sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumScheduledJobs++ } sctx.Allocated = sctx.Allocated.Add(jctx.Job.AllResourceRequirements()) @@ -340,14 +335,12 @@ func (sctx *SchedulingContext) EvictJob(jctx *JobSchedulingContext) (bool, error if err != nil { return false, err } - rl := jctx.Job.ResourceRequirements().Requests + if scheduledInThisRound { sctx.ScheduledResources = sctx.ScheduledResources.Subtract(jctx.Job.AllResourceRequirements()) - sctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), rl) sctx.NumScheduledJobs-- } else { - sctx.EvictedResources.AddV1ResourceList(rl) - sctx.EvictedResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), rl) + sctx.EvictedResources = sctx.EvictedResources.Add(jctx.Job.AllResourceRequirements()) sctx.NumEvictedJobs++ } sctx.Allocated = sctx.Allocated.Subtract(jctx.Job.AllResourceRequirements())