core/netty: infinite local-only transparent retry for netty (#8878)

In core, add a new enum element to `RpcProgress` for the case that the stream is closed even before anything leaves the client. `RetriableStream` will do unlimited transparent retry for this type of `RpcProgress`, since such failures are local-only.

In netty, call `transportReportStatus()` for pending streams on failure.

Also fixes #8394
This commit is contained in:
ZHANG Dapeng 2022-02-04 22:12:03 -08:00 committed by GitHub
parent 467985e958
commit 431fb0255f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 139 additions and 32 deletions

View File

@ -57,12 +57,16 @@ public interface ClientStreamListener extends StreamListener {
*/
PROCESSED,
/**
* The RPC is not processed by the server's application logic.
* The stream on the wire is created but not processed by the server's application logic.
*/
REFUSED,
/**
* The RPC is dropped (by load balancer).
*/
DROPPED
DROPPED,
/**
* The stream is closed even before anything leaves the client.
*/
MISCARRIED
}
}

View File

@ -101,7 +101,9 @@ abstract class RetriableStream<ReqT> implements ClientStream {
false, 0);
/**
* Either transparent retry happened or reached server's application logic.
* Either non-local transparent retry happened or reached server's application logic.
*
* <p>Note that local-only transparent retries are unlimited.
*/
private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
@ -851,8 +853,9 @@ abstract class RetriableStream<ReqT> implements ClientStream {
}
if (state.winningSubstream == null) {
if (rpcProgress == RpcProgress.REFUSED
&& noMoreTransparentRetry.compareAndSet(false, true)) {
if (rpcProgress == RpcProgress.MISCARRIED
|| (rpcProgress == RpcProgress.REFUSED
&& noMoreTransparentRetry.compareAndSet(false, true))) {
// transparent retry
final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
if (isHedging) {

View File

@ -43,7 +43,7 @@ final class SubchannelChannel extends Channel {
Status.UNAVAILABLE.withDescription(
"wait-for-ready RPC is not supported on Subchannel.asChannel()");
private static final FailingClientTransport notReadyTransport =
new FailingClientTransport(NOT_READY_ERROR, RpcProgress.REFUSED);
new FailingClientTransport(NOT_READY_ERROR, RpcProgress.MISCARRIED);
private final InternalSubchannel subchannel;
private final Executor executor;
private final ScheduledExecutorService deadlineCancellationExecutor;

View File

@ -18,6 +18,7 @@ package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.ClientStreamListener.RpcProgress.DROPPED;
import static io.grpc.internal.ClientStreamListener.RpcProgress.MISCARRIED;
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED;
import static io.grpc.internal.RetriableStream.GRPC_PREVIOUS_RPC_ATTEMPTS;
@ -1611,7 +1612,7 @@ public class RetriableStreamTest {
}
@Test
public void transparentRetry() {
public void transparentRetry_onlyOnceOnRefused() {
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
ClientStream mockStream3 = mock(ClientStream.class);
@ -1661,6 +1662,55 @@ public class RetriableStreamTest {
assertEquals(0, fakeClock.numPendingTasks());
}
// Verifies that a MISCARRIED close triggers a transparent retry every time it
// occurs — with no once-only cap, unlike REFUSED (covered by
// transparentRetry_onlyOnceOnRefused). Each retried attempt is created with
// previousAttemptCount 0, i.e. the attempt counter is not advanced.
@Test
public void transparentRetry_unlimitedTimesOnMiscarried() {
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
ClientStream mockStream3 = mock(ClientStream.class);
InOrder inOrder = inOrder(
retriableStreamRecorder,
mockStream1, mockStream2, mockStream3);
// start
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
retriableStream.start(masterListener);
inOrder.verify(retriableStreamRecorder).newSubstream(0);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
// transparent retry
// A MISCARRIED close must start a new substream even though the status code
// itself is non-retriable — transparent retry ignores the retry policy.
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(0);
sublistenerCaptor1.getValue()
.closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), MISCARRIED, new Metadata());
inOrder.verify(retriableStreamRecorder).newSubstream(0);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
// Retry is transparent: nothing was committed and no backoff task scheduled.
verify(retriableStreamRecorder, never()).postCommit();
assertEquals(0, fakeClock.numPendingTasks());
// more transparent retry
// A second MISCARRIED close retries again — demonstrates the absence of the
// once-only cap that applies to REFUSED.
doReturn(mockStream3).when(retriableStreamRecorder).newSubstream(0);
sublistenerCaptor2.getValue()
.closed(Status.fromCode(NON_RETRIABLE_STATUS_CODE), MISCARRIED, new Metadata());
inOrder.verify(retriableStreamRecorder).newSubstream(0);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
verify(retriableStreamRecorder, never()).postCommit();
assertEquals(0, fakeClock.numPendingTasks());
}
@Test
public void normalRetry_thenNoTransparentRetry_butNormalRetry() {
ClientStream mockStream1 = mock(ClientStream.class);

View File

@ -74,7 +74,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -459,33 +458,41 @@ public class RetryTest {
assertRetryStatsRecorded(0, 0, 0);
}
@Ignore("flaky because old transportReportStatus() is not completely migrated yet")
@Test
public void transparentRetryStatsRecorded() throws Exception {
startNewServer();
createNewChannel();
final AtomicBoolean transparentRetryTriggered = new AtomicBoolean();
final AtomicBoolean originalAttemptFailed = new AtomicBoolean();
class TransparentRetryTriggeringTracer extends ClientStreamTracer {
@Override
public void streamCreated(Attributes transportAttrs, Metadata metadata) {
if (transparentRetryTriggered.get()) {
if (originalAttemptFailed.get()) {
return;
}
// Send GOAWAY from server. The client may either receive GOAWAY or create the underlying
// netty stream and write headers first, even we await server termination as below.
// In the latter case, we rerun the test. We can also call localServer.shutdown() to trigger
// GOAWAY, but it takes a lot longer time to gracefully shut down.
localServer.shutdownNow();
try {
localServer.awaitTermination();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
}
@Override
public void streamClosed(Status status) {
if (transparentRetryTriggered.get()) {
if (originalAttemptFailed.get()) {
return;
}
transparentRetryTriggered.set(true);
originalAttemptFailed.set(true);
try {
startNewServer();
channel.resetConnectBackoff();
channel.getState(true);
} catch (Exception e) {
throw new AssertionError("local server can not be restarted", e);
}
@ -502,13 +509,28 @@ public class RetryTest {
CallOptions callOptions = CallOptions.DEFAULT
.withWaitForReady()
.withStreamTracerFactory(new TransparentRetryTracerFactory());
ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, callOptions);
call.start(mockCallListener, new Metadata());
assertRpcStartedRecorded();
assertRpcStatusRecorded(Code.UNAVAILABLE, 0, 0);
assertRpcStartedRecorded();
call.cancel("cancel", null);
assertRpcStatusRecorded(Code.CANCELLED, 0, 0);
while (true) {
ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, callOptions);
call.start(mockCallListener, new Metadata());
assertRpcStartedRecorded(); // original attempt
MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT))
.isEqualTo(1);
TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
if (statusTag.asString().equals(Code.UNAVAILABLE.toString())) {
break;
} else {
// Due to race condition, GOAWAY is not received/processed before the stream is closed due
// to connection error. Rerun the test.
assertThat(statusTag.asString()).isEqualTo(Code.UNKNOWN.toString());
assertRetryStatsRecorded(0, 0, 0);
originalAttemptFailed.set(false);
}
}
assertRpcStartedRecorded(); // retry attempt
ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
serverCall.close(Status.INVALID_ARGUMENT, new Metadata());
assertRpcStatusRecorded(Code.INVALID_ARGUMENT, 0, 0);
assertRetryStatsRecorded(0, 1, 0);
}
}

View File

@ -535,7 +535,7 @@ class NettyClientHandler extends AbstractNettyHandler {
// The connection is going away (it is really the GOAWAY case),
// just terminate the stream now.
command.stream().transportReportStatus(
lifecycleManager.getShutdownStatus(), RpcProgress.REFUSED, true, new Metadata());
lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
promise.setFailure(lifecycleManager.getShutdownThrowable());
return;
}
@ -576,7 +576,7 @@ class NettyClientHandler extends AbstractNettyHandler {
// This should only be reachable during onGoAwayReceived, as otherwise
// getShutdownThrowable() != null
command.stream().setNonExistent();
command.stream().transportReportStatus(s, RpcProgress.REFUSED, true, new Metadata());
command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
promise.setFailure(s.asRuntimeException());
return;
}
@ -635,18 +635,24 @@ class NettyClientHandler extends AbstractNettyHandler {
// Just forward on the success status to the original promise.
promise.setSuccess();
} else {
final Throwable cause = future.cause();
Throwable cause = future.cause();
if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
StreamBufferingEncoder.Http2GoAwayException e =
(StreamBufferingEncoder.Http2GoAwayException) cause;
Status status = statusFromH2Error(
Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
e.errorCode(), e.debugData());
stream.transportReportStatus(status, RpcProgress.REFUSED, true, new Metadata());
promise.setFailure(status.asRuntimeException());
} else {
promise.setFailure(cause);
cause = status.asRuntimeException();
stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
} else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
Status status = lifecycleManager.getShutdownStatus();
if (status == null) {
status = Status.UNAVAILABLE.withCause(cause)
.withDescription("Connection closed while stream is buffered");
}
stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
}
promise.setFailure(cause);
}
}
});

View File

@ -31,6 +31,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.AbstractClientStream;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.Http2ClientStreamTransportState;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
@ -152,12 +153,18 @@ class NettyClientStream extends AbstractClientStream {
// Stream creation failed. Close the stream if not already closed.
// When the channel is shutdown, the lifecycle manager has a better view of the failure,
// especially before negotiation completes (because the negotiator commonly doesn't
// receive the execeptionCaught because NettyClientHandler does not propagate it).
// receive the exceptionCaught because NettyClientHandler does not propagate it).
Status s = transportState().handler.getLifecycleManager().getShutdownStatus();
if (s == null) {
s = transportState().statusFromFailedFuture(future);
}
transportState().transportReportStatus(s, true, new Metadata());
if (transportState().isNonExistent()) {
transportState().transportReportStatus(
s, RpcProgress.MISCARRIED, true, new Metadata());
} else {
transportState().transportReportStatus(
s, RpcProgress.PROCESSED, true, new Metadata());
}
}
}
};
@ -268,7 +275,7 @@ class NettyClientStream extends AbstractClientStream {
}
boolean isNonExistent() {
return this.id == NON_EXISTENT_ID;
return this.id == NON_EXISTENT_ID || this.id == 0;
}
/**

View File

@ -18,6 +18,7 @@ package io.grpc.netty;
import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.ClientStreamListener.RpcProgress.MISCARRIED;
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
@ -378,7 +379,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(streamListener).closed(captor.capture(), same(REFUSED),
verify(streamListener).closed(captor.capture(), same(MISCARRIED),
ArgumentMatchers.<Metadata>notNull());
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
assertEquals(
@ -471,6 +472,20 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
status.getDescription());
}
// Verifies that when the channel goes inactive while a stream is still
// buffered client-side (max_concurrent_streams is 0, so the stream never went
// on the wire), the stream is closed as MISCARRIED with UNAVAILABLE.
@Test
public void channelClosureShouldFailBufferedStreams() throws Exception {
// SETTINGS with max_concurrent_streams=0 forces the new stream to buffer.
receiveMaxConcurrentStreams(0);
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
channel().pipeline().fireChannelInactive();
assertTrue(future.isDone());
assertFalse(future.isSuccess());
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(streamListener).closed(captor.capture(), same(MISCARRIED), ArgumentMatchers.notNull());
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
}
@Test
public void receivedGoAwayShouldFailNewStreams() throws Exception {
// Read a GOAWAY that indicates our stream was never processed by the server.