Always start from the first address after a successful connection.

This commit is contained in:
Kun Zhang 2016-02-26 15:01:19 -08:00
parent 86bad4ffea
commit 643bb2c6b5
3 changed files with 34 additions and 63 deletions

View File

@ -77,12 +77,11 @@ final class TransportSet {
@GuardedBy("lock") @GuardedBy("lock")
private BackoffPolicy reconnectPolicy; private BackoffPolicy reconnectPolicy;
// The address index from which the current series of consecutive failing connection attempts // True if the next connect attempt is the first attempt ever, or the one right after a successful
// started. -1 means the current series have not started. // connection (i.e., transportReady() was called). If true, the next connect attempt will start
// In the case of consecutive failures, the time between two attempts for this address is // from the first address and will reset back-off.
// controlled by connectPolicy.
@GuardedBy("lock") @GuardedBy("lock")
private int headIndex = -1; private boolean firstAttempt = true;
@GuardedBy("lock") @GuardedBy("lock")
private final Stopwatch backoffWatch; private final Stopwatch backoffWatch;
@ -176,6 +175,9 @@ final class TransportSet {
Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(), Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(),
"previous reconnectTask is not done"); "previous reconnectTask is not done");
if (firstAttempt) {
nextAddressIndex = 0;
}
final int currentAddressIndex = nextAddressIndex; final int currentAddressIndex = nextAddressIndex;
List<SocketAddress> addrs = addressGroup.getAddresses(); List<SocketAddress> addrs = addressGroup.getAddresses();
final SocketAddress address = addrs.get(currentAddressIndex); final SocketAddress address = addrs.get(currentAddressIndex);
@ -192,7 +194,7 @@ final class TransportSet {
boolean savedShutdown; boolean savedShutdown;
synchronized (lock) { synchronized (lock) {
savedShutdown = shutdown; savedShutdown = shutdown;
if (currentAddressIndex == headIndex) { if (currentAddressIndex == 0) {
backoffWatch.reset().start(); backoffWatch.reset().start();
} }
newActiveTransport = transportFactory.newClientTransport(address, authority); newActiveTransport = transportFactory.newClientTransport(address, authority);
@ -224,19 +226,18 @@ final class TransportSet {
} }
}; };
long delayMillis; long delayMillis = 0;
if (currentAddressIndex == headIndex) { if (currentAddressIndex == 0) {
// Back to the first attempted address. Calculate back-off delay. if (firstAttempt) {
delayMillis =
reconnectPolicy.nextBackoffMillis() - backoffWatch.elapsed(TimeUnit.MILLISECONDS);
} else {
delayMillis = 0;
if (headIndex == -1) {
// First connect attempt, or the first attempt since last successful connection. // First connect attempt, or the first attempt since last successful connection.
headIndex = currentAddressIndex;
reconnectPolicy = backoffPolicyProvider.get(); reconnectPolicy = backoffPolicyProvider.get();
} else {
// Back to the first address. Calculate back-off delay.
delayMillis =
reconnectPolicy.nextBackoffMillis() - backoffWatch.elapsed(TimeUnit.MILLISECONDS);
} }
} }
firstAttempt = false;
if (delayMillis <= 0) { if (delayMillis <= 0) {
reconnectTask = null; reconnectTask = null;
// No back-off this time. // No back-off this time.
@ -333,7 +334,7 @@ final class TransportSet {
super.transportReady(); super.transportReady();
synchronized (lock) { synchronized (lock) {
if (isAttachedToActiveTransport()) { if (isAttachedToActiveTransport()) {
headIndex = -1; firstAttempt = true;
} }
} }
loadBalancer.handleTransportReady(addressGroup); loadBalancer.handleTransportReady(addressGroup);

View File

@ -248,7 +248,7 @@ public class ManagedChannelImplTransportManagerTest {
// Second pick fails. This is the beginning of a series of failures. // Second pick fails. This is the beginning of a series of failures.
ClientTransport t2 = tm.getTransport(addressGroup); ClientTransport t2 = tm.getTransport(addressGroup);
assertNotNull(t2); assertNotNull(t2);
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Back-off policy was reset. // Back-off policy was reset.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
@ -256,15 +256,15 @@ public class ManagedChannelImplTransportManagerTest {
// Third pick fails too // Third pick fails too
ClientTransport t3 = tm.getTransport(addressGroup); ClientTransport t3 = tm.getTransport(addressGroup);
assertNotNull(t3); assertNotNull(t3);
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Back-off policy was not reset. // Back-off policy was not reset.
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Forth pick is on addr2, back-off policy kicks in. // Forth pick is on the first address, back-off policy kicks in.
ClientTransport t4 = tm.getTransport(addressGroup); ClientTransport t4 = tm.getTransport(addressGroup);
assertNotNull(t4); assertNotNull(t4);
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Back-off policy was not reset, but was consulted. // Back-off policy was not reset, but was consulted.
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockBackoffPolicy, times(++backoffConsulted)).nextBackoffMillis(); verify(mockBackoffPolicy, times(++backoffConsulted)).nextBackoffMillis();

View File

@ -176,82 +176,52 @@ public class TransportSetTest {
transportSet.obtainActiveTransport(); transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Let this one through // Let this one fail without success
transports.peek().listener.transportReady();
// Then shut it down
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
////// Now start a series of failing attempts, where addr2 is the head.
// First attempt after a connection closed. Reset back-off policy.
transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Second attempt will start immediately. Keep back-off policy. // Second attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport(); transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too // Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Third attempt is on head, thus controlled by the first back-off interval. // Third attempt is the first address, thus controlled by the first back-off interval.
transportSet.obtainActiveTransport(); transportSet.obtainActiveTransport();
verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis(); verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
fakeClock.forwardMillis(9); fakeClock.forwardMillis(9);
verify(mockTransportFactory, times(transportsAddr2)).newClientTransport(addr2, authority); verify(mockTransportFactory, times(transportsAddr1)).newClientTransport(addr1, authority);
fakeClock.forwardMillis(1); fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one too // Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Forth attempt will start immediately. Keep back-off policy. // Forth attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport(); transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too // Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Fifth attempt is on head, thus controlled by the second back-off interval. // Fifth attempt for the first address, thus controlled by the second back-off interval.
transportSet.obtainActiveTransport(); transportSet.obtainActiveTransport();
verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis(); verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
fakeClock.forwardMillis(99); fakeClock.forwardMillis(99);
verify(mockTransportFactory, times(transportsAddr2)).newClientTransport(addr2, authority); verify(mockTransportFactory, times(transportsAddr1)).newClientTransport(addr1, authority);
fakeClock.forwardMillis(1); fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Let it through // Let it through
transports.peek().listener.transportReady(); transports.peek().listener.transportReady();
// Then close it. // Then close it.
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// First attempt after a successful connection. Reset back-off policy, and start from the first
////// Now start a series of failing attempts, where addr1 is the head. // address.
// First attempt after a connection closed. Reset back-off policy.
transportSet.obtainActiveTransport(); transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Second attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Third attempt is on head, thus controlled by the first back-off interval.
transportSet.obtainActiveTransport();
verify(mockBackoffPolicy3, times(++backoff3Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
fakeClock.forwardMillis(9);
verify(mockTransportFactory, times(transportsAddr1)).newClientTransport(addr1, authority);
fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Final checks on invocations on back-off policies // Final checks on invocations on back-off policies
verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis(); verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis();