Skip to content

Commit

Permalink
support preemption when the number of attachment volumes of a node re…
Browse files Browse the repository at this point in the history
…aches the upper limit

Signed-off-by: lili <[email protected]>
  • Loading branch information
Lily922 committed Nov 10, 2023
1 parent 17f266b commit 91f7670
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 41 deletions.
1 change: 0 additions & 1 deletion pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func (r *Resource) Add(rr *Resource) *Resource {
}
r.ScalarResources[rName] += rQuant
}

return r
}

Expand Down
35 changes: 35 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type SchedulerCache struct {
NamespaceCollection map[string]*schedulingapi.NamespaceCollection

errTasks workqueue.RateLimitingInterface
errNodes workqueue.RateLimitingInterface
DeletedJobs workqueue.RateLimitingInterface

informerFactory informers.SharedInformerFactory
Expand Down Expand Up @@ -435,6 +436,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
Queues: make(map[schedulingapi.QueueID]*schedulingapi.QueueInfo),
PriorityClasses: make(map[string]*schedulingv1.PriorityClass),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
errNodes: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
DeletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeClient: kubeClient,
vcClient: vcClient,
Expand Down Expand Up @@ -703,6 +705,11 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
// Re-sync error tasks.
go wait.Until(sc.processResyncTask, 0, stopCh)

go func() {
sc.WaitForCacheSync(stopCh)
wait.Until(sc.processResyncNode, 0, stopCh)
}()

// Cleanup jobs.
go wait.Until(sc.processCleanupJob, 0, stopCh)

Expand Down Expand Up @@ -990,6 +997,34 @@ func (sc *SchedulerCache) processResyncTask() {
}
}

func (sc *SchedulerCache) resyncNode(nodeName string) {
sc.errNodes.AddRateLimited(nodeName)
}

func (sc *SchedulerCache) processResyncNode() {
obj, shutdown := sc.errNodes.Get()
if shutdown {
return
}
defer sc.errNodes.Done(obj)

nodeName, ok := obj.(string)
if !ok {
klog.Errorf("failed to convert %v to string", obj)
return
}

err := sc.syncNode(nodeName)
if err == nil {
sc.errNodes.Forget(nodeName)
return
}

klog.Errorf("Failed to sync node <%s>, retry it.", nodeName)
sc.resyncNode(nodeName)
return
}

// AddBindTask add task to be bind to a cache which consumes by go runtime
func (sc *SchedulerCache) AddBindTask(taskInfo *schedulingapi.TaskInfo) error {
klog.V(5).Infof("add bind task %v/%v", taskInfo.Namespace, taskInfo.Name)
Expand Down
Loading

0 comments on commit 91f7670

Please sign in to comment.