diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 70fcd0451e..20bdf06988 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -1077,6 +1077,19 @@ final class XdsClientImpl extends XdsClient { } logger.log(Level.FINE, error.getDescription(), error.getCause()); closed = true; + if (configWatcher != null) { + configWatcher.onError(error); + } + for (Set watchers : clusterWatchers.values()) { + for (ClusterWatcher watcher : watchers) { + watcher.onError(error); + } + } + for (Set watchers : endpointWatchers.values()) { + for (EndpointWatcher watcher : watchers) { + watcher.onError(error); + } + } cleanUp(); cleanUpResources(); if (responseReceived || retryBackoffPolicy == null) { diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index 7395a364cb..a9c139eaae 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -2618,6 +2618,7 @@ public class XdsClientImplTest { StreamObserver requestObserver = requestObservers.poll(); waitUntilConfigResolved(responseObserver); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(null); // Start watching cluster information. xdsClient.watchClusterData("cluster.googleapis.com", clusterWatcher); @@ -2637,6 +2638,12 @@ public class XdsClientImplTest { // Management server closes the RPC stream with an error. responseObserver.onError(Status.UNKNOWN.asException()); + verify(configWatcher).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + verify(clusterWatcher).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); + verify(endpointWatcher).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN); // Resets backoff and retry immediately. inOrder.verify(backoffPolicyProvider).get(); @@ -2659,6 +2666,12 @@ public class XdsClientImplTest { // Management server becomes unreachable. responseObserver.onError(Status.UNAVAILABLE.asException()); + verify(configWatcher, times(2)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(clusterWatcher, times(2)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(endpointWatcher, times(2)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); inOrder.verify(backoffPolicy1).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); @@ -2682,6 +2695,12 @@ public class XdsClientImplTest { // Management server is still not reachable. responseObserver.onError(Status.UNAVAILABLE.asException()); + verify(configWatcher, times(3)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(clusterWatcher, times(3)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(endpointWatcher, times(3)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); inOrder.verify(backoffPolicy1).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); @@ -2714,6 +2733,9 @@ public class XdsClientImplTest { // Management server closes the RPC stream. responseObserver.onCompleted(); + verify(configWatcher, times(4)).onError(any(Status.class)); + verify(clusterWatcher, times(4)).onError(any(Status.class)); + verify(endpointWatcher, times(4)).onError(any(Status.class)); // Resets backoff and retry immediately inOrder.verify(backoffPolicyProvider).get(); @@ -2735,6 +2757,12 @@ public class XdsClientImplTest { // Management server becomes unreachable again. responseObserver.onError(Status.UNAVAILABLE.asException()); + verify(configWatcher, times(5)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(clusterWatcher, times(5)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); + verify(endpointWatcher, times(5)).onError(statusCaptor.capture()); + assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE); inOrder.verify(backoffPolicy2).nextBackoffNanos(); assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1); @@ -3044,11 +3072,6 @@ public class XdsClientImplTest { assertThat(cdsRespTimeoutTask.getDelay(TimeUnit.SECONDS)) .isEqualTo(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC); - fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); - ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(null); - verify(clusterWatcher).onError(statusCaptor.capture()); - assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.NOT_FOUND); - // Start watching endpoint data. xdsClient.watchEndpointData("cluster-foo.googleapis.com", endpointWatcher); ScheduledTask edsTimeoutTask =