diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index f6a87d3a8e..203a28cf44 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -59,6 +59,7 @@ import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; /** @@ -220,6 +221,13 @@ class NettyClientTransport implements ConnectionClientTransport { b.channelFactory(channelFactory); // For non-socket based channel, the option will be ignored. b.option(SO_KEEPALIVE, true); + // For non-epoll based channel, the option will be ignored. + if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) { + ChannelOption tcpUserTimeout = Utils.maybeGetTcpUserTimeoutOption(); + if (tcpUserTimeout != null) { + b.option(tcpUserTimeout, (int) TimeUnit.NANOSECONDS.toMillis(keepAliveTimeNanos)); + } + } for (Map.Entry, ?> entry : channelOptions.entrySet()) { // Every entry in the map is obtained from // NettyChannelBuilder#withOption(ChannelOption option, T value) diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index db52284f87..6675779479 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; /** * Common utility methods. @@ -71,7 +72,6 @@ class Utils { public static final AsciiString TE_HEADER = AsciiString.of(GrpcUtil.TE_HEADER.name()); public static final AsciiString TE_TRAILERS = AsciiString.of(GrpcUtil.TE_TRAILERS); public static final AsciiString USER_AGENT = AsciiString.of(GrpcUtil.USER_AGENT_KEY.name()); - public static final Resource NIO_BOSS_EVENT_LOOP_GROUP = new DefaultEventLoopGroupResource(1, "grpc-nio-boss-ELG", NioEventLoopGroup.class); public static final Resource NIO_WORKER_EVENT_LOOP_GROUP @@ -281,6 +281,31 @@ class Utils { } } + /** + * Returns TCP_USER_TIMEOUT channel option for Epoll channel if Epoll is available, otherwise + * null. + */ + @Nullable + static ChannelOption maybeGetTcpUserTimeoutOption() { + return getEpollChannelOption("TCP_USER_TIMEOUT"); + } + + @Nullable + @SuppressWarnings("unchecked") + private static ChannelOption getEpollChannelOption(String optionName) { + if (isEpollAvailable()) { + try { + return + (ChannelOption) Class.forName("io.netty.channel.epoll.EpollChannelOption") + .getField(optionName) + .get(null); + } catch (Exception e) { + throw new RuntimeException("ChannelOption(" + optionName + ") is not available", e); + } + } + return null; + } + private static final class DefaultEventLoopGroupResource implements Resource { private final String name; private final int numEventLoops; diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index f0a557ab9f..6224a5d67e 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -18,6 +18,7 @@ package io.grpc.netty; import static com.google.common.base.Charsets.UTF_8; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.TruthJUnit.assume; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; @@ -69,6 +70,7 @@ import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; import io.netty.channel.ReflectiveChannelFactory; import io.netty.channel.local.LocalChannel; import io.netty.channel.nio.NioEventLoopGroup; @@ -480,7 +482,8 @@ public class NettyClientTransportTest { startServer(); NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true, - new ReflectiveChannelFactory<>(NioSocketChannel.class)); + TimeUnit.SECONDS.toNanos(10L), new ReflectiveChannelFactory<>(NioSocketChannel.class), + group); callMeMaybe(transport.start(clientTransportListener)); @@ -493,7 +496,7 @@ public class NettyClientTransportTest { startServer(); NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true, - new ReflectiveChannelFactory<>(LocalChannel.class)); + TimeUnit.SECONDS.toNanos(10L), new ReflectiveChannelFactory<>(LocalChannel.class), group); callMeMaybe(transport.start(clientTransportListener)); @@ -609,6 +612,55 @@ public class NettyClientTransportTest { assertNull(transport.keepAliveManager()); } + @Test + public void keepAliveEnabled_shouldSetTcpUserTimeout() throws Exception { + assume().that(Utils.isEpollAvailable()).isTrue(); + + startServer(); + EventLoopGroup epollGroup = Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP.create(); + int keepAliveTimeMillis = 1234567; + try { + NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, + GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */, + TimeUnit.MILLISECONDS.toNanos(keepAliveTimeMillis), + new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE), epollGroup); + + callMeMaybe(transport.start(clientTransportListener)); + + ChannelOption tcpUserTimeoutOption = Utils.maybeGetTcpUserTimeoutOption(); + assertThat(tcpUserTimeoutOption).isNotNull(); + // on some linux based system, the integer value may have error (usually +-1) + assertThat((double) transport.channel().config().getOption(tcpUserTimeoutOption)) + .isWithin(5.0).of((double) keepAliveTimeMillis); + } finally { + epollGroup.shutdownGracefully(); + } + } + + @Test + public void keepAliveDisabled_shouldNotSetTcpUserTimeout() throws Exception { + assume().that(Utils.isEpollAvailable()).isTrue(); + + startServer(); + EventLoopGroup epollGroup = Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP.create(); + int keepAliveTimeMillis = 12345670; + try { + NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, + GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */, + TimeUnit.MILLISECONDS.toNanos(keepAliveTimeMillis), + new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE), epollGroup); + + callMeMaybe(transport.start(clientTransportListener)); + + ChannelOption tcpUserTimeoutOption = Utils.maybeGetTcpUserTimeoutOption(); + assertThat(tcpUserTimeoutOption).isNotNull(); + // default TCP_USER_TIMEOUT=0 (use the system default) + assertThat(transport.channel().config().getOption(tcpUserTimeoutOption)).isEqualTo(0); + } finally { + epollGroup.shutdownGracefully(); + } + } + private Throwable getRootCause(Throwable t) { if (t.getCause() == null) { return t; @@ -631,16 +683,16 @@ public class NettyClientTransportTest { private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent, boolean enableKeepAlive) { return newTransport(negotiator, maxMsgSize, maxHeaderListSize, userAgent, enableKeepAlive, - new ReflectiveChannelFactory<>(NioSocketChannel.class)); + TimeUnit.SECONDS.toNanos(10L), new ReflectiveChannelFactory<>(NioSocketChannel.class), + group); } private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize, - int maxHeaderListSize, String userAgent, boolean enableKeepAlive, - ChannelFactory channelFactory) { - long keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED; + int maxHeaderListSize, String userAgent, boolean enableKeepAlive, long keepAliveTimeNano, + ChannelFactory channelFactory, EventLoopGroup group) { long keepAliveTimeoutNano = TimeUnit.SECONDS.toNanos(1L); - if (enableKeepAlive) { - keepAliveTimeNano = TimeUnit.SECONDS.toNanos(10L); + if (!enableKeepAlive) { + keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED; } NettyClientTransport transport = new NettyClientTransport( address, channelFactory, new HashMap, Object>(), group, diff --git a/netty/src/test/java/io/grpc/netty/UtilsTest.java b/netty/src/test/java/io/grpc/netty/UtilsTest.java index d9dc4dd2a0..bb66d29c2a 100644 --- a/netty/src/test/java/io/grpc/netty/UtilsTest.java +++ b/netty/src/test/java/io/grpc/netty/UtilsTest.java @@ -212,4 +212,11 @@ public class UtilsTest { assertThat(clientChannelType.getName()) .isEqualTo("io.netty.channel.epoll.EpollServerSocketChannel"); } + + @Test + public void maybeGetTcpUserTimeoutOption() { + assume().that(Utils.isEpollAvailable()).isTrue(); + + assertThat(Utils.maybeGetTcpUserTimeoutOption()).isNotNull(); + } }