From 70ef5b1172cffc37c50ebff56c3e2d77fabf149b Mon Sep 17 00:00:00 2001 From: nmittler Date: Wed, 20 Jan 2016 15:11:50 -0800 Subject: [PATCH] Make handling of GoAwayClosedStreamException more consistent. --- .../BufferingHttp2ConnectionEncoder.java | 4 +- .../netty/GoAwayClosedStreamException.java | 8 +-- .../io/grpc/netty/NettyClientHandler.java | 58 ++++++++++++------- .../BufferingHttp2ConnectionEncoderTest.java | 6 +- .../io/grpc/netty/NettyClientHandlerTest.java | 32 +++++++--- 5 files changed, 70 insertions(+), 38 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/BufferingHttp2ConnectionEncoder.java b/netty/src/main/java/io/grpc/netty/BufferingHttp2ConnectionEncoder.java index fcdf6a57f4..53f84693bd 100644 --- a/netty/src/main/java/io/grpc/netty/BufferingHttp2ConnectionEncoder.java +++ b/netty/src/main/java/io/grpc/netty/BufferingHttp2ConnectionEncoder.java @@ -35,6 +35,7 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -217,7 +218,8 @@ class BufferingHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { private void cancelGoAwayStreams(int lastStreamId, long errorCode, ByteBuf debugData) { Iterator iter = pendingStreams.values().iterator(); - Exception e = new GoAwayClosedStreamException(lastStreamId, errorCode, debugData); + Exception e = new GoAwayClosedStreamException(lastStreamId, errorCode, + ByteBufUtil.getBytes(debugData)); while (iter.hasNext()) { PendingStream stream = iter.next(); if (stream.streamId > lastStreamId) { diff --git a/netty/src/main/java/io/grpc/netty/GoAwayClosedStreamException.java b/netty/src/main/java/io/grpc/netty/GoAwayClosedStreamException.java index 1e189c6f1c..8ac461c2b6 100644 --- a/netty/src/main/java/io/grpc/netty/GoAwayClosedStreamException.java +++ b/netty/src/main/java/io/grpc/netty/GoAwayClosedStreamException.java @@ -31,15 +31,13 @@ package io.grpc.netty; -import io.netty.buffer.ByteBuf; - class GoAwayClosedStreamException extends Exception { private static final long serialVersionUID = 1326785622777291198L; private final int lastStreamId; private final long errorCode; - private final ByteBuf debugData; + private final byte[] debugData; - GoAwayClosedStreamException(int lastStreamId, long errorCode, ByteBuf debugData) { + GoAwayClosedStreamException(int lastStreamId, long errorCode, byte[] debugData) { this.lastStreamId = lastStreamId; this.errorCode = errorCode; this.debugData = debugData; @@ -53,7 +51,7 @@ class GoAwayClosedStreamException extends Exception { return errorCode; } - ByteBuf debugData() { + byte[] debugData() { return debugData; } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index c6558b46df..f97a25265e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -47,6 +47,7 @@ import io.grpc.internal.ClientTransport.PingCallback; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -111,6 +112,7 @@ class NettyClientHandler extends AbstractNettyHandler { private WriteQueue clientWriteQueue; private Http2Ping ping; private Status goAwayStatus; + private Throwable goAwayStatusThrowable; private int nextStreamId; static NettyClientHandler newHandler(ClientTransport.Listener listener, @@ -186,7 +188,7 @@ class NettyClientHandler extends AbstractNettyHandler { connection.addListener(new Http2ConnectionAdapter() { @Override public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) { - goAwayStatus(statusFromGoAway(errorCode, debugData)); + goAwayStatus(statusFromGoAway(errorCode, ByteBufUtil.getBytes(debugData))); goingAway(); } }); @@ -350,7 +352,17 @@ class NettyClientHandler extends AbstractNettyHandler { final NettyClientStream stream = command.stream(); final Http2Headers headers = command.headers(); stream.id(streamId); - encoder().writeHeaders(ctx(), streamId, headers, 0, false, promise) + + if (goAwayStatus != null) { + // The connection is going away, just terminate the stream now. + promise.setFailure(goAwayStatusThrowable); + return; + } + + // Create an intermediate promise so that we can intercept the failure reported back to the + // application. + ChannelPromise tempPromise = ctx().newPromise(); + encoder().writeHeaders(ctx(), streamId, headers, 0, false, tempPromise) .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -360,25 +372,25 @@ class NettyClientHandler extends AbstractNettyHandler { Http2Stream http2Stream = connection().stream(streamId); if (http2Stream != null) { http2Stream.setProperty(streamKey, stream); - } else if (stream.isClosed()) { - // The stream has been cancelled and Netty is sending a RST_STREAM frame which - // causes it to purge pending writes from the flow-controller and delete the - // http2Stream. The stream listener has already been notified of cancellation - // so there is nothing to do. - return; - } else { - throw new IllegalStateException("Stream closed but http2 stream not defined"); + + // Attach the client stream to the HTTP/2 stream object as user data. + stream.setHttp2Stream(http2Stream); } - // Attach the client stream to the HTTP/2 stream object as user data. - stream.setHttp2Stream(http2Stream); + // Otherwise, the stream has been cancelled and Netty is sending a + // RST_STREAM frame which causes it to purge pending writes from the + // flow-controller and delete the http2Stream. The stream listener has already + // been notified of cancellation so there is nothing to do. + + // Just forward on the success status to the original promise. + promise.setSuccess(); } else { - if (future.cause() instanceof GoAwayClosedStreamException) { - GoAwayClosedStreamException e = (GoAwayClosedStreamException) future.cause(); + final Throwable cause = future.cause(); + if (cause instanceof GoAwayClosedStreamException) { + GoAwayClosedStreamException e = (GoAwayClosedStreamException) cause; goAwayStatus(statusFromGoAway(e.errorCode(), e.debugData())); - stream.transportReportStatus(goAwayStatus, false, new Metadata()); + promise.setFailure(goAwayStatusThrowable); } else { - stream.transportReportStatus(Status.fromThrowable(future.cause()), true, - new Metadata()); + promise.setFailure(cause); } } } @@ -486,7 +498,11 @@ class NettyClientHandler extends AbstractNettyHandler { } private void goAwayStatus(Status status) { - goAwayStatus = goAwayStatus == null ? status : goAwayStatus; + // Don't overwrite if we already have a goAwayStatus. + if (goAwayStatus == null) { + goAwayStatus = status; + goAwayStatusThrowable = status.asException(); + } } private void cancelPing() { @@ -496,11 +512,11 @@ class NettyClientHandler extends AbstractNettyHandler { } } - private Status statusFromGoAway(long errorCode, ByteBuf debugData) { + private Status statusFromGoAway(long errorCode, byte[] debugData) { Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode); - if (debugData.isReadable()) { + if (debugData != null && debugData.length > 0) { // If a debug message was provided, use it. - String msg = debugData.toString(UTF_8); + String msg = new String(debugData, UTF_8); status = status.augmentDescription(msg); } return status; diff --git a/netty/src/test/java/io/grpc/netty/BufferingHttp2ConnectionEncoderTest.java b/netty/src/test/java/io/grpc/netty/BufferingHttp2ConnectionEncoderTest.java index 53e59d1a74..9cefe072cd 100644 --- a/netty/src/test/java/io/grpc/netty/BufferingHttp2ConnectionEncoderTest.java +++ b/netty/src/test/java/io/grpc/netty/BufferingHttp2ConnectionEncoderTest.java @@ -34,6 +34,7 @@ package io.grpc.netty; import static io.grpc.internal.GrpcUtil.Http2Error.CANCEL; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; +import static io.netty.util.CharsetUtil.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -83,6 +84,7 @@ import java.util.List; */ @RunWith(JUnit4.class) public class BufferingHttp2ConnectionEncoderTest { + private static final byte[] DEBUG_DATA = "test exception".getBytes(UTF_8); private BufferingHttp2ConnectionEncoder encoder; @@ -197,7 +199,7 @@ public class BufferingHttp2ConnectionEncoderTest { public void bufferingNewStreamFailsAfterGoAwayReceived() { encoder.writeSettingsAck(ctx, newPromise()); connection.local().maxActiveStreams(0); - connection.goAwayReceived(1, 8, null); + connection.goAwayReceived(1, 8, Unpooled.wrappedBuffer(DEBUG_DATA)); ChannelFuture future = encoderWriteHeaders(3); assertEquals(0, encoder.numBufferedStreams()); @@ -221,7 +223,7 @@ public class BufferingHttp2ConnectionEncoderTest { flush(); assertEquals(4, encoder.numBufferedStreams()); - connection.goAwayReceived(11, 8, null); + connection.goAwayReceived(11, 8, Unpooled.wrappedBuffer(DEBUG_DATA)); assertEquals(5, connection.numActiveStreams()); // The 4 buffered streams must have been failed. diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index bce9e6c360..d7a69f11bb 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -61,6 +61,7 @@ import io.grpc.Status; import io.grpc.StatusException; import io.grpc.internal.ClientTransport; import io.grpc.internal.ClientTransport.PingCallback; +import io.grpc.internal.GrpcUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; @@ -264,8 +265,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase captor = ArgumentCaptor.forClass(Status.class); - verify(stream).transportReportStatus(captor.capture(), eq(false), - notNull(Metadata.class)); - assertEquals(Status.CANCELLED.getCode(), captor.getValue().getCode()); - assertEquals("HTTP/2 error code: CANCEL\nthis is a test", - captor.getValue().getDescription()); + assertTrue(future.isDone()); + assertFalse(future.isSuccess()); + Status status = Status.fromThrowable(future.cause()); + assertEquals(Status.CANCELLED.getCode(), status.getCode()); + assertEquals("HTTP/2 error code: CANCEL\nthis is a test", status.getDescription()); + } + + @Test + public void receivedGoAwayShouldFailNewStreams() throws Exception { + // Read a GOAWAY that indicates our stream was never processed by the server. + channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8))); + + // Now try to create a stream. + ChannelFuture future = enqueue(new CreateStreamCommand(grpcHeaders, stream)); + assertTrue(future.isDone()); + assertFalse(future.isSuccess()); + Status status = Status.fromThrowable(future.cause()); + assertEquals(Status.CANCELLED.getCode(), status.getCode()); + assertEquals("HTTP/2 error code: CANCEL\nthis is a test", status.getDescription()); } @Test