Skip to content

Commit

Permalink
Add dualstack support to the framework behind flag (#101)
Browse files Browse the repository at this point in the history
Based on @larry-safran's proof of concept PR
#88
  • Loading branch information
purnesh42H authored Jul 23, 2024
1 parent fc57159 commit 00199ac
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 5 deletions.
2 changes: 2 additions & 0 deletions framework/infrastructure/gcp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ def compute(self, version: str):
return self._build_from_discovery_v1(api_name, version)
elif version == "v1alpha":
return self._build_from_discovery_v1(api_name, "alpha")
elif version == "v1beta":
return self._build_from_discovery_v1(api_name, "beta")

raise NotImplementedError(f"Compute {version} not supported")

Expand Down
5 changes: 5 additions & 0 deletions framework/infrastructure/gcp/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def create_backend_service_traffic_director(
subset_size: Optional[int] = None,
locality_lb_policies: Optional[List[dict]] = None,
outlier_detection: Optional[dict] = None,
enable_dualstack: bool = False,
) -> "GcpResource":
if not isinstance(protocol, self.BackendServiceProtocol):
raise TypeError(f"Unexpected Backend Service protocol: {protocol}")
Expand All @@ -160,6 +161,10 @@ def create_backend_service_traffic_director(
"healthChecks": [health_check.url],
"protocol": protocol.name,
}
# If add dualstack support is specified True, config the backend service
# to support IPv6
if enable_dualstack:
body["ipAddressSelectionPolicy"] = "PREFER_IPV6"
# If affinity header is specified, config the backend service to support
# affinity, and set affinity header to the one given.
if affinity_header:
Expand Down
80 changes: 79 additions & 1 deletion framework/infrastructure/traffic_director.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ class TrafficDirectorManager: # pylint: disable=too-many-public-methods
URL_MAP_PATH_MATCHER_NAME: Final[str] = "path-matcher"

TARGET_PROXY_NAME: Final[str] = "target-proxy"
TARGET_PROXY_NAME_IPV6: Final[str] = "target-proxy-ipv6"
ALTERNATIVE_TARGET_PROXY_NAME: Final[str] = "target-proxy-alt"

FORWARDING_RULE_NAME: Final[str] = "forwarding-rule"
FORWARDING_RULE_NAME_IPV6: Final[str] = "forwarding-rule-ipv6"
ALTERNATIVE_FORWARDING_RULE_NAME: Final[str] = "forwarding-rule-alt"

HEALTH_CHECK_NAME: Final[str] = "health-check"
Expand Down Expand Up @@ -107,6 +109,7 @@ def __init__(
resource_suffix: str,
network: str = "default",
compute_api_version: str = "v1",
enable_dualstack: bool = False,
):
# API
self.compute = _ComputeV1(
Expand All @@ -121,6 +124,7 @@ def __init__(
self.network: str = network
self.resource_prefix: str = resource_prefix
self.resource_suffix: str = resource_suffix
self.enable_dualstack: bool = enable_dualstack

# Managed resources
self.health_check: Optional[GcpResource] = None
Expand All @@ -129,10 +133,12 @@ def __init__(
self.firewall_rule: Optional[GcpResource] = None
self.firewall_rule_ipv6: Optional[GcpResource] = None
self.target_proxy: Optional[GcpResource] = None
self.target_proxy_ipv6: Optional[GcpResource] = None
# TODO(sergiitk): remove this flag once target proxy resource loaded
self.target_proxy_is_http: bool = False
self.alternative_target_proxy: Optional[GcpResource] = None
self.forwarding_rule: Optional[GcpResource] = None
self.forwarding_rule_ipv6: Optional[GcpResource] = None
self.alternative_forwarding_rule: Optional[GcpResource] = None

# Backends.
Expand Down Expand Up @@ -171,13 +177,20 @@ def setup_routing_rule_map_for_grpc(self, service_host, service_port):
self.create_target_proxy()
self.create_forwarding_rule(service_port)

if self.enable_dualstack:
self.create_target_proxy_ipv6()
self.create_forwarding_rule_ipv6(service_port)

def cleanup(self, *, force=False):
# Cleanup in the reverse order of creation
self.delete_firewall_rules(force=force)
self.delete_forwarding_rule(force=force)
self.delete_alternative_forwarding_rule(force=force)
self.delete_target_http_proxy(force=force)
self.delete_target_grpc_proxy(force=force)
if self.enable_dualstack:
self.delete_forwarding_rule_ipv6(force=force)
self.delete_target_proxy_ipv6(force=force)
self.delete_alternative_target_grpc_proxy(force=force)
self.delete_url_map(force=force)
self.delete_alternative_url_map(force=force)
Expand Down Expand Up @@ -246,6 +259,7 @@ def create_backend_service(
affinity_header=affinity_header,
locality_lb_policies=locality_lb_policies,
outlier_detection=outlier_detection,
enable_dualstack=self.enable_dualstack,
)
self.backend_service = resource
self.backend_service_protocol = protocol
Expand Down Expand Up @@ -337,7 +351,10 @@ def create_alternative_backend_service(
'Creating %s Alternative Backend Service "%s"', protocol.name, name
)
resource = self.compute.create_backend_service_traffic_director(
name, health_check=self.health_check, protocol=protocol
name,
health_check=self.health_check,
protocol=protocol,
enable_dualstack=self.enable_dualstack,
)
self.alternative_backend_service = resource
self.alternative_backend_service_protocol = protocol
Expand Down Expand Up @@ -418,6 +435,7 @@ def create_affinity_backend_service(
health_check=self.health_check,
protocol=protocol,
affinity_header=TEST_AFFINITY_METADATA_KEY,
enable_dualstack=self.enable_dualstack,
)
self.affinity_backend_service = resource
self.affinity_backend_service_protocol = protocol
Expand Down Expand Up @@ -614,6 +632,20 @@ def create_target_proxy(self):
)
self.target_proxy = create_proxy_fn(name, self.url_map)

def create_target_proxy_ipv6(self):
name = self.make_resource_name(self.TARGET_PROXY_NAME_IPV6)
# TODO(lsafran): Support GRPC target proxy as well
target_proxy_type = "HTTP"
create_proxy_fn = self.compute.create_target_http_proxy

logger.info(
'Creating IPv6 target %s proxy "%s" to URL map %s',
name,
target_proxy_type,
self.url_map.name,
)
self.target_proxy_ipv6 = create_proxy_fn(name, self.url_map)

def delete_target_grpc_proxy(self, force=False):
if force:
name = self.make_resource_name(self.TARGET_PROXY_NAME)
Expand All @@ -638,6 +670,18 @@ def delete_target_http_proxy(self, force=False):
self.target_proxy = None
self.target_proxy_is_http = False

def delete_target_proxy_ipv6(self, force=False):
if force:
name = self.make_resource_name(self.TARGET_PROXY_NAME_IPV6)
elif self.target_proxy_ipv6:
name = self.target_proxy_ipv6.name
else:
return
# TODO: Delete Target GRPC Proxy when added in create_target_proxy_ipv6.
logger.info('Deleting IPv6 Target HTTP proxy "%s"', name)
self.compute.delete_target_http_proxy(name)
self.target_proxy_ipv6 = None

def create_alternative_target_proxy(self):
name = self.make_resource_name(self.ALTERNATIVE_TARGET_PROXY_NAME)
if self.backend_service_protocol is BackendServiceProtocol.GRPC:
Expand Down Expand Up @@ -695,6 +739,25 @@ def create_forwarding_rule(self, src_port: int):
self.forwarding_rule = resource
return resource

def create_forwarding_rule_ipv6(self, src_port: int):
name = self.make_resource_name(self.FORWARDING_RULE_NAME_IPV6)
logging.info(
'Creating IPv6 forwarding rule "%s" in network "%s": [::]:%s -> %s',
name,
self.network,
src_port,
self.target_proxy_ipv6.url,
)
resource = self.compute.create_forwarding_rule(
name,
src_port,
self.target_proxy_ipv6,
self.network_url,
ip_address="::",
)
self.forwarding_rule_ipv6 = resource
return resource

def delete_forwarding_rule(self, force=False):
if force:
name = self.make_resource_name(self.FORWARDING_RULE_NAME)
Expand All @@ -706,6 +769,17 @@ def delete_forwarding_rule(self, force=False):
self.compute.delete_forwarding_rule(name)
self.forwarding_rule = None

def delete_forwarding_rule_ipv6(self, force=False):
if force:
name = self.make_resource_name(self.FORWARDING_RULE_NAME_IPV6)
elif self.forwarding_rule_ipv6:
name = self.forwarding_rule_ipv6.name
else:
return
logger.info('Deleting IPv6 Forwarding rule "%s"', name)
self.compute.delete_forwarding_rule(name)
self.forwarding_rule_ipv6 = None

def create_alternative_forwarding_rule(
self, src_port: int, ip_address="0.0.0.0"
):
Expand Down Expand Up @@ -845,6 +919,7 @@ def __init__(
resource_suffix: Optional[str] = None,
network: str = "default",
compute_api_version: str = "v1",
enable_dualstack: bool = False,
):
super().__init__(
gcp_api_manager,
Expand All @@ -853,6 +928,7 @@ def __init__(
resource_suffix=resource_suffix,
network=network,
compute_api_version=compute_api_version,
enable_dualstack=enable_dualstack,
)

# API
Expand Down Expand Up @@ -969,6 +1045,7 @@ def __init__(
resource_suffix: Optional[str] = None,
network: str = "default",
compute_api_version: str = "v1",
enable_dualstack: bool = False,
):
super().__init__(
gcp_api_manager,
Expand All @@ -977,6 +1054,7 @@ def __init__(
resource_suffix=resource_suffix,
network=network,
compute_api_version=compute_api_version,
enable_dualstack=enable_dualstack,
)

# API
Expand Down
9 changes: 6 additions & 3 deletions framework/test_app/runners/k8s/k8s_base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -998,9 +998,12 @@ def _wait_pod_started(self, name, **kwargs) -> k8s.V1Pod:
logger.info("Waiting for pod %s to start", name)
self.k8s_namespace.wait_for_pod_started(name, **kwargs)
pod = self.k8s_namespace.get_pod(name)
logger.info(
"Pod %s ready, IP: %s", pod.metadata.name, pod.status.pod_ip
)

if hasattr(pod.status, "pod_ip_s"): # if running with dualstack support
pod_ips = pod.status.pod_ip_s
else:
pod_ips = pod.status.pod_ip
logger.info("Pod %s ready, IP: %s", pod.metadata.name, pod_ips)
return pod

def _pod_started_logic(self, pod: k8s.V1Pod) -> bool:
Expand Down
1 change: 1 addition & 0 deletions framework/test_app/runners/k8s/k8s_xds_client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class ClientDeploymentArgs:
enable_csm_observability: bool = False
csm_workload_name: str = ""
csm_canonical_service_name: str = ""
enable_dualstack: bool = False

def as_dict(self):
return dataclasses.asdict(self)
Expand Down
3 changes: 3 additions & 0 deletions framework/test_app/runners/k8s/k8s_xds_server_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __init__( # pylint: disable=too-many-locals
debug_use_port_forwarding: bool = False,
enable_workload_identity: bool = True,
deployment_args: Optional[ServerDeploymentArgs] = None,
enable_dualstack: bool = False,
):
super().__init__(
k8s_namespace,
Expand Down Expand Up @@ -142,6 +143,7 @@ def __init__( # pylint: disable=too-many-locals
self.td_bootstrap_image = td_bootstrap_image
self.network = network
self.xds_server_uri = xds_server_uri
self.enable_dualstack = enable_dualstack

# Workload identity settings:
if self.enable_workload_identity:
Expand Down Expand Up @@ -223,6 +225,7 @@ def run( # pylint: disable=arguments-differ,too-many-branches
deployment_name=self.deployment_name,
neg_name=self.gcp_neg_name,
test_port=test_port,
enable_dualstack=self.enable_dualstack,
)
self._wait_service_neg_status_annotation(self.service_name, test_port)

Expand Down
6 changes: 6 additions & 0 deletions framework/xds_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@
help="Whether to enable GFE debug headers and what value to use.",
)

ENABLE_DUALSTACK = flags.DEFINE_bool(
"enable_dualstack",
default=False,
help="Enable support for Dual Stack resources to the framework.",
)


def set_socket_default_timeout_from_flag() -> None:
"""A helper to configure default socket timeout from a flag.
Expand Down
14 changes: 13 additions & 1 deletion framework/xds_k8s_testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
TrafficDirectorSecureManager = traffic_director.TrafficDirectorSecureManager
XdsTestServer = server_app.XdsTestServer
XdsTestClient = client_app.XdsTestClient
ClientDeploymentArgs = k8s_xds_client_runner.ClientDeploymentArgs
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
_LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse
Expand Down Expand Up @@ -146,6 +147,7 @@ class XdsKubernetesBaseTestCase(base_testcase.BaseTestCase):
_prev_sigint_handler: Optional[_SignalHandler] = None
_handling_sigint: bool = False
yaml_highlighter: framework.helpers.highlighter.HighlighterYaml = None
enable_dualstack: bool = False

@staticmethod
def is_supported(config: skips.TestConfig) -> bool:
Expand Down Expand Up @@ -179,6 +181,7 @@ def setUpClass(cls):
cls.td_bootstrap_image = xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value
cls.xds_server_uri = xds_flags.XDS_SERVER_URI.value
cls.compute_api_version = xds_flags.COMPUTE_API_VERSION.value
cls.enable_dualstack = xds_flags.ENABLE_DUALSTACK.value

# Firewall
cls.ensure_firewall = xds_flags.ENSURE_FIREWALL.value
Expand Down Expand Up @@ -796,7 +799,11 @@ def setUp(self):
self.client_namespace = KubernetesClientRunner.make_namespace_name(
self.resource_prefix, self.resource_suffix
)
self.client_runner = self.initKubernetesClientRunner()
self.client_runner = self.initKubernetesClientRunner(
deployment_args=ClientDeploymentArgs(
enable_dualstack=self.enable_dualstack
)
)

# Create healthcheck firewall rules if necessary.
if self.ensure_firewall:
Expand Down Expand Up @@ -940,6 +947,7 @@ def initTrafficDirectorManager(self) -> TrafficDirectorManager:
resource_suffix=self.resource_suffix,
network=self.network,
compute_api_version=self.compute_api_version,
enable_dualstack=self.enable_dualstack,
)

def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner:
Expand All @@ -957,6 +965,7 @@ def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner:
network=self.network,
debug_use_port_forwarding=self.debug_use_port_forwarding,
enable_workload_identity=self.enable_workload_identity,
enable_dualstack=self.enable_dualstack,
**kwargs,
)

Expand Down Expand Up @@ -1014,6 +1023,7 @@ def initTrafficDirectorManager(self) -> TrafficDirectorAppNetManager:
resource_suffix=self.resource_suffix,
network=self.network,
compute_api_version=self.compute_api_version,
enable_dualstack=self.enable_dualstack,
)


Expand Down Expand Up @@ -1051,6 +1061,7 @@ def initTrafficDirectorManager(self) -> TrafficDirectorSecureManager:
resource_suffix=self.resource_suffix,
network=self.network,
compute_api_version=self.compute_api_version,
enable_dualstack=self.enable_dualstack,
)

def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner:
Expand All @@ -1068,6 +1079,7 @@ def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner:
xds_server_uri=self.xds_server_uri,
deployment_template="server-secure.deployment.yaml",
debug_use_port_forwarding=self.debug_use_port_forwarding,
enable_dualstack=self.enable_dualstack,
**kwargs,
)

Expand Down
6 changes: 6 additions & 0 deletions kubernetes-manifests/client.deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ spec:
value: "true"
- name: GRPC_EXPERIMENTAL_XDS_ENABLE_OVERRIDE_HOST
value: "true"
% if enable_dualstack:
- name: GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST
value: "true"
- name: GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS
value: "true"
% endif
% if csm_workload_name:
- name: CSM_WORKLOAD_NAME
value: ${csm_workload_name}
Expand Down
6 changes: 6 additions & 0 deletions kubernetes-manifests/server.service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ metadata:
cloud.google.com/neg: '{"exposed_ports": {"${test_port}":{"name":"${neg_name}"}}}'
spec:
type: ClusterIP
% if enable_dualstack:
ipFamilyPolicy: PreferDualStack
ipFamilies:
- IPv6
- IPv4
% endif
selector:
app: ${deployment_name}
ports:
Expand Down

0 comments on commit 00199ac

Please sign in to comment.