diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java
index 31b486510e..39945ab074 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClient.java
@@ -330,8 +330,8 @@ abstract class XdsClient {
* Registers a watcher to receive {@link ConfigUpdate} for service with the given hostname and
* port.
*
- *
Unlike watchers for cluster data and endpoint data, at any point of time at most one config
- * watcher is allowed.
+ *
Unlike watchers for cluster data and endpoint data, at most one ConfigWatcher can be
+ * registered. Once it is registered, it cannot be unregistered.
*
* @param hostName the host name part of the "xds:" URI for the server name that the gRPC client
* targets for. Must NOT contain port.
@@ -342,13 +342,6 @@ abstract class XdsClient {
void watchConfigData(String hostName, int port, ConfigWatcher watcher) {
}
- /**
- * Unregisters the existing config watcher. The previously registered config watcher will no
- * longer receive {@link ConfigUpdate}. Noop if no config watcher has been registered.
- */
- void cancelConfigDataWatch() {
- }
-
/**
* Registers a data watcher for the given cluster.
*/
diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
new file mode 100644
index 0000000000..c497f4ad9e
--- /dev/null
+++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
@@ -0,0 +1,705 @@
+/*
+ * Copyright 2019 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.rpc.Code;
+import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
+import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
+import io.envoyproxy.envoy.api.v2.Listener;
+import io.envoyproxy.envoy.api.v2.RouteConfiguration;
+import io.envoyproxy.envoy.api.v2.core.Node;
+import io.envoyproxy.envoy.api.v2.route.Route;
+import io.envoyproxy.envoy.api.v2.route.VirtualHost;
+import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager;
+import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds;
+import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Status;
+import io.grpc.SynchronizationContext;
+import io.grpc.SynchronizationContext.ScheduledHandle;
+import io.grpc.alts.GoogleDefaultChannelBuilder;
+import io.grpc.internal.BackoffPolicy;
+import io.grpc.stub.StreamObserver;
+import io.grpc.xds.Bootstrapper.ChannelCreds;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+
+final class XdsClientImpl extends XdsClient {
+ private static final Logger logger = Logger.getLogger(XdsClientImpl.class.getName());
+
+ @VisibleForTesting
+ static final String ADS_TYPE_URL_LDS = "type.googleapis.com/envoy.api.v2.Listener";
+ @VisibleForTesting
+ static final String ADS_TYPE_URL_RDS =
+ "type.googleapis.com/envoy.api.v2.RouteConfiguration";
+
+ private final ManagedChannel channel;
+ private final SynchronizationContext syncContext;
+ private final ScheduledExecutorService timeService;
+ private final BackoffPolicy.Provider backoffPolicyProvider;
+ private final Stopwatch stopwatch;
+ // The node identifier to be included in xDS requests. Management server only requires the
+ // first request to carry the node identifier on a stream. It should be identical if present
+ // more than once.
+ private final Node node;
+
+ // Cached data for RDS responses, keyed by RouteConfiguration names.
+ // LDS responses indicate absence of RouteConfigurations and RDS responses indicate presence
+ // of RouteConfigurations.
+ // Optimization: only cache clusterName field in the RouteConfiguration messages of RDS
+ // responses.
+ private final Map routeConfigNamesToClusterNames = new HashMap<>();
+
+ @Nullable
+ private AdsStream adsStream;
+ @Nullable
+ private BackoffPolicy retryBackoffPolicy;
+ @Nullable
+ private ScheduledHandle rpcRetryTimer;
+
+ // Following fields are set only after the ConfigWatcher registered. Once set, they should
+ // never change.
+ @Nullable
+ private ConfigWatcher configWatcher;
+ // The host name portion of "xds:" URI that the gRPC client targets for.
+ @Nullable
+ private String hostName;
+ // The "xds:" URI (including port suffix if present) that the gRPC client targets for.
+ @Nullable
+ private String ldsResourceName;
+
+ XdsClientImpl(
+ // URI of the management server to be connected to.
+ String serverUri,
+ Node node,
+ // List of channel credential configurations for the channel to management server.
+ // Should pick the first supported one.
+ List channelCredsList,
+ SynchronizationContext syncContext,
+ ScheduledExecutorService timeService,
+ BackoffPolicy.Provider backoffPolicyProvider,
+ Stopwatch stopwatch) {
+ this(
+ buildChannel(checkNotNull(serverUri, "serverUri"),
+ checkNotNull(channelCredsList, "channelCredsList")),
+ node,
+ syncContext,
+ timeService,
+ backoffPolicyProvider,
+ stopwatch);
+ }
+
+ @VisibleForTesting
+ XdsClientImpl(
+ ManagedChannel channel,
+ Node node,
+ SynchronizationContext syncContext,
+ ScheduledExecutorService timeService,
+ BackoffPolicy.Provider backoffPolicyProvider,
+ Stopwatch stopwatch) {
+ this.channel = checkNotNull(channel, "channel");
+ this.node = checkNotNull(node, "node");
+ this.syncContext = checkNotNull(syncContext, "syncContext");
+ this.timeService = checkNotNull(timeService, "timeService");
+ this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
+ this.stopwatch = checkNotNull(stopwatch, "stopwatch");
+ }
+
+ @Override
+ void shutdown() {
+ channel.shutdown();
+ if (adsStream != null) {
+ adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
+ }
+ if (rpcRetryTimer != null) {
+ rpcRetryTimer.cancel();
+ }
+ }
+
+ @Override
+ void watchConfigData(String hostName, int port, ConfigWatcher watcher) {
+ checkState(configWatcher == null, "ConfigWatcher is already registered");
+ configWatcher = checkNotNull(watcher, "watcher");
+ this.hostName = checkNotNull(hostName, "hostName");
+ if (port == -1) {
+ ldsResourceName = hostName;
+ } else {
+ ldsResourceName = hostName + ":" + port;
+ }
+ if (rpcRetryTimer != null) {
+ // Currently in retry backoff.
+ return;
+ }
+ if (adsStream == null) {
+ startRpcStream();
+ }
+ adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName));
+ }
+
+ /**
+ * Builds a channel to the given server URI with the first supported channel creds config.
+ */
+ private static ManagedChannel buildChannel(String serverUri,List channelCredsList) {
+ ManagedChannel ch = null;
+ // Use the first supported channel credentials configuration.
+ // Currently, only "google_default" is supported.
+ for (ChannelCreds creds : channelCredsList) {
+ if (creds.getType().equals("google_default")) {
+ ch = GoogleDefaultChannelBuilder.forTarget(serverUri).build();
+ break;
+ }
+ }
+ if (ch == null) {
+ ch = ManagedChannelBuilder.forTarget(serverUri).build();
+ }
+ return ch;
+ }
+
+ /**
+ * Establishes the RPC connection by creating a new RPC stream on the given channel for
+ * xDS protocol communication.
+ */
+ private void startRpcStream() {
+ checkState(adsStream == null, "Previous adsStream has not been cleared yet");
+ AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
+ AggregatedDiscoveryServiceGrpc.newStub(channel);
+ adsStream = new AdsStream(stub);
+ adsStream.start();
+ stopwatch.reset().start();
+ }
+
+ /**
+ * Handles LDS response to find the HttpConnectionManager message for the requested resource name.
+ * Proceed with the resolved RouteConfiguration in HttpConnectionManager message of the requested
+ * listener, if exists, to find the VirtualHost configuration for the "xds:" URI
+ * (with the port, if any, stripped off). Or sends an RDS request if configured for dynamic
+ * resolution. The response is NACKed if contains invalid data for gRPC's usage. Otherwise, an
+ * ACK request is sent to management server.
+ */
+ private void handleLdsResponse(DiscoveryResponse ldsResponse) {
+ logger.log(Level.FINE, "Received an LDS response: {0}", ldsResponse);
+ checkState(ldsResourceName != null && configWatcher != null,
+ "No LDS request was ever sent. Management server is doing something wrong");
+ adsStream.ldsRespNonce = ldsResponse.getNonce();
+
+ // Unpack Listener messages.
+ List listeners = new ArrayList<>(ldsResponse.getResourcesCount());
+ try {
+ for (com.google.protobuf.Any res : ldsResponse.getResourcesList()) {
+ listeners.add(res.unpack(Listener.class));
+ }
+ } catch (InvalidProtocolBufferException e) {
+ adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
+ ldsResponse.getNonce(), "Broken LDS response.");
+ return;
+ }
+
+ // Unpack HttpConnectionManager messages
+ HttpConnectionManager requestedHttpConnManager = null;
+ List httpConnectionManagers = new ArrayList<>();
+ try {
+ for (Listener listener : listeners) {
+ HttpConnectionManager hm =
+ listener.getApiListener().getApiListener().unpack(HttpConnectionManager.class);
+ httpConnectionManagers.add(hm);
+ if (listener.getName().equals(ldsResourceName)) {
+ requestedHttpConnManager = hm;
+ }
+ }
+ } catch (InvalidProtocolBufferException e) {
+ adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
+ ldsResponse.getNonce(), "Broken LDS response.");
+ return;
+ }
+
+ String errorMessage = null;
+ // All RouteConfigurations referenced by this LDS response, either in in-lined
+ // RouteConfiguration message or in RDS config.
+ Set routeConfigs = new HashSet<>();
+ for (HttpConnectionManager hm : httpConnectionManagers) {
+ // The HttpConnectionManager message must either provide the RouteConfiguration directly
+ // in-line or tell the client to use RDS to obtain it.
+ if (hm.hasRouteConfig()) {
+ routeConfigs.add(hm.getRouteConfig().getName());
+ } else if (hm.hasRds()) {
+ Rds rds = hm.getRds();
+ if (!rds.getConfigSource().hasAds()) {
+ errorMessage = "For using RDS, it must be set to use ADS.";
+ break;
+ }
+ routeConfigs.add(rds.getRouteConfigName());
+ } else {
+ errorMessage = "HttpConnectionManager message must either provide the "
+ + "RouteConfiguration directly in-line or tell the client to use RDS to obtain it.";
+ break;
+ }
+ }
+
+ // Field clusterName found in the in-lined RouteConfiguration, if exists.
+ String clusterName = null;
+ // RouteConfiguration name to be used as the resource name for RDS request.
+ String rdsRouteConfigName = null;
+ if (errorMessage == null && requestedHttpConnManager != null) {
+ if (requestedHttpConnManager.hasRouteConfig()) {
+ RouteConfiguration rc = requestedHttpConnManager.getRouteConfig();
+ clusterName = processRouteConfig(rc);
+ if (clusterName == null) {
+ errorMessage = "Cannot find a valid cluster name in VirtualHost inside "
+ + "RouteConfiguration with domains matching: " + hostName + ".";
+ }
+ } else if (requestedHttpConnManager.hasRds()) {
+ Rds rds = requestedHttpConnManager.getRds();
+ rdsRouteConfigName = rds.getRouteConfigName();
+ }
+ // Else impossible as we have already validated all HttpConnectionManager messages.
+ }
+
+ if (errorMessage != null) {
+ adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
+ ldsResponse.getNonce(), errorMessage);
+ return;
+ }
+ adsStream.sendAckRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName),
+ ldsResponse.getVersionInfo(), ldsResponse.getNonce());
+
+ // Remove RDS cache entries for RouteConfigurations not referenced by this LDS response.
+ // LDS responses represents the state of the world, RouteConfigurations not referenced
+ // by this LDS response are those no longer exist.
+ routeConfigNamesToClusterNames.keySet().retainAll(routeConfigs);
+
+ // Process the requested Listener if exists, either extract cluster information from in-lined
+ // RouteConfiguration message or send an RDS request for dynamic resolution.
+ if (clusterName != null) {
+ // Found clusterName in the in-lined RouteConfiguration.
+ ConfigUpdate configUpdate = ConfigUpdate.newBuilder().setClusterName(clusterName).build();
+ configWatcher.onConfigChanged(configUpdate);
+ } else if (rdsRouteConfigName != null) {
+ // Update the RDS resource we wish to request. An RDS request may not be necessary, but
+ // we need to keep what we request updated in case of notifying watcher upon receiving
+ // an RDS response for updating the requested resource.
+ adsStream.rdsResourceName = rdsRouteConfigName;
+ // First look up the RDS cache to see if we had received an RDS response containing the
+ // desired RouteConfiguration previously. Otherwise, send an RDS request for dynamic
+ // resolution.
+ if (routeConfigNamesToClusterNames.containsKey(rdsRouteConfigName)) {
+ ConfigUpdate configUpdate =
+ ConfigUpdate.newBuilder()
+ .setClusterName(routeConfigNamesToClusterNames.get(rdsRouteConfigName))
+ .build();
+ configWatcher.onConfigChanged(configUpdate);
+ } else {
+ adsStream.sendXdsRequest(ADS_TYPE_URL_RDS, ImmutableList.of(rdsRouteConfigName));
+ }
+ } else {
+ // The requested Listener does not exist.
+ configWatcher.onError(
+ Status.NOT_FOUND.withDescription(
+ "Listener for requested resource [" + ldsResourceName + "] does not exist"));
+ }
+ }
+
+ /**
+ * Handles RDS response to find the RouteConfiguration message for the requested resource name.
+ * Proceed with the resolved RouteConfiguration if exists to find the VirtualHost configuration
+ * for the "xds:" URI (with the port, if any, stripped off). The response is NACKed if contains
+ * invalid data for gRPC's usage. Otherwise, an ACK request is sent to management server.
+ */
+ private void handleRdsResponse(DiscoveryResponse rdsResponse) {
+ logger.log(Level.FINE, "Received an RDS response: {0}", rdsResponse);
+ checkState(adsStream.rdsResourceName != null,
+ "Never requested for RDS resources, management server is doing something wrong");
+ adsStream.rdsRespNonce = rdsResponse.getNonce();
+
+ // Unpack RouteConfiguration messages.
+ List routeConfigs = new ArrayList<>(rdsResponse.getResourcesCount());
+ try {
+ for (com.google.protobuf.Any res : rdsResponse.getResourcesList()) {
+ routeConfigs.add(res.unpack(RouteConfiguration.class));
+ }
+ } catch (InvalidProtocolBufferException e) {
+ adsStream.sendNackRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
+ rdsResponse.getNonce(), "Broken RDS response.");
+ return;
+ }
+
+ // Validate and cache information from each RouteConfiguration message.
+ Map clusterNames = new HashMap<>();
+ for (RouteConfiguration routeConfig : routeConfigs) {
+ String clusterName = processRouteConfig(routeConfig);
+ if (clusterName == null) {
+ adsStream.sendNackRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
+ rdsResponse.getNonce(),
+ "Cannot find a valid cluster name in VirtualHost inside "
+ + "RouteConfiguration with domains matching: " + hostName + ".");
+ return;
+ }
+ clusterNames.put(routeConfig.getName(), clusterName);
+ }
+ routeConfigNamesToClusterNames.putAll(clusterNames);
+
+ adsStream.sendAckRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName),
+ rdsResponse.getVersionInfo(), rdsResponse.getNonce());
+
+ // Notify the ConfigWatcher if this RDS response contains the most recently requested
+ // RDS resource.
+ if (clusterNames.containsKey(adsStream.rdsResourceName)) {
+ ConfigUpdate configUpdate =
+ ConfigUpdate.newBuilder()
+ .setClusterName(clusterNames.get(adsStream.rdsResourceName))
+ .build();
+ configWatcher.onConfigChanged(configUpdate);
+ }
+ // Do not notify an error to the ConfigWatcher. RDS protocol is incremental, not receiving
+ // requested RouteConfiguration in this response does not imply absence.
+ }
+
+ /**
+ * Processes RouteConfiguration message (from an resource information in an LDS or RDS
+ * response), which may contain a VirtualHost with domains matching the "xds:"
+ * URI hostname directly in-line. Returns the clusterName found in that VirtualHost
+ * message. Returns {@code null} if such a clusterName cannot be resolved.
+ *
+ * Note we only validate VirtualHosts with domains matching the "xds:" URI hostname.
+ */
+ @Nullable
+ private String processRouteConfig(RouteConfiguration config) {
+ List virtualHosts = config.getVirtualHostsList();
+ int matchingLen = -1; // longest length of wildcard pattern that matches host name
+ VirtualHost targetVirtualHost = null; // target VirtualHost with longest matched domain
+ for (VirtualHost vHost : virtualHosts) {
+ for (String domain : vHost.getDomainsList()) {
+ if (matchHostName(hostName, domain) && domain.length() > matchingLen) {
+ matchingLen = domain.length();
+ targetVirtualHost = vHost;
+ }
+ }
+ }
+
+ // Proceed with the virtual host that has longest wildcard matched domain name with the
+ // hostname in original "xds:" URI.
+ if (targetVirtualHost != null) {
+ // The client will look only at the last route in the list (the default route),
+ // whose match field must be empty and whose route field must be set.
+ List routes = targetVirtualHost.getRoutesList();
+ if (!routes.isEmpty()) {
+ Route route = routes.get(routes.size() - 1);
+ // TODO(chengyuanzhang): check the match field must be empty.
+ if (route.hasRoute()) {
+ return route.getRoute().getCluster();
+ }
+ }
+ }
+ return null;
+ }
+
+ @VisibleForTesting
+ final class RpcRetryTask implements Runnable {
+ @Override
+ public void run() {
+ startRpcStream();
+ if (configWatcher != null) {
+ adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName));
+ }
+ // TODO(chengyuanzhang): send CDS/EDS requests if CDS/EDS watcher presents.
+ }
+ }
+
+ private final class AdsStream implements StreamObserver {
+ private final AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub;
+
+ private StreamObserver requestWriter;
+ private boolean responseReceived;
+ private boolean closed;
+
+ // Last successfully applied version_info for each resource type. Starts with empty string.
+ // A version_info is used to update management server with client's most recent knowledge of
+ // resources.
+ private String ldsVersion = "";
+ private String rdsVersion = "";
+
+ // Response nonce for the most recently received discovery responses of each resource type.
+ // Client initiated requests start response nonce with empty string.
+ // A nonce is used to indicate the specific DiscoveryResponse each DiscoveryRequest
+ // corresponds to.
+ // A nonce becomes stale following a newer nonce being presented to the client in a
+ // DiscoveryResponse.
+ private String ldsRespNonce = "";
+ private String rdsRespNonce = "";
+
+ // Most recently requested resource name(s) for each resource type. Note the resource_name in
+ // LDS requests will always be "xds:" URI (including port suffix if present).
+ @Nullable
+ private String rdsResourceName;
+
+ private AdsStream(AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub) {
+ this.stub = checkNotNull(stub, "stub");
+ }
+
+ private void start() {
+ requestWriter = stub.withWaitForReady().streamAggregatedResources(this);
+ }
+
+ @Override
+ public void onNext(final DiscoveryResponse response) {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ responseReceived = true;
+ String typeUrl = response.getTypeUrl();
+ if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
+ handleLdsResponse(response);
+ } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
+ handleRdsResponse(response);
+ }
+ // TODO(zdapeng): add CDS/EDS response handles.
+ }
+ });
+ }
+
+ @Override
+ public void onError(final Throwable t) {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ handleStreamClosed(
+ Status.fromThrowable(t).augmentDescription("ADS stream [" + this + "] had an error"));
+ }
+ });
+ }
+
+ @Override
+ public void onCompleted() {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ handleStreamClosed(
+ Status.UNAVAILABLE.withDescription("ADS stream [" + this + "] was closed by server"));
+ }
+ });
+ }
+
+ private void handleStreamClosed(Status error) {
+ logger.log(Level.INFO, error.getDescription(), error.getCause());
+ checkArgument(!error.isOk(), "unexpected OK status");
+ if (closed) {
+ return;
+ }
+ closed = true;
+ cleanUp();
+ if (responseReceived || retryBackoffPolicy == null) {
+ // Reset the backoff sequence if had received a response, or backoff sequence
+ // has never been initialized.
+ retryBackoffPolicy = backoffPolicyProvider.get();
+ }
+ long delayNanos = 0;
+ if (!responseReceived) {
+ delayNanos =
+ Math.max(
+ 0,
+ retryBackoffPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS));
+ }
+ logger.log(Level.FINE, "{0} stream closed, retry in {1} ns", new Object[]{this, delayNanos});
+ rpcRetryTimer =
+ syncContext.schedule(
+ new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
+ }
+
+ private void close(Exception error) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ cleanUp();
+ requestWriter.onError(error);
+ }
+
+ private void cleanUp() {
+ if (adsStream == this) {
+ adsStream = null;
+ }
+ }
+
+ /**
+ * Sends a DiscoveryRequest for the given resource name to management server. Memories the
+ * requested resource name (except for LDS as we always request for the singleton Listener)
+ * as we need it to find resources in responses.
+ */
+ private void sendXdsRequest(String typeUrl, Collection resourceNames) {
+ checkState(requestWriter != null, "ADS stream has not been started");
+ String version = "";
+ String nonce = "";
+ if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
+ version = ldsVersion;
+ nonce = ldsRespNonce;
+ } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
+ checkArgument(resourceNames.size() == 1,
+ "RDS request requesting for more than one resource");
+ version = rdsVersion;
+ nonce = rdsRespNonce;
+ rdsResourceName = resourceNames.iterator().next();
+ }
+ // TODO(chengyuanzhang): cases for CDS/EDS.
+ DiscoveryRequest request =
+ DiscoveryRequest
+ .newBuilder()
+ .setVersionInfo(version)
+ .setNode(node)
+ .addAllResourceNames(resourceNames)
+ .setTypeUrl(typeUrl)
+ .setResponseNonce(nonce)
+ .build();
+ requestWriter.onNext(request);
+ }
+
+ /**
+ * Sends a DiscoveryRequest with the given information as an ACK. Updates the latest accepted
+ * version for the corresponding resource type.
+ */
+ private void sendAckRequest(String typeUrl, Collection resourceNames,
+ String versionInfo, String nonce) {
+ checkState(requestWriter != null, "ADS stream has not been started");
+ if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
+ ldsVersion = versionInfo;
+ ldsRespNonce = nonce;
+ } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
+ rdsVersion = versionInfo;
+ rdsRespNonce = nonce;
+ }
+ // TODO(chengyuanzhang): cases for CDS/EDS.
+ DiscoveryRequest request =
+ DiscoveryRequest
+ .newBuilder()
+ .setVersionInfo(versionInfo)
+ .setNode(node)
+ .addAllResourceNames(resourceNames)
+ .setTypeUrl(typeUrl)
+ .setResponseNonce(nonce)
+ .build();
+ requestWriter.onNext(request);
+ }
+
+ /**
+ * Sends a DiscoveryRequest with the given information as an NACK. NACK takes the previous
+ * accepted version.
+ */
+ private void sendNackRequest(String typeUrl, Collection resourceNames, String nonce,
+ String message) {
+ checkState(requestWriter != null, "ADS stream has not been started");
+ String versionInfo = "";
+ if (typeUrl.equals(ADS_TYPE_URL_LDS)) {
+ versionInfo = ldsVersion;
+ } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
+ versionInfo = rdsVersion;
+ }
+ // TODO(chengyuanzhang): cases for CDS/EDS.
+ DiscoveryRequest request =
+ DiscoveryRequest
+ .newBuilder()
+ .setVersionInfo(versionInfo)
+ .setNode(node)
+ .addAllResourceNames(resourceNames)
+ .setTypeUrl(typeUrl)
+ .setResponseNonce(nonce)
+ .setErrorDetail(
+ com.google.rpc.Status.newBuilder()
+ .setCode(Code.INVALID_ARGUMENT_VALUE)
+ .setMessage(message))
+ .build();
+ requestWriter.onNext(request);
+ }
+ }
+
+ /**
+ * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
+ * case-insensitive.
+ *
+ * Wildcard pattern rules:
+ *
+ * - A single asterisk (*) matches any domain.
+ * - Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
+ * but not both.
+ *
+ */
+ @VisibleForTesting
+ static boolean matchHostName(String hostName, String pattern) {
+ checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
+ "Invalid host name");
+ checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
+ "Invalid pattern/domain name");
+
+ hostName = hostName.toLowerCase(Locale.US);
+ pattern = pattern.toLowerCase(Locale.US);
+ // hostName and pattern are now in lower case -- domain names are case-insensitive.
+
+ if (!pattern.contains("*")) {
+ // Not a wildcard pattern -- hostName and pattern must match exactly.
+ return hostName.equals(pattern);
+ }
+ // Wildcard pattern
+
+ if (pattern.length() == 1) {
+ return true;
+ }
+
+ int index = pattern.indexOf('*');
+
+ // At most one asterisk (*) is allowed.
+ if (pattern.indexOf('*', index + 1) != -1) {
+ return false;
+ }
+
+ // Asterisk can only match prefix or suffix.
+ if (index != 0 && index != pattern.length() - 1) {
+ return false;
+ }
+
+ // HostName must be at least as long as the pattern because asterisk has to
+ // match one or more characters.
+ if (hostName.length() < pattern.length()) {
+ return false;
+ }
+
+ if (index == 0 && hostName.endsWith(pattern.substring(1))) {
+ // Prefix matching fails.
+ return true;
+ }
+
+ // Pattern matches hostname if suffix matching succeeds.
+ return index == pattern.length() - 1
+ && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
+ }
+}
diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
new file mode 100644
index 0000000000..1c6ab7a44a
--- /dev/null
+++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
@@ -0,0 +1,1385 @@
+/*
+ * Copyright 2019 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.Any;
+import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
+import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
+import io.envoyproxy.envoy.api.v2.Listener;
+import io.envoyproxy.envoy.api.v2.RouteConfiguration;
+import io.envoyproxy.envoy.api.v2.core.Address;
+import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource;
+import io.envoyproxy.envoy.api.v2.core.ConfigSource;
+import io.envoyproxy.envoy.api.v2.core.Node;
+import io.envoyproxy.envoy.api.v2.listener.FilterChain;
+import io.envoyproxy.envoy.api.v2.route.RedirectAction;
+import io.envoyproxy.envoy.api.v2.route.Route;
+import io.envoyproxy.envoy.api.v2.route.RouteAction;
+import io.envoyproxy.envoy.api.v2.route.VirtualHost;
+import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager;
+import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds;
+import io.envoyproxy.envoy.config.listener.v2.ApiListener;
+import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
+import io.grpc.Context;
+import io.grpc.Context.CancellationListener;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.SynchronizationContext;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.internal.BackoffPolicy;
+import io.grpc.internal.FakeClock;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.xds.XdsClient.ConfigUpdate;
+import io.grpc.xds.XdsClient.ConfigWatcher;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link XdsClientImpl}.
+ */
+public class XdsClientImplTest {
+
+ private static final String HOSTNAME = "foo.googleapis.com";
+ private static final int PORT = 8080;
+
+ private static final Node NODE = Node.getDefaultInstance();
+ private static final FakeClock.TaskFilter RPC_RETRY_TASK_FILTER =
+ new FakeClock.TaskFilter() {
+ @Override
+ public boolean shouldAccept(Runnable command) {
+ return command.toString().contains(XdsClientImpl.RpcRetryTask.class.getSimpleName());
+ }
+ };
+
+ @Rule
+ public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
+
+ private final SynchronizationContext syncContext = new SynchronizationContext(
+ new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ throw new AssertionError(e);
+ }
+ });
+ private final FakeClock fakeClock = new FakeClock();
+
+ private final Queue> responseObservers = new ArrayDeque<>();
+ private final Queue> requestObservers = new ArrayDeque<>();
+ private final AtomicBoolean callEnded = new AtomicBoolean(true);
+
+ @Mock
+ private AggregatedDiscoveryServiceImplBase mockedDiscoveryService;
+ @Mock
+ private BackoffPolicy.Provider backoffPolicyProvider;
+ @Mock
+ private BackoffPolicy backoffPolicy1;
+ @Mock
+ private BackoffPolicy backoffPolicy2;
+ @Mock
+ private ConfigWatcher configWatcher;
+
+ private ManagedChannel channel;
+ private XdsClientImpl xdsClient;
+
+ @Before
+ public void setUp() throws IOException {
+ MockitoAnnotations.initMocks(this);
+ when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
+ when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
+ when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L);
+
+ String serverName = InProcessServerBuilder.generateName();
+ AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() {
+ @Override
+ public StreamObserver streamAggregatedResources(
+ final StreamObserver responseObserver) {
+ assertThat(callEnded.get()).isTrue(); // ensure previous call was ended
+ callEnded.set(false);
+ Context.current().addListener(
+ new CancellationListener() {
+ @Override
+ public void cancelled(Context context) {
+ callEnded.set(true);
+ }
+ }, MoreExecutors.directExecutor());
+ responseObservers.offer(responseObserver);
+ @SuppressWarnings("unchecked")
+ StreamObserver requestObserver = mock(StreamObserver.class);
+ requestObservers.offer(requestObserver);
+ return requestObserver;
+ }
+ };
+ mockedDiscoveryService =
+ mock(AggregatedDiscoveryServiceImplBase.class, delegatesTo(serviceImpl));
+
+ cleanupRule.register(
+ InProcessServerBuilder
+ .forName(serverName)
+ .addService(mockedDiscoveryService)
+ .directExecutor()
+ .build()
+ .start());
+ channel =
+ cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
+ xdsClient =
+ new XdsClientImpl(channel, NODE, syncContext, fakeClock.getScheduledExecutorService(),
+ backoffPolicyProvider, fakeClock.getStopwatchSupplier().get());
+ // Only the connection to management server is established, no RPC request is sent until at
+ // least one watcher is registered.
+ assertThat(responseObservers).isEmpty();
+ assertThat(requestObservers).isEmpty();
+ }
+
+ @After
+ public void tearDown() {
+ xdsClient.shutdown();
+ assertThat(callEnded.get()).isTrue();
+ assertThat(channel.isShutdown()).isTrue();
+ assertThat(fakeClock.getPendingTasks()).isEmpty();
+ }
+
+ // Always test the real workflow and integrity of XdsClient: RDS protocol should always followed
+ // after at least one LDS request-response, from which the RDS resource name comes. CDS and EDS
+ // can be tested separately as they are used in a standalone way.
+
+ // Discovery responses should follow management server spec and xDS protocol. See
+ // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol.
+
+ /**
+ * Client receives an LDS response that does not contain a Listener for the requested resource.
+ * The LDS response is ACKed.
+ * The config watcher is notified with an error.
+ */
+ @Test
+ public void ldsResponseWithoutMatchingResource() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an LDS request for the host name (with port) to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("bar.googleapis.com",
+ Any.pack(HttpConnectionManager.newBuilder()
+ .setRouteConfig(
+ buildRouteConfiguration("route-bar.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(
+ ImmutableList.of("bar.googleapis.com"),
+ "cluster-bar.googleapis.com"))))
+ .build()))),
+ Any.pack(buildListener("baz.googleapis.com",
+ Any.pack(HttpConnectionManager.newBuilder()
+ .setRouteConfig(
+ buildRouteConfiguration("route-baz.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(
+ ImmutableList.of("baz.googleapis.com"),
+ "cluster-baz.googleapis.com"))))
+ .build()))));
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+
+ ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher).onError(errorStatusCaptor.capture());
+ Status error = errorStatusCaptor.getValue();
+ assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND);
+ assertThat(error.getDescription())
+ .isEqualTo("Listener for requested resource [foo.googleapis.com:8080] does not exist");
+
+ verifyNoMoreInteractions(requestObserver);
+ }
+
+ /**
+ * An LDS response contains the requested listener and an in-lined RouteConfiguration message for
+ * that listener. But the RouteConfiguration message is invalid as it does not contain any
+ * VirtualHost with domains matching the requested hostname.
+ * The LDS response is NACKed, as if the XdsClient has not received this response.
+ * The config watcher is NOT notified with an error.
+ */
+ @Test
+ public void failToFindVirtualHostInLdsResponseInLineRouteConfig() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an LDS request for the host name (with port) to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ RouteConfiguration routeConfig =
+ buildRouteConfiguration(
+ "route.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("something does not match"),
+ "some cluster"),
+ buildVirtualHost(ImmutableList.of("something else does not match"),
+ "some other cluster")));
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRouteConfig(routeConfig).build()))));
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an NACK LDS request.
+ verify(requestObserver)
+ .onNext(
+ argThat(new DiscoveryRequestMatcher("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+
+ verify(configWatcher, never()).onConfigChanged(any(ConfigUpdate.class));
+ verify(configWatcher, never()).onError(any(Status.class));
+ verifyNoMoreInteractions(requestObserver);
+ }
+
+ /**
+ * Client resolves the virtual host config from an LDS response that contains a
+ * RouteConfiguration message directly in-line for the requested resource. No RDS is needed.
+ * The LDS response is ACKed.
+ * The config watcher is notified with an update.
+ */
+ @Test
+ public void resolveVirtualHostInLdsResponse() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an LDS request for the host name (with port) to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("bar.googleapis.com",
+ Any.pack(HttpConnectionManager.newBuilder()
+ .setRouteConfig(
+ buildRouteConfiguration("route-bar.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(
+ ImmutableList.of("bar.googleapis.com"),
+ "cluster-bar.googleapis.com"))))
+ .build()))),
+ Any.pack(buildListener("baz.googleapis.com",
+ Any.pack(HttpConnectionManager.newBuilder()
+ .setRouteConfig(
+ buildRouteConfiguration("route-baz.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(
+ ImmutableList.of("baz.googleapis.com"),
+ "cluster-baz.googleapis.com"))))
+ .build()))),
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(
+ HttpConnectionManager.newBuilder()
+ .setRouteConfig(
+ buildRouteConfiguration("route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(
+ ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"),
+ "cluster.googleapis.com"),
+ buildVirtualHost(
+ ImmutableList.of("something does not match"),
+ "some cluster"))))
+ .build()))));
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+
+ ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster.googleapis.com");
+
+ verifyNoMoreInteractions(requestObserver);
+ }
+
+ /**
+ * Client receives an RDS response (after a previous LDS request-response) that does not contain a
+ * RouteConfiguration for the requested resource while each received RouteConfiguration is valid.
+ * The RDS response is ACKed.
+ * The config watcher is NOT notified with an error (RDS protocol is incremental, responses
+ * not containing requested resources does not indicate absence).
+ */
+ @Test
+ public void rdsResponseWithoutMatchingResource() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an LDS request for the host name (with port) to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ Rds rdsConfig =
+ Rds.newBuilder()
+ // Must set to use ADS.
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo.googleapis.com")
+ .build();
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+
+ // Client sends an (first) RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "")));
+
+ // Management server should only sends RouteConfiguration messages with at least one
+ // VirtualHost with domains matching requested hostname. Otherwise, it is invalid data.
+ List routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "some resource name does not match route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "whatever cluster")))),
+ Any.pack(
+ buildRouteConfiguration(
+ "some other resource name does not match route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "some more whatever cluster")))));
+ response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
+
+ verify(configWatcher, never()).onConfigChanged(any(ConfigUpdate.class));
+ verify(configWatcher, never()).onError(any(Status.class));
+ }
+
+ /**
+ * Client resolves the virtual host config from an RDS response for the requested resource. The
+ * RDS response is ACKed.
+ * The config watcher is notified with an update.
+ */
+ @Test
+ public void resolveVirtualHostInRdsResponse() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ Rds rdsConfig =
+ Rds.newBuilder()
+ // Must set to use ADS.
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo.googleapis.com")
+ .build();
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted)
+
+ // Management server should only sends RouteConfiguration messages with at least one
+ // VirtualHost with domains matching requested hostname. Otherwise, it is invalid data.
+ List routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("something does not match"),
+ "some cluster"),
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"),
+ "cluster.googleapis.com")))), // matching virtual host
+ Any.pack(
+ buildRouteConfiguration(
+ "some resource name does not match route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "some more cluster")))));
+ response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
+
+ ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster.googleapis.com");
+ }
+
+ /**
+ * Client receives an RDS response (after a previous LDS request-response) containing a
+ * RouteConfiguration message for the requested resource. But the RouteConfiguration message
+ * is invalid as it does not contain any VirtualHost with domains matching the requested
+ * hostname.
+ * The LDS response is NACKed, as if the XdsClient has not received this response.
+ * The config watcher is NOT notified with an error.
+ */
+ @Test
+ public void failToFindVirtualHostInRdsResponse() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ Rds rdsConfig =
+ Rds.newBuilder()
+ // Must set to use ADS.
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo.googleapis.com")
+ .build();
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted)
+
+ List routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("something does not match"),
+ "some cluster"),
+ buildVirtualHost(
+ ImmutableList.of("something else does not match", "also does not match"),
+ "cluster.googleapis.com")))),
+ Any.pack(
+ buildRouteConfiguration(
+ "some resource name does not match route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("one more does not match"),
+ "some more cluster")))));
+ response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an NACK RDS request.
+ verify(requestObserver)
+ .onNext(
+ argThat(new DiscoveryRequestMatcher("", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
+
+ verify(configWatcher, never()).onConfigChanged(any(ConfigUpdate.class));
+ verify(configWatcher, never()).onError(any(Status.class));
+ }
+
+ /**
+ * Client receives an RDS response (after a previous LDS request-response) containing a
+ * RouteConfiguration message for the requested resource. But the RouteConfiguration message
+ * is invalid as the VirtualHost with domains matching the requested hostname contains invalid
+ * data, its RouteAction message is absent.
+ * The LDS response is NACKed, as if the XdsClient has not received this response.
+ * The config watcher is NOT notified with an error.
+ */
+ @Test
+ public void matchingVirtualHostDoesNotContainRouteAction() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ Rds rdsConfig =
+ Rds.newBuilder()
+ // Must set to use ADS.
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo.googleapis.com")
+ .build();
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted)
+
+ // A VirtualHost with a Route that contains only redirect configuration.
+ VirtualHost virtualHost =
+ VirtualHost.newBuilder()
+ .setName("virtualhost00.googleapis.com") // don't care
+ .addDomains("foo.googleapis.com")
+ .addRoutes(
+ Route.newBuilder()
+ .setRedirect(
+ RedirectAction.newBuilder()
+ .setHostRedirect("bar.googleapis.com")
+ .setPortRedirect(443)))
+ .build();
+
+ List routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration("route-foo.googleapis.com",
+ ImmutableList.of(virtualHost))));
+ response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an NACK RDS request.
+ verify(requestObserver)
+ .onNext(
+ argThat(new DiscoveryRequestMatcher("", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
+
+ verify(configWatcher, never()).onConfigChanged(any(ConfigUpdate.class));
+ verify(configWatcher, never()).onError(any(Status.class));
+ }
+
+ /**
+ * Client receives LDS/RDS responses for updating resources previously received.
+ *
+ * Tests for streaming behavior.
+ */
+ @Test
+ public void notifyUpdatedResources() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an LDS request for the host name (with port) to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management server sends back an LDS response containing a RouteConfiguration for the
+ // requested Listener directly in-line.
+ RouteConfiguration routeConfig =
+ buildRouteConfiguration(
+ "route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"),
+ "cluster.googleapis.com"),
+ buildVirtualHost(ImmutableList.of("something does not match"),
+ "some cluster")));
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRouteConfig(routeConfig).build())))
+ );
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+
+ // Cluster name is resolved and notified to config watcher.
+ ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster.googleapis.com");
+
+ // Management sends back another LDS response containing updates for the requested Listener.
+ routeConfig =
+ buildRouteConfiguration(
+ "another-route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"),
+ "another-cluster.googleapis.com"),
+ buildVirtualHost(ImmutableList.of("something does not match"),
+ "some cluster")));
+
+ listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRouteConfig(routeConfig).build())))
+ );
+ response =
+ buildDiscoveryResponse("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0001");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("1", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0001")));
+
+ // Updated cluster name is notified to config watcher.
+ configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher, times(2)).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName())
+ .isEqualTo("another-cluster.googleapis.com");
+
+ // Management server sends back another LDS response containing updates for the requested
+ // Listener and telling client to do RDS.
+ Rds rdsConfig =
+ Rds.newBuilder()
+ // Must set to use ADS.
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("some-route-to-foo.googleapis.com")
+ .build();
+
+ listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ response =
+ buildDiscoveryResponse("2", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0002");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("2", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0002")));
+
+ // Client sends an (first) RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "some-route-to-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "")));
+
+ // Management server sends back an RDS response containing the RouteConfiguration
+ // for the requested resource.
+ List routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "some-route-to-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("something does not match"),
+ "some cluster"),
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"),
+ "some-other-cluster.googleapis.com")))));
+ response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "some-route-to-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
+
+ // Updated cluster name is notified to config watcher again.
+ configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher, times(3)).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName())
+ .isEqualTo("some-other-cluster.googleapis.com");
+
+ // Management server sends back another RDS response containing updated information for the
+ // RouteConfiguration currently in-use by client.
+ routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "some-route-to-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"),
+ "an-updated-cluster.googleapis.com")))));
+ response = buildDiscoveryResponse("1", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0001");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("1", "some-route-to-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0001")));
+
+ // Updated cluster name is notified to config watcher again.
+ configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher, times(4)).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName())
+ .isEqualTo("an-updated-cluster.googleapis.com");
+ }
+
+ // TODO(chengyuanzhang): tests for timeout waiting for responses for incremental
+ // protocols (RDS/EDS).
+
+ /**
+ * Client receives multiple RDS responses without RouteConfiguration for the requested
+ * resource. It should continue waiting until such an RDS response arrives, as RDS
+ * protocol is incremental.
+ *
+ * Tests for RDS incremental protocol behavior.
+ */
+ @Test
+ public void waitRdsResponsesForRequestedResource() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an LDS request for the host name (with port) to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management sends back an LDS response telling client to do RDS.
+ Rds rdsConfig =
+ Rds.newBuilder()
+ // Must set to use ADS.
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo.googleapis.com")
+ .build();
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+
+ // Client sends an (first) RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "")));
+
+ // Management server sends back an RDS response that does not contain RouteConfiguration
+ // for the requested resource.
+ List routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "some resource name does not match route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "some more cluster")))));
+ response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
+
+ // Client waits for future RDS responses silently.
+ verifyNoMoreInteractions(configWatcher);
+
+ // Management server sends back another RDS response containing the RouteConfiguration
+ // for the requested resource.
+ routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("something does not match"),
+ "some cluster"),
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"),
+ "another-cluster.googleapis.com")))));
+ response = buildDiscoveryResponse("1", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0001");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("1", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0001")));
+
+ // Updated cluster name is notified to config watcher.
+ ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName())
+ .isEqualTo("another-cluster.googleapis.com");
+ }
+
+ /**
+ * Client receives RDS responses containing RouteConfigurations for resources that were
+ * not requested (management server sends them proactively). Later client receives an LDS
+ * response with the requested Listener containing Rds config pointing to do RDS for one of
+ * the previously received RouteConfigurations. No RDS request needs to be sent for that
+ * RouteConfiguration as it can be found in local cache (management server will not send
+ * RDS responses for that RouteConfiguration again). A future RDS response update for
+ * that RouteConfiguration should be notified to config watcher.
+ *
+ * Tests for caching RDS response data behavior.
+ */
+ @Test
+ public void receiveRdsResponsesForRouteConfigurationsToBeUsedLater() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an LDS request for the host name (with port) to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management sends back an LDS response telling client to do RDS.
+ Rds rdsConfig =
+ Rds.newBuilder()
+ // Must set to use ADS.
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo1.googleapis.com")
+ .build();
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+
+ // Client sends an (first) RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "route-foo1.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "")));
+
+ // Management server sends back an RDS response containing RouteConfigurations
+ // more than requested.
+ List routeConfigs = ImmutableList.of(
+ // Currently wanted resource.
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo1.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "cluster1.googleapis.com")))),
+ // Resources currently not wanted.
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo2.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "cluster2.googleapis.com")))),
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo3.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "cluster3.googleapis.com")))));
+ response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "route-foo1.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
+
+ // Resolved cluster name is notified to config watcher.
+ ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster1.googleapis.com");
+
+ // Management server sends back another LDS response containing updates for the requested
+ // Listener and telling client to do RDS for a RouteConfiguration which had previously
+ // sent to client.
+ rdsConfig =
+ Rds.newBuilder()
+ // Must set to use ADS.
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo2.googleapis.com")
+ .build();
+
+ listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ response = buildDiscoveryResponse("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0001");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("1", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0001")));
+
+ // Updated cluster name is notified to config watcher.
+ configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher, times(2)).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster2.googleapis.com");
+
+ // At this time, no RDS request is sent as the result can be found in local cache (even if
+ // a request is sent for it, management server does not necessarily reply).
+ verify(requestObserver, times(0))
+ .onNext(eq(buildDiscoveryRequest("0", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
+
+ verifyNoMoreInteractions(requestObserver);
+
+ // Management server sends back another RDS response containing updates for the
+ // RouteConfiguration that the client was pointed to most recently (i.e.,
+ // "route-foo2.googleapis.com").
+ routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo2.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "a-new-cluster.googleapis.com")))));
+ response = buildDiscoveryResponse("1", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0001");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("1", "route-foo2.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0001")));
+
+ // Updated cluster name is notified to config watcher.
+ configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher, times(3)).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName())
+ .isEqualTo("a-new-cluster.googleapis.com");
+ }
+
+ /**
+ * An RouteConfiguration is removed by server by sending client an LDS response removing the
+ * corresponding Listener.
+ */
+ @Test
+ public void routeConfigurationRemovedNotifiedToWatcher() {
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+ StreamObserver responseObserver = responseObservers.poll();
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an LDS request for the host name (with port) to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management sends back an LDS response telling client to do RDS.
+ Rds rdsConfig =
+ Rds.newBuilder()
+ // Must set to use ADS.
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo.googleapis.com")
+ .build();
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ DiscoveryResponse response =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sends an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+
+ // Client sends an (first) RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "")));
+
+ // Management server sends back an RDS response containing RouteConfiguration requested.
+ List routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "cluster.googleapis.com")))));
+ response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK RDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "0000")));
+
+ // Resolved cluster name is notified to config watcher.
+ ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher).onConfigChanged(configUpdateCaptor.capture());
+ assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster.googleapis.com");
+
+ // Management server sends back another LDS response with the previous Listener (currently
+ // in-use by client) removed as the RouteConfiguration it references to is absent.
+ response =
+ buildDiscoveryResponse("1", ImmutableList.of(), // empty
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0001");
+ responseObserver.onNext(response);
+
+ // Client sent an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("1", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0001")));
+
+ // Notify config watcher with an error.
+ ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null);
+ verify(configWatcher).onError(errorStatusCaptor.capture());
+ Status error = errorStatusCaptor.getValue();
+ assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND);
+ assertThat(error.getDescription())
+ .isEqualTo("Listener for requested resource [foo.googleapis.com:8080] does not exist");
+ }
+
+ /**
+ * RPC stream closed and retry during the period of first tiem resolving service config
+ * (LDS/RDS only).
+ */
+ @Test
+ public void streamClosedAndRetryWhenResolvingConfig() {
+ InOrder inOrder =
+ Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
+ backoffPolicy2);
+ xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
+
+ ArgumentCaptor> responseObserverCaptor =
+ ArgumentCaptor.forClass(null);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ StreamObserver responseObserver =
+ responseObserverCaptor.getValue(); // same as responseObservers.poll()
+ StreamObserver requestObserver = requestObservers.poll();
+
+ // Client sends an LDS request for the host name (with port) to management server.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management server closes the RPC stream immediately.
+ responseObserver.onCompleted();
+ inOrder.verify(backoffPolicyProvider).get();
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(9L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+
+ // Client retried by sending an LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management server closes the RPC stream with an error.
+ responseObserver.onError(Status.UNAVAILABLE.asException());
+ verifyNoMoreInteractions(backoffPolicyProvider);
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(99L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+
+ // Client retried again by sending an LDS.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management server responses with a listener for the requested resource.
+ Rds rdsConfig =
+ Rds.newBuilder()
+ .setConfigSource(
+ ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
+ .setRouteConfigName("route-foo.googleapis.com")
+ .build();
+
+ List listeners = ImmutableList.of(
+ Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
+ Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
+ );
+ DiscoveryResponse ldsResponse =
+ buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
+ responseObserver.onNext(ldsResponse);
+
+ // Client sent back an ACK LDS request.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "0000")));
+
+ // Client sent an RDS request based on the received listener.
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com",
+ XdsClientImpl.ADS_TYPE_URL_RDS, "")));
+
+ // Management server encounters an error and closes the stream.
+ responseObserver.onError(Status.UNKNOWN.asException());
+
+ // Reset backoff and retry immediately.
+ inOrder.verify(backoffPolicyProvider).get();
+ fakeClock.runDueTasks();
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // RPC stream closed immediately
+ responseObserver.onError(Status.UNKNOWN.asException());
+ inOrder.verify(backoffPolicy2).nextBackoffNanos();
+ assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+ // Retry after backoff.
+ fakeClock.forwardNanos(19L);
+ assertThat(requestObservers).isEmpty();
+ fakeClock.forwardNanos(1L);
+ inOrder.verify(mockedDiscoveryService)
+ .streamAggregatedResources(responseObserverCaptor.capture());
+ responseObserver = responseObserverCaptor.getValue();
+ requestObserver = requestObservers.poll();
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ // Management server sends an LDS response.
+ responseObserver.onNext(ldsResponse);
+
+ // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted)
+
+ List routeConfigs = ImmutableList.of(
+ Any.pack(
+ buildRouteConfiguration(
+ "route-foo.googleapis.com",
+ ImmutableList.of(
+ buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
+ "cluster.googleapis.com")))));
+ DiscoveryResponse rdsResponse =
+ buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
+ // Management server sends an RDS response.
+ responseObserver.onNext(rdsResponse);
+
+ // Client has resolved the cluster based on the RDS response.
+ configWatcher
+ .onConfigChanged(
+ eq(ConfigUpdate.newBuilder().setClusterName("cluster.googleapis.com").build()));
+
+ // RPC stream closed with an error again.
+ responseObserver.onError(Status.UNKNOWN.asException());
+
+ // Reset backoff and retry immediately.
+ inOrder.verify(backoffPolicyProvider).get();
+ fakeClock.runDueTasks();
+ requestObserver = requestObservers.poll();
+ verify(requestObserver)
+ .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
+ XdsClientImpl.ADS_TYPE_URL_LDS, "")));
+
+ verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
+ }
+
+ // TODO(chengyuanzhang): test for race between stream closed and watcher changes. Should only
+ // for ClusterWatchers and EndpointWatchers.
+
+ @Test
+ public void matchHostName_exactlyMatch() {
+ String pattern = "foo.googleapis.com";
+ assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("fo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("oo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue();
+ }
+
+ @Test
+ public void matchHostName_prefixWildcard() {
+ String pattern = "*.foo.googleapis.com";
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("bar-baz.foo.googleapis", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isTrue();
+ pattern = "*-bar.foo.googleapis.com";
+ assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("-bar.foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis.com", pattern))
+ .isTrue();
+ }
+
+ @Test
+ public void matchHostName_postfixWildCard() {
+ String pattern = "foo.*";
+ assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue();
+ assertThat(XdsClientImpl.matchHostName("foo.com", pattern)).isTrue();
+ pattern = "foo-*";
+ assertThat(XdsClientImpl.matchHostName("bar-.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo-", pattern)).isFalse();
+ assertThat(XdsClientImpl.matchHostName("foo-bar.com", pattern)).isTrue();
+ assertThat(XdsClientImpl.matchHostName("foo-.com", pattern)).isTrue();
+ assertThat(XdsClientImpl.matchHostName("foo-bar", pattern)).isTrue();
+ }
+
+ private static DiscoveryResponse buildDiscoveryResponse(String versionInfo,
+ List resources, String typeUrl, String nonce) {
+ return
+ DiscoveryResponse.newBuilder()
+ .setVersionInfo(versionInfo)
+ .setTypeUrl(typeUrl)
+ .addAllResources(resources)
+ .setNonce(nonce)
+ .build();
+ }
+
+ private static DiscoveryRequest buildDiscoveryRequest(String versionInfo,
+ String resourceName, String typeUrl, String nonce) {
+ return
+ DiscoveryRequest.newBuilder()
+ .setVersionInfo(versionInfo)
+ .setNode(NODE)
+ .setTypeUrl(typeUrl)
+ .addResourceNames(resourceName)
+ .setResponseNonce(nonce)
+ .build();
+ }
+
+ private static Listener buildListener(String name, com.google.protobuf.Any apiListener) {
+ return
+ Listener.newBuilder()
+ .setName(name)
+ .setAddress(Address.getDefaultInstance())
+ .addFilterChains(FilterChain.getDefaultInstance())
+ .setApiListener(ApiListener.newBuilder().setApiListener(apiListener))
+ .build();
+ }
+
+ private static RouteConfiguration buildRouteConfiguration(String name,
+ List virtualHosts) {
+ return
+ RouteConfiguration.newBuilder()
+ .setName(name)
+ .addAllVirtualHosts(virtualHosts)
+ .build();
+ }
+
+ private static VirtualHost buildVirtualHost(List domains, String clusterName) {
+ return
+ VirtualHost.newBuilder()
+ .setName("virtualhost00.googleapis.com") // don't care
+ .addAllDomains(domains)
+ .addRoutes(Route.newBuilder()
+ .setRoute(RouteAction.newBuilder().setCluster("whatever cluster")))
+ .addRoutes(
+ // Only the last (default) route matters.
+ Route.newBuilder()
+ .setRoute(RouteAction.newBuilder().setCluster(clusterName)))
+ .build();
+ }
+
+ /**
+ * Matcher for DiscoveryRequest without the comparison of error_details field, which is used for
+ * management server debugging purposes.
+ *
+ * In general, if you are sure error_details field should not be set in a DiscoveryRequest,
+ * compare with message equality. Otherwise, this matcher is handy for comparing other fields
+ * only.
+ */
+ private static class DiscoveryRequestMatcher implements ArgumentMatcher {
+ private final String versionInfo;
+ private final String typeUrl;
+ private final List resourceNames;
+ private final String responseNonce;
+
+ private DiscoveryRequestMatcher(String versionInfo, String resourceName, String typeUrl,
+ String responseNonce) {
+ this(versionInfo, ImmutableList.of(resourceName), typeUrl, responseNonce);
+ }
+
+ private DiscoveryRequestMatcher(String versionInfo, List resourceNames, String typeUrl,
+ String responseNonce) {
+ this.versionInfo = versionInfo;
+ this.resourceNames = resourceNames;
+ this.typeUrl = typeUrl;
+ this.responseNonce = responseNonce;
+ }
+
+ @Override
+ public boolean matches(DiscoveryRequest argument) {
+ if (!typeUrl.equals(argument.getTypeUrl())) {
+ return false;
+ }
+ if (!versionInfo.equals(argument.getVersionInfo())) {
+ return false;
+ }
+ if (!responseNonce.equals(argument.getResponseNonce())) {
+ return false;
+ }
+ if (!resourceNames.equals(argument.getResourceNamesList())) {
+ return false;
+ }
+ return NODE.equals(argument.getNode());
+ }
+ }
+}