diff --git a/.github/workflows/scale-test.yaml b/.github/workflows/scale-test.yaml index 36a70fe84d..6ce87b25a8 100644 --- a/.github/workflows/scale-test.yaml +++ b/.github/workflows/scale-test.yaml @@ -96,11 +96,12 @@ jobs: NUM_REPLICAS: ${{ inputs.num_replicas }} NUM_NETPOLS: ${{ inputs.num_netpol }} CLEANUP: ${{ inputs.cleanup }} - IMAGE_REGISTRY: ${{ inputs.image_namespace == '' && vars.ACR_NAME || inputs.image_namespace }} + IMAGE_REGISTRY: ${{ vars.ACR_NAME }} IMAGE_NAMESPACE: ${{ github.repository }} TAG: ${{ inputs.image_tag }} AZURE_APP_INSIGHTS_KEY: ${{ secrets.AZURE_APP_INSIGHTS_KEY }} shell: bash run: | set -euo pipefail - go test -v ./test/e2e/. -timeout 300m -tags=scale -count=1 -args -image-tag=$( [[ $TAG == "" ]] && make version || echo $TAG ) -create-infra=false -delete-infra=false + [[ $TAG == "" ]] && TAG=$(make version) + go test -v ./test/e2e/. -timeout 300m -tags=scale -count=1 -args -create-infra=false -delete-infra=false diff --git a/test/e2e/common/common.go b/test/e2e/common/common.go index 9772320685..2aa29f05dd 100644 --- a/test/e2e/common/common.go +++ b/test/e2e/common/common.go @@ -22,6 +22,7 @@ const ( KubeSystemNamespace = "kube-system" TestPodNamespace = "kube-system-test" AzureAppInsightsKeyEnv = "AZURE_APP_INSIGHTS_KEY" + OutputFilePathEnv = "OUTPUT_FILEPATH" ) var ( diff --git a/test/e2e/framework/kubernetes/check-pod-status.go b/test/e2e/framework/kubernetes/check-pod-status.go index 27405031bb..197b32c964 100644 --- a/test/e2e/framework/kubernetes/check-pod-status.go +++ b/test/e2e/framework/kubernetes/check-pod-status.go @@ -14,8 +14,9 @@ import ( ) const ( - RetryTimeoutPodsReady = 5 * time.Minute - RetryIntervalPodsReady = 5 * time.Second + RetryTimeoutPodsReady = 5 * time.Minute + RetryIntervalPodsReady = 5 * time.Second + timeoutWaitForPodsSeconds = 1200 printInterval = 5 // print to stdout every 5 iterations ) @@ -48,7 +49,7 @@ func (w *WaitPodsReady) Run() error { return fmt.Errorf("error creating Kubernetes client: %w", err) } - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeoutSeconds*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeoutWaitForPodsSeconds*time.Second) defer cancel() return WaitForPodReady(ctx, clientset, w.Namespace, w.LabelSelector) @@ -60,7 +61,6 @@ func (w *WaitPodsReady) Stop() error { } func WaitForPodReady(ctx context.Context, clientset *kubernetes.Clientset, namespace, labelSelector string) error { - podReadyMap := make(map[string]bool) printIterator := 0 conditionFunc := wait.ConditionWithContextFunc(func(context.Context) (bool, error) { @@ -78,34 +78,25 @@ func WaitForPodReady(ctx context.Context, clientset *kubernetes.Clientset, names return false, nil } - // check each indviidual pod to see if it's in Running state + // check each individual pod to see if it's in Running state for i := range podList.Items { - var pod *corev1.Pod - pod, err = clientset.CoreV1().Pods(namespace).Get(ctx, podList.Items[i].Name, metav1.GetOptions{}) - if err != nil { - return false, fmt.Errorf("error getting Pod: %w", err) - } // Check the Pod phase - if pod.Status.Phase != corev1.PodRunning { + if podList.Items[i].Status.Phase != corev1.PodRunning { if printIterator%printInterval == 0 { - log.Printf("pod \"%s\" is not in Running state yet. Waiting...\n", pod.Name) + log.Printf("pod \"%s\" is not in Running state yet. Waiting...\n", podList.Items[i].Name) } return false, nil } // Check all container status. - for _, containerStatus := range pod.Status.ContainerStatuses { - if !containerStatus.Ready { - log.Printf("container \"%s\" in pod \"%s\" is not ready yet. Waiting...\n", containerStatus.Name, pod.Name) + for j := range podList.Items[i].Status.ContainerStatuses { + if !podList.Items[i].Status.ContainerStatuses[j].Ready { + log.Printf("container \"%s\" in pod \"%s\" is not ready yet. Waiting...\n", podList.Items[i].Status.ContainerStatuses[j].Name, podList.Items[i].Name) return false, nil } } - if !podReadyMap[pod.Name] { - log.Printf("pod \"%s\" is in Running state\n", pod.Name) - podReadyMap[pod.Name] = true - } } log.Printf("all pods in namespace \"%s\" with label \"%s\" are in Running state\n", namespace, labelSelector) return true, nil diff --git a/test/e2e/framework/kubernetes/create-kapinger-deployment.go b/test/e2e/framework/kubernetes/create-kapinger-deployment.go index 06862e1c09..a895625e32 100644 --- a/test/e2e/framework/kubernetes/create-kapinger-deployment.go +++ b/test/e2e/framework/kubernetes/create-kapinger-deployment.go @@ -138,7 +138,7 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment { "memory": resource.MustParse("20Mi"), }, Limits: v1.ResourceList{ - "memory": resource.MustParse("20Mi"), + "memory": resource.MustParse("100Mi"), }, }, Ports: []v1.ContainerPort{ diff --git a/test/e2e/framework/kubernetes/delete-namespace.go b/test/e2e/framework/kubernetes/delete-namespace.go index c5fa3dbc66..a8bd41c3ef 100644 --- a/test/e2e/framework/kubernetes/delete-namespace.go +++ b/test/e2e/framework/kubernetes/delete-namespace.go @@ -30,7 +30,7 @@ func (d *DeleteNamespace) Run() error { return fmt.Errorf("error creating Kubernetes client: %w", err) } - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeoutSeconds*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 1200*time.Second) defer cancel() err = clientset.CoreV1().Namespaces().Delete(ctx, d.Namespace, metaV1.DeleteOptions{}) @@ -41,7 +41,7 @@ func (d *DeleteNamespace) Run() error { } backoff := wait.Backoff{ - Steps: 6, + Steps: 9, Duration: 10 * time.Second, Factor: 2.0, // Jitter: 0.1, diff --git a/test/e2e/framework/kubernetes/install-retina-helm.go b/test/e2e/framework/kubernetes/install-retina-helm.go index 7f1828f17c..ba74d64eac 100644 --- a/test/e2e/framework/kubernetes/install-retina-helm.go +++ b/test/e2e/framework/kubernetes/install-retina-helm.go @@ -91,6 +91,7 @@ func (i *InstallHelmChart) Run() error { chart.Values["image"].(map[string]interface{})["repository"] = imageRegistry + "/" + imageNamespace + "/retina-agent" chart.Values["image"].(map[string]interface{})["initRepository"] = imageRegistry + "/" + imageNamespace + "/retina-init" chart.Values["operator"].(map[string]interface{})["repository"] = imageRegistry + "/" + imageNamespace + "/retina-operator" + chart.Values["operator"].(map[string]interface{})["enabled"] = true getclient := action.NewGet(actionConfig) release, err := getclient.Run(i.ReleaseName) diff --git a/test/e2e/framework/scaletest/add-shared-labels.go b/test/e2e/framework/scaletest/add-shared-labels.go index d76139c0be..6a38be4f5d 100644 --- a/test/e2e/framework/scaletest/add-shared-labels.go +++ b/test/e2e/framework/scaletest/add-shared-labels.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -50,32 +51,21 @@ func (a *AddSharedLabelsToAllPods) Run() error { return fmt.Errorf("error creating Kubernetes client: %w", err) } - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeoutSeconds*time.Second) + ctx, cancel := contextToLabelAllPods() defer cancel() resources, err := clientset.CoreV1().Pods(a.Namespace).List(ctx, metav1.ListOptions{}) - patch := []patchStringValue{} - - for i := 0; i < a.NumSharedLabelsPerPod; i++ { - patch = append(patch, patchStringValue{ - Op: "add", - Path: "/metadata/labels/shared-lab-" + fmt.Sprintf("%05d", i), - Value: "val", - }) - } - - patchBytes, err := json.Marshal(patch) + patchBytes, err := getSharedLabelsPatch(a.NumSharedLabelsPerPod) if err != nil { - return fmt.Errorf("error marshalling patch: %w", err) + return fmt.Errorf("error getting label patch: %w", err) } for _, resource := range resources.Items { - clientset.CoreV1().Pods(a.Namespace).Patch(ctx, resource.Name, - types.JSONPatchType, - patchBytes, - metav1.PatchOptions{}, - ) + err = patchLabel(ctx, clientset, a.Namespace, resource.Name, patchBytes) + if err != nil { + log.Printf("Error adding shared labels to pod %s: %s\n", resource.Name, err) + } } return nil @@ -85,3 +75,38 @@ func (a *AddSharedLabelsToAllPods) Run() error { func (a *AddSharedLabelsToAllPods) Stop() error { return nil } + +func patchLabel(ctx context.Context, clientset *kubernetes.Clientset, namespace, podName string, patchBytes []byte) error { + log.Println("Labeling Pod", podName) + _, err := clientset.CoreV1().Pods(namespace).Patch(ctx, podName, + types.JSONPatchType, + patchBytes, + metav1.PatchOptions{}, + ) + if err != nil { + return fmt.Errorf("failed to patch pod: %w", err) + } + + return nil +} + +func getSharedLabelsPatch(numLabels int) ([]byte, error) { + patch := []patchStringValue{} + for i := 0; i < numLabels; i++ { + patch = append(patch, patchStringValue{ + Op: "add", + Path: "/metadata/labels/shared-lab-" + fmt.Sprintf("%05d", i), + Value: "val", + }) + } + b, err := json.Marshal(patch) + if err != nil { + return nil, fmt.Errorf("error marshalling patch: %w", err) + } + + return b, nil +} + +func contextToLabelAllPods() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), 120*time.Minute) +} diff --git a/test/e2e/framework/scaletest/add-unique-labels.go b/test/e2e/framework/scaletest/add-unique-labels.go index cfdd458c82..ff85764d8f 100644 --- a/test/e2e/framework/scaletest/add-unique-labels.go +++ b/test/e2e/framework/scaletest/add-unique-labels.go @@ -1,13 +1,10 @@ package scaletest import ( - "context" "encoding/json" "fmt" - "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) @@ -44,7 +41,7 @@ func (a *AddUniqueLabelsToAllPods) Run() error { return fmt.Errorf("error creating Kubernetes client: %w", err) } - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeoutSeconds*time.Second) + ctx, cancel := contextToLabelAllPods() defer cancel() resources, err := clientset.CoreV1().Pods(a.Namespace).List(ctx, metav1.ListOptions{}) @@ -53,7 +50,6 @@ func (a *AddUniqueLabelsToAllPods) Run() error { for _, resource := range resources.Items { patch := []patchStringValue{} - for i := 0; i < a.NumUniqueLabelsPerPod; i++ { patch = append(patch, patchStringValue{ Op: "add", @@ -65,14 +61,13 @@ func (a *AddUniqueLabelsToAllPods) Run() error { patchBytes, err := json.Marshal(patch) if err != nil { - return fmt.Errorf("error marshalling patch: %w", err) + return fmt.Errorf("failed to marshal patch: %w", err) } - clientset.CoreV1().Pods(a.Namespace).Patch(ctx, resource.Name, - types.JSONPatchType, - patchBytes, - metav1.PatchOptions{}, - ) + err = patchLabel(ctx, clientset, a.Namespace, resource.Name, patchBytes) + if err != nil { + return fmt.Errorf("error adding unique label to pod: %w", err) + } } return nil diff --git a/test/e2e/framework/scaletest/create-resources.go b/test/e2e/framework/scaletest/create-resources.go index 688ab57747..4057cdc826 100644 --- a/test/e2e/framework/scaletest/create-resources.go +++ b/test/e2e/framework/scaletest/create-resources.go @@ -7,6 +7,7 @@ import ( "time" e2ekubernetes "github.com/microsoft/retina/test/e2e/framework/kubernetes" + "github.com/microsoft/retina/test/retry" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -48,11 +49,18 @@ func (c *CreateResources) Run() error { return fmt.Errorf("error creating Kubernetes client: %w", err) } - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeoutSeconds*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 1200*time.Second) defer cancel() + retrier := retry.Retrier{Attempts: defaultRetryAttempts, Delay: defaultRetryDelay} + for _, resource := range resources { - e2ekubernetes.CreateResource(ctx, resource, clientset) + err := retrier.Do(ctx, func() error { + return e2ekubernetes.CreateResource(ctx, resource, clientset) + }) + if err != nil { + return fmt.Errorf("error creating resource: %w", err) + } } return nil @@ -71,12 +79,6 @@ func (c *CreateResources) getResources() []runtime.Object { // kwokDeployments := c.generateDeployments(c.NumKwokDeployments, c.NumKwokReplicas, "kwok") // objs = append(objs, kwokDeployments...) - realDeployments := c.generateDeployments() - objs = append(objs, realDeployments...) - - services := c.generateServices("real") - objs = append(objs, services...) - kapinger := e2ekubernetes.CreateKapingerDeployment{ KapingerNamespace: c.Namespace, KubeConfigFilePath: c.KubeConfigFilePath, @@ -88,6 +90,13 @@ func (c *CreateResources) getResources() []runtime.Object { kapingerSA := kapinger.GetKapingerServiceAccount() objs = append(objs, kapingerClusterRole, kapingerClusterRoleBinding, kapingerSA) + + realDeployments := c.generateDeployments() + objs = append(objs, realDeployments...) + + services := c.generateServices() + objs = append(objs, services...) + // c.generateKwokNodes() log.Println("Finished generating YAMLs") return objs @@ -118,6 +127,8 @@ func (c *CreateResources) generateDeployments() []runtime.Object { labelPrefix := fmt.Sprintf("%s-dep-lab", name) deployment.Name = name + deployment.Labels["name"] = name + deployment.Spec.Template.Labels["name"] = name r := int32(c.NumRealReplicas) deployment.Spec.Replicas = &r @@ -135,7 +146,7 @@ func (c *CreateResources) generateDeployments() []runtime.Object { return objs } -func (c *CreateResources) generateServices(svcKind string) []runtime.Object { +func (c *CreateResources) generateServices() []runtime.Object { objs := []runtime.Object{} kapingerSvc := e2ekubernetes.CreateKapingerDeployment{ @@ -146,10 +157,10 @@ func (c *CreateResources) generateServices(svcKind string) []runtime.Object { for i := 0; i < c.NumRealServices; i++ { template := kapingerSvc.GetKapingerService() - name := fmt.Sprintf("%s-svc-%05d", svcKind, i) + name := fmt.Sprintf("%s-svc-%05d", c.RealPodType, i) template.Name = name - template.Spec.Selector["name"] = fmt.Sprintf("%s-%s-dep-%05d", svcKind, c.RealPodType, i) + template.Spec.Selector["name"] = fmt.Sprintf("%s-dep-%05d", c.RealPodType, i) objs = append(objs, template) } diff --git a/test/e2e/framework/scaletest/delete-and-re-add-labels.go b/test/e2e/framework/scaletest/delete-and-re-add-labels.go index 5897b4d766..3403ea2488 100644 --- a/test/e2e/framework/scaletest/delete-and-re-add-labels.go +++ b/test/e2e/framework/scaletest/delete-and-re-add-labels.go @@ -48,7 +48,7 @@ func (d *DeleteAndReAddLabels) Run() error { return fmt.Errorf("error creating Kubernetes client: %w", err) } - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeoutSeconds*time.Second) + ctx, cancel := contextToLabelAllPods() defer cancel() labelsToDelete := `"shared-lab-00000": null, "shared-lab-00001": null, "shared-lab-00002": null` @@ -91,6 +91,7 @@ func (d *DeleteAndReAddLabels) Run() error { func (d *DeleteAndReAddLabels) addLabels(ctx context.Context, clientset *kubernetes.Clientset, pods *corev1.PodList, patch string) error { for _, pod := range pods.Items { + log.Println("Labeling Pod", pod.Name) _, err := clientset.CoreV1().Pods(d.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) if err != nil { return fmt.Errorf("error patching pod: %w", err) @@ -103,6 +104,7 @@ func (d *DeleteAndReAddLabels) addLabels(ctx context.Context, clientset *kuberne func (d *DeleteAndReAddLabels) deleteLabels(ctx context.Context, clientset *kubernetes.Clientset, pods *corev1.PodList, patch string) error { for _, pod := range pods.Items { + log.Println("Deleting label from Pod", pod.Name) _, err := clientset.CoreV1().Pods(d.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) if err != nil { return fmt.Errorf("error patching pod: %w", err) diff --git a/test/e2e/framework/scaletest/get-publish-metrics.go b/test/e2e/framework/scaletest/get-publish-metrics.go index 3495addf33..b8e168b239 100644 --- a/test/e2e/framework/scaletest/get-publish-metrics.go +++ b/test/e2e/framework/scaletest/get-publish-metrics.go @@ -6,29 +6,41 @@ import ( "fmt" "log" "os" - "sync" + "strconv" "time" "github.com/microsoft/retina/pkg/telemetry" "github.com/microsoft/retina/test/e2e/common" + "github.com/microsoft/retina/test/retry" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" + v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" metrics "k8s.io/metrics/pkg/client/clientset/versioned" ) +const ( + defaultRetryAttempts = 10 + defaultRetryDelay = 500 * time.Millisecond + defaultInterval = 2 * time.Minute +) + type GetAndPublishMetrics struct { KubeConfigFilePath string AdditionalTelemetryProperty map[string]string Labels map[string]string - OutputFilePath string + outputFilePath string stop chan struct{} - wg sync.WaitGroup + errs *errgroup.Group telemetryClient *telemetry.TelemetryClient appInsightsKey string + k8sClient *kubernetes.Clientset + metricsClient *metrics.Clientset } func (g *GetAndPublishMetrics) Run() error { @@ -43,12 +55,36 @@ func (g *GetAndPublishMetrics) Run() error { g.telemetryClient = telemetryClient } + config, err := clientcmd.BuildConfigFromFlags("", g.KubeConfigFilePath) + if err != nil { + return fmt.Errorf("error building kubeconfig: %w", err) + } + + k8sClient, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("error creating Kubernetes client: %w", err) + } + g.k8sClient = k8sClient + + metricsClient, err := metrics.NewForConfig(config) + if err != nil { + return fmt.Errorf("error creating metrics client: %w", err) + } + g.metricsClient = metricsClient + g.stop = make(chan struct{}) - g.wg.Add(1) + g.errs = new(errgroup.Group) - go func() { + g.errs.Go(func() error { - t := time.NewTicker(5 * time.Minute) + t := time.NewTicker(defaultInterval) + defer t.Stop() + + // First execution + err := g.getAndPublishMetrics() + if err != nil { + return fmt.Errorf("failed to get and publish test metrics: %w", err) + } for { select { @@ -56,18 +92,16 @@ func (g *GetAndPublishMetrics) Run() error { case <-t.C: err := g.getAndPublishMetrics() if err != nil { - log.Fatalf("error getting and publishing number of restarts: %v", err) - return + return fmt.Errorf("failed to get and publish test metrics: %w", err) } case <-g.stop: - g.wg.Done() - return + return nil } } - }() + }) return nil } @@ -75,66 +109,74 @@ func (g *GetAndPublishMetrics) Run() error { func (g *GetAndPublishMetrics) Stop() error { telemetry.ShutdownAppInsights() close(g.stop) - g.wg.Wait() + if err := g.errs.Wait(); err != nil { + return err //nolint:wrapcheck // already wrapped in goroutine + } + return nil } func (g *GetAndPublishMetrics) Prevalidate() error { - if os.Getenv(common.AzureAppInsightsKeyEnv) == "" { + if g.appInsightsKey == "" { log.Println("env ", common.AzureAppInsightsKeyEnv, " not provided") } - g.appInsightsKey = os.Getenv(common.AzureAppInsightsKeyEnv) if _, ok := g.AdditionalTelemetryProperty["retinaVersion"]; !ok { return fmt.Errorf("retinaVersion is required in AdditionalTelemetryProperty") } + + if g.outputFilePath == "" { + log.Println("Output file path not provided. Metrics will not be written to file") + return nil + } + + log.Println("Output file path provided: ", g.outputFilePath) return nil } func (g *GetAndPublishMetrics) getAndPublishMetrics() error { - config, err := clientcmd.BuildConfigFromFlags("", g.KubeConfigFilePath) - if err != nil { - return fmt.Errorf("error building kubeconfig: %w", err) - } + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeoutSeconds*time.Second) + defer cancel() - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return fmt.Errorf("error creating Kubernetes client: %w", err) - } + labelSelector := labels.Set(g.Labels).String() - mc, err := metrics.NewForConfig(config) + agentsMetrics, err := g.getPodsMetrics(ctx, labelSelector) if err != nil { - return fmt.Errorf("error creating metrics client: %w", err) + log.Println("Error getting agents' metrics, will try again later:", err) + return nil } - ctx, cancel := context.WithTimeout(context.Background(), defaultTimeoutSeconds*time.Second) - defer cancel() - - metrics, err := g.getMetrics(ctx, clientset, mc) + operatorMetrics, err := g.getPodsMetrics(ctx, "app=retina-operator") if err != nil { - return fmt.Errorf("error getting metrics: %w", err) + log.Println("Error getting operator's metrics, will try again later:", err) + return nil } + allMetrics := []metric{} + allMetrics = append(allMetrics, agentsMetrics...) + allMetrics = append(allMetrics, operatorMetrics...) + // Publish metrics if g.telemetryClient != nil { log.Println("Publishing metrics to AppInsights") - for _, metric := range metrics { + for _, metric := range allMetrics { g.telemetryClient.TrackEvent("scale-test", metric) } } // Write metrics to file - if g.OutputFilePath != "" { - log.Println("Writing metrics to file ", g.OutputFilePath) - file, err := os.OpenFile(g.OutputFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if g.outputFilePath != "" { + log.Println("Writing metrics to file ", g.outputFilePath) + + file, err := os.OpenFile(g.outputFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { return fmt.Errorf("error writing to csv file: %w", err) } defer file.Close() - for _, m := range metrics { + for _, m := range allMetrics { b, err := json.Marshal(m) if err != nil { return fmt.Errorf("error marshalling metric: %w", err) @@ -150,45 +192,83 @@ func (g *GetAndPublishMetrics) getAndPublishMetrics() error { type metric map[string]string -func (g *GetAndPublishMetrics) getMetrics(ctx context.Context, k8sClient *kubernetes.Clientset, metricsClient *metrics.Clientset) ([]metric, error) { +func (g *GetAndPublishMetrics) getPodsMetrics(ctx context.Context, labelSelector string) ([]metric, error) { - labelSelector := labels.Set(g.Labels).String() + var pods *v1.PodList - pods, err := k8sClient.CoreV1().Pods(common.KubeSystemNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + retrier := retry.Retrier{Attempts: defaultRetryAttempts, Delay: defaultRetryDelay} + + err := retrier.Do(ctx, func() error { + var err error + pods, err = g.k8sClient.CoreV1().Pods(common.KubeSystemNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return fmt.Errorf("error listing pods: %w", err) + } + return nil + }) if err != nil { - return nil, errors.Wrap(err, "error getting nodes") + return nil, errors.Wrap(err, "error getting pods") } - nodesMetricsInt := metricsClient.MetricsV1beta1().NodeMetricses() - podMetricsInt := metricsClient.MetricsV1beta1().PodMetricses(common.KubeSystemNamespace) + var nodeMetricsList *v1beta1.NodeMetricsList + err = retrier.Do(ctx, func() error { + nodeMetricsList, err = g.metricsClient.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("error listing node metrics: %w", err) + } + return nil + }) + if err != nil { + log.Println("Error getting node metrics:", err) + } + + var podMetricsList *v1beta1.PodMetricsList + err = retrier.Do(ctx, func() error { + podMetricsList, err = g.metricsClient.MetricsV1beta1().PodMetricses(common.KubeSystemNamespace).List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return fmt.Errorf("error listing pod metrics: %w", err) + } + return nil + }) + if err != nil { + log.Println("Error getting pod metrics:", err) + } var allPodsHealth []metric timestamp := time.Now().UTC().Format(time.RFC3339) + // List -> map for lookup + podMetrics := make(map[string]*v1beta1.PodMetrics) + for i := range podMetricsList.Items { + podMetrics[podMetricsList.Items[i].Name] = podMetricsList.Items[i].DeepCopy() + } + + // List -> map for lookup + nodeMetrics := make(map[string]*v1beta1.NodeMetrics) + for i := range nodeMetricsList.Items { + nodeMetrics[nodeMetricsList.Items[i].Name] = nodeMetricsList.Items[i].DeepCopy() + } + for _, pod := range pods.Items { var podHealth metric = make(map[string]string) - podMetrics, err := podMetricsInt.Get(ctx, pod.Name, metav1.GetOptions{}) - if err != nil { - return nil, errors.Wrap(err, "error getting pod metrics") - } - podMem := resource.MustParse("0") podCpu := resource.MustParse("0") - for _, cm := range podMetrics.Containers { - podMem.Add(cm.Usage["memory"]) - podCpu.Add(cm.Usage["cpu"]) + if podMetrics[pod.Name] != nil { + for _, cm := range podMetrics[pod.Name].Containers { + podMem.Add(cm.Usage["memory"]) + podCpu.Add(cm.Usage["cpu"]) + } } - nodeMetrics, err := nodesMetricsInt.Get(ctx, pod.Spec.NodeName, metav1.GetOptions{}) - if err != nil { - return nil, errors.Wrap(err, "error getting node metrics") + nodeMem := resource.MustParse("0") + nodeCPU := resource.MustParse("0") + if nodeMetrics[pod.Spec.NodeName] != nil { + nodeMem = nodeMetrics[pod.Spec.NodeName].Usage["memory"] + nodeCPU = nodeMetrics[pod.Spec.NodeName].Usage["cpu"] } - nodeMem := nodeMetrics.Usage["memory"] - nodeCpu := nodeMetrics.Usage["cpu"] - restarts := 0 for _, containerStatus := range pod.Status.ContainerStatuses { @@ -196,13 +276,14 @@ func (g *GetAndPublishMetrics) getMetrics(ctx context.Context, k8sClient *kubern } podHealth["timestamp"] = timestamp - podHealth["pod"] = pod.Name - podHealth["podCpuInMilliCore"] = fmt.Sprintf("%d", podCpu.MilliValue()) - podHealth["podMemoryInMB"] = fmt.Sprintf("%d", podMem.Value()/(1048576)) - podHealth["podRestarts"] = fmt.Sprintf("%d", restarts) - podHealth["node"] = pod.Spec.NodeName - podHealth["nodeCpuInMilliCore"] = fmt.Sprintf("%d", nodeCpu.MilliValue()) - podHealth["nodeMemoryInMB"] = fmt.Sprintf("%d", nodeMem.Value()/(1048576)) + podHealth["retinaPod"] = pod.Name + podHealth["podStatus"] = string(pod.Status.Phase) + podHealth["podCpuInMilliCore"] = strconv.FormatInt(podCpu.MilliValue(), 10) + podHealth["podMemoryInMB"] = strconv.FormatInt(podMem.Value()/(1048576), 10) + podHealth["podRestarts"] = strconv.FormatInt(int64(restarts), 10) + podHealth["retinaNode"] = pod.Spec.NodeName + podHealth["nodeCpuInMilliCore"] = strconv.FormatInt(nodeCPU.MilliValue(), 10) + podHealth["nodeMemoryInMB"] = strconv.FormatInt(nodeMem.Value()/(1048576), 10) allPodsHealth = append(allPodsHealth, podHealth) @@ -210,3 +291,13 @@ func (g *GetAndPublishMetrics) getMetrics(ctx context.Context, k8sClient *kubern return allPodsHealth, nil } + +func (g *GetAndPublishMetrics) SetAppInsightsKey(appInsightsKey string) *GetAndPublishMetrics { + g.appInsightsKey = appInsightsKey + return g +} + +func (g *GetAndPublishMetrics) SetOutputFilePath(outputFilePath string) *GetAndPublishMetrics { + g.outputFilePath = outputFilePath + return g +} diff --git a/test/e2e/framework/scaletest/options.go b/test/e2e/framework/scaletest/options.go index 6b5284422b..a7d27683b6 100644 --- a/test/e2e/framework/scaletest/options.go +++ b/test/e2e/framework/scaletest/options.go @@ -37,4 +37,5 @@ type Options struct { numRealPods int LabelsToGetMetrics map[string]string AdditionalTelemetryProperty map[string]string + CleanUp bool } diff --git a/test/e2e/jobs/scale.go b/test/e2e/jobs/scale.go index 89215785c1..58b5d49864 100644 --- a/test/e2e/jobs/scale.go +++ b/test/e2e/jobs/scale.go @@ -4,6 +4,7 @@ import ( "os" "time" + "github.com/microsoft/retina/test/e2e/common" "github.com/microsoft/retina/test/e2e/framework/kubernetes" "github.com/microsoft/retina/test/e2e/framework/scaletest" "github.com/microsoft/retina/test/e2e/framework/types" @@ -15,7 +16,7 @@ func DefaultScaleTestOptions() scaletest.Options { MaxKwokPodsPerNode: 0, NumKwokDeployments: 0, NumKwokReplicas: 0, - MaxRealPodsPerNode: 100, + MaxRealPodsPerNode: 250, NumRealDeployments: 1000, RealPodType: "kapinger", NumRealReplicas: 40, @@ -32,7 +33,7 @@ func DefaultScaleTestOptions() scaletest.Options { DeletePodsInterval: 60 * time.Second, DeleteRealPods: false, DeletePodsTimes: 1, - DeleteLabels: false, + DeleteLabels: true, DeleteLabelsInterval: 60 * time.Second, DeleteLabelsTimes: 1, DeleteNetworkPolicies: false, @@ -40,6 +41,7 @@ func DefaultScaleTestOptions() scaletest.Options { DeleteNetworkPoliciesTimes: 1, LabelsToGetMetrics: map[string]string{}, AdditionalTelemetryProperty: map[string]string{}, + CleanUp: true, } } @@ -63,14 +65,18 @@ func ScaleTest(opt *scaletest.Options) *types.Job { job.AddStep(&kubernetes.CreateNamespace{}, nil) - job.AddStep(&scaletest.GetAndPublishMetrics{ + // There's a known limitation on leaving empty fields in Steps. + // Set methods are used to set private fields and keep environment variables accessed within jobs, rather then spread through steps. + job.AddStep((&scaletest.GetAndPublishMetrics{ Labels: opt.LabelsToGetMetrics, AdditionalTelemetryProperty: opt.AdditionalTelemetryProperty, - OutputFilePath: os.Getenv("OUTPUT_FILEPATH"), - }, &types.StepOptions{ - SkipSavingParametersToJob: true, - RunInBackgroundWithID: "get-metrics", - }) + }). + SetOutputFilePath(os.Getenv(common.OutputFilePathEnv)). + SetAppInsightsKey(os.Getenv(common.AzureAppInsightsKeyEnv)), + &types.StepOptions{ + SkipSavingParametersToJob: true, + RunInBackgroundWithID: "get-metrics", + }) job.AddStep(&scaletest.CreateResources{ NumKwokDeployments: opt.NumKwokDeployments, @@ -111,7 +117,9 @@ func ScaleTest(opt *scaletest.Options) *types.Job { BackgroundID: "get-metrics", }, nil) - job.AddStep(&kubernetes.DeleteNamespace{}, nil) + if opt.CleanUp { + job.AddStep(&kubernetes.DeleteNamespace{}, nil) + } return job } diff --git a/test/e2e/scale_test.go b/test/e2e/scale_test.go index 6769dccc09..687d32ceb5 100644 --- a/test/e2e/scale_test.go +++ b/test/e2e/scale_test.go @@ -58,7 +58,7 @@ func TestE2ERetina_Scale(t *testing.T) { NumDeployments := os.Getenv("NUM_DEPLOYMENTS") NumReplicas := os.Getenv("NUM_REPLICAS") - NumNetworkPolicies := os.Getenv("NUM_NET_POL") + NumNetworkPolicies := os.Getenv("NUM_NETPOLS") CleanUp := os.Getenv("CLEANUP") if NumDeployments != "" { @@ -75,7 +75,7 @@ func TestE2ERetina_Scale(t *testing.T) { require.NoError(t, err) } if CleanUp != "" { - opt.DeleteLabels, err = strconv.ParseBool(CleanUp) + opt.CleanUp, err = strconv.ParseBool(CleanUp) require.NoError(t, err) }