diff --git a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java index 89bd3aa21f..6d8f47f07e 100644 --- a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java +++ b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java @@ -23,6 +23,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Except import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Exception; @@ -46,10 +47,12 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { private static final ByteBuf payloadBuf = unreleasableBuffer(directBuffer(8).writeLong(BDP_MEASUREMENT_PING)); - AbstractNettyHandler(Http2ConnectionDecoder decoder, - Http2ConnectionEncoder encoder, - Http2Settings initialSettings) { - super(decoder, encoder, initialSettings); + AbstractNettyHandler( + ChannelPromise channelUnused, + Http2ConnectionDecoder decoder, + Http2ConnectionEncoder encoder, + Http2Settings initialSettings) { + super(channelUnused, decoder, encoder, initialSettings); // During a graceful shutdown, wait until all streams are closed. gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT); diff --git a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java index 18a6919354..7fedf3327e 100644 --- a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java +++ b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java @@ -18,20 +18,29 @@ package io.grpc.netty; import io.grpc.Attributes; import io.grpc.Internal; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2Settings; +import javax.annotation.Nullable; /** * gRPC wrapper for {@link Http2ConnectionHandler}. */ @Internal public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler { - public GrpcHttp2ConnectionHandler(Http2ConnectionDecoder decoder, + + @Nullable + protected final ChannelPromise channelUnused; + + public GrpcHttp2ConnectionHandler( + ChannelPromise channelUnused, + Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) { super(decoder, encoder, initialSettings); + this.channelUnused = channelUnused; } /** @@ -44,4 +53,14 @@ public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler */ public void handleProtocolNegotiationCompleted(Attributes attrs) { } + + /** + * Calling this method indicates that the channel will no longer be used. This method is roughly + * the same as calling {@link #close} on the channel, but leaving the channel alive. This is + * useful if the channel will soon be deregistered from the executor and used in a non-Netty + * context. + */ + public void notifyUnused() { + channelUnused.setSuccess(null); + } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index f680cd8e9c..1c8bd92e82 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -208,7 +208,7 @@ class NettyClientHandler extends AbstractNettyHandler { Supplier stopwatchFactory, final Runnable tooManyPingsRunnable, TransportTracer transportTracer) { - super(decoder, encoder, settings); + super(/* channelUnused= */ null, decoder, encoder, settings); this.lifecycleManager = lifecycleManager; this.keepAliveManager = keepAliveManager; this.stopwatchFactory = stopwatchFactory; diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index c9ea596127..bd435c5f97 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -35,6 +35,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -156,6 +157,8 @@ class NettyServer implements InternalServer, InternalWithLogId { @Override public void initChannel(Channel ch) throws Exception { + ChannelPromise channelDone = ch.newPromise(); + long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos; if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) { // apply a random jitter of +/-10% to max connection age @@ -165,13 +168,22 @@ class NettyServer implements InternalServer, InternalWithLogId { NettyServerTransport transport = new NettyServerTransport( - ch, protocolNegotiator, streamTracerFactories, transportTracerFactory.create(), + ch, + channelDone, + protocolNegotiator, + streamTracerFactories, + transportTracerFactory.create(), maxStreamsPerConnection, - flowControlWindow, maxMessageSize, maxHeaderListSize, - keepAliveTimeInNanos, keepAliveTimeoutInNanos, + flowControlWindow, + maxMessageSize, + maxHeaderListSize, + keepAliveTimeInNanos, + keepAliveTimeoutInNanos, maxConnectionIdleInNanos, - maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, - permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); + maxConnectionAgeInNanos, + maxConnectionAgeGraceInNanos, + permitKeepAliveWithoutCalls, + permitKeepAliveTimeInNanos); ServerTransportListener transportListener; // This is to order callbacks on the listener, not to guard access to channel. synchronized (NettyServer.this) { @@ -185,13 +197,26 @@ class NettyServer implements InternalServer, InternalWithLogId { eventLoopReferenceCounter.retain(); transportListener = listener.transportCreated(transport); } - transport.start(transportListener); - ch.closeFuture().addListener(new ChannelFutureListener() { + + /** + * Releases the event loop if the channel is "done", possibly due to the channel closing. + */ + final class LoopReleaser implements ChannelFutureListener { + boolean done; + @Override - public void operationComplete(ChannelFuture future) { - eventLoopReferenceCounter.release(); + public void operationComplete(ChannelFuture future) throws Exception { + if (!done) { + done = true; + eventLoopReferenceCounter.release(); + } } - }); + } + + transport.start(transportListener); + ChannelFutureListener loopReleaser = new LoopReleaser(); + channelDone.addListener(loopReleaser); + ch.closeFuture().addListener(loopReleaser); } }); // Bind and start to accept incoming connections. diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 45d8eec7e2..7e90e86b31 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -126,6 +126,7 @@ class NettyServerHandler extends AbstractNettyHandler { static NettyServerHandler newHandler( ServerTransportListener transportListener, + ChannelPromise channelUnused, List streamTracerFactories, TransportTracer transportTracer, int maxStreams, @@ -147,6 +148,7 @@ class NettyServerHandler extends AbstractNettyHandler { Http2FrameWriter frameWriter = new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); return newHandler( + channelUnused, frameReader, frameWriter, transportListener, @@ -167,7 +169,9 @@ class NettyServerHandler extends AbstractNettyHandler { @VisibleForTesting static NettyServerHandler newHandler( - Http2FrameReader frameReader, Http2FrameWriter frameWriter, + ChannelPromise channelUnused, + Http2FrameReader frameReader, + Http2FrameWriter frameWriter, ServerTransportListener transportListener, List streamTracerFactories, TransportTracer transportTracer, @@ -210,6 +214,7 @@ class NettyServerHandler extends AbstractNettyHandler { settings.maxHeaderListSize(maxHeaderListSize); return new NettyServerHandler( + channelUnused, connection, transportListener, streamTracerFactories, @@ -223,6 +228,7 @@ class NettyServerHandler extends AbstractNettyHandler { } private NettyServerHandler( + ChannelPromise channelUnused, final Http2Connection connection, ServerTransportListener transportListener, List streamTracerFactories, @@ -237,7 +243,7 @@ class NettyServerHandler extends AbstractNettyHandler { long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, final KeepAliveEnforcer keepAliveEnforcer) { - super(decoder, encoder, settings); + super(channelUnused, decoder, encoder, settings); final MaxConnectionIdleManager maxConnectionIdleManager; if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 8a16fb93fb..f02f3fed8a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -32,6 +32,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelPromise; import java.io.IOException; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -53,6 +54,7 @@ class NettyServerTransport implements ServerTransport { private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); private final Channel channel; + private final ChannelPromise channelUnused; private final ProtocolNegotiator protocolNegotiator; private final int maxStreams; private ServerTransportListener listener; @@ -71,15 +73,24 @@ class NettyServerTransport implements ServerTransport { private final TransportTracer transportTracer; NettyServerTransport( - Channel channel, ProtocolNegotiator protocolNegotiator, + Channel channel, + ChannelPromise channelUnused, + ProtocolNegotiator protocolNegotiator, List streamTracerFactories, - TransportTracer transportTracer, int maxStreams, - int flowControlWindow, int maxMessageSize, int maxHeaderListSize, - long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, + TransportTracer transportTracer, + int maxStreams, + int flowControlWindow, + int maxMessageSize, + int maxHeaderListSize, + long keepAliveTimeInNanos, + long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, - long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, - boolean permitKeepAliveWithoutCalls,long permitKeepAliveTimeInNanos) { + long maxConnectionAgeInNanos, + long maxConnectionAgeGraceInNanos, + boolean permitKeepAliveWithoutCalls, + long permitKeepAliveTimeInNanos) { this.channel = Preconditions.checkNotNull(channel, "channel"); + this.channelUnused = channelUnused; this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); this.streamTracerFactories = Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories"); @@ -102,16 +113,25 @@ class NettyServerTransport implements ServerTransport { this.listener = listener; // Create the Netty handler for the pipeline. - final NettyServerHandler grpcHandler = createHandler(listener); + final NettyServerHandler grpcHandler = createHandler(listener, channelUnused); NettyHandlerSettings.setAutoWindow(grpcHandler); // Notify when the channel closes. - channel.closeFuture().addListener(new ChannelFutureListener() { + final class TerminationNotifier implements ChannelFutureListener { + boolean done; + @Override public void operationComplete(ChannelFuture future) throws Exception { - notifyTerminated(grpcHandler.connectionError()); + if (!done) { + done = true; + notifyTerminated(grpcHandler.connectionError()); + } } - }); + } + + ChannelFutureListener terminationNotifier = new TerminationNotifier(); + channelUnused.addListener(terminationNotifier); + channel.closeFuture().addListener(terminationNotifier); ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler); channel.pipeline().addLast(negotiationHandler); @@ -196,13 +216,23 @@ class NettyServerTransport implements ServerTransport { /** * Creates the Netty handler to be used in the channel pipeline. */ - private NettyServerHandler createHandler(ServerTransportListener transportListener) { + private NettyServerHandler createHandler( + ServerTransportListener transportListener, ChannelPromise channelUnused) { return NettyServerHandler.newHandler( - transportListener, streamTracerFactories, transportTracer, maxStreams, - flowControlWindow, maxHeaderListSize, maxMessageSize, - keepAliveTimeInNanos, keepAliveTimeoutInNanos, + transportListener, + channelUnused, + streamTracerFactories, + transportTracer, + maxStreams, + flowControlWindow, + maxHeaderListSize, + maxMessageSize, + keepAliveTimeInNanos, + keepAliveTimeoutInNanos, maxConnectionIdleInNanos, - maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, - permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); + maxConnectionAgeInNanos, + maxConnectionAgeGraceInNanos, + permitKeepAliveWithoutCalls, + permitKeepAliveTimeInNanos); } } diff --git a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java index 9c938773ad..8864a94189 100644 --- a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java +++ b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java @@ -116,9 +116,9 @@ public abstract class NettyHandlerTestBase { frameWriter = mock(Http2FrameWriter.class, delegatesTo(new DefaultHttp2FrameWriter())); frameReader = new DefaultHttp2FrameReader(headersDecoder); + channel = new FakeClockSupportedChanel(); handler = newHandler(); - - channel = new FakeClockSupportedChanel(handler); + channel.pipeline().addLast(handler); ctx = channel.pipeline().context(handler); writeQueue = initWriteQueue(); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 9a2f84b389..8e3def7b59 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -91,6 +91,8 @@ import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.DisableOnDebug; +import org.junit.rules.TestRule; import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -107,7 +109,7 @@ import org.mockito.stubbing.Answer; public class NettyServerHandlerTest extends NettyHandlerTestBase { @Rule - public final Timeout globalTimeout = Timeout.seconds(1); + public final TestRule globalTimeout = new DisableOnDebug(Timeout.seconds(10)); private static final int STREAM_ID = 3; @@ -858,14 +860,23 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase