diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 0fbce5fa5b..6d74006b39 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -121,6 +121,12 @@ public abstract class LoadBalancer { HEALTH_CONSUMER_LISTENER_ARG_KEY = LoadBalancer.CreateSubchannelArgs.Key.create("internal:health-check-consumer-listener"); + @Internal + public static final LoadBalancer.CreateSubchannelArgs.Key + DISABLE_SUBCHANNEL_RECONNECT_KEY = + LoadBalancer.CreateSubchannelArgs.Key.createWithDefault( + "internal:disable-subchannel-reconnect", Boolean.FALSE); + @Internal public static final Attributes.Key HAS_HEALTH_PRODUCER_LISTENER_KEY = diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 70e42e2f5f..27a80f7c19 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -45,6 +45,7 @@ import io.grpc.InternalChannelz.ChannelStats; import io.grpc.InternalInstrumented; import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; +import io.grpc.LoadBalancer; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; @@ -77,6 +78,7 @@ final class InternalSubchannel implements InternalInstrumented, Tr private final CallTracer callsTracer; private final ChannelTracer channelTracer; private final ChannelLogger channelLogger; + private final boolean reconnectDisabled; private final List transportFilters; @@ -159,13 +161,15 @@ final class InternalSubchannel implements InternalInstrumented, Tr private volatile Attributes connectedAddressAttributes; - InternalSubchannel(List addressGroups, String authority, String userAgent, - BackoffPolicy.Provider backoffPolicyProvider, - ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, - Supplier stopwatchSupplier, SynchronizationContext syncContext, Callback callback, - InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer, - InternalLogId logId, ChannelLogger channelLogger, - List transportFilters) { + InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent, + BackoffPolicy.Provider backoffPolicyProvider, + ClientTransportFactory transportFactory, + ScheduledExecutorService scheduledExecutor, + Supplier stopwatchSupplier, SynchronizationContext syncContext, + Callback callback, InternalChannelz channelz, CallTracer callsTracer, + ChannelTracer channelTracer, InternalLogId logId, + ChannelLogger channelLogger, List transportFilters) { + List addressGroups = args.getAddresses(); Preconditions.checkNotNull(addressGroups, "addressGroups"); Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty"); checkListHasNoNulls(addressGroups, "addressGroups contains null entry"); @@ -187,6 +191,7 @@ final class InternalSubchannel implements InternalInstrumented, Tr this.logId = Preconditions.checkNotNull(logId, "logId"); this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); this.transportFilters = transportFilters; + this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY); } ChannelLogger getChannelLogger() { @@ -289,6 +294,11 @@ final class InternalSubchannel implements InternalInstrumented, Tr } gotoState(ConnectivityStateInfo.forTransientFailure(status)); + + if (reconnectDisabled) { + return; + } + if (reconnectPolicy == null) { reconnectPolicy = backoffPolicyProvider.get(); } @@ -337,7 +347,11 @@ final class InternalSubchannel implements InternalInstrumented, Tr if (state.getState() != newState.getState()) { Preconditions.checkState(state.getState() != SHUTDOWN, "Cannot transition out of SHUTDOWN to " + newState); - state = newState; + if (reconnectDisabled && newState.getState() == TRANSIENT_FAILURE) { + state = ConnectivityStateInfo.forNonError(IDLE); + } else { + state = newState; + } callback.onStateChange(InternalSubchannel.this, newState); } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 7e36086ac9..36d79f4011 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -1483,7 +1483,7 @@ final class ManagedChannelImpl extends ManagedChannel implements } final InternalSubchannel internalSubchannel = new InternalSubchannel( - addressGroup, + CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(), authority, userAgent, backoffPolicyProvider, oobTransportFactory, oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext, // All callback methods are run from syncContext @@ -1915,7 +1915,7 @@ final class ManagedChannelImpl extends ManagedChannel implements } final InternalSubchannel internalSubchannel = new InternalSubchannel( - args.getAddresses(), + args, authority(), userAgent, backoffPolicyProvider, diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index bfa462e16e..6f4794fdd4 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -64,17 +64,26 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { private int numTf = 0; private boolean firstPass = true; @Nullable - private ScheduledHandle scheduleConnectionTask; + private ScheduledHandle scheduleConnectionTask = null; private ConnectivityState rawConnectivityState = IDLE; private ConnectivityState concludedState = IDLE; - private final boolean enableHappyEyeballs = - PickFirstLoadBalancerProvider.isEnabledHappyEyeballs(); + private final boolean enableHappyEyeballs = !isSerializingRetries() + && PickFirstLoadBalancerProvider.isEnabledHappyEyeballs(); private boolean notAPetiolePolicy = true; // means not under a petiole policy + private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider(); + private BackoffPolicy reconnectPolicy; + @Nullable + private ScheduledHandle reconnectTask = null; + private final boolean serializingRetries = isSerializingRetries(); PickFirstLeafLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); } + static boolean isSerializingRetries() { + return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false); + } + @Override public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (rawConnectivityState == SHUTDOWN) { @@ -225,9 +234,10 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { return; } - if (newState == IDLE) { + if (newState == IDLE && subchannelData.state == READY) { helper.refreshNameResolution(); } + // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky // transient failure". Only a subchannel state change to READY will get the LB out of @@ -277,6 +287,8 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { if (addressIndex.increment()) { cancelScheduleTask(); requestConnection(); // is recursive so might hit the end of the addresses + } else { + scheduleBackoff(); } } @@ -304,6 +316,39 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { } } + /** + * Only called after all addresses attempted and failed (TRANSIENT_FAILURE). + */ + private void scheduleBackoff() { + if (!serializingRetries) { + return; + } + + class EndOfCurrentBackoff implements Runnable { + @Override + public void run() { + reconnectTask = null; + addressIndex.reset(); + requestConnection(); + } + } + + // Just allow the previous one to trigger when ready if we're already in backoff + if (reconnectTask != null) { + return; + } + + if (reconnectPolicy == null) { + reconnectPolicy = bkoffPolProvider.get(); + } + long delayNanos = reconnectPolicy.nextBackoffNanos(); + reconnectTask = helper.getSynchronizationContext().schedule( + new EndOfCurrentBackoff(), + delayNanos, + TimeUnit.NANOSECONDS, + helper.getScheduledExecutorService()); + } + private void updateHealthCheckedState(SubchannelData subchannelData) { if (subchannelData.state != READY) { return; @@ -337,6 +382,11 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { rawConnectivityState = SHUTDOWN; concludedState = SHUTDOWN; cancelScheduleTask(); + if (reconnectTask != null) { + reconnectTask.cancel(); + reconnectTask = null; + } + reconnectPolicy = null; for (SubchannelData subchannelData : subchannels.values()) { subchannelData.getSubchannel().shutdown(); @@ -350,6 +400,12 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { * that all other subchannels must be shutdown. */ private void shutdownRemaining(SubchannelData activeSubchannelData) { + if (reconnectTask != null) { + reconnectTask.cancel(); + reconnectTask = null; + } + reconnectPolicy = null; + cancelScheduleTask(); for (SubchannelData subchannelData : subchannels.values()) { if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) { @@ -391,8 +447,17 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { scheduleNextConnection(); break; case TRANSIENT_FAILURE: - addressIndex.increment(); - requestConnection(); + if (!serializingRetries) { + addressIndex.increment(); + requestConnection(); + } else { + if (!addressIndex.isValid()) { + scheduleBackoff(); + } else { + subchannelData.subchannel.requestConnection(); + subchannelData.updateState(CONNECTING); + } + } break; default: // Wait for current subchannel to change state @@ -438,9 +503,10 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { HealthListener hcListener = new HealthListener(); final Subchannel subchannel = helper.createSubchannel( CreateSubchannelArgs.newBuilder() - .setAddresses(Lists.newArrayList( - new EquivalentAddressGroup(addr, attrs))) - .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener) + .setAddresses(Lists.newArrayList( + new EquivalentAddressGroup(addr, attrs))) + .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener) + .addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, serializingRetries) .build()); if (subchannel == null) { log.warning("Was not able to create subchannel for " + addr); @@ -458,7 +524,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { } private boolean isPassComplete() { - if (addressIndex.isValid() || subchannels.size() < addressIndex.size()) { + if (subchannels.size() < addressIndex.size()) { return false; } for (SubchannelData sc : subchannels.values()) { @@ -646,6 +712,16 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { } } + @VisibleForTesting + int getGroupIndex() { + return addressIndex.groupIndex; + } + + @VisibleForTesting + boolean isIndexValid() { + return addressIndex.isValid(); + } + private static final class SubchannelData { private final Subchannel subchannel; private ConnectivityState state; diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index e4d9f27ed4..b75fd43a74 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -46,6 +46,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.InternalChannelz; import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; +import io.grpc.LoadBalancer; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.InternalSubchannel.CallTracingTransport; @@ -309,10 +310,57 @@ public class InternalSubchannelTest { verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos(); } + @Test public void twoAddressesReconnectDisabled() { + SocketAddress addr1 = mock(SocketAddress.class); + SocketAddress addr2 = mock(SocketAddress.class); + createInternalSubchannel(true, + new EquivalentAddressGroup(Arrays.asList(addr1, addr2))); + assertEquals(IDLE, internalSubchannel.getState()); + + assertNull(internalSubchannel.obtainActiveTransport()); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertEquals(CONNECTING, internalSubchannel.getState()); + verify(mockTransportFactory).newClientTransport(eq(addr1), any(), any()); + // Let this one fail without success + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + // Still in CONNECTING + assertNull(internalSubchannel.obtainActiveTransport()); + assertNoCallbackInvoke(); + assertEquals(CONNECTING, internalSubchannel.getState()); + + // Second attempt will start immediately. Still no back-off policy. + verify(mockBackoffPolicyProvider, times(0)).get(); + verify(mockTransportFactory, times(1)) + .newClientTransport( + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); + assertNull(internalSubchannel.obtainActiveTransport()); + // Fail this one too + assertNoCallbackInvoke(); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + // All addresses have failed, but we aren't controlling retries. + assertEquals(IDLE, internalSubchannel.getState()); + assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); + // Backoff reset and first back-off interval begins + verify(mockBackoffPolicy1, never()).nextBackoffNanos(); + verify(mockBackoffPolicyProvider, never()).get(); + assertTrue("Nothing should have been scheduled", fakeClock.getPendingTasks().isEmpty()); + + // Should follow orders and create an active transport. + internalSubchannel.obtainActiveTransport(); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertEquals(CONNECTING, internalSubchannel.getState()); + + // Shouldn't have anything scheduled, so shouldn't do anything + assertTrue("Nothing should have been scheduled 2", fakeClock.getPendingTasks().isEmpty()); + } + @Test public void twoAddressesReconnect() { SocketAddress addr1 = mock(SocketAddress.class); SocketAddress addr2 = mock(SocketAddress.class); - createInternalSubchannel(addr1, addr2); + createInternalSubchannel(false, + new EquivalentAddressGroup(Arrays.asList(addr1, addr2))); assertEquals(IDLE, internalSubchannel.getState()); // Invocation counters int transportsAddr1 = 0; @@ -1377,11 +1425,24 @@ public class InternalSubchannelTest { } private void createInternalSubchannel(EquivalentAddressGroup ... addrs) { + createInternalSubchannel(false, addrs); + } + + private void createInternalSubchannel(boolean reconnectDisabled, + EquivalentAddressGroup ... addrs) { List addressGroups = Arrays.asList(addrs); InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY); ChannelTracer subchannelTracer = new ChannelTracer(logId, 10, fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel"); - internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT, + LoadBalancer.CreateSubchannelArgs.Builder argBuilder = + LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups); + if (reconnectDisabled) { + argBuilder.addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, reconnectDisabled); + } + LoadBalancer.CreateSubchannelArgs createSubchannelArgs = argBuilder.build(); + internalSubchannel = new InternalSubchannel( + createSubchannelArgs, + AUTHORITY, USER_AGENT, mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(), fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz, CallTracer.getDefaultFactory().create(), diff --git a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java index 63915bddc9..f0031a6ae6 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java @@ -27,10 +27,12 @@ import static io.grpc.LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY; import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY; import static io.grpc.LoadBalancer.IS_PETIOLE_POLICY; import static io.grpc.internal.PickFirstLeafLoadBalancer.CONNECTION_DELAY_INTERVAL_MS; +import static io.grpc.internal.PickFirstLeafLoadBalancer.isSerializingRetries; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assume.assumeTrue; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -73,7 +75,6 @@ import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.junit.After; -import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -92,14 +93,22 @@ import org.mockito.junit.MockitoRule; public class PickFirstLeafLoadBalancerTest { public static final Status CONNECTION_ERROR = Status.UNAVAILABLE.withDescription("Simulated connection error"); + public static final String GRPC_SERIALIZE_RETRIES = "GRPC_SERIALIZE_RETRIES"; - @Parameterized.Parameters(name = "{0}") - public static List enableHappyEyeballs() { - return Arrays.asList(true, false); + @Parameterized.Parameters(name = "{0}-{1}") + public static List data() { + return Arrays.asList(new Object[][] { + {false, false}, + {false, true}, + {true, false}}); } - @Parameterized.Parameter + @Parameterized.Parameter(value = 0) + public boolean serializeRetries; + + @Parameterized.Parameter(value = 1) public boolean enableHappyEyeballs; + private PickFirstLeafLoadBalancer loadBalancer; private final List servers = Lists.newArrayList(); private static final Attributes.Key FOO = Attributes.Key.create("foo"); @@ -137,13 +146,22 @@ public class PickFirstLeafLoadBalancerTest { private PickSubchannelArgs mockArgs; private String originalHappyEyeballsEnabledValue; + private String originalSerializeRetriesValue; + + private long backoffMillis; @Before public void setUp() { + assumeTrue(!serializeRetries || !enableHappyEyeballs); // they are not compatible + + backoffMillis = TimeUnit.SECONDS.toMillis(1); + originalSerializeRetriesValue = System.getProperty(GRPC_SERIALIZE_RETRIES); + System.setProperty(GRPC_SERIALIZE_RETRIES, Boolean.toString(serializeRetries)); + originalHappyEyeballsEnabledValue = System.getProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS); System.setProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS, - enableHappyEyeballs ? "true" : "false"); + Boolean.toString(enableHappyEyeballs)); for (int i = 1; i <= 5; i++) { SocketAddress addr = new FakeSocketAddress("server" + i); @@ -176,6 +194,11 @@ public class PickFirstLeafLoadBalancerTest { @After public void tearDown() { + if (originalSerializeRetriesValue == null) { + System.clearProperty(GRPC_SERIALIZE_RETRIES); + } else { + System.setProperty(GRPC_SERIALIZE_RETRIES, originalSerializeRetriesValue); + } if (originalHappyEyeballsEnabledValue == null) { System.clearProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS); } else { @@ -498,6 +521,9 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs) .getSubchannel()).isSameInstanceAs(mockSubchannel1); + verify(mockHelper, atLeast(0)).getSynchronizationContext(); + verify(mockHelper, atLeast(0)).getScheduledExecutorService(); + verifyNoMoreInteractions(mockHelper); healthListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); verifyNoMoreInteractions(mockHelper); @@ -520,20 +546,7 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); stateListeners[0] = stateListenerCaptor.getValue(); - if (enableHappyEyeballs) { - forwardTimeByConnectionDelay(); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - stateListeners[1] = stateListenerCaptor.getValue(); - forwardTimeByConnectionDelay(); - inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); - stateListeners[2] = stateListenerCaptor.getValue(); - forwardTimeByConnectionDelay(); - inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); - stateListeners[3] = stateListenerCaptor.getValue(); - } - - reset(mockHelper); - + stateListeners[0].onSubchannelState(ConnectivityStateInfo.forNonError(READY)); stateListeners[0].onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); @@ -543,11 +556,23 @@ public class PickFirstLeafLoadBalancerTest { stateListeners[0].onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); Status error = Status.UNAVAILABLE.withDescription("boom!"); + reset(mockHelper); if (enableHappyEyeballs) { - for (SubchannelStateListener listener : stateListeners) { - listener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - } + stateListeners[0].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListeners[1] = stateListenerCaptor.getValue(); + stateListeners[1].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + stateListeners[2] = stateListenerCaptor.getValue(); + stateListeners[2].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + stateListeners[3] = stateListenerCaptor.getValue(); + stateListeners[3].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + forwardTimeByConnectionDelay(); } else { stateListeners[0].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); for (int i = 1; i < stateListeners.length; i++) { @@ -589,6 +614,8 @@ public class PickFirstLeafLoadBalancerTest { // Transition from TRANSIENT_ERROR to CONNECTING should also be ignored. stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + verify(mockHelper, atLeast(0)).getSynchronizationContext(); + verify(mockHelper, atLeast(0)).getScheduledExecutorService(); verifyNoMoreInteractions(mockHelper); assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); } @@ -619,6 +646,8 @@ public class PickFirstLeafLoadBalancerTest { // Transition from TRANSIENT_ERROR to CONNECTING should also be ignored. stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + verify(mockHelper, atLeast(0)).getSynchronizationContext(); + verify(mockHelper, atLeast(0)).getScheduledExecutorService(); verifyNoMoreInteractions(mockHelper); assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); @@ -651,6 +680,8 @@ public class PickFirstLeafLoadBalancerTest { // Transition from TRANSIENT_ERROR to CONNECTING should also be ignored. stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + verify(mockHelper, atLeast(0)).getSynchronizationContext(); + verify(mockHelper, atLeast(0)).getScheduledExecutorService(); verifyNoMoreInteractions(mockHelper); assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); @@ -1518,6 +1549,8 @@ public class PickFirstLeafLoadBalancerTest { @Test public void updateAddresses_intersecting_transient_failure() { + assumeTrue(!isSerializingRetries()); + // Starting first connection attempt InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4); // captor: captures @@ -1782,6 +1815,8 @@ public class PickFirstLeafLoadBalancerTest { @Test public void updateAddresses_identical_transient_failure() { + assumeTrue(!isSerializingRetries()); + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4); // Creating first set of endpoints/addresses @@ -2295,7 +2330,7 @@ public class PickFirstLeafLoadBalancerTest { @Test public void happy_eyeballs_trigger_connection_delay() { - Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs + assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs // Starting first connection attempt InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4); @@ -2340,7 +2375,7 @@ public class PickFirstLeafLoadBalancerTest { @Test public void happy_eyeballs_connection_results_happen_after_get_to_end() { - Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs + assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3); Status error = Status.UNAUTHENTICATED.withDescription("simulated failure"); @@ -2393,7 +2428,7 @@ public class PickFirstLeafLoadBalancerTest { @Test public void happy_eyeballs_pick_pushes_index_over_end() { - Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs + assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel2n2, mockSubchannel3n2); @@ -2471,7 +2506,7 @@ public class PickFirstLeafLoadBalancerTest { @Test public void happy_eyeballs_fail_then_trigger_connection_delay() { - Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs + assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs // Starting first connection attempt InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3); assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); @@ -2550,6 +2585,44 @@ public class PickFirstLeafLoadBalancerTest { loadBalancer.requestConnection(); // should be handled without throwing exception } + @Test + public void serialized_retries_two_passes() { + assumeTrue(serializeRetries); // This test is only for serialized retries + + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3); + Status error = Status.UNAUTHENTICATED.withDescription("simulated failure"); + + List addrs = + Lists.newArrayList(servers.get(0), servers.get(1), servers.get(2)); + Subchannel[] subchannels = new Subchannel[]{mockSubchannel1, mockSubchannel2, mockSubchannel3}; + SubchannelStateListener[] listeners = new SubchannelStateListener[subchannels.length]; + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(addrs).build()); + forwardTimeByConnectionDelay(2); + for (int i = 0; i < subchannels.length; i++) { + inOrder.verify(subchannels[i]).start(stateListenerCaptor.capture()); + inOrder.verify(subchannels[i]).requestConnection(); + listeners[i] = stateListenerCaptor.getValue(); + listeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + } + assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); + assertFalse("Index should be at end", loadBalancer.isIndexValid()); + + forwardTimeByBackoffDelay(); // should trigger retry + for (int i = 0; i < subchannels.length; i++) { + inOrder.verify(subchannels[i]).requestConnection(); + listeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); // cascade + } + inOrder.verify(subchannels[0], never()).requestConnection(); // should wait for backoff delay + + forwardTimeByBackoffDelay(); // should trigger retry again + for (int i = 0; i < subchannels.length; i++) { + inOrder.verify(subchannels[i]).requestConnection(); + assertEquals(i, loadBalancer.getGroupIndex()); + listeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); // cascade + } + } + @Test public void index_looping() { Attributes.Key key = Attributes.Key.create("some-key"); @@ -2689,6 +2762,11 @@ public class PickFirstLeafLoadBalancerTest { } } + private void forwardTimeByBackoffDelay() { + backoffMillis = (long) (backoffMillis * 1.8); // backoff factor default is 1.6 with Jitter .2 + fakeClock.forwardTime(backoffMillis, TimeUnit.MILLISECONDS); + } + private void acceptXSubchannels(int num) { List newServers = new ArrayList<>(); for (int i = 0; i < num; i++) { diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 047ba71bbe..dd7de3691a 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -150,7 +150,8 @@ public class RingHashLoadBalancerTest { assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); - int expectedTimes = PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; + int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() + && !PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; verify(subchannel, times(expectedTimes)).requestConnection(); verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); @@ -184,7 +185,8 @@ public class RingHashLoadBalancerTest { pickerCaptor.getValue().pickSubchannel(args); Subchannel subchannel = subchannels.get(Collections.singletonList(childLbState.getEag())); InOrder inOrder = Mockito.inOrder(helper, subchannel); - int expectedTimes = PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; + int expectedTimes = PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() + || !PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 2 : 1; inOrder.verify(subchannel, times(expectedTimes)).requestConnection(); deliverSubchannelState(subchannel, CSI_READY); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); @@ -443,8 +445,10 @@ public class RingHashLoadBalancerTest { PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request - int expectedTimes = PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; // verify kicked off connection to server2 + int expectedTimes = PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() + || !PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 2 : 1; + verify(getSubChannel(servers.get(1)), times(expectedTimes)).requestConnection(); assertThat(subchannels.size()).isEqualTo(2); // no excessive connection