diff --git a/manifests/charts/fleet-manager/templates/rbac.yaml b/manifests/charts/fleet-manager/templates/rbac.yaml index 50abbf8a3..3636921ac 100644 --- a/manifests/charts/fleet-manager/templates/rbac.yaml +++ b/manifests/charts/fleet-manager/templates/rbac.yaml @@ -162,6 +162,7 @@ rules: - create - delete - list + - update - apiGroups: - "source.toolkit.fluxcd.io" resources: diff --git a/pkg/fleet-manager/application/helper.go b/pkg/fleet-manager/application/helper.go index 41574cb73..a2d427b30 100644 --- a/pkg/fleet-manager/application/helper.go +++ b/pkg/fleet-manager/application/helper.go @@ -41,8 +41,9 @@ import ( // syncPolicyResource synchronizes the sync policy resources for a given application. func (a *ApplicationManager) syncPolicyResource(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet, syncPolicy *applicationapi.ApplicationSyncPolicy, policyName string) (ctrl.Result, error) { - policyKind := getSyncPolicyKind(syncPolicy) + log := ctrl.LoggerFrom(ctx) + policyKind := getSyncPolicyKind(syncPolicy) destination := getPolicyDestination(app, syncPolicy) // fetch fleet cluster list that recorded in fleet and matches the destination's cluster selector @@ -60,6 +61,19 @@ func (a *ApplicationManager) syncPolicyResource(ctx context.Context, app *applic } } + if syncPolicy.Rollout != nil { + // after finish application install, start handling rollout policy + rolloutClusters, err := a.fetchRolloutClusters(ctx, app, a.Client, fleet, syncPolicy) + if err != nil { + log.Error(err, "failed to fetch destination clusters for rollout") + return ctrl.Result{}, err + } + + if result, err := a.syncRolloutPolicyForCluster(ctx, syncPolicy.Rollout, rolloutClusters, policyName); err != nil { + return result, errors.Wrapf(err, "failed to syncRolloutPolicy") + } + } + return ctrl.Result{}, nil } diff --git a/pkg/fleet-manager/application/manifests/rollout_testloader.go b/pkg/fleet-manager/application/manifests/rollout_testloader.go new file mode 100644 index 000000000..8ca019681 --- /dev/null +++ b/pkg/fleet-manager/application/manifests/rollout_testloader.go @@ -0,0 +1,151 @@ +/* +Copyright Kurator Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package render + +import ( + "bytes" + "text/template" + + "k8s.io/apimachinery/pkg/types" +) + +type TestloaderConfig struct { + Name string + Namespace string + LabelKey string + LabelValue string + AnnotationKey string + AnnotationValue string +} + +const testloaderTemplateName = "testloader" + +// RenderTestloaderConfig takes generates YAML byte array configuration representing the testloader configuration. +func RenderTestloaderConfig(constTemplateName string, namespacedName types.NamespacedName, annotationKey, annotationsValue string) ([]byte, error) { + cfg := TestloaderConfig{ + Name: namespacedName.Name, + Namespace: namespacedName.Namespace, + LabelKey: "app", + LabelValue: namespacedName.Name, + AnnotationKey: annotationKey, + AnnotationValue: annotationsValue, + } + + return renderTestloaderTemplateConfig(constTemplateName, cfg) +} + +// renderTestloaderTemplateConfig reads, parses, and renders a template file using the provided configuration data. +func renderTestloaderTemplateConfig(constTemplateName string, cfg TestloaderConfig) ([]byte, error) { + tql, err := template.New(testloaderTemplateName).Funcs(funMap()).Parse(constTemplateName) + if err != nil { + return nil, err + } + + var b bytes.Buffer + if err := tql.Execute(&b, cfg); err != nil { + return nil, err + } + + return b.Bytes(), nil +} + +func funMap() template.FuncMap { + return nil +} + +const TestlaoderDeployment = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.Name}} + namespace: {{.Namespace}} + labels: + {{.LabelKey}}: {{.LabelValue}} + annotations: + {{.AnnotationKey}}: {{.AnnotationValue}} +spec: + selector: + matchLabels: + {{.LabelKey}}: {{.LabelValue}} + template: + metadata: + labels: + {{.LabelKey}}: {{.LabelValue}} + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "8080" + openservicemesh.io/inbound-port-exclusion-list: "80, 8080" + spec: + containers: + - name: loadtester + image: ghcr.io/fluxcd/flagger-loadtester:0.29.0 + imagePullPolicy: IfNotPresent + ports: + - name: http + containerPort: 8080 + command: + - ./loadtester + - -port=8080 + - -log-level=info + - -timeout=1h + livenessProbe: + exec: + command: + - wget + - --quiet + - --tries=1 + - --timeout=4 + - --spider + - http://localhost:8080/healthz + timeoutSeconds: 5 + readinessProbe: + exec: + command: + - wget + - --quiet + - --tries=1 + - --timeout=4 + - --spider + - http://localhost:8080/healthz + timeoutSeconds: 5 + resources: + limits: + memory: "512Mi" + cpu: "1000m" + requests: + memory: "32Mi" + cpu: "10m" + securityContext: + readOnlyRootFilesystem: true + runAsUser: 10001 +` + +const TestlaoderService = `apiVersion: v1 +kind: Service +metadata: + name: {{.Name}} + namespace: {{.Namespace}} + labels: + {{.LabelKey}}: {{.LabelValue}} + annotations: + {{.AnnotationKey}}: {{.AnnotationValue}} +spec: + type: ClusterIP + selector: + {{.LabelKey}}: {{.LabelValue}} + ports: + - name: http + port: 80 + protocol: TCP + targetPort: http +` diff --git a/pkg/fleet-manager/application/manifests/rollout_testloader_test.go b/pkg/fleet-manager/application/manifests/rollout_testloader_test.go new file mode 100644 index 000000000..2e47ac238 --- /dev/null +++ b/pkg/fleet-manager/application/manifests/rollout_testloader_test.go @@ -0,0 +1,61 @@ +/* +Copyright Kurator Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package render + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/types" +) + +func TestRenderTestloaderConfig(t *testing.T) { + expectedTestDataFile := "testdata/" + namespacedName := types.NamespacedName{ + Namespace: "test", + Name: "testloader", + } + annotationKey := "kurator.dev/rollout" + annotationValue := "policy" + cases := []struct { + name string + constTemplateName string + expectFileName string + }{ + { + name: "testloader deploy template", + constTemplateName: TestlaoderDeployment, + expectFileName: "testloader-deploy.yaml", + }, + { + name: "testloader svc template", + constTemplateName: TestlaoderService, + expectFileName: "testloader-svc.yaml", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + result, err := RenderTestloaderConfig(tc.constTemplateName, namespacedName, annotationKey, annotationValue) + if err != nil { + assert.Error(t, err) + } else { + expected, err := os.ReadFile(expectedTestDataFile + tc.expectFileName) + assert.NoError(t, err) + assert.Equal(t, result, expected) + } + }) + } +} diff --git a/pkg/fleet-manager/application/manifests/testdata/testloader-deploy.yaml b/pkg/fleet-manager/application/manifests/testdata/testloader-deploy.yaml new file mode 100644 index 000000000..19fe59092 --- /dev/null +++ b/pkg/fleet-manager/application/manifests/testdata/testloader-deploy.yaml @@ -0,0 +1,64 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: testloader + namespace: test + labels: + app: testloader + annotations: + kurator.dev/rollout: policy +spec: + selector: + matchLabels: + app: testloader + template: + metadata: + labels: + app: testloader + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "8080" + openservicemesh.io/inbound-port-exclusion-list: "80, 8080" + spec: + containers: + - name: loadtester + image: ghcr.io/fluxcd/flagger-loadtester:0.29.0 + imagePullPolicy: IfNotPresent + ports: + - name: http + containerPort: 8080 + command: + - ./loadtester + - -port=8080 + - -log-level=info + - -timeout=1h + livenessProbe: + exec: + command: + - wget + - --quiet + - --tries=1 + - --timeout=4 + - --spider + - http://localhost:8080/healthz + timeoutSeconds: 5 + readinessProbe: + exec: + command: + - wget + - --quiet + - --tries=1 + - --timeout=4 + - --spider + - http://localhost:8080/healthz + timeoutSeconds: 5 + resources: + limits: + memory: "512Mi" + cpu: "1000m" + requests: + memory: "32Mi" + cpu: "10m" + securityContext: + readOnlyRootFilesystem: true + runAsUser: 10001 diff --git a/pkg/fleet-manager/application/manifests/testdata/testloader-svc.yaml b/pkg/fleet-manager/application/manifests/testdata/testloader-svc.yaml new file mode 100644 index 000000000..ebfb35de3 --- /dev/null +++ b/pkg/fleet-manager/application/manifests/testdata/testloader-svc.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + name: testloader + namespace: test + labels: + app: testloader + annotations: + kurator.dev/rollout: policy +spec: + type: ClusterIP + selector: + app: testloader + ports: + - name: http + port: 80 + protocol: TCP + targetPort: http diff --git a/pkg/fleet-manager/application/rollout_helper.go b/pkg/fleet-manager/application/rollout_helper.go new file mode 100644 index 000000000..c66d8a2f0 --- /dev/null +++ b/pkg/fleet-manager/application/rollout_helper.go @@ -0,0 +1,378 @@ +/* +Copyright Kurator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package application + +import ( + "context" + "fmt" + "time" + + flaggerv1b1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" + "github.com/pkg/errors" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1" + fleetapi "kurator.dev/kurator/pkg/apis/fleet/v1alpha1" + fleetmanager "kurator.dev/kurator/pkg/fleet-manager" + render "kurator.dev/kurator/pkg/fleet-manager/application/manifests" + plugin "kurator.dev/kurator/pkg/fleet-manager/plugin" + "kurator.dev/kurator/pkg/infra/util" +) + +const ( + // kurator rollout labels + RolloutIdentifier = "kurator.dev/rollout" + sidecarInject = "istio-injection" + + // StatusSyncInterval specifies the interval for requeueing when synchronizing status. It determines how frequently the status should be checked and updated. + StatusSyncInterval = 30 * time.Second +) + +func (a *ApplicationManager) fetchRolloutClusters(ctx context.Context, + app *applicationapi.Application, + kubeClient client.Client, + fleet *fleetapi.Fleet, + syncPolicy *applicationapi.ApplicationSyncPolicy, +) (map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster, error) { + log := ctrl.LoggerFrom(ctx) + destination := getPolicyDestination(app, syncPolicy) + ClusterInterfaceList, result, err := a.fetchFleetClusterList(ctx, fleet, destination.ClusterSelector) + if err != nil || result.RequeueAfter > 0 { + return nil, err + } + + fleetclusters := make(map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster, len(ClusterInterfaceList)) + for _, cluster := range ClusterInterfaceList { + kclient, err := fleetmanager.ClientForCluster(kubeClient, fleet.Namespace, cluster) + if err != nil { + return nil, err + } + + kind := cluster.GetObject().GetObjectKind().GroupVersionKind().Kind + fleetclusters[fleetmanager.ClusterKey{Kind: kind, Name: cluster.GetObject().GetName()}] = &fleetmanager.FleetCluster{ + Secret: cluster.GetSecretName(), + SecretKey: cluster.GetSecretKey(), + Client: kclient, + } + } + log.Info("Successful to fetch destination clusters for Rollout") + return fleetclusters, nil +} + +func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, + rolloutPolicy *applicationapi.RolloutConfig, + destinationClusters map[fleetmanager.ClusterKey]*fleetmanager.FleetCluster, + policyName string, +) (ctrl.Result, error) { + log := ctrl.LoggerFrom(ctx) + + serviceNamespaceName := types.NamespacedName{ + Namespace: rolloutPolicy.Workload.Namespace, + Name: rolloutPolicy.ServiceName, + } + + testloaderNamespaceName := types.NamespacedName{ + Namespace: rolloutPolicy.Workload.Namespace, + Name: rolloutPolicy.Workload.Name + "-testloader", + } + + annotation := map[string]string{ + RolloutIdentifier: policyName, + } + + for clusterKey, fleetCluster := range destinationClusters { + fleetClusterClient := fleetCluster.Client.CtrlRuntimeClient() + + // If the trafficRoutingProvider is Istio, add the sidecar injection label to the workload's namespace. + if rolloutPolicy.TrafficRoutingProvider == "istio" { + err := enableIstioSidecarInjection(ctx, fleetClusterClient, rolloutPolicy.Workload.Namespace) + if err != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed to set namespace %s istio-injection enable", rolloutPolicy.Workload.Namespace) + } + } + + // if delete private testloader when rollout polity has changed + if rolloutPolicy.TestLoader == nil || !*rolloutPolicy.TestLoader { + testloaderDeploy := &appsv1.Deployment{} + if err := deleteResourceCreatedByKurator(ctx, testloaderNamespaceName, fleetClusterClient, testloaderDeploy); err != nil { + return ctrl.Result{}, err + } + testloaderSvc := &corev1.Service{} + if err := deleteResourceCreatedByKurator(ctx, testloaderNamespaceName, fleetClusterClient, testloaderSvc); err != nil { + return ctrl.Result{}, err + } + } else { + // Installation of private testloader if needed + if err := installPrivateTestloader(ctx, testloaderNamespaceName, RolloutIdentifier, policyName); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to install private testloader for workload: %w", err) + } + } + + // Get the configuration of the workload's service and generate a canaryService. + service := &corev1.Service{} + if err := fleetClusterClient.Get(ctx, serviceNamespaceName, service); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{RequeueAfter: StatusSyncInterval}, errors.Wrapf(err, "not found service %s in %s", rolloutPolicy.ServiceName, rolloutPolicy.Workload.Namespace) + } + return ctrl.Result{}, errors.Wrapf(err, "failed to get service %s in %s", rolloutPolicy.ServiceName, rolloutPolicy.Workload.Namespace) + } + + canaryInCluster := &flaggerv1b1.Canary{} + getErr := fleetClusterClient.Get(ctx, serviceNamespaceName, canaryInCluster) + if getErr != nil && !apierrors.IsNotFound(getErr) { + return ctrl.Result{}, errors.Wrapf(getErr, "failed to get canary %s in %s", serviceNamespaceName.Name, serviceNamespaceName.Namespace) + } + + canaryInCluster = renderCanary(*rolloutPolicy, canaryInCluster) + if canaryService, err := renderCanaryService(*rolloutPolicy, service); err != nil { + return ctrl.Result{}, errors.Wrapf(err, "failed rander canary configuration") + } else { + canaryInCluster.Spec.Service = *canaryService + } + canaryInCluster.Spec.Analysis = renderCanaryAnalysis(*rolloutPolicy, clusterKey.Name) + // Set up annotations to make sure it's a resource created by kurator + canaryInCluster.SetAnnotations(annotation) + + if apierrors.IsNotFound(getErr) { + if err := fleetClusterClient.Create(ctx, canaryInCluster); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to create rolloutPolicy: %v", err) + } + } else { + if err := fleetClusterClient.Update(ctx, canaryInCluster); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update rolloutPolicy: %v", err) + } + } + + log.Info("sync rolloutPolicy successful") + } + return ctrl.Result{}, nil +} + +func enableIstioSidecarInjection(ctx context.Context, kubeClient client.Client, namespace string) error { + log := ctrl.LoggerFrom(ctx) + + ns := &corev1.Namespace{} + namespacedName := types.NamespacedName{ + Namespace: namespace, + Name: namespace, + } + if err := kubeClient.Get(ctx, namespacedName, ns); err != nil { + // if no found, create a namespace + if apierrors.IsNotFound(err) { + ns.SetName(namespace) + ns.SetLabels(map[string]string{ + sidecarInject: "enabled", + }) + if createErr := kubeClient.Create(ctx, ns); createErr != nil { + return errors.Wrapf(createErr, "failed to create namespace %s", namespacedName.Namespace) + } + } + ns := addLabels(ns, sidecarInject, "enabled") + if updateErr := kubeClient.Update(ctx, ns); updateErr != nil { + return errors.Wrapf(updateErr, "failed to update namespace %s", namespacedName.Namespace) + } + } + log.Info("Inject sidecar successful") + return nil +} + +func installPrivateTestloader(ctx context.Context, namespacedName types.NamespacedName, annotationKey, annotationValue string) error { + log := ctrl.LoggerFrom(ctx) + // apply testloader deployment resource + testloaderDeploy, deployErr := render.RenderTestloaderConfig(render.TestlaoderDeployment, namespacedName, annotationKey, annotationValue) + if deployErr != nil { + return deployErr + } + if _, err := util.PatchResources(testloaderDeploy); err != nil { + return err + } + + // apply testloader service resource + testloaderSvc, svcErr := render.RenderTestloaderConfig(render.TestlaoderService, namespacedName, annotationKey, annotationValue) + if svcErr != nil { + return svcErr + } + if _, err := util.PatchResources(testloaderSvc); err != nil { + return err + } + + log.Info("install testloader successful") + return nil +} + +func deleteResourceCreatedByKurator(ctx context.Context, namespaceName types.NamespacedName, kubeClient client.Client, obj client.Object) error { + if err := kubeClient.Get(ctx, namespaceName, obj); err != nil { + if !apierrors.IsNotFound(err) { + return errors.Wrapf(err, "falied to get resource %s in %s", namespaceName.Name, namespaceName.Namespace) + } + } else { + // verify if the deployment were created by kurator + annotations := obj.GetAnnotations() + if _, exist := annotations[RolloutIdentifier]; exist { + if deleteErr := kubeClient.Delete(ctx, obj); deleteErr != nil && !apierrors.IsNotFound(deleteErr) { + return errors.Wrapf(deleteErr, "failed to delete kubernetes resource") + } + } + } + return nil +} + +// create/update canary configuration +func renderCanary(rolloutPolicy applicationapi.RolloutConfig, canaryInCluster *flaggerv1b1.Canary) *flaggerv1b1.Canary { + canaryInCluster.ObjectMeta.Namespace = rolloutPolicy.Workload.Namespace + canaryInCluster.ObjectMeta.Name = rolloutPolicy.Workload.Name + canaryInCluster.TypeMeta.Kind = "Canary" + canaryInCluster.TypeMeta.APIVersion = "flagger.app/v1beta1" + canaryInCluster.Spec = flaggerv1b1.CanarySpec{ + Provider: rolloutPolicy.TrafficRoutingProvider, + TargetRef: flaggerv1b1.LocalObjectReference{ + APIVersion: rolloutPolicy.Workload.APIVersion, + Kind: rolloutPolicy.Workload.Kind, + Name: rolloutPolicy.Workload.Name, + }, + ProgressDeadlineSeconds: rolloutPolicy.RolloutPolicy.RolloutTimeoutSeconds, + SkipAnalysis: rolloutPolicy.RolloutPolicy.SkipTrafficAnalysis, + RevertOnDeletion: rolloutPolicy.RolloutPolicy.RevertOnDeletion, + Suspend: rolloutPolicy.RolloutPolicy.Suspend, + } + + return canaryInCluster +} + +func renderCanaryService(rolloutPolicy applicationapi.RolloutConfig, service *corev1.Service) (*flaggerv1b1.CanaryService, error) { + if service == nil { + return nil, fmt.Errorf("service is nil, build canaryService configuration failed") + } + ports := service.Spec.Ports + canaryService := &flaggerv1b1.CanaryService{ + Name: rolloutPolicy.ServiceName, + Port: rolloutPolicy.Port, + Gateways: rolloutPolicy.RolloutPolicy.TrafficRouting.Gateways, + Hosts: rolloutPolicy.RolloutPolicy.TrafficRouting.Hosts, + Retries: rolloutPolicy.RolloutPolicy.TrafficRouting.Retries, + Headers: rolloutPolicy.RolloutPolicy.TrafficRouting.Headers, + CorsPolicy: rolloutPolicy.RolloutPolicy.TrafficRouting.CorsPolicy, + Primary: (*flaggerv1b1.CustomMetadata)(rolloutPolicy.Primary), + Canary: (*flaggerv1b1.CustomMetadata)(rolloutPolicy.Preview), + } + + Timeout := fmt.Sprintf("%d", rolloutPolicy.RolloutPolicy.TrafficRouting.TimeoutSeconds) + "s" + canaryService.Timeout = Timeout + + for _, port := range ports { + if port.Port == rolloutPolicy.Port { + canaryService.TargetPort = port.TargetPort + break + } + } + + return canaryService, nil +} + +func renderCanaryAnalysis(rolloutPolicy applicationapi.RolloutConfig, clusterName string) *flaggerv1b1.CanaryAnalysis { + canaryAnalysis := flaggerv1b1.CanaryAnalysis{ + Iterations: rolloutPolicy.RolloutPolicy.TrafficRouting.AnalysisTimes, + Threshold: *rolloutPolicy.RolloutPolicy.TrafficAnalysis.CheckFailedTimes, + Match: rolloutPolicy.RolloutPolicy.TrafficRouting.Match, + SessionAffinity: (*flaggerv1b1.SessionAffinity)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.SessionAffinity), + } + + if rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy != nil { + canaryAnalysis.MaxWeight = rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.MaxWeight + canaryAnalysis.StepWeight = rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeight + canaryAnalysis.StepWeights = rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeights + canaryAnalysis.StepWeightPromotion = rolloutPolicy.RolloutPolicy.TrafficRouting.CanaryStrategy.StepWeightPromotion + } + + CheckInterval := fmt.Sprintf("%d", *rolloutPolicy.RolloutPolicy.TrafficAnalysis.CheckIntervalSeconds) + "s" + canaryAnalysis.Interval = CheckInterval + + canaryMetric := []flaggerv1b1.CanaryMetric{} + for _, metric := range rolloutPolicy.RolloutPolicy.TrafficAnalysis.Metrics { + metricInterval := fmt.Sprintf("%d", *metric.IntervalSeconds) + "s" + templateMetric := flaggerv1b1.CanaryMetric{ + Name: string(metric.Name), + Interval: metricInterval, + ThresholdRange: (*flaggerv1b1.CanaryThresholdRange)(metric.ThresholdRange), + } + canaryMetric = append(canaryMetric, templateMetric) + } + canaryAnalysis.Metrics = canaryMetric + + // Trigger testloader to request service before analysis by webhook. + webhookTemplate := flaggerv1b1.CanaryWebhook{ + Name: "generated-testload", + Timeout: "60s", + } + + if len(rolloutPolicy.RolloutPolicy.TrafficAnalysis.Webhooks.Commands) != 0 { + var url string + // if have private webhook, webhook url is private testloader url + // else is public testloader url + if rolloutPolicy.TestLoader != nil && *rolloutPolicy.TestLoader { + name := rolloutPolicy.ServiceName + "-testloader" + namespace := rolloutPolicy.Workload.Namespace + url = generateWebhookUrl(name, namespace) + } else if namespace, exist := plugin.ProviderNamespace[fleetapi.Provider(rolloutPolicy.TrafficRoutingProvider)]; exist { + name := namespace + "-testloader-" + clusterName + "-loadtester" + url = generateWebhookUrl(name, namespace) + } + webhookTemplate.URL = url + + timeout := fmt.Sprintf("%d", *rolloutPolicy.RolloutPolicy.TrafficAnalysis.Webhooks.TimeoutSeconds) + "s" + webhookTemplate.Timeout = timeout + + canaryWebhook := []flaggerv1b1.CanaryWebhook{} + bakName := webhookTemplate.Name + for index, command := range rolloutPolicy.RolloutPolicy.TrafficAnalysis.Webhooks.Commands { + metadata := map[string]string{ + "type": "cmd", + "cmd": command, + } + webhookTemplate.Metadata = &metadata + webhookTemplate.Name = bakName + "-" + fmt.Sprintf("%d", index) + canaryWebhook = append(canaryWebhook, webhookTemplate) + } + + canaryAnalysis.Webhooks = canaryWebhook + } + return &canaryAnalysis +} + +func generateWebhookUrl(name, namespace string) string { + url := "http://" + name + "." + namespace + "/" + return url +} + +func addLabels(obj client.Object, key, value string) client.Object { + labels := obj.GetLabels() + // prevent nil pointer panic + if labels == nil { + obj.SetLabels(map[string]string{ + key: value, + }) + return obj + } + labels[key] = value + obj.SetLabels(labels) + return obj +} diff --git a/pkg/fleet-manager/application/rollout_helper_test.go b/pkg/fleet-manager/application/rollout_helper_test.go new file mode 100644 index 000000000..4bf90e1f5 --- /dev/null +++ b/pkg/fleet-manager/application/rollout_helper_test.go @@ -0,0 +1,506 @@ +/* +Copyright Kurator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package application + +import ( + "reflect" + "testing" + + flaggerv1b1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" + "github.com/fluxcd/flagger/pkg/apis/istio/common/v1alpha1" + istiov1alpha3 "github.com/fluxcd/flagger/pkg/apis/istio/v1alpha3" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" + + applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1" +) + +func generateRolloutPloicy(installPrivateTestloader *bool) applicationapi.RolloutConfig { + timeout := 50 + RolloutTimeoutSeconds := int32(50) + min := 99.0 + max := 500.0 + + rolloutPolicy := applicationapi.RolloutConfig{ + TestLoader: installPrivateTestloader, + TrafficRoutingProvider: "istio", + Workload: &applicationapi.CrossNamespaceObjectReference{ + APIVersion: "appv1/deployment", + Kind: "Deployment", + Name: "podinfo", + Namespace: "test", + }, + ServiceName: "podinfo-service", + Port: 80, + RolloutPolicy: &applicationapi.RolloutPolicy{ + TrafficRouting: &applicationapi.TrafficRoutingConfig{ + TimeoutSeconds: 50, + Gateways: []string{ + "istio-system/public-gateway", + }, + Hosts: []string{ + "app.example.com", + }, + Retries: &istiov1alpha3.HTTPRetry{ + Attempts: 10, + PerTryTimeout: "40s", + RetryOn: "gateway-error, connect-failure, refused-stream", + }, + Headers: &istiov1alpha3.Headers{ + Request: &istiov1alpha3.HeaderOperations{ + Add: map[string]string{ + "x-some-header": "value", + }, + }, + }, + CorsPolicy: &istiov1alpha3.CorsPolicy{ + AllowOrigin: []string{"example"}, + AllowMethods: []string{"GET"}, + AllowCredentials: false, + AllowHeaders: []string{"x-some-header"}, + MaxAge: "24h", + }, + CanaryStrategy: &applicationapi.CanaryConfig{ + MaxWeight: 50, + StepWeight: 10, + StepWeights: []int{ + 1, 20, 40, 80, + }, + StepWeightPromotion: 30, + }, + AnalysisTimes: 5, + Match: []istiov1alpha3.HTTPMatchRequest{ + { + Headers: map[string]v1alpha1.StringMatch{ + "user-agent": { + Regex: ".*Firefox.*", + }, + "cookie": { + Regex: "^(.*?;)?(type=insider)(;.*)?$", + }, + }, + }, + }, + }, + TrafficAnalysis: &applicationapi.TrafficAnalysis{ + CheckIntervalSeconds: &timeout, + CheckFailedTimes: &timeout, + Metrics: []applicationapi.Metric{ + { + Name: "request-success-rate", + IntervalSeconds: &timeout, + ThresholdRange: &applicationapi.CanaryThresholdRange{ + Min: &min, + }, + }, + { + Name: "request-duration", + IntervalSeconds: &timeout, + ThresholdRange: &applicationapi.CanaryThresholdRange{ + Max: &max, + }, + }, + }, + Webhooks: applicationapi.Webhook{ + TimeoutSeconds: &timeout, + Commands: []string{ + "hey -z 1m -q 10 -c 2 http://podinfo-canary.test:9898/", + "curl -sd 'test' http://podinfo-canary:9898/token | grep token", + }, + }, + SessionAffinity: &applicationapi.SessionAffinity{ + CookieName: "User", + MaxAge: 24, + }, + }, + RolloutTimeoutSeconds: &RolloutTimeoutSeconds, + SkipTrafficAnalysis: false, + RevertOnDeletion: false, + Suspend: false, + }, + } + return rolloutPolicy +} + +func Test_renderCanary(t *testing.T) { + int32Time := int32(50) + sign := true + type args struct { + rolloutPolicy applicationapi.RolloutConfig + } + tests := []struct { + name string + args args + want *flaggerv1b1.Canary + }{ + { + name: "functional test", + args: args{ + rolloutPolicy: generateRolloutPloicy(&sign), + }, + want: &flaggerv1b1.Canary{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "podinfo", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Canary", + APIVersion: "flagger.app/v1beta1", + }, + Spec: flaggerv1b1.CanarySpec{ + Provider: "istio", + TargetRef: flaggerv1b1.LocalObjectReference{ + APIVersion: "appv1/deployment", + Kind: "Deployment", + Name: "podinfo", + }, + ProgressDeadlineSeconds: &int32Time, + SkipAnalysis: false, + RevertOnDeletion: false, + Suspend: false, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := renderCanary(tt.args.rolloutPolicy, &flaggerv1b1.Canary{}); !reflect.DeepEqual(got, tt.want) { + t.Errorf("renderCanary() = %v\n, want %v", got, tt.want) + } + }) + } +} + +func Test_renderCanaryService(t *testing.T) { + sign := true + rolloutPolicy := generateRolloutPloicy(&sign) + type args struct { + rolloutPolicy applicationapi.RolloutConfig + service *corev1.Service + } + tests := []struct { + name string + args args + want *flaggerv1b1.CanaryService + }{ + { + name: "functional test", + args: args{ + rolloutPolicy: rolloutPolicy, + service: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "podinfo-service", + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "podinfo", + }, + Ports: []corev1.ServicePort{ + { + Protocol: corev1.ProtocolTCP, + Port: 80, + TargetPort: intstr.FromInt(8080), + }, + }, + }, + }, + }, + want: &flaggerv1b1.CanaryService{ + Name: "podinfo-service", + Port: 80, + Timeout: "50s", + TargetPort: intstr.FromInt(8080), + Gateways: []string{ + "istio-system/public-gateway", + }, + Hosts: []string{ + "app.example.com", + }, + Retries: rolloutPolicy.RolloutPolicy.TrafficRouting.Retries, + Headers: rolloutPolicy.RolloutPolicy.TrafficRouting.Headers, + CorsPolicy: rolloutPolicy.RolloutPolicy.TrafficRouting.CorsPolicy, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got, _ := renderCanaryService(tt.args.rolloutPolicy, tt.args.service); !reflect.DeepEqual(got, tt.want) { + t.Errorf("renderCanaryService() = %v\n, want %v", got, tt.want) + } + }) + } +} + +func Test_renderCanaryAnalysis(t *testing.T) { + sign := true + wantFalse := false + timeout := 50 + rolloutPolicy := generateRolloutPloicy(&sign) + wantPublicTestloaderRolloutPolicy := generateRolloutPloicy(&wantFalse) + type args struct { + rolloutPolicy applicationapi.RolloutConfig + } + tests := []struct { + name string + args args + want *flaggerv1b1.CanaryAnalysis + }{ + { + name: "functional test", + args: args{ + rolloutPolicy: rolloutPolicy, + }, + want: &flaggerv1b1.CanaryAnalysis{ + Interval: "50s", + Iterations: 5, + MaxWeight: 50, + StepWeight: 10, + StepWeights: []int{ + 1, 20, 40, 80, + }, + StepWeightPromotion: 30, + Threshold: timeout, + Match: []istiov1alpha3.HTTPMatchRequest{ + { + Headers: map[string]v1alpha1.StringMatch{ + "user-agent": { + Regex: ".*Firefox.*", + }, + "cookie": { + Regex: "^(.*?;)?(type=insider)(;.*)?$", + }, + }, + }, + }, + SessionAffinity: (*flaggerv1b1.SessionAffinity)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.SessionAffinity), + Metrics: []flaggerv1b1.CanaryMetric{ + { + Name: "request-success-rate", + Interval: "50s", + ThresholdRange: (*flaggerv1b1.CanaryThresholdRange)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.Metrics[0].ThresholdRange), + }, + { + Name: "request-duration", + Interval: "50s", + ThresholdRange: (*flaggerv1b1.CanaryThresholdRange)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.Metrics[1].ThresholdRange), + }, + }, + Webhooks: []flaggerv1b1.CanaryWebhook{ + { + Name: "generated-testload-0", + Timeout: "50s", + URL: "http://podinfo-service-testloader.test/", + Metadata: &map[string]string{ + "type": "cmd", + "cmd": "hey -z 1m -q 10 -c 2 http://podinfo-canary.test:9898/", + }, + }, + { + Name: "generated-testload-1", + Timeout: "50s", + URL: "http://podinfo-service-testloader.test/", + Metadata: &map[string]string{ + "type": "cmd", + "cmd": "curl -sd 'test' http://podinfo-canary:9898/token | grep token", + }, + }, + }, + }, + }, + { + name: "public Testloader", + args: args{ + rolloutPolicy: wantPublicTestloaderRolloutPolicy, + }, + want: &flaggerv1b1.CanaryAnalysis{ + Interval: "50s", + Iterations: 5, + MaxWeight: 50, + StepWeight: 10, + StepWeights: []int{ + 1, 20, 40, 80, + }, + StepWeightPromotion: 30, + Threshold: timeout, + Match: []istiov1alpha3.HTTPMatchRequest{ + { + Headers: map[string]v1alpha1.StringMatch{ + "user-agent": { + Regex: ".*Firefox.*", + }, + "cookie": { + Regex: "^(.*?;)?(type=insider)(;.*)?$", + }, + }, + }, + }, + SessionAffinity: (*flaggerv1b1.SessionAffinity)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.SessionAffinity), + Metrics: []flaggerv1b1.CanaryMetric{ + { + Name: "request-success-rate", + Interval: "50s", + ThresholdRange: (*flaggerv1b1.CanaryThresholdRange)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.Metrics[0].ThresholdRange), + }, + { + Name: "request-duration", + Interval: "50s", + ThresholdRange: (*flaggerv1b1.CanaryThresholdRange)(rolloutPolicy.RolloutPolicy.TrafficAnalysis.Metrics[1].ThresholdRange), + }, + }, + Webhooks: []flaggerv1b1.CanaryWebhook{ + { + Name: "generated-testload-0", + Timeout: "50s", + URL: "http://istio-system-testloader-kurator-member-loadtester.istio-system/", + Metadata: &map[string]string{ + "type": "cmd", + "cmd": "hey -z 1m -q 10 -c 2 http://podinfo-canary.test:9898/", + }, + }, + { + Name: "generated-testload-1", + Timeout: "50s", + URL: "http://istio-system-testloader-kurator-member-loadtester.istio-system/", + Metadata: &map[string]string{ + "type": "cmd", + "cmd": "curl -sd 'test' http://podinfo-canary:9898/token | grep token", + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := renderCanaryAnalysis(tt.args.rolloutPolicy, "kurator-member"); !reflect.DeepEqual(got, tt.want) { + t.Errorf("renderCanaryAnalysis() = %v\n, want %v", got, tt.want) + } + }) + } +} + +/* +func Test_generateDeployConfig(t *testing.T) { + filepath := manifests.BuiltinOrDir("") + //fmt.Printf("%s", filepath) + deployname := "plugins/testloader-deploy.yaml" + namespacedName := types.NamespacedName{ + Namespace: "test", + Name: "podinfo", + } + if _, err := generateDeployConfig(filepath, deployname, namespacedName.Name, namespacedName.Namespace); err != nil { + fmt.Printf("failed get testloader deployment configuration: %v", err) + } +} + +func Test_generateSvcConfig(t *testing.T) { + filepath := manifests.BuiltinOrDir("") + //fmt.Printf("%s", filepath) + svcname := "plugins/testloader-svc.yaml" + namespacedName := types.NamespacedName{ + Namespace: "test", + Name: "podinfo", + } + if _, err := generateSvcConfig(filepath, svcname, namespacedName.Name, namespacedName.Namespace); err != nil { + fmt.Printf("failed get testloader deployment configuration: %v", err) + } +} +*/ + +func Test_addLables(t *testing.T) { + type args struct { + obj client.Object + key string + value string + } + tests := []struct { + name string + args args + want client.Object + }{ + { + name: "function test", + args: args{ + obj: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "webapp", + Labels: map[string]string{ + "xxx": "abc", + }, + }, + }, + key: "istio-injection", + value: "ebabled", + }, + want: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "webapp", + Labels: map[string]string{ + "xxx": "abc", + "istio-injection": "ebabled", + }, + }, + }, + }, + { + name: "empty labels test", + args: args{ + obj: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "webapp", + }, + }, + key: "XXX", + value: "abc", + }, + want: &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + Kind: "Namespace", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "webapp", + Labels: map[string]string{ + "XXX": "abc", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := addLabels(tt.args.obj, tt.args.key, tt.args.value); !reflect.DeepEqual(got, tt.want) { + t.Errorf("addLablesOrAnnotaions() = %v, want %v", got, tt.want) + } + }) + } +}