Home-away scheduling fixes (#3447)
* Correctly pass through priority to schedule pods at in nodeDb

* Pass through tolerations after eviction

* Restore temporary workaround

* mage proto

* mage proto

* Remove unused proto import

* mage proto
severinson authored Mar 7, 2024
1 parent f3640f8 commit 0f09fc5
Showing 10 changed files with 407 additions and 321 deletions.
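At a high level, the first commit makes the nodeDb schedule each pod at the priority already recorded for its job, falling back to the submitted priority only when no record exists; the second ensures evicted jobs keep tolerations for the taints on their node. A minimal sketch of the priority fallback, using the names that appear in the nodedb.go diff below (the surrounding types live in the Armada scheduler packages, so this is illustrative rather than compilable on its own):

// schedulingPriorityFor mirrors the fallback added in SelectNodeForJobWithTxn:
// prefer the priority the job was previously scheduled at, if known, and
// otherwise fall back to the priority it was submitted with.
func schedulingPriorityFor(nodeDb *NodeDb, jctx *schedulercontext.JobSchedulingContext) int32 {
	if priority, ok := nodeDb.GetScheduledAtPriority(jctx.JobId); ok {
		return priority
	}
	return jctx.PodRequirements.Priority
}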
3 changes: 2 additions & 1 deletion internal/scheduler/context/context.go
@@ -799,7 +799,8 @@ type PodSchedulingContext struct {
NodeId string
// If set, indicates that the pod was scheduled on a specific node type.
WellKnownNodeTypeName string
// Priority at which this pod was scheduled.
// Priority this pod was most recently attempted to be scheduled at.
// If scheduling was successful, resources were marked as allocated to the job at this priority.
ScheduledAtPriority int32
// Maximum priority that this pod preempted other pods at.
PreemptedAtPriority int32
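In practical terms, the updated comment means ScheduledAtPriority is now populated for every scheduling attempt rather than left at -1. A hedged sketch of how a caller might read it after SelectNodeForJobWithTxn (illustrative only; imports, error handling and the surrounding transaction setup are elided):

// If a node is returned, resources on it were marked as allocated to the job
// at pctx.ScheduledAtPriority, which may differ from the submitted priority
// (e.g. for away jobs).
node, err := nodeDb.SelectNodeForJobWithTxn(txn, jctx)
if err != nil {
	return err
}
if node != nil {
	pctx := jctx.PodSchedulingContext
	fmt.Printf("job %s scheduled on node %s at priority %d\n", jctx.JobId, node.Id, pctx.ScheduledAtPriority)
}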
183 changes: 115 additions & 68 deletions internal/scheduler/gang_scheduler_test.go

Large diffs are not rendered by default.

45 changes: 24 additions & 21 deletions internal/scheduler/nodedb/nodedb.go
@@ -574,12 +574,17 @@ func deleteEvictedJobSchedulingContextIfExistsWithTxn(txn *memdb.Txn, jobId stri
// SelectNodeForJobWithTxn selects a node on which the job can be scheduled.
func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercontext.JobSchedulingContext) (*Node, error) {
req := jctx.PodRequirements

priorityClass := interfaces.PriorityClassFromLegacySchedulerJob(nodeDb.priorityClasses, nodeDb.defaultPriorityClass, jctx.Job)

// If the job has already been scheduled, get the priority at which it was scheduled.
// Otherwise, get the original priority the job was submitted with.
priority, ok := nodeDb.GetScheduledAtPriority(jctx.JobId)
if !ok {
priority = req.Priority
}
pctx := &schedulercontext.PodSchedulingContext{
Created: time.Now(),
ScheduledAtPriority: -1,
ScheduledAtPriority: priority,
PreemptedAtPriority: MinPriority,
NumNodes: nodeDb.numNodes,
NumExcludedNodesByReason: make(map[string]int),
@@ -606,14 +611,6 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercon
if it, err := txn.Get("nodes", "id", nodeId); err != nil {
return nil, errors.WithStack(err)
} else {
priority, ok := nodeDb.GetScheduledAtPriority(jctx.JobId)
if !ok {
// We only get here if the node ID label was set by the user
// (instead of the scheduler); home-away preemption is ignored
// in that case.
priority = req.Priority
}
pctx.ScheduledAtPriority = priority
if node, err := nodeDb.selectNodeForPodWithItAtPriority(it, jctx, priority, true); err != nil {
return nil, err
} else {
@@ -622,7 +619,7 @@ func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercon
}
}

node, err := nodeDb.selectNodeForJobWithTxnAtPriority(txn, jctx, req.Priority)
node, err := nodeDb.selectNodeForJobWithTxnAtPriority(txn, jctx)
if err != nil {
return nil, err
}
@@ -672,25 +669,22 @@ func (nodeDb *NodeDb) selectNodeForJobWithTxnAndAwayNodeType(
jctx.AdditionalTolerations = append(jctx.AdditionalTolerations, v1.Toleration{Key: taint.Key, Value: taint.Value, Effect: taint.Effect})
}

node, err = nodeDb.selectNodeForJobWithTxnAtPriority(txn, jctx, awayNodeType.Priority)
jctx.PodSchedulingContext.ScheduledAtPriority = awayNodeType.Priority
node, err = nodeDb.selectNodeForJobWithTxnAtPriority(txn, jctx)
return
}

func (nodeDb *NodeDb) selectNodeForJobWithTxnAtPriority(
txn *memdb.Txn,
jctx *schedulercontext.JobSchedulingContext,
priority int32,
) (*Node, error) {
req := jctx.PodRequirements
pctx := jctx.PodSchedulingContext

matchingNodeTypeIds, numExcludedNodesByReason, err := nodeDb.NodeTypesMatchingJob(jctx)
if err != nil {
return nil, err
}

pctx := jctx.PodSchedulingContext
pctx.ScheduledAtPriority = priority

// Try scheduling at evictedPriority. If this succeeds, no preemption is necessary.
pctx.NumExcludedNodesByReason = maps.Clone(numExcludedNodesByReason)
if node, err := nodeDb.selectNodeForPodAtPriority(txn, jctx, matchingNodeTypeIds, evictedPriority); err != nil {
@@ -704,7 +698,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithTxnAtPriority(
// Try scheduling at the job priority. If this fails, scheduling is impossible and we return.
// This is an optimisation to avoid looking for preemption targets for unschedulable jobs.
pctx.NumExcludedNodesByReason = maps.Clone(numExcludedNodesByReason)
if node, err := nodeDb.selectNodeForPodAtPriority(txn, jctx, matchingNodeTypeIds, req.Priority); err != nil {
if node, err := nodeDb.selectNodeForPodAtPriority(txn, jctx, matchingNodeTypeIds, pctx.ScheduledAtPriority); err != nil {
return nil, err
} else if err := assertPodSchedulingContextNode(pctx, node); err != nil {
return nil, err
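
The two attempts above implement a common pattern: first try to place the pod using only capacity that is free once evicted jobs are accounted for (evictedPriority), and only if that fails, check whether any placement is possible at the pod's scheduling priority before searching for preemption targets. A simplified, non-compilable sketch of that control flow (tryAtPriority is a stand-in for nodeDb.selectNodeForPodAtPriority):

// Sketch of selectNodeForJobWithTxnAtPriority's structure; details elided.
pctx := jctx.PodSchedulingContext

// 1. Try to schedule without preempting anything.
if node, err := tryAtPriority(jctx, evictedPriority); err != nil {
	return nil, err
} else if node != nil {
	return node, nil
}

// 2. If the pod cannot be scheduled even at its own scheduling priority,
//    give up early instead of searching for preemption targets.
if node, err := tryAtPriority(jctx, pctx.ScheduledAtPriority); err != nil {
	return nil, err
} else if node == nil {
	return nil, nil
}

// 3. Otherwise, fall through to urgency-based and fair preemption.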
@@ -759,7 +753,6 @@ func (nodeDb *NodeDb) selectNodeForJobWithUrgencyPreemption(
jctx *schedulercontext.JobSchedulingContext,
matchingNodeTypeIds []uint64,
) (*Node, error) {
req := jctx.PodRequirements
pctx := jctx.PodSchedulingContext
numExcludedNodesByReason := pctx.NumExcludedNodesByReason
for _, priority := range nodeDb.nodeDbPriorities {
@@ -768,7 +761,9 @@
continue
}

if priority > req.Priority {
// Using pctx.ScheduledAtPriority instead of jctx.PodRequirements.Priority,
// since pctx.ScheduledAtPriority may differ, e.g., in the case of home-away scheduling.
if priority > pctx.ScheduledAtPriority {
break
}

@@ -923,13 +918,21 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *s
}
nodesById[nodeId] = node
evictedJobSchedulingContextsByNodeId[nodeId] = append(evictedJobSchedulingContextsByNodeId[nodeId], evictedJobSchedulingContext)
if priority := evictedJctx.PodRequirements.Priority; priority > maxPriority {

priority, ok := nodeDb.GetScheduledAtPriority(evictedJctx.JobId)
if !ok {
priority = evictedJctx.PodRequirements.Priority
}
if priority > maxPriority {
maxPriority = priority
}
matches, _, reason, err := JobRequirementsMet(
node.Taints,
node.Labels,
node.TotalResources,
// At this point, we've unbound the jobs running on the node.
// Hence, we should check if the job is schedulable at evictedPriority,
// since that indicates the job can be scheduled without causing further preemptions.
node.AllocatableByPriority[evictedPriority],
jctx,
)
13 changes: 11 additions & 2 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/common/armadacontext"
@@ -852,12 +853,20 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev
}

for _, job := range evictedJobs {
// Create a scheduling context for when re-scheduling this job.
// Mark as evicted and add a node selector to ensure the job is re-scheduled onto the node it was evicted from.
// Create a scheduling context for the attempt to re-schedule the job, and:
// 1. Mark the job as evicted. This ensures total scheduled resources is calculated correctly.
// 2. Add a node selector ensuring the job can only be re-scheduled onto the node it was evicted from.
// 3. Add tolerations for all taints on the node. This is to ensure that:
// - Adding taints to a node doesn't cause jobs already running on the node to be preempted.
// - Jobs scheduled as away jobs have the necessary tolerations to be re-scheduled.
// TODO(albin): We can remove the checkOnlyDynamicRequirements flag in the nodeDb now that we've added the tolerations.
jctx := schedulercontext.JobSchedulingContextFromJob(evi.priorityClasses, job)
jctx.IsEvicted = true
jctx.AddNodeSelector(schedulerconfig.NodeIdLabel, node.Id)
evictedJctxsByJobId[job.GetId()] = jctx
for _, taint := range node.Taints {
jctx.AdditionalTolerations = append(jctx.AdditionalTolerations, v1.Toleration{Key: taint.Key, Value: taint.Value, Effect: taint.Effect})
}

nodeIdByJobId[job.GetId()] = node.Id
}
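The eviction change above copies every taint on a node into a toleration on the jobs evicted from it, so that newly added taints do not prevent those jobs from being re-scheduled in place and so that away jobs keep the tolerations they were scheduled with. A self-contained sketch of that conversion using the upstream Kubernetes types (the helper name and example taint are illustrative):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// tolerationsForTaints builds one toleration per taint, mirroring the loop
// added to Evictor.Evict above.
func tolerationsForTaints(taints []v1.Taint) []v1.Toleration {
	tolerations := make([]v1.Toleration, 0, len(taints))
	for _, taint := range taints {
		tolerations = append(tolerations, v1.Toleration{Key: taint.Key, Value: taint.Value, Effect: taint.Effect})
	}
	return tolerations
}

func main() {
	taints := []v1.Taint{{Key: "example.com/special-hardware", Value: "true", Effect: v1.TaintEffectNoSchedule}}
	fmt.Printf("%+v\n", tolerationsForTaints(taints))
}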
10 changes: 10 additions & 0 deletions internal/scheduler/preempting_queue_scheduler_test.go
@@ -1938,6 +1938,16 @@ func TestPreemptingQueueScheduler(t *testing.T) {
assert.True(t, ok)
assert.NotEmpty(t, nodeId)

node, err := nodeDb.GetNode(nodeId)
require.NoError(t, err)
assert.NotEmpty(t, node)

// Check that the job can actually go onto this node.
matches, reason, err := nodedb.StaticJobRequirementsMet(node.Taints, node.Labels, node.TotalResources, jctx)
require.NoError(t, err)
assert.Empty(t, reason)
assert.True(t, matches)

// Check that scheduled jobs are consistently assigned to the same node.
// (We don't allow moving jobs between nodes.)
if expectedNodeId, ok := nodeIdByJobId[job.GetId()]; ok {
10 changes: 10 additions & 0 deletions internal/scheduler/queue_scheduler_test.go
@@ -690,6 +690,16 @@ func TestQueueScheduler(t *testing.T) {
nodeId, ok := result.NodeIdByJobId[jctx.JobId]
assert.True(t, ok)
assert.NotEmpty(t, nodeId)

node, err := nodeDb.GetNode(nodeId)
require.NoError(t, err)
assert.NotEmpty(t, node)

// Check that the job can actually go onto this node.
matches, reason, err := nodedb.StaticJobRequirementsMet(node.Taints, node.Labels, node.TotalResources, jctx)
require.NoError(t, err)
assert.Empty(t, reason)
assert.True(t, matches)
}

// For jobs that could not be scheduled,
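Both test changes above add the same four assertions. A possible shared helper, sketched here under the assumption that the packages and signatures are exactly as they appear in the diffs (this helper is not part of the change):

// assertJobFitsNode checks that a job assigned to a node satisfies that node's
// static requirements (taints, labels and total resources).
func assertJobFitsNode(t *testing.T, nodeDb *nodedb.NodeDb, nodeId string, jctx *schedulercontext.JobSchedulingContext) {
	node, err := nodeDb.GetNode(nodeId)
	require.NoError(t, err)
	require.NotNil(t, node)

	matches, reason, err := nodedb.StaticJobRequirementsMet(node.Taints, node.Labels, node.TotalResources, jctx)
	require.NoError(t, err)
	assert.Empty(t, reason)
	assert.True(t, matches)
}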
5 changes: 4 additions & 1 deletion internal/scheduler/schedulerobjects/schedulerobjects.pb.go

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion internal/scheduler/schedulerobjects/schedulerobjects.proto
@@ -159,7 +159,10 @@ message PodRequirements {
repeated k8s.io.api.core.v1.Toleration tolerations = 3 [(gogoproto.nullable) = false];
// Kubernetes annotations. Included here since we use annotations with special meaning.
map<string, string> annotations = 7;
// Pod priority. Should be mapped from the priority class name of the submitted pod.
// Priority class priority of the pod as submitted. Should be mapped from the priority class name of the submitted pod.
//
// During scheduling, the priority stored on the podSchedulingContext should be used instead,
// since a pod may be scheduled at a priority different from the priority it was submitted with.
int32 priority = 4;
// One of Never, PreemptLowerPriority.
// Defaults to PreemptLowerPriority if unset.