Reorder CSM o11y test function order #29

Merged · 2 commits · Feb 2, 2024
Changes from all commits
174 changes: 87 additions & 87 deletions tests/gamma/csm_observability_test.py
@@ -168,93 +168,6 @@ def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner:
csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_SERVER,
)

def assertAtLeastOnePointWithinRange(
self,
points: list[monitoring_v3.types.Point],
ref_bytes: int,
tolerance: float = 0.05,
):
"""
A helper function to check that at least one of the "points" has a
mean within tolerance (default ±5%) of ref_bytes.
"""
for point in points:
if (
ref_bytes * (1 - tolerance)
< point.value.distribution_value.mean
< ref_bytes * (1 + tolerance)
):
return
self.fail(
f"No data point with {ref_bytes}±{tolerance*100}% bytes found"
)

@classmethod
def build_histogram_query(cls, metric_type: str) -> str:
#
# The list_time_series API requires us to query one metric
# at a time.
#
# The 'grpc_status = "OK"' filter condition is needed because
# some time series data points were logged with grpc_status
# "UNAVAILABLE" while the client/server were still establishing
# connections.
#
# The 'grpc_method' filter condition is needed because the
# server also serves Channelz requests, which would otherwise
# appear in the server metrics.
#
return (
f'metric.type = "{metric_type}" AND '
'metric.labels.grpc_status = "OK" AND '
f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
)

@classmethod
def build_counter_query(cls, metric_type: str) -> str:
# The "RPCs started" counter metrics do not have the
# 'grpc_status' label, so it is not included in the filter.
return (
f'metric.type = "{metric_type}" AND '
f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
)

def query_metrics(
self,
metric_names: Iterable[str],
build_query_fn: BuildQueryFn,
interval: monitoring_v3.TimeInterval,
) -> dict[str, MetricTimeSeries]:
"""
A helper function to make the cloud monitoring API call to query
metrics created by this test run.
"""
results = {}
for metric in metric_names:
logger.info("Requesting list_time_series for metric %s", metric)
response = self.metric_client.list_time_series(
name=f"projects/{self.project}",
filter=build_query_fn(metric),
interval=interval,
view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
)
time_series = list(response)

self.assertLen(
time_series,
1,
msg=f"Query for {metric} should return exactly 1 time series."
f" Found {len(time_series)}.",
)

metric_time_series = MetricTimeSeries.from_response(
metric, time_series[0]
)
logger.info(
"Metric %s:\n%s", metric, metric_time_series.pretty_print()
)
results[metric] = metric_time_series
return results

def test_csm_observability(self):
# TODO(sergiitk): [GAMMA] Consider moving out custom gamma
# resource creation out of self.startTestServers()
@@ -446,6 +359,93 @@ def test_csm_observability(self):
all_results[metric].points, RESPONSE_PAYLOAD_SIZE
)

@classmethod
def build_histogram_query(cls, metric_type: str) -> str:
#
# The list_time_series API requires us to query one metric
# at a time.
#
# The 'grpc_status = "OK"' filter condition is needed because
# some time series data points were logged with grpc_status
# "UNAVAILABLE" while the client/server were still establishing
# connections.
#
# The 'grpc_method' filter condition is needed because the
# server also serves Channelz requests, which would otherwise
# appear in the server metrics.
#
return (
f'metric.type = "{metric_type}" AND '
'metric.labels.grpc_status = "OK" AND '
f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
)

@classmethod
def build_counter_query(cls, metric_type: str) -> str:
# The "RPCs started" counter metrics do not have the
# 'grpc_status' label, so it is not included in the filter.
return (
f'metric.type = "{metric_type}" AND '
f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
)

def query_metrics(
self,
metric_names: Iterable[str],
build_query_fn: BuildQueryFn,
interval: monitoring_v3.TimeInterval,
) -> dict[str, MetricTimeSeries]:
"""
A helper function to make the cloud monitoring API call to query
metrics created by this test run.
"""
results = {}
for metric in metric_names:
logger.info("Requesting list_time_series for metric %s", metric)
response = self.metric_client.list_time_series(
name=f"projects/{self.project}",
filter=build_query_fn(metric),
interval=interval,
view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
)
time_series = list(response)

self.assertLen(
time_series,
1,
msg=f"Query for {metric} should return exactly 1 time series."
f" Found {len(time_series)}.",
)

metric_time_series = MetricTimeSeries.from_response(
metric, time_series[0]
)
logger.info(
"Metric %s:\n%s", metric, metric_time_series.pretty_print()
)
results[metric] = metric_time_series
return results

def assertAtLeastOnePointWithinRange(
self,
points: list[monitoring_v3.types.Point],
ref_bytes: int,
tolerance: float = 0.05,
):
"""
A helper function to check that at least one of the "points" has a
mean within tolerance (default ±5%) of ref_bytes.
"""
for point in points:
if (
ref_bytes * (1 - tolerance)
< point.value.distribution_value.mean
< ref_bytes * (1 + tolerance)
):
return
self.fail(
f"No data point with {ref_bytes}±{tolerance*100}% bytes found"
)


if __name__ == "__main__":
absltest.main()
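
For reference, a minimal self-contained sketch (not part of this PR; the function name and the byte values are illustrative) of the ±tolerance check that assertAtLeastOnePointWithinRange performs, using plain floats in place of monitoring_v3 distribution points:

def any_mean_within_range(means: list[float], ref_bytes: int, tolerance: float = 0.05) -> bool:
    # Mirrors the helper's open interval: strictly between
    # ref_bytes * (1 - tolerance) and ref_bytes * (1 + tolerance).
    lower = ref_bytes * (1 - tolerance)
    upper = ref_bytes * (1 + tolerance)
    return any(lower < mean < upper for mean in means)

assert any_mean_within_range([9_800.0], ref_bytes=10_000)      # 9_800 lies within 9_500..10_500
assert not any_mean_within_range([8_000.0], ref_bytes=10_000)  # 8_000 falls outside the band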