diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index fc31d5986a..3f33b6da5e 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -33,6 +33,7 @@ package io.grpc.internal; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.verify; @@ -237,6 +238,22 @@ public class AbstractClientStreamTest { assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode()); } + @Test + public void rstStreamClosesStream() { + AbstractClientStream stream = + new BaseAbstractClientStream(allocator, mockListener); + // The application will call request when waiting for a message, which will in turn call this + // on the transport thread. + stream.requestMessagesFromDeframer(1); + // Send first byte of 2 byte message + stream.deframe(ReadableBuffers.wrap(new byte[] {0, 0, 0, 0, 2, 1}), false); + Status status = Status.INTERNAL; + // Simulate getting a reset + stream.transportReportStatus(status, false /*stop delivery*/, new Metadata()); + + assertTrue(stream.isClosed()); + } + /** * No-op base class for testing. */ diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 2d0d5477e3..e3c037c310 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -45,7 +45,6 @@ import io.grpc.StatusException; import io.grpc.internal.ClientTransport.PingCallback; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -226,11 +225,7 @@ class NettyClientHandler extends Http2ConnectionHandler { private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception { NettyClientStream stream = clientStream(requireHttp2Stream(streamId)); Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode); - // TODO(carl-mastrangelo): This is a hack! Currently, due to a bug in the deframer, the stream - // listener might not be closed if the stream is stopped while in the middle of a recieving a - // message. This is a quick work around to get things working again, but should be changed - // back to not stopping delivery once a proper, thought-out fix is in place in the Deframer. - stream.transportReportStatus(status, true /*stop delivery*/, new Metadata()); + stream.transportReportStatus(status, false /*stop delivery*/, new Metadata()); } @Override