From 28f2647aaf3aea8484354324787f48b523238994 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Wed, 29 Sep 2021 09:42:59 -0700 Subject: [PATCH] core: move closed check from Stream.isReady() to Call.isReady() (#8566) This fixes data race described in #8565. We are doubtful whether checking closed in isReady() is necessary (#3201 might be a requirement), but it was easier to just maintain the existing behavior than think heavily about it. --- .../java/io/grpc/internal/AbstractStream.java | 3 --- .../java/io/grpc/internal/ClientCallImpl.java | 3 +++ .../java/io/grpc/internal/ServerCallImpl.java | 3 +++ .../io/grpc/internal/ClientCallImplTest.java | 18 ++++++++++++++++++ .../io/grpc/internal/ServerCallImplTest.java | 2 ++ .../io/grpc/netty/NettyStreamTestBase.java | 7 ------- 6 files changed, 26 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index e066018249..69df1eee8e 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -91,9 +91,6 @@ public abstract class AbstractStream implements Stream { @Override public boolean isReady() { - if (framer().isClosed()) { - return false; - } return transportState().isReady(); } diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 6f850ade66..db1a992b96 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -544,6 +544,9 @@ final class ClientCallImpl extends ClientCall { @Override public boolean isReady() { + if (halfCloseCalled) { + return false; + } return stream.isReady(); } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index deba21c315..b31aadd08a 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -194,6 +194,9 @@ final class ServerCallImpl extends ServerCall { @Override public boolean isReady() { + if (closeCalled) { + return false; + } return stream.isReady(); } diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index ecf7a90b13..e409f2f9df 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -1022,6 +1022,24 @@ public class ClientCallImplTest { assertSame(cause, status.getCause()); } + @Test + public void halfClosedShouldNotBeReady() { + when(stream.isReady()).thenReturn(true); + ClientCallImpl call = new ClientCallImpl<>( + method, + MoreExecutors.directExecutor(), + baseCallOptions, + clientStreamProvider, + deadlineCancellationExecutor, + channelCallTracer, configSelector); + + call.start(callListener, new Metadata()); + assertThat(call.isReady()).isTrue(); + + call.halfClose(); + assertThat(call.isReady()).isFalse(); + } + @Test public void startAddsMaxSize() { CallOptions callOptions = diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index edf303a0bc..9c25f47480 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -334,6 +334,8 @@ public class ServerCallImplTest { when(stream.isReady()).thenReturn(true); assertTrue(call.isReady()); + call.close(Status.OK, new Metadata()); + assertFalse(call.isReady()); } @Test diff --git a/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java b/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java index 0f7d54e794..dc245b3f50 100644 --- a/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java +++ b/netty/src/test/java/io/grpc/netty/NettyStreamTestBase.java @@ -138,13 +138,6 @@ public abstract class NettyStreamTestBase { assertTrue(stream.isReady()); } - @Test - public void closedShouldNotBeReady() throws IOException { - assertTrue(stream.isReady()); - closeStream(); - assertFalse(stream.isReady()); - } - @Test public void notifiedOnReadyAfterWriteCompletes() throws IOException { sendHeadersIfServer();