From 2e9c3e19fbd50000ed964acc70f6fe814841db40 Mon Sep 17 00:00:00 2001 From: Vindhya Ningegowda Date: Tue, 8 Oct 2024 17:28:14 -0700 Subject: [PATCH] xds: Update error handling for ADS stream close and failure scenarios (#11596) When an ADS stream is closed with a non-OK status after receiving a response, the status will be updated to OK. This makes the failure behavior consistent with gRFC A57. --- .../grpc/xds/client/ControlPlaneClient.java | 45 +++++++++------ .../io/grpc/xds/client/XdsClientImpl.java | 12 ++-- .../grpc/xds/GrpcXdsClientImplTestBase.java | 55 +++++++++++++++---- 3 files changed, 81 insertions(+), 31 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index 3074d1120a..5ac979277c 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -16,7 +16,6 @@ package io.grpc.xds.client; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -60,7 +59,6 @@ import javax.annotation.Nullable; */ final class ControlPlaneClient { - public static final String CLOSED_BY_SERVER = "Closed by server"; private final SynchronizationContext syncContext; private final InternalLogId logId; private final XdsLogger logger; @@ -358,11 +356,7 @@ final class ControlPlaneClient { @Override public void onStatusReceived(final Status status) { syncContext.execute(() -> { - if (status.isOk()) { - handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER)); - } else { - handleRpcStreamClosed(status); - } + handleRpcStreamClosed(status); }); } @@ -381,7 +375,7 @@ final class ControlPlaneClient { processingTracker.onComplete(); } - private void handleRpcStreamClosed(Status error) { + private void handleRpcStreamClosed(Status status) { if
(closed) { return; } @@ -399,15 +393,34 @@ final class ControlPlaneClient { rpcRetryTimer = syncContext.schedule( new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); - checkArgument(!error.isOk(), "unexpected OK status"); - String errorMsg = error.getDescription() != null - && error.getDescription().equals(CLOSED_BY_SERVER) - ? "ADS stream closed with status {0}: {1}. Cause: {2}" - : "ADS stream failed with status {0}: {1}. Cause: {2}"; - logger.log( - XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause()); + Status newStatus = status; + if (responseReceived) { + // A closed ADS stream after a successful response is not considered an error. Servers may + // close streams for various reasons during normal operation, such as load balancing or + // underlying connection hitting its max connection age limit (see gRFC A9). + if (!status.isOk()) { + newStatus = Status.OK; + logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a " + + "response was received, so this will not be treated as an error. Cause: {2}", + status.getCode(), status.getDescription(), status.getCause()); + } else { + logger.log(XdsLogLevel.DEBUG, + "ADS stream closed by server after a response was received"); + } + } else { + // If the ADS stream is closed without ever having received a response from the server, then + // the XdsClient should consider that a connectivity error (see gRFC A57). + if (status.isOk()) { + newStatus = Status.UNAVAILABLE.withDescription( + "ADS stream closed with OK before receiving a response"); + } + logger.log( + XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. 
Cause: {2}", + newStatus.getCode(), newStatus.getDescription(), newStatus.getCause()); + } + closed = true; - xdsResponseHandler.handleStreamClosed(error); + xdsResponseHandler.handleStreamClosed(newStatus); cleanUp(); logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos); diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 79147cd986..e2380b9ed7 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -142,11 +142,13 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler public void handleStreamClosed(Status error) { syncContext.throwIfNotInThisSynchronizationContext(); cleanUpResourceTimers(); - for (Map> subscriberMap : - resourceSubscribers.values()) { - for (ResourceSubscriber subscriber : subscriberMap.values()) { - if (!subscriber.hasResult()) { - subscriber.onError(error, null); + if (!error.isOk()) { + for (Map> subscriberMap : + resourceSubscribers.values()) { + for (ResourceSubscriber subscriber : subscriberMap.values()) { + if (!subscriber.hasResult()) { + subscriber.onError(error, null); + } } } } diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index d41630cdb4..a9fda59918 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -3331,6 +3331,43 @@ public abstract class GrpcXdsClientImplTestBase { } } + @Test + public void streamClosedWithNoResponse() { + xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE, + rdsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + // Management server closes the RPC stream before sending any 
response. + call.sendCompleted(); + verify(ldsResourceWatcher, Mockito.timeout(1000).times(1)) + .onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, + "ADS stream closed with OK before receiving a response"); + verify(rdsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, + "ADS stream closed with OK before receiving a response"); + } + + @Test + public void streamClosedAfterSendingResponses() { + xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE, + rdsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + ScheduledTask ldsResourceTimeout = + Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); + ScheduledTask rdsResourceTimeout = + Iterables.getOnlyElement(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); + call.sendResponse(LDS, testListenerRds, VERSION_1, "0000"); + assertThat(ldsResourceTimeout.isCancelled()).isTrue(); + call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000"); + assertThat(rdsResourceTimeout.isCancelled()).isTrue(); + // Management server closes the RPC stream after sending responses. 
+ call.sendCompleted(); + verify(ldsResourceWatcher, never()).onError(errorCaptor.capture()); + verify(rdsResourceWatcher, never()).onError(errorCaptor.capture()); + } + @Test public void streamClosedAndRetryWithBackoff() { InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); @@ -3408,10 +3445,10 @@ public abstract class GrpcXdsClientImplTestBase { call.sendError(Status.DEADLINE_EXCEEDED.asException()); verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); - verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); - verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, ""); + verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); + verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); // Reset backoff sequence and retry after backoff. inOrder.verify(backoffPolicyProvider).get(); @@ -3430,9 +3467,9 @@ public abstract class GrpcXdsClientImplTestBase { call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture()); verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture()); - verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture()); + verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); - verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture()); + verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); // Retry after backoff. 
@@ -3516,10 +3553,8 @@ public abstract class GrpcXdsClientImplTestBase { assertThat(edsResourceTimeout.isCancelled()).isTrue(); verify(ldsResourceWatcher, never()).onError(errorCaptor.capture()); verify(rdsResourceWatcher, never()).onError(errorCaptor.capture()); - verify(cdsResourceWatcher).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); - verify(edsResourceWatcher).onError(errorCaptor.capture()); - verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + verify(cdsResourceWatcher, never()).onError(errorCaptor.capture()); + verify(edsResourceWatcher, never()).onError(errorCaptor.capture()); fakeClock.forwardNanos(10L); assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0);