Stateful Session Affinity: Draining test #34

Merged 49 commits into main from ssa-draining-test on Mar 7, 2024.

Commits (49)
963434b
Stateful Session Affinity: Draining test
sergiitk Feb 9, 2024
e5bba6d
Lint
sergiitk Feb 10, 2024
fa56794
Move AffinitySessionDrainTest to own file
sergiitk Feb 12, 2024
b672a51
Remove refreshTestServers logic
sergiitk Feb 20, 2024
4e08bdf
Improve plumbing code: introduce ServerDeploymentArgs
sergiitk Feb 21, 2024
4b567e8
Improve port forwarding logging
sergiitk Feb 22, 2024
d4e5f6c
Templating WIP
sergiitk Feb 22, 2024
bbaeb25
pre_stop_hook templating
sergiitk Feb 22, 2024
7839a8c
init container dockerfile wip
sergiitk Feb 22, 2024
ed1307d
Dockerfile build
sergiitk Feb 23, 2024
3e0116c
Docker build improvements - v0.0.4
sergiitk Feb 23, 2024
e8e451d
temporary cpp-server build that includes tini
sergiitk Feb 23, 2024
b3a41e0
Switch to an initContainers approach
sergiitk Feb 23, 2024
d701689
Kubernetes runners now track started pods: pod_name => k8s.V1Pod
sergiitk Feb 24, 2024
32a3081
almost lost it
sergiitk Feb 24, 2024
9ee8284
Minor fixes: _reset_state, pretty_format_statuses
sergiitk Feb 26, 2024
4e4c81f
_pod_stopped_logic wip
sergiitk Feb 27, 2024
bef957f
test wip
sergiitk Feb 27, 2024
9f58f79
save point
sergiitk Feb 29, 2024
f5945a3
another checkpoint
sergiitk Feb 29, 2024
6827c6e
cookie works
sergiitk Feb 29, 2024
c28b21d
printing works
sergiitk Feb 29, 2024
a9c1762
more progress on the test
sergiitk Feb 29, 2024
9694817
fix csds parsing
sergiitk Feb 29, 2024
fa6c734
Improve DumpedXdsConfig parsing logic
sergiitk Feb 29, 2024
996f16f
fix parsing bugs
sergiitk Feb 29, 2024
286c08f
move wait logic to wait_for_draining_endpoint_count
sergiitk Mar 1, 2024
b40cd67
Remove temporary cpp-server.Dockerfile
sergiitk Mar 4, 2024
1d40417
Merge branch 'main' into ssa-draining-test
sergiitk Mar 5, 2024
da1d80b
post-merge fixes
sergiitk Mar 5, 2024
8ced272
Undo uncesessary changes
sergiitk Mar 5, 2024
b83a63c
Clean servers with prestop hook WIP
sergiitk Mar 5, 2024
d9dfa68
Fix send_prestop_hook_release timeout propagation
sergiitk Mar 5, 2024
ac2f62d
Merge branch 'main' into ssa-draining-test
sergiitk Mar 5, 2024
84e2c53
cleanup unrelated changes
sergiitk Mar 5, 2024
18b9bed
Pod stopping logic
sergiitk Mar 6, 2024
4ce1288
checkpoint
sergiitk Mar 7, 2024
cdda674
I think we're done here
sergiitk Mar 7, 2024
59d5af5
Handle last todos
sergiitk Mar 7, 2024
63facf4
More todo updates
sergiitk Mar 7, 2024
020f1ed
simplify assertRpcsEventuallyGoToGivenServers retries
sergiitk Mar 7, 2024
f30d036
Add check for the draining server to not receive unpinned traffic
sergiitk Mar 7, 2024
cb1cffb
fix set math
sergiitk Mar 7, 2024
2e25921
Add missing copyright blurbs to docker files
sergiitk Mar 7, 2024
bc9ca33
More future todos
sergiitk Mar 7, 2024
18dbbfe
Fix prestop image readme
sergiitk Mar 7, 2024
7d90285
prestop image v0.0.5 - copyright notice
sergiitk Mar 7, 2024
66f87b7
prestop image v0.0.6 - newer alpine
sergiitk Mar 7, 2024
fcf1f2f
Documentation per feedback
sergiitk Mar 7, 2024
Files changed
3 changes: 3 additions & 0 deletions .dockerignore
@@ -0,0 +1,3 @@
*
!protos/**
!docker/**
14 changes: 14 additions & 0 deletions docker/psm-prestop/README.md
@@ -0,0 +1,14 @@
# PSM Interop PreStop hook init container

### Building

From the repo root:

```sh
PRESTOP_INIT_VERSION="v0.0.4"
docker build -f ./docker/psm-prestop/prestop.Dockerfile -t "gcr.io/grpc-testing/xds-interop/prestop-hook:${PRESTOP_INIT_VERSION:-dev}" .
docker push "gcr.io/grpc-testing/xds-interop/prestop-hook:${PRESTOP_INIT_VERSION:-dev}"
```

Build and publishing steps will be automated.\
TODO(sergiitk): Implement automated build, publish.
19 changes: 19 additions & 0 deletions docker/psm-prestop/prestop-init-volume.sh
@@ -0,0 +1,19 @@
#!/bin/sh
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

readonly TARGET_DIR="${1:?Usage prestop-init-volume.sh TARGET_DIR}"
mkdir -vp "${TARGET_DIR}"
cp -rv ./grpcurl "${TARGET_DIR}"
cp -rv ./protos "${TARGET_DIR}"
43 changes: 43 additions & 0 deletions docker/psm-prestop/prestop.Dockerfile
@@ -0,0 +1,43 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# TODO(sergiitk): Implement automated build, publish.

FROM alpine:latest as grpcurl

WORKDIR /tmp/grpcurl/
ENV GRPCURL_URI="https://github.com/fullstorydev/grpcurl/releases/download/v1.8.9/grpcurl_1.8.9_linux_x86_64.tar.gz" \
GRPCURL_CHECKSUM="a422d1e8ad854a305c0dd53f2f2053da242211d3d1810e7addb40a041e309516"

ADD "$GRPCURL_URI" grpcurl.tar.gz
RUN \
echo "$GRPCURL_CHECKSUM grpcurl.tar.gz" | sha256sum -c - \
&& tar -xf grpcurl.tar.gz

# ---

FROM alpine:latest

# Environment
ENV APP_DIR=/usr/src/psm-prestop
WORKDIR "$APP_DIR"

# Provision grpcurl binary
COPY --from=grpcurl /tmp/grpcurl/grpcurl .

# Provision protos and the init script
COPY protos/grpc/testing/*.proto ./protos/grpc/testing/
COPY docker/psm-prestop/prestop-init-volume.sh .

ENTRYPOINT ["./prestop-init-volume.sh"]
40 changes: 35 additions & 5 deletions framework/helpers/retryers.py
@@ -67,6 +67,7 @@ def exponential_retryer_with_timeout(
check_result: Optional[CheckResultFn] = None,
logger: Optional[logging.Logger] = None,
log_level: Optional[int] = logging.DEBUG,
error_note: str = "",
) -> Retrying:
if logger is None:
logger = retryers_logger
@@ -77,7 +78,7 @@ def constant_retryer(
retry_on_exceptions=retry_on_exceptions, check_result=check_result
)
retry_error_callback = _on_error_callback(
timeout=timeout, check_result=check_result
timeout=timeout, check_result=check_result, error_note=error_note
)
return Retrying(
retry=tenacity.retry_any(*retry_conditions),
@@ -99,6 +100,7 @@ def constant_retryer(
check_result: Optional[CheckResultFn] = None,
logger: Optional[logging.Logger] = None,
log_level: Optional[int] = logging.DEBUG,
error_note: str = "",
) -> Retrying:
if logger is None:
logger = retryers_logger
@@ -116,7 +118,10 @@
retry_on_exceptions=retry_on_exceptions, check_result=check_result
)
retry_error_callback = _on_error_callback(
timeout=timeout, attempts=attempts, check_result=check_result
timeout=timeout,
attempts=attempts,
check_result=check_result,
error_note=error_note,
)
return Retrying(
retry=tenacity.retry_any(*retry_conditions),
@@ -132,6 +137,7 @@ def _on_error_callback(
timeout: Optional[timedelta] = None,
attempts: int = 0,
check_result: Optional[CheckResultFn] = None,
error_note: str = "",
):
"""A helper to propagate the initial state to the RetryError, so that
it can assemble a helpful message containing timeout/number of attempts.
@@ -143,6 +149,7 @@ def error_handler(retry_state: tenacity.RetryCallState):
timeout=timeout,
attempts=attempts,
check_result=check_result,
note=error_note,
)

return error_handler
@@ -205,7 +212,7 @@ def log_it(retry_state):

logger.log(
log_level,
"Retrying %s in %s seconds as it %s %s.",
"Retrying %s in %s seconds as it %s %s",
tenacity_utils.get_callback_name(retry_state.fn),
getattr(retry_state.next_action, "sleep"),
verb,
@@ -230,12 +237,18 @@ def __init__(
timeout: Optional[timedelta] = None,
attempts: int = 0,
check_result: Optional[CheckResultFn] = None,
note: str = "",
):
last_attempt: tenacity.Future = retry_state.outcome
super().__init__(last_attempt)

callback_name = tenacity_utils.get_callback_name(retry_state.fn)
self.message = f"Retry error calling {callback_name}:"
self.message = f"Retry error"
if retry_state.fn is None:
# Context manager
self.message += f":"
else:
callback_name = tenacity_utils.get_callback_name(retry_state.fn)
self.message += f" calling {callback_name}:"
if timeout:
self.message += f" timeout {timeout} (h:mm:ss) exceeded"
if attempts:
@@ -251,6 +264,9 @@ def __init__(
elif check_result:
self.message += " Check result callback returned False."

if note:
self.add_note(note)

def result(self, *, default=None):
return (
self.last_attempt.result()
@@ -265,6 +281,20 @@ def exception(self, *, default=None):
else default
)

def exception_str(self) -> str:
return f"Error: {self._exception_str(self.exception())}"

def result_str(self) -> str:
result = self.result()
return f"Result: {result}" if result is not None else "No result"

def reason_str(self):
return self.exception_str() if self.exception() else self.result_str()

@classmethod
def _exception_str(cls, err: Optional[BaseException]) -> str:
return f"{type(err).__name__}: {err}" if err else "???"

# TODO(sergiitk): Remove in py3.11, this will be built-in. See PEP 678.
def add_note(self, note: str):
self.note = note
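
The retryer changes above thread a new `error_note` argument from `constant_retryer` and `exponential_retryer_with_timeout` into `RetryError`, and add `reason_str()` / `exception_str()` / `result_str()` helpers for reporting why the final attempt failed. A hedged usage sketch follows; the waiting function and note text are invented for illustration, and only the retryer arguments and error helpers mirror this diff.

```python
import datetime
import logging

from framework.helpers import retryers


def wait_for_pod_gone(k8s_namespace, pod_name: str) -> None:
    # Attach a note so the eventual RetryError explains what was being awaited.
    retryer = retryers.constant_retryer(
        wait_fixed=datetime.timedelta(seconds=1),
        timeout=datetime.timedelta(minutes=1),
        check_result=lambda pod: pod is None,
        error_note=f"Pod {pod_name} did not terminate. Is the pre-stop hook released?",
    )
    try:
        retryer(k8s_namespace.get_pod, pod_name)
    except retryers.RetryError as err:
        # reason_str() summarizes the last attempt's result or exception.
        logging.error("Timed out waiting for pod deletion: %s", err.reason_str())
        raise
```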
51 changes: 39 additions & 12 deletions framework/infrastructure/k8s.py
@@ -20,7 +20,7 @@
import logging
import pathlib
import threading
from typing import Any, Callable, List, Optional, Tuple
from typing import Any, Callable, Final, List, Optional, Tuple
import warnings

from kubernetes import client
@@ -273,9 +273,15 @@ class KubernetesNamespace: # pylint: disable=too-many-public-methods
_name: str

NEG_STATUS_ANNOTATION = "cloud.google.com/neg-status"
DELETE_GRACE_PERIOD_SEC: int = 5
WAIT_SHORT_TIMEOUT_SEC: int = 60
WAIT_SHORT_SLEEP_SEC: int = 1
# TODO(sergiitk): get rid of _SEC variables, only use timedelta
# timedelta.seconds: assumes none of the timeouts more than a day.
DELETE_GRACE_PERIOD: Final[_timedelta] = _timedelta(seconds=5)
DELETE_GRACE_PERIOD_SEC: Final[int] = DELETE_GRACE_PERIOD.seconds
WAIT_SHORT_TIMEOUT: Final[_timedelta] = _timedelta(minutes=1)
WAIT_SHORT_TIMEOUT_SEC: Final[int] = WAIT_SHORT_TIMEOUT.seconds
WAIT_SHORT_SLEEP: Final[_timedelta] = _timedelta(seconds=1)
WAIT_SHORT_SLEEP_SEC: Final[int] = WAIT_SHORT_SLEEP.seconds
# TODO(sergiitk): timedelta form for the rest
WAIT_MEDIUM_TIMEOUT_SEC: int = 5 * 60
WAIT_MEDIUM_SLEEP_SEC: int = 10
WAIT_LONG_TIMEOUT_SEC: int = 10 * 60
@@ -689,22 +695,41 @@ def delete_backend_policy(
grace_period_seconds=grace_period_seconds,
)

def delete_pod_async(
def delete_pod(
self,
name: str,
grace_period_seconds=DELETE_GRACE_PERIOD_SEC,
*,
grace_period: Optional[_timedelta] = DELETE_GRACE_PERIOD,
) -> None:
# TODO(sergiitk): Do we need async? Won't it break error handling?
delete_options = client.V1DeleteOptions(propagation_policy="Foreground")

# Checking for None because 0 is valid grace period value, meaning
# immediate deletion. While not setting the field indicates the
# grace period defaults to k8s's setting (usually 30s).
if grace_period is not None:
delete_options.grace_period_seconds = int(
grace_period.total_seconds()
)

self._execute(
self._api.core.delete_namespaced_pod,
name=name,
namespace=self.name,
body=client.V1DeleteOptions(
propagation_policy="Foreground",
grace_period_seconds=grace_period_seconds,
),
async_req=True,
body=delete_options,
)

def wait_for_pod_deleted(
self,
name: str,
timeout: _timedelta = WAIT_SHORT_TIMEOUT,
retry_wait: _timedelta = WAIT_SHORT_SLEEP,
) -> None:
retryer = retryers.constant_retryer(
wait_fixed=retry_wait,
timeout=timeout,
check_result=lambda pod: pod is None,
)
retryer(self.get_pod, name)

def get(self) -> V1Namespace:
return self._get_resource(self._api.core.read_namespace, self.name)
@@ -1098,6 +1123,8 @@ def pretty_format_status(
f" {_helper_datetime.ago(metadata.creation_timestamp)}"
)
if metadata and metadata.deletion_timestamp:
# TODO(sergiitk): Handle the case with deletion_timestamp is in
# the future, (e.g. waiting on prestop while in grace period).
result.append(
f"Deletion requested: {metadata.deletion_timestamp};"
f" {_helper_datetime.ago(metadata.deletion_timestamp)}"
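
With the k8s.py changes above, `delete_pod` is synchronous, takes the grace period as a `timedelta` (where `None` defers to the cluster default and zero means immediate deletion), and pairs with the new `wait_for_pod_deleted`. Below is a hedged sketch of how a draining test might stop a backend; the helper name and timeout values are illustrative.

```python
import datetime


def stop_server_pod(k8s_namespace, pod_name: str) -> None:
    # Give the preStop hook plenty of time to run before the kubelet kills
    # the containers, then block until the pod object is actually gone.
    k8s_namespace.delete_pod(
        pod_name,
        grace_period=datetime.timedelta(minutes=10),
    )
    k8s_namespace.wait_for_pod_deleted(
        pod_name,
        timeout=datetime.timedelta(minutes=12),
        retry_wait=datetime.timedelta(seconds=5),
    )
```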
24 changes: 14 additions & 10 deletions framework/rpc/grpc_csds.py
@@ -15,6 +15,7 @@
This contains helpers for gRPC services defined in
https://github.com/envoyproxy/envoy/blob/main/api/envoy/service/status/v3/csds.proto
"""
import datetime as dt
import json
import logging
import re
@@ -158,23 +159,26 @@ def __str__(self) -> str:


class CsdsClient(framework.rpc.grpc.GrpcClientHelper):
stub: csds_pb2_grpc.ClientStatusDiscoveryServiceStub
STUB_CLASS: Final = csds_pb2_grpc.ClientStatusDiscoveryServiceStub
DEFAULT_RPC_DEADLINE: Final[dt.timedelta] = dt.timedelta(seconds=30)

def __init__(
self, channel: grpc.Channel, *, log_target: Optional[str] = ""
):
super().__init__(
channel,
csds_pb2_grpc.ClientStatusDiscoveryServiceStub,
log_target=log_target,
)

def fetch_client_status(self, **kwargs) -> Optional[ClientConfig]:
) -> None:
super().__init__(channel, self.STUB_CLASS, log_target=log_target)

def fetch_client_status(
self,
*,
timeout: dt.timedelta = DEFAULT_RPC_DEADLINE,
log_level: int = logging.INFO,
) -> Optional[ClientConfig]:
"""Fetches the active xDS configurations."""
response: ClientStatusResponse = self.call_unary_with_deadline(
rpc="FetchClientStatus",
req=_ClientStatusRequest(),
**kwargs,
log_level=log_level,
deadline_sec=int(timeout.total_seconds()),
)
response = cast(ClientStatusResponse, response)
if len(response.config) != 1:
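
`fetch_client_status` now takes an explicit `timedelta` deadline and log level instead of opaque `**kwargs`. A hedged sketch of calling it follows; the deadline value and the logging of the node ID are illustrative.

```python
import datetime as dt
import logging


def log_client_xds_node(csds_client) -> None:
    # csds_client is assumed to be a connected CsdsClient instance.
    config = csds_client.fetch_client_status(
        timeout=dt.timedelta(seconds=10),
        log_level=logging.DEBUG,
    )
    if config is None:
        logging.info("The xDS client hasn't received any configuration yet")
    else:
        logging.info("xDS config dumped for node %s", config.node.id)
```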
36 changes: 36 additions & 0 deletions framework/rpc/grpc_testing.py
@@ -16,6 +16,7 @@
https://github.com/grpc/grpc/blob/master/src/proto/grpc/testing/test.proto
"""
from collections.abc import Sequence
import datetime as dt
import logging
from typing import Any, Final, Optional, cast

@@ -282,6 +283,41 @@ def set_not_serving(self):
)


class HookServiceClient(framework.rpc.grpc.GrpcClientHelper):
STUB_CLASS: Final = test_pb2_grpc.HookServiceStub

# Override the default deadline: all requests expected to be short.
DEFAULT_RPC_DEADLINE: Final[dt.timedelta] = dt.timedelta(seconds=10)

def __init__(self, channel: grpc.Channel, log_target: Optional[str] = ""):
super().__init__(channel, self.STUB_CLASS, log_target=log_target)

def set_return_status(
self,
*,
timeout: dt.timedelta = DEFAULT_RPC_DEADLINE,
) -> None:
request = messages_pb2.SetReturnStatusRequest()
self.call_unary_with_deadline(
rpc="SetReturnStatus",
req=request,
log_level=logging.INFO,
deadline_sec=int(timeout.total_seconds()),
)

def clear_return_status(
self,
*,
timeout: dt.timedelta = DEFAULT_RPC_DEADLINE,
) -> None:
self.call_unary_with_deadline(
rpc="ClearReturnStatus",
req=empty_pb2.Empty(),
log_level=logging.INFO,
deadline_sec=int(timeout.total_seconds()),
)


class HealthClient(framework.rpc.grpc.GrpcClientHelper):
stub: health_pb2_grpc.HealthStub

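
`HookServiceClient` wraps the test server's `HookService` with a short 10-second default deadline. In the draining test it presumably controls the status returned while the server sits in its preStop hook; the sketch below only shows the call shapes, and the surrounding semantics are an assumption not spelled out in this diff.

```python
import datetime as dt


def configure_hook(hook_client, *, release: bool) -> None:
    # hook_client is assumed to be a HookServiceClient connected to the
    # server's maintenance port.
    if release:
        hook_client.clear_return_status(timeout=dt.timedelta(seconds=5))
    else:
        hook_client.set_return_status(timeout=dt.timedelta(seconds=5))
```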
3 changes: 3 additions & 0 deletions framework/test_app/client_app.py
@@ -107,6 +107,9 @@ def csds(self) -> _CsdsClient:
log_target=f"{self.hostname}:{self.maintenance_port}",
)

def get_csds_parsed(self, **kwargs) -> Optional[grpc_csds.DumpedXdsConfig]:
return self.csds.fetch_client_status_parsed(**kwargs)

def get_load_balancer_stats(
self,
*,
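
`get_csds_parsed` is a thin passthrough to `CsdsClient.fetch_client_status_parsed`, returning the parsed `DumpedXdsConfig` (or `None` when the client has no config yet). A hedged sketch follows; the `endpoints` attribute is assumed from the existing `DumpedXdsConfig` helpers and is not shown in this diff.

```python
import logging


def log_known_endpoints(test_client) -> None:
    xds_config = test_client.get_csds_parsed()
    if xds_config is None:
        logging.info("Client has not received an xDS config dump yet")
        return
    # Assumed attribute: DumpedXdsConfig.endpoints lists EDS endpoint addresses.
    logging.info("Endpoints known to the client: %s", xds_config.endpoints)
```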