Skip to content

Commit

Permalink
feat: remove machine and aws/karpenter-core dependency from kaito
Browse files Browse the repository at this point in the history
1. remove machine resource
2. remove aws/karpenter-core dependency
3. upgrade gpu-provisioner version to v0.3.0 (supports the NodeClaim API)
4. fix: make sure nodeclaims are initialized before ensuring other resources.

Signed-off-by: rambohe-ch <[email protected]>
  • Loading branch information
rambohe-ch committed Jan 13, 2025
1 parent 3d04ba6 commit b571049
Show file tree
Hide file tree
Showing 28 changed files with 151 additions and 1,692 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
REGISTRY ?= YOUR_REGISTRY
IMG_NAME ?= workspace
VERSION ?= v0.4.1
GPU_PROVISIONER_VERSION ?= 0.2.1
GPU_PROVISIONER_VERSION ?= 0.3.0
IMG_TAG ?= $(subst v,,$(VERSION))

ROOT_DIR := $(shell dirname $(realpath $(firstword $(MAKEFILE_LIST))))
Expand Down Expand Up @@ -130,7 +130,7 @@ GINKGO_SKIP ?=
GINKGO_NODES ?= 2
GINKGO_NO_COLOR ?= false
GINKGO_TIMEOUT ?= 120m
GINKGO_ARGS ?= -focus="$(GINKGO_FOCUS)" -skip="$(GINKGO_SKIP)" -nodes=$(GINKGO_NODES) -no-color=$(GINKGO_NO_COLOR) --output-interceptor-mode=none -timeout=$(GINKGO_TIMEOUT) --fail-fast
GINKGO_ARGS ?= -focus="$(GINKGO_FOCUS)" -skip="$(GINKGO_SKIP)" -nodes=$(GINKGO_NODES) -no-color=$(GINKGO_NO_COLOR) -timeout=$(GINKGO_TIMEOUT) --fail-fast

$(E2E_TEST):
(cd test/e2e && go test -c . -o $(E2E_TEST))
Expand Down
3 changes: 0 additions & 3 deletions api/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ package v1alpha1
type ConditionType string

const (
// ConditionTypeMachineStatus is the state when checking machine status.
ConditionTypeMachineStatus = ConditionType("MachineReady")

// ConditionTypeNodeClaimStatus is the state when checking nodeClaim status.
ConditionTypeNodeClaimStatus = ConditionType("NodeClaimReady")

Expand Down
293 changes: 0 additions & 293 deletions charts/kaito/workspace/crds/karpenter.sh_machines.yaml

This file was deleted.

1 change: 0 additions & 1 deletion charts/kaito/workspace/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ securityContext:
drop:
- "ALL"
featureGates:
Karpenter: "false"
vLLM: "true"
webhook:
port: 9443
Expand Down
4 changes: 1 addition & 3 deletions cmd/ragengine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ import (
"time"

azurev1alpha2 "github.com/Azure/karpenter-provider-azure/pkg/apis/v1alpha2"
"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
awsv1beta1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1"
"github.com/kaito-project/kaito/pkg/k8sclient"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"github.com/kaito-project/kaito/pkg/ragengine/controllers"
"github.com/kaito-project/kaito/pkg/ragengine/webhooks"
"k8s.io/api/apps/v1beta1"
"k8s.io/klog/v2"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/webhook"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand Down Expand Up @@ -57,7 +56,6 @@ var (
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(kaitov1alpha1.AddToScheme(scheme))
utilruntime.Must(v1alpha5.SchemeBuilder.AddToScheme(scheme))
utilruntime.Must(v1beta1.SchemeBuilder.AddToScheme(scheme))
utilruntime.Must(azurev1alpha2.SchemeBuilder.AddToScheme(scheme))
utilruntime.Must(awsv1beta1.SchemeBuilder.AddToScheme(scheme))
Expand Down
11 changes: 3 additions & 8 deletions cmd/workspace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ import (
awsv1beta1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1beta1"
"github.com/kaito-project/kaito/pkg/featuregates"
"github.com/kaito-project/kaito/pkg/k8sclient"
"github.com/kaito-project/kaito/pkg/utils/consts"
"github.com/kaito-project/kaito/pkg/utils/nodeclaim"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/kaito-project/kaito/pkg/workspace/controllers"
"github.com/kaito-project/kaito/pkg/workspace/webhooks"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -60,7 +58,6 @@ var (
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(kaitov1alpha1.AddToScheme(scheme))
utilruntime.Must(v1alpha5.SchemeBuilder.AddToScheme(scheme))
utilruntime.Must(v1beta1.SchemeBuilder.AddToScheme(scheme))
utilruntime.Must(azurev1alpha2.SchemeBuilder.AddToScheme(scheme))
utilruntime.Must(awsv1beta1.SchemeBuilder.AddToScheme(scheme))
Expand Down Expand Up @@ -168,11 +165,9 @@ func main() {
exitWithErrorFunc()
}

if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
err = nodeclaim.CheckNodeClass(ctx, kClient)
if err != nil {
exitWithErrorFunc()
}
err = nodeclaim.CheckNodeClass(ctx, kClient)
if err != nil {
exitWithErrorFunc()
}

klog.InfoS("starting manager")
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ toolchain go1.22.5

require (
github.com/Azure/karpenter-provider-azure v0.5.4
github.com/aws/karpenter-core v0.29.2
github.com/aws/karpenter-provider-aws v0.36.2
github.com/go-logr/logr v1.4.2
github.com/onsi/ginkgo/v2 v2.22.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHS
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/aws/aws-sdk-go v1.51.16 h1:vnWKK8KjbftEkuPX8bRj3WHsLy1uhotn0eXptpvrxJI=
github.com/aws/aws-sdk-go v1.51.16/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/karpenter-core v0.29.2 h1:iS8bjC1911LA459gLEl7Jkr0QRbyKMeXB2b4NEVGQIE=
github.com/aws/karpenter-core v0.29.2/go.mod h1:GzFITbd2ijUiV4UJ0wox4RJQsFD2ncyJYtLmUlYnmJY=
github.com/aws/karpenter-provider-aws v0.36.2 h1:4JEK2OMLDOkNthITDyngbvFo2FZRd8JJFQuDLQoGc9s=
github.com/aws/karpenter-provider-aws v0.36.2/go.mod h1:sLDYiuoQr4PPTE9c3ZwFtVCTz6my+bjxIiO5SF5/M1A=
github.com/awslabs/amazon-eks-ami/nodeadm v0.0.0-20240229193347-cfab22a10647 h1:8yRBVsjGmI7qQsPWtIrbWP+XfwHO9Wq7gdLVzjqiZFs=
Expand Down
3 changes: 1 addition & 2 deletions pkg/featuregates/featuregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import (
var (
// FeatureGates is a map that holds the feature gates and their default values for Kaito.
FeatureGates = map[string]bool{
consts.FeatureFlagKarpenter: false,
consts.FeatureFlagVLLM: true,
consts.FeatureFlagVLLM: true,
// Add more feature gates here
}
)
Expand Down
8 changes: 4 additions & 4 deletions pkg/featuregates/featuregates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ func TestParseFeatureGates(t *testing.T) {
}{
{
name: "WithValidEnableFeatureGates",
featureGates: "Karpenter=true",
featureGates: "vLLM=true",
expectedError: false,
expectedValue: "true",
},
{
name: "WithDuplicateFeatureGates",
featureGates: "Karpenter=false,Karpenter=true",
featureGates: "vLLM=false,vLLM=true",
expectedError: false,
expectedValue: "true", // Apply the last value.
},
Expand All @@ -34,12 +34,12 @@ func TestParseFeatureGates(t *testing.T) {
},
{
name: "WithUnsupportedFeatureGate",
featureGates: "unsupported=true,Karpenter=false",
featureGates: "unsupported=true,vLLM=false",
expectedError: true,
},
{
name: "WithValidDisableFeatureGates",
featureGates: "Karpenter=false",
featureGates: "vLLM=false",
expectedError: false,
expectedValue: "false",
},
Expand Down
142 changes: 16 additions & 126 deletions pkg/ragengine/controllers/ragengine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,16 @@ import (
"strings"
"time"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/go-logr/logr"
kaitov1alpha1 "github.com/kaito-project/kaito/api/v1alpha1"
"github.com/kaito-project/kaito/pkg/featuregates"
"github.com/kaito-project/kaito/pkg/ragengine/manifests"
"github.com/kaito-project/kaito/pkg/utils"
"github.com/kaito-project/kaito/pkg/utils/consts"
"github.com/kaito-project/kaito/pkg/utils/machine"
"github.com/kaito-project/kaito/pkg/utils/nodeclaim"
"github.com/kaito-project/kaito/pkg/utils/resources"
"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,7 +33,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"knative.dev/pkg/apis"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -349,16 +344,9 @@ func computeHash(ragEngineObj *kaitov1alpha1.RAGEngine) string {

// applyRAGEngineResource applies RAGEngine resource spec.
func (c *RAGEngineReconciler) applyRAGEngineResource(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine) error {
if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
// Wait for pending nodeClaims if any before we decide whether to create new node or not.
if err := nodeclaim.WaitForPendingNodeClaims(ctx, ragEngineObj, c.Client); err != nil {
return err
}
} else {
// Wait for pending machines if any before we decide whether to create new machine or not.
if err := machine.WaitForPendingMachines(ctx, ragEngineObj, c.Client); err != nil {
return err
}
// Wait for any pending nodeClaims before deciding whether to create a new node.
if err := nodeclaim.WaitForPendingNodeClaims(ctx, ragEngineObj, c.Client); err != nil {
return err
}

// Find all nodes that match the labelSelector and instanceType; they are not necessarily created by nodeClaims.
Expand All @@ -372,16 +360,9 @@ func (c *RAGEngineReconciler) applyRAGEngineResource(ctx context.Context, ragEng

if newNodesCount > 0 {
klog.InfoS("need to create more nodes", "NodeCount", newNodesCount)
if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
if err := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj,
kaitov1alpha1.ConditionTypeNodeClaimStatus, metav1.ConditionUnknown,
"CreateNodeClaimPending", fmt.Sprintf("creating %d nodeClaims", newNodesCount)); err != nil {
klog.ErrorS(err, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
return err
}
} else if err := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj,
kaitov1alpha1.ConditionTypeMachineStatus, metav1.ConditionUnknown,
"CreateMachinePending", fmt.Sprintf("creating %d machines", newNodesCount)); err != nil {
if err := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj,
kaitov1alpha1.ConditionTypeNodeClaimStatus, metav1.ConditionUnknown,
"CreateNodeClaimPending", fmt.Sprintf("creating %d nodeClaims", newNodesCount)); err != nil {

Check warning on line 365 in pkg/ragengine/controllers/ragengine_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/ragengine/controllers/ragengine_controller.go#L363-L365

Added lines #L363 - L365 were not covered by tests
klog.ErrorS(err, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
return err
}
Expand Down Expand Up @@ -415,16 +396,9 @@ func (c *RAGEngineReconciler) applyRAGEngineResource(ctx context.Context, ragEng
}
}

if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
if err = c.updateStatusConditionIfNotMatch(ctx, ragEngineObj,
kaitov1alpha1.ConditionTypeNodeClaimStatus, metav1.ConditionTrue,
"installNodePluginsSuccess", "nodeClaim plugins have been installed successfully"); err != nil {
klog.ErrorS(err, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
return err
}
} else if err = c.updateStatusConditionIfNotMatch(ctx, ragEngineObj,
kaitov1alpha1.ConditionTypeMachineStatus, metav1.ConditionTrue,
"installNodePluginsSuccess", "machines plugins have been installed successfully"); err != nil {
if err = c.updateStatusConditionIfNotMatch(ctx, ragEngineObj,
kaitov1alpha1.ConditionTypeNodeClaimStatus, metav1.ConditionTrue,
"installNodePluginsSuccess", "nodeClaim plugins have been installed successfully"); err != nil {
klog.ErrorS(err, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
return err
}
Expand Down Expand Up @@ -502,47 +476,7 @@ func (c *RAGEngineReconciler) createAndValidateNode(ctx context.Context, ragEngi
if nodeOSDiskSize == "" {
nodeOSDiskSize = "0" // The default OS size is used
}

if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
return c.CreateNodeClaim(ctx, ragEngineObj, nodeOSDiskSize)
} else {
return c.CreateMachine(ctx, ragEngineObj, nodeOSDiskSize)
}
}

func (c *RAGEngineReconciler) CreateMachine(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine, nodeOSDiskSize string) (*corev1.Node, error) {
RetryWithDifferentName:
newMachine := machine.GenerateMachineManifest(ctx, nodeOSDiskSize, ragEngineObj)

if err := machine.CreateMachine(ctx, newMachine, c.Client); err != nil {
if apierrors.IsAlreadyExists(err) {
klog.InfoS("A machine exists with the same name, retry with a different name", "machine", klog.KObj(newMachine))
goto RetryWithDifferentName
} else {

klog.ErrorS(err, "failed to create machine", "machine", newMachine.Name)
if updateErr := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.ConditionTypeMachineStatus, metav1.ConditionFalse,
"machineFailedCreation", err.Error()); updateErr != nil {
klog.ErrorS(updateErr, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
return nil, updateErr
}
return nil, err
}
}

// check machine status until it is ready
err := machine.CheckMachineStatus(ctx, newMachine, c.Client)
if err != nil {
if updateErr := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.ConditionTypeMachineStatus, metav1.ConditionFalse,
"checkMachineStatusFailed", err.Error()); updateErr != nil {
klog.ErrorS(updateErr, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
return nil, updateErr
}
return nil, err
}

// get the node object from the machine status nodeName.
return resources.GetNode(ctx, newMachine.Status.NodeName, c.Client)
return c.CreateNodeClaim(ctx, ragEngineObj, nodeOSDiskSize)
}

func (c *RAGEngineReconciler) CreateNodeClaim(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine, nodeOSDiskSize string) (*corev1.Node, error) {
Expand Down Expand Up @@ -598,18 +532,10 @@ func (c *RAGEngineReconciler) ensureNodePlugins(ctx context.Context, ragEngineOb
if err := resources.UpdateNodeWithLabel(ctx, nodeObj.Name, resources.LabelKeyNvidia, resources.LabelValueNvidia, c.Client); err != nil {
if apierrors.IsNotFound(err) {
klog.ErrorS(err, "nvidia plugin cannot be installed, node not found", "node", nodeObj.Name)
if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
if updateErr := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.ConditionTypeNodeClaimStatus, metav1.ConditionFalse,
"checkNodeClaimStatusFailed", err.Error()); updateErr != nil {
klog.ErrorS(updateErr, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
return updateErr
}
} else {
if updateErr := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.ConditionTypeMachineStatus, metav1.ConditionFalse,
"checkMachineStatusFailed", err.Error()); updateErr != nil {
klog.ErrorS(updateErr, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
return updateErr
}
if updateErr := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.ConditionTypeNodeClaimStatus, metav1.ConditionFalse,
"checkNodeClaimStatusFailed", err.Error()); updateErr != nil {
klog.ErrorS(updateErr, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
return updateErr

Check warning on line 538 in pkg/ragengine/controllers/ragengine_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/ragengine/controllers/ragengine_controller.go#L535-L538

Added lines #L535 - L538 were not covered by tests
}
return err
}
Expand All @@ -635,46 +561,10 @@ func (c *RAGEngineReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&kaitov1alpha1.RAGEngine{}).
Owns(&appsv1.ControllerRevision{}).
Owns(&appsv1.Deployment{}).
Watches(&v1alpha5.Machine{}, c.watchMachines()).
Watches(&v1beta1.NodeClaim{}, c.watchNodeClaims()). // watches for nodeClaim with labels indicating ragengine name.

Check warning on line 564 in pkg/ragengine/controllers/ragengine_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/ragengine/controllers/ragengine_controller.go#L564

Added line #L564 was not covered by tests
WithOptions(controller.Options{MaxConcurrentReconciles: 5})
if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
builder.Watches(&v1beta1.NodeClaim{}, c.watchNodeClaims()) // watches for nodeClaim with labels indicating ragengine name.
} else {
builder.Watches(&v1alpha5.Machine{}, c.watchMachines())
}
return builder.Complete(c)
}

// watches for machine with labels indicating RAGEngine name.
func (c *RAGEngineReconciler) watchMachines() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(
func(ctx context.Context, o client.Object) []reconcile.Request {
machineObj := o.(*v1alpha5.Machine)
name, ok := machineObj.Labels[kaitov1alpha1.LabelRAGEngineName]
if !ok {
return nil
}
namespace, ok := machineObj.Labels[kaitov1alpha1.LabelRAGEngineNamespace]
if !ok {
return nil
}
_, conditionFound := lo.Find(machineObj.GetConditions(), func(condition apis.Condition) bool {
return condition.Type == apis.ConditionReady &&
condition.Status == v1.ConditionTrue
})
if conditionFound && machineObj.DeletionTimestamp.IsZero() {
// No need to reconcile ragengine if the machine is in READY state unless machine is deleted.
return nil
}
return []reconcile.Request{
{
NamespacedName: client.ObjectKey{
Name: name,
Namespace: namespace,
},
},
}
})
return builder.Complete(c)

Check warning on line 567 in pkg/ragengine/controllers/ragengine_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/ragengine/controllers/ragengine_controller.go#L567

Added line #L567 was not covered by tests
}

// watches for nodeClaim with labels indicating RAGEngine name.
Expand Down
Loading

0 comments on commit b571049

Please sign in to comment.