From 190984ceaefcbe96f77861813667c7f42a014f56 Mon Sep 17 00:00:00 2001 From: Qinlong Wang Date: Sat, 1 Feb 2025 20:47:47 +0800 Subject: [PATCH] Implement an elastic scheduler to create Pods. (#1456) * Use APIs of k8s client-go to create Pods. * Replace context.todo with backgroud. * Create a routine to launch Pods. --- go/master/main.go | 10 +- go/master/master_suite_test.go | 26 +++ .../batchscheduler_suite_test.go | 26 +++ go/master/pkg/batchscheduler/elastic.go | 85 ++++++++++ .../batchscheduler/elastic_internal_test.go | 62 +++++++ go/master/pkg/batchscheduler/plan.go | 14 ++ go/master/pkg/batchscheduler/scheduler.go | 40 +++++ go/master/pkg/batchscheduler/volcano.go | 14 ++ go/master/pkg/common/common_suite_test.go | 26 +++ go/master/pkg/common/context.go | 26 +++ go/master/pkg/common/queue.go | 74 +++++++++ go/master/pkg/common/queue_test.go | 36 ++++ go/master/pkg/jobmanager/manager.go | 18 ++ go/master/pkg/jobmanager/pytorch.go | 14 ++ go/master/pkg/jobmanager/ray.go | 14 ++ go/master/pkg/jobmanager/tensorflow.go | 14 ++ .../pkg/{kubernetes => kubeutils}/client.go | 22 ++- .../{kubernetes => kubeutils}/elasticjob.go | 2 +- .../elasticjob_internal_test.go | 2 +- .../kubernetes_suite_test.go | 2 +- go/master/pkg/kubeutils/pod.go | 157 ++++++++++++++++++ go/master/pkg/kubeutils/pod_internal_test.go | 84 ++++++++++ go/master/pkg/master.go | 8 +- 23 files changed, 758 insertions(+), 18 deletions(-) create mode 100644 go/master/master_suite_test.go create mode 100644 go/master/pkg/batchscheduler/batchscheduler_suite_test.go create mode 100644 go/master/pkg/batchscheduler/elastic.go create mode 100644 go/master/pkg/batchscheduler/elastic_internal_test.go create mode 100644 go/master/pkg/batchscheduler/plan.go create mode 100644 go/master/pkg/batchscheduler/scheduler.go create mode 100644 go/master/pkg/batchscheduler/volcano.go create mode 100644 go/master/pkg/common/common_suite_test.go create mode 100644 go/master/pkg/common/context.go create mode 100644 go/master/pkg/common/queue.go create mode 100644 go/master/pkg/common/queue_test.go create mode 100644 go/master/pkg/jobmanager/manager.go create mode 100644 go/master/pkg/jobmanager/pytorch.go create mode 100644 go/master/pkg/jobmanager/ray.go create mode 100644 go/master/pkg/jobmanager/tensorflow.go rename go/master/pkg/{kubernetes => kubeutils}/client.go (85%) rename go/master/pkg/{kubernetes => kubeutils}/elasticjob.go (98%) rename go/master/pkg/{kubernetes => kubeutils}/elasticjob_internal_test.go (98%) rename go/master/pkg/{kubernetes => kubeutils}/kubernetes_suite_test.go (97%) create mode 100644 go/master/pkg/kubeutils/pod.go create mode 100644 go/master/pkg/kubeutils/pod_internal_test.go diff --git a/go/master/main.go b/go/master/main.go index 751b43368..0f07295c0 100644 --- a/go/master/main.go +++ b/go/master/main.go @@ -17,12 +17,10 @@ import ( "flag" "strconv" - logger "github.com/sirupsen/logrus" - "k8s.io/client-go/kubernetes" - master "github.com/intelligent-machine-learning/dlrover/go/master/pkg" - "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubernetes" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" "github.com/intelligent-machine-learning/dlrover/go/master/pkg/server" + logger "github.com/sirupsen/logrus" ) func main() { @@ -41,9 +39,9 @@ func main() { // Listen and serve on defined port logger.Infof("The master starts with namespece %s, jobName %s, port %d", namespace, jobName, port) - var k8sClient *kubernetes.K8sClient + var k8sClient *kubeutils.K8sClient if k8sScheduling { - k8sClient = kubernetes.NewK8sClient(namespace, jobName) + k8sClient = kubeutils.NewK8sClient("") } master := master.NewJobMaster(namespace, jobName, k8sClient) master.Run() diff --git a/go/master/master_suite_test.go b/go/master/master_suite_test.go new file mode 100644 index 000000000..2046daa0d --- /dev/null +++ b/go/master/master_suite_test.go @@ -0,0 +1,26 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestMaster(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Master Suite") +} diff --git a/go/master/pkg/batchscheduler/batchscheduler_suite_test.go b/go/master/pkg/batchscheduler/batchscheduler_suite_test.go new file mode 100644 index 000000000..ac73d66e7 --- /dev/null +++ b/go/master/pkg/batchscheduler/batchscheduler_suite_test.go @@ -0,0 +1,26 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestBatchscheduler(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Batchscheduler Suite") +} diff --git a/go/master/pkg/batchscheduler/elastic.go b/go/master/pkg/batchscheduler/elastic.go new file mode 100644 index 000000000..6101b78e5 --- /dev/null +++ b/go/master/pkg/batchscheduler/elastic.go @@ -0,0 +1,85 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler + +import ( + "time" + + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" + logger "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" +) + +// ElasticScheduler launches pods without waiting for all resouces of pod are ready +type ElasticScheduler struct { + k8sClient *kubeutils.K8sClient + toCreatePods *common.Queue +} + +// NewElasticScheduler creates an elastic scheduler. +func NewElasticScheduler(k8sClient *kubeutils.K8sClient) *ElasticScheduler { + return &ElasticScheduler{ + k8sClient: k8sClient, + toCreatePods: common.NewQueue(), + } +} + +// Start starts a routine to launch Pods. +func (scheduler *ElasticScheduler) Start(jobContext *common.JobContext) { + go scheduler.createPodLoop(jobContext.NameSpace) +} + +// DoScheduling creates/updates/deletes pods +func (scheduler *ElasticScheduler) DoScheduling(jobContext *common.JobContext, plan *SchedulingPlan) { + for replicaType, spec := range plan.ReplicaSpecs { + for i := int32(0); i < spec.Replicas; i++ { + replicaConfig := &kubeutils.ReplicaConfig{ + Type: string(replicaType), + ID: i, + Number: spec.Replicas, + Rank: i, + } + + podConfig := &kubeutils.PodConfig{ + Replica: replicaConfig, + TemplateSpec: spec.Template.DeepCopy(), + } + pod := kubeutils.BuildPod(jobContext, podConfig) + scheduler.toCreatePods.PushBack(pod) + } + } +} + +func (scheduler *ElasticScheduler) createPodLoop(namespace string) { + for { + for scheduler.toCreatePods.Len() > 0 { + pod := scheduler.toCreatePods.PopFront().(*corev1.Pod) + err := scheduler.k8sClient.CreatePod(namespace, pod) + if errors.IsAlreadyExists(err) { + logger.Warnf("The pod %s already exists.", pod.ObjectMeta.Name) + } else if errors.IsTooManyRequests(err) || errors.IsTimeout(err) || errors.IsServerTimeout(err) { + logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err) + // Retry to create pod due to timeout. + scheduler.toCreatePods.PushFront(pod) + time.Sleep(5 * time.Second) + } else { + logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err) + panic(err.Error()) + } + } + time.Sleep(1 * time.Second) + } +} diff --git a/go/master/pkg/batchscheduler/elastic_internal_test.go b/go/master/pkg/batchscheduler/elastic_internal_test.go new file mode 100644 index 000000000..fe66f4ccb --- /dev/null +++ b/go/master/pkg/batchscheduler/elastic_internal_test.go @@ -0,0 +1,62 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler + +import ( + "fmt" + + commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" +) + +var _ = Describe("Elastic", func() { + It("Do scheduling to launch pods.", func() { + jobContext := &common.JobContext{ + NameSpace: "dlrover", + Name: "train-demo", + MasterHost: "127.0.0.1", + MasterPort: 12345, + } + + container := corev1.Container{ + Name: "main", + Image: "python:3.12.8", + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"/bin/bash", "-c", "echo 0"}, + } + replicas := make(map[commonv1.ReplicaType]*commonv1.ReplicaSpec) + replicas["worker"] = &commonv1.ReplicaSpec{ + Replicas: 3, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{container}, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + } + schedulingPlan := &SchedulingPlan{ReplicaSpecs: replicas} + scheduler := NewElasticScheduler(nil) + scheduler.DoScheduling(jobContext, schedulingPlan) + Expect(scheduler.toCreatePods.Len()).To(Equal(3)) + for i := 0; i < 3; i++ { + pod := scheduler.toCreatePods.PopFront().(*corev1.Pod) + expectPodName := fmt.Sprintf("train-demo-worker-%d", i) + Expect(pod.ObjectMeta.Name).To(Equal(expectPodName)) + } + }) +}) diff --git a/go/master/pkg/batchscheduler/plan.go b/go/master/pkg/batchscheduler/plan.go new file mode 100644 index 000000000..87143e224 --- /dev/null +++ b/go/master/pkg/batchscheduler/plan.go @@ -0,0 +1,14 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler diff --git a/go/master/pkg/batchscheduler/scheduler.go b/go/master/pkg/batchscheduler/scheduler.go new file mode 100644 index 000000000..cc4a9cf1b --- /dev/null +++ b/go/master/pkg/batchscheduler/scheduler.go @@ -0,0 +1,40 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler + +import ( + elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" + commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" +) + +// BatchScheduler creates/updates/deletes the batch pods of an elastic job. +type BatchScheduler interface { + DoScheduling(*SchedulingPlan) +} + +// SchedulingPlan is the scheduling plan to notify the scheduler CURD pods. +type SchedulingPlan struct { + // ReplicaSpecs is a map which contains the replica specification to create Pods. + ReplicaSpecs map[commonv1.ReplicaType]*commonv1.ReplicaSpec + + // CreatedPods are Pods to be created. + CreatedPods []*kubeutils.PodConfig + + // RemovedPods are Pods to be removed + RemovedPods []*kubeutils.PodConfig + + // OwnerJob specifies a job to scale. + OwnerJob *elasticjob.ElasticJob +} diff --git a/go/master/pkg/batchscheduler/volcano.go b/go/master/pkg/batchscheduler/volcano.go new file mode 100644 index 000000000..87143e224 --- /dev/null +++ b/go/master/pkg/batchscheduler/volcano.go @@ -0,0 +1,14 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler diff --git a/go/master/pkg/common/common_suite_test.go b/go/master/pkg/common/common_suite_test.go new file mode 100644 index 000000000..04d1cee41 --- /dev/null +++ b/go/master/pkg/common/common_suite_test.go @@ -0,0 +1,26 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestCommon(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Common Suite") +} diff --git a/go/master/pkg/common/context.go b/go/master/pkg/common/context.go new file mode 100644 index 000000000..80a083422 --- /dev/null +++ b/go/master/pkg/common/context.go @@ -0,0 +1,26 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +// JobContext stores the elastic job context. +type JobContext struct { + // Namespace is the kubernetes namespace where the job runs. + NameSpace string + // Name is the name of an elastic job. + Name string + // MasterHost is the host of master service. + MasterHost string + // MasterPort is the host of master port. + MasterPort int32 +} diff --git a/go/master/pkg/common/queue.go b/go/master/pkg/common/queue.go new file mode 100644 index 000000000..706271297 --- /dev/null +++ b/go/master/pkg/common/queue.go @@ -0,0 +1,74 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "container/list" + "sync" +) + +// Queue is a thread-safe queue +type Queue struct { + lock sync.Mutex + data *list.List +} + +// NewQueue creates a Queue instance. +func NewQueue() *Queue { + q := new(Queue) + q.data = list.New() + q.lock = sync.Mutex{} + return q +} + +// PushFront pushes an element at the head of the queue. +func (q *Queue) PushFront(v interface{}) { + defer q.lock.Unlock() + q.lock.Lock() + q.data.PushFront(v) +} + +// PushBack pushes an element at the back of the queue. +func (q *Queue) PushBack(v interface{}) { + defer q.lock.Unlock() + q.lock.Lock() + q.data.PushBack(v) +} + +// PopFront gets the front element and removes it from the queue. +func (q *Queue) PopFront() interface{} { + defer q.lock.Unlock() + q.lock.Lock() + iter := q.data.Front() + v := iter.Value + q.data.Remove(iter) + return v +} + +// PopBack gets the back element and removes it from the queue. +func (q *Queue) PopBack() interface{} { + defer q.lock.Unlock() + q.lock.Lock() + iter := q.data.Back() + v := iter.Value + q.data.Remove(iter) + return v +} + +// Len gets the number of elements in the queue. +func (q *Queue) Len() int { + defer q.lock.Unlock() + q.lock.Lock() + return q.data.Len() +} diff --git a/go/master/pkg/common/queue_test.go b/go/master/pkg/common/queue_test.go new file mode 100644 index 000000000..70c8c3b5f --- /dev/null +++ b/go/master/pkg/common/queue_test.go @@ -0,0 +1,36 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" +) + +var _ = Describe("Queue", func() { + It("Test Queue", func() { + queue := common.NewQueue() + queue.PushBack(2) + queue.PushFront(1) + queue.PushBack(3) + Expect(queue.Len()).To(Equal(3)) + front := queue.PopFront().(int) + Expect(front).To(Equal(1)) + back := queue.PopBack().(int) + Expect(back).To(Equal(3)) + }) + +}) diff --git a/go/master/pkg/jobmanager/manager.go b/go/master/pkg/jobmanager/manager.go new file mode 100644 index 000000000..86a61d2a3 --- /dev/null +++ b/go/master/pkg/jobmanager/manager.go @@ -0,0 +1,18 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmanager + +// JobManager is the interface to manager job lifecycle. +type JobManager interface { +} diff --git a/go/master/pkg/jobmanager/pytorch.go b/go/master/pkg/jobmanager/pytorch.go new file mode 100644 index 000000000..69e790164 --- /dev/null +++ b/go/master/pkg/jobmanager/pytorch.go @@ -0,0 +1,14 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmanager diff --git a/go/master/pkg/jobmanager/ray.go b/go/master/pkg/jobmanager/ray.go new file mode 100644 index 000000000..69e790164 --- /dev/null +++ b/go/master/pkg/jobmanager/ray.go @@ -0,0 +1,14 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmanager diff --git a/go/master/pkg/jobmanager/tensorflow.go b/go/master/pkg/jobmanager/tensorflow.go new file mode 100644 index 000000000..69e790164 --- /dev/null +++ b/go/master/pkg/jobmanager/tensorflow.go @@ -0,0 +1,14 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmanager diff --git a/go/master/pkg/kubernetes/client.go b/go/master/pkg/kubeutils/client.go similarity index 85% rename from go/master/pkg/kubernetes/client.go rename to go/master/pkg/kubeutils/client.go index d943eccd0..a5195474d 100644 --- a/go/master/pkg/kubernetes/client.go +++ b/go/master/pkg/kubeutils/client.go @@ -11,12 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetes +package kubeutils import ( "context" logger "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -33,6 +34,11 @@ type K8sClient struct { dynamicClient *dynamic.DynamicClient } +// GetGroupVersionResource :- gets GroupVersionResource for dynamic client +func GetGroupVersionResource(group, version, resource string) schema.GroupVersionResource { + return schema.GroupVersionResource{Group: group, Version: version, Resource: resource} +} + // NewK8sClient creates a k8s client instance. func NewK8sClient(kubeConfigPath string) *K8sClient { client := &K8sClient{} @@ -76,14 +82,20 @@ func (client *K8sClient) GetCustomResourceInstance( utd, err := client.dynamicClient. Resource(gvr). Namespace(namespace). - Get(context.TODO(), name, metav1.GetOptions{}) + Get(context.Background(), name, metav1.GetOptions{}) if err != nil { logger.Infof("fail to get %s %s", gvr.String(), name) } return utd, err } -// GetGroupVersionResource :- gets GroupVersionResource for dynamic client -func GetGroupVersionResource(group, version, resource string) schema.GroupVersionResource { - return schema.GroupVersionResource{Group: group, Version: version, Resource: resource} +// CreatePod creates a Pod instance in the cluster +func (client *K8sClient) CreatePod(namespace string, pod *corev1.Pod) error { + _, err := client.clientset.CoreV1().Pods(namespace).Create( + context.Background(), pod, metav1.CreateOptions{}, + ) + if err != nil { + logger.Infof("fail to create a pod : %s", pod.ObjectMeta.Name) + } + return err } diff --git a/go/master/pkg/kubernetes/elasticjob.go b/go/master/pkg/kubeutils/elasticjob.go similarity index 98% rename from go/master/pkg/kubernetes/elasticjob.go rename to go/master/pkg/kubeutils/elasticjob.go index 16d597b58..bff45b237 100644 --- a/go/master/pkg/kubernetes/elasticjob.go +++ b/go/master/pkg/kubeutils/elasticjob.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetes +package kubeutils import ( elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" diff --git a/go/master/pkg/kubernetes/elasticjob_internal_test.go b/go/master/pkg/kubeutils/elasticjob_internal_test.go similarity index 98% rename from go/master/pkg/kubernetes/elasticjob_internal_test.go rename to go/master/pkg/kubeutils/elasticjob_internal_test.go index 6bdec0934..45a4ce06b 100644 --- a/go/master/pkg/kubernetes/elasticjob_internal_test.go +++ b/go/master/pkg/kubeutils/elasticjob_internal_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetes +package kubeutils import ( "os" diff --git a/go/master/pkg/kubernetes/kubernetes_suite_test.go b/go/master/pkg/kubeutils/kubernetes_suite_test.go similarity index 97% rename from go/master/pkg/kubernetes/kubernetes_suite_test.go rename to go/master/pkg/kubeutils/kubernetes_suite_test.go index 261143afc..d9c0002b8 100644 --- a/go/master/pkg/kubernetes/kubernetes_suite_test.go +++ b/go/master/pkg/kubeutils/kubernetes_suite_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetes +package kubeutils import ( "testing" diff --git a/go/master/pkg/kubeutils/pod.go b/go/master/pkg/kubeutils/pod.go new file mode 100644 index 000000000..4fcbe2529 --- /dev/null +++ b/go/master/pkg/kubeutils/pod.go @@ -0,0 +1,157 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubeutils + +import ( + "fmt" + + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + envMasterAddr = "DLROVER_MASTER_ADDR" + envPodName = "MY_POD_NAME" + envPodIP = "MY_POD_IP" + envHostIP = "MY_HOST_IP" + envReplicaType = "REPLICA_TYPE" + envReplicaID = "REPLICA_ID" + envReplicaRank = "REPLICA_RANK" + envReplicaNum = "REPLICA_NUM" + + labelJobKey = "elasticjob.dlrover/name" + labelReplicaTypeKey = "elasticjob.dlrover/replica-type" + labelReplicaIDKey = "elasticjob.dlrover/replica-id" + labelReplicaRankKey = "elasticjob.dlrover/rank" +) + +// ReplicaConfig contains the replica specification. +type ReplicaConfig struct { + Type string + ID int32 + // Number if the total number of the replicas. + Number int32 + // Rank is the rank of the pod in the replicas. + Rank int32 +} + +// PodConfig contains the replica config and pod template spec. +type PodConfig struct { + Replica *ReplicaConfig + TemplateSpec *corev1.PodTemplateSpec +} + +// BuildPod builds a corev1.Pod. +func BuildPod(jobContext *common.JobContext, podConfig *PodConfig) *corev1.Pod { + podName := fmt.Sprintf("%s-%s-%d", jobContext.Name, podConfig.Replica.Type, podConfig.Replica.ID) + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: podConfig.TemplateSpec.ObjectMeta, + Spec: podConfig.TemplateSpec.Spec, + } + // Set pod name and namespace. + pod.ObjectMeta.Name = podName + pod.ObjectMeta.Namespace = jobContext.NameSpace + + if pod.ObjectMeta.Labels == nil { + pod.ObjectMeta.Labels = make(map[string]string) + } + + // Insert Replica specifications into the pod labels. + pod.ObjectMeta.Labels[labelJobKey] = jobContext.Name + pod.ObjectMeta.Labels[labelReplicaTypeKey] = podConfig.Replica.Type + pod.ObjectMeta.Labels[labelReplicaIDKey] = fmt.Sprintf("%d", podConfig.Replica.ID) + pod.ObjectMeta.Labels[labelReplicaRankKey] = fmt.Sprintf("%d", podConfig.Replica.Rank) + + mainContainer := &pod.Spec.Containers[0] + insertJobMasterAddrEnv(mainContainer, jobContext.MasterHost, jobContext.MasterPort) + insertPodMetaEnv(mainContainer) + insertReplicaEnv(mainContainer, podConfig.Replica) + + return pod +} + +func insertJobMasterAddrEnv(container *corev1.Container, host string, port int32) { + jobMasterServiceEnv := corev1.EnvVar{ + Name: envMasterAddr, + Value: fmt.Sprintf("%s:%d", host, port), + } + container.Env = append(container.Env, jobMasterServiceEnv) + +} + +func insertPodMetaEnv(container *corev1.Container) { + podNameEnv := corev1.EnvVar{ + Name: envPodName, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "metadata.name", + }, + }, + } + container.Env = append(container.Env, podNameEnv) + + podIPEnv := corev1.EnvVar{ + Name: envPodIP, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.podIP", + }, + }, + } + container.Env = append(container.Env, podIPEnv) + + hostIPEnv := corev1.EnvVar{ + Name: envHostIP, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.hostIP", + }, + }, + } + container.Env = append(container.Env, hostIPEnv) +} + +func insertReplicaEnv(container *corev1.Container, replicaConfig *ReplicaConfig) { + replicaTypeEnv := corev1.EnvVar{ + Name: envReplicaType, + Value: string(replicaConfig.Type), + } + container.Env = append(container.Env, replicaTypeEnv) + + replicaIDEnv := corev1.EnvVar{ + Name: envReplicaID, + Value: fmt.Sprintf("%d", replicaConfig.ID), + } + container.Env = append(container.Env, replicaIDEnv) + + rankIDEnv := corev1.EnvVar{ + Name: envReplicaNum, + Value: fmt.Sprintf("%d", replicaConfig.Rank), + } + container.Env = append(container.Env, rankIDEnv) + + replicaNumEnv := corev1.EnvVar{ + Name: envReplicaRank, + Value: fmt.Sprintf("%d", replicaConfig.Number), + } + container.Env = append(container.Env, replicaNumEnv) +} diff --git a/go/master/pkg/kubeutils/pod_internal_test.go b/go/master/pkg/kubeutils/pod_internal_test.go new file mode 100644 index 000000000..181a4e854 --- /dev/null +++ b/go/master/pkg/kubeutils/pod_internal_test.go @@ -0,0 +1,84 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubeutils + +import ( + "errors" + "fmt" + "os" + + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + kubeerrors "k8s.io/apimachinery/pkg/api/errors" +) + +var _ = Describe("Pod", func() { + It("Create a Pod", func() { + jobContext := &common.JobContext{ + NameSpace: "dlrover", + Name: "train-demo", + MasterHost: "127.0.0.1", + MasterPort: 12345, + } + container := corev1.Container{ + Name: "main", + Image: "python:3.12.8", + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"/bin/bash", "-c", "echo 0"}, + } + podConfig := &PodConfig{ + Replica: &ReplicaConfig{ + Type: "worker", + ID: 0, + Number: 8, + Rank: 0, + }, + TemplateSpec: &corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{container}, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + } + pod := BuildPod(jobContext, podConfig) + Expect(pod.ObjectMeta.Name).To(Equal("train-demo-worker-0")) + Expect(pod.ObjectMeta.Namespace).To(Equal("dlrover")) + jobName, ok := pod.ObjectMeta.Labels[labelJobKey] + Expect(ok).To(BeTrue()) + Expect(jobName).To(Equal("train-demo")) + replicaType, ok := pod.ObjectMeta.Labels[labelReplicaTypeKey] + Expect(ok).To(BeTrue()) + Expect(replicaType).To(Equal("worker")) + + configPath := os.Getenv("KUBERNETES_CONFIG_PATH") + if _, err := os.Stat(configPath); errors.Is(err, os.ErrNotExist) { + Skip(fmt.Sprintf("The config file %s is not exist.", configPath)) + } + + k8sClient := NewK8sClient(configPath) + pod.ObjectMeta.Namespace = "no-namspace" + err := k8sClient.CreatePod("dlrover", pod) + Expect(kubeerrors.IsBadRequest(err)).To(BeTrue()) + + pod.ObjectMeta.Namespace = "dlrover" + err = k8sClient.CreatePod("dlrover", pod) + Expect(kubeerrors.IsAlreadyExists(err)).To(BeTrue()) + + pod.ObjectMeta.Name = "" + err = k8sClient.CreatePod("dlrover", pod) + Expect(kubeerrors.IsInvalid(err)).To(BeTrue()) + }) +}) diff --git a/go/master/pkg/master.go b/go/master/pkg/master.go index 7baf4c2b5..1b13fceae 100644 --- a/go/master/pkg/master.go +++ b/go/master/pkg/master.go @@ -17,7 +17,7 @@ import ( "time" elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" - "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubernetes" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" logger "github.com/sirupsen/logrus" ) @@ -25,18 +25,18 @@ import ( type JobMaster struct { Namespace string JobName string - K8sClient *kubernetes.K8sClient + K8sClient *kubeutils.K8sClient Job *elasticjob.ElasticJob } // NewJobMaster creates the master for an elasticjob. -func NewJobMaster(namespace string, jobName string, k8sClient *kubernetes.K8sClient) *JobMaster { +func NewJobMaster(namespace string, jobName string, k8sClient *kubeutils.K8sClient) *JobMaster { master := &JobMaster{ Namespace: namespace, JobName: jobName, } if k8sClient != nil { - job := kubernetes.GetElasticJobInstance(k8sClient, namespace, jobName) + job := kubeutils.GetElasticJobInstance(k8sClient, namespace, jobName) master.K8sClient = k8sClient master.Job = job }