From 071942dc3b9416c6d22754c8975faf13cd3befd2 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Thu, 26 Oct 2017 09:32:41 -0700 Subject: [PATCH] core: fix a race in InProcessTransport Resolves #3571 --- .../io/grpc/inprocess/InProcessTransport.java | 7 +++- .../testing/AbstractTransportTest.java | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 0559cc6f5d..3b518663e7 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -388,6 +388,12 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans @Override public void close(Status status, Metadata trailers) { + // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise + // clientStreamListener.closed can trigger clientStream.cancel (see code in + // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are + // calling internalCancel(). + clientStream.serverClosed(Status.OK, status); + Status clientStatus = stripCause(status); synchronized (this) { if (closed) { @@ -403,7 +409,6 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans } } - clientStream.serverClosed(Status.OK, status); streamClosed(); } diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index fe9402bc7d..6ae05d073f 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -947,6 +947,45 @@ public abstract class AbstractTransportTest { assertSame(status, serverStreamTracer1.getStatus()); } + @Test + public void earlyServerClose_serverFailure_withClientCancelOnListenerClosed() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + runIfNotNull(client.start(mockClientTransportListener)); + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + serverTransport = serverTransportListener.transport; + + final ClientStream clientStream = + client.newStream(methodDescriptor, new Metadata(), callOptions); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase() { + @Override + public void closed(Status status, Metadata trailers) { + super.closed(status, trailers); + // This simulates the blocking calls which can trigger clientStream.cancel(). + clientStream.cancel(Status.CANCELLED.withCause(status.asRuntimeException())); + } + }; + clientStream.start(clientStreamListener); + StreamCreation serverStreamCreation + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + ServerStream serverStream = serverStreamCreation.stream; + ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; + + Status strippedStatus = Status.INTERNAL.withDescription("I'm not listening"); + Status status = strippedStatus.withCause(new Exception()); + serverStream.close(status, new Metadata()); + assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + assertEquals(status.getCode(), clientStreamStatus.getCode()); + assertEquals(status.getDescription(), clientStreamStatus.getDescription()); + assertNull(clientStreamStatus.getCause()); + assertTrue(clientStreamTracer1.getOutboundHeaders()); + assertSame(clientStreamStatus, clientStreamTracer1.getStatus()); + assertSame(status, serverStreamTracer1.getStatus()); + } + @Test public void clientCancel() throws Exception { server.start(serverListener);