xds: refactor resource subscription implementation in XdsClient (#7458)

Introduce ResourceSubscriber for tracking the state of a single resource.

Every time newly subscribing to some resource, a corresponding ResourceSubscriber is created. Note it does not control the resource discovery RPCs. It is still the XdsClient that sends RPCs for with all subscribed resource names for each type. A ResourceSubscriber can have the following states:

  - When the initial resource fetch timer (respTimer) is pending, the resource is under discovery, the resource data is unknown. Even if the XdsClient receives a response not containing the corresponding resource, it does not mean the resource is absent. We still need to wait until a response containing the resource data coming or the timer being fired. The timer is scheduled when the ResourceSubscriber is created. So the XdsClient should always create the corresponding ResourceSubscriber when it starts to subscribe a new resource.

  - If the resource fetch timer is not pending, we must know the existence of the resource data. If data field is set, it is the most recently received resource data (aka, cached entry). Otherwise, absent field is set to true, indicating the resource does not exist. The exceptional case is when the ADS stream is closed and in the retry backoff period. During that period, respTimer is cancelled and the resource existence may or may not be known. Once the backoff finishes, the XdsClient will reschedule the respTimer when it recreates the ADS stream and re-request all the resources.

Watchers can be added to existing ResourceSubscribers. At the time the watcher is added, its callback will be invoked if we've already known the existence of the resource. Otherwise, the watcher will just sit there and wait data or absence to come in the future.
This commit is contained in:
Chengyuan Zhang 2020-09-28 13:43:41 -07:00 committed by GitHub
parent 950ec30247
commit 2adeff56fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 255 additions and 328 deletions

View File

@ -103,7 +103,7 @@ abstract class XdsClient {
}
}
static final class LdsUpdate {
static final class LdsUpdate implements ResourceUpdate {
// Total number of nanoseconds to keep alive an HTTP request/response stream.
private final long httpMaxStreamDurationNano;
// The name of the route configuration to be used for RDS resource discovery.
@ -134,6 +134,25 @@ abstract class XdsClient {
return virtualHosts;
}
@Override
public int hashCode() {
return Objects.hash(httpMaxStreamDurationNano, rdsName, virtualHosts);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LdsUpdate that = (LdsUpdate) o;
return Objects.equals(httpMaxStreamDurationNano, that.httpMaxStreamDurationNano)
&& Objects.equals(rdsName, that.rdsName)
&& Objects.equals(virtualHosts, that.virtualHosts);
}
@Override
public String toString() {
ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
@ -182,7 +201,7 @@ abstract class XdsClient {
}
}
static final class RdsUpdate {
static final class RdsUpdate implements ResourceUpdate {
// The list virtual hosts that make up the route table.
private final List<VirtualHost> virtualHosts;
@ -206,7 +225,7 @@ abstract class XdsClient {
}
}
static final class CdsUpdate {
static final class CdsUpdate implements ResourceUpdate {
private final String clusterName;
@Nullable
private final String edsServiceName;
@ -351,7 +370,7 @@ abstract class XdsClient {
}
}
static final class EdsUpdate {
static final class EdsUpdate implements ResourceUpdate {
private final String clusterName;
private final Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap;
private final List<DropOverload> dropPolicies;
@ -497,10 +516,13 @@ abstract class XdsClient {
}
}
interface ResourceUpdate {
}
/**
* Watcher interface for a single requested xDS resource.
*/
private interface ResourceWatcher {
interface ResourceWatcher {
/**
* Called when the resource discovery RPC encounters some transient error.
@ -523,6 +545,14 @@ abstract class XdsClient {
void onChanged(RdsUpdate update);
}
interface CdsResourceWatcher extends ResourceWatcher {
void onChanged(CdsUpdate update);
}
interface EdsResourceWatcher extends ResourceWatcher {
void onChanged(EdsUpdate update);
}
/**
* Config watcher interface. To be implemented by the xDS resolver.
*/
@ -535,16 +565,6 @@ abstract class XdsClient {
void onConfigChanged(ConfigUpdate update);
}
interface CdsResourceWatcher extends ResourceWatcher {
void onChanged(CdsUpdate update);
}
interface EdsResourceWatcher extends ResourceWatcher {
void onChanged(EdsUpdate update);
}
/**
* Listener watcher interface. To be used by {@link io.grpc.xds.internal.sds.XdsServerBuilder}.
*/

View File

@ -69,6 +69,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -126,39 +127,8 @@ final class XdsClientImpl extends XdsClient {
// more than once.
private Node node;
// Cached data for CDS responses, keyed by cluster names.
// Optimization: cache CdsUpdate, which contains only information needed by gRPC, instead
// of whole Cluster messages to reduce memory usage.
private final Map<String, CdsUpdate> clusterNamesToCdsUpdates = new HashMap<>();
// Cached CDS resources that are known to be absent.
private final Set<String> absentCdsResources = new HashSet<>();
// Cached data for EDS responses, keyed by cluster names.
// CDS responses indicate absence of clusters and EDS responses indicate presence of clusters.
// Optimization: cache EdsUpdate, which contains only information needed by gRPC, instead
// of whole ClusterLoadAssignment messages to reduce memory usage.
private final Map<String, EdsUpdate> clusterNamesToEdsUpdates = new HashMap<>();
// Cached EDS resources that are known to be absent.
private final Set<String> absentEdsResources = new HashSet<>();
// Cluster watchers waiting for cluster information updates. Multiple cluster watchers
// can watch on information for the same cluster.
private final Map<String, Set<CdsResourceWatcher>> cdsWatchers = new HashMap<>();
// Endpoint watchers waiting for endpoint updates for each cluster. Multiple endpoint
// watchers can watch endpoints in the same cluster.
private final Map<String, Set<EdsResourceWatcher>> edsWatchers = new HashMap<>();
// Resource fetch timers are used to conclude absence of resources. Each timer is activated when
// subscription for the resource starts and disarmed on first update for the resource.
// Timers for concluding CDS resources not found.
private final Map<String, ScheduledHandle> cdsRespTimers = new HashMap<>();
// Timers for concluding EDS resources not found.
private final Map<String, ScheduledHandle> edsRespTimers = new HashMap<>();
private final Map<String, ResourceSubscriber> cdsResourceSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> edsResourceSubscribers = new HashMap<>();
private final LoadStatsManager loadStatsManager = new LoadStatsManager();
@ -248,14 +218,12 @@ final class XdsClientImpl extends XdsClient {
rdsRespTimer.cancel();
rdsRespTimer = null;
}
for (ScheduledHandle handle : cdsRespTimers.values()) {
handle.cancel();
for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) {
subscriber.stopTimer();
}
cdsRespTimers.clear();
for (ScheduledHandle handle : edsRespTimers.values()) {
handle.cancel();
for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) {
subscriber.stopTimer();
}
edsRespTimers.clear();
}
@Override
@ -282,151 +250,49 @@ final class XdsClientImpl extends XdsClient {
@Override
void watchCdsResource(String resourceName, CdsResourceWatcher watcher) {
checkNotNull(resourceName, "resourceName");
checkNotNull(watcher, "watcher");
boolean needRequest = false;
if (!cdsWatchers.containsKey(resourceName)) {
logger.log(XdsLogLevel.INFO, "Start watching cluster {0}", resourceName);
needRequest = true;
cdsWatchers.put(resourceName, new HashSet<CdsResourceWatcher>());
}
Set<CdsResourceWatcher> watchers = cdsWatchers.get(resourceName);
checkState(!watchers.contains(watcher), "watcher for %s already registered", resourceName);
watchers.add(watcher);
// If local cache contains cluster information to be watched, notify the watcher immediately.
if (absentCdsResources.contains(resourceName)) {
logger.log(XdsLogLevel.DEBUG, "Cluster resource {0} is known to be absent", resourceName);
watcher.onResourceDoesNotExist(resourceName);
return;
}
if (clusterNamesToCdsUpdates.containsKey(resourceName)) {
logger.log(XdsLogLevel.DEBUG, "Retrieve cluster info {0} from local cache", resourceName);
watcher.onChanged(clusterNamesToCdsUpdates.get(resourceName));
return;
}
if (needRequest) {
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
if (adsStream == null) {
startRpcStream();
}
adsStream.sendXdsRequest(ResourceType.CDS, cdsWatchers.keySet());
ScheduledHandle timeoutHandle =
syncContext
.schedule(
new CdsResourceFetchTimeoutTask(resourceName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
cdsRespTimers.put(resourceName, timeoutHandle);
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName);
if (subscriber == null) {
logger.log(XdsLogLevel.INFO, "Subscribe CDS resource {0}", resourceName);
subscriber = new ResourceSubscriber(ResourceType.CDS, resourceName);
cdsResourceSubscribers.put(resourceName, subscriber);
adjustResourceSubscription(ResourceType.CDS, cdsResourceSubscribers.keySet());
}
subscriber.addWatcher(watcher);
}
@Override
void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) {
checkNotNull(watcher, "watcher");
Set<CdsResourceWatcher> watchers = cdsWatchers.get(resourceName);
checkState(
watchers != null && watchers.contains(watcher),
"watcher for %s was not registered", resourceName);
watchers.remove(watcher);
if (watchers.isEmpty()) {
logger.log(XdsLogLevel.INFO, "Stop watching cluster {0}", resourceName);
cdsWatchers.remove(resourceName);
// Remove the corresponding CDS entry.
absentCdsResources.remove(resourceName);
clusterNamesToCdsUpdates.remove(resourceName);
// Cancel and delete response timer waiting for the corresponding resource.
if (cdsRespTimers.containsKey(resourceName)) {
cdsRespTimers.get(resourceName).cancel();
cdsRespTimers.remove(resourceName);
}
// No longer interested in this cluster, send an updated CDS request to unsubscribe
// this resource.
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
checkState(adsStream != null,
"Severe bug: ADS stream was not created while an endpoint watcher was registered");
adsStream.sendXdsRequest(ResourceType.CDS, cdsWatchers.keySet());
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName);
cdsResourceSubscribers.remove(resourceName);
adjustResourceSubscription(ResourceType.CDS, cdsResourceSubscribers.keySet());
}
}
@Override
void watchEdsResource(String resourceName, EdsResourceWatcher watcher) {
checkNotNull(watcher, "watcher");
boolean needRequest = false;
if (!edsWatchers.containsKey(resourceName)) {
logger.log(XdsLogLevel.INFO, "Start watching endpoints in cluster {0}", resourceName);
needRequest = true;
edsWatchers.put(resourceName, new HashSet<EdsResourceWatcher>());
}
Set<EdsResourceWatcher> watchers = edsWatchers.get(resourceName);
checkState(!watchers.contains(watcher), "watcher for %s already registered", resourceName);
watchers.add(watcher);
// If local cache contains endpoint information for the cluster to be watched, notify
// the watcher immediately.
if (absentEdsResources.contains(resourceName)) {
logger.log(
XdsLogLevel.DEBUG,
"Endpoint resource for cluster {0} is known to be absent.", resourceName);
watcher.onResourceDoesNotExist(resourceName);
return;
}
if (clusterNamesToEdsUpdates.containsKey(resourceName)) {
logger.log(
XdsLogLevel.DEBUG,
"Retrieve endpoints info for cluster {0} from local cache.", resourceName);
watcher.onChanged(clusterNamesToEdsUpdates.get(resourceName));
return;
}
if (needRequest) {
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
if (adsStream == null) {
startRpcStream();
}
adsStream.sendXdsRequest(ResourceType.EDS, edsWatchers.keySet());
ScheduledHandle timeoutHandle =
syncContext
.schedule(
new EdsResourceFetchTimeoutTask(resourceName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
edsRespTimers.put(resourceName, timeoutHandle);
ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName);
if (subscriber == null) {
logger.log(XdsLogLevel.INFO, "Subscribe EDS resource {0}", resourceName);
subscriber = new ResourceSubscriber(ResourceType.EDS, resourceName);
edsResourceSubscribers.put(resourceName, subscriber);
adjustResourceSubscription(ResourceType.EDS, edsResourceSubscribers.keySet());
}
subscriber.addWatcher(watcher);
}
@Override
void cancelEdsResourceWatch(String resourceName, EdsResourceWatcher watcher) {
checkNotNull(watcher, "watcher");
Set<EdsResourceWatcher> watchers = edsWatchers.get(resourceName);
checkState(
watchers != null && watchers.contains(watcher),
"watcher for %s was not registered", resourceName);
watchers.remove(watcher);
if (watchers.isEmpty()) {
logger.log(XdsLogLevel.INFO, "Stop watching endpoints in cluster {0}", resourceName);
edsWatchers.remove(resourceName);
// Remove the corresponding EDS cache entry.
absentEdsResources.remove(resourceName);
clusterNamesToEdsUpdates.remove(resourceName);
// Cancel and delete response timer waiting for the corresponding resource.
if (edsRespTimers.containsKey(resourceName)) {
edsRespTimers.get(resourceName).cancel();
edsRespTimers.remove(resourceName);
}
// No longer interested in this cluster, send an updated EDS request to unsubscribe
// this resource.
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
adsStream.sendXdsRequest(ResourceType.EDS, edsWatchers.keySet());
ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName);
edsResourceSubscribers.remove(resourceName);
adjustResourceSubscription(ResourceType.EDS, edsResourceSubscribers.keySet());
}
}
@ -949,7 +815,7 @@ final class XdsClientImpl extends XdsClient {
} catch (InvalidProtocolBufferException e) {
logger.log(XdsLogLevel.WARNING, "Failed to unpack Clusters in CDS response {0}", e);
adsStream.sendNackRequest(
ResourceType.CDS, cdsWatchers.keySet(),
ResourceType.CDS, cdsResourceSubscribers.keySet(),
cdsResponse.getVersionInfo(), "Malformed CDS response: " + e);
return;
}
@ -967,7 +833,7 @@ final class XdsClientImpl extends XdsClient {
// Management server is required to always send newly requested resources, even if they
// may have been sent previously (proactively). Thus, client does not need to cache
// unrequested resources.
if (!cdsWatchers.containsKey(clusterName)) {
if (!cdsResourceSubscribers.containsKey(clusterName)) {
continue;
}
CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder();
@ -1025,63 +891,26 @@ final class XdsClientImpl extends XdsClient {
if (errorMessage != null) {
adsStream.sendNackRequest(
ResourceType.CDS,
cdsWatchers.keySet(),
cdsResourceSubscribers.keySet(),
cdsResponse.getVersionInfo(),
errorMessage);
return;
}
adsStream.sendAckRequest(ResourceType.CDS, cdsWatchers.keySet(),
adsStream.sendAckRequest(ResourceType.CDS, cdsResourceSubscribers.keySet(),
cdsResponse.getVersionInfo());
// Update local CDS cache with data in this response.
absentCdsResources.removeAll(cdsUpdates.keySet());
for (Map.Entry<String, CdsUpdate> entry : clusterNamesToCdsUpdates.entrySet()) {
if (!cdsUpdates.containsKey(entry.getKey())) {
// Some previously existing resource no longer exists.
absentCdsResources.add(entry.getKey());
} else if (cdsUpdates.get(entry.getKey()).equals(entry.getValue())) {
cdsUpdates.remove(entry.getKey());
for (String resource : cdsResourceSubscribers.keySet()) {
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resource);
if (cdsUpdates.containsKey(resource)) {
subscriber.onData(cdsUpdates.get(resource));
} else {
subscriber.onAbsent();
}
}
clusterNamesToCdsUpdates.keySet().removeAll(absentCdsResources);
clusterNamesToCdsUpdates.putAll(cdsUpdates);
// Remove EDS cache entries for ClusterLoadAssignments not referenced by this CDS response.
for (String clusterName : clusterNamesToEdsUpdates.keySet()) {
if (!edsServices.contains(clusterName)) {
absentEdsResources.add(clusterName);
// Notify EDS resource removal to watchers.
if (edsWatchers.containsKey(clusterName)) {
Set<EdsResourceWatcher> watchers = edsWatchers.get(clusterName);
for (EdsResourceWatcher watcher : watchers) {
watcher.onResourceDoesNotExist(clusterName);
}
}
}
}
clusterNamesToEdsUpdates.keySet().retainAll(edsServices);
for (String clusterName : cdsUpdates.keySet()) {
if (cdsRespTimers.containsKey(clusterName)) {
cdsRespTimers.get(clusterName).cancel();
cdsRespTimers.remove(clusterName);
}
}
// Notify watchers if clusters interested in present in this CDS response.
for (Map.Entry<String, Set<CdsResourceWatcher>> entry : cdsWatchers.entrySet()) {
String clusterName = entry.getKey();
if (cdsUpdates.containsKey(entry.getKey())) {
CdsUpdate cdsUpdate = cdsUpdates.get(clusterName);
for (CdsResourceWatcher watcher : entry.getValue()) {
watcher.onChanged(cdsUpdate);
}
} else if (!clusterNamesToCdsUpdates.containsKey(entry.getKey())
&& !cdsRespTimers.containsKey(clusterName)) {
// Update for previously present resource being removed.
for (CdsResourceWatcher watcher : entry.getValue()) {
watcher.onResourceDoesNotExist(entry.getKey());
}
for (String resource : edsResourceSubscribers.keySet()) {
if (!edsServices.contains(resource)) {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resource);
subscriber.onAbsent();
}
}
}
@ -1124,7 +953,7 @@ final class XdsClientImpl extends XdsClient {
logger.log(
XdsLogLevel.WARNING, "Failed to unpack ClusterLoadAssignments in EDS response {0}", e);
adsStream.sendNackRequest(
ResourceType.EDS, edsWatchers.keySet(),
ResourceType.EDS, edsResourceSubscribers.keySet(),
edsResponse.getVersionInfo(), "Malformed EDS response: " + e);
return;
}
@ -1141,7 +970,7 @@ final class XdsClientImpl extends XdsClient {
// Management server is required to always send newly requested resources, even if they
// may have been sent previously (proactively). Thus, client does not need to cache
// unrequested resources.
if (!edsWatchers.containsKey(clusterName)) {
if (!edsResourceSubscribers.containsKey(clusterName)) {
continue;
}
EdsUpdate.Builder updateBuilder = EdsUpdate.newBuilder();
@ -1198,36 +1027,33 @@ final class XdsClientImpl extends XdsClient {
if (errorMessage != null) {
adsStream.sendNackRequest(
ResourceType.EDS,
edsWatchers.keySet(),
edsResourceSubscribers.keySet(),
edsResponse.getVersionInfo(),
errorMessage);
return;
}
adsStream.sendAckRequest(ResourceType.EDS, edsWatchers.keySet(),
adsStream.sendAckRequest(ResourceType.EDS, edsResourceSubscribers.keySet(),
edsResponse.getVersionInfo());
// Update local EDS cache by inserting updated endpoint information.
clusterNamesToEdsUpdates.putAll(edsUpdates);
absentEdsResources.removeAll(edsUpdates.keySet());
// Notify watchers waiting for updates of endpoint information received in this EDS response.
// Based on xDS protocol, the management server should not send endpoint data again if
// nothing has changed.
for (Map.Entry<String, EdsUpdate> entry : edsUpdates.entrySet()) {
String clusterName = entry.getKey();
// Cancel and delete response timeout timer.
if (edsRespTimers.containsKey(clusterName)) {
edsRespTimers.get(clusterName).cancel();
edsRespTimers.remove(clusterName);
}
if (edsWatchers.containsKey(clusterName)) {
for (EdsResourceWatcher watcher : edsWatchers.get(clusterName)) {
watcher.onChanged(entry.getValue());
}
for (String resource : edsResourceSubscribers.keySet()) {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resource);
if (edsUpdates.containsKey(resource)) {
subscriber.onData(edsUpdates.get(resource));
}
}
}
private void adjustResourceSubscription(ResourceType type, Collection<String> resources) {
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
// Currently in retry backoff.
return;
}
if (adsStream == null) {
startRpcStream();
}
adsStream.sendXdsRequest(type, resources);
}
@VisibleForTesting
final class RpcRetryTask implements Runnable {
@Override
@ -1249,35 +1075,26 @@ final class XdsClientImpl extends XdsClient {
new ListenerResourceFetchTimeoutTask(":" + listenerPort),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
}
if (!cdsWatchers.isEmpty()) {
adsStream.sendXdsRequest(ResourceType.CDS, cdsWatchers.keySet());
for (String clusterName : cdsWatchers.keySet()) {
ScheduledHandle timeoutHandle =
syncContext
.schedule(
new CdsResourceFetchTimeoutTask(clusterName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
cdsRespTimers.put(clusterName, timeoutHandle);
if (!cdsResourceSubscribers.isEmpty()) {
adsStream.sendXdsRequest(ResourceType.CDS, cdsResourceSubscribers.keySet());
for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) {
subscriber.restartTimer();
}
}
if (!edsWatchers.isEmpty()) {
adsStream.sendXdsRequest(ResourceType.EDS, edsWatchers.keySet());
for (String clusterName : edsWatchers.keySet()) {
ScheduledHandle timeoutHandle =
syncContext
.schedule(
new EdsResourceFetchTimeoutTask(clusterName),
INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, timeService);
edsRespTimers.put(clusterName, timeoutHandle);
if (!edsResourceSubscribers.isEmpty()) {
adsStream.sendXdsRequest(ResourceType.EDS, edsResourceSubscribers.keySet());
for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) {
subscriber.restartTimer();
}
}
}
}
private enum ResourceType {
@VisibleForTesting
enum ResourceType {
UNKNOWN, LDS, RDS, CDS, EDS;
String typeUrl() {
private String typeUrl() {
switch (this) {
case LDS:
return ADS_TYPE_URL_LDS;
@ -1293,7 +1110,7 @@ final class XdsClientImpl extends XdsClient {
}
}
String typeUrlV2() {
private String typeUrlV2() {
switch (this) {
case LDS:
return ADS_TYPE_URL_LDS_V2;
@ -1309,7 +1126,7 @@ final class XdsClientImpl extends XdsClient {
}
}
static ResourceType fromTypeUrl(String typeUrl) {
private static ResourceType fromTypeUrl(String typeUrl) {
switch (typeUrl) {
case ADS_TYPE_URL_LDS:
// fall trough
@ -1333,6 +1150,135 @@ final class XdsClientImpl extends XdsClient {
}
}
/**
* Tracks a single subscribed resource.
*/
private final class ResourceSubscriber {
private final ResourceType type;
private final String resource;
private final Set<ResourceWatcher> watchers = new HashSet<>();
// Resource states:
// - present: data != null; data is the cached data for the resource
// - absent: absent == true
// - unknown: anything else
// Note absent -> data == null, but not vice versa.
private ResourceUpdate data;
private boolean absent;
private ScheduledHandle respTimer;
ResourceSubscriber(ResourceType type, String resource) {
this.type = type;
this.resource = resource;
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
return;
}
restartTimer();
}
void addWatcher(ResourceWatcher watcher) {
checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher);
watchers.add(watcher);
if (data != null) {
notifyWatcher(watcher, data);
} else if (absent) {
watcher.onResourceDoesNotExist(resource);
}
}
void removeWatcher(ResourceWatcher watcher) {
checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher);
watchers.remove(watcher);
}
void restartTimer() {
class ResourceNotFound implements Runnable {
@Override
public void run() {
respTimer = null;
onAbsent();
}
@Override
public String toString() {
return type + this.getClass().getSimpleName();
}
}
respTimer = syncContext.schedule(
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
timeService);
}
void stopTimer() {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();
respTimer = null;
}
}
boolean isWatched() {
return !watchers.isEmpty();
}
void onData(ResourceUpdate data) {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();
respTimer = null;
}
ResourceUpdate oldData = this.data;
this.data = data;
absent = false;
if (!Objects.equals(oldData, data)) {
for (ResourceWatcher watcher : watchers) {
notifyWatcher(watcher, data);
}
}
}
void onAbsent() {
if (respTimer != null && respTimer.isPending()) { // too early to conclude absence
return;
}
if (!absent) {
data = null;
absent = true;
for (ResourceWatcher watcher : watchers) {
watcher.onResourceDoesNotExist(resource);
}
}
}
void onError(Status error) {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();
respTimer = null;
}
for (ResourceWatcher watcher : watchers) {
watcher.onError(error);
}
}
private void notifyWatcher(ResourceWatcher watcher, ResourceUpdate update) {
switch (type) {
case LDS:
((LdsResourceWatcher) watcher).onChanged((LdsUpdate) update);
break;
case RDS:
((RdsResourceWatcher) watcher).onChanged((RdsUpdate) update);
break;
case CDS:
((CdsResourceWatcher) watcher).onChanged((CdsUpdate) update);
break;
case EDS:
((EdsResourceWatcher) watcher).onChanged((EdsUpdate) update);
break;
case UNKNOWN:
default:
throw new AssertionError("should never be here");
}
}
}
private static final class DiscoveryRequestData {
private final ResourceType resourceType;
private final Collection<String> resourceNames;
@ -1521,15 +1467,11 @@ final class XdsClientImpl extends XdsClient {
if (listenerWatcher != null) {
listenerWatcher.onError(error);
}
for (Set<CdsResourceWatcher> watchers : cdsWatchers.values()) {
for (CdsResourceWatcher watcher : watchers) {
watcher.onError(error);
}
for (ResourceSubscriber subscriber : cdsResourceSubscribers.values()) {
subscriber.onError(error);
}
for (Set<EdsResourceWatcher> watchers : edsWatchers.values()) {
for (EdsResourceWatcher watcher : watchers) {
watcher.onError(error);
}
for (ResourceSubscriber subscriber : edsResourceSubscribers.values()) {
subscriber.onError(error);
}
cleanUp();
cleanUpResourceTimers();
@ -1837,6 +1779,7 @@ final class XdsClientImpl extends XdsClient {
}
}
// TODO(chengyuanzhang): delete me.
private abstract class ResourceFetchTimeoutTask implements Runnable {
final String resourceName;
@ -1853,6 +1796,7 @@ final class XdsClientImpl extends XdsClient {
}
}
// TODO(chengyuanzhang): delete me.
@VisibleForTesting
final class LdsResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
@ -1883,6 +1827,7 @@ final class XdsClientImpl extends XdsClient {
}
}
// TODO(chengyuanzhang): delete me.
@VisibleForTesting
final class RdsResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
@ -1898,42 +1843,6 @@ final class XdsClientImpl extends XdsClient {
}
}
@VisibleForTesting
final class CdsResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
CdsResourceFetchTimeoutTask(String resourceName) {
super(resourceName);
}
@Override
public void run() {
super.run();
cdsRespTimers.remove(resourceName);
absentCdsResources.add(resourceName);
for (CdsResourceWatcher wat : cdsWatchers.get(resourceName)) {
wat.onResourceDoesNotExist(resourceName);
}
}
}
@VisibleForTesting
final class EdsResourceFetchTimeoutTask extends ResourceFetchTimeoutTask {
EdsResourceFetchTimeoutTask(String resourceName) {
super(resourceName);
}
@Override
public void run() {
super.run();
edsRespTimers.remove(resourceName);
absentEdsResources.add(resourceName);
for (EdsResourceWatcher wat : edsWatchers.get(resourceName)) {
wat.onResourceDoesNotExist(resourceName);
}
}
}
/**
* Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
* case-insensitive.

View File

@ -99,6 +99,7 @@ import io.grpc.xds.XdsClient.EdsResourceWatcher;
import io.grpc.xds.XdsClient.EdsUpdate;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsClientImpl.MessagePrinter;
import io.grpc.xds.XdsClientImpl.ResourceType;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
@ -161,8 +162,7 @@ public class XdsClientImplTest {
new TaskFilter() {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString()
.contains(XdsClientImpl.CdsResourceFetchTimeoutTask.class.getSimpleName());
return command.toString().contains(ResourceType.CDS.toString());
}
};
@ -170,8 +170,7 @@ public class XdsClientImplTest {
new FakeClock.TaskFilter() {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString()
.contains(XdsClientImpl.EdsResourceFetchTimeoutTask.class.getSimpleName());
return command.toString().contains(ResourceType.EDS.toString());
}
};
@ -1794,7 +1793,6 @@ public class XdsClientImplTest {
// Streaming RPC starts after a first watcher is added.
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
// Client sends an EDS request to management server.
verify(requestObserver)
.onNext(
argThat(