xds: xdsClient caches transient error for new watchers (#12262)

This commit is contained in:
MV Shiva 2025-08-19 21:41:52 +05:30 committed by GitHub
parent 43bef65cf9
commit 2039266ebc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 30 additions and 0 deletions

View File

@ -676,6 +676,8 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
private ResourceMetadata metadata;
@Nullable
private String errorDescription;
@Nullable
private Status lastError;
ResourceSubscriber(XdsResourceType<T> type, String resource) {
syncContext.throwIfNotInThisSynchronizationContext();
@ -712,11 +714,16 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
watchers.put(watcher, watcherExecutor);
T savedData = data;
boolean savedAbsent = absent;
Status savedError = lastError;
watcherExecutor.execute(() -> {
if (errorDescription != null) {
watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
return;
}
if (savedError != null) {
watcher.onError(savedError);
return;
}
if (savedData != null) {
notifyWatcher(watcher, savedData);
} else if (savedAbsent) {
@ -808,6 +815,7 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
this.metadata = ResourceMetadata
.newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
absent = false;
lastError = null;
if (resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
+ "of resource for which we previously ignored a deletion: type {1} name {2}",
@ -857,6 +865,7 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
if (!absent) {
data = null;
absent = true;
lastError = null;
metadata = serverInfo.resourceTimerIsTransientError()
? ResourceMetadata.newResourceMetadataTimeout()
: ResourceMetadata.newResourceMetadataDoesNotExist();
@ -894,6 +903,7 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore {
Status errorAugmented = Status.fromCode(error.getCode())
.withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
.withCause(error.getCause());
this.lastError = errorAugmented;
for (ResourceWatcher<T> watcher : watchers.keySet()) {
if (tracker != null) {

View File

@ -285,6 +285,8 @@ public abstract class GrpcXdsClientImplTestBase {
@Mock
private ResourceWatcher<LdsUpdate> ldsResourceWatcher;
@Mock
private ResourceWatcher<LdsUpdate> ldsResourceWatcher2;
@Mock
private ResourceWatcher<RdsUpdate> rdsResourceWatcher;
@Mock
private ResourceWatcher<CdsUpdate> cdsResourceWatcher;
@ -694,6 +696,24 @@ public abstract class GrpcXdsClientImplTestBase {
assertThat(resourceDiscoveryCalls.poll()).isNull();
}
@Test
public void ldsResource_onError_cachedForNewWatcher() {
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE,
ldsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
call.sendCompleted();
verify(ldsResourceWatcher).onError(errorCaptor.capture());
Status initialError = errorCaptor.getValue();
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE,
ldsResourceWatcher2);
ArgumentCaptor<Status> secondErrorCaptor = ArgumentCaptor.forClass(Status.class);
verify(ldsResourceWatcher2).onError(secondErrorCaptor.capture());
Status cachedError = secondErrorCaptor.getValue();
assertThat(cachedError).isEqualTo(initialError);
assertThat(resourceDiscoveryCalls.poll()).isNull();
}
@Test
public void ldsResponseErrorHandling_allResourcesFailedUnpack() {
DiscoveryRpcCall call = startResourceWatcher(XdsListenerResource.getInstance(), LDS_RESOURCE,