xds: notify all watchers when RPC stream is closed by server (#6629)

This commit is contained in:
Chengyuan Zhang 2020-01-22 14:27:06 -08:00 committed by GitHub
parent 9cf45e960b
commit ee661d45eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 5 deletions

View File

@ -1077,6 +1077,19 @@ final class XdsClientImpl extends XdsClient {
}
logger.log(Level.FINE, error.getDescription(), error.getCause());
closed = true;
if (configWatcher != null) {
configWatcher.onError(error);
}
for (Set<ClusterWatcher> watchers : clusterWatchers.values()) {
for (ClusterWatcher watcher : watchers) {
watcher.onError(error);
}
}
for (Set<EndpointWatcher> watchers : endpointWatchers.values()) {
for (EndpointWatcher watcher : watchers) {
watcher.onError(error);
}
}
cleanUp();
cleanUpResources();
if (responseReceived || retryBackoffPolicy == null) {

View File

@ -2618,6 +2618,7 @@ public class XdsClientImplTest {
StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
waitUntilConfigResolved(responseObserver);
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(null);
// Start watching cluster information.
xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher);
@ -2637,6 +2638,12 @@ public class XdsClientImplTest {
// Management server closes the RPC stream with an error.
responseObserver.onError(Status.UNKNOWN.asException());
verify(configWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
verify(clusterWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
verify(endpointWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
// Resets backoff and retry immediately.
inOrder.verify(backoffPolicyProvider).get();
@ -2659,6 +2666,12 @@ public class XdsClientImplTest {
// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(configWatcher, times(2)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(clusterWatcher, times(2)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(endpointWatcher, times(2)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
@ -2682,6 +2695,12 @@ public class XdsClientImplTest {
// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(configWatcher, times(3)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(clusterWatcher, times(3)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(endpointWatcher, times(3)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
@ -2714,6 +2733,9 @@ public class XdsClientImplTest {
// Management server closes the RPC stream.
responseObserver.onCompleted();
verify(configWatcher, times(4)).onError(any(Status.class));
verify(clusterWatcher, times(4)).onError(any(Status.class));
verify(endpointWatcher, times(4)).onError(any(Status.class));
// Resets backoff and retry immediately
inOrder.verify(backoffPolicyProvider).get();
@ -2735,6 +2757,12 @@ public class XdsClientImplTest {
// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
verify(configWatcher, times(5)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(clusterWatcher, times(5)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
verify(endpointWatcher, times(5)).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
inOrder.verify(backoffPolicy2).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
@ -3044,11 +3072,6 @@ public class XdsClientImplTest {
assertThat(cdsRespTimeoutTask.getDelay(TimeUnit.SECONDS))
.isEqualTo(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC);
fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(null);
verify(clusterWatcher).onError(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.NOT_FOUND);
// Start watching endpoint data.
xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher);
ScheduledTask edsTimeoutTask =