From 8d98e5ff7f8e59b8c4704116a7c80732578a947d Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 3 Apr 2023 15:18:54 -0700 Subject: [PATCH] core: Fix NPE race during hedging The problem was one hedge was committed before another had drained start(). This was not testable because HedgingRunnable checks whether scheduledHedgingRef is cancelled, which is racy, but there's no way to deterministically trigger either race. The same problem couldn't be triggered with retries because only one attempt will be draining at a time. Retries with cancellation also couldn't trigger it, for the surprising reason that the noop stream used in cancel() wasn't considered drained. This commit marks the noop stream as drained with cancel(), which allows memory to be garbage collected sooner and exposes the race for tests. That then showed the stream as hanging, because inFlightSubStreams wasn't being decremented. Fixes #9185 --- .../io/grpc/internal/RetriableStream.java | 39 ++++++++------- .../io/grpc/internal/RetriableStreamTest.java | 47 +++++++++++++++++-- 2 files changed, 66 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index ab85ec1bdb..7f1b41cb2d 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -282,14 +282,12 @@ abstract class RetriableStream implements ClientStream { synchronized (lock) { savedState = state; - if (streamStarted) { - if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { - // committed but not me, to be cancelled - break; - } - if (savedState.cancelled) { - break; - } + if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { + // committed but not me, to be cancelled + break; + } + if (savedState.cancelled) { + break; } if (index == savedState.buffer.size()) { // I'm drained state = savedState.substreamDrained(substream); @@ -326,15 +324,13 @@ abstract class RetriableStream implements ClientStream { if (bufferEntry instanceof RetriableStream.StartEntry) { streamStarted = true; } - if (streamStarted) { - savedState = state; - if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { - // committed but not me, to be cancelled - break; - } - if (savedState.cancelled) { - break; - } + savedState = state; + if (savedState.winningSubstream != null && savedState.winningSubstream != substream) { + // committed but not me, to be cancelled + break; + } + if (savedState.cancelled) { + break; } } } @@ -344,6 +340,10 @@ abstract class RetriableStream implements ClientStream { return; } + if (!streamStarted) { + // Start stream so inFlightSubStreams is decremented in Sublistener.closed() + substream.stream.start(new Sublistener(substream)); + } substream.stream.cancel( state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED); } @@ -484,6 +484,8 @@ abstract class RetriableStream implements ClientStream { } if (cancelled) { + // Start stream so inFlightSubStreams is decremented in Sublistener.closed() + newSubstream.stream.start(new Sublistener(newSubstream)); newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging")); return; } @@ -507,6 +509,9 @@ abstract class RetriableStream implements ClientStream { Runnable runnable = commit(noopSubstream); if (runnable != null) { + synchronized (lock) { + state = state.substreamDrained(noopSubstream); + } runnable.run(); safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata()); return; diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index bbbf36d37c..e14e71cb2a 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -188,7 +188,7 @@ public class RetriableStreamTest { } } - private final RetriableStream retriableStream = + private RetriableStream retriableStream = newThrottledRetriableStream(null /* throttle */); private final RetriableStream hedgingStream = newThrottledHedgingStream(null /* throttle */); @@ -196,10 +196,13 @@ public class RetriableStreamTest { private ClientStreamTracer bufferSizeTracer; private RetriableStream newThrottledRetriableStream(Throttle throttle) { + return newThrottledRetriableStream(throttle, MoreExecutors.directExecutor()); + } + + private RetriableStream newThrottledRetriableStream(Throttle throttle, Executor drainer) { return new RecordedRetriableStream( method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT, - MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), RETRY_POLICY, - null, throttle); + drainer, fakeClock.getScheduledExecutorService(), RETRY_POLICY, null, throttle); } private RetriableStream newThrottledHedgingStream(Throttle throttle) { @@ -598,6 +601,44 @@ public class RetriableStreamTest { inOrder.verify(retriableStreamRecorder, never()).postCommit(); } + @Test + public void transparentRetry_cancel_race() { + FakeClock drainer = new FakeClock(); + retriableStream = newThrottledRetriableStream(null, drainer.getScheduledExecutorService()); + ClientStream mockStream1 = mock(ClientStream.class); + doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); + InOrder inOrder = inOrder(retriableStreamRecorder); + + retriableStream.start(masterListener); + + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream1).start(sublistenerCaptor1.capture()); + + // retry, but don't drain + ClientStream mockStream2 = mock(ClientStream.class); + doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(0); + sublistenerCaptor1.getValue().closed( + Status.fromCode(NON_RETRIABLE_STATUS_CODE), MISCARRIED, new Metadata()); + assertEquals(1, drainer.numPendingTasks()); + + // cancel + retriableStream.cancel(Status.CANCELLED); + // drain transparent retry + drainer.runDueTasks(); + inOrder.verify(retriableStreamRecorder).postCommit(); + + ArgumentCaptor sublistenerCaptor2 = + ArgumentCaptor.forClass(ClientStreamListener.class); + verify(mockStream2).start(sublistenerCaptor2.capture()); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(mockStream2).cancel(statusCaptor.capture()); + assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode()); + assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription()); + sublistenerCaptor2.getValue().closed(statusCaptor.getValue(), PROCESSED, new Metadata()); + verify(masterListener).closed(same(Status.CANCELLED), same(PROCESSED), any(Metadata.class)); + } + @Test public void unretriableClosed_cancel() { ClientStream mockStream1 = mock(ClientStream.class);