Skip to content

Commit

Permalink
Refactor
Browse files: browse the repository at this point in the history
  • Loading branch information
severinson committed Nov 14, 2023
1 parent ead55bc commit 651bebe
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 32 deletions.
6 changes: 3 additions & 3 deletions internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (sch *PreemptingQueueScheduler) EnableNewPreemptionStrategy() {
// Schedule
// - preempts jobs belonging to queues with total allocation above their fair share and
// - schedules new jobs belonging to queues with total allocation less than their fair share.
func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResult, error) {
func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerobjects.SchedulerResult, error) {
defer func() {
sch.schedulingContext.Finished = time.Now()
}()
Expand Down Expand Up @@ -258,7 +258,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
return nil, err
}
}
return &SchedulerResult{
return &schedulerobjects.SchedulerResult{
PreemptedJobs: preemptedJobs,
ScheduledJobs: scheduledJobs,
FailedJobs: schedulerResult.FailedJobs,
Expand Down Expand Up @@ -537,7 +537,7 @@ func addEvictedJobsToNodeDb(ctx *armadacontext.Context, sctx *schedulercontext.S
return nil
}

func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*SchedulerResult, error) {
func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerobjects.SchedulerResult, error) {
jobIteratorByQueue := make(map[string]JobIterator)
for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
evictedIt, err := inMemoryJobRepo.GetJobIterator(ctx, qctx.Queue)
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/preempting_queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1885,7 +1885,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
}

jobsByNodeId := make(map[string][]*jobdb.Job)
for _, job := range ScheduledJobsFromSchedulerResult[*jobdb.Job](result) {
for _, job := range schedulerobjects.ScheduledJobsFromSchedulerResult[*jobdb.Job](result) {
nodeId := result.NodeIdByJobId[job.GetId()]
jobsByNodeId[nodeId] = append(jobsByNodeId[nodeId], job)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (sch *QueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
sch.gangScheduler.SkipUnsuccessfulSchedulingKeyCheck()
}

func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResult, error) {
func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerobjects.SchedulerResult, error) {
nodeIdByJobId := make(map[string]string)
scheduledJobs := make([]interfaces.LegacySchedulerJob, 0)
failedJobs := make([]interfaces.LegacySchedulerJob, 0)
Expand Down Expand Up @@ -129,7 +129,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul
if len(scheduledJobs) != len(nodeIdByJobId) {
return nil, errors.Errorf("only %d out of %d jobs mapped to a node", len(nodeIdByJobId), len(scheduledJobs))
}
return &SchedulerResult{
return &schedulerobjects.SchedulerResult{
PreemptedJobs: nil,
ScheduledJobs: scheduledJobs,
FailedJobs: failedJobs,
Expand Down
16 changes: 8 additions & 8 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ func (s *Scheduler) Run(ctx *armadacontext.Context) error {
// cycle is a single iteration of the main scheduling loop.
// If updateAll is true, we generate events from all jobs in the jobDb.
// Otherwise, we only generate events from jobs updated since the last cycle.
func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToken LeaderToken, shouldSchedule bool) (overallSchedulerResult SchedulerResult, err error) {
func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToken LeaderToken, shouldSchedule bool) (overallSchedulerResult schedulerobjects.SchedulerResult, err error) {
// TODO: Consider returning a slice of these instead.
overallSchedulerResult = SchedulerResult{}
overallSchedulerResult = schedulerobjects.SchedulerResult{}

// Update job state.
updatedJobs, err := s.syncState(ctx)
Expand Down Expand Up @@ -241,7 +241,7 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke

// Schedule jobs.
if shouldSchedule {
var result *SchedulerResult
var result *schedulerobjects.SchedulerResult
result, err = s.schedulingAlgo.Schedule(ctx, txn)
if err != nil {
return
Expand Down Expand Up @@ -390,22 +390,22 @@ func (s *Scheduler) addNodeAntiAffinitiesForAttemptedRunsIfSchedulable(job *jobd
}

// eventsFromSchedulerResult generates necessary EventSequences from the provided SchedulerResult.
func (s *Scheduler) eventsFromSchedulerResult(result *SchedulerResult) ([]*armadaevents.EventSequence, error) {
func (s *Scheduler) eventsFromSchedulerResult(result *schedulerobjects.SchedulerResult) ([]*armadaevents.EventSequence, error) {
return EventsFromSchedulerResult(result, s.clock.Now())
}

// EventsFromSchedulerResult generates necessary EventSequences from the provided SchedulerResult.
func EventsFromSchedulerResult(result *SchedulerResult, time time.Time) ([]*armadaevents.EventSequence, error) {
func EventsFromSchedulerResult(result *schedulerobjects.SchedulerResult, time time.Time) ([]*armadaevents.EventSequence, error) {
eventSequences := make([]*armadaevents.EventSequence, 0, len(result.PreemptedJobs)+len(result.ScheduledJobs)+len(result.FailedJobs))
eventSequences, err := AppendEventSequencesFromPreemptedJobs(eventSequences, PreemptedJobsFromSchedulerResult[*jobdb.Job](result), time)
eventSequences, err := AppendEventSequencesFromPreemptedJobs(eventSequences, schedulerobjects.PreemptedJobsFromSchedulerResult[*jobdb.Job](result), time)
if err != nil {
return nil, err
}
eventSequences, err = AppendEventSequencesFromScheduledJobs(eventSequences, ScheduledJobsFromSchedulerResult[*jobdb.Job](result), time)
eventSequences, err = AppendEventSequencesFromScheduledJobs(eventSequences, schedulerobjects.ScheduledJobsFromSchedulerResult[*jobdb.Job](result), time)
if err != nil {
return nil, err
}
eventSequences, err = AppendEventSequencesFromUnschedulableJobs(eventSequences, FailedJobsFromSchedulerResult[*jobdb.Job](result), time)
eventSequences, err = AppendEventSequencesFromUnschedulableJobs(eventSequences, schedulerobjects.FailedJobsFromSchedulerResult[*jobdb.Job](result), time)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/scheduler/scheduler_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scheduler

import (
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"time"

"github.com/armadaproject/armada/internal/common/util"
Expand Down Expand Up @@ -310,7 +311,7 @@ func (metrics *SchedulerMetrics) ReportReconcileCycleTime(cycleTime time.Duratio
metrics.reconcileCycleTime.Observe(float64(cycleTime.Milliseconds()))
}

func (metrics *SchedulerMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result SchedulerResult) {
func (metrics *SchedulerMetrics) ReportSchedulerResult(ctx *armadacontext.Context, result schedulerobjects.SchedulerResult) {
if len(result.ScheduledJobs) == 0 && len(result.PreemptedJobs) == 0 && len(result.FailedJobs) == 0 {
return
}
Expand Down
6 changes: 3 additions & 3 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,7 @@ type testSchedulingAlgo struct {
shouldError bool
}

func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn) (*SchedulerResult, error) {
func (t *testSchedulingAlgo) Schedule(ctx *armadacontext.Context, txn *jobdb.Txn) (*schedulerobjects.SchedulerResult, error) {
t.numberOfScheduleCalls++
if t.shouldError {
return nil, errors.New("error scheduling jobs")
Expand Down Expand Up @@ -1189,7 +1189,7 @@ func NewSchedulerResultForTest[S ~[]T, T interfaces.LegacySchedulerJob](
scheduledJobs S,
failedJobs S,
nodeIdByJobId map[string]string,
) *SchedulerResult {
) *schedulerobjects.SchedulerResult {
castPreemptedJobs := make([]interfaces.LegacySchedulerJob, len(preemptedJobs))
for i, job := range preemptedJobs {
castPreemptedJobs[i] = job
Expand All @@ -1202,7 +1202,7 @@ func NewSchedulerResultForTest[S ~[]T, T interfaces.LegacySchedulerJob](
for i, job := range failedJobs {
castFailedJobs[i] = job
}
return &SchedulerResult{
return &schedulerobjects.SchedulerResult{
PreemptedJobs: castPreemptedJobs,
ScheduledJobs: castScheduledJobs,
NodeIdByJobId: nodeIdByJobId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package scheduler
package schedulerobjects

import (
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
Expand Down
14 changes: 7 additions & 7 deletions internal/scheduler/scheduling_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
type SchedulingAlgo interface {
// Schedule should assign jobs to nodes.
// Any jobs that are scheduled should be marked as such in the JobDb using the transaction provided.
Schedule(ctx *armadacontext.Context, txn *jobdb.Txn) (*SchedulerResult, error)
Schedule(ctx *armadacontext.Context, txn *jobdb.Txn) (*schedulerobjects.SchedulerResult, error)
}

// FairSchedulingAlgo is a SchedulingAlgo based on PreemptingQueueScheduler.
Expand Down Expand Up @@ -90,13 +90,13 @@ func NewFairSchedulingAlgo(
func (l *FairSchedulingAlgo) Schedule(
ctx *armadacontext.Context,
txn *jobdb.Txn,
) (*SchedulerResult, error) {
) (*schedulerobjects.SchedulerResult, error) {
var cancel context.CancelFunc
if l.maxSchedulingDuration != 0 {
ctx, cancel = armadacontext.WithTimeout(ctx, l.maxSchedulingDuration)
defer cancel()
}
overallSchedulerResult := &SchedulerResult{
overallSchedulerResult := &schedulerobjects.SchedulerResult{
NodeIdByJobId: make(map[string]string),
SchedulingContexts: make([]*schedulercontext.SchedulingContext, 0, 0),
FailedJobs: make([]interfaces.LegacySchedulerJob, 0),
Expand Down Expand Up @@ -169,9 +169,9 @@ func (l *FairSchedulingAlgo) Schedule(
}
}

preemptedJobs := PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
scheduledJobs := ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
failedJobs := FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
preemptedJobs := schedulerobjects.PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
scheduledJobs := schedulerobjects.ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
failedJobs := schedulerobjects.FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
if err := txn.Upsert(preemptedJobs); err != nil {
return nil, err
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors(
pool string,
minimumJobSize schedulerobjects.ResourceList,
executors []*schedulerobjects.Executor,
) (*SchedulerResult, *schedulercontext.SchedulingContext, error) {
) (*schedulerobjects.SchedulerResult, *schedulercontext.SchedulingContext, error) {
nodeDb, err := nodedb.NewNodeDb(
l.schedulingConfig.Preemption.PriorityClasses,
l.schedulingConfig.MaxExtraNodesToConsider,
Expand Down
6 changes: 3 additions & 3 deletions internal/scheduler/scheduling_algo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func TestSchedule(t *testing.T) {
require.NoError(t, err)

// Check that the expected preemptions took place.
preemptedJobs := PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
preemptedJobs := schedulerobjects.PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
actualPreemptedJobsByExecutorIndexAndNodeIndex := make(map[int]map[int][]int)
for _, job := range preemptedJobs {
executorIndex := executorIndexByJobId[job.Id()]
Expand All @@ -450,7 +450,7 @@ func TestSchedule(t *testing.T) {
}

// Check that jobs were scheduled as expected.
scheduledJobs := ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
scheduledJobs := schedulerobjects.ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
actualScheduledIndices := make([]int, 0)
for _, job := range scheduledJobs {
actualScheduledIndices = append(actualScheduledIndices, queueIndexByJobId[job.Id()])
Expand All @@ -463,7 +463,7 @@ func TestSchedule(t *testing.T) {
}

// Check that we failed the correct number of excess jobs when a gang schedules >= minimum cardinality
failedJobs := FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
failedJobs := schedulerobjects.FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult)
assert.Equal(t, tc.expectedFailedJobCount, len(failedJobs))

// Check that preempted jobs are marked as such consistently.
Expand Down
6 changes: 3 additions & 3 deletions internal/scheduler/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,9 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {

// Update jobDb to reflect the decisions by the scheduler.
// Sort jobs to ensure deterministic event ordering.
preemptedJobs := scheduler.PreemptedJobsFromSchedulerResult[*jobdb.Job](result)
scheduledJobs := scheduler.ScheduledJobsFromSchedulerResult[*jobdb.Job](result)
failedJobs := scheduler.FailedJobsFromSchedulerResult[*jobdb.Job](result)
preemptedJobs := schedulerobjects.PreemptedJobsFromSchedulerResult[*jobdb.Job](result)
scheduledJobs := schedulerobjects.ScheduledJobsFromSchedulerResult[*jobdb.Job](result)
failedJobs := schedulerobjects.FailedJobsFromSchedulerResult[*jobdb.Job](result)
less := func(a, b *jobdb.Job) bool {
if a.Queue() < b.Queue() {
return true
Expand Down

0 comments on commit 651bebe

Please sign in to comment.