diff --git a/.kokoro/psm_interop_kokoro_lib.sh b/.kokoro/psm_interop_kokoro_lib.sh index 35b1a904..08a4d919 100644 --- a/.kokoro/psm_interop_kokoro_lib.sh +++ b/.kokoro/psm_interop_kokoro_lib.sh @@ -318,6 +318,7 @@ psm::csm::get_tests() { "gamma.affinity_test" "gamma.affinity_session_drain_test" "gamma.csm_observability_test" + "gamma.csm_observability_test_with_injection" "app_net_ssa_test" "app_net_csm_observability_test" ) @@ -781,7 +782,7 @@ activate_gke_cluster() { GKE_CLUSTER_ZONE="us-central1-a" ;; GKE_CLUSTER_PSM_CSM) - GKE_CLUSTER_NAME="psm-interop-csm" + GKE_CLUSTER_NAME="psm-interop-csm-gateway" GKE_CLUSTER_REGION="us-central1" ;; GKE_CLUSTER_PSM_GAMMA) diff --git a/config/grpc-testing.cfg b/config/grpc-testing.cfg index 2e89c7eb..272ea795 100644 --- a/config/grpc-testing.cfg +++ b/config/grpc-testing.cfg @@ -1,5 +1,6 @@ --flagfile=config/common.cfg --project=grpc-testing +--project_number=830293263384 --network=default-vpc --gcp_service_account=xds-k8s-interop-tests@grpc-testing.iam.gserviceaccount.com --private_api_key_secret_name=projects/830293263384/secrets/xds-interop-tests-private-api-access-key diff --git a/config/local-dev.cfg.example b/config/local-dev.cfg.example index 5528d090..581013eb 100644 --- a/config/local-dev.cfg.example +++ b/config/local-dev.cfg.example @@ -7,6 +7,7 @@ ## Project settings --project=${PROJECT_ID} +--project_number=${PROJECT_NUMBER} --gcp_service_account=${WORKLOAD_SA_EMAIL} --private_api_key_secret_name=projects/${PROJECT_NUMBER}/secrets/xds-interop-tests-private-api-access-key diff --git a/framework/infrastructure/k8s.py b/framework/infrastructure/k8s.py index 8562f16e..0fd91336 100644 --- a/framework/infrastructure/k8s.py +++ b/framework/infrastructure/k8s.py @@ -172,7 +172,7 @@ def grpc_route(self, version: str) -> dynamic_res.Resource: api_name = "gateway.networking.k8s.io" kind = "GRPCRoute" supported_versions = { - "v1alpha2", + "v1", } if version not in supported_versions: raise NotImplementedError( @@ -185,7 +185,7 @@ def grpc_route(self, version: str) -> dynamic_res.Resource: def http_route(self, version: str) -> dynamic_res.Resource: api_name = "gateway.networking.k8s.io" kind = "HTTPRoute" - supported_versions = {"v1alpha2", "v1beta1"} + supported_versions = {"v1", "v1beta1"} if version not in supported_versions: raise NotImplementedError( f"{kind} {api_name}/{version} not implemented." @@ -331,7 +331,7 @@ def api_gke_mesh(self) -> dynamic_res.Resource: @functools.cache def api_grpc_route(self) -> dynamic_res.Resource: return self._get_dynamic_api( - "gateway.networking.k8s.io/v1alpha2", + "gateway.networking.k8s.io/v1", "GRPCRoute", ) diff --git a/framework/xds_flags.py b/framework/xds_flags.py index 3960b27f..45e606f5 100644 --- a/framework/xds_flags.py +++ b/framework/xds_flags.py @@ -22,6 +22,9 @@ PROJECT = flags.DEFINE_string( "project", default=None, help="(required) GCP Project ID." ) +PROJECT_NUMBER = flags.DEFINE_string( + "project_number", default=None, help="GCP Project Number." 
+) RESOURCE_PREFIX = flags.DEFINE_string( "resource_prefix", default=None, diff --git a/framework/xds_k8s_testcase.py b/framework/xds_k8s_testcase.py index c302af89..ce9996c6 100644 --- a/framework/xds_k8s_testcase.py +++ b/framework/xds_k8s_testcase.py @@ -132,6 +132,7 @@ class XdsKubernetesBaseTestCase(base_testcase.BaseTestCase): secondary_k8s_api_manager: Optional[k8s.KubernetesApiManager] = None network: str project: str + project_number: str resource_prefix: str resource_suffix: str = "" # Whether to randomize resources names for each test by appending a @@ -176,6 +177,7 @@ def setUpClass(cls): # GCP cls.project = xds_flags.PROJECT.value + cls.project_number = xds_flags.PROJECT_NUMBER.value cls.network = xds_flags.NETWORK.value cls.gcp_service_account = xds_k8s_flags.GCP_SERVICE_ACCOUNT.value cls.td_bootstrap_image = xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value diff --git a/kubernetes-manifests/gamma/client.deployment.yaml b/kubernetes-manifests/gamma/client.deployment.yaml new file mode 100644 index 00000000..724238a7 --- /dev/null +++ b/kubernetes-manifests/gamma/client.deployment.yaml @@ -0,0 +1,77 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ${deployment_name} + namespace: ${namespace_name} + labels: + app: ${app_label} + deployment_id: ${deployment_id} + owner: xds-k8s-interop-test +spec: + replicas: 1 + selector: + matchLabels: + app: ${app_label} + deployment_id: ${deployment_id} + template: + metadata: + labels: + app: ${app_label} + deployment_id: ${deployment_id} + owner: xds-k8s-interop-test + spec: + % if service_account_name: + serviceAccountName: ${service_account_name} + % endif + containers: + - name: ${deployment_name} + image: ${image_name} + imagePullPolicy: Always + startupProbe: + tcpSocket: + port: ${stats_port} + periodSeconds: 3 + ## Extend the number of probes well beyond the duration of the test + ## driver waiting for the container to start. + failureThreshold: 1000 + args: + - "--server=${server_target}" + - "--stats_port=${stats_port}" + - "--qps=${qps}" + - "--rpc=${rpc}" + - "--metadata=${metadata}" + % if request_payload_size > 0: + - "--request_payload_size=${request_payload_size}" + % endif + % if response_payload_size > 0: + - "--response_payload_size=${response_payload_size}" + % endif + - "--print_response=${print_response}" + % if enable_csm_observability: + - "--enable_csm_observability=true" + % endif + ## #################################################################### + ## TODO(cl/698639274): Remove these env vars once this CL is released. + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE_NAME + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: OTEL_RESOURCE_ATTRIBUTES + value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE_NAME) + ## ##################################################################### + ports: + - containerPort: ${stats_port} + resources: + limits: + cpu: 800m + memory: 512Mi + requests: + cpu: 100m + memory: 512Mi +... diff --git a/kubernetes-manifests/gamma/namespace.yaml b/kubernetes-manifests/gamma/namespace.yaml new file mode 100644 index 00000000..a3af725d --- /dev/null +++ b/kubernetes-manifests/gamma/namespace.yaml @@ -0,0 +1,10 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + name: ${namespace_name} + labels: + name: ${namespace_name} + owner: xds-k8s-interop-test + mesh.cloud.google.com/csm-injection: proxyless +...
diff --git a/kubernetes-manifests/gamma/route_grpc.yaml b/kubernetes-manifests/gamma/route_grpc.yaml index b446fb28..c2fcd539 100644 --- a/kubernetes-manifests/gamma/route_grpc.yaml +++ b/kubernetes-manifests/gamma/route_grpc.yaml @@ -1,6 +1,6 @@ --- kind: GRPCRoute -apiVersion: gateway.networking.k8s.io/v1alpha2 +apiVersion: gateway.networking.k8s.io/v1 metadata: name: ${route_name} namespace: ${namespace_name} diff --git a/kubernetes-manifests/gamma/server.deployment.yaml b/kubernetes-manifests/gamma/server.deployment.yaml new file mode 100644 index 00000000..eaed0f25 --- /dev/null +++ b/kubernetes-manifests/gamma/server.deployment.yaml @@ -0,0 +1,75 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ${deployment_name} + namespace: ${namespace_name} + labels: + app: ${app_label} + deployment_id: ${deployment_id} + owner: xds-k8s-interop-test +spec: + replicas: ${replica_count} + selector: + matchLabels: + app: ${app_label} + deployment_id: ${deployment_id} + template: + metadata: + labels: + app: ${app_label} + deployment_id: ${deployment_id} + owner: xds-k8s-interop-test + spec: + % if service_account_name: + serviceAccountName: ${service_account_name} + % endif + % if termination_grace_period_seconds: + terminationGracePeriodSeconds: ${termination_grace_period_seconds} + % endif + containers: + - name: ${deployment_name} + image: ${image_name} + imagePullPolicy: Always + ## TODO:(lsafran) test without this when ipv6 HC fw rules are in place + % if address_type != "ipv6": + startupProbe: + tcpSocket: + port: ${test_port} + periodSeconds: 3 + ## Extend the number of probes well beyond the duration of the test + ## driver waiting for the container to start. + failureThreshold: 1000 + % endif + args: + - "--port=${test_port}" + % if enable_csm_observability: + - "--enable_csm_observability=true" + % endif + % if address_type: + - "--address_type=${address_type}" + % endif + ## #################################################################### + ## TODO(cl/698639274): Remove these env vars once this CL is released. + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE_NAME + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: OTEL_RESOURCE_ATTRIBUTES + value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE_NAME) + ## ##################################################################### + ports: + - containerPort: ${test_port} + resources: + limits: + cpu: 800m + memory: 512Mi + requests: + cpu: 100m + memory: 512Mi +... diff --git a/tests/gamma/csm_observability_test_with_injection.py b/tests/gamma/csm_observability_test_with_injection.py new file mode 100644 index 00000000..ddc366ad --- /dev/null +++ b/tests/gamma/csm_observability_test_with_injection.py @@ -0,0 +1,627 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+import dataclasses +import logging +import time +from typing import Any, Callable, Iterable, TextIO +import unittest.mock + +from absl import flags +from absl.testing import absltest +from google.api_core import exceptions as gapi_errors +from google.api_core import retry as gapi_retries +from google.cloud import monitoring_v3 +import requests +from requests.exceptions import RequestException +import yaml + +from framework import xds_gamma_testcase +from framework import xds_k8s_testcase +from framework.helpers import skips +from framework.test_app.runners.k8s import gamma_server_runner +from framework.test_app.runners.k8s import k8s_base_runner +from framework.test_app.runners.k8s import k8s_xds_client_runner + +logger = logging.getLogger(__name__) +flags.adopt_module_key_flags(xds_k8s_testcase) + +# Type aliases +_XdsTestServer = xds_k8s_testcase.XdsTestServer +_XdsTestClient = xds_k8s_testcase.XdsTestClient +_Lang = skips.Lang + +# Testing consts +TEST_RUN_SECS = 90 +CLIENT_QPS = 1 +REQUEST_PAYLOAD_SIZE = 27182 +RESPONSE_PAYLOAD_SIZE = 31415 +GRPC_METHOD_NAME = "grpc.testing.TestService/UnaryCall" +CSM_WORKLOAD_NAME_SERVER = "psm-grpc-server" +CSM_WORKLOAD_NAME_CLIENT = "psm-grpc-client" +CSM_CANONICAL_SERVICE_NAME_SERVER = "deployment-" + CSM_WORKLOAD_NAME_SERVER +CSM_CANONICAL_SERVICE_NAME_CLIENT = "deployment-" + CSM_WORKLOAD_NAME_CLIENT +PROMETHEUS_HOST = "prometheus.googleapis.com" +METRIC_CLIENT_ATTEMPT_SENT = ( + f"{PROMETHEUS_HOST}/" + "grpc_client_attempt_sent_total_compressed_message_size_bytes/histogram" +) +METRIC_CLIENT_ATTEMPT_RCVD = ( + f"{PROMETHEUS_HOST}/" + "grpc_client_attempt_rcvd_total_compressed_message_size_bytes/histogram" +) +METRIC_CLIENT_ATTEMPT_DURATION = ( + f"{PROMETHEUS_HOST}/grpc_client_attempt_duration_seconds/histogram" +) +METRIC_CLIENT_ATTEMPT_STARTED = ( + f"{PROMETHEUS_HOST}/grpc_client_attempt_started_total/counter" +) +METRIC_SERVER_CALL_RCVD = ( + f"{PROMETHEUS_HOST}/" + "grpc_server_call_rcvd_total_compressed_message_size_bytes/histogram" +) +METRIC_SERVER_CALL_SENT = ( + f"{PROMETHEUS_HOST}/" + "grpc_server_call_sent_total_compressed_message_size_bytes/histogram" +) +METRIC_SERVER_CALL_DURATION = ( + f"{PROMETHEUS_HOST}/grpc_server_call_duration_seconds/histogram" +) +METRIC_SERVER_CALL_STARTED = ( + f"{PROMETHEUS_HOST}/grpc_server_call_started_total/counter" +) +HISTOGRAM_CLIENT_METRICS = ( + METRIC_CLIENT_ATTEMPT_SENT, + METRIC_CLIENT_ATTEMPT_RCVD, + METRIC_CLIENT_ATTEMPT_DURATION, +) +HISTOGRAM_SERVER_METRICS = ( + METRIC_SERVER_CALL_DURATION, + METRIC_SERVER_CALL_RCVD, + METRIC_SERVER_CALL_SENT, +) +COUNTER_CLIENT_METRICS = (METRIC_CLIENT_ATTEMPT_STARTED,) +COUNTER_SERVER_METRICS = (METRIC_SERVER_CALL_STARTED,) +HISTOGRAM_METRICS = HISTOGRAM_CLIENT_METRICS + HISTOGRAM_SERVER_METRICS +COUNTER_METRICS = COUNTER_CLIENT_METRICS + COUNTER_SERVER_METRICS +CLIENT_METRICS = HISTOGRAM_CLIENT_METRICS + COUNTER_CLIENT_METRICS +SERVER_METRICS = HISTOGRAM_SERVER_METRICS + COUNTER_SERVER_METRICS +ALL_METRICS = HISTOGRAM_METRICS + COUNTER_METRICS + +GammaServerRunner = gamma_server_runner.GammaServerRunner +ClientDeploymentArgs = k8s_xds_client_runner.ClientDeploymentArgs +KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner +BuildQueryFn = Callable[[str, str], str] +ANY = unittest.mock.ANY + + +@dataclasses.dataclass(eq=False) +class MetricTimeSeries: + """ + This class represents one TimeSeries object + from monitoring_v3.ListTimeSeriesResponse. 
+ """ + + # the metric name + name: str + # each time series has a monitored resource + resource_type: str + # each time series has a set of metric labels + metric_labels: dict[str, str] + # each time series has a set of monitored resource labels + resource_labels: dict[str, str] + # each time series has a set of data points + points: list[monitoring_v3.types.Point] + + @classmethod + def from_response( + cls, + name: str, + response: monitoring_v3.types.TimeSeries, + ) -> "MetricTimeSeries": + return cls( + name=name, + resource_type=response.resource.type, + metric_labels=dict(sorted(response.metric.labels.items())), + resource_labels=dict(sorted(response.resource.labels.items())), + points=list(response.points), + ) + + def pretty_print(self) -> str: + metric = dataclasses.asdict(self) + # too much noise to print all data points from a time series + metric.pop("points") + return yaml.dump(metric, sort_keys=False) + + +# This class is purely for debugging purposes. We want to log what we see +# from the Prometheus endpoint before being sent to Cloud Monitoring. +# Once we determined the root cause of b/323596669 we can remove this +# class. +class PrometheusLogger: + def __init__( + self, k8s_runner: k8s_base_runner.KubernetesBaseRunner, pod_name: str + ): + logfile_name = ( + f"{k8s_runner.k8s_namespace.name}_{pod_name}_prometheus.log" + ) + log_path = k8s_runner.logs_subdir / logfile_name + self.log_stream: TextIO = open( + log_path, "w", errors="ignore", encoding="utf-8" + ) + + def write(self, line): + self.log_stream.write(line) + self.log_stream.write("\n") + self.log_stream.flush() + + def close(self): + self.log_stream.close() + + +class CsmObservabilityTestWithInjection( + xds_gamma_testcase.GammaXdsKubernetesTestCase +): + metric_client: monitoring_v3.MetricServiceClient + + @staticmethod + def is_supported(config: skips.TestConfig) -> bool: + if config.client_lang == _Lang.CPP: + return config.version_gte("v1.62.x") + elif config.client_lang in _Lang.GO | _Lang.JAVA | _Lang.PYTHON: + return config.version_gte("v1.65.x") + return False + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.metric_client = cls.gcp_api_manager.monitoring_metric_service("v3") + + # These parameters are more pertaining to the test itself, not to + # each run(). + def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: + return super().initKubernetesClientRunner( + deployment_args=ClientDeploymentArgs( + enable_csm_observability=True, + csm_workload_name=CSM_WORKLOAD_NAME_CLIENT, + csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_CLIENT, + ), + namespace_template="gamma/namespace.yaml", + deployment_template="gamma/client.deployment.yaml", + ) + + # These parameters are more pertaining to the test itself, not to + # each run(). 
+ def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner: + return super().initKubernetesServerRunner( + deployment_args=gamma_server_runner.ServerDeploymentArgs( + enable_csm_observability=True, + csm_workload_name=CSM_WORKLOAD_NAME_SERVER, + csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_SERVER, + ), + namespace_template="gamma/namespace.yaml", + deployment_template="gamma/server.deployment.yaml", + ) + + def test_csm_observability(self): + # TODO(sergiitk): [GAMMA] Consider moving out custom gamma + # resource creation out of self.startTestServers() + with self.subTest("1_run_test_server"): + start_secs = int(time.time()) + test_server: _XdsTestServer = self.startTestServers()[0] + + with self.subTest("2_start_test_client"): + test_client: _XdsTestClient = self.startTestClient( + test_server, + qps=CLIENT_QPS, + request_payload_size=REQUEST_PAYLOAD_SIZE, + response_payload_size=RESPONSE_PAYLOAD_SIZE, + ) + + with self.subTest("3_test_server_received_rpcs_from_test_client"): + self.assertSuccessfulRpcs(test_client) + + with self.subTest("4_export_prometheus_metrics_data"): + logger.info( + "Letting test client run for %d seconds to produce metric data", + TEST_RUN_SECS, + ) + if self.server_runner.should_collect_logs_prometheus: + self._sleep_and_ping_prometheus_endpoint( + test_server, test_client + ) + else: + time.sleep(TEST_RUN_SECS) + + with self.subTest("5_query_cloud_monitoring_metrics"): + end_secs = int(time.time()) + interval = monitoring_v3.TimeInterval( + start_time={"seconds": start_secs}, + end_time={"seconds": end_secs}, + ) + server_histogram_results = self.query_metrics( + HISTOGRAM_SERVER_METRICS, + self.build_histogram_query, + self.server_namespace, + interval, + ) + client_histogram_results = self.query_metrics( + HISTOGRAM_CLIENT_METRICS, + self.build_histogram_query, + self.client_namespace, + interval, + ) + server_counter_results = self.query_metrics( + COUNTER_SERVER_METRICS, + self.build_counter_query, + self.server_namespace, + interval, + ) + client_counter_results = self.query_metrics( + COUNTER_CLIENT_METRICS, + self.build_counter_query, + self.client_namespace, + interval, + ) + all_results = { + **server_histogram_results, + **client_histogram_results, + **server_counter_results, + **client_counter_results, + } + self.assertNotEmpty(all_results, msg="No query metrics results") + + with self.subTest("6_check_metrics_time_series"): + for metric in ALL_METRICS: + # Every metric needs to exist in the query results + self.assertIn(metric, all_results) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("7_check_metrics_labels_histogram_client"): + expected_metric_labels = { + "csm_mesh_id": "proj-" + self.project_number, + "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, + "csm_remote_workload_cluster_name": ANY, + "csm_remote_workload_location": ANY, + "csm_remote_workload_name": CSM_WORKLOAD_NAME_SERVER, + "csm_remote_workload_namespace_name": self.server_namespace, + "csm_remote_workload_project_id": self.project, + "csm_remote_workload_type": "gcp_kubernetes_engine", + "csm_service_name": self.server_runner.service_name, + "csm_service_namespace_name": self.server_namespace, + "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, + "grpc_method": GRPC_METHOD_NAME, + "grpc_status": "OK", + "grpc_target": ANY, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_client.hostname, + } + self.filter_label_matcher_based_on_lang( + 
self.lang_spec.client_lang, expected_metric_labels + ) + for metric in HISTOGRAM_CLIENT_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("8_check_metrics_labels_histogram_server"): + expected_metric_labels = { + "csm_mesh_id": "proj-" + self.project_number, + "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, + "csm_remote_workload_cluster_name": ANY, + "csm_remote_workload_location": ANY, + "csm_remote_workload_name": CSM_WORKLOAD_NAME_CLIENT, + "csm_remote_workload_namespace_name": self.client_namespace, + "csm_remote_workload_project_id": self.project, + "csm_remote_workload_type": "gcp_kubernetes_engine", + "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, + "grpc_method": GRPC_METHOD_NAME, + "grpc_status": "OK", + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_server.hostname, + } + self.filter_label_matcher_based_on_lang( + self.lang_spec.server_lang, expected_metric_labels + ) + for metric in HISTOGRAM_SERVER_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("9_check_metrics_labels_counter_client"): + expected_metric_labels = { + "grpc_method": GRPC_METHOD_NAME, + "grpc_target": ANY, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_client.hostname, + } + self.filter_label_matcher_based_on_lang( + self.lang_spec.client_lang, expected_metric_labels + ) + for metric in COUNTER_CLIENT_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("10_check_metrics_labels_counter_server"): + expected_metric_labels = { + "grpc_method": GRPC_METHOD_NAME, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_server.hostname, + } + self.filter_label_matcher_based_on_lang( + self.lang_spec.server_lang, expected_metric_labels + ) + for metric in COUNTER_SERVER_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the right set of monitored resource + # label keys and values + with self.subTest("11_check_client_resource_labels_client"): + # all metrics should have the same set of monitored resource labels + # keys, which come from the GMP job + expected_resource_labels = { + "cluster": ANY, + "instance": ANY, + "job": self.client_runner.pod_monitoring_name, + "location": ANY, + "namespace": self.client_namespace, + "project_id": self.project, + } + for metric in CLIENT_METRICS: + metric_time_series = all_results[metric] + self.assertEqual( + "prometheus_target", metric_time_series.resource_type + ) + + actual_resource_labels = metric_time_series.resource_labels + self.assertDictEqual( + expected_resource_labels, actual_resource_labels + ) + + # Testing whether each metric has the right set of monitored resource + # label keys and values + with self.subTest("12_check_server_resource_labels_server"): + # all metrics should have the same set of monitored resource labels + # keys, which come from the GMP job 
+ expected_resource_labels = { + "cluster": ANY, + "instance": ANY, + "job": self.server_runner.pod_monitoring_name, + "location": ANY, + "namespace": self.server_namespace, + "project_id": self.project, + } + for metric in SERVER_METRICS: + metric_time_series = all_results[metric] + self.assertEqual( + "prometheus_target", metric_time_series.resource_type + ) + + actual_resource_labels = metric_time_series.resource_labels + self.assertDictEqual( + expected_resource_labels, actual_resource_labels + ) + + # This tests that each of the "bytes sent" histogram-type metrics + # has at least 1 data point whose mean converges to be + # close to the number of bytes being sent by the RPCs. + with self.subTest("13_check_bytes_sent_vs_data_points"): + for metric in (METRIC_CLIENT_ATTEMPT_SENT, METRIC_SERVER_CALL_RCVD): + self.assertAtLeastOnePointWithinRange( + all_results[metric].points, REQUEST_PAYLOAD_SIZE + ) + + for metric in (METRIC_CLIENT_ATTEMPT_RCVD, METRIC_SERVER_CALL_SENT): + self.assertAtLeastOnePointWithinRange( + all_results[metric].points, RESPONSE_PAYLOAD_SIZE + ) + + @classmethod + def build_histogram_query(cls, metric_type: str, namespace: str) -> str: + # + # The list_time_series API requires us to query one metric + # at a time. + # + # The 'grpc_status = "OK"' filter condition is needed because + # some time series data points were logged with grpc_status + # "UNAVAILABLE" while the client/server were establishing + # connections. + # + # The 'grpc_method' filter condition is needed because the + # server metrics also include the Channelz requests the server serves. + # + # The 'resource.labels.namespace' filter condition allows us to + # filter metrics just for the current test run. + return ( + f'metric.type = "{metric_type}" AND ' + 'metric.labels.grpc_status = "OK" AND ' + f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}" AND ' + f'resource.labels.namespace = "{namespace}"' + ) + + @classmethod + def build_counter_query(cls, metric_type: str, namespace: str) -> str: + # These "num RPCs started" counter metrics do not have the + # 'grpc_status' label. + return ( + f'metric.type = "{metric_type}" AND ' + f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}" AND ' + f'resource.labels.namespace = "{namespace}"' + ) + + @classmethod + def filter_label_matcher_based_on_lang( + cls, language: _Lang, label_matcher: dict[str, Any] + ) -> None: + """ + Filter label_matcher based on language. + """ + if language == _Lang.PYTHON: + # TODO(xuanwn): Remove this once https://github.com/open-telemetry/opentelemetry-python/issues/3072 is fixed. + label_matcher.pop("otel_scope_version", None) + label_matcher.pop("otel_scope_name", None) + + def query_metrics( + self, + metric_names: Iterable[str], + build_query_fn: BuildQueryFn, + namespace: str, + interval: monitoring_v3.TimeInterval, + ) -> dict[str, MetricTimeSeries]: + """ + A helper function to make the Cloud Monitoring API call to query + metrics created by this test run. + """ + # Based on default retry settings for list_time_series method: + # https://github.com/googleapis/google-cloud-python/blob/google-cloud-monitoring-v2.18.0/packages/google-cloud-monitoring/google/cloud/monitoring_v3/services/metric_service/transports/base.py#L210-L218 + # Modified: predicate extended to retry on a wider range of error types. + retry_settings = gapi_retries.Retry( + initial=0.1, + maximum=30.0, + multiplier=1.3, + predicate=gapi_retries.if_exception_type( + # Retry on 5xx, not just 503 ServiceUnavailable.
This also + # covers gRPC Unknown, DataLoss, and DeadlineExceeded statuses. + # 501 MethodNotImplemented is not excluded because the most likely + # reason we'd see this error is server misconfiguration, so we + # want to give it a chance to recover from this situation too. + gapi_errors.ServerError, + # Retry on 429/ResourceExhausted: recoverable rate limiting. + gapi_errors.TooManyRequests, + ), + deadline=90.0, + ) + results = {} + for metric in metric_names: + logger.info("Requesting list_time_series for metric %s", metric) + response = self.metric_client.list_time_series( + name=f"projects/{self.project}", + filter=build_query_fn(metric, namespace), + interval=interval, + view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + retry=retry_settings, + ) + time_series = list(response) + + self.assertLen( + time_series, + 1, + msg=f"Query for {metric} should return exactly 1 time series." + f" Found {len(time_series)}.", + ) + + metric_time_series = MetricTimeSeries.from_response( + metric, time_series[0] + ) + logger.info( + "Metric %s:\n%s", metric, metric_time_series.pretty_print() + ) + results[metric] = metric_time_series + return results + + def assertAtLeastOnePointWithinRange( + self, + points: list[monitoring_v3.types.Point], + ref_bytes: int, + tolerance: float = 0.05, + ): + """ + A helper function to check that at least one of the "points" has a + mean within X% of ref_bytes. + """ + for point in points: + if ( + ref_bytes * (1 - tolerance) + < point.value.distribution_value.mean + < ref_bytes * (1 + tolerance) + ): + return + self.fail( + f"No data point with {ref_bytes}±{tolerance*100}% bytes found" + ) + + def _sleep_and_ping_prometheus_endpoint( + self, test_server: _XdsTestServer, test_client: _XdsTestClient + ): + server_prometheus_logger = PrometheusLogger( + self.server_runner, test_server.hostname + ) + client_prometheus_logger = PrometheusLogger( + self.client_runner, test_client.hostname + ) + try: + for i in range(0, TEST_RUN_SECS // 10): + time.sleep(10) + curr_secs = int(time.time()) + server_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) + server_prometheus_logger.write( + self._ping_prometheus_endpoint( + test_server.rpc_host, + test_server.monitoring_port, + ) + ) + client_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) + client_prometheus_logger.write( + self._ping_prometheus_endpoint( + test_client.rpc_host, + test_client.monitoring_port, + ) + ) + finally: + server_prometheus_logger.close() + client_prometheus_logger.close() + + @classmethod + def _ping_prometheus_endpoint( + cls, monitoring_host: str, monitoring_port: int + ) -> str: + """ + A helper function to ping the pod's Prometheus endpoint to get what GMP + sees from the OTel exporter before passing metrics to Cloud Monitoring. + """ + try: + prometheus_log = requests.get( + f"http://{monitoring_host}:{monitoring_port}/metrics" + ) + return "\n".join(prometheus_log.text.splitlines()) + except RequestException as e: + logger.warning("Http request to Prometheus endpoint failed: %r", e) + # It's OK for the caller to receive nothing in case of an exception. + # The caller can continue. + return "" + + +if __name__ == "__main__": + absltest.main()