Add assertions to CSM Observability Test
stanley-cheung committed Jan 21, 2024
1 parent 62c8a9e commit 91410c0
Showing 2 changed files with 284 additions and 13 deletions.
framework/test_app/runners/k8s/gamma_server_runner.py (19 additions, 0 deletions)
@@ -38,6 +38,8 @@ class GammaServerRunner(KubernetesServerRunner):
    be_policy: Optional[k8s.GcpBackendPolicy] = None
    termination_grace_period_seconds: Optional[int] = None
    pre_stop_hook: bool = False
    pod_monitoring: Optional[k8s.PodMonitoring] = None
    pod_monitoring_name: Optional[str] = None

    route_name: str
    frontend_service_name: str
@@ -210,6 +212,17 @@ def run(  # pylint: disable=arguments-differ
            enable_csm_observability=enable_csm_observability,
        )

        # Create a PodMonitoring resource if CSM observability is enabled.
        # It is consumed by GMP (Google Managed Prometheus).
        if enable_csm_observability:
            self.pod_monitoring_name = f"{self.deployment_id}-gmp"
            self.pod_monitoring = self._create_pod_monitoring(
                "csm/pod-monitoring.yaml",
                namespace_name=self.k8s_namespace.name,
                deployment_id=self.deployment_id,
                pod_monitoring_name=self.pod_monitoring_name,
            )

        servers = self._make_servers_for_deployment(
            replica_count,
            test_port=test_port,
@@ -296,6 +309,12 @@ def cleanup(self, *, force=False, force_namespace=False):
                self._delete_service_account(self.service_account_name)
                self.service_account = None

            # Pod monitoring name is only set when CSM observability is enabled.
            if self.pod_monitoring_name and (self.pod_monitoring or force):
                self._delete_pod_monitoring(self.pod_monitoring_name)
                self.pod_monitoring = None
                self.pod_monitoring_name = None

            self._cleanup_namespace(force=(force_namespace and force))
        finally:
            self._stop()
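For orientation: the csm/pod-monitoring.yaml template referenced in run() above is not part of this diff. A minimal GMP PodMonitoring resource, sketched here as a Python dict purely for illustration (the label key, port, and scrape interval are assumptions, not values taken from the actual template), might look like:

    # Hypothetical sketch; field values are assumptions, not taken from
    # csm/pod-monitoring.yaml.
    pod_monitoring_sketch = {
        "apiVersion": "monitoring.googleapis.com/v1",
        "kind": "PodMonitoring",
        "metadata": {
            "name": "psm-grpc-server-gmp",  # f"{deployment_id}-gmp"
            "namespace": "psm-grpc-server-namespace",
        },
        "spec": {
            # Select the pods belonging to this deployment.
            "selector": {"matchLabels": {"deployment_id": "psm-grpc-server"}},
            # Scrape the Prometheus endpoint exposed when CSM observability is
            # enabled; 9464 is the conventional OTel Prometheus exporter port.
            "endpoints": [{"port": 9464, "interval": "10s"}],
        },
    }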
tests/gamma/csm_observability_test.py (265 additions, 13 deletions)
@@ -12,13 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time

from absl import flags
from absl.testing import absltest
from google.api_core import exceptions as gapi_errors
from google.cloud import monitoring_v3

from framework import xds_gamma_testcase
from framework import xds_k8s_flags
from framework import xds_k8s_testcase
from framework.helpers import skips

@@ -29,6 +31,46 @@
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_Lang = skips.Lang

# Testing consts
_TEST_RUN_SECS = 90
_CLIENT_SENT_BYTES = 271828  # the first digits of e
_SERVER_SENT_BYTES = 314159  # the first digits of pi
_HISTOGRAM_METRICS = [
    "prometheus.googleapis.com/grpc_client_attempt_sent_total_compressed_message_size_bytes/histogram",
    "prometheus.googleapis.com/grpc_client_attempt_rcvd_total_compressed_message_size_bytes/histogram",
    "prometheus.googleapis.com/grpc_client_attempt_duration_seconds/histogram",
    "prometheus.googleapis.com/grpc_server_call_duration_seconds/histogram",
    "prometheus.googleapis.com/grpc_server_call_rcvd_total_compressed_message_size_bytes/histogram",
    "prometheus.googleapis.com/grpc_server_call_sent_total_compressed_message_size_bytes/histogram",
]
_COUNTER_METRICS = [
    "prometheus.googleapis.com/grpc_client_attempt_started_total/counter",
    "prometheus.googleapis.com/grpc_server_call_started_total/counter",
]
_ALL_METRICS = _HISTOGRAM_METRICS + _COUNTER_METRICS
_HISTOGRAM_CLIENT_METRICS = [
    metric for metric in _HISTOGRAM_METRICS if "client" in metric
]
_HISTOGRAM_SERVER_METRICS = [
    metric for metric in _HISTOGRAM_METRICS if "server" in metric
]
_COUNTER_CLIENT_METRICS = [
    metric for metric in _COUNTER_METRICS if "client" in metric
]
_COUNTER_SERVER_METRICS = [
    metric for metric in _COUNTER_METRICS if "server" in metric
]
# Bytes sent by the client are received by the server and vice versa, so the
# client "sent" histograms are paired with the server "rcvd" histograms.
_CLIENT_SENT_METRICS = [
    metric
    for metric in _HISTOGRAM_METRICS
    if "client_attempt_sent" in metric or "server_call_rcvd" in metric
]
_SERVER_SENT_METRICS = [
    metric
    for metric in _HISTOGRAM_METRICS
    if "client_attempt_rcvd" in metric or "server_call_sent" in metric
]


class CsmObservabilityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase):
    metric_client: monitoring_v3.MetricServiceClient
@@ -45,36 +87,246 @@ def setUpClass(cls):
        super().setUpClass()
        cls.metric_client = cls.gcp_api_manager.monitoring_metric_service("v3")

    def _query_metrics(self, metric_names, filter_str, interval):
        # The list_time_series API only accepts one metric type per query,
        # so query each metric separately and collect the results.
        results = {}
        for metric in metric_names:
            response = self.metric_client.list_time_series(
                name=f"projects/{self.project}",
                filter=filter_str % metric,
                interval=interval,
                view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
            )
            # Iterating the response pager materializes all matching series.
            time_series = []
            for series in response:
                time_series.append(series)
            results[metric] = time_series
        return results

    def _at_least_one_point_within_range(self, points, ref_bytes):
        # True iff some data point's distribution mean is within 5% of ref_bytes.
        for point in points:
            if point.value.distribution_value.mean > (
                ref_bytes * 0.95
            ) and point.value.distribution_value.mean < (ref_bytes * 1.05):
                return True
        return False

    def test_csm_observability(self):
        # TODO(sergiitk): [GAMMA] Consider moving custom gamma resource
        # creation out of self.startTestServers()
        with self.subTest("1_run_test_server"):
            start_secs = int(time.time())
            test_server: _XdsTestServer = self.startTestServers(
                enable_csm_observability=True
            )[0]

        with self.subTest("2_start_test_client"):
            test_client: _XdsTestClient = self.startTestClient(
                test_server,
                enable_csm_observability=True,
                request_payload_size=_CLIENT_SENT_BYTES,
                response_payload_size=_SERVER_SENT_BYTES,
            )
            logger.info(
                "Letting test client run for %d seconds", _TEST_RUN_SECS
            )
            time.sleep(_TEST_RUN_SECS)
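            # (Presumably this gives GMP, which scrapes metrics on an
            # interval, enough time to collect and export data points.)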

with self.subTest("3_test_server_received_rpcs_from_test_client"):
self.assertSuccessfulRpcs(test_client)

        with self.subTest("4_query_monitoring_metric_client"):
            end_secs = int(time.time())
            interval = monitoring_v3.TimeInterval(
                {
                    "end_time": {"seconds": end_secs, "nanos": 0},
                    "start_time": {"seconds": start_secs, "nanos": 0},
                }
            )
            # The list_time_series API requires us to query one metric
            # at a time.
            #
            # The 'grpc_status = "OK"' filter condition is needed because
            # some time series data points were logged with grpc_status
            # "UNAVAILABLE" while the client and server were establishing
            # connections.
            #
            # The 'grpc_method' filter condition is needed because the
            # server metrics also count the Channelz requests the server
            # is serving.
            filter_str = (
                'metric.type = "%s" AND '
                'metric.labels.grpc_status = "OK" AND '
                'metric.labels.grpc_method = "grpc.testing.TestService/UnaryCall"'
            )
            histogram_results = self._query_metrics(
                _HISTOGRAM_METRICS, filter_str, interval
            )

            # The counter metrics do not have the 'grpc_status' label.
            filter_str = (
                'metric.type = "%s" AND '
                'metric.labels.grpc_method = "grpc.testing.TestService/UnaryCall"'
            )
            counter_results = self._query_metrics(
                _COUNTER_METRICS, filter_str, interval
            )

            all_results = {**histogram_results, **counter_results}
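            # For reference, filter_str % metric renders to a query like:
            #   metric.type = "prometheus.googleapis.com/grpc_server_call_started_total/counter"
            #   AND metric.labels.grpc_method = "grpc.testing.TestService/UnaryCall"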

with self.subTest("5_check_metrics_time_series"):
for metric, time_series in all_results.items():
# There should be exactly 1 time series with these label value
# combinations that we are expecting.
self.assertEqual(1, len(time_series))

        # Test whether the metrics have the right set of metric label keys.
        with self.subTest("6_check_metrics_labels"):
            for metric in _HISTOGRAM_CLIENT_METRICS:
                metric_labels = all_results[metric][0].metric.labels
                for label_key in [
                    "csm_mesh_id",
                    "csm_remote_workload_canonical_service",
                    "csm_remote_workload_type",
                    "csm_service_name",
                    "csm_service_namespace_name",
                    "csm_workload_canonical_service",
                    "grpc_method",
                    "grpc_status",
                    "grpc_target",
                    "otel_scope_name",
                    "otel_scope_version",
                    "pod",
                ]:
                    self.assertIn(label_key, metric_labels)

            for metric in _HISTOGRAM_SERVER_METRICS:
                metric_labels = all_results[metric][0].metric.labels
                for label_key in [
                    "grpc_method",
                    "grpc_status",
                    "otel_scope_name",
                    "otel_scope_version",
                    "pod",
                ]:
                    self.assertIn(label_key, metric_labels)

            for metric in _COUNTER_CLIENT_METRICS:
                metric_labels = all_results[metric][0].metric.labels
                for label_key in [
                    "grpc_method",
                    "grpc_target",
                    "otel_scope_name",
                    "otel_scope_version",
                    "pod",
                ]:
                    self.assertIn(label_key, metric_labels)

            for metric in _COUNTER_SERVER_METRICS:
                metric_labels = all_results[metric][0].metric.labels
                for label_key in [
                    "grpc_method",
                    "otel_scope_name",
                    "otel_scope_version",
                    "pod",
                ]:
                    self.assertIn(label_key, metric_labels)

        # Testing whether the metrics have the right set of monitored resource
        # label keys
        with self.subTest("7_check_resource_labels"):
            # All metrics share the same set of monitored resource labels.
            for metric in _ALL_METRICS:
                resource_labels = all_results[metric][0].resource.labels
                for label_key in [
                    "cluster",
                    "instance",
                    "job",
                    "location",
                    "namespace",
                    "project_id",
                ]:
                    self.assertIn(label_key, resource_labels)

        # Testing the values of metric labels
        with self.subTest("8_check_metrics_label_values"):
            for metric, time_series in all_results.items():
                series = time_series[0]
                self.assertEqual(metric, series.metric.type)
                for label_key, label_value in series.metric.labels.items():
                    if label_key == "pod" and "client" in metric:
                        self.assertEqual(test_client.hostname, label_value)
                    elif label_key == "pod" and "server" in metric:
                        self.assertEqual(test_server.hostname, label_value)
                    elif label_key == "grpc_method":
                        self.assertEqual(
                            "grpc.testing.TestService/UnaryCall", label_value
                        )
                    elif label_key == "grpc_target":
                        self.assertIn(self.server_namespace, label_value)
                    elif label_key == "csm_service_name":
                        self.assertEqual(
                            self.server_runner.service_name, label_value
                        )
                    elif label_key == "csm_service_namespace_name":
                        self.assertEqual(self.server_namespace, label_value)

        # Testing the values of monitored resource labels
        with self.subTest("9_check_resource_label_values"):
            for metric, time_series in all_results.items():
                series = time_series[0]
                self.assertEqual("prometheus_target", series.resource.type)
                for label_key, label_value in series.resource.labels.items():
                    if label_key == "project_id":
                        self.assertEqual(self.project, label_value)
                    elif label_key == "namespace" and "client" in metric:
                        self.assertEqual(self.client_namespace, label_value)
                    elif label_key == "namespace" and "server" in metric:
                        self.assertEqual(self.server_namespace, label_value)
                    elif label_key == "job" and "client" in metric:
                        # The "job" label on the monitored resource refers to
                        # the GMP PodMonitoring resource.
                        self.assertEqual(
                            self.client_runner.pod_monitoring_name,
                            label_value,
                        )
                    elif label_key == "job" and "server" in metric:
                        # The "job" label on the monitored resource refers to
                        # the GMP PodMonitoring resource.
                        self.assertEqual(
                            self.server_runner.pod_monitoring_name,
                            label_value,
                        )
                    elif label_key == "cluster":
                        self.assertIn(
                            label_value, xds_k8s_flags.KUBE_CONTEXT.value
                        )
                    elif label_key == "instance" and "client" in metric:
                        # The "instance" label on the monitored resource
                        # refers to the GKE pod and port.
                        self.assertIn(test_client.hostname, label_value)
                    elif label_key == "instance" and "server" in metric:
                        # The "instance" label on the monitored resource
                        # refers to the GKE pod and port.
                        self.assertIn(test_server.hostname, label_value)

        # Each metric should have at least one data point whose mean is
        # close to the number of bytes sent by the RPCs.
        with self.subTest("10_check_bytes_sent_vs_data_points"):
            for metric in _CLIENT_SENT_METRICS:
                self.assertTrue(
                    self._at_least_one_point_within_range(
                        all_results[metric][0].points, _CLIENT_SENT_BYTES
                    )
                )

            for metric in _SERVER_SENT_METRICS:
                self.assertTrue(
                    self._at_least_one_point_within_range(
                        all_results[metric][0].points, _SERVER_SENT_BYTES
                    )
                )
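        # With the payload sizes above, the 5% tolerance in subTest 10 works
        # out to an accepted mean range of (258236.6, 285419.4) bytes for the
        # client-sent metrics and (298451.05, 329866.95) bytes for the
        # server-sent metrics.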


if __name__ == "__main__":
Expand Down
