Add additional annotations to JobRunLeased event (armadaproject#3140)
msumner91 authored Dec 21, 2023
1 parent f1f0c06 commit 46d0d6a
Showing 10 changed files with 502 additions and 282 deletions.
4 changes: 4 additions & 0 deletions internal/armada/configuration/constants.go
@@ -20,6 +20,10 @@ const (
FailFastAnnotation = "armadaproject.io/failFast"
)

+const (
+	RuntimeGangCardinality = "runtime_gang_cardinality"
+)
+
var ReturnLeaseRequestTrackedAnnotations = map[string]struct{}{
FailFastAnnotation: {},
}
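
The constant above defines the annotation key; the queue_scheduler.go change further down writes its value with strconv.Itoa. As a minimal, self-contained sketch of the consumer side (the helper, the standalone constant, and main are hypothetical; only the key name and its string-encoded integer value come from this commit):

package main

import (
	"fmt"
	"strconv"
)

// Mirrors the key added to internal/armada/configuration/constants.go above.
const RuntimeGangCardinality = "runtime_gang_cardinality"

// runtimeGangCardinality returns the gang cardinality recorded in a job's
// annotations, and false if the key is absent or the value is not an integer.
func runtimeGangCardinality(annotations map[string]string) (int, bool) {
	v, ok := annotations[RuntimeGangCardinality]
	if !ok {
		return 0, false
	}
	n, err := strconv.Atoi(v)
	return n, err == nil
}

func main() {
	n, ok := runtimeGangCardinality(map[string]string{RuntimeGangCardinality: "32"})
	fmt.Println(n, ok) // prints: 32 true
}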
115 changes: 71 additions & 44 deletions internal/scheduler/gang_scheduler_test.go
@@ -45,24 +45,28 @@ func TestGangScheduler(t *testing.T) {
// If present, assert that gang `i` is scheduled on nodes with node
// uniformity label `ExpectedNodeUniformity[i]`.
ExpectedNodeUniformity map[int]string
+// The expected number of jobs we successfully scheduled between min gang cardinality and gang cardinality.
+ExpectedRuntimeGangCardinality []int
}{
"simple success": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
Gangs: [][]*jobdb.Job{
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32)),
},
-ExpectedScheduledIndices: testfixtures.IntRange(0, 0),
-ExpectedScheduledJobs:    []int{32},
+ExpectedScheduledIndices:       testfixtures.IntRange(0, 0),
+ExpectedScheduledJobs:          []int{32},
+ExpectedRuntimeGangCardinality: []int{32},
},
"simple failure": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
Gangs: [][]*jobdb.Job{
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 33)),
},
-ExpectedScheduledIndices: nil,
-ExpectedScheduledJobs:    []int{0},
+ExpectedScheduledIndices:       nil,
+ExpectedScheduledJobs:          []int{0},
+ExpectedRuntimeGangCardinality: []int{0},
},
"simple success where min cardinality is met": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
@@ -73,8 +77,9 @@
testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 40),
),
},
-ExpectedScheduledIndices: testfixtures.IntRange(0, 0),
-ExpectedScheduledJobs:    []int{32},
+ExpectedScheduledIndices:       testfixtures.IntRange(0, 0),
+ExpectedScheduledJobs:          []int{32},
+ExpectedRuntimeGangCardinality: []int{32},
},
"simple failure where min cardinality is not met": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
@@ -85,8 +90,9 @@
testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 40),
),
},
-ExpectedScheduledIndices: nil,
-ExpectedScheduledJobs:    []int{0},
+ExpectedScheduledIndices:       nil,
+ExpectedScheduledJobs:          []int{0},
+ExpectedRuntimeGangCardinality: []int{0},
},
"one success and one failure": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
@@ -95,8 +101,9 @@
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 32)),
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)),
},
-ExpectedScheduledIndices: testfixtures.IntRange(0, 0),
-ExpectedScheduledJobs:    []int{32, 32},
+ExpectedScheduledIndices:       testfixtures.IntRange(0, 0),
+ExpectedScheduledJobs:          []int{32, 32},
+ExpectedRuntimeGangCardinality: []int{32, 0},
},
"one success and one failure using min cardinality": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
@@ -108,17 +115,19 @@
),
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)),
},
-ExpectedScheduledIndices: testfixtures.IntRange(0, 0),
-ExpectedScheduledJobs:    []int{32, 32},
+ExpectedScheduledIndices:       testfixtures.IntRange(0, 0),
+ExpectedScheduledJobs:          []int{32, 32},
+ExpectedRuntimeGangCardinality: []int{32, 0},
},
"multiple nodes": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
Gangs: [][]*jobdb.Job{
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 64)),
},
-ExpectedScheduledIndices: testfixtures.IntRange(0, 0),
-ExpectedScheduledJobs:    []int{64},
+ExpectedScheduledIndices:       testfixtures.IntRange(0, 0),
+ExpectedScheduledJobs:          []int{64},
+ExpectedRuntimeGangCardinality: []int{64},
},
"MaximumResourceFractionToSchedule": {
SchedulingConfig: testfixtures.WithRoundLimitsConfig(
@@ -131,8 +140,9 @@
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 16)),
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 8)),
},
-ExpectedScheduledIndices: []int{0, 1},
-ExpectedScheduledJobs:    []int{8, 24, 24},
+ExpectedScheduledIndices:       []int{0, 1},
+ExpectedScheduledJobs:          []int{8, 24, 24},
+ExpectedRuntimeGangCardinality: []int{8, 16, 0},
},
"MaximumResourceFractionToScheduleByPool": {
SchedulingConfig: testfixtures.WithRoundLimitsConfig(
@@ -150,8 +160,9 @@
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)),
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)),
},
-ExpectedScheduledIndices: []int{0, 1, 2},
-ExpectedScheduledJobs:    []int{1, 2, 3, 3, 3},
+ExpectedScheduledIndices:       []int{0, 1, 2},
+ExpectedScheduledJobs:          []int{1, 2, 3, 3, 3},
+ExpectedRuntimeGangCardinality: []int{1, 1, 1, 0, 0},
},
"MaximumResourceFractionToScheduleByPool non-existing pool": {
SchedulingConfig: testfixtures.WithRoundLimitsConfig(
@@ -169,8 +180,9 @@
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)),
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)),
},
-ExpectedScheduledIndices: []int{0, 1, 2, 3},
-ExpectedScheduledJobs:    []int{1, 2, 3, 4, 4},
+ExpectedScheduledIndices:       []int{0, 1, 2, 3},
+ExpectedScheduledJobs:          []int{1, 2, 3, 4, 4},
+ExpectedRuntimeGangCardinality: []int{1, 1, 1, 1, 0},
},
"MaximumResourceFractionPerQueue": {
SchedulingConfig: testfixtures.WithPerPriorityLimitsConfig(
@@ -193,8 +205,9 @@
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass3, 4)),
testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass3, 5)),
},
-ExpectedScheduledIndices: []int{0, 2, 4, 6},
-ExpectedScheduledJobs:    []int{1, 1, 3, 3, 6, 6, 10, 10},
+ExpectedScheduledIndices:       []int{0, 2, 4, 6},
+ExpectedScheduledJobs:          []int{1, 1, 3, 3, 6, 6, 10, 10},
+ExpectedRuntimeGangCardinality: []int{1, 0, 2, 0, 3, 0, 4, 0},
},
"resolution has no impact on jobs of size a multiple of the resolution": {
SchedulingConfig: testfixtures.WithIndexedResourcesConfig(
@@ -213,8 +226,9 @@
testfixtures.WithGangAnnotationsJobs(testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 1)),
testfixtures.WithGangAnnotationsJobs(testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 1)),
},
-ExpectedScheduledIndices: testfixtures.IntRange(0, 5),
-ExpectedScheduledJobs:    testfixtures.IntRange(1, 6),
+ExpectedScheduledIndices:       testfixtures.IntRange(0, 5),
+ExpectedScheduledJobs:          testfixtures.IntRange(1, 6),
+ExpectedRuntimeGangCardinality: []int{1, 1, 1, 1, 1, 1},
},
"jobs of size not a multiple of the resolution blocks scheduling new jobs": {
SchedulingConfig: testfixtures.WithIndexedResourcesConfig(
@@ -231,8 +245,9 @@
testfixtures.WithGangAnnotationsJobs(testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 1)),
testfixtures.WithGangAnnotationsJobs(testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 1)),
},
-ExpectedScheduledIndices: testfixtures.IntRange(0, 2),
-ExpectedScheduledJobs:    []int{1, 2, 3, 3},
+ExpectedScheduledIndices:       testfixtures.IntRange(0, 2),
+ExpectedScheduledJobs:          []int{1, 2, 3, 3},
+ExpectedRuntimeGangCardinality: []int{1, 1, 1, 0},
},
"consider all nodes in the bucket": {
SchedulingConfig: testfixtures.WithIndexedResourcesConfig(
@@ -268,8 +283,9 @@
Gangs: [][]*jobdb.Job{
testfixtures.WithGangAnnotationsJobs(testfixtures.N1GpuJobs("A", testfixtures.PriorityClass0, 1)),
},
-ExpectedScheduledIndices: testfixtures.IntRange(0, 0),
-ExpectedScheduledJobs:    []int{1},
+ExpectedScheduledIndices:       testfixtures.IntRange(0, 0),
+ExpectedScheduledJobs:          []int{1},
+ExpectedRuntimeGangCardinality: []int{1},
},
"NodeUniformityLabel set but not indexed": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
@@ -284,8 +300,9 @@
testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 1),
)),
},
-ExpectedScheduledIndices: nil,
-ExpectedScheduledJobs:    []int{0},
+ExpectedScheduledIndices:       nil,
+ExpectedScheduledJobs:          []int{0},
+ExpectedRuntimeGangCardinality: []int{0},
},
"NodeUniformityLabel not set": {
SchedulingConfig: testfixtures.WithIndexedNodeLabelsConfig(
@@ -300,8 +317,9 @@
testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 1),
)),
},
-ExpectedScheduledIndices: nil,
-ExpectedScheduledJobs:    []int{0},
+ExpectedScheduledIndices:       nil,
+ExpectedScheduledJobs:          []int{0},
+ExpectedRuntimeGangCardinality: []int{0},
},
"NodeUniformityLabel insufficient capacity": {
SchedulingConfig: testfixtures.WithIndexedNodeLabelsConfig(
@@ -323,8 +341,9 @@
testfixtures.WithNodeUniformityLabelAnnotationJobs("foo", testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 3)),
),
},
-ExpectedScheduledIndices: nil,
-ExpectedScheduledJobs:    []int{0},
+ExpectedScheduledIndices:       nil,
+ExpectedScheduledJobs:          []int{0},
+ExpectedRuntimeGangCardinality: []int{0},
},
"NodeUniformityLabel": {
SchedulingConfig: testfixtures.WithIndexedNodeLabelsConfig(
@@ -360,8 +379,9 @@
testfixtures.N16Cpu128GiJobs("A", testfixtures.PriorityClass0, 4),
)),
},
-ExpectedScheduledIndices: []int{0},
-ExpectedScheduledJobs:    []int{4},
+ExpectedScheduledIndices:       []int{0},
+ExpectedScheduledJobs:          []int{4},
+ExpectedRuntimeGangCardinality: []int{4},
},
"NodeUniformityLabel NumScheduled tiebreak": {
SchedulingConfig: testfixtures.WithIndexedNodeLabelsConfig(
@@ -387,9 +407,10 @@
),
),
},
-ExpectedScheduledIndices: []int{0},
-ExpectedScheduledJobs:    []int{3},
-ExpectedNodeUniformity:   map[int]string{0: "b"},
+ExpectedScheduledIndices:       []int{0},
+ExpectedScheduledJobs:          []int{3},
+ExpectedNodeUniformity:         map[int]string{0: "b"},
+ExpectedRuntimeGangCardinality: []int{3},
},
"NodeUniformityLabel PreemptedAtPriority tiebreak": {
SchedulingConfig: testfixtures.WithIndexedNodeLabelsConfig(
@@ -423,9 +444,10 @@
),
),
},
-ExpectedScheduledIndices: []int{0},
-ExpectedScheduledJobs:    []int{2},
-ExpectedNodeUniformity:   map[int]string{0: "b"},
+ExpectedScheduledIndices:       []int{0},
+ExpectedScheduledJobs:          []int{2},
+ExpectedNodeUniformity:         map[int]string{0: "b"},
+ExpectedRuntimeGangCardinality: []int{2},
},
"AwayNodeTypes": {
SchedulingConfig: func() configuration.SchedulingConfig {
@@ -491,8 +513,9 @@
gangs = append(gangs, []*jobdb.Job{testfixtures.TestJob("A", jobId, "armada-preemptible-away-both", testfixtures.Test1Cpu4GiPodReqs("A", jobId, 30000))})
return
}(),
-ExpectedScheduledIndices: []int{1},
-ExpectedScheduledJobs:    []int{0, 1},
+ExpectedScheduledIndices:       []int{1},
+ExpectedScheduledJobs:          []int{0, 1},
+ExpectedRuntimeGangCardinality: []int{0, 1},
},
}
for name, tc := range tests {
@@ -572,6 +595,10 @@
gctx := schedulercontext.NewGangSchedulingContext(jctxs)
ok, reason, err := sch.Schedule(armadacontext.Background(), gctx)
require.NoError(t, err)

+// Runtime cardinality should match expected scheduled jobs
+require.Equal(t, tc.ExpectedRuntimeGangCardinality[i], gctx.Fit().NumScheduled)

if ok {
require.Empty(t, reason)
actualScheduledIndices = append(actualScheduledIndices, i)
11 changes: 6 additions & 5 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -259,11 +259,12 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
}
}
return &SchedulerResult{
-PreemptedJobs:      preemptedJobs,
-ScheduledJobs:      scheduledJobs,
-FailedJobs:         schedulerResult.FailedJobs,
-NodeIdByJobId:      sch.nodeIdByJobId,
-SchedulingContexts: []*schedulercontext.SchedulingContext{sch.schedulingContext},
+PreemptedJobs:                preemptedJobs,
+ScheduledJobs:                scheduledJobs,
+FailedJobs:                   schedulerResult.FailedJobs,
+NodeIdByJobId:                sch.nodeIdByJobId,
+AdditionalAnnotationsByJobId: schedulerResult.AdditionalAnnotationsByJobId,
+SchedulingContexts:           []*schedulercontext.SchedulingContext{sch.schedulingContext},
}, nil
}

19 changes: 14 additions & 5 deletions internal/scheduler/queue_scheduler.go
@@ -3,6 +3,9 @@ package scheduler
import (
"container/heap"
"reflect"
"strconv"

"github.com/armadaproject/armada/internal/armada/configuration"

"github.com/pkg/errors"

@@ -60,6 +63,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul
nodeIdByJobId := make(map[string]string)
var scheduledJobs []*schedulercontext.JobSchedulingContext
var failedJobs []*schedulercontext.JobSchedulingContext
+additionalAnnotationsByJobId := map[string]map[string]string{}
for {
// Peek() returns the next gang to try to schedule. Call Clear() before calling Peek() again.
// Calling Clear() after (failing to) schedule ensures we get the next gang in order of smallest fair share.
@@ -89,10 +93,14 @@
return nil, err
} else if ok {
// We scheduled the minimum number of gang jobs required.
+numScheduled := gctx.Fit().NumScheduled
for _, jctx := range gctx.JobSchedulingContexts {
if pctx := jctx.PodSchedulingContext; pctx.IsSuccessful() {
scheduledJobs = append(scheduledJobs, jctx)
nodeIdByJobId[jctx.JobId] = pctx.NodeId

+// Add additional annotations for runtime gang cardinality
+additionalAnnotationsByJobId[jctx.JobId] = map[string]string{configuration.RuntimeGangCardinality: strconv.Itoa(numScheduled)}
}
}

@@ -125,11 +133,12 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul
return nil, errors.Errorf("only %d out of %d jobs mapped to a node", len(nodeIdByJobId), len(scheduledJobs))
}
return &SchedulerResult{
-PreemptedJobs:      nil,
-ScheduledJobs:      scheduledJobs,
-FailedJobs:         failedJobs,
-NodeIdByJobId:      nodeIdByJobId,
-SchedulingContexts: []*schedulercontext.SchedulingContext{sch.schedulingContext},
+PreemptedJobs:                nil,
+ScheduledJobs:                scheduledJobs,
+FailedJobs:                   failedJobs,
+NodeIdByJobId:                nodeIdByJobId,
+AdditionalAnnotationsByJobId: additionalAnnotationsByJobId,
+SchedulingContexts:           []*schedulercontext.SchedulingContext{sch.schedulingContext},
}, nil
}
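
To make the new map's shape concrete: every job in a gang that is successfully scheduled receives the same count, gctx.Fit().NumScheduled, which can be smaller than the submitted gang size when only the minimum cardinality is met. A hypothetical example of the map after 32 of a 40-job gang fit (job IDs invented for illustration):

// Illustrative fragment only; keys are job IDs and values come from strconv.Itoa(numScheduled).
additionalAnnotationsByJobId := map[string]map[string]string{
	"gang-job-1": {configuration.RuntimeGangCardinality: "32"},
	"gang-job-2": {configuration.RuntimeGangCardinality: "32"},
	// ...one entry per scheduled job; the 8 unscheduled members get no entry.
}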

3 changes: 3 additions & 0 deletions internal/scheduler/result.go
@@ -21,6 +21,9 @@ type SchedulerResult struct {
// These are the corresponding scheduling contexts.
// TODO: This doesn't seem like the right approach.
SchedulingContexts []*schedulercontext.SchedulingContext
+// Additional annotations to be appended to the PodSpec.
+// Format: JobId -> AnnotationName -> AnnotationValue.
+AdditionalAnnotationsByJobId map[string]map[string]string
}

// PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result cast to type T.
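
Given the comment's statement that these annotations are appended to the PodSpec, a downstream consumer presumably merges them by job ID. A hedged sketch of that merge (the helper and its parameter names are assumptions, not part of this commit; only the JobId -> AnnotationName -> AnnotationValue format comes from this diff):

// applyAdditionalAnnotations copies any scheduler-provided annotations for
// jobId into a pod's annotation map, allocating the map if it is nil.
// Hypothetical helper, not part of this commit.
func applyAdditionalAnnotations(
	podAnnotations map[string]string,
	additionalAnnotationsByJobId map[string]map[string]string,
	jobId string,
) map[string]string {
	if podAnnotations == nil {
		podAnnotations = map[string]string{}
	}
	for name, value := range additionalAnnotationsByJobId[jobId] {
		podAnnotations[name] = value
	}
	return podAnnotations
}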
[Diff for the remaining 5 changed files is not shown.]
