diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 2b1e8126fd..2b06a3fcf5 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -678,9 +678,7 @@ class NettyServerHandler extends AbstractNettyHandler { return serverWriteQueue; } - /** - * Handler for commands sent from the stream. - */ + /** Handler for commands sent from the stream. */ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { @@ -720,31 +718,45 @@ class NettyServerHandler extends AbstractNettyHandler { } } - private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception { - final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId)); - if (stream != null) { - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - stream.complete(); - } - }); - } + private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) { + promise.addListener( + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + serverStream(stream).complete(); + } + }); } - /** - * Sends the given gRPC frame to the client. - */ - private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, - ChannelPromise promise) throws Http2Exception { + private static void streamGone(int streamId, ChannelPromise promise) { + promise.setFailure( + new IllegalStateException( + "attempting to write to stream " + streamId + " that no longer exists") { + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } + }); + } + + /** Sends the given gRPC frame to the client. */ + private void sendGrpcFrame( + ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise) + throws Http2Exception { try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) { PerfMark.attachTag(cmd.stream().tag()); PerfMark.linkIn(cmd.getLink()); + int streamId = cmd.stream().id(); + Http2Stream stream = connection().stream(streamId); + if (stream == null) { + streamGone(streamId, promise); + return; + } if (cmd.endStream()) { - closeStreamWhenDone(promise, cmd.stream().id()); + closeStreamWhenDone(promise, stream); } // Call the base class to write the HTTP/2 DATA frame. - encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise); + encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise); } } @@ -756,16 +768,14 @@ class NettyServerHandler extends AbstractNettyHandler { try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) { PerfMark.attachTag(cmd.stream().tag()); PerfMark.linkIn(cmd.getLink()); - // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 - // is fixed. int streamId = cmd.stream().id(); Http2Stream stream = connection().stream(streamId); if (stream == null) { - resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise); + streamGone(streamId, promise); return; } if (cmd.endOfStream()) { - closeStreamWhenDone(promise, streamId); + closeStreamWhenDone(promise, stream); } encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise); }