Make handling of GoAwayClosedStreamException more consistent.

This commit is contained in:
nmittler 2016-01-20 15:11:50 -08:00
parent 484c46c62b
commit 70ef5b1172
5 changed files with 70 additions and 38 deletions

View File

@ -35,6 +35,7 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
@ -217,7 +218,8 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
private void cancelGoAwayStreams(int lastStreamId, long errorCode, ByteBuf debugData) { private void cancelGoAwayStreams(int lastStreamId, long errorCode, ByteBuf debugData) {
Iterator<PendingStream> iter = pendingStreams.values().iterator(); Iterator<PendingStream> iter = pendingStreams.values().iterator();
Exception e = new GoAwayClosedStreamException(lastStreamId, errorCode, debugData); Exception e = new GoAwayClosedStreamException(lastStreamId, errorCode,
ByteBufUtil.getBytes(debugData));
while (iter.hasNext()) { while (iter.hasNext()) {
PendingStream stream = iter.next(); PendingStream stream = iter.next();
if (stream.streamId > lastStreamId) { if (stream.streamId > lastStreamId) {

View File

@ -31,15 +31,13 @@
package io.grpc.netty; package io.grpc.netty;
import io.netty.buffer.ByteBuf;
class GoAwayClosedStreamException extends Exception { class GoAwayClosedStreamException extends Exception {
private static final long serialVersionUID = 1326785622777291198L; private static final long serialVersionUID = 1326785622777291198L;
private final int lastStreamId; private final int lastStreamId;
private final long errorCode; private final long errorCode;
private final ByteBuf debugData; private final byte[] debugData;
GoAwayClosedStreamException(int lastStreamId, long errorCode, ByteBuf debugData) { GoAwayClosedStreamException(int lastStreamId, long errorCode, byte[] debugData) {
this.lastStreamId = lastStreamId; this.lastStreamId = lastStreamId;
this.errorCode = errorCode; this.errorCode = errorCode;
this.debugData = debugData; this.debugData = debugData;
@ -53,7 +51,7 @@ class GoAwayClosedStreamException extends Exception {
return errorCode; return errorCode;
} }
ByteBuf debugData() { byte[] debugData() {
return debugData; return debugData;
} }
} }

View File

@ -47,6 +47,7 @@ import io.grpc.internal.ClientTransport.PingCallback;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping; import io.grpc.internal.Http2Ping;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -111,6 +112,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private WriteQueue clientWriteQueue; private WriteQueue clientWriteQueue;
private Http2Ping ping; private Http2Ping ping;
private Status goAwayStatus; private Status goAwayStatus;
private Throwable goAwayStatusThrowable;
private int nextStreamId; private int nextStreamId;
static NettyClientHandler newHandler(ClientTransport.Listener listener, static NettyClientHandler newHandler(ClientTransport.Listener listener,
@ -186,7 +188,7 @@ class NettyClientHandler extends AbstractNettyHandler {
connection.addListener(new Http2ConnectionAdapter() { connection.addListener(new Http2ConnectionAdapter() {
@Override @Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) { public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
goAwayStatus(statusFromGoAway(errorCode, debugData)); goAwayStatus(statusFromGoAway(errorCode, ByteBufUtil.getBytes(debugData)));
goingAway(); goingAway();
} }
}); });
@ -350,7 +352,17 @@ class NettyClientHandler extends AbstractNettyHandler {
final NettyClientStream stream = command.stream(); final NettyClientStream stream = command.stream();
final Http2Headers headers = command.headers(); final Http2Headers headers = command.headers();
stream.id(streamId); stream.id(streamId);
encoder().writeHeaders(ctx(), streamId, headers, 0, false, promise)
if (goAwayStatus != null) {
// The connection is going away, just terminate the stream now.
promise.setFailure(goAwayStatusThrowable);
return;
}
// Create an intermediate promise so that we can intercept the failure reported back to the
// application.
ChannelPromise tempPromise = ctx().newPromise();
encoder().writeHeaders(ctx(), streamId, headers, 0, false, tempPromise)
.addListener(new ChannelFutureListener() { .addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
@ -360,25 +372,25 @@ class NettyClientHandler extends AbstractNettyHandler {
Http2Stream http2Stream = connection().stream(streamId); Http2Stream http2Stream = connection().stream(streamId);
if (http2Stream != null) { if (http2Stream != null) {
http2Stream.setProperty(streamKey, stream); http2Stream.setProperty(streamKey, stream);
} else if (stream.isClosed()) {
// The stream has been cancelled and Netty is sending a RST_STREAM frame which // Attach the client stream to the HTTP/2 stream object as user data.
// causes it to purge pending writes from the flow-controller and delete the stream.setHttp2Stream(http2Stream);
// http2Stream. The stream listener has already been notified of cancellation
// so there is nothing to do.
return;
} else {
throw new IllegalStateException("Stream closed but http2 stream not defined");
} }
// Attach the client stream to the HTTP/2 stream object as user data. // Otherwise, the stream has been cancelled and Netty is sending a
stream.setHttp2Stream(http2Stream); // RST_STREAM frame which causes it to purge pending writes from the
// flow-controller and delete the http2Stream. The stream listener has already
// been notified of cancellation so there is nothing to do.
// Just forward on the success status to the original promise.
promise.setSuccess();
} else { } else {
if (future.cause() instanceof GoAwayClosedStreamException) { final Throwable cause = future.cause();
GoAwayClosedStreamException e = (GoAwayClosedStreamException) future.cause(); if (cause instanceof GoAwayClosedStreamException) {
GoAwayClosedStreamException e = (GoAwayClosedStreamException) cause;
goAwayStatus(statusFromGoAway(e.errorCode(), e.debugData())); goAwayStatus(statusFromGoAway(e.errorCode(), e.debugData()));
stream.transportReportStatus(goAwayStatus, false, new Metadata()); promise.setFailure(goAwayStatusThrowable);
} else { } else {
stream.transportReportStatus(Status.fromThrowable(future.cause()), true, promise.setFailure(cause);
new Metadata());
} }
} }
} }
@ -486,7 +498,11 @@ class NettyClientHandler extends AbstractNettyHandler {
} }
private void goAwayStatus(Status status) { private void goAwayStatus(Status status) {
goAwayStatus = goAwayStatus == null ? status : goAwayStatus; // Don't overwrite if we already have a goAwayStatus.
if (goAwayStatus == null) {
goAwayStatus = status;
goAwayStatusThrowable = status.asException();
}
} }
private void cancelPing() { private void cancelPing() {
@ -496,11 +512,11 @@ class NettyClientHandler extends AbstractNettyHandler {
} }
} }
private Status statusFromGoAway(long errorCode, ByteBuf debugData) { private Status statusFromGoAway(long errorCode, byte[] debugData) {
Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode); Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode);
if (debugData.isReadable()) { if (debugData != null && debugData.length > 0) {
// If a debug message was provided, use it. // If a debug message was provided, use it.
String msg = debugData.toString(UTF_8); String msg = new String(debugData, UTF_8);
status = status.augmentDescription(msg); status = status.augmentDescription(msg);
} }
return status; return status;

View File

@ -34,6 +34,7 @@ package io.grpc.netty;
import static io.grpc.internal.GrpcUtil.Http2Error.CANCEL; import static io.grpc.internal.GrpcUtil.Http2Error.CANCEL;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
@ -83,6 +84,7 @@ import java.util.List;
*/ */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class BufferingHttp2ConnectionEncoderTest { public class BufferingHttp2ConnectionEncoderTest {
private static final byte[] DEBUG_DATA = "test exception".getBytes(UTF_8);
private BufferingHttp2ConnectionEncoder encoder; private BufferingHttp2ConnectionEncoder encoder;
@ -197,7 +199,7 @@ public class BufferingHttp2ConnectionEncoderTest {
public void bufferingNewStreamFailsAfterGoAwayReceived() { public void bufferingNewStreamFailsAfterGoAwayReceived() {
encoder.writeSettingsAck(ctx, newPromise()); encoder.writeSettingsAck(ctx, newPromise());
connection.local().maxActiveStreams(0); connection.local().maxActiveStreams(0);
connection.goAwayReceived(1, 8, null); connection.goAwayReceived(1, 8, Unpooled.wrappedBuffer(DEBUG_DATA));
ChannelFuture future = encoderWriteHeaders(3); ChannelFuture future = encoderWriteHeaders(3);
assertEquals(0, encoder.numBufferedStreams()); assertEquals(0, encoder.numBufferedStreams());
@ -221,7 +223,7 @@ public class BufferingHttp2ConnectionEncoderTest {
flush(); flush();
assertEquals(4, encoder.numBufferedStreams()); assertEquals(4, encoder.numBufferedStreams());
connection.goAwayReceived(11, 8, null); connection.goAwayReceived(11, 8, Unpooled.wrappedBuffer(DEBUG_DATA));
assertEquals(5, connection.numActiveStreams()); assertEquals(5, connection.numActiveStreams());
// The 4 buffered streams must have been failed. // The 4 buffered streams must have been failed.

View File

@ -61,6 +61,7 @@ import io.grpc.Status;
import io.grpc.StatusException; import io.grpc.StatusException;
import io.grpc.internal.ClientTransport; import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransport.PingCallback; import io.grpc.internal.ClientTransport.PingCallback;
import io.grpc.internal.GrpcUtil;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -264,8 +265,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
channelRead(goAwayFrame(0)); channelRead(goAwayFrame(0));
assertTrue(future.isDone()); assertTrue(future.isDone());
assertFalse(future.isSuccess()); assertFalse(future.isSuccess());
verify(stream).transportReportStatus(any(Status.class), eq(false), Status status = Status.fromThrowable(future.cause());
notNull(Metadata.class)); assertEquals(GrpcUtil.Http2Error.NO_ERROR.status(), status);
} }
@Test @Test
@ -286,16 +287,29 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception { public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception {
receiveMaxConcurrentStreams(0); receiveMaxConcurrentStreams(0);
enqueue(new CreateStreamCommand(grpcHeaders, stream)); ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, stream));
// Read a GOAWAY that indicates our stream was never processed by the server. // Read a GOAWAY that indicates our stream was never processed by the server.
channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8))); channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); assertTrue(future.isDone());
verify(stream).transportReportStatus(captor.capture(), eq(false), assertFalse(future.isSuccess());
notNull(Metadata.class)); Status status = Status.fromThrowable(future.cause());
assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode()); assertEquals(Status.CANCELLED.getCode(), status.getCode());
assertEquals("HTTP/2 error code: CANCEL\nthis is a test", assertEquals("HTTP/2 error code: CANCEL\nthis is a test", status.getDescription());
captor.getValue().getDescription()); }
@Test
public void receivedGoAwayShouldFailNewStreams() throws Exception {
// Read a GOAWAY that indicates our stream was never processed by the server.
channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
// Now try to create a stream.
ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, stream));
assertTrue(future.isDone());
assertFalse(future.isSuccess());
Status status = Status.fromThrowable(future.cause());
assertEquals(Status.CANCELLED.getCode(), status.getCode());
assertEquals("HTTP/2 error code: CANCEL\nthis is a test", status.getDescription());
} }
@Test @Test