From b593871801cc28c31c4d2286fa83083ae027f82c Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 22 Nov 2022 13:04:06 -0800 Subject: [PATCH] core: fix RejectedExecutionException in Retriable Stream (#9626) Add big negative integer to pending stream count when cancelled. The count is used to delay closing master listener until streams fully drained. Increment pending stream count before creating one. The count is also used to indicate callExecutor is safe to be used. New stream will not be created if big negative number was added, i.e. stream cancelled. New stream is created if not cancelled, callExecutor is safe to be used, because cancel will be delayed. Create new streams (retry, hedging) is moved to the main thread, before callExecutor calls drain. Minor refactor the masterListener.close() scenario. --- .../io/grpc/internal/RetriableStream.java | 92 ++++++++++--------- .../io/grpc/internal/RetriableStreamTest.java | 19 ++-- .../grpc/testing/integration/RetryTest.java | 15 +-- 3 files changed, 70 insertions(+), 56 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index cb94195cce..c8c45c61f8 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -107,6 +107,8 @@ abstract class RetriableStream implements ClientStream { */ private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean(); private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger(); + private final AtomicInteger inFlightSubStreams = new AtomicInteger(); + private Status savedCancellationReason; // Used for recording the share of buffer used for the current call out of the channel buffer. // This field would not be necessary if there is no channel buffer limit. @@ -220,7 +222,12 @@ abstract class RetriableStream implements ClientStream { } } + @Nullable // returns null when cancelled private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) { + // increment only when >= 0, i.e. not cancelled + if (inFlightSubStreams.updateAndGet(value -> value < 0 ? value : value + 1) < 0) { + return null; + } Substream sub = new Substream(previousAttemptCount); // one tracer per substream final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub); @@ -367,6 +374,9 @@ abstract class RetriableStream implements ClientStream { } Substream substream = createSubstream(0, false); + if (substream == null) { + return; + } if (isHedging) { FutureCanceller scheduledHedgingRef = null; @@ -434,16 +444,19 @@ abstract class RetriableStream implements ClientStream { @Override public void run() { + // It's safe to read state.hedgingAttemptCount here. + // If this run is not cancelled, the value of state.hedgingAttemptCount won't change + // until state.addActiveHedge() is called subsequently, even the state could possibly + // change. + Substream newSubstream = createSubstream(state.hedgingAttemptCount, false); + if (newSubstream == null) { + return; + } callExecutor.execute( new Runnable() { @SuppressWarnings("GuardedBy") @Override public void run() { - // It's safe to read state.hedgingAttemptCount here. - // If this run is not cancelled, the value of state.hedgingAttemptCount won't change - // until state.addActiveHedge() is called subsequently, even the state could possibly - // change. - Substream newSubstream = createSubstream(state.hedgingAttemptCount, false); boolean cancelled = false; FutureCanceller future = null; @@ -489,16 +502,11 @@ abstract class RetriableStream implements ClientStream { Runnable runnable = commit(noopSubstream); if (runnable != null) { + savedCancellationReason = reason; runnable.run(); - listenerSerializeExecutor.execute( - new Runnable() { - @Override - public void run() { - isClosed = true; - masterListener.closed(reason, RpcProgress.PROCESSED, new Metadata()); - - } - }); + if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) { + safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata()); + } return; } @@ -803,6 +811,17 @@ abstract class RetriableStream implements ClientStream { } } + private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) { + listenerSerializeExecutor.execute( + new Runnable() { + @Override + public void run() { + isClosed = true; + masterListener.closed(status, progress, metadata); + } + }); + } + private interface BufferEntry { /** Replays the buffer entry with the given stream. */ void runWith(Substream substream); @@ -840,19 +859,18 @@ abstract class RetriableStream implements ClientStream { closedSubstreamsInsight.append(status.getCode()); } + if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) { + assert savedCancellationReason != null; + safeCloseMasterListener(savedCancellationReason, RpcProgress.PROCESSED, new Metadata()); + return; + } + // handle a race between buffer limit exceeded and closed, when setting // substream.bufferLimitExceeded = true happens before state.substreamClosed(substream). if (substream.bufferLimitExceeded) { commitAndRun(substream); if (state.winningSubstream == substream) { - listenerSerializeExecutor.execute( - new Runnable() { - @Override - public void run() { - isClosed = true; - masterListener.closed(status, rpcProgress, trailers); - } - }); + safeCloseMasterListener(status, rpcProgress, trailers); } return; } @@ -863,14 +881,7 @@ abstract class RetriableStream implements ClientStream { Status tooManyTransparentRetries = Status.INTERNAL .withDescription("Too many transparent retries. Might be a bug in gRPC") .withCause(status.asRuntimeException()); - listenerSerializeExecutor.execute( - new Runnable() { - @Override - public void run() { - isClosed = true; - masterListener.closed(tooManyTransparentRetries, rpcProgress, trailers); - } - }); + safeCloseMasterListener(tooManyTransparentRetries, rpcProgress, trailers); } return; } @@ -881,6 +892,9 @@ abstract class RetriableStream implements ClientStream { && noMoreTransparentRetry.compareAndSet(false, true))) { // transparent retry final Substream newSubstream = createSubstream(substream.previousAttemptCount, true); + if (newSubstream == null) { + return; + } if (isHedging) { boolean commit = false; synchronized (lock) { @@ -942,6 +956,11 @@ abstract class RetriableStream implements ClientStream { } else { RetryPlan retryPlan = makeRetryDecision(status, trailers); if (retryPlan.shouldRetry) { + // retry + Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false); + if (newSubstream == null) { + return; + } // The check state.winningSubstream == null, checking if is not already committed, is // racy, but is still safe b/c the retry will also handle committed/cancellation FutureCanceller scheduledRetryCopy; @@ -955,10 +974,6 @@ abstract class RetriableStream implements ClientStream { new Runnable() { @Override public void run() { - // retry - Substream newSubstream = createSubstream( - substream.previousAttemptCount + 1, - false); drain(newSubstream); } }); @@ -978,14 +993,7 @@ abstract class RetriableStream implements ClientStream { commitAndRun(substream); if (state.winningSubstream == substream) { - listenerSerializeExecutor.execute( - new Runnable() { - @Override - public void run() { - isClosed = true; - masterListener.closed(status, rpcProgress, trailers); - } - }); + safeCloseMasterListener(status, rpcProgress, trailers); } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index f20e772e92..12bf697027 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -283,6 +283,7 @@ public class RetriableStreamTest { doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1); sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_2), PROCESSED, new Metadata()); + inOrder.verify(retriableStreamRecorder).newSubstream(1); assertEquals(1, fakeClock.numPendingTasks()); // send more messages during backoff @@ -294,7 +295,6 @@ public class RetriableStreamTest { assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); - inOrder.verify(retriableStreamRecorder).newSubstream(1); inOrder.verify(mockStream2).setAuthority(AUTHORITY); inOrder.verify(mockStream2).setCompressor(COMPRESSOR); inOrder.verify(mockStream2).setDecompressorRegistry(DECOMPRESSOR_REGISTRY); @@ -339,6 +339,7 @@ public class RetriableStreamTest { doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(2); sublistenerCaptor2.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); + inOrder.verify(retriableStreamRecorder).newSubstream(2); assertEquals(1, fakeClock.numPendingTasks()); // send more messages during backoff @@ -353,7 +354,6 @@ public class RetriableStreamTest { assertEquals(1, fakeClock.numPendingTasks()); fakeClock.forwardTime(1L, TimeUnit.SECONDS); assertEquals(0, fakeClock.numPendingTasks()); - inOrder.verify(retriableStreamRecorder).newSubstream(2); inOrder.verify(mockStream3).setAuthority(AUTHORITY); inOrder.verify(mockStream3).setCompressor(COMPRESSOR); inOrder.verify(mockStream3).setDecompressorRegistry(DECOMPRESSOR_REGISTRY); @@ -792,6 +792,8 @@ public class RetriableStreamTest { public void cancelWhileDraining() { ArgumentCaptor sublistenerCaptor1 = ArgumentCaptor.forClass(ClientStreamListener.class); + ArgumentCaptor sublistenerCaptor2 = + ArgumentCaptor.forClass(ClientStreamListener.class); ClientStream mockStream1 = mock(ClientStream.class); ClientStream mockStream2 = mock( @@ -818,7 +820,7 @@ public class RetriableStreamTest { Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); - inOrder.verify(mockStream2).start(any(ClientStreamListener.class)); + inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(mockStream2).request(3); inOrder.verify(retriableStreamRecorder).postCommit(); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); @@ -826,6 +828,7 @@ public class RetriableStreamTest { assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); assertThat(statusCaptor.getValue().getDescription()) .isEqualTo("Stream thrown away because RetriableStream committed"); + sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata()); verify(masterListener).closed( statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); @@ -848,6 +851,8 @@ public class RetriableStreamTest { Status.CANCELLED.withDescription("cancelled while retry start")); } })); + ArgumentCaptor sublistenerCaptor2 = + ArgumentCaptor.forClass(ClientStreamListener.class); InOrder inOrder = inOrder(retriableStreamRecorder, mockStream1, mockStream2); doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); @@ -860,13 +865,14 @@ public class RetriableStreamTest { Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); fakeClock.forwardTime((long) (INITIAL_BACKOFF_IN_SECONDS * FAKE_RANDOM), TimeUnit.SECONDS); - inOrder.verify(mockStream2).start(any(ClientStreamListener.class)); + inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); inOrder.verify(retriableStreamRecorder).postCommit(); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); inOrder.verify(mockStream2).cancel(statusCaptor.capture()); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); assertThat(statusCaptor.getValue().getDescription()) .isEqualTo("Stream thrown away because RetriableStream committed"); + sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata()); verify(masterListener).closed( statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.CANCELLED); @@ -1121,7 +1127,6 @@ public class RetriableStreamTest { sublistenerCaptor1.getValue().closed( Status.fromCode(RETRIABLE_STATUS_CODE_1), PROCESSED, new Metadata()); - // bufferSizeTracer.outboundWireSize() quits immediately while backoff b/c substream1 is closed assertEquals(1, fakeClock.numPendingTasks()); bufferSizeTracer.outboundWireSize(2); verify(retriableStreamRecorder, never()).postCommit(); @@ -1132,8 +1137,6 @@ public class RetriableStreamTest { // bufferLimitExceeded bufferSizeTracer.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1); - verify(retriableStreamRecorder, never()).postCommit(); - bufferSizeTracer.outboundWireSize(2); verify(retriableStreamRecorder).postCommit(); verifyNoMoreInteractions(mockStream1); @@ -2464,6 +2467,8 @@ public class RetriableStreamTest { assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription()); inOrder.verify(retriableStreamRecorder).postCommit(); + sublistenerCaptor1.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata()); + sublistenerCaptor2.getValue().closed(Status.CANCELLED, PROCESSED, new Metadata()); inOrder.verify(masterListener).closed( any(Status.class), any(RpcProgress.class), any(Metadata.class)); inOrder.verifyNoMoreInteractions(); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java index 157fd79524..eca563fb7c 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -351,8 +351,8 @@ public class RetryTest { call.request(1); assertInboundMessageRecorded(); assertInboundWireSizeRecorded(1); - assertRpcStatusRecorded(Status.Code.OK, 2000, 2); - assertRetryStatsRecorded(1, 0, 10_000); + assertRpcStatusRecorded(Status.Code.OK, 12000, 2); + assertRetryStatsRecorded(1, 0, 0); } @Test @@ -410,13 +410,14 @@ public class RetryTest { serverCall.request(2); assertOutboundWireSizeRecorded(message.length()); fakeClock.forwardTime(7, SECONDS); - call.cancel("Cancelled before commit", null); // A noop substream will commit. - // The call listener is closed, but the netty substream listener is not yet closed. - verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class)); + // A noop substream will commit. But call is not yet closed. + call.cancel("Cancelled before commit", null); // Let the netty substream listener be closed. streamClosedLatch.countDown(); - assertRetryStatsRecorded(1, 0, 10_000); - assertRpcStatusRecorded(Code.CANCELLED, 7_000, 1); + // The call listener is closed. + verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class)); + assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1); + assertRetryStatsRecorded(1, 0, 0); } @Test