core: handle a race in DelayedClientTransport(2). (#2486)

* Fork DelayedClientTransport into DelayedClientTransport2, and fix a race
on it.  Consider the following sequence:

1. Channel is created. Picker is initially null

2. Channel has a new RPC. Because picker is null, the delayed transport
is picked, but newStream() is not called yet.

3. LoadBalancer updates a new picker to Channel. Channel runs
reprocess() but does nothing because no pending stream is there.

4. newStream() called on the delayed transport.

In previous implementation, newStream() on step 4 will not see the
picker. It will only use the next picker.

After this change, delayed transport would save the latest picker and
use it on newStream(), which is the desired behavior.

Also deleted all the code that will not be used after the LB refactor.


* Also fixed a bug: newStream() should always return a failing stream if it's shutdown. Previously it's not doing so if picker is not null.
This commit is contained in:
Kun Zhang 2016-12-12 09:24:13 -08:00 committed by GitHub
parent bd9e041b74
commit 178b5260c2
3 changed files with 833 additions and 0 deletions

View File

@ -0,0 +1,345 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.LoadBalancer2.PickResult;
import io.grpc.LoadBalancer2.SubchannelPicker;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* A client transport that queues requests before a real transport is available. When {@link
* #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
* transport for each pending stream.
*
* <p>This transport owns every stream that it has created until a real transport has been picked
* for that stream, at which point the ownership of the stream is transferred to the real transport,
* thus the delayed transport stops owning the stream.
*/
final class DelayedClientTransport2 implements ManagedClientTransport {
private final LogId lodId = LogId.allocate(getClass().getName());
private final Object lock = new Object();
private final Executor streamCreationExecutor;
private Listener listener;
@GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new LinkedHashSet<PendingStream>();
/**
* When shutdown == true and pendingStreams == null, then the transport is considered terminated.
*/
@GuardedBy("lock")
private boolean shutdown;
/**
* The last picker that {@link #reprocess} has used.
*/
@GuardedBy("lock")
@Nullable
private SubchannelPicker lastPicker;
DelayedClientTransport2(Executor streamCreationExecutor) {
this.streamCreationExecutor = streamCreationExecutor;
}
@Override
public final Runnable start(Listener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
return null;
}
/**
* If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
* picker will be consulted.
*
* <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
* returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
*/
@Override
public final ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers,
CallOptions callOptions, StatsTraceContext statsTraceCtx) {
SubchannelPicker picker = null;
synchronized (lock) {
if (!shutdown) {
if (lastPicker == null) {
return createPendingStream(method, headers, callOptions, statsTraceCtx);
}
picker = lastPicker;
}
}
if (picker != null) {
while (true) {
PickResult pickResult = picker.pickSubchannel(callOptions.getAffinity(), headers);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(
pickResult, callOptions.isWaitForReady());
if (transport != null) {
return transport.newStream(method, headers, callOptions, statsTraceCtx);
}
// This picker's conclusion is "buffer". If there hasn't been a newer picker set
// (possible race with reprocess()), we will buffer it. Otherwise, will try with the new
// picker.
synchronized (lock) {
if (shutdown) {
break;
}
if (picker == lastPicker) {
return createPendingStream(method, headers, callOptions, statsTraceCtx);
}
picker = lastPicker;
}
}
}
return new FailingClientStream(Status.UNAVAILABLE.withDescription(
"Channel has shutdown (reported by delayed transport)"));
}
@Override
public final ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers) {
return newStream(method, headers, CallOptions.DEFAULT, StatsTraceContext.NOOP);
}
@GuardedBy("lock")
private PendingStream createPendingStream(MethodDescriptor<?, ?> method, Metadata headers,
CallOptions callOptions, StatsTraceContext statsTraceCtx) {
PendingStream pendingStream = new PendingStream(method, headers, callOptions,
statsTraceCtx);
pendingStreams.add(pendingStream);
if (pendingStreams.size() == 1) {
listener.transportInUse(true);
}
return pendingStream;
}
@Override
public final void ping(final PingCallback callback, Executor executor) {
throw new UnsupportedOperationException("This method is not expected to be called");
}
/**
* Prevents creating any new streams until {@link #setTransport} is called. Buffered streams are
* not failed, so if {@link #shutdown} is called when {@link #setTransport} has not been called,
* you still need to call {@link #setTransport} to make this transport terminated.
*/
@Override
public final void shutdown() {
synchronized (lock) {
if (shutdown) {
return;
}
shutdown = true;
listener.transportShutdown(
Status.UNAVAILABLE.withDescription("Channel requested transport to shut down"));
if (pendingStreams == null || pendingStreams.isEmpty()) {
pendingStreams = null;
listener.transportTerminated();
}
}
}
/**
* Shuts down this transport and cancels all streams that it owns, hence immediately terminates
* this transport.
*/
@Override
public final void shutdownNow(Status status) {
shutdown();
Collection<PendingStream> savedPendingStreams = null;
synchronized (lock) {
if (pendingStreams != null) {
savedPendingStreams = pendingStreams;
pendingStreams = null;
}
}
if (savedPendingStreams != null) {
for (PendingStream stream : savedPendingStreams) {
stream.cancel(status);
}
listener.transportTerminated();
}
// If savedPendingStreams == null, transportTerminated() has already been called in shutdown().
}
public final boolean hasPendingStreams() {
synchronized (lock) {
return pendingStreams != null && !pendingStreams.isEmpty();
}
}
@VisibleForTesting
final int getPendingStreamsCount() {
synchronized (lock) {
return pendingStreams == null ? 0 : pendingStreams.size();
}
}
/**
* Use the picker to try picking a transport for every pending stream, proceed the stream if the
* pick is successful, otherwise keep it pending.
*
* <p>This method may be called concurrently with {@code newStream()}, and it's safe. All pending
* streams will be served by the latest picker as soon as possible.
*
* <p>This method <strong>must not</strong> be called concurrently, with itself or with {@link
* #setTransportSupplier}/{@link #setTransport}.
*
* @return the version number of the given picker.
*/
final void reprocess(SubchannelPicker picker) {
ArrayList<PendingStream> toProcess;
ArrayList<PendingStream> toRemove = new ArrayList<PendingStream>();
synchronized (lock) {
lastPicker = picker;
if (pendingStreams == null || pendingStreams.isEmpty()) {
return;
}
toProcess = new ArrayList<PendingStream>(pendingStreams);
}
for (final PendingStream stream : toProcess) {
PickResult pickResult = picker.pickSubchannel(
stream.callOptions.getAffinity(), stream.headers);
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(
pickResult, stream.callOptions.isWaitForReady());
if (transport != null) {
Executor executor = streamCreationExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
// we are now on transport thread, we need to offload the work to an executor.
if (stream.callOptions.getExecutor() != null) {
executor = stream.callOptions.getExecutor();
}
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(transport);
}
});
toRemove.add(stream);
} // else: stay pending
}
synchronized (lock) {
// Between this synchronized and the previous one:
// - Streams may have been cancelled, which may turn pendingStreams into emptiness.
// - shutdown() may be called, which may turn pendingStreams into null.
if (pendingStreams == null || pendingStreams.isEmpty()) {
return;
}
pendingStreams.removeAll(toRemove);
if (pendingStreams.isEmpty()) {
// There may be a brief gap between delayed transport clearing in-use state, and first real
// transport starting streams and setting in-use state. During the gap the whole channel's
// in-use state may be false. However, it shouldn't cause spurious switching to idleness
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
listener.transportInUse(false);
if (shutdown) {
pendingStreams = null;
listener.transportTerminated();
} else {
// Because delayed transport is long-lived, we take this opportunity to down-size the
// hashmap.
pendingStreams = new LinkedHashSet<PendingStream>();
}
}
}
}
// TODO(carl-mastrangelo): remove this once the Subchannel change is in.
@Override
public LogId getLogId() {
return lodId;
}
private class PendingStream extends DelayedStream {
private final MethodDescriptor<?, ?> method;
private final Metadata headers;
private final CallOptions callOptions;
private final Context context;
private final StatsTraceContext statsTraceCtx;
private PendingStream(MethodDescriptor<?, ?> method, Metadata headers,
CallOptions callOptions, StatsTraceContext statsTraceCtx) {
this.method = method;
this.headers = headers;
this.callOptions = callOptions;
this.context = Context.current();
this.statsTraceCtx = statsTraceCtx;
}
private void createRealStream(ClientTransport transport) {
ClientStream realStream;
Context origContext = context.attach();
try {
realStream = transport.newStream(method, headers, callOptions, statsTraceCtx);
} finally {
context.detach(origContext);
}
setStream(realStream);
}
@Override
public void cancel(Status reason) {
super.cancel(reason);
synchronized (lock) {
if (pendingStreams != null) {
boolean justRemovedAnElement = pendingStreams.remove(this);
if (pendingStreams.isEmpty() && justRemovedAnElement) {
listener.transportInUse(false);
if (shutdown) {
pendingStreams = null;
listener.transportTerminated();
}
}
}
}
}
}
}

View File

@ -41,6 +41,8 @@ import com.google.common.base.Supplier;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.LoadBalancer2.PickResult;
import io.grpc.LoadBalancer2.Subchannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.SharedResourceHolder.Resource;
@ -535,6 +537,27 @@ public final class GrpcUtil {
}
}
/**
* Returns a transport out of a PickResult, or {@code null} if the result is "buffer".
*/
@Nullable
static ClientTransport getTransportFromPickResult(PickResult result, boolean isWaitForReady) {
ClientTransport transport;
Subchannel subchannel = result.getSubchannel();
if (subchannel != null) {
transport = ((SubchannelImpl) subchannel).obtainActiveTransport();
} else {
transport = null;
}
if (transport != null) {
return transport;
}
if (!result.getStatus().isOk() && !isWaitForReady) {
return new FailingClientTransport(result.getStatus());
}
return null;
}
private GrpcUtil() {}
private static String getImplementationVersion() {

View File

@ -0,0 +1,465 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.IntegerMarshaller;
import io.grpc.LoadBalancer2.PickResult;
import io.grpc.LoadBalancer2.SubchannelPicker;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.util.concurrent.Executor;
/**
* Unit tests for {@link DelayedClientTransport2}.
*/
@RunWith(JUnit4.class)
public class DelayedClientTransport2Test {
@Mock private ManagedClientTransport.Listener transportListener;
@Mock private SubchannelPicker mockPicker;
@Mock private SubchannelImpl mockSubchannel;
@Mock private ClientTransport mockRealTransport;
@Mock private ClientTransport mockRealTransport2;
@Mock private ClientStream mockRealStream;
@Mock private ClientStream mockRealStream2;
@Mock private ClientStreamListener streamListener;
@Mock private Executor mockExecutor;
@Captor private ArgumentCaptor<Status> statusCaptor;
@Captor private ArgumentCaptor<ClientStreamListener> listenerCaptor;
private static final Attributes.Key<Integer> SHARD_ID = Attributes.Key.of("shard-id");
private final MethodDescriptor<String, Integer> method = MethodDescriptor.create(
MethodDescriptor.MethodType.UNKNOWN, "/service/method",
new StringMarshaller(), new IntegerMarshaller());
private final MethodDescriptor<String, Integer> method2 = MethodDescriptor.create(
MethodDescriptor.MethodType.UNKNOWN, "/service/method2",
new StringMarshaller(), new IntegerMarshaller());
private final Metadata headers = new Metadata();
private final Metadata headers2 = new Metadata();
private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value");
private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2");
private final StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(
method.getFullMethodName(), NoopCensusContextFactory.INSTANCE,
GrpcUtil.STOPWATCH_SUPPLIER);
private final StatsTraceContext statsTraceCtx2 = StatsTraceContext.newClientContext(
method2.getFullMethodName(), NoopCensusContextFactory.INSTANCE,
GrpcUtil.STOPWATCH_SUPPLIER);
private final FakeClock fakeExecutor = new FakeClock();
private final DelayedClientTransport2 delayedTransport =
new DelayedClientTransport2(fakeExecutor.getScheduledExecutorService());
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
when(mockPicker.pickSubchannel(any(Attributes.class), any(Metadata.class)))
.thenReturn(PickResult.withSubchannel(mockSubchannel));
when(mockSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
when(mockRealTransport.newStream(same(method), same(headers), same(callOptions),
same(statsTraceCtx)))
.thenReturn(mockRealStream);
when(mockRealTransport2.newStream(same(method2), same(headers2), same(callOptions2),
same(statsTraceCtx2)))
.thenReturn(mockRealStream2);
delayedTransport.start(transportListener);
}
@After public void noMorePendingTasks() {
assertEquals(0, fakeExecutor.numPendingTasks());
}
@Test public void streamStartThenAssignTransport() {
assertFalse(delayedTransport.hasPendingStreams());
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
stream.start(streamListener);
assertEquals(1, delayedTransport.getPendingStreamsCount());
assertTrue(delayedTransport.hasPendingStreams());
assertTrue(stream instanceof DelayedStream);
assertEquals(0, fakeExecutor.numPendingTasks());
delayedTransport.reprocess(mockPicker);
assertEquals(0, delayedTransport.getPendingStreamsCount());
assertFalse(delayedTransport.hasPendingStreams());
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions),
same(statsTraceCtx));
verify(mockRealStream).start(listenerCaptor.capture());
verifyNoMoreInteractions(streamListener);
listenerCaptor.getValue().onReady();
verify(streamListener).onReady();
verifyNoMoreInteractions(streamListener);
}
@Test public void newStreamThenAssignTransportThenShutdown() {
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
assertEquals(1, delayedTransport.getPendingStreamsCount());
assertTrue(stream instanceof DelayedStream);
delayedTransport.reprocess(mockPicker);
assertEquals(0, delayedTransport.getPendingStreamsCount());
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions),
same(statsTraceCtx));
stream.start(streamListener);
verify(mockRealStream).start(same(streamListener));
}
@Test public void transportTerminatedThenAssignTransport() {
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
delayedTransport.reprocess(mockPicker);
verifyNoMoreInteractions(transportListener);
}
@Test public void assignTransportThenShutdownThenNewStream() {
delayedTransport.reprocess(mockPicker);
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
assertEquals(0, delayedTransport.getPendingStreamsCount());
assertTrue(stream instanceof FailingClientStream);
verify(mockRealTransport, never()).newStream(any(MethodDescriptor.class), any(Metadata.class),
any(CallOptions.class), any(StatsTraceContext.class));
}
@Test public void assignTransportThenShutdownNowThenNewStream() {
delayedTransport.reprocess(mockPicker);
delayedTransport.shutdownNow(Status.UNAVAILABLE);
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
assertEquals(0, delayedTransport.getPendingStreamsCount());
assertTrue(stream instanceof FailingClientStream);
verify(mockRealTransport, never()).newStream(any(MethodDescriptor.class), any(Metadata.class),
any(CallOptions.class), any(StatsTraceContext.class));
}
@Test public void cancelStreamWithoutSetTransport() {
ClientStream stream = delayedTransport.newStream(method, new Metadata());
assertEquals(1, delayedTransport.getPendingStreamsCount());
stream.cancel(Status.CANCELLED);
assertEquals(0, delayedTransport.getPendingStreamsCount());
verifyNoMoreInteractions(mockRealTransport);
verifyNoMoreInteractions(mockRealStream);
}
@Test public void startThenCancelStreamWithoutSetTransport() {
ClientStream stream = delayedTransport.newStream(method, new Metadata());
stream.start(streamListener);
assertEquals(1, delayedTransport.getPendingStreamsCount());
stream.cancel(Status.CANCELLED);
assertEquals(0, delayedTransport.getPendingStreamsCount());
verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class));
verifyNoMoreInteractions(mockRealTransport);
verifyNoMoreInteractions(mockRealStream);
}
@Test public void newStreamThenShutdownTransportThenAssignTransport() {
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, statsTraceCtx);
stream.start(streamListener);
delayedTransport.shutdown();
// Stream is still buffered
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener, times(0)).transportTerminated();
assertEquals(1, delayedTransport.getPendingStreamsCount());
// ... and will proceed if a real transport is available
delayedTransport.reprocess(mockPicker);
fakeExecutor.runDueTasks();
verify(mockRealTransport).newStream(method, headers, callOptions, statsTraceCtx);
verify(mockRealStream).start(any(ClientStreamListener.class));
// Since no more streams are pending, delayed transport is now terminated
assertEquals(0, delayedTransport.getPendingStreamsCount());
verify(transportListener).transportTerminated();
// Further newStream() will return a failing stream
stream = delayedTransport.newStream(method, new Metadata());
verify(streamListener, never()).closed(any(Status.class), any(Metadata.class));
stream.start(streamListener);
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
assertEquals(0, delayedTransport.getPendingStreamsCount());
verifyNoMoreInteractions(mockRealTransport);
verifyNoMoreInteractions(mockRealStream);
}
@Test public void newStreamThenShutdownTransportThenCancelStream() {
ClientStream stream = delayedTransport.newStream(method, new Metadata());
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener, times(0)).transportTerminated();
assertEquals(1, delayedTransport.getPendingStreamsCount());
stream.cancel(Status.CANCELLED);
verify(transportListener).transportTerminated();
assertEquals(0, delayedTransport.getPendingStreamsCount());
verifyNoMoreInteractions(mockRealTransport);
verifyNoMoreInteractions(mockRealStream);
}
@Test public void shutdownThenNewStream() {
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, new Metadata());
stream.start(streamListener);
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
}
@Test public void startStreamThenShutdownNow() {
ClientStream stream = delayedTransport.newStream(method, new Metadata());
stream.start(streamListener);
delayedTransport.shutdownNow(Status.UNAVAILABLE);
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
}
@Test public void shutdownNowThenNewStream() {
delayedTransport.shutdownNow(Status.UNAVAILABLE);
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, new Metadata());
stream.start(streamListener);
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
}
@Test public void reprocessSemantics() {
Attributes affinity1 = Attributes.newBuilder().set(SHARD_ID, 1).build();
Attributes affinity2 = Attributes.newBuilder().set(SHARD_ID, 2).build();
CallOptions failFastCallOptions = CallOptions.DEFAULT.withAffinity(affinity1);
CallOptions waitForReadyCallOptions =
CallOptions.DEFAULT.withWaitForReady().withAffinity(affinity2);
SubchannelImpl subchannel1 = mock(SubchannelImpl.class);
SubchannelImpl subchannel2 = mock(SubchannelImpl.class);
SubchannelImpl subchannel3 = mock(SubchannelImpl.class);
when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class),
any(CallOptions.class), same(statsTraceCtx))).thenReturn(mockRealStream);
when(mockRealTransport2.newStream(any(MethodDescriptor.class), any(Metadata.class),
any(CallOptions.class), same(statsTraceCtx))).thenReturn(mockRealStream2);
when(subchannel1.obtainActiveTransport()).thenReturn(mockRealTransport);
when(subchannel2.obtainActiveTransport()).thenReturn(mockRealTransport2);
when(subchannel3.obtainActiveTransport()).thenReturn(null);
// Fail-fast streams
DelayedStream ff1 = (DelayedStream) delayedTransport.newStream(
method, headers, failFastCallOptions, statsTraceCtx);
verify(transportListener).transportInUse(true);
DelayedStream ff2 = (DelayedStream) delayedTransport.newStream(
method2, headers2, failFastCallOptions, statsTraceCtx);
DelayedStream ff3 = (DelayedStream) delayedTransport.newStream(
method, headers, failFastCallOptions, statsTraceCtx);
DelayedStream ff4 = (DelayedStream) delayedTransport.newStream(
method2, headers2, failFastCallOptions, statsTraceCtx);
// Wait-for-ready streams
FakeClock wfr3Executor = new FakeClock();
DelayedStream wfr1 = (DelayedStream) delayedTransport.newStream(
method, headers, waitForReadyCallOptions, statsTraceCtx);
DelayedStream wfr2 = (DelayedStream) delayedTransport.newStream(
method2, headers2, waitForReadyCallOptions, statsTraceCtx);
DelayedStream wfr3 = (DelayedStream) delayedTransport.newStream(
method, headers,
waitForReadyCallOptions.withExecutor(wfr3Executor.getScheduledExecutorService()),
statsTraceCtx);
DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream(
method2, headers2, waitForReadyCallOptions, statsTraceCtx);
assertEquals(8, delayedTransport.getPendingStreamsCount());
// First reprocess(). Some will proceed, some will fail and the rest will stay buffered.
SubchannelPicker picker = mock(SubchannelPicker.class);
when(picker.pickSubchannel(any(Attributes.class), any(Metadata.class))).thenReturn(
// For the fail-fast streams
PickResult.withSubchannel(subchannel1), // ff1: proceed
PickResult.withError(Status.UNAVAILABLE), // ff2: fail
PickResult.withSubchannel(subchannel3), // ff3: stay
PickResult.withNoResult(), // ff4: stay
// For the wait-for-ready streams
PickResult.withSubchannel(subchannel2), // wfr1: proceed
PickResult.withError(Status.RESOURCE_EXHAUSTED), // wfr2: stay
PickResult.withSubchannel(subchannel3)); // wfr3: stay
InOrder inOrder = inOrder(picker);
delayedTransport.reprocess(picker);
assertEquals(5, delayedTransport.getPendingStreamsCount());
inOrder.verify(picker).pickSubchannel(affinity1, headers); // ff1
inOrder.verify(picker).pickSubchannel(affinity1, headers2); // ff2
inOrder.verify(picker).pickSubchannel(affinity1, headers); // ff3
inOrder.verify(picker).pickSubchannel(affinity1, headers2); // ff4
inOrder.verify(picker).pickSubchannel(affinity2, headers); // wfr1
inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr2
inOrder.verify(picker).pickSubchannel(affinity2, headers); // wfr3
inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr4
inOrder.verifyNoMoreInteractions();
// Make sure that real transport creates streams in the executor
verify(mockRealTransport, never()).newStream(any(MethodDescriptor.class),
any(Metadata.class), any(CallOptions.class), any(StatsTraceContext.class));
verify(mockRealTransport2, never()).newStream(any(MethodDescriptor.class),
any(Metadata.class), any(CallOptions.class), any(StatsTraceContext.class));
fakeExecutor.runDueTasks();
assertEquals(0, fakeExecutor.numPendingTasks());
// ff1 and wfr1 went through
verify(mockRealTransport).newStream(method, headers, failFastCallOptions, statsTraceCtx);
verify(mockRealTransport2).newStream(method, headers, waitForReadyCallOptions, statsTraceCtx);
assertSame(mockRealStream, ff1.getRealStream());
assertSame(mockRealStream2, wfr1.getRealStream());
// The ff2 has failed due to picker returning an error
assertSame(Status.UNAVAILABLE, ((FailingClientStream) ff2.getRealStream()).getError());
// Other streams are still buffered
assertNull(ff3.getRealStream());
assertNull(ff4.getRealStream());
assertNull(wfr2.getRealStream());
assertNull(wfr3.getRealStream());
assertNull(wfr4.getRealStream());
// Second reprocess(). All existing streams will proceed.
picker = mock(SubchannelPicker.class);
when(picker.pickSubchannel(any(Attributes.class), any(Metadata.class))).thenReturn(
PickResult.withSubchannel(subchannel1), // ff3
PickResult.withSubchannel(subchannel2), // ff4
PickResult.withSubchannel(subchannel2), // wfr2
PickResult.withSubchannel(subchannel1), // wfr3
PickResult.withSubchannel(subchannel2), // wfr4
PickResult.withNoResult()); // wfr5 (not yet created)
inOrder = inOrder(picker);
assertEquals(0, wfr3Executor.numPendingTasks());
verify(transportListener, never()).transportInUse(false);
delayedTransport.reprocess(picker);
assertEquals(0, delayedTransport.getPendingStreamsCount());
verify(transportListener).transportInUse(false);
inOrder.verify(picker).pickSubchannel(affinity1, headers); // ff3
inOrder.verify(picker).pickSubchannel(affinity1, headers2); // ff4
inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr2
inOrder.verify(picker).pickSubchannel(affinity2, headers); // wfr3
inOrder.verify(picker).pickSubchannel(affinity2, headers2); // wfr4
inOrder.verifyNoMoreInteractions();
fakeExecutor.runDueTasks();
assertEquals(0, fakeExecutor.numPendingTasks());
assertSame(mockRealStream, ff3.getRealStream());
assertSame(mockRealStream2, ff4.getRealStream());
assertSame(mockRealStream2, wfr2.getRealStream());
assertSame(mockRealStream2, wfr4.getRealStream());
// If there is an executor in the CallOptions, it will be used to create the real tream.
assertNull(wfr3.getRealStream());
wfr3Executor.runDueTasks();
assertSame(mockRealStream, wfr3.getRealStream());
// New streams will use the last picker
DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream(
method, headers, waitForReadyCallOptions, statsTraceCtx);
assertNull(wfr5.getRealStream());
inOrder.verify(picker).pickSubchannel(affinity2, headers);
inOrder.verifyNoMoreInteractions();
assertEquals(1, delayedTransport.getPendingStreamsCount());
// wfr5 will stop delayed transport from terminating
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener, never()).transportTerminated();
// ... until it's gone
picker = mock(SubchannelPicker.class);
when(picker.pickSubchannel(any(Attributes.class), any(Metadata.class))).thenReturn(
PickResult.withSubchannel(subchannel1));
delayedTransport.reprocess(picker);
verify(picker).pickSubchannel(affinity2, headers);
fakeExecutor.runDueTasks();
assertSame(mockRealStream, wfr5.getRealStream());
assertEquals(0, delayedTransport.getPendingStreamsCount());
verify(transportListener).transportTerminated();
}
@Test
public void reprocess_NoPendingStream() {
SubchannelPicker picker = mock(SubchannelPicker.class);
SubchannelImpl subchannel = mock(SubchannelImpl.class);
when(subchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
when(picker.pickSubchannel(any(Attributes.class), any(Metadata.class))).thenReturn(
PickResult.withSubchannel(subchannel));
when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class),
any(CallOptions.class), same(statsTraceCtx))).thenReturn(mockRealStream);
delayedTransport.reprocess(picker);
verifyNoMoreInteractions(picker);
verifyNoMoreInteractions(transportListener);
// Though picker was not originally used, it will be saved and serve future streams.
ClientStream stream = delayedTransport.newStream(
method, headers, CallOptions.DEFAULT, statsTraceCtx);
verify(picker).pickSubchannel(CallOptions.DEFAULT.getAffinity(), headers);
verify(subchannel).obtainActiveTransport();
assertSame(mockRealStream, stream);
}
}