Remove support for Partial Gangs (armadaproject#3621)
* wip

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* fix test

Signed-off-by: Chris Martin <[email protected]>

* fix test

Signed-off-by: Chris Martin <[email protected]>

---------

Signed-off-by: Chris Martin <[email protected]>
Co-authored-by: Chris Martin <[email protected]>
Co-authored-by: JamesMurkin <[email protected]>
3 people authored May 28, 2024
1 parent ae456f5 commit b3c23d1
Showing 20 changed files with 48 additions and 470 deletions.
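In short: this commit removes the partial-gang ("minimum cardinality") feature. A gang is now scheduled all-or-nothing: either every job in the gang is placed on a node or none is, and the GangMinimumCardinalityAnnotation, GangInfo.MinimumCardinality, and JobSchedulingContext.ShouldFail plumbing that supported partial placement is deleted. Only a subset of the 20 changed files is reproduced below.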
7 changes: 0 additions & 7 deletions internal/armada/configuration/constants.go
@@ -7,16 +7,11 @@ const (
 	// GangCardinalityAnnotation All jobs in a gang must specify the total number of jobs in the gang via this annotation.
 	// The cardinality should be expressed as a positive integer, e.g., "3".
 	GangCardinalityAnnotation = "armadaproject.io/gangCardinality"
-	// GangMinimumCardinalityAnnotation All jobs in a gang must specify the minimum size for the gang to be schedulable via this annotation.
-	// The cardinality should be expressed as a positive integer, e.g., "3".
-	GangMinimumCardinalityAnnotation = "armadaproject.io/gangMinimumCardinality"
 	// The jobs that make up a gang may be constrained to be scheduled across a set of uniform nodes.
 	// Specifically, if provided, all gang jobs are scheduled onto nodes for which the value of the provided label is equal.
 	// Used to ensure, e.g., that all gang jobs are scheduled onto the same cluster or rack.
 	GangNodeUniformityLabelAnnotation = "armadaproject.io/gangNodeUniformityLabel"
-	// GangNumJobsScheduledAnnotation is set by the scheduler and indicates how many gang jobs were scheduled.
-	// For example, a gang composed of 4 jobs may only have a subset be scheduled if GangMinimumCardinalityAnnotation < 4.
-	GangNumJobsScheduledAnnotation = "armadaproject.io/numGangJobsScheduled"
 	// FailFastAnnotation, if set to true, ensures Armada does not re-schedule jobs that fail to start.
 	// Instead, the job the pod is part of fails immediately.
 	FailFastAnnotation = "armadaproject.io/failFast"
@@ -25,9 +20,7 @@ const (
 var schedulingAnnotations = map[string]bool{
 	GangIdAnnotation:                  true,
 	GangCardinalityAnnotation:         true,
-	GangMinimumCardinalityAnnotation:  true,
 	GangNodeUniformityLabelAnnotation: true,
-	GangNumJobsScheduledAnnotation:    true,
 	FailFastAnnotation:                true,
 }

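For orientation, here is a minimal sketch of how gang jobs are annotated after this change — only the gang id and the total cardinality remain. The gangAnnotations helper (and the assumption that GangIdAnnotation is "armadaproject.io/gangId") are illustrative, not part of this commit:

package sketch

import "strconv"

const (
	gangIdAnnotation          = "armadaproject.io/gangId"          // assumed value of GangIdAnnotation
	gangCardinalityAnnotation = "armadaproject.io/gangCardinality" // as defined above
)

// gangAnnotations returns the annotations every job in a gang must carry:
// the shared gang id and the total number of jobs in the gang. There is no
// longer a separate minimum size.
func gangAnnotations(gangId string, cardinality int) map[string]string {
	return map[string]string{
		gangIdAnnotation:          gangId,
		gangCardinalityAnnotation: strconv.Itoa(cardinality),
	}
}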
6 changes: 0 additions & 6 deletions internal/armada/submit/validation/submit_request.go
@@ -293,12 +293,6 @@ func validateGangs(request *api.JobSubmitRequest, _ configuration.SubmissionConf
 				actual.Id, expected.Cardinality, actual.Cardinality,
 			)
 		}
-		if expected.MinimumCardinality != actual.MinimumCardinality {
-			return errors.Errorf(
-				"inconsistent gang minimum cardinality in gang %s: expected %d but got %d",
-				actual.Id, expected.MinimumCardinality, actual.MinimumCardinality,
-			)
-		}
 		if expected.PriorityClassName != actual.PriorityClassName {
 			return errors.Errorf(
 				"inconsistent PriorityClassName in gang %s: expected %s but got %s",
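What survives of validateGangs is the consistency rule: every job in a gang must agree with the first member seen for that gang id. A self-contained sketch of that rule under assumed names (gangDetails and validateGangConsistency are hypothetical, not the real types):

package sketch

import "fmt"

type gangDetails struct {
	id                string
	cardinality       int
	priorityClassName string
}

// validateGangConsistency groups members by gang id and requires each member
// to match the first member's cardinality and priority class; there is no
// minimum-cardinality field left to compare.
func validateGangConsistency(members []gangDetails) error {
	byId := map[string]gangDetails{}
	for _, m := range members {
		if expected, ok := byId[m.id]; !ok {
			byId[m.id] = m
		} else if expected != m {
			return fmt.Errorf("inconsistent gang details in gang %s", m.id)
		}
	}
	return nil
}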
12 changes: 0 additions & 12 deletions internal/armada/submit/validation/submit_request_test.go
@@ -141,18 +141,6 @@ func TestValidateGangs(t *testing.T) {
 			},
 			expectSuccess: true,
 		},
-		"complete gang job of cardinality 2 with minimum cardinality of 1": {
-			jobRequests: []*api.JobSubmitRequestItem{
-				{
-					Annotations: map[string]string{
-						configuration.GangIdAnnotation:                 "foo",
-						configuration.GangCardinalityAnnotation:        strconv.Itoa(2),
-						configuration.GangMinimumCardinalityAnnotation: strconv.Itoa(1),
-					},
-				},
-			},
-			expectSuccess: true,
-		},
 		"empty gangId": {
 			jobRequests: []*api.JobSubmitRequestItem{
 				{
55 changes: 12 additions & 43 deletions internal/scheduler/context/context.go
@@ -214,18 +214,16 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string {

 func (sctx *SchedulingContext) AddGangSchedulingContext(gctx *GangSchedulingContext) (bool, error) {
 	allJobsEvictedInThisRound := true
-	numberOfSuccessfulJobs := 0
+	allJobsSuccessful := true
 	for _, jctx := range gctx.JobSchedulingContexts {
 		evictedInThisRound, err := sctx.AddJobSchedulingContext(jctx)
 		if err != nil {
 			return false, err
 		}
 		allJobsEvictedInThisRound = allJobsEvictedInThisRound && evictedInThisRound
-		if jctx.IsSuccessful() {
-			numberOfSuccessfulJobs++
-		}
+		allJobsSuccessful = allJobsSuccessful && jctx.IsSuccessful()
 	}
-	if numberOfSuccessfulJobs >= gctx.GangInfo.MinimumCardinality && !allJobsEvictedInThisRound {
+	if allJobsSuccessful && !allJobsEvictedInThisRound {
 		sctx.NumScheduledGangs++
 	}
 	return allJobsEvictedInThisRound, nil
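With MinimumCardinality gone, the threshold comparison above collapses into a universal condition. Stated in isolation (a sketch; the names are illustrative):

package sketch

// gangCountsAsScheduled captures the new accounting rule: a gang increments
// NumScheduledGangs only if every one of its jobs scheduled successfully and
// the gang was not merely re-added after being evicted in this round.
func gangCountsAsScheduled(jobSuccesses []bool, allJobsEvictedInThisRound bool) bool {
	for _, ok := range jobSuccesses {
		if !ok {
			return false
		}
	}
	return !allJobsEvictedInThisRound
}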
@@ -358,13 +356,6 @@ type QueueSchedulingContext struct {
 	EvictedJobsById map[string]bool
 }
 
-func GetSchedulingContextFromQueueSchedulingContext(qctx *QueueSchedulingContext) *SchedulingContext {
-	if qctx == nil {
-		return nil
-	}
-	return qctx.SchedulingContext
-}
-
 func (qctx *QueueSchedulingContext) String() string {
 	return qctx.ReportString(0)
 }
@@ -629,8 +620,6 @@ type JobSchedulingContext struct {
 	// GangInfo holds all the information that is necessary to schedule a gang,
 	// such as the lower and upper bounds on its size.
 	GangInfo
-	// If set, indicates this job should be failed back to the client when the gang is scheduled.
-	ShouldFail bool
 }
 
 func (jctx *JobSchedulingContext) String() string {
@@ -689,24 +678,22 @@ func (jctx *JobSchedulingContext) GetNodeSelector(key string) (string, bool) {
 }
 
 type GangInfo struct {
-	Id                 string
-	Cardinality        int
-	MinimumCardinality int
-	PriorityClassName  string
-	NodeUniformity     string
+	Id                string
+	Cardinality       int
+	PriorityClassName string
+	NodeUniformity    string
 }
 
 // EmptyGangInfo returns a GangInfo for a job that is not in a gang.
 func EmptyGangInfo(job interfaces.MinimalJob) GangInfo {
 	return GangInfo{
 		// An Id of "" indicates that this job is not in a gang; we set
-		// Cardinality and MinimumCardinality (as well as the other fields,
+		// Cardinality (as well as the other fields,
 		// which all make sense in this context) accordingly.
-		Id:                 "",
-		Cardinality:        1,
-		MinimumCardinality: 1,
-		PriorityClassName:  job.PriorityClassName(),
-		NodeUniformity:     job.Annotations()[configuration.GangNodeUniformityLabelAnnotation],
+		Id:                "",
+		Cardinality:       1,
+		PriorityClassName: job.PriorityClassName(),
+		NodeUniformity:    job.Annotations()[configuration.GangNodeUniformityLabelAnnotation],
 	}
 }
 
@@ -735,25 +722,8 @@ func GangInfoFromLegacySchedulerJob(job interfaces.MinimalJob) (GangInfo, error)
 		return gangInfo, errors.Errorf("gang cardinality %d is non-positive", gangCardinality)
 	}
 
-	gangMinimumCardinalityString, ok := annotations[configuration.GangMinimumCardinalityAnnotation]
-	if !ok {
-		// If it is not set, use gangCardinality as the minimum gang size.
-		gangMinimumCardinalityString = gangCardinalityString
-	}
-	gangMinimumCardinality, err := strconv.Atoi(gangMinimumCardinalityString)
-	if err != nil {
-		return gangInfo, errors.WithStack(err)
-	}
-	if gangMinimumCardinality <= 0 {
-		return gangInfo, errors.Errorf("gang minimum cardinality %d is non-positive", gangMinimumCardinality)
-	}
-	if gangMinimumCardinality > gangCardinality {
-		return gangInfo, errors.Errorf("gang minimum cardinality %d is greater than gang cardinality %d", gangMinimumCardinality, gangCardinality)
-	}
-
 	gangInfo.Id = gangId
 	gangInfo.Cardinality = gangCardinality
-	gangInfo.MinimumCardinality = gangMinimumCardinality
 	return gangInfo, nil
 }

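After this deletion, gang-annotation parsing validates a single integer. A stand-alone sketch of the logic that remains in GangInfoFromLegacySchedulerJob (the function name and plain-error style here are illustrative):

package sketch

import (
	"fmt"
	"strconv"
)

// parseGangCardinality mirrors the surviving checks: the annotation must be
// present, integral, and positive; there is no separate minimum any more.
func parseGangCardinality(annotations map[string]string) (int, error) {
	s, ok := annotations["armadaproject.io/gangCardinality"]
	if !ok {
		return 0, fmt.Errorf("missing gang cardinality annotation")
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		return 0, fmt.Errorf("invalid gang cardinality %q: %w", s, err)
	}
	if n <= 0 {
		return 0, fmt.Errorf("gang cardinality %d is non-positive", n)
	}
	return n, nil
}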
@@ -776,7 +746,6 @@ func JobSchedulingContextFromJob(job *jobdb.Job) *JobSchedulingContext {
 		Job:             job,
 		PodRequirements: job.PodRequirements(),
 		GangInfo:        gangInfo,
-		ShouldFail:      false,
 	}
 }

9 changes: 0 additions & 9 deletions internal/scheduler/gang_scheduler.go
@@ -226,18 +226,9 @@ func (sch *GangScheduler) tryScheduleGangWithTxn(_ *armadacontext.Context, txn *
 		} else {
 			unschedulableReason = "job does not fit on any node"
 		}
-	} else {
-		// When a gang schedules successfully, update state for failed jobs if they exist.
-		for _, jctx := range gctx.JobSchedulingContexts {
-			if jctx.ShouldFail {
-				jctx.Fail("job does not fit on any node")
-			}
-		}
-	}
 
 		return
 	}
 
 	return
 }

92 changes: 2 additions & 90 deletions internal/scheduler/gang_scheduler_test.go
@@ -69,32 +69,6 @@ func TestGangScheduler(t *testing.T) {
 			ExpectedCumulativeScheduledJobs: []int{0},
 			ExpectedRuntimeGangCardinality:  []int{0},
 		},
-		"simple success where min cardinality is met": {
-			SchedulingConfig: testfixtures.TestSchedulingConfig(),
-			Nodes:            testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
-			Gangs: [][]*jobdb.Job{
-				testfixtures.WithGangAnnotationsAndMinCardinalityJobs(
-					32,
-					testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 40),
-				),
-			},
-			ExpectedScheduledIndices:        testfixtures.IntRange(0, 0),
-			ExpectedCumulativeScheduledJobs: []int{32},
-			ExpectedRuntimeGangCardinality:  []int{32},
-		},
-		"simple failure where min cardinality is not met": {
-			SchedulingConfig: testfixtures.TestSchedulingConfig(),
-			Nodes:            testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
-			Gangs: [][]*jobdb.Job{
-				testfixtures.WithGangAnnotationsAndMinCardinalityJobs(
-					33,
-					testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 40),
-				),
-			},
-			ExpectedScheduledIndices:        nil,
-			ExpectedCumulativeScheduledJobs: []int{0},
-			ExpectedRuntimeGangCardinality:  []int{0},
-		},
 		"one success and one failure": {
 			SchedulingConfig: testfixtures.TestSchedulingConfig(),
 			Nodes:            testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
@@ -106,20 +80,6 @@
 			ExpectedCumulativeScheduledJobs: []int{32, 32},
 			ExpectedRuntimeGangCardinality:  []int{32, 0},
 		},
-		"one success and one failure using min cardinality": {
-			SchedulingConfig: testfixtures.TestSchedulingConfig(),
-			Nodes:            testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
-			Gangs: [][]*jobdb.Job{
-				testfixtures.WithGangAnnotationsAndMinCardinalityJobs(
-					32,
-					testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 33),
-				),
-				testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)),
-			},
-			ExpectedScheduledIndices:        testfixtures.IntRange(0, 0),
-			ExpectedCumulativeScheduledJobs: []int{32, 32},
-			ExpectedRuntimeGangCardinality:  []int{32, 0},
-		},
 		"multiple nodes": {
 			SchedulingConfig: testfixtures.TestSchedulingConfig(),
 			Nodes:            testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
@@ -400,11 +360,10 @@
 				)...,
 			),
 			Gangs: [][]*jobdb.Job{
-				testfixtures.WithGangAnnotationsAndMinCardinalityJobs(
-					2,
+				testfixtures.WithGangAnnotationsJobs(
 					testfixtures.WithNodeUniformityLabelAnnotationJobs(
 						"my-cool-node-uniformity",
-						testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 4),
+						testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 3),
 					),
 				),
 			},
@@ -413,43 +372,6 @@
ExpectedNodeUniformity: map[int]string{0: "b"},
ExpectedRuntimeGangCardinality: []int{3},
},
"NodeUniformityLabel PreemptedAtPriority tiebreak": {
SchedulingConfig: testfixtures.WithIndexedNodeLabelsConfig(
[]string{"my-cool-node-uniformity"},
testfixtures.TestSchedulingConfig(),
),
Nodes: append(
testfixtures.WithUsedResourcesNodes(
1,
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}},
testfixtures.WithLabelsNodes(
map[string]string{"my-cool-node-uniformity": "a"},
testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
),
),
testfixtures.WithUsedResourcesNodes(
0,
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}},
testfixtures.WithLabelsNodes(
map[string]string{"my-cool-node-uniformity": "b"},
testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
),
)...,
),
Gangs: [][]*jobdb.Job{
testfixtures.WithGangAnnotationsAndMinCardinalityJobs(
2,
testfixtures.WithNodeUniformityLabelAnnotationJobs(
"my-cool-node-uniformity",
testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass2, 4),
),
),
},
ExpectedScheduledIndices: []int{0},
ExpectedCumulativeScheduledJobs: []int{2},
ExpectedNodeUniformity: map[int]string{0: "b"},
ExpectedRuntimeGangCardinality: []int{2},
},
"AwayNodeTypes": {
SchedulingConfig: func() configuration.SchedulingConfig {
config := testfixtures.TestSchedulingConfig()
@@ -691,16 +613,6 @@
 			}
 		}
 
-		// Verify any excess jobs that failed have the correct state set
-		for _, jctx := range jctxs {
-			if jctx.ShouldFail {
-				if jctx.PodSchedulingContext != nil {
-					require.Equal(t, "", jctx.PodSchedulingContext.NodeId)
-				}
-				require.Equal(t, "job does not fit on any node", jctx.UnschedulableReason)
-			}
-		}
-
 		// Verify accounting
 		scheduledGangs++
 		require.Equal(t, scheduledGangs, sch.schedulingContext.NumScheduledGangs)
35 changes: 13 additions & 22 deletions internal/scheduler/nodedb/nodedb.go
@@ -444,43 +444,34 @@ func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*internaltypes.Node, map[str

 func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, gctx *schedulercontext.GangSchedulingContext) (bool, error) {
 	// Attempt to schedule pods one by one in a transaction.
-	numScheduled := 0
 	for _, jctx := range gctx.JobSchedulingContexts {
 		// In general, we may attempt to schedule a gang multiple times (in
 		// order to find the best fit for this gang); clear out any remnants of
 		// previous attempts.
 		jctx.UnschedulableReason = ""
-		jctx.ShouldFail = false
 
 		node, err := nodeDb.SelectNodeForJobWithTxn(txn, jctx)
 		if err != nil {
 			return false, err
 		}
 
-		if node == nil {
-			// Indicates that when the min cardinality is met, we should fail this job back to the client.
-			jctx.ShouldFail = true
-			continue
-		}
-
-		// If we found a node for this pod, bind it and continue to the next pod.
-		if node, err := nodeDb.bindJobToNode(node, jctx.Job, jctx.PodSchedulingContext.ScheduledAtPriority); err != nil {
-			return false, err
-		} else {
-			if err := nodeDb.UpsertWithTxn(txn, node); err != nil {
+		if node != nil {
+			// If we found a node for this pod, bind it and continue to the next pod.
+			if node, err := nodeDb.bindJobToNode(node, jctx.Job, jctx.PodSchedulingContext.ScheduledAtPriority); err != nil {
 				return false, err
+			} else {
+				if err := nodeDb.UpsertWithTxn(txn, node); err != nil {
+					return false, err
+				}
 			}
 		}
-
-		// Once a job is scheduled, it should no longer be considered for preemption.
-		if err := deleteEvictedJobSchedulingContextIfExistsWithTxn(txn, jctx.JobId); err != nil {
-			return false, err
+			// Once a job is scheduled, it should no longer be considered for preemption.
+			if err := deleteEvictedJobSchedulingContextIfExistsWithTxn(txn, jctx.JobId); err != nil {
+				return false, err
+			}
+		} else {
+			return false, nil
 		}
-
-		numScheduled++
 	}
-	if numScheduled < gctx.GangInfo.MinimumCardinality {
-		return false, nil
-	}
 	return true, nil
 }
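ScheduleManyWithTxn now returns false as soon as any gang member fails to find a node, instead of counting successes against a minimum. Because every binding happens inside the supplied transaction, a caller gets atomic gang placement by discarding the transaction on failure. A sketch of that calling pattern, assuming the usual go-memdb Txn/Abort/Commit surface and this package's types (neither is shown in this diff):

// scheduleGangAtomically binds a whole gang inside one write transaction and
// commits only if every member found a node; otherwise the abort discards any
// partial bindings. Sketch only: NodeDb.Txn is assumed to hand back the
// *memdb.Txn that ScheduleManyWithTxn operates on.
func scheduleGangAtomically(nodeDb *NodeDb, gctx *schedulercontext.GangSchedulingContext) (bool, error) {
	txn := nodeDb.Txn(true)
	defer txn.Abort() // a no-op once Commit has run
	ok, err := nodeDb.ScheduleManyWithTxn(txn, gctx)
	if err != nil || !ok {
		return false, err
	}
	txn.Commit()
	return true, nil
}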