Improve handling when node is running jobs from multiple pools (#4131)
* Improve handling when node is running jobs from multiple pools

The scenario where a node is running jobs from multiple pools should only happen when a node has had its pool changed
 - The jobs already running will have a pool that doesn't match the node's current pool

Currently, the scheduler sees the node as empty and double-schedules it

This PR changes that: we now mark any resource used by jobs in other pools as unallocatable, to prevent this double scheduling (a sketch of the idea follows below)
 - This is a slightly more generic approach than the current one, where we only mark resources used by jobs from Away pools as unallocatable
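
Below is a minimal Go sketch of that idea, using simplified stand-in types (ResourceList, RunningJob, SketchNode) rather than Armada's internaltypes; the actual change calls markResourceUnallocatable on node.AllocatableByPriority inside populateNodeDb, as shown in the diff further down.

package sketch

// ResourceList is a simplified stand-in for Armada's resource type:
// resource name -> quantity (e.g. millicores, bytes).
type ResourceList map[string]int64

// RunningJob is a simplified stand-in for a jobdb.Job that has a run.
type RunningJob struct {
	NodeId               string
	ResourceRequirements ResourceList
}

// SketchNode is a simplified stand-in for a nodeDb node.
type SketchNode struct {
	AllocatableByPriority map[int32]ResourceList
}

// subtract returns a copy of a with b's quantities removed.
func subtract(a, b ResourceList) ResourceList {
	out := ResourceList{}
	for name, quantity := range a {
		out[name] = quantity - b[name]
	}
	return out
}

// markOtherPoolJobsUnallocatable subtracts the resources held by jobs belonging
// to other pools from each node's allocatable capacity at every priority, so the
// scheduler cannot double-schedule new jobs on top of them.
func markOtherPoolJobsUnallocatable(nodesById map[string]*SketchNode, otherPoolsJobs []*RunningJob) {
	for _, job := range otherPoolsJobs {
		node, ok := nodesById[job.NodeId]
		if !ok {
			// The job sits on a node that isn't part of this pool; ignore it.
			continue
		}
		for priority, allocatable := range node.AllocatableByPriority {
			node.AllocatableByPriority[priority] = subtract(allocatable, job.ResourceRequirements)
		}
	}
}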

I had to change when we calculate jobsByPool so that it is calculated over running jobs from all pools
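
A rough sketch of that jobsByPool change, again with simplified stand-in types: every running job is recorded under the pool of its latest run before any pool-relevance filtering, so running jobs from all pools stay visible when building the nodeDb.

package sketch

// PooledJob is a simplified stand-in for a jobdb.Job whose latest run has a pool.
type PooledJob struct {
	Id   string
	Pool string
}

// groupRunningJobsByPool buckets running jobs by the pool of their latest run,
// before the scheduler decides which pools are relevant to the current round.
func groupRunningJobsByPool(jobs []*PooledJob) map[string][]*PooledJob {
	jobsByPool := map[string][]*PooledJob{}
	for _, job := range jobs {
		jobsByPool[job.Pool] = append(jobsByPool[job.Pool], job)
	}
	return jobsByPool
}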

Signed-off-by: JamesMurkin <[email protected]>

* Update naming and comments

Signed-off-by: JamesMurkin <[email protected]>

* Rename pool

Signed-off-by: JamesMurkin <[email protected]>

* Fix typo

Signed-off-by: JamesMurkin <[email protected]>

* Improve comment

* Gofumpt

Signed-off-by: JamesMurkin <[email protected]>

---------

Signed-off-by: JamesMurkin <[email protected]>
JamesMurkin authored Jan 17, 2025
1 parent f146e3b commit 8db0d5e
Showing 1 changed file with 49 additions and 45 deletions.
94 changes: 49 additions & 45 deletions internal/scheduler/scheduling/scheduling_algo.go
@@ -177,7 +177,7 @@ type FairSchedulingAlgoContext struct {
Txn *jobdb.Txn
}

func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Context, txn *jobdb.Txn, pool configuration.PoolConfig) (*FairSchedulingAlgoContext, error) {
func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Context, txn *jobdb.Txn, currentPool configuration.PoolConfig) (*FairSchedulingAlgoContext, error) {
executors, err := l.executorRepository.GetExecutors(ctx)
if err != nil {
return nil, err
@@ -194,12 +194,12 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con

awayAllocationPools := []string{}
for _, otherPool := range l.schedulingConfig.Pools {
if slices.Contains(otherPool.AwayPools, pool.Name) {
if slices.Contains(otherPool.AwayPools, currentPool.Name) {
awayAllocationPools = append(awayAllocationPools, otherPool.Name)
}
}
allPools := []string{pool.Name}
allPools = append(allPools, pool.AwayPools...)
allPools := []string{currentPool.Name}
allPools = append(allPools, currentPool.AwayPools...)
allPools = append(allPools, awayAllocationPools...)

jobSchedulingInfo, err := calculateJobSchedulingInfo(ctx,
@@ -208,7 +208,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
func(_ *schedulerobjects.Executor) bool { return true }),
queueByName,
txn.GetAll(),
pool.Name,
currentPool.Name,
awayAllocationPools,
allPools)
if err != nil {
@@ -238,35 +238,41 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con
ctx.Error(errMes)
})

homeJobs := jobSchedulingInfo.jobsByPool[pool.Name]
awayJobs := []*jobdb.Job{}
currentPoolJobs := jobSchedulingInfo.jobsByPool[currentPool.Name]
otherPoolsJobs := []*jobdb.Job{}

for _, otherPool := range l.schedulingConfig.Pools {
if pool.Name == otherPool.Name {
for _, pool := range l.schedulingConfig.Pools {
if currentPool.Name == pool.Name {
continue
}
if slices.Contains(otherPool.AwayPools, pool.Name) {
homeJobs = append(homeJobs, jobSchedulingInfo.jobsByPool[otherPool.Name]...)
if slices.Contains(pool.AwayPools, currentPool.Name) {
// Jobs from away pools need to be considered in the current scheduling round, so they are added here
// This is so the jobs are available for eviction if a home job needs to take their place
currentPoolJobs = append(currentPoolJobs, jobSchedulingInfo.jobsByPool[pool.Name]...)
} else {
// Jobs that aren't used by the current pool belong to other pools we aren't currently considering
// Add them here so their resource can be made unallocatable in the nodeDb, preventing us from scheduling over them
// The cases where this is needed (a node having jobs from multiple pools) are:
// - The pool of the node was changed, but it still has jobs running from the pool it was previously in
// - A node running home jobs and cross-pool away jobs. In this case, when scheduling the cross-pool away jobs,
//   we must not schedule over resource used by the home jobs
otherPoolsJobs = append(otherPoolsJobs, jobSchedulingInfo.jobsByPool[pool.Name]...)
}
}

for _, awayPool := range pool.AwayPools {
awayJobs = append(awayJobs, jobSchedulingInfo.jobsByPool[awayPool]...)
}

nodePools := append(pool.AwayPools, pool.Name)
nodePools := append(currentPool.AwayPools, currentPool.Name)

nodeDb, err := l.constructNodeDb(homeJobs, awayJobs,
nodeDb, err := l.constructNodeDb(currentPoolJobs, otherPoolsJobs,
armadaslices.Filter(nodes, func(node *internaltypes.Node) bool { return slices.Contains(nodePools, node.GetPool()) }))
if err != nil {
return nil, err
}

totalResources := nodeDb.TotalKubernetesResources()
totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name))
totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(currentPool.Name))

schedulingContext, err := l.constructSchedulingContext(
pool.Name,
currentPool.Name,
totalResources,
jobSchedulingInfo.demandByQueueAndPriorityClass,
jobSchedulingInfo.allocatedByQueueAndPriorityClass,
Expand All @@ -278,7 +284,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con

return &FairSchedulingAlgoContext{
queues: queueByName,
pool: pool.Name,
pool: currentPool.Name,
nodeDb: nodeDb,
schedulingContext: schedulingContext,
nodeIdByJobId: jobSchedulingInfo.nodeIdByJobId,
@@ -331,17 +331,6 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m
pools = []string{pool}
}

matches := false
for _, pool := range pools {
if slices.Contains(allPools, pool) {
matches = true
break
}
}
if !matches {
continue
}

if slices.Contains(pools, currentPool) {
queueResources, ok := demandByQueueAndPriorityClass[job.Queue()]
if !ok {
@@ -369,6 +364,21 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m
}

pool := job.LatestRun().Pool()
if _, present := jobsByPool[pool]; !present {
jobsByPool[pool] = []*jobdb.Job{}
}
jobsByPool[pool] = append(jobsByPool[pool], job)

matches := false
for _, pool := range pools {
if slices.Contains(allPools, pool) {
matches = true
break
}
}
if !matches {
continue
}

if _, isActive := activeExecutorsSet[executorId]; isActive {
if pool == currentPool {
@@ -387,10 +397,7 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m
awayAllocation[job.PriorityClassName()] = awayAllocation[job.PriorityClassName()].Add(job.AllResourceRequirements())
}
}
if _, present := jobsByPool[pool]; !present {
jobsByPool[pool] = []*jobdb.Job{}
}
jobsByPool[pool] = append(jobsByPool[pool], job)

jobsByExecutorId[executorId] = append(jobsByExecutorId[executorId], job)
nodeIdByJobId[job.Id()] = nodeId
gangInfo, err := schedulercontext.GangInfoFromLegacySchedulerJob(job)
@@ -420,7 +427,7 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m
}, nil
}

func (l *FairSchedulingAlgo) constructNodeDb(homeJobs []*jobdb.Job, awayJobs []*jobdb.Job, nodes []*internaltypes.Node) (*nodedb.NodeDb, error) {
func (l *FairSchedulingAlgo) constructNodeDb(currentPoolJobs []*jobdb.Job, otherPoolsJobs []*jobdb.Job, nodes []*internaltypes.Node) (*nodedb.NodeDb, error) {
nodeDb, err := nodedb.NewNodeDb(
l.schedulingConfig.PriorityClasses,
l.schedulingConfig.IndexedResources,
@@ -432,7 +439,7 @@ func (l *FairSchedulingAlgo) constructNodeDb(homeJobs []*jobdb.Job, awayJobs []*
if err != nil {
return nil, err
}
if err := l.populateNodeDb(nodeDb, homeJobs, awayJobs, nodes); err != nil {
if err := l.populateNodeDb(nodeDb, currentPoolJobs, otherPoolsJobs, nodes); err != nil {
return nil, err
}

@@ -590,15 +597,15 @@ func (l *FairSchedulingAlgo) SchedulePool(
}

// populateNodeDb adds all the nodes and jobs associated with a particular pool to the nodeDb.
func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, homeJobs []*jobdb.Job, awayJobs []*jobdb.Job, nodes []*internaltypes.Node) error {
func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, currentPoolJobs []*jobdb.Job, otherPoolsJobs []*jobdb.Job, nodes []*internaltypes.Node) error {
txn := nodeDb.Txn(true)
defer txn.Abort()
nodesById := armadaslices.GroupByFuncUnique(
nodes,
func(node *internaltypes.Node) string { return node.GetId() },
)
jobsByNodeId := make(map[string][]*jobdb.Job, len(nodes))
for _, job := range homeJobs {
for _, job := range currentPoolJobs {
if job.InTerminalState() || !job.HasRuns() {
continue
}
@@ -612,20 +619,17 @@ func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, homeJobs []*j
}
jobsByNodeId[nodeId] = append(jobsByNodeId[nodeId], job)
}
for _, job := range awayJobs {
for _, job := range otherPoolsJobs {
if job.InTerminalState() || !job.HasRuns() {
continue
}
nodeId := job.LatestRun().NodeId()
node, ok := nodesById[nodeId]
if !ok {
logrus.Errorf(
"job %s assigned to node %s on executor %s, but no such node found",
job.Id(), nodeId, job.LatestRun().Executor(),
)
// Job is allocated to a node which isn't part of this pool, ignore it
continue
}

// Mark resource used by jobs of other pools as unallocatable so we don't double schedule this resource
markResourceUnallocatable(node.AllocatableByPriority, job.KubernetesResourceRequirements())
}
