Only link delayed transport AFTER real transport has called transportReady().

If TransportSet fails to connect a transport (i.e., transportShutdown()
called without transportReady()), TransportSet will automatically
schedule reconnection for the next address, unless it has reached the end
of the address list, in which case it will fail the delayed transport.

This will reduce stream errors caused by bad addresses appearing before
good addresses in the resolved address list.

Before this change, TransportSet would return the real transport on the
first call of obtainActiveTransport(). After this change, it will return
the delayed transport instead.
This commit is contained in:
Kun Zhang 2016-02-27 23:00:22 -08:00
parent 27d848901f
commit 08c74d46f5
5 changed files with 236 additions and 120 deletions

View File

@ -45,6 +45,7 @@ import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
@ -232,6 +233,12 @@ class DelayedClientTransport implements ManagedClientTransport {
return GrpcUtil.getLogId(this);
}
@VisibleForTesting
@Nullable
Supplier<ClientTransport> getTransportSupplier() {
return transportSupplier;
}
private class PendingStream extends DelayedStream {
private final MethodDescriptor<?, ?> method;
private final Metadata headers;

View File

@ -34,7 +34,6 @@ package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
@ -44,7 +43,6 @@ import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -108,20 +106,16 @@ final class TransportSet implements WithLogId {
/*
* The transport for new outgoing requests.
* - If shutdown == true, activeTransport is null (shutdown)
* - Otherwise, if delayedTransport != null,
* activeTransport is delayedTransport (waiting to connect)
* - Otherwise, if a connection is pending or connecting,
* activeTransport is a DelayedClientTransport
* - Otherwise, activeTransport is either null (initially or when idle)
* or points to a real transport (when connecting or connected).
* or points to a real transport (when ready).
*
* 'lock' must be held when assigning to it.
*/
@Nullable
private volatile ManagedClientTransport activeTransport;
@GuardedBy("lock")
@Nullable
private DelayedClientTransport delayedTransport;
TransportSet(EquivalentAddressGroup addressGroup, String authority,
LoadBalancer<ClientTransport> loadBalancer, BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
@ -155,40 +149,24 @@ final class TransportSet implements WithLogId {
if (savedTransport != null) {
return savedTransport;
}
Callable<ClientTransport> immediateConnectionTask = null;
synchronized (lock) {
// Check again, since it could have changed before acquiring the lock
if (activeTransport == null) {
if (shutdown) {
return SHUTDOWN_TRANSPORT;
}
delayedTransport = new DelayedClientTransport();
DelayedClientTransport delayedTransport = new DelayedClientTransport();
transports.add(delayedTransport);
delayedTransport.start(new BaseTransportListener(delayedTransport));
activeTransport = delayedTransport;
immediateConnectionTask = scheduleConnection();
scheduleConnection(delayedTransport);
}
savedTransport = activeTransport;
return activeTransport;
}
if (immediateConnectionTask != null) {
try {
return immediateConnectionTask.call();
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
return savedTransport;
}
/**
* Schedule a task that creates a new transport.
*
* @return if not {@code null}, caller should run the returned callable outside of lock. The
* callable returns the real transport that has been created.
*/
@Nullable
@GuardedBy("lock")
private Callable<ClientTransport> scheduleConnection() {
private void scheduleConnection(final DelayedClientTransport delayedTransport) {
Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(),
"previous reconnectTask is not done");
@ -203,47 +181,23 @@ final class TransportSet implements WithLogId {
nextAddressIndex = 0;
}
final Callable<ClientTransport> createTransportCallable = new Callable<ClientTransport>() {
Runnable createTransportRunnable = new Runnable() {
@Override
public ClientTransport call() {
DelayedClientTransport savedDelayedTransport;
ManagedClientTransport newActiveTransport;
boolean savedShutdown;
public void run() {
synchronized (lock) {
savedShutdown = shutdown;
reconnectTask = null;
if (currentAddressIndex == 0) {
backoffWatch.reset().start();
}
newActiveTransport = transportFactory.newClientTransport(address, authority);
ManagedClientTransport transport =
transportFactory.newClientTransport(address, authority);
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Created {1} for {2}",
new Object[] {getLogId(), newActiveTransport.getLogId(), address});
new Object[] {getLogId(), transport.getLogId(), address});
}
transports.add(newActiveTransport);
newActiveTransport.start(
new TransportListener(newActiveTransport, address));
if (shutdown) {
// If TransportSet already shutdown, newActiveTransport is only to take care of pending
// streams in delayedTransport, but will not serve new streams, and it will be shutdown
// as soon as it's set to the delayedTransport.
// activeTransport should have already been set to null by shutdown(). We keep it null.
Preconditions.checkState(activeTransport == null,
"Unexpected non-null activeTransport");
} else {
activeTransport = newActiveTransport;
}
savedDelayedTransport = delayedTransport;
delayedTransport = null;
transports.add(transport);
transport.start(new TransportListener(transport, delayedTransport, address));
}
savedDelayedTransport.setTransport(newActiveTransport);
// This delayed transport will terminate and be removed from transports.
savedDelayedTransport.shutdown();
if (savedShutdown) {
// See comments in the synchronized block above on why we shutdown here.
newActiveTransport.shutdown();
}
return newActiveTransport;
}
};
@ -266,20 +220,10 @@ final class TransportSet implements WithLogId {
if (delayMillis <= 0) {
reconnectTask = null;
// No back-off this time.
// Note createTransportRunnable is not supposed to run under the lock.
return createTransportCallable;
createTransportRunnable.run();
} else {
reconnectTask = scheduledExecutor.schedule(
new Runnable() {
@Override public void run() {
try {
createTransportCallable.call();
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
}, delayMillis, TimeUnit.MILLISECONDS);
return null;
createTransportRunnable, delayMillis, TimeUnit.MILLISECONDS);
}
}
@ -301,7 +245,6 @@ final class TransportSet implements WithLogId {
if (transports.isEmpty()) {
runCallback = true;
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
Preconditions.checkState(delayedTransport == null, "Should have no delayedTransport");
} // else: the callback will be run once all transports have been terminated
}
if (savedActiveTransport != null) {
@ -361,14 +304,13 @@ final class TransportSet implements WithLogId {
/** Listener for real transports. */
private class TransportListener extends BaseTransportListener {
private final SocketAddress address;
private final DelayedClientTransport delayedTransport;
public TransportListener(ManagedClientTransport transport, SocketAddress address) {
public TransportListener(ManagedClientTransport transport,
DelayedClientTransport delayedTransport, SocketAddress address) {
super(transport);
this.address = address;
}
private boolean isAttachedToActiveTransport() {
return activeTransport == transport;
this.delayedTransport = delayedTransport;
}
@Override
@ -378,11 +320,28 @@ final class TransportSet implements WithLogId {
new Object[] {getLogId(), transport.getLogId(), address});
}
super.transportReady();
boolean savedShutdown;
synchronized (lock) {
if (isAttachedToActiveTransport()) {
firstAttempt = true;
savedShutdown = shutdown;
firstAttempt = true;
if (shutdown) {
// If TransportSet already shutdown, transport is only to take care of pending
// streams in delayedTransport, but will not serve new streams, and it will be shutdown
// as soon as it's set to the delayedTransport.
// activeTransport should have already been set to null by shutdown(). We keep it null.
Preconditions.checkState(activeTransport == null,
"Unexpected non-null activeTransport");
} else if (activeTransport == delayedTransport) {
activeTransport = transport;
}
}
delayedTransport.setTransport(transport);
// This delayed transport will terminate and be removed from transports.
delayedTransport.shutdown();
if (savedShutdown) {
// See comments in the synchronized block above on why we shutdown here.
transport.shutdown();
}
loadBalancer.handleTransportReady(addressGroup);
}
@ -394,8 +353,18 @@ final class TransportSet implements WithLogId {
}
super.transportShutdown(s);
synchronized (lock) {
if (isAttachedToActiveTransport()) {
if (activeTransport == transport) {
activeTransport = null;
} else if (activeTransport == delayedTransport) {
// Continue reconnect if there are still addresses to try.
// Fail if all addresses have been tried and failed in a row.
if (nextAddressIndex == 0) {
delayedTransport.setTransport(new FailingClientTransport(s));
delayedTransport.shutdown();
activeTransport = null;
} else {
scheduleConnection(delayedTransport);
}
}
}
loadBalancer.handleTransportShutdown(addressGroup, s);
@ -408,9 +377,9 @@ final class TransportSet implements WithLogId {
new Object[] {getLogId(), transport.getLogId(), address});
}
super.transportTerminated();
Preconditions.checkState(!isAttachedToActiveTransport(),
"Listener is still attached to activeTransport. "
+ "Seems transportTerminated was not called.");
Preconditions.checkState(activeTransport != transport,
"activeTransport still points to the delayedTransport. "
+ "Seems transportShutdown() was not called.");
}
}

View File

@ -48,6 +48,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ -196,6 +197,7 @@ public class ManagedChannelImplTest {
.newClientTransport(same(socketAddress), eq(authority));
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
transportListener.transportReady();
verify(mockTransport, timeout(1000)).newStream(same(method), same(headers));
verify(mockStream).start(streamListenerCaptor.capture());
verify(mockStream).setCompressor(isA(Compressor.class));
@ -341,6 +343,8 @@ public class ManagedChannelImplTest {
ClientCall<String, Integer> call =
channel.newCall(method, CallOptions.DEFAULT.withExecutor(executor));
call.start(mockCallListener, headers);
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
transportListenerCaptor.getValue().transportReady();
verify(mockTransport, timeout(1000)).newStream(same(method), same(headers));
verify(mockStream).start(streamListenerCaptor.capture());
ClientStreamListener streamListener = streamListenerCaptor.getValue();
@ -385,16 +389,27 @@ public class ManagedChannelImplTest {
}
/**
* Verify that if one resolved address points to a bad server, the retry will use another address.
* Verify that if the first resolved address points to a server that cannot be connected, the call
* will end up with the second address which works.
*/
@Test
public void firstResolvedServerIsBad() throws Exception {
final SocketAddress goodAddress = new SocketAddress() {};
final SocketAddress badAddress = new SocketAddress() {};
public void firstResolvedServerFailedToConnect() throws Exception {
final SocketAddress goodAddress = new SocketAddress() {
@Override public String toString() {
return "goodAddress";
}
};
final SocketAddress badAddress = new SocketAddress() {
@Override public String toString() {
return "badAddress";
}
};
final ResolvedServerInfo goodServer = new ResolvedServerInfo(goodAddress, Attributes.EMPTY);
final ResolvedServerInfo badServer = new ResolvedServerInfo(badAddress, Attributes.EMPTY);
final ManagedClientTransport goodTransport = mock(ManagedClientTransport.class);
final ManagedClientTransport badTransport = mock(ManagedClientTransport.class);
when(goodTransport.newStream(any(MethodDescriptor.class), any(Metadata.class)))
.thenReturn(mock(ClientStream.class));
when(mockTransportFactory.newClientTransport(same(goodAddress), any(String.class)))
.thenReturn(goodTransport);
when(mockTransportFactory.newClientTransport(same(badAddress), any(String.class)))
@ -405,31 +420,131 @@ public class ManagedChannelImplTest {
ManagedChannel channel = createChannel(nameResolverFactory, NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
Metadata headers = new Metadata();
ClientStream badStream = mock(ClientStream.class);
when(badTransport.newStream(same(method), same(headers))).thenReturn(badStream);
doAnswer(new Answer<ClientStream>() {
@Override
public ClientStream answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
final ClientStreamListener listener = (ClientStreamListener) args[0];
listener.closed(Status.UNAVAILABLE, new Metadata());
return mock(ClientStream.class);
}
}).when(badStream).start(any(ClientStreamListener.class));
when(goodTransport.newStream(same(method), same(headers))).thenReturn(mock(ClientStream.class));
// First try should fail with the bad address.
// Start a call. The channel will starts with the first address (badAddress)
call.start(mockCallListener, headers);
ArgumentCaptor<ManagedClientTransport.Listener> badTransportListenerCaptor =
ArgumentCaptor.forClass(ManagedClientTransport.Listener.class);
verify(mockCallListener, timeout(1000)).onClose(same(Status.UNAVAILABLE), any(Metadata.class));
verify(badTransport, timeout(1000)).start(badTransportListenerCaptor.capture());
verify(mockTransportFactory).newClientTransport(same(badAddress), any(String.class));
verify(mockTransportFactory, times(0))
.newClientTransport(same(goodAddress), any(String.class));
badTransportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);
// Retry should work with the good address.
// The channel then try the second address (goodAddress)
ArgumentCaptor<ManagedClientTransport.Listener> goodTransportListenerCaptor =
ArgumentCaptor.forClass(ManagedClientTransport.Listener.class);
verify(mockTransportFactory, timeout(1000))
.newClientTransport(same(goodAddress), any(String.class));
verify(goodTransport, timeout(1000)).start(goodTransportListenerCaptor.capture());
goodTransportListenerCaptor.getValue().transportReady();
verify(goodTransport, timeout(1000)).newStream(same(method), same(headers));
// The bad transport was never used.
verify(badTransport, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class));
}
/**
* Verify that if all resolved addresses failed to connect, the call will fail.
*/
@Test
public void allServersFailedToConnect() throws Exception {
final SocketAddress addr1 = new SocketAddress() {
@Override public String toString() {
return "addr1";
}
};
final SocketAddress addr2 = new SocketAddress() {
@Override public String toString() {
return "addr2";
}
};
final ResolvedServerInfo server1 = new ResolvedServerInfo(addr1, Attributes.EMPTY);
final ResolvedServerInfo server2 = new ResolvedServerInfo(addr2, Attributes.EMPTY);
final ManagedClientTransport transport1 = mock(ManagedClientTransport.class);
final ManagedClientTransport transport2 = mock(ManagedClientTransport.class);
when(mockTransportFactory.newClientTransport(same(addr1), any(String.class)))
.thenReturn(transport1);
when(mockTransportFactory.newClientTransport(same(addr2), any(String.class)))
.thenReturn(transport2);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory(Arrays.asList(server1, server2));
ManagedChannel channel = createChannel(nameResolverFactory, NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
Metadata headers = new Metadata();
// Start a call. The channel will starts with the first address, which will fail to connect.
call.start(mockCallListener, headers);
verify(transport1, timeout(1000)).start(transportListenerCaptor.capture());
verify(mockTransportFactory).newClientTransport(same(addr1), any(String.class));
verify(mockTransportFactory, times(0))
.newClientTransport(same(addr2), any(String.class));
transportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);
// The channel then try the second address, which will fail to connect too.
verify(transport2, timeout(1000)).start(transportListenerCaptor.capture());
verify(mockTransportFactory).newClientTransport(same(addr2), any(String.class));
verify(transport2, timeout(1000)).start(transportListenerCaptor.capture());
transportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);
// Call fails
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(mockCallListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
// No real stream was ever created
verify(transport1, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class));
verify(transport2, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class));
}
/**
* Verify that if the first resolved address points to a server that is at first connected, but
* disconnected later, all calls will stick to the first address.
*/
@Test
public void firstResolvedServerConnectedThenDisconnected() throws Exception {
final SocketAddress addr1 = new SocketAddress() {
@Override public String toString() {
return "addr1";
}
};
final SocketAddress addr2 = new SocketAddress() {
@Override public String toString() {
return "addr2";
}
};
final ResolvedServerInfo server1 = new ResolvedServerInfo(addr1, Attributes.EMPTY);
final ResolvedServerInfo server2 = new ResolvedServerInfo(addr2, Attributes.EMPTY);
// Addr1 will have two transports throughout this test.
final ManagedClientTransport transport1 = mock(ManagedClientTransport.class);
final ManagedClientTransport transport2 = mock(ManagedClientTransport.class);
when(transport1.newStream(any(MethodDescriptor.class), any(Metadata.class)))
.thenReturn(mock(ClientStream.class));
when(transport2.newStream(any(MethodDescriptor.class), any(Metadata.class)))
.thenReturn(mock(ClientStream.class));
when(mockTransportFactory.newClientTransport(same(addr1), any(String.class)))
.thenReturn(transport1, transport2);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory(Arrays.asList(server1, server2));
ManagedChannel channel = createChannel(nameResolverFactory, NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
Metadata headers = new Metadata();
// First call will use the first address
call.start(mockCallListener, headers);
verify(mockTransportFactory, timeout(1000)).newClientTransport(same(addr1), any(String.class));
verify(transport1, timeout(1000)).start(transportListenerCaptor.capture());
transportListenerCaptor.getValue().transportReady();
verify(transport1, timeout(1000)).newStream(same(method), same(headers));
transportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);
// Second call still use the first address, since it was successfully connected.
ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
call2.start(mockCallListener, headers);
verify(goodTransport, timeout(1000)).newStream(same(method), same(headers));
verify(transport2, timeout(1000)).start(transportListenerCaptor.capture());
verify(mockTransportFactory, times(2)).newClientTransport(same(addr1), any(String.class));
transportListenerCaptor.getValue().transportReady();
verify(transport2, timeout(1000)).newStream(same(method), same(headers));
}
private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {

View File

@ -165,13 +165,12 @@ public class ManagedChannelImplTransportManagerTest {
ClientTransport t1 = tm.getTransport(addressGroup);
verify(mockTransportFactory, timeout(1000)).newClientTransport(addr, authority);
// The real transport
ClientTransport rt = transports.poll(1, TimeUnit.SECONDS).transport;
MockClientTransportInfo transportInfo = transports.poll(1, TimeUnit.SECONDS);
transportInfo.listener.transportReady();
ClientTransport t2 = tm.getTransport(addressGroup);
// Make sure the first transport is always a real transport. This promise is especially made for
// InProcessTransport, because it may run into deadlock if it works under a delayed transport
// (https://github.com/grpc/grpc-java/issues/1510).
assertSame(rt, t1);
assertSame(rt, t2);
assertTrue(t1 instanceof DelayedClientTransport);
assertFalse(t2 instanceof DelayedClientTransport);
assertSame(transportInfo.transport, t2);
verify(mockBackoffPolicyProvider).get();
verify(mockBackoffPolicy, times(0)).nextBackoffMillis();
verifyNoMoreInteractions(mockTransportFactory);

View File

@ -33,6 +33,9 @@ package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -106,7 +109,7 @@ public class TransportSetTest {
transports = TestUtils.captureTransports(mockTransportFactory);
}
@Test public void singleAddressBackoff() {
@Test public void singleAddressReconnect() {
SocketAddress addr = mock(SocketAddress.class);
createTransportSet(addr);
@ -159,7 +162,7 @@ public class TransportSetTest {
verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis();
}
@Test public void twoAddressesBackoff() {
@Test public void twoAddressesReconnect() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
createTransportSet(addr1, addr2);
@ -173,21 +176,29 @@ public class TransportSetTest {
int backoffReset = 0;
// First attempt
transportSet.obtainActiveTransport();
DelayedClientTransport delayedTransport1 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Let this one fail without success
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertNull(delayedTransport1.getTransportSupplier());
// Second attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport();
DelayedClientTransport delayedTransport2 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertSame(delayedTransport1, delayedTransport2);
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed. Delayed transport will see an error.
assertTrue(delayedTransport2.getTransportSupplier().get() instanceof FailingClientTransport);
// Third attempt is the first address, thus controlled by the first back-off interval.
transportSet.obtainActiveTransport();
DelayedClientTransport delayedTransport3 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertNotSame(delayedTransport2, delayedTransport3);
verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
fakeClock.forwardMillis(9);
@ -196,16 +207,23 @@ public class TransportSetTest {
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertNull(delayedTransport3.getTransportSupplier());
// Forth attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport();
DelayedClientTransport delayedTransport4 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertSame(delayedTransport3, delayedTransport4);
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed again. Delayed transport will see an error
assertTrue(delayedTransport4.getTransportSupplier().get() instanceof FailingClientTransport);
// Fifth attempt for the first address, thus controlled by the second back-off interval.
transportSet.obtainActiveTransport();
DelayedClientTransport delayedTransport5 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertNotSame(delayedTransport4, delayedTransport5);
verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
fakeClock.forwardMillis(99);
@ -214,12 +232,16 @@ public class TransportSetTest {
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Let it through
transports.peek().listener.transportReady();
// Delayed transport will see the connected transport.
assertSame(transports.peek().transport, delayedTransport5.getTransportSupplier().get());
// Then close it.
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// First attempt after a successful connection. Reset back-off policy, and start from the first
// address.
transportSet.obtainActiveTransport();
DelayedClientTransport delayedTransport6 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertNotSame(delayedTransport5, delayedTransport6);
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
@ -297,11 +319,15 @@ public class TransportSetTest {
// Reconnect will eventually happen, even though TransportSet has been shut down
fakeClock.forwardMillis(10);
verify(mockTransportFactory, times(2)).newClientTransport(addr, authority);
// The pending stream will be started on this newly started transport, which is promptly shut
// down by TransportSet right after the stream is created.
// The pending stream will be started on this newly started transport after it's ready.
// The transport is shut down by TransportSet right after the stream is created.
transportInfo = transports.poll();
verify(transportInfo.transport, times(0)).newStream(same(method), same(headers));
verify(transportInfo.transport, times(0)).shutdown();
transportInfo.listener.transportReady();
verify(transportInfo.transport).newStream(same(method), same(headers));
verify(transportInfo.transport).shutdown();
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportSetCallback, never()).onTerminated();
// Terminating the transport will let TransportSet to be terminated.
transportInfo.listener.transportTerminated();