diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport2.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport2.java new file mode 100644 index 0000000000..c787b96c36 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport2.java @@ -0,0 +1,345 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import io.grpc.CallOptions; +import io.grpc.Context; +import io.grpc.LoadBalancer2.PickResult; +import io.grpc.LoadBalancer2.SubchannelPicker; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.concurrent.Executor; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +/** + * A client transport that queues requests before a real transport is available. When {@link + * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a + * transport for each pending stream. + * + *

This transport owns every stream that it has created until a real transport has been picked + * for that stream, at which point the ownership of the stream is transferred to the real transport, + * thus the delayed transport stops owning the stream. + */ +final class DelayedClientTransport2 implements ManagedClientTransport { + + private final LogId lodId = LogId.allocate(getClass().getName()); + + private final Object lock = new Object(); + + private final Executor streamCreationExecutor; + + private Listener listener; + + @GuardedBy("lock") + private Collection pendingStreams = new LinkedHashSet(); + + /** + * When shutdown == true and pendingStreams == null, then the transport is considered terminated. + */ + @GuardedBy("lock") + private boolean shutdown; + + /** + * The last picker that {@link #reprocess} has used. + */ + @GuardedBy("lock") + @Nullable + private SubchannelPicker lastPicker; + + DelayedClientTransport2(Executor streamCreationExecutor) { + this.streamCreationExecutor = streamCreationExecutor; + } + + @Override + public final Runnable start(Listener listener) { + this.listener = Preconditions.checkNotNull(listener, "listener"); + return null; + } + + /** + * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last + * picker will be consulted. + * + *

Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is + * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned. + */ + @Override + public final ClientStream newStream(MethodDescriptor method, Metadata headers, + CallOptions callOptions, StatsTraceContext statsTraceCtx) { + SubchannelPicker picker = null; + synchronized (lock) { + if (!shutdown) { + if (lastPicker == null) { + return createPendingStream(method, headers, callOptions, statsTraceCtx); + } + picker = lastPicker; + } + } + if (picker != null) { + while (true) { + PickResult pickResult = picker.pickSubchannel(callOptions.getAffinity(), headers); + ClientTransport transport = GrpcUtil.getTransportFromPickResult( + pickResult, callOptions.isWaitForReady()); + if (transport != null) { + return transport.newStream(method, headers, callOptions, statsTraceCtx); + } + // This picker's conclusion is "buffer". If there hasn't been a newer picker set + // (possible race with reprocess()), we will buffer it. Otherwise, will try with the new + // picker. + synchronized (lock) { + if (shutdown) { + break; + } + if (picker == lastPicker) { + return createPendingStream(method, headers, callOptions, statsTraceCtx); + } + picker = lastPicker; + } + } + } + return new FailingClientStream(Status.UNAVAILABLE.withDescription( + "Channel has shutdown (reported by delayed transport)")); + } + + @Override + public final ClientStream newStream(MethodDescriptor method, Metadata headers) { + return newStream(method, headers, CallOptions.DEFAULT, StatsTraceContext.NOOP); + } + + @GuardedBy("lock") + private PendingStream createPendingStream(MethodDescriptor method, Metadata headers, + CallOptions callOptions, StatsTraceContext statsTraceCtx) { + PendingStream pendingStream = new PendingStream(method, headers, callOptions, + statsTraceCtx); + pendingStreams.add(pendingStream); + if (pendingStreams.size() == 1) { + listener.transportInUse(true); + } + return pendingStream; + } + + @Override + public final void ping(final PingCallback callback, Executor executor) { + throw new UnsupportedOperationException("This method is not expected to be called"); + } + + /** + * Prevents creating any new streams until {@link #setTransport} is called. Buffered streams are + * not failed, so if {@link #shutdown} is called when {@link #setTransport} has not been called, + * you still need to call {@link #setTransport} to make this transport terminated. + */ + @Override + public final void shutdown() { + synchronized (lock) { + if (shutdown) { + return; + } + shutdown = true; + listener.transportShutdown( + Status.UNAVAILABLE.withDescription("Channel requested transport to shut down")); + if (pendingStreams == null || pendingStreams.isEmpty()) { + pendingStreams = null; + listener.transportTerminated(); + } + } + } + + /** + * Shuts down this transport and cancels all streams that it owns, hence immediately terminates + * this transport. + */ + @Override + public final void shutdownNow(Status status) { + shutdown(); + Collection savedPendingStreams = null; + synchronized (lock) { + if (pendingStreams != null) { + savedPendingStreams = pendingStreams; + pendingStreams = null; + } + } + if (savedPendingStreams != null) { + for (PendingStream stream : savedPendingStreams) { + stream.cancel(status); + } + listener.transportTerminated(); + } + // If savedPendingStreams == null, transportTerminated() has already been called in shutdown(). + } + + public final boolean hasPendingStreams() { + synchronized (lock) { + return pendingStreams != null && !pendingStreams.isEmpty(); + } + } + + @VisibleForTesting + final int getPendingStreamsCount() { + synchronized (lock) { + return pendingStreams == null ? 0 : pendingStreams.size(); + } + } + + /** + * Use the picker to try picking a transport for every pending stream, proceed the stream if the + * pick is successful, otherwise keep it pending. + * + *

This method may be called concurrently with {@code newStream()}, and it's safe. All pending + * streams will be served by the latest picker as soon as possible. + * + *

This method must not be called concurrently, with itself or with {@link + * #setTransportSupplier}/{@link #setTransport}. + * + * @return the version number of the given picker. + */ + final void reprocess(SubchannelPicker picker) { + ArrayList toProcess; + ArrayList toRemove = new ArrayList(); + synchronized (lock) { + lastPicker = picker; + if (pendingStreams == null || pendingStreams.isEmpty()) { + return; + } + toProcess = new ArrayList(pendingStreams); + } + + for (final PendingStream stream : toProcess) { + PickResult pickResult = picker.pickSubchannel( + stream.callOptions.getAffinity(), stream.headers); + final ClientTransport transport = GrpcUtil.getTransportFromPickResult( + pickResult, stream.callOptions.isWaitForReady()); + if (transport != null) { + Executor executor = streamCreationExecutor; + // createRealStream may be expensive. It will start real streams on the transport. If + // there are pending requests, they will be serialized too, which may be expensive. Since + // we are now on transport thread, we need to offload the work to an executor. + if (stream.callOptions.getExecutor() != null) { + executor = stream.callOptions.getExecutor(); + } + executor.execute(new Runnable() { + @Override + public void run() { + stream.createRealStream(transport); + } + }); + toRemove.add(stream); + } // else: stay pending + } + + synchronized (lock) { + // Between this synchronized and the previous one: + // - Streams may have been cancelled, which may turn pendingStreams into emptiness. + // - shutdown() may be called, which may turn pendingStreams into null. + if (pendingStreams == null || pendingStreams.isEmpty()) { + return; + } + pendingStreams.removeAll(toRemove); + if (pendingStreams.isEmpty()) { + // There may be a brief gap between delayed transport clearing in-use state, and first real + // transport starting streams and setting in-use state. During the gap the whole channel's + // in-use state may be false. However, it shouldn't cause spurious switching to idleness + // (which would shutdown the transports and LoadBalancer) because the gap should be shorter + // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). + listener.transportInUse(false); + if (shutdown) { + pendingStreams = null; + listener.transportTerminated(); + } else { + // Because delayed transport is long-lived, we take this opportunity to down-size the + // hashmap. + pendingStreams = new LinkedHashSet(); + } + } + } + } + + // TODO(carl-mastrangelo): remove this once the Subchannel change is in. + @Override + public LogId getLogId() { + return lodId; + } + + private class PendingStream extends DelayedStream { + private final MethodDescriptor method; + private final Metadata headers; + private final CallOptions callOptions; + private final Context context; + private final StatsTraceContext statsTraceCtx; + + private PendingStream(MethodDescriptor method, Metadata headers, + CallOptions callOptions, StatsTraceContext statsTraceCtx) { + this.method = method; + this.headers = headers; + this.callOptions = callOptions; + this.context = Context.current(); + this.statsTraceCtx = statsTraceCtx; + } + + private void createRealStream(ClientTransport transport) { + ClientStream realStream; + Context origContext = context.attach(); + try { + realStream = transport.newStream(method, headers, callOptions, statsTraceCtx); + } finally { + context.detach(origContext); + } + setStream(realStream); + } + + @Override + public void cancel(Status reason) { + super.cancel(reason); + synchronized (lock) { + if (pendingStreams != null) { + boolean justRemovedAnElement = pendingStreams.remove(this); + if (pendingStreams.isEmpty() && justRemovedAnElement) { + listener.transportInUse(false); + if (shutdown) { + pendingStreams = null; + listener.transportTerminated(); + } + } + } + } + } + } +} diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index c738fc7873..b62f3d840a 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -41,6 +41,8 @@ import com.google.common.base.Supplier; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.grpc.LoadBalancer2.PickResult; +import io.grpc.LoadBalancer2.Subchannel; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.SharedResourceHolder.Resource; @@ -535,6 +537,27 @@ public final class GrpcUtil { } } + /** + * Returns a transport out of a PickResult, or {@code null} if the result is "buffer". + */ + @Nullable + static ClientTransport getTransportFromPickResult(PickResult result, boolean isWaitForReady) { + ClientTransport transport; + Subchannel subchannel = result.getSubchannel(); + if (subchannel != null) { + transport = ((SubchannelImpl) subchannel).obtainActiveTransport(); + } else { + transport = null; + } + if (transport != null) { + return transport; + } + if (!result.getStatus().isOk() && !isWaitForReady) { + return new FailingClientTransport(result.getStatus()); + } + return null; + } + private GrpcUtil() {} private static String getImplementationVersion() { diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransport2Test.java b/core/src/test/java/io/grpc/internal/DelayedClientTransport2Test.java new file mode 100644 index 0000000000..6abad191f7 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransport2Test.java @@ -0,0 +1,465 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.same; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.IntegerMarshaller; +import io.grpc.LoadBalancer2.PickResult; +import io.grpc.LoadBalancer2.SubchannelPicker; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.StringMarshaller; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.concurrent.Executor; + +/** + * Unit tests for {@link DelayedClientTransport2}. + */ +@RunWith(JUnit4.class) +public class DelayedClientTransport2Test { + @Mock private ManagedClientTransport.Listener transportListener; + @Mock private SubchannelPicker mockPicker; + @Mock private SubchannelImpl mockSubchannel; + @Mock private ClientTransport mockRealTransport; + @Mock private ClientTransport mockRealTransport2; + @Mock private ClientStream mockRealStream; + @Mock private ClientStream mockRealStream2; + @Mock private ClientStreamListener streamListener; + @Mock private Executor mockExecutor; + @Captor private ArgumentCaptor statusCaptor; + @Captor private ArgumentCaptor listenerCaptor; + + private static final Attributes.Key SHARD_ID = Attributes.Key.of("shard-id"); + + private final MethodDescriptor method = MethodDescriptor.create( + MethodDescriptor.MethodType.UNKNOWN, "/service/method", + new StringMarshaller(), new IntegerMarshaller()); + + private final MethodDescriptor method2 = MethodDescriptor.create( + MethodDescriptor.MethodType.UNKNOWN, "/service/method2", + new StringMarshaller(), new IntegerMarshaller()); + + private final Metadata headers = new Metadata(); + private final Metadata headers2 = new Metadata(); + + private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value"); + private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2"); + private final StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext( + method.getFullMethodName(), NoopCensusContextFactory.INSTANCE, + GrpcUtil.STOPWATCH_SUPPLIER); + private final StatsTraceContext statsTraceCtx2 = StatsTraceContext.newClientContext( + method2.getFullMethodName(), NoopCensusContextFactory.INSTANCE, + GrpcUtil.STOPWATCH_SUPPLIER); + + private final FakeClock fakeExecutor = new FakeClock(); + + private final DelayedClientTransport2 delayedTransport = + new DelayedClientTransport2(fakeExecutor.getScheduledExecutorService()); + + @Before public void setUp() { + MockitoAnnotations.initMocks(this); + when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class))) + .thenReturn(PickResult.withSubchannel(mockSubchannel)); + when(mockSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport); + when(mockRealTransport.newStream(same(method), same(headers), same(callOptions), + same(statsTraceCtx))) + .thenReturn(mockRealStream); + when(mockRealTransport2.newStream(same(method2), same(headers2), same(callOptions2), + same(statsTraceCtx2))) + .thenReturn(mockRealStream2); + delayedTransport.start(transportListener); + } + + @After public void noMorePendingTasks() { + assertEquals(0, fakeExecutor.numPendingTasks()); + } + + @Test public void streamStartThenAssignTransport() { + assertFalse(delayedTransport.hasPendingStreams()); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx); + stream.start(streamListener); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertTrue(delayedTransport.hasPendingStreams()); + assertTrue(stream instanceof DelayedStream); + assertEquals(0, fakeExecutor.numPendingTasks()); + delayedTransport.reprocess(mockPicker); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertFalse(delayedTransport.hasPendingStreams()); + assertEquals(1, fakeExecutor.runDueTasks()); + verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions), + same(statsTraceCtx)); + verify(mockRealStream).start(listenerCaptor.capture()); + verifyNoMoreInteractions(streamListener); + listenerCaptor.getValue().onReady(); + verify(streamListener).onReady(); + verifyNoMoreInteractions(streamListener); + } + + @Test public void newStreamThenAssignTransportThenShutdown() { + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertTrue(stream instanceof DelayedStream); + delayedTransport.reprocess(mockPicker); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + assertEquals(1, fakeExecutor.runDueTasks()); + verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions), + same(statsTraceCtx)); + stream.start(streamListener); + verify(mockRealStream).start(same(streamListener)); + } + + @Test public void transportTerminatedThenAssignTransport() { + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + delayedTransport.reprocess(mockPicker); + verifyNoMoreInteractions(transportListener); + } + + @Test public void assignTransportThenShutdownThenNewStream() { + delayedTransport.reprocess(mockPicker); + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertTrue(stream instanceof FailingClientStream); + verify(mockRealTransport, never()).newStream(any(MethodDescriptor.class), any(Metadata.class), + any(CallOptions.class), any(StatsTraceContext.class)); + } + + @Test public void assignTransportThenShutdownNowThenNewStream() { + delayedTransport.reprocess(mockPicker); + delayedTransport.shutdownNow(Status.UNAVAILABLE); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + assertTrue(stream instanceof FailingClientStream); + verify(mockRealTransport, never()).newStream(any(MethodDescriptor.class), any(Metadata.class), + any(CallOptions.class), any(StatsTraceContext.class)); + } + + @Test public void cancelStreamWithoutSetTransport() { + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + stream.cancel(Status.CANCELLED); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verifyNoMoreInteractions(mockRealTransport); + verifyNoMoreInteractions(mockRealStream); + } + + @Test public void startThenCancelStreamWithoutSetTransport() { + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + stream.start(streamListener); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + stream.cancel(Status.CANCELLED); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class)); + verifyNoMoreInteractions(mockRealTransport); + verifyNoMoreInteractions(mockRealStream); + } + + @Test public void newStreamThenShutdownTransportThenAssignTransport() { + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx); + stream.start(streamListener); + delayedTransport.shutdown(); + + // Stream is still buffered + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener, times(0)).transportTerminated(); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + + // ... and will proceed if a real transport is available + delayedTransport.reprocess(mockPicker); + fakeExecutor.runDueTasks(); + verify(mockRealTransport).newStream(method, headers, callOptions, statsTraceCtx); + verify(mockRealStream).start(any(ClientStreamListener.class)); + + // Since no more streams are pending, delayed transport is now terminated + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verify(transportListener).transportTerminated(); + + // Further newStream() will return a failing stream + stream = delayedTransport.newStream(method, new Metadata()); + verify(streamListener, never()).closed(any(Status.class), any(Metadata.class)); + stream.start(streamListener); + verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); + + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verifyNoMoreInteractions(mockRealTransport); + verifyNoMoreInteractions(mockRealStream); + } + + @Test public void newStreamThenShutdownTransportThenCancelStream() { + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener, times(0)).transportTerminated(); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + stream.cancel(Status.CANCELLED); + verify(transportListener).transportTerminated(); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verifyNoMoreInteractions(mockRealTransport); + verifyNoMoreInteractions(mockRealStream); + } + + @Test public void shutdownThenNewStream() { + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + stream.start(streamListener); + verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); + } + + @Test public void startStreamThenShutdownNow() { + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + stream.start(streamListener); + delayedTransport.shutdownNow(Status.UNAVAILABLE); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); + } + + @Test public void shutdownNowThenNewStream() { + delayedTransport.shutdownNow(Status.UNAVAILABLE); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + stream.start(streamListener); + verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); + } + + @Test public void reprocessSemantics() { + Attributes affinity1 = Attributes.newBuilder().set(SHARD_ID, 1).build(); + Attributes affinity2 = Attributes.newBuilder().set(SHARD_ID, 2).build(); + CallOptions failFastCallOptions = CallOptions.DEFAULT.withAffinity(affinity1); + CallOptions waitForReadyCallOptions = + CallOptions.DEFAULT.withWaitForReady().withAffinity(affinity2); + + SubchannelImpl subchannel1 = mock(SubchannelImpl.class); + SubchannelImpl subchannel2 = mock(SubchannelImpl.class); + SubchannelImpl subchannel3 = mock(SubchannelImpl.class); + when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class), + any(CallOptions.class), same(statsTraceCtx))).thenReturn(mockRealStream); + when(mockRealTransport2.newStream(any(MethodDescriptor.class), any(Metadata.class), + any(CallOptions.class), same(statsTraceCtx))).thenReturn(mockRealStream2); + when(subchannel1.obtainActiveTransport()).thenReturn(mockRealTransport); + when(subchannel2.obtainActiveTransport()).thenReturn(mockRealTransport2); + when(subchannel3.obtainActiveTransport()).thenReturn(null); + + // Fail-fast streams + DelayedStream ff1 = (DelayedStream) delayedTransport.newStream( + method, headers, failFastCallOptions, statsTraceCtx); + verify(transportListener).transportInUse(true); + DelayedStream ff2 = (DelayedStream) delayedTransport.newStream( + method2, headers2, failFastCallOptions, statsTraceCtx); + DelayedStream ff3 = (DelayedStream) delayedTransport.newStream( + method, headers, failFastCallOptions, statsTraceCtx); + DelayedStream ff4 = (DelayedStream) delayedTransport.newStream( + method2, headers2, failFastCallOptions, statsTraceCtx); + + // Wait-for-ready streams + FakeClock wfr3Executor = new FakeClock(); + DelayedStream wfr1 = (DelayedStream) delayedTransport.newStream( + method, headers, waitForReadyCallOptions, statsTraceCtx); + DelayedStream wfr2 = (DelayedStream) delayedTransport.newStream( + method2, headers2, waitForReadyCallOptions, statsTraceCtx); + DelayedStream wfr3 = (DelayedStream) delayedTransport.newStream( + method, headers, + waitForReadyCallOptions.withExecutor(wfr3Executor.getScheduledExecutorService()), + statsTraceCtx); + DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream( + method2, headers2, waitForReadyCallOptions, statsTraceCtx); + assertEquals(8, delayedTransport.getPendingStreamsCount()); + + // First reprocess(). Some will proceed, some will fail and the rest will stay buffered. + SubchannelPicker picker = mock(SubchannelPicker.class); + when(picker.pickSubchannel(any(Attributes.class), any(Metadata.class))).thenReturn( + // For the fail-fast streams + PickResult.withSubchannel(subchannel1), // ff1: proceed + PickResult.withError(Status.UNAVAILABLE), // ff2: fail + PickResult.withSubchannel(subchannel3), // ff3: stay + PickResult.withNoResult(), // ff4: stay + // For the wait-for-ready streams + PickResult.withSubchannel(subchannel2), // wfr1: proceed + PickResult.withError(Status.RESOURCE_EXHAUSTED), // wfr2: stay + PickResult.withSubchannel(subchannel3)); // wfr3: stay + InOrder inOrder = inOrder(picker); + delayedTransport.reprocess(picker); + + assertEquals(5, delayedTransport.getPendingStreamsCount()); + inOrder.verify(picker).pickSubchannel(affinity1, headers); // ff1 + inOrder.verify(picker).pickSubchannel(affinity1, headers2); // ff2 + inOrder.verify(picker).pickSubchannel(affinity1, headers); // ff3 + inOrder.verify(picker).pickSubchannel(affinity1, headers2); // ff4 + inOrder.verify(picker).pickSubchannel(affinity2, headers); // wfr1 + inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr2 + inOrder.verify(picker).pickSubchannel(affinity2, headers); // wfr3 + inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr4 + inOrder.verifyNoMoreInteractions(); + // Make sure that real transport creates streams in the executor + verify(mockRealTransport, never()).newStream(any(MethodDescriptor.class), + any(Metadata.class), any(CallOptions.class), any(StatsTraceContext.class)); + verify(mockRealTransport2, never()).newStream(any(MethodDescriptor.class), + any(Metadata.class), any(CallOptions.class), any(StatsTraceContext.class)); + fakeExecutor.runDueTasks(); + assertEquals(0, fakeExecutor.numPendingTasks()); + // ff1 and wfr1 went through + verify(mockRealTransport).newStream(method, headers, failFastCallOptions, statsTraceCtx); + verify(mockRealTransport2).newStream(method, headers, waitForReadyCallOptions, statsTraceCtx); + assertSame(mockRealStream, ff1.getRealStream()); + assertSame(mockRealStream2, wfr1.getRealStream()); + // The ff2 has failed due to picker returning an error + assertSame(Status.UNAVAILABLE, ((FailingClientStream) ff2.getRealStream()).getError()); + // Other streams are still buffered + assertNull(ff3.getRealStream()); + assertNull(ff4.getRealStream()); + assertNull(wfr2.getRealStream()); + assertNull(wfr3.getRealStream()); + assertNull(wfr4.getRealStream()); + + // Second reprocess(). All existing streams will proceed. + picker = mock(SubchannelPicker.class); + when(picker.pickSubchannel(any(Attributes.class), any(Metadata.class))).thenReturn( + PickResult.withSubchannel(subchannel1), // ff3 + PickResult.withSubchannel(subchannel2), // ff4 + PickResult.withSubchannel(subchannel2), // wfr2 + PickResult.withSubchannel(subchannel1), // wfr3 + PickResult.withSubchannel(subchannel2), // wfr4 + PickResult.withNoResult()); // wfr5 (not yet created) + inOrder = inOrder(picker); + assertEquals(0, wfr3Executor.numPendingTasks()); + verify(transportListener, never()).transportInUse(false); + + delayedTransport.reprocess(picker); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verify(transportListener).transportInUse(false); + inOrder.verify(picker).pickSubchannel(affinity1, headers); // ff3 + inOrder.verify(picker).pickSubchannel(affinity1, headers2); // ff4 + inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr2 + inOrder.verify(picker).pickSubchannel(affinity2, headers); // wfr3 + inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr4 + inOrder.verifyNoMoreInteractions(); + fakeExecutor.runDueTasks(); + assertEquals(0, fakeExecutor.numPendingTasks()); + assertSame(mockRealStream, ff3.getRealStream()); + assertSame(mockRealStream2, ff4.getRealStream()); + assertSame(mockRealStream2, wfr2.getRealStream()); + assertSame(mockRealStream2, wfr4.getRealStream()); + + // If there is an executor in the CallOptions, it will be used to create the real tream. + assertNull(wfr3.getRealStream()); + wfr3Executor.runDueTasks(); + assertSame(mockRealStream, wfr3.getRealStream()); + + // New streams will use the last picker + DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream( + method, headers, waitForReadyCallOptions, statsTraceCtx); + assertNull(wfr5.getRealStream()); + inOrder.verify(picker).pickSubchannel(affinity2, headers); + inOrder.verifyNoMoreInteractions(); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + + // wfr5 will stop delayed transport from terminating + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener, never()).transportTerminated(); + // ... until it's gone + picker = mock(SubchannelPicker.class); + when(picker.pickSubchannel(any(Attributes.class), any(Metadata.class))).thenReturn( + PickResult.withSubchannel(subchannel1)); + delayedTransport.reprocess(picker); + verify(picker).pickSubchannel(affinity2, headers); + fakeExecutor.runDueTasks(); + assertSame(mockRealStream, wfr5.getRealStream()); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verify(transportListener).transportTerminated(); + } + + @Test + public void reprocess_NoPendingStream() { + SubchannelPicker picker = mock(SubchannelPicker.class); + SubchannelImpl subchannel = mock(SubchannelImpl.class); + when(subchannel.obtainActiveTransport()).thenReturn(mockRealTransport); + when(picker.pickSubchannel(any(Attributes.class), any(Metadata.class))).thenReturn( + PickResult.withSubchannel(subchannel)); + when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class), + any(CallOptions.class), same(statsTraceCtx))).thenReturn(mockRealStream); + delayedTransport.reprocess(picker); + verifyNoMoreInteractions(picker); + verifyNoMoreInteractions(transportListener); + + // Though picker was not originally used, it will be saved and serve future streams. + ClientStream stream = delayedTransport.newStream( + method, headers, CallOptions.DEFAULT, statsTraceCtx); + verify(picker).pickSubchannel(CallOptions.DEFAULT.getAffinity(), headers); + verify(subchannel).obtainActiveTransport(); + assertSame(mockRealStream, stream); + } +}