Add test to prove RST closes stream, and remove hack from transport to force closure.

This commit is contained in:
Carl Mastrangelo 2015-08-31 10:41:57 -07:00
parent 5bb0ea9899
commit 396f0606f3
2 changed files with 18 additions and 6 deletions

View File

@ -33,6 +33,7 @@ package io.grpc.internal;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -237,6 +238,22 @@ public class AbstractClientStreamTest {
assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode()); assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode());
} }
@Test
public void rstStreamClosesStream() {
AbstractClientStream<Integer> stream =
new BaseAbstractClientStream<Integer>(allocator, mockListener);
// The application will call request when waiting for a message, which will in turn call this
// on the transport thread.
stream.requestMessagesFromDeframer(1);
// Send first byte of 2 byte message
stream.deframe(ReadableBuffers.wrap(new byte[] {0, 0, 0, 0, 2, 1}), false);
Status status = Status.INTERNAL;
// Simulate getting a reset
stream.transportReportStatus(status, false /*stop delivery*/, new Metadata());
assertTrue(stream.isClosed());
}
/** /**
* No-op base class for testing. * No-op base class for testing.
*/ */

View File

@ -45,7 +45,6 @@ import io.grpc.StatusException;
import io.grpc.internal.ClientTransport.PingCallback; import io.grpc.internal.ClientTransport.PingCallback;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping; import io.grpc.internal.Http2Ping;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -226,11 +225,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception { private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
NettyClientStream stream = clientStream(requireHttp2Stream(streamId)); NettyClientStream stream = clientStream(requireHttp2Stream(streamId));
Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode); Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode);
// TODO(carl-mastrangelo): This is a hack! Currently, due to a bug in the deframer, the stream stream.transportReportStatus(status, false /*stop delivery*/, new Metadata());
// listener might not be closed if the stream is stopped while in the middle of a recieving a
// message. This is a quick work around to get things working again, but should be changed
// back to not stopping delivery once a proper, thought-out fix is in place in the Deframer.
stream.transportReportStatus(status, true /*stop delivery*/, new Metadata());
} }
@Override @Override