Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KEP-2170: Initial Implementations for v2 Manager #2236

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/unittests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,14 @@ jobs:
with:
go-version-file: ${{ env.GOPATH }}/src/github.com/kubeflow/training-operator/go.mod

- name: Run Go test
- name: Run Go test for v1
run: |
make test ENVTEST_K8S_VERSION=${{ matrix.kubernetes-version }}

- name: Run Go test for v2
run: |
make test-integrationv2 ENVTEST_K8S_VERSION=${{ matrix.kubernetes-version }}

- name: Coveralls report
uses: shogo82148/actions-goveralls@v1
with:
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ testall: manifests generate fmt vet golangci-lint test ## Run tests.
test: envtest
KUBEBUILDER_ASSETS="$(shell setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out

.PHONY: test-integrationv2
test-integrationv2: envtest
KUBEBUILDER_ASSETS="$(shell setup-envtest use $(ENVTEST_K8S_VERSION) -p path)" go test ./test/... -coverprofile cover.out

envtest:
ifndef HAS_SETUP_ENVTEST
go install sigs.k8s.io/controller-runtime/tools/setup-envtest@bf15e44028f908c790721fc8fe67c7bf2d06a611 # v0.17.2
Expand Down
160 changes: 160 additions & 0 deletions cmd/training-operator.v2alpha1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,166 @@ limitations under the License.

package main

import (
"crypto/tls"
"errors"
"flag"
"net/http"
"os"

zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2"

kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
"github.com/kubeflow/training-operator/pkg/cert"
controllerv2 "github.com/kubeflow/training-operator/pkg/controller.v2"
webhookv2 "github.com/kubeflow/training-operator/pkg/webhook.v2"
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(kubeflowv2.AddToScheme(scheme))
utilruntime.Must(jobsetv1alpha2.AddToScheme(scheme))
}

func main() {
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
var webhookServerPort int
var webhookServiceName string
var webhookSecretName string
var tlsOpts []func(*tls.Config)

flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&secureMetrics, "metrics-secure", true,
"If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")

// Cert generation flags
flag.IntVar(&webhookServerPort, "webhook-server-port", 9443, "Endpoint port for the webhook server.")
flag.StringVar(&webhookServiceName, "webhook-service-name", "training-operator-v2", "Name of the Service used as part of the DNSName")
flag.StringVar(&webhookSecretName, "webhook-secret-name", "training-operator-v2-webhook-cert", "Name of the Secret to store CA and server certs")

opts := zap.Options{
TimeEncoder: zapcore.RFC3339NanoTimeEncoder,
ZapOpts: []zaplog.Option{zaplog.AddCaller()},
}
opts.BindFlags(flag.CommandLine)
flag.Parse()

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

if !enableHTTP2 {
// if the enable-http2 flag is false (the default), http/2 should be disabled
// due to its vulnerabilities. More specifically, disabling http/2 will
// prevent from being vulnerable to the HTTP/2 Stream Cancellation and
// Rapid Reset CVEs. For more information see:
// - https://github.com/advisories/GHSA-qppj-fm5r-hxr3
// - https://github.com/advisories/GHSA-4374-p667-p6c8
tlsOpts = append(tlsOpts, func(c *tls.Config) {
setupLog.Info("disabling http/2")
c.NextProtos = []string{"http/1.1"}
})
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{
BindAddress: metricsAddr,
SecureServing: secureMetrics,
TLSOpts: tlsOpts,
},
WebhookServer: webhook.NewServer(webhook.Options{
Port: webhookServerPort,
TLSOpts: tlsOpts,
}),
HealthProbeBindAddress: probeAddr,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}

certsReady := make(chan struct{})
if err = cert.ManageCerts(mgr, cert.Config{
WebhookSecretName: webhookSecretName,
WebhookServiceName: webhookServiceName,
}, certsReady); err != nil {
setupLog.Error(err, "unable to set up cert rotation")
os.Exit(1)
}

setupProbeEndpoints(mgr, certsReady)
// Set up controllers using goroutines to start the manager quickly.
go setupControllers(mgr, certsReady)

setupLog.Info("Starting manager")
if err = mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "Could not run manager")
os.Exit(1)
}
}

func setupControllers(mgr ctrl.Manager, certsReady <-chan struct{}) {
setupLog.Info("Waiting for certificate generation to complete")
<-certsReady
setupLog.Info("Certs ready")

if failedCtrlName, err := controllerv2.SetupControllers(mgr); err != nil {
setupLog.Error(err, "Could not create controller", "controller", failedCtrlName)
os.Exit(1)
}
if failedWebhook, err := webhookv2.Setup(mgr); err != nil {
setupLog.Error(err, "Could not create webhook", "webhook", failedWebhook)
os.Exit(1)
}
}

func setupProbeEndpoints(mgr ctrl.Manager, certsReady <-chan struct{}) {
defer setupLog.Info("Probe endpoints are configured on healthz and readyz")

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
// Wait for the webhook server to be listening before advertising the
// training-operator replica as ready. This allows users to wait with sending the first
// requests, requiring webhooks, until the training-operator deployment is available, so
// that the early requests are not rejected during the training-operator's startup.
// We wrap the call to GetWebhookServer in a closure to delay calling
// the function, otherwise a not fully-initialized webhook server (without
// ready certs) fails the start of the manager.
if err := mgr.AddReadyzCheck("readyz", func(req *http.Request) error {
select {
case <-certsReady:
return mgr.GetWebhookServer().StartedChecker()(req)
default:
return errors.New("certificates are not ready")
}
}); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}
}
29 changes: 29 additions & 0 deletions pkg/controller.v2/setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2024 The Kubeflow Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllerv2

import ctrl "sigs.k8s.io/controller-runtime"

func SetupControllers(mgr ctrl.Manager) (string, error) {
if err := NewTrainJobReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor("training-operator-trainjob-controller"),
).SetupWithManager(mgr); err != nil {
return "TrainJob", err
}
return "", nil
}
43 changes: 43 additions & 0 deletions pkg/controller.v2/trainjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,46 @@ limitations under the License.
*/

package controllerv2

import (
"context"

"github.com/go-logr/logr"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, we agree before to use controller-runtime logger everywhere to be consistent: #2048
Should we make that change @tenzen-y ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this is the controller-runtime logger. The controller-runtime logger implement this logr interface, so we need to import this library to use the controller-runtime logger. You can see the actual controller-runtime logger in the Reconcile function.

"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
)

type TrainJobReconciler struct {
log logr.Logger
client client.Client
recorder record.EventRecorder
}

func NewTrainJobReconciler(client client.Client, recorder record.EventRecorder) *TrainJobReconciler {
return &TrainJobReconciler{
log: ctrl.Log.WithName("trainjob-controller"),
client: client,
recorder: recorder,
}
}

func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var trainJob kubeflowv2.TrainJob
if err := r.client.Get(ctx, req.NamespacedName, &trainJob); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log := ctrl.LoggerFrom(ctx).WithValues("trainJob", klog.KObj(&trainJob))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the value to configure controller runtime logger with klog.KObj(&trainJob) ?
Also, since we add logger to the TrainJob reconciler struct, why do you create a new logger here ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the value to configure controller runtime logger with klog.KObj(&trainJob) ?
Also, since we add logger to the TrainJob reconciler struct, why do you create a new logger here ?

This is not new logger, actually this logger is propagated from Reconciler and we reuse it.
Here, we add information for which objects are reconciling.

Copy link
Member Author

@tenzen-y tenzen-y Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, we do not generate new logger, and not add a logger name (WithName) here. That is WithValue as you can see here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, that makes sense!

ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling TrainJob")
return ctrl.Result{}, nil
}

func (r *TrainJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&kubeflowv2.TrainJob{}).
Complete(r)
}
23 changes: 23 additions & 0 deletions pkg/webhook.v2/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2024 The Kubeflow Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhookv2

import ctrl "sigs.k8s.io/controller-runtime"

func Setup(ctrl.Manager) (string, error) {
return "", nil
}
42 changes: 42 additions & 0 deletions test/integration/controller.v2/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright 2024 The Kubeflow Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllerv2

import (
"context"
"testing"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kubeflow/training-operator/test/integration/framework"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you see a value to separate framework logic into separate file ?
E.g. we can add this logic in the suite_test.go , similar to JobSet: https://github.com/kubernetes-sigs/jobset/blob/main/test/integration/controller/suite_test.go

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed since we will add webhook server integration testing and we will use this framework to set up testing environment.

- /test/integration/controller.v2/
- /test/integration/webhook.v2/trainjob_test.go
- /test/integration/webhook.v2/trainingruntime_test.go
- /test/integration/webhook.v2/clustertrainingruntime_test.go

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using integration testing framework would be better between testing for trainjob_controller, trainjob_wwbhook, trainingruntime_webhook, and clustertrainingruntime_webhook.

)

var (
cfg *rest.Config
k8sClient client.Client
ctx context.Context
fwk *framework.Framework
)

func TestAPIs(t *testing.T) {
gomega.RegisterFailHandler(ginkgo.Fail)

ginkgo.RunSpecs(t, "v2 Controllers Suite")
}
Loading
Loading