diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index e9c4d79150..742e5d2f24 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean; final class PickFirstLoadBalancer extends LoadBalancer { private final Helper helper; private Subchannel subchannel; + private ConnectivityState currentState = IDLE; PickFirstLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); @@ -69,7 +70,7 @@ final class PickFirstLoadBalancer extends LoadBalancer { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. - helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel))); + updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel))); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -86,20 +87,34 @@ final class PickFirstLoadBalancer extends LoadBalancer { } // NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine // for time being. 
- helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); + updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); } private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { - ConnectivityState currentState = stateInfo.getState(); - if (currentState == SHUTDOWN) { + ConnectivityState newState = stateInfo.getState(); + if (newState == SHUTDOWN) { return; } - if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) { + if (newState == TRANSIENT_FAILURE || newState == IDLE) { helper.refreshNameResolution(); } + // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state + // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky + // transient failure". Only a subchannel state change to READY will get the LB out of + // TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we + // keep retrying for a connection. 
+ if (currentState == TRANSIENT_FAILURE) { + if (newState == CONNECTING) { + return; + } else if (newState == IDLE) { + requestConnection(); + return; + } + } + SubchannelPicker picker; - switch (currentState) { + switch (newState) { case IDLE: picker = new RequestConnectionPicker(subchannel); break; @@ -115,9 +130,15 @@ final class PickFirstLoadBalancer extends LoadBalancer { picker = new Picker(PickResult.withError(stateInfo.getStatus())); break; default: - throw new IllegalArgumentException("Unsupported state:" + currentState); + throw new IllegalArgumentException("Unsupported state:" + newState); } - helper.updateBalancingState(currentState, picker); + + updateBalancingState(newState, picker); + } + + private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) { + currentState = state; + helper.updateBalancingState(state, picker); } @Override diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 8bace28958..d9eab84aaa 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -260,17 +260,17 @@ public class PickFirstLoadBalancerTest { reset(mockHelper); when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + Status error = Status.UNAVAILABLE.withDescription("boom!"); stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); assertEquals(error, 
pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); - stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); - inOrder.verify(mockHelper).refreshNameResolution(); - inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); - assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); - stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); @@ -279,6 +279,43 @@ public class PickFirstLoadBalancerTest { verifyNoMoreInteractions(mockHelper); } + @Test + public void pickAfterResolutionAfterTransientValue() throws Exception { + InOrder inOrder = inOrder(mockHelper); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + CreateSubchannelArgs args = createArgsCaptor.getValue(); + assertThat(args.getAddresses()).isEqualTo(servers); + verify(mockSubchannel).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel).requestConnection(); + reset(mockHelper); + when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); + + // An error has happened. 
+ Status error = Status.UNAVAILABLE.withDescription("boom!"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + + // But a subsequent IDLE update should be ignored and the LB state not updated. Additionally, + // a request for a new connection should be made to keep the subchannel trying to connect. + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).refreshNameResolution(); + verifyNoMoreInteractions(mockHelper); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + verify(mockSubchannel, times(2)).requestConnection(); + + // Transition from TRANSIENT_FAILURE to CONNECTING should also be ignored. + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + verifyNoMoreInteractions(mockHelper); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + } + @Test public void nameResolutionError() throws Exception { Status error = Status.NOT_FOUND.withDescription("nameResolutionError");