Skip to content

Commit

Permalink
add node event
Browse files Browse the repository at this point in the history
Signed-off-by: Monokaix <[email protected]>
  • Loading branch information
Monokaix committed Jan 11, 2025
1 parent 837d85f commit 4242560
Show file tree
Hide file tree
Showing 10 changed files with 616 additions and 102 deletions.
35 changes: 35 additions & 0 deletions pkg/scheduler/api/hyper_node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ func (hni *HyperNodesInfo) HyperNodes() map[string]*HyperNodeInfo {
return hni.hyperNodes
}

// HyperNode looks up the entry stored under name and returns its underlying
// HyperNode object. It returns nil when no non-nil entry exists for name.
func (hni *HyperNodesInfo) HyperNode(name string) *topologyv1alpha1.HyperNode {
	if info := hni.hyperNodes[name]; info != nil {
		return info.HyperNode
	}
	return nil
}

// HyperNodesListByTier returns a deep copy of the map that groups HyperNode names by their tier.
// This ensures that the returned map is independent of the original, preventing unintended modifications.
func (hni *HyperNodesInfo) HyperNodesListByTier() map[int]sets.Set[string] {
Expand Down Expand Up @@ -314,6 +323,32 @@ func (hni *HyperNodesInfo) getChildren(hyperNodeName string) sets.Set[string] {
return children
}

// GetRegexSelectorLeafHyperNodes collects the names of leaf hyperNodes that use a regex selector.
// A hyperNode qualifies when none of its members is of type HyperNode and at
// least one member carries a RegexMatch selector. Entries that are nil, or
// whose HyperNode object is nil, are skipped.
func (hni *HyperNodesInfo) GetRegexSelectorLeafHyperNodes() sets.Set[string] {
	result := sets.New[string]()
	for hnName, info := range hni.hyperNodes {
		if info == nil || info.HyperNode == nil {
			continue
		}

		containsHyperNode := false
		usesRegex := false
		for _, m := range info.HyperNode.Spec.Members {
			// A single HyperNode-typed member is enough to rule out a leaf.
			if m.Type == topologyv1alpha1.MemberTypeHyperNode {
				containsHyperNode = true
				break
			}
			if m.Selector.RegexMatch != nil {
				usesRegex = true
			}
		}
		if containsHyperNode || !usesRegex {
			continue
		}
		result.Insert(hnName)
	}
	return result
}

// setParent sets the parent of a HyperNode member.
func (hni *HyperNodesInfo) setParent(member, parent string) error {
hn, ok := hni.hyperNodes[member]
Expand Down
105 changes: 93 additions & 12 deletions pkg/scheduler/api/hyper_node_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,18 @@ import (

func TestHyperNodesInfo_UpdateHyperNode_Normal(t *testing.T) {
selector := "exact"
s0 := BuildHyperNode("s0", 1, topologyv1alpha1.MemberTypeNode, []string{"node-0", "node-1"}, selector)
s1 := BuildHyperNode("s1", 1, topologyv1alpha1.MemberTypeNode, []string{"node-2", "node-3"}, selector)
s2 := BuildHyperNode("s2", 2, topologyv1alpha1.MemberTypeHyperNode, []string{"s1", "s0"}, selector)
s0 := BuildHyperNode("s0", 1, []MemberConfig{
{"node-0", topologyv1alpha1.MemberTypeNode, selector},
{"node-1", topologyv1alpha1.MemberTypeNode, selector},
})
s1 := BuildHyperNode("s1", 1, []MemberConfig{
{"node-2", topologyv1alpha1.MemberTypeNode, selector},
{"node-3", topologyv1alpha1.MemberTypeNode, selector},
})
s2 := BuildHyperNode("s2", 2, []MemberConfig{
{"s1", topologyv1alpha1.MemberTypeHyperNode, selector},
{"s0", topologyv1alpha1.MemberTypeHyperNode, selector},
})
initialHyperNodes := []*topologyv1alpha1.HyperNode{s2, s0, s1}

tests := []struct {
Expand Down Expand Up @@ -86,11 +95,21 @@ func TestHyperNodesInfo_UpdateHyperNode_Normal(t *testing.T) {

func TestHyperNodesInfo_UpdateHyperNode_WithCycle(t *testing.T) {
selector := "exact"
s0 := BuildHyperNode("s0", 1, topologyv1alpha1.MemberTypeNode, []string{"node-0", "node-1"}, selector)
s1 := BuildHyperNode("s1", 2, topologyv1alpha1.MemberTypeHyperNode, []string{"s0", "s3"}, selector)
s11 := BuildHyperNode("s1", 2, topologyv1alpha1.MemberTypeHyperNode, []string{"s0"}, selector)
s2 := BuildHyperNode("s2", 3, topologyv1alpha1.MemberTypeHyperNode, []string{"s1"}, selector)
s3 := BuildHyperNode("s3", 4, topologyv1alpha1.MemberTypeHyperNode, []string{"s2"}, selector)
s0 := BuildHyperNode("s0", 1, []MemberConfig{
{"node-0", topologyv1alpha1.MemberTypeNode, selector},
{"node-1", topologyv1alpha1.MemberTypeNode, selector}})
s1 := BuildHyperNode("s1", 2, []MemberConfig{
{"s0", topologyv1alpha1.MemberTypeHyperNode, selector},
{"s3", topologyv1alpha1.MemberTypeHyperNode, selector}})
s11 := BuildHyperNode("s1", 2, []MemberConfig{
{"s0", topologyv1alpha1.MemberTypeHyperNode, selector},
})
s2 := BuildHyperNode("s2", 3, []MemberConfig{
{"s1", topologyv1alpha1.MemberTypeHyperNode, selector},
})
s3 := BuildHyperNode("s3", 4, []MemberConfig{
{"s2", topologyv1alpha1.MemberTypeHyperNode, selector},
})
initialHyperNodes := []*topologyv1alpha1.HyperNode{s2, s0, s1, s3}

tests := []struct {
Expand Down Expand Up @@ -160,10 +179,19 @@ func TestHyperNodesInfo_UpdateHyperNode_WithCycle(t *testing.T) {

func TestHyperNodesInfo_UpdateHyperNode_MultipleParents(t *testing.T) {
selector := "exact"
s0 := BuildHyperNode("s0", 1, topologyv1alpha1.MemberTypeNode, []string{"node-0", "node-1"}, selector)
s1 := BuildHyperNode("s1", 2, topologyv1alpha1.MemberTypeHyperNode, []string{"s0"}, selector)
s2 := BuildHyperNode("s2", 3, topologyv1alpha1.MemberTypeHyperNode, []string{"s0"}, selector)
s22 := BuildHyperNode("s2", 3, topologyv1alpha1.MemberTypeHyperNode, []string{"s1"}, selector)
s0 := BuildHyperNode("s0", 1, []MemberConfig{
{Name: "node-0", Type: topologyv1alpha1.MemberTypeNode, Selector: selector},
{Name: "node-1", Type: topologyv1alpha1.MemberTypeNode, Selector: selector},
})
s1 := BuildHyperNode("s1", 2, []MemberConfig{
{Name: "s0", Type: topologyv1alpha1.MemberTypeHyperNode, Selector: selector},
})
s2 := BuildHyperNode("s2", 3, []MemberConfig{
{Name: "s0", Type: topologyv1alpha1.MemberTypeHyperNode, Selector: selector},
})
s22 := BuildHyperNode("s2", 3, []MemberConfig{
{Name: "s1", Type: topologyv1alpha1.MemberTypeHyperNode, Selector: selector},
})
initialHyperNodes := []*topologyv1alpha1.HyperNode{s0, s1, s2}

tests := []struct {
Expand Down Expand Up @@ -226,3 +254,56 @@ func TestHyperNodesInfo_UpdateHyperNode_MultipleParents(t *testing.T) {
})
}
}

// TestHyperNodesInfo_GetRegexSelectorLeafHyperNodes verifies that only leaf
// hyperNodes (no HyperNode-typed member) with at least one regex selector are
// returned by GetRegexSelectorLeafHyperNodes.
func TestHyperNodesInfo_GetRegexSelectorLeafHyperNodes(t *testing.T) {
	exactSelector := "exact"
	regexSelector := "regex"
	// s0: leaf with exact selectors only -> not expected.
	s0 := BuildHyperNode("s0", 1, []MemberConfig{
		{"node-0", topologyv1alpha1.MemberTypeNode, exactSelector},
		{"node-1", topologyv1alpha1.MemberTypeNode, exactSelector},
	})
	// s1: leaf with regex selectors only -> expected.
	s1 := BuildHyperNode("s1", 1, []MemberConfig{
		{"node-2", topologyv1alpha1.MemberTypeNode, regexSelector},
		{"node-3", topologyv1alpha1.MemberTypeNode, regexSelector},
	})
	// s2: has a HyperNode-typed member, so it is not a leaf -> not expected.
	s2 := BuildHyperNode("s2", 1, []MemberConfig{
		{"node-4", topologyv1alpha1.MemberTypeNode, regexSelector},
		{"node-5", topologyv1alpha1.MemberTypeHyperNode, exactSelector},
	})
	// s3: leaf mixing regex and exact selectors -> expected.
	s3 := BuildHyperNode("s3", 1, []MemberConfig{
		{"node-6", topologyv1alpha1.MemberTypeNode, regexSelector},
		{"node-7", topologyv1alpha1.MemberTypeNode, exactSelector},
	})
	// s4..s6: non-leaf hyperNodes -> never expected, even with a regex selector (s5).
	s4 := BuildHyperNode("s4", 2, []MemberConfig{
		{"s0", topologyv1alpha1.MemberTypeHyperNode, exactSelector},
		{"s1", topologyv1alpha1.MemberTypeHyperNode, exactSelector},
	})
	s5 := BuildHyperNode("s5", 2, []MemberConfig{
		{"s2", topologyv1alpha1.MemberTypeHyperNode, regexSelector},
		{"s3", topologyv1alpha1.MemberTypeHyperNode, exactSelector},
	})
	s6 := BuildHyperNode("s6", 3, []MemberConfig{
		{"s4", topologyv1alpha1.MemberTypeHyperNode, exactSelector},
		{"s5", topologyv1alpha1.MemberTypeHyperNode, exactSelector},
	})
	tests := []struct {
		name       string
		hyperNodes []*topologyv1alpha1.HyperNode
		want       sets.Set[string]
	}{
		{
			name:       "get all leaf hyperNodes correctly",
			hyperNodes: []*topologyv1alpha1.HyperNode{s0, s1, s2, s3, s4, s5, s6},
			want:       sets.New[string]("s1", "s3"),
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			hni := NewHyperNodesInfo(nil)
			// Populate the map directly so the test exercises only the selector/leaf filtering.
			for _, hn := range tt.hyperNodes {
				hni.hyperNodes[hn.Name] = NewHyperNodeInfo(hn)
			}
			assert.Equalf(t, tt.want, hni.GetRegexSelectorLeafHyperNodes(), "GetRegexSelectorLeafHyperNodes()")
		})
	}
}
30 changes: 19 additions & 11 deletions pkg/scheduler/api/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,13 @@ func BuildPodgroup(name, ns string, minMember int32, minResource v1.ResourceList
}
}

// BuildHyperNode builds a hyperNode.
func BuildHyperNode(name string, tier int, memberType topologyv1alpha1.MemberType, members []string, selector string) *topologyv1alpha1.HyperNode {
// MemberConfig describes one member of a HyperNode built by BuildHyperNode.
type MemberConfig struct {
// Name is used as the ExactMatch name or the RegexMatch pattern, depending on Selector.
Name string
// Type is copied into the member spec's Type field.
Type topologyv1alpha1.MemberType
// Selector chooses the selector kind: "exact" or "regex". Any other value
// makes BuildHyperNode return nil.
Selector string
}

func BuildHyperNode(name string, tier int, members []MemberConfig) *topologyv1alpha1.HyperNode {
hn := &topologyv1alpha1.HyperNode{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand All @@ -147,17 +152,20 @@ func BuildHyperNode(name string, tier int, memberType topologyv1alpha1.MemberTyp
}

for i, member := range members {
hn.Spec.Members[i] = topologyv1alpha1.MemberSpec{
Type: memberType,
}
if selector == "exact" {
hn.Spec.Members[i].Selector.ExactMatch = &topologyv1alpha1.ExactMatch{Name: member}
continue
memberSpec := topologyv1alpha1.MemberSpec{
Type: member.Type,
}
if selector == "regex" {
hn.Spec.Members[i].Selector.RegexMatch = &topologyv1alpha1.RegexMatch{Pattern: member}
continue
switch member.Selector {
case "exact":
memberSpec.Selector.ExactMatch = &topologyv1alpha1.ExactMatch{Name: member.Name}
case "regex":
memberSpec.Selector.RegexMatch = &topologyv1alpha1.RegexMatch{Pattern: member.Name}
default:
return nil
}

hn.Spec.Members[i] = memberSpec
}

return hn
}
45 changes: 40 additions & 5 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
cpuinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/nodeinfo/v1alpha1"
vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
topologyinformerv1alpha1 "volcano.sh/apis/pkg/client/informers/externalversions/topology/v1alpha1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/features"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
Expand Down Expand Up @@ -107,6 +108,7 @@ type SchedulerCache struct {

podInformer infov1.PodInformer
nodeInformer infov1.NodeInformer
hyperNodeInformer topologyinformerv1alpha1.HyperNodeInformer
podGroupInformerV1beta1 vcinformerv1.PodGroupInformer
queueInformerV1beta1 vcinformerv1.QueueInformer
pvInformer infov1.PersistentVolumeInformer
Expand Down Expand Up @@ -139,9 +141,10 @@ type SchedulerCache struct {

NamespaceCollection map[string]*schedulingapi.NamespaceCollection

errTasks workqueue.RateLimitingInterface
nodeQueue workqueue.RateLimitingInterface
DeletedJobs workqueue.RateLimitingInterface
errTasks workqueue.RateLimitingInterface
nodeQueue workqueue.RateLimitingInterface
DeletedJobs workqueue.RateLimitingInterface
hyperNodesQueue workqueue.TypedRateLimitingInterface[string]

informerFactory informers.SharedInformerFactory
vcInformerFactory vcinformer.SharedInformerFactory
Expand Down Expand Up @@ -572,6 +575,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
errTasks: workqueue.NewRateLimitingQueue(errTaskRateLimiter),
nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
DeletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
hyperNodesQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()),
kubeClient: kubeClient,
vcClient: vcClient,
restConfig: config,
Expand Down Expand Up @@ -798,8 +802,8 @@ func (sc *SchedulerCache) addEventHandler() {
})
}

hyperNodeInformer := sc.vcInformerFactory.Topology().V1alpha1().HyperNodes()
hyperNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
sc.hyperNodeInformer = sc.vcInformerFactory.Topology().V1alpha1().HyperNodes()
sc.hyperNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddHyperNode,
UpdateFunc: sc.UpdateHyperNode,
DeleteFunc: sc.DeleteHyperNode,
Expand All @@ -815,6 +819,9 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
go wait.Until(sc.runNodeWorker, 0, stopCh)
}

// Sync hyperNode.
go wait.Until(sc.processSyncHyperNode, 0, stopCh)

// Re-sync error tasks.
go wait.Until(sc.processResyncTask, 0, stopCh)

Expand Down Expand Up @@ -1235,6 +1242,34 @@ func (sc *SchedulerCache) processSyncNode() bool {
return true
}

// processSyncHyperNode consumes hyperNodesQueue until the queue reports shutdown.
// Each dequeued name is handed to SyncHyperNode: on success the item is
// forgotten by the rate limiter, on failure it is re-added with rate limiting.
func (sc *SchedulerCache) processSyncHyperNode() {
	syncNext := func() bool {
		key, quit := sc.hyperNodesQueue.Get()
		if quit {
			return false
		}
		// Always mark the item as done, whatever the sync outcome.
		defer sc.hyperNodesQueue.Done(key)

		klog.V(5).Infof("started sync hyperNode %s", key)
		if err := sc.SyncHyperNode(key); err != nil {
			klog.ErrorS(err, "Failed to sync hyperNode, retry it.", "name", key)
			sc.hyperNodesQueue.AddRateLimited(key)
			return true
		}

		sc.hyperNodesQueue.Forget(key)
		return true
	}

	for {
		if !syncNext() {
			return
		}
	}
}

// runHyperNodeWorker runs the hyperNode sync worker.
// It delegates to processSyncHyperNode, which loops over hyperNodesQueue and
// returns only once the queue is shut down. The previous body looped on
// processSyncNode, so it never processed hyperNode items.
func (sc *SchedulerCache) runHyperNodeWorker() {
	sc.processSyncHyperNode()
}

// AddBindTask adds a task to be bound to a cache, which is consumed by the Go runtime.
func (sc *SchedulerCache) AddBindTask(taskInfo *schedulingapi.TaskInfo) error {
klog.V(5).Infof("add bind task %v/%v", taskInfo.Namespace, taskInfo.Name)
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/cache/cache_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func newMockSchedulerCache(schedulerName string) *SchedulerCache {
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
DeletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
hyperNodesQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()),
kubeClient: fake.NewSimpleClientset(),
vcClient: fakevcClient.NewSimpleClientset(),
restConfig: nil,
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func encodeCache(file *os.File, v ...interface{}) error {
// dumpAll prints all information to log
func (d *Dumper) dumpAll() {
snapshot := d.Cache.Snapshot()
klog.Info("Dump of nodes info in scheduler cache")
klog.Info("Dump of initialNodes info in scheduler cache")
for _, nodeInfo := range snapshot.Nodes {
klog.Info(d.printNodeInfo(nodeInfo))
}
Expand Down
Loading

0 comments on commit 4242560

Please sign in to comment.