Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
severinson committed Nov 14, 2023
1 parent 351b315 commit 0978736
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 78 deletions.
76 changes: 0 additions & 76 deletions internal/scheduler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,86 +11,10 @@ import (
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// SchedulerResult is returned by Rescheduler.Schedule().
type SchedulerResult struct {
// Whether the scheduler failed to create a result for some reason
EmptyResult bool
// Running jobs that should be preempted.
PreemptedJobs []interfaces.LegacySchedulerJob
// Queued jobs that should be scheduled.
ScheduledJobs []interfaces.LegacySchedulerJob
// Queued jobs that could not be scheduled.
// This is used to fail jobs that could not schedule above `minimumGangCardinality`.
FailedJobs []interfaces.LegacySchedulerJob
// For each preempted job, maps the job id to the id of the node on which the job was running.
// For each scheduled job, maps the job id to the id of the node on which the job should be scheduled.
NodeIdByJobId map[string]string
// The Scheduling Context. Being passed up for metrics decisions made in scheduler.go and scheduler_metrics.go.
// Passing a pointer as the structure is enormous
SchedulingContexts []*schedulercontext.SchedulingContext
}

func NewSchedulerResultForTest[S ~[]T, T interfaces.LegacySchedulerJob](
preemptedJobs S,
scheduledJobs S,
failedJobs S,
nodeIdByJobId map[string]string,
) *SchedulerResult {
castPreemptedJobs := make([]interfaces.LegacySchedulerJob, len(preemptedJobs))
for i, job := range preemptedJobs {
castPreemptedJobs[i] = job
}
castScheduledJobs := make([]interfaces.LegacySchedulerJob, len(scheduledJobs))
for i, job := range scheduledJobs {
castScheduledJobs[i] = job
}
castFailedJobs := make([]interfaces.LegacySchedulerJob, len(failedJobs))
for i, job := range failedJobs {
castFailedJobs[i] = job
}
return &SchedulerResult{
PreemptedJobs: castPreemptedJobs,
ScheduledJobs: castScheduledJobs,
NodeIdByJobId: nodeIdByJobId,
FailedJobs: castFailedJobs,
}
}

// PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result,
// cast to type T.
func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T {
rv := make([]T, len(sr.PreemptedJobs))
for i, job := range sr.PreemptedJobs {
rv[i] = job.(T)
}
return rv
}

// ScheduledJobsFromScheduleResult returns the slice of scheduled jobs in the result,
// cast to type T.
func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T {
rv := make([]T, len(sr.ScheduledJobs))
for i, job := range sr.ScheduledJobs {
rv[i] = job.(T)
}
return rv
}

// FailedJobsFromScheduleResult returns the slice of scheduled jobs in the result,
// cast to type T.
func FailedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T {
rv := make([]T, len(sr.FailedJobs))
for i, job := range sr.FailedJobs {
rv[i] = job.(T)
}
return rv
}

// JobsSummary returns a string giving an overview of the provided jobs meant for logging.
// For example: "affected queues [A, B]; resources {A: {cpu: 1}, B: {cpu: 2}}; jobs [jobAId, jobBId]".
func JobsSummary(jobs []interfaces.LegacySchedulerJob) string {
Expand Down
35 changes: 35 additions & 0 deletions internal/scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package metrics

import (
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/prometheus/client_golang/prometheus"
)

type Metrics struct {
// Indicates whether this instance is the leader.
isLeader prometheus.Gauge
// Scheduling cycle time percentiles. Only exported when leader.
scheduleCycleTime prometheus.Summary
// State reconciliation cycle time percentiles.
reconcileCycleTime prometheus.Summary
// Number of scheduled jobs.
scheduledJobs prometheus.CounterVec
// Number of preempted jobs.
preemptedJobs prometheus.CounterVec
// Number of failed jobs.
failedJobs prometheus.CounterVec
// Number of successful jobs.
succeededJobs prometheus.CounterVec
// Number of jobs considered when scheduling.
consideredJobs prometheus.CounterVec
// Fair share of each queue.
fairSharePerQueue prometheus.GaugeVec
// Actual share of each queue.
actualSharePerQueue prometheus.GaugeVec
// Determines whether to expose specified metrics
disabledMetrics map[prometheus.Collector]bool
}

func (m *Metrics) Observe(result schedulercontext.SchedulingContext) {

}
51 changes: 51 additions & 0 deletions internal/scheduler/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package scheduler

import (
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
)

// SchedulerResult is returned by Rescheduler.Schedule().
type SchedulerResult struct {
// Running jobs that should be preempted.
PreemptedJobs []interfaces.LegacySchedulerJob
// Queued jobs that should be scheduled.
ScheduledJobs []interfaces.LegacySchedulerJob
// Queued jobs that could not be scheduled.
// This is used to fail jobs that could not schedule above `minimumGangCardinality`.
FailedJobs []interfaces.LegacySchedulerJob
// For each preempted job, maps the job id to the id of the node on which the job was running.
// For each scheduled job, maps the job id to the id of the node on which the job should be scheduled.
NodeIdByJobId map[string]string
// Each result may bundle the result of several scheduling decisions.
// These are the corresponding scheduling contexts.
// TODO: This doesn't seem like the right approach.
SchedulingContexts []*schedulercontext.SchedulingContext
}

// PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result cast to type T.
func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T {
rv := make([]T, len(sr.PreemptedJobs))
for i, job := range sr.PreemptedJobs {
rv[i] = job.(T)
}
return rv
}

// ScheduledJobsFromSchedulerResult returns the slice of scheduled jobs in the result cast to type T.
func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T {
rv := make([]T, len(sr.ScheduledJobs))
for i, job := range sr.ScheduledJobs {
rv[i] = job.(T)
}
return rv
}

// FailedJobsFromSchedulerResult returns the slice of scheduled jobs in the result cast to type T.
func FailedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T {
rv := make([]T, len(sr.FailedJobs))
for i, job := range sr.FailedJobs {
rv[i] = job.(T)
}
return rv
}
4 changes: 2 additions & 2 deletions internal/scheduler/scheduler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ func (metrics *SchedulerMetrics) ReportReconcileCycleTime(cycleTime time.Duratio
}

func (metrics *SchedulerMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result SchedulerResult) {
if result.EmptyResult {
return // TODO: Add logging or maybe place to add failure metric?
if len(result.ScheduledJobs) == 0 && len(result.PreemptedJobs) == 0 && len(result.FailedJobs) == 0 {
return
}
nodeFromJob := func(j *jobdb.Job) string {
if j.HasRuns() {
Expand Down
27 changes: 27 additions & 0 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/armadaproject/armada/internal/common/stringinterner"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/scheduler/database"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/kubernetesobjects/affinity"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
Expand Down Expand Up @@ -1183,6 +1184,32 @@ func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn
return NewSchedulerResultForTest(preemptedJobs, scheduledJobs, failedJobs, nil), nil
}

func NewSchedulerResultForTest[S ~[]T, T interfaces.LegacySchedulerJob](
preemptedJobs S,
scheduledJobs S,
failedJobs S,
nodeIdByJobId map[string]string,
) *SchedulerResult {
castPreemptedJobs := make([]interfaces.LegacySchedulerJob, len(preemptedJobs))
for i, job := range preemptedJobs {
castPreemptedJobs[i] = job
}
castScheduledJobs := make([]interfaces.LegacySchedulerJob, len(scheduledJobs))
for i, job := range scheduledJobs {
castScheduledJobs[i] = job
}
castFailedJobs := make([]interfaces.LegacySchedulerJob, len(failedJobs))
for i, job := range failedJobs {
castFailedJobs[i] = job
}
return &SchedulerResult{
PreemptedJobs: castPreemptedJobs,
ScheduledJobs: castScheduledJobs,
NodeIdByJobId: nodeIdByJobId,
FailedJobs: castFailedJobs,
}
}

type testPublisher struct {
events []*armadaevents.EventSequence
shouldError bool
Expand Down

0 comments on commit 0978736

Please sign in to comment.