From 431fb0255fbed3f46ff3aca15ab37273c6143a47 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Fri, 4 Feb 2022 22:12:03 -0800 Subject: [PATCH] core/netty: infinite local-only transparent retry for netty (#8878) In core, add a new enum element to `RpcProgress` for the case that the stream is closed even before anything leaves the client. `RetriableStream` will do unlimited transparent retry for this type of `RpcProgress` since they are local-only. In netty, call `tranportReportStatus()` for pending streams on failure. Also fixes #8394 --- .../grpc/internal/ClientStreamListener.java | 8 ++- .../io/grpc/internal/RetriableStream.java | 9 ++-- .../io/grpc/internal/SubchannelChannel.java | 2 +- .../io/grpc/internal/RetriableStreamTest.java | 52 ++++++++++++++++++- .../grpc/testing/integration/RetryTest.java | 50 +++++++++++++----- .../io/grpc/netty/NettyClientHandler.java | 20 ++++--- .../java/io/grpc/netty/NettyClientStream.java | 13 +++-- .../io/grpc/netty/NettyClientHandlerTest.java | 17 +++++- 8 files changed, 139 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ClientStreamListener.java b/core/src/main/java/io/grpc/internal/ClientStreamListener.java index dfb521f40a..8db1fbe445 100644 --- a/core/src/main/java/io/grpc/internal/ClientStreamListener.java +++ b/core/src/main/java/io/grpc/internal/ClientStreamListener.java @@ -57,12 +57,16 @@ public interface ClientStreamListener extends StreamListener { */ PROCESSED, /** - * The RPC is not processed by the server's application logic. + * The stream on the wire is created but not processed by the server's application logic. */ REFUSED, /** * The RPC is dropped (by load balancer). */ - DROPPED + DROPPED, + /** + * The stream is closed even before anything leaves the client. + */ + MISCARRIED } } diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 4afdb3f750..8451ea6b2d 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -101,7 +101,9 @@ abstract class RetriableStream implements ClientStream { false, 0); /** - * Either transparent retry happened or reached server's application logic. + * Either non-local transparent retry happened or reached server's application logic. + * + *

Note that local-only transparent retries are unlimited. */ private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean(); @@ -851,8 +853,9 @@ abstract class RetriableStream implements ClientStream { } if (state.winningSubstream == null) { - if (rpcProgress == RpcProgress.REFUSED - && noMoreTransparentRetry.compareAndSet(false, true)) { + if (rpcProgress == RpcProgress.MISCARRIED + || (rpcProgress == RpcProgress.REFUSED + && noMoreTransparentRetry.compareAndSet(false, true))) { // transparent retry final Substream newSubstream = createSubstream(substream.previousAttemptCount, true); if (isHedging) { diff --git a/core/src/main/java/io/grpc/internal/SubchannelChannel.java b/core/src/main/java/io/grpc/internal/SubchannelChannel.java index a1d454ed2f..773dcb99dd 100644 --- a/core/src/main/java/io/grpc/internal/SubchannelChannel.java +++ b/core/src/main/java/io/grpc/internal/SubchannelChannel.java @@ -43,7 +43,7 @@ final class SubchannelChannel extends Channel { Status.UNAVAILABLE.withDescription( "wait-for-ready RPC is not supported on Subchannel.asChannel()"); private static final FailingClientTransport notReadyTransport = - new FailingClientTransport(NOT_READY_ERROR, RpcProgress.REFUSED); + new FailingClientTransport(NOT_READY_ERROR, RpcProgress.MISCARRIED); private final InternalSubchannel subchannel; private final Executor executor; private final ScheduledExecutorService deadlineCancellationExecutor; diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 8b851573b2..fe147a843a 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -18,6 +18,7 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; import static io.grpc.internal.ClientStreamListener.RpcProgress.DROPPED; +import static io.grpc.internal.ClientStreamListener.RpcProgress.MISCARRIED; import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED; import static io.grpc.internal.RetriableStream.GRPC_PREVIOUS_RPC_ATTEMPTS; @@ -1611,7 +1612,7 @@ public class RetriableStreamTest { } @Test - public void transparentRetry() { + public void transparentRetry_onlyOnceOnRefused() { ClientStream mockStream1 = mock(ClientStream.class); ClientStream mockStream2 = mock(ClientStream.class); ClientStream mockStream3 = mock(ClientStream.class); @@ -1661,6 +1662,55 @@ public class RetriableStreamTest { assertEquals(0, fakeClock.numPendingTasks()); } + @Test + public void transparentRetry_unlimitedTimesOnMiscarried() { + ClientStream mockStream1 = mock(ClientStream.class); + ClientStream mockStream2 = mock(ClientStream.class); + ClientStream mockStream3 = mock(ClientStream.class); + InOrder inOrder = inOrder( + retriableStreamRecorder, + mockStream1, mockStream2, mockStream3); + + // start + doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0); + retriableStream.start(masterListener); + + inOrder.verify(retriableStreamRecorder).newSubstream(0); + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); + inOrder.verifyNoMoreInteractions(); + + // transparent retry + doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(0); + sublistenerCaptor1.getValue() + .closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), MISCARRIED, new Metadata()); + + inOrder.verify(retriableStreamRecorder).newSubstream(0); + ArgumentCaptor sublistenerCaptor2 = + ArgumentCaptor.forClass(ClientStreamListener.class); + inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); + inOrder.verifyNoMoreInteractions(); + verify(retriableStreamRecorder, never()).postCommit(); + assertEquals(0, fakeClock.numPendingTasks()); + + // more transparent retry + doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(0); + sublistenerCaptor2.getValue() + .closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), MISCARRIED, new Metadata()); + + inOrder.verify(retriableStreamRecorder).newSubstream(0); + ArgumentCaptor sublistenerCaptor3 = + ArgumentCaptor.forClass(ClientStreamListener.class); + inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); + inOrder.verifyNoMoreInteractions(); + verify(retriableStreamRecorder, never()).postCommit(); + assertEquals(0, fakeClock.numPendingTasks()); + } + @Test public void normalRetry_thenNoTransparentRetry_butNormalRetry() { ClientStream mockStream1 = mock(ClientStream.class); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java index 80cf83b093..229d873571 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/RetryTest.java @@ -74,7 +74,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -459,33 +458,41 @@ public class RetryTest { assertRetryStatsRecorded(0, 0, 0); } - @Ignore("flaky because old transportReportStatus() is not completely migrated yet") @Test public void transparentRetryStatsRecorded() throws Exception { startNewServer(); createNewChannel(); - final AtomicBoolean transparentRetryTriggered = new AtomicBoolean(); + final AtomicBoolean originalAttemptFailed = new AtomicBoolean(); class TransparentRetryTriggeringTracer extends ClientStreamTracer { @Override public void streamCreated(Attributes transportAttrs, Metadata metadata) { - if (transparentRetryTriggered.get()) { + if (originalAttemptFailed.get()) { return; } + // Send GOAWAY from server. The client may either receive GOAWAY or create the underlying + // netty stream and write headers first, even we await server termination as below. + // In the latter case, we rerun the test. We can also call localServer.shutdown() to trigger + // GOAWAY, but it takes a lot longer time to gracefully shut down. localServer.shutdownNow(); + try { + localServer.awaitTermination(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } } @Override public void streamClosed(Status status) { - if (transparentRetryTriggered.get()) { + if (originalAttemptFailed.get()) { return; } - transparentRetryTriggered.set(true); + originalAttemptFailed.set(true); try { startNewServer(); channel.resetConnectBackoff(); - channel.getState(true); } catch (Exception e) { throw new AssertionError("local server can not be restarted", e); } @@ -502,13 +509,28 @@ public class RetryTest { CallOptions callOptions = CallOptions.DEFAULT .withWaitForReady() .withStreamTracerFactory(new TransparentRetryTracerFactory()); - ClientCall call = channel.newCall(clientStreamingMethod, callOptions); - call.start(mockCallListener, new Metadata()); - assertRpcStartedRecorded(); - assertRpcStatusRecorded(Code.UNAVAILABLE, 0, 0); - assertRpcStartedRecorded(); - call.cancel("cancel", null); - assertRpcStatusRecorded(Code.CANCELLED, 0, 0); + while (true) { + ClientCall call = channel.newCall(clientStreamingMethod, callOptions); + call.start(mockCallListener, new Metadata()); + assertRpcStartedRecorded(); // original attempt + MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); + assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)) + .isEqualTo(1); + TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); + if (statusTag.asString().equals(Code.UNAVAILABLE.toString())) { + break; + } else { + // Due to race condition, GOAWAY is not received/processed before the stream is closed due + // to connection error. Rerun the test. + assertThat(statusTag.asString()).isEqualTo(Code.UNKNOWN.toString()); + assertRetryStatsRecorded(0, 0, 0); + originalAttemptFailed.set(false); + } + } + assertRpcStartedRecorded(); // retry attempt + ServerCall serverCall = serverCalls.poll(5, SECONDS); + serverCall.close(Status.INVALID_ARGUMENT, new Metadata()); + assertRpcStatusRecorded(Code.INVALID_ARGUMENT, 0, 0); assertRetryStatsRecorded(0, 1, 0); } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 6dde8c825e..80d11e5485 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -535,7 +535,7 @@ class NettyClientHandler extends AbstractNettyHandler { // The connection is going away (it is really the GOAWAY case), // just terminate the stream now. command.stream().transportReportStatus( - lifecycleManager.getShutdownStatus(), RpcProgress.REFUSED, true, new Metadata()); + lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata()); promise.setFailure(lifecycleManager.getShutdownThrowable()); return; } @@ -576,7 +576,7 @@ class NettyClientHandler extends AbstractNettyHandler { // This should only be reachable during onGoAwayReceived, as otherwise // getShutdownThrowable() != null command.stream().setNonExistent(); - command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata()); + command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata()); promise.setFailure(s.asRuntimeException()); return; } @@ -635,18 +635,24 @@ class NettyClientHandler extends AbstractNettyHandler { // Just forward on the success status to the original promise. promise.setSuccess(); } else { - final Throwable cause = future.cause(); + Throwable cause = future.cause(); if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) { StreamBufferingEncoder.Http2GoAwayException e = (StreamBufferingEncoder.Http2GoAwayException) cause; Status status = statusFromH2Error( Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream", e.errorCode(), e.debugData()); - stream.transportReportStatus(status, RpcProgress.REFUSED, true, new Metadata()); - promise.setFailure(status.asRuntimeException()); - } else { - promise.setFailure(cause); + cause = status.asRuntimeException(); + stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata()); + } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) { + Status status = lifecycleManager.getShutdownStatus(); + if (status == null) { + status = Status.UNAVAILABLE.withCause(cause) + .withDescription("Connection closed while stream is buffered"); + } + stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata()); } + promise.setFailure(cause); } } }); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 0e7e69635a..9c6c12cc0f 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -31,6 +31,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.internal.AbstractClientStream; +import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.Http2ClientStreamTransportState; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; @@ -152,12 +153,18 @@ class NettyClientStream extends AbstractClientStream { // Stream creation failed. Close the stream if not already closed. // When the channel is shutdown, the lifecycle manager has a better view of the failure, // especially before negotiation completes (because the negotiator commonly doesn't - // receive the execeptionCaught because NettyClientHandler does not propagate it). + // receive the exceptionCaught because NettyClientHandler does not propagate it). Status s = transportState().handler.getLifecycleManager().getShutdownStatus(); if (s == null) { s = transportState().statusFromFailedFuture(future); } - transportState().transportReportStatus(s, true, new Metadata()); + if (transportState().isNonExistent()) { + transportState().transportReportStatus( + s, RpcProgress.MISCARRIED, true, new Metadata()); + } else { + transportState().transportReportStatus( + s, RpcProgress.PROCESSED, true, new Metadata()); + } } } }; @@ -268,7 +275,7 @@ class NettyClientStream extends AbstractClientStream { } boolean isNonExistent() { - return this.id == NON_EXISTENT_ID; + return this.id == NON_EXISTENT_ID || this.id == 0; } /** diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index d0d48fe9b4..d47942858a 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -18,6 +18,7 @@ package io.grpc.netty; import static com.google.common.base.Charsets.UTF_8; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.ClientStreamListener.RpcProgress.MISCARRIED; import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; @@ -378,7 +379,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase captor = ArgumentCaptor.forClass(Status.class); - verify(streamListener).closed(captor.capture(), same(REFUSED), + verify(streamListener).closed(captor.capture(), same(MISCARRIED), ArgumentMatchers.notNull()); assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); assertEquals( @@ -471,6 +472,20 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase captor = ArgumentCaptor.forClass(Status.class); + verify(streamListener).closed(captor.capture(), same(MISCARRIED), ArgumentMatchers.notNull()); + assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); + } + @Test public void receivedGoAwayShouldFailNewStreams() throws Exception { // Read a GOAWAY that indicates our stream was never processed by the server.