Skip to content

Commit

Permalink
Implement an elastic scheduler to create Pods. (#1456)
Browse files Browse the repository at this point in the history
* Use APIs of k8s client-go to create Pods.

* Replace context.TODO with context.Background.

* Create a routine to launch Pods.
  • Loading branch information
workingloong authored Feb 1, 2025
1 parent 2ce0e02 commit 190984c
Show file tree
Hide file tree
Showing 23 changed files with 758 additions and 18 deletions.
10 changes: 4 additions & 6 deletions go/master/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ import (
"flag"
"strconv"

logger "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"

master "github.com/intelligent-machine-learning/dlrover/go/master/pkg"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubernetes"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/server"
logger "github.com/sirupsen/logrus"
)

func main() {
Expand All @@ -41,9 +39,9 @@ func main() {

// Listen and serve on defined port
logger.Infof("The master starts with namespece %s, jobName %s, port %d", namespace, jobName, port)
var k8sClient *kubernetes.K8sClient
var k8sClient *kubeutils.K8sClient
if k8sScheduling {
k8sClient = kubernetes.NewK8sClient(namespace, jobName)
k8sClient = kubeutils.NewK8sClient("")
}
master := master.NewJobMaster(namespace, jobName, k8sClient)
master.Run()
Expand Down
26 changes: 26 additions & 0 deletions go/master/master_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// TestMaster is the `go test` entry point for this package: it routes Gomega
// assertion failures to Ginkgo and then runs the registered specs.
func TestMaster(t *testing.T) {
	const suiteDescription = "Master Suite"

	RegisterFailHandler(Fail)
	RunSpecs(t, suiteDescription)
}
26 changes: 26 additions & 0 deletions go/master/pkg/batchscheduler/batchscheduler_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// TestBatchscheduler is the `go test` entry point for this package: it routes
// Gomega assertion failures to Ginkgo and then runs the registered specs.
func TestBatchscheduler(t *testing.T) {
	const suiteDescription = "Batchscheduler Suite"

	RegisterFailHandler(Fail)
	RunSpecs(t, suiteDescription)
}
85 changes: 85 additions & 0 deletions go/master/pkg/batchscheduler/elastic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler

import (
"time"

"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
logger "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
)

// ElasticScheduler launches pods without waiting for all resources of the pods to be ready.
type ElasticScheduler struct {
// k8sClient is the client passed to NewElasticScheduler; createPodLoop uses it to create Pods.
k8sClient *kubeutils.K8sClient
// toCreatePods holds the Pods built by DoScheduling until createPodLoop pops and creates them.
toCreatePods *common.Queue
}

// NewElasticScheduler returns an ElasticScheduler that uses k8sClient to
// create Pods and starts with an empty queue of Pods to create.
func NewElasticScheduler(k8sClient *kubeutils.K8sClient) *ElasticScheduler {
	scheduler := new(ElasticScheduler)
	scheduler.k8sClient = k8sClient
	scheduler.toCreatePods = common.NewQueue()
	return scheduler
}

// Start spawns the goroutine running createPodLoop, which creates the queued
// Pods in the namespace recorded in jobContext.
func (scheduler *ElasticScheduler) Start(jobContext *common.JobContext) {
	namespace := jobContext.NameSpace
	go scheduler.createPodLoop(namespace)
}

// DoScheduling builds one Pod for every replica listed in plan.ReplicaSpecs
// and appends it to the toCreatePods queue. It does not talk to the cluster
// itself; queued Pods are created by createPodLoop.
func (scheduler *ElasticScheduler) DoScheduling(jobContext *common.JobContext, plan *SchedulingPlan) {
	for kind, replicaSpec := range plan.ReplicaSpecs {
		for index := int32(0); index < replicaSpec.Replicas; index++ {
			// The replica index is used both as its ID and as its rank.
			config := &kubeutils.PodConfig{
				Replica: &kubeutils.ReplicaConfig{
					Type:   string(kind),
					ID:     index,
					Number: replicaSpec.Replicas,
					Rank:   index,
				},
				// Each Pod gets its own copy of the template.
				TemplateSpec: replicaSpec.Template.DeepCopy(),
			}
			scheduler.toCreatePods.PushBack(kubeutils.BuildPod(jobContext, config))
		}
	}
}

// createPodLoop runs forever, draining the toCreatePods queue and creating
// each Pod in the given namespace through the Kubernetes client.
//
// Outcome handling for every creation attempt:
//   - success: move on to the next queued Pod;
//   - the Pod already exists: log a warning and move on;
//   - too many requests / timeout / server timeout: put the Pod back at the
//     front of the queue and retry after 5 seconds;
//   - any other error: log it and panic.
//
// When the queue is empty the loop sleeps for one second before polling again.
func (scheduler *ElasticScheduler) createPodLoop(namespace string) {
	for {
		for scheduler.toCreatePods.Len() > 0 {
			pod := scheduler.toCreatePods.PopFront().(*corev1.Pod)
			err := scheduler.k8sClient.CreatePod(namespace, pod)
			switch {
			case err == nil:
				// Created successfully. This case must come first: every
				// errors.Is* helper returns false for a nil error, so without
				// it a successful creation would reach the default branch and
				// dereference a nil error.
			case errors.IsAlreadyExists(err):
				logger.Warnf("The pod %s already exists.", pod.ObjectMeta.Name)
			case errors.IsTooManyRequests(err) || errors.IsTimeout(err) || errors.IsServerTimeout(err):
				logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err)
				// Retry to create pod due to timeout.
				scheduler.toCreatePods.PushFront(pod)
				time.Sleep(5 * time.Second)
			default:
				logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err)
				panic(err.Error())
			}
		}
		time.Sleep(1 * time.Second)
	}
}
62 changes: 62 additions & 0 deletions go/master/pkg/batchscheduler/elastic_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler

import (
"fmt"

commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
)

// Spec for ElasticScheduler.DoScheduling: a plan with three worker replicas
// must enqueue three Pods named <job>-worker-<index>, in index order.
var _ = Describe("Elastic", func() {
	It("Do scheduling to launch pods.", func() {
		const workerCount = 3

		mainContainer := corev1.Container{
			Name:            "main",
			Image:           "python:3.12.8",
			ImagePullPolicy: corev1.PullIfNotPresent,
			Command:         []string{"/bin/bash", "-c", "echo 0"},
		}
		workerSpec := &commonv1.ReplicaSpec{
			Replicas: workerCount,
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers:    []corev1.Container{mainContainer},
					RestartPolicy: corev1.RestartPolicyNever,
				},
			},
		}
		plan := &SchedulingPlan{
			ReplicaSpecs: map[commonv1.ReplicaType]*commonv1.ReplicaSpec{
				"worker": workerSpec,
			},
		}
		job := &common.JobContext{
			NameSpace:  "dlrover",
			Name:       "train-demo",
			MasterHost: "127.0.0.1",
			MasterPort: 12345,
		}

		// A nil client is enough here: DoScheduling only fills the queue.
		scheduler := NewElasticScheduler(nil)
		scheduler.DoScheduling(job, plan)

		Expect(scheduler.toCreatePods.Len()).To(Equal(workerCount))
		for index := 0; index < workerCount; index++ {
			queued := scheduler.toCreatePods.PopFront().(*corev1.Pod)
			wantName := fmt.Sprintf("train-demo-worker-%d", index)
			Expect(queued.ObjectMeta.Name).To(Equal(wantName))
		}
	})
})
14 changes: 14 additions & 0 deletions go/master/pkg/batchscheduler/plan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler
40 changes: 40 additions & 0 deletions go/master/pkg/batchscheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler

import (
elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1"
commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
)

// BatchScheduler creates/updates/deletes the batch pods of an elastic job.
//
// NOTE(review): ElasticScheduler.DoScheduling in this package takes an extra
// *common.JobContext argument, so its method signature differs from the one
// declared here — confirm which of the two is intended.
type BatchScheduler interface {
	// DoScheduling applies the given scheduling plan.
	DoScheduling(plan *SchedulingPlan)
}

// SchedulingPlan describes the CRUD operations on Pods that a scheduler is asked to perform.
type SchedulingPlan struct {
// ReplicaSpecs is a map which contains the replica specification to create Pods.
ReplicaSpecs map[commonv1.ReplicaType]*commonv1.ReplicaSpec

// CreatedPods are Pods to be created.
CreatedPods []*kubeutils.PodConfig

// RemovedPods are Pods to be removed.
RemovedPods []*kubeutils.PodConfig

// OwnerJob specifies a job to scale.
OwnerJob *elasticjob.ElasticJob
}
14 changes: 14 additions & 0 deletions go/master/pkg/batchscheduler/volcano.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler
26 changes: 26 additions & 0 deletions go/master/pkg/common/common_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// TestCommon is the `go test` entry point for this package: it routes Gomega
// assertion failures to Ginkgo and then runs the registered specs.
func TestCommon(t *testing.T) {
	const suiteDescription = "Common Suite"

	RegisterFailHandler(Fail)
	RunSpecs(t, suiteDescription)
}
26 changes: 26 additions & 0 deletions go/master/pkg/common/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

// JobContext stores the elastic job context.
type JobContext struct {
// NameSpace is the kubernetes namespace where the job runs.
NameSpace string
// Name is the name of an elastic job.
Name string
// MasterHost is the host of master service.
MasterHost string
// MasterPort is the port of the master service.
MasterPort int32
}
Loading

0 comments on commit 190984c

Please sign in to comment.