diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index 0fa5784765..d3f884e383 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -77,12 +77,11 @@ final class TransportSet { @GuardedBy("lock") private BackoffPolicy reconnectPolicy; - // The address index from which the current series of consecutive failing connection attempts - // started. -1 means the current series have not started. - // In the case of consecutive failures, the time between two attempts for this address is - // controlled by connectPolicy. + // True if the next connect attempt is the first attempt ever, or the one right after a successful + // connection (i.e., transportReady() was called). If true, the next connect attempt will start + // from the first address and will reset back-off. @GuardedBy("lock") - private int headIndex = -1; + private boolean firstAttempt = true; @GuardedBy("lock") private final Stopwatch backoffWatch; @@ -176,6 +175,9 @@ final class TransportSet { Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(), "previous reconnectTask is not done"); + if (firstAttempt) { + nextAddressIndex = 0; + } final int currentAddressIndex = nextAddressIndex; List addrs = addressGroup.getAddresses(); final SocketAddress address = addrs.get(currentAddressIndex); @@ -192,7 +194,7 @@ final class TransportSet { boolean savedShutdown; synchronized (lock) { savedShutdown = shutdown; - if (currentAddressIndex == headIndex) { + if (currentAddressIndex == 0) { backoffWatch.reset().start(); } newActiveTransport = transportFactory.newClientTransport(address, authority); @@ -224,19 +226,18 @@ final class TransportSet { } }; - long delayMillis; - if (currentAddressIndex == headIndex) { - // Back to the first attempted address. Calculate back-off delay. - delayMillis = - reconnectPolicy.nextBackoffMillis() - backoffWatch.elapsed(TimeUnit.MILLISECONDS); - } else { - delayMillis = 0; - if (headIndex == -1) { + long delayMillis = 0; + if (currentAddressIndex == 0) { + if (firstAttempt) { // First connect attempt, or the first attempt since last successful connection. - headIndex = currentAddressIndex; reconnectPolicy = backoffPolicyProvider.get(); + } else { + // Back to the first address. Calculate back-off delay. + delayMillis = + reconnectPolicy.nextBackoffMillis() - backoffWatch.elapsed(TimeUnit.MILLISECONDS); } } + firstAttempt = false; if (delayMillis <= 0) { reconnectTask = null; // No back-off this time. @@ -333,7 +334,7 @@ final class TransportSet { super.transportReady(); synchronized (lock) { if (isAttachedToActiveTransport()) { - headIndex = -1; + firstAttempt = true; } } loadBalancer.handleTransportReady(addressGroup); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java index 27605559e9..45f1a391b0 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java @@ -248,7 +248,7 @@ public class ManagedChannelImplTransportManagerTest { // Second pick fails. This is the beginning of a series of failures. ClientTransport t2 = tm.getTransport(addressGroup); assertNotNull(t2); - verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Back-off policy was reset. verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); @@ -256,15 +256,15 @@ public class ManagedChannelImplTransportManagerTest { // Third pick fails too ClientTransport t3 = tm.getTransport(addressGroup); assertNotNull(t3); - verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); + verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); // Back-off policy was not reset. verify(mockBackoffPolicyProvider, times(backoffReset)).get(); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); - // Forth pick is on addr2, back-off policy kicks in. + // Forth pick is on the first address, back-off policy kicks in. ClientTransport t4 = tm.getTransport(addressGroup); assertNotNull(t4); - verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Back-off policy was not reset, but was consulted. verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicy, times(++backoffConsulted)).nextBackoffMillis(); diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java index a7ed5f9c52..702a84b459 100644 --- a/core/src/test/java/io/grpc/internal/TransportSetTest.java +++ b/core/src/test/java/io/grpc/internal/TransportSetTest.java @@ -176,82 +176,52 @@ public class TransportSetTest { transportSet.obtainActiveTransport(); verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); - // Let this one through - transports.peek().listener.transportReady(); - // Then shut it down - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); - - - ////// Now start a series of failing attempts, where addr2 is the head. - // First attempt after a connection closed. Reset back-off policy. - transportSet.obtainActiveTransport(); - verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); - verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); - // Fail this one + // Let this one fail without success transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Second attempt will start immediately. Keep back-off policy. transportSet.obtainActiveTransport(); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); - verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); + verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); // Fail this one too transports.poll().listener.transportShutdown(Status.UNAVAILABLE); - // Third attempt is on head, thus controlled by the first back-off interval. + // Third attempt is the first address, thus controlled by the first back-off interval. transportSet.obtainActiveTransport(); - verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); fakeClock.forwardMillis(9); - verify(mockTransportFactory, times(transportsAddr2)).newClientTransport(addr2, authority); + verify(mockTransportFactory, times(transportsAddr1)).newClientTransport(addr1, authority); fakeClock.forwardMillis(1); - verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Fail this one too transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Forth attempt will start immediately. Keep back-off policy. transportSet.obtainActiveTransport(); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); - verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); + verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); // Fail this one too transports.poll().listener.transportShutdown(Status.UNAVAILABLE); - // Fifth attempt is on head, thus controlled by the second back-off interval. + // Fifth attempt for the first address, thus controlled by the second back-off interval. transportSet.obtainActiveTransport(); - verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); fakeClock.forwardMillis(99); - verify(mockTransportFactory, times(transportsAddr2)).newClientTransport(addr2, authority); + verify(mockTransportFactory, times(transportsAddr1)).newClientTransport(addr1, authority); fakeClock.forwardMillis(1); - verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Let it through transports.peek().listener.transportReady(); // Then close it. transports.poll().listener.transportShutdown(Status.UNAVAILABLE); - - ////// Now start a series of failing attempts, where addr1 is the head. - // First attempt after a connection closed. Reset back-off policy. + // First attempt after a successful connection. Reset back-off policy, and start from the first + // address. transportSet.obtainActiveTransport(); verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); - // Fail this one - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); - - // Second attempt will start immediately. Keep back-off policy. - transportSet.obtainActiveTransport(); - verify(mockBackoffPolicyProvider, times(backoffReset)).get(); - verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); - // Fail this one too - transports.poll().listener.transportShutdown(Status.UNAVAILABLE); - - // Third attempt is on head, thus controlled by the first back-off interval. - transportSet.obtainActiveTransport(); - verify(mockBackoffPolicy3, times(++backoff3Consulted)).nextBackoffMillis(); - verify(mockBackoffPolicyProvider, times(backoffReset)).get(); - fakeClock.forwardMillis(9); - verify(mockTransportFactory, times(transportsAddr1)).newClientTransport(addr1, authority); - fakeClock.forwardMillis(1); - verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Final checks on invocations on back-off policies verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis();