diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 39945ab074..6ed206715b 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -344,6 +344,8 @@ abstract class XdsClient { /** * Registers a data watcher for the given cluster. + * + *

Adding the same watcher for the same cluster more than once is a no-op. */ void watchClusterData(String clusterName, ClusterWatcher watcher) { } @@ -351,12 +353,16 @@ abstract class XdsClient { /** * Unregisters the given cluster watcher, which was registered to receive updates for the * given cluster. + * + *

Cancelling a watcher that was not registered for the given cluster is a no-op. */ void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) { } /** * Registers a data watcher for endpoints in the given cluster. + * + *

Adding the same watcher for the same cluster more than once is a no-op. */ void watchEndpointData(String clusterName, EndpointWatcher watcher) { } @@ -364,6 +370,8 @@ abstract class XdsClient { /** * Unregisters the given endpoints watcher, which was registered to receive updates for * endpoints information in the given cluster. + * + *

Cancelling a watcher that was not registered for the given cluster is a no-op. */ void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) { } diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index d4077386f0..011dc033b2 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -25,6 +25,10 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Code; +import io.envoyproxy.envoy.api.v2.Cluster; +import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType; +import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig; +import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; @@ -71,6 +75,8 @@ final class XdsClientImpl extends XdsClient { static final String ADS_TYPE_URL_RDS = "type.googleapis.com/envoy.api.v2.RouteConfiguration"; @VisibleForTesting + static final String ADS_TYPE_URL_CDS = "type.googleapis.com/envoy.api.v2.Cluster"; + @VisibleForTesting static final String ADS_TYPE_URL_EDS = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"; @@ -91,12 +97,21 @@ final class XdsClientImpl extends XdsClient { // responses. private final Map routeConfigNamesToClusterNames = new HashMap<>(); + // Cached data for CDS responses, keyed by cluster names. + // Optimization: cache ClusterUpdate, which contains only information needed by gRPC, instead + // of whole Cluster messages to reduce memory usage. + private final Map clusterNamesToClusterUpdates = new HashMap<>(); + // Cached data for EDS responses, keyed by cluster names. // CDS responses indicate absence of clusters and EDS responses indicate presence of clusters. // Optimization: cache EndpointUpdate, which contains only information needed by gRPC, instead // of whole ClusterLoadAssignment messages to reduce memory usage. private final Map clusterNamesToEndpointUpdates = new HashMap<>(); + // Cluster watchers waiting for cluster information updates. Multiple cluster watchers + // can watch on information for the same cluster. + private final Map> clusterWatchers = new HashMap<>(); + // Endpoint watchers waiting for endpoint updates for each cluster. Multiple endpoint // watchers can watch endpoints in the same cluster. private final Map> endpointWatchers = new HashMap<>(); @@ -187,6 +202,66 @@ final class XdsClientImpl extends XdsClient { adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName)); } + @Override + void watchClusterData(String clusterName, ClusterWatcher watcher) { + checkNotNull(watcher, "watcher"); + boolean needRequest = false; + if (!clusterWatchers.containsKey(clusterName)) { + logger.log(Level.FINE, "Start watching cluster {0}", clusterName); + needRequest = true; + clusterWatchers.put(clusterName, new HashSet()); + } + Set watchers = clusterWatchers.get(clusterName); + if (watchers.contains(watcher)) { + logger.log(Level.WARNING, "Watcher {0} already registered", watcher); + return; + } + watchers.add(watcher); + // If local cache contains cluster information to be watched, notify the watcher immediately. + if (clusterNamesToClusterUpdates.containsKey(clusterName)) { + watcher.onClusterChanged(clusterNamesToClusterUpdates.get(clusterName)); + } + if (rpcRetryTimer != null) { + // Currently in retry backoff. + return; + } + if (needRequest) { + if (adsStream == null) { + startRpcStream(); + } + adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet()); + } + } + + @Override + void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) { + checkNotNull(watcher, "watcher"); + Set watchers = clusterWatchers.get(clusterName); + if (watchers == null || !watchers.contains(watcher)) { + logger.log(Level.FINE, "Watcher {0} was not registered", watcher); + return; + } + watchers.remove(watcher); + if (watchers.isEmpty()) { + logger.log(Level.FINE, "Stop watching cluster {0}", clusterName); + clusterWatchers.remove(clusterName); + // If unsubscribe the last resource, do NOT send a CDS request for an empty resource list. + // This is a workaround for CDS protocol resource unsubscribe. + if (clusterWatchers.isEmpty()) { + return; + } + // No longer interested in this cluster, send an updated CDS request to unsubscribe + // this resource. + if (rpcRetryTimer != null) { + // Currently in retry backoff. + return; + } + checkState(adsStream != null, + "Severe bug: ADS stream was not created while an endpoint watcher was registered"); + adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet()); + } + } + @Override void watchEndpointData(String clusterName, EndpointWatcher watcher) { checkNotNull(watcher, "watcher"); @@ -199,6 +274,7 @@ final class XdsClientImpl extends XdsClient { Set watchers = endpointWatchers.get(clusterName); if (watchers.contains(watcher)) { logger.log(Level.WARNING, "Watcher {0} already registered", watcher); + return; } watchers.add(watcher); // If local cache contains endpoint information for the cluster to be watched, notify @@ -223,7 +299,7 @@ final class XdsClientImpl extends XdsClient { checkNotNull(watcher, "watcher"); Set watchers = endpointWatchers.get(clusterName); if (watchers == null || !watchers.contains(watcher)) { - logger.log(Level.WARNING, "Watcher {0} was not registered", watcher); + logger.log(Level.FINE, "Watcher {0} was not registered", watcher); return; } watchers.remove(watcher); @@ -494,6 +570,124 @@ final class XdsClientImpl extends XdsClient { return null; } + /** + * Handles CDS response, which contains a list of Cluster messages with information for a logical + * cluster. The response is NACKed if messages for requested resources contain invalid + * information for gRPC's usage. Otherwise, an ACK request is sent to management server. + * Response data for requested clusters is cached locally, in case of new cluster watchers + * interested in the same clusters are added later. + */ + private void handleCdsResponse(DiscoveryResponse cdsResponse) { + logger.log(Level.FINE, "Received an CDS response: {0}", cdsResponse); + checkState(adsStream.cdsResourceNames != null, + "Never requested for CDS resources, management server is doing something wrong"); + adsStream.cdsRespNonce = cdsResponse.getNonce(); + + // Unpack Cluster messages. + List clusters = new ArrayList<>(cdsResponse.getResourcesCount()); + try { + for (com.google.protobuf.Any res : cdsResponse.getResourcesList()) { + clusters.add(res.unpack(Cluster.class)); + } + } catch (InvalidProtocolBufferException e) { + adsStream.sendNackRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames, + "Broken CDS response"); + return; + } + + String errorMessage = null; + // Cluster information update for requested clusters received in this CDS response. + Map clusterUpdates = new HashMap<>(); + // CDS responses represents the state of the world, EDS services not referenced by + // Clusters are those no longer exist. + Set edsServices = new HashSet<>(); + for (Cluster cluster : clusters) { + String clusterName = cluster.getName(); + // Skip information for clusters not requested. + // Management server is required to always send newly requested resources, even if they + // may have been sent previously (proactively). Thus, client does not need to cache + // unrequested resources. + if (!adsStream.cdsResourceNames.contains(clusterName)) { + continue; + } + ClusterUpdate.Builder updateBuilder = ClusterUpdate.newBuilder(); + updateBuilder.setClusterName(clusterName); + // The type field must be set to EDS. + if (!cluster.getType().equals(DiscoveryType.EDS)) { + errorMessage = "Cluster [" + clusterName + "]: only EDS discovery type is supported " + + "in gRPC."; + break; + } + // In the eds_cluster_config field, the eds_config field must be set to indicate to + // use EDS (must be set to use ADS). + EdsClusterConfig edsClusterConfig = cluster.getEdsClusterConfig(); + if (!edsClusterConfig.getEdsConfig().hasAds()) { + errorMessage = "Cluster [" + clusterName + "]: field eds_cluster_config must be set to " + + "indicate to use EDS over ADS."; + break; + } + // If the service_name field is set, that value will be used for the EDS request + // instead of the cluster name (default). + if (!edsClusterConfig.getServiceName().isEmpty()) { + updateBuilder.setEdsServiceName(edsClusterConfig.getServiceName()); + edsServices.add(edsClusterConfig.getServiceName()); + } else { + edsServices.add(clusterName); + } + // The lb_policy field must be set to ROUND_ROBIN. + if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) { + errorMessage = "Cluster [" + clusterName + "]: only round robin load balancing policy is " + + "supported in gRPC."; + break; + } + updateBuilder.setLbPolicy("round_robin"); + // If the lrs_server field is set, it must have its self field set, in which case the + // client should use LRS for load reporting. Otherwise (the lrs_server field is not set), + // LRS load reporting will be disabled. + if (cluster.hasLrsServer()) { + if (!cluster.getLrsServer().hasSelf()) { + errorMessage = "Cluster [" + clusterName + "]: only support enabling LRS for the same " + + "management server."; + break; + } + updateBuilder.setEnableLrs(true); + updateBuilder.setLrsServerName(""); + } else { + updateBuilder.setEnableLrs(false); + } + clusterUpdates.put(clusterName, updateBuilder.build()); + } + if (errorMessage != null) { + adsStream.sendNackRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames, errorMessage); + return; + } + adsStream.sendAckRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames, + cdsResponse.getVersionInfo()); + + // Update local CDS cache with data in this response. + clusterNamesToClusterUpdates.clear(); + clusterNamesToClusterUpdates.putAll(clusterUpdates); + + // Remove EDS cache entries for ClusterLoadAssignments not referenced by this CDS response. + clusterNamesToEndpointUpdates.keySet().retainAll(edsServices); + + // Notify watchers if clusters interested in present. Otherwise, notify with an error. + for (Map.Entry> entry : clusterWatchers.entrySet()) { + if (clusterUpdates.containsKey(entry.getKey())) { + ClusterUpdate clusterUpdate = clusterUpdates.get(entry.getKey()); + for (ClusterWatcher watcher : entry.getValue()) { + watcher.onClusterChanged(clusterUpdate); + } + } else { + for (ClusterWatcher watcher : entry.getValue()) { + watcher.onError( + Status.NOT_FOUND.withDescription( + "Requested cluster [" + entry.getKey() + "] does not exist")); + } + } + } + } + /** * Handles EDS response, which contains a list of ClusterLoadAssignment messages with * endpoint load balancing information for each cluster. The response is NACKed if messages @@ -607,10 +801,12 @@ final class XdsClientImpl extends XdsClient { if (configWatcher != null) { adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName)); } + if (!clusterWatchers.isEmpty()) { + adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet()); + } if (!endpointWatchers.isEmpty()) { adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet()); } - // TODO(chengyuanzhang): send CDS requests if CDS watcher presents. } } @@ -626,6 +822,7 @@ final class XdsClientImpl extends XdsClient { // resources. private String ldsVersion = ""; private String rdsVersion = ""; + private String cdsVersion = ""; private String edsVersion = ""; // Response nonce for the most recently received discovery responses of each resource type. @@ -636,15 +833,24 @@ final class XdsClientImpl extends XdsClient { // DiscoveryResponse. private String ldsRespNonce = ""; private String rdsRespNonce = ""; + private String cdsRespNonce = ""; private String edsRespNonce = ""; // Most recently requested RDS resource name, which is an intermediate resource name for // resolving service config. // LDS request always use the same resource name, which is the "xds:" URI. - // Resource names for CDS/EDS requests are always represented by the cluster names that + // Resource names for EDS requests are always represented by the cluster names that // watchers are interested in. @Nullable private String rdsResourceName; + // Most recently requested CDS resource names. + // Due to CDS protocol limitation, client does not send a CDS request for empty resource + // names when unsubscribing the last resource. Management server assumes it is still + // subscribing to the last resource, client also need to behave so to avoid data lose. + // Therefore, cluster names that watchers interested in cannot always represent resource names + // in most recently sent CDS requests. + @Nullable + private Collection cdsResourceNames; private AdsStream(AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub) { this.stub = checkNotNull(stub, "stub"); @@ -671,11 +877,16 @@ final class XdsClientImpl extends XdsClient { } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) { rdsRespNonce = response.getNonce(); handleRdsResponse(response); + } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) { + cdsRespNonce = response.getNonce(); + handleCdsResponse(response); } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) { edsRespNonce = response.getNonce(); handleEdsResponse(response); + } else { + logger.log(Level.FINE, "Received an unknown type of DiscoveryResponse {0}", + response); } - // TODO(zdapeng): add CDS response handles. } }); } @@ -761,11 +972,17 @@ final class XdsClientImpl extends XdsClient { version = rdsVersion; nonce = rdsRespNonce; rdsResourceName = resourceNames.iterator().next(); + } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) { + version = cdsVersion; + nonce = cdsRespNonce; + // For CDS protocol resource unsubscribe workaround, keep the last unsubscribed cluster + // as the requested resource name for ACK requests when all all resources have + // been unsubscribed. + cdsResourceNames = ImmutableList.copyOf(resourceNames); } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) { version = edsVersion; nonce = edsRespNonce; } - // TODO(chengyuanzhang): cases for CDS. DiscoveryRequest request = DiscoveryRequest .newBuilder() @@ -792,11 +1009,13 @@ final class XdsClientImpl extends XdsClient { } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) { rdsVersion = versionInfo; nonce = rdsRespNonce; + } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) { + cdsVersion = versionInfo; + nonce = cdsRespNonce; } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) { edsVersion = versionInfo; nonce = edsRespNonce; } - // TODO(chengyuanzhang): cases for CDS. DiscoveryRequest request = DiscoveryRequest .newBuilder() @@ -824,11 +1043,13 @@ final class XdsClientImpl extends XdsClient { } else if (typeUrl.equals(ADS_TYPE_URL_RDS)) { versionInfo = rdsVersion; nonce = rdsRespNonce; + } else if (typeUrl.equals(ADS_TYPE_URL_CDS)) { + versionInfo = cdsVersion; + nonce = cdsRespNonce; } else if (typeUrl.equals(ADS_TYPE_URL_EDS)) { versionInfo = edsVersion; nonce = edsRespNonce; } - // TODO(chengyuanzhang): cases for EDS. DiscoveryRequest request = DiscoveryRequest .newBuilder() diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index a8d4a5e1de..a38ed0a96b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -33,6 +33,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; import com.google.protobuf.UInt32Value; +import io.envoyproxy.envoy.api.v2.Cluster; +import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType; +import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig; +import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; @@ -44,6 +48,7 @@ import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource; import io.envoyproxy.envoy.api.v2.core.ConfigSource; import io.envoyproxy.envoy.api.v2.core.HealthStatus; import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.core.SelfConfigSource; import io.envoyproxy.envoy.api.v2.core.SocketAddress; import io.envoyproxy.envoy.api.v2.listener.FilterChain; import io.envoyproxy.envoy.api.v2.route.RedirectAction; @@ -72,6 +77,8 @@ import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.LbEndpoint; import io.grpc.xds.EnvoyProtoData.Locality; import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; +import io.grpc.xds.XdsClient.ClusterUpdate; +import io.grpc.xds.XdsClient.ClusterWatcher; import io.grpc.xds.XdsClient.ConfigUpdate; import io.grpc.xds.XdsClient.ConfigWatcher; import io.grpc.xds.XdsClient.EndpointUpdate; @@ -83,6 +90,7 @@ import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -141,6 +149,8 @@ public class XdsClientImplTest { @Mock private ConfigWatcher configWatcher; @Mock + private ClusterWatcher clusterWatcher; + @Mock private EndpointWatcher endpointWatcher; private ManagedChannel channel; @@ -187,8 +197,9 @@ public class XdsClientImplTest { channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); xdsClient = - new XdsClientImpl(channel, NODE, syncContext, fakeClock.getScheduledExecutorService(), - backoffPolicyProvider, fakeClock.getStopwatchSupplier().get()); + new XdsClientImpl(channel, NODE, syncContext, + fakeClock.getScheduledExecutorService(), backoffPolicyProvider, + fakeClock.getStopwatchSupplier().get()); // Only the connection to management server is established, no RPC request is sent until at // least one watcher is registered. assertThat(responseObservers).isEmpty(); @@ -1101,6 +1112,441 @@ public class XdsClientImplTest { .isEqualTo("Listener for requested resource [foo.googleapis.com:8080] does not exist"); } + /** + * Client receives an CDS response that does not contain a Cluster for the requested resource + * while each received Cluster is valid. The CDS response is ACKed. Cluster watchers are notified + * with an error for resource not found. + */ + @Test + public void cdsResponseWithoutMatchingResource() { + xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends a CDS request for the only cluster being watched to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response without Cluster for the requested resource. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null); + verify(clusterWatcher).onError(errorStatusCaptor.capture()); + Status error = errorStatusCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND); + assertThat(error.getDescription()) + .isEqualTo("Requested cluster [cluster-foo.googleapis.com] does not exist"); + } + + /** + * Normal workflow of receiving a CDS response containing Cluster message for a requested + * cluster. + */ + @Test + public void cdsResponseWithMatchingResource() { + xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends a CDS request for the only cluster being watched to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response without Cluster for the requested resource. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor clusterUpdateCaptor = ArgumentCaptor.forClass(null); + verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture()); + ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); + assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate.isEnableLrs()).isEqualTo(false); + + // Management server sends back another CDS response updating the requested Cluster. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack( + buildCluster("cluster-foo.googleapis.com", "eds-cluster-foo.googleapis.com", true)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + response = + buildDiscoveryResponse("1", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + verify(clusterWatcher, times(2)).onClusterChanged(clusterUpdateCaptor.capture()); + clusterUpdate = clusterUpdateCaptor.getValue(); + assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate.getEdsServiceName()) + .isEqualTo("eds-cluster-foo.googleapis.com"); + assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate.getLrsServerName()).isEqualTo(""); + } + + @Test + public void multipleClusterWatchers() { + ClusterWatcher watcher1 = mock(ClusterWatcher.class); + ClusterWatcher watcher2 = mock(ClusterWatcher.class); + ClusterWatcher watcher3 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); + xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3); + + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends a CDS request containing all clusters being watched to management server. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response contains Cluster for only one of + // requested cluster. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("0", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + // Two watchers get notification of cluster update for the cluster they are interested in. + ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); + ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); + ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + + // The other watcher gets an error notification for cluster not found. + ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null); + verify(watcher3).onError(errorStatusCaptor.capture()); + Status error = errorStatusCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND); + assertThat(error.getDescription()) + .isEqualTo("Requested cluster [cluster-bar.googleapis.com] does not exist"); + + // Management server sends back another CDS response contains Clusters for all + // requested clusters. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)), + Any.pack( + buildCluster("cluster-bar.googleapis.com", + "eds-cluster-bar.googleapis.com", true))); + response = buildDiscoveryResponse("1", clusters, + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("1", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + // All watchers received notification for cluster update. + verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture()); + clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2, times(2)).onClusterChanged(clusterUpdateCaptor2.capture()); + clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + + ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); + ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); + assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(clusterUpdate3.getEdsServiceName()) + .isEqualTo("eds-cluster-bar.googleapis.com"); + assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate3.getLrsServerName()).isEqualTo(""); + } + + /** + * (CDS response caching behavior) Adding cluster watchers interested in some cluster that + * some other endpoint watcher had already been watching on will result in cluster update + * notified to the newly added watcher immediately, without sending new CDS requests. + */ + @Test + public void watchClusterAlreadyBeingWatched() { + ClusterWatcher watcher1 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + + // Streaming RPC starts after a first watcher is added. + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an CDS request to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back an CDS response with Cluster for the requested + // cluster. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); + ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + // Another cluster watcher interested in the same cluster is added. + ClusterWatcher watcher2 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); + + // Since the client has received cluster update for this cluster before, cached result is + // notified to the newly added watcher immediately. + ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); + ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + + verifyNoMoreInteractions(requestObserver); + } + + @Test + public void addRemoveClusterWatchersFreely() { + ClusterWatcher watcher1 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + + // Streaming RPC starts after a first watcher is added. + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an CDS request to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response with Cluster for the requested + // cluster. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); + ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + // Add another cluster watcher for a different cluster. + ClusterWatcher watcher2 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher2); + + // Client sent a new CDS request for all interested resources. + verify(requestObserver) + .onNext( + eq(buildDiscoveryRequest("0", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + // Management server sends back a CDS response with Cluster for all requested cluster. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)), + Any.pack( + buildCluster("cluster-bar.googleapis.com", + "eds-cluster-bar.googleapis.com", true))); + response = buildDiscoveryResponse("1", clusters, + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request for all interested resources. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("1", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture()); + clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); + ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("eds-cluster-bar.googleapis.com"); + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(""); + + // Cancel one of the watcher. + xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1); + + // Since the cancelled watcher was the last watcher interested in that cluster (but there + // is still interested resource), client sent an new CDS request to unsubscribe from + // that cluster. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "cluster-bar.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + // Management server has nothing to respond. + + // Cancel the other watcher. All resources have been unsubscribed. + xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher2); + + // All endpoint watchers have been cancelled. Due to protocol limitation, we do not send + // a CDS request for updated resource names (empty) when canceling the last resource. + verifyNoMoreInteractions(requestObserver); + + // Management server sends back a new CDS response. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, true)), + Any.pack( + buildCluster("cluster-bar.googleapis.com", null, false))); + response = + buildDiscoveryResponse("2", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0002"); + responseObserver.onNext(response); + + // Due to protocol limitation, client sent an ACK CDS request, with resource_names containing + // the last unsubscribed resource. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("2", + ImmutableList.of("cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0002"))); + + // Cancelled watchers do not receive notification. + verifyNoMoreInteractions(watcher1, watcher2); + + // A new cluster watcher is added to watch cluster foo again. + ClusterWatcher watcher3 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher3); + + // A CDS request is sent to indicate subscription of "cluster-foo.googleapis.com" only. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("2", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0002"))); + + // Management server sends back a new CDS response for at least newly requested resources + // (it is required to do so). + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, true)), + Any.pack( + buildCluster("cluster-bar.googleapis.com", null, false))); + response = + buildDiscoveryResponse("3", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0003"); + responseObserver.onNext(response); + + // Notified with cached data immediately. + ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); + ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); + assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate3.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(""); + + verifyNoMoreInteractions(watcher1, watcher2); + + // A CDS request is sent to re-subscribe the cluster again. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("3", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0003"))); + } + /** * Client receives an EDS response that does not contain a ClusterLoadAssignment for the * requested resource while each received ClusterLoadAssignment is valid. @@ -1604,9 +2050,6 @@ public class XdsClientImplTest { XdsClientImpl.ADS_TYPE_URL_EDS, "0003"))); } - // TODO(chengyuanzhang): incorporate interactions with cluster watchers and end endpoint watchers - // during retry. - /** * RPC stream closed and retry during the period of first tiem resolving service config * (LDS/RDS only). @@ -1764,8 +2207,348 @@ public class XdsClientImplTest { verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); } - // TODO(chengyuanzhang): test for race between stream closed and watcher changes. Should only - // for ClusterWatchers and EndpointWatchers. + /** + * RPC stream close and retry while there are config/cluster/endpoint watchers registered. + */ + @Test + public void streamClosedAndRetry() { + InOrder inOrder = + Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + + ArgumentCaptor> responseObserverCaptor = + ArgumentCaptor.forClass(null); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + StreamObserver responseObserver = + responseObserverCaptor.getValue(); // same as responseObservers.poll() + StreamObserver requestObserver = requestObservers.poll(); + + waitUntilConfigResolved(responseObserver); + + // Start watching cluster information. + xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); + + // Client sent first CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Start watching endpoint information. + xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher); + + // Client sent first EDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server closes the RPC stream with an error. + responseObserver.onError(Status.UNKNOWN.asException()); + + // Resets backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + // Retry resumes requests for all wanted resources. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server becomes unreachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(9L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server is still not reachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(99L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back a CDS response. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster.googleapis.com", null, false))); + DiscoveryResponse cdsResponse = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(cdsResponse); + + // Client sent an CDS ACK request (Omitted). + + // Management server closes the RPC stream. + responseObserver.onCompleted(); + + // Resets backoff and retry immediately + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server becomes unreachable again. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Retry after backoff. + fakeClock.forwardNanos(19L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + } + + /** + * RPC stream closed and retry while some cluster/endpoint watchers have changed (added/removed). + */ + @Test + public void streamClosedAndRetryRaceWithAddingAndRemovingWatchers() { + InOrder inOrder = + Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher); + + ArgumentCaptor> responseObserverCaptor = + ArgumentCaptor.forClass(null); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + StreamObserver responseObserver = + responseObserverCaptor.getValue(); // same as responseObservers.poll() + requestObservers.poll(); + + waitUntilConfigResolved(responseObserver); + + // Management server closes RPC stream. + responseObserver.onCompleted(); + + // Resets backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + StreamObserver requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + + // Management server becomes unreachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Start watching cluster information while RPC stream is still in retry backoff. + xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); + + // Retry after backoff. + fakeClock.forwardNanos(9L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server is still unreachable. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // Start watching endpoint information while RPC stream is still in retry backoff. + xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher); + + // Retry after backoff. + fakeClock.forwardNanos(99L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server sends back a CDS response. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster.googleapis.com", null, false))); + DiscoveryResponse cdsResponse = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(cdsResponse); + + // Client sent an CDS ACK request (Omitted). + + // Management server closes the RPC stream again. + responseObserver.onCompleted(); + + // Resets backoff and retry immediately. + inOrder.verify(backoffPolicyProvider).get(); + fakeClock.runDueTasks(); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + responseObserver = responseObserverCaptor.getValue(); + requestObserver = requestObservers.poll(); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + // Management server becomes unreachable again. + responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); + + // No longer interested in previous cluster and endpoints in that cluster. + xdsClient.cancelClusterDataWatch("cluster.googleapis.com", clusterWatcher); + xdsClient.cancelEndpointDataWatch("cluster.googleapis.com", endpointWatcher); + + // Retry after backoff. + fakeClock.forwardNanos(19L); + assertThat(requestObservers).isEmpty(); + fakeClock.forwardNanos(1L); + inOrder.verify(mockedDiscoveryService) + .streamAggregatedResources(responseObserverCaptor.capture()); + requestObserver = requestObservers.poll(); + + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080", + XdsClientImpl.ADS_TYPE_URL_LDS, ""))); + verify(requestObserver, never()) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + verify(requestObserver, never()) + .onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_EDS, ""))); + + verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1, + backoffPolicy2); + } + + // Simulates the use case of watching clusters/endpoints based on service config resolved by + // LDS/RDS. + private void waitUntilConfigResolved(StreamObserver responseObserver) { + // Client sent an LDS request for resource "foo.googleapis.com:8080" (Omitted). + + // Management server responses with a listener telling client to do RDS. + Rds rdsConfig = + Rds.newBuilder() + .setConfigSource( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())) + .setRouteConfigName("route-foo.googleapis.com") + .build(); + + List listeners = ImmutableList.of( + Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */ + Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build()))) + ); + DiscoveryResponse ldsResponse = + buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000"); + responseObserver.onNext(ldsResponse); + + // Client sent an LDS ACK request and an RDS request for resource + // "route-foo.googleapis.com" (Omitted). + + // Management server sends an RDS response. + List routeConfigs = ImmutableList.of( + Any.pack( + buildRouteConfiguration( + "route-foo.googleapis.com", + ImmutableList.of( + buildVirtualHost(ImmutableList.of("foo.googleapis.com"), + "cluster.googleapis.com"))))); + DiscoveryResponse rdsResponse = + buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000"); + responseObserver.onNext(rdsResponse); + } @Test public void matchHostName_exactlyMatch() { @@ -1870,6 +2653,26 @@ public class XdsClientImplTest { .build(); } + private static Cluster buildCluster(String clusterName, @Nullable String edsServiceName, + boolean enableLrs) { + Cluster.Builder clusterBuilder = Cluster.newBuilder(); + clusterBuilder.setName(clusterName); + clusterBuilder.setType(DiscoveryType.EDS); + EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); + edsClusterConfigBuilder.setEdsConfig( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())); + if (edsServiceName != null) { + edsClusterConfigBuilder.setServiceName(edsServiceName); + } + clusterBuilder.setEdsClusterConfig(edsClusterConfigBuilder); + clusterBuilder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (enableLrs) { + clusterBuilder.setLrsServer( + ConfigSource.newBuilder().setSelf(SelfConfigSource.getDefaultInstance())); + } + return clusterBuilder.build(); + } + private static ClusterLoadAssignment buildClusterLoadAssignment(String clusterName, List localityLbEndpoints, List dropOverloads) {