Add new classes for eliminating xds config tears (#11740)

* Framework definition to support A74
This commit is contained in:
Larry Safran 2025-02-07 16:33:17 -08:00 committed by GitHub
parent 90b1c4fe94
commit 67fc2e156a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 2186 additions and 25 deletions

View File

@ -46,6 +46,7 @@ dependencies {
thirdpartyImplementation project(':grpc-protobuf'),
project(':grpc-stub')
compileOnly sourceSets.thirdparty.output
testCompileOnly sourceSets.thirdparty.output
implementation project(':grpc-stub'),
project(':grpc-core'),
project(':grpc-util'),
@ -59,6 +60,7 @@ dependencies {
libraries.protobuf.java.util
def nettyDependency = implementation project(':grpc-netty')
testImplementation project(':grpc-api')
testImplementation project(':grpc-rls')
testImplementation project(':grpc-inprocess')
testImplementation testFixtures(project(':grpc-core')),

View File

@ -0,0 +1,192 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableMap;
import io.grpc.StatusOr;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Represents the xDS configuration tree for a specified Listener.
*/
final class XdsConfig {
private final LdsUpdate listener;
private final RdsUpdate route;
private final VirtualHost virtualHost;
private final ImmutableMap<String, StatusOr<XdsClusterConfig>> clusters;
private final int hashCode;
XdsConfig(LdsUpdate listener, RdsUpdate route, Map<String, StatusOr<XdsClusterConfig>> clusters,
VirtualHost virtualHost) {
this(listener, route, virtualHost, ImmutableMap.copyOf(clusters));
}
public XdsConfig(LdsUpdate listener, RdsUpdate route, VirtualHost virtualHost,
ImmutableMap<String, StatusOr<XdsClusterConfig>> clusters) {
this.listener = listener;
this.route = route;
this.virtualHost = virtualHost;
this.clusters = clusters;
hashCode = Objects.hash(listener, route, virtualHost, clusters);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof XdsConfig)) {
return false;
}
XdsConfig o = (XdsConfig) obj;
return hashCode() == o.hashCode() && Objects.equals(listener, o.listener)
&& Objects.equals(route, o.route) && Objects.equals(virtualHost, o.virtualHost)
&& Objects.equals(clusters, o.clusters);
}
@Override
public int hashCode() {
return hashCode;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("XdsConfig{")
.append("\n listener=").append(listener)
.append(",\n route=").append(route)
.append(",\n virtualHost=").append(virtualHost)
.append(",\n clusters=").append(clusters)
.append("\n}");
return builder.toString();
}
public LdsUpdate getListener() {
return listener;
}
public RdsUpdate getRoute() {
return route;
}
public VirtualHost getVirtualHost() {
return virtualHost;
}
public ImmutableMap<String, StatusOr<XdsClusterConfig>> getClusters() {
return clusters;
}
static final class XdsClusterConfig {
private final String clusterName;
private final CdsUpdate clusterResource;
private final StatusOr<EdsUpdate> endpoint; //Will be null for non-EDS clusters
XdsClusterConfig(String clusterName, CdsUpdate clusterResource,
StatusOr<EdsUpdate> endpoint) {
this.clusterName = checkNotNull(clusterName, "clusterName");
this.clusterResource = checkNotNull(clusterResource, "clusterResource");
this.endpoint = endpoint;
}
@Override
public int hashCode() {
int endpointHash = (endpoint != null) ? endpoint.hashCode() : 0;
return clusterName.hashCode() + clusterResource.hashCode() + endpointHash;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof XdsClusterConfig)) {
return false;
}
XdsClusterConfig o = (XdsClusterConfig) obj;
return Objects.equals(clusterName, o.clusterName)
&& Objects.equals(clusterResource, o.clusterResource)
&& Objects.equals(endpoint, o.endpoint);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("XdsClusterConfig{clusterName=").append(clusterName)
.append(", clusterResource=").append(clusterResource)
.append(", endpoint=").append(endpoint).append("}");
return builder.toString();
}
public String getClusterName() {
return clusterName;
}
public CdsUpdate getClusterResource() {
return clusterResource;
}
public StatusOr<EdsUpdate> getEndpoint() {
return endpoint;
}
}
static final class XdsConfigBuilder {
private LdsUpdate listener;
private RdsUpdate route;
private Map<String, StatusOr<XdsClusterConfig>> clusters = new HashMap<>();
private VirtualHost virtualHost;
XdsConfigBuilder setListener(LdsUpdate listener) {
this.listener = checkNotNull(listener, "listener");
return this;
}
XdsConfigBuilder setRoute(RdsUpdate route) {
this.route = checkNotNull(route, "route");
return this;
}
XdsConfigBuilder addCluster(String name, StatusOr<XdsClusterConfig> clusterConfig) {
checkNotNull(name, "name");
checkNotNull(clusterConfig, "clusterConfig");
clusters.put(name, clusterConfig);
return this;
}
XdsConfigBuilder setVirtualHost(VirtualHost virtualHost) {
this.virtualHost = checkNotNull(virtualHost, "virtualHost");
return this;
}
XdsConfig build() {
checkNotNull(listener, "listener");
checkNotNull(route, "route");
return new XdsConfig(listener, route, clusters, virtualHost);
}
}
public interface XdsClusterSubscriptionRegistry {
Closeable subscribeToCluster(String clusterName);
}
}

View File

@ -0,0 +1,769 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import io.grpc.InternalLogId;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsResourceType;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
/**
* This class acts as a layer of indirection between the XdsClient and the NameResolver. It
* maintains the watchers for the xds resources and when an update is received, it either requests
* referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher. Each instance
* applies to a single data plane authority.
*/
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
private final XdsClient xdsClient;
private final XdsConfigWatcher xdsConfigWatcher;
private final SynchronizationContext syncContext;
private final String dataPlaneAuthority;
private final InternalLogId logId;
private final XdsLogger logger;
private XdsConfig lastXdsConfig = null;
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
SynchronizationContext syncContext, String dataPlaneAuthority,
String listenerName) {
logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
logger = XdsLogger.withLogId(logId);
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
// start the ball rolling
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
}
public static String toContextStr(String typeName, String resourceName) {
return typeName + " resource: " + resourceName;
}
@Override
public Closeable subscribeToCluster(String clusterName) {
checkNotNull(clusterName, "clusterName");
ClusterSubscription subscription = new ClusterSubscription(clusterName);
syncContext.execute(() -> {
addClusterWatcher(clusterName, subscription, 1);
maybePublishConfig();
});
return subscription;
}
private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
syncContext.throwIfNotInThisSynchronizationContext();
XdsResourceType<T> type = watcher.type;
String resourceName = watcher.resourceName;
@SuppressWarnings("unchecked")
TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
if (typeWatchers == null) {
typeWatchers = new TypeWatchers<>(type);
resourceWatchers.put(type, typeWatchers);
}
typeWatchers.add(resourceName, watcher);
xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
}
private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) {
if (watcher == null) {
return;
}
watcher.parentContexts.remove(parentContext);
if (watcher.parentContexts.isEmpty()) {
cancelWatcher(watcher);
}
}
private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) {
if (watcher == null) {
return;
}
watcher.parentContexts.remove(parentContext);
if (watcher.parentContexts.isEmpty()) {
cancelWatcher(watcher);
}
}
private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
syncContext.throwIfNotInThisSynchronizationContext();
if (watcher == null) {
return;
}
if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) {
throwIfParentContextsNotEmpty(watcher);
}
XdsResourceType<T> type = watcher.type;
String resourceName = watcher.resourceName;
@SuppressWarnings("unchecked")
TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
if (typeWatchers == null) {
logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher);
return;
}
typeWatchers.watchers.remove(resourceName);
xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);
}
private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
if (watcher instanceof CdsWatcher) {
CdsWatcher cdsWatcher = (CdsWatcher) watcher;
if (!cdsWatcher.parentContexts.isEmpty()) {
String msg = String.format("CdsWatcher %s has parent contexts %s",
cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet());
throw new IllegalStateException(msg);
}
} else if (watcher instanceof EdsWatcher) {
EdsWatcher edsWatcher = (EdsWatcher) watcher;
if (!edsWatcher.parentContexts.isEmpty()) {
String msg = String.format("CdsWatcher %s has parent contexts %s",
edsWatcher.resourceName(), edsWatcher.parentContexts);
throw new IllegalStateException(msg);
}
}
}
public void shutdown() {
syncContext.execute(() -> {
for (TypeWatchers<?> watchers : resourceWatchers.values()) {
shutdownWatchersForType(watchers);
}
resourceWatchers.clear();
});
}
private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
watcherEntry.getValue());
}
}
private void releaseSubscription(ClusterSubscription subscription) {
checkNotNull(subscription, "subscription");
String clusterName = subscription.getClusterName();
syncContext.execute(() -> {
XdsWatcherBase<?> cdsWatcher =
resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
if (cdsWatcher == null) {
return; // already released while waiting for the syncContext
}
cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
maybePublishConfig();
});
}
private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) {
checkNotNull(root, "root");
cancelCdsWatcher(root, parentContext);
if (!root.hasDataValue() || !root.parentContexts.isEmpty()) {
return;
}
XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue();
switch (cdsUpdate.clusterType()) {
case EDS:
String edsServiceName = cdsUpdate.edsServiceName();
EdsWatcher edsWatcher =
(EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName);
cancelEdsWatcher(edsWatcher, root);
break;
case AGGREGATE:
for (String cluster : cdsUpdate.prioritizedClusterNames()) {
CdsWatcher clusterWatcher =
(CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(cluster);
if (clusterWatcher != null) {
cancelClusterWatcherTree(clusterWatcher, root);
}
}
break;
case LOGICAL_DNS:
// no eds needed
break;
default:
throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType());
}
}
/**
* Check if all resources have results, and if so, generate a new XdsConfig and send it to all
* the watchers.
*/
private void maybePublishConfig() {
syncContext.throwIfNotInThisSynchronizationContext();
boolean waitingOnResource = resourceWatchers.values().stream()
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
.anyMatch(XdsWatcherBase::missingResult);
if (waitingOnResource) {
return;
}
XdsConfig newConfig = buildConfig();
if (Objects.equals(newConfig, lastXdsConfig)) {
return;
}
lastXdsConfig = newConfig;
xdsConfigWatcher.onUpdate(lastXdsConfig);
}
@VisibleForTesting
XdsConfig buildConfig() {
XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
// Iterate watchers and build the XdsConfig
// Will only be 1 listener and 1 route resource
VirtualHost activeVirtualHost = getActiveVirtualHost();
for (XdsWatcherBase<?> xdsWatcherBase :
resourceWatchers.get(XdsListenerResource.getInstance()).watchers.values()) {
XdsListenerResource.LdsUpdate ldsUpdate = ((LdsWatcher) xdsWatcherBase).getData().getValue();
builder.setListener(ldsUpdate);
if (activeVirtualHost == null) {
activeVirtualHost = RoutingUtils.findVirtualHostForHostName(
ldsUpdate.httpConnectionManager().virtualHosts(), dataPlaneAuthority);
}
if (ldsUpdate.httpConnectionManager() != null
&& ldsUpdate.httpConnectionManager().virtualHosts() != null) {
RdsUpdate rdsUpdate = new RdsUpdate(ldsUpdate.httpConnectionManager().virtualHosts());
builder.setRoute(rdsUpdate);
}
}
resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
.map(watcher -> (RdsWatcher) watcher)
.forEach(watcher -> builder.setRoute(watcher.getData().getValue()));
builder.setVirtualHost(activeVirtualHost);
Map<String, ? extends XdsWatcherBase<?>> edsWatchers =
resourceWatchers.get(ENDPOINT_RESOURCE).watchers;
Map<String, ? extends XdsWatcherBase<?>> cdsWatchers =
resourceWatchers.get(CLUSTER_RESOURCE).watchers;
// Iterate CDS watchers
for (XdsWatcherBase<?> watcher : cdsWatchers.values()) {
CdsWatcher cdsWatcher = (CdsWatcher) watcher;
String clusterName = cdsWatcher.resourceName();
StatusOr<XdsClusterResource.CdsUpdate> cdsUpdate = cdsWatcher.getData();
if (cdsUpdate.hasValue()) {
XdsConfig.XdsClusterConfig clusterConfig;
String edsName = cdsUpdate.getValue().edsServiceName();
EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(edsName);
// Only EDS type clusters have endpoint data
StatusOr<XdsEndpointResource.EdsUpdate> data =
edsWatcher != null ? edsWatcher.getData() : null;
clusterConfig = new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate.getValue(), data);
builder.addCluster(clusterName, StatusOr.fromValue(clusterConfig));
} else {
builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdate.getStatus()));
}
}
return builder.build();
}
@Override
public String toString() {
return logId.toString();
}
private static class TypeWatchers<T extends ResourceUpdate> {
// Key is resource name
final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
final XdsResourceType<T> resourceType;
TypeWatchers(XdsResourceType<T> resourceType) {
this.resourceType = resourceType;
}
public void add(String resourceName, XdsWatcherBase<T> watcher) {
watchers.put(resourceName, watcher);
}
}
public interface XdsConfigWatcher {
void onUpdate(XdsConfig config);
// These 2 methods are invoked when there is an error or
// does-not-exist on LDS or RDS only. The context will be a
// human-readable string indicating the scope in which the error
// occurred (e.g., the resource type and name).
void onError(String resourceContext, Status status);
void onResourceDoesNotExist(String resourceContext);
}
private class ClusterSubscription implements Closeable {
String clusterName;
public ClusterSubscription(String clusterName) {
this.clusterName = clusterName;
}
public String getClusterName() {
return clusterName;
}
@Override
public void close() throws IOException {
releaseSubscription(this);
}
}
private abstract static class XdsWatcherBase<T extends ResourceUpdate>
implements ResourceWatcher<T> {
private final XdsResourceType<T> type;
private final String resourceName;
@Nullable
private StatusOr<T> data;
private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
this.type = checkNotNull(type, "type");
this.resourceName = checkNotNull(resourceName, "resourceName");
}
@Override
public void onError(Status error) {
checkNotNull(error, "error");
setDataAsStatus(error);
}
protected void handleDoesNotExist(String resourceName) {
checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
setDataAsStatus(Status.UNAVAILABLE.withDescription("No " + toContextString()));
}
boolean missingResult() {
return data == null;
}
@Nullable
StatusOr<T> getData() {
return data;
}
boolean hasDataValue() {
return data != null && data.hasValue();
}
String resourceName() {
return resourceName;
}
protected void setData(T data) {
checkNotNull(data, "data");
this.data = StatusOr.fromValue(data);
}
protected void setDataAsStatus(Status status) {
checkNotNull(status, "status");
this.data = StatusOr.fromStatus(status);
}
String toContextString() {
return toContextStr(type.typeName(), resourceName);
}
}
private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate> {
String rdsName;
private LdsWatcher(String resourceName) {
super(XdsListenerResource.getInstance(), resourceName);
}
@Override
public void onChanged(XdsListenerResource.LdsUpdate update) {
checkNotNull(update, "update");
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
String rdsName = httpConnectionManager.rdsName();
VirtualHost activeVirtualHost = getActiveVirtualHost();
boolean changedRdsName = !Objects.equals(rdsName, this.rdsName);
if (changedRdsName) {
cleanUpRdsWatcher();
}
if (virtualHosts != null) {
// No RDS watcher since we are getting RDS updates via LDS
updateRoutes(virtualHosts, this, activeVirtualHost, this.rdsName == null);
this.rdsName = null;
} else if (changedRdsName) {
cleanUpRdsWatcher();
this.rdsName = rdsName;
addWatcher(new RdsWatcher(rdsName));
logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
}
setData(update);
maybePublishConfig();
}
@Override
public void onError(Status error) {
super.onError(checkNotNull(error, "error"));
xdsConfigWatcher.onError(toContextString(), error);
}
@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(resourceName);
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
}
private void cleanUpRdsWatcher() {
RdsWatcher oldRdsWatcher = getRdsWatcher();
if (oldRdsWatcher != null) {
cancelWatcher(oldRdsWatcher);
logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName);
// Cleanup clusters (as appropriate) that had the old rds watcher as a parent
if (!oldRdsWatcher.hasDataValue() || !oldRdsWatcher.getData().hasValue()
|| resourceWatchers.get(CLUSTER_RESOURCE) == null) {
return;
}
for (XdsWatcherBase<?> watcher :
resourceWatchers.get(CLUSTER_RESOURCE).watchers.values()) {
cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher);
}
}
}
private RdsWatcher getRdsWatcher() {
TypeWatchers<?> watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
if (watchers == null || rdsName == null || watchers.watchers.isEmpty()) {
return null;
}
return (RdsWatcher) watchers.watchers.get(rdsName);
}
}
private class RdsWatcher extends XdsWatcherBase<RdsUpdate> {
public RdsWatcher(String resourceName) {
super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
}
@Override
public void onChanged(RdsUpdate update) {
checkNotNull(update, "update");
RdsUpdate oldData = hasDataValue() ? getData().getValue() : null;
VirtualHost oldVirtualHost =
(oldData != null)
? RoutingUtils.findVirtualHostForHostName(oldData.virtualHosts, dataPlaneAuthority)
: null;
setData(update);
updateRoutes(update.virtualHosts, this, oldVirtualHost, true);
maybePublishConfig();
}
@Override
public void onError(Status error) {
super.onError(checkNotNull(error, "error"));
xdsConfigWatcher.onError(toContextString(), error);
}
@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
}
ImmutableList<String> getCdsNames() {
if (!hasDataValue() || getData().getValue().virtualHosts == null) {
return ImmutableList.of();
}
return ImmutableList.copyOf(getClusterNamesFromVirtualHost(getActiveVirtualHost()));
}
}
private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
Map<Object, Integer> parentContexts = new HashMap<>();
CdsWatcher(String resourceName, Object parentContext, int depth) {
super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth);
}
@Override
public void onChanged(XdsClusterResource.CdsUpdate update) {
checkNotNull(update, "update");
switch (update.clusterType()) {
case EDS:
setData(update);
if (!addEdsWatcher(update.edsServiceName(), this)) {
maybePublishConfig();
}
break;
case LOGICAL_DNS:
setData(update);
maybePublishConfig();
// no eds needed
break;
case AGGREGATE:
Object parentContext = this;
int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1;
if (depth > MAX_CLUSTER_RECURSION_DEPTH) {
logger.log(XdsLogger.XdsLogLevel.WARNING,
"Cluster recursion depth limit exceeded for cluster {0}", resourceName());
Status error = Status.UNAVAILABLE.withDescription(
"aggregate cluster graph exceeds max depth");
setDataAsStatus(error);
}
if (hasDataValue()) {
Set<String> oldNames = new HashSet<>(getData().getValue().prioritizedClusterNames());
Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
Set<String> deletedClusters = Sets.difference(oldNames, newNames);
deletedClusters.forEach((cluster)
-> cancelClusterWatcherTree(getCluster(cluster), parentContext));
if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
setData(update);
Set<String> addedClusters = Sets.difference(newNames, oldNames);
addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth));
if (addedClusters.isEmpty()) {
maybePublishConfig();
}
} else { // data was set to error status above
maybePublishConfig();
}
} else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
setData(update);
update.prioritizedClusterNames()
.forEach(name -> addClusterWatcher(name, parentContext, depth));
maybePublishConfig();
}
break;
default:
Status error = Status.UNAVAILABLE.withDescription(
"aggregate cluster graph exceeds max depth");
setDataAsStatus(error);
maybePublishConfig();
}
}
@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
maybePublishConfig();
}
}
// Returns true if the watcher was added, false if it already exists
private boolean addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(XdsEndpointResource.getInstance());
if (typeWatchers == null || !typeWatchers.watchers.containsKey(edsServiceName)) {
addWatcher(new EdsWatcher(edsServiceName, parentContext));
return true;
}
EdsWatcher watcher = (EdsWatcher) typeWatchers.watchers.get(edsServiceName);
watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence
return false;
}
private void addClusterWatcher(String clusterName, Object parentContext, int depth) {
TypeWatchers<?> clusterWatchers = resourceWatchers.get(CLUSTER_RESOURCE);
if (clusterWatchers != null) {
CdsWatcher watcher = (CdsWatcher) clusterWatchers.watchers.get(clusterName);
if (watcher != null) {
watcher.parentContexts.put(parentContext, depth);
return;
}
}
addWatcher(new CdsWatcher(clusterName, parentContext, depth));
}
private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
private final Set<CdsWatcher> parentContexts = new HashSet<>();
private EdsWatcher(String resourceName, CdsWatcher parentContext) {
super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
parentContexts.add(checkNotNull(parentContext, "parentContext"));
}
@Override
public void onChanged(XdsEndpointResource.EdsUpdate update) {
setData(checkNotNull(update, "update"));
maybePublishConfig();
}
@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
maybePublishConfig();
}
void addParentContext(CdsWatcher parentContext) {
parentContexts.add(checkNotNull(parentContext, "parentContext"));
}
}
private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
VirtualHost oldVirtualHost, boolean sameParentContext) {
VirtualHost virtualHost =
RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
if (virtualHost == null) {
String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
logger.log(XdsLogger.XdsLogLevel.WARNING, error);
cleanUpRoutes();
xdsConfigWatcher.onError(
"xDS node ID:" + dataPlaneAuthority, Status.UNAVAILABLE.withDescription(error));
return;
}
Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
Set<String> oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost);
if (sameParentContext) {
// Calculate diffs.
Set<String> addedClusters = Sets.difference(newClusters, oldClusters);
Set<String> deletedClusters = Sets.difference(oldClusters, newClusters);
deletedClusters.forEach(watcher ->
cancelClusterWatcherTree(getCluster(watcher), newParentContext));
addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
} else {
newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
}
}
private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
if (virtualHost == null) {
return Collections.emptySet();
}
// Get all cluster names to which requests can be routed through the virtual host.
Set<String> clusters = new HashSet<>();
for (VirtualHost.Route route : virtualHost.routes()) {
VirtualHost.Route.RouteAction action = route.routeAction();
if (action == null) {
continue;
}
if (action.cluster() != null) {
clusters.add(action.cluster());
} else if (action.weightedClusters() != null) {
for (ClusterWeight weighedCluster : action.weightedClusters()) {
clusters.add(weighedCluster.name());
}
}
}
return clusters;
}
@Nullable
private VirtualHost getActiveVirtualHost() {
TypeWatchers<?> rdsWatchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
if (rdsWatchers == null) {
return null;
}
RdsWatcher activeRdsWatcher =
(RdsWatcher) rdsWatchers.watchers.values().stream().findFirst().orElse(null);
if (activeRdsWatcher == null || activeRdsWatcher.missingResult()
|| !activeRdsWatcher.getData().hasValue()) {
return null;
}
return RoutingUtils.findVirtualHostForHostName(
activeRdsWatcher.getData().getValue().virtualHosts, dataPlaneAuthority);
}
// Must be in SyncContext
private void cleanUpRoutes() {
// Remove RdsWatcher & CDS Watchers
TypeWatchers<?> rdsResourceWatcher =
resourceWatchers.get(XdsRouteConfigureResource.getInstance());
if (rdsResourceWatcher == null || rdsResourceWatcher.watchers.isEmpty()) {
return;
}
XdsWatcherBase<?> watcher = rdsResourceWatcher.watchers.values().stream().findFirst().get();
cancelWatcher(watcher);
// Remove CdsWatchers pointed to by the RdsWatcher
RdsWatcher rdsWatcher = (RdsWatcher) watcher;
for (String cName : rdsWatcher.getCdsNames()) {
CdsWatcher cdsWatcher = getCluster(cName);
if (cdsWatcher != null) {
cancelClusterWatcherTree(cdsWatcher, rdsWatcher);
}
}
}
private CdsWatcher getCluster(String clusterName) {
return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
}
}

View File

@ -582,8 +582,7 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
String errorDetail = null;
if (errors.isEmpty()) {
checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
controlPlaneClient.ackResponse(xdsResourceType, args.versionInfo,
args.nonce);
controlPlaneClient.ackResponse(xdsResourceType, args.versionInfo, args.nonce);
} else {
errorDetail = Joiner.on('\n').join(errors);
logger.log(XdsLogLevel.WARNING,

View File

@ -24,7 +24,6 @@ import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.protobuf.Any;
import com.google.protobuf.BoolValue;
import com.google.protobuf.Message;
import com.google.protobuf.UInt32Value;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
@ -45,7 +44,6 @@ import io.envoyproxy.envoy.config.listener.v3.FilterChainMatch;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.NonForwardingAction;
import io.envoyproxy.envoy.config.route.v3.Route;
import io.envoyproxy.envoy.config.route.v3.RouteAction;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.config.route.v3.RouteMatch;
import io.envoyproxy.envoy.config.route.v3.VirtualHost;
@ -239,24 +237,7 @@ public class ControlPlaneRule extends TestWatcher {
* Builds a new default RDS configuration.
*/
static RouteConfiguration buildRouteConfiguration(String authority) {
return buildRouteConfiguration(authority, RDS_NAME, CLUSTER_NAME);
}
static RouteConfiguration buildRouteConfiguration(String authority, String rdsName,
String clusterName) {
VirtualHost.Builder vhBuilder = VirtualHost.newBuilder()
.setName(rdsName)
.addDomains(authority)
.addRoutes(
Route.newBuilder()
.setMatch(
RouteMatch.newBuilder().setPrefix("/").build())
.setRoute(
RouteAction.newBuilder().setCluster(clusterName)
.setAutoHostRewrite(BoolValue.newBuilder().setValue(true).build())
.build()));
VirtualHost virtualHost = vhBuilder.build();
return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build();
return XdsTestUtils.buildRouteConfiguration(authority, RDS_NAME, CLUSTER_NAME);
}
/**

View File

@ -205,7 +205,7 @@ public class XdsClientFallbackTest {
ControlPlaneRule.buildClientListener(MAIN_SERVER, serverName));
controlPlane.setRdsConfig(rdsName,
ControlPlaneRule.buildRouteConfiguration(MAIN_SERVER, rdsName, clusterName));
XdsTestUtils.buildRouteConfiguration(MAIN_SERVER, rdsName, clusterName));
controlPlane.setCdsConfig(clusterName, ControlPlaneRule.buildCluster(clusterName, edsName));
controlPlane.setEdsConfig(edsName,

View File

@ -0,0 +1,784 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.AGGREGATE;
import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.EDS;
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS;
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS;
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_LDS;
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS;
import static io.grpc.xds.XdsTestUtils.CLUSTER_NAME;
import static io.grpc.xds.XdsTestUtils.ENDPOINT_HOSTNAME;
import static io.grpc.xds.XdsTestUtils.ENDPOINT_PORT;
import static io.grpc.xds.XdsTestUtils.RDS_NAME;
import static io.grpc.xds.XdsTestUtils.getEdsNameForCluster;
import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.Message;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.grpc.BindableService;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.client.CommonBootstrapperTestUtils;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClientImpl;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsResourceType;
import io.grpc.xds.client.XdsTransportFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/** Unit tests for {@link XdsDependencyManager}. */
@RunWith(JUnit4.class)
public class XdsDependencyManagerTest {
private static final Logger log = Logger.getLogger(XdsDependencyManagerTest.class.getName());
public static final String CLUSTER_TYPE_NAME = XdsClusterResource.getInstance().typeName();
public static final String ENDPOINT_TYPE_NAME = XdsEndpointResource.getInstance().typeName();
@Mock
private XdsClientMetricReporter xdsClientMetricReporter;
private final SynchronizationContext syncContext =
new SynchronizationContext(mock(Thread.UncaughtExceptionHandler.class));
private ManagedChannel channel;
private XdsClientImpl xdsClient;
private XdsDependencyManager xdsDependencyManager;
private TestWatcher xdsConfigWatcher;
private Server xdsServer;
private final FakeClock fakeClock = new FakeClock();
private final String serverName = InProcessServerBuilder.generateName();
private final Queue<XdsTestUtils.LrsRpcCall> loadReportCalls = new ArrayDeque<>();
private final AtomicBoolean adsEnded = new AtomicBoolean(true);
private final AtomicBoolean lrsEnded = new AtomicBoolean(true);
private final XdsTestControlPlaneService controlPlaneService = new XdsTestControlPlaneService();
private final BindableService lrsService =
XdsTestUtils.createLrsService(lrsEnded, loadReportCalls);
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
private TestWatcher testWatcher;
private XdsConfig defaultXdsConfig; // set in setUp()
@Captor
private ArgumentCaptor<XdsConfig> xdsConfigCaptor;
@Captor
private ArgumentCaptor<Status> statusCaptor;
@Before
public void setUp() throws Exception {
xdsServer = cleanupRule.register(InProcessServerBuilder
.forName(serverName)
.addService(controlPlaneService)
.addService(lrsService)
.directExecutor()
.build()
.start());
XdsTestUtils.setAdsConfig(controlPlaneService, serverName);
channel = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
XdsTransportFactory xdsTransportFactory =
ignore -> new GrpcXdsTransportFactory.GrpcXdsTransport(channel);
xdsClient = CommonBootstrapperTestUtils.createXdsClient(
Collections.singletonList(SERVER_URI), xdsTransportFactory, fakeClock,
new ExponentialBackoffPolicy.Provider(), MessagePrinter.INSTANCE, xdsClientMetricReporter);
testWatcher = new TestWatcher();
xdsConfigWatcher = mock(TestWatcher.class, delegatesTo(testWatcher));
defaultXdsConfig = XdsTestUtils.getDefaultXdsConfig(serverName);
}
@After
public void tearDown() throws InterruptedException {
if (xdsDependencyManager != null) {
xdsDependencyManager.shutdown();
}
xdsClient.shutdown();
channel.shutdown(); // channel not owned by XdsClient
xdsServer.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
assertThat(adsEnded.get()).isTrue();
assertThat(lrsEnded.get()).isTrue();
assertThat(fakeClock.getPendingTasks()).isEmpty();
}
@Test
public void verify_basic_config() {
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
testWatcher.verifyStats(1, 0, 0);
}
@Test
public void verify_config_update() {
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
testWatcher.verifyStats(1, 0, 0);
assertThat(testWatcher.lastConfig).isEqualTo(defaultXdsConfig);
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS2", "EDS2",
ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT + 2);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(ArgumentMatchers.notNull());
testWatcher.verifyStats(2, 0, 0);
assertThat(testWatcher.lastConfig).isNotEqualTo(defaultXdsConfig);
}
@Test
public void verify_simple_aggregate() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
String rootName = "root_c";
RouteConfiguration routeConfig =
XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, rootName);
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
XdsTestUtils.setAggregateCdsConfig(controlPlaneService, serverName, rootName, childNames);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
Map<String, StatusOr<XdsConfig.XdsClusterConfig>> lastConfigClusters =
testWatcher.lastConfig.getClusters();
assertThat(lastConfigClusters).hasSize(childNames.size() + 1);
StatusOr<XdsConfig.XdsClusterConfig> rootC = lastConfigClusters.get(rootName);
XdsClusterResource.CdsUpdate rootUpdate = rootC.getValue().getClusterResource();
assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE);
assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames);
for (String childName : childNames) {
assertThat(lastConfigClusters).containsKey(childName);
XdsClusterResource.CdsUpdate childResource =
lastConfigClusters.get(childName).getValue().getClusterResource();
assertThat(childResource.clusterType()).isEqualTo(EDS);
assertThat(childResource.edsServiceName()).isEqualTo(getEdsNameForCluster(childName));
StatusOr<XdsEndpointResource.EdsUpdate> endpoint =
lastConfigClusters.get(childName).getValue().getEndpoint();
assertThat(endpoint.hasValue()).isTrue();
assertThat(endpoint.getValue().clusterName).isEqualTo(getEdsNameForCluster(childName));
}
}
@Test
public void testComplexRegisteredAggregate() throws IOException {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
// Do initialization
String rootName1 = "root_c";
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName1, childNames);
String rootName2 = "root_2";
List<String> childNames2 = Arrays.asList("clusterA", "clusterX");
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
Closeable subscription2 = xdsDependencyManager.subscribeToCluster(rootName2);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
testWatcher.verifyStats(3, 0, 0);
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
Set<String> expectedClusters = builder.add(rootName1).add(rootName2).add(CLUSTER_NAME)
.addAll(childNames).addAll(childNames2).build();
assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).isEqualTo(expectedClusters);
// Close 1 subscription shouldn't affect the other or RDS subscriptions
subscription1.close();
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
builder = ImmutableSet.builder();
Set<String> expectedClusters2 =
builder.add(rootName2).add(CLUSTER_NAME).addAll(childNames2).build();
assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).isEqualTo(expectedClusters2);
subscription2.close();
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
}
@Test
public void testDelayedSubscription() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
String rootName1 = "root_c";
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
assertThat(subscription1).isNotNull();
fakeClock.forwardTime(16, TimeUnit.SECONDS);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).toString()).isEqualTo(
StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"No " + toContextStr(CLUSTER_TYPE_NAME, rootName1))).toString());
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName1, childNames);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).hasValue()).isTrue();
}
@Test
public void testMissingCdsAndEds() {
// update config so that agg cluster references 2 existing & 1 non-existing cluster
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
Cluster cluster = XdsTestUtils.buildAggCluster(CLUSTER_NAME, childNames);
Map<String, Message> clusterMap = new HashMap<>();
Map<String, Message> edsMap = new HashMap<>();
clusterMap.put(CLUSTER_NAME, cluster);
for (int i = 0; i < childNames.size() - 1; i++) {
String edsName = XdsTestUtils.EDS_NAME + i;
Cluster child = ControlPlaneRule.buildCluster(childNames.get(i), edsName);
clusterMap.put(childNames.get(i), child);
}
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
// Update config so that one of the 2 "valid" clusters has an EDS resource, the other does not
// and there is an EDS that doesn't have matching clusters
ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment(
serverName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, XdsTestUtils.EDS_NAME + 0);
edsMap.put(XdsTestUtils.EDS_NAME + 0, clusterLoadAssignment);
clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment(
serverName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, "garbageEds");
edsMap.put("garbageEds", clusterLoadAssignment);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
List<StatusOr<XdsConfig.XdsClusterConfig>> returnedClusters = new ArrayList<>();
for (String childName : childNames) {
returnedClusters.add(xdsConfigCaptor.getValue().getClusters().get(childName));
}
// Check that missing cluster reported Status and the other 2 are present
Status expectedClusterStatus = Status.UNAVAILABLE.withDescription(
"No " + toContextStr(CLUSTER_TYPE_NAME, childNames.get(2)));
StatusOr<XdsConfig.XdsClusterConfig> missingCluster = returnedClusters.get(2);
assertThat(missingCluster.getStatus().toString()).isEqualTo(expectedClusterStatus.toString());
assertThat(returnedClusters.get(0).hasValue()).isTrue();
assertThat(returnedClusters.get(1).hasValue()).isTrue();
// Check that missing EDS reported Status, the other one is present and the garbage EDS is not
Status expectedEdsStatus = Status.UNAVAILABLE.withDescription(
"No " + toContextStr(ENDPOINT_TYPE_NAME, XdsTestUtils.EDS_NAME + 1));
assertThat(returnedClusters.get(0).getValue().getEndpoint().hasValue()).isTrue();
assertThat(returnedClusters.get(1).getValue().getEndpoint().hasValue()).isFalse();
assertThat(returnedClusters.get(1).getValue().getEndpoint().getStatus().toString())
.isEqualTo(expectedEdsStatus.toString());
verify(xdsConfigWatcher, never()).onResourceDoesNotExist(any());
testWatcher.verifyStats(1, 0, 0);
}
@Test
public void testMissingLds() {
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, "badLdsName");
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher, timeout(1000)).onResourceDoesNotExist(
toContextStr(XdsListenerResource.getInstance().typeName(), "badLdsName"));
testWatcher.verifyStats(0, 0, 1);
}
@Test
public void testMissingRds() {
Listener serverListener = ControlPlaneRule.buildServerListener();
Listener clientListener =
ControlPlaneRule.buildClientListener(serverName, serverName, "badRdsName");
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener));
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher, timeout(1000)).onResourceDoesNotExist(
toContextStr(XdsRouteConfigureResource.getInstance().typeName(), "badRdsName"));
testWatcher.verifyStats(0, 0, 1);
}
@Test
public void testUpdateToMissingVirtualHost() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
WrappedXdsClient wrappedXdsClient = new WrappedXdsClient(xdsClient, syncContext);
xdsDependencyManager = new XdsDependencyManager(
wrappedXdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
// Update with a config that has a virtual host that doesn't match the server name
wrappedXdsClient.deliverLdsUpdate(0L, buildUnmatchedVirtualHosts());
inOrder.verify(xdsConfigWatcher, timeout(1000)).onError(any(), statusCaptor.capture());
assertThat(statusCaptor.getValue().getDescription())
.isEqualTo("Failed to find virtual host matching hostname: " + serverName);
testWatcher.verifyStats(1, 1, 0);
wrappedXdsClient.shutdown();
}
private List<io.grpc.xds.VirtualHost> buildUnmatchedVirtualHosts() {
io.grpc.xds.VirtualHost.Route route1 =
io.grpc.xds.VirtualHost.Route.forAction(
io.grpc.xds.VirtualHost.Route.RouteMatch.withPathExactOnly("/GreetService/bye"),
io.grpc.xds.VirtualHost.Route.RouteAction.forCluster(
"cluster-bar.googleapis.com", Collections.emptyList(),
TimeUnit.SECONDS.toNanos(15L), null, false), ImmutableMap.of());
io.grpc.xds.VirtualHost.Route route2 =
io.grpc.xds.VirtualHost.Route.forAction(
io.grpc.xds.VirtualHost.Route.RouteMatch.withPathExactOnly("/HelloService/hi"),
io.grpc.xds.VirtualHost.Route.RouteAction.forCluster(
"cluster-foo.googleapis.com", Collections.emptyList(),
TimeUnit.SECONDS.toNanos(15L), null, false),
ImmutableMap.of());
return Arrays.asList(
io.grpc.xds.VirtualHost.create("virtualhost-foo", Collections.singletonList("hello"
+ ".googleapis.com"),
Collections.singletonList(route1),
ImmutableMap.of()),
io.grpc.xds.VirtualHost.create("virtualhost-bar", Collections.singletonList("hi"
+ ".googleapis.com"),
Collections.singletonList(route2),
ImmutableMap.of()));
}
@Test
public void testCorruptLds() {
String ldsResourceName =
"xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1";
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, ldsResourceName);
Status expectedStatus = Status.INVALID_ARGUMENT.withDescription(
"Wrong configuration: xds server does not exist for resource " + ldsResourceName);
String context = toContextStr(XdsListenerResource.getInstance().typeName(), ldsResourceName);
verify(xdsConfigWatcher, timeout(1000))
.onError(eq(context), argThat(new XdsTestUtils.StatusMatcher(expectedStatus)));
fakeClock.forwardTime(16, TimeUnit.SECONDS);
testWatcher.verifyStats(0, 1, 0);
}
@Test
public void testChangeRdsName_fromLds() {
// TODO implement
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
Listener serverListener = ControlPlaneRule.buildServerListener();
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
String newRdsName = "newRdsName1";
Listener clientListener = buildInlineClientListener(newRdsName, CLUSTER_NAME);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener));
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue()).isNotEqualTo(defaultXdsConfig);
assertThat(xdsConfigCaptor.getValue().getVirtualHost().name()).isEqualTo(newRdsName);
}
@Test
public void testMultipleParentsInCdsTree() throws IOException {
/*
* Configure Xds server with the following cluster tree and point RDS to root:
2 aggregates under root A & B
B has EDS Cluster B1 && shared agg AB1; A has agg A1 && shared agg AB1
A1 has shared EDS Cluster A11 && shared agg AB1
AB1 has shared EDS Clusters A11 && AB11
As an alternate visualization, parents are:
A -> root, B -> root, A1 -> A, AB1 -> A|B|A1, B1 -> B, A11 -> A1|AB1, AB11 -> AB1
*/
Cluster rootCluster =
XdsTestUtils.buildAggCluster("root", Arrays.asList("clusterA", "clusterB"));
Cluster clusterA =
XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterA1", "clusterAB1"));
Cluster clusterB =
XdsTestUtils.buildAggCluster("clusterB", Arrays.asList("clusterB1", "clusterAB1"));
Cluster clusterA1 =
XdsTestUtils.buildAggCluster("clusterA1", Arrays.asList("clusterA11", "clusterAB1"));
Cluster clusterAB1 =
XdsTestUtils.buildAggCluster("clusterAB1", Arrays.asList("clusterA11", "clusterAB11"));
Map<String, Message> clusterMap = new HashMap<>();
Map<String, Message> edsMap = new HashMap<>();
clusterMap.put("root", rootCluster);
clusterMap.put("clusterA", clusterA);
clusterMap.put("clusterB", clusterB);
clusterMap.put("clusterA1", clusterA1);
clusterMap.put("clusterAB1", clusterAB1);
XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA11", "clusterAB11", "clusterB1");
RouteConfiguration routeConfig =
XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "root");
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
// Start the actual test
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
XdsConfig initialConfig = xdsConfigCaptor.getValue();
// Make sure that adding subscriptions that rds points at doesn't change the config
Closeable rootSub = xdsDependencyManager.subscribeToCluster("root");
assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig);
Closeable clusterAB11Sub = xdsDependencyManager.subscribeToCluster("clusterAB11");
assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig);
// Make sure that closing subscriptions that rds points at doesn't change the config
rootSub.close();
assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig);
clusterAB11Sub.close();
assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig);
// Make an explicit root subscription and then change RDS to point to A11
rootSub = xdsDependencyManager.subscribeToCluster("root");
RouteConfiguration newRouteConfig =
XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA11");
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, newRouteConfig));
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().keySet().size()).isEqualTo(8);
// Now that it is released, we should only have A11
rootSub.close();
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).containsExactly("clusterA11");
}
@Test
public void testMultipleCdsReferToSameEds() {
// Create the maps and Update the config to have 2 clusters that refer to the same EDS resource
String edsName = "sharedEds";
Cluster rootCluster =
XdsTestUtils.buildAggCluster("root", Arrays.asList("clusterA", "clusterB"));
Cluster clusterA = ControlPlaneRule.buildCluster("clusterA", edsName);
Cluster clusterB = ControlPlaneRule.buildCluster("clusterB", edsName);
Map<String, Message> clusterMap = new HashMap<>();
clusterMap.put("root", rootCluster);
clusterMap.put("clusterA", clusterA);
clusterMap.put("clusterB", clusterB);
Map<String, Message> edsMap = new HashMap<>();
ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment(
serverName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, edsName);
edsMap.put(edsName, clusterLoadAssignment);
RouteConfiguration routeConfig =
XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "root");
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
// Start the actual test
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
XdsConfig initialConfig = xdsConfigCaptor.getValue();
assertThat(initialConfig.getClusters().keySet())
.containsExactly("root", "clusterA", "clusterB");
XdsEndpointResource.EdsUpdate edsForA =
initialConfig.getClusters().get("clusterA").getValue().getEndpoint().getValue();
assertThat(edsForA.clusterName).isEqualTo(edsName);
XdsEndpointResource.EdsUpdate edsForB =
initialConfig.getClusters().get("clusterB").getValue().getEndpoint().getValue();
assertThat(edsForB.clusterName).isEqualTo(edsName);
assertThat(edsForA).isEqualTo(edsForB);
edsForA.localityLbEndpointsMap.values().forEach(
localityLbEndpoints -> assertThat(localityLbEndpoints.endpoints()).hasSize(1));
}
@Test
public void testChangeRdsName_FromLds_complexTree() {
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
// Create the same tree as in testMultipleParentsInCdsTree
Cluster rootCluster =
XdsTestUtils.buildAggCluster("root", Arrays.asList("clusterA", "clusterB"));
Cluster clusterA =
XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterA1", "clusterAB1"));
Cluster clusterB =
XdsTestUtils.buildAggCluster("clusterB", Arrays.asList("clusterB1", "clusterAB1"));
Cluster clusterA1 =
XdsTestUtils.buildAggCluster("clusterA1", Arrays.asList("clusterA11", "clusterAB1"));
Cluster clusterAB1 =
XdsTestUtils.buildAggCluster("clusterAB1", Arrays.asList("clusterA11", "clusterAB11"));
Map<String, Message> clusterMap = new HashMap<>();
Map<String, Message> edsMap = new HashMap<>();
clusterMap.put("root", rootCluster);
clusterMap.put("clusterA", clusterA);
clusterMap.put("clusterB", clusterB);
clusterMap.put("clusterA1", clusterA1);
clusterMap.put("clusterAB1", clusterAB1);
XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA11", "clusterAB11", "clusterB1");
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
inOrder.verify(xdsConfigWatcher, atLeastOnce()).onUpdate(any());
// Do the test
String newRdsName = "newRdsName1";
Listener clientListener = buildInlineClientListener(newRdsName, "root");
Listener serverListener = ControlPlaneRule.buildServerListener();
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener));
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
XdsConfig config = xdsConfigCaptor.getValue();
assertThat(config.getVirtualHost().name()).isEqualTo(newRdsName);
assertThat(config.getClusters().size()).isEqualTo(8);
}
@Test
public void testChangeAggCluster() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, atLeastOnce()).onUpdate(any());
// Setup initial config A -> A1 -> (A11, A12)
Cluster rootCluster =
XdsTestUtils.buildAggCluster("root", Arrays.asList("clusterA"));
Cluster clusterA =
XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterA1"));
Cluster clusterA1 =
XdsTestUtils.buildAggCluster("clusterA1", Arrays.asList("clusterA11", "clusterA12"));
Map<String, Message> clusterMap = new HashMap<>();
Map<String, Message> edsMap = new HashMap<>();
clusterMap.put("root", rootCluster);
clusterMap.put("clusterA", clusterA);
clusterMap.put("clusterA1", clusterA1);
XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA11", "clusterA12");
Listener clientListener = buildInlineClientListener(RDS_NAME, "root");
Listener serverListener = ControlPlaneRule.buildServerListener();
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener));
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
inOrder.verify(xdsConfigWatcher).onUpdate(any());
// Update the cluster to A -> A2 -> (A21, A22)
Cluster clusterA2 =
XdsTestUtils.buildAggCluster("clusterA2", Arrays.asList("clusterA21", "clusterA22"));
clusterA =
XdsTestUtils.buildAggCluster("clusterA", Arrays.asList("clusterA2"));
clusterMap.clear();
edsMap.clear();
clusterMap.put("root", rootCluster);
clusterMap.put("clusterA", clusterA);
clusterMap.put("clusterA2", clusterA2);
XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA21", "clusterA22");
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
// Verify that the config is updated as expected
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
XdsConfig config = xdsConfigCaptor.getValue();
assertThat(config.getClusters().keySet()).containsExactly("root", "clusterA", "clusterA2",
"clusterA21", "clusterA22");
}
private Listener buildInlineClientListener(String rdsName, String clusterName) {
return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName);
}
private static String toContextStr(String type, String resourceName) {
return type + " resource: " + resourceName;
}
private static class TestWatcher implements XdsDependencyManager.XdsConfigWatcher {
XdsConfig lastConfig;
int numUpdates = 0;
int numError = 0;
int numDoesNotExist = 0;
@Override
public void onUpdate(XdsConfig config) {
log.fine("Config changed: " + config);
lastConfig = config;
numUpdates++;
}
@Override
public void onError(String resourceContext, Status status) {
log.fine(String.format("Error %s for %s: ", status, resourceContext));
numError++;
}
@Override
public void onResourceDoesNotExist(String resourceName) {
log.fine("Resource does not exist: " + resourceName);
numDoesNotExist++;
}
private List<Integer> getStats() {
return Arrays.asList(numUpdates, numError, numDoesNotExist);
}
private void verifyStats(int updt, int err, int notExist) {
assertThat(getStats()).isEqualTo(Arrays.asList(updt, err, notExist));
}
}
private static class WrappedXdsClient extends XdsClient {
private final XdsClient delegate;
private final SynchronizationContext syncContext;
private ResourceWatcher<LdsUpdate> ldsWatcher;
WrappedXdsClient(XdsClient delegate, SynchronizationContext syncContext) {
this.delegate = delegate;
this.syncContext = syncContext;
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
@SuppressWarnings("unchecked")
public <T extends ResourceUpdate> void watchXdsResource(
XdsResourceType<T> type, String resourceName, ResourceWatcher<T> watcher,
Executor executor) {
if (type.equals(XdsListenerResource.getInstance())) {
ldsWatcher = (ResourceWatcher<LdsUpdate>) watcher;
}
delegate.watchXdsResource(type, resourceName, watcher, executor);
}
@Override
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
delegate.cancelXdsResourceWatch(type, resourceName, watcher);
}
void deliverLdsUpdate(long httpMaxStreamDurationNano,
List<io.grpc.xds.VirtualHost> virtualHosts) {
syncContext.execute(() -> {
LdsUpdate ldsUpdate = LdsUpdate.forApiListener(
io.grpc.xds.HttpConnectionManager.forVirtualHosts(
httpMaxStreamDurationNano, virtualHosts, null));
ldsWatcher.onChanged(ldsUpdate);
});
}
}
}

View File

@ -106,7 +106,7 @@ final class XdsTestControlPlaneService extends
public void run() {
HashMap<String, Message> copyResources = new HashMap<>(resources);
xdsResources.put(type, copyResources);
String newVersionInfo = String.valueOf(xdsVersions.get(type).getAndDecrement());
String newVersionInfo = String.valueOf(xdsVersions.get(type).getAndIncrement());
for (Map.Entry<StreamObserver<DiscoveryResponse>, Set<String>> entry :
subscribers.get(type).entrySet()) {
@ -119,6 +119,11 @@ final class XdsTestControlPlaneService extends
});
}
ImmutableMap<String, Message> getCurrentConfig(String type) {
HashMap<String, Message> hashMap = xdsResources.get(type);
return (hashMap != null) ? ImmutableMap.copyOf(hashMap) : ImmutableMap.of();
}
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
@ -159,7 +164,7 @@ final class XdsTestControlPlaneService extends
DiscoveryResponse response = generateResponse(resourceType,
String.valueOf(xdsVersions.get(resourceType)),
String.valueOf(xdsNonces.get(resourceType).get(responseObserver)),
String.valueOf(xdsNonces.get(resourceType).get(responseObserver).addAndGet(1)),
requestedResourceNames);
responseObserver.onNext(response);
subscribers.get(resourceType).put(responseObserver, requestedResourceNames);

View File

@ -0,0 +1,423 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS;
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS;
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_LDS;
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.BoolValue;
import com.google.protobuf.Message;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats;
import io.envoyproxy.envoy.config.listener.v3.ApiListener;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.Route;
import io.envoyproxy.envoy.config.route.v3.RouteAction;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.config.route.v3.RouteMatch;
import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig;
import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.internal.JsonParser;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.client.Bootstrapper;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsResourceType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
public class XdsTestUtils {
private static final Logger log = Logger.getLogger(XdsTestUtils.class.getName());
static final String RDS_NAME = "route-config.googleapis.com";
static final String CLUSTER_NAME = "cluster0";
static final String EDS_NAME = "eds-service-0";
static final String SERVER_LISTENER = "grpc/server?udpa.resource.listening_address=";
static final String HTTP_CONNECTION_MANAGER_TYPE_URL =
"type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3"
+ ".HttpConnectionManager";
public static final String ENDPOINT_HOSTNAME = "data-host";
public static final int ENDPOINT_PORT = 1234;
static BindableService createLrsService(AtomicBoolean lrsEnded,
Queue<LrsRpcCall> loadReportCalls) {
return new LoadReportingServiceGrpc.LoadReportingServiceImplBase() {
@Override
public StreamObserver<LoadStatsRequest> streamLoadStats(
StreamObserver<LoadStatsResponse> responseObserver) {
assertThat(lrsEnded.get()).isTrue();
lrsEnded.set(false);
@SuppressWarnings("unchecked")
StreamObserver<LoadStatsRequest> requestObserver = mock(StreamObserver.class);
LrsRpcCall call = new LrsRpcCall(requestObserver, responseObserver);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
lrsEnded.set(true);
}
}, MoreExecutors.directExecutor());
loadReportCalls.offer(call);
return requestObserver;
}
};
}
static boolean matchErrorDetail(
com.google.rpc.Status errorDetail, int expectedCode, List<String> expectedMessages) {
if (expectedCode != errorDetail.getCode()) {
return false;
}
List<String> errors = Splitter.on('\n').splitToList(errorDetail.getMessage());
if (errors.size() != expectedMessages.size()) {
return false;
}
for (int i = 0; i < errors.size(); i++) {
if (!errors.get(i).startsWith(expectedMessages.get(i))) {
return false;
}
}
return true;
}
static void setAdsConfig(XdsTestControlPlaneService service, String serverName) {
setAdsConfig(service, serverName, RDS_NAME, CLUSTER_NAME, EDS_NAME, ENDPOINT_HOSTNAME,
ENDPOINT_PORT);
}
static void setAdsConfig(XdsTestControlPlaneService service, String serverName, String rdsName,
String clusterName, String edsName, String endpointHostname,
int endpointPort) {
Listener serverListener = ControlPlaneRule.buildServerListener();
Listener clientListener = ControlPlaneRule.buildClientListener(serverName, serverName, rdsName);
service.setXdsConfig(ADS_TYPE_URL_LDS,
ImmutableMap.of(SERVER_LISTENER, serverListener, serverName, clientListener));
RouteConfiguration routeConfig =
buildRouteConfiguration(serverName, rdsName, clusterName);
service.setXdsConfig(ADS_TYPE_URL_RDS, ImmutableMap.of(rdsName, routeConfig));;
Cluster cluster = ControlPlaneRule.buildCluster(clusterName, edsName);
service.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.<String, Message>of(clusterName, cluster));
ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment(
serverName, endpointHostname, endpointPort, edsName);
service.setXdsConfig(ADS_TYPE_URL_EDS,
ImmutableMap.<String, Message>of(edsName, clusterLoadAssignment));
log.log(Level.FINE, String.format("Set ADS config for %s with address %s:%d",
serverName, endpointHostname, endpointPort));
}
static String getEdsNameForCluster(String clusterName) {
return "eds_" + clusterName;
}
static void setAggregateCdsConfig(XdsTestControlPlaneService service, String serverName,
String clusterName, List<String> children) {
Map<String, Message> clusterMap = new HashMap<>();
ClusterConfig rootConfig = ClusterConfig.newBuilder().addAllClusters(children).build();
Cluster.CustomClusterType type =
Cluster.CustomClusterType.newBuilder()
.setName(XdsClusterResource.AGGREGATE_CLUSTER_TYPE_NAME)
.setTypedConfig(Any.pack(rootConfig))
.build();
Cluster.Builder builder = Cluster.newBuilder().setName(clusterName).setClusterType(type);
builder.setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN);
Cluster cluster = builder.build();
clusterMap.put(clusterName, cluster);
for (String child : children) {
Cluster childCluster = ControlPlaneRule.buildCluster(child, getEdsNameForCluster(child));
clusterMap.put(child, childCluster);
}
service.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
Map<String, Message> edsMap = new HashMap<>();
for (String child : children) {
ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment(
serverName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, getEdsNameForCluster(child));
edsMap.put(getEdsNameForCluster(child), clusterLoadAssignment);
}
service.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
}
static void addAggregateToExistingConfig(XdsTestControlPlaneService service, String rootName,
List<String> children) {
Map<String, Message> clusterMap = new HashMap<>(service.getCurrentConfig(ADS_TYPE_URL_CDS));
if (clusterMap.containsKey(rootName)) {
throw new IllegalArgumentException("Root cluster " + rootName + " already exists");
}
ClusterConfig rootConfig = ClusterConfig.newBuilder().addAllClusters(children).build();
Cluster.CustomClusterType type =
Cluster.CustomClusterType.newBuilder()
.setName(XdsClusterResource.AGGREGATE_CLUSTER_TYPE_NAME)
.setTypedConfig(Any.pack(rootConfig))
.build();
Cluster.Builder builder = Cluster.newBuilder().setName(rootName).setClusterType(type);
builder.setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN);
Cluster cluster = builder.build();
clusterMap.put(rootName, cluster);
for (String child : children) {
if (clusterMap.containsKey(child)) {
continue;
}
Cluster childCluster = ControlPlaneRule.buildCluster(child, getEdsNameForCluster(child));
clusterMap.put(child, childCluster);
}
service.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
Map<String, Message> edsMap = new HashMap<>(service.getCurrentConfig(ADS_TYPE_URL_EDS));
for (String child : children) {
if (edsMap.containsKey(getEdsNameForCluster(child))) {
continue;
}
ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment(
child, ENDPOINT_HOSTNAME, ENDPOINT_PORT, getEdsNameForCluster(child));
edsMap.put(getEdsNameForCluster(child), clusterLoadAssignment);
}
service.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
}
static XdsConfig getDefaultXdsConfig(String serverHostName)
throws XdsResourceType.ResourceInvalidException, IOException {
XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
Filter.NamedFilterConfig routerFilterConfig = new Filter.NamedFilterConfig(
serverHostName, RouterFilter.ROUTER_CONFIG);
HttpConnectionManager httpConnectionManager = HttpConnectionManager.forRdsName(
0L, RDS_NAME, Collections.singletonList(routerFilterConfig));
XdsListenerResource.LdsUpdate ldsUpdate =
XdsListenerResource.LdsUpdate.forApiListener(httpConnectionManager);
RouteConfiguration routeConfiguration =
buildRouteConfiguration(serverHostName, RDS_NAME, CLUSTER_NAME);
Bootstrapper.ServerInfo serverInfo = null;
XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, "0", "0", null, null, null);
XdsRouteConfigureResource.RdsUpdate rdsUpdate =
XdsRouteConfigureResource.getInstance().doParse(args, routeConfiguration);
// Take advantage of knowing that there is only 1 virtual host in the route configuration
assertThat(rdsUpdate.virtualHosts).hasSize(1);
VirtualHost virtualHost = rdsUpdate.virtualHosts.get(0);
// Need to create endpoints to create locality endpoints map to create edsUpdate
Map<Locality, LocalityLbEndpoints> lbEndpointsMap = new HashMap<>();
LbEndpoint lbEndpoint =
LbEndpoint.create(serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME);
lbEndpointsMap.put(
Locality.create("", "", ""),
LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0));
// Need to create EdsUpdate to create CdsUpdate to create XdsClusterConfig for builder
XdsEndpointResource.EdsUpdate edsUpdate = new XdsEndpointResource.EdsUpdate(
EDS_NAME, lbEndpointsMap, Collections.emptyList());
XdsClusterResource.CdsUpdate cdsUpdate = XdsClusterResource.CdsUpdate.forEds(
CLUSTER_NAME, EDS_NAME, serverInfo, null, null, null)
.lbPolicyConfig(getWrrLbConfigAsMap()).build();
XdsConfig.XdsClusterConfig clusterConfig = new XdsConfig.XdsClusterConfig(
CLUSTER_NAME, cdsUpdate, StatusOr.fromValue(edsUpdate));
builder
.setListener(ldsUpdate)
.setRoute(rdsUpdate)
.setVirtualHost(virtualHost)
.addCluster(CLUSTER_NAME, StatusOr.fromValue(clusterConfig));
return builder.build();
}
@SuppressWarnings("unchecked")
private static ImmutableMap<String, ?> getWrrLbConfigAsMap() throws IOException {
String lbConfigStr = "{\"wrr_locality_experimental\" : "
+ "{ \"childPolicy\" : [{\"round_robin\" : {}}]}}";
return ImmutableMap.copyOf((Map<String, ?>) JsonParser.parse(lbConfigStr));
}
static RouteConfiguration buildRouteConfiguration(String authority, String rdsName,
String clusterName) {
io.envoyproxy.envoy.config.route.v3.VirtualHost.Builder vhBuilder =
io.envoyproxy.envoy.config.route.v3.VirtualHost.newBuilder()
.setName(rdsName)
.addDomains(authority)
.addRoutes(
Route.newBuilder()
.setMatch(
RouteMatch.newBuilder().setPrefix("/").build())
.setRoute(
RouteAction.newBuilder().setCluster(clusterName)
.setAutoHostRewrite(BoolValue.newBuilder().setValue(true).build())
.build()));
io.envoyproxy.envoy.config.route.v3.VirtualHost virtualHost = vhBuilder.build();
return RouteConfiguration.newBuilder().setName(rdsName).addVirtualHosts(virtualHost).build();
}
static Cluster buildAggCluster(String name, List<String> childNames) {
ClusterConfig rootConfig = ClusterConfig.newBuilder().addAllClusters(childNames).build();
Cluster.CustomClusterType type =
Cluster.CustomClusterType.newBuilder()
.setName(XdsClusterResource.AGGREGATE_CLUSTER_TYPE_NAME)
.setTypedConfig(Any.pack(rootConfig))
.build();
Cluster.Builder builder =
Cluster.newBuilder().setName(name).setClusterType(type);
builder.setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN);
Cluster cluster = builder.build();
return cluster;
}
static void addEdsClusters(Map<String, Message> clusterMap, Map<String, Message> edsMap,
String... clusterNames) {
for (String clusterName : clusterNames) {
String edsName = getEdsNameForCluster(clusterName);
Cluster cluster = ControlPlaneRule.buildCluster(clusterName, edsName);
clusterMap.put(clusterName, cluster);
ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment(
clusterName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, edsName);
edsMap.put(edsName, clusterLoadAssignment);
}
}
static Listener buildInlineClientListener(String rdsName, String clusterName, String serverName) {
HttpFilter
httpFilter = HttpFilter.newBuilder()
.setName(serverName)
.setTypedConfig(Any.pack(Router.newBuilder().build()))
.setIsOptional(true)
.build();
ApiListener.Builder clientListenerBuilder =
ApiListener.newBuilder().setApiListener(Any.pack(
io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3
.HttpConnectionManager.newBuilder()
.setRouteConfig(
buildRouteConfiguration(serverName, rdsName, clusterName))
.addAllHttpFilters(Collections.singletonList(httpFilter))
.build(),
HTTP_CONNECTION_MANAGER_TYPE_URL));
return Listener.newBuilder()
.setName(serverName)
.setApiListener(clientListenerBuilder.build()).build();
}
/**
* Matches a {@link LoadStatsRequest} containing a collection of {@link ClusterStats} with
* the same list of clusterName:clusterServiceName pair.
*/
static class LrsRequestMatcher implements ArgumentMatcher<LoadStatsRequest> {
private final List<String> expected;
private LrsRequestMatcher(List<String[]> clusterNames) {
expected = new ArrayList<>();
for (String[] pair : clusterNames) {
expected.add(pair[0] + ":" + (pair[1] == null ? "" : pair[1]));
}
Collections.sort(expected);
}
@Override
public boolean matches(LoadStatsRequest argument) {
List<String> actual = new ArrayList<>();
for (ClusterStats clusterStats : argument.getClusterStatsList()) {
actual.add(clusterStats.getClusterName() + ":" + clusterStats.getClusterServiceName());
}
Collections.sort(actual);
return actual.equals(expected);
}
}
static class LrsRpcCall {
private final StreamObserver<LoadStatsRequest> requestObserver;
private final StreamObserver<LoadStatsResponse> responseObserver;
private final InOrder inOrder;
private LrsRpcCall(StreamObserver<LoadStatsRequest> requestObserver,
StreamObserver<LoadStatsResponse> responseObserver) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
inOrder = inOrder(requestObserver);
}
protected void verifyNextReportClusters(List<String[]> clusters) {
inOrder.verify(requestObserver).onNext(argThat(new LrsRequestMatcher(clusters)));
}
protected void sendResponse(List<String> clusters, long loadReportIntervalNano) {
LoadStatsResponse response =
LoadStatsResponse.newBuilder()
.addAllClusters(clusters)
.setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNano))
.build();
responseObserver.onNext(response);
}
}
static class StatusMatcher implements ArgumentMatcher<Status> {
private final Status expectedStatus;
StatusMatcher(Status expectedStatus) {
this.expectedStatus = expectedStatus;
}
@Override
public boolean matches(Status status) {
return status != null && expectedStatus.getCode().equals(status.getCode())
&& expectedStatus.getDescription().equals(status.getDescription());
}
}
}

View File

@ -34,9 +34,15 @@ import java.util.Map;
import javax.annotation.Nullable;
public class CommonBootstrapperTestUtils {
public static final String SERVER_URI = "trafficdirector.googleapis.com";
private static final ChannelCredentials CHANNEL_CREDENTIALS = InsecureChannelCredentials.create();
private static final String SERVER_URI_CUSTOM_AUTHORITY = "trafficdirector2.googleapis.com";
private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com";
public static final String LDS_RESOURCE = "listener.googleapis.com";
public static final String RDS_RESOURCE = "route-configuration.googleapis.com";
public static final String CDS_RESOURCE = "cluster.googleapis.com";
public static final String EDS_RESOURCE = "cluster-load-assignment.googleapis.com";
private static final String FILE_WATCHER_CONFIG = "{\"path\": \"/etc/secret/certs\"}";
private static final String MESHCA_CONFIG =
"{\n"