xds: better error handling to avoid RPC hangup

This change fails application RPCs immediately if XdsClient encounters any error, instead of silently retrying or entering fallback mode.

There could be an optimization for the case where the channel is currently READY and the XdsClient stream has just closed due to a connection error: we could keep using the currently available subchannels while retrying. However, this requires the LB to know the semantics of the error status from the XdsClient, so this optimization is not worth the effort for now.
This commit is contained in:
ZHANG Dapeng 2020-01-22 15:38:38 -08:00 committed by GitHub
parent ee661d45eb
commit a223263134
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 14 additions and 0 deletions

View File

@ -502,6 +502,7 @@ final class LookasideLb extends LoadBalancer {
@Override @Override
public void onError(Status error) { public void onError(Status error) {
channelLogger.log(ChannelLogLevel.ERROR, "EDS load balancer received an error: {0}", error); channelLogger.log(ChannelLogLevel.ERROR, "EDS load balancer received an error: {0}", error);
lookasideLbHelper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
endpointUpdateCallback.onError(); endpointUpdateCallback.onError();
} }
} }

View File

@ -28,6 +28,7 @@ import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.LookasideLb.EndpointUpdateCallback; import io.grpc.xds.LookasideLb.EndpointUpdateCallback;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull; import javax.annotation.CheckForNull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -118,6 +119,10 @@ final class XdsLoadBalancer2 extends LoadBalancer {
@Override @Override
public void run() { public void run() {
helper.updateBalancingState(
ConnectivityState.TRANSIENT_FAILURE,
new ErrorPicker(Status.UNAVAILABLE.withDescription(
"Channel is not ready when timeout for entering fallback mode happens")));
useFallbackPolicy(); useFallbackPolicy();
} }
} }

View File

@ -787,8 +787,11 @@ public class LookasideLbTest {
public void verifyRpcErrorPropagation() { public void verifyRpcErrorPropagation() {
lookasideLb.handleResolvedAddresses(defaultResolvedAddress); lookasideLb.handleResolvedAddresses(defaultResolvedAddress);
verify(helper, never()).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
verify(edsUpdateCallback, never()).onError(); verify(edsUpdateCallback, never()).onError();
serverResponseWriter.onError(new RuntimeException()); serverResponseWriter.onError(new RuntimeException());
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
verify(edsUpdateCallback).onError(); verify(edsUpdateCallback).onError();
} }

View File

@ -19,6 +19,9 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same; import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -144,7 +147,9 @@ public class XdsLoadBalancer2Test {
fakeClock.forwardTime(9, TimeUnit.SECONDS); fakeClock.forwardTime(9, TimeUnit.SECONDS);
edsUpdateCallback.onWorking(); edsUpdateCallback.onWorking();
verifyNotInFallbackMode(); verifyNotInFallbackMode();
fakeClock.forwardTime(1, TimeUnit.SECONDS); fakeClock.forwardTime(1, TimeUnit.SECONDS);
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
verifyInFallbackMode(); verifyInFallbackMode();
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);