Skip to content

Commit

Permalink
[fallback] Use EDS cluster instead of DNS (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo authored Sep 3, 2024
1 parent 0e87a36 commit e4a292d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 23 deletions.
65 changes: 44 additions & 21 deletions framework/helpers/xds_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from envoy.config.cluster.v3 import cluster_pb2
from envoy.config.core.v3 import address_pb2
from envoy.config.core.v3 import base_pb2
from envoy.config.core.v3 import config_source_pb2
from envoy.config.endpoint.v3 import endpoint_components_pb2
from envoy.config.endpoint.v3 import endpoint_pb2
from envoy.config.listener.v3 import api_listener_pb2
Expand All @@ -26,6 +28,7 @@
)
from google.protobuf import any_pb2
from google.protobuf import message
from google.protobuf import wrappers_pb2

from protos.grpc.testing.xdsconfig import xdsconfig_pb2

Expand Down Expand Up @@ -70,46 +73,59 @@ def _build_listener(listener_name: str, cluster_name: str):
def _build_endpoint(
cluster_name: str, upstream_host: str, upstream_port: int
) -> endpoint_pb2.ClusterLoadAssignment:
endpoint = endpoint_components_pb2.Endpoint(
address=address_pb2.Address(
socket_address=address_pb2.SocketAddress(
protocol=address_pb2.SocketAddress.TCP,
address=upstream_host,
port_value=upstream_port,
)
)
)
return endpoint_pb2.ClusterLoadAssignment(
cluster_name=cluster_name,
endpoints=[
endpoint_components_pb2.LocalityLbEndpoints(
locality=base_pb2.Locality(
region="xds_default_locality_region",
zone="xds_default_locality_zone",
sub_zone="locality0",
),
lb_endpoints=[
endpoint_components_pb2.LbEndpoint(
endpoint=endpoint_components_pb2.Endpoint(
address=address_pb2.Address(
socket_address=address_pb2.SocketAddress(
protocol=address_pb2.SocketAddress.TCP,
address=upstream_host,
port_value=upstream_port,
)
)
)
endpoint=endpoint,
health_status=1,
load_balancing_weight=wrappers_pb2.UInt32Value(value=1),
)
]
],
load_balancing_weight=wrappers_pb2.UInt32Value(value=1),
)
],
)


def _build_cluster(
cluster_name: str, upstream_host: str, upstream_port: int
) -> cluster_pb2.Cluster:
def _build_cluster(cluster_name: str, service_name: str) -> cluster_pb2.Cluster:
return cluster_pb2.Cluster(
name=cluster_name,
load_assignment=_build_endpoint(
cluster_name, upstream_host, upstream_port
type=cluster_pb2.Cluster.DiscoveryType.EDS,
eds_cluster_config=cluster_pb2.Cluster.EdsClusterConfig(
eds_config=config_source_pb2.ConfigSource(
self=config_source_pb2.SelfConfigSource(),
),
service_name=service_name,
),
type=cluster_pb2.Cluster.DiscoveryType.LOGICAL_DNS,
lb_policy=cluster_pb2.Cluster.LbPolicy.ROUND_ROBIN,
dns_lookup_family=cluster_pb2.Cluster.DnsLookupFamily.V4_ONLY,
)


def _build_resource_to_set(resource: message.Message):
name = (
resource.cluster_name
if hasattr(resource, "cluster_name")
else resource.name
)
return xdsconfig_pb2.SetResourcesRequest.ResourceToSet(
type_url=f"type.googleapis.com/{resource.DESCRIPTOR.full_name}",
name=resource.name,
name=name,
body=_wrap_in_any(resource),
)

Expand All @@ -120,8 +136,15 @@ def build_listener_and_cluster(
upstream_host: str,
upstream_port: int,
) -> xdsconfig_pb2.SetResourcesRequest:
service_name = f"{cluster_name}_eds_service"
listener = _build_listener(listener_name, cluster_name)
cluster = _build_cluster(cluster_name, upstream_host, upstream_port)
cluster = _build_cluster(cluster_name, service_name)
load_assignment = _build_endpoint(
service_name, upstream_host, upstream_port
)
return xdsconfig_pb2.SetResourcesRequest(
resources=[_build_resource_to_set(r) for r in [listener, cluster]]
resources=[
_build_resource_to_set(r)
for r in [listener, cluster, load_assignment]
]
)
10 changes: 8 additions & 2 deletions tests/fallback_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,15 @@ def get_free_port() -> int:

class FallbackTest(absltest.TestCase):
bootstrap: framework.helpers.docker.Bootstrap = None
dockerInternalIp: str

@staticmethod
def setUpClass():
# Use the host IP for when we need to use IP address and not the host
# name, such as EDS resources
FallbackTest.dockerInternalIp = socket.gethostbyname(
socket.gethostname()
)
FallbackTest.bootstrap = framework.helpers.docker.Bootstrap(
framework.helpers.logs.log_dir_mkdir("bootstrap"),
primary_port=get_free_port(),
Expand Down Expand Up @@ -98,7 +104,7 @@ def start_control_plane(
initial_resources=framework.helpers.xds_resources.build_listener_and_cluster(
listener_name=_LISTENER,
cluster_name=cluster_name or f"initial_cluster_for_{name}",
upstream_host=_HOST_NAME.value,
upstream_host=FallbackTest.dockerInternalIp,
upstream_port=upstream_port,
),
image=_CONTROL_PLANE_IMAGE.value,
Expand Down Expand Up @@ -292,7 +298,7 @@ def test_fallback_mid_update(self):
cluster_name="test_cluster_2",
listener_name=_LISTENER,
upstream_port=server3.port,
upstream_host=_HOST_NAME.value,
upstream_host=FallbackTest.dockerInternalIp,
)
)
self.assert_ads_connections(
Expand Down

0 comments on commit e4a292d

Please sign in to comment.