diff --git a/api/src/testFixtures/java/io/grpc/StatusMatcher.java b/api/src/testFixtures/java/io/grpc/StatusMatcher.java new file mode 100644 index 0000000000..f464b2d709 --- /dev/null +++ b/api/src/testFixtures/java/io/grpc/StatusMatcher.java @@ -0,0 +1,118 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import org.mockito.ArgumentMatcher; + +/** + * Mockito matcher for {@link Status}. + */ +public final class StatusMatcher implements ArgumentMatcher { + public static StatusMatcher statusHasCode(ArgumentMatcher codeMatcher) { + return new StatusMatcher(codeMatcher, null); + } + + public static StatusMatcher statusHasCode(Status.Code code) { + return statusHasCode(new EqualsMatcher<>(code)); + } + + private final ArgumentMatcher codeMatcher; + private final ArgumentMatcher descriptionMatcher; + + private StatusMatcher( + ArgumentMatcher codeMatcher, + ArgumentMatcher descriptionMatcher) { + this.codeMatcher = checkNotNull(codeMatcher, "codeMatcher"); + this.descriptionMatcher = descriptionMatcher; + } + + public StatusMatcher andDescription(ArgumentMatcher descriptionMatcher) { + checkState(this.descriptionMatcher == null, "Already has a description matcher"); + return new StatusMatcher(codeMatcher, descriptionMatcher); + } + + public StatusMatcher andDescription(String description) { + return andDescription(new EqualsMatcher<>(description)); + } + + public StatusMatcher andDescriptionContains(String substring) { + return andDescription(new StringContainsMatcher(substring)); + } + + @Override + public boolean matches(Status status) { + return status != null + && codeMatcher.matches(status.getCode()) + && (descriptionMatcher == null || descriptionMatcher.matches(status.getDescription())); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{code="); + sb.append(codeMatcher); + if (descriptionMatcher != null) { + sb.append(", description="); + sb.append(descriptionMatcher); + } + sb.append("}"); + return sb.toString(); + } + + // Use instead of lambda for better error message. + static final class EqualsMatcher implements ArgumentMatcher { + private final T obj; + + EqualsMatcher(T obj) { + this.obj = checkNotNull(obj, "obj"); + } + + @Override + public boolean matches(Object other) { + return obj.equals(other); + } + + @Override + public String toString() { + return obj.toString(); + } + } + + static final class StringContainsMatcher implements ArgumentMatcher { + private final String needle; + + StringContainsMatcher(String needle) { + this.needle = checkNotNull(needle, "needle"); + } + + @Override + public boolean matches(String haystack) { + if (haystack == null) { + return false; + } + return haystack.contains(needle); + } + + @Override + public String toString() { + return "contains " + needle; + } + } +} diff --git a/api/src/testFixtures/java/io/grpc/StatusOrMatcher.java b/api/src/testFixtures/java/io/grpc/StatusOrMatcher.java new file mode 100644 index 0000000000..1e70ae9785 --- /dev/null +++ b/api/src/testFixtures/java/io/grpc/StatusOrMatcher.java @@ -0,0 +1,66 @@ +/* + * Copyright 2025 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.mockito.ArgumentMatcher; + +/** + * Mockito matcher for {@link StatusOr}. + */ +public final class StatusOrMatcher implements ArgumentMatcher> { + public static StatusOrMatcher hasValue(ArgumentMatcher valueMatcher) { + return new StatusOrMatcher(checkNotNull(valueMatcher, "valueMatcher"), null); + } + + public static StatusOrMatcher hasStatus(ArgumentMatcher statusMatcher) { + return new StatusOrMatcher(null, checkNotNull(statusMatcher, "statusMatcher")); + } + + private final ArgumentMatcher valueMatcher; + private final ArgumentMatcher statusMatcher; + + private StatusOrMatcher(ArgumentMatcher valueMatcher, ArgumentMatcher statusMatcher) { + this.valueMatcher = valueMatcher; + this.statusMatcher = statusMatcher; + } + + @Override + public boolean matches(StatusOr statusOr) { + if (statusOr == null) { + return false; + } + if (statusOr.hasValue() != (valueMatcher != null)) { + return false; + } + if (valueMatcher != null) { + return valueMatcher.matches(statusOr.getValue()); + } else { + return statusMatcher.matches(statusOr.getStatus()); + } + } + + @Override + public String toString() { + if (valueMatcher != null) { + return "{value=" + valueMatcher + "}"; + } else { + return "{status=" + statusMatcher + "}"; + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/XdsAttributes.java b/xds/src/main/java/io/grpc/xds/XdsAttributes.java index af8139d8ff..4a64fdb145 100644 --- a/xds/src/main/java/io/grpc/xds/XdsAttributes.java +++ b/xds/src/main/java/io/grpc/xds/XdsAttributes.java @@ -36,6 +36,22 @@ final class XdsAttributes { static final Attributes.Key> XDS_CLIENT_POOL = Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsClientPool"); + /** + * Attribute key for passing around the latest XdsConfig across NameResolver/LoadBalancers. + */ + @NameResolver.ResolutionResultAttr + static final Attributes.Key XDS_CONFIG = + Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsConfig"); + + + /** + * Attribute key for passing around the XdsDependencyManager across NameResolver/LoadBalancers. + */ + @NameResolver.ResolutionResultAttr + static final Attributes.Key + XDS_CLUSTER_SUBSCRIPT_REGISTRY = + Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsConfig.XdsClusterSubscriptionRegistry"); + /** * Attribute key for obtaining the global provider that provides atomics for aggregating * outstanding RPCs sent to each cluster. diff --git a/xds/src/main/java/io/grpc/xds/XdsConfig.java b/xds/src/main/java/io/grpc/xds/XdsConfig.java index 7af03caf4b..ec8f3dc076 100644 --- a/xds/src/main/java/io/grpc/xds/XdsConfig.java +++ b/xds/src/main/java/io/grpc/xds/XdsConfig.java @@ -26,9 +26,9 @@ import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import java.io.Closeable; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; /** * Represents the xDS configuration tree for a specified Listener. @@ -178,13 +178,22 @@ final class XdsConfig { public StatusOr getEndpoint() { return endpoint; } + + @Override + public String toString() { + if (endpoint.hasValue()) { + return "EndpointConfig{endpoint=" + endpoint.getValue() + "}"; + } else { + return "EndpointConfig{error=" + endpoint.getStatus() + "}"; + } + } } // The list of leaf clusters for an aggregate cluster. static final class AggregateConfig implements ClusterChild { - private final List leafNames; + private final Set leafNames; - public AggregateConfig(List leafNames) { + public AggregateConfig(Set leafNames) { this.leafNames = checkNotNull(leafNames, "leafNames"); } @@ -234,6 +243,7 @@ final class XdsConfig { XdsConfig build() { checkNotNull(listener, "listener"); checkNotNull(route, "route"); + checkNotNull(virtualHost, "virtualHost"); return new XdsConfig(listener, route, clusters, virtualHost); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index 7eff0c549e..8cd3119727 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -22,9 +22,9 @@ import static io.grpc.xds.client.XdsClient.ResourceUpdate; import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import io.grpc.InternalLogId; +import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.StatusOr; import io.grpc.SynchronizationContext; @@ -39,7 +39,6 @@ import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsResourceType; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -47,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -67,25 +67,28 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi private final InternalLogId logId; private final XdsLogger logger; - private XdsConfig lastXdsConfig = null; + private StatusOr lastUpdate = null; private final Map, TypeWatchers> resourceWatchers = new HashMap<>(); XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher, SynchronizationContext syncContext, String dataPlaneAuthority, - String listenerName) { + String listenerName, NameResolver.Args nameResolverArgs, + ScheduledExecutorService scheduler) { logId = InternalLogId.allocate("xds-dependency-manager", listenerName); logger = XdsLogger.withLogId(logId); this.xdsClient = checkNotNull(xdsClient, "xdsClient"); this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority"); + checkNotNull(nameResolverArgs, "nameResolverArgs"); + checkNotNull(scheduler, "scheduler"); // start the ball rolling syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName))); } - public static String toContextStr(String typeName, String resourceName) { - return typeName + " resource: " + resourceName; + public static String toContextStr(String typeName, String resourceName) { + return typeName + " resource " + resourceName; } @Override @@ -225,7 +228,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue(); switch (cdsUpdate.clusterType()) { case EDS: - String edsServiceName = cdsUpdate.edsServiceName(); + String edsServiceName = root.getEdsServiceName(); EdsWatcher edsWatcher = (EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName); cancelEdsWatcher(edsWatcher, root); @@ -240,7 +243,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } break; case LOGICAL_DNS: - // no eds needed + // no eds needed, so everything happens in cancelCdsWatcher() break; default: throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType()); @@ -260,54 +263,61 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return; } - XdsConfig newConfig = buildConfig(); - if (Objects.equals(newConfig, lastXdsConfig)) { + StatusOr newUpdate = buildUpdate(); + if (Objects.equals(newUpdate, lastUpdate)) { return; } - lastXdsConfig = newConfig; - xdsConfigWatcher.onUpdate(lastXdsConfig); + assert newUpdate.hasValue() + || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE + || newUpdate.getStatus().getCode() == Status.Code.INTERNAL); + lastUpdate = newUpdate; + xdsConfigWatcher.onUpdate(lastUpdate); } @VisibleForTesting - XdsConfig buildConfig() { + StatusOr buildUpdate() { XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder(); // Iterate watchers and build the XdsConfig // Will only be 1 listener and 1 route resource - VirtualHost activeVirtualHost = getActiveVirtualHost(); - for (XdsWatcherBase xdsWatcherBase : - resourceWatchers.get(XdsListenerResource.getInstance()).watchers.values()) { - XdsListenerResource.LdsUpdate ldsUpdate = ((LdsWatcher) xdsWatcherBase).getData().getValue(); + RdsUpdateSupplier routeSource = null; + for (XdsWatcherBase ldsWatcher : + getWatchers(XdsListenerResource.getInstance()).values()) { + if (!ldsWatcher.getData().hasValue()) { + return StatusOr.fromStatus(ldsWatcher.getData().getStatus()); + } + XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue(); builder.setListener(ldsUpdate); - if (activeVirtualHost == null) { - activeVirtualHost = RoutingUtils.findVirtualHostForHostName( - ldsUpdate.httpConnectionManager().virtualHosts(), dataPlaneAuthority); - } - - if (ldsUpdate.httpConnectionManager() != null - && ldsUpdate.httpConnectionManager().virtualHosts() != null) { - RdsUpdate rdsUpdate = new RdsUpdate(ldsUpdate.httpConnectionManager().virtualHosts()); - builder.setRoute(rdsUpdate); - } + routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(); } - resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream() - .map(watcher -> (RdsWatcher) watcher) - .forEach(watcher -> builder.setRoute(watcher.getData().getValue())); + StatusOr statusOrRdsUpdate = routeSource.getRdsUpdate(); + if (!statusOrRdsUpdate.hasValue()) { + return StatusOr.fromStatus(statusOrRdsUpdate.getStatus()); + } + RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue(); + builder.setRoute(rdsUpdate); + VirtualHost activeVirtualHost = + RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority); + if (activeVirtualHost == null) { + String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority; + return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error)); + } builder.setVirtualHost(activeVirtualHost); - Map> edsWatchers = - resourceWatchers.get(ENDPOINT_RESOURCE).watchers; - Map> cdsWatchers = - resourceWatchers.get(CLUSTER_RESOURCE).watchers; + Map> edsWatchers = + getWatchers(ENDPOINT_RESOURCE); + Map> cdsWatchers = + getWatchers(CLUSTER_RESOURCE); // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters List topLevelClusters = cdsWatchers.values().stream() .filter(XdsDependencyManager::isTopLevelCluster) - .map(w -> w.resourceName()) + .map(XdsWatcherBase::resourceName) + .distinct() .collect(Collectors.toList()); // Flatten multi-level aggregates into lists of leaf clusters @@ -316,43 +326,60 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi addLeavesToBuilder(builder, edsWatchers, leafNames); - return builder.build(); + return StatusOr.fromValue(builder.build()); } - private void addLeavesToBuilder(XdsConfig.XdsConfigBuilder builder, - Map> edsWatchers, - Set leafNames) { + private Map> getWatchers( + XdsResourceType resourceType) { + TypeWatchers typeWatchers = resourceWatchers.get(resourceType); + if (typeWatchers == null) { + return Collections.emptyMap(); + } + assert typeWatchers.resourceType == resourceType; + @SuppressWarnings("unchecked") + TypeWatchers tTypeWatchers = (TypeWatchers) typeWatchers; + return tTypeWatchers.watchers; + } + + private void addLeavesToBuilder( + XdsConfig.XdsConfigBuilder builder, + Map> edsWatchers, + Set leafNames) { for (String clusterName : leafNames) { CdsWatcher cdsWatcher = getCluster(clusterName); StatusOr cdsUpdateOr = cdsWatcher.getData(); - if (cdsUpdateOr.hasValue()) { - XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue(); - if (cdsUpdate.clusterType() == ClusterType.EDS) { - EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName()); - if (edsWatcher != null) { - EndpointConfig child = new EndpointConfig(edsWatcher.getData()); - builder.addCluster(clusterName, StatusOr.fromValue( - new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); - } else { - builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( - "EDS resource not found for cluster " + clusterName))); - } - } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) { - // TODO get the resolved endpoint configuration - builder.addCluster(clusterName, StatusOr.fromValue( - new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, new EndpointConfig(null)))); - } - } else { + if (!cdsUpdateOr.hasValue()) { builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus())); + continue; + } + + XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue(); + if (cdsUpdate.clusterType() == ClusterType.EDS) { + XdsWatcherBase edsWatcher = + edsWatchers.get(cdsWatcher.getEdsServiceName()); + EndpointConfig child; + if (edsWatcher != null) { + child = new EndpointConfig(edsWatcher.getData()); + } else { + child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription( + "EDS resource not found for cluster " + clusterName))); + } + builder.addCluster(clusterName, StatusOr.fromValue( + new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child))); + } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) { + builder.addCluster(clusterName, StatusOr.fromStatus( + Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported"))); } } } // Adds the top-level clusters to the builder and returns the leaf cluster names private Set addTopLevelClustersToBuilder( - XdsConfig.XdsConfigBuilder builder, Map> edsWatchers, - Map> cdsWatchers, List topLevelClusters) { + XdsConfig.XdsConfigBuilder builder, + Map> edsWatchers, + Map> cdsWatchers, + List topLevelClusters) { Set leafClusterNames = new HashSet<>(); for (String clusterName : topLevelClusters) { @@ -367,23 +394,25 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi XdsConfig.XdsClusterConfig.ClusterChild child; switch (cdsUpdate.clusterType()) { case AGGREGATE: - List leafNames = getLeafNames(cdsUpdate); + Set leafNames = new HashSet<>(); + addLeafNames(leafNames, cdsUpdate); child = new AggregateConfig(leafNames); leafClusterNames.addAll(leafNames); break; case EDS: - EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(cdsUpdate.edsServiceName()); + XdsWatcherBase edsWatcher = + edsWatchers.get(cdsWatcher.getEdsServiceName()); if (edsWatcher != null) { child = new EndpointConfig(edsWatcher.getData()); } else { - builder.addCluster(clusterName, StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( + child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription( "EDS resource not found for cluster " + clusterName))); - continue; } break; case LOGICAL_DNS: // TODO get the resolved endpoint configuration - child = new EndpointConfig(null); + child = new EndpointConfig(StatusOr.fromStatus( + Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported"))); break; default: throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType()); @@ -395,29 +424,26 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return leafClusterNames; } - private List getLeafNames(XdsClusterResource.CdsUpdate cdsUpdate) { - List childNames = new ArrayList<>(); - + private void addLeafNames(Set leafNames, XdsClusterResource.CdsUpdate cdsUpdate) { for (String cluster : cdsUpdate.prioritizedClusterNames()) { + if (leafNames.contains(cluster)) { + continue; + } StatusOr data = getCluster(cluster).getData(); if (data == null || !data.hasValue() || data.getValue() == null) { - childNames.add(cluster); + leafNames.add(cluster); continue; } if (data.getValue().clusterType() == ClusterType.AGGREGATE) { - childNames.addAll(getLeafNames(data.getValue())); + addLeafNames(leafNames, data.getValue()); } else { - childNames.add(cluster); + leafNames.add(cluster); } } - - return childNames; } - private static boolean isTopLevelCluster(XdsWatcherBase cdsWatcher) { - if (! (cdsWatcher instanceof CdsWatcher)) { - return false; - } + private static boolean isTopLevelCluster( + XdsWatcherBase cdsWatcher) { return ((CdsWatcher)cdsWatcher).parentContexts.values().stream() .anyMatch(depth -> depth == 1); } @@ -454,17 +480,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } private void updateRoutes(List virtualHosts, Object newParentContext, - VirtualHost oldVirtualHost, boolean sameParentContext) { + List oldVirtualHosts, boolean sameParentContext) { + VirtualHost oldVirtualHost = + RoutingUtils.findVirtualHostForHostName(oldVirtualHosts, dataPlaneAuthority); VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority); - if (virtualHost == null) { - String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority; - logger.log(XdsLogger.XdsLogLevel.WARNING, error); - cleanUpRoutes(); - xdsConfigWatcher.onError( - "xDS node ID:" + dataPlaneAuthority, Status.UNAVAILABLE.withDescription(error)); - return; - } Set newClusters = getClusterNamesFromVirtualHost(virtualHost); Set oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost); @@ -482,6 +502,10 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } } + private String nodeInfo() { + return " nodeID: " + xdsClient.getBootstrapInfo().node().getId(); + } + private static Set getClusterNamesFromVirtualHost(VirtualHost virtualHost) { if (virtualHost == null) { return Collections.emptySet(); @@ -506,46 +530,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi return clusters; } - @Nullable - private VirtualHost getActiveVirtualHost() { - TypeWatchers rdsWatchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance()); - if (rdsWatchers == null) { - return null; - } - - RdsWatcher activeRdsWatcher = - (RdsWatcher) rdsWatchers.watchers.values().stream().findFirst().orElse(null); - if (activeRdsWatcher == null || activeRdsWatcher.missingResult() - || !activeRdsWatcher.getData().hasValue()) { - return null; - } - - return RoutingUtils.findVirtualHostForHostName( - activeRdsWatcher.getData().getValue().virtualHosts, dataPlaneAuthority); - } - - // Must be in SyncContext - private void cleanUpRoutes() { - // Remove RdsWatcher & CDS Watchers - TypeWatchers rdsResourceWatcher = - resourceWatchers.get(XdsRouteConfigureResource.getInstance()); - if (rdsResourceWatcher == null || rdsResourceWatcher.watchers.isEmpty()) { - return; - } - - XdsWatcherBase watcher = rdsResourceWatcher.watchers.values().stream().findFirst().get(); - cancelWatcher(watcher); - - // Remove CdsWatchers pointed to by the RdsWatcher - RdsWatcher rdsWatcher = (RdsWatcher) watcher; - for (String cName : rdsWatcher.getCdsNames()) { - CdsWatcher cdsWatcher = getCluster(cName); - if (cdsWatcher != null) { - cancelClusterWatcherTree(cdsWatcher, rdsWatcher); - } - } - } - private CdsWatcher getCluster(String clusterName) { return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName); } @@ -565,16 +549,11 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } public interface XdsConfigWatcher { - - void onUpdate(XdsConfig config); - - // These 2 methods are invoked when there is an error or - // does-not-exist on LDS or RDS only. The context will be a - // human-readable string indicating the scope in which the error - // occurred (e.g., the resource type and name). - void onError(String resourceContext, Status status); - - void onResourceDoesNotExist(String resourceContext); + /** + * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or + * INTERNAL. + */ + void onUpdate(StatusOr config); } private class ClusterSubscription implements Closeable { @@ -594,7 +573,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } } - private abstract static class XdsWatcherBase + private abstract class XdsWatcherBase implements ResourceWatcher { private final XdsResourceType type; private final String resourceName; @@ -612,12 +591,25 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onError(Status error) { checkNotNull(error, "error"); - setDataAsStatus(error); + // Don't update configuration on error, if we've already received configuration + if (!hasDataValue()) { + setDataAsStatus(Status.UNAVAILABLE.withDescription( + String.format("Error retrieving %s: %s: %s", + toContextString(), error.getCode(), error.getDescription()))); + maybePublishConfig(); + } } - protected void handleDoesNotExist(String resourceName) { + @Override + public void onResourceDoesNotExist(String resourceName) { + if (cancelled) { + return; + } + checkArgument(this.resourceName.equals(resourceName), "Resource name does not match"); - setDataAsStatus(Status.UNAVAILABLE.withDescription("No " + toContextString())); + setDataAsStatus(Status.UNAVAILABLE.withDescription( + toContextString() + " does not exist" + nodeInfo())); + maybePublishConfig(); } boolean missingResult() { @@ -647,12 +639,17 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi this.data = StatusOr.fromStatus(status); } - String toContextString() { + public String toContextString() { return toContextStr(type.typeName(), resourceName); } } - private class LdsWatcher extends XdsWatcherBase { + private interface RdsUpdateSupplier { + StatusOr getRdsUpdate(); + } + + private class LdsWatcher extends XdsWatcherBase + implements RdsUpdateSupplier { String rdsName; private LdsWatcher(String resourceName) { @@ -664,9 +661,20 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi checkNotNull(update, "update"); HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); - List virtualHosts = httpConnectionManager.virtualHosts(); - String rdsName = httpConnectionManager.rdsName(); - VirtualHost activeVirtualHost = getActiveVirtualHost(); + List virtualHosts; + String rdsName; + if (httpConnectionManager == null) { + // TCP listener. Unsupported config + virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS + rdsName = null; + } else { + virtualHosts = httpConnectionManager.virtualHosts(); + rdsName = httpConnectionManager.rdsName(); + } + StatusOr activeRdsUpdate = getRouteSource().getRdsUpdate(); + List activeVirtualHosts = activeRdsUpdate.hasValue() + ? activeRdsUpdate.getValue().virtualHosts + : Collections.emptyList(); boolean changedRdsName = !Objects.equals(rdsName, this.rdsName); if (changedRdsName) { @@ -675,10 +683,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi if (virtualHosts != null) { // No RDS watcher since we are getting RDS updates via LDS - updateRoutes(virtualHosts, this, activeVirtualHost, this.rdsName == null); + updateRoutes(virtualHosts, this, activeVirtualHosts, this.rdsName == null); this.rdsName = null; } else if (changedRdsName) { - cleanUpRdsWatcher(); this.rdsName = rdsName; addWatcher(new RdsWatcher(rdsName)); logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); @@ -688,20 +695,18 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi maybePublishConfig(); } - @Override - public void onError(Status error) { - super.onError(checkNotNull(error, "error")); - xdsConfigWatcher.onError(toContextString(), error); - } - @Override public void onResourceDoesNotExist(String resourceName) { if (cancelled) { return; } - handleDoesNotExist(resourceName); - xdsConfigWatcher.onResourceDoesNotExist(toContextString()); + checkArgument(resourceName().equals(resourceName), "Resource name does not match"); + setDataAsStatus(Status.UNAVAILABLE.withDescription( + toContextString() + " does not exist" + nodeInfo())); + cleanUpRdsWatcher(); + rdsName = null; + maybePublishConfig(); } private void cleanUpRdsWatcher() { @@ -711,8 +716,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName); // Cleanup clusters (as appropriate) that had the old rds watcher as a parent - if (!oldRdsWatcher.hasDataValue() || !oldRdsWatcher.getData().hasValue() - || resourceWatchers.get(CLUSTER_RESOURCE) == null) { + if (!oldRdsWatcher.hasDataValue() || resourceWatchers.get(CLUSTER_RESOURCE) == null) { return; } for (XdsWatcherBase watcher : @@ -723,16 +727,58 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi } private RdsWatcher getRdsWatcher() { - TypeWatchers watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance()); - if (watchers == null || rdsName == null || watchers.watchers.isEmpty()) { + if (rdsName == null) { + return null; + } + TypeWatchers watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance()); + if (watchers == null) { return null; } - return (RdsWatcher) watchers.watchers.get(rdsName); } + + public RdsUpdateSupplier getRouteSource() { + if (!hasDataValue()) { + return this; + } + HttpConnectionManager hcm = getData().getValue().httpConnectionManager(); + if (hcm == null) { + return this; + } + List virtualHosts = hcm.virtualHosts(); + if (virtualHosts != null) { + return this; + } + RdsWatcher rdsWatcher = getRdsWatcher(); + assert rdsWatcher != null; + return rdsWatcher; + } + + @Override + public StatusOr getRdsUpdate() { + if (missingResult()) { + return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded")); + } + if (!getData().hasValue()) { + return StatusOr.fromStatus(getData().getStatus()); + } + HttpConnectionManager hcm = getData().getValue().httpConnectionManager(); + if (hcm == null) { + return StatusOr.fromStatus( + Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo())); + } + List virtualHosts = hcm.virtualHosts(); + if (virtualHosts == null) { + // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would + // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a + // bug + return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS")); + } + return StatusOr.fromValue(new RdsUpdate(virtualHosts)); + } } - private class RdsWatcher extends XdsWatcherBase { + private class RdsWatcher extends XdsWatcherBase implements RdsUpdateSupplier { public RdsWatcher(String resourceName) { super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName")); @@ -741,37 +787,20 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onChanged(RdsUpdate update) { checkNotNull(update, "update"); - RdsUpdate oldData = hasDataValue() ? getData().getValue() : null; - VirtualHost oldVirtualHost = - (oldData != null) - ? RoutingUtils.findVirtualHostForHostName(oldData.virtualHosts, dataPlaneAuthority) - : null; + List oldVirtualHosts = hasDataValue() + ? getData().getValue().virtualHosts + : Collections.emptyList(); setData(update); - updateRoutes(update.virtualHosts, this, oldVirtualHost, true); + updateRoutes(update.virtualHosts, this, oldVirtualHosts, true); maybePublishConfig(); } @Override - public void onError(Status error) { - super.onError(checkNotNull(error, "error")); - xdsConfigWatcher.onError(toContextString(), error); - } - - @Override - public void onResourceDoesNotExist(String resourceName) { - if (cancelled) { - return; + public StatusOr getRdsUpdate() { + if (missingResult()) { + return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded")); } - handleDoesNotExist(checkNotNull(resourceName, "resourceName")); - xdsConfigWatcher.onResourceDoesNotExist(toContextString()); - } - - ImmutableList getCdsNames() { - if (!hasDataValue() || getData().getValue().virtualHosts == null) { - return ImmutableList.of(); - } - - return ImmutableList.copyOf(getClusterNamesFromVirtualHost(getActiveVirtualHost())); + return getData(); } } @@ -789,7 +818,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi switch (update.clusterType()) { case EDS: setData(update); - if (!addEdsWatcher(update.edsServiceName(), this)) { + if (!addEdsWatcher(getEdsServiceName(), this)) { maybePublishConfig(); } break; @@ -805,14 +834,15 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi logger.log(XdsLogger.XdsLogLevel.WARNING, "Cluster recursion depth limit exceeded for cluster {0}", resourceName()); Status error = Status.UNAVAILABLE.withDescription( - "aggregate cluster graph exceeds max depth"); + "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo()); setDataAsStatus(error); } if (hasDataValue()) { - Set oldNames = new HashSet<>(getData().getValue().prioritizedClusterNames()); + Set oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE + ? new HashSet<>(getData().getValue().prioritizedClusterNames()) + : Collections.emptySet(); Set newNames = new HashSet<>(update.prioritizedClusterNames()); - Set deletedClusters = Sets.difference(oldNames, newNames); deletedClusters.forEach((cluster) -> cancelClusterWatcherTree(getCluster(cluster), parentContext)); @@ -838,19 +868,20 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi break; default: Status error = Status.UNAVAILABLE.withDescription( - "aggregate cluster graph exceeds max depth"); + "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo()); setDataAsStatus(error); maybePublishConfig(); } } - @Override - public void onResourceDoesNotExist(String resourceName) { - if (cancelled) { - return; + public String getEdsServiceName() { + XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue(); + assert cdsUpdate.clusterType() == ClusterType.EDS; + String edsServiceName = cdsUpdate.edsServiceName(); + if (edsServiceName == null) { + edsServiceName = cdsUpdate.clusterName(); } - handleDoesNotExist(checkNotNull(resourceName, "resourceName")); - maybePublishConfig(); + return edsServiceName; } } @@ -868,15 +899,6 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi maybePublishConfig(); } - @Override - public void onResourceDoesNotExist(String resourceName) { - if (cancelled) { - return; - } - handleDoesNotExist(checkNotNull(resourceName, "resourceName")); - maybePublishConfig(); - } - void addParentContext(CdsWatcher parentContext) { parentContexts.add(checkNotNull(parentContext, "parentContext")); } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 4b6da6ac8b..5c1b3105c4 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -45,6 +45,7 @@ import io.grpc.MetricRecorder; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.Status.Code; +import io.grpc.StatusOr; import io.grpc.SynchronizationContext; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ObjectPool; @@ -60,11 +61,9 @@ import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy; import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy; import io.grpc.xds.VirtualHost.Route.RouteMatch; import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; -import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.client.Bootstrapper.AuthorityInfo; import io.grpc.xds.client.Bootstrapper.BootstrapInfo; import io.grpc.xds.client.XdsClient; -import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsLogger.XdsLogLevel; import java.net.URI; @@ -127,6 +126,7 @@ final class XdsNameResolver extends NameResolver { private final ConfigSelector configSelector = new ConfigSelector(); private final long randomChannelId; private final MetricRecorder metricRecorder; + private final Args nameResolverArgs; // Must be accessed in syncContext. // Filter instances are unique per channel, and per filter (name+typeUrl). // NamedFilterConfig.filterStateKey -> filter_instance. @@ -138,20 +138,17 @@ final class XdsNameResolver extends NameResolver { private XdsClient xdsClient; private CallCounterProvider callCounterProvider; private ResolveState resolveState; - // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in - // XdsClient instead of here. - private boolean receivedConfig; XdsNameResolver( URI targetUri, String name, @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler, @Nullable Map bootstrapOverride, - MetricRecorder metricRecorder) { + MetricRecorder metricRecorder, Args nameResolverArgs) { this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser, syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride, - metricRecorder); + metricRecorder, nameResolverArgs); } @VisibleForTesting @@ -161,7 +158,7 @@ final class XdsNameResolver extends NameResolver { SynchronizationContext syncContext, ScheduledExecutorService scheduler, XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random, FilterRegistry filterRegistry, @Nullable Map bootstrapOverride, - MetricRecorder metricRecorder) { + MetricRecorder metricRecorder, Args nameResolverArgs) { this.targetAuthority = targetAuthority; target = targetUri.toString(); @@ -180,6 +177,8 @@ final class XdsNameResolver extends NameResolver { this.random = checkNotNull(random, "random"); this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry"); this.metricRecorder = metricRecorder; + this.nameResolverArgs = checkNotNull(nameResolverArgs, "nameResolverArgs"); + randomChannelId = random.nextLong(); logId = InternalLogId.allocate("xds-resolver", name); logger = XdsLogger.withLogId(logId); @@ -228,9 +227,8 @@ final class XdsNameResolver extends NameResolver { } ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName); callCounterProvider = SharedCallCounterMap.getInstance(); - resolveState = new ResolveState(ldsResourceName); - resolveState.start(); + resolveState = new ResolveState(ldsResourceName); // auto starts } private static String expandPercentS(String template, String replacement) { @@ -241,7 +239,7 @@ final class XdsNameResolver extends NameResolver { public void shutdown() { logger.log(XdsLogLevel.INFO, "Shutdown"); if (resolveState != null) { - resolveState.stop(); + resolveState.shutdown(); } if (xdsClient != null) { xdsClient = xdsClientPool.returnObject(xdsClient); @@ -290,7 +288,7 @@ final class XdsNameResolver extends NameResolver { } // called in syncContext - private void updateResolutionResult() { + private void updateResolutionResult(XdsConfig xdsConfig) { syncContext.throwIfNotInThisSynchronizationContext(); ImmutableMap.Builder childPolicy = new ImmutableMap.Builder<>(); @@ -312,6 +310,8 @@ final class XdsNameResolver extends NameResolver { Attributes attrs = Attributes.newBuilder() .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool) + .set(XdsAttributes.XDS_CONFIG, xdsConfig) + .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, resolveState.xdsDependencyManager) .set(XdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider) .set(InternalConfigSelector.KEY, configSelector) .build(); @@ -321,7 +321,6 @@ final class XdsNameResolver extends NameResolver { .setServiceConfig(parsedServiceConfig) .build(); listener.onResult2(result); - receivedConfig = true; } /** @@ -540,7 +539,11 @@ final class XdsNameResolver extends NameResolver { public void run() { if (clusterRefs.get(cluster).refCount.get() == 0) { clusterRefs.remove(cluster); - updateResolutionResult(); + if (resolveState.lastConfigOrStatus.hasValue()) { + updateResolutionResult(resolveState.lastConfigOrStatus.getValue()); + } else { + resolveState.cleanUpRoutes(resolveState.lastConfigOrStatus.getStatus()); + } } } }); @@ -629,82 +632,56 @@ final class XdsNameResolver extends NameResolver { return "cluster_specifier_plugin:" + pluginName; } - private class ResolveState implements ResourceWatcher { + class ResolveState implements XdsDependencyManager.XdsConfigWatcher { private final ConfigOrError emptyServiceConfig = serviceConfigParser.parseServiceConfig(Collections.emptyMap()); - private final String ldsResourceName; + private final String authority; + private final XdsDependencyManager xdsDependencyManager; private boolean stopped; @Nullable private Set existingClusters; // clusters to which new requests can be routed - @Nullable - private RouteDiscoveryState routeDiscoveryState; + private StatusOr lastConfigOrStatus; - ResolveState(String ldsResourceName) { - this.ldsResourceName = ldsResourceName; + private ResolveState(String ldsResourceName) { + authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority; + xdsDependencyManager = + new XdsDependencyManager(xdsClient, this, syncContext, authority, ldsResourceName, + nameResolverArgs, scheduler); } - @Override - public void onChanged(final XdsListenerResource.LdsUpdate update) { + private void shutdown() { if (stopped) { return; } - logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update); - HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); - List virtualHosts = httpConnectionManager.virtualHosts(); - String rdsName = httpConnectionManager.rdsName(); + + stopped = true; + xdsDependencyManager.shutdown(); + updateActiveFilters(null); + } + + @Override + public void onUpdate(StatusOr updateOrStatus) { + if (stopped) { + return; + } + logger.log(XdsLogLevel.INFO, "Receive XDS resource update: {0}", updateOrStatus); + + lastConfigOrStatus = updateOrStatus; + if (!updateOrStatus.hasValue()) { + updateActiveFilters(null); + cleanUpRoutes(updateOrStatus.getStatus()); + return; + } + + // Process Route + XdsConfig update = updateOrStatus.getValue(); + HttpConnectionManager httpConnectionManager = update.getListener().httpConnectionManager(); + VirtualHost virtualHost = update.getVirtualHost(); ImmutableList filterConfigs = httpConnectionManager.httpFilterConfigs(); long streamDurationNano = httpConnectionManager.httpMaxStreamDurationNano(); - // Create/update HCM-bound state. - cleanUpRouteDiscoveryState(); updateActiveFilters(filterConfigs); - - // Routes specified directly in LDS. - if (virtualHosts != null) { - updateRoutes(virtualHosts, streamDurationNano, filterConfigs); - return; - } - // Routes provided by RDS. - routeDiscoveryState = new RouteDiscoveryState(rdsName, streamDurationNano, filterConfigs); - logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), - rdsName, routeDiscoveryState, syncContext); - } - - @Override - public void onError(final Status error) { - if (stopped || receivedConfig) { - return; - } - listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( - String.format("Unable to load LDS %s. xDS server returned: %s: %s", - ldsResourceName, error.getCode(), error.getDescription()))); - } - - @Override - public void onResourceDoesNotExist(final String resourceName) { - if (stopped) { - return; - } - String error = "LDS resource does not exist: " + resourceName; - logger.log(XdsLogLevel.INFO, error); - cleanUpRouteDiscoveryState(); - updateActiveFilters(null); - cleanUpRoutes(error); - } - - private void start() { - logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(), - ldsResourceName, this, syncContext); - } - - private void stop() { - logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName); - stopped = true; - cleanUpRouteDiscoveryState(); - updateActiveFilters(null); - xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this); + updateRoutes(update, virtualHost, streamDurationNano, filterConfigs); } // called in syncContext @@ -732,18 +709,11 @@ final class XdsNameResolver extends NameResolver { } } - // called in syncContext - private void updateRoutes(List virtualHosts, long httpMaxStreamDurationNano, + private void updateRoutes( + XdsConfig xdsConfig, + @Nullable VirtualHost virtualHost, + long httpMaxStreamDurationNano, @Nullable List filterConfigs) { - String authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority; - VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority); - if (virtualHost == null) { - String error = "Failed to find virtual host matching hostname: " + authority; - logger.log(XdsLogLevel.WARNING, error); - cleanUpRoutes(error); - return; - } - List routes = virtualHost.routes(); ImmutableList.Builder routesData = ImmutableList.builder(); @@ -826,7 +796,7 @@ final class XdsNameResolver extends NameResolver { } // Update service config to include newly added clusters. if (shouldUpdateResult && routingConfig != null) { - updateResolutionResult(); + updateResolutionResult(xdsConfig); shouldUpdateResult = false; } // Make newly added clusters selectable by config selector and deleted clusters no longer @@ -840,7 +810,7 @@ final class XdsNameResolver extends NameResolver { } } if (shouldUpdateResult) { - updateResolutionResult(); + updateResolutionResult(xdsConfig); } } @@ -882,10 +852,8 @@ final class XdsNameResolver extends NameResolver { return combineInterceptors(filterInterceptors.build()); } - private void cleanUpRoutes(String error) { - String errorWithNodeId = - error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId(); - routingConfig = new RoutingConfig(Status.UNAVAILABLE.withDescription(errorWithNodeId)); + private void cleanUpRoutes(Status error) { + routingConfig = new RoutingConfig(error); if (existingClusters != null) { for (String cluster : existingClusters) { int count = clusterRefs.get(cluster).refCount.decrementAndGet(); @@ -904,64 +872,6 @@ final class XdsNameResolver extends NameResolver { .build()) .setServiceConfig(emptyServiceConfig) .build()); - receivedConfig = true; - } - - private void cleanUpRouteDiscoveryState() { - if (routeDiscoveryState != null) { - String rdsName = routeDiscoveryState.resourceName; - logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName); - xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName, - routeDiscoveryState); - routeDiscoveryState = null; - } - } - - /** - * Discovery state for RouteConfiguration resource. One instance for each Listener resource - * update. - */ - private class RouteDiscoveryState implements ResourceWatcher { - private final String resourceName; - private final long httpMaxStreamDurationNano; - @Nullable - private final List filterConfigs; - - private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano, - @Nullable List filterConfigs) { - this.resourceName = resourceName; - this.httpMaxStreamDurationNano = httpMaxStreamDurationNano; - this.filterConfigs = filterConfigs; - } - - @Override - public void onChanged(final RdsUpdate update) { - if (RouteDiscoveryState.this != routeDiscoveryState) { - return; - } - logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update); - updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs); - } - - @Override - public void onError(final Status error) { - if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) { - return; - } - listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( - String.format("Unable to load RDS %s. xDS server returned: %s: %s", - resourceName, error.getCode(), error.getDescription()))); - } - - @Override - public void onResourceDoesNotExist(final String resourceName) { - if (RouteDiscoveryState.this != routeDiscoveryState) { - return; - } - String error = "RDS resource does not exist: " + resourceName; - logger.log(XdsLogLevel.INFO, error); - cleanUpRoutes(error); - } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java index 7451833126..eb3887396a 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java @@ -82,7 +82,7 @@ public final class XdsNameResolverProvider extends NameResolverProvider { args.getServiceConfigParser(), args.getSynchronizationContext(), args.getScheduledExecutorService(), bootstrapOverride, - args.getMetricRecorder()); + args.getMetricRecorder(), args); } return null; } diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index f94a94a944..2af04a3aed 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.StatusMatcher.statusHasCode; import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.AGGREGATE; import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.EDS; import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS; @@ -32,7 +33,6 @@ import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -47,24 +47,26 @@ import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.grpc.BindableService; +import io.grpc.ChannelLogger; import io.grpc.ManagedChannel; +import io.grpc.NameResolver; import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusOr; +import io.grpc.StatusOrMatcher; import io.grpc.SynchronizationContext; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.FakeClock; +import io.grpc.internal.GrpcUtil; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsConfig.XdsClusterConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; -import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.client.CommonBootstrapperTestUtils; -import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClientImpl; import io.grpc.xds.client.XdsClientMetricReporter; -import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.client.XdsTransportFactory; import java.io.Closeable; import java.io.IOException; @@ -77,7 +79,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -108,7 +110,9 @@ public class XdsDependencyManagerTest { private XdsClientMetricReporter xdsClientMetricReporter; private final SynchronizationContext syncContext = - new SynchronizationContext(mock(Thread.UncaughtExceptionHandler.class)); + new SynchronizationContext((t, e) -> { + throw new AssertionError(e); + }); private ManagedChannel channel; private XdsClientImpl xdsClient; @@ -133,9 +137,17 @@ public class XdsDependencyManagerTest { private XdsConfig defaultXdsConfig; // set in setUp() @Captor - private ArgumentCaptor xdsConfigCaptor; - @Captor - private ArgumentCaptor statusCaptor; + private ArgumentCaptor> xdsUpdateCaptor; + private final NameResolver.Args nameResolverArgs = NameResolver.Args.newBuilder() + .setDefaultPort(8080) + .setProxyDetector(GrpcUtil.DEFAULT_PROXY_DETECTOR) + .setSynchronizationContext(syncContext) + .setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class)) + .setChannelLogger(mock(ChannelLogger.class)) + .setScheduledExecutorService(fakeClock.getScheduledExecutorService()) + .build(); + + private final ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService(); @Before public void setUp() throws Exception { @@ -180,36 +192,36 @@ public class XdsDependencyManagerTest { @Test public void verify_basic_config() { - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); - verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); - testWatcher.verifyStats(1, 0, 0); + verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig)); + testWatcher.verifyStats(1, 0); } @Test public void verify_config_update() { - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); - testWatcher.verifyStats(1, 0, 0); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig)); + testWatcher.verifyStats(1, 0); assertThat(testWatcher.lastConfig).isEqualTo(defaultXdsConfig); XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS2", "EDS2", ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT + 2); inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(ArgumentMatchers.notNull()); - testWatcher.verifyStats(2, 0, 0); + testWatcher.verifyStats(2, 0); assertThat(testWatcher.lastConfig).isNotEqualTo(defaultXdsConfig); } @Test public void verify_simple_aggregate() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig)); List childNames = Arrays.asList("clusterC", "clusterB", "clusterA"); String rootName = "root_c"; @@ -226,14 +238,14 @@ public class XdsDependencyManagerTest { testWatcher.lastConfig.getClusters(); assertThat(lastConfigClusters).hasSize(childNames.size() + 1); StatusOr rootC = lastConfigClusters.get(rootName); - XdsClusterResource.CdsUpdate rootUpdate = rootC.getValue().getClusterResource(); + CdsUpdate rootUpdate = rootC.getValue().getClusterResource(); assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE); assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames); for (String childName : childNames) { assertThat(lastConfigClusters).containsKey(childName); StatusOr childConfigOr = lastConfigClusters.get(childName); - XdsClusterResource.CdsUpdate childResource = + CdsUpdate childResource = childConfigOr.getValue().getClusterResource(); assertThat(childResource.clusterType()).isEqualTo(EDS); assertThat(childResource.edsServiceName()).isEqualTo(getEdsNameForCluster(childName)); @@ -266,54 +278,57 @@ public class XdsDependencyManagerTest { List childNames2 = Arrays.asList("clusterA", "clusterX"); XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName2, childNames2); - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1); inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); Closeable subscription2 = xdsDependencyManager.subscribeToCluster(rootName2); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); - testWatcher.verifyStats(3, 0, 0); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); + testWatcher.verifyStats(3, 0); ImmutableSet.Builder builder = ImmutableSet.builder(); Set expectedClusters = builder.add(rootName1).add(rootName2).add(CLUSTER_NAME) .addAll(childNames).addAll(childNames2).build(); - assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).isEqualTo(expectedClusters); + assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet()) + .isEqualTo(expectedClusters); // Close 1 subscription shouldn't affect the other or RDS subscriptions subscription1.close(); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); builder = ImmutableSet.builder(); Set expectedClusters2 = builder.add(rootName2).add(CLUSTER_NAME).addAll(childNames2).build(); - assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).isEqualTo(expectedClusters2); + assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet()) + .isEqualTo(expectedClusters2); subscription2.close(); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig)); } @Test public void testDelayedSubscription() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig)); String rootName1 = "root_c"; Closeable subscription1 = xdsDependencyManager.subscribeToCluster(rootName1); assertThat(subscription1).isNotNull(); fakeClock.forwardTime(16, TimeUnit.SECONDS); - inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture()); - assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).toString()).isEqualTo( - StatusOr.fromStatus(Status.UNAVAILABLE.withDescription( - "No " + toContextStr(CLUSTER_TYPE_NAME, rootName1))).toString()); + inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); + Status status = xdsUpdateCaptor.getValue().getValue().getClusters().get(rootName1).getStatus(); + assertThat(status.getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(status.getDescription()).contains(rootName1); List childNames = Arrays.asList("clusterC", "clusterB", "clusterA"); XdsTestUtils.addAggregateToExistingConfig(controlPlaneService, rootName1, childNames); - inOrder.verify(xdsConfigWatcher).onUpdate(xdsConfigCaptor.capture()); - assertThat(xdsConfigCaptor.getValue().getClusters().get(rootName1).hasValue()).isTrue(); + inOrder.verify(xdsConfigWatcher).onUpdate(xdsUpdateCaptor.capture()); + assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().get(rootName1).hasValue()) + .isTrue(); } @Test @@ -342,109 +357,99 @@ public class XdsDependencyManagerTest { edsMap.put("garbageEds", clusterLoadAssignment); controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); fakeClock.forwardTime(16, TimeUnit.SECONDS); - verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); + verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); List> returnedClusters = new ArrayList<>(); for (String childName : childNames) { - returnedClusters.add(xdsConfigCaptor.getValue().getClusters().get(childName)); + returnedClusters.add(xdsUpdateCaptor.getValue().getValue().getClusters().get(childName)); } // Check that missing cluster reported Status and the other 2 are present - Status expectedClusterStatus = Status.UNAVAILABLE.withDescription( - "No " + toContextStr(CLUSTER_TYPE_NAME, childNames.get(2))); StatusOr missingCluster = returnedClusters.get(2); - assertThat(missingCluster.getStatus().toString()).isEqualTo(expectedClusterStatus.toString()); + assertThat(missingCluster.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); + assertThat(missingCluster.getStatus().getDescription()).contains(childNames.get(2)); assertThat(returnedClusters.get(0).hasValue()).isTrue(); assertThat(returnedClusters.get(1).hasValue()).isTrue(); // Check that missing EDS reported Status, the other one is present and the garbage EDS is not - Status expectedEdsStatus = Status.UNAVAILABLE.withDescription( - "No " + toContextStr(ENDPOINT_TYPE_NAME, XdsTestUtils.EDS_NAME + 1)); assertThat(getEndpoint(returnedClusters.get(0)).hasValue()).isTrue(); - assertThat(getEndpoint(returnedClusters.get(1)).hasValue()).isFalse(); - assertThat(getEndpoint(returnedClusters.get(1)).getStatus().toString()) - .isEqualTo(expectedEdsStatus.toString()); + assertThat(getEndpoint(returnedClusters.get(1)).getStatus().getCode()) + .isEqualTo(Status.Code.UNAVAILABLE); + assertThat(getEndpoint(returnedClusters.get(1)).getStatus().getDescription()) + .contains(XdsTestUtils.EDS_NAME + 1); - verify(xdsConfigWatcher, never()).onResourceDoesNotExist(any()); - testWatcher.verifyStats(1, 0, 0); + verify(xdsConfigWatcher, never()).onUpdate( + argThat(StatusOrMatcher.hasStatus(statusHasCode(Status.Code.UNAVAILABLE)))); + testWatcher.verifyStats(1, 0); } @Test public void testMissingLds() { - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, "badLdsName"); + String ldsName = "badLdsName"; + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, ldsName, nameResolverArgs, scheduler); fakeClock.forwardTime(16, TimeUnit.SECONDS); - verify(xdsConfigWatcher, timeout(1000)).onResourceDoesNotExist( - toContextStr(XdsListenerResource.getInstance().typeName(), "badLdsName")); + verify(xdsConfigWatcher, timeout(1000)).onUpdate( + argThat(StatusOrMatcher.hasStatus(statusHasCode(Status.Code.UNAVAILABLE) + .andDescriptionContains(ldsName)))); - testWatcher.verifyStats(0, 0, 1); + testWatcher.verifyStats(0, 1); + } + + @Test + public void testTcpListenerErrors() { + Listener serverListener = + ControlPlaneRule.buildServerListener().toBuilder().setName(serverName).build(); + controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, ImmutableMap.of(serverName, serverListener)); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + fakeClock.forwardTime(16, TimeUnit.SECONDS); + verify(xdsConfigWatcher, timeout(1000)).onUpdate( + argThat(StatusOrMatcher.hasStatus( + statusHasCode(Status.Code.UNAVAILABLE).andDescriptionContains("Not an API listener")))); + + testWatcher.verifyStats(0, 1); } @Test public void testMissingRds() { - Listener serverListener = ControlPlaneRule.buildServerListener(); - Listener clientListener = - ControlPlaneRule.buildClientListener(serverName, serverName, "badRdsName"); + String rdsName = "badRdsName"; + Listener clientListener = ControlPlaneRule.buildClientListener(serverName, serverName, rdsName); controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, - ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener)); + ImmutableMap.of(serverName, clientListener)); - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); fakeClock.forwardTime(16, TimeUnit.SECONDS); - verify(xdsConfigWatcher, timeout(1000)).onResourceDoesNotExist( - toContextStr(XdsRouteConfigureResource.getInstance().typeName(), "badRdsName")); + verify(xdsConfigWatcher, timeout(1000)).onUpdate( + argThat(StatusOrMatcher.hasStatus(statusHasCode(Status.Code.UNAVAILABLE) + .andDescriptionContains(rdsName)))); - testWatcher.verifyStats(0, 0, 1); + testWatcher.verifyStats(0, 1); } @Test public void testUpdateToMissingVirtualHost() { - InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - WrappedXdsClient wrappedXdsClient = new WrappedXdsClient(xdsClient, syncContext); - xdsDependencyManager = new XdsDependencyManager( - wrappedXdsClient, xdsConfigWatcher, syncContext, serverName, serverName); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + RouteConfiguration routeConfig = XdsTestUtils.buildRouteConfiguration( + "wrong-virtual-host", XdsTestUtils.RDS_NAME, XdsTestUtils.CLUSTER_NAME); + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig)); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); // Update with a config that has a virtual host that doesn't match the server name - wrappedXdsClient.deliverLdsUpdate(0L, buildUnmatchedVirtualHosts()); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onError(any(), statusCaptor.capture()); - assertThat(statusCaptor.getValue().getDescription()) - .isEqualTo("Failed to find virtual host matching hostname: " + serverName); + verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); + assertThat(xdsUpdateCaptor.getValue().getStatus().getDescription()) + .contains("Failed to find virtual host matching hostname: " + serverName); - testWatcher.verifyStats(1, 1, 0); - - wrappedXdsClient.shutdown(); - } - - private List buildUnmatchedVirtualHosts() { - io.grpc.xds.VirtualHost.Route route1 = - io.grpc.xds.VirtualHost.Route.forAction( - io.grpc.xds.VirtualHost.Route.RouteMatch.withPathExactOnly("/GreetService/bye"), - io.grpc.xds.VirtualHost.Route.RouteAction.forCluster( - "cluster-bar.googleapis.com", Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L), null, false), ImmutableMap.of()); - io.grpc.xds.VirtualHost.Route route2 = - io.grpc.xds.VirtualHost.Route.forAction( - io.grpc.xds.VirtualHost.Route.RouteMatch.withPathExactOnly("/HelloService/hi"), - io.grpc.xds.VirtualHost.Route.RouteAction.forCluster( - "cluster-foo.googleapis.com", Collections.emptyList(), - TimeUnit.SECONDS.toNanos(15L), null, false), - ImmutableMap.of()); - return Arrays.asList( - io.grpc.xds.VirtualHost.create("virtualhost-foo", Collections.singletonList("hello" - + ".googleapis.com"), - Collections.singletonList(route1), - ImmutableMap.of()), - io.grpc.xds.VirtualHost.create("virtualhost-bar", Collections.singletonList("hi" - + ".googleapis.com"), - Collections.singletonList(route2), - ImmutableMap.of())); + testWatcher.verifyStats(0, 1); } @Test @@ -452,37 +457,32 @@ public class XdsDependencyManagerTest { String ldsResourceName = "xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1"; - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, ldsResourceName); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, ldsResourceName, nameResolverArgs, scheduler); - Status expectedStatus = Status.INVALID_ARGUMENT.withDescription( - "Wrong configuration: xds server does not exist for resource " + ldsResourceName); - String context = toContextStr(XdsListenerResource.getInstance().typeName(), ldsResourceName); - verify(xdsConfigWatcher, timeout(1000)) - .onError(eq(context), argThat(new XdsTestUtils.StatusMatcher(expectedStatus))); + verify(xdsConfigWatcher, timeout(1000)).onUpdate( + argThat(StatusOrMatcher.hasStatus( + statusHasCode(Status.Code.UNAVAILABLE).andDescriptionContains(ldsResourceName)))); fakeClock.forwardTime(16, TimeUnit.SECONDS); - testWatcher.verifyStats(0, 1, 0); + testWatcher.verifyStats(0, 1); } @Test public void testChangeRdsName_fromLds() { - // TODO implement InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - Listener serverListener = ControlPlaneRule.buildServerListener(); - - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(StatusOr.fromValue(defaultXdsConfig)); String newRdsName = "newRdsName1"; Listener clientListener = buildInlineClientListener(newRdsName, CLUSTER_NAME); controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, - ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener)); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); - assertThat(xdsConfigCaptor.getValue()).isNotEqualTo(defaultXdsConfig); - assertThat(xdsConfigCaptor.getValue().getVirtualHost().name()).isEqualTo(newRdsName); + ImmutableMap.of(serverName, clientListener)); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); + assertThat(xdsUpdateCaptor.getValue().getValue()).isNotEqualTo(defaultXdsConfig); + assertThat(xdsUpdateCaptor.getValue().getValue().getVirtualHost().name()).isEqualTo(newRdsName); } @Test @@ -527,22 +527,22 @@ public class XdsDependencyManagerTest { // Start the actual test InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); - XdsConfig initialConfig = xdsConfigCaptor.getValue(); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); + XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue(); // Make sure that adding subscriptions that rds points at doesn't change the config Closeable rootSub = xdsDependencyManager.subscribeToCluster("root"); - assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig); + assertThat(xdsDependencyManager.buildUpdate().getValue()).isEqualTo(initialConfig); Closeable clusterAB11Sub = xdsDependencyManager.subscribeToCluster("clusterAB11"); - assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig); + assertThat(xdsDependencyManager.buildUpdate().getValue()).isEqualTo(initialConfig); // Make sure that closing subscriptions that rds points at doesn't change the config rootSub.close(); - assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig); + assertThat(xdsDependencyManager.buildUpdate().getValue()).isEqualTo(initialConfig); clusterAB11Sub.close(); - assertThat(xdsDependencyManager.buildConfig()).isEqualTo(initialConfig); + assertThat(xdsDependencyManager.buildUpdate().getValue()).isEqualTo(initialConfig); // Make an explicit root subscription and then change RDS to point to A11 rootSub = xdsDependencyManager.subscribeToCluster("root"); @@ -550,13 +550,14 @@ public class XdsDependencyManagerTest { XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, "clusterA11"); controlPlaneService.setXdsConfig( ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, newRouteConfig)); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); - assertThat(xdsConfigCaptor.getValue().getClusters().keySet().size()).isEqualTo(4); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); + assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet().size()).isEqualTo(4); // Now that it is released, we should only have A11 rootSub.close(); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); - assertThat(xdsConfigCaptor.getValue().getClusters().keySet()).containsExactly("clusterA11"); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); + assertThat(xdsUpdateCaptor.getValue().getValue().getClusters().keySet()) + .containsExactly("clusterA11"); } @Test @@ -587,10 +588,10 @@ public class XdsDependencyManagerTest { controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); // Start the actual test - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); - verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); - XdsConfig initialConfig = xdsConfigCaptor.getValue(); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); + XdsConfig initialConfig = xdsUpdateCaptor.getValue().getValue(); assertThat(initialConfig.getClusters().keySet()) .containsExactly("root", "clusterA", "clusterB"); @@ -605,8 +606,8 @@ public class XdsDependencyManagerTest { @Test public void testChangeRdsName_FromLds_complexTree() { - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); // Create the same tree as in testMultipleParentsInCdsTree Cluster rootCluster = @@ -639,11 +640,10 @@ public class XdsDependencyManagerTest { // Do the test String newRdsName = "newRdsName1"; Listener clientListener = buildInlineClientListener(newRdsName, "root"); - Listener serverListener = ControlPlaneRule.buildServerListener(); controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, - ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener)); - inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsConfigCaptor.capture()); - XdsConfig config = xdsConfigCaptor.getValue(); + ImmutableMap.of(serverName, clientListener)); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); + XdsConfig config = xdsUpdateCaptor.getValue().getValue(); assertThat(config.getVirtualHost().name()).isEqualTo(newRdsName); assertThat(config.getClusters().size()).isEqualTo(4); } @@ -652,9 +652,9 @@ public class XdsDependencyManagerTest { public void testChangeAggCluster() { InOrder inOrder = Mockito.inOrder(xdsConfigWatcher); - xdsDependencyManager = new XdsDependencyManager( - xdsClient, xdsConfigWatcher, syncContext, serverName, serverName); - inOrder.verify(xdsConfigWatcher, atLeastOnce()).onUpdate(any()); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); // Setup initial config A -> A1 -> (A11, A12) Cluster rootCluster = @@ -673,9 +673,8 @@ public class XdsDependencyManagerTest { XdsTestUtils.addEdsClusters(clusterMap, edsMap, "clusterA11", "clusterA12"); Listener clientListener = buildInlineClientListener(RDS_NAME, "root"); - Listener serverListener = ControlPlaneRule.buildServerListener(); controlPlaneService.setXdsConfig(ADS_TYPE_URL_LDS, - ImmutableMap.of(XdsTestUtils.SERVER_LISTENER, serverListener, serverName, clientListener)); + ImmutableMap.of(serverName, clientListener)); controlPlaneService.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap); controlPlaneService.setXdsConfig(ADS_TYPE_URL_EDS, edsMap); @@ -702,96 +701,50 @@ public class XdsDependencyManagerTest { inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(argThat(nameMatcher)); } - private Listener buildInlineClientListener(String rdsName, String clusterName) { - return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName); + @Test + public void testCdsError() throws IOException { + controlPlaneService.setXdsConfig( + ADS_TYPE_URL_CDS, ImmutableMap.of(XdsTestUtils.CLUSTER_NAME, + Cluster.newBuilder().setName(XdsTestUtils.CLUSTER_NAME).build())); + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(xdsUpdateCaptor.capture()); + Status status = xdsUpdateCaptor.getValue().getValue() + .getClusters().get(CLUSTER_NAME).getStatus(); + assertThat(status.getDescription()).contains(XdsTestUtils.CLUSTER_NAME); } - - private static String toContextStr(String type, String resourceName) { - return type + " resource: " + resourceName; + private Listener buildInlineClientListener(String rdsName, String clusterName) { + return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName); } private static class TestWatcher implements XdsDependencyManager.XdsConfigWatcher { XdsConfig lastConfig; int numUpdates = 0; int numError = 0; - int numDoesNotExist = 0; @Override - public void onUpdate(XdsConfig config) { - log.fine("Config changed: " + config); - lastConfig = config; - numUpdates++; - } - - @Override - public void onError(String resourceContext, Status status) { - log.fine(String.format("Error %s for %s: ", status, resourceContext)); - numError++; - } - - @Override - public void onResourceDoesNotExist(String resourceName) { - log.fine("Resource does not exist: " + resourceName); - numDoesNotExist++; + public void onUpdate(StatusOr update) { + log.fine("Config update: " + update); + if (update.hasValue()) { + lastConfig = update.getValue(); + numUpdates++; + } else { + numError++; + } } private List getStats() { - return Arrays.asList(numUpdates, numError, numDoesNotExist); + return Arrays.asList(numUpdates, numError); } - private void verifyStats(int updt, int err, int notExist) { - assertThat(getStats()).isEqualTo(Arrays.asList(updt, err, notExist)); + private void verifyStats(int updt, int err) { + assertThat(getStats()).isEqualTo(Arrays.asList(updt, err)); } } - private static class WrappedXdsClient extends XdsClient { - private final XdsClient delegate; - private final SynchronizationContext syncContext; - private ResourceWatcher ldsWatcher; - - WrappedXdsClient(XdsClient delegate, SynchronizationContext syncContext) { - this.delegate = delegate; - this.syncContext = syncContext; - } - - @Override - public void shutdown() { - delegate.shutdown(); - } - - @Override - @SuppressWarnings("unchecked") - public void watchXdsResource( - XdsResourceType type, String resourceName, ResourceWatcher watcher, - Executor executor) { - if (type.equals(XdsListenerResource.getInstance())) { - ldsWatcher = (ResourceWatcher) watcher; - } - delegate.watchXdsResource(type, resourceName, watcher, executor); - } - - - - @Override - public void cancelXdsResourceWatch(XdsResourceType type, - String resourceName, - ResourceWatcher watcher) { - delegate.cancelXdsResourceWatch(type, resourceName, watcher); - } - - void deliverLdsUpdate(long httpMaxStreamDurationNano, - List virtualHosts) { - syncContext.execute(() -> { - LdsUpdate ldsUpdate = LdsUpdate.forApiListener( - io.grpc.xds.HttpConnectionManager.forVirtualHosts( - httpMaxStreamDurationNano, virtualHosts, null)); - ldsWatcher.onChanged(ldsUpdate); - }); - } - } - - static class ClusterNameMatcher implements ArgumentMatcher { + static class ClusterNameMatcher implements ArgumentMatcher> { private final List expectedNames; ClusterNameMatcher(List expectedNames) { @@ -799,7 +752,11 @@ public class XdsDependencyManagerTest { } @Override - public boolean matches(XdsConfig xdsConfig) { + public boolean matches(StatusOr update) { + if (!update.hasValue()) { + return false; + } + XdsConfig xdsConfig = update.getValue(); if (xdsConfig == null || xdsConfig.getClusters() == null) { return false; } diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index 4a8193c932..7425e3e31d 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -46,6 +46,7 @@ import com.google.protobuf.util.Durations; import com.google.re2j.Pattern; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ChannelLogger; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; @@ -70,6 +71,7 @@ import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.internal.AutoConfiguredLoadBalancerFactory; import io.grpc.internal.FakeClock; +import io.grpc.internal.GrpcUtil; import io.grpc.internal.JsonParser; import io.grpc.internal.JsonUtil; import io.grpc.internal.ObjectPool; @@ -89,6 +91,8 @@ import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy; import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy; import io.grpc.xds.VirtualHost.Route.RouteMatch; import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher; +import io.grpc.xds.XdsClusterResource.CdsUpdate; +import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.client.Bootstrapper.AuthorityInfo; @@ -104,6 +108,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -187,6 +192,15 @@ public class XdsNameResolverTest { private TestCall testCall; private boolean originalEnableTimeout; private URI targetUri; + private final NameResolver.Args nameResolverArgs = NameResolver.Args.newBuilder() + .setDefaultPort(8080) + .setProxyDetector(GrpcUtil.DEFAULT_PROXY_DETECTOR) + .setSynchronizationContext(syncContext) + .setServiceConfigParser(mock(NameResolver.ServiceConfigParser.class)) + .setChannelLogger(mock(ChannelLogger.class)) + .setScheduledExecutorService(fakeClock.getScheduledExecutorService()) + .build(); + @Before public void setUp() { @@ -213,7 +227,7 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, filterRegistry, null, metricRecorder); + xdsClientPoolFactory, mockRandom, filterRegistry, null, metricRecorder, nameResolverArgs); } @After @@ -259,7 +273,7 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); verify(mockListener).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); @@ -273,7 +287,7 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver(targetUri, "notfound.google.com", AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); verify(mockListener).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); @@ -295,7 +309,7 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver( targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, - mockRandom, FilterRegistry.getDefaultRegistry(), null, metricRecorder); + mockRandom, FilterRegistry.getDefaultRegistry(), null, metricRecorder, nameResolverArgs); resolver.start(mockListener); verify(mockListener, never()).onError(any(Status.class)); } @@ -316,7 +330,7 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver( targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); verify(mockListener, never()).onError(any(Status.class)); } @@ -337,7 +351,7 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver( targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); // The Service Authority must be URL encoded, but unlike the LDS resource name. @@ -366,7 +380,7 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver(targetUri, "xds.authority.com", serviceAuthority, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); verify(mockListener, never()).onError(any(Status.class)); } @@ -399,7 +413,7 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); // use different ldsResourceName and service authority. The virtualhost lookup should use // service authority. expectedLdsResourceName = "test-" + expectedLdsResourceName; @@ -413,6 +427,7 @@ public class XdsNameResolverTest { Collections.singletonList(route1), ImmutableMap.of()); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult2(resolutionResultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Collections.singletonList(cluster1), @@ -429,6 +444,7 @@ public class XdsNameResolverTest { Collections.singletonList(route2), ImmutableMap.of()); xdsClient.deliverRdsUpdate(alternativeRdsResource, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster2); // Two new service config updates triggered: // - with load balancing config being able to select cluster1 and cluster2 // - with load balancing config being able to select cluster2 only @@ -467,6 +483,7 @@ public class XdsNameResolverTest { Collections.singletonList(route), ImmutableMap.of()); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult2(resolutionResultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Collections.singletonList(cluster1), @@ -483,6 +500,7 @@ public class XdsNameResolverTest { verifyNoInteractions(mockListener); assertThat(xdsClient.rdsResource).isEqualTo(RDS_RESOURCE_NAME); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult2(resolutionResultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Collections.singletonList(cluster1), @@ -506,6 +524,7 @@ public class XdsNameResolverTest { Collections.singletonList(route), ImmutableMap.of()); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult2(resolutionResultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Collections.singletonList(cluster1), @@ -529,11 +548,15 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverError(Status.UNAVAILABLE.withDescription("server unreachable")); - verify(mockListener).onError(errorCaptor.capture()); - Status error = errorCaptor.getValue(); + verify(mockListener).onResult2(resolutionResultCaptor.capture()); + InternalConfigSelector configSelector = resolutionResultCaptor.getValue() + .getAttributes().get(InternalConfigSelector.KEY); + Result selectResult = configSelector.selectConfig( + newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + Status error = selectResult.getStatus(); assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Unable to load LDS " + AUTHORITY - + ". xDS server returned: UNAVAILABLE: server unreachable"); + assertThat(error.getDescription()).contains(AUTHORITY); + assertThat(error.getDescription()).contains("UNAVAILABLE: server unreachable"); } @Test @@ -541,11 +564,15 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverError(Status.NOT_FOUND.withDescription("server unreachable")); - verify(mockListener).onError(errorCaptor.capture()); - Status error = errorCaptor.getValue(); + verify(mockListener).onResult2(resolutionResultCaptor.capture()); + InternalConfigSelector configSelector = resolutionResultCaptor.getValue() + .getAttributes().get(InternalConfigSelector.KEY); + Result selectResult = configSelector.selectConfig( + newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + Status error = selectResult.getStatus(); assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Unable to load LDS " + AUTHORITY - + ". xDS server returned: NOT_FOUND: server unreachable"); + assertThat(error.getDescription()).contains(AUTHORITY); + assertThat(error.getDescription()).contains("NOT_FOUND: server unreachable"); assertThat(error.getCause()).isNull(); } @@ -555,15 +582,15 @@ public class XdsNameResolverTest { FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdateForRdsName(RDS_RESOURCE_NAME); xdsClient.deliverError(Status.UNAVAILABLE.withDescription("server unreachable")); - verify(mockListener, times(2)).onError(errorCaptor.capture()); - Status error = errorCaptor.getAllValues().get(0); + verify(mockListener).onResult2(resolutionResultCaptor.capture()); + InternalConfigSelector configSelector = resolutionResultCaptor.getValue() + .getAttributes().get(InternalConfigSelector.KEY); + Result selectResult = configSelector.selectConfig( + newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT)); + Status error = selectResult.getStatus(); assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Unable to load LDS " + AUTHORITY - + ". xDS server returned: UNAVAILABLE: server unreachable"); - error = errorCaptor.getAllValues().get(1); - assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(error.getDescription()).isEqualTo("Unable to load RDS " + RDS_RESOURCE_NAME - + ". xDS server returned: UNAVAILABLE: server unreachable"); + assertThat(error.getDescription()).contains(RDS_RESOURCE_NAME); + assertThat(error.getDescription()).contains("UNAVAILABLE: server unreachable"); } @SuppressWarnings("unchecked") @@ -581,10 +608,11 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random", serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, cluster1); verify(mockListener).onResult2(resolutionResultCaptor.capture()); assertServiceConfigForLoadBalancingConfig( Collections.singletonList(cluster1), @@ -605,10 +633,11 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random", serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); - xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost)); + xdsClient.deliverLdsUpdateOnly(0L, Arrays.asList(virtualHost)); + fakeClock.forwardTime(15, TimeUnit.SECONDS); assertEmptyResolutionResult("random"); } @@ -617,7 +646,7 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, AUTHORITY, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate(0L, buildUnmatchedVirtualHosts()); @@ -702,7 +731,7 @@ public class XdsNameResolverTest { true, 5, 5, new AutoConfiguredLoadBalancerFactory("pick-first")); resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, realParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); RetryPolicy retryPolicy = RetryPolicy.create( @@ -913,7 +942,7 @@ public class XdsNameResolverTest { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( @@ -946,7 +975,7 @@ public class XdsNameResolverTest { public void resolved_routeActionHasAutoHostRewrite_emitsCallOptionForTheSame() { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, - FilterRegistry.getDefaultRegistry(), null, metricRecorder); + FilterRegistry.getDefaultRegistry(), null, metricRecorder, nameResolverArgs); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( @@ -977,7 +1006,7 @@ public class XdsNameResolverTest { public void resolved_routeActionNoAutoHostRewrite_doesntEmitCallOptionForTheSame() { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, - FilterRegistry.getDefaultRegistry(), null, metricRecorder); + FilterRegistry.getDefaultRegistry(), null, metricRecorder, nameResolverArgs); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( @@ -1190,6 +1219,20 @@ public class XdsNameResolverTest { assertCallSelectClusterResult(call1, configSelector, cluster1, 20.0); } + /** Creates and delivers both CDS and EDS updates for the given clusters. */ + private static void createAndDeliverClusterUpdates( + FakeXdsClient xdsClient, String... clusterNames) { + for (String clusterName : clusterNames) { + CdsUpdate.Builder forEds = CdsUpdate + .forEds(clusterName, clusterName, null, null, null, null, false) + .roundRobinLbPolicy(); + xdsClient.deliverCdsUpdate(clusterName, forEds.build()); + EdsUpdate edsUpdate = new EdsUpdate(clusterName, + XdsTestUtils.createMinimalLbEndpointsMap("host"), Collections.emptyList()); + xdsClient.deliverEdsUpdate(clusterName, edsUpdate); + } + } + @Test public void resolved_simpleCallSucceeds_routeToRls() { when(mockRandom.nextInt(anyInt())).thenReturn(90, 10); @@ -1305,6 +1348,7 @@ public class XdsNameResolverTest { // LDS 1. xdsClient.deliverLdsUpdateWithFilters(vhost, filterStateTestConfigs(STATEFUL_1, STATEFUL_2)); + createAndDeliverClusterUpdates(xdsClient, cluster1); assertClusterResolutionResult(call1, cluster1); ImmutableList lds1Snapshot = statefulFilterProvider.getAllInstances(); // Verify that StatefulFilter with different filter names result in different Filter instances. @@ -1359,7 +1403,7 @@ public class XdsNameResolverTest { * Verifies the lifecycle of HCM filter instances across RDS updates. * *

Filter instances: - * 1. Must have instantiated by the initial LDS. + * 1. Must have instantiated by the initial LDS/RDS. * 2. Must be reused by all subsequent RDS updates. * 3. Must be not shutdown (closed) by valid RDS updates. */ @@ -1371,22 +1415,19 @@ public class XdsNameResolverTest { // LDS 1. xdsClient.deliverLdsUpdateForRdsNameWithFilters(RDS_RESOURCE_NAME, filterStateTestConfigs(STATEFUL_1, STATEFUL_2)); - ImmutableList lds1Snapshot = statefulFilterProvider.getAllInstances(); - // Verify that StatefulFilter with different filter names result in different Filter instances. - assertWithMessage("LDS 1: expected to create filter instances").that(lds1Snapshot).hasSize(2); - // Naming: ldsFilter - StatefulFilter lds1Filter1 = lds1Snapshot.get(0); - StatefulFilter lds1Filter2 = lds1Snapshot.get(1); - assertThat(lds1Filter1).isNotSameInstanceAs(lds1Filter2); - // RDS 1. VirtualHost vhost1 = filterStateTestVhost(); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, vhost1); + createAndDeliverClusterUpdates(xdsClient, cluster1); assertClusterResolutionResult(call1, cluster1); // Initial RDS update should not generate Filter instances. ImmutableList rds1Snapshot = statefulFilterProvider.getAllInstances(); - assertWithMessage("RDS 1: Expected Filter instances to be reused across RDS route updates") - .that(rds1Snapshot).isEqualTo(lds1Snapshot); + // Verify that StatefulFilter with different filter names result in different Filter instances. + assertWithMessage("RDS 1: expected to create filter instances").that(rds1Snapshot).hasSize(2); + // Naming: ldsFilter + StatefulFilter lds1Filter1 = rds1Snapshot.get(0); + StatefulFilter lds1Filter2 = rds1Snapshot.get(1); + assertThat(lds1Filter1).isNotSameInstanceAs(lds1Filter2); // RDS 2: exactly the same as RDS 1. xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, vhost1); @@ -1394,7 +1435,7 @@ public class XdsNameResolverTest { ImmutableList rds2Snapshot = statefulFilterProvider.getAllInstances(); // Neither should any subsequent RDS updates. assertWithMessage("RDS 2: Expected Filter instances to be reused across RDS route updates") - .that(rds2Snapshot).isEqualTo(lds1Snapshot); + .that(rds2Snapshot).isEqualTo(rds1Snapshot); // RDS 3: Contains a per-route override for STATEFUL_1. VirtualHost vhost3 = filterStateTestVhost(ImmutableMap.of( @@ -1406,7 +1447,7 @@ public class XdsNameResolverTest { // As with any other Route update, typed_per_filter_config overrides should not result in // creating new filter instances. assertWithMessage("RDS 3: Expected Filter instances to be reused on per-route filter overrides") - .that(rds3Snapshot).isEqualTo(lds1Snapshot); + .that(rds3Snapshot).isEqualTo(rds1Snapshot); } /** @@ -1427,7 +1468,7 @@ public class XdsNameResolverTest { .register(statefulFilterProvider, altStatefulFilterProvider, ROUTER_FILTER_PROVIDER); resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, filterRegistry, null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); @@ -1435,6 +1476,7 @@ public class XdsNameResolverTest { // LDS 1. xdsClient.deliverLdsUpdateWithFilters(vhost, filterStateTestConfigs(STATEFUL_1, STATEFUL_2)); + createAndDeliverClusterUpdates(xdsClient, cluster1); assertClusterResolutionResult(call1, cluster1); ImmutableList lds1Snapshot = statefulFilterProvider.getAllInstances(); ImmutableList lds1SnapshotAlt = altStatefulFilterProvider.getAllInstances(); @@ -1483,6 +1525,7 @@ public class XdsNameResolverTest { // LDS 1. xdsClient.deliverLdsUpdateWithFilters(vhost, filterStateTestConfigs(STATEFUL_1, STATEFUL_2)); + createAndDeliverClusterUpdates(xdsClient, cluster1); assertClusterResolutionResult(call1, cluster1); ImmutableList lds1Snapshot = statefulFilterProvider.getAllInstances(); assertWithMessage("LDS 1: expected to create filter instances").that(lds1Snapshot).hasSize(2); @@ -1510,6 +1553,7 @@ public class XdsNameResolverTest { // LDS 1. xdsClient.deliverLdsUpdateWithFilters(vhost, filterStateTestConfigs(STATEFUL_1, STATEFUL_2)); + createAndDeliverClusterUpdates(xdsClient, cluster1); assertClusterResolutionResult(call1, cluster1); ImmutableList lds1Snapshot = statefulFilterProvider.getAllInstances(); assertWithMessage("LDS 1: expected to create filter instances").that(lds1Snapshot).hasSize(2); @@ -1526,33 +1570,32 @@ public class XdsNameResolverTest { } /** - * Verifies that filter instances are NOT shutdown on RDS_RESOURCE_NAME not found. + * Verifies that all filter instances are shutdown (closed) on RDS resource not found. */ @Test - public void filterState_shutdown_noShutdownOnRdsNotFound() { + public void filterState_shutdown_onRdsNotFound() { StatefulFilter.Provider statefulFilterProvider = filterStateTestSetupResolver(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); // LDS 1. xdsClient.deliverLdsUpdateForRdsNameWithFilters(RDS_RESOURCE_NAME, filterStateTestConfigs(STATEFUL_1, STATEFUL_2)); - ImmutableList lds1Snapshot = statefulFilterProvider.getAllInstances(); - assertWithMessage("LDS 1: expected to create filter instances").that(lds1Snapshot).hasSize(2); - // Naming: ldsFilter - StatefulFilter lds1Filter1 = lds1Snapshot.get(0); - StatefulFilter lds1Filter2 = lds1Snapshot.get(1); - // RDS 1: Standard vhost with a route. xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, filterStateTestVhost()); + createAndDeliverClusterUpdates(xdsClient, cluster1); assertClusterResolutionResult(call1, cluster1); - assertThat(statefulFilterProvider.getAllInstances()).isEqualTo(lds1Snapshot); + ImmutableList rds1Snapshot = statefulFilterProvider.getAllInstances(); + assertWithMessage("RDS 1: expected to create filter instances").that(rds1Snapshot).hasSize(2); + // Naming: ldsFilter + StatefulFilter lds1Filter1 = rds1Snapshot.get(0); + StatefulFilter lds1Filter2 = rds1Snapshot.get(1); // RDS 2: RDS_RESOURCE_NAME not found. reset(mockListener); xdsClient.deliverRdsResourceNotFound(RDS_RESOURCE_NAME); assertEmptyResolutionResult(RDS_RESOURCE_NAME); - assertThat(lds1Filter1.isShutdown()).isFalse(); - assertThat(lds1Filter2.isShutdown()).isFalse(); + assertThat(lds1Filter1.isShutdown()).isTrue(); + assertThat(lds1Filter2.isShutdown()).isTrue(); } private StatefulFilter.Provider filterStateTestSetupResolver() { @@ -1561,7 +1604,7 @@ public class XdsNameResolverTest { .register(statefulFilterProvider, ROUTER_FILTER_PROVIDER); resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, filterRegistry, null, - metricRecorder); + metricRecorder, nameResolverArgs); resolver.start(mockListener); return statefulFilterProvider; } @@ -1762,6 +1805,7 @@ public class XdsNameResolverTest { ImmutableList.of(route1, route2, route3), ImmutableMap.of()); xdsClient.deliverRdsUpdate(RDS_RESOURCE_NAME, Collections.singletonList(virtualHost)); + createAndDeliverClusterUpdates(xdsClient, "cluster-foo", "cluster-bar", "cluster-baz"); verify(mockListener).onResult2(resolutionResultCaptor.capture()); String expectedServiceConfigJson = @@ -2385,6 +2429,8 @@ public class XdsNameResolverTest { private String rdsResource; private ResourceWatcher ldsWatcher; private ResourceWatcher rdsWatcher; + private final Map>> cdsWatchers = new HashMap<>(); + private final Map>> edsWatchers = new HashMap<>(); @Override public BootstrapInfo getBootstrapInfo() { @@ -2412,10 +2458,19 @@ public class XdsNameResolverTest { rdsResource = resourceName; rdsWatcher = (ResourceWatcher) watcher; break; + case "CDS": + cdsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) + .add((ResourceWatcher) watcher); + break; + case "EDS": + edsWatchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) + .add((ResourceWatcher) watcher); + break; default: } } + @SuppressWarnings("unchecked") @Override public void cancelXdsResourceWatch(XdsResourceType type, String resourceName, @@ -2434,25 +2489,53 @@ public class XdsNameResolverTest { rdsResource = null; rdsWatcher = null; break; + case "CDS": + assertThat(cdsWatchers).containsKey(resourceName); + assertThat(cdsWatchers.get(resourceName)).contains(watcher); + cdsWatchers.get(resourceName).remove((ResourceWatcher) watcher); + break; + case "EDS": + assertThat(edsWatchers).containsKey(resourceName); + assertThat(edsWatchers.get(resourceName)).contains(watcher); + edsWatchers.get(resourceName).remove((ResourceWatcher) watcher); + break; default: } } - void deliverLdsUpdate(long httpMaxStreamDurationNano, List virtualHosts) { + void deliverLdsUpdateOnly(long httpMaxStreamDurationNano, List virtualHosts) { syncContext.execute(() -> { ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( httpMaxStreamDurationNano, virtualHosts, null))); }); } + void deliverLdsUpdate(long httpMaxStreamDurationNano, List virtualHosts) { + List clusterNames = new ArrayList<>(); + for (VirtualHost vh : virtualHosts) { + clusterNames.addAll(getClusterNames(vh.routes())); + } + + syncContext.execute(() -> { + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( + httpMaxStreamDurationNano, virtualHosts, null))); + createAndDeliverClusterUpdates(this, clusterNames.toArray(new String[0])); + }); + } + void deliverLdsUpdate(final List routes) { VirtualHost virtualHost = VirtualHost.create( "virtual-host", Collections.singletonList(expectedLdsResourceName), routes, ImmutableMap.of()); + List clusterNames = getClusterNames(routes); + syncContext.execute(() -> { ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( 0L, Collections.singletonList(virtualHost), null))); + if (!clusterNames.isEmpty()) { + createAndDeliverClusterUpdates(this, clusterNames.toArray(new String[0])); + } }); } @@ -2508,6 +2591,7 @@ public class XdsNameResolverTest { syncContext.execute(() -> { ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( 0L, Collections.singletonList(virtualHost), filterChain))); + createAndDeliverClusterUpdates(this, cluster); }); } @@ -2545,6 +2629,29 @@ public class XdsNameResolverTest { }); } + private List getClusterNames(List routes) { + List clusterNames = new ArrayList<>(); + for (Route r : routes) { + if (r.routeAction() == null) { + continue; + } + String cluster = r.routeAction().cluster(); + if (cluster != null) { + clusterNames.add(cluster); + } else { + List weightedClusters = r.routeAction().weightedClusters(); + if (weightedClusters == null) { + continue; + } + for (ClusterWeight wc : weightedClusters) { + clusterNames.add(wc.name()); + } + } + } + + return clusterNames; + } + void deliverRdsUpdateWithFaultInjection( String resourceName, @Nullable FaultConfig virtualHostFaultConfig, @Nullable FaultConfig routFaultConfig, @Nullable FaultConfig weightedClusterFaultConfig) { @@ -2581,6 +2688,7 @@ public class XdsNameResolverTest { overrideConfig); syncContext.execute(() -> { rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost))); + createAndDeliverClusterUpdates(this, cluster1); }); } @@ -2606,6 +2714,29 @@ public class XdsNameResolverTest { }); } + private void deliverCdsUpdate(String clusterName, CdsUpdate update) { + if (!cdsWatchers.containsKey(clusterName)) { + return; + } + syncContext.execute(() -> { + List> resourceWatchers = + ImmutableList.copyOf(cdsWatchers.get(clusterName)); + resourceWatchers.forEach(w -> w.onChanged(update)); + }); + } + + private void deliverEdsUpdate(String name, EdsUpdate update) { + syncContext.execute(() -> { + if (!edsWatchers.containsKey(name)) { + return; + } + List> resourceWatchers = + ImmutableList.copyOf(edsWatchers.get(name)); + resourceWatchers.forEach(w -> w.onChanged(update)); + }); + } + + void deliverError(final Status error) { if (ldsWatcher != null) { syncContext.execute(() -> { @@ -2617,6 +2748,11 @@ public class XdsNameResolverTest { rdsWatcher.onError(error); }); } + syncContext.execute(() -> { + cdsWatchers.values().stream() + .flatMap(List::stream) + .forEach(w -> w.onError(error)); + }); } } diff --git a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java index d0580ae266..9f90777be3 100644 --- a/xds/src/test/java/io/grpc/xds/XdsTestUtils.java +++ b/xds/src/test/java/io/grpc/xds/XdsTestUtils.java @@ -51,7 +51,6 @@ import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; import io.grpc.BindableService; import io.grpc.Context; import io.grpc.Context.CancellationListener; -import io.grpc.Status; import io.grpc.StatusOr; import io.grpc.internal.JsonParser; import io.grpc.stub.StreamObserver; @@ -281,6 +280,16 @@ public class XdsTestUtils { return builder.build(); } + static Map createMinimalLbEndpointsMap(String serverHostName) { + Map lbEndpointsMap = new HashMap<>(); + LbEndpoint lbEndpoint = LbEndpoint.create( + serverHostName, ENDPOINT_PORT, 0, true, ENDPOINT_HOSTNAME, ImmutableMap.of()); + lbEndpointsMap.put( + Locality.create("", "", ""), + LocalityLbEndpoints.create(ImmutableList.of(lbEndpoint), 10, 0, ImmutableMap.of())); + return lbEndpointsMap; + } + @SuppressWarnings("unchecked") private static ImmutableMap getWrrLbConfigAsMap() throws IOException { String lbConfigStr = "{\"wrr_locality_experimental\" : " @@ -353,7 +362,6 @@ public class XdsTestUtils { return Listener.newBuilder() .setName(serverName) .setApiListener(clientListenerBuilder.build()).build(); - } /** @@ -407,18 +415,4 @@ public class XdsTestUtils { responseObserver.onNext(response); } } - - static class StatusMatcher implements ArgumentMatcher { - private final Status expectedStatus; - - StatusMatcher(Status expectedStatus) { - this.expectedStatus = expectedStatus; - } - - @Override - public boolean matches(Status status) { - return status != null && expectedStatus.getCode().equals(status.getCode()) - && expectedStatus.getDescription().equals(status.getDescription()); - } - } }