xds: rename CDS/EDS resource watch interface (#7454)

This commit is contained in:
Chengyuan Zhang 2020-09-24 10:48:58 -07:00 committed by GitHub
parent 41ba242782
commit 2e411512be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 672 additions and 692 deletions

View File

@ -35,8 +35,8 @@ import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.XdsClient.ClusterUpdate;
import io.grpc.xds.XdsClient.ClusterWatcher;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import io.grpc.xds.internal.sds.SslContextProviderSupplier;
@ -177,19 +177,19 @@ final class CdsLoadBalancer extends LoadBalancer {
}
}
private final class CdsLbState implements ClusterWatcher {
private final class CdsLbState implements CdsResourceWatcher {
private final ChannelSecurityLbHelper lbHelper = new ChannelSecurityLbHelper();
@Nullable
LoadBalancer edsBalancer;
private CdsLbState() {
xdsClient.watchClusterData(clusterName, this);
xdsClient.watchCdsResource(clusterName, this);
logger.log(XdsLogLevel.INFO,
"Started watcher for cluster {0} with xDS client {1}", clusterName, xdsClient);
}
@Override
public void onClusterChanged(ClusterUpdate newUpdate) {
public void onChanged(CdsUpdate newUpdate) {
if (logger.isLoggable(XdsLogLevel.INFO)) {
logger.log(
XdsLogLevel.INFO,
@ -270,7 +270,7 @@ final class CdsLoadBalancer extends LoadBalancer {
}
void shutdown() {
xdsClient.cancelClusterDataWatch(clusterName, this);
xdsClient.cancelCdsResourceWatch(clusterName, this);
logger.log(XdsLogLevel.INFO,
"Cancelled watcher for cluster {0} with xDS client {1}", clusterName, xdsClient);
if (edsBalancer != null) {

View File

@ -45,8 +45,8 @@ import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsClient.EdsResourceWatcher;
import io.grpc.xds.XdsClient.EdsUpdate;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
@ -146,7 +146,7 @@ final class EdsLoadBalancer2 extends LoadBalancer {
return new ChildLbState(helper);
}
private final class ChildLbState extends LoadBalancer implements EndpointWatcher {
private final class ChildLbState extends LoadBalancer implements EdsResourceWatcher {
@Nullable
private final LoadStatsStore loadStatsStore;
private final DropHandlingLbHelper lbHelper;
@ -170,7 +170,7 @@ final class EdsLoadBalancer2 extends LoadBalancer {
logger.log(
XdsLogLevel.INFO,
"Start endpoint watcher on {0} with xDS client {1}", resourceName, xdsClient);
xdsClient.watchEndpointData(resourceName, this);
xdsClient.watchEdsResource(resourceName, this);
}
@Override
@ -221,7 +221,7 @@ final class EdsLoadBalancer2 extends LoadBalancer {
xdsClient.cancelClientStatsReport();
xdsClient.removeClientStats(cluster, edsServiceName);
}
xdsClient.cancelEndpointDataWatch(resourceName, this);
xdsClient.cancelEdsResourceWatch(resourceName, this);
logger.log(
XdsLogLevel.INFO,
"Cancelled endpoint watcher on {0} with xDS client {1}", resourceName, xdsClient);
@ -236,7 +236,7 @@ final class EdsLoadBalancer2 extends LoadBalancer {
}
@Override
public void onEndpointChanged(EndpointUpdate update) {
public void onChanged(EdsUpdate update) {
logger.log(XdsLogLevel.DEBUG,
"Received endpoint update from xDS client {0}: {1}", xdsClient, update);
if (logger.isLoggable(XdsLogLevel.INFO)) {

View File

@ -206,12 +206,7 @@ abstract class XdsClient {
}
}
/**
* Data class containing the results of performing a resource discovery RPC via CDS protocol.
* The results include configurations for a single upstream cluster, such as endpoint discovery
* type, load balancing policy, connection timeout and etc.
*/
static final class ClusterUpdate {
static final class CdsUpdate {
private final String clusterName;
@Nullable
private final String edsServiceName;
@ -220,7 +215,7 @@ abstract class XdsClient {
private final String lrsServerName;
private final UpstreamTlsContext upstreamTlsContext;
private ClusterUpdate(
private CdsUpdate(
String clusterName,
@Nullable String edsServiceName,
String lbPolicy,
@ -295,7 +290,7 @@ abstract class XdsClient {
if (o == null || getClass() != o.getClass()) {
return false;
}
ClusterUpdate that = (ClusterUpdate) o;
CdsUpdate that = (CdsUpdate) o;
return Objects.equals(clusterName, that.clusterName)
&& Objects.equals(edsServiceName, that.edsServiceName)
&& Objects.equals(lbPolicy, that.lbPolicy)
@ -317,7 +312,6 @@ abstract class XdsClient {
@Nullable
private UpstreamTlsContext upstreamTlsContext;
// Use ClusterUpdate.newBuilder().
private Builder() {
}
@ -346,29 +340,23 @@ abstract class XdsClient {
return this;
}
ClusterUpdate build() {
CdsUpdate build() {
checkState(clusterName != null, "clusterName is not set");
checkState(lbPolicy != null, "lbPolicy is not set");
return
new ClusterUpdate(
new CdsUpdate(
clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext);
}
}
}
/**
* Data class containing the results of performing a resource discovery RPC via EDS protocol.
* The results include endpoint addresses running the requested service, as well as
* configurations for traffic control such as drop overloads, inter-cluster load balancing
* policy and etc.
*/
static final class EndpointUpdate {
static final class EdsUpdate {
private final String clusterName;
private final Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap;
private final List<DropOverload> dropPolicies;
private EndpointUpdate(
private EdsUpdate(
String clusterName,
Map<Locality, LocalityLbEndpoints> localityLbEndpoints,
List<DropOverload> dropPolicies) {
@ -407,7 +395,7 @@ abstract class XdsClient {
if (o == null || getClass() != o.getClass()) {
return false;
}
EndpointUpdate that = (EndpointUpdate) o;
EdsUpdate that = (EdsUpdate) o;
return Objects.equals(clusterName, that.clusterName)
&& Objects.equals(localityLbEndpointsMap, that.localityLbEndpointsMap)
&& Objects.equals(dropPolicies, that.dropPolicies);
@ -434,7 +422,6 @@ abstract class XdsClient {
private Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap = new LinkedHashMap<>();
private List<DropOverload> dropPolicies = new ArrayList<>();
// Use EndpointUpdate.newBuilder().
private Builder() {
}
@ -453,10 +440,10 @@ abstract class XdsClient {
return this;
}
EndpointUpdate build() {
EdsUpdate build() {
checkState(clusterName != null, "clusterName is not set");
return
new EndpointUpdate(
new EdsUpdate(
clusterName,
ImmutableMap.copyOf(localityLbEndpointsMap),
ImmutableList.copyOf(dropPolicies));
@ -539,6 +526,7 @@ abstract class XdsClient {
/**
* Config watcher interface. To be implemented by the xDS resolver.
*/
// TODO(chengyuanzhang): delete me.
interface ConfigWatcher extends ResourceWatcher {
/**
@ -547,20 +535,14 @@ abstract class XdsClient {
void onConfigChanged(ConfigUpdate update);
}
/**
* Cluster watcher interface.
*/
interface ClusterWatcher extends ResourceWatcher {
interface CdsResourceWatcher extends ResourceWatcher {
void onClusterChanged(ClusterUpdate update);
void onChanged(CdsUpdate update);
}
/**
* Endpoint watcher interface.
*/
interface EndpointWatcher extends ResourceWatcher {
interface EdsResourceWatcher extends ResourceWatcher {
void onEndpointChanged(EndpointUpdate update);
void onChanged(EdsUpdate update);
}
/**
@ -619,29 +601,27 @@ abstract class XdsClient {
}
/**
* Registers a data watcher for the given cluster.
* Registers a data watcher for the given CDS resource.
*/
void watchClusterData(String clusterName, ClusterWatcher watcher) {
void watchCdsResource(String resourceName, CdsResourceWatcher watcher) {
}
/**
* Unregisters the given cluster watcher, which was registered to receive updates for the
* given cluster.
* Unregisters the given CDS resource watcher.
*/
void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) {
void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) {
}
/**
* Registers a data watcher for endpoints in the given cluster.
* Registers a data watcher for the given EDS resource.
*/
void watchEndpointData(String clusterName, EndpointWatcher watcher) {
void watchEdsResource(String resourceName, EdsResourceWatcher watcher) {
}
/**
* Unregisters the given endpoints watcher, which was registered to receive updates for
* endpoints information in the given cluster.
* Unregisters the given EDS resource watcher.
*/
void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) {
void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) {
}
/**

View File

@ -127,29 +127,29 @@ final class XdsClientImpl extends XdsClient {
private Node node;
// Cached data for CDS responses, keyed by cluster names.
// Optimization: cache ClusterUpdate, which contains only information needed by gRPC, instead
// Optimization: cache CdsUpdate, which contains only information needed by gRPC, instead
// of whole Cluster messages to reduce memory usage.
private final Map<String, ClusterUpdate> clusterNamesToClusterUpdates = new HashMap<>();
private final Map<String, CdsUpdate> clusterNamesToCdsUpdates = new HashMap<>();
// Cached CDS resources that are known to be absent.
private final Set<String> absentCdsResources = new HashSet<>();
// Cached data for EDS responses, keyed by cluster names.
// CDS responses indicate absence of clusters and EDS responses indicate presence of clusters.
// Optimization: cache EndpointUpdate, which contains only information needed by gRPC, instead
// Optimization: cache EdsUpdate, which contains only information needed by gRPC, instead
// of whole ClusterLoadAssignment messages to reduce memory usage.
private final Map<String, EndpointUpdate> clusterNamesToEndpointUpdates = new HashMap<>();
private final Map<String, EdsUpdate> clusterNamesToEdsUpdates = new HashMap<>();
// Cached EDS resources that are known to be absent.
private final Set<String> absentEdsResources = new HashSet<>();
// Cluster watchers waiting for cluster information updates. Multiple cluster watchers
// can watch on information for the same cluster.
private final Map<String, Set<ClusterWatcher>> clusterWatchers = new HashMap<>();
private final Map<String, Set<CdsResourceWatcher>> cdsWatchers = new HashMap<>();
// Endpoint watchers waiting for endpoint updates for each cluster. Multiple endpoint
// watchers can watch endpoints in the same cluster.
private final Map<String, Set<EndpointWatcher>> endpointWatchers = new HashMap<>();
private final Map<String, Set<EdsResourceWatcher>> edsWatchers = new HashMap<>();
// Resource fetch timers are used to conclude absence of resources. Each timer is activated when
// subscription for the resource starts and disarmed on first update for the resource.
@ -281,27 +281,27 @@ final class XdsClientImpl extends XdsClient {
}
@Override
void watchClusterData(String clusterName, ClusterWatcher watcher) {
checkNotNull(clusterName, "clusterName");
void watchCdsResource(String resourceName, CdsResourceWatcher watcher) {
checkNotNull(resourceName, "resourceName");
checkNotNull(watcher, "watcher");
boolean needRequest = false;
if (!clusterWatchers.containsKey(clusterName)) {
logger.log(XdsLogLevel.INFO, "Start watching cluster {0}", clusterName);
if (!cdsWatchers.containsKey(resourceName)) {
logger.log(XdsLogLevel.INFO, "Start watching cluster {0}", resourceName);
needRequest = true;
clusterWatchers.put(clusterName, new HashSet<ClusterWatcher>());
cdsWatchers.put(resourceName, new HashSet<CdsResourceWatcher>());
}
Set<ClusterWatcher> watchers = clusterWatchers.get(clusterName);
checkState(!watchers.contains(watcher), "watcher for %s already registered", clusterName);
Set<CdsResourceWatcher> watchers = cdsWatchers.get(resourceName);
checkState(!watchers.contains(watcher), "watcher for %s already registered", resourceName);
watchers.add(watcher);
// If local cache contains cluster information to be watched, notify the watcher immediately.
if (absentCdsResources.contains(clusterName)) {
logger.log(XdsLogLevel.DEBUG, "Cluster resource {0} is known to be absent", clusterName);
watcher.onResourceDoesNotExist(clusterName);
if (absentCdsResources.contains(resourceName)) {
logger.log(XdsLogLevel.DEBUG, "Cluster resource {0} is known to be absent", resourceName);
watcher.onResourceDoesNotExist(resourceName);
return;
}
if (clusterNamesToClusterUpdates.containsKey(clusterName)) {
logger.log(XdsLogLevel.DEBUG, "Retrieve cluster info {0} from local cache", clusterName);
watcher.onClusterChanged(clusterNamesToClusterUpdates.get(clusterName));
if (clusterNamesToCdsUpdates.containsKey(resourceName)) {
logger.log(XdsLogLevel.DEBUG, "Retrieve cluster info {0} from local cache", resourceName);
watcher.onChanged(clusterNamesToCdsUpdates.get(resourceName));
return;
}
@ -313,34 +313,34 @@ final class XdsClientImpl extends XdsClient {
if (adsStream == null) {
startRpcStream();
}
adsStream.sendXdsRequest(ResourceType.CDS, clusterWatchers.keySet());
adsStream.sendXdsRequest(ResourceType.CDS, cdsWatchers.keySet());
ScheduledHandle timeoutHandle =
syncContext
.schedule(
new CdsResourceFetchTimeoutTask(clusterName),
new CdsResourceFetchTimeoutTask(resourceName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
cdsRespTimers.put(clusterName, timeoutHandle);
cdsRespTimers.put(resourceName, timeoutHandle);
}
}
@Override
void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) {
void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) {
checkNotNull(watcher, "watcher");
Set<ClusterWatcher> watchers = clusterWatchers.get(clusterName);
Set<CdsResourceWatcher> watchers = cdsWatchers.get(resourceName);
checkState(
watchers != null && watchers.contains(watcher),
"watcher for %s was not registered", clusterName);
"watcher for %s was not registered", resourceName);
watchers.remove(watcher);
if (watchers.isEmpty()) {
logger.log(XdsLogLevel.INFO, "Stop watching cluster {0}", clusterName);
clusterWatchers.remove(clusterName);
logger.log(XdsLogLevel.INFO, "Stop watching cluster {0}", resourceName);
cdsWatchers.remove(resourceName);
// Remove the corresponding CDS entry.
absentCdsResources.remove(clusterName);
clusterNamesToClusterUpdates.remove(clusterName);
absentCdsResources.remove(resourceName);
clusterNamesToCdsUpdates.remove(resourceName);
// Cancel and delete response timer waiting for the corresponding resource.
if (cdsRespTimers.containsKey(clusterName)) {
cdsRespTimers.get(clusterName).cancel();
cdsRespTimers.remove(clusterName);
if (cdsRespTimers.containsKey(resourceName)) {
cdsRespTimers.get(resourceName).cancel();
cdsRespTimers.remove(resourceName);
}
// No longer interested in this cluster, send an updated CDS request to unsubscribe
// this resource.
@ -350,36 +350,36 @@ final class XdsClientImpl extends XdsClient {
}
checkState(adsStream != null,
"Severe bug: ADS stream was not created while an endpoint watcher was registered");
adsStream.sendXdsRequest(ResourceType.CDS, clusterWatchers.keySet());
adsStream.sendXdsRequest(ResourceType.CDS, cdsWatchers.keySet());
}
}
@Override
void watchEndpointData(String clusterName, EndpointWatcher watcher) {
void watchEdsResource(String resourceName, EdsResourceWatcher watcher) {
checkNotNull(watcher, "watcher");
boolean needRequest = false;
if (!endpointWatchers.containsKey(clusterName)) {
logger.log(XdsLogLevel.INFO, "Start watching endpoints in cluster {0}", clusterName);
if (!edsWatchers.containsKey(resourceName)) {
logger.log(XdsLogLevel.INFO, "Start watching endpoints in cluster {0}", resourceName);
needRequest = true;
endpointWatchers.put(clusterName, new HashSet<EndpointWatcher>());
edsWatchers.put(resourceName, new HashSet<EdsResourceWatcher>());
}
Set<EndpointWatcher> watchers = endpointWatchers.get(clusterName);
checkState(!watchers.contains(watcher), "watcher for %s already registered", clusterName);
Set<EdsResourceWatcher> watchers = edsWatchers.get(resourceName);
checkState(!watchers.contains(watcher), "watcher for %s already registered", resourceName);
watchers.add(watcher);
// If local cache contains endpoint information for the cluster to be watched, notify
// the watcher immediately.
if (absentEdsResources.contains(clusterName)) {
if (absentEdsResources.contains(resourceName)) {
logger.log(
XdsLogLevel.DEBUG,
"Endpoint resource for cluster {0} is known to be absent.", clusterName);
watcher.onResourceDoesNotExist(clusterName);
"Endpoint resource for cluster {0} is known to be absent.", resourceName);
watcher.onResourceDoesNotExist(resourceName);
return;
}
if (clusterNamesToEndpointUpdates.containsKey(clusterName)) {
if (clusterNamesToEdsUpdates.containsKey(resourceName)) {
logger.log(
XdsLogLevel.DEBUG,
"Retrieve endpoints info for cluster {0} from local cache.", clusterName);
watcher.onEndpointChanged(clusterNamesToEndpointUpdates.get(clusterName));
"Retrieve endpoints info for cluster {0} from local cache.", resourceName);
watcher.onChanged(clusterNamesToEdsUpdates.get(resourceName));
return;
}
@ -391,34 +391,34 @@ final class XdsClientImpl extends XdsClient {
if (adsStream == null) {
startRpcStream();
}
adsStream.sendXdsRequest(ResourceType.EDS, endpointWatchers.keySet());
adsStream.sendXdsRequest(ResourceType.EDS, edsWatchers.keySet());
ScheduledHandle timeoutHandle =
syncContext
.schedule(
new EdsResourceFetchTimeoutTask(clusterName),
new EdsResourceFetchTimeoutTask(resourceName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
edsRespTimers.put(clusterName, timeoutHandle);
edsRespTimers.put(resourceName, timeoutHandle);
}
}
@Override
void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) {
void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) {
checkNotNull(watcher, "watcher");
Set<EndpointWatcher> watchers = endpointWatchers.get(clusterName);
Set<EdsResourceWatcher> watchers = edsWatchers.get(resourceName);
checkState(
watchers != null && watchers.contains(watcher),
"watcher for %s was not registered", clusterName);
"watcher for %s was not registered", resourceName);
watchers.remove(watcher);
if (watchers.isEmpty()) {
logger.log(XdsLogLevel.INFO, "Stop watching endpoints in cluster {0}", clusterName);
endpointWatchers.remove(clusterName);
logger.log(XdsLogLevel.INFO, "Stop watching endpoints in cluster {0}", resourceName);
edsWatchers.remove(resourceName);
// Remove the corresponding EDS cache entry.
absentEdsResources.remove(clusterName);
clusterNamesToEndpointUpdates.remove(clusterName);
absentEdsResources.remove(resourceName);
clusterNamesToEdsUpdates.remove(resourceName);
// Cancel and delete response timer waiting for the corresponding resource.
if (edsRespTimers.containsKey(clusterName)) {
edsRespTimers.get(clusterName).cancel();
edsRespTimers.remove(clusterName);
if (edsRespTimers.containsKey(resourceName)) {
edsRespTimers.get(resourceName).cancel();
edsRespTimers.remove(resourceName);
}
// No longer interested in this cluster, send an updated EDS request to unsubscribe
// this resource.
@ -426,7 +426,7 @@ final class XdsClientImpl extends XdsClient {
// Currently in retry backoff.
return;
}
adsStream.sendXdsRequest(ResourceType.EDS, endpointWatchers.keySet());
adsStream.sendXdsRequest(ResourceType.EDS, edsWatchers.keySet());
}
}
@ -949,7 +949,7 @@ final class XdsClientImpl extends XdsClient {
} catch (InvalidProtocolBufferException e) {
logger.log(XdsLogLevel.WARNING, "Failed to unpack Clusters in CDS response {0}", e);
adsStream.sendNackRequest(
ResourceType.CDS, clusterWatchers.keySet(),
ResourceType.CDS, cdsWatchers.keySet(),
cdsResponse.getVersionInfo(), "Malformed CDS response: " + e);
return;
}
@ -957,7 +957,7 @@ final class XdsClientImpl extends XdsClient {
String errorMessage = null;
// Cluster information update for requested clusters received in this CDS response.
Map<String, ClusterUpdate> clusterUpdates = new HashMap<>();
Map<String, CdsUpdate> cdsUpdates = new HashMap<>();
// CDS responses represents the state of the world, EDS services not referenced by
// Clusters are those no longer exist.
Set<String> edsServices = new HashSet<>();
@ -967,10 +967,10 @@ final class XdsClientImpl extends XdsClient {
// Management server is required to always send newly requested resources, even if they
// may have been sent previously (proactively). Thus, client does not need to cache
// unrequested resources.
if (!clusterWatchers.containsKey(clusterName)) {
if (!cdsWatchers.containsKey(clusterName)) {
continue;
}
ClusterUpdate.Builder updateBuilder = ClusterUpdate.newBuilder();
CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder();
updateBuilder.setClusterName(clusterName);
// The type field must be set to EDS.
if (!cluster.getType().equals(DiscoveryType.EDS)) {
@ -1020,48 +1020,48 @@ final class XdsClientImpl extends XdsClient {
errorMessage = "Cluster " + clusterName + " : " + e.getMessage();
break;
}
clusterUpdates.put(clusterName, updateBuilder.build());
cdsUpdates.put(clusterName, updateBuilder.build());
}
if (errorMessage != null) {
adsStream.sendNackRequest(
ResourceType.CDS,
clusterWatchers.keySet(),
cdsWatchers.keySet(),
cdsResponse.getVersionInfo(),
errorMessage);
return;
}
adsStream.sendAckRequest(ResourceType.CDS, clusterWatchers.keySet(),
adsStream.sendAckRequest(ResourceType.CDS, cdsWatchers.keySet(),
cdsResponse.getVersionInfo());
// Update local CDS cache with data in this response.
absentCdsResources.removeAll(clusterUpdates.keySet());
for (Map.Entry<String, ClusterUpdate> entry : clusterNamesToClusterUpdates.entrySet()) {
if (!clusterUpdates.containsKey(entry.getKey())) {
absentCdsResources.removeAll(cdsUpdates.keySet());
for (Map.Entry<String, CdsUpdate> entry : clusterNamesToCdsUpdates.entrySet()) {
if (!cdsUpdates.containsKey(entry.getKey())) {
// Some previously existing resource no longer exists.
absentCdsResources.add(entry.getKey());
} else if (clusterUpdates.get(entry.getKey()).equals(entry.getValue())) {
clusterUpdates.remove(entry.getKey());
} else if (cdsUpdates.get(entry.getKey()).equals(entry.getValue())) {
cdsUpdates.remove(entry.getKey());
}
}
clusterNamesToClusterUpdates.keySet().removeAll(absentCdsResources);
clusterNamesToClusterUpdates.putAll(clusterUpdates);
clusterNamesToCdsUpdates.keySet().removeAll(absentCdsResources);
clusterNamesToCdsUpdates.putAll(cdsUpdates);
// Remove EDS cache entries for ClusterLoadAssignments not referenced by this CDS response.
for (String clusterName : clusterNamesToEndpointUpdates.keySet()) {
for (String clusterName : clusterNamesToEdsUpdates.keySet()) {
if (!edsServices.contains(clusterName)) {
absentEdsResources.add(clusterName);
// Notify EDS resource removal to watchers.
if (endpointWatchers.containsKey(clusterName)) {
Set<EndpointWatcher> watchers = endpointWatchers.get(clusterName);
for (EndpointWatcher watcher : watchers) {
if (edsWatchers.containsKey(clusterName)) {
Set<EdsResourceWatcher> watchers = edsWatchers.get(clusterName);
for (EdsResourceWatcher watcher : watchers) {
watcher.onResourceDoesNotExist(clusterName);
}
}
}
}
clusterNamesToEndpointUpdates.keySet().retainAll(edsServices);
clusterNamesToEdsUpdates.keySet().retainAll(edsServices);
for (String clusterName : clusterUpdates.keySet()) {
for (String clusterName : cdsUpdates.keySet()) {
if (cdsRespTimers.containsKey(clusterName)) {
cdsRespTimers.get(clusterName).cancel();
cdsRespTimers.remove(clusterName);
@ -1069,17 +1069,17 @@ final class XdsClientImpl extends XdsClient {
}
// Notify watchers if clusters interested in present in this CDS response.
for (Map.Entry<String, Set<ClusterWatcher>> entry : clusterWatchers.entrySet()) {
for (Map.Entry<String, Set<CdsResourceWatcher>> entry : cdsWatchers.entrySet()) {
String clusterName = entry.getKey();
if (clusterUpdates.containsKey(entry.getKey())) {
ClusterUpdate clusterUpdate = clusterUpdates.get(clusterName);
for (ClusterWatcher watcher : entry.getValue()) {
watcher.onClusterChanged(clusterUpdate);
if (cdsUpdates.containsKey(entry.getKey())) {
CdsUpdate cdsUpdate = cdsUpdates.get(clusterName);
for (CdsResourceWatcher watcher : entry.getValue()) {
watcher.onChanged(cdsUpdate);
}
} else if (!clusterNamesToClusterUpdates.containsKey(entry.getKey())
} else if (!clusterNamesToCdsUpdates.containsKey(entry.getKey())
&& !cdsRespTimers.containsKey(clusterName)) {
// Update for previously present resource being removed.
for (ClusterWatcher watcher : entry.getValue()) {
for (CdsResourceWatcher watcher : entry.getValue()) {
watcher.onResourceDoesNotExist(entry.getKey());
}
}
@ -1124,7 +1124,7 @@ final class XdsClientImpl extends XdsClient {
logger.log(
XdsLogLevel.WARNING, "Failed to unpack ClusterLoadAssignments in EDS response {0}", e);
adsStream.sendNackRequest(
ResourceType.EDS, endpointWatchers.keySet(),
ResourceType.EDS, edsWatchers.keySet(),
edsResponse.getVersionInfo(), "Malformed EDS response: " + e);
return;
}
@ -1132,7 +1132,7 @@ final class XdsClientImpl extends XdsClient {
String errorMessage = null;
// Endpoint information updates for requested clusters received in this EDS response.
Map<String, EndpointUpdate> endpointUpdates = new HashMap<>();
Map<String, EdsUpdate> edsUpdates = new HashMap<>();
// Walk through each ClusterLoadAssignment message. If any of them for requested clusters
// contain invalid information for gRPC's load balancing usage, the whole response is rejected.
for (ClusterLoadAssignment assignment : clusterLoadAssignments) {
@ -1141,10 +1141,10 @@ final class XdsClientImpl extends XdsClient {
// Management server is required to always send newly requested resources, even if they
// may have been sent previously (proactively). Thus, client does not need to cache
// unrequested resources.
if (!endpointWatchers.containsKey(clusterName)) {
if (!edsWatchers.containsKey(clusterName)) {
continue;
}
EndpointUpdate.Builder updateBuilder = EndpointUpdate.newBuilder();
EdsUpdate.Builder updateBuilder = EdsUpdate.newBuilder();
updateBuilder.setClusterName(clusterName);
Set<Integer> priorities = new HashSet<>();
int maxPriority = -1;
@ -1192,37 +1192,37 @@ final class XdsClientImpl extends XdsClient {
: assignment.getPolicy().getDropOverloadsList()) {
updateBuilder.addDropPolicy(DropOverload.fromEnvoyProtoDropOverload(dropOverload));
}
EndpointUpdate update = updateBuilder.build();
endpointUpdates.put(clusterName, update);
EdsUpdate update = updateBuilder.build();
edsUpdates.put(clusterName, update);
}
if (errorMessage != null) {
adsStream.sendNackRequest(
ResourceType.EDS,
endpointWatchers.keySet(),
edsWatchers.keySet(),
edsResponse.getVersionInfo(),
errorMessage);
return;
}
adsStream.sendAckRequest(ResourceType.EDS, endpointWatchers.keySet(),
adsStream.sendAckRequest(ResourceType.EDS, edsWatchers.keySet(),
edsResponse.getVersionInfo());
// Update local EDS cache by inserting updated endpoint information.
clusterNamesToEndpointUpdates.putAll(endpointUpdates);
absentEdsResources.removeAll(endpointUpdates.keySet());
clusterNamesToEdsUpdates.putAll(edsUpdates);
absentEdsResources.removeAll(edsUpdates.keySet());
// Notify watchers waiting for updates of endpoint information received in this EDS response.
// Based on xDS protocol, the management server should not send endpoint data again if
// nothing has changed.
for (Map.Entry<String, EndpointUpdate> entry : endpointUpdates.entrySet()) {
for (Map.Entry<String, EdsUpdate> entry : edsUpdates.entrySet()) {
String clusterName = entry.getKey();
// Cancel and delete response timeout timer.
if (edsRespTimers.containsKey(clusterName)) {
edsRespTimers.get(clusterName).cancel();
edsRespTimers.remove(clusterName);
}
if (endpointWatchers.containsKey(clusterName)) {
for (EndpointWatcher watcher : endpointWatchers.get(clusterName)) {
watcher.onEndpointChanged(entry.getValue());
if (edsWatchers.containsKey(clusterName)) {
for (EdsResourceWatcher watcher : edsWatchers.get(clusterName)) {
watcher.onChanged(entry.getValue());
}
}
}
@ -1249,9 +1249,9 @@ final class XdsClientImpl extends XdsClient {
new ListenerResourceFetchTimeoutTask(":" + listenerPort),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
}
if (!clusterWatchers.isEmpty()) {
adsStream.sendXdsRequest(ResourceType.CDS, clusterWatchers.keySet());
for (String clusterName : clusterWatchers.keySet()) {
if (!cdsWatchers.isEmpty()) {
adsStream.sendXdsRequest(ResourceType.CDS, cdsWatchers.keySet());
for (String clusterName : cdsWatchers.keySet()) {
ScheduledHandle timeoutHandle =
syncContext
.schedule(
@ -1260,9 +1260,9 @@ final class XdsClientImpl extends XdsClient {
cdsRespTimers.put(clusterName, timeoutHandle);
}
}
if (!endpointWatchers.isEmpty()) {
adsStream.sendXdsRequest(ResourceType.EDS, endpointWatchers.keySet());
for (String clusterName : endpointWatchers.keySet()) {
if (!edsWatchers.isEmpty()) {
adsStream.sendXdsRequest(ResourceType.EDS, edsWatchers.keySet());
for (String clusterName : edsWatchers.keySet()) {
ScheduledHandle timeoutHandle =
syncContext
.schedule(
@ -1521,13 +1521,13 @@ final class XdsClientImpl extends XdsClient {
if (listenerWatcher != null) {
listenerWatcher.onError(error);
}
for (Set<ClusterWatcher> watchers : clusterWatchers.values()) {
for (ClusterWatcher watcher : watchers) {
for (Set<CdsResourceWatcher> watchers : cdsWatchers.values()) {
for (CdsResourceWatcher watcher : watchers) {
watcher.onError(error);
}
}
for (Set<EndpointWatcher> watchers : endpointWatchers.values()) {
for (EndpointWatcher watcher : watchers) {
for (Set<EdsResourceWatcher> watchers : edsWatchers.values()) {
for (EdsResourceWatcher watcher : watchers) {
watcher.onError(error);
}
}
@ -1910,7 +1910,7 @@ final class XdsClientImpl extends XdsClient {
super.run();
cdsRespTimers.remove(resourceName);
absentCdsResources.add(resourceName);
for (ClusterWatcher wat : clusterWatchers.get(resourceName)) {
for (CdsResourceWatcher wat : cdsWatchers.get(resourceName)) {
wat.onResourceDoesNotExist(resourceName);
}
}
@ -1928,7 +1928,7 @@ final class XdsClientImpl extends XdsClient {
super.run();
edsRespTimers.remove(resourceName);
absentEdsResources.add(resourceName);
for (EndpointWatcher wat : endpointWatchers.get(resourceName)) {
for (EdsResourceWatcher wat : edsWatchers.get(resourceName)) {
wat.onResourceDoesNotExist(resourceName);
}
}

View File

@ -306,17 +306,17 @@ public class CdsLoadBalancerTest {
}
private final class FakeXdsClient extends XdsClient {
private ClusterWatcher watcher;
private CdsResourceWatcher watcher;
@Override
void watchClusterData(String clusterName, ClusterWatcher watcher) {
assertThat(clusterName).isEqualTo(CLUSTER);
void watchCdsResource(String resourceName, CdsResourceWatcher watcher) {
assertThat(resourceName).isEqualTo(CLUSTER);
this.watcher = watcher;
}
@Override
void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) {
assertThat(clusterName).isEqualTo(CLUSTER);
void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) {
assertThat(resourceName).isEqualTo(CLUSTER);
assertThat(watcher).isSameInstanceAs(this.watcher);
this.watcher = null;
}
@ -331,8 +331,8 @@ public class CdsLoadBalancerTest {
syncContext.execute(new Runnable() {
@Override
public void run() {
watcher.onClusterChanged(
ClusterUpdate.newBuilder()
watcher.onChanged(
CdsUpdate.newBuilder()
.setClusterName(CLUSTER)
.setEdsServiceName(edsServiceName)
.setLbPolicy("round_robin") // only supported policy
@ -348,8 +348,8 @@ public class CdsLoadBalancerTest {
syncContext.execute(new Runnable() {
@Override
public void run() {
watcher.onClusterChanged(
ClusterUpdate.newBuilder()
watcher.onChanged(
CdsUpdate.newBuilder()
.setClusterName(CLUSTER)
.setEdsServiceName(edsServiceName)
.setLbPolicy("round_robin") // only supported policy

View File

@ -711,7 +711,7 @@ public class EdsLoadBalancer2Test {
}
private final class FakeXdsClient extends XdsClient {
private final Map<String, EndpointWatcher> watchers = new HashMap<>();
private final Map<String, EdsResourceWatcher> watchers = new HashMap<>();
private final Map<String, ConcurrentMap<String, AtomicLong>> dropStats = new HashMap<>();
@Override
@ -720,13 +720,13 @@ public class EdsLoadBalancer2Test {
}
@Override
void watchEndpointData(String clusterName, EndpointWatcher watcher) {
watchers.put(clusterName, watcher);
void watchEdsResource(String resourceName, EdsResourceWatcher watcher) {
watchers.put(resourceName, watcher);
}
@Override
void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) {
watchers.remove(clusterName);
void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) {
watchers.remove(resourceName);
}
@Override
@ -755,14 +755,14 @@ public class EdsLoadBalancer2Test {
@Override
public void run() {
if (watchers.containsKey(resource)) {
EndpointUpdate.Builder builder = EndpointUpdate.newBuilder().setClusterName(resource);
EdsUpdate.Builder builder = EdsUpdate.newBuilder().setClusterName(resource);
for (DropOverload dropOverload : dropOverloads) {
builder.addDropPolicy(dropOverload);
}
for (Locality locality : localityLbEndpointsMap.keySet()) {
builder.addLocalityLbEndpoints(locality, localityLbEndpointsMap.get(locality));
}
watchers.get(resource).onEndpointChanged(builder.build());
watchers.get(resource).onChanged(builder.build());
}
}
});
@ -783,7 +783,7 @@ public class EdsLoadBalancer2Test {
syncContext.execute(new Runnable() {
@Override
public void run() {
for (EndpointWatcher watcher : watchers.values()) {
for (EdsResourceWatcher watcher : watchers.values()) {
watcher.onError(error);
}
}

View File

@ -91,12 +91,12 @@ import io.grpc.xds.EnvoyProtoData.LbEndpoint;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ClusterUpdate;
import io.grpc.xds.XdsClient.ClusterWatcher;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.ConfigUpdate;
import io.grpc.xds.XdsClient.ConfigWatcher;
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsClient.EdsResourceWatcher;
import io.grpc.xds.XdsClient.EdsUpdate;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClientImpl.MessagePrinter;
import java.io.IOException;
@ -206,9 +206,9 @@ public class XdsClientImplTest {
@Mock
private ConfigWatcher configWatcher;
@Mock
private ClusterWatcher clusterWatcher;
private CdsResourceWatcher cdsResourceWatcher;
@Mock
private EndpointWatcher endpointWatcher;
private EdsResourceWatcher edsResourceWatcher;
private ManagedChannel channel;
private XdsClientImpl xdsClient;
@ -1337,7 +1337,7 @@ public class XdsClientImplTest {
*/
@Test
public void cdsResponseWithoutMatchingResource() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1359,12 +1359,12 @@ public class XdsClientImplTest {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
verify(clusterWatcher, never()).onClusterChanged(any(ClusterUpdate.class));
verify(clusterWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(clusterWatcher, never()).onError(any(Status.class));
verify(cdsResourceWatcher, never()).onChanged(any(CdsUpdate.class));
verify(cdsResourceWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(cdsResourceWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(clusterWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(cdsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
@ -1374,7 +1374,7 @@ public class XdsClientImplTest {
*/
@Test
public void cdsResponseWithMatchingResource() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1401,13 +1401,13 @@ public class XdsClientImplTest {
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
assertThat(cdsRespTimer.isCancelled()).isTrue();
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor = ArgumentCaptor.forClass(null);
verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture());
ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate.getEdsServiceName()).isNull();
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor = ArgumentCaptor.forClass(null);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate.getEdsServiceName()).isNull();
assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate.getLrsServerName()).isNull();
// Management server sends back another CDS response updating the requested Cluster.
clusters = ImmutableList.of(
@ -1424,13 +1424,13 @@ public class XdsClientImplTest {
.onNext(eq(buildDiscoveryRequest(NODE, "1", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
verify(clusterWatcher, times(2)).onClusterChanged(clusterUpdateCaptor.capture());
clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate.getEdsServiceName())
verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture());
cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate.getEdsServiceName())
.isEqualTo("eds-cluster-foo.googleapis.com");
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.getLrsServerName()).isEqualTo("");
assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate.getLrsServerName()).isEqualTo("");
}
/**
@ -1438,7 +1438,7 @@ public class XdsClientImplTest {
*/
@Test
public void cdsResponseWithUpstreamTlsContext() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1458,10 +1458,10 @@ public class XdsClientImplTest {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor = ArgumentCaptor.forClass(null);
verify(clusterWatcher, times(1)).onClusterChanged(clusterUpdateCaptor.capture());
ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = clusterUpdate
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor = ArgumentCaptor.forClass(null);
verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = cdsUpdate
.getUpstreamTlsContext();
SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext()
.getValidationContextSdsSecretConfig();
@ -1478,13 +1478,13 @@ public class XdsClientImplTest {
}
@Test
public void multipleClusterWatchers() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
ClusterWatcher watcher3 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2);
xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3);
public void multipleCdsWatchers() {
CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class);
CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class);
CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2);
xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher3);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1516,25 +1516,25 @@ public class XdsClientImplTest {
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
// Two watchers get notification of cluster update for the cluster they are interested in.
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(cdsUpdateCaptor1.capture());
CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue();
assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate1.getEdsServiceName()).isNull();
assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate1.getLrsServerName()).isNull();
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName()).isNull();
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onChanged(cdsUpdateCaptor2.capture());
CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue();
assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate2.getEdsServiceName()).isNull();
assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate2.getLrsServerName()).isNull();
verify(watcher3, never()).onClusterChanged(any(ClusterUpdate.class));
verify(watcher3, never()).onChanged(any(CdsUpdate.class));
verify(watcher3, never()).onResourceDoesNotExist("cluster-bar.googleapis.com");
verify(watcher3, never()).onError(any(Status.class));
@ -1563,14 +1563,14 @@ public class XdsClientImplTest {
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
verifyNoMoreInteractions(watcher1, watcher2); // resource has no change
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(clusterUpdate3.getEdsServiceName())
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onChanged(cdsUpdateCaptor3.capture());
CdsUpdate cdsUpdate3 = cdsUpdateCaptor3.getValue();
assertThat(cdsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(cdsUpdate3.getEdsServiceName())
.isEqualTo("eds-cluster-bar.googleapis.com");
assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate3.getLrsServerName()).isEqualTo("");
assertThat(cdsUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate3.getLrsServerName()).isEqualTo("");
}
/**
@ -1580,8 +1580,8 @@ public class XdsClientImplTest {
*/
@Test
public void watchClusterAlreadyBeingWatched() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
@ -1606,28 +1606,28 @@ public class XdsClientImplTest {
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(cdsUpdateCaptor1.capture());
CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue();
assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate1.getEdsServiceName()).isNull();
assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate1.getLrsServerName()).isNull();
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
// Another cluster watcher interested in the same cluster is added.
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2);
CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2);
// Since the client has received cluster update for this cluster before, cached result is
// notified to the newly added watcher immediately.
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName()).isNull();
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onChanged(cdsUpdateCaptor2.capture());
CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue();
assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate2.getEdsServiceName()).isNull();
assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate2.getLrsServerName()).isNull();
verifyNoMoreInteractions(requestObserver);
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
@ -1637,9 +1637,9 @@ public class XdsClientImplTest {
* Basic operations of adding/canceling cluster data watchers.
*/
@Test
public void addRemoveClusterWatchers() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
public void addRemoveCdsWatchers() {
CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
@ -1663,17 +1663,17 @@ public class XdsClientImplTest {
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS, "0000")));
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(cdsUpdateCaptor1.capture());
CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue();
assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate1.getEdsServiceName()).isNull();
assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate1.getLrsServerName()).isNull();
// Add another cluster watcher for a different cluster.
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher2);
CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher2);
// Client sent a new CDS request for all interested resources.
verify(requestObserver)
@ -1701,17 +1701,17 @@ public class XdsClientImplTest {
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
verifyNoMoreInteractions(watcher1); // resource has no change
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName())
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onChanged(cdsUpdateCaptor2.capture());
CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue();
assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(cdsUpdate2.getEdsServiceName())
.isEqualTo("eds-cluster-bar.googleapis.com");
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.getLrsServerName()).isEqualTo("");
assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate2.getLrsServerName()).isEqualTo("");
// Cancel one of the watcher.
xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1);
xdsClient.cancelCdsResourceWatch("cluster-foo.googleapis.com", watcher1);
// Since the cancelled watcher was the last watcher interested in that cluster (but there
// is still interested resource), client sent an new CDS request to unsubscribe from
@ -1723,7 +1723,7 @@ public class XdsClientImplTest {
// Management server has nothing to respond.
// Cancel the other watcher. All resources have been unsubscribed.
xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher2);
xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher2);
verify(requestObserver)
.onNext(
@ -1750,9 +1750,9 @@ public class XdsClientImplTest {
verifyNoMoreInteractions(watcher1, watcher2);
// A new cluster watcher is added to watch cluster foo again.
ClusterWatcher watcher3 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher3);
verify(watcher3, never()).onClusterChanged(any(ClusterUpdate.class));
CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher3);
verify(watcher3, never()).onChanged(any(CdsUpdate.class));
// A CDS request is sent to indicate subscription of "cluster-foo.googleapis.com" only.
verify(requestObserver)
@ -1770,13 +1770,13 @@ public class XdsClientImplTest {
responseObserver.onNext(response);
// Notified with cached data immediately.
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate3.getEdsServiceName()).isNull();
assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.getLrsServerName()).isEqualTo("");
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onChanged(cdsUpdateCaptor3.capture());
CdsUpdate cdsUpdate3 = cdsUpdateCaptor3.getValue();
assertThat(cdsUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate3.getEdsServiceName()).isNull();
assertThat(cdsUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate2.getLrsServerName()).isEqualTo("");
verifyNoMoreInteractions(watcher1, watcher2);
@ -1787,9 +1787,9 @@ public class XdsClientImplTest {
}
@Test
public void addRemoveClusterWatcherWhileInitialResourceFetchInProgress() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
public void addRemoveCdsWatcherWhileInitialResourceFetchInProgress() {
CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1804,12 +1804,12 @@ public class XdsClientImplTest {
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC - 1, TimeUnit.SECONDS);
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
ClusterWatcher watcher3 = mock(ClusterWatcher.class);
ClusterWatcher watcher4 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2);
xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3);
xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher4);
CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class);
CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class);
CdsResourceWatcher watcher4 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2);
xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher3);
xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher4);
// Client sends a new CDS request for updating the latest resource subscription.
verify(requestObserver)
@ -1828,8 +1828,8 @@ public class XdsClientImplTest {
verify(watcher2).onResourceDoesNotExist("cluster-foo.googleapis.com");
// The absence result is known immediately.
ClusterWatcher watcher5 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher5);
CdsResourceWatcher watcher5 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher5);
verify(watcher5).onResourceDoesNotExist("cluster-foo.googleapis.com");
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
@ -1837,9 +1837,9 @@ public class XdsClientImplTest {
// Cancel watchers while discovery for resource "cluster-bar.googleapis.com" is still
// in progress.
xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher3);
xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher3);
assertThat(timeoutTask.isCancelled()).isFalse();
xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher4);
xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher4);
// Client sends a CDS request for resource subscription update (Omitted).
@ -1853,7 +1853,7 @@ public class XdsClientImplTest {
@Test
public void cdsUpdateForClusterBeingRemoved() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1871,13 +1871,13 @@ public class XdsClientImplTest {
// Client sent an ACK CDS request (Omitted).
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor = ArgumentCaptor.forClass(null);
verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture());
ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate.getEdsServiceName()).isNull();
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.getLrsServerName()).isEqualTo("");
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor = ArgumentCaptor.forClass(null);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate.getEdsServiceName()).isNull();
assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate.getLrsServerName()).isEqualTo("");
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
// No cluster is available.
@ -1886,7 +1886,7 @@ public class XdsClientImplTest {
XdsClientImpl.ADS_TYPE_URL_CDS, "0001");
responseObserver.onNext(response);
verify(clusterWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(cdsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
}
/**
@ -1898,7 +1898,7 @@ public class XdsClientImplTest {
*/
@Test
public void edsResponseWithoutMatchingResource() {
xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1936,11 +1936,11 @@ public class XdsClientImplTest {
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
verify(endpointWatcher, never()).onEndpointChanged(any(EndpointUpdate.class));
verify(endpointWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(endpointWatcher, never()).onError(any(Status.class));
verify(edsResourceWatcher, never()).onChanged(any(EdsUpdate.class));
verify(edsResourceWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(edsResourceWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(endpointWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(edsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
@ -1950,7 +1950,7 @@ public class XdsClientImplTest {
*/
@Test
public void edsResponseWithMatchingResource() {
xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -2002,15 +2002,15 @@ public class XdsClientImplTest {
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor = ArgumentCaptor.forClass(null);
verify(endpointWatcher).onEndpointChanged(endpointUpdateCaptor.capture());
EndpointUpdate endpointUpdate = endpointUpdateCaptor.getValue();
assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate.getDropPolicies())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor = ArgumentCaptor.forClass(null);
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
EdsUpdate edsUpdate = edsUpdateCaptor.getValue();
assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate.getDropPolicies())
.containsExactly(
new DropOverload("lb", 200),
new DropOverload("throttle", 1000));
assertThat(endpointUpdate.getLocalityLbEndpointsMap())
assertThat(edsUpdate.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2035,21 +2035,21 @@ public class XdsClientImplTest {
.onNext(eq(buildDiscoveryRequest(NODE, "1", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "0001")));
verify(endpointWatcher, times(2)).onEndpointChanged(endpointUpdateCaptor.capture());
endpointUpdate = endpointUpdateCaptor.getValue();
assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate.getDropPolicies()).isEmpty();
assertThat(endpointUpdate.getLocalityLbEndpointsMap()).isEmpty();
verify(edsResourceWatcher, times(2)).onChanged(edsUpdateCaptor.capture());
edsUpdate = edsUpdateCaptor.getValue();
assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate.getDropPolicies()).isEmpty();
assertThat(edsUpdate.getLocalityLbEndpointsMap()).isEmpty();
}
@Test
public void multipleEndpointWatchers() {
EndpointWatcher watcher1 = mock(EndpointWatcher.class);
EndpointWatcher watcher2 = mock(EndpointWatcher.class);
EndpointWatcher watcher3 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3);
public void multipleEdsWatchers() {
EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class);
EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class);
EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2);
xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -2091,11 +2091,11 @@ public class XdsClientImplTest {
XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
// Two watchers get notification of endpoint update for the cluster they are interested in.
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture());
EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue();
assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate1.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(edsUpdateCaptor1.capture());
EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue();
assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate1.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2103,11 +2103,11 @@ public class XdsClientImplTest {
new LbEndpoint("192.168.0.1", 8080,
2, true)), 1, 0));
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher1).onEndpointChanged(endpointUpdateCaptor2.capture());
EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue();
assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(edsUpdateCaptor2.capture());
EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue();
assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate2.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2141,11 +2141,11 @@ public class XdsClientImplTest {
XdsClientImpl.ADS_TYPE_URL_EDS, "0001")));
// The corresponding watcher gets notified.
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture());
EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue();
assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(endpointUpdate3.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onChanged(edsUpdateCaptor3.capture());
EdsUpdate edsUpdate3 = edsUpdateCaptor3.getValue();
assertThat(edsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(edsUpdate3.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region2", "zone2", "subzone2"),
new LocalityLbEndpoints(
@ -2161,8 +2161,8 @@ public class XdsClientImplTest {
*/
@Test
public void watchEndpointsForClusterAlreadyBeingWatched() {
EndpointWatcher watcher1 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -2196,12 +2196,12 @@ public class XdsClientImplTest {
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture());
EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue();
assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate1.getDropPolicies()).isEmpty();
assertThat(endpointUpdate1.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(edsUpdateCaptor1.capture());
EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue();
assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate1.getDropPolicies()).isEmpty();
assertThat(edsUpdate1.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2210,17 +2210,17 @@ public class XdsClientImplTest {
2, true)), 1, 0));
// A second endpoint watcher is registered for endpoints in the same cluster.
EndpointWatcher watcher2 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2);
EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2);
// Cached endpoint information is notified to the new watcher immediately, without sending
// another EDS request.
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture());
EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue();
assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate2.getDropPolicies()).isEmpty();
assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onChanged(edsUpdateCaptor2.capture());
EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue();
assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate2.getDropPolicies()).isEmpty();
assertThat(edsUpdate2.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2236,9 +2236,9 @@ public class XdsClientImplTest {
* Basic operations of adding/canceling endpoint data watchers.
*/
@Test
public void addRemoveEndpointWatchers() {
EndpointWatcher watcher1 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
public void addRemoveEdsWatchers() {
EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
@ -2271,11 +2271,11 @@ public class XdsClientImplTest {
.onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS, "0000")));
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture());
EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue();
assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate1.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(edsUpdateCaptor1.capture());
EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue();
assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate1.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2285,8 +2285,8 @@ public class XdsClientImplTest {
1, 0));
// Add another endpoint watcher for a different cluster.
EndpointWatcher watcher2 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher2);
EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher2);
// Client sent a new EDS request for all interested resources.
verify(requestObserver)
@ -2319,11 +2319,11 @@ public class XdsClientImplTest {
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_EDS, "0001")));
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture());
EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue();
assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onChanged(edsUpdateCaptor2.capture());
EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue();
assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(edsUpdate2.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region2", "zone2", "subzone2"),
new LocalityLbEndpoints(
@ -2332,7 +2332,7 @@ public class XdsClientImplTest {
6, 0));
// Cancel one of the watcher.
xdsClient.cancelEndpointDataWatch("cluster-foo.googleapis.com", watcher1);
xdsClient.cancelEdsResourceWatch("cluster-foo.googleapis.com", watcher1);
// Since the cancelled watcher was the last watcher interested in that cluster, client
// sent an new EDS request to unsubscribe from that cluster.
@ -2343,7 +2343,7 @@ public class XdsClientImplTest {
// Management server should not respond as it had previously sent the requested resource.
// Cancel the other watcher.
xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher2);
xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher2);
// Since the cancelled watcher was the last watcher interested in that cluster, client
// sent an new EDS request to unsubscribe from that cluster.
@ -2389,13 +2389,13 @@ public class XdsClientImplTest {
verifyNoMoreInteractions(watcher1, watcher2);
// A new endpoint watcher is added to watch an old but was no longer interested in cluster.
EndpointWatcher watcher3 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3);
EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3);
// Nothing should be notified to the new watcher as we are still waiting management server's
// latest response.
// Cached endpoint data should have been purged.
verify(watcher3, never()).onEndpointChanged(any(EndpointUpdate.class));
verify(watcher3, never()).onChanged(any(EdsUpdate.class));
// An EDS request is sent to re-subscribe the cluster again.
verify(requestObserver)
@ -2416,11 +2416,11 @@ public class XdsClientImplTest {
XdsClientImpl.ADS_TYPE_URL_EDS, "0003");
responseObserver.onNext(response);
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture());
EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue();
assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(endpointUpdate3.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onChanged(edsUpdateCaptor3.capture());
EdsUpdate edsUpdate3 = edsUpdateCaptor3.getValue();
assertThat(edsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(edsUpdate3.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region4", "zone4", "subzone4"),
new LocalityLbEndpoints(
@ -2438,9 +2438,9 @@ public class XdsClientImplTest {
}
@Test
public void addRemoveEndpointWatcherWhileInitialResourceFetchInProgress() {
EndpointWatcher watcher1 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
public void addRemoveEdsWatcherWhileInitialResourceFetchInProgress() {
EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -2455,12 +2455,12 @@ public class XdsClientImplTest {
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC - 1, TimeUnit.SECONDS);
EndpointWatcher watcher2 = mock(EndpointWatcher.class);
EndpointWatcher watcher3 = mock(EndpointWatcher.class);
EndpointWatcher watcher4 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher4);
EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class);
EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class);
EdsResourceWatcher watcher4 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2);
xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3);
xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher4);
// Client sends a new EDS request for updating the latest resource subscription.
verify(requestObserver)
@ -2479,8 +2479,8 @@ public class XdsClientImplTest {
verify(watcher2).onResourceDoesNotExist("cluster-foo.googleapis.com");
// The absence result is known immediately.
EndpointWatcher watcher5 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher5);
EdsResourceWatcher watcher5 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher5);
verify(watcher5).onResourceDoesNotExist("cluster-foo.googleapis.com");
assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
@ -2488,9 +2488,9 @@ public class XdsClientImplTest {
// Cancel watchers while discovery for resource "cluster-bar.googleapis.com" is still
// in progress.
xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher3);
xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher3);
assertThat(timeoutTask.isCancelled()).isFalse();
xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher4);
xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher4);
// Client sends an EDS request for resource subscription update (Omitted).
@ -2504,7 +2504,7 @@ public class XdsClientImplTest {
@Test
public void cdsUpdateForEdsServiceNameChange() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
// Management server sends back a CDS response containing requested resource.
@ -2514,7 +2514,7 @@ public class XdsClientImplTest {
buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000");
responseObserver.onNext(response);
xdsClient.watchEndpointData("cluster-foo:service-bar", endpointWatcher);
xdsClient.watchEdsResource("cluster-foo:service-bar", edsResourceWatcher);
// Management server sends back an EDS response for resource "cluster-foo:service-bar".
List<Any> clusterLoadAssignments = ImmutableList.of(
@ -2530,12 +2530,12 @@ public class XdsClientImplTest {
XdsClientImpl.ADS_TYPE_URL_EDS, "0000");
responseObserver.onNext(response);
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor = ArgumentCaptor.forClass(null);
verify(endpointWatcher).onEndpointChanged(endpointUpdateCaptor.capture());
EndpointUpdate endpointUpdate = endpointUpdateCaptor.getValue();
assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo:service-bar");
assertThat(endpointUpdate.getDropPolicies()).isEmpty();
assertThat(endpointUpdate.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor = ArgumentCaptor.forClass(null);
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
EdsUpdate edsUpdate = edsUpdateCaptor.getValue();
assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo:service-bar");
assertThat(edsUpdate.getDropPolicies()).isEmpty();
assertThat(edsUpdate.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2552,7 +2552,7 @@ public class XdsClientImplTest {
responseObserver.onNext(response);
// Watcher get notification for endpoint resource "cluster-foo:service-bar" being deleted.
verify(endpointWatcher).onResourceDoesNotExist("cluster-foo:service-bar");
verify(edsResourceWatcher).onResourceDoesNotExist("cluster-foo:service-bar");
}
/**
@ -2737,7 +2737,7 @@ public class XdsClientImplTest {
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(null);
// Start watching cluster information.
xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster.googleapis.com", cdsResourceWatcher);
// Client sent first CDS request.
verify(requestObserver)
@ -2745,7 +2745,7 @@ public class XdsClientImplTest {
XdsClientImpl.ADS_TYPE_URL_CDS, "")));
// Start watching endpoint information.
xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster.googleapis.com", edsResourceWatcher);
// Client sent first EDS request.
verify(requestObserver)
@ -2756,9 +2756,9 @@ public class XdsClientImplTest {
responseObserver.onError(Status.UNKNOWN.asException());
verify(configWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
verify(clusterWatcher).onError(statusCaptor.capture());
verify(cdsResourceWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
verify(endpointWatcher).onError(statusCaptor.capture());
verify(edsResourceWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
// Resets backoff and retry immediately.
@ -2784,9 +2784,9 @@ public class XdsClientImplTest {
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(configWatcher, times(2)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(clusterWatcher, times(2)).onError(statusCaptor.capture());
verify(cdsResourceWatcher, times(2)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(endpointWatcher, times(2)).onError(statusCaptor.capture());
verify(edsResourceWatcher, times(2)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
@ -2813,9 +2813,9 @@ public class XdsClientImplTest {
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(configWatcher, times(3)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(clusterWatcher, times(3)).onError(statusCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(endpointWatcher, times(3)).onError(statusCaptor.capture());
verify(edsResourceWatcher, times(3)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
@ -2850,8 +2850,8 @@ public class XdsClientImplTest {
// Management server closes the RPC stream.
responseObserver.onCompleted();
verify(configWatcher, times(4)).onError(any(Status.class));
verify(clusterWatcher, times(4)).onError(any(Status.class));
verify(endpointWatcher, times(4)).onError(any(Status.class));
verify(cdsResourceWatcher, times(4)).onError(any(Status.class));
verify(edsResourceWatcher, times(4)).onError(any(Status.class));
// Resets backoff and retry immediately
inOrder.verify(backoffPolicyProvider).get();
@ -2875,9 +2875,9 @@ public class XdsClientImplTest {
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(configWatcher, times(5)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(clusterWatcher, times(5)).onError(statusCaptor.capture());
verify(cdsResourceWatcher, times(5)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(endpointWatcher, times(5)).onError(statusCaptor.capture());
verify(edsResourceWatcher, times(5)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
@ -2944,7 +2944,7 @@ public class XdsClientImplTest {
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Start watching cluster information while RPC stream is still in retry backoff.
xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster.googleapis.com", cdsResourceWatcher);
// Retry after backoff.
fakeClock.forwardNanos(9L);
@ -2968,7 +2968,7 @@ public class XdsClientImplTest {
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Start watching endpoint information while RPC stream is still in retry backoff.
xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster.googleapis.com", edsResourceWatcher);
// Retry after backoff.
fakeClock.forwardNanos(99L);
@ -2999,14 +2999,14 @@ public class XdsClientImplTest {
// Client sent an CDS ACK request (Omitted).
// No longer interested in endpoint information after RPC resumes.
xdsClient.cancelEndpointDataWatch("cluster.googleapis.com", endpointWatcher);
xdsClient.cancelEdsResourceWatch("cluster.googleapis.com", edsResourceWatcher);
// Client updates EDS resource subscription immediately.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", ImmutableList.<String>of(),
XdsClientImpl.ADS_TYPE_URL_EDS, "")));
// Become interested in endpoints of another cluster.
xdsClient.watchEndpointData("cluster2.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster2.googleapis.com", edsResourceWatcher);
// Client updates EDS resource subscription immediately.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(NODE, "", "cluster2.googleapis.com",
@ -3038,8 +3038,8 @@ public class XdsClientImplTest {
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// No longer interested in previous cluster and endpoints in that cluster.
xdsClient.cancelClusterDataWatch("cluster.googleapis.com", clusterWatcher);
xdsClient.cancelEndpointDataWatch("cluster2.googleapis.com", endpointWatcher);
xdsClient.cancelCdsResourceWatch("cluster.googleapis.com", cdsResourceWatcher);
xdsClient.cancelEdsResourceWatch("cluster2.googleapis.com", edsResourceWatcher);
// Retry after backoff.
fakeClock.forwardNanos(19L);
@ -3162,7 +3162,7 @@ public class XdsClientImplTest {
// Client/server resumed LDS/RDS request/response (Omitted).
// Start watching cluster data.
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
ScheduledTask cdsRespTimeoutTask =
Iterables.getOnlyElement(
fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
@ -3190,7 +3190,7 @@ public class XdsClientImplTest {
.isEqualTo(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC);
// Start watching endpoint data.
xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher);
ScheduledTask edsTimeoutTask =
Iterables.getOnlyElement(
fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));

View File

@ -91,12 +91,12 @@ import io.grpc.xds.EnvoyProtoData.LbEndpoint;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ClusterUpdate;
import io.grpc.xds.XdsClient.ClusterWatcher;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.ConfigUpdate;
import io.grpc.xds.XdsClient.ConfigWatcher;
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsClient.EdsResourceWatcher;
import io.grpc.xds.XdsClient.EdsUpdate;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClientImpl.MessagePrinter;
import java.io.IOException;
@ -205,9 +205,9 @@ public class XdsClientImplTestV2 {
@Mock
private ConfigWatcher configWatcher;
@Mock
private ClusterWatcher clusterWatcher;
private CdsResourceWatcher cdsResourceWatcher;
@Mock
private EndpointWatcher endpointWatcher;
private EdsResourceWatcher edsResourceWatcher;
private ManagedChannel channel;
private XdsClientImpl xdsClient;
@ -1347,7 +1347,7 @@ public class XdsClientImplTestV2 {
*/
@Test
public void cdsResponseWithoutMatchingResource() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1369,12 +1369,12 @@ public class XdsClientImplTestV2 {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000")));
verify(clusterWatcher, never()).onClusterChanged(any(ClusterUpdate.class));
verify(clusterWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(clusterWatcher, never()).onError(any(Status.class));
verify(cdsResourceWatcher, never()).onChanged(any(CdsUpdate.class));
verify(cdsResourceWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(cdsResourceWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(clusterWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(cdsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
@ -1384,7 +1384,7 @@ public class XdsClientImplTestV2 {
*/
@Test
public void cdsResponseWithMatchingResource() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1411,13 +1411,13 @@ public class XdsClientImplTestV2 {
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000")));
assertThat(cdsRespTimer.isCancelled()).isTrue();
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor = ArgumentCaptor.forClass(null);
verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture());
ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate.getEdsServiceName()).isNull();
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor = ArgumentCaptor.forClass(null);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate.getEdsServiceName()).isNull();
assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate.getLrsServerName()).isNull();
// Management server sends back another CDS response updating the requested Cluster.
clusters = ImmutableList.of(
@ -1434,13 +1434,13 @@ public class XdsClientImplTestV2 {
.onNext(eq(buildDiscoveryRequestV2(NODE, "1", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0001")));
verify(clusterWatcher, times(2)).onClusterChanged(clusterUpdateCaptor.capture());
clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate.getEdsServiceName())
verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture());
cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate.getEdsServiceName())
.isEqualTo("eds-cluster-foo.googleapis.com");
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.getLrsServerName()).isEqualTo("");
assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate.getLrsServerName()).isEqualTo("");
}
/**
@ -1448,7 +1448,7 @@ public class XdsClientImplTestV2 {
*/
@Test
public void cdsResponseWithUpstreamTlsContext() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1468,10 +1468,10 @@ public class XdsClientImplTestV2 {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000")));
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor = ArgumentCaptor.forClass(null);
verify(clusterWatcher, times(1)).onClusterChanged(clusterUpdateCaptor.capture());
ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = clusterUpdate
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor = ArgumentCaptor.forClass(null);
verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = cdsUpdate
.getUpstreamTlsContext();
SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext()
.getValidationContextSdsSecretConfig();
@ -1488,13 +1488,13 @@ public class XdsClientImplTestV2 {
}
@Test
public void multipleClusterWatchers() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
ClusterWatcher watcher3 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2);
xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3);
public void multipleCdsWatchers() {
CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class);
CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class);
CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2);
xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher3);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1526,25 +1526,25 @@ public class XdsClientImplTestV2 {
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000")));
// Two watchers get notification of cluster update for the cluster they are interested in.
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(cdsUpdateCaptor1.capture());
CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue();
assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate1.getEdsServiceName()).isNull();
assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate1.getLrsServerName()).isNull();
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName()).isNull();
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onChanged(cdsUpdateCaptor2.capture());
CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue();
assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate2.getEdsServiceName()).isNull();
assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate2.getLrsServerName()).isNull();
verify(watcher3, never()).onClusterChanged(any(ClusterUpdate.class));
verify(watcher3, never()).onChanged(any(CdsUpdate.class));
verify(watcher3, never()).onResourceDoesNotExist("cluster-bar.googleapis.com");
verify(watcher3, never()).onError(any(Status.class));
@ -1573,14 +1573,14 @@ public class XdsClientImplTestV2 {
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0001")));
verifyNoMoreInteractions(watcher1, watcher2); // resource has no change
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(clusterUpdate3.getEdsServiceName())
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onChanged(cdsUpdateCaptor3.capture());
CdsUpdate cdsUpdate3 = cdsUpdateCaptor3.getValue();
assertThat(cdsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(cdsUpdate3.getEdsServiceName())
.isEqualTo("eds-cluster-bar.googleapis.com");
assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate3.getLrsServerName()).isEqualTo("");
assertThat(cdsUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate3.getLrsServerName()).isEqualTo("");
}
/**
@ -1590,8 +1590,8 @@ public class XdsClientImplTestV2 {
*/
@Test
public void watchClusterAlreadyBeingWatched() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
@ -1616,28 +1616,28 @@ public class XdsClientImplTestV2 {
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000")));
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(cdsUpdateCaptor1.capture());
CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue();
assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate1.getEdsServiceName()).isNull();
assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate1.getLrsServerName()).isNull();
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
// Another cluster watcher interested in the same cluster is added.
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2);
CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2);
// Since the client has received cluster update for this cluster before, cached result is
// notified to the newly added watcher immediately.
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName()).isNull();
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onChanged(cdsUpdateCaptor2.capture());
CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue();
assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate2.getEdsServiceName()).isNull();
assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate2.getLrsServerName()).isNull();
verifyNoMoreInteractions(requestObserver);
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
@ -1647,9 +1647,9 @@ public class XdsClientImplTestV2 {
* Basic operations of adding/canceling cluster data watchers.
*/
@Test
public void addRemoveClusterWatchers() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
public void addRemoveCdsWatchers() {
CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
@ -1673,17 +1673,17 @@ public class XdsClientImplTestV2 {
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000")));
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.getLrsServerName()).isNull();
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(cdsUpdateCaptor1.capture());
CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue();
assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate1.getEdsServiceName()).isNull();
assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate1.getLrsServerName()).isNull();
// Add another cluster watcher for a different cluster.
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher2);
CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher2);
// Client sent a new CDS request for all interested resources.
verify(requestObserver)
@ -1711,17 +1711,17 @@ public class XdsClientImplTestV2 {
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0001")));
verifyNoMoreInteractions(watcher1); // resource has no change
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName())
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onChanged(cdsUpdateCaptor2.capture());
CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue();
assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(cdsUpdate2.getEdsServiceName())
.isEqualTo("eds-cluster-bar.googleapis.com");
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.getLrsServerName()).isEqualTo("");
assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate2.getLrsServerName()).isEqualTo("");
// Cancel one of the watcher.
xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1);
xdsClient.cancelCdsResourceWatch("cluster-foo.googleapis.com", watcher1);
// Since the cancelled watcher was the last watcher interested in that cluster (but there
// is still interested resource), client sent an new CDS request to unsubscribe from
@ -1733,7 +1733,7 @@ public class XdsClientImplTestV2 {
// Management server has nothing to respond.
// Cancel the other watcher. All resources have been unsubscribed.
xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher2);
xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher2);
verify(requestObserver)
.onNext(
@ -1760,9 +1760,9 @@ public class XdsClientImplTestV2 {
verifyNoMoreInteractions(watcher1, watcher2);
// A new cluster watcher is added to watch cluster foo again.
ClusterWatcher watcher3 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher3);
verify(watcher3, never()).onClusterChanged(any(ClusterUpdate.class));
CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher3);
verify(watcher3, never()).onChanged(any(CdsUpdate.class));
// A CDS request is sent to indicate subscription of "cluster-foo.googleapis.com" only.
verify(requestObserver)
@ -1780,13 +1780,13 @@ public class XdsClientImplTestV2 {
responseObserver.onNext(response);
// Notified with cached data immediately.
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate3.getEdsServiceName()).isNull();
assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.getLrsServerName()).isEqualTo("");
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onChanged(cdsUpdateCaptor3.capture());
CdsUpdate cdsUpdate3 = cdsUpdateCaptor3.getValue();
assertThat(cdsUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate3.getEdsServiceName()).isNull();
assertThat(cdsUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate2.getLrsServerName()).isEqualTo("");
verifyNoMoreInteractions(watcher1, watcher2);
@ -1797,9 +1797,9 @@ public class XdsClientImplTestV2 {
}
@Test
public void addRemoveClusterWatcherWhileInitialResourceFetchInProgress() {
ClusterWatcher watcher1 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1);
public void addRemoveCdsWatcherWhileInitialResourceFetchInProgress() {
CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1814,12 +1814,12 @@ public class XdsClientImplTestV2 {
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC - 1, TimeUnit.SECONDS);
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
ClusterWatcher watcher3 = mock(ClusterWatcher.class);
ClusterWatcher watcher4 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2);
xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3);
xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher4);
CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class);
CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class);
CdsResourceWatcher watcher4 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2);
xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher3);
xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher4);
// Client sends a new CDS request for updating the latest resource subscription.
verify(requestObserver)
@ -1838,8 +1838,8 @@ public class XdsClientImplTestV2 {
verify(watcher2).onResourceDoesNotExist("cluster-foo.googleapis.com");
// The absence result is known immediately.
ClusterWatcher watcher5 = mock(ClusterWatcher.class);
xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher5);
CdsResourceWatcher watcher5 = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher5);
verify(watcher5).onResourceDoesNotExist("cluster-foo.googleapis.com");
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
@ -1847,9 +1847,9 @@ public class XdsClientImplTestV2 {
// Cancel watchers while discovery for resource "cluster-bar.googleapis.com" is still
// in progress.
xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher3);
xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher3);
assertThat(timeoutTask.isCancelled()).isFalse();
xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher4);
xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher4);
// Client sends a CDS request for resource subscription update (Omitted).
@ -1863,7 +1863,7 @@ public class XdsClientImplTestV2 {
@Test
public void cdsUpdateForClusterBeingRemoved() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1881,13 +1881,13 @@ public class XdsClientImplTestV2 {
// Client sent an ACK CDS request (Omitted).
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor = ArgumentCaptor.forClass(null);
verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture());
ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate.getEdsServiceName()).isNull();
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.getLrsServerName()).isEqualTo("");
ArgumentCaptor<CdsUpdate> cdsUpdateCaptor = ArgumentCaptor.forClass(null);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(cdsUpdate.getEdsServiceName()).isNull();
assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(cdsUpdate.getLrsServerName()).isEqualTo("");
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
// No cluster is available.
@ -1896,7 +1896,7 @@ public class XdsClientImplTestV2 {
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0001");
responseObserver.onNext(response);
verify(clusterWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(cdsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
}
/**
@ -1908,7 +1908,7 @@ public class XdsClientImplTestV2 {
*/
@Test
public void edsResponseWithoutMatchingResource() {
xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -1946,11 +1946,11 @@ public class XdsClientImplTestV2 {
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000")));
verify(endpointWatcher, never()).onEndpointChanged(any(EndpointUpdate.class));
verify(endpointWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(endpointWatcher, never()).onError(any(Status.class));
verify(edsResourceWatcher, never()).onChanged(any(EdsUpdate.class));
verify(edsResourceWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(edsResourceWatcher, never()).onError(any(Status.class));
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
verify(endpointWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
verify(edsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com");
assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
@ -1960,7 +1960,7 @@ public class XdsClientImplTestV2 {
*/
@Test
public void edsResponseWithMatchingResource() {
xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -2012,15 +2012,15 @@ public class XdsClientImplTestV2 {
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000")));
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor = ArgumentCaptor.forClass(null);
verify(endpointWatcher).onEndpointChanged(endpointUpdateCaptor.capture());
EndpointUpdate endpointUpdate = endpointUpdateCaptor.getValue();
assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate.getDropPolicies())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor = ArgumentCaptor.forClass(null);
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
EdsUpdate edsUpdate = edsUpdateCaptor.getValue();
assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate.getDropPolicies())
.containsExactly(
new DropOverload("lb", 200),
new DropOverload("throttle", 1000));
assertThat(endpointUpdate.getLocalityLbEndpointsMap())
assertThat(edsUpdate.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2045,21 +2045,21 @@ public class XdsClientImplTestV2 {
.onNext(eq(buildDiscoveryRequestV2(NODE, "1", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0001")));
verify(endpointWatcher, times(2)).onEndpointChanged(endpointUpdateCaptor.capture());
endpointUpdate = endpointUpdateCaptor.getValue();
assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate.getDropPolicies()).isEmpty();
assertThat(endpointUpdate.getLocalityLbEndpointsMap()).isEmpty();
verify(edsResourceWatcher, times(2)).onChanged(edsUpdateCaptor.capture());
edsUpdate = edsUpdateCaptor.getValue();
assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate.getDropPolicies()).isEmpty();
assertThat(edsUpdate.getLocalityLbEndpointsMap()).isEmpty();
}
@Test
public void multipleEndpointWatchers() {
EndpointWatcher watcher1 = mock(EndpointWatcher.class);
EndpointWatcher watcher2 = mock(EndpointWatcher.class);
EndpointWatcher watcher3 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3);
public void multipleEdsWatchers() {
EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class);
EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class);
EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2);
xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -2101,11 +2101,11 @@ public class XdsClientImplTestV2 {
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000")));
// Two watchers get notification of endpoint update for the cluster they are interested in.
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture());
EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue();
assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate1.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(edsUpdateCaptor1.capture());
EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue();
assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate1.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2113,11 +2113,11 @@ public class XdsClientImplTestV2 {
new LbEndpoint("192.168.0.1", 8080,
2, true)), 1, 0));
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher1).onEndpointChanged(endpointUpdateCaptor2.capture());
EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue();
assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(edsUpdateCaptor2.capture());
EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue();
assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate2.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2151,11 +2151,11 @@ public class XdsClientImplTestV2 {
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0001")));
// The corresponding watcher gets notified.
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture());
EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue();
assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(endpointUpdate3.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onChanged(edsUpdateCaptor3.capture());
EdsUpdate edsUpdate3 = edsUpdateCaptor3.getValue();
assertThat(edsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(edsUpdate3.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region2", "zone2", "subzone2"),
new LocalityLbEndpoints(
@ -2171,8 +2171,8 @@ public class XdsClientImplTestV2 {
*/
@Test
public void watchEndpointsForClusterAlreadyBeingWatched() {
EndpointWatcher watcher1 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -2206,12 +2206,12 @@ public class XdsClientImplTestV2 {
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000")));
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture());
EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue();
assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate1.getDropPolicies()).isEmpty();
assertThat(endpointUpdate1.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(edsUpdateCaptor1.capture());
EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue();
assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate1.getDropPolicies()).isEmpty();
assertThat(edsUpdate1.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2220,17 +2220,17 @@ public class XdsClientImplTestV2 {
2, true)), 1, 0));
// A second endpoint watcher is registered for endpoints in the same cluster.
EndpointWatcher watcher2 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2);
EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2);
// Cached endpoint information is notified to the new watcher immediately, without sending
// another EDS request.
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture());
EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue();
assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate2.getDropPolicies()).isEmpty();
assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onChanged(edsUpdateCaptor2.capture());
EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue();
assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate2.getDropPolicies()).isEmpty();
assertThat(edsUpdate2.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2246,9 +2246,9 @@ public class XdsClientImplTestV2 {
* Basic operations of adding/canceling endpoint data watchers.
*/
@Test
public void addRemoveEndpointWatchers() {
EndpointWatcher watcher1 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
public void addRemoveEdsWatchers() {
EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
@ -2281,11 +2281,11 @@ public class XdsClientImplTestV2 {
.onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com",
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000")));
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture());
EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue();
assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(endpointUpdate1.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor1 = ArgumentCaptor.forClass(null);
verify(watcher1).onChanged(edsUpdateCaptor1.capture());
EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue();
assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(edsUpdate1.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2295,8 +2295,8 @@ public class XdsClientImplTestV2 {
1, 0));
// Add another endpoint watcher for a different cluster.
EndpointWatcher watcher2 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher2);
EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher2);
// Client sent a new EDS request for all interested resources.
verify(requestObserver)
@ -2329,11 +2329,11 @@ public class XdsClientImplTestV2 {
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0001")));
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture());
EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue();
assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(endpointUpdate2.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onChanged(edsUpdateCaptor2.capture());
EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue();
assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(edsUpdate2.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region2", "zone2", "subzone2"),
new LocalityLbEndpoints(
@ -2342,7 +2342,7 @@ public class XdsClientImplTestV2 {
6, 0));
// Cancel one of the watcher.
xdsClient.cancelEndpointDataWatch("cluster-foo.googleapis.com", watcher1);
xdsClient.cancelEdsResourceWatch("cluster-foo.googleapis.com", watcher1);
// Since the cancelled watcher was the last watcher interested in that cluster, client
// sent an new EDS request to unsubscribe from that cluster.
@ -2353,7 +2353,7 @@ public class XdsClientImplTestV2 {
// Management server should not respond as it had previously sent the requested resource.
// Cancel the other watcher.
xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher2);
xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher2);
// Since the cancelled watcher was the last watcher interested in that cluster, client
// sent an new EDS request to unsubscribe from that cluster.
@ -2399,13 +2399,13 @@ public class XdsClientImplTestV2 {
verifyNoMoreInteractions(watcher1, watcher2);
// A new endpoint watcher is added to watch an old but was no longer interested in cluster.
EndpointWatcher watcher3 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3);
EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3);
// Nothing should be notified to the new watcher as we are still waiting management server's
// latest response.
// Cached endpoint data should have been purged.
verify(watcher3, never()).onEndpointChanged(any(EndpointUpdate.class));
verify(watcher3, never()).onChanged(any(EdsUpdate.class));
// An EDS request is sent to re-subscribe the cluster again.
verify(requestObserver)
@ -2426,11 +2426,11 @@ public class XdsClientImplTestV2 {
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0003");
responseObserver.onNext(response);
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture());
EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue();
assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(endpointUpdate3.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onChanged(edsUpdateCaptor3.capture());
EdsUpdate edsUpdate3 = edsUpdateCaptor3.getValue();
assertThat(edsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com");
assertThat(edsUpdate3.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region4", "zone4", "subzone4"),
new LocalityLbEndpoints(
@ -2448,9 +2448,9 @@ public class XdsClientImplTestV2 {
}
@Test
public void addRemoveEndpointWatcherWhileInitialResourceFetchInProgress() {
EndpointWatcher watcher1 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1);
public void addRemoveEdsWatcherWhileInitialResourceFetchInProgress() {
EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1);
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
@ -2465,12 +2465,12 @@ public class XdsClientImplTestV2 {
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC - 1, TimeUnit.SECONDS);
EndpointWatcher watcher2 = mock(EndpointWatcher.class);
EndpointWatcher watcher3 = mock(EndpointWatcher.class);
EndpointWatcher watcher4 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3);
xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher4);
EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class);
EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class);
EdsResourceWatcher watcher4 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2);
xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3);
xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher4);
// Client sends a new EDS request for updating the latest resource subscription.
verify(requestObserver)
@ -2489,8 +2489,8 @@ public class XdsClientImplTestV2 {
verify(watcher2).onResourceDoesNotExist("cluster-foo.googleapis.com");
// The absence result is known immediately.
EndpointWatcher watcher5 = mock(EndpointWatcher.class);
xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher5);
EdsResourceWatcher watcher5 = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher5);
verify(watcher5).onResourceDoesNotExist("cluster-foo.googleapis.com");
assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
@ -2498,9 +2498,9 @@ public class XdsClientImplTestV2 {
// Cancel watchers while discovery for resource "cluster-bar.googleapis.com" is still
// in progress.
xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher3);
xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher3);
assertThat(timeoutTask.isCancelled()).isFalse();
xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher4);
xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher4);
// Client sends an EDS request for resource subscription update (Omitted).
@ -2514,7 +2514,7 @@ public class XdsClientImplTestV2 {
@Test
public void cdsUpdateForEdsServiceNameChange() {
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
// Management server sends back a CDS response containing requested resource.
@ -2524,7 +2524,7 @@ public class XdsClientImplTestV2 {
buildDiscoveryResponseV2("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000");
responseObserver.onNext(response);
xdsClient.watchEndpointData("cluster-foo:service-bar", endpointWatcher);
xdsClient.watchEdsResource("cluster-foo:service-bar", edsResourceWatcher);
// Management server sends back an EDS response for resource "cluster-foo:service-bar".
List<Any> clusterLoadAssignments = ImmutableList.of(
@ -2540,12 +2540,12 @@ public class XdsClientImplTestV2 {
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000");
responseObserver.onNext(response);
ArgumentCaptor<EndpointUpdate> endpointUpdateCaptor = ArgumentCaptor.forClass(null);
verify(endpointWatcher).onEndpointChanged(endpointUpdateCaptor.capture());
EndpointUpdate endpointUpdate = endpointUpdateCaptor.getValue();
assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo:service-bar");
assertThat(endpointUpdate.getDropPolicies()).isEmpty();
assertThat(endpointUpdate.getLocalityLbEndpointsMap())
ArgumentCaptor<EdsUpdate> edsUpdateCaptor = ArgumentCaptor.forClass(null);
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
EdsUpdate edsUpdate = edsUpdateCaptor.getValue();
assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo:service-bar");
assertThat(edsUpdate.getDropPolicies()).isEmpty();
assertThat(edsUpdate.getLocalityLbEndpointsMap())
.containsExactly(
new Locality("region1", "zone1", "subzone1"),
new LocalityLbEndpoints(
@ -2562,7 +2562,7 @@ public class XdsClientImplTestV2 {
responseObserver.onNext(response);
// Watcher get notification for endpoint resource "cluster-foo:service-bar" being deleted.
verify(endpointWatcher).onResourceDoesNotExist("cluster-foo:service-bar");
verify(edsResourceWatcher).onResourceDoesNotExist("cluster-foo:service-bar");
}
/**
@ -2748,7 +2748,7 @@ public class XdsClientImplTestV2 {
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(null);
// Start watching cluster information.
xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster.googleapis.com", cdsResourceWatcher);
// Client sent first CDS request.
verify(requestObserver)
@ -2756,7 +2756,7 @@ public class XdsClientImplTestV2 {
XdsClientImpl.ADS_TYPE_URL_CDS_V2, "")));
// Start watching endpoint information.
xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster.googleapis.com", edsResourceWatcher);
// Client sent first EDS request.
verify(requestObserver)
@ -2767,9 +2767,9 @@ public class XdsClientImplTestV2 {
responseObserver.onError(Status.UNKNOWN.asException());
verify(configWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
verify(clusterWatcher).onError(statusCaptor.capture());
verify(cdsResourceWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
verify(endpointWatcher).onError(statusCaptor.capture());
verify(edsResourceWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
// Resets backoff and retry immediately.
@ -2795,9 +2795,9 @@ public class XdsClientImplTestV2 {
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(configWatcher, times(2)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(clusterWatcher, times(2)).onError(statusCaptor.capture());
verify(cdsResourceWatcher, times(2)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(endpointWatcher, times(2)).onError(statusCaptor.capture());
verify(edsResourceWatcher, times(2)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
@ -2824,9 +2824,9 @@ public class XdsClientImplTestV2 {
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(configWatcher, times(3)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(clusterWatcher, times(3)).onError(statusCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(endpointWatcher, times(3)).onError(statusCaptor.capture());
verify(edsResourceWatcher, times(3)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
@ -2861,8 +2861,8 @@ public class XdsClientImplTestV2 {
// Management server closes the RPC stream.
responseObserver.onCompleted();
verify(configWatcher, times(4)).onError(any(Status.class));
verify(clusterWatcher, times(4)).onError(any(Status.class));
verify(endpointWatcher, times(4)).onError(any(Status.class));
verify(cdsResourceWatcher, times(4)).onError(any(Status.class));
verify(edsResourceWatcher, times(4)).onError(any(Status.class));
// Resets backoff and retry immediately
inOrder.verify(backoffPolicyProvider).get();
@ -2886,9 +2886,9 @@ public class XdsClientImplTestV2 {
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(configWatcher, times(5)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(clusterWatcher, times(5)).onError(statusCaptor.capture());
verify(cdsResourceWatcher, times(5)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(endpointWatcher, times(5)).onError(statusCaptor.capture());
verify(edsResourceWatcher, times(5)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
@ -2955,7 +2955,7 @@ public class XdsClientImplTestV2 {
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Start watching cluster information while RPC stream is still in retry backoff.
xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster.googleapis.com", cdsResourceWatcher);
// Retry after backoff.
fakeClock.forwardNanos(9L);
@ -2979,7 +2979,7 @@ public class XdsClientImplTestV2 {
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// Start watching endpoint information while RPC stream is still in retry backoff.
xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster.googleapis.com", edsResourceWatcher);
// Retry after backoff.
fakeClock.forwardNanos(99L);
@ -3010,14 +3010,14 @@ public class XdsClientImplTestV2 {
// Client sent an CDS ACK request (Omitted).
// No longer interested in endpoint information after RPC resumes.
xdsClient.cancelEndpointDataWatch("cluster.googleapis.com", endpointWatcher);
xdsClient.cancelEdsResourceWatch("cluster.googleapis.com", edsResourceWatcher);
// Client updates EDS resource subscription immediately.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", ImmutableList.<String>of(),
XdsClientImpl.ADS_TYPE_URL_EDS_V2, "")));
// Become interested in endpoints of another cluster.
xdsClient.watchEndpointData("cluster2.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster2.googleapis.com", edsResourceWatcher);
// Client updates EDS resource subscription immediately.
verify(requestObserver)
.onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster2.googleapis.com",
@ -3049,8 +3049,8 @@ public class XdsClientImplTestV2 {
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
// No longer interested in previous cluster and endpoints in that cluster.
xdsClient.cancelClusterDataWatch("cluster.googleapis.com", clusterWatcher);
xdsClient.cancelEndpointDataWatch("cluster2.googleapis.com", endpointWatcher);
xdsClient.cancelCdsResourceWatch("cluster.googleapis.com", cdsResourceWatcher);
xdsClient.cancelEdsResourceWatch("cluster2.googleapis.com", edsResourceWatcher);
// Retry after backoff.
fakeClock.forwardNanos(19L);
@ -3174,7 +3174,7 @@ public class XdsClientImplTestV2 {
// Client/server resumed LDS/RDS request/response (Omitted).
// Start watching cluster data.
xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher);
xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher);
ScheduledTask cdsRespTimeoutTask =
Iterables.getOnlyElement(
fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
@ -3202,7 +3202,7 @@ public class XdsClientImplTestV2 {
.isEqualTo(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC);
// Start watching endpoint data.
xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher);
xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher);
ScheduledTask edsTimeoutTask =
Iterables.getOnlyElement(
fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));