mirror of https://github.com/grpc/grpc-java.git
netty: Differentiate GOAWAY closure status descriptions
With this, it will be clear if the RPC failed because the server didn't use a double-GOAWAY or if it failed because of MAX_CONCURRENT_STREAMS or if it was due to a local race. It also fixes the status code to be UNAVAILABLE except for the RPCs included in the GOAWAY error (modulo the Netty bug). Fixes #5855
This commit is contained in:
parent
59528d8efe
commit
735b85fb33
|
|
@ -345,7 +345,11 @@ public final class GrpcUtil {
|
||||||
|
|
||||||
Http2Error(int code, Status status) {
|
Http2Error(int code, Status status) {
|
||||||
this.code = code;
|
this.code = code;
|
||||||
this.status = status.augmentDescription("HTTP/2 error code: " + this.name());
|
String description = "HTTP/2 error code: " + this.name();
|
||||||
|
if (status.getDescription() != null) {
|
||||||
|
description += " (" + status.getDescription() + ")";
|
||||||
|
}
|
||||||
|
this.status = status.withDescription(description);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
package io.grpc.netty;
|
package io.grpc.netty;
|
||||||
|
|
||||||
|
import com.google.errorprone.annotations.CanIgnoreReturnValue;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.internal.ManagedClientTransport;
|
import io.grpc.internal.ManagedClientTransport;
|
||||||
|
|
||||||
|
|
@ -55,13 +56,16 @@ final class ClientTransportLifecycleManager {
|
||||||
listener.transportShutdown(s);
|
listener.transportShutdown(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void notifyShutdown(Status s) {
|
/** Returns {@code true} if was the first shutdown. */
|
||||||
|
@CanIgnoreReturnValue
|
||||||
|
public boolean notifyShutdown(Status s) {
|
||||||
notifyGracefulShutdown(s);
|
notifyGracefulShutdown(s);
|
||||||
if (shutdownStatus != null) {
|
if (shutdownStatus != null) {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
shutdownStatus = s;
|
shutdownStatus = s;
|
||||||
shutdownThrowable = s.asException();
|
shutdownThrowable = s.asException();
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void notifyInUse(boolean inUse) {
|
public void notifyInUse(boolean inUse) {
|
||||||
|
|
|
||||||
|
|
@ -130,6 +130,7 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
private Attributes attributes;
|
private Attributes attributes;
|
||||||
private InternalChannelz.Security securityInfo;
|
private InternalChannelz.Security securityInfo;
|
||||||
private Status abruptGoAwayStatus;
|
private Status abruptGoAwayStatus;
|
||||||
|
private Status channelInactiveReason;
|
||||||
|
|
||||||
static NettyClientHandler newHandler(
|
static NettyClientHandler newHandler(
|
||||||
ClientTransportLifecycleManager lifecycleManager,
|
ClientTransportLifecycleManager lifecycleManager,
|
||||||
|
|
@ -278,7 +279,7 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
@Override
|
@Override
|
||||||
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
|
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
|
||||||
byte[] debugDataBytes = ByteBufUtil.getBytes(debugData);
|
byte[] debugDataBytes = ByteBufUtil.getBytes(debugData);
|
||||||
goingAway(statusFromGoAway(errorCode, debugDataBytes));
|
goingAway(errorCode, debugDataBytes);
|
||||||
if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) {
|
if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) {
|
||||||
String data = new String(debugDataBytes, UTF_8);
|
String data = new String(debugDataBytes, UTF_8);
|
||||||
logger.log(
|
logger.log(
|
||||||
|
|
@ -400,8 +401,7 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
|
NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
|
||||||
if (stream != null) {
|
if (stream != null) {
|
||||||
PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
|
PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
|
||||||
Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
|
Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
|
||||||
.augmentDescription("Received Rst Stream");
|
|
||||||
stream.transportReportStatus(
|
stream.transportReportStatus(
|
||||||
status,
|
status,
|
||||||
errorCode == Http2Error.REFUSED_STREAM.code()
|
errorCode == Http2Error.REFUSED_STREAM.code()
|
||||||
|
|
@ -433,6 +433,12 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
logger.fine("Network channel is closed");
|
logger.fine("Network channel is closed");
|
||||||
Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
|
Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
|
||||||
lifecycleManager.notifyShutdown(status);
|
lifecycleManager.notifyShutdown(status);
|
||||||
|
final Status streamStatus;
|
||||||
|
if (channelInactiveReason != null) {
|
||||||
|
streamStatus = channelInactiveReason;
|
||||||
|
} else {
|
||||||
|
streamStatus = lifecycleManager.getShutdownStatus();
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
cancelPing(lifecycleManager.getShutdownThrowable());
|
cancelPing(lifecycleManager.getShutdownThrowable());
|
||||||
// Report status to the application layer for any open streams
|
// Report status to the application layer for any open streams
|
||||||
|
|
@ -441,8 +447,7 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
public boolean visit(Http2Stream stream) throws Http2Exception {
|
public boolean visit(Http2Stream stream) throws Http2Exception {
|
||||||
NettyClientStream.TransportState clientStream = clientStream(stream);
|
NettyClientStream.TransportState clientStream = clientStream(stream);
|
||||||
if (clientStream != null) {
|
if (clientStream != null) {
|
||||||
clientStream.transportReportStatus(
|
clientStream.transportReportStatus(streamStatus, false, new Metadata());
|
||||||
lifecycleManager.getShutdownStatus(), false, new Metadata());
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
@ -630,8 +635,11 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
|
if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
|
||||||
StreamBufferingEncoder.Http2GoAwayException e =
|
StreamBufferingEncoder.Http2GoAwayException e =
|
||||||
(StreamBufferingEncoder.Http2GoAwayException) cause;
|
(StreamBufferingEncoder.Http2GoAwayException) cause;
|
||||||
lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData()));
|
Status status = statusFromH2Error(
|
||||||
promise.setFailure(lifecycleManager.getShutdownThrowable());
|
Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
|
||||||
|
e.errorCode(), e.debugData());
|
||||||
|
stream.transportReportStatus(status, RpcProgress.REFUSED, true, new Metadata());
|
||||||
|
promise.setFailure(status.asRuntimeException());
|
||||||
} else {
|
} else {
|
||||||
promise.setFailure(cause);
|
promise.setFailure(cause);
|
||||||
}
|
}
|
||||||
|
|
@ -786,9 +794,20 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
* Handler for a GOAWAY being received. Fails any streams created after the
|
* Handler for a GOAWAY being received. Fails any streams created after the
|
||||||
* last known stream. May only be called during a read.
|
* last known stream. May only be called during a read.
|
||||||
*/
|
*/
|
||||||
private void goingAway(Status status) {
|
private void goingAway(long errorCode, byte[] debugData) {
|
||||||
lifecycleManager.notifyGracefulShutdown(status);
|
Status finalStatus = statusFromH2Error(
|
||||||
abruptGoAwayStatus = status;
|
Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
|
||||||
|
lifecycleManager.notifyGracefulShutdown(finalStatus);
|
||||||
|
abruptGoAwayStatus = statusFromH2Error(
|
||||||
|
Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
|
||||||
|
// While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
|
||||||
|
// fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
|
||||||
|
// we assume any sent streams may be related to the GOAWAY. This should rarely impact users
|
||||||
|
// since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
|
||||||
|
// there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
|
||||||
|
// UNAVAILABLE. https://github.com/netty/netty/issues/10670
|
||||||
|
final Status abruptGoAwayStatusConservative = statusFromH2Error(
|
||||||
|
null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
|
||||||
// Try to allocate as many in-flight streams as possible, to reduce race window of
|
// Try to allocate as many in-flight streams as possible, to reduce race window of
|
||||||
// https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
|
// https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
|
||||||
// gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
|
// gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
|
||||||
|
|
@ -798,9 +817,13 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
// This can cause reentrancy, but should be minor since it is normal to handle writes in
|
// This can cause reentrancy, but should be minor since it is normal to handle writes in
|
||||||
// response to a read. Also, the call stack is rather shallow at this point
|
// response to a read. Also, the call stack is rather shallow at this point
|
||||||
clientWriteQueue.drainNow();
|
clientWriteQueue.drainNow();
|
||||||
lifecycleManager.notifyShutdown(status);
|
if (lifecycleManager.notifyShutdown(finalStatus)) {
|
||||||
|
// This is for the only RPCs that are actually covered by the GOAWAY error code. All other
|
||||||
|
// RPCs were not observed by the remote and so should be UNAVAILABLE.
|
||||||
|
channelInactiveReason = statusFromH2Error(
|
||||||
|
null, "Connection closed after GOAWAY", errorCode, debugData);
|
||||||
|
}
|
||||||
|
|
||||||
final Status goAwayStatus = lifecycleManager.getShutdownStatus();
|
|
||||||
final int lastKnownStream = connection().local().lastStreamKnownByPeer();
|
final int lastKnownStream = connection().local().lastStreamKnownByPeer();
|
||||||
try {
|
try {
|
||||||
connection().forEachActiveStream(new Http2StreamVisitor() {
|
connection().forEachActiveStream(new Http2StreamVisitor() {
|
||||||
|
|
@ -809,8 +832,13 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
if (stream.id() > lastKnownStream) {
|
if (stream.id() > lastKnownStream) {
|
||||||
NettyClientStream.TransportState clientStream = clientStream(stream);
|
NettyClientStream.TransportState clientStream = clientStream(stream);
|
||||||
if (clientStream != null) {
|
if (clientStream != null) {
|
||||||
|
// RpcProgress _should_ be REFUSED, but are being conservative. See comment for
|
||||||
|
// abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
|
||||||
|
// retries, but our main goal of transporent retries is to resolve the local race. We
|
||||||
|
// still hope/expect servers to use the graceful double-GOAWAY when closing
|
||||||
|
// connections.
|
||||||
clientStream.transportReportStatus(
|
clientStream.transportReportStatus(
|
||||||
goAwayStatus, RpcProgress.REFUSED, false, new Metadata());
|
abruptGoAwayStatusConservative, RpcProgress.PROCESSED, false, new Metadata());
|
||||||
}
|
}
|
||||||
stream.close();
|
stream.close();
|
||||||
}
|
}
|
||||||
|
|
@ -829,15 +857,20 @@ class NettyClientHandler extends AbstractNettyHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Status statusFromGoAway(long errorCode, byte[] debugData) {
|
/** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
|
||||||
Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
|
private Status statusFromH2Error(
|
||||||
.augmentDescription("Received Goaway");
|
Status.Code statusCode, String context, long errorCode, byte[] debugData) {
|
||||||
|
Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode);
|
||||||
|
if (statusCode == null) {
|
||||||
|
statusCode = status.getCode();
|
||||||
|
}
|
||||||
|
String debugString = "";
|
||||||
if (debugData != null && debugData.length > 0) {
|
if (debugData != null && debugData.length > 0) {
|
||||||
// If a debug message was provided, use it.
|
// If a debug message was provided, use it.
|
||||||
String msg = new String(debugData, UTF_8);
|
debugString = ", debug data: " + new String(debugData, UTF_8);
|
||||||
status = status.augmentDescription(msg);
|
|
||||||
}
|
}
|
||||||
return status;
|
return statusCode.toStatus()
|
||||||
|
.withDescription(context + ". " + status.getDescription() + debugString);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -330,24 +330,13 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
|
||||||
assertNull("no additional message expected", streamListenerMessageQueue.poll());
|
assertNull("no additional message expected", streamListenerMessageQueue.poll());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void receivedGoAwayShouldCancelBufferedStream() throws Exception {
|
|
||||||
// Force the stream to be buffered.
|
|
||||||
receiveMaxConcurrentStreams(0);
|
|
||||||
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
|
|
||||||
channelRead(goAwayFrame(0));
|
|
||||||
assertTrue(future.isDone());
|
|
||||||
assertFalse(future.isSuccess());
|
|
||||||
Status status = Status.fromThrowable(future.cause());
|
|
||||||
assertEquals(Status.Code.UNAVAILABLE, status.getCode());
|
|
||||||
assertEquals("HTTP/2 error code: NO_ERROR\nReceived Goaway", status.getDescription());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void receivedGoAwayShouldRefuseLaterStreamId() throws Exception {
|
public void receivedGoAwayShouldRefuseLaterStreamId() throws Exception {
|
||||||
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
|
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
|
||||||
channelRead(goAwayFrame(streamId - 1));
|
channelRead(goAwayFrame(streamId - 1));
|
||||||
verify(streamListener).closed(any(Status.class), eq(REFUSED), any(Metadata.class));
|
// This _should_ be REFUSED, but we purposefully use PROCESSED. See comment for
|
||||||
|
// abruptGoAwayStatusConservative in NettyClientHandler
|
||||||
|
verify(streamListener).closed(any(Status.class), eq(PROCESSED), any(Metadata.class));
|
||||||
assertTrue(future.isDone());
|
assertTrue(future.isDone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -386,8 +375,10 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
|
||||||
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
|
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
|
||||||
verify(streamListener).closed(captor.capture(), same(REFUSED),
|
verify(streamListener).closed(captor.capture(), same(REFUSED),
|
||||||
ArgumentMatchers.<Metadata>notNull());
|
ArgumentMatchers.<Metadata>notNull());
|
||||||
assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode());
|
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
|
||||||
assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
|
assertEquals(
|
||||||
|
"Abrupt GOAWAY closed unsent stream. HTTP/2 error code: CANCEL, "
|
||||||
|
+ "debug data: this is a test",
|
||||||
captor.getValue().getDescription());
|
captor.getValue().getDescription());
|
||||||
assertTrue(future.isDone());
|
assertTrue(future.isDone());
|
||||||
}
|
}
|
||||||
|
|
@ -415,15 +406,18 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
|
||||||
// Read a GOAWAY that indicates our stream was never processed by the server.
|
// Read a GOAWAY that indicates our stream was never processed by the server.
|
||||||
channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
|
channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
|
||||||
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
|
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
|
||||||
verify(streamListener).closed(captor.capture(), same(REFUSED),
|
// See comment for abruptGoAwayStatusConservative in NettyClientHandler
|
||||||
|
verify(streamListener).closed(captor.capture(), same(PROCESSED),
|
||||||
ArgumentMatchers.<Metadata>notNull());
|
ArgumentMatchers.<Metadata>notNull());
|
||||||
assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode());
|
assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode());
|
||||||
assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
|
assertEquals(
|
||||||
|
"Abrupt GOAWAY closed sent stream. HTTP/2 error code: CANCEL, "
|
||||||
|
+ "debug data: this is a test",
|
||||||
captor.getValue().getDescription());
|
captor.getValue().getDescription());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception {
|
public void receivedGoAwayShouldFailBufferedStreams() throws Exception {
|
||||||
receiveMaxConcurrentStreams(0);
|
receiveMaxConcurrentStreams(0);
|
||||||
|
|
||||||
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
|
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
|
||||||
|
|
@ -433,8 +427,10 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
|
||||||
assertTrue(future.isDone());
|
assertTrue(future.isDone());
|
||||||
assertFalse(future.isSuccess());
|
assertFalse(future.isSuccess());
|
||||||
Status status = Status.fromThrowable(future.cause());
|
Status status = Status.fromThrowable(future.cause());
|
||||||
assertEquals(Status.CANCELLED.getCode(), status.getCode());
|
assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
|
||||||
assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
|
assertEquals(
|
||||||
|
"GOAWAY closed buffered stream. HTTP/2 error code: CANCEL, "
|
||||||
|
+ "debug data: this is a test",
|
||||||
status.getDescription());
|
status.getDescription());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -448,8 +444,10 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
|
||||||
assertTrue(future.isDone());
|
assertTrue(future.isDone());
|
||||||
assertFalse(future.isSuccess());
|
assertFalse(future.isSuccess());
|
||||||
Status status = Status.fromThrowable(future.cause());
|
Status status = Status.fromThrowable(future.cause());
|
||||||
assertEquals(Status.CANCELLED.getCode(), status.getCode());
|
assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
|
||||||
assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
|
assertEquals(
|
||||||
|
"GOAWAY shut down transport. HTTP/2 error code: CANCEL, "
|
||||||
|
+ "debug data: this is a test",
|
||||||
status.getDescription());
|
status.getDescription());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -529,7 +529,7 @@ public class NettyClientTransportTest {
|
||||||
Throwable rootCause = getRootCause(e);
|
Throwable rootCause = getRootCause(e);
|
||||||
Status status = ((StatusException) rootCause).getStatus();
|
Status status = ((StatusException) rootCause).getStatus();
|
||||||
assertEquals(Status.Code.INTERNAL, status.getCode());
|
assertEquals(Status.Code.INTERNAL, status.getCode());
|
||||||
assertEquals("HTTP/2 error code: PROTOCOL_ERROR\nReceived Rst Stream",
|
assertEquals("RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR",
|
||||||
status.getDescription());
|
status.getDescription());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue