diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 01a879a248..2021f73008 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -16,6 +16,7 @@ package io.grpc.xds; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; @@ -117,7 +118,8 @@ abstract class XdsClient { @Nullable List virtualHosts) { this.httpMaxStreamDurationNano = httpMaxStreamDurationNano; this.rdsName = rdsName; - this.virtualHosts = virtualHosts; + this.virtualHosts = virtualHosts == null + ? null : Collections.unmodifiableList(new ArrayList<>(virtualHosts)); } long getHttpMaxStreamDurationNano() { @@ -169,7 +171,7 @@ abstract class XdsClient { return new Builder(); } - private static class Builder { + static class Builder { private long httpMaxStreamDurationNano; @Nullable private String rdsName; @@ -189,8 +191,11 @@ abstract class XdsClient { return this; } - Builder setVirtualHosts(List virtualHosts) { - this.virtualHosts = virtualHosts; + Builder addVirtualHost(VirtualHost virtualHost) { + if (virtualHosts == null) { + virtualHosts = new ArrayList<>(); + } + virtualHosts.add(virtualHost); return this; } @@ -206,7 +211,8 @@ abstract class XdsClient { private final List virtualHosts; private RdsUpdate(List virtualHosts) { - this.virtualHosts = virtualHosts; + this.virtualHosts = Collections.unmodifiableList( + new ArrayList<>(checkNotNull(virtualHosts, "virtualHosts"))); } static RdsUpdate fromVirtualHosts(List virtualHosts) { @@ -223,6 +229,23 @@ abstract class XdsClient { .add("virtualHosts", virtualHosts) .toString(); } + + @Override + public int hashCode() { + return Objects.hash(virtualHosts); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RdsUpdate that = (RdsUpdate) o; + return Objects.equals(virtualHosts, that.virtualHosts); + } } static final class CdsUpdate implements ResourceUpdate { @@ -474,7 +497,7 @@ abstract class XdsClient { * Updates via resource discovery RPCs using LDS. Includes {@link Listener} object containing * config for security, RBAC or other server side features such as rate limit. */ - static final class ListenerUpdate { + static final class ListenerUpdate implements ResourceUpdate { // TODO(sanjaypujare): flatten structure by moving Listener class members here. private final Listener listener; diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl2.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl2.java new file mode 100644 index 0000000000..d4c7e5f58d --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl2.java @@ -0,0 +1,1702 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.util.Durations; +import com.google.protobuf.util.JsonFormat; +import com.google.rpc.Code; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.listener.v3.FilterChain; +import io.envoyproxy.envoy.config.listener.v3.FilterChainMatch; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; +import io.envoyproxy.envoy.config.route.v3.VirtualHost; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; +import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.grpc.InternalLogId; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.internal.BackoffPolicy; +import io.grpc.stub.StreamObserver; +import io.grpc.xds.EnvoyProtoData.DropOverload; +import io.grpc.xds.EnvoyProtoData.Locality; +import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; +import io.grpc.xds.EnvoyProtoData.Node; +import io.grpc.xds.EnvoyProtoData.StructOrError; +import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; +import io.grpc.xds.LoadStatsManager.LoadStatsStore; +import io.grpc.xds.XdsLogger.XdsLogLevel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +final class XdsClientImpl2 extends XdsClient { + + // Longest time to wait, since the subscription to some resource, for concluding its absence. + @VisibleForTesting + static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15; + private static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener"; + private static final String ADS_TYPE_URL_LDS = + "type.googleapis.com/envoy.config.listener.v3.Listener"; + private static final String ADS_TYPE_URL_RDS_V2 = + "type.googleapis.com/envoy.api.v2.RouteConfiguration"; + private static final String ADS_TYPE_URL_RDS = + "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"; + private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 = + "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2" + + ".HttpConnectionManager"; + private static final String TYPE_URL_HTTP_CONNECTION_MANAGER = + "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3" + + ".HttpConnectionManager"; + private static final String ADS_TYPE_URL_CDS_V2 = "type.googleapis.com/envoy.api.v2.Cluster"; + private static final String ADS_TYPE_URL_CDS = + "type.googleapis.com/envoy.config.cluster.v3.Cluster"; + private static final String ADS_TYPE_URL_EDS_V2 = + "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"; + private static final String ADS_TYPE_URL_EDS = + "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; + + private final MessagePrinter respPrinter = new MessagePrinter(); + private final InternalLogId logId; + private final XdsLogger logger; + private final String targetName; // TODO: delete me. + private final XdsChannel xdsChannel; + private final SynchronizationContext syncContext; + private final ScheduledExecutorService timeService; + private final BackoffPolicy.Provider backoffPolicyProvider; + private final Supplier stopwatchSupplier; + private final Stopwatch adsStreamRetryStopwatch; + // The node identifier to be included in xDS requests. Management server only requires the + // first request to carry the node identifier on a stream. It should be identical if present + // more than once. + private Node node; + + private final Map ldsResourceSubscribers = new HashMap<>(); + private final Map rdsResourceSubscribers = new HashMap<>(); + private final Map cdsResourceSubscribers = new HashMap<>(); + private final Map edsResourceSubscribers = new HashMap<>(); + + private final LoadStatsManager loadStatsManager = new LoadStatsManager(); + + // Last successfully applied version_info for each resource type. Starts with empty string. + // A version_info is used to update management server with client's most recent knowledge of + // resources. + private String ldsVersion = ""; + private String rdsVersion = ""; + private String cdsVersion = ""; + private String edsVersion = ""; + + @Nullable + private AbstractAdsStream adsStream; + @Nullable + private BackoffPolicy retryBackoffPolicy; + @Nullable + private ScheduledHandle rpcRetryTimer; + @Nullable + private LoadReportClient lrsClient; + private int loadReportCount; // number of clusters enabling load reporting + + // For server side usage. + @Nullable + private ListenerWatcher listenerWatcher; + private int listenerPort = -1; + @Nullable + private ScheduledHandle ldsRespTimer; + + XdsClientImpl2( + String targetName, + XdsChannel channel, + Node node, + SynchronizationContext syncContext, + ScheduledExecutorService timeService, + BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier) { + this.targetName = checkNotNull(targetName, "targetName"); + this.xdsChannel = checkNotNull(channel, "channel"); + this.node = checkNotNull(node, "node"); + this.syncContext = checkNotNull(syncContext, "syncContext"); + this.timeService = checkNotNull(timeService, "timeService"); + this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); + this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatch"); + adsStreamRetryStopwatch = stopwatchSupplier.get(); + logId = InternalLogId.allocate("xds-client", null); + logger = XdsLogger.withLogId(logId); + logger.log(XdsLogLevel.INFO, "Created"); + } + + @Override + void shutdown() { + logger.log(XdsLogLevel.INFO, "Shutting down"); + xdsChannel.getManagedChannel().shutdown(); + if (adsStream != null) { + adsStream.close(Status.CANCELLED.withDescription("shutdown").asException()); + } + cleanUpResourceTimers(); + if (lrsClient != null) { + lrsClient.stopLoadReporting(); + lrsClient = null; + } + if (rpcRetryTimer != null) { + rpcRetryTimer.cancel(); + } + } + + private void cleanUpResourceTimers() { + if (ldsRespTimer != null) { + ldsRespTimer.cancel(); + ldsRespTimer = null; + } + for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { + subscriber.stopTimer(); + } + for (ResourceSubscriber subscriber : rdsResourceSubscribers.values()) { + subscriber.stopTimer(); + } + for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) { + subscriber.stopTimer(); + } + for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) { + subscriber.stopTimer(); + } + } + + @Override + void watchLdsResource(String resourceName, LdsResourceWatcher watcher) { + ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName); + if (subscriber == null) { + logger.log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName); + subscriber = new ResourceSubscriber(ResourceType.LDS, resourceName); + ldsResourceSubscribers.put(resourceName, subscriber); + adjustResourceSubscription(ResourceType.LDS, ldsResourceSubscribers.keySet()); + } + subscriber.addWatcher(watcher); + } + + @Override + void cancelLdsResourceWatch(String resourceName, LdsResourceWatcher watcher) { + ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName); + subscriber.removeWatcher(watcher); + if (!subscriber.isWatched()) { + subscriber.stopTimer(); + logger.log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName); + ldsResourceSubscribers.remove(resourceName); + adjustResourceSubscription(ResourceType.LDS, ldsResourceSubscribers.keySet()); + } + } + + @Override + void watchRdsResource(String resourceName, RdsResourceWatcher watcher) { + ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName); + if (subscriber == null) { + logger.log(XdsLogLevel.INFO, "Subscribe RDS resource {0}", resourceName); + subscriber = new ResourceSubscriber(ResourceType.RDS, resourceName); + rdsResourceSubscribers.put(resourceName, subscriber); + adjustResourceSubscription(ResourceType.RDS, rdsResourceSubscribers.keySet()); + } + subscriber.addWatcher(watcher); + } + + @Override + void cancelRdsResourceWatch(String resourceName, RdsResourceWatcher watcher) { + ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName); + subscriber.removeWatcher(watcher); + if (!subscriber.isWatched()) { + subscriber.stopTimer(); + logger.log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName); + rdsResourceSubscribers.remove(resourceName); + adjustResourceSubscription(ResourceType.RDS, rdsResourceSubscribers.keySet()); + } + } + + @Override + void watchCdsResource(String resourceName, CdsResourceWatcher watcher) { + ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName); + if (subscriber == null) { + logger.log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName); + subscriber = new ResourceSubscriber(ResourceType.CDS, resourceName); + cdsResourceSubscribers.put(resourceName, subscriber); + adjustResourceSubscription(ResourceType.CDS, cdsResourceSubscribers.keySet()); + } + subscriber.addWatcher(watcher); + } + + @Override + void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) { + ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName); + subscriber.removeWatcher(watcher); + if (!subscriber.isWatched()) { + subscriber.stopTimer(); + logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName); + cdsResourceSubscribers.remove(resourceName); + adjustResourceSubscription(ResourceType.CDS, cdsResourceSubscribers.keySet()); + } + } + + @Override + void watchEdsResource(String resourceName, EdsResourceWatcher watcher) { + ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName); + if (subscriber == null) { + logger.log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName); + subscriber = new ResourceSubscriber(ResourceType.EDS, resourceName); + edsResourceSubscribers.put(resourceName, subscriber); + adjustResourceSubscription(ResourceType.EDS, edsResourceSubscribers.keySet()); + } + subscriber.addWatcher(watcher); + } + + @Override + void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) { + ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName); + subscriber.removeWatcher(watcher); + if (!subscriber.isWatched()) { + subscriber.stopTimer(); + logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName); + edsResourceSubscribers.remove(resourceName); + adjustResourceSubscription(ResourceType.EDS, edsResourceSubscribers.keySet()); + } + } + + @Override + void watchListenerData(int port, ListenerWatcher watcher) { + checkState(listenerWatcher == null, "ListenerWatcher already registered"); + listenerWatcher = checkNotNull(watcher, "watcher"); + checkArgument(port > 0, "port needs to be > 0"); + this.listenerPort = port; + logger.log(XdsLogLevel.INFO, "Started watching listener for port {0}", port); + if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { + // Currently in retry backoff. + return; + } + if (adsStream == null) { + startRpcStream(); + } + updateNodeMetadataForListenerRequest(port); + adsStream.sendXdsRequest(ResourceType.LDS, ImmutableList.of()); + ldsRespTimer = + syncContext + .schedule( + new ListenerResourceFetchTimeoutTask(":" + port), + INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); + } + + /** In case of Listener watcher metadata to be updated to include port. */ + private void updateNodeMetadataForListenerRequest(int port) { + Map newMetadata = new HashMap<>(); + if (node.getMetadata() != null) { + newMetadata.putAll(node.getMetadata()); + } + newMetadata.put("TRAFFICDIRECTOR_PROXYLESS", "1"); + // TODO(sanjaypujare): eliminate usage of listening_addresses. + EnvoyProtoData.Address listeningAddress = + new EnvoyProtoData.Address("0.0.0.0", port); + node = + node.toBuilder().setMetadata(newMetadata).addListeningAddresses(listeningAddress).build(); + } + + @Override + void reportClientStats() { + if (lrsClient == null) { + logger.log(XdsLogLevel.INFO, "Turning on load reporting"); + lrsClient = + new LoadReportClient( + targetName, + loadStatsManager, + xdsChannel, + node, + syncContext, + timeService, + backoffPolicyProvider, + stopwatchSupplier); + } + if (loadReportCount == 0) { + lrsClient.startLoadReporting(); + } + loadReportCount++; + } + + @Override + void cancelClientStatsReport() { + checkState(loadReportCount > 0, "load reporting was never started"); + loadReportCount--; + if (loadReportCount == 0) { + logger.log(XdsLogLevel.INFO, "Turning off load reporting"); + lrsClient.stopLoadReporting(); + lrsClient = null; + } + } + + @Override + LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) { + return loadStatsManager.addLoadStats(clusterName, clusterServiceName); + } + + @Override + void removeClientStats(String clusterName, @Nullable String clusterServiceName) { + loadStatsManager.removeLoadStats(clusterName, clusterServiceName); + } + + @Override + public String toString() { + return logId.toString(); + } + + /** + * Establishes the RPC connection by creating a new RPC stream on the given channel for + * xDS protocol communication. + */ + private void startRpcStream() { + checkState(adsStream == null, "Previous adsStream has not been cleared yet"); + if (xdsChannel.isUseProtocolV3()) { + adsStream = new AdsStream(); + } else { + adsStream = new AdsStreamV2(); + } + adsStream.start(); + logger.log(XdsLogLevel.INFO, "ADS stream started"); + adsStreamRetryStopwatch.reset().start(); + } + + private void handleLdsResponse(DiscoveryResponseData ldsResponse) { + if (listenerWatcher != null) { + handleLdsResponseForServer(ldsResponse); + } else { + handleLdsResponseForClient(ldsResponse); + } + } + + private void handleLdsResponseForClient(DiscoveryResponseData ldsResponse) { + // Unpack Listener messages. + List listeners = new ArrayList<>(ldsResponse.getResourcesList().size()); + List listenerNames = new ArrayList<>(ldsResponse.getResourcesList().size()); + try { + for (com.google.protobuf.Any res : ldsResponse.getResourcesList()) { + if (res.getTypeUrl().equals(ADS_TYPE_URL_LDS_V2)) { + res = res.toBuilder().setTypeUrl(ADS_TYPE_URL_LDS).build(); + } + Listener listener = res.unpack(Listener.class); + listeners.add(listener); + listenerNames.add(listener.getName()); + } + } catch (InvalidProtocolBufferException e) { + logger.log(XdsLogLevel.WARNING, "Failed to unpack Listeners in LDS response {0}", e); + adsStream.sendNackRequest( + ResourceType.LDS, ldsResourceSubscribers.keySet(), + ldsResponse.getVersionInfo(), "Malformed LDS response: " + e); + return; + } + logger.log(XdsLogLevel.INFO, "Received LDS response for resources: {0}", listenerNames); + + // Unpack HttpConnectionManager messages. + Map httpConnectionManagers = new HashMap<>(listeners.size()); + try { + for (Listener listener : listeners) { + Any apiListener = listener.getApiListener().getApiListener(); + if (apiListener.getTypeUrl().equals(TYPE_URL_HTTP_CONNECTION_MANAGER_V2)) { + apiListener = + apiListener.toBuilder().setTypeUrl(TYPE_URL_HTTP_CONNECTION_MANAGER).build(); + } + HttpConnectionManager hcm = apiListener.unpack(HttpConnectionManager.class); + httpConnectionManagers.put(listener.getName(), hcm); + } + } catch (InvalidProtocolBufferException e) { + logger.log( + XdsLogLevel.WARNING, + "Failed to unpack HttpConnectionManagers in Listeners of LDS response {0}", e); + adsStream.sendNackRequest( + ResourceType.LDS, ldsResourceSubscribers.keySet(), + ldsResponse.getVersionInfo(), "Malformed LDS response: " + e); + return; + } + + Map ldsUpdates = new HashMap<>(); + Set rdsNames = new HashSet<>(); + String errorMessage = null; + for (Map.Entry entry : httpConnectionManagers.entrySet()) { + String listenerName = entry.getKey(); + HttpConnectionManager hcm = entry.getValue(); + LdsUpdate.Builder updateBuilder = LdsUpdate.newBuilder(); + if (hcm.hasRouteConfig()) { + for (VirtualHost virtualHostProto : hcm.getRouteConfig().getVirtualHostsList()) { + StructOrError virtualHost = + EnvoyProtoData.VirtualHost.fromEnvoyProtoVirtualHost(virtualHostProto); + if (virtualHost.getErrorDetail() != null) { + errorMessage = "Listener " + listenerName + " contains invalid virtual host: " + + virtualHost.getErrorDetail(); + break; + } else { + updateBuilder.addVirtualHost(virtualHost.getStruct()); + } + } + } else if (hcm.hasRds()) { + Rds rds = hcm.getRds(); + if (!rds.getConfigSource().hasAds()) { + errorMessage = "Listener " + listenerName + " with RDS config_source not set to ADS"; + } else { + updateBuilder.setRdsName(rds.getRouteConfigName()); + rdsNames.add(rds.getRouteConfigName()); + } + } else { + errorMessage = "Listener " + listenerName + " without inline RouteConfiguration or RDS"; + } + if (errorMessage != null) { + break; + } + if (hcm.hasCommonHttpProtocolOptions()) { + HttpProtocolOptions options = hcm.getCommonHttpProtocolOptions(); + if (options.hasMaxStreamDuration()) { + updateBuilder.setHttpMaxStreamDurationNano( + Durations.toNanos(options.getMaxStreamDuration())); + } + } + ldsUpdates.put(listenerName, updateBuilder.build()); + } + if (errorMessage != null) { + adsStream.sendNackRequest( + ResourceType.LDS, ldsResourceSubscribers.keySet(), + ldsResponse.getVersionInfo(), errorMessage); + return; + } + adsStream.sendAckRequest( + ResourceType.LDS, ldsResourceSubscribers.keySet(), ldsResponse.getVersionInfo()); + + for (String resource : ldsResourceSubscribers.keySet()) { + ResourceSubscriber subscriber = ldsResourceSubscribers.get(resource); + if (ldsUpdates.containsKey(resource)) { + subscriber.onData(ldsUpdates.get(resource)); + } else { + subscriber.onAbsent(); + } + } + for (String resource : rdsResourceSubscribers.keySet()) { + if (!rdsNames.contains(resource)) { + ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource); + subscriber.onAbsent(); + } + } + } + + private void handleRdsResponse(DiscoveryResponseData rdsResponse) { + // Unpack RouteConfiguration messages. + Map routeConfigs = + new HashMap<>(rdsResponse.getResourcesList().size()); + try { + for (com.google.protobuf.Any res : rdsResponse.getResourcesList()) { + if (res.getTypeUrl().equals(ADS_TYPE_URL_RDS_V2)) { + res = res.toBuilder().setTypeUrl(ADS_TYPE_URL_RDS).build(); + } + RouteConfiguration rc = res.unpack(RouteConfiguration.class); + routeConfigs.put(rc.getName(), rc); + } + } catch (InvalidProtocolBufferException e) { + logger.log( + XdsLogLevel.WARNING, "Failed to unpack RouteConfiguration in RDS response {0}", e); + adsStream.sendNackRequest( + ResourceType.RDS, rdsResourceSubscribers.keySet(), + rdsResponse.getVersionInfo(), "Malformed RDS response: " + e); + return; + } + logger.log( + XdsLogLevel.INFO, "Received RDS response for resources: {0}", routeConfigs.keySet()); + + Map rdsUpdates = new HashMap<>(); + String errorMessage = null; + for (Map.Entry entry : routeConfigs.entrySet()) { + String routeConfigName = entry.getKey(); + RouteConfiguration routeConfig = entry.getValue(); + List virtualHosts = + new ArrayList<>(routeConfig.getVirtualHostsCount()); + for (VirtualHost virtualHostProto : routeConfig.getVirtualHostsList()) { + StructOrError virtualHost = + EnvoyProtoData.VirtualHost.fromEnvoyProtoVirtualHost(virtualHostProto); + if (virtualHost.getErrorDetail() != null) { + errorMessage = "RouteConfiguration " + routeConfigName + + " contains invalid virtual host: " + virtualHost.getErrorDetail(); + break; + } else { + virtualHosts.add(virtualHost.getStruct()); + } + } + if (errorMessage != null) { + break; + } + rdsUpdates.put(routeConfigName, RdsUpdate.fromVirtualHosts(virtualHosts)); + } + if (errorMessage != null) { + adsStream.sendNackRequest(ResourceType.RDS, rdsResourceSubscribers.keySet(), + rdsResponse.getVersionInfo(), errorMessage); + return; + } + adsStream.sendAckRequest(ResourceType.RDS, rdsResourceSubscribers.keySet(), + rdsResponse.getVersionInfo()); + + for (String resource : rdsResourceSubscribers.keySet()) { + if (rdsUpdates.containsKey(resource)) { + ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource); + subscriber.onData(rdsUpdates.get(resource)); + } + } + } + + private void handleLdsResponseForServer(DiscoveryResponseData ldsResponse) { + // Unpack Listener messages. + Listener requestedListener = null; + logger.log(XdsLogLevel.DEBUG, "Listener count: {0}", ldsResponse.getResourcesList().size()); + try { + for (com.google.protobuf.Any res : ldsResponse.getResourcesList()) { + if (res.getTypeUrl().equals(ADS_TYPE_URL_LDS_V2)) { + res = res.toBuilder().setTypeUrl(ADS_TYPE_URL_LDS).build(); + } + Listener listener = res.unpack(Listener.class); + logger.log(XdsLogLevel.DEBUG, "Found listener {0}", listener.toString()); + if (isRequestedListener(listener)) { + requestedListener = listener; + logger.log(XdsLogLevel.DEBUG, "Requested listener found: {0}", listener.getName()); + } + } + } catch (InvalidProtocolBufferException e) { + logger.log(XdsLogLevel.WARNING, "Failed to unpack Listeners in LDS response {0}", e); + adsStream.sendNackRequest( + ResourceType.LDS, ImmutableList.of(), + ldsResponse.getVersionInfo(), "Malformed LDS response: " + e); + return; + } + ListenerUpdate listenerUpdate = null; + if (requestedListener != null) { + if (ldsRespTimer != null) { + ldsRespTimer.cancel(); + ldsRespTimer = null; + } + try { + listenerUpdate = ListenerUpdate.newBuilder() + .setListener(EnvoyServerProtoData.Listener.fromEnvoyProtoListener(requestedListener)) + .build(); + } catch (InvalidProtocolBufferException e) { + logger.log(XdsLogLevel.WARNING, "Failed to unpack Listener in LDS response {0}", e); + adsStream.sendNackRequest( + ResourceType.LDS, ImmutableList.of(), + ldsResponse.getVersionInfo(), "Malformed LDS response: " + e); + return; + } + } else { + if (ldsRespTimer == null) { + listenerWatcher.onResourceDoesNotExist(":" + listenerPort); + } + } + adsStream.sendAckRequest(ResourceType.LDS, ImmutableList.of(), + ldsResponse.getVersionInfo()); + if (listenerUpdate != null) { + listenerWatcher.onListenerChanged(listenerUpdate); + } + } + + private boolean isRequestedListener(Listener listener) { + // TODO(sanjaypujare): check listener.getName() once we know what xDS server returns + return isAddressMatching(listener.getAddress()) + && hasMatchingFilter(listener.getFilterChainsList()); + } + + private boolean isAddressMatching(Address address) { + // TODO(sanjaypujare): check IP address once we know xDS server will include it + return address.hasSocketAddress() + && (address.getSocketAddress().getPortValue() == listenerPort); + } + + private boolean hasMatchingFilter(List filterChainsList) { + // TODO(sanjaypujare): if myIp to be checked against filterChainMatch.getPrefixRangesList() + for (FilterChain filterChain : filterChainsList) { + FilterChainMatch filterChainMatch = filterChain.getFilterChainMatch(); + + if (listenerPort == filterChainMatch.getDestinationPort().getValue()) { + return true; + } + } + return false; + } + + /** + * Handles CDS response, which contains a list of Cluster messages with information for a logical + * cluster. The response is NACKed if messages for requested resources contain invalid + * information for gRPC's usage. Otherwise, an ACK request is sent to management server. + * Response data for requested clusters is cached locally, in case of new cluster watchers + * interested in the same clusters are added later. + */ + private void handleCdsResponse(DiscoveryResponseData cdsResponse) { + adsStream.cdsRespNonce = cdsResponse.getNonce(); + + // Unpack Cluster messages. + List clusters = new ArrayList<>(cdsResponse.getResourcesList().size()); + List clusterNames = new ArrayList<>(cdsResponse.getResourcesList().size()); + try { + for (com.google.protobuf.Any res : cdsResponse.getResourcesList()) { + if (res.getTypeUrl().equals(ADS_TYPE_URL_CDS_V2)) { + res = res.toBuilder().setTypeUrl(ADS_TYPE_URL_CDS).build(); + } + Cluster cluster = res.unpack(Cluster.class); + clusters.add(cluster); + clusterNames.add(cluster.getName()); + } + } catch (InvalidProtocolBufferException e) { + logger.log(XdsLogLevel.WARNING, "Failed to unpack Clusters in CDS response {0}", e); + adsStream.sendNackRequest( + ResourceType.CDS, cdsResourceSubscribers.keySet(), + cdsResponse.getVersionInfo(), "Malformed CDS response: " + e); + return; + } + logger.log(XdsLogLevel.INFO, "Received CDS response for resources: {0}", clusterNames); + + String errorMessage = null; + // Cluster information update for requested clusters received in this CDS response. + Map cdsUpdates = new HashMap<>(); + // CDS responses represents the state of the world, EDS services not referenced by + // Clusters are those no longer exist. + Set edsServices = new HashSet<>(); + for (Cluster cluster : clusters) { + String clusterName = cluster.getName(); + // Skip information for clusters not requested. + // Management server is required to always send newly requested resources, even if they + // may have been sent previously (proactively). Thus, client does not need to cache + // unrequested resources. + if (!cdsResourceSubscribers.containsKey(clusterName)) { + continue; + } + CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder(); + updateBuilder.setClusterName(clusterName); + // The type field must be set to EDS. + if (!cluster.getType().equals(DiscoveryType.EDS)) { + errorMessage = "Cluster " + clusterName + " : only EDS discovery type is supported " + + "in gRPC."; + break; + } + // In the eds_cluster_config field, the eds_config field must be set to indicate to + // use EDS (must be set to use ADS). + EdsClusterConfig edsClusterConfig = cluster.getEdsClusterConfig(); + if (!edsClusterConfig.getEdsConfig().hasAds()) { + errorMessage = "Cluster " + clusterName + " : field eds_cluster_config must be set to " + + "indicate to use EDS over ADS."; + break; + } + // If the service_name field is set, that value will be used for the EDS request. + if (!edsClusterConfig.getServiceName().isEmpty()) { + updateBuilder.setEdsServiceName(edsClusterConfig.getServiceName()); + edsServices.add(edsClusterConfig.getServiceName()); + } else { + edsServices.add(clusterName); + } + // The lb_policy field must be set to ROUND_ROBIN. + if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) { + errorMessage = "Cluster " + clusterName + " : only round robin load balancing policy is " + + "supported in gRPC."; + break; + } + updateBuilder.setLbPolicy("round_robin"); + // If the lrs_server field is set, it must have its self field set, in which case the + // client should use LRS for load reporting. Otherwise (the lrs_server field is not set), + // LRS load reporting will be disabled. + if (cluster.hasLrsServer()) { + if (!cluster.getLrsServer().hasSelf()) { + errorMessage = "Cluster " + clusterName + " : only support enabling LRS for the same " + + "management server."; + break; + } + updateBuilder.setLrsServerName(""); + } + try { + UpstreamTlsContext upstreamTlsContext = getTlsContextFromCluster(cluster); + if (upstreamTlsContext != null && upstreamTlsContext.getCommonTlsContext() != null) { + updateBuilder.setUpstreamTlsContext(upstreamTlsContext); + } + } catch (InvalidProtocolBufferException e) { + errorMessage = "Cluster " + clusterName + " : " + e.getMessage(); + break; + } + cdsUpdates.put(clusterName, updateBuilder.build()); + } + if (errorMessage != null) { + adsStream.sendNackRequest( + ResourceType.CDS, + cdsResourceSubscribers.keySet(), + cdsResponse.getVersionInfo(), + errorMessage); + return; + } + adsStream.sendAckRequest(ResourceType.CDS, cdsResourceSubscribers.keySet(), + cdsResponse.getVersionInfo()); + + for (String resource : cdsResourceSubscribers.keySet()) { + ResourceSubscriber subscriber = cdsResourceSubscribers.get(resource); + if (cdsUpdates.containsKey(resource)) { + subscriber.onData(cdsUpdates.get(resource)); + } else { + subscriber.onAbsent(); + } + } + for (String resource : edsResourceSubscribers.keySet()) { + ResourceSubscriber subscriber = edsResourceSubscribers.get(resource); + if (!edsServices.contains(resource)) { + subscriber.onAbsent(); + } + } + } + + @Nullable + private static UpstreamTlsContext getTlsContextFromCluster(Cluster cluster) + throws InvalidProtocolBufferException { + if (cluster.hasTransportSocket() && "tls".equals(cluster.getTransportSocket().getName())) { + Any any = cluster.getTransportSocket().getTypedConfig(); + return UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext( + io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext.parseFrom( + any.getValue())); + } + return null; + } + + /** + * Handles EDS response, which contains a list of ClusterLoadAssignment messages with + * endpoint load balancing information for each cluster. The response is NACKed if messages + * for requested resources contain invalid information for gRPC's usage. Otherwise, + * an ACK request is sent to management server. Response data for requested clusters is + * cached locally, in case of new endpoint watchers interested in the same clusters + * are added later. + */ + private void handleEdsResponse(DiscoveryResponseData edsResponse) { + // Unpack ClusterLoadAssignment messages. + List clusterLoadAssignments = + new ArrayList<>(edsResponse.getResourcesList().size()); + List claNames = new ArrayList<>(edsResponse.getResourcesList().size()); + try { + for (com.google.protobuf.Any res : edsResponse.getResourcesList()) { + if (res.getTypeUrl().equals(ADS_TYPE_URL_EDS_V2)) { + res = res.toBuilder().setTypeUrl(ADS_TYPE_URL_EDS).build(); + } + ClusterLoadAssignment assignment = res.unpack(ClusterLoadAssignment.class); + clusterLoadAssignments.add(assignment); + claNames.add(assignment.getClusterName()); + } + } catch (InvalidProtocolBufferException e) { + logger.log( + XdsLogLevel.WARNING, "Failed to unpack ClusterLoadAssignments in EDS response {0}", e); + adsStream.sendNackRequest( + ResourceType.EDS, edsResourceSubscribers.keySet(), + edsResponse.getVersionInfo(), "Malformed EDS response: " + e); + return; + } + logger.log(XdsLogLevel.INFO, "Received EDS response for resources: {0}", claNames); + + String errorMessage = null; + // Endpoint information updates for requested clusters received in this EDS response. + Map edsUpdates = new HashMap<>(); + // Walk through each ClusterLoadAssignment message. If any of them for requested clusters + // contain invalid information for gRPC's load balancing usage, the whole response is rejected. + for (ClusterLoadAssignment assignment : clusterLoadAssignments) { + String clusterName = assignment.getClusterName(); + // Skip information for clusters not requested. + // Management server is required to always send newly requested resources, even if they + // may have been sent previously (proactively). Thus, client does not need to cache + // unrequested resources. + if (!edsResourceSubscribers.containsKey(clusterName)) { + continue; + } + EdsUpdate.Builder updateBuilder = EdsUpdate.newBuilder(); + updateBuilder.setClusterName(clusterName); + Set priorities = new HashSet<>(); + int maxPriority = -1; + for (io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints localityLbEndpoints + : assignment.getEndpointsList()) { + // Filter out localities without or with 0 weight. + if (!localityLbEndpoints.hasLoadBalancingWeight() + || localityLbEndpoints.getLoadBalancingWeight().getValue() < 1) { + continue; + } + int localityPriority = localityLbEndpoints.getPriority(); + if (localityPriority < 0) { + errorMessage = + "ClusterLoadAssignment " + clusterName + " : locality with negative priority."; + break; + } + maxPriority = Math.max(maxPriority, localityPriority); + priorities.add(localityPriority); + // The endpoint field of each lb_endpoints must be set. + // Inside of it: the address field must be set. + for (LbEndpoint lbEndpoint : localityLbEndpoints.getLbEndpointsList()) { + if (!lbEndpoint.getEndpoint().hasAddress()) { + errorMessage = "ClusterLoadAssignment " + clusterName + " : endpoint with no address."; + break; + } + } + if (errorMessage != null) { + break; + } + // Note endpoints with health status other than UNHEALTHY and UNKNOWN are still + // handed over to watching parties. It is watching parties' responsibility to + // filter out unhealthy endpoints. See EnvoyProtoData.LbEndpoint#isHealthy(). + updateBuilder.addLocalityLbEndpoints( + Locality.fromEnvoyProtoLocality(localityLbEndpoints.getLocality()), + LocalityLbEndpoints.fromEnvoyProtoLocalityLbEndpoints(localityLbEndpoints)); + } + if (errorMessage != null) { + break; + } + if (priorities.size() != maxPriority + 1) { + errorMessage = "ClusterLoadAssignment " + clusterName + " : sparse priorities."; + break; + } + for (ClusterLoadAssignment.Policy.DropOverload dropOverload + : assignment.getPolicy().getDropOverloadsList()) { + updateBuilder.addDropPolicy(DropOverload.fromEnvoyProtoDropOverload(dropOverload)); + } + EdsUpdate update = updateBuilder.build(); + edsUpdates.put(clusterName, update); + } + if (errorMessage != null) { + adsStream.sendNackRequest( + ResourceType.EDS, + edsResourceSubscribers.keySet(), + edsResponse.getVersionInfo(), + errorMessage); + return; + } + adsStream.sendAckRequest(ResourceType.EDS, edsResourceSubscribers.keySet(), + edsResponse.getVersionInfo()); + + for (String resource : edsResourceSubscribers.keySet()) { + ResourceSubscriber subscriber = edsResourceSubscribers.get(resource); + if (edsUpdates.containsKey(resource)) { + subscriber.onData(edsUpdates.get(resource)); + } + } + } + + private void adjustResourceSubscription(ResourceType type, Collection resources) { + if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { + // Currently in retry backoff. + return; + } + if (adsStream == null) { + startRpcStream(); + } + adsStream.sendXdsRequest(type, resources); + } + + @VisibleForTesting + final class RpcRetryTask implements Runnable { + @Override + public void run() { + startRpcStream(); + if (listenerWatcher != null) { + adsStream.sendXdsRequest(ResourceType.LDS, ImmutableList.of()); + ldsRespTimer = + syncContext + .schedule( + new ListenerResourceFetchTimeoutTask(":" + listenerPort), + INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); + } + if (!ldsResourceSubscribers.isEmpty()) { + adsStream.sendXdsRequest(ResourceType.LDS, ldsResourceSubscribers.keySet()); + for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { + subscriber.restartTimer(); + } + } + if (!rdsResourceSubscribers.isEmpty()) { + adsStream.sendXdsRequest(ResourceType.RDS, rdsResourceSubscribers.keySet()); + for (ResourceSubscriber subscriber : rdsResourceSubscribers.values()) { + subscriber.restartTimer(); + } + } + if (!cdsResourceSubscribers.isEmpty()) { + adsStream.sendXdsRequest(ResourceType.CDS, cdsResourceSubscribers.keySet()); + for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) { + subscriber.restartTimer(); + } + } + if (!edsResourceSubscribers.isEmpty()) { + adsStream.sendXdsRequest(ResourceType.EDS, edsResourceSubscribers.keySet()); + for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) { + subscriber.restartTimer(); + } + } + } + } + + @VisibleForTesting + enum ResourceType { + UNKNOWN, LDS, RDS, CDS, EDS; + + @VisibleForTesting + String typeUrl() { + switch (this) { + case LDS: + return ADS_TYPE_URL_LDS; + case RDS: + return ADS_TYPE_URL_RDS; + case CDS: + return ADS_TYPE_URL_CDS; + case EDS: + return ADS_TYPE_URL_EDS; + case UNKNOWN: + default: + throw new AssertionError("Unknown or missing case in enum switch: " + this); + } + } + + private String typeUrlV2() { + switch (this) { + case LDS: + return ADS_TYPE_URL_LDS_V2; + case RDS: + return ADS_TYPE_URL_RDS_V2; + case CDS: + return ADS_TYPE_URL_CDS_V2; + case EDS: + return ADS_TYPE_URL_EDS_V2; + case UNKNOWN: + default: + throw new AssertionError("Unknown or missing case in enum switch: " + this); + } + } + + private static ResourceType fromTypeUrl(String typeUrl) { + switch (typeUrl) { + case ADS_TYPE_URL_LDS: + // fall trough + case ADS_TYPE_URL_LDS_V2: + return LDS; + case ADS_TYPE_URL_RDS: + // fall through + case ADS_TYPE_URL_RDS_V2: + return RDS; + case ADS_TYPE_URL_CDS: + // fall through + case ADS_TYPE_URL_CDS_V2: + return CDS; + case ADS_TYPE_URL_EDS: + // fall through + case ADS_TYPE_URL_EDS_V2: + return EDS; + default: + return UNKNOWN; + } + } + } + + /** + * Tracks a single subscribed resource. + */ + private final class ResourceSubscriber { + private final ResourceType type; + private final String resource; + private final Set watchers = new HashSet<>(); + private ResourceUpdate data; + private boolean absent; + private ScheduledHandle respTimer; + + ResourceSubscriber(ResourceType type, String resource) { + this.type = type; + this.resource = resource; + if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { + return; + } + restartTimer(); + } + + void addWatcher(ResourceWatcher watcher) { + checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher); + watchers.add(watcher); + if (data != null) { + notifyWatcher(watcher, data); + } else if (absent) { + watcher.onResourceDoesNotExist(resource); + } + } + + void removeWatcher(ResourceWatcher watcher) { + checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher); + watchers.remove(watcher); + } + + void restartTimer() { + class ResourceNotFound implements Runnable { + @Override + public void run() { + logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", type, resource); + respTimer = null; + onAbsent(); + } + + @Override + public String toString() { + return type + this.getClass().getSimpleName(); + } + } + + respTimer = syncContext.schedule( + new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, + timeService); + } + + void stopTimer() { + if (respTimer != null && respTimer.isPending()) { + respTimer.cancel(); + respTimer = null; + } + } + + boolean isWatched() { + return !watchers.isEmpty(); + } + + void onData(ResourceUpdate data) { + if (respTimer != null && respTimer.isPending()) { + respTimer.cancel(); + respTimer = null; + } + ResourceUpdate oldData = this.data; + this.data = data; + absent = false; + if (!Objects.equals(oldData, data)) { + for (ResourceWatcher watcher : watchers) { + notifyWatcher(watcher, data); + } + } + } + + void onAbsent() { + if (respTimer != null && respTimer.isPending()) { // too early to conclude absence + return; + } + logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); + if (!absent) { + data = null; + absent = true; + for (ResourceWatcher watcher : watchers) { + watcher.onResourceDoesNotExist(resource); + } + } + } + + void onError(Status error) { + if (respTimer != null && respTimer.isPending()) { + respTimer.cancel(); + respTimer = null; + } + for (ResourceWatcher watcher : watchers) { + watcher.onError(error); + } + } + + private void notifyWatcher(ResourceWatcher watcher, ResourceUpdate update) { + switch (type) { + case LDS: + ((LdsResourceWatcher) watcher).onChanged((LdsUpdate) update); + break; + case RDS: + ((RdsResourceWatcher) watcher).onChanged((RdsUpdate) update); + break; + case CDS: + ((CdsResourceWatcher) watcher).onChanged((CdsUpdate) update); + break; + case EDS: + ((EdsResourceWatcher) watcher).onChanged((EdsUpdate) update); + break; + case UNKNOWN: + default: + throw new AssertionError("should never be here"); + } + } + } + + private static final class DiscoveryRequestData { + private final ResourceType resourceType; + private final Collection resourceNames; + private final String versionInfo; + private final String responseNonce; + private final Node node; + @Nullable + private final com.google.rpc.Status errorDetail; + + DiscoveryRequestData( + ResourceType resourceType, Collection resourceNames, String versionInfo, + String responseNonce, Node node, @Nullable com.google.rpc.Status errorDetail) { + this.resourceType = resourceType; + this.resourceNames = resourceNames; + this.versionInfo = versionInfo; + this.responseNonce = responseNonce; + this.node = node; + this.errorDetail = errorDetail; + } + + DiscoveryRequest toEnvoyProto() { + DiscoveryRequest.Builder builder = + DiscoveryRequest.newBuilder() + .setVersionInfo(versionInfo) + .setNode(node.toEnvoyProtoNode()) + .addAllResourceNames(resourceNames) + .setTypeUrl(resourceType.typeUrl()) + .setResponseNonce(responseNonce); + if (errorDetail != null) { + builder.setErrorDetail(errorDetail); + } + return builder.build(); + } + + io.envoyproxy.envoy.api.v2.DiscoveryRequest toEnvoyProtoV2() { + io.envoyproxy.envoy.api.v2.DiscoveryRequest.Builder builder = + io.envoyproxy.envoy.api.v2.DiscoveryRequest.newBuilder() + .setVersionInfo(versionInfo) + .setNode(node.toEnvoyProtoNodeV2()) + .addAllResourceNames(resourceNames) + .setTypeUrl(resourceType.typeUrlV2()) + .setResponseNonce(responseNonce); + if (errorDetail != null) { + builder.setErrorDetail(errorDetail); + } + return builder.build(); + } + } + + private static final class DiscoveryResponseData { + private final ResourceType resourceType; + private final List resources; + private final String versionInfo; + private final String nonce; + + DiscoveryResponseData( + ResourceType resourceType, List resources, String versionInfo, String nonce) { + this.resourceType = resourceType; + this.resources = resources; + this.versionInfo = versionInfo; + this.nonce = nonce; + } + + ResourceType getResourceType() { + return resourceType; + } + + List getResourcesList() { + return resources; + } + + String getVersionInfo() { + return versionInfo; + } + + String getNonce() { + return nonce; + } + + static DiscoveryResponseData fromEnvoyProto(DiscoveryResponse proto) { + return new DiscoveryResponseData( + ResourceType.fromTypeUrl(proto.getTypeUrl()), proto.getResourcesList(), + proto.getVersionInfo(), proto.getNonce()); + } + + static DiscoveryResponseData fromEnvoyProtoV2( + io.envoyproxy.envoy.api.v2.DiscoveryResponse proto) { + return new DiscoveryResponseData( + ResourceType.fromTypeUrl(proto.getTypeUrl()), proto.getResourcesList(), + proto.getVersionInfo(), proto.getNonce()); + } + } + + private abstract class AbstractAdsStream { + private boolean responseReceived; + private boolean closed; + + // Response nonce for the most recently received discovery responses of each resource type. + // Client initiated requests start response nonce with empty string. + // A nonce is used to indicate the specific DiscoveryResponse each DiscoveryRequest + // corresponds to. + // A nonce becomes stale following a newer nonce being presented to the client in a + // DiscoveryResponse. + private String ldsRespNonce = ""; + private String rdsRespNonce = ""; + private String cdsRespNonce = ""; + private String edsRespNonce = ""; + + abstract void start(); + + abstract void sendDiscoveryRequest(DiscoveryRequestData request); + + abstract void sendError(Exception error); + + // Must run in syncContext. + final void handleResponse(DiscoveryResponseData response) { + if (closed) { + return; + } + responseReceived = true; + String respNonce = response.getNonce(); + // Nonce in each response is echoed back in the following ACK/NACK request. It is + // used for management server to identify which response the client is ACKing/NACking. + // To avoid confusion, client-initiated requests will always use the nonce in + // most recently received responses of each resource type. + ResourceType resourceType = response.getResourceType(); + switch (resourceType) { + case LDS: + ldsRespNonce = respNonce; + handleLdsResponse(response); + break; + case RDS: + rdsRespNonce = respNonce; + handleRdsResponse(response); + break; + case CDS: + cdsRespNonce = respNonce; + handleCdsResponse(response); + break; + case EDS: + edsRespNonce = respNonce; + handleEdsResponse(response); + break; + case UNKNOWN: + logger.log( + XdsLogLevel.WARNING, + "Received an unknown type of DiscoveryResponse\n{0}", + respNonce); + break; + default: + throw new AssertionError("Missing case in enum switch: " + resourceType); + } + } + + // Must run in syncContext. + final void handleRpcError(Throwable t) { + handleStreamClosed(Status.fromThrowable(t)); + } + + // Must run in syncContext. + final void handleRpcCompleted() { + handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server")); + } + + private void handleStreamClosed(Status error) { + checkArgument(!error.isOk(), "unexpected OK status"); + if (closed) { + return; + } + logger.log( + XdsLogLevel.ERROR, + "ADS stream closed with status {0}: {1}. Cause: {2}", + error.getCode(), error.getDescription(), error.getCause()); + closed = true; + if (listenerWatcher != null) { + listenerWatcher.onError(error); + } + for (ResourceSubscriber subscriber : ldsResourceSubscribers.values()) { + subscriber.onError(error); + } + for (ResourceSubscriber subscriber : rdsResourceSubscribers.values()) { + subscriber.onError(error); + } + for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) { + subscriber.onError(error); + } + for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) { + subscriber.onError(error); + } + cleanUp(); + cleanUpResourceTimers(); + if (responseReceived || retryBackoffPolicy == null) { + // Reset the backoff sequence if had received a response, or backoff sequence + // has never been initialized. + retryBackoffPolicy = backoffPolicyProvider.get(); + } + long delayNanos = 0; + if (!responseReceived) { + delayNanos = + Math.max( + 0, + retryBackoffPolicy.nextBackoffNanos() + - adsStreamRetryStopwatch.elapsed(TimeUnit.NANOSECONDS)); + } + logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos); + rpcRetryTimer = + syncContext.schedule( + new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); + } + + private void close(Exception error) { + if (closed) { + return; + } + closed = true; + cleanUp(); + sendError(error); + } + + private void cleanUp() { + if (adsStream == this) { + adsStream = null; + } + } + + /** + * Sends a DiscoveryRequest for the given resource name to management server. Memories the + * requested resource name (except for LDS as we always request for the singleton Listener) + * as we need it to find resources in responses. + */ + private void sendXdsRequest(ResourceType resourceType, Collection resourceNames) { + String version; + String nonce; + switch (resourceType) { + case LDS: + version = ldsVersion; + nonce = ldsRespNonce; + logger.log(XdsLogLevel.INFO, "Sending LDS request for resources: {0}", resourceNames); + break; + case RDS: + version = rdsVersion; + nonce = rdsRespNonce; + logger.log(XdsLogLevel.INFO, "Sending RDS request for resources: {0}", resourceNames); + break; + case CDS: + version = cdsVersion; + nonce = cdsRespNonce; + logger.log(XdsLogLevel.INFO, "Sending CDS request for resources: {0}", resourceNames); + break; + case EDS: + version = edsVersion; + nonce = edsRespNonce; + logger.log(XdsLogLevel.INFO, "Sending EDS request for resources: {0}", resourceNames); + break; + case UNKNOWN: + default: + throw new AssertionError("Unknown or missing case in enum switch: " + resourceType); + } + DiscoveryRequestData request = + new DiscoveryRequestData(resourceType, resourceNames, version, nonce, node, null); + sendDiscoveryRequest(request); + } + + /** + * Sends a DiscoveryRequest with the given information as an ACK. Updates the latest accepted + * version for the corresponding resource type. + */ + private void sendAckRequest(ResourceType resourceType, Collection resourceNames, + String versionInfo) { + String nonce; + switch (resourceType) { + case LDS: + ldsVersion = versionInfo; + nonce = ldsRespNonce; + logger.log(XdsLogLevel.WARNING, "Sending ACK for LDS update, version: {0}", versionInfo); + break; + case RDS: + rdsVersion = versionInfo; + nonce = rdsRespNonce; + logger.log(XdsLogLevel.WARNING, "Sending ACK for RDS update, version: {0}", versionInfo); + break; + case CDS: + cdsVersion = versionInfo; + nonce = cdsRespNonce; + logger.log(XdsLogLevel.WARNING, "Sending ACK for CDS update, version: {0}", versionInfo); + break; + case EDS: + edsVersion = versionInfo; + nonce = edsRespNonce; + logger.log(XdsLogLevel.WARNING, "Sending ACK for EDS update, version: {0}", versionInfo); + break; + case UNKNOWN: + default: + throw new AssertionError("Unknown or missing case in enum switch: " + resourceType); + } + DiscoveryRequestData request = + new DiscoveryRequestData(resourceType, resourceNames, versionInfo, nonce, node, null); + sendDiscoveryRequest(request); + } + + /** + * Sends a DiscoveryRequest with the given information as an NACK. NACK takes the previous + * accepted version. + */ + private void sendNackRequest(ResourceType resourceType, Collection resourceNames, + String rejectVersion, String message) { + String versionInfo; + String nonce; + switch (resourceType) { + case LDS: + versionInfo = ldsVersion; + nonce = ldsRespNonce; + logger.log( + XdsLogLevel.WARNING, + "Sending NACK for LDS update, version: {0}, reason: {1}", + rejectVersion, + message); + break; + case RDS: + versionInfo = rdsVersion; + nonce = rdsRespNonce; + logger.log( + XdsLogLevel.WARNING, + "Sending NACK for RDS update, version: {0}, reason: {1}", + rejectVersion, + message); + break; + case CDS: + versionInfo = cdsVersion; + nonce = cdsRespNonce; + logger.log( + XdsLogLevel.WARNING, + "Sending NACK for CDS update, version: {0}, reason: {1}", + rejectVersion, + message); + break; + case EDS: + versionInfo = edsVersion; + nonce = edsRespNonce; + logger.log( + XdsLogLevel.WARNING, + "Sending NACK for EDS update, version: {0}, reason: {1}", + rejectVersion, + message); + break; + case UNKNOWN: + default: + throw new AssertionError("Unknown or missing case in enum switch: " + resourceType); + } + com.google.rpc.Status error = com.google.rpc.Status.newBuilder() + .setCode(Code.INVALID_ARGUMENT_VALUE) + .setMessage(message) + .build(); + DiscoveryRequestData request = + new DiscoveryRequestData(resourceType, resourceNames, versionInfo, nonce, node, error); + sendDiscoveryRequest(request); + } + } + + private final class AdsStreamV2 extends AbstractAdsStream { + private final io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc + .AggregatedDiscoveryServiceStub stubV2; + private StreamObserver requestWriterV2; + + AdsStreamV2() { + stubV2 = io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub( + xdsChannel.getManagedChannel()); + } + + @Override + void start() { + StreamObserver responseReaderV2 = + new StreamObserver() { + @Override + public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (logger.isLoggable(XdsLogLevel.DEBUG)) { + logger.log(XdsLogLevel.DEBUG, "Received {0} response:\n{1}", + ResourceType.fromTypeUrl(response.getTypeUrl()), + respPrinter.print(response)); + } + DiscoveryResponseData responseData = + DiscoveryResponseData.fromEnvoyProtoV2(response); + handleResponse(responseData); + } + }); + } + + @Override + public void onError(final Throwable t) { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcError(t); + } + }); + } + + @Override + public void onCompleted() { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcCompleted(); + } + }); + } + }; + requestWriterV2 = stubV2.withWaitForReady().streamAggregatedResources(responseReaderV2); + } + + @Override + void sendDiscoveryRequest(DiscoveryRequestData request) { + checkState(requestWriterV2 != null, "ADS stream has not been started"); + io.envoyproxy.envoy.api.v2.DiscoveryRequest requestProto = + request.toEnvoyProtoV2(); + requestWriterV2.onNext(requestProto); + logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", requestProto); + } + + @Override + void sendError(Exception error) { + requestWriterV2.onError(error); + } + } + + // AdsStream V3 + private final class AdsStream extends AbstractAdsStream { + private final AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub; + private StreamObserver requestWriter; + + AdsStream() { + stub = AggregatedDiscoveryServiceGrpc.newStub(xdsChannel.getManagedChannel()); + } + + @Override + void start() { + StreamObserver responseReader = new StreamObserver() { + @Override + public void onNext(final DiscoveryResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (logger.isLoggable(XdsLogLevel.DEBUG)) { + logger.log(XdsLogLevel.DEBUG, "Received {0} response:\n{1}", + ResourceType.fromTypeUrl(response.getTypeUrl()), respPrinter.print(response)); + } + DiscoveryResponseData responseData = DiscoveryResponseData.fromEnvoyProto(response); + handleResponse(responseData); + } + }); + } + + @Override + public void onError(final Throwable t) { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcError(t); + } + }); + } + + @Override + public void onCompleted() { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcCompleted(); + } + }); + } + }; + requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReader); + } + + @Override + void sendDiscoveryRequest(DiscoveryRequestData request) { + checkState(requestWriter != null, "ADS stream has not been started"); + DiscoveryRequest requestProto = request.toEnvoyProto(); + requestWriter.onNext(requestProto); + logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", requestProto); + } + + @Override + void sendError(Exception error) { + requestWriter.onError(error); + } + } + + // TODO(chengyuanzhang): delete me. + @VisibleForTesting + final class ListenerResourceFetchTimeoutTask implements Runnable { + private String resourceName; + + ListenerResourceFetchTimeoutTask(String resourceName) { + this.resourceName = resourceName; + } + + @Override + public void run() { + logger.log( + XdsLogLevel.WARNING, + "Did not receive resource info {0} after {1} seconds, conclude it absent", + resourceName, INITIAL_RESOURCE_FETCH_TIMEOUT_SEC); + ldsRespTimer = null; + listenerWatcher.onResourceDoesNotExist(resourceName); + } + } + + /** + * Convert protobuf message to human readable String format. Useful for protobuf messages + * containing {@link com.google.protobuf.Any} fields. + */ + @VisibleForTesting + static final class MessagePrinter { + private final JsonFormat.Printer printer; + + @VisibleForTesting + MessagePrinter() { + com.google.protobuf.TypeRegistry registry = + com.google.protobuf.TypeRegistry.newBuilder() + .add(Listener.getDescriptor()) + .add(io.envoyproxy.envoy.api.v2.Listener.getDescriptor()) + .add(HttpConnectionManager.getDescriptor()) + .add( + io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2 + .HttpConnectionManager.getDescriptor()) + .add(RouteConfiguration.getDescriptor()) + .add(io.envoyproxy.envoy.api.v2.RouteConfiguration.getDescriptor()) + .add(Cluster.getDescriptor()) + .add(io.envoyproxy.envoy.api.v2.Cluster.getDescriptor()) + .add(ClusterLoadAssignment.getDescriptor()) + .add(io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.getDescriptor()) + .build(); + printer = JsonFormat.printer().usingTypeRegistry(registry); + } + + @VisibleForTesting + String print(MessageOrBuilder message) { + String res; + try { + res = printer.print(message); + } catch (InvalidProtocolBufferException e) { + res = message + " (failed to pretty-print: " + e + ")"; + } + return res; + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest2.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest2.java new file mode 100644 index 0000000000..d1bc5ee990 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest2.java @@ -0,0 +1,1859 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.xds.XdsClientTestHelper.buildCluster; +import static io.grpc.xds.XdsClientTestHelper.buildClusterLoadAssignment; +import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryRequest; +import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponse; +import static io.grpc.xds.XdsClientTestHelper.buildDropOverload; +import static io.grpc.xds.XdsClientTestHelper.buildLbEndpoint; +import static io.grpc.xds.XdsClientTestHelper.buildListener; +import static io.grpc.xds.XdsClientTestHelper.buildLocalityLbEndpoints; +import static io.grpc.xds.XdsClientTestHelper.buildRouteConfiguration; +import static io.grpc.xds.XdsClientTestHelper.buildSecureCluster; +import static io.grpc.xds.XdsClientTestHelper.buildUpstreamTlsContext; +import static io.grpc.xds.XdsClientTestHelper.buildVirtualHost; +import static io.grpc.xds.XdsClientTestHelper.buildVirtualHosts; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Any; +import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; +import io.envoyproxy.envoy.config.core.v3.ConfigSource; +import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; +import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; +import io.grpc.Context; +import io.grpc.Context.CancellationListener; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.BackoffPolicy; +import io.grpc.internal.FakeClock; +import io.grpc.internal.FakeClock.ScheduledTask; +import io.grpc.internal.FakeClock.TaskFilter; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.EnvoyProtoData.DropOverload; +import io.grpc.xds.EnvoyProtoData.LbEndpoint; +import io.grpc.xds.EnvoyProtoData.Locality; +import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; +import io.grpc.xds.EnvoyProtoData.Node; +import io.grpc.xds.XdsClient.CdsResourceWatcher; +import io.grpc.xds.XdsClient.CdsUpdate; +import io.grpc.xds.XdsClient.EdsResourceWatcher; +import io.grpc.xds.XdsClient.EdsUpdate; +import io.grpc.xds.XdsClient.LdsResourceWatcher; +import io.grpc.xds.XdsClient.LdsUpdate; +import io.grpc.xds.XdsClient.RdsResourceWatcher; +import io.grpc.xds.XdsClient.RdsUpdate; +import io.grpc.xds.XdsClient.ResourceWatcher; +import io.grpc.xds.XdsClient.XdsChannel; +import io.grpc.xds.XdsClientImpl2.MessagePrinter; +import io.grpc.xds.XdsClientImpl2.ResourceType; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link XdsClientImpl2}. + */ +@RunWith(JUnit4.class) +public class XdsClientImplTest2 { + private static final String TARGET_NAME = "hello.googleapis.com"; + private static final String LDS_RESOURCE = "listener.googleapis.com"; + private static final String RDS_RESOURCE = "route-configuration.googleapis.com"; + private static final String CDS_RESOURCE = "cluster.googleapis.com"; + private static final String EDS_RESOURCE = "cluster-load-assignment.googleapis.com"; + private static final Node NODE = Node.newBuilder().build(); + private static final FakeClock.TaskFilter RPC_RETRY_TASK_FILTER = + new FakeClock.TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command.toString().contains(XdsClientImpl2.RpcRetryTask.class.getSimpleName()); + } + }; + + private static final FakeClock.TaskFilter LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = + new TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command.toString().contains(ResourceType.LDS.toString()); + } + }; + + private static final FakeClock.TaskFilter RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = + new TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command.toString().contains(ResourceType.RDS.toString()); + } + }; + + private static final FakeClock.TaskFilter CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = + new TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command.toString().contains(ResourceType.CDS.toString()); + } + }; + + private static final FakeClock.TaskFilter EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER = + new FakeClock.TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command.toString().contains(ResourceType.EDS.toString()); + } + }; + + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final FakeClock fakeClock = new FakeClock(); + private final Queue> resourceDiscoveryCalls = + new ArrayDeque<>(); + private final Queue> loadReportCalls = + new ArrayDeque<>(); + private final AtomicBoolean adsEnded = new AtomicBoolean(true); + private final AtomicBoolean lrsEnded = new AtomicBoolean(true); + + @Captor + private ArgumentCaptor ldsUpdateCaptor; + @Captor + private ArgumentCaptor rdsUpdateCaptor; + @Captor + private ArgumentCaptor cdsUpdateCaptor; + @Captor + private ArgumentCaptor edsUpdateCaptor; + @Captor + private ArgumentCaptor errorCaptor; + @Mock + private BackoffPolicy.Provider backoffPolicyProvider; + @Mock + private BackoffPolicy backoffPolicy1; + @Mock + private BackoffPolicy backoffPolicy2; + @Mock + private LdsResourceWatcher ldsResourceWatcher; + @Mock + private RdsResourceWatcher rdsResourceWatcher; + @Mock + private CdsResourceWatcher cdsResourceWatcher; + @Mock + private EdsResourceWatcher edsResourceWatcher; + + private ManagedChannel channel; + private XdsClientImpl2 xdsClient; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.initMocks(this); + when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); + when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); + when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L); + + final String serverName = InProcessServerBuilder.generateName(); + AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() { + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended + adsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + RpcCall call = + new RpcCall<>(requestObserver, responseObserver); + resourceDiscoveryCalls.offer(call); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + adsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + return requestObserver; + } + }; + + LoadReportingServiceImplBase lrsServiceImpl = new LoadReportingServiceImplBase() { + @Override + public StreamObserver streamLoadStats( + StreamObserver responseObserver) { + assertThat(lrsEnded.get()).isTrue(); + lrsEnded.set(false); + @SuppressWarnings("unchecked") + StreamObserver requestObserver = mock(StreamObserver.class); + RpcCall call = + new RpcCall<>(requestObserver, responseObserver); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context context) { + lrsEnded.set(true); + } + }, MoreExecutors.directExecutor()); + loadReportCalls.offer(call); + return requestObserver; + } + }; + + cleanupRule.register( + InProcessServerBuilder + .forName(serverName) + .addService(adsServiceImpl) + .addService(lrsServiceImpl) + .directExecutor() + .build() + .start()); + channel = + cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); + + xdsClient = + new XdsClientImpl2( + TARGET_NAME, + new XdsChannel(channel, /* useProtocolV3= */ true), + EnvoyProtoData.Node.newBuilder().build(), + syncContext, + fakeClock.getScheduledExecutorService(), + backoffPolicyProvider, + fakeClock.getStopwatchSupplier()); + + assertThat(resourceDiscoveryCalls).isEmpty(); + assertThat(loadReportCalls).isEmpty(); + } + + @After + public void tearDown() { + xdsClient.shutdown(); + assertThat(adsEnded.get()).isTrue(); + assertThat(lrsEnded.get()).isTrue(); + assertThat(channel.isShutdown()).isTrue(); + assertThat(fakeClock.getPendingTasks()).isEmpty(); + } + + @Test + public void ldsResourceNotFound() { + RpcCall call = + startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); + List listeners = ImmutableList.of( + Any.pack(buildListener("bar.googleapis.com", + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-bar.googleapis.com", buildVirtualHosts(1))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + + verifyNoInteractions(ldsResourceWatcher); + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + @Test + public void ldsResourceFound_containsVirtualHosts() { + RpcCall call = + startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); + List listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + @Test + public void ldsResourceFound_containsRdsName() { + RpcCall call = + startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); + List listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(RDS_RESOURCE) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + @Test + public void cachedLdsResource_data() { + RpcCall call = + startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); + List listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(RDS_RESOURCE) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + LdsResourceWatcher watcher = mock(LdsResourceWatcher.class); + xdsClient.watchLdsResource(LDS_RESOURCE, watcher); + verify(watcher).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); + verifyNoMoreInteractions(call.requestObserver); + } + + @Test + public void cachedLdsResource_absent() { + RpcCall call = + startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); + LdsResourceWatcher watcher = mock(LdsResourceWatcher.class); + xdsClient.watchLdsResource(LDS_RESOURCE, watcher); + verify(watcher).onResourceDoesNotExist(LDS_RESOURCE); + verifyNoMoreInteractions(call.requestObserver); + } + + @Test + public void ldsResourceUpdated() { + RpcCall call = + startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); + List listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); + + listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(RDS_RESOURCE) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .build())))); + response = + buildDiscoveryResponse("1", listeners, ResourceType.LDS.typeUrl(), "0001"); + call.responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "1", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0001"))); + verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); + } + + @Test + public void ldsResourceDeleted() { + RpcCall call = + startResourceWatcher(ResourceType.LDS, LDS_RESOURCE, ldsResourceWatcher); + List listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0000"))); + verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); + + response = buildDiscoveryResponse("1", Collections.emptyList(), + ResourceType.LDS.typeUrl(), "0001"); + call.responseObserver.onNext(response); + + // Client sends an ACK LDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "1", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "0001"))); + verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); + } + + @Test + public void multipleLdsWatchers() { + String ldsResource = "bar.googleapis.com"; + LdsResourceWatcher watcher1 = mock(LdsResourceWatcher.class); + LdsResourceWatcher watcher2 = mock(LdsResourceWatcher.class); + xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchLdsResource(ldsResource, watcher1); + xdsClient.watchLdsResource(ldsResource, watcher2); + RpcCall call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(LDS_RESOURCE, ldsResource), + ResourceType.LDS.typeUrl(), ""))); + + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); + verify(watcher1).onResourceDoesNotExist(ldsResource); + verify(watcher2).onResourceDoesNotExist(ldsResource); + + List listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) + .build()))), + Any.pack(buildListener(ldsResource, + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(4))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); + verify(watcher1).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(4); + verify(watcher2).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(4); + } + + @Test + public void rdsResourceNotFound() { + RpcCall call = + startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); + List routeConfigs = ImmutableList.of( + Any.pack(buildRouteConfiguration("route-bar.googleapis.com", buildVirtualHosts(2)))); + DiscoveryResponse response = + buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK RDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + + verifyNoInteractions(rdsResourceWatcher); + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); + assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + @Test + public void rdsResourceFound() { + RpcCall call = + startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); + List routeConfigs = + ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); + DiscoveryResponse response = + buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK RDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); + assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); + assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + @Test + public void cachedRdsResource_data() { + RpcCall call = + startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); + List routeConfigs = + ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); + DiscoveryResponse response = + buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK RDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + + RdsResourceWatcher watcher = mock(RdsResourceWatcher.class); + xdsClient.watchRdsResource(RDS_RESOURCE, watcher); + verify(watcher).onChanged(rdsUpdateCaptor.capture()); + assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); + verifyNoMoreInteractions(call.requestObserver); + } + + @Test + public void cachedRdsResource_absent() { + RpcCall call = + startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); + RdsResourceWatcher watcher = mock(RdsResourceWatcher.class); + xdsClient.watchRdsResource(RDS_RESOURCE, watcher); + verify(watcher).onResourceDoesNotExist(RDS_RESOURCE); + verifyNoMoreInteractions(call.requestObserver); + } + + @Test + public void rdsResourceUpdated() { + RpcCall call = + startResourceWatcher(ResourceType.RDS, RDS_RESOURCE, rdsResourceWatcher); + List routeConfigs = + ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); + DiscoveryResponse response = + buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK RDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0000"))); + verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); + assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); + + routeConfigs = + ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(4)))); + response = + buildDiscoveryResponse("1", routeConfigs, ResourceType.RDS.typeUrl(), "0001"); + call.responseObserver.onNext(response); + + // Client sends an ACK RDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "1", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "0001"))); + verify(rdsResourceWatcher, times(2)).onChanged(rdsUpdateCaptor.capture()); + assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(4); + } + + @Test + public void rdsResourceDeletedByLds() { + xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); + RpcCall call = resourceDiscoveryCalls.poll(); + List listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRds( + Rds.newBuilder() + .setRouteConfigName(RDS_RESOURCE) + .setConfigSource( + ConfigSource.newBuilder() + .setAds(AggregatedConfigSource.getDefaultInstance()))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getRdsName()).isEqualTo(RDS_RESOURCE); + + List routeConfigs = + ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); + response = buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); + assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); + + listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(5))) + .build())))); + response = buildDiscoveryResponse("1", listeners, ResourceType.LDS.typeUrl(), "0001"); + call.responseObserver.onNext(response); + verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture()); + assertThat(ldsUpdateCaptor.getValue().getVirtualHosts()).hasSize(5); + verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); + } + + @Test + public void multipleRdsWatchers() { + String rdsResource = "route-bar.googleapis.com"; + RdsResourceWatcher watcher1 = mock(RdsResourceWatcher.class); + RdsResourceWatcher watcher2 = mock(RdsResourceWatcher.class); + xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); + xdsClient.watchRdsResource(rdsResource, watcher1); + xdsClient.watchRdsResource(rdsResource, watcher2); + RpcCall call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(RDS_RESOURCE, rdsResource), + ResourceType.RDS.typeUrl(), ""))); + + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); + verify(watcher1).onResourceDoesNotExist(rdsResource); + verify(watcher2).onResourceDoesNotExist(rdsResource); + + List routeConfigs = + ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); + DiscoveryResponse response = + buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture()); + assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(2); + verifyNoMoreInteractions(watcher1, watcher2); + + routeConfigs = + ImmutableList.of(Any.pack(buildRouteConfiguration(rdsResource, buildVirtualHosts(4)))); + response = + buildDiscoveryResponse("2", routeConfigs, ResourceType.RDS.typeUrl(), "0002"); + call.responseObserver.onNext(response); + + verify(watcher1).onChanged(rdsUpdateCaptor.capture()); + assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(4); + verify(watcher2).onChanged(rdsUpdateCaptor.capture()); + assertThat(rdsUpdateCaptor.getValue().getVirtualHosts()).hasSize(4); + verifyNoMoreInteractions(rdsResourceWatcher); + } + + @Test + public void cdsResourceNotFound() { + RpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + verifyNoInteractions(cdsResourceWatcher); + + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); + assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + @Test + public void cdsResourceFound() { + RpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isNull(); + assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + /** + * CDS response containing UpstreamTlsContext for a cluster. + */ + @Test + public void cdsResponseWithUpstreamTlsContext() { + RpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + + // Management server sends back CDS response with UpstreamTlsContext. + UpstreamTlsContext testUpstreamTlsContext = + buildUpstreamTlsContext("secret1", "unix:/var/uds2"); + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack(buildSecureCluster(CDS_RESOURCE, + "eds-cluster-foo.googleapis.com", true, testUpstreamTlsContext)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = cdsUpdate + .getUpstreamTlsContext(); + SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext() + .getValidationContextSdsSecretConfig(); + assertThat(validationContextSdsSecretConfig.getName()).isEqualTo("secret1"); + assertThat( + Iterables.getOnlyElement( + validationContextSdsSecretConfig + .getSdsConfig() + .getApiConfigSource() + .getGrpcServicesList()) + .getGoogleGrpc() + .getTargetUri()) + .isEqualTo("unix:/var/uds2"); + } + + @Test + public void cachedCdsResource_data() { + RpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK CDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + + CdsResourceWatcher watcher = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource(CDS_RESOURCE, watcher); + verify(watcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isNull(); + verifyNoMoreInteractions(call.requestObserver); + } + + @Test + public void cachedCdsResource_absent() { + RpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); + CdsResourceWatcher watcher = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource(CDS_RESOURCE, watcher); + verify(watcher).onResourceDoesNotExist(CDS_RESOURCE); + verifyNoMoreInteractions(call.requestObserver); + } + + @Test + public void cdsResourceUpdated() { + RpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK CDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isNull(); + + String edsService = "eds-service-bar.googleapis.com"; + clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, edsService, true))); + response = buildDiscoveryResponse("1", clusters, ResourceType.CDS.typeUrl(), "0001"); + call.responseObserver.onNext(response); + + // Client sends an ACK CDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "1", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0001"))); + verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); + cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.getEdsServiceName()).isEqualTo(edsService); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isEqualTo(""); + } + + @Test + public void cdsResourceDeleted() { + RpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + List clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK CDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isNull(); + + response = buildDiscoveryResponse("1", Collections.emptyList(), + ResourceType.CDS.typeUrl(), "0001"); + call.responseObserver.onNext(response); + + // Client sends an ACK CDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "1", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0001"))); + verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); + } + + @Test + public void multipleCdsWatchers() { + String cdsResource = "cluster-bar.googleapis.com"; + CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); + CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); + xdsClient.watchCdsResource(cdsResource, watcher1); + xdsClient.watchCdsResource(cdsResource, watcher2); + RpcCall call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(CDS_RESOURCE, cdsResource), + ResourceType.CDS.typeUrl(), ""))); + + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); + verify(watcher1).onResourceDoesNotExist(cdsResource); + verify(watcher2).onResourceDoesNotExist(cdsResource); + + String edsService = "eds-service-bar.googleapis.com"; + List clusters = ImmutableList.of( + Any.pack(buildCluster(CDS_RESOURCE, null, false)), + Any.pack(buildCluster(cdsResource, edsService, true))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isNull(); + verify(watcher1).onChanged(cdsUpdateCaptor.capture()); + cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo(cdsResource); + assertThat(cdsUpdate.getEdsServiceName()).isEqualTo(edsService); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isEqualTo(""); + verify(watcher2).onChanged(cdsUpdateCaptor.capture()); + cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo(cdsResource); + assertThat(cdsUpdate.getEdsServiceName()).isEqualTo(edsService); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isEqualTo(""); + } + + @Test + public void edsResourceNotFound() { + RpcCall call = + startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); + List clusterLoadAssignments = + ImmutableList.of( + Any.pack( + buildClusterLoadAssignment("cluster-bar.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0)), + ImmutableList.of()))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + verifyNoInteractions(edsResourceWatcher); + + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); + assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + @Test + public void edsResourceFound() { + RpcCall call = + startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); + List clusterLoadAssignments = + ImmutableList.of( + Any.pack( + buildClusterLoadAssignment(EDS_RESOURCE, + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0), + buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), + 2, 1), /* locality with 0 endpoint */ + buildLocalityLbEndpoints("region4", "zone4", "subzone4", + ImmutableList.of( + buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + 0, 2) /* locality with 0 weight */), + ImmutableList.of( + buildDropOverload("lb", 200), + buildDropOverload("throttle", 1000))))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); + assertThat(edsUpdate.getDropPolicies()) + .containsExactly( + new DropOverload("lb", 200), + new DropOverload("throttle", 1000)); + assertThat(edsUpdate.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, + 2, true)), 1, 0), + new Locality("region3", "zone3", "subzone3"), + new LocalityLbEndpoints(ImmutableList.of(), 2, 1)); + } + + @Test + public void cachedEdsResource_data() { + RpcCall call = + startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); + List clusterLoadAssignments = + ImmutableList.of( + Any.pack( + buildClusterLoadAssignment(EDS_RESOURCE, + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0), + buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), + 2, 1), /* locality with 0 endpoint */ + buildLocalityLbEndpoints("region4", "zone4", "subzone4", + ImmutableList.of( + buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + 0, 2) /* locality with 0 weight */), + ImmutableList.of( + buildDropOverload("lb", 200), + buildDropOverload("throttle", 1000))))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sends an ACK EDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + + EdsResourceWatcher watcher = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource(EDS_RESOURCE, watcher); + verify(watcher).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); + assertThat(edsUpdate.getDropPolicies()) + .containsExactly( + new DropOverload("lb", 200), + new DropOverload("throttle", 1000)); + assertThat(edsUpdate.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, + 2, true)), 1, 0), + new Locality("region3", "zone3", "subzone3"), + new LocalityLbEndpoints(ImmutableList.of(), 2, 1)); + verifyNoMoreInteractions(call.requestObserver); + } + + @Test + public void cachedEdsResource_absent() { + RpcCall call = + startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); + EdsResourceWatcher watcher = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource(EDS_RESOURCE, watcher); + verify(watcher).onResourceDoesNotExist(EDS_RESOURCE); + verifyNoMoreInteractions(call.requestObserver); + } + + @Test + public void edsResourceUpdated() { + RpcCall call = + startResourceWatcher(ResourceType.EDS, EDS_RESOURCE, edsResourceWatcher); + List clusterLoadAssignments = + ImmutableList.of( + Any.pack( + buildClusterLoadAssignment(EDS_RESOURCE, + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0), + buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), + 2, 1), /* locality with 0 endpoint */ + buildLocalityLbEndpoints("region4", "zone4", "subzone4", + ImmutableList.of( + buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + 0, 2) /* locality with 0 weight */), + ImmutableList.of( + buildDropOverload("lb", 200), + buildDropOverload("throttle", 1000))))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + // Client sent an ACK EDS request. + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", EDS_RESOURCE, ResourceType.EDS.typeUrl(), "0000"))); + verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); + assertThat(edsUpdate.getDropPolicies()) + .containsExactly( + new DropOverload("lb", 200), + new DropOverload("throttle", 1000)); + assertThat(edsUpdate.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, 2, true)), 1, 0), + new Locality("region3", "zone3", "subzone3"), + new LocalityLbEndpoints(ImmutableList.of(), 2, 1)); + + clusterLoadAssignments = + ImmutableList.of( + Any.pack( + buildClusterLoadAssignment(EDS_RESOURCE, + ImmutableList.of( + buildLocalityLbEndpoints("region2", "zone2", "subzone2", + ImmutableList.of( + buildLbEndpoint("172.44.2.2", 8000, HealthStatus.HEALTHY, 3)), + 2, 0)), + ImmutableList.of()))); + response = + buildDiscoveryResponse("1", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0001"); + call.responseObserver.onNext(response); + + verify(edsResourceWatcher, times(2)).onChanged(edsUpdateCaptor.capture()); + edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); + assertThat(edsUpdate.getDropPolicies()).isEmpty(); + assertThat(edsUpdate.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region2", "zone2", "subzone2"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("172.44.2.2", 8000, 3, true)), 2, 0)); + } + + @Test + public void edsResourceDeletedByCds() { + xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); + xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); + RpcCall call = resourceDiscoveryCalls.poll(); + List clusters = + ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, EDS_RESOURCE, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, ResourceType.CDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + assertThat(cdsUpdateCaptor.getValue().getClusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdateCaptor.getValue().getEdsServiceName()).isEqualTo(EDS_RESOURCE); + + List clusterLoadAssignments = + ImmutableList.of( + Any.pack( + buildClusterLoadAssignment(EDS_RESOURCE, + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0)), + ImmutableList.of( + buildDropOverload("lb", 200), + buildDropOverload("throttle", 1000))))); + response = + buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); + + clusters = ImmutableList.of(Any.pack(buildCluster(CDS_RESOURCE, null, false))); + response = + buildDiscoveryResponse("1", clusters, ResourceType.CDS.typeUrl(), "0001"); + call.responseObserver.onNext(response); + verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); + assertThat(cdsUpdateCaptor.getValue().getClusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdateCaptor.getValue().getEdsServiceName()).isNull(); + verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); + } + + @Test + public void multipleEdsWatchers() { + String edsResource = "cluster-load-assignment-bar.googleapis.com"; + EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class); + EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); + xdsClient.watchEdsResource(edsResource, watcher1); + xdsClient.watchEdsResource(edsResource, watcher2); + RpcCall call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + argThat(new DiscoveryRequestMatcher(NODE, "", Arrays.asList(EDS_RESOURCE, edsResource), + ResourceType.EDS.typeUrl(), ""))); + + fakeClock.forwardTime(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); + verify(watcher1).onResourceDoesNotExist(edsResource); + verify(watcher2).onResourceDoesNotExist(edsResource); + + List clusterLoadAssignments = + ImmutableList.of( + Any.pack( + buildClusterLoadAssignment(EDS_RESOURCE, + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0), + buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of(), + 2, 1), /* locality with 0 endpoint */ + buildLocalityLbEndpoints("region4", "zone4", "subzone4", + ImmutableList.of( + buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNKNOWN, 5)), + 0, 2) /* locality with 0 weight */), + ImmutableList.of( + buildDropOverload("lb", 200), + buildDropOverload("throttle", 1000))))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo(EDS_RESOURCE); + assertThat(edsUpdate.getDropPolicies()) + .containsExactly( + new DropOverload("lb", 200), + new DropOverload("throttle", 1000)); + assertThat(edsUpdate.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region1", "zone1", "subzone1"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("192.168.0.1", 8080, 2, true)), 1, 0), + new Locality("region3", "zone3", "subzone3"), + new LocalityLbEndpoints(ImmutableList.of(), 2, 1)); + verifyNoMoreInteractions(watcher1, watcher2); + + clusterLoadAssignments = + ImmutableList.of( + Any.pack( + buildClusterLoadAssignment(edsResource, + ImmutableList.of( + buildLocalityLbEndpoints("region2", "zone2", "subzone2", + ImmutableList.of( + buildLbEndpoint("172.44.2.2", 8000, HealthStatus.HEALTHY, 3)), + 2, 0)), + ImmutableList.of()))); + response = + buildDiscoveryResponse("1", clusterLoadAssignments, ResourceType.EDS.typeUrl(), "0001"); + call.responseObserver.onNext(response); + + verify(watcher1).onChanged(edsUpdateCaptor.capture()); + edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo(edsResource); + assertThat(edsUpdate.getDropPolicies()).isEmpty(); + assertThat(edsUpdate.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region2", "zone2", "subzone2"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("172.44.2.2", 8000, 3, true)), 2, 0)); + verify(watcher2).onChanged(edsUpdateCaptor.capture()); + edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo(edsResource); + assertThat(edsUpdate.getDropPolicies()).isEmpty(); + assertThat(edsUpdate.getLocalityLbEndpointsMap()) + .containsExactly( + new Locality("region2", "zone2", "subzone2"), + new LocalityLbEndpoints( + ImmutableList.of( + new LbEndpoint("172.44.2.2", 8000, 3, true)), 2, 0)); + verifyNoMoreInteractions(edsResourceWatcher); + } + + @Test + public void streamClosedAndRetryWithBackoff() { + InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); + xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); + xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); + xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); + RpcCall call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + + // Management server closes the RPC stream with an error. + call.responseObserver.onError(Status.UNKNOWN.asException()); + verify(ldsResourceWatcher).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + verify(rdsResourceWatcher).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + verify(cdsResourceWatcher).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + verify(edsResourceWatcher).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + + // Retry after backoff. + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + ScheduledTask retryTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); + assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(10L); + fakeClock.forwardNanos(10L); + call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + + // Management server becomes unreachable. + call.responseObserver.onError(Status.UNAVAILABLE.asException()); + verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + + // Retry after backoff. + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + retryTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); + assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(100L); + fakeClock.forwardNanos(100L); + call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + + List listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("63", listeners, ResourceType.LDS.typeUrl(), "3242"); + call.responseObserver.onNext(response); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "63", LDS_RESOURCE, ResourceType.LDS.typeUrl(), "3242"))); + + List routeConfigs = + ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); + response = + buildDiscoveryResponse("5", routeConfigs, ResourceType.RDS.typeUrl(), "6764"); + call.responseObserver.onNext(response); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "5", RDS_RESOURCE, ResourceType.RDS.typeUrl(), "6764"))); + + call.responseObserver.onError(Status.DEADLINE_EXCEEDED.asException()); + verify(ldsResourceWatcher, times(3)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); + verify(rdsResourceWatcher, times(3)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); + verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); + verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED); + + // Reset backoff sequence and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "63", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "5", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + + // Management server becomes unreachable again. + call.responseObserver.onError(Status.UNAVAILABLE.asException()); + verify(ldsResourceWatcher, times(4)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(rdsResourceWatcher, times(4)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + + // Retry after backoff. + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + retryTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); + assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(20L); + fakeClock.forwardNanos(20L); + call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "63", LDS_RESOURCE, ResourceType.LDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "5", RDS_RESOURCE, ResourceType.RDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void streamClosedAndRetryRaceWithAddRemoveWatchers() { + xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); + RpcCall call = resourceDiscoveryCalls.poll(); + call.responseObserver.onError(Status.UNAVAILABLE.asException()); + verify(ldsResourceWatcher).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(rdsResourceWatcher).onError(errorCaptor.capture()); + assertThat(errorCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + ScheduledTask retryTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); + assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(10L); + + xdsClient.cancelLdsResourceWatch(LDS_RESOURCE, ldsResourceWatcher); + xdsClient.cancelRdsResourceWatch(RDS_RESOURCE, rdsResourceWatcher); + xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); + xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); + fakeClock.forwardNanos(10L); + call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", CDS_RESOURCE, ResourceType.CDS.typeUrl(), ""))); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", EDS_RESOURCE, ResourceType.EDS.typeUrl(), ""))); + verifyNoMoreInteractions(call.requestObserver); + + List listeners = ImmutableList.of( + Any.pack(buildListener(LDS_RESOURCE, + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig(buildRouteConfiguration("do not care", buildVirtualHosts(2))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + List routeConfigs = + ImmutableList.of(Any.pack(buildRouteConfiguration(RDS_RESOURCE, buildVirtualHosts(2)))); + response = + buildDiscoveryResponse("0", routeConfigs, ResourceType.RDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + verifyNoMoreInteractions(ldsResourceWatcher, rdsResourceWatcher); + } + + @Test + public void streamClosedAndRetryRestartResourceInitialFetchTimers() { + xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchRdsResource(RDS_RESOURCE, rdsResourceWatcher); + xdsClient.watchCdsResource(CDS_RESOURCE, cdsResourceWatcher); + xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); + RpcCall call = resourceDiscoveryCalls.poll(); + ScheduledTask ldsResourceTimeout = + Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); + ScheduledTask rdsResourceTimeout = + Iterables.getOnlyElement(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); + ScheduledTask cdsResourceTimeout = + Iterables.getOnlyElement(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); + ScheduledTask edsResourceTimeout = + Iterables.getOnlyElement(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); + call.responseObserver.onError(Status.UNAVAILABLE.asException()); + assertThat(ldsResourceTimeout.isCancelled()).isTrue(); + assertThat(rdsResourceTimeout.isCancelled()).isTrue(); + assertThat(cdsResourceTimeout.isCancelled()).isTrue(); + assertThat(edsResourceTimeout.isCancelled()).isTrue(); + + fakeClock.forwardNanos(10L); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); + } + + /** + * Tests sending a streaming LRS RPC for each cluster to report loads for. + */ + @Test + public void reportLoadStatsToServer() { + String clusterName = "cluster-foo.googleapis.com"; + xdsClient.addClientStats(clusterName, null); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(null); + xdsClient.reportClientStats(); + RpcCall lrsCall = loadReportCalls.poll(); + verify(lrsCall.requestObserver).onNext(requestCaptor.capture()); + assertThat(requestCaptor.getValue().getClusterStatsCount()) + .isEqualTo(0); // initial request + + lrsCall.responseObserver.onNext( + LoadStatsResponse.newBuilder() + .addClusters(clusterName) + .setLoadReportingInterval(Durations.fromNanos(1000L)) + .build()); + fakeClock.forwardNanos(1000L); + verify(lrsCall.requestObserver, times(2)).onNext(requestCaptor.capture()); + ClusterStats report = Iterables.getOnlyElement(requestCaptor.getValue().getClusterStatsList()); + assertThat(report.getClusterName()).isEqualTo(clusterName); + + xdsClient.removeClientStats(clusterName, null); + fakeClock.forwardNanos(1000L); + verify(lrsCall.requestObserver, times(3)).onNext(requestCaptor.capture()); + assertThat(requestCaptor.getValue().getClusterStatsCount()) + .isEqualTo(0); // no more stats reported + + xdsClient.cancelClientStatsReport(); + assertThat(lrsEnded.get()).isTrue(); + // See more test on LoadReportClientTest.java + } + + @Test + public void messagePrinter_printLdsResponse() { + MessagePrinter printer = new MessagePrinter(); + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", + Any.pack( + HttpConnectionManager.newBuilder() + .setRouteConfig( + buildRouteConfiguration("route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"), + "cluster.googleapis.com")))) + .build())))); + DiscoveryResponse response = + buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000"); + + String expectedString = "{\n" + + " \"versionInfo\": \"0\",\n" + + " \"resources\": [{\n" + + " \"@type\": \"type.googleapis.com/envoy.config.listener.v3.Listener\",\n" + + " \"name\": \"foo.googleapis.com:8080\",\n" + + " \"address\": {\n" + + " },\n" + + " \"filterChains\": [{\n" + + " }],\n" + + " \"apiListener\": {\n" + + " \"apiListener\": {\n" + + " \"@type\": \"type.googleapis.com/envoy.extensions.filters.network" + + ".http_connection_manager.v3.HttpConnectionManager\",\n" + + " \"routeConfig\": {\n" + + " \"name\": \"route-foo.googleapis.com\",\n" + + " \"virtualHosts\": [{\n" + + " \"name\": \"virtualhost00.googleapis.com\",\n" + + " \"domains\": [\"foo.googleapis.com\", \"bar.googleapis.com\"],\n" + + " \"routes\": [{\n" + + " \"match\": {\n" + + " \"prefix\": \"\"\n" + + " },\n" + + " \"route\": {\n" + + " \"cluster\": \"cluster.googleapis.com\"\n" + + " }\n" + + " }]\n" + + " }]\n" + + " }\n" + + " }\n" + + " }\n" + + " }],\n" + + " \"typeUrl\": \"type.googleapis.com/envoy.config.listener.v3.Listener\",\n" + + " \"nonce\": \"0000\"\n" + + "}"; + String res = printer.print(response); + assertThat(res).isEqualTo(expectedString); + } + + @Test + public void messagePrinter_printRdsResponse() { + MessagePrinter printer = new MessagePrinter(); + List routeConfigs = + ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost( + ImmutableList.of("foo.googleapis.com", "bar.googleapis.com"), + "cluster.googleapis.com"))))); + DiscoveryResponse response = + buildDiscoveryResponse("213", routeConfigs, ResourceType.RDS.typeUrl(), "0052"); + + String expectedString = "{\n" + + " \"versionInfo\": \"213\",\n" + + " \"resources\": [{\n" + + " \"@type\": \"type.googleapis.com/envoy.config.route.v3.RouteConfiguration\",\n" + + " \"name\": \"route-foo.googleapis.com\",\n" + + " \"virtualHosts\": [{\n" + + " \"name\": \"virtualhost00.googleapis.com\",\n" + + " \"domains\": [\"foo.googleapis.com\", \"bar.googleapis.com\"],\n" + + " \"routes\": [{\n" + + " \"match\": {\n" + + " \"prefix\": \"\"\n" + + " },\n" + + " \"route\": {\n" + + " \"cluster\": \"cluster.googleapis.com\"\n" + + " }\n" + + " }]\n" + + " }]\n" + + " }],\n" + + " \"typeUrl\": \"type.googleapis.com/envoy.config.route.v3.RouteConfiguration\",\n" + + " \"nonce\": \"0052\"\n" + + "}"; + String res = printer.print(response); + assertThat(res).isEqualTo(expectedString); + } + + @Test + public void messagePrinter_printCdsResponse() { + MessagePrinter printer = new MessagePrinter(); + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", "service-blaze:cluster-bar", true)), + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("14", clusters, ResourceType.CDS.typeUrl(), "8"); + + String expectedString = "{\n" + + " \"versionInfo\": \"14\",\n" + + " \"resources\": [{\n" + + " \"@type\": \"type.googleapis.com/envoy.config.cluster.v3.Cluster\",\n" + + " \"name\": \"cluster-bar.googleapis.com\",\n" + + " \"type\": \"EDS\",\n" + + " \"edsClusterConfig\": {\n" + + " \"edsConfig\": {\n" + + " \"ads\": {\n" + + " }\n" + + " },\n" + + " \"serviceName\": \"service-blaze:cluster-bar\"\n" + + " },\n" + + " \"lrsServer\": {\n" + + " \"self\": {\n" + + " }\n" + + " }\n" + + " }, {\n" + + " \"@type\": \"type.googleapis.com/envoy.config.cluster.v3.Cluster\",\n" + + " \"name\": \"cluster-foo.googleapis.com\",\n" + + " \"type\": \"EDS\",\n" + + " \"edsClusterConfig\": {\n" + + " \"edsConfig\": {\n" + + " \"ads\": {\n" + + " }\n" + + " }\n" + + " }\n" + + " }],\n" + + " \"typeUrl\": \"type.googleapis.com/envoy.config.cluster.v3.Cluster\",\n" + + " \"nonce\": \"8\"\n" + + "}"; + String res = printer.print(response); + assertThat(res).isEqualTo(expectedString); + } + + @Test + public void messagePrinter_printEdsResponse() { + MessagePrinter printer = new MessagePrinter(); + List clusterLoadAssignments = ImmutableList.of( + Any.pack(buildClusterLoadAssignment("cluster-foo.googleapis.com", + ImmutableList.of( + buildLocalityLbEndpoints("region1", "zone1", "subzone1", + ImmutableList.of( + buildLbEndpoint("192.168.0.1", 8080, HealthStatus.HEALTHY, 2)), + 1, 0), + buildLocalityLbEndpoints("region3", "zone3", "subzone3", + ImmutableList.of( + buildLbEndpoint("192.168.142.5", 80, HealthStatus.UNHEALTHY, 5)), + 2, 1)), + ImmutableList.of( + buildDropOverload("lb", 200), + buildDropOverload("throttle", 1000))))); + + DiscoveryResponse response = + buildDiscoveryResponse("5", clusterLoadAssignments, + ResourceType.EDS.typeUrl(), "004"); + + String expectedString = "{\n" + + " \"versionInfo\": \"5\",\n" + + " \"resources\": [{\n" + + " \"@type\": \"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment\",\n" + + " \"clusterName\": \"cluster-foo.googleapis.com\",\n" + + " \"endpoints\": [{\n" + + " \"locality\": {\n" + + " \"region\": \"region1\",\n" + + " \"zone\": \"zone1\",\n" + + " \"subZone\": \"subzone1\"\n" + + " },\n" + + " \"lbEndpoints\": [{\n" + + " \"endpoint\": {\n" + + " \"address\": {\n" + + " \"socketAddress\": {\n" + + " \"address\": \"192.168.0.1\",\n" + + " \"portValue\": 8080\n" + + " }\n" + + " }\n" + + " },\n" + + " \"healthStatus\": \"HEALTHY\",\n" + + " \"loadBalancingWeight\": 2\n" + + " }],\n" + + " \"loadBalancingWeight\": 1\n" + + " }, {\n" + + " \"locality\": {\n" + + " \"region\": \"region3\",\n" + + " \"zone\": \"zone3\",\n" + + " \"subZone\": \"subzone3\"\n" + + " },\n" + + " \"lbEndpoints\": [{\n" + + " \"endpoint\": {\n" + + " \"address\": {\n" + + " \"socketAddress\": {\n" + + " \"address\": \"192.168.142.5\",\n" + + " \"portValue\": 80\n" + + " }\n" + + " }\n" + + " },\n" + + " \"healthStatus\": \"UNHEALTHY\",\n" + + " \"loadBalancingWeight\": 5\n" + + " }],\n" + + " \"loadBalancingWeight\": 2,\n" + + " \"priority\": 1\n" + + " }],\n" + + " \"policy\": {\n" + + " \"dropOverloads\": [{\n" + + " \"category\": \"lb\",\n" + + " \"dropPercentage\": {\n" + + " \"numerator\": 200,\n" + + " \"denominator\": \"MILLION\"\n" + + " }\n" + + " }, {\n" + + " \"category\": \"throttle\",\n" + + " \"dropPercentage\": {\n" + + " \"numerator\": 1000,\n" + + " \"denominator\": \"MILLION\"\n" + + " }\n" + + " }]\n" + + " }\n" + + " }],\n" + + " \"typeUrl\": \"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment\",\n" + + " \"nonce\": \"004\"\n" + + "}"; + String res = printer.print(response); + assertThat(res).isEqualTo(expectedString); + } + + private RpcCall startResourceWatcher( + ResourceType type, String name, ResourceWatcher watcher) { + FakeClock.TaskFilter timeoutTaskFilter; + switch (type) { + case LDS: + timeoutTaskFilter = LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; + xdsClient.watchLdsResource(name, (LdsResourceWatcher) watcher); + break; + case RDS: + timeoutTaskFilter = RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; + xdsClient.watchRdsResource(name, (RdsResourceWatcher) watcher); + break; + case CDS: + timeoutTaskFilter = CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; + xdsClient.watchCdsResource(name, (CdsResourceWatcher) watcher); + break; + case EDS: + timeoutTaskFilter = EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; + xdsClient.watchEdsResource(name, (EdsResourceWatcher) watcher); + break; + case UNKNOWN: + default: + throw new AssertionError("should never be here"); + } + RpcCall call = resourceDiscoveryCalls.poll(); + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "", name, type.typeUrl(), ""))); + ScheduledTask timeoutTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter)); + assertThat(timeoutTask.getDelay(TimeUnit.SECONDS)) + .isEqualTo(XdsClientImpl2.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC); + return call; + } + + /** + * Matcher for DiscoveryRequest without the comparison of error_details field, which is used for + * management server debugging purposes. + * + *

In general, if you are sure error_details field should not be set in a DiscoveryRequest, + * compare with message equality. Otherwise, this matcher is handy for comparing other fields + * only. + */ + private static class DiscoveryRequestMatcher implements ArgumentMatcher { + private final Node node; + private final String versionInfo; + private final String typeUrl; + private final Set resourceNames; + private final String responseNonce; + + private DiscoveryRequestMatcher( + Node node, String versionInfo, List resourceNames, String typeUrl, + String responseNonce) { + this.node = node; + this.versionInfo = versionInfo; + this.resourceNames = new HashSet<>(resourceNames); + this.typeUrl = typeUrl; + this.responseNonce = responseNonce; + } + + @Override + public boolean matches(DiscoveryRequest argument) { + if (!typeUrl.equals(argument.getTypeUrl())) { + return false; + } + if (!versionInfo.equals(argument.getVersionInfo())) { + return false; + } + if (!responseNonce.equals(argument.getResponseNonce())) { + return false; + } + if (!node.toEnvoyProtoNode().equals(argument.getNode())) { + return false; + } + if (!resourceNames.equals(new HashSet<>(argument.getResourceNamesList()))) { + return false; + } + return argument.getNode().equals(NODE.toEnvoyProtoNode()); + } + } + + private static class RpcCall { + private final StreamObserver requestObserver; + private final StreamObserver responseObserver; + + RpcCall(StreamObserver requestObserver, StreamObserver responseObserver) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java index 3220a77e52..08ba88aad2 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java @@ -56,6 +56,7 @@ import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.envoyproxy.envoy.type.v3.FractionalPercent; import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; import io.grpc.xds.EnvoyProtoData.Node; +import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; @@ -160,6 +161,23 @@ class XdsClientTestHelper { .build(); } + static List buildVirtualHosts(int num) { + List virtualHosts = new ArrayList<>(num); + for (int i = 0; i < num; i++) { + VirtualHost virtualHost = + VirtualHost.newBuilder() + .setName(num + ": do not care") + .addDomains("do not care") + .addRoutes( + Route.newBuilder() + .setRoute(RouteAction.newBuilder().setCluster("do not care")) + .setMatch(RouteMatch.newBuilder().setPrefix("do not care"))) + .build(); + virtualHosts.add(virtualHost); + } + return virtualHosts; + } + static VirtualHost buildVirtualHost(List domains, String clusterName) { return VirtualHost.newBuilder() .setName("virtualhost00.googleapis.com") // don't care