Skip to content

Commit

Permalink
Add GangInfo (armadaproject#3236)
Browse files Browse the repository at this point in the history
Signed-off-by: Noah Held <[email protected]>
  • Loading branch information
zuqq authored Jan 15, 2024
1 parent 3178297 commit 7dd31a1
Show file tree
Hide file tree
Showing 23 changed files with 240 additions and 290 deletions.
4 changes: 2 additions & 2 deletions internal/armada/server/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,11 @@ func (q *AggregatedQueueServer) getJobs(ctx *armadacontext.Context, req *api.Str

// Group gangs.
for _, job := range jobs {
gangId, _, _, isGangJob, err := scheduler.GangIdAndCardinalityFromLegacySchedulerJob(job)
gangInfo, err := schedulercontext.GangInfoFromLegacySchedulerJob(job)
if err != nil {
return nil, err
}
if isGangJob {
if gangId := gangInfo.Id; gangId != "" {
if m := jobIdsByGangId[gangId]; m != nil {
m[job.Id] = true
} else {
Expand Down
54 changes: 16 additions & 38 deletions internal/common/validation/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package validation
import (
"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/scheduler"

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/common/util"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/pkg/api"
)

Expand All @@ -33,63 +31,43 @@ func ValidateApiJobs(jobs []*api.Job, config configuration.SchedulingConfig) ([]
return nil, nil
}

type gangDetails = struct {
expectedCardinality int
expectedMinimumCardinality int
expectedPriorityClassName string
expectedNodeUniformityLabel string
}

func validateGangs(jobs []*api.Job) (map[string]gangDetails, error) {
gangDetailsByGangId := make(map[string]gangDetails)
func validateGangs(jobs []*api.Job) (map[string]schedulercontext.GangInfo, error) {
gangDetailsByGangId := make(map[string]schedulercontext.GangInfo)
for i, job := range jobs {
annotations := job.Annotations
gangId, gangCardinality, gangMinimumCardinality, isGangJob, err := scheduler.GangIdAndCardinalityFromAnnotations(annotations)
nodeUniformityLabel := annotations[configuration.GangNodeUniformityLabelAnnotation]
actual, err := schedulercontext.GangInfoFromLegacySchedulerJob(job)
if err != nil {
return nil, errors.WithMessagef(err, "%d-th job with id %s in gang %s", i, job.Id, gangId)
return nil, errors.WithMessagef(err, "%d-th job with id %s", i, job.Id)
}
if !isGangJob {
if actual.Id == "" {
continue
}
if gangId == "" {
return nil, errors.Errorf("empty gang id for %d-th job with id %s", i, job.Id)
}
podSpec := util.PodSpecFromJob(job)
if details, ok := gangDetailsByGangId[gangId]; ok {
if details.expectedCardinality != gangCardinality {
if expected, ok := gangDetailsByGangId[actual.Id]; ok {
if expected.Cardinality != actual.Cardinality {
return nil, errors.Errorf(
"inconsistent gang cardinality for %d-th job with id %s in gang %s: expected %d but got %d",
i, job.Id, gangId, details.expectedCardinality, gangCardinality,
i, job.Id, actual.Id, expected.Cardinality, actual.Cardinality,
)
}
if details.expectedMinimumCardinality != gangMinimumCardinality {
if expected.MinimumCardinality != actual.MinimumCardinality {
return nil, errors.Errorf(
"inconsistent gang minimum cardinality for %d-th job with id %s in gang %s: expected %d but got %d",
i, job.Id, gangId, details.expectedMinimumCardinality, gangMinimumCardinality,
i, job.Id, actual.Id, expected.MinimumCardinality, actual.MinimumCardinality,
)
}
if podSpec != nil && details.expectedPriorityClassName != podSpec.PriorityClassName {
if expected.PriorityClassName != actual.PriorityClassName {
return nil, errors.Errorf(
"inconsistent PriorityClassName for %d-th job with id %s in gang %s: expected %s but got %s",
i, job.Id, gangId, details.expectedPriorityClassName, podSpec.PriorityClassName,
i, job.Id, actual.Id, expected.PriorityClassName, actual.PriorityClassName,
)
}
if nodeUniformityLabel != details.expectedNodeUniformityLabel {
if actual.NodeUniformity != expected.NodeUniformity {
return nil, errors.Errorf(
"inconsistent nodeUniformityLabel for %d-th job with id %s in gang %s: expected %s but got %s",
i, job.Id, gangId, details.expectedNodeUniformityLabel, nodeUniformityLabel,
i, job.Id, actual.Id, expected.NodeUniformity, actual.NodeUniformity,
)
}
gangDetailsByGangId[gangId] = details
} else {
details.expectedCardinality = gangCardinality
details.expectedMinimumCardinality = gangMinimumCardinality
if podSpec != nil {
details.expectedPriorityClassName = podSpec.PriorityClassName
}
details.expectedNodeUniformityLabel = nodeUniformityLabel
gangDetailsByGangId[gangId] = details
gangDetailsByGangId[actual.Id] = actual
}
}
return gangDetailsByGangId, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/common/validation/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestValidateGangs(t *testing.T) {
}

for id, e := range gangDetailsById {
assert.Equal(t, tc.ExpectedGangMinimumCardinalityByGangId[id], e.expectedMinimumCardinality)
assert.Equal(t, tc.ExpectedGangMinimumCardinalityByGangId[id], e.MinimumCardinality)
}
})
}
Expand Down
47 changes: 0 additions & 47 deletions internal/scheduler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package scheduler

import (
"fmt"
"strconv"

"github.com/pkg/errors"
"golang.org/x/exp/maps"

"github.com/armadaproject/armada/internal/armada/configuration"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
Expand Down Expand Up @@ -62,47 +59,3 @@ func JobsSummary(jctxs []*schedulercontext.JobSchedulingContext) string {
jobIdsByQueue,
)
}

// GangIdAndCardinalityFromLegacySchedulerJob reads the gang annotations off job
// and delegates to GangIdAndCardinalityFromAnnotations.
//
// The result is the tuple
// (gangId, gangCardinality, gangMinimumCardinality, isGangJob, error).
func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (string, int, int, bool, error) {
	annotations := job.GetAnnotations()
	return GangIdAndCardinalityFromAnnotations(annotations)
}

// GangIdAndCardinalityFromAnnotations extracts gang membership from a job's
// annotations and returns the tuple
// (gangId, gangCardinality, gangMinimumCardinality, isGangJob, error).
//
// A job whose annotations do not contain configuration.GangIdAnnotation is not
// a gang job; for it, and for every error case, the result is ("", 1, 1, false).
// The value of the gang id annotation is returned as-is and is not checked for
// emptiness here.
//
// An error is returned when the cardinality annotation is missing, when either
// cardinality fails to parse as an integer or is non-positive, or when the
// minimum cardinality exceeds the cardinality.
func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, int, int, bool, error) {
	// notGang builds the result shared by the "not a gang job" and failure paths.
	notGang := func(err error) (string, int, int, bool, error) {
		return "", 1, 1, false, err
	}

	// Indexing a nil map yields ok == false, so a nil annotations map needs no
	// separate check.
	id, ok := annotations[configuration.GangIdAnnotation]
	if !ok {
		return notGang(nil)
	}

	rawCardinality, ok := annotations[configuration.GangCardinalityAnnotation]
	if !ok {
		return notGang(errors.Errorf("missing annotation %s", configuration.GangCardinalityAnnotation))
	}
	cardinality, err := strconv.Atoi(rawCardinality)
	if err != nil {
		return notGang(errors.WithStack(err))
	}
	if cardinality <= 0 {
		return notGang(errors.Errorf("gang cardinality is non-positive %d", cardinality))
	}

	rawMinimum, ok := annotations[configuration.GangMinimumCardinalityAnnotation]
	if !ok {
		// Without an explicit minimum, the whole gang must be scheduled.
		return id, cardinality, cardinality, true, nil
	}
	minimum, err := strconv.Atoi(rawMinimum)
	if err != nil {
		return notGang(errors.WithStack(err))
	}
	switch {
	case minimum <= 0:
		return notGang(errors.Errorf("gang minimum cardinality is non-positive %d", minimum))
	case minimum > cardinality:
		return notGang(errors.Errorf("gang minimum cardinality %d cannot be greater than gang cardinality %d", minimum, cardinality))
	}
	return id, cardinality, minimum, true, nil
}
5 changes: 3 additions & 2 deletions internal/scheduler/constraints/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ func (constraints *SchedulingConstraints) CheckConstraints(
}

// PriorityClassSchedulingConstraintsByPriorityClassName check.
if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[gctx.PriorityClassName]; ok {
if !qctx.AllocatedByPriorityClass[gctx.PriorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) {
priorityClassName := gctx.GangInfo.PriorityClassName
if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[priorityClassName]; ok {
if !qctx.AllocatedByPriorityClass[priorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) {
return false, MaximumResourcesPerQueueExceededUnschedulableReason, nil
}
}
Expand Down
142 changes: 93 additions & 49 deletions internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package context

import (
"fmt"
"strconv"
"strings"
"text/tabwriter"
"time"
Expand Down Expand Up @@ -230,7 +231,7 @@ func (sctx *SchedulingContext) AddGangSchedulingContext(gctx *GangSchedulingCont
numberOfSuccessfulJobs++
}
}
if numberOfSuccessfulJobs >= gctx.GangMinCardinality && !allJobsEvictedInThisRound {
if numberOfSuccessfulJobs >= gctx.GangInfo.MinimumCardinality && !allJobsEvictedInThisRound {
sctx.NumScheduledGangs++
}
return allJobsEvictedInThisRound, nil
Expand Down Expand Up @@ -522,46 +523,31 @@ func (qctx *QueueSchedulingContext) ClearJobSpecs() {
}

type GangSchedulingContext struct {
Created time.Time
Queue string
PriorityClassName string
Created time.Time
Queue string
GangInfo
JobSchedulingContexts []*JobSchedulingContext
TotalResourceRequests schedulerobjects.ResourceList
AllJobsEvicted bool
NodeUniformityLabel string
GangMinCardinality int
}

func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingContext {
// We assume that all jobs in a gang are in the same queue and have the same priority class
// (which we enforce at job submission).
queue := ""
priorityClassName := ""
nodeUniformityLabel := ""
gangMinCardinality := 1
if len(jctxs) > 0 {
queue = jctxs[0].Job.GetQueue()
priorityClassName = jctxs[0].Job.GetPriorityClassName()
if jctxs[0].PodRequirements != nil {
nodeUniformityLabel = jctxs[0].PodRequirements.Annotations[configuration.GangNodeUniformityLabelAnnotation]
}
gangMinCardinality = jctxs[0].GangMinCardinality
}
allJobsEvicted := true
totalResourceRequests := schedulerobjects.NewResourceList(4)
for _, jctx := range jctxs {
allJobsEvicted = allJobsEvicted && jctx.IsEvicted
totalResourceRequests.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
}
// Uniformity of the values that we pick off the first job in the gang was
// checked when the jobs were submitted (e.g., in ValidateApiJobs).
representative := jctxs[0]
return &GangSchedulingContext{
Created: time.Now(),
Queue: queue,
PriorityClassName: priorityClassName,
Queue: representative.Job.GetQueue(),
GangInfo: representative.GangInfo,
JobSchedulingContexts: jctxs,
TotalResourceRequests: totalResourceRequests,
AllJobsEvicted: allJobsEvicted,
NodeUniformityLabel: nodeUniformityLabel,
GangMinCardinality: gangMinCardinality,
}
}

Expand Down Expand Up @@ -629,12 +615,9 @@ type JobSchedulingContext struct {
UnschedulableReason string
// Pod scheduling contexts for the individual pods that make up the job.
PodSchedulingContext *PodSchedulingContext
// Id of the gang to which this job belongs.
GangId string
// The size of the gang associated with this job.
GangCardinality int
// The minimum size of the gang associated with this job.
GangMinCardinality int
// GangInfo holds all the information that is necessary to schedule a gang,
// such as the lower and upper bounds on its size.
GangInfo
// If set, indicates this job should be failed back to the client when the gang is scheduled.
ShouldFail bool
}
Expand All @@ -652,7 +635,6 @@ func (jctx *JobSchedulingContext) String() string {
if jctx.PodSchedulingContext != nil {
fmt.Fprint(w, jctx.PodSchedulingContext.String())
}
fmt.Fprintf(w, "GangMinCardinality:\t%d\n", jctx.GangMinCardinality)
w.Flush()
return sb.String()
}
Expand Down Expand Up @@ -699,33 +681,95 @@ func (jctx *JobSchedulingContext) GetNodeSelector(key string) (string, bool) {
return "", false
}

func JobSchedulingContextsFromJobs[J interfaces.LegacySchedulerJob](priorityClasses map[string]types.PriorityClass, jobs []J, extractGangInfo func(map[string]string) (string, int, int, bool, error)) []*JobSchedulingContext {
// GangInfo collects the gang-related properties of a job. It is embedded in
// both JobSchedulingContext and GangSchedulingContext.
type GangInfo struct {
	// Id is the value of the gang id annotation. An empty Id means the job is
	// not part of a gang.
	Id string
	// Cardinality is the gang size parsed from the gang cardinality annotation;
	// it is 1 for a job that is not in a gang.
	Cardinality int
	// MinimumCardinality is parsed from the gang minimum cardinality annotation
	// and falls back to Cardinality when that annotation is absent. It is 1 for
	// a job that is not in a gang.
	MinimumCardinality int
	// PriorityClassName is the priority class name reported by the job.
	PriorityClassName string
	// NodeUniformity is the value of the job's gang node uniformity label
	// annotation (empty if the annotation is not set).
	NodeUniformity string
}

// EmptyGangInfo builds the GangInfo for a job that does not belong to a gang.
//
// Id is left as the empty string, which is what marks a job as not being in a
// gang, and both cardinalities are 1. PriorityClassName and NodeUniformity are
// still read from the job, since they remain meaningful for a single job.
func EmptyGangInfo(job interfaces.LegacySchedulerJob) GangInfo {
	var info GangInfo // Id stays "" (not in a gang).
	info.Cardinality = 1
	info.MinimumCardinality = 1
	info.PriorityClassName = job.GetPriorityClassName()
	info.NodeUniformity = job.GetAnnotations()[configuration.GangNodeUniformityLabelAnnotation]
	return info
}

// GangInfoFromLegacySchedulerJob derives the GangInfo of job from its
// annotations.
//
// A job without the gang id annotation yields the result of EmptyGangInfo and
// a nil error. Otherwise the gang id must be non-empty, the cardinality
// annotation must be present and parse to a positive integer, and the minimum
// cardinality (which falls back to the cardinality when its annotation is
// absent) must parse to a positive integer no greater than the cardinality.
//
// When any of these checks fails, the returned GangInfo is still the
// EmptyGangInfo value for the job, alongside a non-nil error.
func GangInfoFromLegacySchedulerJob(job interfaces.LegacySchedulerJob) (GangInfo, error) {
	info := EmptyGangInfo(job)
	annotations := job.GetAnnotations()

	id, inGang := annotations[configuration.GangIdAnnotation]
	switch {
	case !inGang:
		return info, nil
	case id == "":
		return info, errors.Errorf("gang id is empty")
	}

	rawCardinality, found := annotations[configuration.GangCardinalityAnnotation]
	if !found {
		return info, errors.Errorf("annotation %s is missing", configuration.GangCardinalityAnnotation)
	}
	cardinality, err := strconv.Atoi(rawCardinality)
	if err != nil {
		return info, errors.WithStack(err)
	}
	if cardinality <= 0 {
		return info, errors.Errorf("gang cardinality %d is non-positive", cardinality)
	}

	rawMinimum, found := annotations[configuration.GangMinimumCardinalityAnnotation]
	if !found {
		// No explicit minimum: the whole gang is required.
		rawMinimum = rawCardinality
	}
	minimum, err := strconv.Atoi(rawMinimum)
	if err != nil {
		return info, errors.WithStack(err)
	}
	switch {
	case minimum <= 0:
		return info, errors.Errorf("gang minimum cardinality %d is non-positive", minimum)
	case minimum > cardinality:
		return info, errors.Errorf("gang minimum cardinality %d is greater than gang cardinality %d", minimum, cardinality)
	}

	// Only overwrite the non-gang defaults once every check has passed.
	info.Id = id
	info.Cardinality = cardinality
	info.MinimumCardinality = minimum
	return info, nil
}

func JobSchedulingContextsFromJobs[J interfaces.LegacySchedulerJob](priorityClasses map[string]types.PriorityClass, jobs []J) []*JobSchedulingContext {
jctxs := make([]*JobSchedulingContext, len(jobs))
for i, job := range jobs {
jctxs[i] = JobSchedulingContextFromJob(priorityClasses, job, extractGangInfo)
jctxs[i] = JobSchedulingContextFromJob(priorityClasses, job)
}
return jctxs
}

func JobSchedulingContextFromJob(priorityClasses map[string]types.PriorityClass, job interfaces.LegacySchedulerJob, extractGangInfo func(map[string]string) (string, int, int, bool, error)) *JobSchedulingContext {
// TODO: Move cardinality to gang context only and remove from here.
// Requires re-phrasing nodedb in terms of gang context, as well as feeding the value extracted from the annotations downstream.
gangId, gangCardinality, gangMinCardinality, _, err := extractGangInfo(job.GetAnnotations())
func JobSchedulingContextFromJob(priorityClasses map[string]types.PriorityClass, job interfaces.LegacySchedulerJob) *JobSchedulingContext {
gangInfo, err := GangInfoFromLegacySchedulerJob(job)
if err != nil {
logrus.Errorf("failed to get cardinality from job %s: %s", job.GetId(), err)
gangId = job.GetId()
gangCardinality = 1
gangMinCardinality = 1
logrus.Errorf("failed to extract gang info from job %s: %s", job.GetId(), err)
}
return &JobSchedulingContext{
Created: time.Now(),
JobId: job.GetId(),
Job: job,
PodRequirements: job.GetPodRequirements(priorityClasses),
GangId: gangId,
GangCardinality: gangCardinality,
GangMinCardinality: gangMinCardinality,
ShouldFail: false,
Created: time.Now(),
JobId: job.GetId(),
Job: job,
PodRequirements: job.GetPodRequirements(priorityClasses),
GangInfo: gangInfo,
ShouldFail: false,
}
}

Expand Down
Loading

0 comments on commit 7dd31a1

Please sign in to comment.