xds: Use XdsDependencyManager for XdsNameResolver

Contributes to the gRFC A74 effort.
https://github.com/grpc/proposal/blob/master/A74-xds-config-tears.md

The alternative to using Mockito's ArgumentMatcher is to use Hamcrest.
However, Hamcrest did not impress me. ArgumentMatcher is trivial if you
don't care about the error message.

This fixes a pre-existing issue where ConfigSelector.releaseCluster
could revert the LB config back to using cluster manager after releasing
all RPCs using a cluster have committed.

Co-authored-by: Larry Safran <lsafran@google.com>
This commit is contained in:
Eric Anderson 2025-03-18 14:05:01 -07:00 committed by GitHub
parent e388ef3975
commit e80c197455
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 886 additions and 657 deletions

View File

@ -0,0 +1,118 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import org.mockito.ArgumentMatcher;
/**
* Mockito matcher for {@link Status}.
*/
public final class StatusMatcher implements ArgumentMatcher<Status> {
public static StatusMatcher statusHasCode(ArgumentMatcher<Status.Code> codeMatcher) {
return new StatusMatcher(codeMatcher, null);
}
public static StatusMatcher statusHasCode(Status.Code code) {
return statusHasCode(new EqualsMatcher<>(code));
}
private final ArgumentMatcher<Status.Code> codeMatcher;
private final ArgumentMatcher<String> descriptionMatcher;
private StatusMatcher(
ArgumentMatcher<Status.Code> codeMatcher,
ArgumentMatcher<String> descriptionMatcher) {
this.codeMatcher = checkNotNull(codeMatcher, "codeMatcher");
this.descriptionMatcher = descriptionMatcher;
}
public StatusMatcher andDescription(ArgumentMatcher<String> descriptionMatcher) {
checkState(this.descriptionMatcher == null, "Already has a description matcher");
return new StatusMatcher(codeMatcher, descriptionMatcher);
}
public StatusMatcher andDescription(String description) {
return andDescription(new EqualsMatcher<>(description));
}
public StatusMatcher andDescriptionContains(String substring) {
return andDescription(new StringContainsMatcher(substring));
}
@Override
public boolean matches(Status status) {
return status != null
&& codeMatcher.matches(status.getCode())
&& (descriptionMatcher == null || descriptionMatcher.matches(status.getDescription()));
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{code=");
sb.append(codeMatcher);
if (descriptionMatcher != null) {
sb.append(", description=");
sb.append(descriptionMatcher);
}
sb.append("}");
return sb.toString();
}
// Use instead of lambda for better error message.
static final class EqualsMatcher<T> implements ArgumentMatcher<T> {
private final T obj;
EqualsMatcher(T obj) {
this.obj = checkNotNull(obj, "obj");
}
@Override
public boolean matches(Object other) {
return obj.equals(other);
}
@Override
public String toString() {
return obj.toString();
}
}
static final class StringContainsMatcher implements ArgumentMatcher<String> {
private final String needle;
StringContainsMatcher(String needle) {
this.needle = checkNotNull(needle, "needle");
}
@Override
public boolean matches(String haystack) {
if (haystack == null) {
return false;
}
return haystack.contains(needle);
}
@Override
public String toString() {
return "contains " + needle;
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
import static com.google.common.base.Preconditions.checkNotNull;
import org.mockito.ArgumentMatcher;
/**
* Mockito matcher for {@link StatusOr}.
*/
public final class StatusOrMatcher<T> implements ArgumentMatcher<StatusOr<T>> {
public static <T> StatusOrMatcher<T> hasValue(ArgumentMatcher<T> valueMatcher) {
return new StatusOrMatcher<T>(checkNotNull(valueMatcher, "valueMatcher"), null);
}
public static <T> StatusOrMatcher<T> hasStatus(ArgumentMatcher<Status> statusMatcher) {
return new StatusOrMatcher<T>(null, checkNotNull(statusMatcher, "statusMatcher"));
}
private final ArgumentMatcher<T> valueMatcher;
private final ArgumentMatcher<Status> statusMatcher;
private StatusOrMatcher(ArgumentMatcher<T> valueMatcher, ArgumentMatcher<Status> statusMatcher) {
this.valueMatcher = valueMatcher;
this.statusMatcher = statusMatcher;
}
@Override
public boolean matches(StatusOr<T> statusOr) {
if (statusOr == null) {
return false;
}
if (statusOr.hasValue() != (valueMatcher != null)) {
return false;
}
if (valueMatcher != null) {
return valueMatcher.matches(statusOr.getValue());
} else {
return statusMatcher.matches(statusOr.getStatus());
}
}
@Override
public String toString() {
if (valueMatcher != null) {
return "{value=" + valueMatcher + "}";
} else {
return "{status=" + statusMatcher + "}";
}
}
}

View File

@ -36,6 +36,22 @@ final class XdsAttributes {
static final Attributes.Key<ObjectPool<XdsClient>> XDS_CLIENT_POOL =
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsClientPool");
/**
* Attribute key for passing around the latest XdsConfig across NameResolver/LoadBalancers.
*/
@NameResolver.ResolutionResultAttr
static final Attributes.Key<XdsConfig> XDS_CONFIG =
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsConfig");
/**
* Attribute key for passing around the XdsDependencyManager across NameResolver/LoadBalancers.
*/
@NameResolver.ResolutionResultAttr
static final Attributes.Key<XdsConfig.XdsClusterSubscriptionRegistry>
XDS_CLUSTER_SUBSCRIPT_REGISTRY =
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsConfig.XdsClusterSubscriptionRegistry");
/**
* Attribute key for obtaining the global provider that provides atomics for aggregating
* outstanding RPCs sent to each cluster.

View File

@ -26,9 +26,9 @@ import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import java.io.Closeable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Represents the xDS configuration tree for a specified Listener.
@ -178,13 +178,22 @@ final class XdsConfig {
public StatusOr<EdsUpdate> getEndpoint() {
return endpoint;
}
@Override
public String toString() {
if (endpoint.hasValue()) {
return "EndpointConfig{endpoint=" + endpoint.getValue() + "}";
} else {
return "EndpointConfig{error=" + endpoint.getStatus() + "}";
}
}
}
// The list of leaf clusters for an aggregate cluster.
static final class AggregateConfig implements ClusterChild {
private final List<String> leafNames;
private final Set<String> leafNames;
public AggregateConfig(List<String> leafNames) {
public AggregateConfig(Set<String> leafNames) {
this.leafNames = checkNotNull(leafNames, "leafNames");
}
@ -234,6 +243,7 @@ final class XdsConfig {
XdsConfig build() {
checkNotNull(listener, "listener");
checkNotNull(route, "route");
checkNotNull(virtualHost, "virtualHost");
return new XdsConfig(listener, route, clusters, virtualHost);
}
}

View File

@ -22,9 +22,9 @@ import static io.grpc.xds.client.XdsClient.ResourceUpdate;
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import io.grpc.InternalLogId;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
@ -39,7 +39,6 @@ import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsResourceType;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -47,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@ -67,25 +67,28 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
private final InternalLogId logId;
private final XdsLogger logger;
private XdsConfig lastXdsConfig = null;
private StatusOr<XdsConfig> lastUpdate = null;
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
SynchronizationContext syncContext, String dataPlaneAuthority,
String listenerName) {
String listenerName, NameResolver.Args nameResolverArgs,
ScheduledExecutorService scheduler) {
logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
logger = XdsLogger.withLogId(logId);
this.xdsClient = checkNotNull(xdsClient, "xdsClient");
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
checkNotNull(nameResolverArgs, "nameResolverArgs");
checkNotNull(scheduler, "scheduler");
// start the ball rolling
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
}
public static String toContextStr(String typeName, String resourceName) {
return typeName + " resource: " + resourceName;
public static String toContextStr(String typeName, String resourceName) {
return typeName + " resource " + resourceName;
}
@Override
@ -225,7 +228,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue();
switch (cdsUpdate.clusterType()) {
case EDS:
String edsServiceName = cdsUpdate.edsServiceName();
String edsServiceName = root.getEdsServiceName();
EdsWatcher edsWatcher =
(EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName);
cancelEdsWatcher(edsWatcher, root);
@ -240,7 +243,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
}
break;
case LOGICAL_DNS:
// no eds needed
// no eds needed, so everything happens in cancelCdsWatcher()
break;
default:
throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType());
@ -260,54 +263,61 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
return;
}
XdsConfig newConfig = buildConfig();
if (Objects.equals(newConfig, lastXdsConfig)) {
StatusOr<XdsConfig> newUpdate = buildUpdate();
if (Objects.equals(newUpdate, lastUpdate)) {
return;
}
lastXdsConfig = newConfig;
xdsConfigWatcher.onUpdate(lastXdsConfig);
assert newUpdate.hasValue()
|| (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
|| newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
lastUpdate = newUpdate;
xdsConfigWatcher.onUpdate(lastUpdate);
}
@VisibleForTesting
XdsConfig buildConfig() {
StatusOr<XdsConfig> buildUpdate() {
XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
// Iterate watchers and build the XdsConfig
// Will only be 1 listener and 1 route resource
VirtualHost activeVirtualHost = getActiveVirtualHost();
for (XdsWatcherBase<?> xdsWatcherBase :
resourceWatchers.get(XdsListenerResource.getInstance()).watchers.values()) {
XdsListenerResource.LdsUpdate ldsUpdate = ((LdsWatcher) xdsWatcherBase).getData().getValue();
RdsUpdateSupplier routeSource = null;
for (XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher :
getWatchers(XdsListenerResource.getInstance()).values()) {
if (!ldsWatcher.getData().hasValue()) {
return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
}
XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
builder.setListener(ldsUpdate);
if (activeVirtualHost == null) {
activeVirtualHost = RoutingUtils.findVirtualHostForHostName(
ldsUpdate.httpConnectionManager().virtualHosts(), dataPlaneAuthority);
}
if (ldsUpdate.httpConnectionManager() != null
&& ldsUpdate.httpConnectionManager().virtualHosts() != null) {
RdsUpdate rdsUpdate = new RdsUpdate(ldsUpdate.httpConnectionManager().virtualHosts());
builder.setRoute(rdsUpdate);
}
routeSource = ((LdsWatcher) ldsWatcher).getRouteSource();
}
resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
.map(watcher -> (RdsWatcher) watcher)
.forEach(watcher -> builder.setRoute(watcher.getData().getValue()));
StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
if (!statusOrRdsUpdate.hasValue()) {
return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
}
RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
builder.setRoute(rdsUpdate);
VirtualHost activeVirtualHost =
RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
if (activeVirtualHost == null) {
String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
}
builder.setVirtualHost(activeVirtualHost);
Map<String, ? extends XdsWatcherBase<?>> edsWatchers =
resourceWatchers.get(ENDPOINT_RESOURCE).watchers;
Map<String, ? extends XdsWatcherBase<?>> cdsWatchers =
resourceWatchers.get(CLUSTER_RESOURCE).watchers;
Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers =
getWatchers(ENDPOINT_RESOURCE);
Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers =
getWatchers(CLUSTER_RESOURCE);
// Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
List<String> topLevelClusters =
cdsWatchers.values().stream()
.filter(XdsDependencyManager::isTopLevelCluster)
.map(w -> w.resourceName())
.map(XdsWatcherBase<?>::resourceName)
.distinct()
.collect(Collectors.toList());
// Flatten multi-level aggregates into lists of leaf clusters
@ -316,43 +326,60 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
addLeavesToBuilder(builder, edsWatchers, leafNames);
return builder.build();
return StatusOr.fromValue(builder.build());
}
private void addLeavesToBuilder(XdsConfig.XdsConfigBuilder builder,
Map<String, ? extends XdsWatcherBase<?>> edsWatchers,
Set<String> leafNames) {
private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
XdsResourceType<T> resourceType) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
if (typeWatchers == null) {
return Collections.emptyMap();
}
assert typeWatchers.resourceType == resourceType;
@SuppressWarnings("unchecked")
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
return tTypeWatchers.watchers;
}
private void addLeavesToBuilder(
XdsConfig.XdsConfigBuilder builder,
Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
Set<String> leafNames) {
for (String clusterName : leafNames) {
CdsWatcher cdsWatcher = getCluster(clusterName);
StatusOr<XdsClusterResource.CdsUpdate> cdsUpdateOr = cdsWatcher.getData();
if (cdsUpdateOr.hasValue()) {
XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue();
if (cdsUpdate.clusterType() == ClusterType.EDS) {
EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName());
if (edsWatcher != null) {
EndpointConfig child = new EndpointConfig(edsWatcher.getData());
builder.addCluster(clusterName, StatusOr.fromValue(
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
} else {
builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"EDS resource not found for cluster " + clusterName)));
}
} else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) {
// TODO get the resolved endpoint configuration
builder.addCluster(clusterName, StatusOr.fromValue(
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, new EndpointConfig(null))));
}
} else {
if (!cdsUpdateOr.hasValue()) {
builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus()));
continue;
}
XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue();
if (cdsUpdate.clusterType() == ClusterType.EDS) {
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
edsWatchers.get(cdsWatcher.getEdsServiceName());
EndpointConfig child;
if (edsWatcher != null) {
child = new EndpointConfig(edsWatcher.getData());
} else {
child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
"EDS resource not found for cluster " + clusterName)));
}
builder.addCluster(clusterName, StatusOr.fromValue(
new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
} else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) {
builder.addCluster(clusterName, StatusOr.fromStatus(
Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
}
}
}
// Adds the top-level clusters to the builder and returns the leaf cluster names
private Set<String> addTopLevelClustersToBuilder(
XdsConfig.XdsConfigBuilder builder, Map<String, ? extends XdsWatcherBase<?>> edsWatchers,
Map<String, ? extends XdsWatcherBase<?>> cdsWatchers, List<String> topLevelClusters) {
XdsConfig.XdsConfigBuilder builder,
Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers,
List<String> topLevelClusters) {
Set<String> leafClusterNames = new HashSet<>();
for (String clusterName : topLevelClusters) {
@ -367,23 +394,25 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
XdsConfig.XdsClusterConfig.ClusterChild child;
switch (cdsUpdate.clusterType()) {
case AGGREGATE:
List<String> leafNames = getLeafNames(cdsUpdate);
Set<String> leafNames = new HashSet<>();
addLeafNames(leafNames, cdsUpdate);
child = new AggregateConfig(leafNames);
leafClusterNames.addAll(leafNames);
break;
case EDS:
EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName());
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
edsWatchers.get(cdsWatcher.getEdsServiceName());
if (edsWatcher != null) {
child = new EndpointConfig(edsWatcher.getData());
} else {
builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
"EDS resource not found for cluster " + clusterName)));
continue;
}
break;
case LOGICAL_DNS:
// TODO get the resolved endpoint configuration
child = new EndpointConfig(null);
child = new EndpointConfig(StatusOr.fromStatus(
Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
break;
default:
throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
@ -395,29 +424,26 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
return leafClusterNames;
}
private List<String> getLeafNames(XdsClusterResource.CdsUpdate cdsUpdate) {
List<String> childNames = new ArrayList<>();
private void addLeafNames(Set<String> leafNames, XdsClusterResource.CdsUpdate cdsUpdate) {
for (String cluster : cdsUpdate.prioritizedClusterNames()) {
if (leafNames.contains(cluster)) {
continue;
}
StatusOr<XdsClusterResource.CdsUpdate> data = getCluster(cluster).getData();
if (data == null || !data.hasValue() || data.getValue() == null) {
childNames.add(cluster);
leafNames.add(cluster);
continue;
}
if (data.getValue().clusterType() == ClusterType.AGGREGATE) {
childNames.addAll(getLeafNames(data.getValue()));
addLeafNames(leafNames, data.getValue());
} else {
childNames.add(cluster);
leafNames.add(cluster);
}
}
return childNames;
}
private static boolean isTopLevelCluster(XdsWatcherBase<?> cdsWatcher) {
if (! (cdsWatcher instanceof CdsWatcher)) {
return false;
}
private static boolean isTopLevelCluster(
XdsWatcherBase<XdsClusterResource.CdsUpdate> cdsWatcher) {
return ((CdsWatcher)cdsWatcher).parentContexts.values().stream()
.anyMatch(depth -> depth == 1);
}
@ -454,17 +480,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
}
private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
VirtualHost oldVirtualHost, boolean sameParentContext) {
List<VirtualHost> oldVirtualHosts, boolean sameParentContext) {
VirtualHost oldVirtualHost =
RoutingUtils.findVirtualHostForHostName(oldVirtualHosts, dataPlaneAuthority);
VirtualHost virtualHost =
RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
if (virtualHost == null) {
String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
logger.log(XdsLogger.XdsLogLevel.WARNING, error);
cleanUpRoutes();
xdsConfigWatcher.onError(
"xDS node ID:" + dataPlaneAuthority, Status.UNAVAILABLE.withDescription(error));
return;
}
Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
Set<String> oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost);
@ -482,6 +502,10 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
}
}
private String nodeInfo() {
return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
}
private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
if (virtualHost == null) {
return Collections.emptySet();
@ -506,46 +530,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
return clusters;
}
@Nullable
private VirtualHost getActiveVirtualHost() {
TypeWatchers<?> rdsWatchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
if (rdsWatchers == null) {
return null;
}
RdsWatcher activeRdsWatcher =
(RdsWatcher) rdsWatchers.watchers.values().stream().findFirst().orElse(null);
if (activeRdsWatcher == null || activeRdsWatcher.missingResult()
|| !activeRdsWatcher.getData().hasValue()) {
return null;
}
return RoutingUtils.findVirtualHostForHostName(
activeRdsWatcher.getData().getValue().virtualHosts, dataPlaneAuthority);
}
// Must be in SyncContext
private void cleanUpRoutes() {
// Remove RdsWatcher & CDS Watchers
TypeWatchers<?> rdsResourceWatcher =
resourceWatchers.get(XdsRouteConfigureResource.getInstance());
if (rdsResourceWatcher == null || rdsResourceWatcher.watchers.isEmpty()) {
return;
}
XdsWatcherBase<?> watcher = rdsResourceWatcher.watchers.values().stream().findFirst().get();
cancelWatcher(watcher);
// Remove CdsWatchers pointed to by the RdsWatcher
RdsWatcher rdsWatcher = (RdsWatcher) watcher;
for (String cName : rdsWatcher.getCdsNames()) {
CdsWatcher cdsWatcher = getCluster(cName);
if (cdsWatcher != null) {
cancelClusterWatcherTree(cdsWatcher, rdsWatcher);
}
}
}
private CdsWatcher getCluster(String clusterName) {
return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
}
@ -565,16 +549,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
}
public interface XdsConfigWatcher {
void onUpdate(XdsConfig config);
// These 2 methods are invoked when there is an error or
// does-not-exist on LDS or RDS only. The context will be a
// human-readable string indicating the scope in which the error
// occurred (e.g., the resource type and name).
void onError(String resourceContext, Status status);
void onResourceDoesNotExist(String resourceContext);
/**
* An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
* INTERNAL.
*/
void onUpdate(StatusOr<XdsConfig> config);
}
private class ClusterSubscription implements Closeable {
@ -594,7 +573,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
}
}
private abstract static class XdsWatcherBase<T extends ResourceUpdate>
private abstract class XdsWatcherBase<T extends ResourceUpdate>
implements ResourceWatcher<T> {
private final XdsResourceType<T> type;
private final String resourceName;
@ -612,12 +591,25 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
@Override
public void onError(Status error) {
checkNotNull(error, "error");
setDataAsStatus(error);
// Don't update configuration on error, if we've already received configuration
if (!hasDataValue()) {
setDataAsStatus(Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s: %s",
toContextString(), error.getCode(), error.getDescription())));
maybePublishConfig();
}
}
protected void handleDoesNotExist(String resourceName) {
@Override
public void onResourceDoesNotExist(String resourceName) {
if (cancelled) {
return;
}
checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
setDataAsStatus(Status.UNAVAILABLE.withDescription("No " + toContextString()));
setDataAsStatus(Status.UNAVAILABLE.withDescription(
toContextString() + " does not exist" + nodeInfo()));
maybePublishConfig();
}
boolean missingResult() {
@ -647,12 +639,17 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
this.data = StatusOr.fromStatus(status);
}
String toContextString() {
public String toContextString() {
return toContextStr(type.typeName(), resourceName);
}
}
private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate> {
private interface RdsUpdateSupplier {
StatusOr<RdsUpdate> getRdsUpdate();
}
private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
implements RdsUpdateSupplier {
String rdsName;
private LdsWatcher(String resourceName) {
@ -664,9 +661,20 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
checkNotNull(update, "update");
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
String rdsName = httpConnectionManager.rdsName();
VirtualHost activeVirtualHost = getActiveVirtualHost();
List<VirtualHost> virtualHosts;
String rdsName;
if (httpConnectionManager == null) {
// TCP listener. Unsupported config
virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
rdsName = null;
} else {
virtualHosts = httpConnectionManager.virtualHosts();
rdsName = httpConnectionManager.rdsName();
}
StatusOr<RdsUpdate> activeRdsUpdate = getRouteSource().getRdsUpdate();
List<VirtualHost> activeVirtualHosts = activeRdsUpdate.hasValue()
? activeRdsUpdate.getValue().virtualHosts
: Collections.emptyList();
boolean changedRdsName = !Objects.equals(rdsName, this.rdsName);
if (changedRdsName) {
@ -675,10 +683,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
if (virtualHosts != null) {
// No RDS watcher since we are getting RDS updates via LDS
updateRoutes(virtualHosts, this, activeVirtualHost, this.rdsName == null);
updateRoutes(virtualHosts, this, activeVirtualHosts, this.rdsName == null);
this.rdsName = null;
} else if (changedRdsName) {
cleanUpRdsWatcher();
this.rdsName = rdsName;
addWatcher(new RdsWatcher(rdsName));
logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
@ -688,20 +695,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
maybePublishConfig();
}
@Override
public void onError(Status error) {
super.onError(checkNotNull(error, "error"));
xdsConfigWatcher.onError(toContextString(), error);
}
@Override
public void onResourceDoesNotExist(String resourceName) {
if (cancelled) {
return;
}
handleDoesNotExist(resourceName);
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
checkArgument(resourceName().equals(resourceName), "Resource name does not match");
setDataAsStatus(Status.UNAVAILABLE.withDescription(
toContextString() + " does not exist" + nodeInfo()));
cleanUpRdsWatcher();
rdsName = null;
maybePublishConfig();
}
private void cleanUpRdsWatcher() {
@ -711,8 +716,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName);
// Cleanup clusters (as appropriate) that had the old rds watcher as a parent
if (!oldRdsWatcher.hasDataValue() || !oldRdsWatcher.getData().hasValue()
|| resourceWatchers.get(CLUSTER_RESOURCE) == null) {
if (!oldRdsWatcher.hasDataValue() || resourceWatchers.get(CLUSTER_RESOURCE) == null) {
return;
}
for (XdsWatcherBase<?> watcher :
@ -723,16 +727,58 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
}
private RdsWatcher getRdsWatcher() {
TypeWatchers<?> watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
if (watchers == null || rdsName == null || watchers.watchers.isEmpty()) {
if (rdsName == null) {
return null;
}
TypeWatchers<?> watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
if (watchers == null) {
return null;
}
return (RdsWatcher) watchers.watchers.get(rdsName);
}
public RdsUpdateSupplier getRouteSource() {
if (!hasDataValue()) {
return this;
}
HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
if (hcm == null) {
return this;
}
List<VirtualHost> virtualHosts = hcm.virtualHosts();
if (virtualHosts != null) {
return this;
}
RdsWatcher rdsWatcher = getRdsWatcher();
assert rdsWatcher != null;
return rdsWatcher;
}
@Override
public StatusOr<RdsUpdate> getRdsUpdate() {
if (missingResult()) {
return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
}
if (!getData().hasValue()) {
return StatusOr.fromStatus(getData().getStatus());
}
HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
if (hcm == null) {
return StatusOr.fromStatus(
Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
}
List<VirtualHost> virtualHosts = hcm.virtualHosts();
if (virtualHosts == null) {
// Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
// be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
// bug
return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
}
return StatusOr.fromValue(new RdsUpdate(virtualHosts));
}
}
private class RdsWatcher extends XdsWatcherBase<RdsUpdate> {
private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
public RdsWatcher(String resourceName) {
super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
@ -741,37 +787,20 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
@Override
public void onChanged(RdsUpdate update) {
checkNotNull(update, "update");
RdsUpdate oldData = hasDataValue() ? getData().getValue() : null;
VirtualHost oldVirtualHost =
(oldData != null)
? RoutingUtils.findVirtualHostForHostName(oldData.virtualHosts, dataPlaneAuthority)
: null;
List<VirtualHost> oldVirtualHosts = hasDataValue()
? getData().getValue().virtualHosts
: Collections.emptyList();
setData(update);
updateRoutes(update.virtualHosts, this, oldVirtualHost, true);
updateRoutes(update.virtualHosts, this, oldVirtualHosts, true);
maybePublishConfig();
}
@Override
public void onError(Status error) {
super.onError(checkNotNull(error, "error"));
xdsConfigWatcher.onError(toContextString(), error);
}
@Override
public void onResourceDoesNotExist(String resourceName) {
if (cancelled) {
return;
public StatusOr<RdsUpdate> getRdsUpdate() {
if (missingResult()) {
return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
}
handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
}
ImmutableList<String> getCdsNames() {
if (!hasDataValue() || getData().getValue().virtualHosts == null) {
return ImmutableList.of();
}
return ImmutableList.copyOf(getClusterNamesFromVirtualHost(getActiveVirtualHost()));
return getData();
}
}
@ -789,7 +818,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
switch (update.clusterType()) {
case EDS:
setData(update);
if (!addEdsWatcher(update.edsServiceName(), this)) {
if (!addEdsWatcher(getEdsServiceName(), this)) {
maybePublishConfig();
}
break;
@ -805,14 +834,15 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
logger.log(XdsLogger.XdsLogLevel.WARNING,
"Cluster recursion depth limit exceeded for cluster {0}", resourceName());
Status error = Status.UNAVAILABLE.withDescription(
"aggregate cluster graph exceeds max depth");
"aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
setDataAsStatus(error);
}
if (hasDataValue()) {
Set<String> oldNames = new HashSet<>(getData().getValue().prioritizedClusterNames());
Set<String> oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE
? new HashSet<>(getData().getValue().prioritizedClusterNames())
: Collections.emptySet();
Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
Set<String> deletedClusters = Sets.difference(oldNames, newNames);
deletedClusters.forEach((cluster)
-> cancelClusterWatcherTree(getCluster(cluster), parentContext));
@ -838,19 +868,20 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
break;
default:
Status error = Status.UNAVAILABLE.withDescription(
"aggregate cluster graph exceeds max depth");
"aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
setDataAsStatus(error);
maybePublishConfig();
}
}
@Override
public void onResourceDoesNotExist(String resourceName) {
if (cancelled) {
return;
public String getEdsServiceName() {
XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
assert cdsUpdate.clusterType() == ClusterType.EDS;
String edsServiceName = cdsUpdate.edsServiceName();
if (edsServiceName == null) {
edsServiceName = cdsUpdate.clusterName();
}
handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
maybePublishConfig();
return edsServiceName;
}
}
@ -868,15 +899,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
maybePublishConfig();
}
@Override
public void onResourceDoesNotExist(String resourceName) {
if (cancelled) {
return;
}
handleDoesNotExist(checkNotNull(resourceName, "resourceName"));
maybePublishConfig();
}
void addParentContext(CdsWatcher parentContext) {
parentContexts.add(checkNotNull(parentContext, "parentContext"));
}

View File

@ -45,6 +45,7 @@ import io.grpc.MetricRecorder;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
@ -60,11 +61,9 @@ import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI;
@ -127,6 +126,7 @@ final class XdsNameResolver extends NameResolver {
private final ConfigSelector configSelector = new ConfigSelector();
private final long randomChannelId;
private final MetricRecorder metricRecorder;
private final Args nameResolverArgs;
// Must be accessed in syncContext.
// Filter instances are unique per channel, and per filter (name+typeUrl).
// NamedFilterConfig.filterStateKey -> filter_instance.
@ -138,20 +138,17 @@ final class XdsNameResolver extends NameResolver {
private XdsClient xdsClient;
private CallCounterProvider callCounterProvider;
private ResolveState resolveState;
// Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in
// XdsClient instead of here.
private boolean receivedConfig;
XdsNameResolver(
URI targetUri, String name, @Nullable String overrideAuthority,
ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
@Nullable Map<String, ?> bootstrapOverride,
MetricRecorder metricRecorder) {
MetricRecorder metricRecorder, Args nameResolverArgs) {
this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride,
metricRecorder);
metricRecorder, nameResolverArgs);
}
@VisibleForTesting
@ -161,7 +158,7 @@ final class XdsNameResolver extends NameResolver {
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride,
MetricRecorder metricRecorder) {
MetricRecorder metricRecorder, Args nameResolverArgs) {
this.targetAuthority = targetAuthority;
target = targetUri.toString();
@ -180,6 +177,8 @@ final class XdsNameResolver extends NameResolver {
this.random = checkNotNull(random, "random");
this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
this.metricRecorder = metricRecorder;
this.nameResolverArgs = checkNotNull(nameResolverArgs, "nameResolverArgs");
randomChannelId = random.nextLong();
logId = InternalLogId.allocate("xds-resolver", name);
logger = XdsLogger.withLogId(logId);
@ -228,9 +227,8 @@ final class XdsNameResolver extends NameResolver {
}
ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
callCounterProvider = SharedCallCounterMap.getInstance();
resolveState = new ResolveState(ldsResourceName);
resolveState.start();
resolveState = new ResolveState(ldsResourceName); // auto starts
}
private static String expandPercentS(String template, String replacement) {
@ -241,7 +239,7 @@ final class XdsNameResolver extends NameResolver {
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
if (resolveState != null) {
resolveState.stop();
resolveState.shutdown();
}
if (xdsClient != null) {
xdsClient = xdsClientPool.returnObject(xdsClient);
@ -290,7 +288,7 @@ final class XdsNameResolver extends NameResolver {
}
// called in syncContext
private void updateResolutionResult() {
private void updateResolutionResult(XdsConfig xdsConfig) {
syncContext.throwIfNotInThisSynchronizationContext();
ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
@ -312,6 +310,8 @@ final class XdsNameResolver extends NameResolver {
Attributes attrs =
Attributes.newBuilder()
.set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
.set(XdsAttributes.XDS_CONFIG, xdsConfig)
.set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, resolveState.xdsDependencyManager)
.set(XdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
.set(InternalConfigSelector.KEY, configSelector)
.build();
@ -321,7 +321,6 @@ final class XdsNameResolver extends NameResolver {
.setServiceConfig(parsedServiceConfig)
.build();
listener.onResult2(result);
receivedConfig = true;
}
/**
@ -540,7 +539,11 @@ final class XdsNameResolver extends NameResolver {
public void run() {
if (clusterRefs.get(cluster).refCount.get() == 0) {
clusterRefs.remove(cluster);
updateResolutionResult();
if (resolveState.lastConfigOrStatus.hasValue()) {
updateResolutionResult(resolveState.lastConfigOrStatus.getValue());
} else {
resolveState.cleanUpRoutes(resolveState.lastConfigOrStatus.getStatus());
}
}
}
});
@ -629,82 +632,56 @@ final class XdsNameResolver extends NameResolver {
return "cluster_specifier_plugin:" + pluginName;
}
private class ResolveState implements ResourceWatcher<XdsListenerResource.LdsUpdate> {
class ResolveState implements XdsDependencyManager.XdsConfigWatcher {
private final ConfigOrError emptyServiceConfig =
serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
private final String ldsResourceName;
private final String authority;
private final XdsDependencyManager xdsDependencyManager;
private boolean stopped;
@Nullable
private Set<String> existingClusters; // clusters to which new requests can be routed
@Nullable
private RouteDiscoveryState routeDiscoveryState;
private StatusOr<XdsConfig> lastConfigOrStatus;
ResolveState(String ldsResourceName) {
this.ldsResourceName = ldsResourceName;
private ResolveState(String ldsResourceName) {
authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
xdsDependencyManager =
new XdsDependencyManager(xdsClient, this, syncContext, authority, ldsResourceName,
nameResolverArgs, scheduler);
}
@Override
public void onChanged(final XdsListenerResource.LdsUpdate update) {
private void shutdown() {
if (stopped) {
return;
}
logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
String rdsName = httpConnectionManager.rdsName();
stopped = true;
xdsDependencyManager.shutdown();
updateActiveFilters(null);
}
@Override
public void onUpdate(StatusOr<XdsConfig> updateOrStatus) {
if (stopped) {
return;
}
logger.log(XdsLogLevel.INFO, "Receive XDS resource update: {0}", updateOrStatus);
lastConfigOrStatus = updateOrStatus;
if (!updateOrStatus.hasValue()) {
updateActiveFilters(null);
cleanUpRoutes(updateOrStatus.getStatus());
return;
}
// Process Route
XdsConfig update = updateOrStatus.getValue();
HttpConnectionManager httpConnectionManager = update.getListener().httpConnectionManager();
VirtualHost virtualHost = update.getVirtualHost();
ImmutableList<NamedFilterConfig> filterConfigs = httpConnectionManager.httpFilterConfigs();
long streamDurationNano = httpConnectionManager.httpMaxStreamDurationNano();
// Create/update HCM-bound state.
cleanUpRouteDiscoveryState();
updateActiveFilters(filterConfigs);
// Routes specified directly in LDS.
if (virtualHosts != null) {
updateRoutes(virtualHosts, streamDurationNano, filterConfigs);
return;
}
// Routes provided by RDS.
routeDiscoveryState = new RouteDiscoveryState(rdsName, streamDurationNano, filterConfigs);
logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
rdsName, routeDiscoveryState, syncContext);
}
@Override
public void onError(final Status error) {
if (stopped || receivedConfig) {
return;
}
listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
String.format("Unable to load LDS %s. xDS server returned: %s: %s",
ldsResourceName, error.getCode(), error.getDescription())));
}
@Override
public void onResourceDoesNotExist(final String resourceName) {
if (stopped) {
return;
}
String error = "LDS resource does not exist: " + resourceName;
logger.log(XdsLogLevel.INFO, error);
cleanUpRouteDiscoveryState();
updateActiveFilters(null);
cleanUpRoutes(error);
}
private void start() {
logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),
ldsResourceName, this, syncContext);
}
private void stop() {
logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
stopped = true;
cleanUpRouteDiscoveryState();
updateActiveFilters(null);
xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this);
updateRoutes(update, virtualHost, streamDurationNano, filterConfigs);
}
// called in syncContext
@ -732,18 +709,11 @@ final class XdsNameResolver extends NameResolver {
}
}
// called in syncContext
private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
private void updateRoutes(
XdsConfig xdsConfig,
@Nullable VirtualHost virtualHost,
long httpMaxStreamDurationNano,
@Nullable List<NamedFilterConfig> filterConfigs) {
String authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority);
if (virtualHost == null) {
String error = "Failed to find virtual host matching hostname: " + authority;
logger.log(XdsLogLevel.WARNING, error);
cleanUpRoutes(error);
return;
}
List<Route> routes = virtualHost.routes();
ImmutableList.Builder<RouteData> routesData = ImmutableList.builder();
@ -826,7 +796,7 @@ final class XdsNameResolver extends NameResolver {
}
// Update service config to include newly added clusters.
if (shouldUpdateResult && routingConfig != null) {
updateResolutionResult();
updateResolutionResult(xdsConfig);
shouldUpdateResult = false;
}
// Make newly added clusters selectable by config selector and deleted clusters no longer
@ -840,7 +810,7 @@ final class XdsNameResolver extends NameResolver {
}
}
if (shouldUpdateResult) {
updateResolutionResult();
updateResolutionResult(xdsConfig);
}
}
@ -882,10 +852,8 @@ final class XdsNameResolver extends NameResolver {
return combineInterceptors(filterInterceptors.build());
}
private void cleanUpRoutes(String error) {
String errorWithNodeId =
error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId();
routingConfig = new RoutingConfig(Status.UNAVAILABLE.withDescription(errorWithNodeId));
private void cleanUpRoutes(Status error) {
routingConfig = new RoutingConfig(error);
if (existingClusters != null) {
for (String cluster : existingClusters) {
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
@ -904,64 +872,6 @@ final class XdsNameResolver extends NameResolver {
.build())
.setServiceConfig(emptyServiceConfig)
.build());
receivedConfig = true;
}
private void cleanUpRouteDiscoveryState() {
if (routeDiscoveryState != null) {
String rdsName = routeDiscoveryState.resourceName;
logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName);
xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
routeDiscoveryState);
routeDiscoveryState = null;
}
}
/**
* Discovery state for RouteConfiguration resource. One instance for each Listener resource
* update.
*/
private class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
private final String resourceName;
private final long httpMaxStreamDurationNano;
@Nullable
private final List<NamedFilterConfig> filterConfigs;
private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano,
@Nullable List<NamedFilterConfig> filterConfigs) {
this.resourceName = resourceName;
this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
this.filterConfigs = filterConfigs;
}
@Override
public void onChanged(final RdsUpdate update) {
if (RouteDiscoveryState.this != routeDiscoveryState) {
return;
}
logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs);
}
@Override
public void onError(final Status error) {
if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
return;
}
listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
String.format("Unable to load RDS %s. xDS server returned: %s: %s",
resourceName, error.getCode(), error.getDescription())));
}
@Override
public void onResourceDoesNotExist(final String resourceName) {
if (RouteDiscoveryState.this != routeDiscoveryState) {
return;
}
String error = "RDS resource does not exist: " + resourceName;
logger.log(XdsLogLevel.INFO, error);
cleanUpRoutes(error);
}
}
}

View File

@ -82,7 +82,7 @@ public final class XdsNameResolverProvider extends NameResolverProvider {
args.getServiceConfigParser(), args.getSynchronizationContext(),
args.getScheduledExecutorService(),
bootstrapOverride,
args.getMetricRecorder());
args.getMetricRecorder(), args);
}
return null;
}

View File

@ -17,6 +17,7 @@
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.StatusMatcher.statusHasCode;
import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.AGGREGATE;
import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.EDS;
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS;
@ -32,7 +33,6 @@ import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -47,24 +47,26 @@ import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.grpc.BindableService;
import io.grpc.ChannelLogger;
import io.grpc.ManagedChannel;
import io.grpc.NameResolver;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.StatusOrMatcher;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcUtil;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsConfig.XdsClusterConfig;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.client.CommonBootstrapperTestUtils;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClientImpl;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsResourceType;
import io.grpc.xds.client.XdsTransportFactory;
import java.io.Closeable;
import java.io.IOException;
@ -77,7 +79,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@ -108,7 +110,9 @@ public class XdsDependencyManagerTest {
private XdsClientMetricReporter xdsClientMetricReporter;
private final SynchronizationContext syncContext =
new SynchronizationContext(mock(Thread.UncaughtExceptionHandler.class));
new SynchronizationContext((t, e) -> {
throw new AssertionError(e);
});
private ManagedChannel channel;
private XdsClientImpl xdsClient;
@ -133,9 +137,17 @@ public class XdsDependencyManagerTest {
private XdsConfig defaultXdsConfig; // set in setUp()
@Captor
private ArgumentCaptor<XdsConfig> xdsConfigCaptor;
@Captor
private ArgumentCaptor<Status> statusCaptor;
private ArgumentCaptor<StatusOr<XdsConfig>> xdsUpdateCaptor;
private final NameResolver.Args nameResolverArgs = NameResolver.Args.newBuilder()
.setDefaultPort(8080)
.setProxyDetector(GrpcUtil.DEFAULT_PROXY_DETECTOR)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class))
.setChannelLogger(mock(ChannelLogger.class))
.setScheduledExecutorService(fakeClock.getScheduledExecutorService())
.build();
private final ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService();
@Before
public void setUp() throws Exception {
@ -180,36 +192,36 @@ public class XdsDependencyManagerTest {
@Test
public void verify_basic_config() {
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
testWatcher.verifyStats(1, 0, 0);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig));
testWatcher.verifyStats(1, 0);
}
@Test
public void verify_config_update() {
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
testWatcher.verifyStats(1, 0, 0);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig));
testWatcher.verifyStats(1, 0);
assertThat(testWatcher.lastConfig).isEqualTo(defaultXdsConfig);
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS2", "EDS2",
ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT + 2);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(ArgumentMatchers.notNull());
testWatcher.verifyStats(2, 0, 0);
testWatcher.verifyStats(2, 0);
assertThat(testWatcher.lastConfig).isNotEqualTo(defaultXdsConfig);
}
@Test
public void verify_simple_aggregate() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig));
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
String rootName = "root_c";
@ -226,14 +238,14 @@ public class XdsDependencyManagerTest {
testWatcher.lastConfig.getClusters();
assertThat(lastConfigClusters).hasSize(childNames.size() + 1);
StatusOr<XdsClusterConfig> rootC = lastConfigClusters.get(rootName);
XdsClusterResource.CdsUpdate rootUpdate = rootC.getValue().getClusterResource();
CdsUpdate rootUpdate = rootC.getValue().getClusterResource();
assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE);
assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames);
for (String childName : childNames) {
assertThat(lastConfigClusters).containsKey(childName);
StatusOr<XdsClusterConfig> childConfigOr = lastConfigClusters.get(childName);
XdsClusterResource.CdsUpdate childResource =
CdsUpdate childResource =
childConfigOr.getValue().getClusterResource();
assertThat(childResource.clusterType()).isEqualTo(EDS);
assertThat(childResource.edsServiceName()).isEqualTo(getEdsNameForCluster(childName));
@ -266,54 +278,57 @@ public class XdsDependencyManagerTest {
List<String> childNames2 = Arrays.asList("clusterA", "clusterX");
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
Closeable subscription2 = xdsDependencyManager.subscribeToCluster(rootName2);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
testWatcher.verifyStats(3, 0, 0);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
testWatcher.verifyStats(3, 0);
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
Set<String> expectedClusters = builder.add(rootName1).add(rootName2).add(CLUSTER_NAME)
.addAll(childNames).addAll(childNames2).build();
assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).isEqualTo(expectedClusters);
assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet())
.isEqualTo(expectedClusters);
// Close 1 subscription shouldn't affect the other or RDS subscriptions
subscription1.close();
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
builder = ImmutableSet.builder();
Set<String> expectedClusters2 =
builder.add(rootName2).add(CLUSTER_NAME).addAll(childNames2).build();
assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).isEqualTo(expectedClusters2);
assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet())
.isEqualTo(expectedClusters2);
subscription2.close();
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig));
}
@Test
public void testDelayedSubscription() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig));
String rootName1 = "root_c";
Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1);
assertThat(subscription1).isNotNull();
fakeClock.forwardTime(16, TimeUnit.SECONDS);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).toString()).isEqualTo(
StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"No " + toContextStr(CLUSTER_TYPE_NAME, rootName1))).toString());
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
Status status = xdsUpdateCaptor.getValue().getValue().getClusters().get(rootName1).getStatus();
assertThat(status.getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(status.getDescription()).contains(rootName1);
List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName1, childNames);
inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).hasValue()).isTrue();
inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture());
assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().get(rootName1).hasValue())
.isTrue();
}
@Test
@ -342,109 +357,99 @@ public class XdsDependencyManagerTest {
edsMap.put("garbageEds", clusterLoadAssignment);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
List<StatusOr<XdsClusterConfig>> returnedClusters = new ArrayList<>();
for (String childName : childNames) {
returnedClusters.add(xdsConfigCaptor.getValue().getClusters().get(childName));
returnedClusters.add(xdsUpdateCaptor.getValue().getValue().getClusters().get(childName));
}
// Check that missing cluster reported Status and the other 2 are present
Status expectedClusterStatus = Status.UNAVAILABLE.withDescription(
"No " + toContextStr(CLUSTER_TYPE_NAME, childNames.get(2)));
StatusOr<XdsClusterConfig> missingCluster = returnedClusters.get(2);
assertThat(missingCluster.getStatus().toString()).isEqualTo(expectedClusterStatus.toString());
assertThat(missingCluster.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(missingCluster.getStatus().getDescription()).contains(childNames.get(2));
assertThat(returnedClusters.get(0).hasValue()).isTrue();
assertThat(returnedClusters.get(1).hasValue()).isTrue();
// Check that missing EDS reported Status, the other one is present and the garbage EDS is not
Status expectedEdsStatus = Status.UNAVAILABLE.withDescription(
"No " + toContextStr(ENDPOINT_TYPE_NAME, XdsTestUtils.EDS_NAME + 1));
assertThat(getEndpoint(returnedClusters.get(0)).hasValue()).isTrue();
assertThat(getEndpoint(returnedClusters.get(1)).hasValue()).isFalse();
assertThat(getEndpoint(returnedClusters.get(1)).getStatus().toString())
.isEqualTo(expectedEdsStatus.toString());
assertThat(getEndpoint(returnedClusters.get(1)).getStatus().getCode())
.isEqualTo(Status.Code.UNAVAILABLE);
assertThat(getEndpoint(returnedClusters.get(1)).getStatus().getDescription())
.contains(XdsTestUtils.EDS_NAME + 1);
verify(xdsConfigWatcher, never()).onResourceDoesNotExist(any());
testWatcher.verifyStats(1, 0, 0);
verify(xdsConfigWatcher, never()).onUpdate(
argThat(StatusOrMatcher.hasStatus(statusHasCode(Status.Code.UNAVAILABLE))));
testWatcher.verifyStats(1, 0);
}
@Test
public void testMissingLds() {
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, "badLdsName");
String ldsName = "badLdsName";
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, ldsName, nameResolverArgs, scheduler);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher, timeout(1000)).onResourceDoesNotExist(
toContextStr(XdsListenerResource.getInstance().typeName(), "badLdsName"));
verify(xdsConfigWatcher, timeout(1000)).onUpdate(
argThat(StatusOrMatcher.hasStatus(statusHasCode(Status.Code.UNAVAILABLE)
.andDescriptionContains(ldsName))));
testWatcher.verifyStats(0, 0, 1);
testWatcher.verifyStats(0, 1);
}
@Test
public void testTcpListenerErrors() {
Listener serverListener =
ControlPlaneRule.buildServerListener().toBuilder().setName(serverName).build();
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(serverName, serverListener));
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(
argThat(StatusOrMatcher.hasStatus(
statusHasCode(Status.Code.UNAVAILABLE).andDescriptionContains("Not an API listener"))));
testWatcher.verifyStats(0, 1);
}
@Test
public void testMissingRds() {
Listener serverListener = ControlPlaneRule.buildServerListener();
Listener clientListener =
ControlPlaneRule.buildClientListener(serverName, serverName, "badRdsName");
String rdsName = "badRdsName";
Listener clientListener = ControlPlaneRule.buildClientListener(serverName, serverName, rdsName);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener));
ImmutableMap.of(serverName, clientListener));
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
fakeClock.forwardTime(16, TimeUnit.SECONDS);
verify(xdsConfigWatcher, timeout(1000)).onResourceDoesNotExist(
toContextStr(XdsRouteConfigureResource.getInstance().typeName(), "badRdsName"));
verify(xdsConfigWatcher, timeout(1000)).onUpdate(
argThat(StatusOrMatcher.hasStatus(statusHasCode(Status.Code.UNAVAILABLE)
.andDescriptionContains(rdsName))));
testWatcher.verifyStats(0, 0, 1);
testWatcher.verifyStats(0, 1);
}
@Test
public void testUpdateToMissingVirtualHost() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
WrappedXdsClient wrappedXdsClient = new WrappedXdsClient(xdsClient, syncContext);
xdsDependencyManager = new XdsDependencyManager(
wrappedXdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
RouteConfiguration routeConfig = XdsTestUtils.buildRouteConfiguration(
"wrong-virtual-host", XdsTestUtils.RDS_NAME, XdsTestUtils.CLUSTER_NAME);
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
// Update with a config that has a virtual host that doesn't match the server name
wrappedXdsClient.deliverLdsUpdate(0L, buildUnmatchedVirtualHosts());
inOrder.verify(xdsConfigWatcher, timeout(1000)).onError(any(), statusCaptor.capture());
assertThat(statusCaptor.getValue().getDescription())
.isEqualTo("Failed to find virtual host matching hostname: " + serverName);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
assertThat(xdsUpdateCaptor.getValue().getStatus().getDescription())
.contains("Failed to find virtual host matching hostname: " + serverName);
testWatcher.verifyStats(1, 1, 0);
wrappedXdsClient.shutdown();
}
private List<io.grpc.xds.VirtualHost> buildUnmatchedVirtualHosts() {
io.grpc.xds.VirtualHost.Route route1 =
io.grpc.xds.VirtualHost.Route.forAction(
io.grpc.xds.VirtualHost.Route.RouteMatch.withPathExactOnly("/GreetService/bye"),
io.grpc.xds.VirtualHost.Route.RouteAction.forCluster(
"cluster-bar.googleapis.com", Collections.emptyList(),
TimeUnit.SECONDS.toNanos(15L), null, false), ImmutableMap.of());
io.grpc.xds.VirtualHost.Route route2 =
io.grpc.xds.VirtualHost.Route.forAction(
io.grpc.xds.VirtualHost.Route.RouteMatch.withPathExactOnly("/HelloService/hi"),
io.grpc.xds.VirtualHost.Route.RouteAction.forCluster(
"cluster-foo.googleapis.com", Collections.emptyList(),
TimeUnit.SECONDS.toNanos(15L), null, false),
ImmutableMap.of());
return Arrays.asList(
io.grpc.xds.VirtualHost.create("virtualhost-foo", Collections.singletonList("hello"
+ ".googleapis.com"),
Collections.singletonList(route1),
ImmutableMap.of()),
io.grpc.xds.VirtualHost.create("virtualhost-bar", Collections.singletonList("hi"
+ ".googleapis.com"),
Collections.singletonList(route2),
ImmutableMap.of()));
testWatcher.verifyStats(0, 1);
}
@Test
@ -452,37 +457,32 @@ public class XdsDependencyManagerTest {
String ldsResourceName =
"xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1";
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, ldsResourceName);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, ldsResourceName, nameResolverArgs, scheduler);
Status expectedStatus = Status.INVALID_ARGUMENT.withDescription(
"Wrong configuration: xds server does not exist for resource " + ldsResourceName);
String context = toContextStr(XdsListenerResource.getInstance().typeName(), ldsResourceName);
verify(xdsConfigWatcher, timeout(1000))
.onError(eq(context), argThat(new XdsTestUtils.StatusMatcher(expectedStatus)));
verify(xdsConfigWatcher, timeout(1000)).onUpdate(
argThat(StatusOrMatcher.hasStatus(
statusHasCode(Status.Code.UNAVAILABLE).andDescriptionContains(ldsResourceName))));
fakeClock.forwardTime(16, TimeUnit.SECONDS);
testWatcher.verifyStats(0, 1, 0);
testWatcher.verifyStats(0, 1);
}
@Test
public void testChangeRdsName_fromLds() {
// TODO implement
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
Listener serverListener = ControlPlaneRule.buildServerListener();
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig));
String newRdsName = "newRdsName1";
Listener clientListener = buildInlineClientListener(newRdsName, CLUSTER_NAME);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener));
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue()).isNotEqualTo(defaultXdsConfig);
assertThat(xdsConfigCaptor.getValue().getVirtualHost().name()).isEqualTo(newRdsName);
ImmutableMap.of(serverName, clientListener));
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
assertThat(xdsUpdateCaptor.getValue().getValue()).isNotEqualTo(defaultXdsConfig);
assertThat(xdsUpdateCaptor.getValue().getValue().getVirtualHost().name()).isEqualTo(newRdsName);
}
@Test
@ -527,22 +527,22 @@ public class XdsDependencyManagerTest {
// Start the actual test
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
XdsConfig initialConfig = xdsConfigCaptor.getValue();
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue();
// Make sure that adding subscriptions that rds points at doesn't change the config
Closeable rootSub = xdsDependencyManager.subscribeToCluster("root");
assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig);
assertThat(xdsDependencyManager.buildUpdate().getValue()).isEqualTo(initialConfig);
Closeable clusterAB11Sub = xdsDependencyManager.subscribeToCluster("clusterAB11");
assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig);
assertThat(xdsDependencyManager.buildUpdate().getValue()).isEqualTo(initialConfig);
// Make sure that closing subscriptions that rds points at doesn't change the config
rootSub.close();
assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig);
assertThat(xdsDependencyManager.buildUpdate().getValue()).isEqualTo(initialConfig);
clusterAB11Sub.close();
assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig);
assertThat(xdsDependencyManager.buildUpdate().getValue()).isEqualTo(initialConfig);
// Make an explicit root subscription and then change RDS to point to A11
rootSub = xdsDependencyManager.subscribeToCluster("root");
@ -550,13 +550,14 @@ public class XdsDependencyManagerTest {
XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA11");
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, newRouteConfig));
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().keySet().size()).isEqualTo(4);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet().size()).isEqualTo(4);
// Now that it is released, we should only have A11
rootSub.close();
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).containsExactly("clusterA11");
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet())
.containsExactly("clusterA11");
}
@Test
@ -587,10 +588,10 @@ public class XdsDependencyManagerTest {
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
// Start the actual test
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
XdsConfig initialConfig = xdsConfigCaptor.getValue();
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue();
assertThat(initialConfig.getClusters().keySet())
.containsExactly("root", "clusterA", "clusterB");
@ -605,8 +606,8 @@ public class XdsDependencyManagerTest {
@Test
public void testChangeRdsName_FromLds_complexTree() {
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
// Create the same tree as in testMultipleParentsInCdsTree
Cluster rootCluster =
@ -639,11 +640,10 @@ public class XdsDependencyManagerTest {
// Do the test
String newRdsName = "newRdsName1";
Listener clientListener = buildInlineClientListener(newRdsName, "root");
Listener serverListener = ControlPlaneRule.buildServerListener();
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener));
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture());
XdsConfig config = xdsConfigCaptor.getValue();
ImmutableMap.of(serverName, clientListener));
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
XdsConfig config = xdsUpdateCaptor.getValue().getValue();
assertThat(config.getVirtualHost().name()).isEqualTo(newRdsName);
assertThat(config.getClusters().size()).isEqualTo(4);
}
@ -652,9 +652,9 @@ public class XdsDependencyManagerTest {
public void testChangeAggCluster() {
InOrder inOrder = Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, atLeastOnce()).onUpdate(any());
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
// Setup initial config A -> A1 -> (A11, A12)
Cluster rootCluster =
@ -673,9 +673,8 @@ public class XdsDependencyManagerTest {
XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA11", "clusterA12");
Listener clientListener = buildInlineClientListener(RDS_NAME, "root");
Listener serverListener = ControlPlaneRule.buildServerListener();
controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS,
ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener));
ImmutableMap.of(serverName, clientListener));
controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);
controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
@ -702,96 +701,50 @@ public class XdsDependencyManagerTest {
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(argThat(nameMatcher));
}
private Listener buildInlineClientListener(String rdsName, String clusterName) {
return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName);
@Test
public void testCdsError() throws IOException {
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_CDS, ImmutableMap.of(XdsTestUtils.CLUSTER_NAME,
Cluster.newBuilder().setName(XdsTestUtils.CLUSTER_NAME).build()));
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture());
Status status = xdsUpdateCaptor.getValue().getValue()
.getClusters().get(CLUSTER_NAME).getStatus();
assertThat(status.getDescription()).contains(XdsTestUtils.CLUSTER_NAME);
}
private static String toContextStr(String type, String resourceName) {
return type + " resource: " + resourceName;
private Listener buildInlineClientListener(String rdsName, String clusterName) {
return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName);
}
private static class TestWatcher implements XdsDependencyManager.XdsConfigWatcher {
XdsConfig lastConfig;
int numUpdates = 0;
int numError = 0;
int numDoesNotExist = 0;
@Override
public void onUpdate(XdsConfig config) {
log.fine("Config changed: " + config);
lastConfig = config;
numUpdates++;
}
@Override
public void onError(String resourceContext, Status status) {
log.fine(String.format("Error %s for %s: ", status, resourceContext));
numError++;
}
@Override
public void onResourceDoesNotExist(String resourceName) {
log.fine("Resource does not exist: " + resourceName);
numDoesNotExist++;
public void onUpdate(StatusOr<XdsConfig> update) {
log.fine("Config update: " + update);
if (update.hasValue()) {
lastConfig = update.getValue();
numUpdates++;
} else {
numError++;
}
}
private List<Integer> getStats() {
return Arrays.asList(numUpdates, numError, numDoesNotExist);
return Arrays.asList(numUpdates, numError);
}
private void verifyStats(int updt, int err, int notExist) {
assertThat(getStats()).isEqualTo(Arrays.asList(updt, err, notExist));
private void verifyStats(int updt, int err) {
assertThat(getStats()).isEqualTo(Arrays.asList(updt, err));
}
}
private static class WrappedXdsClient extends XdsClient {
private final XdsClient delegate;
private final SynchronizationContext syncContext;
private ResourceWatcher<LdsUpdate> ldsWatcher;
WrappedXdsClient(XdsClient delegate, SynchronizationContext syncContext) {
this.delegate = delegate;
this.syncContext = syncContext;
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
@SuppressWarnings("unchecked")
public <T extends ResourceUpdate> void watchXdsResource(
XdsResourceType<T> type, String resourceName, ResourceWatcher<T> watcher,
Executor executor) {
if (type.equals(XdsListenerResource.getInstance())) {
ldsWatcher = (ResourceWatcher<LdsUpdate>) watcher;
}
delegate.watchXdsResource(type, resourceName, watcher, executor);
}
@Override
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
delegate.cancelXdsResourceWatch(type, resourceName, watcher);
}
void deliverLdsUpdate(long httpMaxStreamDurationNano,
List<io.grpc.xds.VirtualHost> virtualHosts) {
syncContext.execute(() -> {
LdsUpdate ldsUpdate = LdsUpdate.forApiListener(
io.grpc.xds.HttpConnectionManager.forVirtualHosts(
httpMaxStreamDurationNano, virtualHosts, null));
ldsWatcher.onChanged(ldsUpdate);
});
}
}
static class ClusterNameMatcher implements ArgumentMatcher<XdsConfig> {
static class ClusterNameMatcher implements ArgumentMatcher<StatusOr<XdsConfig>> {
private final List<String> expectedNames;
ClusterNameMatcher(List<String> expectedNames) {
@ -799,7 +752,11 @@ public class XdsDependencyManagerTest {
}
@Override
public boolean matches(XdsConfig xdsConfig) {
public boolean matches(StatusOr<XdsConfig> update) {
if (!update.hasValue()) {
return false;
}
XdsConfig xdsConfig = update.getValue();
if (xdsConfig == null || xdsConfig.getClusters() == null) {
return false;
}

View File

@ -46,6 +46,7 @@ import com.google.protobuf.util.Durations;
import com.google.re2j.Pattern;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ChannelLogger;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
@ -70,6 +71,7 @@ import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.JsonParser;
import io.grpc.internal.JsonUtil;
import io.grpc.internal.ObjectPool;
@ -89,6 +91,8 @@ import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
@ -104,6 +108,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -187,6 +192,15 @@ public class XdsNameResolverTest {
private TestCall<?, ?> testCall;
private boolean originalEnableTimeout;
private URI targetUri;
private final NameResolver.Args nameResolverArgs = NameResolver.Args.newBuilder()
.setDefaultPort(8080)
.setProxyDetector(GrpcUtil.DEFAULT_PROXY_DETECTOR)
.setSynchronizationContext(syncContext)
.setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class))
.setChannelLogger(mock(ChannelLogger.class))
.setScheduledExecutorService(fakeClock.getScheduledExecutorService())
.build();
@Before
public void setUp() {
@ -213,7 +227,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null,
serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, filterRegistry, null, metricRecorder);
xdsClientPoolFactory, mockRandom, filterRegistry, null, metricRecorder, nameResolverArgs);
}
@After
@ -259,7 +273,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null,
serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
verify(mockListener).onError(errorCaptor.capture());
Status error = errorCaptor.getValue();
@ -273,7 +287,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri,
"notfound.google.com", AUTHORITY, null, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
verify(mockListener).onError(errorCaptor.capture());
Status error = errorCaptor.getValue();
@ -295,7 +309,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(
targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext,
scheduler, xdsClientPoolFactory,
mockRandom, FilterRegistry.getDefaultRegistry(), null, metricRecorder);
mockRandom, FilterRegistry.getDefaultRegistry(), null, metricRecorder, nameResolverArgs);
resolver.start(mockListener);
verify(mockListener, never()).onError(any(Status.class));
}
@ -316,7 +330,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(
targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
verify(mockListener, never()).onError(any(Status.class));
}
@ -337,7 +351,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(
targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
// The Service Authority must be URL encoded, but unlike the LDS resource name.
@ -366,7 +380,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri,
"xds.authority.com", serviceAuthority, null, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
verify(mockListener, never()).onError(any(Status.class));
}
@ -399,7 +413,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null,
serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
// use different ldsResourceName and service authority. The virtualhost lookup should use
// service authority.
expectedLdsResourceName = "test-" + expectedLdsResourceName;
@ -413,6 +427,7 @@ public class XdsNameResolverTest {
Collections.singletonList(route1),
ImmutableMap.of());
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
createAndDeliverClusterUpdates(xdsClient, cluster1);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
assertServiceConfigForLoadBalancingConfig(
Collections.singletonList(cluster1),
@ -429,6 +444,7 @@ public class XdsNameResolverTest {
Collections.singletonList(route2),
ImmutableMap.of());
xdsClient.deliverRdsUpdate(alternativeRdsResource, Collections.singletonList(virtualHost));
createAndDeliverClusterUpdates(xdsClient, cluster2);
// Two new service config updates triggered:
// - with load balancing config being able to select cluster1 and cluster2
// - with load balancing config being able to select cluster2 only
@ -467,6 +483,7 @@ public class XdsNameResolverTest {
Collections.singletonList(route),
ImmutableMap.of());
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
createAndDeliverClusterUpdates(xdsClient, cluster1);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
assertServiceConfigForLoadBalancingConfig(
Collections.singletonList(cluster1),
@ -483,6 +500,7 @@ public class XdsNameResolverTest {
verifyNoInteractions(mockListener);
assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME);
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
createAndDeliverClusterUpdates(xdsClient, cluster1);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
assertServiceConfigForLoadBalancingConfig(
Collections.singletonList(cluster1),
@ -506,6 +524,7 @@ public class XdsNameResolverTest {
Collections.singletonList(route),
ImmutableMap.of());
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
createAndDeliverClusterUpdates(xdsClient, cluster1);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
assertServiceConfigForLoadBalancingConfig(
Collections.singletonList(cluster1),
@ -529,11 +548,15 @@ public class XdsNameResolverTest {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverError(Status.UNAVAILABLE.withDescription("server unreachable"));
verify(mockListener).onError(errorCaptor.capture());
Status error = errorCaptor.getValue();
verify(mockListener).onResult2(resolutionResultCaptor.capture());
InternalConfigSelector configSelector = resolutionResultCaptor.getValue()
.getAttributes().get(InternalConfigSelector.KEY);
Result selectResult = configSelector.selectConfig(
newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
Status error = selectResult.getStatus();
assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(error.getDescription()).isEqualTo("Unable to load LDS " + AUTHORITY
+ ". xDS server returned: UNAVAILABLE: server unreachable");
assertThat(error.getDescription()).contains(AUTHORITY);
assertThat(error.getDescription()).contains("UNAVAILABLE: server unreachable");
}
@Test
@ -541,11 +564,15 @@ public class XdsNameResolverTest {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverError(Status.NOT_FOUND.withDescription("server unreachable"));
verify(mockListener).onError(errorCaptor.capture());
Status error = errorCaptor.getValue();
verify(mockListener).onResult2(resolutionResultCaptor.capture());
InternalConfigSelector configSelector = resolutionResultCaptor.getValue()
.getAttributes().get(InternalConfigSelector.KEY);
Result selectResult = configSelector.selectConfig(
newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
Status error = selectResult.getStatus();
assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(error.getDescription()).isEqualTo("Unable to load LDS " + AUTHORITY
+ ". xDS server returned: NOT_FOUND: server unreachable");
assertThat(error.getDescription()).contains(AUTHORITY);
assertThat(error.getDescription()).contains("NOT_FOUND: server unreachable");
assertThat(error.getCause()).isNull();
}
@ -555,15 +582,15 @@ public class XdsNameResolverTest {
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME);
xdsClient.deliverError(Status.UNAVAILABLE.withDescription("server unreachable"));
verify(mockListener, times(2)).onError(errorCaptor.capture());
Status error = errorCaptor.getAllValues().get(0);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
InternalConfigSelector configSelector = resolutionResultCaptor.getValue()
.getAttributes().get(InternalConfigSelector.KEY);
Result selectResult = configSelector.selectConfig(
newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
Status error = selectResult.getStatus();
assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(error.getDescription()).isEqualTo("Unable to load LDS " + AUTHORITY
+ ". xDS server returned: UNAVAILABLE: server unreachable");
error = errorCaptor.getAllValues().get(1);
assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(error.getDescription()).isEqualTo("Unable to load RDS " + RDS_RESOURCE_NAME
+ ". xDS server returned: UNAVAILABLE: server unreachable");
assertThat(error.getDescription()).contains(RDS_RESOURCE_NAME);
assertThat(error.getDescription()).contains("UNAVAILABLE: server unreachable");
}
@SuppressWarnings("unchecked")
@ -581,10 +608,11 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random",
serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost));
createAndDeliverClusterUpdates(xdsClient, cluster1);
verify(mockListener).onResult2(resolutionResultCaptor.capture());
assertServiceConfigForLoadBalancingConfig(
Collections.singletonList(cluster1),
@ -605,10 +633,11 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random",
serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost));
xdsClient.deliverLdsUpdateOnly(0L, Arrays.asList(virtualHost));
fakeClock.forwardTime(15, TimeUnit.SECONDS);
assertEmptyResolutionResult("random");
}
@ -617,7 +646,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, AUTHORITY,
serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(0L, buildUnmatchedVirtualHosts());
@ -702,7 +731,7 @@ public class XdsNameResolverTest {
true, 5, 5, new AutoConfiguredLoadBalancerFactory("pick-first"));
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, realParser, syncContext,
scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
RetryPolicy retryPolicy = RetryPolicy.create(
@ -913,7 +942,7 @@ public class XdsNameResolverTest {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser,
syncContext, scheduler,
xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(
@ -946,7 +975,7 @@ public class XdsNameResolverTest {
public void resolved_routeActionHasAutoHostRewrite_emitsCallOptionForTheSame() {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser,
syncContext, scheduler, xdsClientPoolFactory, mockRandom,
FilterRegistry.getDefaultRegistry(), null, metricRecorder);
FilterRegistry.getDefaultRegistry(), null, metricRecorder, nameResolverArgs);
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(
@ -977,7 +1006,7 @@ public class XdsNameResolverTest {
public void resolved_routeActionNoAutoHostRewrite_doesntEmitCallOptionForTheSame() {
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser,
syncContext, scheduler, xdsClientPoolFactory, mockRandom,
FilterRegistry.getDefaultRegistry(), null, metricRecorder);
FilterRegistry.getDefaultRegistry(), null, metricRecorder, nameResolverArgs);
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(
@ -1190,6 +1219,20 @@ public class XdsNameResolverTest {
assertCallSelectClusterResult(call1, configSelector, cluster1, 20.0);
}
/** Creates and delivers both CDS and EDS updates for the given clusters. */
private static void createAndDeliverClusterUpdates(
FakeXdsClient xdsClient, String... clusterNames) {
for (String clusterName : clusterNames) {
CdsUpdate.Builder forEds = CdsUpdate
.forEds(clusterName, clusterName, null, null, null, null, false)
.roundRobinLbPolicy();
xdsClient.deliverCdsUpdate(clusterName, forEds.build());
EdsUpdate edsUpdate = new EdsUpdate(clusterName,
XdsTestUtils.createMinimalLbEndpointsMap("host"), Collections.emptyList());
xdsClient.deliverEdsUpdate(clusterName, edsUpdate);
}
}
@Test
public void resolved_simpleCallSucceeds_routeToRls() {
when(mockRandom.nextInt(anyInt())).thenReturn(90, 10);
@ -1305,6 +1348,7 @@ public class XdsNameResolverTest {
// LDS 1.
xdsClient.deliverLdsUpdateWithFilters(vhost, filterStateTestConfigs(STATEFUL_1, STATEFUL_2));
createAndDeliverClusterUpdates(xdsClient, cluster1);
assertClusterResolutionResult(call1, cluster1);
ImmutableList<StatefulFilter> lds1Snapshot = statefulFilterProvider.getAllInstances();
// Verify that StatefulFilter with different filter names result in different Filter instances.
@ -1359,7 +1403,7 @@ public class XdsNameResolverTest {
* Verifies the lifecycle of HCM filter instances across RDS updates.
*
* <p>Filter instances:
* 1. Must have instantiated by the initial LDS.
* 1. Must have instantiated by the initial LDS/RDS.
* 2. Must be reused by all subsequent RDS updates.
* 3. Must be not shutdown (closed) by valid RDS updates.
*/
@ -1371,22 +1415,19 @@ public class XdsNameResolverTest {
// LDS 1.
xdsClient.deliverLdsUpdateForRdsNameWithFilters(RDS_RESOURCE_NAME,
filterStateTestConfigs(STATEFUL_1, STATEFUL_2));
ImmutableList<StatefulFilter> lds1Snapshot = statefulFilterProvider.getAllInstances();
// Verify that StatefulFilter with different filter names result in different Filter instances.
assertWithMessage("LDS 1: expected to create filter instances").that(lds1Snapshot).hasSize(2);
// Naming: lds<LDS#>Filter<name#>
StatefulFilter lds1Filter1 = lds1Snapshot.get(0);
StatefulFilter lds1Filter2 = lds1Snapshot.get(1);
assertThat(lds1Filter1).isNotSameInstanceAs(lds1Filter2);
// RDS 1.
VirtualHost vhost1 = filterStateTestVhost();
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, vhost1);
createAndDeliverClusterUpdates(xdsClient, cluster1);
assertClusterResolutionResult(call1, cluster1);
// Initial RDS update should not generate Filter instances.
ImmutableList<StatefulFilter> rds1Snapshot = statefulFilterProvider.getAllInstances();
assertWithMessage("RDS 1: Expected Filter instances to be reused across RDS route updates")
.that(rds1Snapshot).isEqualTo(lds1Snapshot);
// Verify that StatefulFilter with different filter names result in different Filter instances.
assertWithMessage("RDS 1: expected to create filter instances").that(rds1Snapshot).hasSize(2);
// Naming: lds<LDS#>Filter<name#>
StatefulFilter lds1Filter1 = rds1Snapshot.get(0);
StatefulFilter lds1Filter2 = rds1Snapshot.get(1);
assertThat(lds1Filter1).isNotSameInstanceAs(lds1Filter2);
// RDS 2: exactly the same as RDS 1.
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, vhost1);
@ -1394,7 +1435,7 @@ public class XdsNameResolverTest {
ImmutableList<StatefulFilter> rds2Snapshot = statefulFilterProvider.getAllInstances();
// Neither should any subsequent RDS updates.
assertWithMessage("RDS 2: Expected Filter instances to be reused across RDS route updates")
.that(rds2Snapshot).isEqualTo(lds1Snapshot);
.that(rds2Snapshot).isEqualTo(rds1Snapshot);
// RDS 3: Contains a per-route override for STATEFUL_1.
VirtualHost vhost3 = filterStateTestVhost(ImmutableMap.of(
@ -1406,7 +1447,7 @@ public class XdsNameResolverTest {
// As with any other Route update, typed_per_filter_config overrides should not result in
// creating new filter instances.
assertWithMessage("RDS 3: Expected Filter instances to be reused on per-route filter overrides")
.that(rds3Snapshot).isEqualTo(lds1Snapshot);
.that(rds3Snapshot).isEqualTo(rds1Snapshot);
}
/**
@ -1427,7 +1468,7 @@ public class XdsNameResolverTest {
.register(statefulFilterProvider, altStatefulFilterProvider, ROUTER_FILTER_PROVIDER);
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser,
syncContext, scheduler, xdsClientPoolFactory, mockRandom, filterRegistry, null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
@ -1435,6 +1476,7 @@ public class XdsNameResolverTest {
// LDS 1.
xdsClient.deliverLdsUpdateWithFilters(vhost, filterStateTestConfigs(STATEFUL_1, STATEFUL_2));
createAndDeliverClusterUpdates(xdsClient, cluster1);
assertClusterResolutionResult(call1, cluster1);
ImmutableList<StatefulFilter> lds1Snapshot = statefulFilterProvider.getAllInstances();
ImmutableList<StatefulFilter> lds1SnapshotAlt = altStatefulFilterProvider.getAllInstances();
@ -1483,6 +1525,7 @@ public class XdsNameResolverTest {
// LDS 1.
xdsClient.deliverLdsUpdateWithFilters(vhost, filterStateTestConfigs(STATEFUL_1, STATEFUL_2));
createAndDeliverClusterUpdates(xdsClient, cluster1);
assertClusterResolutionResult(call1, cluster1);
ImmutableList<StatefulFilter> lds1Snapshot = statefulFilterProvider.getAllInstances();
assertWithMessage("LDS 1: expected to create filter instances").that(lds1Snapshot).hasSize(2);
@ -1510,6 +1553,7 @@ public class XdsNameResolverTest {
// LDS 1.
xdsClient.deliverLdsUpdateWithFilters(vhost, filterStateTestConfigs(STATEFUL_1, STATEFUL_2));
createAndDeliverClusterUpdates(xdsClient, cluster1);
assertClusterResolutionResult(call1, cluster1);
ImmutableList<StatefulFilter> lds1Snapshot = statefulFilterProvider.getAllInstances();
assertWithMessage("LDS 1: expected to create filter instances").that(lds1Snapshot).hasSize(2);
@ -1526,33 +1570,32 @@ public class XdsNameResolverTest {
}
/**
* Verifies that filter instances are NOT shutdown on RDS_RESOURCE_NAME not found.
* Verifies that all filter instances are shutdown (closed) on RDS resource not found.
*/
@Test
public void filterState_shutdown_noShutdownOnRdsNotFound() {
public void filterState_shutdown_onRdsNotFound() {
StatefulFilter.Provider statefulFilterProvider = filterStateTestSetupResolver();
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
// LDS 1.
xdsClient.deliverLdsUpdateForRdsNameWithFilters(RDS_RESOURCE_NAME,
filterStateTestConfigs(STATEFUL_1, STATEFUL_2));
ImmutableList<StatefulFilter> lds1Snapshot = statefulFilterProvider.getAllInstances();
assertWithMessage("LDS 1: expected to create filter instances").that(lds1Snapshot).hasSize(2);
// Naming: lds<LDS#>Filter<name#>
StatefulFilter lds1Filter1 = lds1Snapshot.get(0);
StatefulFilter lds1Filter2 = lds1Snapshot.get(1);
// RDS 1: Standard vhost with a route.
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, filterStateTestVhost());
createAndDeliverClusterUpdates(xdsClient, cluster1);
assertClusterResolutionResult(call1, cluster1);
assertThat(statefulFilterProvider.getAllInstances()).isEqualTo(lds1Snapshot);
ImmutableList<StatefulFilter> rds1Snapshot = statefulFilterProvider.getAllInstances();
assertWithMessage("RDS 1: expected to create filter instances").that(rds1Snapshot).hasSize(2);
// Naming: lds<LDS#>Filter<name#>
StatefulFilter lds1Filter1 = rds1Snapshot.get(0);
StatefulFilter lds1Filter2 = rds1Snapshot.get(1);
// RDS 2: RDS_RESOURCE_NAME not found.
reset(mockListener);
xdsClient.deliverRdsResourceNotFound(RDS_RESOURCE_NAME);
assertEmptyResolutionResult(RDS_RESOURCE_NAME);
assertThat(lds1Filter1.isShutdown()).isFalse();
assertThat(lds1Filter2.isShutdown()).isFalse();
assertThat(lds1Filter1.isShutdown()).isTrue();
assertThat(lds1Filter2.isShutdown()).isTrue();
}
private StatefulFilter.Provider filterStateTestSetupResolver() {
@ -1561,7 +1604,7 @@ public class XdsNameResolverTest {
.register(statefulFilterProvider, ROUTER_FILTER_PROVIDER);
resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser,
syncContext, scheduler, xdsClientPoolFactory, mockRandom, filterRegistry, null,
metricRecorder);
metricRecorder, nameResolverArgs);
resolver.start(mockListener);
return statefulFilterProvider;
}
@ -1762,6 +1805,7 @@ public class XdsNameResolverTest {
ImmutableList.of(route1, route2, route3),
ImmutableMap.of());
xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost));
createAndDeliverClusterUpdates(xdsClient, "cluster-foo", "cluster-bar", "cluster-baz");
verify(mockListener).onResult2(resolutionResultCaptor.capture());
String expectedServiceConfigJson =
@ -2385,6 +2429,8 @@ public class XdsNameResolverTest {
private String rdsResource;
private ResourceWatcher<LdsUpdate> ldsWatcher;
private ResourceWatcher<RdsUpdate> rdsWatcher;
private final Map<String, List<ResourceWatcher<CdsUpdate>>> cdsWatchers = new HashMap<>();
private final Map<String, List<ResourceWatcher<EdsUpdate>>> edsWatchers = new HashMap<>();
@Override
public BootstrapInfo getBootstrapInfo() {
@ -2412,10 +2458,19 @@ public class XdsNameResolverTest {
rdsResource = resourceName;
rdsWatcher = (ResourceWatcher<RdsUpdate>) watcher;
break;
case "CDS":
cdsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>())
.add((ResourceWatcher<CdsUpdate>) watcher);
break;
case "EDS":
edsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>())
.add((ResourceWatcher<EdsUpdate>) watcher);
break;
default:
}
}
@SuppressWarnings("unchecked")
@Override
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
@ -2434,25 +2489,53 @@ public class XdsNameResolverTest {
rdsResource = null;
rdsWatcher = null;
break;
case "CDS":
assertThat(cdsWatchers).containsKey(resourceName);
assertThat(cdsWatchers.get(resourceName)).contains(watcher);
cdsWatchers.get(resourceName).remove((ResourceWatcher<CdsUpdate>) watcher);
break;
case "EDS":
assertThat(edsWatchers).containsKey(resourceName);
assertThat(edsWatchers.get(resourceName)).contains(watcher);
edsWatchers.get(resourceName).remove((ResourceWatcher<EdsUpdate>) watcher);
break;
default:
}
}
void deliverLdsUpdate(long httpMaxStreamDurationNano, List<VirtualHost> virtualHosts) {
void deliverLdsUpdateOnly(long httpMaxStreamDurationNano, List<VirtualHost> virtualHosts) {
syncContext.execute(() -> {
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts(
httpMaxStreamDurationNano, virtualHosts, null)));
});
}
void deliverLdsUpdate(long httpMaxStreamDurationNano, List<VirtualHost> virtualHosts) {
List<String> clusterNames = new ArrayList<>();
for (VirtualHost vh : virtualHosts) {
clusterNames.addAll(getClusterNames(vh.routes()));
}
syncContext.execute(() -> {
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts(
httpMaxStreamDurationNano, virtualHosts, null)));
createAndDeliverClusterUpdates(this, clusterNames.toArray(new String[0]));
});
}
void deliverLdsUpdate(final List<Route> routes) {
VirtualHost virtualHost =
VirtualHost.create(
"virtual-host", Collections.singletonList(expectedLdsResourceName), routes,
ImmutableMap.of());
List<String> clusterNames = getClusterNames(routes);
syncContext.execute(() -> {
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts(
0L, Collections.singletonList(virtualHost), null)));
if (!clusterNames.isEmpty()) {
createAndDeliverClusterUpdates(this, clusterNames.toArray(new String[0]));
}
});
}
@ -2508,6 +2591,7 @@ public class XdsNameResolverTest {
syncContext.execute(() -> {
ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts(
0L, Collections.singletonList(virtualHost), filterChain)));
createAndDeliverClusterUpdates(this, cluster);
});
}
@ -2545,6 +2629,29 @@ public class XdsNameResolverTest {
});
}
private List<String> getClusterNames(List<Route> routes) {
List<String> clusterNames = new ArrayList<>();
for (Route r : routes) {
if (r.routeAction() == null) {
continue;
}
String cluster = r.routeAction().cluster();
if (cluster != null) {
clusterNames.add(cluster);
} else {
List<ClusterWeight> weightedClusters = r.routeAction().weightedClusters();
if (weightedClusters == null) {
continue;
}
for (ClusterWeight wc : weightedClusters) {
clusterNames.add(wc.name());
}
}
}
return clusterNames;
}
void deliverRdsUpdateWithFaultInjection(
String resourceName, @Nullable FaultConfig virtualHostFaultConfig,
@Nullable FaultConfig routFaultConfig, @Nullable FaultConfig weightedClusterFaultConfig) {
@ -2581,6 +2688,7 @@ public class XdsNameResolverTest {
overrideConfig);
syncContext.execute(() -> {
rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost)));
createAndDeliverClusterUpdates(this, cluster1);
});
}
@ -2606,6 +2714,29 @@ public class XdsNameResolverTest {
});
}
private void deliverCdsUpdate(String clusterName, CdsUpdate update) {
if (!cdsWatchers.containsKey(clusterName)) {
return;
}
syncContext.execute(() -> {
List<ResourceWatcher<CdsUpdate>> resourceWatchers =
ImmutableList.copyOf(cdsWatchers.get(clusterName));
resourceWatchers.forEach(w -> w.onChanged(update));
});
}
private void deliverEdsUpdate(String name, EdsUpdate update) {
syncContext.execute(() -> {
if (!edsWatchers.containsKey(name)) {
return;
}
List<ResourceWatcher<EdsUpdate>> resourceWatchers =
ImmutableList.copyOf(edsWatchers.get(name));
resourceWatchers.forEach(w -> w.onChanged(update));
});
}
void deliverError(final Status error) {
if (ldsWatcher != null) {
syncContext.execute(() -> {
@ -2617,6 +2748,11 @@ public class XdsNameResolverTest {
rdsWatcher.onError(error);
});
}
syncContext.execute(() -> {
cdsWatchers.values().stream()
.flatMap(List::stream)
.forEach(w -> w.onError(error));
});
}
}

View File

@ -51,7 +51,6 @@ import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.internal.JsonParser;
import io.grpc.stub.StreamObserver;
@ -281,6 +280,16 @@ public class XdsTestUtils {
return builder.build();
}
static Map<Locality, LocalityLbEndpoints> createMinimalLbEndpointsMap(String serverHostName) {
Map<Locality, LocalityLbEndpoints> lbEndpointsMap = new HashMap<>();
LbEndpoint lbEndpoint = LbEndpoint.create(
serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME, ImmutableMap.of());
lbEndpointsMap.put(
Locality.create("", "", ""),
LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0, ImmutableMap.of()));
return lbEndpointsMap;
}
@SuppressWarnings("unchecked")
private static ImmutableMap<String, ?> getWrrLbConfigAsMap() throws IOException {
String lbConfigStr = "{\"wrr_locality_experimental\" : "
@ -353,7 +362,6 @@ public class XdsTestUtils {
return Listener.newBuilder()
.setName(serverName)
.setApiListener(clientListenerBuilder.build()).build();
}
/**
@ -407,18 +415,4 @@ public class XdsTestUtils {
responseObserver.onNext(response);
}
}
static class StatusMatcher implements ArgumentMatcher<Status> {
private final Status expectedStatus;
StatusMatcher(Status expectedStatus) {
this.expectedStatus = expectedStatus;
}
@Override
public boolean matches(Status status) {
return status != null && expectedStatus.getCode().equals(status.getCode())
&& expectedStatus.getDescription().equals(status.getDescription());
}
}
}