diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index dceba2b6a6b..92891ce7fc4 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -107,7 +107,7 @@ func (sch *PreemptingQueueScheduler) EnableNewPreemptionStrategy() { // Schedule // - preempts jobs belonging to queues with total allocation above their fair share and // - schedules new jobs belonging to queues with total allocation less than their fair share. -func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResult, error) { +func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerobjects.SchedulerResult, error) { defer func() { sch.schedulingContext.Finished = time.Now() }() @@ -258,7 +258,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche return nil, err } } - return &SchedulerResult{ + return &schedulerobjects.SchedulerResult{ PreemptedJobs: preemptedJobs, ScheduledJobs: scheduledJobs, FailedJobs: schedulerResult.FailedJobs, @@ -537,7 +537,7 @@ func addEvictedJobsToNodeDb(ctx *armadacontext.Context, sctx *schedulercontext.S return nil } -func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*SchedulerResult, error) { +func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerobjects.SchedulerResult, error) { jobIteratorByQueue := make(map[string]JobIterator) for _, qctx := range sch.schedulingContext.QueueSchedulingContexts { evictedIt, err := inMemoryJobRepo.GetJobIterator(ctx, qctx.Queue) diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index d7f1230455a..ad68755df2a 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1885,7 +1885,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { } jobsByNodeId := make(map[string][]*jobdb.Job) - for _, job := range ScheduledJobsFromSchedulerResult[*jobdb.Job](result) { + for _, job := range schedulerobjects.ScheduledJobsFromSchedulerResult[*jobdb.Job](result) { nodeId := result.NodeIdByJobId[job.GetId()] jobsByNodeId[nodeId] = append(jobsByNodeId[nodeId], job) } diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index 96497bb9620..81ec6e3e58e 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -60,7 +60,7 @@ func (sch *QueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() { sch.gangScheduler.SkipUnsuccessfulSchedulingKeyCheck() } -func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResult, error) { +func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerobjects.SchedulerResult, error) { nodeIdByJobId := make(map[string]string) scheduledJobs := make([]interfaces.LegacySchedulerJob, 0) failedJobs := make([]interfaces.LegacySchedulerJob, 0) @@ -129,7 +129,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul if len(scheduledJobs) != len(nodeIdByJobId) { return nil, errors.Errorf("only %d out of %d jobs mapped to a node", len(nodeIdByJobId), len(scheduledJobs)) } - return &SchedulerResult{ + return &schedulerobjects.SchedulerResult{ PreemptedJobs: nil, ScheduledJobs: scheduledJobs, FailedJobs: failedJobs, diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 00581d01e91..69cb27d0e2a 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -196,9 +196,9 @@ func (s *Scheduler) Run(ctx *armadacontext.Context) error { // cycle is a single iteration of the main scheduling loop. // If updateAll is true, we generate events from all jobs in the jobDb. // Otherwise, we only generate events from jobs updated since the last cycle. -func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToken LeaderToken, shouldSchedule bool) (overallSchedulerResult SchedulerResult, err error) { +func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToken LeaderToken, shouldSchedule bool) (overallSchedulerResult schedulerobjects.SchedulerResult, err error) { // TODO: Consider returning a slice of these instead. - overallSchedulerResult = SchedulerResult{} + overallSchedulerResult = schedulerobjects.SchedulerResult{} // Update job state. updatedJobs, err := s.syncState(ctx) @@ -241,7 +241,7 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke // Schedule jobs. if shouldSchedule { - var result *SchedulerResult + var result *schedulerobjects.SchedulerResult result, err = s.schedulingAlgo.Schedule(ctx, txn) if err != nil { return @@ -390,22 +390,22 @@ func (s *Scheduler) addNodeAntiAffinitiesForAttemptedRunsIfSchedulable(job *jobd } // eventsFromSchedulerResult generates necessary EventSequences from the provided SchedulerResult. -func (s *Scheduler) eventsFromSchedulerResult(result *SchedulerResult) ([]*armadaevents.EventSequence, error) { +func (s *Scheduler) eventsFromSchedulerResult(result *schedulerobjects.SchedulerResult) ([]*armadaevents.EventSequence, error) { return EventsFromSchedulerResult(result, s.clock.Now()) } // EventsFromSchedulerResult generates necessary EventSequences from the provided SchedulerResult. -func EventsFromSchedulerResult(result *SchedulerResult, time time.Time) ([]*armadaevents.EventSequence, error) { +func EventsFromSchedulerResult(result *schedulerobjects.SchedulerResult, time time.Time) ([]*armadaevents.EventSequence, error) { eventSequences := make([]*armadaevents.EventSequence, 0, len(result.PreemptedJobs)+len(result.ScheduledJobs)+len(result.FailedJobs)) - eventSequences, err := AppendEventSequencesFromPreemptedJobs(eventSequences, PreemptedJobsFromSchedulerResult[*jobdb.Job](result), time) + eventSequences, err := AppendEventSequencesFromPreemptedJobs(eventSequences, schedulerobjects.PreemptedJobsFromSchedulerResult[*jobdb.Job](result), time) if err != nil { return nil, err } - eventSequences, err = AppendEventSequencesFromScheduledJobs(eventSequences, ScheduledJobsFromSchedulerResult[*jobdb.Job](result), time) + eventSequences, err = AppendEventSequencesFromScheduledJobs(eventSequences, schedulerobjects.ScheduledJobsFromSchedulerResult[*jobdb.Job](result), time) if err != nil { return nil, err } - eventSequences, err = AppendEventSequencesFromUnschedulableJobs(eventSequences, FailedJobsFromSchedulerResult[*jobdb.Job](result), time) + eventSequences, err = AppendEventSequencesFromUnschedulableJobs(eventSequences, schedulerobjects.FailedJobsFromSchedulerResult[*jobdb.Job](result), time) if err != nil { return nil, err } diff --git a/internal/scheduler/scheduler_metrics.go b/internal/scheduler/scheduler_metrics.go index d9cd884ecf3..8129e0fae7f 100644 --- a/internal/scheduler/scheduler_metrics.go +++ b/internal/scheduler/scheduler_metrics.go @@ -1,6 +1,7 @@ package scheduler import ( + "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "time" "github.com/armadaproject/armada/internal/common/util" @@ -310,7 +311,7 @@ func (metrics *SchedulerMetrics) ReportReconcileCycleTime(cycleTime time.Duratio metrics.reconcileCycleTime.Observe(float64(cycleTime.Milliseconds())) } -func (metrics *SchedulerMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result SchedulerResult) { +func (metrics *SchedulerMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result schedulerobjects.SchedulerResult) { if len(result.ScheduledJobs) == 0 && len(result.PreemptedJobs) == 0 && len(result.FailedJobs) == 0 { return } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index d7379a5fd1d..be6fdd11494 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -1126,7 +1126,7 @@ type testSchedulingAlgo struct { shouldError bool } -func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn) (*SchedulerResult, error) { +func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn) (*schedulerobjects.SchedulerResult, error) { t.numberOfScheduleCalls++ if t.shouldError { return nil, errors.New("error scheduling jobs") @@ -1189,7 +1189,7 @@ func NewSchedulerResultForTest[S ~[]T, T interfaces.LegacySchedulerJob]( scheduledJobs S, failedJobs S, nodeIdByJobId map[string]string, -) *SchedulerResult { +) *schedulerobjects.SchedulerResult { castPreemptedJobs := make([]interfaces.LegacySchedulerJob, len(preemptedJobs)) for i, job := range preemptedJobs { castPreemptedJobs[i] = job @@ -1202,7 +1202,7 @@ func NewSchedulerResultForTest[S ~[]T, T interfaces.LegacySchedulerJob]( for i, job := range failedJobs { castFailedJobs[i] = job } - return &SchedulerResult{ + return &schedulerobjects.SchedulerResult{ PreemptedJobs: castPreemptedJobs, ScheduledJobs: castScheduledJobs, NodeIdByJobId: nodeIdByJobId, diff --git a/internal/scheduler/result.go b/internal/scheduler/schedulerobjects/result.go similarity index 98% rename from internal/scheduler/result.go rename to internal/scheduler/schedulerobjects/result.go index f577c8a53e0..88344b5d080 100644 --- a/internal/scheduler/result.go +++ b/internal/scheduler/schedulerobjects/result.go @@ -1,4 +1,4 @@ -package scheduler +package schedulerobjects import ( schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 48c06e9c56c..7897a4b7458 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -34,7 +34,7 @@ import ( type SchedulingAlgo interface { // Schedule should assign jobs to nodes. // Any jobs that are scheduled should be marked as such in the JobDb using the transaction provided. - Schedule(ctx *armadacontext.Context, txn *jobdb.Txn) (*SchedulerResult, error) + Schedule(ctx *armadacontext.Context, txn *jobdb.Txn) (*schedulerobjects.SchedulerResult, error) } // FairSchedulingAlgo is a SchedulingAlgo based on PreemptingQueueScheduler. @@ -90,13 +90,13 @@ func NewFairSchedulingAlgo( func (l *FairSchedulingAlgo) Schedule( ctx *armadacontext.Context, txn *jobdb.Txn, -) (*SchedulerResult, error) { +) (*schedulerobjects.SchedulerResult, error) { var cancel context.CancelFunc if l.maxSchedulingDuration != 0 { ctx, cancel = armadacontext.WithTimeout(ctx, l.maxSchedulingDuration) defer cancel() } - overallSchedulerResult := &SchedulerResult{ + overallSchedulerResult := &schedulerobjects.SchedulerResult{ NodeIdByJobId: make(map[string]string), SchedulingContexts: make([]*schedulercontext.SchedulingContext, 0, 0), FailedJobs: make([]interfaces.LegacySchedulerJob, 0), @@ -169,9 +169,9 @@ func (l *FairSchedulingAlgo) Schedule( } } - preemptedJobs := PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) - scheduledJobs := ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult) - failedJobs := FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) + preemptedJobs := schedulerobjects.PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) + scheduledJobs := schedulerobjects.ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult) + failedJobs := schedulerobjects.FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) if err := txn.Upsert(preemptedJobs); err != nil { return nil, err } @@ -337,7 +337,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( pool string, minimumJobSize schedulerobjects.ResourceList, executors []*schedulerobjects.Executor, -) (*SchedulerResult, *schedulercontext.SchedulingContext, error) { +) (*schedulerobjects.SchedulerResult, *schedulercontext.SchedulingContext, error) { nodeDb, err := nodedb.NewNodeDb( l.schedulingConfig.Preemption.PriorityClasses, l.schedulingConfig.MaxExtraNodesToConsider, diff --git a/internal/scheduler/scheduling_algo_test.go b/internal/scheduler/scheduling_algo_test.go index 023572bf3ff..b550e1ef4d0 100644 --- a/internal/scheduler/scheduling_algo_test.go +++ b/internal/scheduler/scheduling_algo_test.go @@ -425,7 +425,7 @@ func TestSchedule(t *testing.T) { require.NoError(t, err) // Check that the expected preemptions took place. - preemptedJobs := PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) + preemptedJobs := schedulerobjects.PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) actualPreemptedJobsByExecutorIndexAndNodeIndex := make(map[int]map[int][]int) for _, job := range preemptedJobs { executorIndex := executorIndexByJobId[job.Id()] @@ -450,7 +450,7 @@ func TestSchedule(t *testing.T) { } // Check that jobs were scheduled as expected. - scheduledJobs := ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult) + scheduledJobs := schedulerobjects.ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult) actualScheduledIndices := make([]int, 0) for _, job := range scheduledJobs { actualScheduledIndices = append(actualScheduledIndices, queueIndexByJobId[job.Id()]) @@ -463,7 +463,7 @@ func TestSchedule(t *testing.T) { } // Check that we failed the correct number of excess jobs when a gang schedules >= minimum cardinality - failedJobs := FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) + failedJobs := schedulerobjects.FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) assert.Equal(t, tc.expectedFailedJobCount, len(failedJobs)) // Check that preempted jobs are marked as such consistently. diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 75b10a6afdf..a31faf9d750 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -477,9 +477,9 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { // Update jobDb to reflect the decisions by the scheduler. // Sort jobs to ensure deterministic event ordering. - preemptedJobs := scheduler.PreemptedJobsFromSchedulerResult[*jobdb.Job](result) - scheduledJobs := scheduler.ScheduledJobsFromSchedulerResult[*jobdb.Job](result) - failedJobs := scheduler.FailedJobsFromSchedulerResult[*jobdb.Job](result) + preemptedJobs := schedulerobjects.PreemptedJobsFromSchedulerResult[*jobdb.Job](result) + scheduledJobs := schedulerobjects.ScheduledJobsFromSchedulerResult[*jobdb.Job](result) + failedJobs := schedulerobjects.FailedJobsFromSchedulerResult[*jobdb.Job](result) less := func(a, b *jobdb.Job) bool { if a.Queue() < b.Queue() { return true