core: fail fast implementation

resolves #1759
This commit is contained in:
ZHANG Dapeng 2016-06-11 09:30:16 -07:00 committed by GitHub
parent 432cec7973
commit 9d4a43fb79
7 changed files with 279 additions and 44 deletions

View File

@ -72,6 +72,11 @@ public final class CallOptions {
private Object[][] customOptions = new Object[0][2];
/**
* Opposite to fail fast.
*/
private boolean waitForReady;
/**
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
* generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
@ -182,6 +187,29 @@ public final class CallOptions {
return newOptions;
}
/**
* Enables 'wait for ready' feature for the call.
* <a href="https://github.com/grpc/grpc/blob/master/doc/fail_fast.md">'Fail fast'</a>
* is the default option for gRPC calls and 'wait for ready' is the opposite to it.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1915")
public CallOptions withWaitForReady() {
CallOptions newOptions = new CallOptions(this);
newOptions.waitForReady = true;
return newOptions;
}
/**
* Disables 'wait for ready' feature for the call.
* This method should be rarely used because the default is without 'wait for ready'.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1915")
public CallOptions withoutWaitForReady() {
CallOptions newOptions = new CallOptions(this);
newOptions.waitForReady = false;
return newOptions;
}
/**
* Returns the attributes for affinity-based routing.
*/
@ -310,6 +338,16 @@ public final class CallOptions {
private CallOptions() {
}
/**
* Returns whether 'wait for ready' option is enabled for the call.
* <a href="https://github.com/grpc/grpc/blob/master/doc/fail_fast.md">'Fail fast'</a>
* is the default option for gRPC calls and 'wait for ready' is the opposite to it.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1915")
public boolean isWaitForReady() {
return waitForReady;
}
/**
* Copy constructor.
*/
@ -333,6 +371,7 @@ public final class CallOptions {
toStringHelper.add("executor", executor != null ? executor.getClass() : null);
toStringHelper.add("compressorName", compressorName);
toStringHelper.add("customOptions", Arrays.toString(customOptions));
toStringHelper.add("waitForReady", isWaitForReady());
return toStringHelper.toString();
}

View File

@ -43,6 +43,7 @@ import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.concurrent.Executor;
@ -76,6 +77,17 @@ class DelayedClientTransport implements ManagedClientTransport {
@GuardedBy("lock")
private boolean shutdown;
/**
* The delayed client transport will come into a back-off interval if it fails to establish a real
* transport for all addresses, namely the channel is in TRANSIENT_FAILURE.
*
* <p>If the transport is in a back-off interval, then all fail fast streams (including the
* pending as well as new ones) will fail immediately. New non-fail fast streams can be created as
* {@link PendingStream} and will keep pending during this back-off period.
*/
@GuardedBy("lock")
private boolean inBackoffPeriod;
DelayedClientTransport(Executor streamCreationExecutor) {
this.streamCreationExecutor = streamCreationExecutor;
}
@ -85,6 +97,17 @@ class DelayedClientTransport implements ManagedClientTransport {
this.listener = Preconditions.checkNotNull(listener, "listener");
}
/**
* If the transport has acquired a transport {@link Supplier}, then returned stream is delegated
* from its supplier.
*
* <p>If the new stream to be created is with fail fast call option and the delayed transport is
* in a back-off interval, then a {@link FailingClientStream} is returned.
*
* <p>If it is not the above cases and the delayed transport is not shutdown, then a
* {@link PendingStream} is returned; if the transport is shutdown, then a
* {@link FailingClientStream} is returned.
*/
@Override
public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers, CallOptions
callOptions) {
@ -94,6 +117,10 @@ class DelayedClientTransport implements ManagedClientTransport {
// Check again, since it may have changed while waiting for lock
supplier = transportSupplier;
if (supplier == null && !shutdown) {
if (inBackoffPeriod && !callOptions.isWaitForReady()) {
return new FailingClientStream(Status.UNAVAILABLE.withDescription(
"Terminated fail fast stream."));
}
PendingStream pendingStream = new PendingStream(method, headers, callOptions);
pendingStreams.add(pendingStream);
return pendingStream;
@ -190,6 +217,8 @@ class DelayedClientTransport implements ManagedClientTransport {
* transport is {@link #shutdown}.
*/
public void setTransport(ClientTransport transport) {
Preconditions.checkArgument(this != transport,
"delayed transport calling setTransport on itself");
setTransportSupplier(Suppliers.ofInstance(transport));
}
@ -250,6 +279,69 @@ class DelayedClientTransport implements ManagedClientTransport {
}
}
/**
* True return value indicates that the delayed transport is in a back-off interval (in
* TRANSIENT_FAILURE), that all fail fast streams (including pending as well as new ones) should
* fail immediately, and that non-fail fast streams can be created as {@link PendingStream} and
* should keep pending during this back-off period.
*/
@VisibleForTesting
boolean isInBackoffPeriod() {
synchronized (lock) {
return inBackoffPeriod;
}
}
/**
* Is only called at the beginning of {@link TransportSet#scheduleBackoff}.
*
* <p>Does jobs at the beginning of the back-off:
*
* <p>sets {@link #inBackoffPeriod} flag to true;
*
* <p>sets all pending streams with a fail fast call option of the delayed transport as
* {@link FailingClientStream}s, and removes them from the list of pending streams of the
* transport.
*
* @param status the causal status for triggering back-off.
*/
void startBackoff(final Status status) {
synchronized (lock) {
Preconditions.checkState(!inBackoffPeriod);
inBackoffPeriod = true;
final ArrayList<PendingStream> failFastPendingStreams = new ArrayList<PendingStream>();
if (pendingStreams != null && !pendingStreams.isEmpty()) {
final Iterator<PendingStream> it = pendingStreams.iterator();
while (it.hasNext()) {
PendingStream stream = it.next();
if (!stream.callOptions.isWaitForReady()) {
failFastPendingStreams.add(stream);
it.remove();
}
}
streamCreationExecutor.execute(new Runnable() {
@Override
public void run() {
for (PendingStream stream : failFastPendingStreams) {
stream.setStream(new FailingClientStream(Status.UNAVAILABLE.withDescription(
"Terminated fail fast stream.").withCause(status.asException())));
}
}
});
}
}
}
/**
* Is only called at the beginning of the callback function of {@code endOfCurrentBackoff} in the
* {@link TransportSet#scheduleBackoff} method.
*/
void endBackoff() {
synchronized (lock) {
inBackoffPeriod = false;
}
}
@Override
public String getLogId() {
return GrpcUtil.getLogId(this);
@ -266,8 +358,8 @@ class DelayedClientTransport implements ManagedClientTransport {
private final Metadata headers;
private final CallOptions callOptions;
private PendingStream(MethodDescriptor<?, ?> method, Metadata headers, CallOptions
callOptions) {
private PendingStream(MethodDescriptor<?, ?> method, Metadata headers,
CallOptions callOptions) {
this.method = method;
this.headers = headers;
this.callOptions = callOptions;

View File

@ -94,7 +94,7 @@ final class TransportSet implements WithLogId {
private ScheduledFuture<?> reconnectTask;
/**
* All transports that are not terminated. At the very least the value of {@link activeTransport}
* All transports that are not terminated. At the very least the value of {@link #activeTransport}
* will be present, but previously used transports that still have streams or are stopping may
* also be present.
*/
@ -207,10 +207,12 @@ final class TransportSet implements WithLogId {
}
/**
* Only called after all addresses attempted and failed.
* Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
* @param status the causal status when the channel begins transition to
* TRANSIENT_FAILURE.
*/
@GuardedBy("lock")
private void scheduleBackoff(final DelayedClientTransport delayedTransport) {
private void scheduleBackoff(final DelayedClientTransport delayedTransport, Status status) {
Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
if (reconnectPolicy == null) {
@ -222,10 +224,12 @@ final class TransportSet implements WithLogId {
log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms",
new Object[]{getLogId(), delayMillis});
}
delayedTransport.startBackoff(status);
Runnable endOfCurrentBackoff = new Runnable() {
@Override
public void run() {
try {
delayedTransport.endBackoff();
boolean shutdownDelayedTransport = false;
synchronized (lock) {
reconnectTask = null;
@ -414,17 +418,11 @@ final class TransportSet implements WithLogId {
closedByServer = !shutdown;
} else if (activeTransport == delayedTransport) {
// Continue reconnect if there are still addresses to try.
// Fail if all addresses have been tried and failed in a row.
if (nextAddressIndex == 0) {
allAddressesFailed = true;
// Initiate backoff
// Transition to TRANSIENT_FAILURE
DelayedClientTransport newDelayedTransport = new DelayedClientTransport(appExecutor);
transports.add(newDelayedTransport);
newDelayedTransport.start(new BaseTransportListener(newDelayedTransport));
activeTransport = newDelayedTransport;
scheduleBackoff(newDelayedTransport);
scheduleBackoff(delayedTransport, s);
} else {
// Still CONNECTING
startNewTransport(delayedTransport);
@ -433,8 +431,6 @@ final class TransportSet implements WithLogId {
}
loadBalancer.handleTransportShutdown(addressGroup, s);
if (allAddressesFailed) {
delayedTransport.setTransport(new FailingClientTransport(s));
delayedTransport.shutdown();
callback.onAllAddressesFailed();
}
if (closedByServer) {

View File

@ -65,7 +65,8 @@ public class CallOptionsTest {
.withAuthority(sampleAuthority)
.withDeadline(sampleDeadline)
.withAffinity(sampleAffinity)
.withCredentials(sampleCreds);
.withCredentials(sampleCreds)
.withWaitForReady();
private CallOptions.Key<String> option1 = CallOptions.Key.of("option1", "default");
private CallOptions.Key<String> option2 = CallOptions.Key.of("option2", "default");
@ -76,6 +77,14 @@ public class CallOptionsTest {
assertThat(CallOptions.DEFAULT.getAffinity()).isEqualTo(Attributes.EMPTY);
assertThat(CallOptions.DEFAULT.getExecutor()).isNull();
assertThat(CallOptions.DEFAULT.getCredentials()).isNull();
assertThat(CallOptions.DEFAULT.isWaitForReady()).isFalse();
}
@Test
public void withAndWithoutWaitForReady() {
assertThat(CallOptions.DEFAULT.withWaitForReady().isWaitForReady()).isTrue();
assertThat(CallOptions.DEFAULT.withWaitForReady().withoutWaitForReady().isWaitForReady())
.isFalse();
}
@Test
@ -84,6 +93,7 @@ public class CallOptionsTest {
assertThat(allSet.getDeadline()).isSameAs(sampleDeadline);
assertThat(allSet.getAffinity()).isSameAs(sampleAffinity);
assertThat(allSet.getCredentials()).isSameAs(sampleCreds);
assertThat(allSet.isWaitForReady()).isTrue();
}
@Test
@ -141,7 +151,7 @@ public class CallOptionsTest {
String expected = "CallOptions{deadline=null, authority=authority, callCredentials=null, "
+ "affinity={sample=blah}, "
+ "executor=class io.grpc.internal.SerializingExecutor, compressorName=null, "
+ "customOptions=[]}";
+ "customOptions=[], waitForReady=false}";
String actual = allSet
.withDeadline(null)
.withExecutor(new SerializingExecutor(directExecutor()))
@ -154,8 +164,9 @@ public class CallOptionsTest {
@Test
public void toStringMatches_noDeadline() {
assertThat("CallOptions{deadline=null, authority=null, callCredentials=null, "
+ "affinity={}, executor=null, compressorName=null, customOptions=[]}")
.isEqualTo(CallOptions.DEFAULT.toString());
+ "affinity={}, executor=null, compressorName=null, customOptions=[], "
+ "waitForReady=false}")
.isEqualTo(CallOptions.DEFAULT.toString());
}
@Test

View File

@ -35,6 +35,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -48,6 +49,7 @@ import io.grpc.IntegerMarshaller;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StringMarshaller;
import org.junit.After;
@ -279,4 +281,28 @@ public class DelayedClientTransportTest {
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
}
@Test public void startBackOff_ClearsFailFastPendingStreams() {
final Status cause = Status.UNKNOWN;
final CallOptions failFastCallOptions = CallOptions.DEFAULT;
final CallOptions waitForReadyCallOptions = CallOptions.DEFAULT.withWaitForReady();
final ClientStream ffStream = delayedTransport.newStream(method, headers, failFastCallOptions);
delayedTransport.newStream(method, headers, waitForReadyCallOptions);
delayedTransport.newStream(method, headers, failFastCallOptions);
assertEquals(3, delayedTransport.getPendingStreamsCount());
delayedTransport.startBackoff(cause);
assertTrue(delayedTransport.isInBackoffPeriod());
assertEquals(1, delayedTransport.getPendingStreamsCount());
ffStream.start(streamListener);
// Fail fast stream not failed yet.
verify(streamListener, never()).closed(any(Status.class), any(Metadata.class));
fakeExecutor.runDueTasks();
// Now fail fast stream failed.
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.UNAVAILABLE.getCode(), statusCaptor.getValue().getCode());
assertEquals(cause, ((StatusException) statusCaptor.getValue().getCause()).getStatus());
}
}

View File

@ -279,9 +279,8 @@ public class ManagedChannelImplTransportManagerTest {
ClientTransport t4 = tm.getTransport(addressGroup);
assertNotNull(t4);
// If backoff's DelayedTransport is still active, this is necessary. Otherwise it would be racy.
t4.newStream(method, new Metadata());
t4.newStream(method, new Metadata(), CallOptions.DEFAULT.withWaitForReady());
verify(mockTransportFactory, timeout(1000).times(++transportsAddr1))
.newClientTransport(addr1, authority, userAgent);
// Back-off policy was reset and consulted.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();

View File

@ -32,6 +32,7 @@
package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
@ -97,6 +98,8 @@ public class TransportSetTest {
MethodDescriptor.MethodType.UNKNOWN, "/service/method",
new StringMarshaller(), new IntegerMarshaller());
private final Metadata headers = new Metadata();
private final CallOptions waitForReadyCallOptions = CallOptions.DEFAULT.withWaitForReady();
private final CallOptions failFastCallOptions = CallOptions.DEFAULT;
private TransportSet transportSet;
private EquivalentAddressGroup addressGroup;
@ -132,7 +135,7 @@ public class TransportSetTest {
int onAllAddressesFailed = 0;
// First attempt
transportSet.obtainActiveTransport().newStream(method, new Metadata());
transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions);
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, authority, userAgent);
@ -144,7 +147,6 @@ public class TransportSetTest {
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
// Second attempt
transportSet.obtainActiveTransport().newStream(method, new Metadata());
// Transport creation doesn't happen until time is due
fakeClock.forwardMillis(9);
verify(mockTransportFactory, times(transportsCreated))
@ -160,7 +162,6 @@ public class TransportSetTest {
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
// Third attempt
transportSet.obtainActiveTransport().newStream(method, new Metadata());
// Transport creation doesn't happen until time is due
fakeClock.forwardMillis(99);
verify(mockTransportFactory, times(transportsCreated))
@ -178,7 +179,7 @@ public class TransportSetTest {
verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();
// Back-off is reset, and the next attempt will happen immediately
transportSet.obtainActiveTransport().newStream(method, new Metadata());
transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, authority, userAgent);
@ -207,9 +208,9 @@ public class TransportSetTest {
// First attempt
DelayedClientTransport delayedTransport1 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
delayedTransport1.newStream(method, new Metadata());
verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport(addr1, authority, userAgent);
delayedTransport1.newStream(method, new Metadata(), waitForReadyCallOptions);
// Let this one fail without success
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertNull(delayedTransport1.getTransportSupplier());
@ -219,14 +220,14 @@ public class TransportSetTest {
DelayedClientTransport delayedTransport2 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertSame(delayedTransport1, delayedTransport2);
delayedTransport2.newStream(method, new Metadata());
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2))
.newClientTransport(addr2, authority, userAgent);
// Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed. Delayed transport will see an error.
assertTrue(delayedTransport2.getTransportSupplier().get() instanceof FailingClientTransport);
// All addresses have failed. Delayed transport will be in back-off interval.
assertNull(delayedTransport2.getTransportSupplier());
assertTrue(delayedTransport2.isInBackoffPeriod());
verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();
// Backoff reset and first back-off interval begins
verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
@ -235,8 +236,7 @@ public class TransportSetTest {
// Third attempt is the first address, thus controlled by the first back-off interval.
DelayedClientTransport delayedTransport3 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertNotSame(delayedTransport2, delayedTransport3);
delayedTransport3.newStream(method, new Metadata());
assertSame(delayedTransport2, delayedTransport3);
fakeClock.forwardMillis(9);
verify(mockTransportFactory, times(transportsAddr1))
.newClientTransport(addr1, authority, userAgent);
@ -252,14 +252,14 @@ public class TransportSetTest {
DelayedClientTransport delayedTransport4 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertSame(delayedTransport3, delayedTransport4);
delayedTransport4.newStream(method, new Metadata());
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2))
.newClientTransport(addr2, authority, userAgent);
// Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed again. Delayed transport will see an error
assertTrue(delayedTransport4.getTransportSupplier().get() instanceof FailingClientTransport);
// All addresses have failed again. Delayed transport will be in back-off interval.
assertNull(delayedTransport4.getTransportSupplier());
assertTrue(delayedTransport4.isInBackoffPeriod());
verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();
// Second back-off interval begins
verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis();
@ -268,8 +268,7 @@ public class TransportSetTest {
// Fifth attempt for the first address, thus controlled by the second back-off interval.
DelayedClientTransport delayedTransport5 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertNotSame(delayedTransport4, delayedTransport5);
delayedTransport5.newStream(method, new Metadata());
assertSame(delayedTransport4, delayedTransport5);
fakeClock.forwardMillis(99);
verify(mockTransportFactory, times(transportsAddr1))
.newClientTransport(addr1, authority, userAgent);
@ -291,7 +290,7 @@ public class TransportSetTest {
DelayedClientTransport delayedTransport6 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertNotSame(delayedTransport5, delayedTransport6);
delayedTransport6.newStream(method, new Metadata());
delayedTransport6.newStream(method, headers, waitForReadyCallOptions);
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport(addr1, authority, userAgent);
@ -304,14 +303,14 @@ public class TransportSetTest {
DelayedClientTransport delayedTransport7 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertSame(delayedTransport6, delayedTransport7);
delayedTransport7.newStream(method, new Metadata());
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2))
.newClientTransport(addr2, authority, userAgent);
// Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed. Delayed transport will see an error.
assertTrue(delayedTransport7.getTransportSupplier().get() instanceof FailingClientTransport);
// All addresses have failed. Delayed transport will be in back-off interval.
assertNull(delayedTransport7.getTransportSupplier());
assertTrue(delayedTransport7.isInBackoffPeriod());
verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();
// Back-off reset and first back-off interval begins
verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis();
@ -320,8 +319,7 @@ public class TransportSetTest {
// Third attempt is the first address, thus controlled by the first back-off interval.
DelayedClientTransport delayedTransport8 =
(DelayedClientTransport) transportSet.obtainActiveTransport();
assertNotSame(delayedTransport7, delayedTransport8);
delayedTransport8.newStream(method, new Metadata());
assertSame(delayedTransport7, delayedTransport8);
fakeClock.forwardMillis(9);
verify(mockTransportFactory, times(transportsAddr1))
.newClientTransport(addr1, authority, userAgent);
@ -337,6 +335,80 @@ public class TransportSetTest {
fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test.
}
@Test
public void verifyFailFastAndNonFailFastBehaviors() {
int pendingStreamsCount = 0;
int failFastPendingStreamsCount = 0;
final SocketAddress addr1 = mock(SocketAddress.class);
final SocketAddress addr2 = mock(SocketAddress.class);
createTransportSet(addr1, addr2);
final DelayedClientTransport delayedTransport =
(DelayedClientTransport) transportSet.obtainActiveTransport();
// Now transport is in CONNECTING.
assertFalse(delayedTransport.isInBackoffPeriod());
// Create a new fail fast stream.
delayedTransport.newStream(method, headers, failFastCallOptions);
// Verify it is queued.
assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
failFastPendingStreamsCount++;
// Create a new non fail fast stream.
delayedTransport.newStream(method, headers, waitForReadyCallOptions);
// Verify it is queued.
assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
// Let this 1st address fail without success.
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Now transport is still in CONNECTING.
assertFalse(delayedTransport.isInBackoffPeriod());
// Verify pending streams still in queue.
assertEquals(pendingStreamsCount, delayedTransport.getPendingStreamsCount());
// Create a new fail fast stream.
delayedTransport.newStream(method, headers, failFastCallOptions);
// Verify it is queued.
assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
failFastPendingStreamsCount++;
// Create a new non fail fast stream
delayedTransport.newStream(method, headers, waitForReadyCallOptions);
// Verify it is queued.
assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
// Let this 2nd address fail without success.
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Now transport is still in TRANSIENT_FAILURE.
assertTrue(delayedTransport.isInBackoffPeriod());
// Fail fast pending streams should be cleared
assertEquals(pendingStreamsCount - failFastPendingStreamsCount,
delayedTransport.getPendingStreamsCount());
pendingStreamsCount -= failFastPendingStreamsCount;
failFastPendingStreamsCount = 0;
// Create a new fail fast stream.
delayedTransport.newStream(method, headers, failFastCallOptions);
// Verify it is not queued.
assertEquals(pendingStreamsCount, delayedTransport.getPendingStreamsCount());
// Create a new non fail fast stream
delayedTransport.newStream(method, headers, waitForReadyCallOptions);
// Verify it is queued.
assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
fakeClock.forwardMillis(10);
// Now back-off is over
assertFalse(delayedTransport.isInBackoffPeriod());
// Create a new fail fast stream.
delayedTransport.newStream(method, headers, failFastCallOptions);
// Verify it is queued.
assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount());
failFastPendingStreamsCount++;
assertEquals(1, failFastPendingStreamsCount);
fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test.
}
@Test
public void connectIsLazy() {
SocketAddress addr = mock(SocketAddress.class);
@ -371,7 +443,7 @@ public class TransportSetTest {
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Request immediately, but will wait for back-off before reconnecting
transportSet.obtainActiveTransport().newStream(method, new Metadata());
transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions);
verify(mockTransportFactory, times(transportsCreated))
.newClientTransport(addr, authority, userAgent);
fakeClock.forwardMillis(100);
@ -428,7 +500,7 @@ public class TransportSetTest {
pick = transportSet.obtainActiveTransport();
assertTrue(pick instanceof DelayedClientTransport);
// Start a stream, which will be pending in the delayed transport
ClientStream pendingStream = pick.newStream(method, headers);
ClientStream pendingStream = pick.newStream(method, headers, waitForReadyCallOptions);
pendingStream.start(mockStreamListener);
// Shut down TransportSet before the transport is created. Further call to
@ -452,7 +524,7 @@ public class TransportSetTest {
any(MethodDescriptor.class), any(Metadata.class));
assertEquals(1, fakeExecutor.runDueTasks());
verify(transportInfo.transport).newStream(same(method), same(headers),
same(CallOptions.DEFAULT));
same(waitForReadyCallOptions));
verify(transportInfo.transport).shutdown();
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportSetCallback, never()).onTerminated();