From d42a4b24b990aceb241ad4810318a2b33c1664d3 Mon Sep 17 00:00:00 2001 From: zpencer Date: Tue, 13 Jun 2017 11:01:30 -0700 Subject: [PATCH] core: ensure ServerStream and ClientStream become noops after closing (#3092) Add unit tests to enforce behavior that we already expect today. fixes #3083 --- .../testing/AbstractTransportTest.java | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 1fe2fba86a..fa776ee968 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -1245,6 +1245,79 @@ public abstract class AbstractTransportTest { assertEquals(status.getDescription(), statusCaptor.getValue().getDescription()); } + @Test + public void interactionsAfterServerStreamCloseAreNoops() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + runIfNotNull(client.start(mockClientTransportListener)); + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + serverTransport = serverTransportListener.transport; + + // boilerplate + ClientStream clientStream = + client.newStream(methodDescriptor, new Metadata(), callOptions); + ClientStreamListener clientListener = mock(ClientStreamListener.class); + clientStream.start(clientListener); + StreamCreation server + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + // setup + clientStream.request(1); + server.stream.close(Status.INTERNAL, new Metadata()); + verify(clientListener, timeout(TIMEOUT_MS).times(1)) + .closed(any(Status.class), any(Metadata.class)); + reset(clientListener); + + // Ensure that for a closed ServerStream, interactions are noops + server.stream.writeHeaders(new Metadata()); + server.stream.writeMessage(methodDescriptor.streamResponse("response")); + server.stream.close(Status.INTERNAL, new Metadata()); + // Even though the client requested a message earlier, the write should not go through + verify(clientListener, never()).headersRead(any(Metadata.class)); + verify(clientListener, never()).messageRead(any(InputStream.class)); + verify(clientListener, never()).closed(any(Status.class), any(Metadata.class)); + + // Make sure new streams still work properly + doPingPong(serverListener); + } + + @Test + public void interactionsAfterClientStreamCancelAreNoops() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + runIfNotNull(client.start(mockClientTransportListener)); + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + serverTransport = serverTransportListener.transport; + + // boilerplate + ClientStream clientStream = + client.newStream(methodDescriptor, new Metadata(), callOptions); + ClientStreamListener clientListener = mock(ClientStreamListener.class); + clientStream.start(clientListener); + StreamCreation server + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + // setup + server.stream.request(1); + clientStream.cancel(Status.UNKNOWN); + verify(server.listener, timeout(TIMEOUT_MS)).closed(any(Status.class)); + reset(server.listener); + + // Ensure that for a cancelled ClientStream, interactions are noops + clientStream.writeMessage(methodDescriptor.streamRequest("request")); + clientStream.halfClose(); + clientStream.cancel(Status.UNKNOWN); + // Even though the server requested a message earlier, the write should not go through + verify(server.listener, never()).messageRead(any(InputStream.class)); + verify(server.listener, never()).halfClosed(); + verify(server.listener, never()).closed(any(Status.class)); + + // Make sure new streams still work properly + doPingPong(serverListener); + } + /** * Helper that simply does an RPC. It can be used similar to a sleep for negative testing: to give * time for actions _not_ to happen. Since it is based on doing an actual RPC with actual