diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 302dda1636..ee95906f66 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -40,6 +40,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -170,6 +171,52 @@ public abstract class AbstractTransportTest { // queued message InputStreams are closed on stream cancel // (and maybe exceptions handled) + /** + * Test for issue https://github.com/grpc/grpc-java/issues/1682 + */ + @Test + public void frameAfterRstStreamShouldNotBreakClientChannel() throws Exception { + server.start(serverListener); + client.start(mockClientTransportListener); + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + serverTransport = serverTransportListener.transport; + + // Try to create a sequence of frames so that the client receives a HEADERS or DATA frame + // after having sent a RST_STREAM to the server. Previously, this would have broken the + // Netty channel. + + ClientStream stream = client.newStream(methodDescriptor, new Metadata()); + stream.start(mockClientStreamListener); + StreamCreation serverStreamCreation + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + stream.flush(); + stream.writeMessage(methodDescriptor.streamRequest("foo")); + stream.flush(); + stream.cancel(Status.CANCELLED); + stream.flush(); + serverStreamCreation.stream.writeHeaders(new Metadata()); + serverStreamCreation.stream.flush(); + serverStreamCreation.stream.writeMessage(methodDescriptor.streamResponse("bar")); + serverStreamCreation.stream.flush(); + + verify(mockClientStreamListener, timeout(250)) + .closed(eq(Status.CANCELLED), any(Metadata.class)); + + ClientStreamListener mockClientStreamListener2 = mock(ClientStreamListener.class); + + // Test that the channel is still usable i.e. we can receive headers from the server on a + // new stream. + stream = client.newStream(methodDescriptor, new Metadata()); + stream.start(mockClientStreamListener2); + serverStreamCreation + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + serverStreamCreation.stream.writeHeaders(new Metadata()); + serverStreamCreation.stream.flush(); + + verify(mockClientStreamListener2, timeout(250)).headersRead(any(Metadata.class)); + } + @Test public void serverNotListening() { server = null;