xds: integrate CDS protocol in XdsClient (#6416)

Add CDS protocol integration into XdsClient. gRPC client interacts with XdsClient for CDS results via adding a cluster watcher to receive cluster information from traffic director. Multiple watchers can exist to receive cluster information for the same or different clusters.
This commit is contained in:
Chengyuan Zhang 2019-12-02 13:57:36 -08:00 committed by GitHub
parent 8062406afc
commit 06dd24e248
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 1046 additions and 14 deletions

View File

@ -344,6 +344,8 @@ abstract class XdsClient {
/**
* Registers a data watcher for the given cluster.
*
* <p>Adding the same watcher for the same cluster more than once is a no-op.
*/
void watchClusterData(String clusterName, ClusterWatcher watcher) {
}
@ -351,12 +353,16 @@ abstract class XdsClient {
/**
* Unregisters the given cluster watcher, which was registered to receive updates for the
* given cluster.
*
* <p>Cancelling a watcher that was not registered for the given cluster is a no-op.
*/
void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) {
}
/**
* Registers a data watcher for endpoints in the given cluster.
*
* <p>Adding the same watcher for the same cluster more than once is a no-op.
*/
void watchEndpointData(String clusterName, EndpointWatcher watcher) {
}
@ -364,6 +370,8 @@ abstract class XdsClient {
/**
* Unregisters the given endpoints watcher, which was registered to receive updates for
* endpoints information in the given cluster.
*
* <p>Cancelling a watcher that was not registered for the given cluster is a no-op.
*/
void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) {
}

View File

@ -25,6 +25,10 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Code;
import io.envoyproxy.envoy.api.v2.Cluster;
import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType;
import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
@ -71,6 +75,8 @@ final class XdsClientImpl extends XdsClient {
static final String ADS_TYPE_URL_RDS =
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
@VisibleForTesting
static final String ADS_TYPE_URL_CDS = "type.googleapis.com/envoy.api.v2.Cluster";
@VisibleForTesting
static final String ADS_TYPE_URL_EDS =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
@ -91,12 +97,21 @@ final class XdsClientImpl extends XdsClient {
// responses.
private final Map<String, String> routeConfigNamesToClusterNames = new HashMap<>();
// Cached data for CDS responses, keyed by cluster names.
// Optimization: cache ClusterUpdate, which contains only information needed by gRPC, instead
// of whole Cluster messages to reduce memory usage.
private final Map<String, ClusterUpdate> clusterNamesToClusterUpdates = new HashMap<>();
// Cached data for EDS responses, keyed by cluster names.
// CDS responses indicate absence of clusters and EDS responses indicate presence of clusters.
// Optimization: cache EndpointUpdate, which contains only information needed by gRPC, instead
// of whole ClusterLoadAssignment messages to reduce memory usage.
private final Map<String, EndpointUpdate> clusterNamesToEndpointUpdates = new HashMap<>();
// Cluster watchers waiting for cluster information updates. Multiple cluster watchers
// can watch on information for the same cluster.
private final Map<String, Set<ClusterWatcher>> clusterWatchers = new HashMap<>();
// Endpoint watchers waiting for endpoint updates for each cluster. Multiple endpoint
// watchers can watch endpoints in the same cluster.
private final Map<String, Set<EndpointWatcher>> endpointWatchers = new HashMap<>();
@ -187,6 +202,66 @@ final class XdsClientImpl extends XdsClient {
adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName));
}
@Override
void watchClusterData(String clusterName, ClusterWatcher watcher) {
checkNotNull(watcher, "watcher");
boolean needRequest = false;
if (!clusterWatchers.containsKey(clusterName)) {
logger.log(Level.FINE, "Start watching cluster {0}", clusterName);
needRequest = true;
clusterWatchers.put(clusterName, new HashSet<ClusterWatcher>());
}
Set<ClusterWatcher> watchers = clusterWatchers.get(clusterName);
if (watchers.contains(watcher)) {
logger.log(Level.WARNING, "Watcher {0} already registered", watcher);
return;
}
watchers.add(watcher);
// If local cache contains cluster information to be watched, notify the watcher immediately.
if (clusterNamesToClusterUpdates.containsKey(clusterName)) {
watcher.onClusterChanged(clusterNamesToClusterUpdates.get(clusterName));
}
if (rpcRetryTimer != null) {
// Currently in retry backoff.
return;
}
if (needRequest) {
if (adsStream == null) {
startRpcStream();
}
adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet());
}
}
@Override
void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) {
checkNotNull(watcher, "watcher");
Set<ClusterWatcher> watchers = clusterWatchers.get(clusterName);
if (watchers == null || !watchers.contains(watcher)) {
logger.log(Level.FINE, "Watcher {0} was not registered", watcher);
return;
}
watchers.remove(watcher);
if (watchers.isEmpty()) {
logger.log(Level.FINE, "Stop watching cluster {0}", clusterName);
clusterWatchers.remove(clusterName);
// If unsubscribe the last resource, do NOT send a CDS request for an empty resource list.
// This is a workaround for CDS protocol resource unsubscribe.
if (clusterWatchers.isEmpty()) {
return;
}
// No longer interested in this cluster, send an updated CDS request to unsubscribe
// this resource.
if (rpcRetryTimer != null) {
// Currently in retry backoff.
return;
}
checkState(adsStream != null,
"Severe bug: ADS stream was not created while an endpoint watcher was registered");
adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet());
}
}
@Override
void watchEndpointData(String clusterName, EndpointWatcher watcher) {
checkNotNull(watcher, "watcher");
@ -199,6 +274,7 @@ final class XdsClientImpl extends XdsClient {
Set<EndpointWatcher> watchers = endpointWatchers.get(clusterName);
if (watchers.contains(watcher)) {
logger.log(Level.WARNING, "Watcher {0} already registered", watcher);
return;
}
watchers.add(watcher);
// If local cache contains endpoint information for the cluster to be watched, notify
@ -223,7 +299,7 @@ final class XdsClientImpl extends XdsClient {
checkNotNull(watcher, "watcher");
Set<EndpointWatcher> watchers = endpointWatchers.get(clusterName);
if (watchers == null || !watchers.contains(watcher)) {
logger.log(Level.WARNING, "Watcher {0} was not registered", watcher);
logger.log(Level.FINE, "Watcher {0} was not registered", watcher);
return;
}
watchers.remove(watcher);
@ -494,6 +570,124 @@ final class XdsClientImpl extends XdsClient {
return null;
}
/**
* Handles CDS response, which contains a list of Cluster messages with information for a logical
* cluster. The response is NACKed if messages for requested resources contain invalid
* information for gRPC's usage. Otherwise, an ACK request is sent to management server.
* Response data for requested clusters is cached locally, in case of new cluster watchers
* interested in the same clusters are added later.
*/
private void handleCdsResponse(DiscoveryResponse cdsResponse) {
logger.log(Level.FINE, "Received an CDS response: {0}", cdsResponse);
checkState(adsStream.cdsResourceNames != null,
"Never requested for CDS resources, management server is doing something wrong");
adsStream.cdsRespNonce = cdsResponse.getNonce();
// Unpack Cluster messages.
List<Cluster> clusters = new ArrayList<>(cdsResponse.getResourcesCount());
try {
for (com.google.protobuf.Any res : cdsResponse.getResourcesList()) {
clusters.add(res.unpack(Cluster.class));
}
} catch (InvalidProtocolBufferException e) {
adsStream.sendNackRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames,
"Broken CDS response");
return;
}
String errorMessage = null;
// Cluster information update for requested clusters received in this CDS response.
Map<String, ClusterUpdate> clusterUpdates = new HashMap<>();
// CDS responses represents the state of the world, EDS services not referenced by
// Clusters are those no longer exist.
Set<String> edsServices = new HashSet<>();
for (Cluster cluster : clusters) {
String clusterName = cluster.getName();
// Skip information for clusters not requested.
// Management server is required to always send newly requested resources, even if they
// may have been sent previously (proactively). Thus, client does not need to cache
// unrequested resources.
if (!adsStream.cdsResourceNames.contains(clusterName)) {
continue;
}
ClusterUpdate.Builder updateBuilder = ClusterUpdate.newBuilder();
updateBuilder.setClusterName(clusterName);
// The type field must be set to EDS.
if (!cluster.getType().equals(DiscoveryType.EDS)) {
errorMessage = "Cluster [" + clusterName + "]: only EDS discovery type is supported "
+ "in gRPC.";
break;
}
// In the eds_cluster_config field, the eds_config field must be set to indicate to
// use EDS (must be set to use ADS).
EdsClusterConfig edsClusterConfig = cluster.getEdsClusterConfig();
if (!edsClusterConfig.getEdsConfig().hasAds()) {
errorMessage = "Cluster [" + clusterName + "]: field eds_cluster_config must be set to "
+ "indicate to use EDS over ADS.";
break;
}
// If the service_name field is set, that value will be used for the EDS request
// instead of the cluster name (default).
if (!edsClusterConfig.getServiceName().isEmpty()) {
updateBuilder.setEdsServiceName(edsClusterConfig.getServiceName());
edsServices.add(edsClusterConfig.getServiceName());
} else {
edsServices.add(clusterName);
}
// The lb_policy field must be set to ROUND_ROBIN.
if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) {
errorMessage = "Cluster [" + clusterName + "]: only round robin load balancing policy is "
+ "supported in gRPC.";
break;
}
updateBuilder.setLbPolicy("round_robin");
// If the lrs_server field is set, it must have its self field set, in which case the
// client should use LRS for load reporting. Otherwise (the lrs_server field is not set),
// LRS load reporting will be disabled.
if (cluster.hasLrsServer()) {
if (!cluster.getLrsServer().hasSelf()) {
errorMessage = "Cluster [" + clusterName + "]: only support enabling LRS for the same "
+ "management server.";
break;
}
updateBuilder.setEnableLrs(true);
updateBuilder.setLrsServerName("");
} else {
updateBuilder.setEnableLrs(false);
}
clusterUpdates.put(clusterName, updateBuilder.build());
}
if (errorMessage != null) {
adsStream.sendNackRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames, errorMessage);
return;
}
adsStream.sendAckRequest(ADS_TYPE_URL_CDS, adsStream.cdsResourceNames,
cdsResponse.getVersionInfo());
// Update local CDS cache with data in this response.
clusterNamesToClusterUpdates.clear();
clusterNamesToClusterUpdates.putAll(clusterUpdates);
// Remove EDS cache entries for ClusterLoadAssignments not referenced by this CDS response.
clusterNamesToEndpointUpdates.keySet().retainAll(edsServices);
// Notify watchers if clusters interested in present. Otherwise, notify with an error.
for (Map.Entry<String, Set<ClusterWatcher>> entry : clusterWatchers.entrySet()) {
if (clusterUpdates.containsKey(entry.getKey())) {
ClusterUpdate clusterUpdate = clusterUpdates.get(entry.getKey());
for (ClusterWatcher watcher : entry.getValue()) {
watcher.onClusterChanged(clusterUpdate);
}
} else {
for (ClusterWatcher watcher : entry.getValue()) {
watcher.onError(
Status.NOT_FOUND.withDescription(
"Requested cluster [" + entry.getKey() + "] does not exist"));
}
}
}
}
/**
* Handles EDS response, which contains a list of ClusterLoadAssignment messages with
* endpoint load balancing information for each cluster. The response is NACKed if messages
@ -607,10 +801,12 @@ final class XdsClientImpl extends XdsClient {
if (configWatcher != null) {
adsStream.sendXdsRequest(ADS_TYPE_URL_LDS, ImmutableList.of(ldsResourceName));
}
if (!clusterWatchers.isEmpty()) {
adsStream.sendXdsRequest(ADS_TYPE_URL_CDS, clusterWatchers.keySet());
}
if (!endpointWatchers.isEmpty()) {
adsStream.sendXdsRequest(ADS_TYPE_URL_EDS, endpointWatchers.keySet());
}
// TODO(chengyuanzhang): send CDS requests if CDS watcher presents.
}
}
@ -626,6 +822,7 @@ final class XdsClientImpl extends XdsClient {
// resources.
private String ldsVersion = "";
private String rdsVersion = "";
private String cdsVersion = "";
private String edsVersion = "";
// Response nonce for the most recently received discovery responses of each resource type.
@ -636,15 +833,24 @@ final class XdsClientImpl extends XdsClient {
// DiscoveryResponse.
private String ldsRespNonce = "";
private String rdsRespNonce = "";
private String cdsRespNonce = "";
private String edsRespNonce = "";
// Most recently requested RDS resource name, which is an intermediate resource name for
// resolving service config.
// LDS request always use the same resource name, which is the "xds:" URI.
// Resource names for CDS/EDS requests are always represented by the cluster names that
// Resource names for EDS requests are always represented by the cluster names that
// watchers are interested in.
@Nullable
private String rdsResourceName;
// Most recently requested CDS resource names.
// Due to CDS protocol limitation, client does not send a CDS request for empty resource
// names when unsubscribing the last resource. Management server assumes it is still
// subscribing to the last resource, client also need to behave so to avoid data lose.
// Therefore, cluster names that watchers interested in cannot always represent resource names
// in most recently sent CDS requests.
@Nullable
private Collection<String> cdsResourceNames;
private AdsStream(AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub) {
this.stub = checkNotNull(stub, "stub");
@ -671,11 +877,16 @@ final class XdsClientImpl extends XdsClient {
} else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
rdsRespNonce = response.getNonce();
handleRdsResponse(response);
} else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
cdsRespNonce = response.getNonce();
handleCdsResponse(response);
} else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
edsRespNonce = response.getNonce();
handleEdsResponse(response);
} else {
logger.log(Level.FINE, "Received an unknown type of DiscoveryResponse {0}",
response);
}
// TODO(zdapeng): add CDS response handles.
}
});
}
@ -761,11 +972,17 @@ final class XdsClientImpl extends XdsClient {
version = rdsVersion;
nonce = rdsRespNonce;
rdsResourceName = resourceNames.iterator().next();
} else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
version = cdsVersion;
nonce = cdsRespNonce;
// For CDS protocol resource unsubscribe workaround, keep the last unsubscribed cluster
// as the requested resource name for ACK requests when all all resources have
// been unsubscribed.
cdsResourceNames = ImmutableList.copyOf(resourceNames);
} else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
version = edsVersion;
nonce = edsRespNonce;
}
// TODO(chengyuanzhang): cases for CDS.
DiscoveryRequest request =
DiscoveryRequest
.newBuilder()
@ -792,11 +1009,13 @@ final class XdsClientImpl extends XdsClient {
} else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
rdsVersion = versionInfo;
nonce = rdsRespNonce;
} else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
cdsVersion = versionInfo;
nonce = cdsRespNonce;
} else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
edsVersion = versionInfo;
nonce = edsRespNonce;
}
// TODO(chengyuanzhang): cases for CDS.
DiscoveryRequest request =
DiscoveryRequest
.newBuilder()
@ -824,11 +1043,13 @@ final class XdsClientImpl extends XdsClient {
} else if (typeUrl.equals(ADS_TYPE_URL_RDS)) {
versionInfo = rdsVersion;
nonce = rdsRespNonce;
} else if (typeUrl.equals(ADS_TYPE_URL_CDS)) {
versionInfo = cdsVersion;
nonce = cdsRespNonce;
} else if (typeUrl.equals(ADS_TYPE_URL_EDS)) {
versionInfo = edsVersion;
nonce = edsRespNonce;
}
// TODO(chengyuanzhang): cases for EDS.
DiscoveryRequest request =
DiscoveryRequest
.newBuilder()

View File

@ -33,6 +33,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.UInt32Value;
import io.envoyproxy.envoy.api.v2.Cluster;
import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType;
import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
@ -44,6 +48,7 @@ import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource;
import io.envoyproxy.envoy.api.v2.core.ConfigSource;
import io.envoyproxy.envoy.api.v2.core.HealthStatus;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.core.SelfConfigSource;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.listener.FilterChain;
import io.envoyproxy.envoy.api.v2.route.RedirectAction;
@ -72,6 +77,8 @@ import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.LbEndpoint;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.XdsClient.ClusterUpdate;
import io.grpc.xds.XdsClient.ClusterWatcher;
import io.grpc.xds.XdsClient.ConfigUpdate;
import io.grpc.xds.XdsClient.ConfigWatcher;
import io.grpc.xds.XdsClient.EndpointUpdate;
@ -83,6 +90,7 @@ import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -141,6 +149,8 @@ public class XdsClientImplTest {
@Mock
private ConfigWatcher configWatcher;
@Mock
private ClusterWatcher clusterWatcher;
@Mock
private EndpointWatcher endpointWatcher;
private ManagedChannel channel;
@ -187,8 +197,9 @@ public class XdsClientImplTest {
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
xdsClient =
new XdsClientImpl(channel, NODE, syncContext, fakeClock.getScheduledExecutorService(),
backoffPolicyProvider, fakeClock.getStopwatchSupplier().get());
new XdsClientImpl(channel, NODE, syncContext,
fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
fakeClock.getStopwatchSupplier().get());
// Only the connection to management server is established, no RPC request is sent until at
// least one watcher is registered.
assertThat(responseObservers).isEmpty();
@ -1101,6 +1112,441 @@ public class XdsClientImplTest {
.isEqualTo("Listener for requested resource [foo.googleapis.com:8080] does not exist");
}
/**
* Client receives an CDS response that does not contain a Cluster for the requested resource
* while each received Cluster is valid. The CDS response is ACKed. Cluster watchers are notified
* with an error for resource not found.
*/
@Test
public void cdsResponseWithoutMatchingResource() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends a CDS request for the only cluster being watched to management server.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
// Management server sends back a CDS response without Cluster for the requested resource.
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)),
Any.pack(buildCluster("cluster-baz.googleapis.com", null, false)));
DiscoveryResponse response =
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(response);
// Client sent an ACK CDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
ArgumentCaptor<Status> errorStatusCaptor = ArgumentCaptor.forClass(null);
verify(clusterWatcher).onError(errorStatusCaptor.capture());
Status error = errorStatusCaptor.getValue();
assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND);
assertThat(error.getDescription())
.isEqualTo("Requested cluster [cluster-foo.googleapis.com] does not exist");
}
/**
* Normal workflow of receiving a CDS response containing Cluster message for a requested
* cluster.
*/
@Test
public void cdsResponseWithMatchingResource() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends a CDS request for the only cluster being watched to management server.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
// Management server sends back a CDS response without Cluster for the requested resource.
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)),
Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)),
Any.pack(buildCluster("cluster-baz.googleapis.com", null, false)));
DiscoveryResponse response =
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(response);
// Client sent an ACK CDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor = ArgumentCaptor.forClass(null);
verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture());
ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.isEnableLrs()).isEqualTo(false);
// Management server sends back another CDS response updating the requested Cluster.
clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)),
Any.pack(
buildCluster("cluster-foo.googleapis.com", "eds-cluster-foo.googleapis.com", true)),
Any.pack(buildCluster("cluster-baz.googleapis.com", null, false)));
response =
buildDiscoveryResponse("1", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0001");
responseObserver.onNext(response);
// Client sent an ACK CDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("1", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
verify(clusterWatcher, times(2)).onClusterChanged(clusterUpdateCaptor.capture());
clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate.getEdsServiceName())
.isEqualTo("eds-cluster-foo.googleapis.com");
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.isEnableLrs()).isEqualTo(true);
assertThat(clusterUpdate.getLrsServerName()).isEqualTo("");
}
@Test
public void multipleClusterWatchers() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
ClusterWatcher watcher3 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2);
xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends a CDS request containing all clusters being watched to management server.
verify(requestObserver)
.onNext(
argThat(
new DiscoveryRequestMatcher("",
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
// Management server sends back a CDS response contains Cluster for only one of
// requested cluster.
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)));
DiscoveryResponse response =
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(response);
// Client sent an ACK CDS request.
verify(requestObserver)
.onNext(
argThat(
new DiscoveryRequestMatcher("0",
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
// Two watchers get notification of cluster update for the cluster they are interested in.
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false);
// The other watcher gets an error notification for cluster not found.
ArgumentCaptor<Status> errorStatusCaptor = ArgumentCaptor.forClass(null);
verify(watcher3).onError(errorStatusCaptor.capture());
Status error = errorStatusCaptor.getValue();
assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND);
assertThat(error.getDescription())
.isEqualTo("Requested cluster [cluster-bar.googleapis.com] does not exist");
// Management server sends back another CDS response contains Clusters for all
// requested clusters.
clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)),
Any.pack(
buildCluster("cluster-bar.googleapis.com",
"eds-cluster-bar.googleapis.com", true)));
response = buildDiscoveryResponse("1", clusters,
XdsClientImpl.ADS_TYPE_URL_CDS, "0001");
responseObserver.onNext(response);
// Client sent an ACK CDS request.
verify(requestObserver)
.onNext(
argThat(
new DiscoveryRequestMatcher("1",
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
// All watchers received notification for cluster update.
verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture());
clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2, times(2)).onClusterChanged(clusterUpdateCaptor2.capture());
clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false);
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(clusterUpdate3.getEdsServiceName())
.isEqualTo("eds-cluster-bar.googleapis.com");
assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true);
assertThat(clusterUpdate3.getLrsServerName()).isEqualTo("");
}
/**
* (CDS response caching behavior) Adding cluster watchers interested in some cluster that
* some other endpoint watcher had already been watching on will result in cluster update
* notified to the newly added watcher immediately, without sending new CDS requests.
*/
@Test
public void watchClusterAlreadyBeingWatched() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends an CDS request to management server.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
// Management server sends back an CDS response with Cluster for the requested
// cluster.
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)));
DiscoveryResponse response =
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(response);
// Client sent an ACK CDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
// Another cluster watcher interested in the same cluster is added.
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2);
// Since the client has received cluster update for this cluster before, cached result is
// notified to the newly added watcher immediately.
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false);
verifyNoMoreInteractions(requestObserver);
}
@Test
public void addRemoveClusterWatchersFreely() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends an CDS request to management server.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
// Management server sends back a CDS response with Cluster for the requested
// cluster.
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)));
DiscoveryResponse response =
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(response);
// Client sent an ACK CDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
// Add another cluster watcher for a different cluster.
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher2);
// Client sent a new CDS request for all interested resources.
verify(requestObserver)
.onNext(
eq(buildDiscoveryRequest("0",
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
// Management server sends back a CDS response with Cluster for all requested cluster.
clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)),
Any.pack(
buildCluster("cluster-bar.googleapis.com",
"eds-cluster-bar.googleapis.com", true)));
response = buildDiscoveryResponse("1", clusters,
XdsClientImpl.ADS_TYPE_URL_CDS, "0001");
responseObserver.onNext(response);
// Client sent an ACK CDS request for all interested resources.
verify(requestObserver)
.onNext(
argThat(
new DiscoveryRequestMatcher("1",
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture());
clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName())
.isEqualTo("eds-cluster-bar.googleapis.com");
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(true);
assertThat(clusterUpdate2.getLrsServerName()).isEqualTo("");
// Cancel one of the watcher.
xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1);
// Since the cancelled watcher was the last watcher interested in that cluster (but there
// is still interested resource), client sent an new CDS request to unsubscribe from
// that cluster.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("1", "cluster-bar.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
// Management server has nothing to respond.
// Cancel the other watcher. All resources have been unsubscribed.
xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher2);
// All endpoint watchers have been cancelled. Due to protocol limitation, we do not send
// a CDS request for updated resource names (empty) when canceling the last resource.
verifyNoMoreInteractions(requestObserver);
// Management server sends back a new CDS response.
clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-foo.googleapis.com", null, true)),
Any.pack(
buildCluster("cluster-bar.googleapis.com", null, false)));
response =
buildDiscoveryResponse("2", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0002");
responseObserver.onNext(response);
// Due to protocol limitation, client sent an ACK CDS request, with resource_names containing
// the last unsubscribed resource.
verify(requestObserver)
.onNext(
argThat(
new DiscoveryRequestMatcher("2",
ImmutableList.of("cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_CDS, "0002")));
// Cancelled watchers do not receive notification.
verifyNoMoreInteractions(watcher1, watcher2);
// A new cluster watcher is added to watch cluster foo again.
ClusterWatcher watcher3 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher3);
// A CDS request is sent to indicate subscription of "cluster-foo.googleapis.com" only.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("2", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0002")));
// Management server sends back a new CDS response for at least newly requested resources
// (it is required to do so).
clusters = ImmutableList.of(
Any.pack(buildCluster("cluster-foo.googleapis.com", null, true)),
Any.pack(
buildCluster("cluster-bar.googleapis.com", null, false)));
response =
buildDiscoveryResponse("3", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0003");
responseObserver.onNext(response);
// Notified with cached data immediately.
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate3.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true);
assertThat(clusterUpdate2.getLrsServerName()).isEqualTo("");
verifyNoMoreInteractions(watcher1, watcher2);
// A CDS request is sent to re-subscribe the cluster again.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("3", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0003")));
}
/**
* Client receives an EDS response that does not contain a ClusterLoadAssignment for the
* requested resource while each received ClusterLoadAssignment is valid.
@ -1604,9 +2050,6 @@ public class XdsClientImplTest {
XdsClientImpl.ADS_TYPE_URL_EDS, "0003")));
}
// TODO(chengyuanzhang): incorporate interactions with cluster watchers and end endpoint watchers
// during retry.
/**
* RPC stream closed and retry during the period of first tiem resolving service config
* (LDS/RDS only).
@ -1764,8 +2207,348 @@ public class XdsClientImplTest {
verifyNoMoreInteractions(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
}
// TODO(chengyuanzhang): test for race between stream closed and watcher changes. Should only
// for ClusterWatchers and EndpointWatchers.
/**
* RPC stream close and retry while there are config/cluster/endpoint watchers registered.
*/
@Test
public void streamClosedAndRetry() {
InOrder inOrder =
Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
ArgumentCaptor<StreamObserver<DiscoveryResponse>> responseObserverCaptor =
ArgumentCaptor.forClass(null);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
StreamObserver<DiscoveryResponse> responseObserver =
responseObserverCaptor.getValue(); // same as responseObservers.poll()
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
waitUntilConfigResolved(responseObserver);
// Start watching cluster information.
xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);
// Client sent first CDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
// Start watching endpoint information.
xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher);
// Client sent first EDS request.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));
// Management server closes the RPC stream with an error.
responseObserver.onError(Status.UNKNOWN.asException());
// Resets backoff and retry immediately.
inOrder.verify(backoffPolicyProvider).get();
fakeClock.runDueTasks();
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
// Retry resumes requests for all wanted resources.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));
// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Retry after backoff.
fakeClock.forwardNanos(9L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));
// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Retry after backoff.
fakeClock.forwardNanos(99L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));
// Management server sends back a CDS response.
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster.googleapis.com", null, false)));
DiscoveryResponse cdsResponse =
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(cdsResponse);
// Client sent an CDS ACK request (Omitted).
// Management server closes the RPC stream.
responseObserver.onCompleted();
// Resets backoff and retry immediately
inOrder.verify(backoffPolicyProvider).get();
fakeClock.runDueTasks();
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));
// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Retry after backoff.
fakeClock.forwardNanos(19L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));
verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
}
/**
* RPC stream closed and retry while some cluster/endpoint watchers have changed (added/removed).
*/
@Test
public void streamClosedAndRetryRaceWithAddingAndRemovingWatchers() {
InOrder inOrder =
Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
xdsClient.watchConfigData(HOSTNAME, PORT, configWatcher);
ArgumentCaptor<StreamObserver<DiscoveryResponse>> responseObserverCaptor =
ArgumentCaptor.forClass(null);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
StreamObserver<DiscoveryResponse> responseObserver =
responseObserverCaptor.getValue(); // same as responseObservers.poll()
requestObservers.poll();
waitUntilConfigResolved(responseObserver);
// Management server closes RPC stream.
responseObserver.onCompleted();
// Resets backoff and retry immediately.
inOrder.verify(backoffPolicyProvider).get();
fakeClock.runDueTasks();
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Start watching cluster information while RPC stream is still in retry backoff.
xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);
// Retry after backoff.
fakeClock.forwardNanos(9L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
// Management server is still unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Start watching endpoint information while RPC stream is still in retry backoff.
xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher);
// Retry after backoff.
fakeClock.forwardNanos(99L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));
// Management server sends back a CDS response.
List<Any> clusters = ImmutableList.of(
Any.pack(buildCluster("cluster.googleapis.com", null, false)));
DiscoveryResponse cdsResponse =
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(cdsResponse);
// Client sent an CDS ACK request (Omitted).
// Management server closes the RPC stream again.
responseObserver.onCompleted();
// Resets backoff and retry immediately.
inOrder.verify(backoffPolicyProvider).get();
fakeClock.runDueTasks();
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
responseObserver = responseObserverCaptor.getValue();
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));
// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// No longer interested in previous cluster and endpoints in that cluster.
xdsClient.cancelClusterDataWatch("cluster.googleapis.com", clusterWatcher);
xdsClient.cancelEndpointDataWatch("cluster.googleapis.com", endpointWatcher);
// Retry after backoff.
fakeClock.forwardNanos(19L);
assertThat(requestObservers).isEmpty();
fakeClock.forwardNanos(1L);
inOrder.verify(mockedDiscoveryService)
.streamAggregatedResources(responseObserverCaptor.capture());
requestObserver = requestObservers.poll();
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest("", "foo.googleapis.com:8080",
XdsClientImpl.ADS_TYPE_URL_LDS, "")));
verify(requestObserver, never())
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
verify(requestObserver, never())
.onNext(eq(buildDiscoveryRequest("", "cluster.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "")));
verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
}
// Simulates the use case of watching clusters/endpoints based on service config resolved by
// LDS/RDS.
private void waitUntilConfigResolved(StreamObserver<DiscoveryResponse> responseObserver) {
// Client sent an LDS request for resource "foo.googleapis.com:8080" (Omitted).
// Management server responses with a listener telling client to do RDS.
Rds rdsConfig =
Rds.newBuilder()
.setConfigSource(
ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()))
.setRouteConfigName("route-foo.googleapis.com")
.build();
List<Any> listeners = ImmutableList.of(
Any.pack(buildListener("foo.googleapis.com:8080", /* matching resource */
Any.pack(HttpConnectionManager.newBuilder().setRds(rdsConfig).build())))
);
DiscoveryResponse ldsResponse =
buildDiscoveryResponse("0", listeners, XdsClientImpl.ADS_TYPE_URL_LDS, "0000");
responseObserver.onNext(ldsResponse);
// Client sent an LDS ACK request and an RDS request for resource
// "route-foo.googleapis.com" (Omitted).
// Management server sends an RDS response.
List<Any> routeConfigs = ImmutableList.of(
Any.pack(
buildRouteConfiguration(
"route-foo.googleapis.com",
ImmutableList.of(
buildVirtualHost(ImmutableList.of("foo.googleapis.com"),
"cluster.googleapis.com")))));
DiscoveryResponse rdsResponse =
buildDiscoveryResponse("0", routeConfigs, XdsClientImpl.ADS_TYPE_URL_RDS, "0000");
responseObserver.onNext(rdsResponse);
}
@Test
public void matchHostName_exactlyMatch() {
@ -1870,6 +2653,26 @@ public class XdsClientImplTest {
.build();
}
private static Cluster buildCluster(String clusterName, @Nullable String edsServiceName,
boolean enableLrs) {
Cluster.Builder clusterBuilder = Cluster.newBuilder();
clusterBuilder.setName(clusterName);
clusterBuilder.setType(DiscoveryType.EDS);
EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder();
edsClusterConfigBuilder.setEdsConfig(
ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance()));
if (edsServiceName != null) {
edsClusterConfigBuilder.setServiceName(edsServiceName);
}
clusterBuilder.setEdsClusterConfig(edsClusterConfigBuilder);
clusterBuilder.setLbPolicy(LbPolicy.ROUND_ROBIN);
if (enableLrs) {
clusterBuilder.setLrsServer(
ConfigSource.newBuilder().setSelf(SelfConfigSource.getDefaultInstance()));
}
return clusterBuilder.build();
}
private static ClusterLoadAssignment buildClusterLoadAssignment(String clusterName,
List<io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints> localityLbEndpoints,
List<Policy.DropOverload> dropOverloads) {