netty: maybe set TCP_USER_TIMEOUT when epoll and keepalive is enabled (#5599)

This commit is contained in:
Jihun Cho 2019-04-18 10:12:36 -07:00 committed by GitHub
parent 39e66fa22b
commit 2cdaac2adc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 101 additions and 9 deletions

View File

@ -59,6 +59,7 @@ import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
@ -220,6 +221,13 @@ class NettyClientTransport implements ConnectionClientTransport {
b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.
b.option(SO_KEEPALIVE, true);
// For non-epoll based channel, the option will be ignored.
if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
ChannelOption<Integer> tcpUserTimeout = Utils.maybeGetTcpUserTimeoutOption();
if (tcpUserTimeout != null) {
b.option(tcpUserTimeout, (int) TimeUnit.NANOSECONDS.toMillis(keepAliveTimeNanos));
}
}
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
// Every entry in the map is obtained from
// NettyChannelBuilder#withOption(ChannelOption<T> option, T value)

View File

@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
/**
* Common utility methods.
@ -71,7 +72,6 @@ class Utils {
public static final AsciiString TE_HEADER = AsciiString.of(GrpcUtil.TE_HEADER.name());
public static final AsciiString TE_TRAILERS = AsciiString.of(GrpcUtil.TE_TRAILERS);
public static final AsciiString USER_AGENT = AsciiString.of(GrpcUtil.USER_AGENT_KEY.name());
public static final Resource<EventLoopGroup> NIO_BOSS_EVENT_LOOP_GROUP
= new DefaultEventLoopGroupResource(1, "grpc-nio-boss-ELG", NioEventLoopGroup.class);
public static final Resource<EventLoopGroup> NIO_WORKER_EVENT_LOOP_GROUP
@ -281,6 +281,31 @@ class Utils {
}
}
/**
* Returns TCP_USER_TIMEOUT channel option for Epoll channel if Epoll is available, otherwise
* null.
*/
@Nullable
static ChannelOption<Integer> maybeGetTcpUserTimeoutOption() {
return getEpollChannelOption("TCP_USER_TIMEOUT");
}
@Nullable
@SuppressWarnings("unchecked")
private static <T> ChannelOption<T> getEpollChannelOption(String optionName) {
if (isEpollAvailable()) {
try {
return
(ChannelOption<T>) Class.forName("io.netty.channel.epoll.EpollChannelOption")
.getField(optionName)
.get(null);
} catch (Exception e) {
throw new RuntimeException("ChannelOption(" + optionName + ") is not available", e);
}
}
return null;
}
private static final class DefaultEventLoopGroupResource implements Resource<EventLoopGroup> {
private final String name;
private final int numEventLoops;

View File

@ -18,6 +18,7 @@ package io.grpc.netty;
import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
@ -69,6 +70,7 @@ import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ReflectiveChannelFactory;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.nio.NioEventLoopGroup;
@ -480,7 +482,8 @@ public class NettyClientTransportTest {
startServer();
NettyClientTransport transport = newTransport(newNegotiator(),
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true,
new ReflectiveChannelFactory<>(NioSocketChannel.class));
TimeUnit.SECONDS.toNanos(10L), new ReflectiveChannelFactory<>(NioSocketChannel.class),
group);
callMeMaybe(transport.start(clientTransportListener));
@ -493,7 +496,7 @@ public class NettyClientTransportTest {
startServer();
NettyClientTransport transport = newTransport(newNegotiator(),
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true,
new ReflectiveChannelFactory<>(LocalChannel.class));
TimeUnit.SECONDS.toNanos(10L), new ReflectiveChannelFactory<>(LocalChannel.class), group);
callMeMaybe(transport.start(clientTransportListener));
@ -609,6 +612,55 @@ public class NettyClientTransportTest {
assertNull(transport.keepAliveManager());
}
@Test
public void keepAliveEnabled_shouldSetTcpUserTimeout() throws Exception {
assume().that(Utils.isEpollAvailable()).isTrue();
startServer();
EventLoopGroup epollGroup = Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP.create();
int keepAliveTimeMillis = 1234567;
try {
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */,
TimeUnit.MILLISECONDS.toNanos(keepAliveTimeMillis),
new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE), epollGroup);
callMeMaybe(transport.start(clientTransportListener));
ChannelOption<Integer> tcpUserTimeoutOption = Utils.maybeGetTcpUserTimeoutOption();
assertThat(tcpUserTimeoutOption).isNotNull();
// on some linux based system, the integer value may have error (usually +-1)
assertThat((double) transport.channel().config().getOption(tcpUserTimeoutOption))
.isWithin(5.0).of((double) keepAliveTimeMillis);
} finally {
epollGroup.shutdownGracefully();
}
}
@Test
public void keepAliveDisabled_shouldNotSetTcpUserTimeout() throws Exception {
assume().that(Utils.isEpollAvailable()).isTrue();
startServer();
EventLoopGroup epollGroup = Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP.create();
int keepAliveTimeMillis = 12345670;
try {
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */,
TimeUnit.MILLISECONDS.toNanos(keepAliveTimeMillis),
new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE), epollGroup);
callMeMaybe(transport.start(clientTransportListener));
ChannelOption<Integer> tcpUserTimeoutOption = Utils.maybeGetTcpUserTimeoutOption();
assertThat(tcpUserTimeoutOption).isNotNull();
// default TCP_USER_TIMEOUT=0 (use the system default)
assertThat(transport.channel().config().getOption(tcpUserTimeoutOption)).isEqualTo(0);
} finally {
epollGroup.shutdownGracefully();
}
}
private Throwable getRootCause(Throwable t) {
if (t.getCause() == null) {
return t;
@ -631,16 +683,16 @@ public class NettyClientTransportTest {
private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
int maxHeaderListSize, String userAgent, boolean enableKeepAlive) {
return newTransport(negotiator, maxMsgSize, maxHeaderListSize, userAgent, enableKeepAlive,
new ReflectiveChannelFactory<>(NioSocketChannel.class));
TimeUnit.SECONDS.toNanos(10L), new ReflectiveChannelFactory<>(NioSocketChannel.class),
group);
}
private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
int maxHeaderListSize, String userAgent, boolean enableKeepAlive,
ChannelFactory<? extends Channel> channelFactory) {
long keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
int maxHeaderListSize, String userAgent, boolean enableKeepAlive, long keepAliveTimeNano,
ChannelFactory<? extends Channel> channelFactory, EventLoopGroup group) {
long keepAliveTimeoutNano = TimeUnit.SECONDS.toNanos(1L);
if (enableKeepAlive) {
keepAliveTimeNano = TimeUnit.SECONDS.toNanos(10L);
if (!enableKeepAlive) {
keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
}
NettyClientTransport transport = new NettyClientTransport(
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group,

View File

@ -212,4 +212,11 @@ public class UtilsTest {
assertThat(clientChannelType.getName())
.isEqualTo("io.netty.channel.epoll.EpollServerSocketChannel");
}
@Test
public void maybeGetTcpUserTimeoutOption() {
assume().that(Utils.isEpollAvailable()).isTrue();
assertThat(Utils.maybeGetTcpUserTimeoutOption()).isNotNull();
}
}