From c2d68039ba818b15c5882d0c1e4c9710fbab9c23 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Wed, 20 Nov 2019 00:43:08 -0800 Subject: [PATCH] xds: implement LDS/RDS over ADS in XdsClient (#6320) Implement envoy's LDS and RDS protocols in XdsClient. gRPC has a special purpose xDS resolver to instantiate an XdsClient object and add a watcher to initiate xDS communication with management server to retrieve information used in service config, which configures gRPC's load balancing plugin. --- xds/src/main/java/io/grpc/xds/XdsClient.java | 11 +- .../main/java/io/grpc/xds/XdsClientImpl.java | 705 +++++++++ .../java/io/grpc/xds/XdsClientImplTest.java | 1385 +++++++++++++++++ 3 files changed, 2092 insertions(+), 9 deletions(-) create mode 100644 xds/src/main/java/io/grpc/xds/XdsClientImpl.java create mode 100644 xds/src/test/java/io/grpc/xds/XdsClientImplTest.java diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 31b486510e..39945ab074 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -330,8 +330,8 @@ abstract class XdsClient { * Registers a watcher to receive {@link ConfigUpdate} for service with the given hostname and * port. * - *

Unlike watchers for cluster data and endpoint data, at any point of time at most one config - * watcher is allowed. + *

Unlike watchers for cluster data and endpoint data, at most one ConfigWatcher can be + * registered. Once it is registered, it cannot be unregistered. * * @param hostName the host name part of the "xds:" URI for the server name that the gRPC client * targets for. Must NOT contain port. @@ -342,13 +342,6 @@ abstract class XdsClient { void watchConfigData(String hostName, int port, ConfigWatcher watcher) { } - /** - * Unregisters the existing config watcher. The previously registered config watcher will no - * longer receive {@link ConfigUpdate}. Noop if no config watcher has been registered. - */ - void cancelConfigDataWatch() { - } - /** * Registers a data watcher for the given cluster. */ diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java new file mode 100644 index 0000000000..c497f4ad9e --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -0,0 +1,705 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.rpc.Code; +import io.envoyproxy.envoy.api.v2.DiscoveryRequest; +import io.envoyproxy.envoy.api.v2.DiscoveryResponse; +import io.envoyproxy.envoy.api.v2.Listener; +import io.envoyproxy.envoy.api.v2.RouteConfiguration; +import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.route.Route; +import io.envoyproxy.envoy.api.v2.route.VirtualHost; +import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager; +import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds; +import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.alts.GoogleDefaultChannelBuilder; +import io.grpc.internal.BackoffPolicy; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.Bootstrapper.ChannelCreds; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +final class XdsClientImpl extends XdsClient { + private static final Logger logger = Logger.getLogger(XdsClientImpl.class.getName()); + + @VisibleForTesting + static final String ADS_TYPE_URL_LDS = "type.googleapis.com/envoy.api.v2.Listener"; + @VisibleForTesting + static final String ADS_TYPE_URL_RDS = + "type.googleapis.com/envoy.api.v2.RouteConfiguration"; + + private final ManagedChannel channel; + private final SynchronizationContext syncContext; + private final ScheduledExecutorService timeService; + private final BackoffPolicy.Provider backoffPolicyProvider; + private final Stopwatch stopwatch; + // The node identifier to be included in xDS requests. Management server only requires the + // first request to carry the node identifier on a stream. It should be identical if present + // more than once. + private final Node node; + + // Cached data for RDS responses, keyed by RouteConfiguration names. + // LDS responses indicate absence of RouteConfigurations and RDS responses indicate presence + // of RouteConfigurations. + // Optimization: only cache clusterName field in the RouteConfiguration messages of RDS + // responses. + private final Map routeConfigNamesToClusterNames = new HashMap<>(); + + @Nullable + private AdsStream adsStream; + @Nullable + private BackoffPolicy retryBackoffPolicy; + @Nullable + private ScheduledHandle rpcRetryTimer; + + // Following fields are set only after the ConfigWatcher registered. Once set, they should + // never change. + @Nullable + private ConfigWatcher configWatcher; + // The host name portion of "xds:" URI that the gRPC client targets for. + @Nullable + private String hostName; + // The "xds:" URI (including port suffix if present) that the gRPC client targets for. + @Nullable + private String ldsResourceName; + + XdsClientImpl( + // URI of the management server to be connected to. + String serverUri, + Node node, + // List of channel credential configurations for the channel to management server. + // Should pick the first supported one. + List channelCredsList, + SynchronizationContext syncContext, + ScheduledExecutorService timeService, + BackoffPolicy.Provider backoffPolicyProvider, + Stopwatch stopwatch) { + this( + buildChannel(checkNotNull(serverUri, "serverUri"), + checkNotNull(channelCredsList, "channelCredsList")), + node, + syncContext, + timeService, + backoffPolicyProvider, + stopwatch); + } + + @VisibleForTesting + XdsClientImpl( + ManagedChannel channel, + Node node, + SynchronizationContext syncContext, + ScheduledExecutorService timeService, + BackoffPolicy.Provider backoffPolicyProvider, + Stopwatch stopwatch) { + this.channel = checkNotNull(channel, "channel"); + this.node = checkNotNull(node, "node"); + this.syncContext = checkNotNull(syncContext, "syncContext"); + this.timeService = checkNotNull(timeService, "timeService"); + this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); + this.stopwatch = checkNotNull(stopwatch, "stopwatch"); + } + + @Override + void shutdown() { + channel.shutdown(); + if (adsStream != null) { + adsStream.close(Status.CANCELLED.withDescription("shutdown").asException()); + } + if (rpcRetryTimer != null) { + rpcRetryTimer.cancel(); + } + } + + @Override + void watchConfigData(String hostName, int port, ConfigWatcher watcher) { + checkState(configWatcher == null, "ConfigWatcher is already registered"); + configWatcher = checkNotNull(watcher, "watcher"); + this.hostName = checkNotNull(hostName, "hostName"); + if (port == -1) { + ldsResourceName = hostName; + } else { + ldsResourceName = hostName + ":" + port; + } + if (rpcRetryTimer != null) { + // Currently in retry backoff. + return; + } + if (adsStream == null) { + startRpcStream(); + } + adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName)); + } + + /** + * Builds a channel to the given server URI with the first supported channel creds config. + */ + private static ManagedChannel buildChannel(String serverUri,List channelCredsList) { + ManagedChannel ch = null; + // Use the first supported channel credentials configuration. + // Currently, only "google_default" is supported. + for (ChannelCreds creds : channelCredsList) { + if (creds.getType().equals("google_default")) { + ch = GoogleDefaultChannelBuilder.forTarget(serverUri).build(); + break; + } + } + if (ch == null) { + ch = ManagedChannelBuilder.forTarget(serverUri).build(); + } + return ch; + } + + /** + * Establishes the RPC connection by creating a new RPC stream on the given channel for + * xDS protocol communication. + */ + private void startRpcStream() { + checkState(adsStream == null, "Previous adsStream has not been cleared yet"); + AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = + AggregatedDiscoveryServiceGrpc.newStub(channel); + adsStream = new AdsStream(stub); + adsStream.start(); + stopwatch.reset().start(); + } + + /** + * Handles LDS response to find the HttpConnectionManager message for the requested resource name. + * Proceed with the resolved RouteConfiguration in HttpConnectionManager message of the requested + * listener, if exists, to find the VirtualHost configuration for the "xds:" URI + * (with the port, if any, stripped off). Or sends an RDS request if configured for dynamic + * resolution. The response is NACKed if contains invalid data for gRPC's usage. Otherwise, an + * ACK request is sent to management server. + */ + private void handleLdsResponse(DiscoveryResponse ldsResponse) { + logger.log(Level.FINE, "Received an LDS response: {0}", ldsResponse); + checkState(ldsResourceName != null && configWatcher != null, + "No LDS request was ever sent. Management server is doing something wrong"); + adsStream.ldsRespNonce = ldsResponse.getNonce(); + + // Unpack Listener messages. + List listeners = new ArrayList<>(ldsResponse.getResourcesCount()); + try { + for (com.google.protobuf.Any res : ldsResponse.getResourcesList()) { + listeners.add(res.unpack(Listener.class)); + } + } catch (InvalidProtocolBufferException e) { + adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName), + ldsResponse.getNonce(), "Broken LDS response."); + return; + } + + // Unpack HttpConnectionManager messages + HttpConnectionManager requestedHttpConnManager = null; + List httpConnectionManagers = new ArrayList<>(); + try { + for (Listener listener : listeners) { + HttpConnectionManager hm = + listener.getApiListener().getApiListener().unpack(HttpConnectionManager.class); + httpConnectionManagers.add(hm); + if (listener.getName().equals(ldsResourceName)) { + requestedHttpConnManager = hm; + } + } + } catch (InvalidProtocolBufferException e) { + adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName), + ldsResponse.getNonce(), "Broken LDS response."); + return; + } + + String errorMessage = null; + // All RouteConfigurations referenced by this LDS response, either in in-lined + // RouteConfiguration message or in RDS config. + Set routeConfigs = new HashSet<>(); + for (HttpConnectionManager hm : httpConnectionManagers) { + // The HttpConnectionManager message must either provide the RouteConfiguration directly + // in-line or tell the client to use RDS to obtain it. + if (hm.hasRouteConfig()) { + routeConfigs.add(hm.getRouteConfig().getName()); + } else if (hm.hasRds()) { + Rds rds = hm.getRds(); + if (!rds.getConfigSource().hasAds()) { + errorMessage = "For using RDS, it must be set to use ADS."; + break; + } + routeConfigs.add(rds.getRouteConfigName()); + } else { + errorMessage = "HttpConnectionManager message must either provide the " + + "RouteConfiguration directly in-line or tell the client to use RDS to obtain it."; + break; + } + } + + // Field clusterName found in the in-lined RouteConfiguration, if exists. + String clusterName = null; + // RouteConfiguration name to be used as the resource name for RDS request. + String rdsRouteConfigName = null; + if (errorMessage == null && requestedHttpConnManager != null) { + if (requestedHttpConnManager.hasRouteConfig()) { + RouteConfiguration rc = requestedHttpConnManager.getRouteConfig(); + clusterName = processRouteConfig(rc); + if (clusterName == null) { + errorMessage = "Cannot find a valid cluster name in VirtualHost inside " + + "RouteConfiguration with domains matching: " + hostName + "."; + } + } else if (requestedHttpConnManager.hasRds()) { + Rds rds = requestedHttpConnManager.getRds(); + rdsRouteConfigName = rds.getRouteConfigName(); + } + // Else impossible as we have already validated all HttpConnectionManager messages. + } + + if (errorMessage != null) { + adsStream.sendNackRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName), + ldsResponse.getNonce(), errorMessage); + return; + } + adsStream.sendAckRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName), + ldsResponse.getVersionInfo(), ldsResponse.getNonce()); + + // Remove RDS cache entries for RouteConfigurations not referenced by this LDS response. + // LDS responses represents the state of the world, RouteConfigurations not referenced + // by this LDS response are those no longer exist. + routeConfigNamesToClusterNames.keySet().retainAll(routeConfigs); + + // Process the requested Listener if exists, either extract cluster information from in-lined + // RouteConfiguration message or send an RDS request for dynamic resolution. + if (clusterName != null) { + // Found clusterName in the in-lined RouteConfiguration. + ConfigUpdate configUpdate = ConfigUpdate.newBuilder().setClusterName(clusterName).build(); + configWatcher.onConfigChanged(configUpdate); + } else if (rdsRouteConfigName != null) { + // Update the RDS resource we wish to request. An RDS request may not be necessary, but + // we need to keep what we request updated in case of notifying watcher upon receiving + // an RDS response for updating the requested resource. + adsStream.rdsResourceName = rdsRouteConfigName; + // First look up the RDS cache to see if we had received an RDS response containing the + // desired RouteConfiguration previously. Otherwise, send an RDS request for dynamic + // resolution. + if (routeConfigNamesToClusterNames.containsKey(rdsRouteConfigName)) { + ConfigUpdate configUpdate = + ConfigUpdate.newBuilder() + .setClusterName(routeConfigNamesToClusterNames.get(rdsRouteConfigName)) + .build(); + configWatcher.onConfigChanged(configUpdate); + } else { + adsStream.sendXdsRequest(ADS_TYPE_URL_RDS, ImmutableList.of(rdsRouteConfigName)); + } + } else { + // The requested Listener does not exist. + configWatcher.onError( + Status.NOT_FOUND.withDescription( + "Listener for requested resource [" + ldsResourceName + "] does not exist")); + } + } + + /** + * Handles RDS response to find the RouteConfiguration message for the requested resource name. + * Proceed with the resolved RouteConfiguration if exists to find the VirtualHost configuration + * for the "xds:" URI (with the port, if any, stripped off). The response is NACKed if contains + * invalid data for gRPC's usage. Otherwise, an ACK request is sent to management server. + */ + private void handleRdsResponse(DiscoveryResponse rdsResponse) { + logger.log(Level.FINE, "Received an RDS response: {0}", rdsResponse); + checkState(adsStream.rdsResourceName != null, + "Never requested for RDS resources, management server is doing something wrong"); + adsStream.rdsRespNonce = rdsResponse.getNonce(); + + // Unpack RouteConfiguration messages. + List routeConfigs = new ArrayList<>(rdsResponse.getResourcesCount()); + try { + for (com.google.protobuf.Any res : rdsResponse.getResourcesList()) { + routeConfigs.add(res.unpack(RouteConfiguration.class)); + } + } catch (InvalidProtocolBufferException e) { + adsStream.sendNackRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName), + rdsResponse.getNonce(), "Broken RDS response."); + return; + } + + // Validate and cache information from each RouteConfiguration message. + Map clusterNames = new HashMap<>(); + for (RouteConfiguration routeConfig : routeConfigs) { + String clusterName = processRouteConfig(routeConfig); + if (clusterName == null) { + adsStream.sendNackRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName), + rdsResponse.getNonce(), + "Cannot find a valid cluster name in VirtualHost inside " + + "RouteConfiguration with domains matching: " + hostName + "."); + return; + } + clusterNames.put(routeConfig.getName(), clusterName); + } + routeConfigNamesToClusterNames.putAll(clusterNames); + + adsStream.sendAckRequest(ADS_TYPE_URL_RDS, ImmutableList.of(adsStream.rdsResourceName), + rdsResponse.getVersionInfo(), rdsResponse.getNonce()); + + // Notify the ConfigWatcher if this RDS response contains the most recently requested + // RDS resource. + if (clusterNames.containsKey(adsStream.rdsResourceName)) { + ConfigUpdate configUpdate = + ConfigUpdate.newBuilder() + .setClusterName(clusterNames.get(adsStream.rdsResourceName)) + .build(); + configWatcher.onConfigChanged(configUpdate); + } + // Do not notify an error to the ConfigWatcher. RDS protocol is incremental, not receiving + // requested RouteConfiguration in this response does not imply absence. + } + + /** + * Processes RouteConfiguration message (from an resource information in an LDS or RDS + * response), which may contain a VirtualHost with domains matching the "xds:" + * URI hostname directly in-line. Returns the clusterName found in that VirtualHost + * message. Returns {@code null} if such a clusterName cannot be resolved. + * + *

Note we only validate VirtualHosts with domains matching the "xds:" URI hostname. + */ + @Nullable + private String processRouteConfig(RouteConfiguration config) { + List virtualHosts = config.getVirtualHostsList(); + int matchingLen = -1; // longest length of wildcard pattern that matches host name + VirtualHost targetVirtualHost = null; // target VirtualHost with longest matched domain + for (VirtualHost vHost : virtualHosts) { + for (String domain : vHost.getDomainsList()) { + if (matchHostName(hostName, domain) && domain.length() > matchingLen) { + matchingLen = domain.length(); + targetVirtualHost = vHost; + } + } + } + + // Proceed with the virtual host that has longest wildcard matched domain name with the + // hostname in original "xds:" URI. + if (targetVirtualHost != null) { + // The client will look only at the last route in the list (the default route), + // whose match field must be empty and whose route field must be set. + List routes = targetVirtualHost.getRoutesList(); + if (!routes.isEmpty()) { + Route route = routes.get(routes.size() - 1); + // TODO(chengyuanzhang): check the match field must be empty. + if (route.hasRoute()) { + return route.getRoute().getCluster(); + } + } + } + return null; + } + + @VisibleForTesting + final class RpcRetryTask implements Runnable { + @Override + public void run() { + startRpcStream(); + if (configWatcher != null) { + adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName)); + } + // TODO(chengyuanzhang): send CDS/EDS requests if CDS/EDS watcher presents. + } + } + + private final class AdsStream implements StreamObserver { + private final AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub; + + private StreamObserver requestWriter; + private boolean responseReceived; + private boolean closed; + + // Last successfully applied version_info for each resource type. Starts with empty string. + // A version_info is used to update management server with client's most recent knowledge of + // resources. + private String ldsVersion = ""; + private String rdsVersion = ""; + + // Response nonce for the most recently received discovery responses of each resource type. + // Client initiated requests start response nonce with empty string. + // A nonce is used to indicate the specific DiscoveryResponse each DiscoveryRequest + // corresponds to. + // A nonce becomes stale following a newer nonce being presented to the client in a + // DiscoveryResponse. + private String ldsRespNonce = ""; + private String rdsRespNonce = ""; + + // Most recently requested resource name(s) for each resource type. Note the resource_name in + // LDS requests will always be "xds:" URI (including port suffix if present). + @Nullable + private String rdsResourceName; + + private AdsStream(AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub) { + this.stub = checkNotNull(stub, "stub"); + } + + private void start() { + requestWriter = stub.withWaitForReady().streamAggregatedResources(this); + } + + @Override + public void onNext(final DiscoveryResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + responseReceived = true; + String typeUrl = response.getTypeUrl(); + if (typeUrl.equals(ADS_TYPE_URL_LDS)) { + handleLdsResponse(response); + } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) { + handleRdsResponse(response); + } + // TODO(zdapeng): add CDS/EDS response handles. + } + }); + } + + @Override + public void onError(final Throwable t) { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleStreamClosed( + Status.fromThrowable(t).augmentDescription("ADS stream [" + this + "] had an error")); + } + }); + } + + @Override + public void onCompleted() { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleStreamClosed( + Status.UNAVAILABLE.withDescription("ADS stream [" + this + "] was closed by server")); + } + }); + } + + private void handleStreamClosed(Status error) { + logger.log(Level.INFO, error.getDescription(), error.getCause()); + checkArgument(!error.isOk(), "unexpected OK status"); + if (closed) { + return; + } + closed = true; + cleanUp(); + if (responseReceived || retryBackoffPolicy == null) { + // Reset the backoff sequence if had received a response, or backoff sequence + // has never been initialized. + retryBackoffPolicy = backoffPolicyProvider.get(); + } + long delayNanos = 0; + if (!responseReceived) { + delayNanos = + Math.max( + 0, + retryBackoffPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS)); + } + logger.log(Level.FINE, "{0} stream closed, retry in {1} ns", new Object[]{this, delayNanos}); + rpcRetryTimer = + syncContext.schedule( + new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); + } + + private void close(Exception error) { + if (closed) { + return; + } + closed = true; + cleanUp(); + requestWriter.onError(error); + } + + private void cleanUp() { + if (adsStream == this) { + adsStream = null; + } + } + + /** + * Sends a DiscoveryRequest for the given resource name to management server. Memories the + * requested resource name (except for LDS as we always request for the singleton Listener) + * as we need it to find resources in responses. + */ + private void sendXdsRequest(String typeUrl, Collection resourceNames) { + checkState(requestWriter != null, "ADS stream has not been started"); + String version = ""; + String nonce = ""; + if (typeUrl.equals(ADS_TYPE_URL_LDS)) { + version = ldsVersion; + nonce = ldsRespNonce; + } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) { + checkArgument(resourceNames.size() == 1, + "RDS request requesting for more than one resource"); + version = rdsVersion; + nonce = rdsRespNonce; + rdsResourceName = resourceNames.iterator().next(); + } + // TODO(chengyuanzhang): cases for CDS/EDS. + DiscoveryRequest request = + DiscoveryRequest + .newBuilder() + .setVersionInfo(version) + .setNode(node) + .addAllResourceNames(resourceNames) + .setTypeUrl(typeUrl) + .setResponseNonce(nonce) + .build(); + requestWriter.onNext(request); + } + + /** + * Sends a DiscoveryRequest with the given information as an ACK. Updates the latest accepted + * version for the corresponding resource type. + */ + private void sendAckRequest(String typeUrl, Collection resourceNames, + String versionInfo, String nonce) { + checkState(requestWriter != null, "ADS stream has not been started"); + if (typeUrl.equals(ADS_TYPE_URL_LDS)) { + ldsVersion = versionInfo; + ldsRespNonce = nonce; + } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) { + rdsVersion = versionInfo; + rdsRespNonce = nonce; + } + // TODO(chengyuanzhang): cases for CDS/EDS. + DiscoveryRequest request = + DiscoveryRequest + .newBuilder() + .setVersionInfo(versionInfo) + .setNode(node) + .addAllResourceNames(resourceNames) + .setTypeUrl(typeUrl) + .setResponseNonce(nonce) + .build(); + requestWriter.onNext(request); + } + + /** + * Sends a DiscoveryRequest with the given information as an NACK. NACK takes the previous + * accepted version. + */ + private void sendNackRequest(String typeUrl, Collection resourceNames, String nonce, + String message) { + checkState(requestWriter != null, "ADS stream has not been started"); + String versionInfo = ""; + if (typeUrl.equals(ADS_TYPE_URL_LDS)) { + versionInfo = ldsVersion; + } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) { + versionInfo = rdsVersion; + } + // TODO(chengyuanzhang): cases for CDS/EDS. + DiscoveryRequest request = + DiscoveryRequest + .newBuilder() + .setVersionInfo(versionInfo) + .setNode(node) + .addAllResourceNames(resourceNames) + .setTypeUrl(typeUrl) + .setResponseNonce(nonce) + .setErrorDetail( + com.google.rpc.Status.newBuilder() + .setCode(Code.INVALID_ARGUMENT_VALUE) + .setMessage(message)) + .build(); + requestWriter.onNext(request); + } + } + + /** + * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with + * case-insensitive. + * + *

Wildcard pattern rules: + *

    + *
  1. A single asterisk (*) matches any domain.
  2. + *
  3. Asterisk (*) is only permitted in the left-most or the right-most part of the pattern, + * but not both.
  4. + *
+ */ + @VisibleForTesting + static boolean matchHostName(String hostName, String pattern) { + checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."), + "Invalid host name"); + checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."), + "Invalid pattern/domain name"); + + hostName = hostName.toLowerCase(Locale.US); + pattern = pattern.toLowerCase(Locale.US); + // hostName and pattern are now in lower case -- domain names are case-insensitive. + + if (!pattern.contains("*")) { + // Not a wildcard pattern -- hostName and pattern must match exactly. + return hostName.equals(pattern); + } + // Wildcard pattern + + if (pattern.length() == 1) { + return true; + } + + int index = pattern.indexOf('*'); + + // At most one asterisk (*) is allowed. + if (pattern.indexOf('*', index + 1) != -1) { + return false; + } + + // Asterisk can only match prefix or suffix. + if (index != 0 && index != pattern.length() - 1) { + return false; + } + + // HostName must be at least as long as the pattern because asterisk has to + // match one or more characters. + if (hostName.length() < pattern.length()) { + return false; + } + + if (index == 0 && hostName.endsWith(pattern.substring(1))) { + // Prefix matching fails. + return true; + } + + // Pattern matches hostname if suffix matching succeeds. + return index == pattern.length() - 1 + && hostName.startsWith(pattern.substring(0, pattern.length() - 1)); + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java new file mode 100644 index 0000000000..1c6ab7a44a --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -0,0 +1,1385 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import io.envoyproxy.envoy.api.v2.DiscoveryRequest; +import io.envoyproxy.envoy.api.v2.DiscoveryResponse; +import io.envoyproxy.envoy.api.v2.Listener; +import io.envoyproxy.envoy.api.v2.RouteConfiguration; +import io.envoyproxy.envoy.api.v2.core.Address; +import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource; +import io.envoyproxy.envoy.api.v2.core.ConfigSource; +import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.listener.FilterChain; +import io.envoyproxy.envoy.api.v2.route.RedirectAction; +import io.envoyproxy.envoy.api.v2.route.Route; +import io.envoyproxy.envoy.api.v2.route.RouteAction; +import io.envoyproxy.envoy.api.v2.route.VirtualHost; +import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager; +import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds; +import io.envoyproxy.envoy.config.listener.v2.ApiListener; +import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; +import io.grpc.Context; +import io.grpc.Context.CancellationListener; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.BackoffPolicy; +import io.grpc.internal.FakeClock; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.XdsClient.ConfigUpdate; +import io.grpc.xds.XdsClient.ConfigWatcher; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link XdsClientImpl}. + */ +public class XdsClientImplTest { + + private static final String HOSTNAME = "foo.googleapis.com"; + private static final int PORT = 8080; + + private static final Node NODE = Node.getDefaultInstance(); + private static final FakeClock.TaskFilter RPC_RETRY_TASK_FILTER = + new FakeClock.TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command.toString().contains(XdsClientImpl.RpcRetryTask.class.getSimpleName()); + } + }; + + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final FakeClock fakeClock = new FakeClock(); + + private final Queue> responseObservers = new ArrayDeque<>(); + private final Queue> requestObservers = new ArrayDeque<>(); + private final AtomicBoolean callEnded = new AtomicBoolean(true); + + @Mock + private AggregatedDiscoveryServiceImplBase mockedDiscoveryService; + @Mock + private BackoffPolicy.Provider backoffPolicyProvider; + @Mock + private BackoffPolicy backoffPolicy1; + @Mock + private BackoffPolicy backoffPolicy2; + @Mock + private ConfigWatcher configWatcher; + + private ManagedChannel channel; + private XdsClientImpl xdsClient; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); + when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); + when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L); + + String serverName = InProcessServerBuilder.generateName(); + AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() { + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + assertThat(callEnded.get()).isTrue(); // ensure previous call was ended + callEnded.set(false); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + callEnded.set(true); + } + }, MoreExecutors.directExecutor()); + responseObservers.offer(responseObserver); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + requestObservers.offer(requestObserver); + return requestObserver; + } + }; + mockedDiscoveryService = + mock(AggregatedDiscoveryServiceImplBase.class, delegatesTo(serviceImpl)); + + cleanupRule.register( + InProcessServerBuilder + .forName(serverName) + .addService(mockedDiscoveryService) + .directExecutor() + .build() + .start()); + channel = + cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); + xdsClient = + new XdsClientImpl(channel, NODE, syncContext, fakeClock.getScheduledExecutorService(), + backoffPolicyProvider, fakeClock.getStopwatchSupplier().get()); + // Only the connection to management server is established, no RPC request is sent until at + // least one watcher is registered. + assertThat(responseObservers).isEmpty(); + assertThat(requestObservers).isEmpty(); + } + + @After + public void tearDown() { + xdsClient.shutdown(); + assertThat(callEnded.get()).isTrue(); + assertThat(channel.isShutdown()).isTrue(); + assertThat(fakeClock.getPendingTasks()).isEmpty(); + } + + // Always test the real workflow and integrity of XdsClient: RDS protocol should always followed + // after at least one LDS request-response, from which the RDS resource name comes. CDS and EDS + // can be tested separately as they are used in a standalone way. + + // Discovery responses should follow management server spec and xDS protocol. See + // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol. + + /** + * Client receives an LDS response that does not contain a Listener for the requested resource. + * The LDS response is ACKed. + * The config watcher is notified with an error. + */ + @Test + public void ldsResponseWithoutMatchingResource() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request for the host name (with port) to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + List listeners = ImmutableList.of( + Any.pack(buildListener("bar.googleapis.com", + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-bar.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("bar.googleapis.com"), + "cluster-bar.googleapis.com")))) + .build()))), + Any.pack(buildListener("baz.googleapis.com", + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-baz.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("baz.googleapis.com"), + "cluster-baz.googleapis.com")))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher).onError(errorStatusCaptor.capture()); + Status error = errorStatusCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND); + assertThat(error.getDescription()) + .isEqualTo("Listener for requested resource [foo.googleapis.com:8080] does not exist"); + + verifyNoMoreInteractions(requestObserver); + } + + /** + * An LDS response contains the requested listener and an in-lined RouteConfiguration message for + * that listener. But the RouteConfiguration message is invalid as it does not contain any + * VirtualHost with domains matching the requested hostname. + * The LDS response is NACKed, as if the XdsClient has not received this response. + * The config watcher is NOT notified with an error. + */ + @Test + public void failToFindVirtualHostInLdsResponseInLineRouteConfig() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request for the host name (with port) to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + RouteConfiguration routeConfig = + buildRouteConfiguration( + "route.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("something does not match"), + "some cluster"), + buildVirtualHost(ImmutableList.of("something else does not match"), + "some other cluster"))); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRouteConfig(routeConfig).build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an NACK LDS request. + verify(requestObserver) + .onNext( + argThat(new DiscoveryRequestMatcher("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + verify(configWatcher, never()).onConfigChanged(any(ConfigUpdate.class)); + verify(configWatcher, never()).onError(any(Status.class)); + verifyNoMoreInteractions(requestObserver); + } + + /** + * Client resolves the virtual host config from an LDS response that contains a + * RouteConfiguration message directly in-line for the requested resource. No RDS is needed. + * The LDS response is ACKed. + * The config watcher is notified with an update. + */ + @Test + public void resolveVirtualHostInLdsResponse() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request for the host name (with port) to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + List listeners = ImmutableList.of( + Any.pack(buildListener("bar.googleapis.com", + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-bar.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("bar.googleapis.com"), + "cluster-bar.googleapis.com")))) + .build()))), + Any.pack(buildListener("baz.googleapis.com", + Any.pack(HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-baz.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("baz.googleapis.com"), + "cluster-baz.googleapis.com")))) + .build()))), + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"), + "cluster.googleapis.com"), + buildVirtualHost( + ImmutableList.of("something does not match"), + "some cluster")))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster.googleapis.com"); + + verifyNoMoreInteractions(requestObserver); + } + + /** + * Client receives an RDS response (after a previous LDS request-response) that does not contain a + * RouteConfiguration for the requested resource while each received RouteConfiguration is valid. + * The RDS response is ACKed. + * The config watcher is NOT notified with an error (RDS protocol is incremental, responses + * not containing requested resources does not indicate absence). + */ + @Test + public void rdsResponseWithoutMatchingResource() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request for the host name (with port) to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + Rds rdsConfig = + Rds.newBuilder() + // Must set to use ADS. + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + // Client sends an (first) RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, ""))); + + // Management server should only sends RouteConfiguration messages with at least one + // VirtualHost with domains matching requested hostname. Otherwise, it is invalid data. + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "some resource name does not match route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "whatever cluster")))), + Any.pack( + buildRouteConfiguration( + "some other resource name does not match route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "some more whatever cluster"))))); + response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0000"))); + + verify(configWatcher, never()).onConfigChanged(any(ConfigUpdate.class)); + verify(configWatcher, never()).onError(any(Status.class)); + } + + /** + * Client resolves the virtual host config from an RDS response for the requested resource. The + * RDS response is ACKed. + * The config watcher is notified with an update. + */ + @Test + public void resolveVirtualHostInRdsResponse() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + Rds rdsConfig = + Rds.newBuilder() + // Must set to use ADS. + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted) + + // Management server should only sends RouteConfiguration messages with at least one + // VirtualHost with domains matching requested hostname. Otherwise, it is invalid data. + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("something does not match"), + "some cluster"), + buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"), + "cluster.googleapis.com")))), // matching virtual host + Any.pack( + buildRouteConfiguration( + "some resource name does not match route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "some more cluster"))))); + response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0000"))); + + ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster.googleapis.com"); + } + + /** + * Client receives an RDS response (after a previous LDS request-response) containing a + * RouteConfiguration message for the requested resource. But the RouteConfiguration message + * is invalid as it does not contain any VirtualHost with domains matching the requested + * hostname. + * The LDS response is NACKed, as if the XdsClient has not received this response. + * The config watcher is NOT notified with an error. + */ + @Test + public void failToFindVirtualHostInRdsResponse() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + Rds rdsConfig = + Rds.newBuilder() + // Must set to use ADS. + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted) + + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("something does not match"), + "some cluster"), + buildVirtualHost( + ImmutableList.of("something else does not match", "also does not match"), + "cluster.googleapis.com")))), + Any.pack( + buildRouteConfiguration( + "some resource name does not match route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("one more does not match"), + "some more cluster"))))); + response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(response); + + // Client sent an NACK RDS request. + verify(requestObserver) + .onNext( + argThat(new DiscoveryRequestMatcher("", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0000"))); + + verify(configWatcher, never()).onConfigChanged(any(ConfigUpdate.class)); + verify(configWatcher, never()).onError(any(Status.class)); + } + + /** + * Client receives an RDS response (after a previous LDS request-response) containing a + * RouteConfiguration message for the requested resource. But the RouteConfiguration message + * is invalid as the VirtualHost with domains matching the requested hostname contains invalid + * data, its RouteAction message is absent. + * The LDS response is NACKed, as if the XdsClient has not received this response. + * The config watcher is NOT notified with an error. + */ + @Test + public void matchingVirtualHostDoesNotContainRouteAction() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + Rds rdsConfig = + Rds.newBuilder() + // Must set to use ADS. + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted) + + // A VirtualHost with a Route that contains only redirect configuration. + VirtualHost virtualHost = + VirtualHost.newBuilder() + .setName("virtualhost00.googleapis.com") // don't care + .addDomains("foo.googleapis.com") + .addRoutes( + Route.newBuilder() + .setRedirect( + RedirectAction.newBuilder() + .setHostRedirect("bar.googleapis.com") + .setPortRedirect(443))) + .build(); + + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration("route-foo.googleapis.com", + ImmutableList.of(virtualHost)))); + response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(response); + + // Client sent an NACK RDS request. + verify(requestObserver) + .onNext( + argThat(new DiscoveryRequestMatcher("", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0000"))); + + verify(configWatcher, never()).onConfigChanged(any(ConfigUpdate.class)); + verify(configWatcher, never()).onError(any(Status.class)); + } + + /** + * Client receives LDS/RDS responses for updating resources previously received. + * + *

Tests for streaming behavior. + */ + @Test + public void notifyUpdatedResources() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request for the host name (with port) to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server sends back an LDS response containing a RouteConfiguration for the + // requested Listener directly in-line. + RouteConfiguration routeConfig = + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"), + "cluster.googleapis.com"), + buildVirtualHost(ImmutableList.of("something does not match"), + "some cluster"))); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRouteConfig(routeConfig).build()))) + ); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + // Cluster name is resolved and notified to config watcher. + ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster.googleapis.com"); + + // Management sends back another LDS response containing updates for the requested Listener. + routeConfig = + buildRouteConfiguration( + "another-route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"), + "another-cluster.googleapis.com"), + buildVirtualHost(ImmutableList.of("something does not match"), + "some cluster"))); + + listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRouteConfig(routeConfig).build()))) + ); + response = + buildDiscoveryResponse("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0001"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0001"))); + + // Updated cluster name is notified to config watcher. + configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher, times(2)).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()) + .isEqualTo("another-cluster.googleapis.com"); + + // Management server sends back another LDS response containing updates for the requested + // Listener and telling client to do RDS. + Rds rdsConfig = + Rds.newBuilder() + // Must set to use ADS. + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("some-route-to-foo.googleapis.com") + .build(); + + listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + response = + buildDiscoveryResponse("2", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0002"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("2", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0002"))); + + // Client sends an (first) RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "some-route-to-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, ""))); + + // Management server sends back an RDS response containing the RouteConfiguration + // for the requested resource. + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "some-route-to-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("something does not match"), + "some cluster"), + buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"), + "some-other-cluster.googleapis.com"))))); + response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "some-route-to-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0000"))); + + // Updated cluster name is notified to config watcher again. + configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher, times(3)).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()) + .isEqualTo("some-other-cluster.googleapis.com"); + + // Management server sends back another RDS response containing updated information for the + // RouteConfiguration currently in-use by client. + routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "some-route-to-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"), + "an-updated-cluster.googleapis.com"))))); + response = buildDiscoveryResponse("1", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "some-route-to-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0001"))); + + // Updated cluster name is notified to config watcher again. + configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher, times(4)).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()) + .isEqualTo("an-updated-cluster.googleapis.com"); + } + + // TODO(chengyuanzhang): tests for timeout waiting for responses for incremental + // protocols (RDS/EDS). + + /** + * Client receives multiple RDS responses without RouteConfiguration for the requested + * resource. It should continue waiting until such an RDS response arrives, as RDS + * protocol is incremental. + * + *

Tests for RDS incremental protocol behavior. + */ + @Test + public void waitRdsResponsesForRequestedResource() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request for the host name (with port) to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management sends back an LDS response telling client to do RDS. + Rds rdsConfig = + Rds.newBuilder() + // Must set to use ADS. + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + // Client sends an (first) RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, ""))); + + // Management server sends back an RDS response that does not contain RouteConfiguration + // for the requested resource. + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "some resource name does not match route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "some more cluster"))))); + response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0000"))); + + // Client waits for future RDS responses silently. + verifyNoMoreInteractions(configWatcher); + + // Management server sends back another RDS response containing the RouteConfiguration + // for the requested resource. + routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("something does not match"), + "some cluster"), + buildVirtualHost(ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"), + "another-cluster.googleapis.com"))))); + response = buildDiscoveryResponse("1", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0001"))); + + // Updated cluster name is notified to config watcher. + ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()) + .isEqualTo("another-cluster.googleapis.com"); + } + + /** + * Client receives RDS responses containing RouteConfigurations for resources that were + * not requested (management server sends them proactively). Later client receives an LDS + * response with the requested Listener containing Rds config pointing to do RDS for one of + * the previously received RouteConfigurations. No RDS request needs to be sent for that + * RouteConfiguration as it can be found in local cache (management server will not send + * RDS responses for that RouteConfiguration again). A future RDS response update for + * that RouteConfiguration should be notified to config watcher. + * + *

Tests for caching RDS response data behavior. + */ + @Test + public void receiveRdsResponsesForRouteConfigurationsToBeUsedLater() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request for the host name (with port) to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management sends back an LDS response telling client to do RDS. + Rds rdsConfig = + Rds.newBuilder() + // Must set to use ADS. + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo1.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + // Client sends an (first) RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "route-foo1.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, ""))); + + // Management server sends back an RDS response containing RouteConfigurations + // more than requested. + List routeConfigs = ImmutableList.of( + // Currently wanted resource. + Any.pack( + buildRouteConfiguration( + "route-foo1.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "cluster1.googleapis.com")))), + // Resources currently not wanted. + Any.pack( + buildRouteConfiguration( + "route-foo2.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "cluster2.googleapis.com")))), + Any.pack( + buildRouteConfiguration( + "route-foo3.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "cluster3.googleapis.com"))))); + response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "route-foo1.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0000"))); + + // Resolved cluster name is notified to config watcher. + ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster1.googleapis.com"); + + // Management server sends back another LDS response containing updates for the requested + // Listener and telling client to do RDS for a RouteConfiguration which had previously + // sent to client. + rdsConfig = + Rds.newBuilder() + // Must set to use ADS. + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo2.googleapis.com") + .build(); + + listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + response = buildDiscoveryResponse("1", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0001"))); + + // Updated cluster name is notified to config watcher. + configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher, times(2)).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster2.googleapis.com"); + + // At this time, no RDS request is sent as the result can be found in local cache (even if + // a request is sent for it, management server does not necessarily reply). + verify(requestObserver, times(0)) + .onNext(eq(buildDiscoveryRequest("0", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0000"))); + + verifyNoMoreInteractions(requestObserver); + + // Management server sends back another RDS response containing updates for the + // RouteConfiguration that the client was pointed to most recently (i.e., + // "route-foo2.googleapis.com"). + routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo2.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "a-new-cluster.googleapis.com"))))); + response = buildDiscoveryResponse("1", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "route-foo2.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0001"))); + + // Updated cluster name is notified to config watcher. + configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher, times(3)).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()) + .isEqualTo("a-new-cluster.googleapis.com"); + } + + /** + * An RouteConfiguration is removed by server by sending client an LDS response removing the + * corresponding Listener. + */ + @Test + public void routeConfigurationRemovedNotifiedToWatcher() { + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request for the host name (with port) to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management sends back an LDS response telling client to do RDS. + Rds rdsConfig = + Rds.newBuilder() + // Must set to use ADS. + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + // Client sends an (first) RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, ""))); + + // Management server sends back an RDS response containing RouteConfiguration requested. + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "cluster.googleapis.com"))))); + response = buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK RDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, "0000"))); + + // Resolved cluster name is notified to config watcher. + ArgumentCaptor configUpdateCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher).onConfigChanged(configUpdateCaptor.capture()); + assertThat(configUpdateCaptor.getValue().getClusterName()).isEqualTo("cluster.googleapis.com"); + + // Management server sends back another LDS response with the previous Listener (currently + // in-use by client) removed as the RouteConfiguration it references to is absent. + response = + buildDiscoveryResponse("1", ImmutableList.of(), // empty + XdsClientImpl.ADS_TYPE_URL_LDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0001"))); + + // Notify config watcher with an error. + ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null); + verify(configWatcher).onError(errorStatusCaptor.capture()); + Status error = errorStatusCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND); + assertThat(error.getDescription()) + .isEqualTo("Listener for requested resource [foo.googleapis.com:8080] does not exist"); + } + + /** + * RPC stream closed and retry during the period of first tiem resolving service config + * (LDS/RDS only). + */ + @Test + public void streamClosedAndRetryWhenResolvingConfig() { + InOrder inOrder = + Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + + ArgumentCaptor> responseObserverCaptor = + ArgumentCaptor.forClass(null); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + StreamObserver responseObserver = + responseObserverCaptor.getValue(); // same as responseObservers.poll() + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an LDS request for the host name (with port) to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server closes the RPC stream immediately. + responseObserver.onCompleted(); + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(9L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + // Client retried by sending an LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server closes the RPC stream with an error. + responseObserver.onError(Status.UNAVAILABLE.asException()); + verifyNoMoreInteractions(backoffPolicyProvider); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(99L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + // Client retried again by sending an LDS. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server responses with a listener for the requested resource. + Rds rdsConfig = + Rds.newBuilder() + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse ldsResponse = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(ldsResponse); + + // Client sent back an ACK LDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, "0000"))); + + // Client sent an RDS request based on the received listener. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "route-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_RDS, ""))); + + // Management server encounters an error and closes the stream. + responseObserver.onError(Status.UNKNOWN.asException()); + + // Reset backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // RPC stream closed immediately + responseObserver.onError(Status.UNKNOWN.asException()); + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(19L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server sends an LDS response. + responseObserver.onNext(ldsResponse); + + // Client sends an ACK LDS request and an RDS request for "route-foo.googleapis.com". (Omitted) + + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "cluster.googleapis.com"))))); + DiscoveryResponse rdsResponse = + buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + // Management server sends an RDS response. + responseObserver.onNext(rdsResponse); + + // Client has resolved the cluster based on the RDS response. + configWatcher + .onConfigChanged( + eq(ConfigUpdate.newBuilder().setClusterName("cluster.googleapis.com").build())); + + // RPC stream closed with an error again. + responseObserver.onError(Status.UNKNOWN.asException()); + + // Reset backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); + } + + // TODO(chengyuanzhang): test for race between stream closed and watcher changes. Should only + // for ClusterWatchers and EndpointWatchers. + + @Test + public void matchHostName_exactlyMatch() { + String pattern = "foo.googleapis.com"; + assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("fo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("oo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo.googleapis", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue(); + } + + @Test + public void matchHostName_prefixWildcard() { + String pattern = "*.foo.googleapis.com"; + assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("bar-baz.foo.googleapis", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isTrue(); + pattern = "*-bar.foo.googleapis.com"; + assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("-bar.foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("baz-bar.foo.googleapis.com", pattern)) + .isTrue(); + } + + @Test + public void matchHostName_postfixWildCard() { + String pattern = "foo.*"; + assertThat(XdsClientImpl.matchHostName("bar.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("bar.foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isTrue(); + assertThat(XdsClientImpl.matchHostName("foo.com", pattern)).isTrue(); + pattern = "foo-*"; + assertThat(XdsClientImpl.matchHostName("bar-.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo.googleapis.com", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo-", pattern)).isFalse(); + assertThat(XdsClientImpl.matchHostName("foo-bar.com", pattern)).isTrue(); + assertThat(XdsClientImpl.matchHostName("foo-.com", pattern)).isTrue(); + assertThat(XdsClientImpl.matchHostName("foo-bar", pattern)).isTrue(); + } + + private static DiscoveryResponse buildDiscoveryResponse(String versionInfo, + List resources, String typeUrl, String nonce) { + return + DiscoveryResponse.newBuilder() + .setVersionInfo(versionInfo) + .setTypeUrl(typeUrl) + .addAllResources(resources) + .setNonce(nonce) + .build(); + } + + private static DiscoveryRequest buildDiscoveryRequest(String versionInfo, + String resourceName, String typeUrl, String nonce) { + return + DiscoveryRequest.newBuilder() + .setVersionInfo(versionInfo) + .setNode(NODE) + .setTypeUrl(typeUrl) + .addResourceNames(resourceName) + .setResponseNonce(nonce) + .build(); + } + + private static Listener buildListener(String name, com.google.protobuf.Any apiListener) { + return + Listener.newBuilder() + .setName(name) + .setAddress(Address.getDefaultInstance()) + .addFilterChains(FilterChain.getDefaultInstance()) + .setApiListener(ApiListener.newBuilder().setApiListener(apiListener)) + .build(); + } + + private static RouteConfiguration buildRouteConfiguration(String name, + List virtualHosts) { + return + RouteConfiguration.newBuilder() + .setName(name) + .addAllVirtualHosts(virtualHosts) + .build(); + } + + private static VirtualHost buildVirtualHost(List domains, String clusterName) { + return + VirtualHost.newBuilder() + .setName("virtualhost00.googleapis.com") // don't care + .addAllDomains(domains) + .addRoutes(Route.newBuilder() + .setRoute(RouteAction.newBuilder().setCluster("whatever cluster"))) + .addRoutes( + // Only the last (default) route matters. + Route.newBuilder() + .setRoute(RouteAction.newBuilder().setCluster(clusterName))) + .build(); + } + + /** + * Matcher for DiscoveryRequest without the comparison of error_details field, which is used for + * management server debugging purposes. + * + *

In general, if you are sure error_details field should not be set in a DiscoveryRequest, + * compare with message equality. Otherwise, this matcher is handy for comparing other fields + * only. + */ + private static class DiscoveryRequestMatcher implements ArgumentMatcher { + private final String versionInfo; + private final String typeUrl; + private final List resourceNames; + private final String responseNonce; + + private DiscoveryRequestMatcher(String versionInfo, String resourceName, String typeUrl, + String responseNonce) { + this(versionInfo, ImmutableList.of(resourceName), typeUrl, responseNonce); + } + + private DiscoveryRequestMatcher(String versionInfo, List resourceNames, String typeUrl, + String responseNonce) { + this.versionInfo = versionInfo; + this.resourceNames = resourceNames; + this.typeUrl = typeUrl; + this.responseNonce = responseNonce; + } + + @Override + public boolean matches(DiscoveryRequest argument) { + if (!typeUrl.equals(argument.getTypeUrl())) { + return false; + } + if (!versionInfo.equals(argument.getVersionInfo())) { + return false; + } + if (!responseNonce.equals(argument.getResponseNonce())) { + return false; + } + if (!resourceNames.equals(argument.getResourceNamesList())) { + return false; + } + return NODE.equals(argument.getNode()); + } + } +}