-
Notifications
You must be signed in to change notification settings - Fork 715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KEP-2170: Implement TrainJob Reconciler to manage objects #2295
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ bin/ | |
/tf-operator | ||
vendor/ | ||
testbin/* | ||
manifests/external-crds/ | ||
cover.out | ||
|
||
# IDEs | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,15 +74,16 @@ HAS_SETUP_ENVTEST := $(shell command -v setup-envtest;) | |
testall: manifests generate fmt vet golangci-lint test ## Run tests. | ||
|
||
test: envtest | ||
KUBEBUILDER_ASSETS="$(shell setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out | ||
KUBEBUILDER_ASSETS="$(shell setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" \ | ||
go test ./pkg/apis/kubeflow.org/v1/... ./pkg/cert/... ./pkg/common/... ./pkg/config/... ./pkg/controller.v1/... ./pkg/core/... ./pkg/util/... ./pkg/webhooks/... -coverprofile cover.out | ||
|
||
.PHONY: test-integrationv2 | ||
test-integrationv2: envtest | ||
test-integrationv2: envtest jobset-operator-crd scheduler-plugins-crd | ||
KUBEBUILDER_ASSETS="$(shell setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" go test ./test/... -coverprofile cover.out | ||
|
||
.PHONY: testv2 | ||
testv2: | ||
go test ./pkg/controller.v2/... ./pkg/runtime.v2/... ./pkg/webhook.v2/... ./pkg/util.v2/... -coverprofile cover.out | ||
go test ./pkg/apis/kubeflow.org/v2alpha1/... ./pkg/controller.v2/... ./pkg/runtime.v2/... ./pkg/webhook.v2/... ./pkg/util.v2/... -coverprofile cover.out | ||
|
||
envtest: | ||
ifndef HAS_SETUP_ENVTEST | ||
|
@@ -127,3 +128,18 @@ controller-gen: ## Download controller-gen locally if necessary. | |
KUSTOMIZE = $(shell pwd)/bin/kustomize | ||
kustomize: ## Download kustomize locally if necessary. | ||
GOBIN=$(PROJECT_DIR)/bin go install sigs.k8s.io/kustomize/kustomize/[email protected] | ||
|
||
## Download external CRDs for the integration testings. | ||
EXTERNAL_CRDS_DIR ?= $(PROJECT_DIR)/manifests/external-crds | ||
|
||
JOBSET_ROOT = $(shell go list -m -mod=readonly -f "{{.Dir}}" sigs.k8s.io/jobset) | ||
.PHONY: jobset-operator-crd | ||
jobset-operator-crd: ## Copy the CRDs from the jobset-operator to the manifests/external-crds directory. | ||
mkdir -p $(EXTERNAL_CRDS_DIR)/jobset-operator/ | ||
cp -f $(JOBSET_ROOT)/config/components/crd/bases/* $(EXTERNAL_CRDS_DIR)/jobset-operator/ | ||
|
||
SCHEDULER_PLUGINS_ROOT = $(shell go list -m -f "{{.Dir}}" sigs.k8s.io/scheduler-plugins) | ||
.PHONY: scheduler-plugins-crd | ||
scheduler-plugins-crd: | ||
mkdir -p $(EXTERNAL_CRDS_DIR)/scheduler-plugins/ | ||
cp -f $(SCHEDULER_PLUGINS_ROOT)/manifests/coscheduling/* $(EXTERNAL_CRDS_DIR)/scheduler-plugins |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,28 +18,37 @@ package controllerv2 | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
|
||
"github.com/go-logr/logr" | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
"k8s.io/client-go/tools/record" | ||
"k8s.io/klog/v2" | ||
"k8s.io/utils/ptr" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil" | ||
|
||
kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" | ||
runtime "github.com/kubeflow/training-operator/pkg/runtime.v2" | ||
jobruntimes "github.com/kubeflow/training-operator/pkg/runtime.v2" | ||
) | ||
|
||
var errorUnsupportedRuntime = errors.New("the specified runtime is not supported") | ||
|
||
type TrainJobReconciler struct { | ||
log logr.Logger | ||
client client.Client | ||
recorder record.EventRecorder | ||
runtimes map[string]jobruntimes.Runtime | ||
} | ||
|
||
func NewTrainJobReconciler(client client.Client, recorder record.EventRecorder) *TrainJobReconciler { | ||
func NewTrainJobReconciler(client client.Client, recorder record.EventRecorder, runtimes map[string]jobruntimes.Runtime) *TrainJobReconciler { | ||
return &TrainJobReconciler{ | ||
log: ctrl.Log.WithName("trainjob-controller"), | ||
client: client, | ||
recorder: recorder, | ||
runtimes: runtimes, | ||
} | ||
} | ||
|
||
|
@@ -49,16 +58,74 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c | |
return ctrl.Result{}, client.IgnoreNotFound(err) | ||
} | ||
log := ctrl.LoggerFrom(ctx).WithValues("trainJob", klog.KObj(&trainJob)) | ||
ctrl.LoggerInto(ctx, log) | ||
ctx = ctrl.LoggerInto(ctx, log) | ||
log.V(2).Info("Reconciling TrainJob") | ||
if err := r.createOrUpdateObjs(ctx, &trainJob); err != nil { | ||
return ctrl.Result{}, err | ||
} | ||
// TODO (tenzen-y): Do update the status. | ||
return ctrl.Result{}, nil | ||
} | ||
|
||
func (r *TrainJobReconciler) SetupWithManager(mgr ctrl.Manager, runtimes map[string]runtime.Runtime) error { | ||
func (r *TrainJobReconciler) createOrUpdateObjs(ctx context.Context, trainJob *kubeflowv2.TrainJob) error { | ||
log := ctrl.LoggerFrom(ctx) | ||
|
||
runtimeRefGK := runtimeRefToGroupKind(trainJob.Spec.RuntimeRef).String() | ||
runtime, ok := r.runtimes[runtimeRefGK] | ||
if !ok { | ||
return fmt.Errorf("%w: %s", errorUnsupportedRuntime, runtimeRefGK) | ||
} | ||
objs, err := runtime.NewObjects(ctx, trainJob) | ||
if err != nil { | ||
return err | ||
} | ||
for _, obj := range objs { | ||
var gvk schema.GroupVersionKind | ||
if gvk, err = apiutil.GVKForObject(obj.DeepCopyObject(), r.client.Scheme()); err != nil { | ||
return err | ||
} | ||
logKeysAndValues := []any{ | ||
"groupVersionKind", gvk.String(), | ||
"namespace", obj.GetNamespace(), | ||
"name", obj.GetName(), | ||
} | ||
// TODO (tenzen-y): Ideally, we should use the SSA instead of checking existence. | ||
// Non-empty resourceVersion indicates UPDATE operation. | ||
var creationErr error | ||
var created bool | ||
if obj.GetResourceVersion() == "" { | ||
creationErr = r.client.Create(ctx, obj) | ||
created = creationErr == nil | ||
} | ||
switch { | ||
case created: | ||
log.V(5).Info("Succeeded to create object", logKeysAndValues) | ||
continue | ||
case client.IgnoreAlreadyExists(creationErr) != nil: | ||
return creationErr | ||
default: | ||
// This indicates CREATE operation has not been performed or the object has already existed in the cluster. | ||
if err = r.client.Update(ctx, obj); err != nil { | ||
return err | ||
} | ||
log.V(5).Info("Succeeded to update object", logKeysAndValues) | ||
} | ||
Comment on lines
+100
to
+112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be possible to make this condition simpler ? E.g. if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe that we need to check if the After we migrate to the SSA patch (#2297), we can avoid this complexity and just issue the SSA patch for CREATE and UPDATE. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, that makes sense! |
||
} | ||
return nil | ||
} | ||
|
||
func runtimeRefToGroupKind(runtimeRef kubeflowv2.RuntimeRef) schema.GroupKind { | ||
return schema.GroupKind{ | ||
Group: ptr.Deref(runtimeRef.APIGroup, ""), | ||
Kind: ptr.Deref(runtimeRef.Kind, ""), | ||
Comment on lines
+119
to
+120
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to use Deref here if APIGroup and Kind can't be empty at this stage ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess that we can not restrict these field like non empty. Especially, these should be accepted an empty value so that they can specify the core API type. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
What is the use-case when users can reference the object from Kubernetes Core, like Service or Pods ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess that they want to register Pod to the runtimes, and I sometimes have heard from some users the use cases since they often use the plain Pod with many special annotations in the production cluster to mitigate CRD migration costs. For sure, I understand that plain Pod with special annotations is not ideal approach. But, no restrictions would be better in the runtime registration level. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright, for the Kind, should it be always non-empty ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
As runtime specifications, Kind should be always non-empty. |
||
} | ||
} | ||
|
||
func (r *TrainJobReconciler) SetupWithManager(mgr ctrl.Manager) error { | ||
b := ctrl.NewControllerManagedBy(mgr). | ||
For(&kubeflowv2.TrainJob{}) | ||
for _, run := range runtimes { | ||
for _, registrar := range run.EventHandlerRegistrars() { | ||
for _, runtime := range r.runtimes { | ||
for _, registrar := range runtime.EventHandlerRegistrars() { | ||
if registrar != nil { | ||
b = registrar(b, mgr.GetClient()) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,29 +74,37 @@ func (j *JobSet) Build(ctx context.Context, info *runtime.Info, trainJob *kubefl | |
if !ok { | ||
return nil, nil | ||
} | ||
jobSetBuilder := NewBuilder(client.ObjectKeyFromObject(trainJob), kubeflowv2.JobSetTemplateSpec{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Labels: info.Labels, | ||
Annotations: info.Annotations, | ||
}, | ||
Spec: raw.Spec, | ||
}) | ||
|
||
var jobSetBuilder *Builder | ||
oldJobSet := &jobsetv1alpha2.JobSet{} | ||
if err := j.client.Get(ctx, client.ObjectKeyFromObject(trainJob), oldJobSet); err != nil { | ||
if !apierrors.IsNotFound(err) { | ||
return nil, err | ||
} | ||
jobSetBuilder = NewBuilder(client.ObjectKeyFromObject(trainJob), kubeflowv2.JobSetTemplateSpec{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Labels: info.Labels, | ||
Annotations: info.Annotations, | ||
}, | ||
Spec: raw.Spec, | ||
}) | ||
oldJobSet = nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you need this ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we need this for the |
||
} else { | ||
jobSetBuilder = &Builder{ | ||
JobSet: *oldJobSet.DeepCopy(), | ||
} | ||
} | ||
|
||
// TODO (tenzen-y): We should support all field propagation in builder. | ||
jobSet := jobSetBuilder. | ||
Suspend(trainJob.Spec.Suspend). | ||
ContainerImage(trainJob.Spec.Trainer.Image). | ||
JobCompletionMode(batchv1.IndexedCompletion). | ||
PodLabels(info.PodLabels). | ||
Build() | ||
if err := ctrlutil.SetControllerReference(trainJob, jobSet, j.scheme); err != nil { | ||
return nil, err | ||
} | ||
oldJobSet := &jobsetv1alpha2.JobSet{} | ||
if err := j.client.Get(ctx, client.ObjectKeyFromObject(jobSet), oldJobSet); err != nil { | ||
if !apierrors.IsNotFound(err) { | ||
return nil, err | ||
} | ||
oldJobSet = nil | ||
} | ||
if err := info.Update(jobSet); err != nil { | ||
return nil, err | ||
} | ||
|
@@ -106,9 +114,14 @@ func (j *JobSet) Build(ctx context.Context, info *runtime.Info, trainJob *kubefl | |
return nil, nil | ||
} | ||
|
||
func needsCreateOrUpdate(old, new *jobsetv1alpha2.JobSet, suspended bool) bool { | ||
func needsCreateOrUpdate(old, new *jobsetv1alpha2.JobSet, trainJobIsSuspended bool) bool { | ||
return old == nil || | ||
suspended && (!equality.Semantic.DeepEqual(old.Spec, new.Spec) || !maps.Equal(old.Labels, new.Labels) || !maps.Equal(old.Annotations, new.Annotations)) | ||
(!trainJobIsSuspended && jobSetIsSuspended(old) && !jobSetIsSuspended(new)) || | ||
(trainJobIsSuspended && (!equality.Semantic.DeepEqual(old.Spec, new.Spec) || !maps.Equal(old.Labels, new.Labels) || !maps.Equal(old.Annotations, new.Annotations))) | ||
} | ||
|
||
func jobSetIsSuspended(jobSet *jobsetv1alpha2.JobSet) bool { | ||
return ptr.Deref(jobSet.Spec.Suspend, false) | ||
} | ||
|
||
func (j *JobSet) ReconcilerBuilders() []runtime.ReconcilerBuilder { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we set name and namespace for the build objects ?
E.g. here I can see that we only set the TypeMeta and Spec:
training-operator/pkg/runtime.v2/core/trainingruntime.go
Lines 113 to 119 in 9e04bdd
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we do. Actual resource names are set in each plugins in the following:
JobSet:
training-operator/pkg/runtime.v2/framework/plugins/jobset/builder.go
Lines 42 to 43 in 9e04bdd
PodGroup:
training-operator/pkg/runtime.v2/framework/plugins/coscheduling/coscheduling.go
Lines 121 to 122 in 9e04bdd
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why for JobSet plugin you want to have separate Builder, but for co-scheduling you create a PodSpec object directly under
Build()
API ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is simply reason. There are so many fields and nested levels in the JobSet, but the PodGroup has a few field and a few nested level. For sure, we can provide the builder in the PodGroup as well. But, I guess that the PodGroup builder seems to be slightly overkill.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!