mirror of https://github.com/grpc/grpc-java.git
core: ensure ServerStream and ClientStream become noops after closing (#3092)
Add unit tests to enforce behavior that we already expect today. fixes #3083
This commit is contained in:
parent
0a8d761c19
commit
d42a4b24b9
|
|
@ -1245,6 +1245,79 @@ public abstract class AbstractTransportTest {
|
||||||
assertEquals(status.getDescription(), statusCaptor.getValue().getDescription());
|
assertEquals(status.getDescription(), statusCaptor.getValue().getDescription());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
|
||||||
|
server.start(serverListener);
|
||||||
|
client = newClientTransport(server);
|
||||||
|
runIfNotNull(client.start(mockClientTransportListener));
|
||||||
|
MockServerTransportListener serverTransportListener
|
||||||
|
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||||
|
serverTransport = serverTransportListener.transport;
|
||||||
|
|
||||||
|
// boilerplate
|
||||||
|
ClientStream clientStream =
|
||||||
|
client.newStream(methodDescriptor, new Metadata(), callOptions);
|
||||||
|
ClientStreamListener clientListener = mock(ClientStreamListener.class);
|
||||||
|
clientStream.start(clientListener);
|
||||||
|
StreamCreation server
|
||||||
|
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
// setup
|
||||||
|
clientStream.request(1);
|
||||||
|
server.stream.close(Status.INTERNAL, new Metadata());
|
||||||
|
verify(clientListener, timeout(TIMEOUT_MS).times(1))
|
||||||
|
.closed(any(Status.class), any(Metadata.class));
|
||||||
|
reset(clientListener);
|
||||||
|
|
||||||
|
// Ensure that for a closed ServerStream, interactions are noops
|
||||||
|
server.stream.writeHeaders(new Metadata());
|
||||||
|
server.stream.writeMessage(methodDescriptor.streamResponse("response"));
|
||||||
|
server.stream.close(Status.INTERNAL, new Metadata());
|
||||||
|
// Even though the client requested a message earlier, the write should not go through
|
||||||
|
verify(clientListener, never()).headersRead(any(Metadata.class));
|
||||||
|
verify(clientListener, never()).messageRead(any(InputStream.class));
|
||||||
|
verify(clientListener, never()).closed(any(Status.class), any(Metadata.class));
|
||||||
|
|
||||||
|
// Make sure new streams still work properly
|
||||||
|
doPingPong(serverListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void interactionsAfterClientStreamCancelAreNoops() throws Exception {
|
||||||
|
server.start(serverListener);
|
||||||
|
client = newClientTransport(server);
|
||||||
|
runIfNotNull(client.start(mockClientTransportListener));
|
||||||
|
MockServerTransportListener serverTransportListener
|
||||||
|
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||||
|
serverTransport = serverTransportListener.transport;
|
||||||
|
|
||||||
|
// boilerplate
|
||||||
|
ClientStream clientStream =
|
||||||
|
client.newStream(methodDescriptor, new Metadata(), callOptions);
|
||||||
|
ClientStreamListener clientListener = mock(ClientStreamListener.class);
|
||||||
|
clientStream.start(clientListener);
|
||||||
|
StreamCreation server
|
||||||
|
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
// setup
|
||||||
|
server.stream.request(1);
|
||||||
|
clientStream.cancel(Status.UNKNOWN);
|
||||||
|
verify(server.listener, timeout(TIMEOUT_MS)).closed(any(Status.class));
|
||||||
|
reset(server.listener);
|
||||||
|
|
||||||
|
// Ensure that for a cancelled ClientStream, interactions are noops
|
||||||
|
clientStream.writeMessage(methodDescriptor.streamRequest("request"));
|
||||||
|
clientStream.halfClose();
|
||||||
|
clientStream.cancel(Status.UNKNOWN);
|
||||||
|
// Even though the server requested a message earlier, the write should not go through
|
||||||
|
verify(server.listener, never()).messageRead(any(InputStream.class));
|
||||||
|
verify(server.listener, never()).halfClosed();
|
||||||
|
verify(server.listener, never()).closed(any(Status.class));
|
||||||
|
|
||||||
|
// Make sure new streams still work properly
|
||||||
|
doPingPong(serverListener);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper that simply does an RPC. It can be used similar to a sleep for negative testing: to give
|
* Helper that simply does an RPC. It can be used similar to a sleep for negative testing: to give
|
||||||
* time for actions _not_ to happen. Since it is based on doing an actual RPC with actual
|
* time for actions _not_ to happen. Since it is based on doing an actual RPC with actual
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue