From e4a292dca2ed71f588cc9c8f3fe1abb50b35ffad Mon Sep 17 00:00:00 2001 From: Eugene Ostroukhov Date: Tue, 3 Sep 2024 10:41:12 -0700 Subject: [PATCH] [fallback] Use EDS cluster instead of DNS (#129) Python test run: https://fusion2.corp.google.com/invocations/8415611c-5b30-49ee-8688-18bd8c0e7302 Core test run: https://fusion2.corp.google.com/invocations/a9bd4699-c570-4751-9681-eb2772513711 --- framework/helpers/xds_resources.py | 65 ++++++++++++++++++++---------- tests/fallback_test.py | 10 ++++- 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/framework/helpers/xds_resources.py b/framework/helpers/xds_resources.py index d4be0ca7..e73bca8c 100644 --- a/framework/helpers/xds_resources.py +++ b/framework/helpers/xds_resources.py @@ -14,6 +14,8 @@ from envoy.config.cluster.v3 import cluster_pb2 from envoy.config.core.v3 import address_pb2 +from envoy.config.core.v3 import base_pb2 +from envoy.config.core.v3 import config_source_pb2 from envoy.config.endpoint.v3 import endpoint_components_pb2 from envoy.config.endpoint.v3 import endpoint_pb2 from envoy.config.listener.v3 import api_listener_pb2 @@ -26,6 +28,7 @@ ) from google.protobuf import any_pb2 from google.protobuf import message +from google.protobuf import wrappers_pb2 from protos.grpc.testing.xdsconfig import xdsconfig_pb2 @@ -70,46 +73,59 @@ def _build_listener(listener_name: str, cluster_name: str): def _build_endpoint( cluster_name: str, upstream_host: str, upstream_port: int ) -> endpoint_pb2.ClusterLoadAssignment: + endpoint = endpoint_components_pb2.Endpoint( + address=address_pb2.Address( + socket_address=address_pb2.SocketAddress( + protocol=address_pb2.SocketAddress.TCP, + address=upstream_host, + port_value=upstream_port, + ) + ) + ) return endpoint_pb2.ClusterLoadAssignment( cluster_name=cluster_name, endpoints=[ endpoint_components_pb2.LocalityLbEndpoints( + locality=base_pb2.Locality( + region="xds_default_locality_region", + zone="xds_default_locality_zone", + sub_zone="locality0", + ), lb_endpoints=[ endpoint_components_pb2.LbEndpoint( - endpoint=endpoint_components_pb2.Endpoint( - address=address_pb2.Address( - socket_address=address_pb2.SocketAddress( - protocol=address_pb2.SocketAddress.TCP, - address=upstream_host, - port_value=upstream_port, - ) - ) - ) + endpoint=endpoint, + health_status=1, + load_balancing_weight=wrappers_pb2.UInt32Value(value=1), ) - ] + ], + load_balancing_weight=wrappers_pb2.UInt32Value(value=1), ) ], ) -def _build_cluster( - cluster_name: str, upstream_host: str, upstream_port: int -) -> cluster_pb2.Cluster: +def _build_cluster(cluster_name: str, service_name: str) -> cluster_pb2.Cluster: return cluster_pb2.Cluster( name=cluster_name, - load_assignment=_build_endpoint( - cluster_name, upstream_host, upstream_port + type=cluster_pb2.Cluster.DiscoveryType.EDS, + eds_cluster_config=cluster_pb2.Cluster.EdsClusterConfig( + eds_config=config_source_pb2.ConfigSource( + self=config_source_pb2.SelfConfigSource(), + ), + service_name=service_name, ), - type=cluster_pb2.Cluster.DiscoveryType.LOGICAL_DNS, - lb_policy=cluster_pb2.Cluster.LbPolicy.ROUND_ROBIN, - dns_lookup_family=cluster_pb2.Cluster.DnsLookupFamily.V4_ONLY, ) def _build_resource_to_set(resource: message.Message): + name = ( + resource.cluster_name + if hasattr(resource, "cluster_name") + else resource.name + ) return xdsconfig_pb2.SetResourcesRequest.ResourceToSet( type_url=f"type.googleapis.com/{resource.DESCRIPTOR.full_name}", - name=resource.name, + name=name, body=_wrap_in_any(resource), ) @@ -120,8 +136,15 @@ def build_listener_and_cluster( upstream_host: str, upstream_port: int, ) -> xdsconfig_pb2.SetResourcesRequest: + service_name = f"{cluster_name}_eds_service" listener = _build_listener(listener_name, cluster_name) - cluster = _build_cluster(cluster_name, upstream_host, upstream_port) + cluster = _build_cluster(cluster_name, service_name) + load_assignment = _build_endpoint( + service_name, upstream_host, upstream_port + ) return xdsconfig_pb2.SetResourcesRequest( - resources=[_build_resource_to_set(r) for r in [listener, cluster]] + resources=[ + _build_resource_to_set(r) + for r in [listener, cluster, load_assignment] + ] ) diff --git a/tests/fallback_test.py b/tests/fallback_test.py index 1eda0b8a..a9618391 100644 --- a/tests/fallback_test.py +++ b/tests/fallback_test.py @@ -63,9 +63,15 @@ def get_free_port() -> int: class FallbackTest(absltest.TestCase): bootstrap: framework.helpers.docker.Bootstrap = None + dockerInternalIp: str @staticmethod def setUpClass(): + # Use the host IP for when we need to use IP address and not the host + # name, such as EDS resources + FallbackTest.dockerInternalIp = socket.gethostbyname( + socket.gethostname() + ) FallbackTest.bootstrap = framework.helpers.docker.Bootstrap( framework.helpers.logs.log_dir_mkdir("bootstrap"), primary_port=get_free_port(), @@ -98,7 +104,7 @@ def start_control_plane( initial_resources=framework.helpers.xds_resources.build_listener_and_cluster( listener_name=_LISTENER, cluster_name=cluster_name or f"initial_cluster_for_{name}", - upstream_host=_HOST_NAME.value, + upstream_host=FallbackTest.dockerInternalIp, upstream_port=upstream_port, ), image=_CONTROL_PLANE_IMAGE.value, @@ -292,7 +298,7 @@ def test_fallback_mid_update(self): cluster_name="test_cluster_2", listener_name=_LISTENER, upstream_port=server3.port, - upstream_host=_HOST_NAME.value, + upstream_host=FallbackTest.dockerInternalIp, ) ) self.assert_ads_connections(