Skip to content

Commit

Permalink
Address a number of code review comments and add a test for missing C…
Browse files Browse the repository at this point in the history
…ds aggregate and eds entries.
  • Loading branch information
larry-safran committed Jan 8, 2025
1 parent 87007b0 commit f58a49a
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 78 deletions.
34 changes: 27 additions & 7 deletions xds/src/main/java/io/grpc/xds/XdsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.ImmutableMap;
import io.grpc.StatusOr;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
Expand All @@ -31,13 +32,18 @@
/**
* Represents the xDS configuration tree for a specified Listener.
*/
public class XdsConfig {
final LdsUpdate listener;
final RdsUpdate route;
final Map<String, StatusOr<XdsClusterConfig>> clusters;
final class XdsConfig {
private final LdsUpdate listener;
private final RdsUpdate route;
private final ImmutableMap<String, StatusOr<XdsClusterConfig>> clusters;
private final int hashCode;

XdsConfig(LdsUpdate listener, RdsUpdate route, Map<String, StatusOr<XdsClusterConfig>> clusters) {
this(listener, route, ImmutableMap.copyOf(clusters));
}

public XdsConfig(LdsUpdate listener, RdsUpdate route, ImmutableMap<String,
StatusOr<XdsClusterConfig>> clusters) {
this.listener = listener;
this.route = route;
this.clusters = clusters;
Expand All @@ -53,8 +59,8 @@ public boolean equals(Object obj) {

XdsConfig o = (XdsConfig) obj;

return Objects.equals(listener, o.listener) && Objects.equals(route, o.route)
&& Objects.equals(clusters, o.clusters);
return hashCode() == o.hashCode() && Objects.equals(listener, o.listener)
&& Objects.equals(route, o.route) && Objects.equals(clusters, o.clusters);
}

@Override
Expand All @@ -67,10 +73,22 @@ public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("XdsConfig{listener=").append(listener)
.append(", route=").append(route)
.append(", clusters={").append(clusters).append("}}");
.append(", clusters=").append(clusters).append("}");
return builder.toString();
}

public LdsUpdate getListener() {
return listener;

Check warning on line 81 in xds/src/main/java/io/grpc/xds/XdsConfig.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsConfig.java#L81

Added line #L81 was not covered by tests
}

public RdsUpdate getRoute() {
return route;

Check warning on line 85 in xds/src/main/java/io/grpc/xds/XdsConfig.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsConfig.java#L85

Added line #L85 was not covered by tests
}

public ImmutableMap<String, StatusOr<XdsClusterConfig>> getClusters() {
return clusters;
}

public static class XdsClusterConfig {
final String clusterName;
final CdsUpdate clusterResource;
Expand Down Expand Up @@ -138,6 +156,8 @@ XdsConfigBuilder setRoute(RdsUpdate route) {
}

XdsConfigBuilder addCluster(String name, StatusOr<XdsClusterConfig> clusterConfig) {
checkNotNull(name, "name");
checkNotNull(clusterConfig, "clusterConfig");
clusters.put(name, clusterConfig);
return this;
}
Expand Down
135 changes: 71 additions & 64 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,27 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
String listenerName) {
logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
logger = XdsLogger.withLogId(logId);
this.xdsClient = xdsClient;
this.xdsConfigWatcher = xdsConfigWatcher;
this.syncContext = syncContext;
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");

// start the ball rolling
addWatcher(new LdsWatcher(listenerName));
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
}

@Override
public ClusterSubscription subscribeToCluster(String clusterName) {
public Closeable subscribeToCluster(String clusterName) {

checkNotNull(clusterName, "clusterName");
ClusterSubscription subscription = new ClusterSubscription(clusterName);

Check warning on line 83 in xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java#L82-L83

Added lines #L82 - L83 were not covered by tests

Set<ClusterSubscription> localSubscriptions =
clusterSubscriptions.computeIfAbsent(clusterName, k -> new HashSet<>());
localSubscriptions.add(subscription);
addWatcher(new CdsWatcher(clusterName));
syncContext.execute(() -> {
Set<ClusterSubscription> localSubscriptions =
clusterSubscriptions.computeIfAbsent(clusterName, k -> new HashSet<>());
localSubscriptions.add(subscription);
addWatcher(new CdsWatcher(clusterName));
});

Check warning on line 90 in xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java#L85-L90

Added lines #L85 - L90 were not covered by tests

return subscription;

Check warning on line 92 in xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java#L92

Added line #L92 was not covered by tests
}
Expand All @@ -95,50 +97,51 @@ private boolean hasWatcher(XdsResourceType<?> type, String resourceName) {
return typeWatchers != null && typeWatchers.watchers.containsKey(resourceName);
}

@SuppressWarnings("unchecked")
private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
syncContext.throwIfNotInThisSynchronizationContext();
XdsResourceType<T> type = watcher.type;
String resourceName = watcher.resourceName;

this.syncContext.execute(() -> {
TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
if (typeWatchers == null) {
typeWatchers = new TypeWatchers<>(type);
resourceWatchers.put(type, typeWatchers);
}
@SuppressWarnings("unchecked")
TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
if (typeWatchers == null) {
typeWatchers = new TypeWatchers<>(type);
resourceWatchers.put(type, typeWatchers);
}

typeWatchers.add(resourceName, watcher);
xdsClient.watchXdsResource(type, resourceName, watcher);
});
typeWatchers.add(resourceName, watcher);
xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
}

@SuppressWarnings("unchecked")
private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
syncContext.throwIfNotInThisSynchronizationContext();

if (watcher == null) {
return;

Check warning on line 120 in xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java#L120

Added line #L120 was not covered by tests
}

XdsResourceType<T> type = watcher.type;
String resourceName = watcher.resourceName;

this.syncContext.execute(() -> {
TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
if (typeWatchers == null) {
logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher);
return;
}
@SuppressWarnings("unchecked")
TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
if (typeWatchers == null) {
logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher);
return;

Check warning on line 130 in xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java#L129-L130

Added lines #L129 - L130 were not covered by tests
}

typeWatchers.watchers.remove(resourceName);
xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);
});
typeWatchers.watchers.remove(resourceName);
xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);

}

public void shutdown() {
for (TypeWatchers<?> watchers : resourceWatchers.values()) {
shutdownWatchersForType(watchers);
}
resourceWatchers.clear();
syncContext.execute(() -> {
for (TypeWatchers<?> watchers : resourceWatchers.values()) {
shutdownWatchersForType(watchers);
}
resourceWatchers.clear();
});
}

private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
Expand All @@ -151,21 +154,21 @@ private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T>
private void releaseSubscription(ClusterSubscription subscription) {
checkNotNull(subscription, "subscription");
String clusterName = subscription.getClusterName();
Set<ClusterSubscription> subscriptions = clusterSubscriptions.get(clusterName);
if (subscriptions == null) {
logger.log(DEBUG, "Subscription already released for {0}", clusterName);
return;
}

subscriptions.remove(subscription);
if (subscriptions.isEmpty()) {
clusterSubscriptions.remove(clusterName);
XdsWatcherBase<?> cdsWatcher =
resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
cancelClusterWatcherTree((CdsWatcher) cdsWatcher);
syncContext.execute(() -> {
Set<ClusterSubscription> subscriptions = clusterSubscriptions.get(clusterName);

Check warning on line 158 in xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java#L155-L158

Added lines #L155 - L158 were not covered by tests
if (subscriptions == null || !subscriptions.remove(subscription)) {
logger.log(DEBUG, "Subscription already released for {0}", clusterName);
return;

Check warning on line 161 in xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java#L160-L161

Added lines #L160 - L161 were not covered by tests
}

maybePublishConfig();
}
if (subscriptions.isEmpty()) {
clusterSubscriptions.remove(clusterName);
XdsWatcherBase<?> cdsWatcher =
resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
cancelClusterWatcherTree((CdsWatcher) cdsWatcher);
maybePublishConfig();

Check warning on line 169 in xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java#L165-L169

Added lines #L165 - L169 were not covered by tests
}
});
}

Check warning on line 172 in xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java#L171-L172

Added lines #L171 - L172 were not covered by tests

private void cancelClusterWatcherTree(CdsWatcher root) {
Expand Down Expand Up @@ -206,20 +209,18 @@ private void cancelClusterWatcherTree(CdsWatcher root) {
* the watchers.
*/
private void maybePublishConfig() {
syncContext.execute(() -> {
boolean waitingOnResource = resourceWatchers.values().stream()
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
.anyMatch(watcher -> !watcher.hasResult());
if (waitingOnResource) {
return;
}
boolean waitingOnResource = resourceWatchers.values().stream()
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
.anyMatch(watcher -> !watcher.hasResult());
if (waitingOnResource) {
return;
}

buildConfig();
xdsConfigWatcher.onUpdate(lastXdsConfig);
});
lastXdsConfig = buildConfig();
xdsConfigWatcher.onUpdate(lastXdsConfig);
}

private void buildConfig() {
private XdsConfig buildConfig() {
XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();

// Iterate watchers and build the XdsConfig
Expand Down Expand Up @@ -258,7 +259,7 @@ private void buildConfig() {
}
}

lastXdsConfig = builder.build();
return builder.build();
}

@Override
Expand Down Expand Up @@ -334,7 +335,8 @@ public void onError(Status error) {
protected void handleDoesNotExist(String resourceName) {
checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
data = StatusOr.fromStatus(
Status.UNAVAILABLE.withDescription("No " + type + " resource: " + resourceName));
Status.UNAVAILABLE
.withDescription("No " + type.typeName() + " resource: " + resourceName));
transientError = false;
}

Expand Down Expand Up @@ -362,7 +364,7 @@ boolean isTransientError() {
}

String toContextString() {
return type + " resource: " + resourceName;
return type.typeName() + " resource: " + resourceName;

Check warning on line 367 in xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java#L367

Added line #L367 was not covered by tests
}
}

Expand Down Expand Up @@ -503,9 +505,11 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {

@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(resourceName);
syncContext.execute(() -> {
handleDoesNotExist(resourceName);
maybePublishConfig();
});
}

}

private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
Expand All @@ -523,7 +527,10 @@ public void onChanged(XdsEndpointResource.EdsUpdate update) {

@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(resourceName);
syncContext.execute(() -> {
handleDoesNotExist(resourceName);
maybePublishConfig();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ protected RdsUpdate doParse(XdsResourceType.Args args, Message unpackedMessage)
(RouteConfiguration) unpackedMessage, FilterRegistry.getDefaultRegistry(), args);
}

static RdsUpdate processRouteConfiguration(
private static RdsUpdate processRouteConfiguration(
RouteConfiguration routeConfig, FilterRegistry filterRegistry, XdsResourceType.Args args)
throws ResourceInvalidException {
return new RdsUpdate(extractVirtualHosts(routeConfig, filterRegistry, args));
Expand Down
Loading

0 comments on commit f58a49a

Please sign in to comment.