diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 537023148e..58887b1f0b 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -45,6 +45,7 @@ import java.util.Collection; import java.util.LinkedHashSet; import java.util.concurrent.Executor; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; /** @@ -232,6 +233,12 @@ class DelayedClientTransport implements ManagedClientTransport { return GrpcUtil.getLogId(this); } + @VisibleForTesting + @Nullable + Supplier getTransportSupplier() { + return transportSupplier; + } + private class PendingStream extends DelayedStream { private final MethodDescriptor method; private final Metadata headers; diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index df7ee70c32..9d1300cd66 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -34,7 +34,6 @@ package io.grpc.internal; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Throwables; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; @@ -44,7 +43,6 @@ import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -108,20 +106,16 @@ final class TransportSet implements WithLogId { /* * The transport for new outgoing requests. * - If shutdown == true, activeTransport is null (shutdown) - * - Otherwise, if delayedTransport != null, - * activeTransport is delayedTransport (waiting to connect) + * - Otherwise, if a connection is pending or connecting, + * activeTransport is a DelayedClientTransport * - Otherwise, activeTransport is either null (initially or when idle) - * or points to a real transport (when connecting or connected). + * or points to a real transport (when ready). * * 'lock' must be held when assigning to it. */ @Nullable private volatile ManagedClientTransport activeTransport; - @GuardedBy("lock") - @Nullable - private DelayedClientTransport delayedTransport; - TransportSet(EquivalentAddressGroup addressGroup, String authority, LoadBalancer loadBalancer, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, @@ -155,40 +149,24 @@ final class TransportSet implements WithLogId { if (savedTransport != null) { return savedTransport; } - Callable immediateConnectionTask = null; synchronized (lock) { // Check again, since it could have changed before acquiring the lock if (activeTransport == null) { if (shutdown) { return SHUTDOWN_TRANSPORT; } - delayedTransport = new DelayedClientTransport(); + DelayedClientTransport delayedTransport = new DelayedClientTransport(); transports.add(delayedTransport); delayedTransport.start(new BaseTransportListener(delayedTransport)); activeTransport = delayedTransport; - immediateConnectionTask = scheduleConnection(); + scheduleConnection(delayedTransport); } - savedTransport = activeTransport; + return activeTransport; } - if (immediateConnectionTask != null) { - try { - return immediateConnectionTask.call(); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - return savedTransport; } - /** - * Schedule a task that creates a new transport. - * - * @return if not {@code null}, caller should run the returned callable outside of lock. The - * callable returns the real transport that has been created. - */ - @Nullable @GuardedBy("lock") - private Callable scheduleConnection() { + private void scheduleConnection(final DelayedClientTransport delayedTransport) { Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(), "previous reconnectTask is not done"); @@ -203,47 +181,23 @@ final class TransportSet implements WithLogId { nextAddressIndex = 0; } - final Callable createTransportCallable = new Callable() { + Runnable createTransportRunnable = new Runnable() { @Override - public ClientTransport call() { - DelayedClientTransport savedDelayedTransport; - ManagedClientTransport newActiveTransport; - boolean savedShutdown; + public void run() { synchronized (lock) { - savedShutdown = shutdown; reconnectTask = null; if (currentAddressIndex == 0) { backoffWatch.reset().start(); } - newActiveTransport = transportFactory.newClientTransport(address, authority); + ManagedClientTransport transport = + transportFactory.newClientTransport(address, authority); if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "[{0}] Created {1} for {2}", - new Object[] {getLogId(), newActiveTransport.getLogId(), address}); + new Object[] {getLogId(), transport.getLogId(), address}); } - transports.add(newActiveTransport); - newActiveTransport.start( - new TransportListener(newActiveTransport, address)); - if (shutdown) { - // If TransportSet already shutdown, newActiveTransport is only to take care of pending - // streams in delayedTransport, but will not serve new streams, and it will be shutdown - // as soon as it's set to the delayedTransport. - // activeTransport should have already been set to null by shutdown(). We keep it null. - Preconditions.checkState(activeTransport == null, - "Unexpected non-null activeTransport"); - } else { - activeTransport = newActiveTransport; - } - savedDelayedTransport = delayedTransport; - delayedTransport = null; + transports.add(transport); + transport.start(new TransportListener(transport, delayedTransport, address)); } - savedDelayedTransport.setTransport(newActiveTransport); - // This delayed transport will terminate and be removed from transports. - savedDelayedTransport.shutdown(); - if (savedShutdown) { - // See comments in the synchronized block above on why we shutdown here. - newActiveTransport.shutdown(); - } - return newActiveTransport; } }; @@ -266,20 +220,10 @@ final class TransportSet implements WithLogId { if (delayMillis <= 0) { reconnectTask = null; // No back-off this time. - // Note createTransportRunnable is not supposed to run under the lock. - return createTransportCallable; + createTransportRunnable.run(); } else { reconnectTask = scheduledExecutor.schedule( - new Runnable() { - @Override public void run() { - try { - createTransportCallable.call(); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - }, delayMillis, TimeUnit.MILLISECONDS); - return null; + createTransportRunnable, delayMillis, TimeUnit.MILLISECONDS); } } @@ -301,7 +245,6 @@ final class TransportSet implements WithLogId { if (transports.isEmpty()) { runCallback = true; Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); - Preconditions.checkState(delayedTransport == null, "Should have no delayedTransport"); } // else: the callback will be run once all transports have been terminated } if (savedActiveTransport != null) { @@ -361,14 +304,13 @@ final class TransportSet implements WithLogId { /** Listener for real transports. */ private class TransportListener extends BaseTransportListener { private final SocketAddress address; + private final DelayedClientTransport delayedTransport; - public TransportListener(ManagedClientTransport transport, SocketAddress address) { + public TransportListener(ManagedClientTransport transport, + DelayedClientTransport delayedTransport, SocketAddress address) { super(transport); this.address = address; - } - - private boolean isAttachedToActiveTransport() { - return activeTransport == transport; + this.delayedTransport = delayedTransport; } @Override @@ -378,11 +320,28 @@ final class TransportSet implements WithLogId { new Object[] {getLogId(), transport.getLogId(), address}); } super.transportReady(); + boolean savedShutdown; synchronized (lock) { - if (isAttachedToActiveTransport()) { - firstAttempt = true; + savedShutdown = shutdown; + firstAttempt = true; + if (shutdown) { + // If TransportSet already shutdown, transport is only to take care of pending + // streams in delayedTransport, but will not serve new streams, and it will be shutdown + // as soon as it's set to the delayedTransport. + // activeTransport should have already been set to null by shutdown(). We keep it null. + Preconditions.checkState(activeTransport == null, + "Unexpected non-null activeTransport"); + } else if (activeTransport == delayedTransport) { + activeTransport = transport; } } + delayedTransport.setTransport(transport); + // This delayed transport will terminate and be removed from transports. + delayedTransport.shutdown(); + if (savedShutdown) { + // See comments in the synchronized block above on why we shutdown here. + transport.shutdown(); + } loadBalancer.handleTransportReady(addressGroup); } @@ -394,8 +353,18 @@ final class TransportSet implements WithLogId { } super.transportShutdown(s); synchronized (lock) { - if (isAttachedToActiveTransport()) { + if (activeTransport == transport) { activeTransport = null; + } else if (activeTransport == delayedTransport) { + // Continue reconnect if there are still addresses to try. + // Fail if all addresses have been tried and failed in a row. + if (nextAddressIndex == 0) { + delayedTransport.setTransport(new FailingClientTransport(s)); + delayedTransport.shutdown(); + activeTransport = null; + } else { + scheduleConnection(delayedTransport); + } } } loadBalancer.handleTransportShutdown(addressGroup, s); @@ -408,9 +377,9 @@ final class TransportSet implements WithLogId { new Object[] {getLogId(), transport.getLogId(), address}); } super.transportTerminated(); - Preconditions.checkState(!isAttachedToActiveTransport(), - "Listener is still attached to activeTransport. " - + "Seems transportTerminated was not called."); + Preconditions.checkState(activeTransport != transport, + "activeTransport still points to the delayedTransport. " + + "Seems transportShutdown() was not called."); } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index dbc2f6b876..1558d3aafd 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -48,6 +48,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -196,6 +197,7 @@ public class ManagedChannelImplTest { .newClientTransport(same(socketAddress), eq(authority)); verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture()); ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue(); + transportListener.transportReady(); verify(mockTransport, timeout(1000)).newStream(same(method), same(headers)); verify(mockStream).start(streamListenerCaptor.capture()); verify(mockStream).setCompressor(isA(Compressor.class)); @@ -341,6 +343,8 @@ public class ManagedChannelImplTest { ClientCall call = channel.newCall(method, CallOptions.DEFAULT.withExecutor(executor)); call.start(mockCallListener, headers); + verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture()); + transportListenerCaptor.getValue().transportReady(); verify(mockTransport, timeout(1000)).newStream(same(method), same(headers)); verify(mockStream).start(streamListenerCaptor.capture()); ClientStreamListener streamListener = streamListenerCaptor.getValue(); @@ -385,16 +389,27 @@ public class ManagedChannelImplTest { } /** - * Verify that if one resolved address points to a bad server, the retry will use another address. + * Verify that if the first resolved address points to a server that cannot be connected, the call + * will end up with the second address which works. */ @Test - public void firstResolvedServerIsBad() throws Exception { - final SocketAddress goodAddress = new SocketAddress() {}; - final SocketAddress badAddress = new SocketAddress() {}; + public void firstResolvedServerFailedToConnect() throws Exception { + final SocketAddress goodAddress = new SocketAddress() { + @Override public String toString() { + return "goodAddress"; + } + }; + final SocketAddress badAddress = new SocketAddress() { + @Override public String toString() { + return "badAddress"; + } + }; final ResolvedServerInfo goodServer = new ResolvedServerInfo(goodAddress, Attributes.EMPTY); final ResolvedServerInfo badServer = new ResolvedServerInfo(badAddress, Attributes.EMPTY); final ManagedClientTransport goodTransport = mock(ManagedClientTransport.class); final ManagedClientTransport badTransport = mock(ManagedClientTransport.class); + when(goodTransport.newStream(any(MethodDescriptor.class), any(Metadata.class))) + .thenReturn(mock(ClientStream.class)); when(mockTransportFactory.newClientTransport(same(goodAddress), any(String.class))) .thenReturn(goodTransport); when(mockTransportFactory.newClientTransport(same(badAddress), any(String.class))) @@ -405,31 +420,131 @@ public class ManagedChannelImplTest { ManagedChannel channel = createChannel(nameResolverFactory, NO_INTERCEPTOR); ClientCall call = channel.newCall(method, CallOptions.DEFAULT); Metadata headers = new Metadata(); - ClientStream badStream = mock(ClientStream.class); - when(badTransport.newStream(same(method), same(headers))).thenReturn(badStream); - doAnswer(new Answer() { - @Override - public ClientStream answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - final ClientStreamListener listener = (ClientStreamListener) args[0]; - listener.closed(Status.UNAVAILABLE, new Metadata()); - return mock(ClientStream.class); - } - }).when(badStream).start(any(ClientStreamListener.class)); - when(goodTransport.newStream(same(method), same(headers))).thenReturn(mock(ClientStream.class)); - // First try should fail with the bad address. + // Start a call. The channel will starts with the first address (badAddress) call.start(mockCallListener, headers); ArgumentCaptor badTransportListenerCaptor = ArgumentCaptor.forClass(ManagedClientTransport.Listener.class); - verify(mockCallListener, timeout(1000)).onClose(same(Status.UNAVAILABLE), any(Metadata.class)); verify(badTransport, timeout(1000)).start(badTransportListenerCaptor.capture()); + verify(mockTransportFactory).newClientTransport(same(badAddress), any(String.class)); + verify(mockTransportFactory, times(0)) + .newClientTransport(same(goodAddress), any(String.class)); badTransportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE); - // Retry should work with the good address. + // The channel then try the second address (goodAddress) + ArgumentCaptor goodTransportListenerCaptor = + ArgumentCaptor.forClass(ManagedClientTransport.Listener.class); + verify(mockTransportFactory, timeout(1000)) + .newClientTransport(same(goodAddress), any(String.class)); + verify(goodTransport, timeout(1000)).start(goodTransportListenerCaptor.capture()); + goodTransportListenerCaptor.getValue().transportReady(); + verify(goodTransport, timeout(1000)).newStream(same(method), same(headers)); + // The bad transport was never used. + verify(badTransport, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class)); + } + + /** + * Verify that if all resolved addresses failed to connect, the call will fail. + */ + @Test + public void allServersFailedToConnect() throws Exception { + final SocketAddress addr1 = new SocketAddress() { + @Override public String toString() { + return "addr1"; + } + }; + final SocketAddress addr2 = new SocketAddress() { + @Override public String toString() { + return "addr2"; + } + }; + final ResolvedServerInfo server1 = new ResolvedServerInfo(addr1, Attributes.EMPTY); + final ResolvedServerInfo server2 = new ResolvedServerInfo(addr2, Attributes.EMPTY); + final ManagedClientTransport transport1 = mock(ManagedClientTransport.class); + final ManagedClientTransport transport2 = mock(ManagedClientTransport.class); + when(mockTransportFactory.newClientTransport(same(addr1), any(String.class))) + .thenReturn(transport1); + when(mockTransportFactory.newClientTransport(same(addr2), any(String.class))) + .thenReturn(transport2); + + FakeNameResolverFactory nameResolverFactory = + new FakeNameResolverFactory(Arrays.asList(server1, server2)); + ManagedChannel channel = createChannel(nameResolverFactory, NO_INTERCEPTOR); + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + Metadata headers = new Metadata(); + + // Start a call. The channel will starts with the first address, which will fail to connect. + call.start(mockCallListener, headers); + verify(transport1, timeout(1000)).start(transportListenerCaptor.capture()); + verify(mockTransportFactory).newClientTransport(same(addr1), any(String.class)); + verify(mockTransportFactory, times(0)) + .newClientTransport(same(addr2), any(String.class)); + transportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE); + + // The channel then try the second address, which will fail to connect too. + verify(transport2, timeout(1000)).start(transportListenerCaptor.capture()); + verify(mockTransportFactory).newClientTransport(same(addr2), any(String.class)); + verify(transport2, timeout(1000)).start(transportListenerCaptor.capture()); + transportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE); + + // Call fails + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(mockCallListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class)); + assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); + // No real stream was ever created + verify(transport1, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class)); + verify(transport2, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class)); + } + + /** + * Verify that if the first resolved address points to a server that is at first connected, but + * disconnected later, all calls will stick to the first address. + */ + @Test + public void firstResolvedServerConnectedThenDisconnected() throws Exception { + final SocketAddress addr1 = new SocketAddress() { + @Override public String toString() { + return "addr1"; + } + }; + final SocketAddress addr2 = new SocketAddress() { + @Override public String toString() { + return "addr2"; + } + }; + final ResolvedServerInfo server1 = new ResolvedServerInfo(addr1, Attributes.EMPTY); + final ResolvedServerInfo server2 = new ResolvedServerInfo(addr2, Attributes.EMPTY); + // Addr1 will have two transports throughout this test. + final ManagedClientTransport transport1 = mock(ManagedClientTransport.class); + final ManagedClientTransport transport2 = mock(ManagedClientTransport.class); + when(transport1.newStream(any(MethodDescriptor.class), any(Metadata.class))) + .thenReturn(mock(ClientStream.class)); + when(transport2.newStream(any(MethodDescriptor.class), any(Metadata.class))) + .thenReturn(mock(ClientStream.class)); + when(mockTransportFactory.newClientTransport(same(addr1), any(String.class))) + .thenReturn(transport1, transport2); + + FakeNameResolverFactory nameResolverFactory = + new FakeNameResolverFactory(Arrays.asList(server1, server2)); + ManagedChannel channel = createChannel(nameResolverFactory, NO_INTERCEPTOR); + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + Metadata headers = new Metadata(); + + // First call will use the first address + call.start(mockCallListener, headers); + verify(mockTransportFactory, timeout(1000)).newClientTransport(same(addr1), any(String.class)); + verify(transport1, timeout(1000)).start(transportListenerCaptor.capture()); + transportListenerCaptor.getValue().transportReady(); + verify(transport1, timeout(1000)).newStream(same(method), same(headers)); + transportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE); + + // Second call still use the first address, since it was successfully connected. ClientCall call2 = channel.newCall(method, CallOptions.DEFAULT); call2.start(mockCallListener, headers); - verify(goodTransport, timeout(1000)).newStream(same(method), same(headers)); + verify(transport2, timeout(1000)).start(transportListenerCaptor.capture()); + verify(mockTransportFactory, times(2)).newClientTransport(same(addr1), any(String.class)); + transportListenerCaptor.getValue().transportReady(); + verify(transport2, timeout(1000)).newStream(same(method), same(headers)); } private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider { diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java index f6d1384b13..161c239b84 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java @@ -165,13 +165,12 @@ public class ManagedChannelImplTransportManagerTest { ClientTransport t1 = tm.getTransport(addressGroup); verify(mockTransportFactory, timeout(1000)).newClientTransport(addr, authority); // The real transport - ClientTransport rt = transports.poll(1, TimeUnit.SECONDS).transport; + MockClientTransportInfo transportInfo = transports.poll(1, TimeUnit.SECONDS); + transportInfo.listener.transportReady(); ClientTransport t2 = tm.getTransport(addressGroup); - // Make sure the first transport is always a real transport. This promise is especially made for - // InProcessTransport, because it may run into deadlock if it works under a delayed transport - // (https://github.com/grpc/grpc-java/issues/1510). - assertSame(rt, t1); - assertSame(rt, t2); + assertTrue(t1 instanceof DelayedClientTransport); + assertFalse(t2 instanceof DelayedClientTransport); + assertSame(transportInfo.transport, t2); verify(mockBackoffPolicyProvider).get(); verify(mockBackoffPolicy, times(0)).nextBackoffMillis(); verifyNoMoreInteractions(mockTransportFactory); diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java index 8af25c39d9..f3cb772fa5 100644 --- a/core/src/test/java/io/grpc/internal/TransportSetTest.java +++ b/core/src/test/java/io/grpc/internal/TransportSetTest.java @@ -33,6 +33,9 @@ package io.grpc.internal; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -106,7 +109,7 @@ public class TransportSetTest { transports = TestUtils.captureTransports(mockTransportFactory); } - @Test public void singleAddressBackoff() { + @Test public void singleAddressReconnect() { SocketAddress addr = mock(SocketAddress.class); createTransportSet(addr); @@ -159,7 +162,7 @@ public class TransportSetTest { verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis(); } - @Test public void twoAddressesBackoff() { + @Test public void twoAddressesReconnect() { SocketAddress addr1 = mock(SocketAddress.class); SocketAddress addr2 = mock(SocketAddress.class); createTransportSet(addr1, addr2); @@ -173,21 +176,29 @@ public class TransportSetTest { int backoffReset = 0; // First attempt - transportSet.obtainActiveTransport(); + DelayedClientTransport delayedTransport1 = + (DelayedClientTransport) transportSet.obtainActiveTransport(); verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Let this one fail without success transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertNull(delayedTransport1.getTransportSupplier()); // Second attempt will start immediately. Keep back-off policy. - transportSet.obtainActiveTransport(); + DelayedClientTransport delayedTransport2 = + (DelayedClientTransport) transportSet.obtainActiveTransport(); + assertSame(delayedTransport1, delayedTransport2); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); // Fail this one too transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + // All addresses have failed. Delayed transport will see an error. + assertTrue(delayedTransport2.getTransportSupplier().get() instanceof FailingClientTransport); // Third attempt is the first address, thus controlled by the first back-off interval. - transportSet.obtainActiveTransport(); + DelayedClientTransport delayedTransport3 = + (DelayedClientTransport) transportSet.obtainActiveTransport(); + assertNotSame(delayedTransport2, delayedTransport3); verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); fakeClock.forwardMillis(9); @@ -196,16 +207,23 @@ public class TransportSetTest { verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Fail this one too transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertNull(delayedTransport3.getTransportSupplier()); // Forth attempt will start immediately. Keep back-off policy. - transportSet.obtainActiveTransport(); + DelayedClientTransport delayedTransport4 = + (DelayedClientTransport) transportSet.obtainActiveTransport(); + assertSame(delayedTransport3, delayedTransport4); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); // Fail this one too transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + // All addresses have failed again. Delayed transport will see an error + assertTrue(delayedTransport4.getTransportSupplier().get() instanceof FailingClientTransport); // Fifth attempt for the first address, thus controlled by the second back-off interval. - transportSet.obtainActiveTransport(); + DelayedClientTransport delayedTransport5 = + (DelayedClientTransport) transportSet.obtainActiveTransport(); + assertNotSame(delayedTransport4, delayedTransport5); verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); fakeClock.forwardMillis(99); @@ -214,12 +232,16 @@ public class TransportSetTest { verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Let it through transports.peek().listener.transportReady(); + // Delayed transport will see the connected transport. + assertSame(transports.peek().transport, delayedTransport5.getTransportSupplier().get()); // Then close it. transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // First attempt after a successful connection. Reset back-off policy, and start from the first // address. - transportSet.obtainActiveTransport(); + DelayedClientTransport delayedTransport6 = + (DelayedClientTransport) transportSet.obtainActiveTransport(); + assertNotSame(delayedTransport5, delayedTransport6); verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); @@ -297,11 +319,15 @@ public class TransportSetTest { // Reconnect will eventually happen, even though TransportSet has been shut down fakeClock.forwardMillis(10); verify(mockTransportFactory, times(2)).newClientTransport(addr, authority); - // The pending stream will be started on this newly started transport, which is promptly shut - // down by TransportSet right after the stream is created. + // The pending stream will be started on this newly started transport after it's ready. + // The transport is shut down by TransportSet right after the stream is created. transportInfo = transports.poll(); + verify(transportInfo.transport, times(0)).newStream(same(method), same(headers)); + verify(transportInfo.transport, times(0)).shutdown(); + transportInfo.listener.transportReady(); verify(transportInfo.transport).newStream(same(method), same(headers)); verify(transportInfo.transport).shutdown(); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); verify(mockTransportSetCallback, never()).onTerminated(); // Terminating the transport will let TransportSet to be terminated. transportInfo.listener.transportTerminated();