core: fix retry flow control issue (#8401)

There has been an issue about flow control when retry is enabled.

Currently we call `masterListener.onReady()` whenever `substreamListener.onReady()` is called.

The user's `onReady()` implementation might do

```
while(observer.isReady()) {
  // send one more message.
}
```

However, currently if the `RetriableStream` is still draining, `isReady()` is false, and user's `onReady()` exits immediately. And because `substreamListener.onReady()` is already called, it may not be called again after drained.

This PR fixes the issue by

- Use a SerializeExecutor to call all `masterListener` callbacks.
- Once `RetriableStream` is drained, check `isReady()` and if so call `onReady()`.
- Once `substreamListener.onReady()` is called, check `isReady()` and only if so we call `masterListener.onReady()`.
This commit is contained in:
ZHANG Dapeng 2021-08-11 10:25:57 -07:00 committed by GitHub
parent fd2a58a55e
commit 2142902343
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 170 additions and 18 deletions

View File

@ -30,8 +30,10 @@ import io.grpc.DecompressorRegistry;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.io.InputStream; import java.io.InputStream;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -64,6 +66,16 @@ abstract class RetriableStream<ReqT> implements ClientStream {
private final MethodDescriptor<ReqT, ?> method; private final MethodDescriptor<ReqT, ?> method;
private final Executor callExecutor; private final Executor callExecutor;
private final Executor listenerSerializeExecutor = new SynchronizationContext(
new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw Status.fromThrowable(e)
.withDescription("Uncaught exception in the SynchronizationContext. Re-thrown.")
.asRuntimeException();
}
}
);
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
// Must not modify it. // Must not modify it.
private final Metadata headers; private final Metadata headers;
@ -105,6 +117,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
private FutureCanceller scheduledHedging; private FutureCanceller scheduledHedging;
private long nextBackoffIntervalNanos; private long nextBackoffIntervalNanos;
private Status cancellationStatus; private Status cancellationStatus;
private boolean isClosed;
RetriableStream( RetriableStream(
MethodDescriptor<ReqT, ?> method, Metadata headers, MethodDescriptor<ReqT, ?> method, Metadata headers,
@ -247,6 +260,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
int chunk = 0x80; int chunk = 0x80;
List<BufferEntry> list = null; List<BufferEntry> list = null;
boolean streamStarted = false; boolean streamStarted = false;
Runnable onReadyRunnable = null;
while (true) { while (true) {
State savedState; State savedState;
@ -264,7 +278,18 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
if (index == savedState.buffer.size()) { // I'm drained if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream); state = savedState.substreamDrained(substream);
return; if (!isReady()) {
return;
}
onReadyRunnable = new Runnable() {
@Override
public void run() {
if (!isClosed) {
masterListener.onReady();
}
}
};
break;
} }
if (substream.closed) { if (substream.closed) {
@ -299,6 +324,11 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
} }
if (onReadyRunnable != null) {
listenerSerializeExecutor.execute(onReadyRunnable);
return;
}
substream.stream.cancel( substream.stream.cancel(
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED); state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
} }
@ -450,14 +480,22 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
@Override @Override
public final void cancel(Status reason) { public final void cancel(final Status reason) {
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */); Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream(); noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream); Runnable runnable = commit(noopSubstream);
if (runnable != null) { if (runnable != null) {
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
runnable.run(); runnable.run();
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
}
});
return; return;
} }
@ -771,18 +809,25 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
@Override @Override
public void headersRead(Metadata headers) { public void headersRead(final Metadata headers) {
commitAndRun(substream); commitAndRun(substream);
if (state.winningSubstream == substream) { if (state.winningSubstream == substream) {
masterListener.headersRead(headers);
if (throttle != null) { if (throttle != null) {
throttle.onSuccess(); throttle.onSuccess();
} }
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.headersRead(headers);
}
});
} }
} }
@Override @Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { public void closed(
final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
synchronized (lock) { synchronized (lock) {
state = state.substreamClosed(substream); state = state.substreamClosed(substream);
closedSubstreamsInsight.append(status.getCode()); closedSubstreamsInsight.append(status.getCode());
@ -793,7 +838,14 @@ abstract class RetriableStream<ReqT> implements ClientStream {
if (substream.bufferLimitExceeded) { if (substream.bufferLimitExceeded) {
commitAndRun(substream); commitAndRun(substream);
if (state.winningSubstream == substream) { if (state.winningSubstream == substream) {
masterListener.closed(status, rpcProgress, trailers); listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, rpcProgress, trailers);
}
});
} }
return; return;
} }
@ -900,7 +952,14 @@ abstract class RetriableStream<ReqT> implements ClientStream {
commitAndRun(substream); commitAndRun(substream);
if (state.winningSubstream == substream) { if (state.winningSubstream == substream) {
masterListener.closed(status, rpcProgress, trailers); listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, rpcProgress, trailers);
}
});
} }
} }
@ -970,22 +1029,37 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
@Override @Override
public void messagesAvailable(MessageProducer producer) { public void messagesAvailable(final MessageProducer producer) {
State savedState = state; State savedState = state;
checkState( checkState(
savedState.winningSubstream != null, "Headers should be received prior to messages."); savedState.winningSubstream != null, "Headers should be received prior to messages.");
if (savedState.winningSubstream != substream) { if (savedState.winningSubstream != substream) {
return; return;
} }
masterListener.messagesAvailable(producer); listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
masterListener.messagesAvailable(producer);
}
});
} }
@Override @Override
public void onReady() { public void onReady() {
// FIXME(#7089): hedging case is broken. // FIXME(#7089): hedging case is broken.
// TODO(zdapeng): optimization: if the substream is not drained yet, delay onReady() once if (!isReady()) {
// drained and if is still ready. return;
masterListener.onReady(); }
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
if (!isClosed) {
masterListener.onReady();
}
}
});
} }
} }

View File

@ -256,6 +256,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
retriableStream.sendMessage("msg1"); retriableStream.sendMessage("msg1");
@ -308,6 +309,7 @@ public class RetriableStreamTest {
inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2).request(456); inOrder.verify(mockStream2).request(456);
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// send more messages // send more messages
@ -356,6 +358,7 @@ public class RetriableStreamTest {
inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
inOrder.verify(mockStream3).request(456); inOrder.verify(mockStream3).request(456);
inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3, times(7)).writeMessage(any(InputStream.class));
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
InsightBuilder insight = new InsightBuilder(); InsightBuilder insight = new InsightBuilder();
@ -637,6 +640,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture()); verify(mockStream1).start(sublistenerCaptor1.capture());
verify(mockStream1).isReady();
// retry // retry
ClientStream mockStream2 = mock(ClientStream.class); ClientStream mockStream2 = mock(ClientStream.class);
@ -656,7 +660,7 @@ public class RetriableStreamTest {
@Test @Test
public void operationsWhileDraining() { public void operationsWhileDraining() {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = final ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
final AtomicReference<ClientStreamListener> sublistenerCaptor2 = final AtomicReference<ClientStreamListener> sublistenerCaptor2 =
new AtomicReference<>(); new AtomicReference<>();
@ -669,10 +673,16 @@ public class RetriableStreamTest {
@Override @Override
public void request(int numMessages) { public void request(int numMessages) {
retriableStream.sendMessage("substream1 request " + numMessages); retriableStream.sendMessage("substream1 request " + numMessages);
sublistenerCaptor1.getValue().onReady();
if (numMessages > 1) { if (numMessages > 1) {
retriableStream.request(--numMessages); retriableStream.request(--numMessages);
} }
} }
@Override
public boolean isReady() {
return true;
}
})); }));
final ClientStream mockStream2 = final ClientStream mockStream2 =
@ -688,7 +698,7 @@ public class RetriableStreamTest {
@Override @Override
public void request(int numMessages) { public void request(int numMessages) {
retriableStream.sendMessage("substream2 request " + numMessages); retriableStream.sendMessage("substream2 request " + numMessages);
sublistenerCaptor2.get().onReady();
if (numMessages == 3) { if (numMessages == 3) {
sublistenerCaptor2.get().headersRead(new Metadata()); sublistenerCaptor2.get().headersRead(new Metadata());
} }
@ -699,9 +709,14 @@ public class RetriableStreamTest {
retriableStream.cancel(cancelStatus); retriableStream.cancel(cancelStatus);
} }
} }
@Override
public boolean isReady() {
return true;
}
})); }));
InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2, masterListener);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
retriableStream.start(masterListener); retriableStream.start(masterListener);
@ -716,6 +731,7 @@ public class RetriableStreamTest {
inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 2" inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 2"
inOrder.verify(mockStream1).request(1); inOrder.verify(mockStream1).request(1);
inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1" inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1"
inOrder.verify(masterListener).onReady();
// retry // retry
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
@ -743,8 +759,8 @@ public class RetriableStreamTest {
// msg "substream2 request 2" // msg "substream2 request 2"
inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2, times(2)).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2).request(100); inOrder.verify(mockStream2).request(100);
inOrder.verify(mockStream2).cancel(cancelStatus);
verify(mockStream2).cancel(cancelStatus); inOrder.verify(masterListener, never()).onReady();
// "substream2 request 1" will never be sent // "substream2 request 1" will never be sent
inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2, never()).writeMessage(any(InputStream.class));
@ -1073,6 +1089,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture()); verify(mockStream1).start(sublistenerCaptor1.capture());
verify(mockStream1).isReady();
bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
@ -1089,6 +1106,7 @@ public class RetriableStreamTest {
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
verify(mockStream2).start(any(ClientStreamListener.class)); verify(mockStream2).start(any(ClientStreamListener.class));
verify(mockStream2).isReady();
// bufferLimitExceeded // bufferLimitExceeded
bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
@ -1152,6 +1170,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
@ -1167,6 +1186,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// retry2 // retry2
@ -1183,6 +1203,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// retry3 // retry3
@ -1200,6 +1221,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
inOrder.verify(mockStream4).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// retry4 // retry4
@ -1214,6 +1236,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture());
inOrder.verify(mockStream5).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// retry5 // retry5
@ -1228,6 +1251,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture());
inOrder.verify(mockStream6).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// can not retry any more // can not retry any more
@ -1258,6 +1282,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
@ -1276,6 +1301,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// retry2 // retry2
@ -1293,6 +1319,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// retry3 // retry3
@ -1307,6 +1334,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
inOrder.verify(mockStream4).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// retry4 // retry4
@ -1323,6 +1351,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture());
inOrder.verify(mockStream5).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// retry5 // retry5
@ -1340,6 +1369,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture());
inOrder.verify(mockStream6).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// can not retry any more even pushback is positive // can not retry any more even pushback is positive
@ -1597,6 +1627,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// transparent retry // transparent retry
@ -1608,6 +1639,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit(); verify(retriableStreamRecorder, never()).postCommit();
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
@ -1623,6 +1655,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit(); verify(retriableStreamRecorder, never()).postCommit();
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
@ -1645,6 +1678,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// normal retry // normal retry
@ -1658,6 +1692,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit(); verify(retriableStreamRecorder, never()).postCommit();
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
@ -1674,6 +1709,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit(); verify(retriableStreamRecorder, never()).postCommit();
} }
@ -1695,6 +1731,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// normal retry // normal retry
@ -1708,6 +1745,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit(); verify(retriableStreamRecorder, never()).postCommit();
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
@ -1738,6 +1776,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// transparent retry // transparent retry
@ -1750,6 +1789,7 @@ public class RetriableStreamTest {
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(retriableStreamRecorder).postCommit();
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
} }
@ -1768,6 +1808,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture()); verify(mockStream1).start(sublistenerCaptor1.capture());
verify(mockStream1).isReady();
// drop and verify no retry // drop and verify no retry
Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1); Status status = Status.fromCode(RETRIABLE_STATUS_CODE_1);
@ -1839,6 +1880,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
hedgingStream.sendMessage("msg1"); hedgingStream.sendMessage("msg1");
@ -1880,6 +1922,8 @@ public class RetriableStreamTest {
inOrder.verify(mockStream2, times(2)).flush(); inOrder.verify(mockStream2, times(2)).flush();
inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).writeMessage(any(InputStream.class));
inOrder.verify(mockStream2).request(456); inOrder.verify(mockStream2).request(456);
inOrder.verify(mockStream1).isReady();
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// send more messages // send more messages
@ -1917,6 +1961,9 @@ public class RetriableStreamTest {
inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3).writeMessage(any(InputStream.class));
inOrder.verify(mockStream3).request(456); inOrder.verify(mockStream3).request(456);
inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3, times(2)).writeMessage(any(InputStream.class));
inOrder.verify(mockStream1).isReady();
inOrder.verify(mockStream2).isReady();
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// send one more message // send one more message
@ -1959,6 +2006,9 @@ public class RetriableStreamTest {
inOrder.verify(mockStream4).writeMessage(any(InputStream.class)); inOrder.verify(mockStream4).writeMessage(any(InputStream.class));
inOrder.verify(mockStream4).request(456); inOrder.verify(mockStream4).request(456);
inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class)); inOrder.verify(mockStream4, times(4)).writeMessage(any(InputStream.class));
inOrder.verify(mockStream1).isReady();
inOrder.verify(mockStream2).isReady();
inOrder.verify(mockStream4).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
InsightBuilder insight = new InsightBuilder(); InsightBuilder insight = new InsightBuilder();
@ -2009,6 +2059,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2016,6 +2067,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2023,6 +2075,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2030,6 +2083,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
inOrder.verify(mockStream4).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// a random one of the hedges fails // a random one of the hedges fails
@ -2041,6 +2095,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor5 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream5).start(sublistenerCaptor5.capture()); inOrder.verify(mockStream5).start(sublistenerCaptor5.capture());
inOrder.verify(mockStream5).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2048,6 +2103,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor6 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream6).start(sublistenerCaptor6.capture()); inOrder.verify(mockStream6).start(sublistenerCaptor6.capture());
inOrder.verify(mockStream6).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2092,6 +2148,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2099,6 +2156,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2106,6 +2164,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// a random one of the hedges receives headers // a random one of the hedges receives headers
@ -2143,6 +2202,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2150,6 +2210,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2157,6 +2218,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// a random one of the hedges receives a negative pushback // a random one of the hedges receives a negative pushback
@ -2188,6 +2250,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2195,6 +2258,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
@ -2212,6 +2276,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// hedge2 receives a pushback for HEDGING_DELAY_IN_SECONDS - 1 second // hedge2 receives a pushback for HEDGING_DELAY_IN_SECONDS - 1 second
@ -2225,6 +2290,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor4 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream4).start(sublistenerCaptor4.capture()); inOrder.verify(mockStream4).start(sublistenerCaptor4.capture());
inOrder.verify(mockStream4).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// commit // commit
@ -2254,6 +2320,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
@ -2261,6 +2328,8 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
Status status = Status.CANCELLED.withDescription("cancelled"); Status status = Status.CANCELLED.withDescription("cancelled");
@ -2275,6 +2344,8 @@ public class RetriableStreamTest {
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription()); assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(retriableStreamRecorder).postCommit();
inOrder.verify(masterListener).closed(
any(Status.class), any(RpcProgress.class), any(Metadata.class));
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
} }
@ -2289,6 +2360,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture()); verify(mockStream1).start(sublistenerCaptor1.capture());
verify(mockStream1).isReady();
ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer; ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer;
bufferSizeTracer1.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); bufferSizeTracer1.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
@ -2297,6 +2369,8 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture()); verify(mockStream2).start(sublistenerCaptor2.capture());
verify(mockStream1, times(2)).isReady();
verify(mockStream2).isReady();
ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer; ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer;
bufferSizeTracer2.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); bufferSizeTracer2.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
@ -2313,6 +2387,7 @@ public class RetriableStreamTest {
verify(retriableStreamRecorder).postCommit(); verify(retriableStreamRecorder).postCommit();
verifyNoMoreInteractions(mockStream1); verifyNoMoreInteractions(mockStream1);
verify(mockStream2).isReady();
verifyNoMoreInteractions(mockStream2); verifyNoMoreInteractions(mockStream2);
} }
@ -2327,6 +2402,7 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture()); verify(mockStream1).start(sublistenerCaptor1.capture());
verify(mockStream1).isReady();
ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer; ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer;
bufferSizeTracer1.outboundWireSize(100); bufferSizeTracer1.outboundWireSize(100);
@ -2335,6 +2411,8 @@ public class RetriableStreamTest {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture()); verify(mockStream2).start(sublistenerCaptor2.capture());
verify(mockStream1, times(2)).isReady();
verify(mockStream2).isReady();
ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer; ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer;
bufferSizeTracer2.outboundWireSize(100); bufferSizeTracer2.outboundWireSize(100);