Skip to content

Commit

Permalink
Feat: Single cluster deployment application (#667)
Browse files Browse the repository at this point in the history
* Feat: Single cluster deployment application

Signed-off-by: yeyeye2333 <[email protected]>

* update generated

Signed-off-by: yeyeye2333 <[email protected]>

* Change Comment

Signed-off-by: yeyeye2333 <[email protected]>

---------

Signed-off-by: yeyeye2333 <[email protected]>
  • Loading branch information
yeyeye2333 authored Aug 27, 2024
1 parent 19220c1 commit 1ebcffd
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 63 deletions.
6 changes: 4 additions & 2 deletions docs/content/en/references/apps_v1alpha1_types.html
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ <h3 id="apps.kurator.dev/v1alpha1.Application">Application
<td>
<em>(Optional)</em>
<p>Destination defines the destination clusters where the artifacts will be synced.
It can be overridden by the syncPolicies&rsquo; destination.</p>
It can be overridden by the syncPolicies&rsquo; destination.
And if both the current field and syncPolicies&rsquo; destination are empty, the application will be deployed directly in the cluster where kurator resides.</p>
</td>
</tr>
</table>
Expand Down Expand Up @@ -346,7 +347,8 @@ <h3 id="apps.kurator.dev/v1alpha1.ApplicationSpec">ApplicationSpec
<td>
<em>(Optional)</em>
<p>Destination defines the destination clusters where the artifacts will be synced.
It can be overridden by the syncPolicies&rsquo; destination.</p>
It can be overridden by the syncPolicies&rsquo; destination.
And if both the current field and syncPolicies&rsquo; destination are empty, the application will be deployed directly in the cluster where kurator resides.</p>
</td>
</tr>
</tbody>
Expand Down
25 changes: 25 additions & 0 deletions examples/application/gitrepo-kustomization-demo-without-fleet.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: apps.kurator.dev/v1alpha1
kind: Application
metadata:
name: rollout-demo
namespace: default
spec:
source:
gitRepository:
interval: 3m0s
ref:
branch: master
timeout: 1m0s
url: https://github.com/stefanprodan/podinfo
syncPolicies:
- kustomization:
interval: 0s
path: ./deploy/webapp
prune: true
timeout: 2m0s
- kustomization:
targetNamespace: default
interval: 5m0s
path: ./kustomize
prune: true
timeout: 2m0s
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ spec:
description: |-
Destination defines the destination clusters where the artifacts will be synced.
It can be overridden by the syncPolicies' destination.
And if both the current field and syncPolicies' destination are empty, the application will be deployed directly in the cluster where kurator resides.
properties:
clusterSelector:
description: |-
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/apps/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type ApplicationSpec struct {
SyncPolicies []*ApplicationSyncPolicy `json:"syncPolicies"`
// Destination defines the destination clusters where the artifacts will be synced.
// It can be overridden by the syncPolicies' destination.
// And if both the current field and syncPolicies' destination are empty, the application will be deployed directly in the cluster where kurator resides.
// +optional
Destination *ApplicationDestination `json:"destination,omitempty"`
}
Expand Down
21 changes: 12 additions & 9 deletions pkg/fleet-manager/application/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,19 @@ func (a *ApplicationManager) Reconcile(ctx context.Context, req ctrl.Request) (_

// there only one fleet, so pre-fetch it here.
fleetKey := generateFleetKey(app)
fleet := &fleetapi.Fleet{}
// Retrieve fleet object based on the defined fleet key
if err := a.Client.Get(ctx, fleetKey, fleet); err != nil {
if apierrors.IsNotFound(err) {
log.Info("fleet does not exist", "fleet", fleetKey)
return ctrl.Result{RequeueAfter: fleetmanager.RequeueAfter}, nil
var fleet *fleetapi.Fleet
if fleetKey.Name != "" {
fleet = &fleetapi.Fleet{}
// Retrieve fleet object based on the defined fleet key
if err := a.Client.Get(ctx, fleetKey, fleet); err != nil {
if apierrors.IsNotFound(err) {
log.Info("fleet does not exist", "fleet", fleetKey)
return ctrl.Result{RequeueAfter: fleetmanager.RequeueAfter}, nil
}
// Log error and requeue request if error occurred during fleet retrieval
log.Error(err, "failed to find fleet", "fleet", fleetKey)
return ctrl.Result{}, err
}
// Log error and requeue request if error occurred during fleet retrieval
log.Error(err, "failed to find fleet", "fleet", fleetKey)
return ctrl.Result{}, err
}

// Handle deletion reconciliation loop.
Expand Down
39 changes: 25 additions & 14 deletions pkg/fleet-manager/application/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,26 @@ func (a *ApplicationManager) syncPolicyResource(ctx context.Context, app *applic
log := ctrl.LoggerFrom(ctx)

policyKind := getSyncPolicyKind(syncPolicy)
destination := getPolicyDestination(app, syncPolicy)
if fleet != nil {
destination := getPolicyDestination(app, syncPolicy)

// fetch fleet cluster list that recorded in fleet and matches the destination's cluster selector
fleetClusterList, result, err := a.fetchFleetClusterList(ctx, fleet, destination.ClusterSelector)
if err != nil || result.RequeueAfter > 0 {
return result, err
}
// Iterate through all clusters, and create/update kustomization/helmRelease for each of them.
for _, currentFleetCluster := range fleetClusterList {
// fetch kubeconfig for each cluster.
kubeconfig := a.generateKubeConfig(currentFleetCluster)
// fetch fleet cluster list that recorded in fleet and matches the destination's cluster selector
fleetClusterList, result, err := a.fetchFleetClusterList(ctx, fleet, destination.ClusterSelector)
if err != nil || result.RequeueAfter > 0 {
return result, err
}
// Iterate through all clusters, and create/update kustomization/helmRelease for each of them.
for _, currentFleetCluster := range fleetClusterList {
// fetch kubeconfig for each cluster.
kubeconfig := a.generateKubeConfig(currentFleetCluster)

if result, err1 := a.handleSyncPolicyByKind(ctx, app, policyKind, syncPolicy, policyName, currentFleetCluster, kubeconfig); err1 != nil || result.RequeueAfter > 0 {
return result, errors.Wrapf(err1, "failed to handleSyncPolicyByKind currentFleetCluster=%s", currentFleetCluster.GetObject().GetName())
if result, err1 := a.handleSyncPolicyByKind(ctx, app, policyKind, syncPolicy, policyName, &currentFleetCluster, kubeconfig); err1 != nil || result.RequeueAfter > 0 {
return result, errors.Wrapf(err1, "failed to handleSyncPolicyByKind currentFleetCluster=%s", currentFleetCluster.GetObject().GetName())
}
}
} else {
if result, err1 := a.handleSyncPolicyByKind(ctx, app, policyKind, syncPolicy, policyName, nil, nil); err1 != nil || result.RequeueAfter > 0 {
return result, errors.Wrapf(err1, "failed to handleSyncPolicyByKind in currentCluster")
}
}

Expand Down Expand Up @@ -156,10 +162,15 @@ func (a *ApplicationManager) handleSyncPolicyByKind(
policyKind string,
syncPolicy *applicationapi.ApplicationSyncPolicy,
policyName string,
fleetCluster fleetmanager.ClusterInterface,
fleetCluster *fleetmanager.ClusterInterface,
kubeConfig *fluxmeta.KubeConfigReference,
) (ctrl.Result, error) {
policyResourceName := generatePolicyResourceName(policyName, fleetCluster.GetObject().GetObjectKind().GroupVersionKind().Kind, fleetCluster.GetObject().GetName())
var policyResourceName string
if kubeConfig != nil && fleetCluster != nil {
policyResourceName = generatePolicyResourceName(policyName, (*fleetCluster).GetObject().GetObjectKind().GroupVersionKind().Kind, (*fleetCluster).GetObject().GetName())
} else {
policyResourceName = generatePolicyResourceName(policyName, currentClusterKind, currentClusterName)
}
// handle kustomization
if policyKind == KustomizationKind {
kustomization := syncPolicy.Kustomization
Expand Down
43 changes: 29 additions & 14 deletions pkg/fleet-manager/application/rollout_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ const (

// StatusSyncInterval specifies the interval for requeueing when synchronizing status. It determines how frequently the status should be checked and updated.
StatusSyncInterval = 30 * time.Second

currentClusterKind = "currentCluster"
currentClusterName = "host"
)

func (a *ApplicationManager) fetchRolloutClusters(ctx context.Context,
Expand All @@ -54,24 +57,36 @@ func (a *ApplicationManager) fetchRolloutClusters(ctx context.Context,
syncPolicy *applicationapi.ApplicationSyncPolicy,
) (map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster, error) {
log := ctrl.LoggerFrom(ctx)
destination := getPolicyDestination(app, syncPolicy)
ClusterInterfaceList, result, err := a.fetchFleetClusterList(ctx, fleet, destination.ClusterSelector)
if err != nil || result.RequeueAfter > 0 {
return nil, err
}

fleetclusters := make(map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster, len(ClusterInterfaceList))
for _, cluster := range ClusterInterfaceList {
kclient, err := fleetmanager.ClientForCluster(kubeClient, fleet.Namespace, cluster)
var fleetclusters map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster
if fleet == nil {
fleetclusters = make(map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster, 1)
client, err := fleetmanager.WrapClient(a.Client)
if err != nil {
return nil, errors.Wrapf(err, "failed to wrap client")
}
fleetclusters[fleetmanager.ClusterKey{Kind: currentClusterKind, Name: currentClusterName}] = &fleetmanager.FleetCluster{
Client: client,
}
} else {
destination := getPolicyDestination(app, syncPolicy)
ClusterInterfaceList, result, err := a.fetchFleetClusterList(ctx, fleet, destination.ClusterSelector)
if err != nil || result.RequeueAfter > 0 {
return nil, err
}

kind := cluster.GetObject().GetObjectKind().GroupVersionKind().Kind
fleetclusters[fleetmanager.ClusterKey{Kind: kind, Name: cluster.GetObject().GetName()}] = &fleetmanager.FleetCluster{
Secret: cluster.GetSecretName(),
SecretKey: cluster.GetSecretKey(),
Client: kclient,
fleetclusters = make(map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster, len(ClusterInterfaceList))
for _, cluster := range ClusterInterfaceList {
kclient, err := fleetmanager.ClientForCluster(kubeClient, fleet.Namespace, cluster)
if err != nil {
return nil, err
}

kind := cluster.GetObject().GetObjectKind().GroupVersionKind().Kind
fleetclusters[fleetmanager.ClusterKey{Kind: kind, Name: cluster.GetObject().GetName()}] = &fleetmanager.FleetCluster{
Secret: cluster.GetSecretName(),
SecretKey: cluster.GetSecretKey(),
Client: kclient,
}
}
}
log.Info("Successful to fetch destination clusters for Rollout")
Expand Down
8 changes: 8 additions & 0 deletions pkg/fleet-manager/fleet_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"

clusterv1alpha1 "kurator.dev/kurator/pkg/apis/cluster/v1alpha1"
fleetapi "kurator.dev/kurator/pkg/apis/fleet/v1alpha1"
Expand Down Expand Up @@ -111,6 +112,13 @@ func ClientForCluster(client client.Client, ns string, cluster ClusterInterface)

return kclient.NewClient(kclient.NewRESTClientGetter(rest))
}
func WrapClient(client client.Client) (*kclient.Client, error) {
rest, err := config.GetConfig()
if err != nil {
return nil, err
}
return kclient.NewClient(kclient.NewRESTClientGetter(rest))
}

func (cluster FleetCluster) GetRuntimeClient() client.Client {
return cluster.Client.CtrlRuntimeClient()
Expand Down
8 changes: 6 additions & 2 deletions pkg/webhooks/application_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ func validateFleet(in *v1alpha1.Application) field.ErrorList {
var (
firstPolicyFleet string
isFirst = true
isNil = false
)
for i, policy := range in.Spec.SyncPolicies {
// if individual policy fleet is not set, return err
if policy.Destination == nil || policy.Destination.Fleet == "" {
allErrs = append(allErrs, field.Required(field.NewPath("spec", "syncPolicies").Index(i).Child("destination", "fleet"), "must be set when application.spec.destination.fleet is not set"))
return allErrs
isNil = true
continue
}
if isFirst {
firstPolicyFleet = policy.Destination.Fleet
Expand All @@ -105,6 +106,9 @@ func validateFleet(in *v1alpha1.Application) field.ErrorList {
allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "syncPolicies").Index(i).Child("destination", "fleet"), policy.Destination.Fleet, fmt.Sprintf("must be same as firstPolicyFleet:%v, because fleet must be consistent throughout the application", firstPolicyFleet)))
}
}
if isNil && !isFirst {
allErrs = append(allErrs, field.Required(field.NewPath("spec", "syncPolicies").Child("destination", "fleet"), "must be set when application.spec.destination.fleet is not set"))
}
}

return allErrs
Expand Down
44 changes: 22 additions & 22 deletions pkg/webhooks/application_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,7 @@ import (
func TestValidApplicationValidation(t *testing.T) {
// read configuration from examples directory to test valid application configuration
r := path.Join("../../examples", "application")
caseNames := make([]string, 0)
err := filepath.WalkDir(r, func(path string, d fs.DirEntry, err error) error {
if d.IsDir() {
return nil
}

caseNames = append(caseNames, path)

return nil
})
assert.NoError(t, err)
caseNames := getCase(t, r)

wh := &ApplicationWebhook{}
for _, tt := range caseNames {
Expand All @@ -60,17 +50,7 @@ func TestValidApplicationValidation(t *testing.T) {

func TestInvalidApplicationValidation(t *testing.T) {
r := path.Join("testdata", "application")
caseNames := make([]string, 0)
err := filepath.WalkDir(r, func(path string, d fs.DirEntry, err error) error {
if d.IsDir() {
return nil
}

caseNames = append(caseNames, path)

return nil
})
assert.NoError(t, err)
caseNames := getCase(t, r)

wh := &ApplicationWebhook{}
for _, tt := range caseNames {
Expand All @@ -86,6 +66,26 @@ func TestInvalidApplicationValidation(t *testing.T) {
}
}

func getCase(t *testing.T, r string) []string {
caseNames := make([]string, 0)
err := filepath.WalkDir(r, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
if path == r {
return nil
} else {
return filepath.SkipDir
}
}
caseNames = append(caseNames, path)
return nil
})
assert.NoError(t, err)
return caseNames
}

func readApplication(filename string) (*v1alpha1.Application, error) {
b, err := os.ReadFile(filename)
if err != nil {
Expand Down

0 comments on commit 1ebcffd

Please sign in to comment.