diff --git a/core/src/main/java/io/grpc/EquivalentAddressGroup.java b/core/src/main/java/io/grpc/EquivalentAddressGroup.java new file mode 100644 index 0000000000..ef64a90be1 --- /dev/null +++ b/core/src/main/java/io/grpc/EquivalentAddressGroup.java @@ -0,0 +1,83 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A group of {@link SocketAddress}es that are considered equivalent when channel makes connections. + * + *

Usually the addresses are addresses resolved from the same host name, and connecting to any of + * them is equally sufficient. They do have order. An address appears earlier on the list is likely + * to be tried earlier. + */ +@ExperimentalApi +public final class EquivalentAddressGroup { + + private final List addrs; + + public EquivalentAddressGroup(List addrs) { + this.addrs = Collections.unmodifiableList(new ArrayList(addrs)); + } + + public EquivalentAddressGroup(SocketAddress addr) { + this.addrs = Collections.singletonList(addr); + } + + /** + * Returns an immutable list of the addresses. + */ + public List getAddresses() { + return addrs; + } + + @Override + public String toString() { + return addrs.toString(); + } + + @Override + public int hashCode() { + return addrs.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof EquivalentAddressGroup)) { + return false; + } + return addrs.equals(((EquivalentAddressGroup) other).addrs); + } +} diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java index 3449021dab..c8cbae0b45 100644 --- a/core/src/main/java/io/grpc/LoadBalancer.java +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -35,7 +35,6 @@ import com.google.common.util.concurrent.ListenableFuture; import io.grpc.internal.ClientTransport; -import java.net.SocketAddress; import java.util.List; import javax.annotation.Nullable; @@ -83,12 +82,13 @@ public abstract class LoadBalancer { /** * Called when a transport is fully connected and ready to accept traffic. */ - public void transportReady(SocketAddress addr, ClientTransport transport) { } + public void transportReady(EquivalentAddressGroup addressGroup, ClientTransport transport) { } /** * Called when a transport is shutting down. */ - public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) { } + public void transportShutdown( + EquivalentAddressGroup addressGroup, ClientTransport transport, Status s) { } public abstract static class Factory { /** diff --git a/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java b/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java index a0855489b1..86d2c3efb6 100644 --- a/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java @@ -68,14 +68,14 @@ public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory { } private static class SimpleLoadBalancer extends LoadBalancer { - @GuardedBy("servers") - private final List servers = new ArrayList(); - @GuardedBy("servers") - private int currentServerIndex; - @GuardedBy("servers") + private final Object lock = new Object(); + + @GuardedBy("lock") + private EquivalentAddressGroup addresses; + @GuardedBy("lock") private final BlankFutureProvider pendingPicks = new BlankFutureProvider(); - @GuardedBy("servers") + @GuardedBy("lock") private StatusException nameResolutionError; private final TransportManager tm; @@ -86,42 +86,41 @@ public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory { @Override public ListenableFuture pickTransport(@Nullable RequestKey requestKey) { - ResolvedServerInfo currentServer; - synchronized (servers) { - if (servers.isEmpty()) { + EquivalentAddressGroup addressesCopy; + synchronized (lock) { + addressesCopy = addresses; + if (addressesCopy == null) { if (nameResolutionError != null) { return Futures.immediateFailedFuture(nameResolutionError); } return pendingPicks.newBlankFuture(); } - currentServer = servers.get(currentServerIndex); } - return tm.getTransport(currentServer.getAddress()); + return tm.getTransport(addressesCopy); } @Override public void handleResolvedAddresses( List updatedServers, Attributes config) { BlankFutureProvider.FulfillmentBatch pendingPicksFulfillmentBatch; - final ResolvedServerInfo currentServer; - synchronized (servers) { - nameResolutionError = null; - servers.clear(); - for (ResolvedServerInfo addr : updatedServers) { - servers.add(addr); + final EquivalentAddressGroup newAddresses; + synchronized (lock) { + ArrayList newAddressList = + new ArrayList(updatedServers.size()); + for (ResolvedServerInfo server : updatedServers) { + newAddressList.add(server.getAddress()); } - if (servers.isEmpty()) { + newAddresses = new EquivalentAddressGroup(newAddressList); + if (newAddresses.equals(addresses)) { return; } + addresses = newAddresses; + nameResolutionError = null; pendingPicksFulfillmentBatch = pendingPicks.createFulfillmentBatch(); - if (currentServerIndex >= servers.size()) { - currentServerIndex = 0; - } - currentServer = servers.get(currentServerIndex); } pendingPicksFulfillmentBatch.link(new Supplier>() { @Override public ListenableFuture get() { - return tm.getTransport(currentServer.getAddress()); + return tm.getTransport(newAddresses); } }); } @@ -131,27 +130,11 @@ public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory { BlankFutureProvider.FulfillmentBatch pendingPicksFulfillmentBatch; StatusException statusException = error.augmentDescription("Name resolution failed").asException(); - synchronized (servers) { + synchronized (lock) { pendingPicksFulfillmentBatch = pendingPicks.createFulfillmentBatch(); nameResolutionError = statusException; } pendingPicksFulfillmentBatch.fail(statusException); } - - @Override - public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) { - if (!s.isOk()) { - // If the current transport is shut down due to error, move on to the next address in the - // list - synchronized (servers) { - if (addr.equals(servers.get(currentServerIndex).getAddress())) { - currentServerIndex++; - if (currentServerIndex >= servers.size()) { - currentServerIndex = 0; - } - } - } - } - } } } diff --git a/core/src/main/java/io/grpc/TransportManager.java b/core/src/main/java/io/grpc/TransportManager.java index 1848bc70a3..83fbd3e12c 100644 --- a/core/src/main/java/io/grpc/TransportManager.java +++ b/core/src/main/java/io/grpc/TransportManager.java @@ -49,12 +49,13 @@ public abstract class TransportManager { public abstract void updateRetainedTransports(SocketAddress[] addrs); /** - * Returns the future of a transport for the given server. + * Returns the future of a transport for any of the addresses from the given address group. * *

If the channel has been shut down, the value of the future will be {@code null}. */ // TODO(zhangkun83): GrpcLoadBalancer will use this to get transport to connect to LB servers, // which would have a different authority than the primary servers. We need to figure out how to // do it. - public abstract ListenableFuture getTransport(SocketAddress addr); + public abstract ListenableFuture getTransport( + EquivalentAddressGroup addressGroup); } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 261b0318f6..a0be0f896c 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -47,6 +47,7 @@ import io.grpc.ClientInterceptors; import io.grpc.Codec; import io.grpc.Compressor; import io.grpc.DecompressorRegistry; +import io.grpc.EquivalentAddressGroup; import io.grpc.ExperimentalApi; import io.grpc.LoadBalancer; import io.grpc.ManagedChannel; @@ -114,11 +115,11 @@ public final class ManagedChannelImpl extends ManagedChannel { private final LoadBalancer loadBalancer; /** - * Maps addresses to transports for that server. + * Maps EquivalentAddressGroups to transports for that server. */ @GuardedBy("lock") - private final Map transports = - new HashMap(); + private final Map transports = + new HashMap(); @GuardedBy("lock") private boolean shutdown; @@ -359,20 +360,21 @@ public final class ManagedChannelImpl extends ManagedChannel { } @Override - public ListenableFuture getTransport(final SocketAddress addr) { + public ListenableFuture getTransport( + final EquivalentAddressGroup addressGroup) { TransportSet ts; synchronized (lock) { if (shutdown) { return NULL_VALUE_TRANSPORT_FUTURE; } - ts = transports.get(addr); + ts = transports.get(addressGroup); if (ts == null) { - ts = new TransportSet(addr, authority(), loadBalancer, backoffPolicyProvider, + ts = new TransportSet(addressGroup, authority(), loadBalancer, backoffPolicyProvider, transportFactory, scheduledExecutor, new TransportSet.Callback() { @Override public void onTerminated() { synchronized (lock) { - transports.remove(addr); + transports.remove(addressGroup); if (shutdown && transports.isEmpty()) { if (terminated) { log.warning("transportTerminated called after already terminated"); @@ -384,7 +386,7 @@ public final class ManagedChannelImpl extends ManagedChannel { } } }); - transports.put(addr, ts); + transports.put(addressGroup, ts); } } return ts.obtainActiveTransport(); diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index ea80353fec..a640b0e203 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -36,12 +36,14 @@ import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.Status; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -53,7 +55,7 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; /** - * Transports for a single server. + * Transports for a single {@link SocketAddress}. */ @ThreadSafe final class TransportSet { @@ -66,7 +68,7 @@ final class TransportSet { } private final Object lock = new Object(); - private final SocketAddress server; + private final EquivalentAddressGroup addressGroup; private final String authority; private final BackoffPolicy.Provider backoffPolicyProvider; private final Callback callback; @@ -74,9 +76,18 @@ final class TransportSet { private final ScheduledExecutorService scheduledExecutor; @GuardedBy("lock") - @Nullable + private int nextAddressIndex; + + @GuardedBy("lock") private BackoffPolicy reconnectPolicy; + // The address index from which the current series of consecutive failing connection attempts + // started. -1 means the current series have not started. + // In the case of consecutive failures, the time between two attempts for this address is + // controlled by connectPolicy. + @GuardedBy("lock") + private int headIndex = -1; + @GuardedBy("lock") @Nullable private ScheduledFuture reconnectTask; @@ -100,10 +111,10 @@ final class TransportSet { */ private volatile SettableFuture activeTransportFuture; - TransportSet(SocketAddress server, String authority, LoadBalancer loadBalancer, + TransportSet(EquivalentAddressGroup addressGroup, String authority, LoadBalancer loadBalancer, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, Callback callback) { - this.server = server; + this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup"); this.authority = authority; this.loadBalancer = loadBalancer; this.backoffPolicyProvider = backoffPolicyProvider; @@ -150,6 +161,15 @@ final class TransportSet { Preconditions.checkState(!shutdown, "Already shut down"); Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(), "previous reconnectTask is not done"); + + final int currentAddressIndex = nextAddressIndex; + List addrs = addressGroup.getAddresses(); + final SocketAddress address = addrs.get(currentAddressIndex); + nextAddressIndex++; + if (nextAddressIndex >= addrs.size()) { + nextAddressIndex = 0; + } + Runnable createTransportRunnable = new Runnable() { @Override public void run() { @@ -157,28 +177,33 @@ final class TransportSet { if (shutdown) { return; } - ClientTransport newActiveTransport = transportFactory.newClientTransport( - server, authority); + ClientTransport newActiveTransport = + transportFactory.newClientTransport(address, authority); log.log(Level.INFO, "Created transport {0} for {1}", - new Object[] {newActiveTransport, server}); + new Object[] {newActiveTransport, address}); transports.add(newActiveTransport); newActiveTransport.start( - new TransportListener(newActiveTransport, activeTransportFuture)); + new TransportListener(newActiveTransport, activeTransportFuture, address)); Preconditions.checkState(activeTransportFuture.set(newActiveTransport), "failed to set the new transport to the future"); } } }; - if (reconnectPolicy == null) { - // First connect attempt - reconnectPolicy = backoffPolicyProvider.get(); - createTransportRunnable.run(); - reconnectTask = null; - } else { - // Reconnect attempts + + if (currentAddressIndex == headIndex) { + // Back to the first attempted address. Trigger back-off. long delayMillis = reconnectPolicy.nextBackoffMillis(); reconnectTask = scheduledExecutor.schedule( createTransportRunnable, delayMillis, TimeUnit.MILLISECONDS); + } else { + if (headIndex == -1) { + // First connect attempt, or the first attempt since last successful connection. + headIndex = currentAddressIndex; + reconnectPolicy = backoffPolicyProvider.get(); + } + reconnectTask = null; + // No back-off this time. + createTransportRunnable.run(); } } @@ -221,13 +246,15 @@ final class TransportSet { } private class TransportListener implements ClientTransport.Listener { + private final SocketAddress address; private final ClientTransport transport; private final SettableFuture transportFuture; public TransportListener(ClientTransport transport, - SettableFuture transportFuture) { + SettableFuture transportFuture, SocketAddress address) { this.transport = transport; this.transportFuture = transportFuture; + this.address = address; } @GuardedBy("lock") @@ -238,26 +265,26 @@ final class TransportSet { @Override public void transportReady() { synchronized (lock) { - log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, server}); + log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, address}); Preconditions.checkState(transportFuture.isDone(), "the transport future is not done"); if (isAttachedToActiveTransport()) { - reconnectPolicy = null; + headIndex = -1; } } - loadBalancer.transportReady(server, transport); + loadBalancer.transportReady(addressGroup, transport); } @Override public void transportShutdown(Status s) { synchronized (lock) { log.log(Level.INFO, "Transport {0} for {1} is being shutdown", - new Object[] {transport, server}); + new Object[] {transport, address}); Preconditions.checkState(transportFuture.isDone(), "the transport future is not done"); if (isAttachedToActiveTransport()) { createActiveTransportFuture(); } } - loadBalancer.transportShutdown(server, transport, s); + loadBalancer.transportShutdown(addressGroup, transport, s); } @Override @@ -265,7 +292,7 @@ final class TransportSet { boolean runCallback = false; synchronized (lock) { log.log(Level.INFO, "Transport {0} for {1} is terminated", - new Object[] {transport, server}); + new Object[] {transport, address}); Preconditions.checkState(!isAttachedToActiveTransport(), "Listener is still attached to activeTransportFuture. " + "Seems transportTerminated was not called."); diff --git a/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java b/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java index f79f6189f2..9b606b63b8 100644 --- a/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java @@ -36,7 +36,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.same; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -44,7 +44,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -66,6 +65,7 @@ public class SimpleLoadBalancerTest { private LoadBalancer loadBalancer; private ArrayList servers; + private EquivalentAddressGroup addressGroup; @Mock private TransportManager mockTransportManager; @@ -76,16 +76,20 @@ public class SimpleLoadBalancerTest { loadBalancer = SimpleLoadBalancerFactory.getInstance().newLoadBalancer( "fakeservice", mockTransportManager); servers = new ArrayList(); + ArrayList addresses = new ArrayList(); for (int i = 0; i < 3; i++) { - servers.add(new ResolvedServerInfo(new FakeSocketAddress("server" + i), Attributes.EMPTY)); + SocketAddress addr = new FakeSocketAddress("server" + i); + servers.add(new ResolvedServerInfo(addr, Attributes.EMPTY)); + addresses.add(addr); } + addressGroup = new EquivalentAddressGroup(addresses); } @Test public void pickBeforeResolved() throws Exception { ClientTransport mockTransport = mock(ClientTransport.class); SettableFuture sourceFuture = SettableFuture.create(); - when(mockTransportManager.getTransport(same(servers.get(0).getAddress()))) + when(mockTransportManager.getTransport(eq(addressGroup))) .thenReturn(sourceFuture); ListenableFuture f1 = loadBalancer.pickTransport(null); ListenableFuture f2 = loadBalancer.pickTransport(null); @@ -94,9 +98,9 @@ public class SimpleLoadBalancerTest { assertNotSame(f1, f2); assertFalse(f1.isDone()); assertFalse(f2.isDone()); - verify(mockTransportManager, never()).getTransport(any(SocketAddress.class)); + verify(mockTransportManager, never()).getTransport(any(EquivalentAddressGroup.class)); loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY); - verify(mockTransportManager, times(2)).getTransport(same(servers.get(0).getAddress())); + verify(mockTransportManager, times(2)).getTransport(eq(addressGroup)); assertFalse(f1.isDone()); assertFalse(f2.isDone()); assertNotSame(sourceFuture, f1); @@ -104,28 +108,22 @@ public class SimpleLoadBalancerTest { sourceFuture.set(mockTransport); assertSame(mockTransport, f1.get()); assertSame(mockTransport, f2.get()); - ListenableFuture f3 = loadBalancer.pickTransport(null); - assertSame(sourceFuture, f3); - verify(mockTransportManager, times(3)).getTransport(same(servers.get(0).getAddress())); verifyNoMoreInteractions(mockTransportManager); } @Test - public void transportFailed() throws Exception { - ClientTransport mockTransport1 = mock(ClientTransport.class); - ClientTransport mockTransport2 = mock(ClientTransport.class); - when(mockTransportManager.getTransport(same(servers.get(0).getAddress()))).thenReturn( - Futures.immediateFuture(mockTransport1)); - when(mockTransportManager.getTransport(same(servers.get(1).getAddress()))).thenReturn( - Futures.immediateFuture(mockTransport2)); + public void pickAfterResolved() throws Exception { + ClientTransport mockTransport = mock(ClientTransport.class); + SettableFuture sourceFuture = SettableFuture.create(); + when(mockTransportManager.getTransport(eq(addressGroup))) + .thenReturn(sourceFuture); loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY); - ListenableFuture f1 = loadBalancer.pickTransport(null); - ListenableFuture f2 = loadBalancer.pickTransport(null); - assertSame(mockTransport1, f1.get()); - assertSame(mockTransport1, f2.get()); - loadBalancer.transportShutdown(servers.get(0).getAddress(), mockTransport1, Status.INTERNAL); - ListenableFuture f3 = loadBalancer.pickTransport(null); - assertSame(mockTransport2, f3.get()); + ListenableFuture f = loadBalancer.pickTransport(null); + assertSame(sourceFuture, f); + assertFalse(f.isDone()); + sourceFuture.set(mockTransport); + assertSame(mockTransport, f.get()); + verify(mockTransportManager).getTransport(addressGroup); } private static class FakeSocketAddress extends SocketAddress { diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 230ef7a0f2..4fa9fc93b9 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -83,11 +83,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** Unit tests for {@link ManagedChannelImpl}. */ @@ -353,22 +351,6 @@ public class ManagedChannelImplTest { .thenReturn(goodTransport); when(mockTransportFactory.newClientTransport(same(badAddress), any(String.class))) .thenReturn(badTransport); - final CountDownLatch badTransportFailed = new CountDownLatch(1); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - final ClientTransport.Listener listener = (ClientTransport.Listener) args[0]; - executor.execute(new Runnable() { - @Override - public void run() { - listener.transportShutdown(Status.UNAVAILABLE); - } - }); - badTransportFailed.countDown(); - return null; - } - }).when(badTransport).start(any(ClientTransport.Listener.class)); FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(Arrays.asList(badServer, goodServer)); @@ -389,8 +371,11 @@ public class ManagedChannelImplTest { // First try should fail with the bad address. call.start(mockCallListener, headers); - assertTrue(badTransportFailed.await(1000, TimeUnit.MILLISECONDS)); + ArgumentCaptor badTransportListenerCaptor = + ArgumentCaptor.forClass(ClientTransport.Listener.class); verify(mockCallListener, timeout(1000)).onClose(same(Status.UNAVAILABLE), any(Metadata.class)); + verify(badTransport, timeout(1000)).start(badTransportListenerCaptor.capture()); + badTransportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE); // Retry should work with the good address. ClientCall call2 = channel.newCall(method, CallOptions.DEFAULT); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java new file mode 100644 index 0000000000..171e5eacd2 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java @@ -0,0 +1,254 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.util.concurrent.ListenableFuture; + +import io.grpc.Attributes; +import io.grpc.ClientInterceptor; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.NameResolver; +import io.grpc.Status; +import io.grpc.TransportManager; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.net.SocketAddress; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Unit tests for {@link ManagedChannelImpl}'s {@link TransportManager} implementation as well as + * {@link TransportSet}. + */ +@RunWith(JUnit4.class) +public class ManagedChannelImplTransportManagerTest { + + private static final String authority = "fakeauthority"; + private static final NameResolver.Factory nameResolverFactory = new NameResolver.Factory() { + @Override + public NameResolver newNameResolver(final URI targetUri, Attributes params) { + return new NameResolver() { + @Override public void start(final Listener listener) { + } + + @Override public String getServiceAuthority() { + return authority; + } + + @Override public void shutdown() { + } + }; + } + }; + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + private ManagedChannelImpl channel; + + @Mock private ClientTransportFactory mockTransportFactory; + @Mock private LoadBalancer.Factory mockLoadBalancerFactory; + @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider; + @Mock private BackoffPolicy mockBackoffPolicy; + + private TransportManager tm; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy); + when(mockLoadBalancerFactory.newLoadBalancer(anyString(), any(TransportManager.class))) + .thenReturn(mock(LoadBalancer.class)); + + channel = new ManagedChannelImpl("fake://target", mockBackoffPolicyProvider, + nameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory, + mockTransportFactory, executor, null, Collections.emptyList()); + + ArgumentCaptor tmCaptor = ArgumentCaptor.forClass(TransportManager.class); + verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture()); + tm = tmCaptor.getValue(); + } + + @After + public void tearDown() { + channel.shutdown(); + executor.shutdown(); + } + + @Test + public void createAndReuseTransport() throws Exception { + doAnswer(new Answer() { + @Override + public ClientTransport answer(InvocationOnMock invocation) throws Throwable { + return mock(ClientTransport.class); + } + }).when(mockTransportFactory).newClientTransport(any(SocketAddress.class), any(String.class)); + + SocketAddress addr = mock(SocketAddress.class); + EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(addr); + ListenableFuture future1 = tm.getTransport(addressGroup); + verify(mockTransportFactory).newClientTransport(addr, authority); + ListenableFuture future2 = tm.getTransport(addressGroup); + assertNotNull(future1.get()); + assertSame(future1.get(), future2.get()); + verify(mockBackoffPolicyProvider).get(); + verify(mockBackoffPolicy, times(0)).nextBackoffMillis(); + verifyNoMoreInteractions(mockTransportFactory); + } + + @Test + public void reconnect() throws Exception { + SocketAddress addr1 = mock(SocketAddress.class); + SocketAddress addr2 = mock(SocketAddress.class); + EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2)); + + LinkedList listeners = + TestUtils.captureListeners(mockTransportFactory); + + // Invocation counters + int backoffReset = 0; + + // Pick the first transport + ListenableFuture future1 = tm.getTransport(addressGroup); + assertNotNull(future1.get()); + verify(mockTransportFactory).newClientTransport(addr1, authority); + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + // Fail the first transport, without setting it to ready + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Subsequent getTransport() will use the next address + ListenableFuture future2a = tm.getTransport(addressGroup); + assertNotNull(future2a.get()); + // Will keep the previous back-off policy, and not consult back-off policy + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + verify(mockTransportFactory).newClientTransport(addr2, authority); + ListenableFuture future2b = tm.getTransport(addressGroup); + assertSame(future2a.get(), future2b.get()); + assertNotSame(future1.get(), future2a.get()); + // Make the second transport ready + listeners.peek().transportReady(); + // Disconnect the second transport + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Subsequent getTransport() will use the next address, which is the first one since we have run + // out of addresses. + ListenableFuture future3 = tm.getTransport(addressGroup); + assertNotSame(future1.get(), future3.get()); + assertNotSame(future2a.get(), future3.get()); + // This time back-off policy was reset, because previous transport was succesfully connected. + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + // Back-off policy was never consulted. + verify(mockBackoffPolicy, times(0)).nextBackoffMillis(); + verify(mockTransportFactory, times(2)).newClientTransport(addr1, authority); + verifyNoMoreInteractions(mockTransportFactory); + assertEquals(1, listeners.size()); + } + + @Test + public void reconnectWithBackoff() throws Exception { + SocketAddress addr1 = mock(SocketAddress.class); + SocketAddress addr2 = mock(SocketAddress.class); + EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2)); + + LinkedList listeners = + TestUtils.captureListeners(mockTransportFactory); + + // Invocation counters + int transportsAddr1 = 0; + int transportsAddr2 = 0; + int backoffConsulted = 0; + int backoffReset = 0; + + // First pick succeeds + ListenableFuture future1 = tm.getTransport(addressGroup); + assertNotNull(future1.get()); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); + // Back-off policy was set initially. + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + listeners.peek().transportReady(); + // Then close it + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Second pick fails. This is the beginning of a series of failures. + ListenableFuture future2 = tm.getTransport(addressGroup); + assertNotNull(future2.get()); + verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); + // Back-off policy was reset. + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Third pick fails too + ListenableFuture future3 = tm.getTransport(addressGroup); + assertNotNull(future3.get()); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); + // Back-off policy was not reset. + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Forth pick is on addr2, back-off policy kicks in. + ListenableFuture future4 = tm.getTransport(addressGroup); + assertNotNull(future4.get()); + verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); + // Back-off policy was not reset, but was consulted. + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + verify(mockBackoffPolicy, times(++backoffConsulted)).nextBackoffMillis(); + } + +} diff --git a/core/src/test/java/io/grpc/internal/TestUtils.java b/core/src/test/java/io/grpc/internal/TestUtils.java new file mode 100644 index 0000000000..3d74d510cf --- /dev/null +++ b/core/src/test/java/io/grpc/internal/TestUtils.java @@ -0,0 +1,76 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.net.SocketAddress; +import java.util.LinkedList; + +/** + * Common utility methods for tests. + */ +final class TestUtils { + + /** + * Stub the given mock {@link ClientTransportFactory} by returning mock {@link ClientTransport}s + * which saves their listeners to a list which is returned by this method. + */ + static LinkedList captureListeners( + ClientTransportFactory mockTransportFactory) { + final LinkedList listeners = + new LinkedList(); + + doAnswer(new Answer() { + @Override + public ClientTransport answer(InvocationOnMock invocation) throws Throwable { + ClientTransport mockTransport = mock(ClientTransport.class); + // Save the listener + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + listeners.add((ClientTransport.Listener) invocation.getArguments()[0]); + return null; + } + }).when(mockTransport).start(any(ClientTransport.Listener.class)); + return mockTransport; + } + }).when(mockTransportFactory).newClientTransport(any(SocketAddress.class), any(String.class)); + + return listeners; + } +} diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java new file mode 100644 index 0000000000..761cf61947 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/TransportSetTest.java @@ -0,0 +1,294 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.Status; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.PriorityQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Unit tests for {@link TransportSet}. + * + *

It only tests the logic that is not covered by {@link ManagedChannelImplTransportManagerTest}. + */ +@RunWith(JUnit4.class) +public class TransportSetTest { + + private static final String authority = "fakeauthority"; + + private long currentTimeMillis; + + @Mock private LoadBalancer mockLoadBalancer; + @Mock private BackoffPolicy mockBackoffPolicy1; + @Mock private BackoffPolicy mockBackoffPolicy2; + @Mock private BackoffPolicy mockBackoffPolicy3; + @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider; + @Mock private ClientTransportFactory mockTransportFactory; + @Mock private TransportSet.Callback mockTransportSetCallback; + @Mock private ScheduledExecutorService mockScheduledExecutorService; + + private final PriorityQueue tasks = new PriorityQueue(); + + private TransportSet transportSet; + private EquivalentAddressGroup addressGroup; + private LinkedList listeners; + + @Before public void setUp() { + MockitoAnnotations.initMocks(this); + + when(mockBackoffPolicyProvider.get()) + .thenReturn(mockBackoffPolicy1, mockBackoffPolicy2, mockBackoffPolicy3); + when(mockBackoffPolicy1.nextBackoffMillis()).thenReturn(10L, 100L); + when(mockBackoffPolicy2.nextBackoffMillis()).thenReturn(10L, 100L); + when(mockBackoffPolicy3.nextBackoffMillis()).thenReturn(10L, 100L); + doAnswer(new Answer>() { + @Override public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[0]; + long delay = (Long) args[1]; + TimeUnit unit = (TimeUnit) args[2]; + tasks.add(new Task(currentTimeMillis + unit.toMillis(delay), task)); + return null; + } + }).when(mockScheduledExecutorService) + .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + listeners = TestUtils.captureListeners(mockTransportFactory); + } + + @Test public void singleAddressBackoff() { + SocketAddress addr = mock(SocketAddress.class); + createTransortSet(addr); + + // Invocation counters + int transportsCreated = 0; + int backoff1Consulted = 0; + int backoff2Consulted = 0; + int backoffReset = 0; + + // First attempt happens immediately (TransportSet aggressively maintains a transport) + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority); + // Fail this one + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Second attempt uses the first back-off value interval. + verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + // Transport creation doesn't happen until time is due + forwardTime(9); + verify(mockTransportFactory, times(transportsCreated)).newClientTransport(addr, authority); + forwardTime(1); + verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority); + // Fail this one too + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Third attempt uses the second back-off interval. + verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + // Transport creation doesn't happen until time is due + forwardTime(99); + verify(mockTransportFactory, times(transportsCreated)).newClientTransport(addr, authority); + forwardTime(1); + verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority); + // Let this one succeed + listeners.peek().transportReady(); + // And close it + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Back-off is reset, and the next attempt will happen immediately + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority); + + // Final checks for consultations on back-off policies + verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis(); + } + + @Test public void twoAddressesBackoff() { + SocketAddress addr1 = mock(SocketAddress.class); + SocketAddress addr2 = mock(SocketAddress.class); + createTransortSet(addr1, addr2); + + // Invocation counters + int transportsAddr1 = 0; + int transportsAddr2 = 0; + int backoff1Consulted = 0; + int backoff2Consulted = 0; + int backoff3Consulted = 0; + int backoffReset = 0; + + // Connection happens immediately (TransportSet aggressively maintains a transport) + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); + // Let this one through + listeners.peek().transportReady(); + // Then shut it down + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + + ////// Now start a series of failing attempts, where addr2 is the head. + // First attempt after a connection closed. Reset back-off policy. + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); + // Fail this one + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Second attempt will happen immediately. Keep back-off policy. + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); + // Fail this one too + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Third attempt is on head, thus controlled by the first back-off interval. + verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + forwardTime(9); + verify(mockTransportFactory, times(transportsAddr2)).newClientTransport(addr2, authority); + forwardTime(1); + verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); + // Fail this one too + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Forth attempt will happen immediately. Keep back-off policy. + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); + // Fail this one too + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Fifth attempt is on head, thus controlled by the second back-off interval. + verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + forwardTime(99); + verify(mockTransportFactory, times(transportsAddr2)).newClientTransport(addr2, authority); + forwardTime(1); + verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); + // Let it through + listeners.peek().transportReady(); + // Then close it. + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + + ////// Now start a series of failing attempts, where addr1 is the head. + // First attempt after a connection closed. Reset back-off policy. + verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); + // Fail this one + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Second attempt will happen immediately. Keep back-off policy. + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); + // Fail this one too + listeners.poll().transportShutdown(Status.UNAVAILABLE); + + // Third attempt is on head, thus controlled by the first back-off interval. + verify(mockBackoffPolicy3, times(++backoff3Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicyProvider, times(backoffReset)).get(); + forwardTime(9); + verify(mockTransportFactory, times(transportsAddr1)).newClientTransport(addr1, authority); + forwardTime(1); + verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); + + // Final checks on invocations on back-off policies + verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis(); + verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffMillis(); + } + + private static class Task implements Comparable { + final long dueTimeMillis; + final Runnable command; + + Task(long dueTimeMillis, Runnable command) { + this.dueTimeMillis = dueTimeMillis; + this.command = command; + } + + @Override public int compareTo(Task other) { + if (dueTimeMillis < other.dueTimeMillis) { + return -1; + } else if (dueTimeMillis > other.dueTimeMillis) { + return 1; + } else { + return 0; + } + } + } + + private void runDueTasks() { + while (true) { + Task task = tasks.peek(); + if (task == null || task.dueTimeMillis > currentTimeMillis) { + break; + } + tasks.poll(); + task.command.run(); + } + } + + private void forwardTime(long millis) { + currentTimeMillis += millis; + runDueTasks(); + } + + private void createTransortSet(SocketAddress ... addrs) { + addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs)); + transportSet = new TransportSet(addressGroup, authority, mockLoadBalancer, + mockBackoffPolicyProvider, mockTransportFactory, mockScheduledExecutorService, + mockTransportSetCallback); + } +}