netty: make GrpcHttp2ConnectionHandler able to indicate it will no longer be used

This adds a method on GrpcHttp2ConnectionHandler which, when called, indicates that the channel associated with the handler is no longer needed.   

Notes:

* The handler may not be on the channel, but will either need to be added or will never be added.
* The channel will only be "unused" on the server side.
* It is expected that after calling `notifyUnused()`, the channel will be deregistered from the loop without being properly shut down.   This allows the channel to be handed off to a Non-netty API.
This commit is contained in:
Carl Mastrangelo 2018-01-11 14:12:12 -08:00 committed by GitHub
parent 6990e57a42
commit a3d801d07d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 138 additions and 44 deletions

View File

@ -23,6 +23,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Except
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception;
@ -46,10 +47,12 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
private static final ByteBuf payloadBuf = private static final ByteBuf payloadBuf =
unreleasableBuffer(directBuffer(8).writeLong(BDP_MEASUREMENT_PING)); unreleasableBuffer(directBuffer(8).writeLong(BDP_MEASUREMENT_PING));
AbstractNettyHandler(Http2ConnectionDecoder decoder, AbstractNettyHandler(
Http2ConnectionEncoder encoder, ChannelPromise channelUnused,
Http2Settings initialSettings) { Http2ConnectionDecoder decoder,
super(decoder, encoder, initialSettings); Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
super(channelUnused, decoder, encoder, initialSettings);
// During a graceful shutdown, wait until all streams are closed. // During a graceful shutdown, wait until all streams are closed.
gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT); gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT);

View File

@ -18,20 +18,29 @@ package io.grpc.netty;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.Internal; import io.grpc.Internal;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import javax.annotation.Nullable;
/** /**
* gRPC wrapper for {@link Http2ConnectionHandler}. * gRPC wrapper for {@link Http2ConnectionHandler}.
*/ */
@Internal @Internal
public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler { public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler {
public GrpcHttp2ConnectionHandler(Http2ConnectionDecoder decoder,
@Nullable
protected final ChannelPromise channelUnused;
public GrpcHttp2ConnectionHandler(
ChannelPromise channelUnused,
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings) { Http2Settings initialSettings) {
super(decoder, encoder, initialSettings); super(decoder, encoder, initialSettings);
this.channelUnused = channelUnused;
} }
/** /**
@ -44,4 +53,14 @@ public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler
*/ */
public void handleProtocolNegotiationCompleted(Attributes attrs) { public void handleProtocolNegotiationCompleted(Attributes attrs) {
} }
/**
* Calling this method indicates that the channel will no longer be used. This method is roughly
* the same as calling {@link #close} on the channel, but leaving the channel alive. This is
* useful if the channel will soon be deregistered from the executor and used in a non-Netty
* context.
*/
public void notifyUnused() {
channelUnused.setSuccess(null);
}
} }

View File

@ -208,7 +208,7 @@ class NettyClientHandler extends AbstractNettyHandler {
Supplier<Stopwatch> stopwatchFactory, Supplier<Stopwatch> stopwatchFactory,
final Runnable tooManyPingsRunnable, final Runnable tooManyPingsRunnable,
TransportTracer transportTracer) { TransportTracer transportTracer) {
super(decoder, encoder, settings); super(/* channelUnused= */ null, decoder, encoder, settings);
this.lifecycleManager = lifecycleManager; this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager; this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory; this.stopwatchFactory = stopwatchFactory;

View File

@ -35,6 +35,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -156,6 +157,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
@Override @Override
public void initChannel(Channel ch) throws Exception { public void initChannel(Channel ch) throws Exception {
ChannelPromise channelDone = ch.newPromise();
long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos; long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos;
if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) { if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
// apply a random jitter of +/-10% to max connection age // apply a random jitter of +/-10% to max connection age
@ -165,13 +168,22 @@ class NettyServer implements InternalServer, InternalWithLogId {
NettyServerTransport transport = NettyServerTransport transport =
new NettyServerTransport( new NettyServerTransport(
ch, protocolNegotiator, streamTracerFactories, transportTracerFactory.create(), ch,
channelDone,
protocolNegotiator,
streamTracerFactories,
transportTracerFactory.create(),
maxStreamsPerConnection, maxStreamsPerConnection,
flowControlWindow, maxMessageSize, maxHeaderListSize, flowControlWindow,
keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxMessageSize,
maxHeaderListSize,
keepAliveTimeInNanos,
keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionIdleInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, maxConnectionAgeInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls,
permitKeepAliveTimeInNanos);
ServerTransportListener transportListener; ServerTransportListener transportListener;
// This is to order callbacks on the listener, not to guard access to channel. // This is to order callbacks on the listener, not to guard access to channel.
synchronized (NettyServer.this) { synchronized (NettyServer.this) {
@ -185,13 +197,26 @@ class NettyServer implements InternalServer, InternalWithLogId {
eventLoopReferenceCounter.retain(); eventLoopReferenceCounter.retain();
transportListener = listener.transportCreated(transport); transportListener = listener.transportCreated(transport);
} }
transport.start(transportListener);
ch.closeFuture().addListener(new ChannelFutureListener() { /**
* Releases the event loop if the channel is "done", possibly due to the channel closing.
*/
final class LoopReleaser implements ChannelFutureListener {
boolean done;
@Override @Override
public void operationComplete(ChannelFuture future) { public void operationComplete(ChannelFuture future) throws Exception {
eventLoopReferenceCounter.release(); if (!done) {
done = true;
eventLoopReferenceCounter.release();
}
} }
}); }
transport.start(transportListener);
ChannelFutureListener loopReleaser = new LoopReleaser();
channelDone.addListener(loopReleaser);
ch.closeFuture().addListener(loopReleaser);
} }
}); });
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.

View File

@ -126,6 +126,7 @@ class NettyServerHandler extends AbstractNettyHandler {
static NettyServerHandler newHandler( static NettyServerHandler newHandler(
ServerTransportListener transportListener, ServerTransportListener transportListener,
ChannelPromise channelUnused,
List<ServerStreamTracer.Factory> streamTracerFactories, List<ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer transportTracer, TransportTracer transportTracer,
int maxStreams, int maxStreams,
@ -147,6 +148,7 @@ class NettyServerHandler extends AbstractNettyHandler {
Http2FrameWriter frameWriter = Http2FrameWriter frameWriter =
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
return newHandler( return newHandler(
channelUnused,
frameReader, frameReader,
frameWriter, frameWriter,
transportListener, transportListener,
@ -167,7 +169,9 @@ class NettyServerHandler extends AbstractNettyHandler {
@VisibleForTesting @VisibleForTesting
static NettyServerHandler newHandler( static NettyServerHandler newHandler(
Http2FrameReader frameReader, Http2FrameWriter frameWriter, ChannelPromise channelUnused,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
ServerTransportListener transportListener, ServerTransportListener transportListener,
List<ServerStreamTracer.Factory> streamTracerFactories, List<ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer transportTracer, TransportTracer transportTracer,
@ -210,6 +214,7 @@ class NettyServerHandler extends AbstractNettyHandler {
settings.maxHeaderListSize(maxHeaderListSize); settings.maxHeaderListSize(maxHeaderListSize);
return new NettyServerHandler( return new NettyServerHandler(
channelUnused,
connection, connection,
transportListener, transportListener,
streamTracerFactories, streamTracerFactories,
@ -223,6 +228,7 @@ class NettyServerHandler extends AbstractNettyHandler {
} }
private NettyServerHandler( private NettyServerHandler(
ChannelPromise channelUnused,
final Http2Connection connection, final Http2Connection connection,
ServerTransportListener transportListener, ServerTransportListener transportListener,
List<ServerStreamTracer.Factory> streamTracerFactories, List<ServerStreamTracer.Factory> streamTracerFactories,
@ -237,7 +243,7 @@ class NettyServerHandler extends AbstractNettyHandler {
long maxConnectionAgeInNanos, long maxConnectionAgeInNanos,
long maxConnectionAgeGraceInNanos, long maxConnectionAgeGraceInNanos,
final KeepAliveEnforcer keepAliveEnforcer) { final KeepAliveEnforcer keepAliveEnforcer) {
super(decoder, encoder, settings); super(channelUnused, decoder, encoder, settings);
final MaxConnectionIdleManager maxConnectionIdleManager; final MaxConnectionIdleManager maxConnectionIdleManager;
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {

View File

@ -32,6 +32,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPromise;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -53,6 +54,7 @@ class NettyServerTransport implements ServerTransport {
private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
private final Channel channel; private final Channel channel;
private final ChannelPromise channelUnused;
private final ProtocolNegotiator protocolNegotiator; private final ProtocolNegotiator protocolNegotiator;
private final int maxStreams; private final int maxStreams;
private ServerTransportListener listener; private ServerTransportListener listener;
@ -71,15 +73,24 @@ class NettyServerTransport implements ServerTransport {
private final TransportTracer transportTracer; private final TransportTracer transportTracer;
NettyServerTransport( NettyServerTransport(
Channel channel, ProtocolNegotiator protocolNegotiator, Channel channel,
ChannelPromise channelUnused,
ProtocolNegotiator protocolNegotiator,
List<ServerStreamTracer.Factory> streamTracerFactories, List<ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer transportTracer, int maxStreams, TransportTracer transportTracer,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize, int maxStreams,
long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, int flowControlWindow,
int maxMessageSize,
int maxHeaderListSize,
long keepAliveTimeInNanos,
long keepAliveTimeoutInNanos,
long maxConnectionIdleInNanos, long maxConnectionIdleInNanos,
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, long maxConnectionAgeInNanos,
boolean permitKeepAliveWithoutCalls,long permitKeepAliveTimeInNanos) { long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls,
long permitKeepAliveTimeInNanos) {
this.channel = Preconditions.checkNotNull(channel, "channel"); this.channel = Preconditions.checkNotNull(channel, "channel");
this.channelUnused = channelUnused;
this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = this.streamTracerFactories =
Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories"); Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories");
@ -102,16 +113,25 @@ class NettyServerTransport implements ServerTransport {
this.listener = listener; this.listener = listener;
// Create the Netty handler for the pipeline. // Create the Netty handler for the pipeline.
final NettyServerHandler grpcHandler = createHandler(listener); final NettyServerHandler grpcHandler = createHandler(listener, channelUnused);
NettyHandlerSettings.setAutoWindow(grpcHandler); NettyHandlerSettings.setAutoWindow(grpcHandler);
// Notify when the channel closes. // Notify when the channel closes.
channel.closeFuture().addListener(new ChannelFutureListener() { final class TerminationNotifier implements ChannelFutureListener {
boolean done;
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
notifyTerminated(grpcHandler.connectionError()); if (!done) {
done = true;
notifyTerminated(grpcHandler.connectionError());
}
} }
}); }
ChannelFutureListener terminationNotifier = new TerminationNotifier();
channelUnused.addListener(terminationNotifier);
channel.closeFuture().addListener(terminationNotifier);
ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler); ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
channel.pipeline().addLast(negotiationHandler); channel.pipeline().addLast(negotiationHandler);
@ -196,13 +216,23 @@ class NettyServerTransport implements ServerTransport {
/** /**
* Creates the Netty handler to be used in the channel pipeline. * Creates the Netty handler to be used in the channel pipeline.
*/ */
private NettyServerHandler createHandler(ServerTransportListener transportListener) { private NettyServerHandler createHandler(
ServerTransportListener transportListener, ChannelPromise channelUnused) {
return NettyServerHandler.newHandler( return NettyServerHandler.newHandler(
transportListener, streamTracerFactories, transportTracer, maxStreams, transportListener,
flowControlWindow, maxHeaderListSize, maxMessageSize, channelUnused,
keepAliveTimeInNanos, keepAliveTimeoutInNanos, streamTracerFactories,
transportTracer,
maxStreams,
flowControlWindow,
maxHeaderListSize,
maxMessageSize,
keepAliveTimeInNanos,
keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionIdleInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, maxConnectionAgeInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls,
permitKeepAliveTimeInNanos);
} }
} }

View File

@ -116,9 +116,9 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
frameWriter = mock(Http2FrameWriter.class, delegatesTo(new DefaultHttp2FrameWriter())); frameWriter = mock(Http2FrameWriter.class, delegatesTo(new DefaultHttp2FrameWriter()));
frameReader = new DefaultHttp2FrameReader(headersDecoder); frameReader = new DefaultHttp2FrameReader(headersDecoder);
channel = new FakeClockSupportedChanel();
handler = newHandler(); handler = newHandler();
channel.pipeline().addLast(handler);
channel = new FakeClockSupportedChanel(handler);
ctx = channel.pipeline().context(handler); ctx = channel.pipeline().context(handler);
writeQueue = initWriteQueue(); writeQueue = initWriteQueue();

View File

@ -91,6 +91,8 @@ import java.util.concurrent.TimeUnit;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
@ -107,7 +109,7 @@ import org.mockito.stubbing.Answer;
public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHandler> { public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHandler> {
@Rule @Rule
public final Timeout globalTimeout = Timeout.seconds(1); public final TestRule globalTimeout = new DisableOnDebug(Timeout.seconds(10));
private static final int STREAM_ID = 3; private static final int STREAM_ID = 3;
@ -858,14 +860,23 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
@Override @Override
protected NettyServerHandler newHandler() { protected NettyServerHandler newHandler() {
return NettyServerHandler.newHandler( return NettyServerHandler.newHandler(
frameReader(), frameWriter(), transportListener, /* channelUnused= */ channel().newPromise(),
Arrays.asList(streamTracerFactory), transportTracer, frameReader(),
maxConcurrentStreams, flowControlWindow, frameWriter(),
maxHeaderListSize, DEFAULT_MAX_MESSAGE_SIZE, transportListener,
keepAliveTimeInNanos, keepAliveTimeoutInNanos, Arrays.asList(streamTracerFactory),
transportTracer,
maxConcurrentStreams,
flowControlWindow,
maxHeaderListSize,
DEFAULT_MAX_MESSAGE_SIZE,
keepAliveTimeInNanos,
keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionIdleInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, maxConnectionAgeInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls,
permitKeepAliveTimeInNanos);
} }
@Override @Override