Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fallback] Use EDS cluster instead of DNS #129

Merged
merged 10 commits into from
Sep 3, 2024
65 changes: 44 additions & 21 deletions framework/helpers/xds_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from envoy.config.cluster.v3 import cluster_pb2
from envoy.config.core.v3 import address_pb2
from envoy.config.core.v3 import base_pb2
from envoy.config.core.v3 import config_source_pb2
from envoy.config.endpoint.v3 import endpoint_components_pb2
from envoy.config.endpoint.v3 import endpoint_pb2
from envoy.config.listener.v3 import api_listener_pb2
Expand All @@ -26,6 +28,7 @@
)
from google.protobuf import any_pb2
from google.protobuf import message
from google.protobuf import wrappers_pb2

from protos.grpc.testing.xdsconfig import xdsconfig_pb2

Expand Down Expand Up @@ -70,46 +73,59 @@ def _build_listener(listener_name: str, cluster_name: str):
def _build_endpoint(
cluster_name: str, upstream_host: str, upstream_port: int
) -> endpoint_pb2.ClusterLoadAssignment:
endpoint = endpoint_components_pb2.Endpoint(
address=address_pb2.Address(
socket_address=address_pb2.SocketAddress(
protocol=address_pb2.SocketAddress.TCP,
address=upstream_host,
port_value=upstream_port,
)
)
)
return endpoint_pb2.ClusterLoadAssignment(
cluster_name=cluster_name,
endpoints=[
endpoint_components_pb2.LocalityLbEndpoints(
locality=base_pb2.Locality(
region="xds_default_locality_region",
zone="xds_default_locality_zone",
sub_zone="locality0",
),
lb_endpoints=[
endpoint_components_pb2.LbEndpoint(
endpoint=endpoint_components_pb2.Endpoint(
address=address_pb2.Address(
socket_address=address_pb2.SocketAddress(
protocol=address_pb2.SocketAddress.TCP,
address=upstream_host,
port_value=upstream_port,
)
)
)
endpoint=endpoint,
health_status=1,
load_balancing_weight=wrappers_pb2.UInt32Value(value=1),
)
]
],
load_balancing_weight=wrappers_pb2.UInt32Value(value=1),
)
],
)


def _build_cluster(
cluster_name: str, upstream_host: str, upstream_port: int
) -> cluster_pb2.Cluster:
def _build_cluster(cluster_name: str, service_name: str) -> cluster_pb2.Cluster:
return cluster_pb2.Cluster(
name=cluster_name,
load_assignment=_build_endpoint(
cluster_name, upstream_host, upstream_port
type=cluster_pb2.Cluster.DiscoveryType.EDS,
eds_cluster_config=cluster_pb2.Cluster.EdsClusterConfig(
eds_config=config_source_pb2.ConfigSource(
self=config_source_pb2.SelfConfigSource(),
),
service_name=service_name,
),
type=cluster_pb2.Cluster.DiscoveryType.LOGICAL_DNS,
lb_policy=cluster_pb2.Cluster.LbPolicy.ROUND_ROBIN,
dns_lookup_family=cluster_pb2.Cluster.DnsLookupFamily.V4_ONLY,
)


def _build_resource_to_set(resource: message.Message):
name = (
resource.cluster_name
if hasattr(resource, "cluster_name")
else resource.name
)
return xdsconfig_pb2.SetResourcesRequest.ResourceToSet(
type_url=f"type.googleapis.com/{resource.DESCRIPTOR.full_name}",
name=resource.name,
name=name,
body=_wrap_in_any(resource),
)

Expand All @@ -120,8 +136,15 @@ def build_listener_and_cluster(
upstream_host: str,
upstream_port: int,
) -> xdsconfig_pb2.SetResourcesRequest:
service_name = f"{cluster_name}_eds_service"
listener = _build_listener(listener_name, cluster_name)
cluster = _build_cluster(cluster_name, upstream_host, upstream_port)
cluster = _build_cluster(cluster_name, service_name)
load_assignment = _build_endpoint(
service_name, upstream_host, upstream_port
)
return xdsconfig_pb2.SetResourcesRequest(
resources=[_build_resource_to_set(r) for r in [listener, cluster]]
resources=[
_build_resource_to_set(r)
for r in [listener, cluster, load_assignment]
]
)
10 changes: 8 additions & 2 deletions tests/fallback_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,15 @@ def get_free_port() -> int:

class FallbackTest(absltest.TestCase):
bootstrap: framework.helpers.docker.Bootstrap = None
dockerInternalIp: str

@staticmethod
def setUpClass():
# Use the host IP for when we need to use IP address and not the host
# name, such as EDS resources
FallbackTest.dockerInternalIp = socket.gethostbyname(
socket.gethostname()
)
FallbackTest.bootstrap = framework.helpers.docker.Bootstrap(
framework.helpers.logs.log_dir_mkdir("bootstrap"),
primary_port=get_free_port(),
Expand Down Expand Up @@ -98,7 +104,7 @@ def start_control_plane(
initial_resources=framework.helpers.xds_resources.build_listener_and_cluster(
listener_name=_LISTENER,
cluster_name=cluster_name or f"initial_cluster_for_{name}",
upstream_host=_HOST_NAME.value,
upstream_host=FallbackTest.dockerInternalIp,
upstream_port=upstream_port,
),
image=_CONTROL_PLANE_IMAGE.value,
Expand Down Expand Up @@ -292,7 +298,7 @@ def test_fallback_mid_update(self):
cluster_name="test_cluster_2",
listener_name=_LISTENER,
upstream_port=server3.port,
upstream_host=_HOST_NAME.value,
upstream_host=FallbackTest.dockerInternalIp,
)
)
self.assert_ads_connections(
Expand Down
Loading