core: fix RejectedExecutionException in Retriable Stream (#9626)

Add big negative integer to pending stream count when cancelled. The count is used to delay closing master listener until streams fully drained.
Increment pending stream count before creating one. The count is also used to indicate callExecutor is safe to be used. New stream will not be created if big negative number was added, i.e. stream cancelled. New stream is created if not cancelled, callExecutor is safe to be used, because cancel will be delayed.
Create new streams (retry, hedging) is moved to the main thread, before callExecutor calls drain.
Minor refactor the masterListener.close() scenario.
This commit is contained in:
yifeizhuang 2022-11-22 13:04:06 -08:00 committed by GitHub
parent 159bb8c55b
commit b593871801
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 56 deletions

View File

@ -107,6 +107,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
*/ */
private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean(); private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger(); private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
private final AtomicInteger inFlightSubStreams = new AtomicInteger();
private Status savedCancellationReason;
// Used for recording the share of buffer used for the current call out of the channel buffer. // Used for recording the share of buffer used for the current call out of the channel buffer.
// This field would not be necessary if there is no channel buffer limit. // This field would not be necessary if there is no channel buffer limit.
@ -220,7 +222,12 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
} }
@Nullable // returns null when cancelled
private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) { private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
// increment only when >= 0, i.e. not cancelled
if (inFlightSubStreams.updateAndGet(value -> value < 0 ? value : value + 1) < 0) {
return null;
}
Substream sub = new Substream(previousAttemptCount); Substream sub = new Substream(previousAttemptCount);
// one tracer per substream // one tracer per substream
final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub); final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
@ -367,6 +374,9 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
Substream substream = createSubstream(0, false); Substream substream = createSubstream(0, false);
if (substream == null) {
return;
}
if (isHedging) { if (isHedging) {
FutureCanceller scheduledHedgingRef = null; FutureCanceller scheduledHedgingRef = null;
@ -432,11 +442,6 @@ abstract class RetriableStream<ReqT> implements ClientStream {
scheduledHedgingRef = scheduledHedging; scheduledHedgingRef = scheduledHedging;
} }
@Override
public void run() {
callExecutor.execute(
new Runnable() {
@SuppressWarnings("GuardedBy")
@Override @Override
public void run() { public void run() {
// It's safe to read state.hedgingAttemptCount here. // It's safe to read state.hedgingAttemptCount here.
@ -444,6 +449,14 @@ abstract class RetriableStream<ReqT> implements ClientStream {
// until state.addActiveHedge() is called subsequently, even the state could possibly // until state.addActiveHedge() is called subsequently, even the state could possibly
// change. // change.
Substream newSubstream = createSubstream(state.hedgingAttemptCount, false); Substream newSubstream = createSubstream(state.hedgingAttemptCount, false);
if (newSubstream == null) {
return;
}
callExecutor.execute(
new Runnable() {
@SuppressWarnings("GuardedBy")
@Override
public void run() {
boolean cancelled = false; boolean cancelled = false;
FutureCanceller future = null; FutureCanceller future = null;
@ -489,16 +502,11 @@ abstract class RetriableStream<ReqT> implements ClientStream {
Runnable runnable = commit(noopSubstream); Runnable runnable = commit(noopSubstream);
if (runnable != null) { if (runnable != null) {
savedCancellationReason = reason;
runnable.run(); runnable.run();
listenerSerializeExecutor.execute( if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
new Runnable() { safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
@Override
public void run() {
isClosed = true;
masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata());
} }
});
return; return;
} }
@ -803,6 +811,17 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
} }
private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, progress, metadata);
}
});
}
private interface BufferEntry { private interface BufferEntry {
/** Replays the buffer entry with the given stream. */ /** Replays the buffer entry with the given stream. */
void runWith(Substream substream); void runWith(Substream substream);
@ -840,19 +859,18 @@ abstract class RetriableStream<ReqT> implements ClientStream {
closedSubstreamsInsight.append(status.getCode()); closedSubstreamsInsight.append(status.getCode());
} }
if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
assert savedCancellationReason != null;
safeCloseMasterListener(savedCancellationReason, RpcProgress.PROCESSED, new Metadata());
return;
}
// handle a race between buffer limit exceeded and closed, when setting // handle a race between buffer limit exceeded and closed, when setting
// substream.bufferLimitExceeded = true happens before state.substreamClosed(substream). // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream).
if (substream.bufferLimitExceeded) { if (substream.bufferLimitExceeded) {
commitAndRun(substream); commitAndRun(substream);
if (state.winningSubstream == substream) { if (state.winningSubstream == substream) {
listenerSerializeExecutor.execute( safeCloseMasterListener(status, rpcProgress, trailers);
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, rpcProgress, trailers);
}
});
} }
return; return;
} }
@ -863,14 +881,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
Status tooManyTransparentRetries = Status.INTERNAL Status tooManyTransparentRetries = Status.INTERNAL
.withDescription("Too many transparent retries. Might be a bug in gRPC") .withDescription("Too many transparent retries. Might be a bug in gRPC")
.withCause(status.asRuntimeException()); .withCause(status.asRuntimeException());
listenerSerializeExecutor.execute( safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers);
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(tooManyTransparentRetries, rpcProgress, trailers);
}
});
} }
return; return;
} }
@ -881,6 +892,9 @@ abstract class RetriableStream<ReqT> implements ClientStream {
&& noMoreTransparentRetry.compareAndSet(false, true))) { && noMoreTransparentRetry.compareAndSet(false, true))) {
// transparent retry // transparent retry
final Substream newSubstream = createSubstream(substream.previousAttemptCount, true); final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
if (newSubstream == null) {
return;
}
if (isHedging) { if (isHedging) {
boolean commit = false; boolean commit = false;
synchronized (lock) { synchronized (lock) {
@ -942,6 +956,11 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} else { } else {
RetryPlan retryPlan = makeRetryDecision(status, trailers); RetryPlan retryPlan = makeRetryDecision(status, trailers);
if (retryPlan.shouldRetry) { if (retryPlan.shouldRetry) {
// retry
Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false);
if (newSubstream == null) {
return;
}
// The check state.winningSubstream == null, checking if is not already committed, is // The check state.winningSubstream == null, checking if is not already committed, is
// racy, but is still safe b/c the retry will also handle committed/cancellation // racy, but is still safe b/c the retry will also handle committed/cancellation
FutureCanceller scheduledRetryCopy; FutureCanceller scheduledRetryCopy;
@ -955,10 +974,6 @@ abstract class RetriableStream<ReqT> implements ClientStream {
new Runnable() { new Runnable() {
@Override @Override
public void run() { public void run() {
// retry
Substream newSubstream = createSubstream(
substream.previousAttemptCount + 1,
false);
drain(newSubstream); drain(newSubstream);
} }
}); });
@ -978,14 +993,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
commitAndRun(substream); commitAndRun(substream);
if (state.winningSubstream == substream) { if (state.winningSubstream == substream) {
listenerSerializeExecutor.execute( safeCloseMasterListener(status, rpcProgress, trailers);
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(status, rpcProgress, trailers);
}
});
} }
} }

View File

@ -283,6 +283,7 @@ public class RetriableStreamTest {
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
sublistenerCaptor1.getValue().closed( sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata());
inOrder.verify(retriableStreamRecorder).newSubstream(1);
assertEquals(1, fakeClock.numPendingTasks()); assertEquals(1, fakeClock.numPendingTasks());
// send more messages during backoff // send more messages during backoff
@ -294,7 +295,6 @@ public class RetriableStreamTest {
assertEquals(1, fakeClock.numPendingTasks()); assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS); fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
inOrder.verify(retriableStreamRecorder).newSubstream(1);
inOrder.verify(mockStream2).setAuthority(AUTHORITY); inOrder.verify(mockStream2).setAuthority(AUTHORITY);
inOrder.verify(mockStream2).setCompressor(COMPRESSOR); inOrder.verify(mockStream2).setCompressor(COMPRESSOR);
inOrder.verify(mockStream2).setDecompressorRegistry(DECOMPRESSOR_REGISTRY); inOrder.verify(mockStream2).setDecompressorRegistry(DECOMPRESSOR_REGISTRY);
@ -339,6 +339,7 @@ public class RetriableStreamTest {
doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2); doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2);
sublistenerCaptor2.getValue().closed( sublistenerCaptor2.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
inOrder.verify(retriableStreamRecorder).newSubstream(2);
assertEquals(1, fakeClock.numPendingTasks()); assertEquals(1, fakeClock.numPendingTasks());
// send more messages during backoff // send more messages during backoff
@ -353,7 +354,6 @@ public class RetriableStreamTest {
assertEquals(1, fakeClock.numPendingTasks()); assertEquals(1, fakeClock.numPendingTasks());
fakeClock.forwardTime(1L, TimeUnit.SECONDS); fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
inOrder.verify(retriableStreamRecorder).newSubstream(2);
inOrder.verify(mockStream3).setAuthority(AUTHORITY); inOrder.verify(mockStream3).setAuthority(AUTHORITY);
inOrder.verify(mockStream3).setCompressor(COMPRESSOR); inOrder.verify(mockStream3).setCompressor(COMPRESSOR);
inOrder.verify(mockStream3).setDecompressorRegistry(DECOMPRESSOR_REGISTRY); inOrder.verify(mockStream3).setDecompressorRegistry(DECOMPRESSOR_REGISTRY);
@ -792,6 +792,8 @@ public class RetriableStreamTest {
public void cancelWhileDraining() { public void cancelWhileDraining() {
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 = ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class); ArgumentCaptor.forClass(ClientStreamListener.class);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
ClientStream mockStream1 = mock(ClientStream.class); ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = ClientStream mockStream2 =
mock( mock(
@ -818,7 +820,7 @@ public class RetriableStreamTest {
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
inOrder.verify(mockStream2).start(any(ClientStreamListener.class)); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).request(3); inOrder.verify(mockStream2).request(3);
inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(retriableStreamRecorder).postCommit();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
@ -826,6 +828,7 @@ public class RetriableStreamTest {
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription()) assertThat(statusCaptor.getValue().getDescription())
.isEqualTo("Stream thrown away because RetriableStream committed"); .isEqualTo("Stream thrown away because RetriableStream committed");
sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
verify(masterListener).closed( verify(masterListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
@ -848,6 +851,8 @@ public class RetriableStreamTest {
Status.CANCELLED.withDescription("cancelled while retry start")); Status.CANCELLED.withDescription("cancelled while retry start"));
} }
})); }));
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
@ -860,13 +865,14 @@ public class RetriableStreamTest {
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS);
inOrder.verify(mockStream2).start(any(ClientStreamListener.class)); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(retriableStreamRecorder).postCommit();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
inOrder.verify(mockStream2).cancel(statusCaptor.capture()); inOrder.verify(mockStream2).cancel(statusCaptor.capture());
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
assertThat(statusCaptor.getValue().getDescription()) assertThat(statusCaptor.getValue().getDescription())
.isEqualTo("Stream thrown away because RetriableStream committed"); .isEqualTo("Stream thrown away because RetriableStream committed");
sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
verify(masterListener).closed( verify(masterListener).closed(
statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED);
@ -1121,7 +1127,6 @@ public class RetriableStreamTest {
sublistenerCaptor1.getValue().closed( sublistenerCaptor1.getValue().closed(
Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata());
// bufferSizeTracer.outboundWireSize() quits immediately while backoff b/c substream1 is closed
assertEquals(1, fakeClock.numPendingTasks()); assertEquals(1, fakeClock.numPendingTasks());
bufferSizeTracer.outboundWireSize(2); bufferSizeTracer.outboundWireSize(2);
verify(retriableStreamRecorder, never()).postCommit(); verify(retriableStreamRecorder, never()).postCommit();
@ -1132,8 +1137,6 @@ public class RetriableStreamTest {
// bufferLimitExceeded // bufferLimitExceeded
bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
verify(retriableStreamRecorder, never()).postCommit();
bufferSizeTracer.outboundWireSize(2);
verify(retriableStreamRecorder).postCommit(); verify(retriableStreamRecorder).postCommit();
verifyNoMoreInteractions(mockStream1); verifyNoMoreInteractions(mockStream1);
@ -2464,6 +2467,8 @@ public class RetriableStreamTest {
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription()); assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
inOrder.verify(retriableStreamRecorder).postCommit(); inOrder.verify(retriableStreamRecorder).postCommit();
sublistenerCaptor1.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata());
inOrder.verify(masterListener).closed( inOrder.verify(masterListener).closed(
any(Status.class), any(RpcProgress.class), any(Metadata.class)); any(Status.class), any(RpcProgress.class), any(Metadata.class));
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();

View File

@ -351,8 +351,8 @@ public class RetryTest {
call.request(1); call.request(1);
assertInboundMessageRecorded(); assertInboundMessageRecorded();
assertInboundWireSizeRecorded(1); assertInboundWireSizeRecorded(1);
assertRpcStatusRecorded(Status.Code.OK, 2000, 2); assertRpcStatusRecorded(Status.Code.OK, 12000, 2);
assertRetryStatsRecorded(1, 0, 10_000); assertRetryStatsRecorded(1, 0, 0);
} }
@Test @Test
@ -410,13 +410,14 @@ public class RetryTest {
serverCall.request(2); serverCall.request(2);
assertOutboundWireSizeRecorded(message.length()); assertOutboundWireSizeRecorded(message.length());
fakeClock.forwardTime(7, SECONDS); fakeClock.forwardTime(7, SECONDS);
call.cancel("Cancelled before commit", null); // A noop substream will commit. // A noop substream will commit. But call is not yet closed.
// The call listener is closed, but the netty substream listener is not yet closed. call.cancel("Cancelled before commit", null);
verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
// Let the netty substream listener be closed. // Let the netty substream listener be closed.
streamClosedLatch.countDown(); streamClosedLatch.countDown();
assertRetryStatsRecorded(1, 0, 10_000); // The call listener is closed.
assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1); verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1);
assertRetryStatsRecorded(1, 0, 0);
} }
@Test @Test