diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java index 8515a83214..8ad0b0de9a 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java @@ -35,8 +35,8 @@ import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; -import io.grpc.xds.XdsClient.ClusterUpdate; -import io.grpc.xds.XdsClient.ClusterWatcher; +import io.grpc.xds.XdsClient.CdsResourceWatcher; +import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import io.grpc.xds.internal.sds.SslContextProviderSupplier; @@ -177,19 +177,19 @@ final class CdsLoadBalancer extends LoadBalancer { } } - private final class CdsLbState implements ClusterWatcher { + private final class CdsLbState implements CdsResourceWatcher { private final ChannelSecurityLbHelper lbHelper = new ChannelSecurityLbHelper(); @Nullable LoadBalancer edsBalancer; private CdsLbState() { - xdsClient.watchClusterData(clusterName, this); + xdsClient.watchCdsResource(clusterName, this); logger.log(XdsLogLevel.INFO, "Started watcher for cluster {0} with xDS client {1}", clusterName, xdsClient); } @Override - public void onClusterChanged(ClusterUpdate newUpdate) { + public void onChanged(CdsUpdate newUpdate) { if (logger.isLoggable(XdsLogLevel.INFO)) { logger.log( XdsLogLevel.INFO, @@ -270,7 +270,7 @@ final class CdsLoadBalancer extends LoadBalancer { } void shutdown() { - xdsClient.cancelClusterDataWatch(clusterName, this); + xdsClient.cancelCdsResourceWatch(clusterName, this); logger.log(XdsLogLevel.INFO, "Cancelled watcher for cluster {0} with xDS client {1}", clusterName, xdsClient); if (edsBalancer != null) { diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java index a02578530f..ece7baf742 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java @@ -45,8 +45,8 @@ import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; -import io.grpc.xds.XdsClient.EndpointUpdate; -import io.grpc.xds.XdsClient.EndpointWatcher; +import io.grpc.xds.XdsClient.EdsResourceWatcher; +import io.grpc.xds.XdsClient.EdsUpdate; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayList; @@ -146,7 +146,7 @@ final class EdsLoadBalancer2 extends LoadBalancer { return new ChildLbState(helper); } - private final class ChildLbState extends LoadBalancer implements EndpointWatcher { + private final class ChildLbState extends LoadBalancer implements EdsResourceWatcher { @Nullable private final LoadStatsStore loadStatsStore; private final DropHandlingLbHelper lbHelper; @@ -170,7 +170,7 @@ final class EdsLoadBalancer2 extends LoadBalancer { logger.log( XdsLogLevel.INFO, "Start endpoint watcher on {0} with xDS client {1}", resourceName, xdsClient); - xdsClient.watchEndpointData(resourceName, this); + xdsClient.watchEdsResource(resourceName, this); } @Override @@ -221,7 +221,7 @@ final class EdsLoadBalancer2 extends LoadBalancer { xdsClient.cancelClientStatsReport(); xdsClient.removeClientStats(cluster, edsServiceName); } - xdsClient.cancelEndpointDataWatch(resourceName, this); + xdsClient.cancelEdsResourceWatch(resourceName, this); logger.log( XdsLogLevel.INFO, "Cancelled endpoint watcher on {0} with xDS client {1}", resourceName, xdsClient); @@ -236,7 +236,7 @@ final class EdsLoadBalancer2 extends LoadBalancer { } @Override - public void onEndpointChanged(EndpointUpdate update) { + public void onChanged(EdsUpdate update) { logger.log(XdsLogLevel.DEBUG, "Received endpoint update from xDS client {0}: {1}", xdsClient, update); if (logger.isLoggable(XdsLogLevel.INFO)) { diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 606a47f906..7ec7afdc8b 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -206,12 +206,7 @@ abstract class XdsClient { } } - /** - * Data class containing the results of performing a resource discovery RPC via CDS protocol. - * The results include configurations for a single upstream cluster, such as endpoint discovery - * type, load balancing policy, connection timeout and etc. - */ - static final class ClusterUpdate { + static final class CdsUpdate { private final String clusterName; @Nullable private final String edsServiceName; @@ -220,7 +215,7 @@ abstract class XdsClient { private final String lrsServerName; private final UpstreamTlsContext upstreamTlsContext; - private ClusterUpdate( + private CdsUpdate( String clusterName, @Nullable String edsServiceName, String lbPolicy, @@ -295,7 +290,7 @@ abstract class XdsClient { if (o == null || getClass() != o.getClass()) { return false; } - ClusterUpdate that = (ClusterUpdate) o; + CdsUpdate that = (CdsUpdate) o; return Objects.equals(clusterName, that.clusterName) && Objects.equals(edsServiceName, that.edsServiceName) && Objects.equals(lbPolicy, that.lbPolicy) @@ -317,7 +312,6 @@ abstract class XdsClient { @Nullable private UpstreamTlsContext upstreamTlsContext; - // Use ClusterUpdate.newBuilder(). private Builder() { } @@ -346,29 +340,23 @@ abstract class XdsClient { return this; } - ClusterUpdate build() { + CdsUpdate build() { checkState(clusterName != null, "clusterName is not set"); checkState(lbPolicy != null, "lbPolicy is not set"); return - new ClusterUpdate( + new CdsUpdate( clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext); } } } - /** - * Data class containing the results of performing a resource discovery RPC via EDS protocol. - * The results include endpoint addresses running the requested service, as well as - * configurations for traffic control such as drop overloads, inter-cluster load balancing - * policy and etc. - */ - static final class EndpointUpdate { + static final class EdsUpdate { private final String clusterName; private final Map localityLbEndpointsMap; private final List dropPolicies; - private EndpointUpdate( + private EdsUpdate( String clusterName, Map localityLbEndpoints, List dropPolicies) { @@ -407,7 +395,7 @@ abstract class XdsClient { if (o == null || getClass() != o.getClass()) { return false; } - EndpointUpdate that = (EndpointUpdate) o; + EdsUpdate that = (EdsUpdate) o; return Objects.equals(clusterName, that.clusterName) && Objects.equals(localityLbEndpointsMap, that.localityLbEndpointsMap) && Objects.equals(dropPolicies, that.dropPolicies); @@ -434,7 +422,6 @@ abstract class XdsClient { private Map localityLbEndpointsMap = new LinkedHashMap<>(); private List dropPolicies = new ArrayList<>(); - // Use EndpointUpdate.newBuilder(). private Builder() { } @@ -453,10 +440,10 @@ abstract class XdsClient { return this; } - EndpointUpdate build() { + EdsUpdate build() { checkState(clusterName != null, "clusterName is not set"); return - new EndpointUpdate( + new EdsUpdate( clusterName, ImmutableMap.copyOf(localityLbEndpointsMap), ImmutableList.copyOf(dropPolicies)); @@ -539,6 +526,7 @@ abstract class XdsClient { /** * Config watcher interface. To be implemented by the xDS resolver. */ + // TODO(chengyuanzhang): delete me. interface ConfigWatcher extends ResourceWatcher { /** @@ -547,20 +535,14 @@ abstract class XdsClient { void onConfigChanged(ConfigUpdate update); } - /** - * Cluster watcher interface. - */ - interface ClusterWatcher extends ResourceWatcher { + interface CdsResourceWatcher extends ResourceWatcher { - void onClusterChanged(ClusterUpdate update); + void onChanged(CdsUpdate update); } - /** - * Endpoint watcher interface. - */ - interface EndpointWatcher extends ResourceWatcher { + interface EdsResourceWatcher extends ResourceWatcher { - void onEndpointChanged(EndpointUpdate update); + void onChanged(EdsUpdate update); } /** @@ -619,29 +601,27 @@ abstract class XdsClient { } /** - * Registers a data watcher for the given cluster. + * Registers a data watcher for the given CDS resource. */ - void watchClusterData(String clusterName, ClusterWatcher watcher) { + void watchCdsResource(String resourceName, CdsResourceWatcher watcher) { } /** - * Unregisters the given cluster watcher, which was registered to receive updates for the - * given cluster. + * Unregisters the given CDS resource watcher. */ - void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) { + void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) { } /** - * Registers a data watcher for endpoints in the given cluster. + * Registers a data watcher for the given EDS resource. */ - void watchEndpointData(String clusterName, EndpointWatcher watcher) { + void watchEdsResource(String resourceName, EdsResourceWatcher watcher) { } /** - * Unregisters the given endpoints watcher, which was registered to receive updates for - * endpoints information in the given cluster. + * Unregisters the given EDS resource watcher. */ - void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) { + void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) { } /** diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index ba43da1796..b01b01acc7 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -127,29 +127,29 @@ final class XdsClientImpl extends XdsClient { private Node node; // Cached data for CDS responses, keyed by cluster names. - // Optimization: cache ClusterUpdate, which contains only information needed by gRPC, instead + // Optimization: cache CdsUpdate, which contains only information needed by gRPC, instead // of whole Cluster messages to reduce memory usage. - private final Map clusterNamesToClusterUpdates = new HashMap<>(); + private final Map clusterNamesToCdsUpdates = new HashMap<>(); // Cached CDS resources that are known to be absent. private final Set absentCdsResources = new HashSet<>(); // Cached data for EDS responses, keyed by cluster names. // CDS responses indicate absence of clusters and EDS responses indicate presence of clusters. - // Optimization: cache EndpointUpdate, which contains only information needed by gRPC, instead + // Optimization: cache EdsUpdate, which contains only information needed by gRPC, instead // of whole ClusterLoadAssignment messages to reduce memory usage. - private final Map clusterNamesToEndpointUpdates = new HashMap<>(); + private final Map clusterNamesToEdsUpdates = new HashMap<>(); // Cached EDS resources that are known to be absent. private final Set absentEdsResources = new HashSet<>(); // Cluster watchers waiting for cluster information updates. Multiple cluster watchers // can watch on information for the same cluster. - private final Map> clusterWatchers = new HashMap<>(); + private final Map> cdsWatchers = new HashMap<>(); // Endpoint watchers waiting for endpoint updates for each cluster. Multiple endpoint // watchers can watch endpoints in the same cluster. - private final Map> endpointWatchers = new HashMap<>(); + private final Map> edsWatchers = new HashMap<>(); // Resource fetch timers are used to conclude absence of resources. Each timer is activated when // subscription for the resource starts and disarmed on first update for the resource. @@ -281,27 +281,27 @@ final class XdsClientImpl extends XdsClient { } @Override - void watchClusterData(String clusterName, ClusterWatcher watcher) { - checkNotNull(clusterName, "clusterName"); + void watchCdsResource(String resourceName, CdsResourceWatcher watcher) { + checkNotNull(resourceName, "resourceName"); checkNotNull(watcher, "watcher"); boolean needRequest = false; - if (!clusterWatchers.containsKey(clusterName)) { - logger.log(XdsLogLevel.INFO, "Start watching cluster {0}", clusterName); + if (!cdsWatchers.containsKey(resourceName)) { + logger.log(XdsLogLevel.INFO, "Start watching cluster {0}", resourceName); needRequest = true; - clusterWatchers.put(clusterName, new HashSet()); + cdsWatchers.put(resourceName, new HashSet()); } - Set watchers = clusterWatchers.get(clusterName); - checkState(!watchers.contains(watcher), "watcher for %s already registered", clusterName); + Set watchers = cdsWatchers.get(resourceName); + checkState(!watchers.contains(watcher), "watcher for %s already registered", resourceName); watchers.add(watcher); // If local cache contains cluster information to be watched, notify the watcher immediately. - if (absentCdsResources.contains(clusterName)) { - logger.log(XdsLogLevel.DEBUG, "Cluster resource {0} is known to be absent", clusterName); - watcher.onResourceDoesNotExist(clusterName); + if (absentCdsResources.contains(resourceName)) { + logger.log(XdsLogLevel.DEBUG, "Cluster resource {0} is known to be absent", resourceName); + watcher.onResourceDoesNotExist(resourceName); return; } - if (clusterNamesToClusterUpdates.containsKey(clusterName)) { - logger.log(XdsLogLevel.DEBUG, "Retrieve cluster info {0} from local cache", clusterName); - watcher.onClusterChanged(clusterNamesToClusterUpdates.get(clusterName)); + if (clusterNamesToCdsUpdates.containsKey(resourceName)) { + logger.log(XdsLogLevel.DEBUG, "Retrieve cluster info {0} from local cache", resourceName); + watcher.onChanged(clusterNamesToCdsUpdates.get(resourceName)); return; } @@ -313,34 +313,34 @@ final class XdsClientImpl extends XdsClient { if (adsStream == null) { startRpcStream(); } - adsStream.sendXdsRequest(ResourceType.CDS, clusterWatchers.keySet()); + adsStream.sendXdsRequest(ResourceType.CDS, cdsWatchers.keySet()); ScheduledHandle timeoutHandle = syncContext .schedule( - new CdsResourceFetchTimeoutTask(clusterName), + new CdsResourceFetchTimeoutTask(resourceName), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); - cdsRespTimers.put(clusterName, timeoutHandle); + cdsRespTimers.put(resourceName, timeoutHandle); } } @Override - void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) { + void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) { checkNotNull(watcher, "watcher"); - Set watchers = clusterWatchers.get(clusterName); + Set watchers = cdsWatchers.get(resourceName); checkState( watchers != null && watchers.contains(watcher), - "watcher for %s was not registered", clusterName); + "watcher for %s was not registered", resourceName); watchers.remove(watcher); if (watchers.isEmpty()) { - logger.log(XdsLogLevel.INFO, "Stop watching cluster {0}", clusterName); - clusterWatchers.remove(clusterName); + logger.log(XdsLogLevel.INFO, "Stop watching cluster {0}", resourceName); + cdsWatchers.remove(resourceName); // Remove the corresponding CDS entry. - absentCdsResources.remove(clusterName); - clusterNamesToClusterUpdates.remove(clusterName); + absentCdsResources.remove(resourceName); + clusterNamesToCdsUpdates.remove(resourceName); // Cancel and delete response timer waiting for the corresponding resource. - if (cdsRespTimers.containsKey(clusterName)) { - cdsRespTimers.get(clusterName).cancel(); - cdsRespTimers.remove(clusterName); + if (cdsRespTimers.containsKey(resourceName)) { + cdsRespTimers.get(resourceName).cancel(); + cdsRespTimers.remove(resourceName); } // No longer interested in this cluster, send an updated CDS request to unsubscribe // this resource. @@ -350,36 +350,36 @@ final class XdsClientImpl extends XdsClient { } checkState(adsStream != null, "Severe bug: ADS stream was not created while an endpoint watcher was registered"); - adsStream.sendXdsRequest(ResourceType.CDS, clusterWatchers.keySet()); + adsStream.sendXdsRequest(ResourceType.CDS, cdsWatchers.keySet()); } } @Override - void watchEndpointData(String clusterName, EndpointWatcher watcher) { + void watchEdsResource(String resourceName, EdsResourceWatcher watcher) { checkNotNull(watcher, "watcher"); boolean needRequest = false; - if (!endpointWatchers.containsKey(clusterName)) { - logger.log(XdsLogLevel.INFO, "Start watching endpoints in cluster {0}", clusterName); + if (!edsWatchers.containsKey(resourceName)) { + logger.log(XdsLogLevel.INFO, "Start watching endpoints in cluster {0}", resourceName); needRequest = true; - endpointWatchers.put(clusterName, new HashSet()); + edsWatchers.put(resourceName, new HashSet()); } - Set watchers = endpointWatchers.get(clusterName); - checkState(!watchers.contains(watcher), "watcher for %s already registered", clusterName); + Set watchers = edsWatchers.get(resourceName); + checkState(!watchers.contains(watcher), "watcher for %s already registered", resourceName); watchers.add(watcher); // If local cache contains endpoint information for the cluster to be watched, notify // the watcher immediately. - if (absentEdsResources.contains(clusterName)) { + if (absentEdsResources.contains(resourceName)) { logger.log( XdsLogLevel.DEBUG, - "Endpoint resource for cluster {0} is known to be absent.", clusterName); - watcher.onResourceDoesNotExist(clusterName); + "Endpoint resource for cluster {0} is known to be absent.", resourceName); + watcher.onResourceDoesNotExist(resourceName); return; } - if (clusterNamesToEndpointUpdates.containsKey(clusterName)) { + if (clusterNamesToEdsUpdates.containsKey(resourceName)) { logger.log( XdsLogLevel.DEBUG, - "Retrieve endpoints info for cluster {0} from local cache.", clusterName); - watcher.onEndpointChanged(clusterNamesToEndpointUpdates.get(clusterName)); + "Retrieve endpoints info for cluster {0} from local cache.", resourceName); + watcher.onChanged(clusterNamesToEdsUpdates.get(resourceName)); return; } @@ -391,34 +391,34 @@ final class XdsClientImpl extends XdsClient { if (adsStream == null) { startRpcStream(); } - adsStream.sendXdsRequest(ResourceType.EDS, endpointWatchers.keySet()); + adsStream.sendXdsRequest(ResourceType.EDS, edsWatchers.keySet()); ScheduledHandle timeoutHandle = syncContext .schedule( - new EdsResourceFetchTimeoutTask(clusterName), + new EdsResourceFetchTimeoutTask(resourceName), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); - edsRespTimers.put(clusterName, timeoutHandle); + edsRespTimers.put(resourceName, timeoutHandle); } } @Override - void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) { + void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) { checkNotNull(watcher, "watcher"); - Set watchers = endpointWatchers.get(clusterName); + Set watchers = edsWatchers.get(resourceName); checkState( watchers != null && watchers.contains(watcher), - "watcher for %s was not registered", clusterName); + "watcher for %s was not registered", resourceName); watchers.remove(watcher); if (watchers.isEmpty()) { - logger.log(XdsLogLevel.INFO, "Stop watching endpoints in cluster {0}", clusterName); - endpointWatchers.remove(clusterName); + logger.log(XdsLogLevel.INFO, "Stop watching endpoints in cluster {0}", resourceName); + edsWatchers.remove(resourceName); // Remove the corresponding EDS cache entry. - absentEdsResources.remove(clusterName); - clusterNamesToEndpointUpdates.remove(clusterName); + absentEdsResources.remove(resourceName); + clusterNamesToEdsUpdates.remove(resourceName); // Cancel and delete response timer waiting for the corresponding resource. - if (edsRespTimers.containsKey(clusterName)) { - edsRespTimers.get(clusterName).cancel(); - edsRespTimers.remove(clusterName); + if (edsRespTimers.containsKey(resourceName)) { + edsRespTimers.get(resourceName).cancel(); + edsRespTimers.remove(resourceName); } // No longer interested in this cluster, send an updated EDS request to unsubscribe // this resource. @@ -426,7 +426,7 @@ final class XdsClientImpl extends XdsClient { // Currently in retry backoff. return; } - adsStream.sendXdsRequest(ResourceType.EDS, endpointWatchers.keySet()); + adsStream.sendXdsRequest(ResourceType.EDS, edsWatchers.keySet()); } } @@ -949,7 +949,7 @@ final class XdsClientImpl extends XdsClient { } catch (InvalidProtocolBufferException e) { logger.log(XdsLogLevel.WARNING, "Failed to unpack Clusters in CDS response {0}", e); adsStream.sendNackRequest( - ResourceType.CDS, clusterWatchers.keySet(), + ResourceType.CDS, cdsWatchers.keySet(), cdsResponse.getVersionInfo(), "Malformed CDS response: " + e); return; } @@ -957,7 +957,7 @@ final class XdsClientImpl extends XdsClient { String errorMessage = null; // Cluster information update for requested clusters received in this CDS response. - Map clusterUpdates = new HashMap<>(); + Map cdsUpdates = new HashMap<>(); // CDS responses represents the state of the world, EDS services not referenced by // Clusters are those no longer exist. Set edsServices = new HashSet<>(); @@ -967,10 +967,10 @@ final class XdsClientImpl extends XdsClient { // Management server is required to always send newly requested resources, even if they // may have been sent previously (proactively). Thus, client does not need to cache // unrequested resources. - if (!clusterWatchers.containsKey(clusterName)) { + if (!cdsWatchers.containsKey(clusterName)) { continue; } - ClusterUpdate.Builder updateBuilder = ClusterUpdate.newBuilder(); + CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder(); updateBuilder.setClusterName(clusterName); // The type field must be set to EDS. if (!cluster.getType().equals(DiscoveryType.EDS)) { @@ -1020,48 +1020,48 @@ final class XdsClientImpl extends XdsClient { errorMessage = "Cluster " + clusterName + " : " + e.getMessage(); break; } - clusterUpdates.put(clusterName, updateBuilder.build()); + cdsUpdates.put(clusterName, updateBuilder.build()); } if (errorMessage != null) { adsStream.sendNackRequest( ResourceType.CDS, - clusterWatchers.keySet(), + cdsWatchers.keySet(), cdsResponse.getVersionInfo(), errorMessage); return; } - adsStream.sendAckRequest(ResourceType.CDS, clusterWatchers.keySet(), + adsStream.sendAckRequest(ResourceType.CDS, cdsWatchers.keySet(), cdsResponse.getVersionInfo()); // Update local CDS cache with data in this response. - absentCdsResources.removeAll(clusterUpdates.keySet()); - for (Map.Entry entry : clusterNamesToClusterUpdates.entrySet()) { - if (!clusterUpdates.containsKey(entry.getKey())) { + absentCdsResources.removeAll(cdsUpdates.keySet()); + for (Map.Entry entry : clusterNamesToCdsUpdates.entrySet()) { + if (!cdsUpdates.containsKey(entry.getKey())) { // Some previously existing resource no longer exists. absentCdsResources.add(entry.getKey()); - } else if (clusterUpdates.get(entry.getKey()).equals(entry.getValue())) { - clusterUpdates.remove(entry.getKey()); + } else if (cdsUpdates.get(entry.getKey()).equals(entry.getValue())) { + cdsUpdates.remove(entry.getKey()); } } - clusterNamesToClusterUpdates.keySet().removeAll(absentCdsResources); - clusterNamesToClusterUpdates.putAll(clusterUpdates); + clusterNamesToCdsUpdates.keySet().removeAll(absentCdsResources); + clusterNamesToCdsUpdates.putAll(cdsUpdates); // Remove EDS cache entries for ClusterLoadAssignments not referenced by this CDS response. - for (String clusterName : clusterNamesToEndpointUpdates.keySet()) { + for (String clusterName : clusterNamesToEdsUpdates.keySet()) { if (!edsServices.contains(clusterName)) { absentEdsResources.add(clusterName); // Notify EDS resource removal to watchers. - if (endpointWatchers.containsKey(clusterName)) { - Set watchers = endpointWatchers.get(clusterName); - for (EndpointWatcher watcher : watchers) { + if (edsWatchers.containsKey(clusterName)) { + Set watchers = edsWatchers.get(clusterName); + for (EdsResourceWatcher watcher : watchers) { watcher.onResourceDoesNotExist(clusterName); } } } } - clusterNamesToEndpointUpdates.keySet().retainAll(edsServices); + clusterNamesToEdsUpdates.keySet().retainAll(edsServices); - for (String clusterName : clusterUpdates.keySet()) { + for (String clusterName : cdsUpdates.keySet()) { if (cdsRespTimers.containsKey(clusterName)) { cdsRespTimers.get(clusterName).cancel(); cdsRespTimers.remove(clusterName); @@ -1069,17 +1069,17 @@ final class XdsClientImpl extends XdsClient { } // Notify watchers if clusters interested in present in this CDS response. - for (Map.Entry> entry : clusterWatchers.entrySet()) { + for (Map.Entry> entry : cdsWatchers.entrySet()) { String clusterName = entry.getKey(); - if (clusterUpdates.containsKey(entry.getKey())) { - ClusterUpdate clusterUpdate = clusterUpdates.get(clusterName); - for (ClusterWatcher watcher : entry.getValue()) { - watcher.onClusterChanged(clusterUpdate); + if (cdsUpdates.containsKey(entry.getKey())) { + CdsUpdate cdsUpdate = cdsUpdates.get(clusterName); + for (CdsResourceWatcher watcher : entry.getValue()) { + watcher.onChanged(cdsUpdate); } - } else if (!clusterNamesToClusterUpdates.containsKey(entry.getKey()) + } else if (!clusterNamesToCdsUpdates.containsKey(entry.getKey()) && !cdsRespTimers.containsKey(clusterName)) { // Update for previously present resource being removed. - for (ClusterWatcher watcher : entry.getValue()) { + for (CdsResourceWatcher watcher : entry.getValue()) { watcher.onResourceDoesNotExist(entry.getKey()); } } @@ -1124,7 +1124,7 @@ final class XdsClientImpl extends XdsClient { logger.log( XdsLogLevel.WARNING, "Failed to unpack ClusterLoadAssignments in EDS response {0}", e); adsStream.sendNackRequest( - ResourceType.EDS, endpointWatchers.keySet(), + ResourceType.EDS, edsWatchers.keySet(), edsResponse.getVersionInfo(), "Malformed EDS response: " + e); return; } @@ -1132,7 +1132,7 @@ final class XdsClientImpl extends XdsClient { String errorMessage = null; // Endpoint information updates for requested clusters received in this EDS response. - Map endpointUpdates = new HashMap<>(); + Map edsUpdates = new HashMap<>(); // Walk through each ClusterLoadAssignment message. If any of them for requested clusters // contain invalid information for gRPC's load balancing usage, the whole response is rejected. for (ClusterLoadAssignment assignment : clusterLoadAssignments) { @@ -1141,10 +1141,10 @@ final class XdsClientImpl extends XdsClient { // Management server is required to always send newly requested resources, even if they // may have been sent previously (proactively). Thus, client does not need to cache // unrequested resources. - if (!endpointWatchers.containsKey(clusterName)) { + if (!edsWatchers.containsKey(clusterName)) { continue; } - EndpointUpdate.Builder updateBuilder = EndpointUpdate.newBuilder(); + EdsUpdate.Builder updateBuilder = EdsUpdate.newBuilder(); updateBuilder.setClusterName(clusterName); Set priorities = new HashSet<>(); int maxPriority = -1; @@ -1192,37 +1192,37 @@ final class XdsClientImpl extends XdsClient { : assignment.getPolicy().getDropOverloadsList()) { updateBuilder.addDropPolicy(DropOverload.fromEnvoyProtoDropOverload(dropOverload)); } - EndpointUpdate update = updateBuilder.build(); - endpointUpdates.put(clusterName, update); + EdsUpdate update = updateBuilder.build(); + edsUpdates.put(clusterName, update); } if (errorMessage != null) { adsStream.sendNackRequest( ResourceType.EDS, - endpointWatchers.keySet(), + edsWatchers.keySet(), edsResponse.getVersionInfo(), errorMessage); return; } - adsStream.sendAckRequest(ResourceType.EDS, endpointWatchers.keySet(), + adsStream.sendAckRequest(ResourceType.EDS, edsWatchers.keySet(), edsResponse.getVersionInfo()); // Update local EDS cache by inserting updated endpoint information. - clusterNamesToEndpointUpdates.putAll(endpointUpdates); - absentEdsResources.removeAll(endpointUpdates.keySet()); + clusterNamesToEdsUpdates.putAll(edsUpdates); + absentEdsResources.removeAll(edsUpdates.keySet()); // Notify watchers waiting for updates of endpoint information received in this EDS response. // Based on xDS protocol, the management server should not send endpoint data again if // nothing has changed. - for (Map.Entry entry : endpointUpdates.entrySet()) { + for (Map.Entry entry : edsUpdates.entrySet()) { String clusterName = entry.getKey(); // Cancel and delete response timeout timer. if (edsRespTimers.containsKey(clusterName)) { edsRespTimers.get(clusterName).cancel(); edsRespTimers.remove(clusterName); } - if (endpointWatchers.containsKey(clusterName)) { - for (EndpointWatcher watcher : endpointWatchers.get(clusterName)) { - watcher.onEndpointChanged(entry.getValue()); + if (edsWatchers.containsKey(clusterName)) { + for (EdsResourceWatcher watcher : edsWatchers.get(clusterName)) { + watcher.onChanged(entry.getValue()); } } } @@ -1249,9 +1249,9 @@ final class XdsClientImpl extends XdsClient { new ListenerResourceFetchTimeoutTask(":" + listenerPort), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService); } - if (!clusterWatchers.isEmpty()) { - adsStream.sendXdsRequest(ResourceType.CDS, clusterWatchers.keySet()); - for (String clusterName : clusterWatchers.keySet()) { + if (!cdsWatchers.isEmpty()) { + adsStream.sendXdsRequest(ResourceType.CDS, cdsWatchers.keySet()); + for (String clusterName : cdsWatchers.keySet()) { ScheduledHandle timeoutHandle = syncContext .schedule( @@ -1260,9 +1260,9 @@ final class XdsClientImpl extends XdsClient { cdsRespTimers.put(clusterName, timeoutHandle); } } - if (!endpointWatchers.isEmpty()) { - adsStream.sendXdsRequest(ResourceType.EDS, endpointWatchers.keySet()); - for (String clusterName : endpointWatchers.keySet()) { + if (!edsWatchers.isEmpty()) { + adsStream.sendXdsRequest(ResourceType.EDS, edsWatchers.keySet()); + for (String clusterName : edsWatchers.keySet()) { ScheduledHandle timeoutHandle = syncContext .schedule( @@ -1521,13 +1521,13 @@ final class XdsClientImpl extends XdsClient { if (listenerWatcher != null) { listenerWatcher.onError(error); } - for (Set watchers : clusterWatchers.values()) { - for (ClusterWatcher watcher : watchers) { + for (Set watchers : cdsWatchers.values()) { + for (CdsResourceWatcher watcher : watchers) { watcher.onError(error); } } - for (Set watchers : endpointWatchers.values()) { - for (EndpointWatcher watcher : watchers) { + for (Set watchers : edsWatchers.values()) { + for (EdsResourceWatcher watcher : watchers) { watcher.onError(error); } } @@ -1910,7 +1910,7 @@ final class XdsClientImpl extends XdsClient { super.run(); cdsRespTimers.remove(resourceName); absentCdsResources.add(resourceName); - for (ClusterWatcher wat : clusterWatchers.get(resourceName)) { + for (CdsResourceWatcher wat : cdsWatchers.get(resourceName)) { wat.onResourceDoesNotExist(resourceName); } } @@ -1928,7 +1928,7 @@ final class XdsClientImpl extends XdsClient { super.run(); edsRespTimers.remove(resourceName); absentEdsResources.add(resourceName); - for (EndpointWatcher wat : endpointWatchers.get(resourceName)) { + for (EdsResourceWatcher wat : edsWatchers.get(resourceName)) { wat.onResourceDoesNotExist(resourceName); } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java index d7570725e0..831ff06d5c 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java @@ -306,17 +306,17 @@ public class CdsLoadBalancerTest { } private final class FakeXdsClient extends XdsClient { - private ClusterWatcher watcher; + private CdsResourceWatcher watcher; @Override - void watchClusterData(String clusterName, ClusterWatcher watcher) { - assertThat(clusterName).isEqualTo(CLUSTER); + void watchCdsResource(String resourceName, CdsResourceWatcher watcher) { + assertThat(resourceName).isEqualTo(CLUSTER); this.watcher = watcher; } @Override - void cancelClusterDataWatch(String clusterName, ClusterWatcher watcher) { - assertThat(clusterName).isEqualTo(CLUSTER); + void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) { + assertThat(resourceName).isEqualTo(CLUSTER); assertThat(watcher).isSameInstanceAs(this.watcher); this.watcher = null; } @@ -331,8 +331,8 @@ public class CdsLoadBalancerTest { syncContext.execute(new Runnable() { @Override public void run() { - watcher.onClusterChanged( - ClusterUpdate.newBuilder() + watcher.onChanged( + CdsUpdate.newBuilder() .setClusterName(CLUSTER) .setEdsServiceName(edsServiceName) .setLbPolicy("round_robin") // only supported policy @@ -348,8 +348,8 @@ public class CdsLoadBalancerTest { syncContext.execute(new Runnable() { @Override public void run() { - watcher.onClusterChanged( - ClusterUpdate.newBuilder() + watcher.onChanged( + CdsUpdate.newBuilder() .setClusterName(CLUSTER) .setEdsServiceName(edsServiceName) .setLbPolicy("round_robin") // only supported policy diff --git a/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java index b964c06157..ecb83c9460 100644 --- a/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java @@ -711,7 +711,7 @@ public class EdsLoadBalancer2Test { } private final class FakeXdsClient extends XdsClient { - private final Map watchers = new HashMap<>(); + private final Map watchers = new HashMap<>(); private final Map> dropStats = new HashMap<>(); @Override @@ -720,13 +720,13 @@ public class EdsLoadBalancer2Test { } @Override - void watchEndpointData(String clusterName, EndpointWatcher watcher) { - watchers.put(clusterName, watcher); + void watchEdsResource(String resourceName, EdsResourceWatcher watcher) { + watchers.put(resourceName, watcher); } @Override - void cancelEndpointDataWatch(String clusterName, EndpointWatcher watcher) { - watchers.remove(clusterName); + void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) { + watchers.remove(resourceName); } @Override @@ -755,14 +755,14 @@ public class EdsLoadBalancer2Test { @Override public void run() { if (watchers.containsKey(resource)) { - EndpointUpdate.Builder builder = EndpointUpdate.newBuilder().setClusterName(resource); + EdsUpdate.Builder builder = EdsUpdate.newBuilder().setClusterName(resource); for (DropOverload dropOverload : dropOverloads) { builder.addDropPolicy(dropOverload); } for (Locality locality : localityLbEndpointsMap.keySet()) { builder.addLocalityLbEndpoints(locality, localityLbEndpointsMap.get(locality)); } - watchers.get(resource).onEndpointChanged(builder.build()); + watchers.get(resource).onChanged(builder.build()); } } }); @@ -783,7 +783,7 @@ public class EdsLoadBalancer2Test { syncContext.execute(new Runnable() { @Override public void run() { - for (EndpointWatcher watcher : watchers.values()) { + for (EdsResourceWatcher watcher : watchers.values()) { watcher.onError(error); } } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index f75b67fccd..346c723062 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -91,12 +91,12 @@ import io.grpc.xds.EnvoyProtoData.LbEndpoint; import io.grpc.xds.EnvoyProtoData.Locality; import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; import io.grpc.xds.EnvoyProtoData.Node; -import io.grpc.xds.XdsClient.ClusterUpdate; -import io.grpc.xds.XdsClient.ClusterWatcher; +import io.grpc.xds.XdsClient.CdsResourceWatcher; +import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsClient.ConfigUpdate; import io.grpc.xds.XdsClient.ConfigWatcher; -import io.grpc.xds.XdsClient.EndpointUpdate; -import io.grpc.xds.XdsClient.EndpointWatcher; +import io.grpc.xds.XdsClient.EdsResourceWatcher; +import io.grpc.xds.XdsClient.EdsUpdate; import io.grpc.xds.XdsClient.XdsChannel; import io.grpc.xds.XdsClientImpl.MessagePrinter; import java.io.IOException; @@ -206,9 +206,9 @@ public class XdsClientImplTest { @Mock private ConfigWatcher configWatcher; @Mock - private ClusterWatcher clusterWatcher; + private CdsResourceWatcher cdsResourceWatcher; @Mock - private EndpointWatcher endpointWatcher; + private EdsResourceWatcher edsResourceWatcher; private ManagedChannel channel; private XdsClientImpl xdsClient; @@ -1337,7 +1337,7 @@ public class XdsClientImplTest { */ @Test public void cdsResponseWithoutMatchingResource() { - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1359,12 +1359,12 @@ public class XdsClientImplTest { verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); - verify(clusterWatcher, never()).onClusterChanged(any(ClusterUpdate.class)); - verify(clusterWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com"); - verify(clusterWatcher, never()).onError(any(Status.class)); + verify(cdsResourceWatcher, never()).onChanged(any(CdsUpdate.class)); + verify(cdsResourceWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com"); + verify(cdsResourceWatcher, never()).onError(any(Status.class)); fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); - verify(clusterWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); + verify(cdsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } @@ -1374,7 +1374,7 @@ public class XdsClientImplTest { */ @Test public void cdsResponseWithMatchingResource() { - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1401,13 +1401,13 @@ public class XdsClientImplTest { XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); assertThat(cdsRespTimer.isCancelled()).isTrue(); - ArgumentCaptor clusterUpdateCaptor = ArgumentCaptor.forClass(null); - verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture()); - ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); - assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate.getEdsServiceName()).isNull(); - assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor = ArgumentCaptor.forClass(null); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isNull(); // Management server sends back another CDS response updating the requested Cluster. clusters = ImmutableList.of( @@ -1424,13 +1424,13 @@ public class XdsClientImplTest { .onNext(eq(buildDiscoveryRequest(NODE, "1", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); - verify(clusterWatcher, times(2)).onClusterChanged(clusterUpdateCaptor.capture()); - clusterUpdate = clusterUpdateCaptor.getValue(); - assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate.getEdsServiceName()) + verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); + cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate.getEdsServiceName()) .isEqualTo("eds-cluster-foo.googleapis.com"); - assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate.getLrsServerName()).isEqualTo(""); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isEqualTo(""); } /** @@ -1438,7 +1438,7 @@ public class XdsClientImplTest { */ @Test public void cdsResponseWithUpstreamTlsContext() { - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1458,10 +1458,10 @@ public class XdsClientImplTest { verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); - ArgumentCaptor clusterUpdateCaptor = ArgumentCaptor.forClass(null); - verify(clusterWatcher, times(1)).onClusterChanged(clusterUpdateCaptor.capture()); - ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); - EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = clusterUpdate + ArgumentCaptor cdsUpdateCaptor = ArgumentCaptor.forClass(null); + verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = cdsUpdate .getUpstreamTlsContext(); SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext() .getValidationContextSdsSecretConfig(); @@ -1478,13 +1478,13 @@ public class XdsClientImplTest { } @Test - public void multipleClusterWatchers() { - ClusterWatcher watcher1 = mock(ClusterWatcher.class); - ClusterWatcher watcher2 = mock(ClusterWatcher.class); - ClusterWatcher watcher3 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); - xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3); + public void multipleCdsWatchers() { + CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); + CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class); + CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2); + xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher3); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1516,25 +1516,25 @@ public class XdsClientImplTest { XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); // Two watchers get notification of cluster update for the cluster they are interested in. - ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); - ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); - assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()).isNull(); - assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(cdsUpdateCaptor1.capture()); + CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue(); + assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate1.getEdsServiceName()).isNull(); + assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate1.getLrsServerName()).isNull(); - ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); - ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); - assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate2.getEdsServiceName()).isNull(); - assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onChanged(cdsUpdateCaptor2.capture()); + CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue(); + assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate2.getEdsServiceName()).isNull(); + assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate2.getLrsServerName()).isNull(); - verify(watcher3, never()).onClusterChanged(any(ClusterUpdate.class)); + verify(watcher3, never()).onChanged(any(CdsUpdate.class)); verify(watcher3, never()).onResourceDoesNotExist("cluster-bar.googleapis.com"); verify(watcher3, never()).onError(any(Status.class)); @@ -1563,14 +1563,14 @@ public class XdsClientImplTest { XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); verifyNoMoreInteractions(watcher1, watcher2); // resource has no change - ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); - verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); - ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); - assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); - assertThat(clusterUpdate3.getEdsServiceName()) + ArgumentCaptor cdsUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onChanged(cdsUpdateCaptor3.capture()); + CdsUpdate cdsUpdate3 = cdsUpdateCaptor3.getValue(); + assertThat(cdsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(cdsUpdate3.getEdsServiceName()) .isEqualTo("eds-cluster-bar.googleapis.com"); - assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate3.getLrsServerName()).isEqualTo(""); + assertThat(cdsUpdate3.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate3.getLrsServerName()).isEqualTo(""); } /** @@ -1580,8 +1580,8 @@ public class XdsClientImplTest { */ @Test public void watchClusterAlreadyBeingWatched() { - ClusterWatcher watcher1 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1); // Streaming RPC starts after a first watcher is added. StreamObserver responseObserver = responseObservers.poll(); @@ -1606,28 +1606,28 @@ public class XdsClientImplTest { .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); - ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); - ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); - assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()).isNull(); - assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(cdsUpdateCaptor1.capture()); + CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue(); + assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate1.getEdsServiceName()).isNull(); + assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate1.getLrsServerName()).isNull(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); // Another cluster watcher interested in the same cluster is added. - ClusterWatcher watcher2 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); + CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2); // Since the client has received cluster update for this cluster before, cached result is // notified to the newly added watcher immediately. - ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); - ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); - assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate2.getEdsServiceName()).isNull(); - assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onChanged(cdsUpdateCaptor2.capture()); + CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue(); + assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate2.getEdsServiceName()).isNull(); + assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate2.getLrsServerName()).isNull(); verifyNoMoreInteractions(requestObserver); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); @@ -1637,9 +1637,9 @@ public class XdsClientImplTest { * Basic operations of adding/canceling cluster data watchers. */ @Test - public void addRemoveClusterWatchers() { - ClusterWatcher watcher1 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + public void addRemoveCdsWatchers() { + CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1); // Streaming RPC starts after a first watcher is added. StreamObserver responseObserver = responseObservers.poll(); @@ -1663,17 +1663,17 @@ public class XdsClientImplTest { .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); - ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); - ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); - assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()).isNull(); - assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(cdsUpdateCaptor1.capture()); + CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue(); + assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate1.getEdsServiceName()).isNull(); + assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate1.getLrsServerName()).isNull(); // Add another cluster watcher for a different cluster. - ClusterWatcher watcher2 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher2); + CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher2); // Client sent a new CDS request for all interested resources. verify(requestObserver) @@ -1701,17 +1701,17 @@ public class XdsClientImplTest { ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); verifyNoMoreInteractions(watcher1); // resource has no change - ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); - ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); - assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); - assertThat(clusterUpdate2.getEdsServiceName()) + ArgumentCaptor cdsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onChanged(cdsUpdateCaptor2.capture()); + CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue(); + assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(cdsUpdate2.getEdsServiceName()) .isEqualTo("eds-cluster-bar.googleapis.com"); - assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(""); + assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate2.getLrsServerName()).isEqualTo(""); // Cancel one of the watcher. - xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1); + xdsClient.cancelCdsResourceWatch("cluster-foo.googleapis.com", watcher1); // Since the cancelled watcher was the last watcher interested in that cluster (but there // is still interested resource), client sent an new CDS request to unsubscribe from @@ -1723,7 +1723,7 @@ public class XdsClientImplTest { // Management server has nothing to respond. // Cancel the other watcher. All resources have been unsubscribed. - xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher2); + xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher2); verify(requestObserver) .onNext( @@ -1750,9 +1750,9 @@ public class XdsClientImplTest { verifyNoMoreInteractions(watcher1, watcher2); // A new cluster watcher is added to watch cluster foo again. - ClusterWatcher watcher3 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher3); - verify(watcher3, never()).onClusterChanged(any(ClusterUpdate.class)); + CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher3); + verify(watcher3, never()).onChanged(any(CdsUpdate.class)); // A CDS request is sent to indicate subscription of "cluster-foo.googleapis.com" only. verify(requestObserver) @@ -1770,13 +1770,13 @@ public class XdsClientImplTest { responseObserver.onNext(response); // Notified with cached data immediately. - ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); - verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); - ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); - assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate3.getEdsServiceName()).isNull(); - assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(""); + ArgumentCaptor cdsUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onChanged(cdsUpdateCaptor3.capture()); + CdsUpdate cdsUpdate3 = cdsUpdateCaptor3.getValue(); + assertThat(cdsUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate3.getEdsServiceName()).isNull(); + assertThat(cdsUpdate3.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate2.getLrsServerName()).isEqualTo(""); verifyNoMoreInteractions(watcher1, watcher2); @@ -1787,9 +1787,9 @@ public class XdsClientImplTest { } @Test - public void addRemoveClusterWatcherWhileInitialResourceFetchInProgress() { - ClusterWatcher watcher1 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + public void addRemoveCdsWatcherWhileInitialResourceFetchInProgress() { + CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1); // Streaming RPC starts after a first watcher is added. StreamObserver requestObserver = requestObservers.poll(); @@ -1804,12 +1804,12 @@ public class XdsClientImplTest { fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC - 1, TimeUnit.SECONDS); - ClusterWatcher watcher2 = mock(ClusterWatcher.class); - ClusterWatcher watcher3 = mock(ClusterWatcher.class); - ClusterWatcher watcher4 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); - xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3); - xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher4); + CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class); + CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class); + CdsResourceWatcher watcher4 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2); + xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher3); + xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher4); // Client sends a new CDS request for updating the latest resource subscription. verify(requestObserver) @@ -1828,8 +1828,8 @@ public class XdsClientImplTest { verify(watcher2).onResourceDoesNotExist("cluster-foo.googleapis.com"); // The absence result is known immediately. - ClusterWatcher watcher5 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher5); + CdsResourceWatcher watcher5 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher5); verify(watcher5).onResourceDoesNotExist("cluster-foo.googleapis.com"); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); @@ -1837,9 +1837,9 @@ public class XdsClientImplTest { // Cancel watchers while discovery for resource "cluster-bar.googleapis.com" is still // in progress. - xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher3); + xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher3); assertThat(timeoutTask.isCancelled()).isFalse(); - xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher4); + xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher4); // Client sends a CDS request for resource subscription update (Omitted). @@ -1853,7 +1853,7 @@ public class XdsClientImplTest { @Test public void cdsUpdateForClusterBeingRemoved() { - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1871,13 +1871,13 @@ public class XdsClientImplTest { // Client sent an ACK CDS request (Omitted). - ArgumentCaptor clusterUpdateCaptor = ArgumentCaptor.forClass(null); - verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture()); - ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); - assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate.getEdsServiceName()).isNull(); - assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate.getLrsServerName()).isEqualTo(""); + ArgumentCaptor cdsUpdateCaptor = ArgumentCaptor.forClass(null); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isEqualTo(""); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); // No cluster is available. @@ -1886,7 +1886,7 @@ public class XdsClientImplTest { XdsClientImpl.ADS_TYPE_URL_CDS, "0001"); responseObserver.onNext(response); - verify(clusterWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); + verify(cdsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); } /** @@ -1898,7 +1898,7 @@ public class XdsClientImplTest { */ @Test public void edsResponseWithoutMatchingResource() { - xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1936,11 +1936,11 @@ public class XdsClientImplTest { .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); - verify(endpointWatcher, never()).onEndpointChanged(any(EndpointUpdate.class)); - verify(endpointWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com"); - verify(endpointWatcher, never()).onError(any(Status.class)); + verify(edsResourceWatcher, never()).onChanged(any(EdsUpdate.class)); + verify(edsResourceWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com"); + verify(edsResourceWatcher, never()).onError(any(Status.class)); fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); - verify(endpointWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); + verify(edsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } @@ -1950,7 +1950,7 @@ public class XdsClientImplTest { */ @Test public void edsResponseWithMatchingResource() { - xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -2002,15 +2002,15 @@ public class XdsClientImplTest { .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); - ArgumentCaptor endpointUpdateCaptor = ArgumentCaptor.forClass(null); - verify(endpointWatcher).onEndpointChanged(endpointUpdateCaptor.capture()); - EndpointUpdate endpointUpdate = endpointUpdateCaptor.getValue(); - assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate.getDropPolicies()) + ArgumentCaptor edsUpdateCaptor = ArgumentCaptor.forClass(null); + verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate.getDropPolicies()) .containsExactly( new DropOverload("lb", 200), new DropOverload("throttle", 1000)); - assertThat(endpointUpdate.getLocalityLbEndpointsMap()) + assertThat(edsUpdate.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2035,21 +2035,21 @@ public class XdsClientImplTest { .onNext(eq(buildDiscoveryRequest(NODE, "1", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_EDS, "0001"))); - verify(endpointWatcher, times(2)).onEndpointChanged(endpointUpdateCaptor.capture()); - endpointUpdate = endpointUpdateCaptor.getValue(); - assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate.getDropPolicies()).isEmpty(); - assertThat(endpointUpdate.getLocalityLbEndpointsMap()).isEmpty(); + verify(edsResourceWatcher, times(2)).onChanged(edsUpdateCaptor.capture()); + edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate.getDropPolicies()).isEmpty(); + assertThat(edsUpdate.getLocalityLbEndpointsMap()).isEmpty(); } @Test - public void multipleEndpointWatchers() { - EndpointWatcher watcher1 = mock(EndpointWatcher.class); - EndpointWatcher watcher2 = mock(EndpointWatcher.class); - EndpointWatcher watcher3 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2); - xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3); + public void multipleEdsWatchers() { + EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class); + EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class); + EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2); + xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -2091,11 +2091,11 @@ public class XdsClientImplTest { XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); // Two watchers get notification of endpoint update for the cluster they are interested in. - ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture()); - EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue(); - assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate1.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(edsUpdateCaptor1.capture()); + EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue(); + assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate1.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2103,11 +2103,11 @@ public class XdsClientImplTest { new LbEndpoint("192.168.0.1", 8080, 2, true)), 1, 0)); - ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher1).onEndpointChanged(endpointUpdateCaptor2.capture()); - EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue(); - assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(edsUpdateCaptor2.capture()); + EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue(); + assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate2.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2141,11 +2141,11 @@ public class XdsClientImplTest { XdsClientImpl.ADS_TYPE_URL_EDS, "0001"))); // The corresponding watcher gets notified. - ArgumentCaptor endpointUpdateCaptor3 = ArgumentCaptor.forClass(null); - verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture()); - EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue(); - assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); - assertThat(endpointUpdate3.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onChanged(edsUpdateCaptor3.capture()); + EdsUpdate edsUpdate3 = edsUpdateCaptor3.getValue(); + assertThat(edsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(edsUpdate3.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region2", "zone2", "subzone2"), new LocalityLbEndpoints( @@ -2161,8 +2161,8 @@ public class XdsClientImplTest { */ @Test public void watchEndpointsForClusterAlreadyBeingWatched() { - EndpointWatcher watcher1 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); + EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -2196,12 +2196,12 @@ public class XdsClientImplTest { .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); - ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture()); - EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue(); - assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate1.getDropPolicies()).isEmpty(); - assertThat(endpointUpdate1.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(edsUpdateCaptor1.capture()); + EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue(); + assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate1.getDropPolicies()).isEmpty(); + assertThat(edsUpdate1.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2210,17 +2210,17 @@ public class XdsClientImplTest { 2, true)), 1, 0)); // A second endpoint watcher is registered for endpoints in the same cluster. - EndpointWatcher watcher2 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2); + EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2); // Cached endpoint information is notified to the new watcher immediately, without sending // another EDS request. - ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture()); - EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue(); - assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate2.getDropPolicies()).isEmpty(); - assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onChanged(edsUpdateCaptor2.capture()); + EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue(); + assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate2.getDropPolicies()).isEmpty(); + assertThat(edsUpdate2.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2236,9 +2236,9 @@ public class XdsClientImplTest { * Basic operations of adding/canceling endpoint data watchers. */ @Test - public void addRemoveEndpointWatchers() { - EndpointWatcher watcher1 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); + public void addRemoveEdsWatchers() { + EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1); // Streaming RPC starts after a first watcher is added. StreamObserver responseObserver = responseObservers.poll(); @@ -2271,11 +2271,11 @@ public class XdsClientImplTest { .onNext(eq(buildDiscoveryRequest(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_EDS, "0000"))); - ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture()); - EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue(); - assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate1.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(edsUpdateCaptor1.capture()); + EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue(); + assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate1.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2285,8 +2285,8 @@ public class XdsClientImplTest { 1, 0)); // Add another endpoint watcher for a different cluster. - EndpointWatcher watcher2 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher2); + EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher2); // Client sent a new EDS request for all interested resources. verify(requestObserver) @@ -2319,11 +2319,11 @@ public class XdsClientImplTest { ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), XdsClientImpl.ADS_TYPE_URL_EDS, "0001"))); - ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture()); - EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue(); - assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); - assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onChanged(edsUpdateCaptor2.capture()); + EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue(); + assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(edsUpdate2.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region2", "zone2", "subzone2"), new LocalityLbEndpoints( @@ -2332,7 +2332,7 @@ public class XdsClientImplTest { 6, 0)); // Cancel one of the watcher. - xdsClient.cancelEndpointDataWatch("cluster-foo.googleapis.com", watcher1); + xdsClient.cancelEdsResourceWatch("cluster-foo.googleapis.com", watcher1); // Since the cancelled watcher was the last watcher interested in that cluster, client // sent an new EDS request to unsubscribe from that cluster. @@ -2343,7 +2343,7 @@ public class XdsClientImplTest { // Management server should not respond as it had previously sent the requested resource. // Cancel the other watcher. - xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher2); + xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher2); // Since the cancelled watcher was the last watcher interested in that cluster, client // sent an new EDS request to unsubscribe from that cluster. @@ -2389,13 +2389,13 @@ public class XdsClientImplTest { verifyNoMoreInteractions(watcher1, watcher2); // A new endpoint watcher is added to watch an old but was no longer interested in cluster. - EndpointWatcher watcher3 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3); + EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3); // Nothing should be notified to the new watcher as we are still waiting management server's // latest response. // Cached endpoint data should have been purged. - verify(watcher3, never()).onEndpointChanged(any(EndpointUpdate.class)); + verify(watcher3, never()).onChanged(any(EdsUpdate.class)); // An EDS request is sent to re-subscribe the cluster again. verify(requestObserver) @@ -2416,11 +2416,11 @@ public class XdsClientImplTest { XdsClientImpl.ADS_TYPE_URL_EDS, "0003"); responseObserver.onNext(response); - ArgumentCaptor endpointUpdateCaptor3 = ArgumentCaptor.forClass(null); - verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture()); - EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue(); - assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); - assertThat(endpointUpdate3.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onChanged(edsUpdateCaptor3.capture()); + EdsUpdate edsUpdate3 = edsUpdateCaptor3.getValue(); + assertThat(edsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(edsUpdate3.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region4", "zone4", "subzone4"), new LocalityLbEndpoints( @@ -2438,9 +2438,9 @@ public class XdsClientImplTest { } @Test - public void addRemoveEndpointWatcherWhileInitialResourceFetchInProgress() { - EndpointWatcher watcher1 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); + public void addRemoveEdsWatcherWhileInitialResourceFetchInProgress() { + EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1); // Streaming RPC starts after a first watcher is added. StreamObserver requestObserver = requestObservers.poll(); @@ -2455,12 +2455,12 @@ public class XdsClientImplTest { fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC - 1, TimeUnit.SECONDS); - EndpointWatcher watcher2 = mock(EndpointWatcher.class); - EndpointWatcher watcher3 = mock(EndpointWatcher.class); - EndpointWatcher watcher4 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2); - xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3); - xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher4); + EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class); + EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class); + EdsResourceWatcher watcher4 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2); + xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3); + xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher4); // Client sends a new EDS request for updating the latest resource subscription. verify(requestObserver) @@ -2479,8 +2479,8 @@ public class XdsClientImplTest { verify(watcher2).onResourceDoesNotExist("cluster-foo.googleapis.com"); // The absence result is known immediately. - EndpointWatcher watcher5 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher5); + EdsResourceWatcher watcher5 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher5); verify(watcher5).onResourceDoesNotExist("cluster-foo.googleapis.com"); assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); @@ -2488,9 +2488,9 @@ public class XdsClientImplTest { // Cancel watchers while discovery for resource "cluster-bar.googleapis.com" is still // in progress. - xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher3); + xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher3); assertThat(timeoutTask.isCancelled()).isFalse(); - xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher4); + xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher4); // Client sends an EDS request for resource subscription update (Omitted). @@ -2504,7 +2504,7 @@ public class XdsClientImplTest { @Test public void cdsUpdateForEdsServiceNameChange() { - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); // Management server sends back a CDS response containing requested resource. @@ -2514,7 +2514,7 @@ public class XdsClientImplTest { buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); responseObserver.onNext(response); - xdsClient.watchEndpointData("cluster-foo:service-bar", endpointWatcher); + xdsClient.watchEdsResource("cluster-foo:service-bar", edsResourceWatcher); // Management server sends back an EDS response for resource "cluster-foo:service-bar". List clusterLoadAssignments = ImmutableList.of( @@ -2530,12 +2530,12 @@ public class XdsClientImplTest { XdsClientImpl.ADS_TYPE_URL_EDS, "0000"); responseObserver.onNext(response); - ArgumentCaptor endpointUpdateCaptor = ArgumentCaptor.forClass(null); - verify(endpointWatcher).onEndpointChanged(endpointUpdateCaptor.capture()); - EndpointUpdate endpointUpdate = endpointUpdateCaptor.getValue(); - assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo:service-bar"); - assertThat(endpointUpdate.getDropPolicies()).isEmpty(); - assertThat(endpointUpdate.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor = ArgumentCaptor.forClass(null); + verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo:service-bar"); + assertThat(edsUpdate.getDropPolicies()).isEmpty(); + assertThat(edsUpdate.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2552,7 +2552,7 @@ public class XdsClientImplTest { responseObserver.onNext(response); // Watcher get notification for endpoint resource "cluster-foo:service-bar" being deleted. - verify(endpointWatcher).onResourceDoesNotExist("cluster-foo:service-bar"); + verify(edsResourceWatcher).onResourceDoesNotExist("cluster-foo:service-bar"); } /** @@ -2737,7 +2737,7 @@ public class XdsClientImplTest { ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(null); // Start watching cluster information. - xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster.googleapis.com", cdsResourceWatcher); // Client sent first CDS request. verify(requestObserver) @@ -2745,7 +2745,7 @@ public class XdsClientImplTest { XdsClientImpl.ADS_TYPE_URL_CDS, ""))); // Start watching endpoint information. - xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster.googleapis.com", edsResourceWatcher); // Client sent first EDS request. verify(requestObserver) @@ -2756,9 +2756,9 @@ public class XdsClientImplTest { responseObserver.onError(Status.UNKNOWN.asException()); verify(configWatcher).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); - verify(clusterWatcher).onError(statusCaptor.capture()); + verify(cdsResourceWatcher).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); - verify(endpointWatcher).onError(statusCaptor.capture()); + verify(edsResourceWatcher).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); // Resets backoff and retry immediately. @@ -2784,9 +2784,9 @@ public class XdsClientImplTest { responseObserver.onError(Status.UNAVAILABLE.asException()); verify(configWatcher, times(2)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(clusterWatcher, times(2)).onError(statusCaptor.capture()); + verify(cdsResourceWatcher, times(2)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(endpointWatcher, times(2)).onError(statusCaptor.capture()); + verify(edsResourceWatcher, times(2)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); inOrder.verify(backoffPolicy1).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); @@ -2813,9 +2813,9 @@ public class XdsClientImplTest { responseObserver.onError(Status.UNAVAILABLE.asException()); verify(configWatcher, times(3)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(clusterWatcher, times(3)).onError(statusCaptor.capture()); + verify(cdsResourceWatcher, times(3)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(endpointWatcher, times(3)).onError(statusCaptor.capture()); + verify(edsResourceWatcher, times(3)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); inOrder.verify(backoffPolicy1).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); @@ -2850,8 +2850,8 @@ public class XdsClientImplTest { // Management server closes the RPC stream. responseObserver.onCompleted(); verify(configWatcher, times(4)).onError(any(Status.class)); - verify(clusterWatcher, times(4)).onError(any(Status.class)); - verify(endpointWatcher, times(4)).onError(any(Status.class)); + verify(cdsResourceWatcher, times(4)).onError(any(Status.class)); + verify(edsResourceWatcher, times(4)).onError(any(Status.class)); // Resets backoff and retry immediately inOrder.verify(backoffPolicyProvider).get(); @@ -2875,9 +2875,9 @@ public class XdsClientImplTest { responseObserver.onError(Status.UNAVAILABLE.asException()); verify(configWatcher, times(5)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(clusterWatcher, times(5)).onError(statusCaptor.capture()); + verify(cdsResourceWatcher, times(5)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(endpointWatcher, times(5)).onError(statusCaptor.capture()); + verify(edsResourceWatcher, times(5)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); inOrder.verify(backoffPolicy2).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); @@ -2944,7 +2944,7 @@ public class XdsClientImplTest { assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); // Start watching cluster information while RPC stream is still in retry backoff. - xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster.googleapis.com", cdsResourceWatcher); // Retry after backoff. fakeClock.forwardNanos(9L); @@ -2968,7 +2968,7 @@ public class XdsClientImplTest { assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); // Start watching endpoint information while RPC stream is still in retry backoff. - xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster.googleapis.com", edsResourceWatcher); // Retry after backoff. fakeClock.forwardNanos(99L); @@ -2999,14 +2999,14 @@ public class XdsClientImplTest { // Client sent an CDS ACK request (Omitted). // No longer interested in endpoint information after RPC resumes. - xdsClient.cancelEndpointDataWatch("cluster.googleapis.com", endpointWatcher); + xdsClient.cancelEdsResourceWatch("cluster.googleapis.com", edsResourceWatcher); // Client updates EDS resource subscription immediately. verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", ImmutableList.of(), XdsClientImpl.ADS_TYPE_URL_EDS, ""))); // Become interested in endpoints of another cluster. - xdsClient.watchEndpointData("cluster2.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster2.googleapis.com", edsResourceWatcher); // Client updates EDS resource subscription immediately. verify(requestObserver) .onNext(eq(buildDiscoveryRequest(NODE, "", "cluster2.googleapis.com", @@ -3038,8 +3038,8 @@ public class XdsClientImplTest { assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); // No longer interested in previous cluster and endpoints in that cluster. - xdsClient.cancelClusterDataWatch("cluster.googleapis.com", clusterWatcher); - xdsClient.cancelEndpointDataWatch("cluster2.googleapis.com", endpointWatcher); + xdsClient.cancelCdsResourceWatch("cluster.googleapis.com", cdsResourceWatcher); + xdsClient.cancelEdsResourceWatch("cluster2.googleapis.com", edsResourceWatcher); // Retry after backoff. fakeClock.forwardNanos(19L); @@ -3162,7 +3162,7 @@ public class XdsClientImplTest { // Client/server resumed LDS/RDS request/response (Omitted). // Start watching cluster data. - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); ScheduledTask cdsRespTimeoutTask = Iterables.getOnlyElement( fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); @@ -3190,7 +3190,7 @@ public class XdsClientImplTest { .isEqualTo(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC); // Start watching endpoint data. - xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher); ScheduledTask edsTimeoutTask = Iterables.getOnlyElement( fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java index 17351d00d1..e04cd739ab 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestV2.java @@ -91,12 +91,12 @@ import io.grpc.xds.EnvoyProtoData.LbEndpoint; import io.grpc.xds.EnvoyProtoData.Locality; import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; import io.grpc.xds.EnvoyProtoData.Node; -import io.grpc.xds.XdsClient.ClusterUpdate; -import io.grpc.xds.XdsClient.ClusterWatcher; +import io.grpc.xds.XdsClient.CdsResourceWatcher; +import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsClient.ConfigUpdate; import io.grpc.xds.XdsClient.ConfigWatcher; -import io.grpc.xds.XdsClient.EndpointUpdate; -import io.grpc.xds.XdsClient.EndpointWatcher; +import io.grpc.xds.XdsClient.EdsResourceWatcher; +import io.grpc.xds.XdsClient.EdsUpdate; import io.grpc.xds.XdsClient.XdsChannel; import io.grpc.xds.XdsClientImpl.MessagePrinter; import java.io.IOException; @@ -205,9 +205,9 @@ public class XdsClientImplTestV2 { @Mock private ConfigWatcher configWatcher; @Mock - private ClusterWatcher clusterWatcher; + private CdsResourceWatcher cdsResourceWatcher; @Mock - private EndpointWatcher endpointWatcher; + private EdsResourceWatcher edsResourceWatcher; private ManagedChannel channel; private XdsClientImpl xdsClient; @@ -1347,7 +1347,7 @@ public class XdsClientImplTestV2 { */ @Test public void cdsResponseWithoutMatchingResource() { - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1369,12 +1369,12 @@ public class XdsClientImplTestV2 { verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000"))); - verify(clusterWatcher, never()).onClusterChanged(any(ClusterUpdate.class)); - verify(clusterWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com"); - verify(clusterWatcher, never()).onError(any(Status.class)); + verify(cdsResourceWatcher, never()).onChanged(any(CdsUpdate.class)); + verify(cdsResourceWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com"); + verify(cdsResourceWatcher, never()).onError(any(Status.class)); fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); - verify(clusterWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); + verify(cdsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } @@ -1384,7 +1384,7 @@ public class XdsClientImplTestV2 { */ @Test public void cdsResponseWithMatchingResource() { - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1411,13 +1411,13 @@ public class XdsClientImplTestV2 { XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000"))); assertThat(cdsRespTimer.isCancelled()).isTrue(); - ArgumentCaptor clusterUpdateCaptor = ArgumentCaptor.forClass(null); - verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture()); - ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); - assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate.getEdsServiceName()).isNull(); - assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor = ArgumentCaptor.forClass(null); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isNull(); // Management server sends back another CDS response updating the requested Cluster. clusters = ImmutableList.of( @@ -1434,13 +1434,13 @@ public class XdsClientImplTestV2 { .onNext(eq(buildDiscoveryRequestV2(NODE, "1", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0001"))); - verify(clusterWatcher, times(2)).onClusterChanged(clusterUpdateCaptor.capture()); - clusterUpdate = clusterUpdateCaptor.getValue(); - assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate.getEdsServiceName()) + verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); + cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate.getEdsServiceName()) .isEqualTo("eds-cluster-foo.googleapis.com"); - assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate.getLrsServerName()).isEqualTo(""); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isEqualTo(""); } /** @@ -1448,7 +1448,7 @@ public class XdsClientImplTestV2 { */ @Test public void cdsResponseWithUpstreamTlsContext() { - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1468,10 +1468,10 @@ public class XdsClientImplTestV2 { verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000"))); - ArgumentCaptor clusterUpdateCaptor = ArgumentCaptor.forClass(null); - verify(clusterWatcher, times(1)).onClusterChanged(clusterUpdateCaptor.capture()); - ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); - EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = clusterUpdate + ArgumentCaptor cdsUpdateCaptor = ArgumentCaptor.forClass(null); + verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = cdsUpdate .getUpstreamTlsContext(); SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext() .getValidationContextSdsSecretConfig(); @@ -1488,13 +1488,13 @@ public class XdsClientImplTestV2 { } @Test - public void multipleClusterWatchers() { - ClusterWatcher watcher1 = mock(ClusterWatcher.class); - ClusterWatcher watcher2 = mock(ClusterWatcher.class); - ClusterWatcher watcher3 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); - xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3); + public void multipleCdsWatchers() { + CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); + CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class); + CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2); + xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher3); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1526,25 +1526,25 @@ public class XdsClientImplTestV2 { XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000"))); // Two watchers get notification of cluster update for the cluster they are interested in. - ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); - ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); - assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()).isNull(); - assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(cdsUpdateCaptor1.capture()); + CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue(); + assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate1.getEdsServiceName()).isNull(); + assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate1.getLrsServerName()).isNull(); - ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); - ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); - assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate2.getEdsServiceName()).isNull(); - assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onChanged(cdsUpdateCaptor2.capture()); + CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue(); + assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate2.getEdsServiceName()).isNull(); + assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate2.getLrsServerName()).isNull(); - verify(watcher3, never()).onClusterChanged(any(ClusterUpdate.class)); + verify(watcher3, never()).onChanged(any(CdsUpdate.class)); verify(watcher3, never()).onResourceDoesNotExist("cluster-bar.googleapis.com"); verify(watcher3, never()).onError(any(Status.class)); @@ -1573,14 +1573,14 @@ public class XdsClientImplTestV2 { XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0001"))); verifyNoMoreInteractions(watcher1, watcher2); // resource has no change - ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); - verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); - ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); - assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); - assertThat(clusterUpdate3.getEdsServiceName()) + ArgumentCaptor cdsUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onChanged(cdsUpdateCaptor3.capture()); + CdsUpdate cdsUpdate3 = cdsUpdateCaptor3.getValue(); + assertThat(cdsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(cdsUpdate3.getEdsServiceName()) .isEqualTo("eds-cluster-bar.googleapis.com"); - assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate3.getLrsServerName()).isEqualTo(""); + assertThat(cdsUpdate3.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate3.getLrsServerName()).isEqualTo(""); } /** @@ -1590,8 +1590,8 @@ public class XdsClientImplTestV2 { */ @Test public void watchClusterAlreadyBeingWatched() { - ClusterWatcher watcher1 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1); // Streaming RPC starts after a first watcher is added. StreamObserver responseObserver = responseObservers.poll(); @@ -1616,28 +1616,28 @@ public class XdsClientImplTestV2 { .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000"))); - ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); - ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); - assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()).isNull(); - assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(cdsUpdateCaptor1.capture()); + CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue(); + assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate1.getEdsServiceName()).isNull(); + assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate1.getLrsServerName()).isNull(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); // Another cluster watcher interested in the same cluster is added. - ClusterWatcher watcher2 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); + CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2); // Since the client has received cluster update for this cluster before, cached result is // notified to the newly added watcher immediately. - ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); - ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); - assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate2.getEdsServiceName()).isNull(); - assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onChanged(cdsUpdateCaptor2.capture()); + CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue(); + assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate2.getEdsServiceName()).isNull(); + assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate2.getLrsServerName()).isNull(); verifyNoMoreInteractions(requestObserver); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); @@ -1647,9 +1647,9 @@ public class XdsClientImplTestV2 { * Basic operations of adding/canceling cluster data watchers. */ @Test - public void addRemoveClusterWatchers() { - ClusterWatcher watcher1 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + public void addRemoveCdsWatchers() { + CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1); // Streaming RPC starts after a first watcher is added. StreamObserver responseObserver = responseObservers.poll(); @@ -1673,17 +1673,17 @@ public class XdsClientImplTestV2 { .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000"))); - ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); - ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); - assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()).isNull(); - assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.getLrsServerName()).isNull(); + ArgumentCaptor cdsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(cdsUpdateCaptor1.capture()); + CdsUpdate cdsUpdate1 = cdsUpdateCaptor1.getValue(); + assertThat(cdsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate1.getEdsServiceName()).isNull(); + assertThat(cdsUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate1.getLrsServerName()).isNull(); // Add another cluster watcher for a different cluster. - ClusterWatcher watcher2 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher2); + CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher2); // Client sent a new CDS request for all interested resources. verify(requestObserver) @@ -1711,17 +1711,17 @@ public class XdsClientImplTestV2 { ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0001"))); verifyNoMoreInteractions(watcher1); // resource has no change - ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); - ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); - assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); - assertThat(clusterUpdate2.getEdsServiceName()) + ArgumentCaptor cdsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onChanged(cdsUpdateCaptor2.capture()); + CdsUpdate cdsUpdate2 = cdsUpdateCaptor2.getValue(); + assertThat(cdsUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(cdsUpdate2.getEdsServiceName()) .isEqualTo("eds-cluster-bar.googleapis.com"); - assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(""); + assertThat(cdsUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate2.getLrsServerName()).isEqualTo(""); // Cancel one of the watcher. - xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1); + xdsClient.cancelCdsResourceWatch("cluster-foo.googleapis.com", watcher1); // Since the cancelled watcher was the last watcher interested in that cluster (but there // is still interested resource), client sent an new CDS request to unsubscribe from @@ -1733,7 +1733,7 @@ public class XdsClientImplTestV2 { // Management server has nothing to respond. // Cancel the other watcher. All resources have been unsubscribed. - xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher2); + xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher2); verify(requestObserver) .onNext( @@ -1760,9 +1760,9 @@ public class XdsClientImplTestV2 { verifyNoMoreInteractions(watcher1, watcher2); // A new cluster watcher is added to watch cluster foo again. - ClusterWatcher watcher3 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher3); - verify(watcher3, never()).onClusterChanged(any(ClusterUpdate.class)); + CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher3); + verify(watcher3, never()).onChanged(any(CdsUpdate.class)); // A CDS request is sent to indicate subscription of "cluster-foo.googleapis.com" only. verify(requestObserver) @@ -1780,13 +1780,13 @@ public class XdsClientImplTestV2 { responseObserver.onNext(response); // Notified with cached data immediately. - ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); - verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); - ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); - assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate3.getEdsServiceName()).isNull(); - assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(""); + ArgumentCaptor cdsUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onChanged(cdsUpdateCaptor3.capture()); + CdsUpdate cdsUpdate3 = cdsUpdateCaptor3.getValue(); + assertThat(cdsUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate3.getEdsServiceName()).isNull(); + assertThat(cdsUpdate3.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate2.getLrsServerName()).isEqualTo(""); verifyNoMoreInteractions(watcher1, watcher2); @@ -1797,9 +1797,9 @@ public class XdsClientImplTestV2 { } @Test - public void addRemoveClusterWatcherWhileInitialResourceFetchInProgress() { - ClusterWatcher watcher1 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + public void addRemoveCdsWatcherWhileInitialResourceFetchInProgress() { + CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher1); // Streaming RPC starts after a first watcher is added. StreamObserver requestObserver = requestObservers.poll(); @@ -1814,12 +1814,12 @@ public class XdsClientImplTestV2 { fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC - 1, TimeUnit.SECONDS); - ClusterWatcher watcher2 = mock(ClusterWatcher.class); - ClusterWatcher watcher3 = mock(ClusterWatcher.class); - ClusterWatcher watcher4 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); - xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3); - xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher4); + CdsResourceWatcher watcher2 = mock(CdsResourceWatcher.class); + CdsResourceWatcher watcher3 = mock(CdsResourceWatcher.class); + CdsResourceWatcher watcher4 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher2); + xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher3); + xdsClient.watchCdsResource("cluster-bar.googleapis.com", watcher4); // Client sends a new CDS request for updating the latest resource subscription. verify(requestObserver) @@ -1838,8 +1838,8 @@ public class XdsClientImplTestV2 { verify(watcher2).onResourceDoesNotExist("cluster-foo.googleapis.com"); // The absence result is known immediately. - ClusterWatcher watcher5 = mock(ClusterWatcher.class); - xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher5); + CdsResourceWatcher watcher5 = mock(CdsResourceWatcher.class); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", watcher5); verify(watcher5).onResourceDoesNotExist("cluster-foo.googleapis.com"); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); @@ -1847,9 +1847,9 @@ public class XdsClientImplTestV2 { // Cancel watchers while discovery for resource "cluster-bar.googleapis.com" is still // in progress. - xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher3); + xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher3); assertThat(timeoutTask.isCancelled()).isFalse(); - xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher4); + xdsClient.cancelCdsResourceWatch("cluster-bar.googleapis.com", watcher4); // Client sends a CDS request for resource subscription update (Omitted). @@ -1863,7 +1863,7 @@ public class XdsClientImplTestV2 { @Test public void cdsUpdateForClusterBeingRemoved() { - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1881,13 +1881,13 @@ public class XdsClientImplTestV2 { // Client sent an ACK CDS request (Omitted). - ArgumentCaptor clusterUpdateCaptor = ArgumentCaptor.forClass(null); - verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture()); - ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); - assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate.getEdsServiceName()).isNull(); - assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate.getLrsServerName()).isEqualTo(""); + ArgumentCaptor cdsUpdateCaptor = ArgumentCaptor.forClass(null); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isEqualTo(""); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); // No cluster is available. @@ -1896,7 +1896,7 @@ public class XdsClientImplTestV2 { XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0001"); responseObserver.onNext(response); - verify(clusterWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); + verify(cdsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); } /** @@ -1908,7 +1908,7 @@ public class XdsClientImplTestV2 { */ @Test public void edsResponseWithoutMatchingResource() { - xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -1946,11 +1946,11 @@ public class XdsClientImplTestV2 { .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000"))); - verify(endpointWatcher, never()).onEndpointChanged(any(EndpointUpdate.class)); - verify(endpointWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com"); - verify(endpointWatcher, never()).onError(any(Status.class)); + verify(edsResourceWatcher, never()).onChanged(any(EdsUpdate.class)); + verify(edsResourceWatcher, never()).onResourceDoesNotExist("cluster-foo.googleapis.com"); + verify(edsResourceWatcher, never()).onError(any(Status.class)); fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); - verify(endpointWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); + verify(edsResourceWatcher).onResourceDoesNotExist("cluster-foo.googleapis.com"); assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } @@ -1960,7 +1960,7 @@ public class XdsClientImplTestV2 { */ @Test public void edsResponseWithMatchingResource() { - xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -2012,15 +2012,15 @@ public class XdsClientImplTestV2 { .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000"))); - ArgumentCaptor endpointUpdateCaptor = ArgumentCaptor.forClass(null); - verify(endpointWatcher).onEndpointChanged(endpointUpdateCaptor.capture()); - EndpointUpdate endpointUpdate = endpointUpdateCaptor.getValue(); - assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate.getDropPolicies()) + ArgumentCaptor edsUpdateCaptor = ArgumentCaptor.forClass(null); + verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate.getDropPolicies()) .containsExactly( new DropOverload("lb", 200), new DropOverload("throttle", 1000)); - assertThat(endpointUpdate.getLocalityLbEndpointsMap()) + assertThat(edsUpdate.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2045,21 +2045,21 @@ public class XdsClientImplTestV2 { .onNext(eq(buildDiscoveryRequestV2(NODE, "1", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0001"))); - verify(endpointWatcher, times(2)).onEndpointChanged(endpointUpdateCaptor.capture()); - endpointUpdate = endpointUpdateCaptor.getValue(); - assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate.getDropPolicies()).isEmpty(); - assertThat(endpointUpdate.getLocalityLbEndpointsMap()).isEmpty(); + verify(edsResourceWatcher, times(2)).onChanged(edsUpdateCaptor.capture()); + edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate.getDropPolicies()).isEmpty(); + assertThat(edsUpdate.getLocalityLbEndpointsMap()).isEmpty(); } @Test - public void multipleEndpointWatchers() { - EndpointWatcher watcher1 = mock(EndpointWatcher.class); - EndpointWatcher watcher2 = mock(EndpointWatcher.class); - EndpointWatcher watcher3 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2); - xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3); + public void multipleEdsWatchers() { + EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class); + EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class); + EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2); + xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -2101,11 +2101,11 @@ public class XdsClientImplTestV2 { XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000"))); // Two watchers get notification of endpoint update for the cluster they are interested in. - ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture()); - EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue(); - assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate1.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(edsUpdateCaptor1.capture()); + EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue(); + assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate1.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2113,11 +2113,11 @@ public class XdsClientImplTestV2 { new LbEndpoint("192.168.0.1", 8080, 2, true)), 1, 0)); - ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher1).onEndpointChanged(endpointUpdateCaptor2.capture()); - EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue(); - assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(edsUpdateCaptor2.capture()); + EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue(); + assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate2.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2151,11 +2151,11 @@ public class XdsClientImplTestV2 { XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0001"))); // The corresponding watcher gets notified. - ArgumentCaptor endpointUpdateCaptor3 = ArgumentCaptor.forClass(null); - verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture()); - EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue(); - assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); - assertThat(endpointUpdate3.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onChanged(edsUpdateCaptor3.capture()); + EdsUpdate edsUpdate3 = edsUpdateCaptor3.getValue(); + assertThat(edsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(edsUpdate3.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region2", "zone2", "subzone2"), new LocalityLbEndpoints( @@ -2171,8 +2171,8 @@ public class XdsClientImplTestV2 { */ @Test public void watchEndpointsForClusterAlreadyBeingWatched() { - EndpointWatcher watcher1 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); + EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1); StreamObserver responseObserver = responseObservers.poll(); StreamObserver requestObserver = requestObservers.poll(); @@ -2206,12 +2206,12 @@ public class XdsClientImplTestV2 { .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000"))); - ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture()); - EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue(); - assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate1.getDropPolicies()).isEmpty(); - assertThat(endpointUpdate1.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(edsUpdateCaptor1.capture()); + EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue(); + assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate1.getDropPolicies()).isEmpty(); + assertThat(edsUpdate1.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2220,17 +2220,17 @@ public class XdsClientImplTestV2 { 2, true)), 1, 0)); // A second endpoint watcher is registered for endpoints in the same cluster. - EndpointWatcher watcher2 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2); + EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2); // Cached endpoint information is notified to the new watcher immediately, without sending // another EDS request. - ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture()); - EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue(); - assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate2.getDropPolicies()).isEmpty(); - assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onChanged(edsUpdateCaptor2.capture()); + EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue(); + assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate2.getDropPolicies()).isEmpty(); + assertThat(edsUpdate2.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2246,9 +2246,9 @@ public class XdsClientImplTestV2 { * Basic operations of adding/canceling endpoint data watchers. */ @Test - public void addRemoveEndpointWatchers() { - EndpointWatcher watcher1 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); + public void addRemoveEdsWatchers() { + EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1); // Streaming RPC starts after a first watcher is added. StreamObserver responseObserver = responseObservers.poll(); @@ -2281,11 +2281,11 @@ public class XdsClientImplTestV2 { .onNext(eq(buildDiscoveryRequestV2(NODE, "0", "cluster-foo.googleapis.com", XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000"))); - ArgumentCaptor endpointUpdateCaptor1 = ArgumentCaptor.forClass(null); - verify(watcher1).onEndpointChanged(endpointUpdateCaptor1.capture()); - EndpointUpdate endpointUpdate1 = endpointUpdateCaptor1.getValue(); - assertThat(endpointUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(endpointUpdate1.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onChanged(edsUpdateCaptor1.capture()); + EdsUpdate edsUpdate1 = edsUpdateCaptor1.getValue(); + assertThat(edsUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(edsUpdate1.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2295,8 +2295,8 @@ public class XdsClientImplTestV2 { 1, 0)); // Add another endpoint watcher for a different cluster. - EndpointWatcher watcher2 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher2); + EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher2); // Client sent a new EDS request for all interested resources. verify(requestObserver) @@ -2329,11 +2329,11 @@ public class XdsClientImplTestV2 { ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0001"))); - ArgumentCaptor endpointUpdateCaptor2 = ArgumentCaptor.forClass(null); - verify(watcher2).onEndpointChanged(endpointUpdateCaptor2.capture()); - EndpointUpdate endpointUpdate2 = endpointUpdateCaptor2.getValue(); - assertThat(endpointUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); - assertThat(endpointUpdate2.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onChanged(edsUpdateCaptor2.capture()); + EdsUpdate edsUpdate2 = edsUpdateCaptor2.getValue(); + assertThat(edsUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(edsUpdate2.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region2", "zone2", "subzone2"), new LocalityLbEndpoints( @@ -2342,7 +2342,7 @@ public class XdsClientImplTestV2 { 6, 0)); // Cancel one of the watcher. - xdsClient.cancelEndpointDataWatch("cluster-foo.googleapis.com", watcher1); + xdsClient.cancelEdsResourceWatch("cluster-foo.googleapis.com", watcher1); // Since the cancelled watcher was the last watcher interested in that cluster, client // sent an new EDS request to unsubscribe from that cluster. @@ -2353,7 +2353,7 @@ public class XdsClientImplTestV2 { // Management server should not respond as it had previously sent the requested resource. // Cancel the other watcher. - xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher2); + xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher2); // Since the cancelled watcher was the last watcher interested in that cluster, client // sent an new EDS request to unsubscribe from that cluster. @@ -2399,13 +2399,13 @@ public class XdsClientImplTestV2 { verifyNoMoreInteractions(watcher1, watcher2); // A new endpoint watcher is added to watch an old but was no longer interested in cluster. - EndpointWatcher watcher3 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3); + EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3); // Nothing should be notified to the new watcher as we are still waiting management server's // latest response. // Cached endpoint data should have been purged. - verify(watcher3, never()).onEndpointChanged(any(EndpointUpdate.class)); + verify(watcher3, never()).onChanged(any(EdsUpdate.class)); // An EDS request is sent to re-subscribe the cluster again. verify(requestObserver) @@ -2426,11 +2426,11 @@ public class XdsClientImplTestV2 { XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0003"); responseObserver.onNext(response); - ArgumentCaptor endpointUpdateCaptor3 = ArgumentCaptor.forClass(null); - verify(watcher3).onEndpointChanged(endpointUpdateCaptor3.capture()); - EndpointUpdate endpointUpdate3 = endpointUpdateCaptor3.getValue(); - assertThat(endpointUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); - assertThat(endpointUpdate3.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onChanged(edsUpdateCaptor3.capture()); + EdsUpdate edsUpdate3 = edsUpdateCaptor3.getValue(); + assertThat(edsUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(edsUpdate3.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region4", "zone4", "subzone4"), new LocalityLbEndpoints( @@ -2448,9 +2448,9 @@ public class XdsClientImplTestV2 { } @Test - public void addRemoveEndpointWatcherWhileInitialResourceFetchInProgress() { - EndpointWatcher watcher1 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher1); + public void addRemoveEdsWatcherWhileInitialResourceFetchInProgress() { + EdsResourceWatcher watcher1 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher1); // Streaming RPC starts after a first watcher is added. StreamObserver requestObserver = requestObservers.poll(); @@ -2465,12 +2465,12 @@ public class XdsClientImplTestV2 { fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC - 1, TimeUnit.SECONDS); - EndpointWatcher watcher2 = mock(EndpointWatcher.class); - EndpointWatcher watcher3 = mock(EndpointWatcher.class); - EndpointWatcher watcher4 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher2); - xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher3); - xdsClient.watchEndpointData("cluster-bar.googleapis.com", watcher4); + EdsResourceWatcher watcher2 = mock(EdsResourceWatcher.class); + EdsResourceWatcher watcher3 = mock(EdsResourceWatcher.class); + EdsResourceWatcher watcher4 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher2); + xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher3); + xdsClient.watchEdsResource("cluster-bar.googleapis.com", watcher4); // Client sends a new EDS request for updating the latest resource subscription. verify(requestObserver) @@ -2489,8 +2489,8 @@ public class XdsClientImplTestV2 { verify(watcher2).onResourceDoesNotExist("cluster-foo.googleapis.com"); // The absence result is known immediately. - EndpointWatcher watcher5 = mock(EndpointWatcher.class); - xdsClient.watchEndpointData("cluster-foo.googleapis.com", watcher5); + EdsResourceWatcher watcher5 = mock(EdsResourceWatcher.class); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", watcher5); verify(watcher5).onResourceDoesNotExist("cluster-foo.googleapis.com"); assertThat(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1); @@ -2498,9 +2498,9 @@ public class XdsClientImplTestV2 { // Cancel watchers while discovery for resource "cluster-bar.googleapis.com" is still // in progress. - xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher3); + xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher3); assertThat(timeoutTask.isCancelled()).isFalse(); - xdsClient.cancelEndpointDataWatch("cluster-bar.googleapis.com", watcher4); + xdsClient.cancelEdsResourceWatch("cluster-bar.googleapis.com", watcher4); // Client sends an EDS request for resource subscription update (Omitted). @@ -2514,7 +2514,7 @@ public class XdsClientImplTestV2 { @Test public void cdsUpdateForEdsServiceNameChange() { - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); StreamObserver responseObserver = responseObservers.poll(); // Management server sends back a CDS response containing requested resource. @@ -2524,7 +2524,7 @@ public class XdsClientImplTestV2 { buildDiscoveryResponseV2("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS_V2, "0000"); responseObserver.onNext(response); - xdsClient.watchEndpointData("cluster-foo:service-bar", endpointWatcher); + xdsClient.watchEdsResource("cluster-foo:service-bar", edsResourceWatcher); // Management server sends back an EDS response for resource "cluster-foo:service-bar". List clusterLoadAssignments = ImmutableList.of( @@ -2540,12 +2540,12 @@ public class XdsClientImplTestV2 { XdsClientImpl.ADS_TYPE_URL_EDS_V2, "0000"); responseObserver.onNext(response); - ArgumentCaptor endpointUpdateCaptor = ArgumentCaptor.forClass(null); - verify(endpointWatcher).onEndpointChanged(endpointUpdateCaptor.capture()); - EndpointUpdate endpointUpdate = endpointUpdateCaptor.getValue(); - assertThat(endpointUpdate.getClusterName()).isEqualTo("cluster-foo:service-bar"); - assertThat(endpointUpdate.getDropPolicies()).isEmpty(); - assertThat(endpointUpdate.getLocalityLbEndpointsMap()) + ArgumentCaptor edsUpdateCaptor = ArgumentCaptor.forClass(null); + verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getValue(); + assertThat(edsUpdate.getClusterName()).isEqualTo("cluster-foo:service-bar"); + assertThat(edsUpdate.getDropPolicies()).isEmpty(); + assertThat(edsUpdate.getLocalityLbEndpointsMap()) .containsExactly( new Locality("region1", "zone1", "subzone1"), new LocalityLbEndpoints( @@ -2562,7 +2562,7 @@ public class XdsClientImplTestV2 { responseObserver.onNext(response); // Watcher get notification for endpoint resource "cluster-foo:service-bar" being deleted. - verify(endpointWatcher).onResourceDoesNotExist("cluster-foo:service-bar"); + verify(edsResourceWatcher).onResourceDoesNotExist("cluster-foo:service-bar"); } /** @@ -2748,7 +2748,7 @@ public class XdsClientImplTestV2 { ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(null); // Start watching cluster information. - xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster.googleapis.com", cdsResourceWatcher); // Client sent first CDS request. verify(requestObserver) @@ -2756,7 +2756,7 @@ public class XdsClientImplTestV2 { XdsClientImpl.ADS_TYPE_URL_CDS_V2, ""))); // Start watching endpoint information. - xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster.googleapis.com", edsResourceWatcher); // Client sent first EDS request. verify(requestObserver) @@ -2767,9 +2767,9 @@ public class XdsClientImplTestV2 { responseObserver.onError(Status.UNKNOWN.asException()); verify(configWatcher).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); - verify(clusterWatcher).onError(statusCaptor.capture()); + verify(cdsResourceWatcher).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); - verify(endpointWatcher).onError(statusCaptor.capture()); + verify(edsResourceWatcher).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); // Resets backoff and retry immediately. @@ -2795,9 +2795,9 @@ public class XdsClientImplTestV2 { responseObserver.onError(Status.UNAVAILABLE.asException()); verify(configWatcher, times(2)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(clusterWatcher, times(2)).onError(statusCaptor.capture()); + verify(cdsResourceWatcher, times(2)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(endpointWatcher, times(2)).onError(statusCaptor.capture()); + verify(edsResourceWatcher, times(2)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); inOrder.verify(backoffPolicy1).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); @@ -2824,9 +2824,9 @@ public class XdsClientImplTestV2 { responseObserver.onError(Status.UNAVAILABLE.asException()); verify(configWatcher, times(3)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(clusterWatcher, times(3)).onError(statusCaptor.capture()); + verify(cdsResourceWatcher, times(3)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(endpointWatcher, times(3)).onError(statusCaptor.capture()); + verify(edsResourceWatcher, times(3)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); inOrder.verify(backoffPolicy1).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); @@ -2861,8 +2861,8 @@ public class XdsClientImplTestV2 { // Management server closes the RPC stream. responseObserver.onCompleted(); verify(configWatcher, times(4)).onError(any(Status.class)); - verify(clusterWatcher, times(4)).onError(any(Status.class)); - verify(endpointWatcher, times(4)).onError(any(Status.class)); + verify(cdsResourceWatcher, times(4)).onError(any(Status.class)); + verify(edsResourceWatcher, times(4)).onError(any(Status.class)); // Resets backoff and retry immediately inOrder.verify(backoffPolicyProvider).get(); @@ -2886,9 +2886,9 @@ public class XdsClientImplTestV2 { responseObserver.onError(Status.UNAVAILABLE.asException()); verify(configWatcher, times(5)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(clusterWatcher, times(5)).onError(statusCaptor.capture()); + verify(cdsResourceWatcher, times(5)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); - verify(endpointWatcher, times(5)).onError(statusCaptor.capture()); + verify(edsResourceWatcher, times(5)).onError(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); inOrder.verify(backoffPolicy2).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); @@ -2955,7 +2955,7 @@ public class XdsClientImplTestV2 { assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); // Start watching cluster information while RPC stream is still in retry backoff. - xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster.googleapis.com", cdsResourceWatcher); // Retry after backoff. fakeClock.forwardNanos(9L); @@ -2979,7 +2979,7 @@ public class XdsClientImplTestV2 { assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); // Start watching endpoint information while RPC stream is still in retry backoff. - xdsClient.watchEndpointData("cluster.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster.googleapis.com", edsResourceWatcher); // Retry after backoff. fakeClock.forwardNanos(99L); @@ -3010,14 +3010,14 @@ public class XdsClientImplTestV2 { // Client sent an CDS ACK request (Omitted). // No longer interested in endpoint information after RPC resumes. - xdsClient.cancelEndpointDataWatch("cluster.googleapis.com", endpointWatcher); + xdsClient.cancelEdsResourceWatch("cluster.googleapis.com", edsResourceWatcher); // Client updates EDS resource subscription immediately. verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", ImmutableList.of(), XdsClientImpl.ADS_TYPE_URL_EDS_V2, ""))); // Become interested in endpoints of another cluster. - xdsClient.watchEndpointData("cluster2.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster2.googleapis.com", edsResourceWatcher); // Client updates EDS resource subscription immediately. verify(requestObserver) .onNext(eq(buildDiscoveryRequestV2(NODE, "", "cluster2.googleapis.com", @@ -3049,8 +3049,8 @@ public class XdsClientImplTestV2 { assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); // No longer interested in previous cluster and endpoints in that cluster. - xdsClient.cancelClusterDataWatch("cluster.googleapis.com", clusterWatcher); - xdsClient.cancelEndpointDataWatch("cluster2.googleapis.com", endpointWatcher); + xdsClient.cancelCdsResourceWatch("cluster.googleapis.com", cdsResourceWatcher); + xdsClient.cancelEdsResourceWatch("cluster2.googleapis.com", edsResourceWatcher); // Retry after backoff. fakeClock.forwardNanos(19L); @@ -3174,7 +3174,7 @@ public class XdsClientImplTestV2 { // Client/server resumed LDS/RDS request/response (Omitted). // Start watching cluster data. - xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + xdsClient.watchCdsResource("cluster-foo.googleapis.com", cdsResourceWatcher); ScheduledTask cdsRespTimeoutTask = Iterables.getOnlyElement( fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); @@ -3202,7 +3202,7 @@ public class XdsClientImplTestV2 { .isEqualTo(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC); // Start watching endpoint data. - xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher); + xdsClient.watchEdsResource("cluster-foo.googleapis.com", edsResourceWatcher); ScheduledTask edsTimeoutTask = Iterables.getOnlyElement( fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));