From 81b9d5dd25b89fe9484de6dc0837e6ec9b968951 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Tue, 2 Jan 2024 19:50:51 +0800 Subject: [PATCH 1/6] complete distribute rollout configurations to clusters Signed-off-by: LiZhenCheng9527 --- pkg/fleet-manager/application/helper.go | 16 +- .../application/rollout_helper.go | 439 ++++++++++++++++++ .../application/rollout_helper_test.go | 424 +++++++++++++++++ .../manifests/plugins/testloader-deploy.yaml | 61 +++ .../manifests/plugins/testloader-svc.yaml | 15 + 5 files changed, 954 insertions(+), 1 deletion(-) create mode 100644 pkg/fleet-manager/application/rollout_helper.go create mode 100644 pkg/fleet-manager/application/rollout_helper_test.go create mode 100644 pkg/fleet-manager/manifests/plugins/testloader-deploy.yaml create mode 100644 pkg/fleet-manager/manifests/plugins/testloader-svc.yaml diff --git a/pkg/fleet-manager/application/helper.go b/pkg/fleet-manager/application/helper.go index 41574cb73..2711da41e 100644 --- a/pkg/fleet-manager/application/helper.go +++ b/pkg/fleet-manager/application/helper.go @@ -41,8 +41,9 @@ import ( // syncPolicyResource synchronizes the sync policy resources for a given application. func (a *ApplicationManager) syncPolicyResource(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet, syncPolicy *applicationapi.ApplicationSyncPolicy, policyName string) (ctrl.Result, error) { - policyKind := getSyncPolicyKind(syncPolicy) + log := ctrl.LoggerFrom(ctx) + policyKind := getSyncPolicyKind(syncPolicy) destination := getPolicyDestination(app, syncPolicy) // fetch fleet cluster list that recorded in fleet and matches the destination's cluster selector @@ -60,6 +61,19 @@ func (a *ApplicationManager) syncPolicyResource(ctx context.Context, app *applic } } + // after finish instaill application, start handle rollout policy + rolloutCluster, err := a.fetchRolloutClusters(ctx, app, a.Client, fleet, syncPolicy) + if err != nil { + log.Error(err, "failed to fetch destination clusters for syncPolicy") + return ctrl.Result{}, err + } + + if syncPolicy.Rollout != nil { + if result, err := a.syncRolloutPolicyForCluster(ctx, syncPolicy.Rollout, rolloutCluster, policyName); err != nil { + return result, errors.Wrapf(err, "failed to handleSyncPolicyByKind currentFleetCluster") + } + } + return ctrl.Result{}, nil } diff --git a/pkg/fleet-manager/application/rollout_helper.go b/pkg/fleet-manager/application/rollout_helper.go new file mode 100644 index 000000000..d6086ca84 --- /dev/null +++ b/pkg/fleet-manager/application/rollout_helper.go @@ -0,0 +1,439 @@ +/* +Copyright Kurator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package application + +import ( + "context" + "fmt" + "io/fs" + "time" + + flaggerv1b1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" + "github.com/pkg/errors" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/yaml" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1" + fleetapi "kurator.dev/kurator/pkg/apis/fleet/v1alpha1" + fleetmanager "kurator.dev/kurator/pkg/fleet-manager" + "kurator.dev/kurator/pkg/fleet-manager/manifests" + plugin "kurator.dev/kurator/pkg/fleet-manager/plugin" +) + +const ( + // kurator rollout labels + RolloutLabel = "kurator.dev/rollout" + + // StatusSyncInterval specifies the interval for requeueing when synchronizing status. It determines how frequently the status should be checked and updated. + StatusSyncInterval = 30 * time.Second +) + +func (a *ApplicationManager) fetchRolloutClusters(ctx context.Context, + app *applicationapi.Application, + kubeClient client.Client, + fleet *fleetapi.Fleet, + syncPolicy *applicationapi.ApplicationSyncPolicy, +) (map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster, error) { + log := ctrl.LoggerFrom(ctx) + destination := getPolicyDestination(app, syncPolicy) + ClusterInterfaceList, result, err := a.fetchFleetClusterList(ctx, fleet, destination.ClusterSelector) + if err != nil || result.RequeueAfter > 0 { + return nil, err + } + + fleetclusters := make(map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster, len(ClusterInterfaceList)) + for _, cluster := range ClusterInterfaceList { + kclient, err := fleetmanager.ClientForCluster(kubeClient, fleet.Namespace, cluster) + if err != nil { + return nil, err + } + + kind := cluster.GetObject().GetObjectKind().GroupVersionKind().Kind + fleetclusters[fleetmanager.ClusterKey{Kind: kind, Name: cluster.GetObject().GetName()}] = &fleetmanager.FleetCluster{ + Secret: cluster.GetSecretName(), + SecretKey: cluster.GetSecretKey(), + Client: kclient, + } + } + log.Info("Successful to fetch destination clusters for Rollout") + return fleetclusters, nil +} + +func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, + rolloutPolicy *applicationapi.RolloutConfig, + destinationClusters map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster, + policyName string, +) (ctrl.Result, error) { + log := ctrl.LoggerFrom(ctx) + + label := map[string]string{ + RolloutLabel: policyName, + } + + namespaceName := types.NamespacedName{ + Namespace: rolloutPolicy.Workload.Namespace, + Name: rolloutPolicy.ServiceName, + } + + testloaderNamespaceName := types.NamespacedName{ + Namespace: rolloutPolicy.Workload.Namespace, + Name: rolloutPolicy.Workload.Name + "-testloader", + } + + for clusterKey, fleetCluster := range destinationClusters { + newClient := fleetCluster.Client.CtrlRuntimeClient() + + // if trafficRoutingProvider is istio, find workload namespace with Istio sidecar injection enabled. + if rolloutPolicy.TrafficRoutingProvider == "istio" { + err := namespaceSidecarInject(ctx, fleetCluster, namespaceName) + if err != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed to set namespace %s istio-injection enable", namespaceName.Namespace) + } + } + + // if delete private testloader when rollout polity has changed + if rolloutPolicy.TestLoader == nil || !*rolloutPolicy.TestLoader { + testloaderDeploy := &appsv1.Deployment{} + if err := deleteResourceCreateByKurator(ctx, testloaderNamespaceName, newClient, testloaderDeploy); err != nil { + return ctrl.Result{}, err + } + testloaderSvc := &corev1.Service{} + if err := deleteResourceCreateByKurator(ctx, testloaderNamespaceName, newClient, testloaderSvc); err != nil { + return ctrl.Result{}, err + } + } + + // Installation of private testloader if needed + if rolloutPolicy.TestLoader != nil && *rolloutPolicy.TestLoader { + if result, err := installPrivateTestloader(ctx, namespaceName, *fleetCluster, label); err != nil { + return result, fmt.Errorf("failed to install private testloader for workload: %w", err) + } + } + + // Get the configuration of the workload's service and generate a canaryService. + service := &corev1.Service{} + if err := newClient.Get(ctx, namespaceName, service); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{RequeueAfter: StatusSyncInterval}, errors.Wrapf(err, "not found service %s in %s", rolloutPolicy.ServiceName, rolloutPolicy.Workload.Namespace) + } + return ctrl.Result{}, errors.Wrapf(err, "failed to get service %s in %s", rolloutPolicy.ServiceName, rolloutPolicy.Workload.Namespace) + } + + canaryInCluster := &flaggerv1b1.Canary{} + getErr := newClient.Get(ctx, namespaceName, canaryInCluster) + canaryInCluster = renderCanary(*rolloutPolicy, canaryInCluster) + if canaryService, err := renderCanaryService(*rolloutPolicy, service); err != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed rander canary configuration") + } else { + canaryInCluster.Spec.Service = *canaryService + } + canaryInCluster.Spec.Analysis = renderCanaryAnalysis(*rolloutPolicy, clusterKey.Name) + // Set up labels to make sure it's a resource created by kurator + canaryInCluster.SetLabels(label) + + if getErr != nil { + if apierrors.IsNotFound(getErr) { + if err := newClient.Create(ctx, canaryInCluster); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create rolloutPolicy: %v", err) + } + } + return ctrl.Result{}, errors.Wrapf(getErr, "failed to get canary %s in %s", namespaceName.Name, namespaceName.Namespace) + } + if err := newClient.Update(ctx, canaryInCluster); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update rolloutPolicy: %v", err) + } + + log.Info("sync rolloutPolicy for cluster successful") + } + return ctrl.Result{}, nil +} + +func namespaceSidecarInject(ctx context.Context, cluster *fleetmanager.FleetCluster, namespacedName types.NamespacedName) error { + ns := &corev1.Namespace{} + client := cluster.Client.CtrlRuntimeClient() + namespacedName.Name = namespacedName.Namespace + if err := client.Get(ctx, namespacedName, ns); err != nil { + // if no found, create a namespace + if apierrors.IsNotFound(err) { + ns.SetName(namespacedName.Namespace) + ns.SetLabels(map[string]string{ + "istio-injection": "enabled", + }) + if createErr := client.Create(ctx, ns); createErr != nil { + return errors.Wrapf(createErr, "failed to create namespace %s", namespacedName.Namespace) + } + } + // if found namespace, set labels and update. + ns.SetLabels(map[string]string{ + "istio-injection": "enabled", + }) + if updateErr := client.Update(ctx, ns); updateErr != nil { + return errors.Wrapf(updateErr, "failed to update namespace %s", namespacedName.Namespace) + } + } + return nil +} + +func installPrivateTestloader(ctx context.Context, + namespacedName types.NamespacedName, + fleetCluster fleetmanager.FleetCluster, + labels map[string]string, +) (ctrl.Result, error) { + log := ctrl.LoggerFrom(ctx) + clusterClient := fleetCluster.Client.CtrlRuntimeClient() + + // Creating a private testload deployment from a configuration file + filepath := manifests.BuiltinOrDir("") + deployname := "plugins/testloader-deploy.yaml" + deploy, err1 := generateDeployConfig(filepath, deployname, namespacedName.Name, namespacedName.Namespace) + if err1 != nil { + return ctrl.Result{}, fmt.Errorf("failed get testloader deployment configuration: %v", err1) + } + // Set up labels to make sure it's a resource created by kurator. + deploy.SetLabels(labels) + + if err := clusterClient.Create(ctx, deploy); err != nil { + if apierrors.IsAlreadyExists(err) { + if updateErr := clusterClient.Update(ctx, deploy); updateErr != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed to update private testloader deployment") + } + return ctrl.Result{}, nil + } + return ctrl.Result{}, errors.Wrapf(err, "failed to create private testloader deployment") + } + + // Creating a private testload service from a configuration file + svcName := "plugins/testloader-svc.yaml" + svc, err2 := generateSvcConfig(filepath, svcName, namespacedName.Name, namespacedName.Namespace) + if err2 != nil { + return ctrl.Result{}, fmt.Errorf("failed get testloader service configuration: %v", err2) + } + // Set up labels to make sure it's a resource created by kurator. + svc.SetLabels(labels) + + if err := clusterClient.Create(ctx, svc); err != nil { + if apierrors.IsAlreadyExists(err) { + if updateErr := clusterClient.Update(ctx, deploy); updateErr != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed to update private testloader service") + } + } + return ctrl.Result{}, errors.Wrapf(err, "failed to create private testloader service") + } + + log.Info("Create private workload successful") + return ctrl.Result{}, nil +} + +func generateDeployConfig(fsys fs.FS, fileName, name, namespace string) (*appsv1.Deployment, error) { + file, err := fs.ReadFile(fsys, fileName) + if err != nil { + return nil, fmt.Errorf("failed to open telstloader deployment configuration: %v", err) + } + + deploy := appsv1.Deployment{} + if err := yaml.Unmarshal(file, &deploy); err != nil { + return nil, err + } + + deploy.SetName(name + "-testloader") + deploy.SetNamespace(namespace) + deploy.SetLabels(map[string]string{ + "app": name + "-testloader", + }) + // let svc's selector to select private testloader pod + deploy.Spec.Selector.MatchLabels = map[string]string{ + "app": name + "-testloader", + } + deploy.Spec.Template.ObjectMeta.Labels = map[string]string{ + "app": name + "-testloader", + } + + return &deploy, nil +} + +func generateSvcConfig(fsys fs.FS, fileName string, name, namespace string) (*corev1.Service, error) { + file, err1 := fs.ReadFile(fsys, fileName) + if err1 != nil { + return nil, fmt.Errorf("failed to open telstloader service configuration: %v", err1) + } + + svc := corev1.Service{} + if err := yaml.Unmarshal(file, &svc); err != nil { + return nil, err + } + + svc.SetName(name + "-testloader") + svc.SetNamespace(namespace) + svc.SetLabels(map[string]string{ + "app": name + "-testloader", + }) + // let svc's selector to select private testloader pod + svc.Spec.Selector = map[string]string{ + "app": name + "-testloader", + } + + return &svc, nil +} + +// create/update canary configuration +func renderCanary(rolloutPolicy applicationapi.RolloutConfig, canaryInCluster *flaggerv1b1.Canary) *flaggerv1b1.Canary { + value := int32(*rolloutPolicy.RolloutPolicy.RolloutTimeoutSeconds) + ptrValue := &value + + canaryInCluster.ObjectMeta.Namespace = rolloutPolicy.Workload.Namespace + canaryInCluster.ObjectMeta.Name = rolloutPolicy.Workload.Name + canaryInCluster.TypeMeta.Kind = "Canary" + canaryInCluster.TypeMeta.APIVersion = "flagger.app/v1beta1" + canaryInCluster.Spec = flaggerv1b1.CanarySpec{ + Provider: rolloutPolicy.TrafficRoutingProvider, + TargetRef: flaggerv1b1.LocalObjectReference{ + APIVersion: rolloutPolicy.Workload.APIVersion, + Kind: rolloutPolicy.Workload.Kind, + Name: rolloutPolicy.Workload.Name, + }, + ProgressDeadlineSeconds: ptrValue, + SkipAnalysis: rolloutPolicy.RolloutPolicy.SkipTrafficAnalysis, + RevertOnDeletion: rolloutPolicy.RolloutPolicy.RevertOnDeletion, + Suspend: rolloutPolicy.RolloutPolicy.Suspend, + } + + return canaryInCluster +} + +func renderCanaryService(rolloutPolicy applicationapi.RolloutConfig, service *corev1.Service) (*flaggerv1b1.CanaryService, error) { + if service == nil { + return nil, fmt.Errorf("service is nil, build canaryService configuration failed") + } + ports := service.Spec.Ports + canaryService := &flaggerv1b1.CanaryService{ + Name: rolloutPolicy.ServiceName, + Port: rolloutPolicy.Port, + Gateways: rolloutPolicy.RolloutPolicy.TrafficRouting.Gateways, + Hosts: rolloutPolicy.RolloutPolicy.TrafficRouting.Hosts, + Retries: rolloutPolicy.RolloutPolicy.TrafficRouting.Retries, + Headers: rolloutPolicy.RolloutPolicy.TrafficRouting.Headers, + CorsPolicy: rolloutPolicy.RolloutPolicy.TrafficRouting.CorsPolicy, + Primary: (*flaggerv1b1.CustomMetadata)(rolloutPolicy.Primary), + Canary: (*flaggerv1b1.CustomMetadata)(rolloutPolicy.Preview), + } + + Timeout := fmt.Sprintf("%d", rolloutPolicy.RolloutPolicy.TrafficRouting.TimeoutSeconds) + "s" + canaryService.Timeout = Timeout + + for _, port := range ports { + if port.Port == rolloutPolicy.Port { + canaryService.TargetPort = port.TargetPort + break + } + } + + return canaryService, nil +} + +func renderCanaryAnalysis(rolloutPolicy applicationapi.RolloutConfig, clusterName string) *flaggerv1b1.CanaryAnalysis { + canaryAnalysis := flaggerv1b1.CanaryAnalysis{ + Iterations: rolloutPolicy.RolloutPolicy.TrafficRouting.AnalysisTimes, + MaxWeight: rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.MaxWeight, + StepWeight: rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeight, + StepWeights: rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeights, + StepWeightPromotion: rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeightPromotion, + Threshold: *rolloutPolicy.RolloutPolicy.TrafficAnalysis.CheckFailedTimes, + Match: rolloutPolicy.RolloutPolicy.TrafficRouting.Match, + SessionAffinity: (*flaggerv1b1.SessionAffinity)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.SessionAffinity), + } + + CheckInterval := fmt.Sprintf("%d", *rolloutPolicy.RolloutPolicy.TrafficAnalysis.CheckIntervalSeconds) + "s" + canaryAnalysis.Interval = CheckInterval + + canaryMetric := []flaggerv1b1.CanaryMetric{} + for _, metric := range rolloutPolicy.RolloutPolicy.TrafficAnalysis.Metrics { + metricInterval := fmt.Sprintf("%d", *metric.IntervalSeconds) + "s" + templateMetric := flaggerv1b1.CanaryMetric{ + Name: string(metric.Name), + Interval: metricInterval, + ThresholdRange: (*flaggerv1b1.CanaryThresholdRange)(metric.ThresholdRange), + } + canaryMetric = append(canaryMetric, templateMetric) + } + canaryAnalysis.Metrics = canaryMetric + + // Trigger testloader to request service before analysis by webhook. + webhookTemplate := flaggerv1b1.CanaryWebhook{ + Name: "generated-testload", + Timeout: "60s", + } + + if len(rolloutPolicy.RolloutPolicy.TrafficAnalysis.Webhooks.Commands) != 0 { + var url string + // if have private webhook, webhook url is private testloader url + // else is public testloader url + if rolloutPolicy.TestLoader != nil && *rolloutPolicy.TestLoader { + name := rolloutPolicy.ServiceName + "-testloader" + namespace := rolloutPolicy.Workload.Namespace + url = generateWebhookUrl(name, namespace) + } else if namespace, exist := plugin.ProviderNamespace[fleetapi.Provider(rolloutPolicy.TrafficRoutingProvider)]; exist { + name := namespace + "-testloader-" + clusterName + "-loadtester" + url = generateWebhookUrl(name, namespace) + } + webhookTemplate.URL = url + + timeout := fmt.Sprintf("%d", *rolloutPolicy.RolloutPolicy.TrafficAnalysis.Webhooks.TimeoutSeconds) + "s" + webhookTemplate.Timeout = timeout + + canaryWebhook := []flaggerv1b1.CanaryWebhook{} + bakName := webhookTemplate.Name + for index, command := range rolloutPolicy.RolloutPolicy.TrafficAnalysis.Webhooks.Commands { + metadata := map[string]string{ + "type": "cmd", + "cmd": command, + } + webhookTemplate.Metadata = &metadata + webhookTemplate.Name = bakName + "-" + fmt.Sprintf("%d", index) + canaryWebhook = append(canaryWebhook, webhookTemplate) + } + + canaryAnalysis.Webhooks = canaryWebhook + } + return &canaryAnalysis +} + +func generateWebhookUrl(name, namespace string) string { + url := "http://" + name + "." + namespace + "/" + return url +} + +func deleteResourceCreateByKurator(ctx context.Context, namespaceName types.NamespacedName, kubeClient client.Client, obj client.Object) error { + if err := kubeClient.Get(ctx, namespaceName, obj); err != nil { + if !apierrors.IsNotFound(err) { + return errors.Wrapf(err, "get kubernetes resource error") + } + } else { + // verify if the deployment were created by kurator + labels := obj.GetLabels() + if _, exist := labels[RolloutLabel]; exist { + if deleteErr := kubeClient.Delete(ctx, obj); deleteErr != nil { + return errors.Wrapf(deleteErr, "failed to delete kubernetes resource") + } + } + } + return nil +} diff --git a/pkg/fleet-manager/application/rollout_helper_test.go b/pkg/fleet-manager/application/rollout_helper_test.go new file mode 100644 index 000000000..88b85ee59 --- /dev/null +++ b/pkg/fleet-manager/application/rollout_helper_test.go @@ -0,0 +1,424 @@ +/* +Copyright Kurator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package application + +import ( + "fmt" + "reflect" + "testing" + + flaggerv1b1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" + "github.com/fluxcd/flagger/pkg/apis/istio/common/v1alpha1" + istiov1alpha3 "github.com/fluxcd/flagger/pkg/apis/istio/v1alpha3" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + + applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1" + "kurator.dev/kurator/pkg/fleet-manager/manifests" +) + +func generateRolloutPloicy(installPrivateTestloader *bool) applicationapi.RolloutConfig { + timeout := 50 + min := 99.0 + max := 500.0 + + rolloutPolicy := applicationapi.RolloutConfig{ + TestLoader: installPrivateTestloader, + TrafficRoutingProvider: "istio", + Workload: &applicationapi.CrossNamespaceObjectReference{ + APIVersion: "appv1/deployment", + Kind: "Deployment", + Name: "podinfo", + Namespace: "test", + }, + ServiceName: "podinfo-service", + Port: 80, + RolloutPolicy: &applicationapi.RolloutPolicy{ + TrafficRouting: &applicationapi.TrafficRoutingConfig{ + TimeoutSeconds: 50, + Gateways: []string{ + "istio-system/public-gateway", + }, + Hosts: []string{ + "app.example.com", + }, + Retries: &istiov1alpha3.HTTPRetry{ + Attempts: 10, + PerTryTimeout: "40s", + RetryOn: "gateway-error, connect-failure, refused-stream", + }, + Headers: &istiov1alpha3.Headers{ + Request: &istiov1alpha3.HeaderOperations{ + Add: map[string]string{ + "x-some-header": "value", + }, + }, + }, + CorsPolicy: &istiov1alpha3.CorsPolicy{ + AllowOrigin: []string{"example"}, + AllowMethods: []string{"GET"}, + AllowCredentials: false, + AllowHeaders: []string{"x-some-header"}, + MaxAge: "24h", + }, + CanaryStrategy: &applicationapi.CanaryConfig{ + MaxWeight: 50, + StepWeight: 10, + StepWeights: []int{ + 1, 20, 40, 80, + }, + StepWeightPromotion: 30, + }, + AnalysisTimes: 5, + Match: []istiov1alpha3.HTTPMatchRequest{ + { + Headers: map[string]v1alpha1.StringMatch{ + "user-agent": { + Regex: ".*Firefox.*", + }, + "cookie": { + Regex: "^(.*?;)?(type=insider)(;.*)?$", + }, + }, + }, + }, + }, + TrafficAnalysis: &applicationapi.TrafficAnalysis{ + CheckIntervalSeconds: &timeout, + CheckFailedTimes: &timeout, + Metrics: []applicationapi.Metric{ + { + Name: "request-success-rate", + IntervalSeconds: &timeout, + ThresholdRange: &applicationapi.CanaryThresholdRange{ + Min: &min, + }, + }, + { + Name: "request-duration", + IntervalSeconds: &timeout, + ThresholdRange: &applicationapi.CanaryThresholdRange{ + Max: &max, + }, + }, + }, + Webhooks: applicationapi.Webhook{ + TimeoutSeconds: &timeout, + Commands: []string{ + "hey -z 1m -q 10 -c 2 http://podinfo-canary.test:9898/", + "curl -sd 'test' http://podinfo-canary:9898/token | grep token", + }, + }, + SessionAffinity: &applicationapi.SessionAffinity{ + CookieName: "User", + MaxAge: 24, + }, + }, + RolloutTimeoutSeconds: &timeout, + SkipTrafficAnalysis: false, + RevertOnDeletion: false, + Suspend: false, + }, + } + return rolloutPolicy +} + +func Test_renderCanary(t *testing.T) { + int32Time := int32(50) + sign := true + type args struct { + rolloutPolicy applicationapi.RolloutConfig + } + tests := []struct { + name string + args args + want *flaggerv1b1.Canary + }{ + { + name: "functional test", + args: args{ + rolloutPolicy: generateRolloutPloicy(&sign), + }, + want: &flaggerv1b1.Canary{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "podinfo", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Canary", + APIVersion: "flagger.app/v1beta1", + }, + Spec: flaggerv1b1.CanarySpec{ + Provider: "istio", + TargetRef: flaggerv1b1.LocalObjectReference{ + APIVersion: "appv1/deployment", + Kind: "Deployment", + Name: "podinfo", + }, + ProgressDeadlineSeconds: &int32Time, + SkipAnalysis: false, + RevertOnDeletion: false, + Suspend: false, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := renderCanary(tt.args.rolloutPolicy, &flaggerv1b1.Canary{}); !reflect.DeepEqual(got, tt.want) { + t.Errorf("renderCanary() = %v\n, want %v", got, tt.want) + } + }) + } +} + +func Test_renderCanaryService(t *testing.T) { + sign := true + rolloutPolicy := generateRolloutPloicy(&sign) + type args struct { + rolloutPolicy applicationapi.RolloutConfig + service *corev1.Service + } + tests := []struct { + name string + args args + want *flaggerv1b1.CanaryService + }{ + { + name: "functional test", + args: args{ + rolloutPolicy: rolloutPolicy, + service: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "podinfo-service", + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "podinfo", + }, + Ports: []corev1.ServicePort{ + { + Protocol: corev1.ProtocolTCP, + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + }, + }, + want: &flaggerv1b1.CanaryService{ + Name: "podinfo-service", + Port: 80, + Timeout: "50s", + TargetPort: intstr.FromInt(8080), + Gateways: []string{ + "istio-system/public-gateway", + }, + Hosts: []string{ + "app.example.com", + }, + Retries: rolloutPolicy.RolloutPolicy.TrafficRouting.Retries, + Headers: rolloutPolicy.RolloutPolicy.TrafficRouting.Headers, + CorsPolicy: rolloutPolicy.RolloutPolicy.TrafficRouting.CorsPolicy, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got, _ := renderCanaryService(tt.args.rolloutPolicy, tt.args.service); !reflect.DeepEqual(got, tt.want) { + t.Errorf("renderCanaryService() = %v\n, want %v", got, tt.want) + } + }) + } +} + +func Test_renderCanaryAnalysis(t *testing.T) { + sign := true + wantFalse := false + timeout := 50 + rolloutPolicy := generateRolloutPloicy(&sign) + wantPublicTestloaderRolloutPolicy := generateRolloutPloicy(&wantFalse) + type args struct { + rolloutPolicy applicationapi.RolloutConfig + } + tests := []struct { + name string + args args + want *flaggerv1b1.CanaryAnalysis + }{ + { + name: "functional test", + args: args{ + rolloutPolicy: rolloutPolicy, + }, + want: &flaggerv1b1.CanaryAnalysis{ + Interval: "50s", + Iterations: 5, + MaxWeight: 50, + StepWeight: 10, + StepWeights: []int{ + 1, 20, 40, 80, + }, + StepWeightPromotion: 30, + Threshold: timeout, + Match: []istiov1alpha3.HTTPMatchRequest{ + { + Headers: map[string]v1alpha1.StringMatch{ + "user-agent": { + Regex: ".*Firefox.*", + }, + "cookie": { + Regex: "^(.*?;)?(type=insider)(;.*)?$", + }, + }, + }, + }, + SessionAffinity: (*flaggerv1b1.SessionAffinity)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.SessionAffinity), + Metrics: []flaggerv1b1.CanaryMetric{ + { + Name: "request-success-rate", + Interval: "50s", + ThresholdRange: (*flaggerv1b1.CanaryThresholdRange)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.Metrics[0].ThresholdRange), + }, + { + Name: "request-duration", + Interval: "50s", + ThresholdRange: (*flaggerv1b1.CanaryThresholdRange)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.Metrics[1].ThresholdRange), + }, + }, + Webhooks: []flaggerv1b1.CanaryWebhook{ + { + Name: "generated-testload-0", + Timeout: "50s", + URL: "http://podinfo-service-testloader.test/", + Metadata: &map[string]string{ + "type": "cmd", + "cmd": "hey -z 1m -q 10 -c 2 http://podinfo-canary.test:9898/", + }, + }, + { + Name: "generated-testload-1", + Timeout: "50s", + URL: "http://podinfo-service-testloader.test/", + Metadata: &map[string]string{ + "type": "cmd", + "cmd": "curl -sd 'test' http://podinfo-canary:9898/token | grep token", + }, + }, + }, + }, + }, + { + name: "public Testloader", + args: args{ + rolloutPolicy: wantPublicTestloaderRolloutPolicy, + }, + want: &flaggerv1b1.CanaryAnalysis{ + Interval: "50s", + Iterations: 5, + MaxWeight: 50, + StepWeight: 10, + StepWeights: []int{ + 1, 20, 40, 80, + }, + StepWeightPromotion: 30, + Threshold: timeout, + Match: []istiov1alpha3.HTTPMatchRequest{ + { + Headers: map[string]v1alpha1.StringMatch{ + "user-agent": { + Regex: ".*Firefox.*", + }, + "cookie": { + Regex: "^(.*?;)?(type=insider)(;.*)?$", + }, + }, + }, + }, + SessionAffinity: (*flaggerv1b1.SessionAffinity)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.SessionAffinity), + Metrics: []flaggerv1b1.CanaryMetric{ + { + Name: "request-success-rate", + Interval: "50s", + ThresholdRange: (*flaggerv1b1.CanaryThresholdRange)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.Metrics[0].ThresholdRange), + }, + { + Name: "request-duration", + Interval: "50s", + ThresholdRange: (*flaggerv1b1.CanaryThresholdRange)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.Metrics[1].ThresholdRange), + }, + }, + Webhooks: []flaggerv1b1.CanaryWebhook{ + { + Name: "generated-testload-0", + Timeout: "50s", + URL: "http://istio-system-testloader-kurator-member-loadtester.istio-system/", + Metadata: &map[string]string{ + "type": "cmd", + "cmd": "hey -z 1m -q 10 -c 2 http://podinfo-canary.test:9898/", + }, + }, + { + Name: "generated-testload-1", + Timeout: "50s", + URL: "http://istio-system-testloader-kurator-member-loadtester.istio-system/", + Metadata: &map[string]string{ + "type": "cmd", + "cmd": "curl -sd 'test' http://podinfo-canary:9898/token | grep token", + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := renderCanaryAnalysis(tt.args.rolloutPolicy, "kurator-member"); !reflect.DeepEqual(got, tt.want) { + t.Errorf("renderCanaryAnalysis() = %v\n, want %v", got, tt.want) + } + }) + } +} + +func Test_generateDeployConfig(t *testing.T) { + filepath := manifests.BuiltinOrDir("") + //fmt.Printf("%s", filepath) + deployname := "plugins/testloader-deploy.yaml" + namespacedName := types.NamespacedName{ + Namespace: "test", + Name: "podinfo", + } + if _, err := generateDeployConfig(filepath, deployname, namespacedName.Name, namespacedName.Namespace); err != nil { + fmt.Printf("failed get testloader deployment configuration: %v", err) + } +} + +func Test_generateSvcConfig(t *testing.T) { + filepath := manifests.BuiltinOrDir("") + //fmt.Printf("%s", filepath) + svcname := "plugins/testloader-svc.yaml" + namespacedName := types.NamespacedName{ + Namespace: "test", + Name: "podinfo", + } + if _, err := generateSvcConfig(filepath, svcname, namespacedName.Name, namespacedName.Namespace); err != nil { + fmt.Printf("failed get testloader deployment configuration: %v", err) + } +} diff --git a/pkg/fleet-manager/manifests/plugins/testloader-deploy.yaml b/pkg/fleet-manager/manifests/plugins/testloader-deploy.yaml new file mode 100644 index 000000000..8e9822d7c --- /dev/null +++ b/pkg/fleet-manager/manifests/plugins/testloader-deploy.yaml @@ -0,0 +1,61 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: flagger-loadtester + labels: + app: flagger-loadtester +spec: + selector: + matchLabels: + app: flagger-loadtester + template: + metadata: + labels: + app: flagger-loadtester + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "8080" + openservicemesh.io/inbound-port-exclusion-list: "80, 8080" + spec: + containers: + - name: loadtester + image: ghcr.io/fluxcd/flagger-loadtester:0.29.0 + imagePullPolicy: IfNotPresent + ports: + - name: http + containerPort: 8080 + command: + - ./loadtester + - -port=8080 + - -log-level=info + - -timeout=1h + livenessProbe: + exec: + command: + - wget + - --quiet + - --tries=1 + - --timeout=4 + - --spider + - http://localhost:8080/healthz + timeoutSeconds: 5 + readinessProbe: + exec: + command: + - wget + - --quiet + - --tries=1 + - --timeout=4 + - --spider + - http://localhost:8080/healthz + timeoutSeconds: 5 + resources: + limits: + memory: "512Mi" + cpu: "1000m" + requests: + memory: "32Mi" + cpu: "10m" + securityContext: + readOnlyRootFilesystem: true + runAsUser: 10001 diff --git a/pkg/fleet-manager/manifests/plugins/testloader-svc.yaml b/pkg/fleet-manager/manifests/plugins/testloader-svc.yaml new file mode 100644 index 000000000..772b20afe --- /dev/null +++ b/pkg/fleet-manager/manifests/plugins/testloader-svc.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: flagger-loadtester + labels: + app: flagger-loadtester +spec: + type: ClusterIP + selector: + app: flagger-loadtester + ports: + - name: http + port: 80 + protocol: TCP + targetPort: http \ No newline at end of file From a35a472e7755cde64e1722b5160a0ae484b4fe48 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Wed, 3 Jan 2024 09:33:27 +0800 Subject: [PATCH 2/6] handle empty pointer panic when generate Analysis config Signed-off-by: LiZhenCheng9527 --- .../application/rollout_helper.go | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/pkg/fleet-manager/application/rollout_helper.go b/pkg/fleet-manager/application/rollout_helper.go index d6086ca84..29ff7b4c1 100644 --- a/pkg/fleet-manager/application/rollout_helper.go +++ b/pkg/fleet-manager/application/rollout_helper.go @@ -256,6 +256,7 @@ func generateDeployConfig(fsys fs.FS, fileName, name, namespace string) (*appsv1 deploy.SetName(name + "-testloader") deploy.SetNamespace(namespace) + // Set up labels to make sure it's a resource created by kurator. deploy.SetLabels(map[string]string{ "app": name + "-testloader", }) @@ -283,6 +284,7 @@ func generateSvcConfig(fsys fs.FS, fileName string, name, namespace string) (*co svc.SetName(name + "-testloader") svc.SetNamespace(namespace) + // Set up labels to make sure it's a resource created by kurator. svc.SetLabels(map[string]string{ "app": name + "-testloader", }) @@ -351,14 +353,17 @@ func renderCanaryService(rolloutPolicy applicationapi.RolloutConfig, service *co func renderCanaryAnalysis(rolloutPolicy applicationapi.RolloutConfig, clusterName string) *flaggerv1b1.CanaryAnalysis { canaryAnalysis := flaggerv1b1.CanaryAnalysis{ - Iterations: rolloutPolicy.RolloutPolicy.TrafficRouting.AnalysisTimes, - MaxWeight: rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.MaxWeight, - StepWeight: rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeight, - StepWeights: rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeights, - StepWeightPromotion: rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeightPromotion, - Threshold: *rolloutPolicy.RolloutPolicy.TrafficAnalysis.CheckFailedTimes, - Match: rolloutPolicy.RolloutPolicy.TrafficRouting.Match, - SessionAffinity: (*flaggerv1b1.SessionAffinity)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.SessionAffinity), + Iterations: rolloutPolicy.RolloutPolicy.TrafficRouting.AnalysisTimes, + Threshold: *rolloutPolicy.RolloutPolicy.TrafficAnalysis.CheckFailedTimes, + Match: rolloutPolicy.RolloutPolicy.TrafficRouting.Match, + SessionAffinity: (*flaggerv1b1.SessionAffinity)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.SessionAffinity), + } + + if rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy != nil { + canaryAnalysis.MaxWeight = rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.MaxWeight + canaryAnalysis.StepWeight = rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeight + canaryAnalysis.StepWeights = rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeights + canaryAnalysis.StepWeightPromotion = rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeightPromotion } CheckInterval := fmt.Sprintf("%d", *rolloutPolicy.RolloutPolicy.TrafficAnalysis.CheckIntervalSeconds) + "s" From 86c1e93deb0b63e662e1cf952a2e586b2126ccd4 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Sat, 6 Jan 2024 18:38:24 +0800 Subject: [PATCH 3/6] fix by comments Signed-off-by: LiZhenCheng9527 --- pkg/fleet-manager/application/helper.go | 16 +-- .../application/rollout_helper.go | 117 +++++++++++------- .../application/rollout_helper_test.go | 86 ++++++++++++- .../manifests/plugins/testloader-svc.yaml | 2 +- 4 files changed, 167 insertions(+), 54 deletions(-) diff --git a/pkg/fleet-manager/application/helper.go b/pkg/fleet-manager/application/helper.go index 2711da41e..9e5374731 100644 --- a/pkg/fleet-manager/application/helper.go +++ b/pkg/fleet-manager/application/helper.go @@ -61,15 +61,15 @@ func (a *ApplicationManager) syncPolicyResource(ctx context.Context, app *applic } } - // after finish instaill application, start handle rollout policy - rolloutCluster, err := a.fetchRolloutClusters(ctx, app, a.Client, fleet, syncPolicy) - if err != nil { - log.Error(err, "failed to fetch destination clusters for syncPolicy") - return ctrl.Result{}, err - } - if syncPolicy.Rollout != nil { - if result, err := a.syncRolloutPolicyForCluster(ctx, syncPolicy.Rollout, rolloutCluster, policyName); err != nil { + // after finish application install, start handling rollout policy + rolloutClusters, err := a.fetchRolloutClusters(ctx, app, a.Client, fleet, syncPolicy) + if err != nil { + log.Error(err, "failed to fetch destination clusters for rollout") + return ctrl.Result{}, err + } + + if result, err := a.syncRolloutPolicyForCluster(ctx, syncPolicy.Rollout, rolloutClusters, policyName); err != nil { return result, errors.Wrapf(err, "failed to handleSyncPolicyByKind currentFleetCluster") } } diff --git a/pkg/fleet-manager/application/rollout_helper.go b/pkg/fleet-manager/application/rollout_helper.go index 29ff7b4c1..c2cd36076 100644 --- a/pkg/fleet-manager/application/rollout_helper.go +++ b/pkg/fleet-manager/application/rollout_helper.go @@ -41,8 +41,12 @@ import ( const ( // kurator rollout labels - RolloutLabel = "kurator.dev/rollout" + RolloutLabel = "kurator.dev/rollout" + sidecarInject = "istio-injection" + // testloader configuration path + testloaderDeployPath = "plugins/testloader-deploy.yaml" + testloaderSvcPath = "plugins/testloader-svc.yaml" // StatusSyncInterval specifies the interval for requeueing when synchronizing status. It determines how frequently the status should be checked and updated. StatusSyncInterval = 30 * time.Second ) @@ -85,11 +89,11 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, ) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) - label := map[string]string{ + annotations := map[string]string{ RolloutLabel: policyName, } - namespaceName := types.NamespacedName{ + serviceNamespaceName := types.NamespacedName{ Namespace: rolloutPolicy.Workload.Namespace, Name: rolloutPolicy.ServiceName, } @@ -104,9 +108,9 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, // if trafficRoutingProvider is istio, find workload namespace with Istio sidecar injection enabled. if rolloutPolicy.TrafficRoutingProvider == "istio" { - err := namespaceSidecarInject(ctx, fleetCluster, namespaceName) + err := namespaceSidecarInject(ctx, newClient, rolloutPolicy.Workload.Namespace) if err != nil { - return ctrl.Result{}, errors.Wrapf(err, "failed to set namespace %s istio-injection enable", namespaceName.Namespace) + return ctrl.Result{}, errors.Wrapf(err, "failed to set namespace %s istio-injection enable", rolloutPolicy.Workload.Namespace) } } @@ -124,14 +128,14 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, // Installation of private testloader if needed if rolloutPolicy.TestLoader != nil && *rolloutPolicy.TestLoader { - if result, err := installPrivateTestloader(ctx, namespaceName, *fleetCluster, label); err != nil { + if result, err := installPrivateTestloader(ctx, testloaderNamespaceName, *fleetCluster, annotations); err != nil { return result, fmt.Errorf("failed to install private testloader for workload: %w", err) } } // Get the configuration of the workload's service and generate a canaryService. service := &corev1.Service{} - if err := newClient.Get(ctx, namespaceName, service); err != nil { + if err := newClient.Get(ctx, serviceNamespaceName, service); err != nil { if apierrors.IsNotFound(err) { return ctrl.Result{RequeueAfter: StatusSyncInterval}, errors.Wrapf(err, "not found service %s in %s", rolloutPolicy.ServiceName, rolloutPolicy.Workload.Namespace) } @@ -139,7 +143,7 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, } canaryInCluster := &flaggerv1b1.Canary{} - getErr := newClient.Get(ctx, namespaceName, canaryInCluster) + getErr := newClient.Get(ctx, serviceNamespaceName, canaryInCluster) canaryInCluster = renderCanary(*rolloutPolicy, canaryInCluster) if canaryService, err := renderCanaryService(*rolloutPolicy, service); err != nil { return ctrl.Result{}, errors.Wrapf(err, "failed rander canary configuration") @@ -147,8 +151,8 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, canaryInCluster.Spec.Service = *canaryService } canaryInCluster.Spec.Analysis = renderCanaryAnalysis(*rolloutPolicy, clusterKey.Name) - // Set up labels to make sure it's a resource created by kurator - canaryInCluster.SetLabels(label) + // Set up annotations to make sure it's a resource created by kurator + canaryInCluster.SetAnnotations(annotations) if getErr != nil { if apierrors.IsNotFound(getErr) { @@ -156,7 +160,7 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, return ctrl.Result{}, fmt.Errorf("failed to create rolloutPolicy: %v", err) } } - return ctrl.Result{}, errors.Wrapf(getErr, "failed to get canary %s in %s", namespaceName.Name, namespaceName.Namespace) + return ctrl.Result{}, errors.Wrapf(getErr, "failed to get canary %s in %s", serviceNamespaceName.Name, serviceNamespaceName.Namespace) } if err := newClient.Update(ctx, canaryInCluster); err != nil { return ctrl.Result{}, fmt.Errorf("failed to update rolloutPolicy: %v", err) @@ -167,49 +171,48 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, return ctrl.Result{}, nil } -func namespaceSidecarInject(ctx context.Context, cluster *fleetmanager.FleetCluster, namespacedName types.NamespacedName) error { +func namespaceSidecarInject(ctx context.Context, kubeClient client.Client, namespace string) error { + log := ctrl.LoggerFrom(ctx) + ns := &corev1.Namespace{} - client := cluster.Client.CtrlRuntimeClient() - namespacedName.Name = namespacedName.Namespace - if err := client.Get(ctx, namespacedName, ns); err != nil { + namespacedName := types.NamespacedName{ + Namespace: namespace, + Name: namespace, + } + if err := kubeClient.Get(ctx, namespacedName, ns); err != nil { // if no found, create a namespace if apierrors.IsNotFound(err) { - ns.SetName(namespacedName.Namespace) - ns.SetLabels(map[string]string{ - "istio-injection": "enabled", - }) - if createErr := client.Create(ctx, ns); createErr != nil { + ns.SetName(namespace) + ns := addLablesOrAnnotaions(ns, "labels", sidecarInject, "enabled") + if createErr := kubeClient.Create(ctx, ns); createErr != nil { return errors.Wrapf(createErr, "failed to create namespace %s", namespacedName.Namespace) } } - // if found namespace, set labels and update. - ns.SetLabels(map[string]string{ - "istio-injection": "enabled", - }) - if updateErr := client.Update(ctx, ns); updateErr != nil { + ns := addLablesOrAnnotaions(ns, "labels", sidecarInject, "enabled") + if updateErr := kubeClient.Update(ctx, ns); updateErr != nil { return errors.Wrapf(updateErr, "failed to update namespace %s", namespacedName.Namespace) } } + log.Info("Inject sidecar successful") return nil } func installPrivateTestloader(ctx context.Context, namespacedName types.NamespacedName, fleetCluster fleetmanager.FleetCluster, - labels map[string]string, + annotations map[string]string, ) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) clusterClient := fleetCluster.Client.CtrlRuntimeClient() // Creating a private testload deployment from a configuration file filepath := manifests.BuiltinOrDir("") - deployname := "plugins/testloader-deploy.yaml" - deploy, err1 := generateDeployConfig(filepath, deployname, namespacedName.Name, namespacedName.Namespace) + deploy, err1 := generateDeployConfig(filepath, testloaderDeployPath, namespacedName.Name, namespacedName.Namespace) if err1 != nil { return ctrl.Result{}, fmt.Errorf("failed get testloader deployment configuration: %v", err1) } - // Set up labels to make sure it's a resource created by kurator. - deploy.SetLabels(labels) + // Set up annotations to make sure it's a resource created by kurator. + deploy.SetAnnotations(annotations) if err := clusterClient.Create(ctx, deploy); err != nil { if apierrors.IsAlreadyExists(err) { @@ -222,17 +225,16 @@ func installPrivateTestloader(ctx context.Context, } // Creating a private testload service from a configuration file - svcName := "plugins/testloader-svc.yaml" - svc, err2 := generateSvcConfig(filepath, svcName, namespacedName.Name, namespacedName.Namespace) + svc, err2 := generateSvcConfig(filepath, testloaderSvcPath, namespacedName.Name, namespacedName.Namespace) if err2 != nil { return ctrl.Result{}, fmt.Errorf("failed get testloader service configuration: %v", err2) } - // Set up labels to make sure it's a resource created by kurator. - svc.SetLabels(labels) + // Set up annotations to make sure it's a resource created by kurator. + svc.SetAnnotations(annotations) if err := clusterClient.Create(ctx, svc); err != nil { if apierrors.IsAlreadyExists(err) { - if updateErr := clusterClient.Update(ctx, deploy); updateErr != nil { + if updateErr := clusterClient.Update(ctx, svc); updateErr != nil { return ctrl.Result{}, errors.Wrapf(err, "failed to update private testloader service") } } @@ -254,18 +256,18 @@ func generateDeployConfig(fsys fs.FS, fileName, name, namespace string) (*appsv1 return nil, err } - deploy.SetName(name + "-testloader") + deploy.SetName(name) deploy.SetNamespace(namespace) // Set up labels to make sure it's a resource created by kurator. deploy.SetLabels(map[string]string{ - "app": name + "-testloader", + "app": name, }) // let svc's selector to select private testloader pod deploy.Spec.Selector.MatchLabels = map[string]string{ - "app": name + "-testloader", + "app": name, } deploy.Spec.Template.ObjectMeta.Labels = map[string]string{ - "app": name + "-testloader", + "app": name, } return &deploy, nil @@ -282,15 +284,15 @@ func generateSvcConfig(fsys fs.FS, fileName string, name, namespace string) (*co return nil, err } - svc.SetName(name + "-testloader") + svc.SetName(name) svc.SetNamespace(namespace) // Set up labels to make sure it's a resource created by kurator. svc.SetLabels(map[string]string{ - "app": name + "-testloader", + "app": name, }) // let svc's selector to select private testloader pod svc.Spec.Selector = map[string]string{ - "app": name + "-testloader", + "app": name, } return &svc, nil @@ -433,8 +435,8 @@ func deleteResourceCreateByKurator(ctx context.Context, namespaceName types.Name } } else { // verify if the deployment were created by kurator - labels := obj.GetLabels() - if _, exist := labels[RolloutLabel]; exist { + annotations := obj.GetAnnotations() + if _, exist := annotations[RolloutLabel]; exist { if deleteErr := kubeClient.Delete(ctx, obj); deleteErr != nil { return errors.Wrapf(deleteErr, "failed to delete kubernetes resource") } @@ -442,3 +444,30 @@ func deleteResourceCreateByKurator(ctx context.Context, namespaceName types.Name } return nil } + +func addLablesOrAnnotaions(obj client.Object, labelsOrAnnotaions, key, value string) client.Object { + switch labelsOrAnnotaions { + case "labels": + labels := obj.GetLabels() + if labels == nil { + obj.SetLabels(map[string]string{ + key: value, + }) + return obj + } + labels[key] = value + obj.SetLabels(labels) + case "annotations": + annotations := obj.GetAnnotations() + if annotations == nil { + obj.SetAnnotations(map[string]string{ + key: value, + }) + return obj + } + annotations[key] = value + obj.SetAnnotations(annotations) + } + + return obj +} diff --git a/pkg/fleet-manager/application/rollout_helper_test.go b/pkg/fleet-manager/application/rollout_helper_test.go index 88b85ee59..265cb6837 100644 --- a/pkg/fleet-manager/application/rollout_helper_test.go +++ b/pkg/fleet-manager/application/rollout_helper_test.go @@ -28,9 +28,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1" "kurator.dev/kurator/pkg/fleet-manager/manifests" + "sigs.k8s.io/controller-runtime/pkg/client" ) func generateRolloutPloicy(installPrivateTestloader *bool) applicationapi.RolloutConfig { @@ -422,3 +422,87 @@ func Test_generateSvcConfig(t *testing.T) { fmt.Printf("failed get testloader deployment configuration: %v", err) } } + +func Test_addLablesOrAnnotaions(t *testing.T) { + type args struct { + obj client.Object + labelsOrAnnotaions string + key string + value string + } + tests := []struct { + name string + args args + want client.Object + }{ + { + name: "function test", + args: args{ + obj: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "webapp", + Labels: map[string]string{ + "xxx": "abc", + }, + }, + }, + labelsOrAnnotaions: "labels", + key: "istio-injection", + value: "ebabled", + }, + want: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "webapp", + Labels: map[string]string{ + "xxx": "abc", + "istio-injection": "ebabled", + }, + }, + }, + }, + { + name: "empty labels test", + args: args{ + obj: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "webapp", + }, + }, + labelsOrAnnotaions: "labels", + key: "XXX", + value: "abc", + }, + want: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "webapp", + Labels: map[string]string{ + "XXX": "abc", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := addLablesOrAnnotaions(tt.args.obj, tt.args.labelsOrAnnotaions, tt.args.key, tt.args.value); !reflect.DeepEqual(got, tt.want) { + t.Errorf("addLablesOrAnnotaions() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/fleet-manager/manifests/plugins/testloader-svc.yaml b/pkg/fleet-manager/manifests/plugins/testloader-svc.yaml index 772b20afe..0093b319f 100644 --- a/pkg/fleet-manager/manifests/plugins/testloader-svc.yaml +++ b/pkg/fleet-manager/manifests/plugins/testloader-svc.yaml @@ -12,4 +12,4 @@ spec: - name: http port: 80 protocol: TCP - targetPort: http \ No newline at end of file + targetPort: http From 1f18314918f1234a743fb0be5356b478c476bb36 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Mon, 8 Jan 2024 21:40:10 +0800 Subject: [PATCH 4/6] change testloader configuration file from yaml to go and fix by comment Signed-off-by: LiZhenCheng9527 --- .../charts/fleet-manager/templates/rbac.yaml | 1 + .../manifests/rollout_testloader.go | 166 ++++++++++++ .../manifests/rollout_testloader_test.go | 61 +++++ .../manifests/testdata/testloader-deploy.yaml | 64 +++++ .../manifests/testdata/testloader-svc.yaml | 18 ++ .../application/rollout_helper.go | 244 ++++++------------ .../application/rollout_helper_test.go | 29 +-- 7 files changed, 399 insertions(+), 184 deletions(-) create mode 100644 pkg/fleet-manager/application/manifests/rollout_testloader.go create mode 100644 pkg/fleet-manager/application/manifests/rollout_testloader_test.go create mode 100644 pkg/fleet-manager/application/manifests/testdata/testloader-deploy.yaml create mode 100644 pkg/fleet-manager/application/manifests/testdata/testloader-svc.yaml diff --git a/manifests/charts/fleet-manager/templates/rbac.yaml b/manifests/charts/fleet-manager/templates/rbac.yaml index 50abbf8a3..3636921ac 100644 --- a/manifests/charts/fleet-manager/templates/rbac.yaml +++ b/manifests/charts/fleet-manager/templates/rbac.yaml @@ -162,6 +162,7 @@ rules: - create - delete - list + - update - apiGroups: - "source.toolkit.fluxcd.io" resources: diff --git a/pkg/fleet-manager/application/manifests/rollout_testloader.go b/pkg/fleet-manager/application/manifests/rollout_testloader.go new file mode 100644 index 000000000..1576c3cb4 --- /dev/null +++ b/pkg/fleet-manager/application/manifests/rollout_testloader.go @@ -0,0 +1,166 @@ +/* +Copyright Kurator Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package render + +import ( + "bytes" + "text/template" + + "github.com/Masterminds/sprig/v3" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/yaml" +) + +type TestloaderConfig struct { + Name string + Namespace string + LabelKey string + LabelValue string + AnnotationKey string + AnnotationValue string +} + +const testloaderTemplateName = "testloader" + +// RenderTestloaderConfig takes generates YAML byte array configuration representing the testloader configuration. +func RenderTestloaderConfig(constTemplateName string, namespacedName types.NamespacedName, annotationKey, annotationsValue string) ([]byte, error) { + cfg := TestloaderConfig{ + Name: namespacedName.Name, + Namespace: namespacedName.Namespace, + LabelKey: "app", + LabelValue: namespacedName.Name, + AnnotationKey: annotationKey, + AnnotationValue: annotationsValue, + } + + return renderTestloaderTemplateConfig(constTemplateName, cfg) +} + +// renderTestloaderTemplateConfig reads, parses, and renders a template file using the provided configuration data. +func renderTestloaderTemplateConfig(constTemplateName string, cfg TestloaderConfig) ([]byte, error) { + tql, err := template.New(testloaderTemplateName).Funcs(funMap()).Parse(constTemplateName) + if err != nil { + return nil, err + } + + var b bytes.Buffer + if err := tql.Execute(&b, cfg); err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +// funMap returns a map of functions for use in the template. +func funMap() template.FuncMap { + m := sprig.TxtFuncMap() + m["toYaml"] = toYaml + return m +} + +// toYaml converts a given value to its YAML representation. +func toYaml(value interface{}) string { + y, err := yaml.Marshal(value) + if err != nil { + return "" + } + + return string(y) +} + +const TestlaoderDeployment = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.Name}} + namespace: {{.Namespace}} + labels: + {{.LabelKey}}: {{.LabelValue}} + annotations: + {{.AnnotationKey}}: {{.AnnotationValue}} +spec: + selector: + matchLabels: + {{.LabelKey}}: {{.LabelValue}} + template: + metadata: + labels: + {{.LabelKey}}: {{.LabelValue}} + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "8080" + openservicemesh.io/inbound-port-exclusion-list: "80, 8080" + spec: + containers: + - name: loadtester + image: ghcr.io/fluxcd/flagger-loadtester:0.29.0 + imagePullPolicy: IfNotPresent + ports: + - name: http + containerPort: 8080 + command: + - ./loadtester + - -port=8080 + - -log-level=info + - -timeout=1h + livenessProbe: + exec: + command: + - wget + - --quiet + - --tries=1 + - --timeout=4 + - --spider + - http://localhost:8080/healthz + timeoutSeconds: 5 + readinessProbe: + exec: + command: + - wget + - --quiet + - --tries=1 + - --timeout=4 + - --spider + - http://localhost:8080/healthz + timeoutSeconds: 5 + resources: + limits: + memory: "512Mi" + cpu: "1000m" + requests: + memory: "32Mi" + cpu: "10m" + securityContext: + readOnlyRootFilesystem: true + runAsUser: 10001 +` + +const TestlaoderService = `apiVersion: v1 +kind: Service +metadata: + name: {{.Name}} + namespace: {{.Namespace}} + labels: + {{.LabelKey}}: {{.LabelValue}} + annotations: + {{.AnnotationKey}}: {{.AnnotationValue}} +spec: + type: ClusterIP + selector: + {{.LabelKey}}: {{.LabelValue}} + ports: + - name: http + port: 80 + protocol: TCP + targetPort: http +` diff --git a/pkg/fleet-manager/application/manifests/rollout_testloader_test.go b/pkg/fleet-manager/application/manifests/rollout_testloader_test.go new file mode 100644 index 000000000..2e47ac238 --- /dev/null +++ b/pkg/fleet-manager/application/manifests/rollout_testloader_test.go @@ -0,0 +1,61 @@ +/* +Copyright Kurator Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package render + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/types" +) + +func TestRenderTestloaderConfig(t *testing.T) { + expectedTestDataFile := "testdata/" + namespacedName := types.NamespacedName{ + Namespace: "test", + Name: "testloader", + } + annotationKey := "kurator.dev/rollout" + annotationValue := "policy" + cases := []struct { + name string + constTemplateName string + expectFileName string + }{ + { + name: "testloader deploy template", + constTemplateName: TestlaoderDeployment, + expectFileName: "testloader-deploy.yaml", + }, + { + name: "testloader svc template", + constTemplateName: TestlaoderService, + expectFileName: "testloader-svc.yaml", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + result, err := RenderTestloaderConfig(tc.constTemplateName, namespacedName, annotationKey, annotationValue) + if err != nil { + assert.Error(t, err) + } else { + expected, err := os.ReadFile(expectedTestDataFile + tc.expectFileName) + assert.NoError(t, err) + assert.Equal(t, result, expected) + } + }) + } +} diff --git a/pkg/fleet-manager/application/manifests/testdata/testloader-deploy.yaml b/pkg/fleet-manager/application/manifests/testdata/testloader-deploy.yaml new file mode 100644 index 000000000..19fe59092 --- /dev/null +++ b/pkg/fleet-manager/application/manifests/testdata/testloader-deploy.yaml @@ -0,0 +1,64 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: testloader + namespace: test + labels: + app: testloader + annotations: + kurator.dev/rollout: policy +spec: + selector: + matchLabels: + app: testloader + template: + metadata: + labels: + app: testloader + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "8080" + openservicemesh.io/inbound-port-exclusion-list: "80, 8080" + spec: + containers: + - name: loadtester + image: ghcr.io/fluxcd/flagger-loadtester:0.29.0 + imagePullPolicy: IfNotPresent + ports: + - name: http + containerPort: 8080 + command: + - ./loadtester + - -port=8080 + - -log-level=info + - -timeout=1h + livenessProbe: + exec: + command: + - wget + - --quiet + - --tries=1 + - --timeout=4 + - --spider + - http://localhost:8080/healthz + timeoutSeconds: 5 + readinessProbe: + exec: + command: + - wget + - --quiet + - --tries=1 + - --timeout=4 + - --spider + - http://localhost:8080/healthz + timeoutSeconds: 5 + resources: + limits: + memory: "512Mi" + cpu: "1000m" + requests: + memory: "32Mi" + cpu: "10m" + securityContext: + readOnlyRootFilesystem: true + runAsUser: 10001 diff --git a/pkg/fleet-manager/application/manifests/testdata/testloader-svc.yaml b/pkg/fleet-manager/application/manifests/testdata/testloader-svc.yaml new file mode 100644 index 000000000..ebfb35de3 --- /dev/null +++ b/pkg/fleet-manager/application/manifests/testdata/testloader-svc.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + name: testloader + namespace: test + labels: + app: testloader + annotations: + kurator.dev/rollout: policy +spec: + type: ClusterIP + selector: + app: testloader + ports: + - name: http + port: 80 + protocol: TCP + targetPort: http diff --git a/pkg/fleet-manager/application/rollout_helper.go b/pkg/fleet-manager/application/rollout_helper.go index c2cd36076..7fb00733f 100644 --- a/pkg/fleet-manager/application/rollout_helper.go +++ b/pkg/fleet-manager/application/rollout_helper.go @@ -19,7 +19,6 @@ package application import ( "context" "fmt" - "io/fs" "time" flaggerv1b1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" @@ -28,25 +27,22 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/yaml" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1" fleetapi "kurator.dev/kurator/pkg/apis/fleet/v1alpha1" fleetmanager "kurator.dev/kurator/pkg/fleet-manager" - "kurator.dev/kurator/pkg/fleet-manager/manifests" + render "kurator.dev/kurator/pkg/fleet-manager/application/manifests" plugin "kurator.dev/kurator/pkg/fleet-manager/plugin" + "kurator.dev/kurator/pkg/infra/util" ) const ( // kurator rollout labels - RolloutLabel = "kurator.dev/rollout" - sidecarInject = "istio-injection" + RolloutIdentifier = "kurator.dev/rollout" + sidecarInject = "istio-injection" - // testloader configuration path - testloaderDeployPath = "plugins/testloader-deploy.yaml" - testloaderSvcPath = "plugins/testloader-svc.yaml" // StatusSyncInterval specifies the interval for requeueing when synchronizing status. It determines how frequently the status should be checked and updated. StatusSyncInterval = 30 * time.Second ) @@ -89,10 +85,6 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, ) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) - annotations := map[string]string{ - RolloutLabel: policyName, - } - serviceNamespaceName := types.NamespacedName{ Namespace: rolloutPolicy.Workload.Namespace, Name: rolloutPolicy.ServiceName, @@ -103,12 +95,16 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, Name: rolloutPolicy.Workload.Name + "-testloader", } + annotation := map[string]string{ + RolloutIdentifier: policyName, + } + for clusterKey, fleetCluster := range destinationClusters { - newClient := fleetCluster.Client.CtrlRuntimeClient() + fleetClusterClient := fleetCluster.Client.CtrlRuntimeClient() - // if trafficRoutingProvider is istio, find workload namespace with Istio sidecar injection enabled. + // If the trafficRoutingProvider is Istio, add the sidecar injection label to the workload's namespace. if rolloutPolicy.TrafficRoutingProvider == "istio" { - err := namespaceSidecarInject(ctx, newClient, rolloutPolicy.Workload.Namespace) + err := enableIstioSidecarInjection(ctx, fleetClusterClient, rolloutPolicy.Workload.Namespace) if err != nil { return ctrl.Result{}, errors.Wrapf(err, "failed to set namespace %s istio-injection enable", rolloutPolicy.Workload.Namespace) } @@ -117,25 +113,23 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, // if delete private testloader when rollout polity has changed if rolloutPolicy.TestLoader == nil || !*rolloutPolicy.TestLoader { testloaderDeploy := &appsv1.Deployment{} - if err := deleteResourceCreateByKurator(ctx, testloaderNamespaceName, newClient, testloaderDeploy); err != nil { + if err := deleteResourceCreatedByKurator(ctx, testloaderNamespaceName, fleetClusterClient, testloaderDeploy); err != nil { return ctrl.Result{}, err } testloaderSvc := &corev1.Service{} - if err := deleteResourceCreateByKurator(ctx, testloaderNamespaceName, newClient, testloaderSvc); err != nil { + if err := deleteResourceCreatedByKurator(ctx, testloaderNamespaceName, fleetClusterClient, testloaderSvc); err != nil { return ctrl.Result{}, err } - } - - // Installation of private testloader if needed - if rolloutPolicy.TestLoader != nil && *rolloutPolicy.TestLoader { - if result, err := installPrivateTestloader(ctx, testloaderNamespaceName, *fleetCluster, annotations); err != nil { - return result, fmt.Errorf("failed to install private testloader for workload: %w", err) + } else { + // Installation of private testloader if needed + if err := installPrivateTestloader(ctx, testloaderNamespaceName, RolloutIdentifier, policyName); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to install private testloader for workload: %w", err) } } // Get the configuration of the workload's service and generate a canaryService. service := &corev1.Service{} - if err := newClient.Get(ctx, serviceNamespaceName, service); err != nil { + if err := fleetClusterClient.Get(ctx, serviceNamespaceName, service); err != nil { if apierrors.IsNotFound(err) { return ctrl.Result{RequeueAfter: StatusSyncInterval}, errors.Wrapf(err, "not found service %s in %s", rolloutPolicy.ServiceName, rolloutPolicy.Workload.Namespace) } @@ -143,7 +137,11 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, } canaryInCluster := &flaggerv1b1.Canary{} - getErr := newClient.Get(ctx, serviceNamespaceName, canaryInCluster) + getErr := fleetClusterClient.Get(ctx, serviceNamespaceName, canaryInCluster) + if getErr != nil && !apierrors.IsNotFound(getErr) { + return ctrl.Result{}, errors.Wrapf(getErr, "failed to get canary %s in %s", serviceNamespaceName.Name, serviceNamespaceName.Namespace) + } + canaryInCluster = renderCanary(*rolloutPolicy, canaryInCluster) if canaryService, err := renderCanaryService(*rolloutPolicy, service); err != nil { return ctrl.Result{}, errors.Wrapf(err, "failed rander canary configuration") @@ -152,26 +150,24 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, } canaryInCluster.Spec.Analysis = renderCanaryAnalysis(*rolloutPolicy, clusterKey.Name) // Set up annotations to make sure it's a resource created by kurator - canaryInCluster.SetAnnotations(annotations) + canaryInCluster.SetAnnotations(annotation) - if getErr != nil { - if apierrors.IsNotFound(getErr) { - if err := newClient.Create(ctx, canaryInCluster); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to create rolloutPolicy: %v", err) - } + if apierrors.IsNotFound(getErr) { + if err := fleetClusterClient.Create(ctx, canaryInCluster); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create rolloutPolicy: %v", err) + } + } else { + if err := fleetClusterClient.Update(ctx, canaryInCluster); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update rolloutPolicy: %v", err) } - return ctrl.Result{}, errors.Wrapf(getErr, "failed to get canary %s in %s", serviceNamespaceName.Name, serviceNamespaceName.Namespace) - } - if err := newClient.Update(ctx, canaryInCluster); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to update rolloutPolicy: %v", err) } - log.Info("sync rolloutPolicy for cluster successful") + log.Info("sync rolloutPolicy successful") } return ctrl.Result{}, nil } -func namespaceSidecarInject(ctx context.Context, kubeClient client.Client, namespace string) error { +func enableIstioSidecarInjection(ctx context.Context, kubeClient client.Client, namespace string) error { log := ctrl.LoggerFrom(ctx) ns := &corev1.Namespace{} @@ -183,12 +179,14 @@ func namespaceSidecarInject(ctx context.Context, kubeClient client.Client, names // if no found, create a namespace if apierrors.IsNotFound(err) { ns.SetName(namespace) - ns := addLablesOrAnnotaions(ns, "labels", sidecarInject, "enabled") + ns.SetLabels(map[string]string{ + sidecarInject: "enabled", + }) if createErr := kubeClient.Create(ctx, ns); createErr != nil { return errors.Wrapf(createErr, "failed to create namespace %s", namespacedName.Namespace) } } - ns := addLablesOrAnnotaions(ns, "labels", sidecarInject, "enabled") + ns := addLabels(ns, sidecarInject, "enabled") if updateErr := kubeClient.Update(ctx, ns); updateErr != nil { return errors.Wrapf(updateErr, "failed to update namespace %s", namespacedName.Namespace) } @@ -197,105 +195,45 @@ func namespaceSidecarInject(ctx context.Context, kubeClient client.Client, names return nil } -func installPrivateTestloader(ctx context.Context, - namespacedName types.NamespacedName, - fleetCluster fleetmanager.FleetCluster, - annotations map[string]string, -) (ctrl.Result, error) { +func installPrivateTestloader(ctx context.Context, namespacedName types.NamespacedName, annotationKey, annotationValue string) error { log := ctrl.LoggerFrom(ctx) - clusterClient := fleetCluster.Client.CtrlRuntimeClient() - - // Creating a private testload deployment from a configuration file - filepath := manifests.BuiltinOrDir("") - deploy, err1 := generateDeployConfig(filepath, testloaderDeployPath, namespacedName.Name, namespacedName.Namespace) - if err1 != nil { - return ctrl.Result{}, fmt.Errorf("failed get testloader deployment configuration: %v", err1) - } - // Set up annotations to make sure it's a resource created by kurator. - deploy.SetAnnotations(annotations) - - if err := clusterClient.Create(ctx, deploy); err != nil { - if apierrors.IsAlreadyExists(err) { - if updateErr := clusterClient.Update(ctx, deploy); updateErr != nil { - return ctrl.Result{}, errors.Wrapf(err, "failed to update private testloader deployment") - } - return ctrl.Result{}, nil - } - return ctrl.Result{}, errors.Wrapf(err, "failed to create private testloader deployment") + // apply testloader deployment resource + testloaderDeploy, deployErr := render.RenderTestloaderConfig(render.TestlaoderDeployment, namespacedName, annotationKey, annotationValue) + if deployErr != nil { + return deployErr } - - // Creating a private testload service from a configuration file - svc, err2 := generateSvcConfig(filepath, testloaderSvcPath, namespacedName.Name, namespacedName.Namespace) - if err2 != nil { - return ctrl.Result{}, fmt.Errorf("failed get testloader service configuration: %v", err2) + if _, err := util.PatchResources(testloaderDeploy); err != nil { + return err } - // Set up annotations to make sure it's a resource created by kurator. - svc.SetAnnotations(annotations) - if err := clusterClient.Create(ctx, svc); err != nil { - if apierrors.IsAlreadyExists(err) { - if updateErr := clusterClient.Update(ctx, svc); updateErr != nil { - return ctrl.Result{}, errors.Wrapf(err, "failed to update private testloader service") - } - } - return ctrl.Result{}, errors.Wrapf(err, "failed to create private testloader service") + // apply testloader service resource + testloaderSvc, svcErr := render.RenderTestloaderConfig(render.TestlaoderService, namespacedName, annotationKey, annotationValue) + if svcErr != nil { + return svcErr } - - log.Info("Create private workload successful") - return ctrl.Result{}, nil -} - -func generateDeployConfig(fsys fs.FS, fileName, name, namespace string) (*appsv1.Deployment, error) { - file, err := fs.ReadFile(fsys, fileName) - if err != nil { - return nil, fmt.Errorf("failed to open telstloader deployment configuration: %v", err) + if _, err := util.PatchResources(testloaderSvc); err != nil { + return err } - deploy := appsv1.Deployment{} - if err := yaml.Unmarshal(file, &deploy); err != nil { - return nil, err - } - - deploy.SetName(name) - deploy.SetNamespace(namespace) - // Set up labels to make sure it's a resource created by kurator. - deploy.SetLabels(map[string]string{ - "app": name, - }) - // let svc's selector to select private testloader pod - deploy.Spec.Selector.MatchLabels = map[string]string{ - "app": name, - } - deploy.Spec.Template.ObjectMeta.Labels = map[string]string{ - "app": name, - } - - return &deploy, nil + log.Info("install testloader successful") + return nil } -func generateSvcConfig(fsys fs.FS, fileName string, name, namespace string) (*corev1.Service, error) { - file, err1 := fs.ReadFile(fsys, fileName) - if err1 != nil { - return nil, fmt.Errorf("failed to open telstloader service configuration: %v", err1) - } - - svc := corev1.Service{} - if err := yaml.Unmarshal(file, &svc); err != nil { - return nil, err - } - - svc.SetName(name) - svc.SetNamespace(namespace) - // Set up labels to make sure it's a resource created by kurator. - svc.SetLabels(map[string]string{ - "app": name, - }) - // let svc's selector to select private testloader pod - svc.Spec.Selector = map[string]string{ - "app": name, +func deleteResourceCreatedByKurator(ctx context.Context, namespaceName types.NamespacedName, kubeClient client.Client, obj client.Object) error { + if err := kubeClient.Get(ctx, namespaceName, obj); err != nil { + if !apierrors.IsNotFound(err) { + return errors.Wrapf(err, "get kubernetes resource error") + } + } else { + // verify if the deployment were created by kurator + annotations := obj.GetAnnotations() + if _, exist := annotations[RolloutIdentifier]; exist { + if deleteErr := kubeClient.Delete(ctx, obj); deleteErr != nil { + return errors.Wrapf(deleteErr, "failed to delete kubernetes resource") + } + } } - - return &svc, nil + return nil } // create/update canary configuration @@ -428,46 +366,16 @@ func generateWebhookUrl(name, namespace string) string { return url } -func deleteResourceCreateByKurator(ctx context.Context, namespaceName types.NamespacedName, kubeClient client.Client, obj client.Object) error { - if err := kubeClient.Get(ctx, namespaceName, obj); err != nil { - if !apierrors.IsNotFound(err) { - return errors.Wrapf(err, "get kubernetes resource error") - } - } else { - // verify if the deployment were created by kurator - annotations := obj.GetAnnotations() - if _, exist := annotations[RolloutLabel]; exist { - if deleteErr := kubeClient.Delete(ctx, obj); deleteErr != nil { - return errors.Wrapf(deleteErr, "failed to delete kubernetes resource") - } - } - } - return nil -} - -func addLablesOrAnnotaions(obj client.Object, labelsOrAnnotaions, key, value string) client.Object { - switch labelsOrAnnotaions { - case "labels": - labels := obj.GetLabels() - if labels == nil { - obj.SetLabels(map[string]string{ - key: value, - }) - return obj - } - labels[key] = value - obj.SetLabels(labels) - case "annotations": - annotations := obj.GetAnnotations() - if annotations == nil { - obj.SetAnnotations(map[string]string{ - key: value, - }) - return obj - } - annotations[key] = value - obj.SetAnnotations(annotations) +func addLabels(obj client.Object, key, value string) client.Object { + labels := obj.GetLabels() + // prevent nil pointer panic + if labels == nil { + obj.SetLabels(map[string]string{ + key: value, + }) + return obj } - + labels[key] = value + obj.SetLabels(labels) return obj } diff --git a/pkg/fleet-manager/application/rollout_helper_test.go b/pkg/fleet-manager/application/rollout_helper_test.go index 265cb6837..40d621e6f 100644 --- a/pkg/fleet-manager/application/rollout_helper_test.go +++ b/pkg/fleet-manager/application/rollout_helper_test.go @@ -17,7 +17,6 @@ limitations under the License. package application import ( - "fmt" "reflect" "testing" @@ -26,11 +25,10 @@ import ( istiov1alpha3 "github.com/fluxcd/flagger/pkg/apis/istio/v1alpha3" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1" - "kurator.dev/kurator/pkg/fleet-manager/manifests" "sigs.k8s.io/controller-runtime/pkg/client" + + applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1" ) func generateRolloutPloicy(installPrivateTestloader *bool) applicationapi.RolloutConfig { @@ -397,6 +395,7 @@ func Test_renderCanaryAnalysis(t *testing.T) { } } +/* func Test_generateDeployConfig(t *testing.T) { filepath := manifests.BuiltinOrDir("") //fmt.Printf("%s", filepath) @@ -422,13 +421,13 @@ func Test_generateSvcConfig(t *testing.T) { fmt.Printf("failed get testloader deployment configuration: %v", err) } } +*/ -func Test_addLablesOrAnnotaions(t *testing.T) { +func Test_addLables(t *testing.T) { type args struct { - obj client.Object - labelsOrAnnotaions string - key string - value string + obj client.Object + key string + value string } tests := []struct { name string @@ -450,9 +449,8 @@ func Test_addLablesOrAnnotaions(t *testing.T) { }, }, }, - labelsOrAnnotaions: "labels", - key: "istio-injection", - value: "ebabled", + key: "istio-injection", + value: "ebabled", }, want: &corev1.Namespace{ TypeMeta: metav1.TypeMeta{ @@ -480,9 +478,8 @@ func Test_addLablesOrAnnotaions(t *testing.T) { Name: "webapp", }, }, - labelsOrAnnotaions: "labels", - key: "XXX", - value: "abc", + key: "XXX", + value: "abc", }, want: &corev1.Namespace{ TypeMeta: metav1.TypeMeta{ @@ -500,7 +497,7 @@ func Test_addLablesOrAnnotaions(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := addLablesOrAnnotaions(tt.args.obj, tt.args.labelsOrAnnotaions, tt.args.key, tt.args.value); !reflect.DeepEqual(got, tt.want) { + if got := addLabels(tt.args.obj, tt.args.key, tt.args.value); !reflect.DeepEqual(got, tt.want) { t.Errorf("addLablesOrAnnotaions() = %v, want %v", got, tt.want) } }) From 41406369f59a6bd6e7eb6cf4ff47f3ce7171cbee Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Tue, 9 Jan 2024 11:21:26 +0800 Subject: [PATCH 5/6] delete testlaoder deployment and service configuration file in manifests/plugin Signed-off-by: LiZhenCheng9527 --- .../manifests/plugins/testloader-deploy.yaml | 61 ------------------- .../manifests/plugins/testloader-svc.yaml | 15 ----- 2 files changed, 76 deletions(-) delete mode 100644 pkg/fleet-manager/manifests/plugins/testloader-deploy.yaml delete mode 100644 pkg/fleet-manager/manifests/plugins/testloader-svc.yaml diff --git a/pkg/fleet-manager/manifests/plugins/testloader-deploy.yaml b/pkg/fleet-manager/manifests/plugins/testloader-deploy.yaml deleted file mode 100644 index 8e9822d7c..000000000 --- a/pkg/fleet-manager/manifests/plugins/testloader-deploy.yaml +++ /dev/null @@ -1,61 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: flagger-loadtester - labels: - app: flagger-loadtester -spec: - selector: - matchLabels: - app: flagger-loadtester - template: - metadata: - labels: - app: flagger-loadtester - annotations: - prometheus.io/scrape: "true" - prometheus.io/port: "8080" - openservicemesh.io/inbound-port-exclusion-list: "80, 8080" - spec: - containers: - - name: loadtester - image: ghcr.io/fluxcd/flagger-loadtester:0.29.0 - imagePullPolicy: IfNotPresent - ports: - - name: http - containerPort: 8080 - command: - - ./loadtester - - -port=8080 - - -log-level=info - - -timeout=1h - livenessProbe: - exec: - command: - - wget - - --quiet - - --tries=1 - - --timeout=4 - - --spider - - http://localhost:8080/healthz - timeoutSeconds: 5 - readinessProbe: - exec: - command: - - wget - - --quiet - - --tries=1 - - --timeout=4 - - --spider - - http://localhost:8080/healthz - timeoutSeconds: 5 - resources: - limits: - memory: "512Mi" - cpu: "1000m" - requests: - memory: "32Mi" - cpu: "10m" - securityContext: - readOnlyRootFilesystem: true - runAsUser: 10001 diff --git a/pkg/fleet-manager/manifests/plugins/testloader-svc.yaml b/pkg/fleet-manager/manifests/plugins/testloader-svc.yaml deleted file mode 100644 index 0093b319f..000000000 --- a/pkg/fleet-manager/manifests/plugins/testloader-svc.yaml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: flagger-loadtester - labels: - app: flagger-loadtester -spec: - type: ClusterIP - selector: - app: flagger-loadtester - ports: - - name: http - port: 80 - protocol: TCP - targetPort: http From 6ad585464a64efa377a8dcfe8b0783da3b0bcd07 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Tue, 9 Jan 2024 19:08:44 +0800 Subject: [PATCH 6/6] fix by comment Signed-off-by: LiZhenCheng9527 --- pkg/fleet-manager/application/helper.go | 2 +- .../application/manifests/rollout_testloader.go | 17 +---------------- pkg/fleet-manager/application/rollout_helper.go | 9 +++------ .../application/rollout_helper_test.go | 3 ++- 4 files changed, 7 insertions(+), 24 deletions(-) diff --git a/pkg/fleet-manager/application/helper.go b/pkg/fleet-manager/application/helper.go index 9e5374731..a2d427b30 100644 --- a/pkg/fleet-manager/application/helper.go +++ b/pkg/fleet-manager/application/helper.go @@ -70,7 +70,7 @@ func (a *ApplicationManager) syncPolicyResource(ctx context.Context, app *applic } if result, err := a.syncRolloutPolicyForCluster(ctx, syncPolicy.Rollout, rolloutClusters, policyName); err != nil { - return result, errors.Wrapf(err, "failed to handleSyncPolicyByKind currentFleetCluster") + return result, errors.Wrapf(err, "failed to syncRolloutPolicy") } } diff --git a/pkg/fleet-manager/application/manifests/rollout_testloader.go b/pkg/fleet-manager/application/manifests/rollout_testloader.go index 1576c3cb4..8ca019681 100644 --- a/pkg/fleet-manager/application/manifests/rollout_testloader.go +++ b/pkg/fleet-manager/application/manifests/rollout_testloader.go @@ -17,9 +17,7 @@ import ( "bytes" "text/template" - "github.com/Masterminds/sprig/v3" "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/yaml" ) type TestloaderConfig struct { @@ -62,21 +60,8 @@ func renderTestloaderTemplateConfig(constTemplateName string, cfg TestloaderConf return b.Bytes(), nil } -// funMap returns a map of functions for use in the template. func funMap() template.FuncMap { - m := sprig.TxtFuncMap() - m["toYaml"] = toYaml - return m -} - -// toYaml converts a given value to its YAML representation. -func toYaml(value interface{}) string { - y, err := yaml.Marshal(value) - if err != nil { - return "" - } - - return string(y) + return nil } const TestlaoderDeployment = `apiVersion: apps/v1 diff --git a/pkg/fleet-manager/application/rollout_helper.go b/pkg/fleet-manager/application/rollout_helper.go index 7fb00733f..c66d8a2f0 100644 --- a/pkg/fleet-manager/application/rollout_helper.go +++ b/pkg/fleet-manager/application/rollout_helper.go @@ -222,13 +222,13 @@ func installPrivateTestloader(ctx context.Context, namespacedName types.Namespac func deleteResourceCreatedByKurator(ctx context.Context, namespaceName types.NamespacedName, kubeClient client.Client, obj client.Object) error { if err := kubeClient.Get(ctx, namespaceName, obj); err != nil { if !apierrors.IsNotFound(err) { - return errors.Wrapf(err, "get kubernetes resource error") + return errors.Wrapf(err, "falied to get resource %s in %s", namespaceName.Name, namespaceName.Namespace) } } else { // verify if the deployment were created by kurator annotations := obj.GetAnnotations() if _, exist := annotations[RolloutIdentifier]; exist { - if deleteErr := kubeClient.Delete(ctx, obj); deleteErr != nil { + if deleteErr := kubeClient.Delete(ctx, obj); deleteErr != nil && !apierrors.IsNotFound(deleteErr) { return errors.Wrapf(deleteErr, "failed to delete kubernetes resource") } } @@ -238,9 +238,6 @@ func deleteResourceCreatedByKurator(ctx context.Context, namespaceName types.Nam // create/update canary configuration func renderCanary(rolloutPolicy applicationapi.RolloutConfig, canaryInCluster *flaggerv1b1.Canary) *flaggerv1b1.Canary { - value := int32(*rolloutPolicy.RolloutPolicy.RolloutTimeoutSeconds) - ptrValue := &value - canaryInCluster.ObjectMeta.Namespace = rolloutPolicy.Workload.Namespace canaryInCluster.ObjectMeta.Name = rolloutPolicy.Workload.Name canaryInCluster.TypeMeta.Kind = "Canary" @@ -252,7 +249,7 @@ func renderCanary(rolloutPolicy applicationapi.RolloutConfig, canaryInCluster *f Kind: rolloutPolicy.Workload.Kind, Name: rolloutPolicy.Workload.Name, }, - ProgressDeadlineSeconds: ptrValue, + ProgressDeadlineSeconds: rolloutPolicy.RolloutPolicy.RolloutTimeoutSeconds, SkipAnalysis: rolloutPolicy.RolloutPolicy.SkipTrafficAnalysis, RevertOnDeletion: rolloutPolicy.RolloutPolicy.RevertOnDeletion, Suspend: rolloutPolicy.RolloutPolicy.Suspend, diff --git a/pkg/fleet-manager/application/rollout_helper_test.go b/pkg/fleet-manager/application/rollout_helper_test.go index 40d621e6f..4bf90e1f5 100644 --- a/pkg/fleet-manager/application/rollout_helper_test.go +++ b/pkg/fleet-manager/application/rollout_helper_test.go @@ -33,6 +33,7 @@ import ( func generateRolloutPloicy(installPrivateTestloader *bool) applicationapi.RolloutConfig { timeout := 50 + RolloutTimeoutSeconds := int32(50) min := 99.0 max := 500.0 @@ -128,7 +129,7 @@ func generateRolloutPloicy(installPrivateTestloader *bool) applicationapi.Rollou MaxAge: 24, }, }, - RolloutTimeoutSeconds: &timeout, + RolloutTimeoutSeconds: &RolloutTimeoutSeconds, SkipTrafficAnalysis: false, RevertOnDeletion: false, Suspend: false,