diff --git a/core/src/main/java/io/grpc/CallOptions.java b/core/src/main/java/io/grpc/CallOptions.java index 1904b7a71e..6deb4163de 100644 --- a/core/src/main/java/io/grpc/CallOptions.java +++ b/core/src/main/java/io/grpc/CallOptions.java @@ -72,6 +72,11 @@ public final class CallOptions { private Object[][] customOptions = new Object[0][2]; + /** + * Opposite to fail fast. + */ + private boolean waitForReady; + /** * Override the HTTP/2 authority the channel claims to be connecting to. This is not * generally safe. Overriding allows advanced users to re-use a single Channel for multiple @@ -182,6 +187,29 @@ public final class CallOptions { return newOptions; } + /** + * Enables 'wait for ready' feature for the call. + * 'Fail fast' + * is the default option for gRPC calls and 'wait for ready' is the opposite to it. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1915") + public CallOptions withWaitForReady() { + CallOptions newOptions = new CallOptions(this); + newOptions.waitForReady = true; + return newOptions; + } + + /** + * Disables 'wait for ready' feature for the call. + * This method should be rarely used because the default is without 'wait for ready'. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1915") + public CallOptions withoutWaitForReady() { + CallOptions newOptions = new CallOptions(this); + newOptions.waitForReady = false; + return newOptions; + } + /** * Returns the attributes for affinity-based routing. */ @@ -310,6 +338,16 @@ public final class CallOptions { private CallOptions() { } + /** + * Returns whether 'wait for ready' option is enabled for the call. + * 'Fail fast' + * is the default option for gRPC calls and 'wait for ready' is the opposite to it. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1915") + public boolean isWaitForReady() { + return waitForReady; + } + /** * Copy constructor. */ @@ -333,6 +371,7 @@ public final class CallOptions { toStringHelper.add("executor", executor != null ? executor.getClass() : null); toStringHelper.add("compressorName", compressorName); toStringHelper.add("customOptions", Arrays.toString(customOptions)); + toStringHelper.add("waitForReady", isWaitForReady()); return toStringHelper.toString(); } diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index cc2150276f..26311d6bdd 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -43,6 +43,7 @@ import io.grpc.Status; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.concurrent.Executor; @@ -76,6 +77,17 @@ class DelayedClientTransport implements ManagedClientTransport { @GuardedBy("lock") private boolean shutdown; + /** + * The delayed client transport will come into a back-off interval if it fails to establish a real + * transport for all addresses, namely the channel is in TRANSIENT_FAILURE. + * + *

If the transport is in a back-off interval, then all fail fast streams (including the + * pending as well as new ones) will fail immediately. New non-fail fast streams can be created as + * {@link PendingStream} and will keep pending during this back-off period. + */ + @GuardedBy("lock") + private boolean inBackoffPeriod; + DelayedClientTransport(Executor streamCreationExecutor) { this.streamCreationExecutor = streamCreationExecutor; } @@ -85,6 +97,17 @@ class DelayedClientTransport implements ManagedClientTransport { this.listener = Preconditions.checkNotNull(listener, "listener"); } + /** + * If the transport has acquired a transport {@link Supplier}, then returned stream is delegated + * from its supplier. + * + *

If the new stream to be created is with fail fast call option and the delayed transport is + * in a back-off interval, then a {@link FailingClientStream} is returned. + * + *

If it is not the above cases and the delayed transport is not shutdown, then a + * {@link PendingStream} is returned; if the transport is shutdown, then a + * {@link FailingClientStream} is returned. + */ @Override public ClientStream newStream(MethodDescriptor method, Metadata headers, CallOptions callOptions) { @@ -94,6 +117,10 @@ class DelayedClientTransport implements ManagedClientTransport { // Check again, since it may have changed while waiting for lock supplier = transportSupplier; if (supplier == null && !shutdown) { + if (inBackoffPeriod && !callOptions.isWaitForReady()) { + return new FailingClientStream(Status.UNAVAILABLE.withDescription( + "Terminated fail fast stream.")); + } PendingStream pendingStream = new PendingStream(method, headers, callOptions); pendingStreams.add(pendingStream); return pendingStream; @@ -190,6 +217,8 @@ class DelayedClientTransport implements ManagedClientTransport { * transport is {@link #shutdown}. */ public void setTransport(ClientTransport transport) { + Preconditions.checkArgument(this != transport, + "delayed transport calling setTransport on itself"); setTransportSupplier(Suppliers.ofInstance(transport)); } @@ -250,6 +279,69 @@ class DelayedClientTransport implements ManagedClientTransport { } } + /** + * True return value indicates that the delayed transport is in a back-off interval (in + * TRANSIENT_FAILURE), that all fail fast streams (including pending as well as new ones) should + * fail immediately, and that non-fail fast streams can be created as {@link PendingStream} and + * should keep pending during this back-off period. + */ + @VisibleForTesting + boolean isInBackoffPeriod() { + synchronized (lock) { + return inBackoffPeriod; + } + } + + /** + * Is only called at the beginning of {@link TransportSet#scheduleBackoff}. + * + *

Does jobs at the beginning of the back-off: + * + *

sets {@link #inBackoffPeriod} flag to true; + * + *

sets all pending streams with a fail fast call option of the delayed transport as + * {@link FailingClientStream}s, and removes them from the list of pending streams of the + * transport. + * + * @param status the causal status for triggering back-off. + */ + void startBackoff(final Status status) { + synchronized (lock) { + Preconditions.checkState(!inBackoffPeriod); + inBackoffPeriod = true; + final ArrayList failFastPendingStreams = new ArrayList(); + if (pendingStreams != null && !pendingStreams.isEmpty()) { + final Iterator it = pendingStreams.iterator(); + while (it.hasNext()) { + PendingStream stream = it.next(); + if (!stream.callOptions.isWaitForReady()) { + failFastPendingStreams.add(stream); + it.remove(); + } + } + streamCreationExecutor.execute(new Runnable() { + @Override + public void run() { + for (PendingStream stream : failFastPendingStreams) { + stream.setStream(new FailingClientStream(Status.UNAVAILABLE.withDescription( + "Terminated fail fast stream.").withCause(status.asException()))); + } + } + }); + } + } + } + + /** + * Is only called at the beginning of the callback function of {@code endOfCurrentBackoff} in the + * {@link TransportSet#scheduleBackoff} method. + */ + void endBackoff() { + synchronized (lock) { + inBackoffPeriod = false; + } + } + @Override public String getLogId() { return GrpcUtil.getLogId(this); @@ -266,8 +358,8 @@ class DelayedClientTransport implements ManagedClientTransport { private final Metadata headers; private final CallOptions callOptions; - private PendingStream(MethodDescriptor method, Metadata headers, CallOptions - callOptions) { + private PendingStream(MethodDescriptor method, Metadata headers, + CallOptions callOptions) { this.method = method; this.headers = headers; this.callOptions = callOptions; diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index 21e4305aab..ea2d859ad7 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -94,7 +94,7 @@ final class TransportSet implements WithLogId { private ScheduledFuture reconnectTask; /** - * All transports that are not terminated. At the very least the value of {@link activeTransport} + * All transports that are not terminated. At the very least the value of {@link #activeTransport} * will be present, but previously used transports that still have streams or are stopping may * also be present. */ @@ -207,10 +207,12 @@ final class TransportSet implements WithLogId { } /** - * Only called after all addresses attempted and failed. + * Only called after all addresses attempted and failed (TRANSIENT_FAILURE). + * @param status the causal status when the channel begins transition to + * TRANSIENT_FAILURE. */ @GuardedBy("lock") - private void scheduleBackoff(final DelayedClientTransport delayedTransport) { + private void scheduleBackoff(final DelayedClientTransport delayedTransport, Status status) { Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done"); if (reconnectPolicy == null) { @@ -222,10 +224,12 @@ final class TransportSet implements WithLogId { log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms", new Object[]{getLogId(), delayMillis}); } + delayedTransport.startBackoff(status); Runnable endOfCurrentBackoff = new Runnable() { @Override public void run() { try { + delayedTransport.endBackoff(); boolean shutdownDelayedTransport = false; synchronized (lock) { reconnectTask = null; @@ -414,17 +418,11 @@ final class TransportSet implements WithLogId { closedByServer = !shutdown; } else if (activeTransport == delayedTransport) { // Continue reconnect if there are still addresses to try. - // Fail if all addresses have been tried and failed in a row. if (nextAddressIndex == 0) { allAddressesFailed = true; - // Initiate backoff // Transition to TRANSIENT_FAILURE - DelayedClientTransport newDelayedTransport = new DelayedClientTransport(appExecutor); - transports.add(newDelayedTransport); - newDelayedTransport.start(new BaseTransportListener(newDelayedTransport)); - activeTransport = newDelayedTransport; - scheduleBackoff(newDelayedTransport); + scheduleBackoff(delayedTransport, s); } else { // Still CONNECTING startNewTransport(delayedTransport); @@ -433,8 +431,6 @@ final class TransportSet implements WithLogId { } loadBalancer.handleTransportShutdown(addressGroup, s); if (allAddressesFailed) { - delayedTransport.setTransport(new FailingClientTransport(s)); - delayedTransport.shutdown(); callback.onAllAddressesFailed(); } if (closedByServer) { diff --git a/core/src/test/java/io/grpc/CallOptionsTest.java b/core/src/test/java/io/grpc/CallOptionsTest.java index 98fb3ebad0..339f610488 100644 --- a/core/src/test/java/io/grpc/CallOptionsTest.java +++ b/core/src/test/java/io/grpc/CallOptionsTest.java @@ -65,7 +65,8 @@ public class CallOptionsTest { .withAuthority(sampleAuthority) .withDeadline(sampleDeadline) .withAffinity(sampleAffinity) - .withCredentials(sampleCreds); + .withCredentials(sampleCreds) + .withWaitForReady(); private CallOptions.Key option1 = CallOptions.Key.of("option1", "default"); private CallOptions.Key option2 = CallOptions.Key.of("option2", "default"); @@ -76,6 +77,14 @@ public class CallOptionsTest { assertThat(CallOptions.DEFAULT.getAffinity()).isEqualTo(Attributes.EMPTY); assertThat(CallOptions.DEFAULT.getExecutor()).isNull(); assertThat(CallOptions.DEFAULT.getCredentials()).isNull(); + assertThat(CallOptions.DEFAULT.isWaitForReady()).isFalse(); + } + + @Test + public void withAndWithoutWaitForReady() { + assertThat(CallOptions.DEFAULT.withWaitForReady().isWaitForReady()).isTrue(); + assertThat(CallOptions.DEFAULT.withWaitForReady().withoutWaitForReady().isWaitForReady()) + .isFalse(); } @Test @@ -84,6 +93,7 @@ public class CallOptionsTest { assertThat(allSet.getDeadline()).isSameAs(sampleDeadline); assertThat(allSet.getAffinity()).isSameAs(sampleAffinity); assertThat(allSet.getCredentials()).isSameAs(sampleCreds); + assertThat(allSet.isWaitForReady()).isTrue(); } @Test @@ -141,7 +151,7 @@ public class CallOptionsTest { String expected = "CallOptions{deadline=null, authority=authority, callCredentials=null, " + "affinity={sample=blah}, " + "executor=class io.grpc.internal.SerializingExecutor, compressorName=null, " - + "customOptions=[]}"; + + "customOptions=[], waitForReady=false}"; String actual = allSet .withDeadline(null) .withExecutor(new SerializingExecutor(directExecutor())) @@ -154,8 +164,9 @@ public class CallOptionsTest { @Test public void toStringMatches_noDeadline() { assertThat("CallOptions{deadline=null, authority=null, callCredentials=null, " - + "affinity={}, executor=null, compressorName=null, customOptions=[]}") - .isEqualTo(CallOptions.DEFAULT.toString()); + + "affinity={}, executor=null, compressorName=null, customOptions=[], " + + "waitForReady=false}") + .isEqualTo(CallOptions.DEFAULT.toString()); } @Test diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 5a43bdc268..59891e515f 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -35,6 +35,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.same; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -48,6 +49,7 @@ import io.grpc.IntegerMarshaller; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; +import io.grpc.StatusException; import io.grpc.StringMarshaller; import org.junit.After; @@ -279,4 +281,28 @@ public class DelayedClientTransportTest { verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); } + + @Test public void startBackOff_ClearsFailFastPendingStreams() { + final Status cause = Status.UNKNOWN; + final CallOptions failFastCallOptions = CallOptions.DEFAULT; + final CallOptions waitForReadyCallOptions = CallOptions.DEFAULT.withWaitForReady(); + final ClientStream ffStream = delayedTransport.newStream(method, headers, failFastCallOptions); + delayedTransport.newStream(method, headers, waitForReadyCallOptions); + delayedTransport.newStream(method, headers, failFastCallOptions); + assertEquals(3, delayedTransport.getPendingStreamsCount()); + + delayedTransport.startBackoff(cause); + assertTrue(delayedTransport.isInBackoffPeriod()); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + + ffStream.start(streamListener); + // Fail fast stream not failed yet. + verify(streamListener, never()).closed(any(Status.class), any(Metadata.class)); + + fakeExecutor.runDueTasks(); + // Now fail fast stream failed. + verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + assertEquals(Status.UNAVAILABLE.getCode(), statusCaptor.getValue().getCode()); + assertEquals(cause, ((StatusException) statusCaptor.getValue().getCause()).getStatus()); + } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java index a0608f9a63..0c79cfd5b7 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java @@ -279,9 +279,8 @@ public class ManagedChannelImplTransportManagerTest { ClientTransport t4 = tm.getTransport(addressGroup); assertNotNull(t4); // If backoff's DelayedTransport is still active, this is necessary. Otherwise it would be racy. - t4.newStream(method, new Metadata()); + t4.newStream(method, new Metadata(), CallOptions.DEFAULT.withWaitForReady()); verify(mockTransportFactory, timeout(1000).times(++transportsAddr1)) - .newClientTransport(addr1, authority, userAgent); // Back-off policy was reset and consulted. verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java index f7423780bf..b391ebcd1c 100644 --- a/core/src/test/java/io/grpc/internal/TransportSetTest.java +++ b/core/src/test/java/io/grpc/internal/TransportSetTest.java @@ -32,6 +32,7 @@ package io.grpc.internal; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; @@ -97,6 +98,8 @@ public class TransportSetTest { MethodDescriptor.MethodType.UNKNOWN, "/service/method", new StringMarshaller(), new IntegerMarshaller()); private final Metadata headers = new Metadata(); + private final CallOptions waitForReadyCallOptions = CallOptions.DEFAULT.withWaitForReady(); + private final CallOptions failFastCallOptions = CallOptions.DEFAULT; private TransportSet transportSet; private EquivalentAddressGroup addressGroup; @@ -132,7 +135,7 @@ public class TransportSetTest { int onAllAddressesFailed = 0; // First attempt - transportSet.obtainActiveTransport().newStream(method, new Metadata()); + transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions); verify(mockTransportFactory, times(++transportsCreated)) .newClientTransport(addr, authority, userAgent); @@ -144,7 +147,6 @@ public class TransportSetTest { verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); // Second attempt - transportSet.obtainActiveTransport().newStream(method, new Metadata()); // Transport creation doesn't happen until time is due fakeClock.forwardMillis(9); verify(mockTransportFactory, times(transportsCreated)) @@ -160,7 +162,6 @@ public class TransportSetTest { verify(mockBackoffPolicyProvider, times(backoffReset)).get(); // Third attempt - transportSet.obtainActiveTransport().newStream(method, new Metadata()); // Transport creation doesn't happen until time is due fakeClock.forwardMillis(99); verify(mockTransportFactory, times(transportsCreated)) @@ -178,7 +179,7 @@ public class TransportSetTest { verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed(); // Back-off is reset, and the next attempt will happen immediately - transportSet.obtainActiveTransport().newStream(method, new Metadata()); + transportSet.obtainActiveTransport(); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsCreated)) .newClientTransport(addr, authority, userAgent); @@ -207,9 +208,9 @@ public class TransportSetTest { // First attempt DelayedClientTransport delayedTransport1 = (DelayedClientTransport) transportSet.obtainActiveTransport(); - delayedTransport1.newStream(method, new Metadata()); verify(mockTransportFactory, times(++transportsAddr1)) .newClientTransport(addr1, authority, userAgent); + delayedTransport1.newStream(method, new Metadata(), waitForReadyCallOptions); // Let this one fail without success transports.poll().listener.transportShutdown(Status.UNAVAILABLE); assertNull(delayedTransport1.getTransportSupplier()); @@ -219,14 +220,14 @@ public class TransportSetTest { DelayedClientTransport delayedTransport2 = (DelayedClientTransport) transportSet.obtainActiveTransport(); assertSame(delayedTransport1, delayedTransport2); - delayedTransport2.newStream(method, new Metadata()); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr2)) .newClientTransport(addr2, authority, userAgent); // Fail this one too transports.poll().listener.transportShutdown(Status.UNAVAILABLE); - // All addresses have failed. Delayed transport will see an error. - assertTrue(delayedTransport2.getTransportSupplier().get() instanceof FailingClientTransport); + // All addresses have failed. Delayed transport will be in back-off interval. + assertNull(delayedTransport2.getTransportSupplier()); + assertTrue(delayedTransport2.isInBackoffPeriod()); verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed(); // Backoff reset and first back-off interval begins verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); @@ -235,8 +236,7 @@ public class TransportSetTest { // Third attempt is the first address, thus controlled by the first back-off interval. DelayedClientTransport delayedTransport3 = (DelayedClientTransport) transportSet.obtainActiveTransport(); - assertNotSame(delayedTransport2, delayedTransport3); - delayedTransport3.newStream(method, new Metadata()); + assertSame(delayedTransport2, delayedTransport3); fakeClock.forwardMillis(9); verify(mockTransportFactory, times(transportsAddr1)) .newClientTransport(addr1, authority, userAgent); @@ -252,14 +252,14 @@ public class TransportSetTest { DelayedClientTransport delayedTransport4 = (DelayedClientTransport) transportSet.obtainActiveTransport(); assertSame(delayedTransport3, delayedTransport4); - delayedTransport4.newStream(method, new Metadata()); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr2)) .newClientTransport(addr2, authority, userAgent); // Fail this one too transports.poll().listener.transportShutdown(Status.UNAVAILABLE); - // All addresses have failed again. Delayed transport will see an error - assertTrue(delayedTransport4.getTransportSupplier().get() instanceof FailingClientTransport); + // All addresses have failed again. Delayed transport will be in back-off interval. + assertNull(delayedTransport4.getTransportSupplier()); + assertTrue(delayedTransport4.isInBackoffPeriod()); verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed(); // Second back-off interval begins verify(mockBackoffPolicy1, times(++backoff1Consulted)).nextBackoffMillis(); @@ -268,8 +268,7 @@ public class TransportSetTest { // Fifth attempt for the first address, thus controlled by the second back-off interval. DelayedClientTransport delayedTransport5 = (DelayedClientTransport) transportSet.obtainActiveTransport(); - assertNotSame(delayedTransport4, delayedTransport5); - delayedTransport5.newStream(method, new Metadata()); + assertSame(delayedTransport4, delayedTransport5); fakeClock.forwardMillis(99); verify(mockTransportFactory, times(transportsAddr1)) .newClientTransport(addr1, authority, userAgent); @@ -291,7 +290,7 @@ public class TransportSetTest { DelayedClientTransport delayedTransport6 = (DelayedClientTransport) transportSet.obtainActiveTransport(); assertNotSame(delayedTransport5, delayedTransport6); - delayedTransport6.newStream(method, new Metadata()); + delayedTransport6.newStream(method, headers, waitForReadyCallOptions); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr1)) .newClientTransport(addr1, authority, userAgent); @@ -304,14 +303,14 @@ public class TransportSetTest { DelayedClientTransport delayedTransport7 = (DelayedClientTransport) transportSet.obtainActiveTransport(); assertSame(delayedTransport6, delayedTransport7); - delayedTransport7.newStream(method, new Metadata()); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr2)) .newClientTransport(addr2, authority, userAgent); // Fail this one too transports.poll().listener.transportShutdown(Status.UNAVAILABLE); - // All addresses have failed. Delayed transport will see an error. - assertTrue(delayedTransport7.getTransportSupplier().get() instanceof FailingClientTransport); + // All addresses have failed. Delayed transport will be in back-off interval. + assertNull(delayedTransport7.getTransportSupplier()); + assertTrue(delayedTransport7.isInBackoffPeriod()); verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed(); // Back-off reset and first back-off interval begins verify(mockBackoffPolicy2, times(++backoff2Consulted)).nextBackoffMillis(); @@ -320,8 +319,7 @@ public class TransportSetTest { // Third attempt is the first address, thus controlled by the first back-off interval. DelayedClientTransport delayedTransport8 = (DelayedClientTransport) transportSet.obtainActiveTransport(); - assertNotSame(delayedTransport7, delayedTransport8); - delayedTransport8.newStream(method, new Metadata()); + assertSame(delayedTransport7, delayedTransport8); fakeClock.forwardMillis(9); verify(mockTransportFactory, times(transportsAddr1)) .newClientTransport(addr1, authority, userAgent); @@ -337,6 +335,80 @@ public class TransportSetTest { fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test. } + @Test + public void verifyFailFastAndNonFailFastBehaviors() { + int pendingStreamsCount = 0; + int failFastPendingStreamsCount = 0; + + final SocketAddress addr1 = mock(SocketAddress.class); + final SocketAddress addr2 = mock(SocketAddress.class); + createTransportSet(addr1, addr2); + + final DelayedClientTransport delayedTransport = + (DelayedClientTransport) transportSet.obtainActiveTransport(); + // Now transport is in CONNECTING. + assertFalse(delayedTransport.isInBackoffPeriod()); + + // Create a new fail fast stream. + delayedTransport.newStream(method, headers, failFastCallOptions); + // Verify it is queued. + assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount()); + failFastPendingStreamsCount++; + // Create a new non fail fast stream. + delayedTransport.newStream(method, headers, waitForReadyCallOptions); + // Verify it is queued. + assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount()); + + // Let this 1st address fail without success. + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + // Now transport is still in CONNECTING. + assertFalse(delayedTransport.isInBackoffPeriod()); + // Verify pending streams still in queue. + assertEquals(pendingStreamsCount, delayedTransport.getPendingStreamsCount()); + + // Create a new fail fast stream. + delayedTransport.newStream(method, headers, failFastCallOptions); + // Verify it is queued. + assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount()); + failFastPendingStreamsCount++; + // Create a new non fail fast stream + delayedTransport.newStream(method, headers, waitForReadyCallOptions); + // Verify it is queued. + assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount()); + + // Let this 2nd address fail without success. + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + // Now transport is still in TRANSIENT_FAILURE. + assertTrue(delayedTransport.isInBackoffPeriod()); + // Fail fast pending streams should be cleared + assertEquals(pendingStreamsCount - failFastPendingStreamsCount, + delayedTransport.getPendingStreamsCount()); + pendingStreamsCount -= failFastPendingStreamsCount; + failFastPendingStreamsCount = 0; + + // Create a new fail fast stream. + delayedTransport.newStream(method, headers, failFastCallOptions); + // Verify it is not queued. + assertEquals(pendingStreamsCount, delayedTransport.getPendingStreamsCount()); + // Create a new non fail fast stream + delayedTransport.newStream(method, headers, waitForReadyCallOptions); + // Verify it is queued. + assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount()); + + fakeClock.forwardMillis(10); + // Now back-off is over + assertFalse(delayedTransport.isInBackoffPeriod()); + + // Create a new fail fast stream. + delayedTransport.newStream(method, headers, failFastCallOptions); + // Verify it is queued. + assertEquals(++pendingStreamsCount, delayedTransport.getPendingStreamsCount()); + failFastPendingStreamsCount++; + assertEquals(1, failFastPendingStreamsCount); + + fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test. + } + @Test public void connectIsLazy() { SocketAddress addr = mock(SocketAddress.class); @@ -371,7 +443,7 @@ public class TransportSetTest { transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Request immediately, but will wait for back-off before reconnecting - transportSet.obtainActiveTransport().newStream(method, new Metadata()); + transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions); verify(mockTransportFactory, times(transportsCreated)) .newClientTransport(addr, authority, userAgent); fakeClock.forwardMillis(100); @@ -428,7 +500,7 @@ public class TransportSetTest { pick = transportSet.obtainActiveTransport(); assertTrue(pick instanceof DelayedClientTransport); // Start a stream, which will be pending in the delayed transport - ClientStream pendingStream = pick.newStream(method, headers); + ClientStream pendingStream = pick.newStream(method, headers, waitForReadyCallOptions); pendingStream.start(mockStreamListener); // Shut down TransportSet before the transport is created. Further call to @@ -452,7 +524,7 @@ public class TransportSetTest { any(MethodDescriptor.class), any(Metadata.class)); assertEquals(1, fakeExecutor.runDueTasks()); verify(transportInfo.transport).newStream(same(method), same(headers), - same(CallOptions.DEFAULT)); + same(waitForReadyCallOptions)); verify(transportInfo.transport).shutdown(); transportInfo.listener.transportShutdown(Status.UNAVAILABLE); verify(mockTransportSetCallback, never()).onTerminated();