diff --git a/framework/infrastructure/gcp/api.py b/framework/infrastructure/gcp/api.py index 746950a6..e85f69b9 100644 --- a/framework/infrastructure/gcp/api.py +++ b/framework/infrastructure/gcp/api.py @@ -157,6 +157,8 @@ def compute(self, version: str): return self._build_from_discovery_v1(api_name, version) elif version == "v1alpha": return self._build_from_discovery_v1(api_name, "alpha") + elif version == "v1beta": + return self._build_from_discovery_v1(api_name, "beta") raise NotImplementedError(f"Compute {version} not supported") diff --git a/framework/infrastructure/gcp/compute.py b/framework/infrastructure/gcp/compute.py index f90d104a..d1f4f96c 100644 --- a/framework/infrastructure/gcp/compute.py +++ b/framework/infrastructure/gcp/compute.py @@ -151,6 +151,7 @@ def create_backend_service_traffic_director( subset_size: Optional[int] = None, locality_lb_policies: Optional[List[dict]] = None, outlier_detection: Optional[dict] = None, + enable_dualstack: bool = False, ) -> "GcpResource": if not isinstance(protocol, self.BackendServiceProtocol): raise TypeError(f"Unexpected Backend Service protocol: {protocol}") @@ -160,6 +161,10 @@ def create_backend_service_traffic_director( "healthChecks": [health_check.url], "protocol": protocol.name, } + # If add dualstack support is specified True, config the backend service + # to support IPv6 + if enable_dualstack: + body["ipAddressSelectionPolicy"] = "PREFER_IPV6" # If affinity header is specified, config the backend service to support # affinity, and set affinity header to the one given. if affinity_header: diff --git a/framework/infrastructure/traffic_director.py b/framework/infrastructure/traffic_director.py index decd4102..0004ad9d 100644 --- a/framework/infrastructure/traffic_director.py +++ b/framework/infrastructure/traffic_director.py @@ -65,9 +65,11 @@ class TrafficDirectorManager: # pylint: disable=too-many-public-methods URL_MAP_PATH_MATCHER_NAME: Final[str] = "path-matcher" TARGET_PROXY_NAME: Final[str] = "target-proxy" + TARGET_PROXY_NAME_IPV6: Final[str] = "target-proxy-ipv6" ALTERNATIVE_TARGET_PROXY_NAME: Final[str] = "target-proxy-alt" FORWARDING_RULE_NAME: Final[str] = "forwarding-rule" + FORWARDING_RULE_NAME_IPV6: Final[str] = "forwarding-rule-ipv6" ALTERNATIVE_FORWARDING_RULE_NAME: Final[str] = "forwarding-rule-alt" HEALTH_CHECK_NAME: Final[str] = "health-check" @@ -107,6 +109,7 @@ def __init__( resource_suffix: str, network: str = "default", compute_api_version: str = "v1", + enable_dualstack: bool = False, ): # API self.compute = _ComputeV1( @@ -121,6 +124,7 @@ def __init__( self.network: str = network self.resource_prefix: str = resource_prefix self.resource_suffix: str = resource_suffix + self.enable_dualstack: bool = enable_dualstack # Managed resources self.health_check: Optional[GcpResource] = None @@ -129,10 +133,12 @@ def __init__( self.firewall_rule: Optional[GcpResource] = None self.firewall_rule_ipv6: Optional[GcpResource] = None self.target_proxy: Optional[GcpResource] = None + self.target_proxy_ipv6: Optional[GcpResource] = None # TODO(sergiitk): remove this flag once target proxy resource loaded self.target_proxy_is_http: bool = False self.alternative_target_proxy: Optional[GcpResource] = None self.forwarding_rule: Optional[GcpResource] = None + self.forwarding_rule_ipv6: Optional[GcpResource] = None self.alternative_forwarding_rule: Optional[GcpResource] = None # Backends. @@ -171,6 +177,10 @@ def setup_routing_rule_map_for_grpc(self, service_host, service_port): self.create_target_proxy() self.create_forwarding_rule(service_port) + if self.enable_dualstack: + self.create_target_proxy_ipv6() + self.create_forwarding_rule_ipv6(service_port) + def cleanup(self, *, force=False): # Cleanup in the reverse order of creation self.delete_firewall_rules(force=force) @@ -178,6 +188,9 @@ def cleanup(self, *, force=False): self.delete_alternative_forwarding_rule(force=force) self.delete_target_http_proxy(force=force) self.delete_target_grpc_proxy(force=force) + if self.enable_dualstack: + self.delete_forwarding_rule_ipv6(force=force) + self.delete_target_proxy_ipv6(force=force) self.delete_alternative_target_grpc_proxy(force=force) self.delete_url_map(force=force) self.delete_alternative_url_map(force=force) @@ -246,6 +259,7 @@ def create_backend_service( affinity_header=affinity_header, locality_lb_policies=locality_lb_policies, outlier_detection=outlier_detection, + enable_dualstack=self.enable_dualstack, ) self.backend_service = resource self.backend_service_protocol = protocol @@ -337,7 +351,10 @@ def create_alternative_backend_service( 'Creating %s Alternative Backend Service "%s"', protocol.name, name ) resource = self.compute.create_backend_service_traffic_director( - name, health_check=self.health_check, protocol=protocol + name, + health_check=self.health_check, + protocol=protocol, + enable_dualstack=self.enable_dualstack, ) self.alternative_backend_service = resource self.alternative_backend_service_protocol = protocol @@ -418,6 +435,7 @@ def create_affinity_backend_service( health_check=self.health_check, protocol=protocol, affinity_header=TEST_AFFINITY_METADATA_KEY, + enable_dualstack=self.enable_dualstack, ) self.affinity_backend_service = resource self.affinity_backend_service_protocol = protocol @@ -614,6 +632,20 @@ def create_target_proxy(self): ) self.target_proxy = create_proxy_fn(name, self.url_map) + def create_target_proxy_ipv6(self): + name = self.make_resource_name(self.TARGET_PROXY_NAME_IPV6) + # TODO(lsafran): Support GRPC target proxy as well + target_proxy_type = "HTTP" + create_proxy_fn = self.compute.create_target_http_proxy + + logger.info( + 'Creating IPv6 target %s proxy "%s" to URL map %s', + name, + target_proxy_type, + self.url_map.name, + ) + self.target_proxy_ipv6 = create_proxy_fn(name, self.url_map) + def delete_target_grpc_proxy(self, force=False): if force: name = self.make_resource_name(self.TARGET_PROXY_NAME) @@ -638,6 +670,18 @@ def delete_target_http_proxy(self, force=False): self.target_proxy = None self.target_proxy_is_http = False + def delete_target_proxy_ipv6(self, force=False): + if force: + name = self.make_resource_name(self.TARGET_PROXY_NAME_IPV6) + elif self.target_proxy_ipv6: + name = self.target_proxy_ipv6.name + else: + return + # TODO: Delete Target GRPC Proxy when added in create_target_proxy_ipv6. + logger.info('Deleting IPv6 Target HTTP proxy "%s"', name) + self.compute.delete_target_http_proxy(name) + self.target_proxy_ipv6 = None + def create_alternative_target_proxy(self): name = self.make_resource_name(self.ALTERNATIVE_TARGET_PROXY_NAME) if self.backend_service_protocol is BackendServiceProtocol.GRPC: @@ -695,6 +739,25 @@ def create_forwarding_rule(self, src_port: int): self.forwarding_rule = resource return resource + def create_forwarding_rule_ipv6(self, src_port: int): + name = self.make_resource_name(self.FORWARDING_RULE_NAME_IPV6) + logging.info( + 'Creating IPv6 forwarding rule "%s" in network "%s": [::]:%s -> %s', + name, + self.network, + src_port, + self.target_proxy_ipv6.url, + ) + resource = self.compute.create_forwarding_rule( + name, + src_port, + self.target_proxy_ipv6, + self.network_url, + ip_address="::", + ) + self.forwarding_rule_ipv6 = resource + return resource + def delete_forwarding_rule(self, force=False): if force: name = self.make_resource_name(self.FORWARDING_RULE_NAME) @@ -706,6 +769,17 @@ def delete_forwarding_rule(self, force=False): self.compute.delete_forwarding_rule(name) self.forwarding_rule = None + def delete_forwarding_rule_ipv6(self, force=False): + if force: + name = self.make_resource_name(self.FORWARDING_RULE_NAME_IPV6) + elif self.forwarding_rule_ipv6: + name = self.forwarding_rule_ipv6.name + else: + return + logger.info('Deleting IPv6 Forwarding rule "%s"', name) + self.compute.delete_forwarding_rule(name) + self.forwarding_rule_ipv6 = None + def create_alternative_forwarding_rule( self, src_port: int, ip_address="0.0.0.0" ): @@ -845,6 +919,7 @@ def __init__( resource_suffix: Optional[str] = None, network: str = "default", compute_api_version: str = "v1", + enable_dualstack: bool = False, ): super().__init__( gcp_api_manager, @@ -853,6 +928,7 @@ def __init__( resource_suffix=resource_suffix, network=network, compute_api_version=compute_api_version, + enable_dualstack=enable_dualstack, ) # API @@ -969,6 +1045,7 @@ def __init__( resource_suffix: Optional[str] = None, network: str = "default", compute_api_version: str = "v1", + enable_dualstack: bool = False, ): super().__init__( gcp_api_manager, @@ -977,6 +1054,7 @@ def __init__( resource_suffix=resource_suffix, network=network, compute_api_version=compute_api_version, + enable_dualstack=enable_dualstack, ) # API diff --git a/framework/test_app/runners/k8s/k8s_base_runner.py b/framework/test_app/runners/k8s/k8s_base_runner.py index 867f50c7..a5b9834f 100644 --- a/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/framework/test_app/runners/k8s/k8s_base_runner.py @@ -998,9 +998,12 @@ def _wait_pod_started(self, name, **kwargs) -> k8s.V1Pod: logger.info("Waiting for pod %s to start", name) self.k8s_namespace.wait_for_pod_started(name, **kwargs) pod = self.k8s_namespace.get_pod(name) - logger.info( - "Pod %s ready, IP: %s", pod.metadata.name, pod.status.pod_ip - ) + + if hasattr(pod.status, "pod_ip_s"): # if running with dualstack support + pod_ips = pod.status.pod_ip_s + else: + pod_ips = pod.status.pod_ip + logger.info("Pod %s ready, IP: %s", pod.metadata.name, pod_ips) return pod def _pod_started_logic(self, pod: k8s.V1Pod) -> bool: diff --git a/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/framework/test_app/runners/k8s/k8s_xds_client_runner.py index 9c5110af..db98bd2f 100644 --- a/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -31,6 +31,7 @@ class ClientDeploymentArgs: enable_csm_observability: bool = False csm_workload_name: str = "" csm_canonical_service_name: str = "" + enable_dualstack: bool = False def as_dict(self): return dataclasses.asdict(self) diff --git a/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/framework/test_app/runners/k8s/k8s_xds_server_runner.py index ffe60c39..045e57d0 100644 --- a/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -108,6 +108,7 @@ def __init__( # pylint: disable=too-many-locals debug_use_port_forwarding: bool = False, enable_workload_identity: bool = True, deployment_args: Optional[ServerDeploymentArgs] = None, + enable_dualstack: bool = False, ): super().__init__( k8s_namespace, @@ -142,6 +143,7 @@ def __init__( # pylint: disable=too-many-locals self.td_bootstrap_image = td_bootstrap_image self.network = network self.xds_server_uri = xds_server_uri + self.enable_dualstack = enable_dualstack # Workload identity settings: if self.enable_workload_identity: @@ -223,6 +225,7 @@ def run( # pylint: disable=arguments-differ,too-many-branches deployment_name=self.deployment_name, neg_name=self.gcp_neg_name, test_port=test_port, + enable_dualstack=self.enable_dualstack, ) self._wait_service_neg_status_annotation(self.service_name, test_port) diff --git a/framework/xds_flags.py b/framework/xds_flags.py index 971f1b4c..3960b27f 100644 --- a/framework/xds_flags.py +++ b/framework/xds_flags.py @@ -190,6 +190,12 @@ help="Whether to enable GFE debug headers and what value to use.", ) +ENABLE_DUALSTACK = flags.DEFINE_bool( + "enable_dualstack", + default=False, + help="Enable support for Dual Stack resources to the framework.", +) + def set_socket_default_timeout_from_flag() -> None: """A helper to configure default socket timeout from a flag. diff --git a/framework/xds_k8s_testcase.py b/framework/xds_k8s_testcase.py index 58e0ed10..49106b19 100644 --- a/framework/xds_k8s_testcase.py +++ b/framework/xds_k8s_testcase.py @@ -65,6 +65,7 @@ TrafficDirectorSecureManager = traffic_director.TrafficDirectorSecureManager XdsTestServer = server_app.XdsTestServer XdsTestClient = client_app.XdsTestClient +ClientDeploymentArgs = k8s_xds_client_runner.ClientDeploymentArgs KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner _LoadBalancerStatsResponse = grpc_testing.LoadBalancerStatsResponse @@ -146,6 +147,7 @@ class XdsKubernetesBaseTestCase(base_testcase.BaseTestCase): _prev_sigint_handler: Optional[_SignalHandler] = None _handling_sigint: bool = False yaml_highlighter: framework.helpers.highlighter.HighlighterYaml = None + enable_dualstack: bool = False @staticmethod def is_supported(config: skips.TestConfig) -> bool: @@ -179,6 +181,7 @@ def setUpClass(cls): cls.td_bootstrap_image = xds_k8s_flags.TD_BOOTSTRAP_IMAGE.value cls.xds_server_uri = xds_flags.XDS_SERVER_URI.value cls.compute_api_version = xds_flags.COMPUTE_API_VERSION.value + cls.enable_dualstack = xds_flags.ENABLE_DUALSTACK.value # Firewall cls.ensure_firewall = xds_flags.ENSURE_FIREWALL.value @@ -796,7 +799,11 @@ def setUp(self): self.client_namespace = KubernetesClientRunner.make_namespace_name( self.resource_prefix, self.resource_suffix ) - self.client_runner = self.initKubernetesClientRunner() + self.client_runner = self.initKubernetesClientRunner( + deployment_args=ClientDeploymentArgs( + enable_dualstack=self.enable_dualstack + ) + ) # Create healthcheck firewall rules if necessary. if self.ensure_firewall: @@ -940,6 +947,7 @@ def initTrafficDirectorManager(self) -> TrafficDirectorManager: resource_suffix=self.resource_suffix, network=self.network, compute_api_version=self.compute_api_version, + enable_dualstack=self.enable_dualstack, ) def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner: @@ -957,6 +965,7 @@ def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner: network=self.network, debug_use_port_forwarding=self.debug_use_port_forwarding, enable_workload_identity=self.enable_workload_identity, + enable_dualstack=self.enable_dualstack, **kwargs, ) @@ -1014,6 +1023,7 @@ def initTrafficDirectorManager(self) -> TrafficDirectorAppNetManager: resource_suffix=self.resource_suffix, network=self.network, compute_api_version=self.compute_api_version, + enable_dualstack=self.enable_dualstack, ) @@ -1051,6 +1061,7 @@ def initTrafficDirectorManager(self) -> TrafficDirectorSecureManager: resource_suffix=self.resource_suffix, network=self.network, compute_api_version=self.compute_api_version, + enable_dualstack=self.enable_dualstack, ) def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner: @@ -1068,6 +1079,7 @@ def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner: xds_server_uri=self.xds_server_uri, deployment_template="server-secure.deployment.yaml", debug_use_port_forwarding=self.debug_use_port_forwarding, + enable_dualstack=self.enable_dualstack, **kwargs, ) diff --git a/kubernetes-manifests/client.deployment.yaml b/kubernetes-manifests/client.deployment.yaml index 897c2ed2..6f5ada29 100644 --- a/kubernetes-manifests/client.deployment.yaml +++ b/kubernetes-manifests/client.deployment.yaml @@ -66,6 +66,12 @@ spec: value: "true" - name: GRPC_EXPERIMENTAL_XDS_ENABLE_OVERRIDE_HOST value: "true" + % if enable_dualstack: + - name: GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST + value: "true" + - name: GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS + value: "true" + % endif % if csm_workload_name: - name: CSM_WORKLOAD_NAME value: ${csm_workload_name} diff --git a/kubernetes-manifests/server.service.yaml b/kubernetes-manifests/server.service.yaml index 376de175..bfb13c00 100644 --- a/kubernetes-manifests/server.service.yaml +++ b/kubernetes-manifests/server.service.yaml @@ -10,6 +10,12 @@ metadata: cloud.google.com/neg: '{"exposed_ports": {"${test_port}":{"name":"${neg_name}"}}}' spec: type: ClusterIP + % if enable_dualstack: + ipFamilyPolicy: PreferDualStack + ipFamilies: + - IPv6 + - IPv4 + % endif selector: app: ${deployment_name} ports: