From 0978736b92d746e4d2f34c7ad912034eac5d916a Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Tue, 14 Nov 2023 15:27:10 +0000 Subject: [PATCH] Refactoring --- internal/scheduler/common.go | 76 ------------------------- internal/scheduler/metrics/metrics.go | 35 ++++++++++++ internal/scheduler/result.go | 51 +++++++++++++++++ internal/scheduler/scheduler_metrics.go | 4 +- internal/scheduler/scheduler_test.go | 27 +++++++++ 5 files changed, 115 insertions(+), 78 deletions(-) create mode 100644 internal/scheduler/metrics/metrics.go create mode 100644 internal/scheduler/result.go diff --git a/internal/scheduler/common.go b/internal/scheduler/common.go index 55d22c3bac1..789053a8d6c 100644 --- a/internal/scheduler/common.go +++ b/internal/scheduler/common.go @@ -11,86 +11,10 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" armadaslices "github.com/armadaproject/armada/internal/common/slices" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" - schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) -// SchedulerResult is returned by Rescheduler.Schedule(). -type SchedulerResult struct { - // Whether the scheduler failed to create a result for some reason - EmptyResult bool - // Running jobs that should be preempted. - PreemptedJobs []interfaces.LegacySchedulerJob - // Queued jobs that should be scheduled. - ScheduledJobs []interfaces.LegacySchedulerJob - // Queued jobs that could not be scheduled. - // This is used to fail jobs that could not schedule above `minimumGangCardinality`. - FailedJobs []interfaces.LegacySchedulerJob - // For each preempted job, maps the job id to the id of the node on which the job was running. - // For each scheduled job, maps the job id to the id of the node on which the job should be scheduled. - NodeIdByJobId map[string]string - // The Scheduling Context. Being passed up for metrics decisions made in scheduler.go and scheduler_metrics.go. - // Passing a pointer as the structure is enormous - SchedulingContexts []*schedulercontext.SchedulingContext -} - -func NewSchedulerResultForTest[S ~[]T, T interfaces.LegacySchedulerJob]( - preemptedJobs S, - scheduledJobs S, - failedJobs S, - nodeIdByJobId map[string]string, -) *SchedulerResult { - castPreemptedJobs := make([]interfaces.LegacySchedulerJob, len(preemptedJobs)) - for i, job := range preemptedJobs { - castPreemptedJobs[i] = job - } - castScheduledJobs := make([]interfaces.LegacySchedulerJob, len(scheduledJobs)) - for i, job := range scheduledJobs { - castScheduledJobs[i] = job - } - castFailedJobs := make([]interfaces.LegacySchedulerJob, len(failedJobs)) - for i, job := range failedJobs { - castFailedJobs[i] = job - } - return &SchedulerResult{ - PreemptedJobs: castPreemptedJobs, - ScheduledJobs: castScheduledJobs, - NodeIdByJobId: nodeIdByJobId, - FailedJobs: castFailedJobs, - } -} - -// PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result, -// cast to type T. -func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T { - rv := make([]T, len(sr.PreemptedJobs)) - for i, job := range sr.PreemptedJobs { - rv[i] = job.(T) - } - return rv -} - -// ScheduledJobsFromScheduleResult returns the slice of scheduled jobs in the result, -// cast to type T. -func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T { - rv := make([]T, len(sr.ScheduledJobs)) - for i, job := range sr.ScheduledJobs { - rv[i] = job.(T) - } - return rv -} - -// FailedJobsFromScheduleResult returns the slice of scheduled jobs in the result, -// cast to type T. -func FailedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T { - rv := make([]T, len(sr.FailedJobs)) - for i, job := range sr.FailedJobs { - rv[i] = job.(T) - } - return rv -} - // JobsSummary returns a string giving an overview of the provided jobs meant for logging. // For example: "affected queues [A, B]; resources {A: {cpu: 1}, B: {cpu: 2}}; jobs [jobAId, jobBId]". func JobsSummary(jobs []interfaces.LegacySchedulerJob) string { diff --git a/internal/scheduler/metrics/metrics.go b/internal/scheduler/metrics/metrics.go new file mode 100644 index 00000000000..ba2d1c2687a --- /dev/null +++ b/internal/scheduler/metrics/metrics.go @@ -0,0 +1,35 @@ +package metrics + +import ( + schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" + "github.com/prometheus/client_golang/prometheus" +) + +type Metrics struct { + // Indicates whether this instance is the leader. + isLeader prometheus.Gauge + // Scheduling cycle time percentiles. Only exported when leader. + scheduleCycleTime prometheus.Summary + // State reconciliation cycle time percentiles. + reconcileCycleTime prometheus.Summary + // Number of scheduled jobs. + scheduledJobs prometheus.CounterVec + // Number of preempted jobs. + preemptedJobs prometheus.CounterVec + // Number of failed jobs. + failedJobs prometheus.CounterVec + // Number of successful jobs. + succeededJobs prometheus.CounterVec + // Number of jobs considered when scheduling. + consideredJobs prometheus.CounterVec + // Fair share of each queue. + fairSharePerQueue prometheus.GaugeVec + // Actual share of each queue. + actualSharePerQueue prometheus.GaugeVec + // Determines whether to expose specified metrics + disabledMetrics map[prometheus.Collector]bool +} + +func (m *Metrics) Observe(result schedulercontext.SchedulingContext) { + +} diff --git a/internal/scheduler/result.go b/internal/scheduler/result.go new file mode 100644 index 00000000000..f577c8a53e0 --- /dev/null +++ b/internal/scheduler/result.go @@ -0,0 +1,51 @@ +package scheduler + +import ( + schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" + "github.com/armadaproject/armada/internal/scheduler/interfaces" +) + +// SchedulerResult is returned by Rescheduler.Schedule(). +type SchedulerResult struct { + // Running jobs that should be preempted. + PreemptedJobs []interfaces.LegacySchedulerJob + // Queued jobs that should be scheduled. + ScheduledJobs []interfaces.LegacySchedulerJob + // Queued jobs that could not be scheduled. + // This is used to fail jobs that could not schedule above `minimumGangCardinality`. + FailedJobs []interfaces.LegacySchedulerJob + // For each preempted job, maps the job id to the id of the node on which the job was running. + // For each scheduled job, maps the job id to the id of the node on which the job should be scheduled. + NodeIdByJobId map[string]string + // Each result may bundle the result of several scheduling decisions. + // These are the corresponding scheduling contexts. + // TODO: This doesn't seem like the right approach. + SchedulingContexts []*schedulercontext.SchedulingContext +} + +// PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result cast to type T. +func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T { + rv := make([]T, len(sr.PreemptedJobs)) + for i, job := range sr.PreemptedJobs { + rv[i] = job.(T) + } + return rv +} + +// ScheduledJobsFromSchedulerResult returns the slice of scheduled jobs in the result cast to type T. +func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T { + rv := make([]T, len(sr.ScheduledJobs)) + for i, job := range sr.ScheduledJobs { + rv[i] = job.(T) + } + return rv +} + +// FailedJobsFromSchedulerResult returns the slice of scheduled jobs in the result cast to type T. +func FailedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T { + rv := make([]T, len(sr.FailedJobs)) + for i, job := range sr.FailedJobs { + rv[i] = job.(T) + } + return rv +} diff --git a/internal/scheduler/scheduler_metrics.go b/internal/scheduler/scheduler_metrics.go index 8b39bf9fc82..d9cd884ecf3 100644 --- a/internal/scheduler/scheduler_metrics.go +++ b/internal/scheduler/scheduler_metrics.go @@ -311,8 +311,8 @@ func (metrics *SchedulerMetrics) ReportReconcileCycleTime(cycleTime time.Duratio } func (metrics *SchedulerMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result SchedulerResult) { - if result.EmptyResult { - return // TODO: Add logging or maybe place to add failure metric? + if len(result.ScheduledJobs) == 0 && len(result.PreemptedJobs) == 0 && len(result.FailedJobs) == 0 { + return } nodeFromJob := func(j *jobdb.Job) string { if j.HasRuns() { diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index a4c7244f4eb..a3eaf5e1160 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -19,6 +19,7 @@ import ( "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/scheduler/database" + "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/kubernetesobjects/affinity" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -1183,6 +1184,32 @@ func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn return NewSchedulerResultForTest(preemptedJobs, scheduledJobs, failedJobs, nil), nil } +func NewSchedulerResultForTest[S ~[]T, T interfaces.LegacySchedulerJob]( + preemptedJobs S, + scheduledJobs S, + failedJobs S, + nodeIdByJobId map[string]string, +) *SchedulerResult { + castPreemptedJobs := make([]interfaces.LegacySchedulerJob, len(preemptedJobs)) + for i, job := range preemptedJobs { + castPreemptedJobs[i] = job + } + castScheduledJobs := make([]interfaces.LegacySchedulerJob, len(scheduledJobs)) + for i, job := range scheduledJobs { + castScheduledJobs[i] = job + } + castFailedJobs := make([]interfaces.LegacySchedulerJob, len(failedJobs)) + for i, job := range failedJobs { + castFailedJobs[i] = job + } + return &SchedulerResult{ + PreemptedJobs: castPreemptedJobs, + ScheduledJobs: castScheduledJobs, + NodeIdByJobId: nodeIdByJobId, + FailedJobs: castFailedJobs, + } +} + type testPublisher struct { events []*armadaevents.EventSequence shouldError bool