netty: implement server max connection age

This commit is contained in:
ZHANG Dapeng 2017-04-10 11:32:28 -07:00 committed by GitHub
parent 44cca5507d
commit 83a06cc1a5
7 changed files with 225 additions and 25 deletions

View File

@ -79,6 +79,8 @@ class NettyServer implements InternalServer {
private final int maxHeaderListSize; private final int maxHeaderListSize;
private final long keepAliveTimeInNanos; private final long keepAliveTimeInNanos;
private final long keepAliveTimeoutInNanos; private final long keepAliveTimeoutInNanos;
private final long maxConnectionAgeInNanos;
private final long maxConnectionAgeGraceInNanos;
private final boolean permitKeepAliveWithoutCalls; private final boolean permitKeepAliveWithoutCalls;
private final long permitKeepAliveTimeInNanos; private final long permitKeepAliveTimeInNanos;
private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter(); private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter();
@ -90,6 +92,7 @@ class NettyServer implements InternalServer {
ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories, ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories,
int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) { boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) {
this.address = address; this.address = address;
this.channelType = checkNotNull(channelType, "channelType"); this.channelType = checkNotNull(channelType, "channelType");
@ -105,6 +108,8 @@ class NettyServer implements InternalServer {
this.maxHeaderListSize = maxHeaderListSize; this.maxHeaderListSize = maxHeaderListSize;
this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeInNanos = keepAliveTimeInNanos;
this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls; this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
} }
@ -141,8 +146,10 @@ class NettyServer implements InternalServer {
NettyServerTransport transport = NettyServerTransport transport =
new NettyServerTransport( new NettyServerTransport(
ch, protocolNegotiator, streamTracerFactories, maxStreamsPerConnection, ch, protocolNegotiator, streamTracerFactories, maxStreamsPerConnection,
flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, flowControlWindow, maxMessageSize, maxHeaderListSize,
keepAliveTimeoutInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
ServerTransportListener transportListener; ServerTransportListener transportListener;
// This is to order callbacks on the listener, not to guard access to channel. // This is to order callbacks on the listener, not to guard access to channel.
synchronized (NettyServer.this) { synchronized (NettyServer.this) {

View File

@ -65,8 +65,12 @@ import javax.net.ssl.SSLException;
public final class NettyServerBuilder extends AbstractServerImplBuilder<NettyServerBuilder> { public final class NettyServerBuilder extends AbstractServerImplBuilder<NettyServerBuilder> {
public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB
static final long MAX_CONNECTION_AGE_NANOS_DISABLED = Long.MAX_VALUE;
static final long MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE = Long.MAX_VALUE;
private static final long MIN_KEEPALIVE_TIME_NANO = TimeUnit.MILLISECONDS.toNanos(1L); private static final long MIN_KEEPALIVE_TIME_NANO = TimeUnit.MILLISECONDS.toNanos(1L);
private static final long MIN_KEEPALIVE_TIMEOUT_NANO = TimeUnit.MICROSECONDS.toNanos(499L); private static final long MIN_KEEPALIVE_TIMEOUT_NANO = TimeUnit.MICROSECONDS.toNanos(499L);
private static final long MIN_MAX_CONNECTION_AGE_NANO = TimeUnit.SECONDS.toNanos(1L);
private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L); private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
private final SocketAddress address; private final SocketAddress address;
@ -83,6 +87,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE; private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
private long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
private long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
private boolean permitKeepAliveWithoutCalls; private boolean permitKeepAliveWithoutCalls;
private long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5); private long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5);
@ -278,6 +284,43 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
return this; return this;
} }
/**
* Sets a custom max connection age, connection lasting longer than which will be gracefully
* terminated. An unreasonably small value might be increased. {@code Long.MAX_VALUE} nano seconds
* or an unreasonably large value will disable max connection age.
*
* @since 1.3.0
*/
public NettyServerBuilder maxConnectionAge(long maxConnectionAge, TimeUnit timeUnit) {
checkArgument(maxConnectionAge > 0L, "max connection age must be positive");
maxConnectionAgeInNanos = timeUnit.toNanos(maxConnectionAge);
if (maxConnectionAgeInNanos >= AS_LARGE_AS_INFINITE) {
maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
}
if (maxConnectionAgeInNanos < MIN_MAX_CONNECTION_AGE_NANO) {
maxConnectionAgeInNanos = MIN_MAX_CONNECTION_AGE_NANO;
}
return this;
}
/**
* Sets a custom grace time for the graceful connection termination. Once the max connection age
* is reached, RPCs have the grace time to complete. RPCs that do not complete in time will be
* cancelled, allowing the connection to terminate. {@code Long.MAX_VALUE} nano seconds or an
* unreasonably large value are considered infinite.
*
* @see #maxConnectionAge(long, TimeUnit)
* @since 1.3.0
*/
public NettyServerBuilder maxConnectionAgeGrace(long maxConnectionAgeGrace, TimeUnit timeUnit) {
checkArgument(maxConnectionAgeGrace >= 0L, "max connection age grace must be non-negative");
maxConnectionAgeGraceInNanos = timeUnit.toNanos(maxConnectionAgeGrace);
if (maxConnectionAgeGraceInNanos >= AS_LARGE_AS_INFINITE) {
maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
}
return this;
}
/** /**
* Specify the most aggressive keep-alive time clients are permitted to configure. The server will * Specify the most aggressive keep-alive time clients are permitted to configure. The server will
* try to detect clients exceeding this rate and when detected will forcefully close the * try to detect clients exceeding this rate and when detected will forcefully close the
@ -322,6 +365,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
return new NettyServer(address, channelType, bossEventLoopGroup, workerEventLoopGroup, return new NettyServer(address, channelType, bossEventLoopGroup, workerEventLoopGroup,
negotiator, streamTracerFactories, maxConcurrentCallsPerConnection, flowControlWindow, negotiator, streamTracerFactories, maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
} }

View File

@ -34,6 +34,7 @@ package io.grpc.netty;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED; import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER; import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
import static io.grpc.netty.Utils.HTTP_METHOD; import static io.grpc.netty.Utils.HTTP_METHOD;
import static io.grpc.netty.Utils.TE_HEADER; import static io.grpc.netty.Utils.TE_HEADER;
@ -50,6 +51,7 @@ import io.grpc.ServerStreamTracer;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager; import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.LogExceptionRunnable;
import io.grpc.internal.ServerTransportListener; import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StatsTraceContext;
import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2ServerHeadersDecoder; import io.grpc.netty.GrpcHttp2HeadersDecoder.GrpcHttp2ServerHeadersDecoder;
@ -87,6 +89,7 @@ import io.netty.handler.logging.LogLevel;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -106,6 +109,8 @@ class NettyServerHandler extends AbstractNettyHandler {
private final int maxMessageSize; private final int maxMessageSize;
private final long keepAliveTimeInNanos; private final long keepAliveTimeInNanos;
private final long keepAliveTimeoutInNanos; private final long keepAliveTimeoutInNanos;
private final long maxConnectionAgeInNanos;
private final long maxConnectionAgeGraceInNanos;
private final List<ServerStreamTracer.Factory> streamTracerFactories; private final List<ServerStreamTracer.Factory> streamTracerFactories;
private final KeepAliveEnforcer keepAliveEnforcer; private final KeepAliveEnforcer keepAliveEnforcer;
private Attributes attributes; private Attributes attributes;
@ -114,6 +119,7 @@ class NettyServerHandler extends AbstractNettyHandler {
private WriteQueue serverWriteQueue; private WriteQueue serverWriteQueue;
private AsciiString lastKnownAuthority; private AsciiString lastKnownAuthority;
private KeepAliveManager keepAliveManager; private KeepAliveManager keepAliveManager;
private ScheduledFuture<?> maxConnectionAgeMonitor;
static NettyServerHandler newHandler( static NettyServerHandler newHandler(
ServerTransportListener transportListener, ServerTransportListener transportListener,
@ -124,6 +130,8 @@ class NettyServerHandler extends AbstractNettyHandler {
int maxMessageSize, int maxMessageSize,
long keepAliveTimeInNanos, long keepAliveTimeInNanos,
long keepAliveTimeoutInNanos, long keepAliveTimeoutInNanos,
long maxConnectionAgeInNanos,
long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls, boolean permitKeepAliveWithoutCalls,
long permitKeepAliveTimeInNanos) { long permitKeepAliveTimeInNanos) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
@ -133,9 +141,12 @@ class NettyServerHandler extends AbstractNettyHandler {
new DefaultHttp2FrameReader(headersDecoder), frameLogger); new DefaultHttp2FrameReader(headersDecoder), frameLogger);
Http2FrameWriter frameWriter = Http2FrameWriter frameWriter =
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
return newHandler(frameReader, frameWriter, transportListener, streamTracerFactories, return newHandler(
maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize, keepAliveTimeInNanos, frameReader, frameWriter, transportListener, streamTracerFactories,
keepAliveTimeoutInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize,
keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
} }
@VisibleForTesting @VisibleForTesting
@ -149,6 +160,8 @@ class NettyServerHandler extends AbstractNettyHandler {
int maxMessageSize, int maxMessageSize,
long keepAliveTimeInNanos, long keepAliveTimeInNanos,
long keepAliveTimeoutInNanos, long keepAliveTimeoutInNanos,
long maxConnectionAgeInNanos,
long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls, boolean permitKeepAliveWithoutCalls,
long permitKeepAliveTimeInNanos) { long permitKeepAliveTimeInNanos) {
Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive"); Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive");
@ -191,8 +204,11 @@ class NettyServerHandler extends AbstractNettyHandler {
settings.maxConcurrentStreams(maxStreams); settings.maxConcurrentStreams(maxStreams);
settings.maxHeaderListSize(maxHeaderListSize); settings.maxHeaderListSize(maxHeaderListSize);
return new NettyServerHandler(transportListener, streamTracerFactories, decoder, encoder, return new NettyServerHandler(
settings, maxMessageSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, keepAliveEnforcer); transportListener, streamTracerFactories, decoder, encoder, settings, maxMessageSize,
keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
keepAliveEnforcer);
} }
private NettyServerHandler( private NettyServerHandler(
@ -203,12 +219,16 @@ class NettyServerHandler extends AbstractNettyHandler {
int maxMessageSize, int maxMessageSize,
long keepAliveTimeInNanos, long keepAliveTimeInNanos,
long keepAliveTimeoutInNanos, long keepAliveTimeoutInNanos,
long maxConnectionAgeInNanos,
long maxConnectionAgeGraceInNanos,
KeepAliveEnforcer keepAliveEnforcer) { KeepAliveEnforcer keepAliveEnforcer) {
super(decoder, encoder, settings); super(decoder, encoder, settings);
checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0"); checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0");
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeInNanos = keepAliveTimeInNanos;
this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer"); this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
streamKey = encoder.connection().newKey(); streamKey = encoder.connection().newKey();
@ -225,8 +245,41 @@ class NettyServerHandler extends AbstractNettyHandler {
} }
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
serverWriteQueue = new WriteQueue(ctx.channel()); serverWriteQueue = new WriteQueue(ctx.channel());
// init max connection age monitor
if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
maxConnectionAgeMonitor = ctx.executor().schedule(
new LogExceptionRunnable(new Runnable() {
@Override
public void run() {
// send GO_AWAY
ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "max_age");
goAway(
ctx,
Integer.MAX_VALUE,
Http2Error.NO_ERROR.code(),
debugData,
ctx.newPromise());
// gracefully shutdown with specified grace time
long savedGracefulShutdownTime = gracefulShutdownTimeoutMillis();
try {
gracefulShutdownTimeoutMillis(
TimeUnit.NANOSECONDS.toMillis(maxConnectionAgeGraceInNanos));
close(ctx, ctx.newPromise());
} catch (Exception e) {
onError(ctx, e);
} finally {
gracefulShutdownTimeoutMillis(savedGracefulShutdownTime);
}
}
}),
maxConnectionAgeInNanos,
TimeUnit.NANOSECONDS);
}
if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) { if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(), keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */); keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
@ -361,6 +414,9 @@ class NettyServerHandler extends AbstractNettyHandler {
if (keepAliveManager != null) { if (keepAliveManager != null) {
keepAliveManager.onTransportTermination(); keepAliveManager.onTransportTermination();
} }
if (maxConnectionAgeMonitor != null) {
maxConnectionAgeMonitor.cancel(false);
}
final Status status = final Status status =
Status.UNAVAILABLE.withDescription("connection terminated for unknown reason"); Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
// Any streams that are still active must be closed // Any streams that are still active must be closed

View File

@ -62,15 +62,19 @@ class NettyServerTransport implements ServerTransport {
private final int maxHeaderListSize; private final int maxHeaderListSize;
private final long keepAliveTimeInNanos; private final long keepAliveTimeInNanos;
private final long keepAliveTimeoutInNanos; private final long keepAliveTimeoutInNanos;
private final long maxConnectionAgeInNanos;
private final long maxConnectionAgeGraceInNanos;
private final boolean permitKeepAliveWithoutCalls; private final boolean permitKeepAliveWithoutCalls;
private final long permitKeepAliveTimeInNanos; private final long permitKeepAliveTimeInNanos;
private final List<ServerStreamTracer.Factory> streamTracerFactories; private final List<ServerStreamTracer.Factory> streamTracerFactories;
NettyServerTransport(Channel channel, ProtocolNegotiator protocolNegotiator, NettyServerTransport(
Channel channel, ProtocolNegotiator protocolNegotiator,
List<ServerStreamTracer.Factory> streamTracerFactories, int maxStreams, List<ServerStreamTracer.Factory> streamTracerFactories, int maxStreams,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeInNanos, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeoutInNanos, boolean permitKeepAliveWithoutCalls, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
long permitKeepAliveTimeInNanos) { long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
boolean permitKeepAliveWithoutCalls,long permitKeepAliveTimeInNanos) {
this.channel = Preconditions.checkNotNull(channel, "channel"); this.channel = Preconditions.checkNotNull(channel, "channel");
this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = this.streamTracerFactories =
@ -81,6 +85,8 @@ class NettyServerTransport implements ServerTransport {
this.maxHeaderListSize = maxHeaderListSize; this.maxHeaderListSize = maxHeaderListSize;
this.keepAliveTimeInNanos = keepAliveTimeInNanos; this.keepAliveTimeInNanos = keepAliveTimeInNanos;
this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls; this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
} }
@ -145,8 +151,11 @@ class NettyServerTransport implements ServerTransport {
* Creates the Netty handler to be used in the channel pipeline. * Creates the Netty handler to be used in the channel pipeline.
*/ */
private NettyServerHandler createHandler(ServerTransportListener transportListener) { private NettyServerHandler createHandler(ServerTransportListener transportListener) {
return NettyServerHandler.newHandler(transportListener, streamTracerFactories, maxStreams, return NettyServerHandler.newHandler(
flowControlWindow, maxHeaderListSize, maxMessageSize, keepAliveTimeInNanos, transportListener, streamTracerFactories, maxStreams,
keepAliveTimeoutInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); flowControlWindow, maxHeaderListSize, maxMessageSize,
keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
} }
} }

View File

@ -38,6 +38,8 @@ import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED; import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY; import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -482,7 +484,8 @@ public class NettyClientTransportTest {
NioServerSocketChannel.class, group, group, negotiator, NioServerSocketChannel.class, group, group, negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(), maxStreamsPerConnection, Collections.<ServerStreamTracer.Factory>emptyList(), maxStreamsPerConnection,
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize,
DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, true, 0); DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0);
server.start(serverListener); server.start(serverListener);
address = TestUtils.testServerAddress(server.getPort()); address = TestUtils.testServerAddress(server.getPort());
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());

View File

@ -33,6 +33,8 @@ package io.grpc.netty;
import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Charsets.UTF_8;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.grpc.netty.Utils.CONTENT_TYPE_GRPC; import static io.grpc.netty.Utils.CONTENT_TYPE_GRPC;
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER; import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
import static io.grpc.netty.Utils.HTTP_METHOD; import static io.grpc.netty.Utils.HTTP_METHOD;
@ -122,6 +124,8 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
private int maxHeaderListSize = Integer.MAX_VALUE; private int maxHeaderListSize = Integer.MAX_VALUE;
private boolean permitKeepAliveWithoutCalls = true; private boolean permitKeepAliveWithoutCalls = true;
private long permitKeepAliveTimeInNanos = 0; private long permitKeepAliveTimeInNanos = 0;
private long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
private long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
private class ServerTransportListenerImpl implements ServerTransportListener { private class ServerTransportListenerImpl implements ServerTransportListener {
@ -529,6 +533,84 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
eq(Http2Error.ENHANCE_YOUR_CALM.code()), any(ByteBuf.class), any(ChannelPromise.class)); eq(Http2Error.ENHANCE_YOUR_CALM.code()), any(ByteBuf.class), any(ChannelPromise.class));
} }
@Test
public void noGoAwaySentBeforeMaxConnectionAgeReached() throws Exception {
maxConnectionAgeInNanos = TimeUnit.MINUTES.toNanos(30L);
setUp();
Thread.sleep(10L);
channel().runPendingTasks();
// GO_AWAY not sent yet
verifyWrite(never()).writeGoAway(
any(ChannelHandlerContext.class), any(Integer.class), any(Long.class), any(ByteBuf.class),
any(ChannelPromise.class));
assertTrue(channel().isOpen());
}
@Test
public void maxConnectionAge_goAwaySent() throws Exception {
maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
setUp();
assertTrue(channel().isOpen());
Thread.sleep(10L);
channel().runPendingTasks();
// GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
// channel closed
assertTrue(!channel().isOpen());
}
@Test
public void maxConnectionAgeGrace_channelStillOpenDuringGracePeriod() throws Exception {
maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L);
setUp();
createStream();
Thread.sleep(10L);
channel().runPendingTasks();
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
Thread.sleep(10L);
channel().runPendingTasks();
// channel not closed yet
assertTrue(channel().isOpen());
}
@Test
public void maxConnectionAgeGrace_channelClosedAfterGracePeriod() throws Exception {
maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
maxConnectionAgeGraceInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
setUp();
createStream();
// runPendingTasks so that GO_AWAY is sent and the forceful shutdown is scheduled
Thread.sleep(10L);
channel().runPendingTasks();
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
assertTrue(channel().isOpen());
// need runPendingTasks again so that the forceful shutdown can be executed
Thread.sleep(10L);
channel().runPendingTasks();
// channel closed
assertTrue(!channel().isOpen());
}
private void createStream() throws Exception { private void createStream() throws Exception {
Http2Headers headers = new DefaultHttp2Headers() Http2Headers headers = new DefaultHttp2Headers()
.method(HTTP_METHOD) .method(HTTP_METHOD)
@ -560,7 +642,8 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
return NettyServerHandler.newHandler(frameReader(), frameWriter(), transportListener, return NettyServerHandler.newHandler(frameReader(), frameWriter(), transportListener,
Arrays.asList(streamTracerFactory), maxConcurrentStreams, flowControlWindow, Arrays.asList(streamTracerFactory), maxConcurrentStreams, flowControlWindow,
maxHeaderListSize, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, DEFAULT_MAX_MESSAGE_SIZE,
2000L, 100L, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); 2000L, 100L, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
} }
@Override @Override

View File

@ -61,10 +61,9 @@ public class NettyServerTest {
1, // ignore 1, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore
1, // ignore 1, 1, // ignore
1, // ignore 1, 1, // ignore
true, // ignore true, 0); // ignore
0); // ignore
ns.start(new ServerListener() { ns.start(new ServerListener() {
@Override @Override
public ServerTransportListener transportCreated(ServerTransport transport) { public ServerTransportListener transportCreated(ServerTransport transport) {
@ -96,10 +95,9 @@ public class NettyServerTest {
1, // ignore 1, // ignore
1, // ignore 1, // ignore
1, // ignore 1, // ignore
1, // ignore 1, 1, // ignore
1, // ignore 1, 1, // ignore
true, // ignore true, 0); // ignore
0); // ignore
assertThat(ns.getPort()).isEqualTo(-1); assertThat(ns.getPort()).isEqualTo(-1);
} }