xds: Update error handling for ADS stream close and failure scenarios (#11596)

When an ADS stream in closed with a non-OK status after receiving a response, new status will be updated to OK status. This makes the fail behavior consistent with gRFC A57.
This commit is contained in:
Vindhya Ningegowda 2024-10-08 17:28:14 -07:00 committed by GitHub
parent e59ae5fad0
commit 2e9c3e19fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 81 additions and 31 deletions

View File

@ -16,7 +16,6 @@
package io.grpc.xds.client;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@ -60,7 +59,6 @@ import javax.annotation.Nullable;
*/
final class ControlPlaneClient {
public static final String CLOSED_BY_SERVER = "Closed by server";
private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
@ -358,11 +356,7 @@ final class ControlPlaneClient {
@Override
public void onStatusReceived(final Status status) {
syncContext.execute(() -> {
if (status.isOk()) {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
} else {
handleRpcStreamClosed(status);
}
handleRpcStreamClosed(status);
});
}
@ -381,7 +375,7 @@ final class ControlPlaneClient {
processingTracker.onComplete();
}
private void handleRpcStreamClosed(Status error) {
private void handleRpcStreamClosed(Status status) {
if (closed) {
return;
}
@ -399,15 +393,34 @@ final class ControlPlaneClient {
rpcRetryTimer = syncContext.schedule(
new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
checkArgument(!error.isOk(), "unexpected OK status");
String errorMsg = error.getDescription() != null
&& error.getDescription().equals(CLOSED_BY_SERVER)
? "ADS stream closed with status {0}: {1}. Cause: {2}"
: "ADS stream failed with status {0}: {1}. Cause: {2}";
logger.log(
XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
Status newStatus = status;
if (responseReceived) {
// A closed ADS stream after a successful response is not considered an error. Servers may
// close streams for various reasons during normal operation, such as load balancing or
// underlying connection hitting its max connection age limit (see gRFC A9).
if (!status.isOk()) {
newStatus = Status.OK;
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
+ "response was received, so this will not be treated as an error. Cause: {2}",
status.getCode(), status.getDescription(), status.getCause());
} else {
logger.log(XdsLogLevel.DEBUG,
"ADS stream closed by server after a response was received");
}
} else {
// If the ADS stream is closed without ever having received a response from the server, then
// the XdsClient should consider that a connectivity error (see gRFC A57).
if (status.isOk()) {
newStatus = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");
}
logger.log(
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
}
closed = true;
xdsResponseHandler.handleStreamClosed(error);
xdsResponseHandler.handleStreamClosed(newStatus);
cleanUp();
logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);

View File

@ -142,11 +142,13 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
public void handleStreamClosed(Status error) {
syncContext.throwIfNotInThisSynchronizationContext();
cleanUpResourceTimers();
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
if (!subscriber.hasResult()) {
subscriber.onError(error, null);
if (!error.isOk()) {
for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
if (!subscriber.hasResult()) {
subscriber.onError(error, null);
}
}
}
}

View File

@ -3331,6 +3331,43 @@ public abstract class GrpcXdsClientImplTestBase {
}
}
@Test
public void streamClosedWithNoResponse() {
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE,
rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
// Management server closes the RPC stream before sending any response.
call.sendCompleted();
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
.onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
"ADS stream closed with OK before receiving a response");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
"ADS stream closed with OK before receiving a response");
}
@Test
public void streamClosedAfterSendingResponses() {
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher);
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE,
rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
ScheduledTask ldsResourceTimeout =
Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
ScheduledTask rdsResourceTimeout =
Iterables.getOnlyElement(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
call.sendResponse(LDS, testListenerRds, VERSION_1, "0000");
assertThat(ldsResourceTimeout.isCancelled()).isTrue();
call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
assertThat(rdsResourceTimeout.isCancelled()).isTrue();
// Management server closes the RPC stream after sending responses.
call.sendCompleted();
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
}
@Test
public void streamClosedAndRetryWithBackoff() {
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
@ -3408,10 +3445,10 @@ public abstract class GrpcXdsClientImplTestBase {
call.sendError(Status.DEADLINE_EXCEEDED.asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.DEADLINE_EXCEEDED, "");
verify(cdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg);
// Reset backoff sequence and retry after backoff.
inOrder.verify(backoffPolicyProvider).get();
@ -3430,9 +3467,9 @@ public abstract class GrpcXdsClientImplTestBase {
call.sendError(Status.UNAVAILABLE.asException());
verify(ldsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(rdsResourceWatcher, times(2)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(4)).onError(errorCaptor.capture());
verify(cdsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher, times(4)).onError(errorCaptor.capture());
verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
// Retry after backoff.
@ -3516,10 +3553,8 @@ public abstract class GrpcXdsClientImplTestBase {
assertThat(edsResourceTimeout.isCancelled()).isTrue();
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(cdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(edsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(cdsResourceWatcher, never()).onError(errorCaptor.capture());
verify(edsResourceWatcher, never()).onError(errorCaptor.capture());
fakeClock.forwardNanos(10L);
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0);