diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index a0ffba891e..1380b93f9f 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -37,6 +37,7 @@ import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.Status; import java.io.InputStream; +import java.util.logging.Level; import java.util.logging.Logger; final class ServerCallImpl extends ServerCall { @@ -205,7 +206,8 @@ final class ServerCallImpl extends ServerCall { * on. */ private void internalClose(Status internalError) { - stream.close(internalError, new Metadata()); + log.log(Level.WARNING, "Cancelling the stream with status {0}", new Object[] {internalError}); + stream.cancel(internalError); } /** diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index b787f9b2e4..292028f85f 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -629,8 +629,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use. */ private void internalClose() { - // TODO(ejona86): this is not thread-safe :) - stream.close(Status.UNKNOWN, new Metadata()); + stream.cancel(Status.INTERNAL); } @Override diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index de084a2df2..39b4c5532a 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -192,10 +192,9 @@ public class ServerCallImplTest { verify(stream, times(1)).writeMessage(any(InputStream.class)); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); - verify(stream, times(1)).close(statusCaptor.capture(), metadataCaptor.capture()); + verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode()); assertEquals(ServerCallImpl.TOO_MANY_RESPONSES, statusCaptor.getValue().getDescription()); - assertTrue(metadataCaptor.getValue().keys().isEmpty()); } @Test @@ -221,7 +220,7 @@ public class ServerCallImplTest { serverCall.sendMessage(1L); serverCall.sendMessage(1L); verify(stream, times(1)).writeMessage(any(InputStream.class)); - verify(stream, times(1)).close(any(Status.class), any(Metadata.class)); + verify(stream, times(1)).cancel(any(Status.class)); // App runs to completion but everything is ignored serverCall.sendMessage(1L); @@ -255,11 +254,9 @@ public class ServerCallImplTest { CompressorRegistry.getDefaultInstance()); serverCall.close(Status.OK, new Metadata()); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); - ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); - verify(stream, times(1)).close(statusCaptor.capture(), metadataCaptor.capture()); + verify(stream, times(1)).cancel(statusCaptor.capture()); assertEquals(Status.Code.INTERNAL, statusCaptor.getValue().getCode()); assertEquals(ServerCallImpl.MISSING_RESPONSE, statusCaptor.getValue().getDescription()); - assertTrue(metadataCaptor.getValue().keys().isEmpty()); } @Test diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 0c1daa21f4..017b43ac8a 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -1108,7 +1108,7 @@ public class ServerImplTest { fail("Expected exception"); } catch (TestError t) { assertSame(expectedT, t); - ensureServerStateNotLeaked(); + ensureServerStateIsCancelled(); } } @@ -1133,7 +1133,7 @@ public class ServerImplTest { fail("Expected exception"); } catch (RuntimeException t) { assertSame(expectedT, t); - ensureServerStateNotLeaked(); + ensureServerStateIsCancelled(); } } @@ -1156,7 +1156,7 @@ public class ServerImplTest { fail("Expected exception"); } catch (TestError t) { assertSame(expectedT, t); - ensureServerStateNotLeaked(); + ensureServerStateIsCancelled(); } } @@ -1179,7 +1179,7 @@ public class ServerImplTest { fail("Expected exception"); } catch (RuntimeException t) { assertSame(expectedT, t); - ensureServerStateNotLeaked(); + ensureServerStateIsCancelled(); } } @@ -1202,7 +1202,7 @@ public class ServerImplTest { fail("Expected exception"); } catch (TestError t) { assertSame(expectedT, t); - ensureServerStateNotLeaked(); + ensureServerStateIsCancelled(); } } @@ -1225,7 +1225,7 @@ public class ServerImplTest { fail("Expected exception"); } catch (RuntimeException t) { assertSame(expectedT, t); - ensureServerStateNotLeaked(); + ensureServerStateIsCancelled(); } } @@ -1396,6 +1396,12 @@ public class ServerImplTest { assertTrue(metadataCaptor.getValue().keys().isEmpty()); } + private void ensureServerStateIsCancelled() { + verify(stream).cancel(statusCaptor.capture()); + assertEquals(Status.INTERNAL, statusCaptor.getValue()); + assertNull(statusCaptor.getValue().getCause()); + } + private static class SimpleServer implements io.grpc.internal.InternalServer { ServerListener listener; diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java index 26f3c88875..510a421b6a 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/MoreInProcessTest.java @@ -247,7 +247,7 @@ public class MoreInProcessTest { .onNext(StreamingInputCallRequest.getDefaultInstance()); assertTrue(finishLatch.await(900, TimeUnit.MILLISECONDS)); - assertEquals(Status.UNKNOWN, Status.fromThrowable(throwableRef.get())); + assertEquals(Status.CANCELLED.getCode(), Status.fromThrowable(throwableRef.get()).getCode()); assertNull(responseRef.get()); } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 7c5b6d0a53..acf363b5dc 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -654,8 +654,13 @@ class NettyServerHandler extends AbstractNettyHandler { ChannelPromise promise) { // Notify the listener if we haven't already. cmd.stream().transportReportStatus(cmd.reason()); + Http2Error http2Error = Http2Error.INTERNAL_ERROR; + if (Status.DEADLINE_EXCEEDED.getCode().equals(cmd.reason().getCode()) || Status.CANCELLED + .getCode().equals(cmd.reason().getCode())) { + http2Error = Http2Error.CANCEL; + } // Terminate the stream. - encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise); + encoder().writeRstStream(ctx, cmd.stream().id(), http2Error.code(), promise); } private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,