core: fix a race in InProcessTransport

Resolves #3571
This commit is contained in:
ZHANG Dapeng 2017-10-26 09:32:41 -07:00 committed by GitHub
parent 255643bed9
commit 071942dc3b
2 changed files with 45 additions and 1 deletions

View File

@ -388,6 +388,12 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
@Override
public void close(Status status, Metadata trailers) {
// clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
// clientStreamListener.closed can trigger clientStream.cancel (see code in
// ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
// calling internalCancel().
clientStream.serverClosed(Status.OK, status);
Status clientStatus = stripCause(status);
synchronized (this) {
if (closed) {
@ -403,7 +409,6 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
}
}
clientStream.serverClosed(Status.OK, status);
streamClosed();
}

View File

@ -947,6 +947,45 @@ public abstract class AbstractTransportTest {
assertSame(status, serverStreamTracer1.getStatus());
}
@Test
public void earlyServerClose_serverFailure_withClientCancelOnListenerClosed() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
final ClientStream clientStream =
client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase() {
@Override
public void closed(Status status, Metadata trailers) {
super.closed(status, trailers);
// This simulates the blocking calls which can trigger clientStream.cancel().
clientStream.cancel(Status.CANCELLED.withCause(status.asRuntimeException()));
}
};
clientStream.start(clientStreamListener);
StreamCreation serverStreamCreation
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
Status strippedStatus = Status.INTERNAL.withDescription("I'm not listening");
Status status = strippedStatus.withCause(new Exception());
serverStream.close(status, new Metadata());
assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertEquals(status.getCode(), clientStreamStatus.getCode());
assertEquals(status.getDescription(), clientStreamStatus.getDescription());
assertNull(clientStreamStatus.getCause());
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
assertSame(status, serverStreamTracer1.getStatus());
}
@Test
public void clientCancel() throws Exception {
server.start(serverListener);