From a48ebb1616014de707c1b78abe9f6f60eb7b3a37 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Mon, 15 Apr 2019 17:53:14 -0700 Subject: [PATCH] netty: change default transport to Epoll if available, otherwise using Nio (#5581) Motivation: To support TCP_USER_TIMEOUT(proposal). Nio doesn't support TCP_USER_TIMEOUT while Epoll and KQueue supports TCP_USER_TIME. Since most users/servers are using linux based system, adding Epoll is necessary. KQueue maybe supported later, but not in this PR. To make it backward compatible for cases where channelType and eventLoop is mixed in with default and user provided object(s), we will fallback to Nio (NioSocketChannel, NioEventLoop). This ensures not breaking existing code (same as existing behavior). Users not specified both channelType and EventLoops will be affect by this Epoll change if netty-epoll is available. In later version (possibly 1.22.0), the backward compatible behavior will be removed and it will start to throw exception since this is error prone. --- alts/build.gradle | 1 + netty/build.gradle | 3 +- netty/shaded/build.gradle | 3 +- .../io/grpc/netty/NettyChannelBuilder.java | 106 ++++++++++++---- .../main/java/io/grpc/netty/NettyServer.java | 38 ++---- .../io/grpc/netty/NettyServerBuilder.java | 103 +++++++++++++-- netty/src/main/java/io/grpc/netty/Utils.java | 120 +++++++++++++++++- .../grpc/netty/NettyChannelBuilderTest.java | 54 ++++++++ .../grpc/netty/NettyClientTransportTest.java | 3 +- .../io/grpc/netty/NettyServerBuilderTest.java | 45 +++++++ .../java/io/grpc/netty/NettyServerTest.java | 26 ++-- .../test/java/io/grpc/netty/UtilsTest.java | 42 +++++- 12 files changed, 458 insertions(+), 86 deletions(-) diff --git a/alts/build.gradle b/alts/build.gradle index 654d89a699..d71f58f9d8 100644 --- a/alts/build.gradle +++ b/alts/build.gradle @@ -42,6 +42,7 @@ dependencies { libraries.mockito, libraries.truth testRuntime libraries.netty_tcnative, + libraries.netty_epoll, libraries.conscrypt signature 'org.codehaus.mojo.signature:java17:1.0@signature' } diff --git a/netty/build.gradle b/netty/build.gradle index a8d45247c2..9099dee07e 100644 --- a/netty/build.gradle +++ b/netty/build.gradle @@ -9,7 +9,8 @@ dependencies { project(':grpc-testing'), project(':grpc-testing-proto') testRuntime libraries.netty_tcnative, - libraries.conscrypt + libraries.conscrypt, + libraries.netty_epoll signature "org.codehaus.mojo.signature:java17:1.0@signature" } diff --git a/netty/shaded/build.gradle b/netty/shaded/build.gradle index dc39aa13e2..95197bcc76 100644 --- a/netty/shaded/build.gradle +++ b/netty/shaded/build.gradle @@ -11,7 +11,8 @@ sourceSets { testShadow {} } dependencies { compile project(':grpc-netty') - runtime libraries.netty_tcnative + runtime libraries.netty_tcnative, + libraries.netty_epoll testShadowCompile files(shadowJar), configurations.shadow, project(':grpc-testing-proto'), diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 4d1eb4515d..a1cd1105d7 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -35,9 +35,11 @@ import io.grpc.internal.AbstractManagedChannelImplBuilder; import io.grpc.internal.AtomicBackoff; import io.grpc.internal.ClientTransportFactory; import io.grpc.internal.ConnectionClientTransport; +import io.grpc.internal.FixedObjectPool; import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; -import io.grpc.internal.SharedResourceHolder; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TransportTracer; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; @@ -52,6 +54,8 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; import javax.net.ssl.SSLException; @@ -63,20 +67,24 @@ import javax.net.ssl.SSLException; @CanIgnoreReturnValue public final class NettyChannelBuilder extends AbstractManagedChannelImplBuilder { + private static final Logger logger = Logger.getLogger(NettyChannelBuilder.class.getName()); + public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L); + private static final ChannelFactory DEFAULT_CHANNEL_FACTORY = + new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE); + private static final ObjectPool DEFAULT_EVENT_LOOP_GROUP_POOL = + SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); + private final Map, Object> channelOptions = new HashMap<>(); private NegotiationType negotiationType = NegotiationType.TLS; private OverrideAuthorityChecker authorityChecker; - private ChannelFactory channelFactory = - new ReflectiveChannelFactory<>(NioSocketChannel.class); - - @Nullable - private EventLoopGroup eventLoopGroup; + private ChannelFactory channelFactory = DEFAULT_CHANNEL_FACTORY; + private ObjectPool eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL; private SslContext sslContext; private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW; private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE; @@ -140,10 +148,18 @@ public final class NettyChannelBuilder } /** - * Specifies the channel type to use, by default we use {@link NioSocketChannel}. + * Specifies the channel type to use, by default we use {@code EpollSocketChannel} if available, + * otherwise using {@link NioSocketChannel}. * *

You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your * {@link Channel} implementation has no no-args constructor. + * + *

It's an optional parameter. If the user has not provided an Channel type or ChannelFactory + * when the channel is built, the builder will use the default one which is static. + * + *

You must also provide corresponding {@link #eventLoopGroup(EventLoopGroup)}. For example, + * {@link NioSocketChannel} must use {@link io.netty.channel.nio.NioEventLoopGroup}, otherwise + * your application won't start. */ public NettyChannelBuilder channelType(Class channelType) { checkNotNull(channelType, "channelType"); @@ -155,6 +171,13 @@ public final class NettyChannelBuilder * usually only used if the specific {@code Channel} requires complex logic which requires * additional information to create the {@code Channel}. Otherwise, recommend to use {@link * #channelType(Class)}. + * + *

It's an optional parameter. If the user has not provided an Channel type or ChannelFactory + * when the channel is built, the builder will use the default one which is static. + * + *

You must also provide corresponding {@link #eventLoopGroup(EventLoopGroup)}. For example, + * {@link NioSocketChannel} based {@link ChannelFactory} must use {@link + * io.netty.channel.nio.NioEventLoopGroup}, otherwise your application won't start. */ public NettyChannelBuilder channelFactory(ChannelFactory channelFactory) { this.channelFactory = checkNotNull(channelFactory, "channelFactory"); @@ -186,11 +209,19 @@ public final class NettyChannelBuilder *

It's an optional parameter. If the user has not provided an EventGroupLoop when the channel * is built, the builder will use the default one which is static. * + *

You must also provide corresponding {@link #channelType(Class)} or {@link + * #channelFactory(ChannelFactory)} corresponding to the given {@code EventLoopGroup}. For + * example, {@link io.netty.channel.nio.NioEventLoopGroup} requires {@link NioSocketChannel} + * *

The channel won't take ownership of the given EventLoopGroup. It's caller's responsibility * to shut it down when it's desired. */ public NettyChannelBuilder eventLoopGroup(@Nullable EventLoopGroup eventLoopGroup) { - this.eventLoopGroup = eventLoopGroup; + if (eventLoopGroup != null) { + this.eventLoopGroupPool = new FixedObjectPool<>(eventLoopGroup); + } else { + this.eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL; + } return this; } @@ -406,13 +437,47 @@ public final class NettyChannelBuilder } negotiator = createProtocolNegotiatorByType(negotiationType, localSslContext); } + + // TODO(jihuncho) throw exception if not groupOrChannelProvided after 1.22.0 + ObjectPool resolvedEventLoopGroupPool = eventLoopGroupPool; + ChannelFactory resolvedChannelFactory = channelFactory; + if (shouldFallBackToNio()) { + logger.log( + Level.WARNING, + "Both EventLoopGroup and ChannelType should be provided or neither should be, " + + "otherwise client may not start. Not provided values will use Nio " + + "(NioSocketChannel, NioEventLoopGroup) for compatibility. This will cause an " + + "Exception in the future."); + + if (eventLoopGroupPool == DEFAULT_EVENT_LOOP_GROUP_POOL) { + resolvedEventLoopGroupPool = + SharedResourcePool.forResource(Utils.NIO_WORKER_EVENT_LOOP_GROUP); + logger.log(Level.FINE, "Channel type or ChannelFactory is provided, but EventLoopGroup is " + + "missing. Fall back to NioEventLoopGroup."); + } + if (channelFactory == DEFAULT_CHANNEL_FACTORY) { + resolvedChannelFactory = new ReflectiveChannelFactory<>(NioSocketChannel.class); + logger.log( + Level.FINE, "EventLoopGroup is provided, but Channel type or ChannelFactory is missing." + + " Fall back to NioSocketChannel."); + } + } + return new NettyTransportFactory( - negotiator, channelFactory, channelOptions, - eventLoopGroup, flowControlWindow, maxInboundMessageSize(), + negotiator, resolvedChannelFactory, channelOptions, + resolvedEventLoopGroupPool, flowControlWindow, maxInboundMessageSize(), maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, transportTracerFactory.create(), localSocketPicker); } + @VisibleForTesting + boolean shouldFallBackToNio() { + return (channelFactory != DEFAULT_CHANNEL_FACTORY + && eventLoopGroupPool == DEFAULT_EVENT_LOOP_GROUP_POOL) + || (channelFactory == DEFAULT_CHANNEL_FACTORY + && eventLoopGroupPool != DEFAULT_EVENT_LOOP_GROUP_POOL); + } + @Override @CheckReturnValue protected int getDefaultPort() { @@ -510,8 +575,8 @@ public final class NettyChannelBuilder private final ProtocolNegotiator protocolNegotiator; private final ChannelFactory channelFactory; private final Map, ?> channelOptions; + private final ObjectPool groupPool; private final EventLoopGroup group; - private final boolean usingSharedGroup; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -524,13 +589,16 @@ public final class NettyChannelBuilder private boolean closed; NettyTransportFactory(ProtocolNegotiator protocolNegotiator, - ChannelFactory channelFactory, Map, ?> channelOptions, - EventLoopGroup group, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, + ChannelFactory channelFactory, + Map, ?> channelOptions, ObjectPool groupPool, + int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, TransportTracer transportTracer, LocalSocketPicker localSocketPicker) { this.protocolNegotiator = protocolNegotiator; this.channelFactory = channelFactory; this.channelOptions = new HashMap, Object>(channelOptions); + this.groupPool = groupPool; + this.group = groupPool.getObject(); this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -540,14 +608,6 @@ public final class NettyChannelBuilder this.transportTracer = transportTracer; this.localSocketPicker = localSocketPicker != null ? localSocketPicker : new LocalSocketPicker(); - - usingSharedGroup = group == null; - if (usingSharedGroup) { - // The group was unspecified, using the shared group. - this.group = SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); - } else { - this.group = group; - } } @Override @@ -598,9 +658,7 @@ public final class NettyChannelBuilder closed = true; protocolNegotiator.close(); - if (usingSharedGroup) { - SharedResourceHolder.release(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP, group); - } + groupPool.returnObject(group); } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index fac8f81a3c..4cedf2edab 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -32,9 +32,9 @@ import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; import io.grpc.ServerStreamTracer; import io.grpc.internal.InternalServer; +import io.grpc.internal.ObjectPool; import io.grpc.internal.ServerListener; import io.grpc.internal.ServerTransportListener; -import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.TransportTracer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; @@ -58,7 +58,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; /** * Netty-based server implementation. @@ -72,8 +71,8 @@ class NettyServer implements InternalServer, InternalWithLogId { private final Map, ?> channelOptions; private final ProtocolNegotiator protocolNegotiator; private final int maxStreamsPerConnection; - private final boolean usingSharedBossGroup; - private final boolean usingSharedWorkerGroup; + private final ObjectPool bossGroupPool; + private final ObjectPool workerGroupPool; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerListener listener; @@ -99,7 +98,8 @@ class NettyServer implements InternalServer, InternalWithLogId { NettyServer( SocketAddress address, Class channelType, Map, ?> channelOptions, - @Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup, + ObjectPool bossGroupPool, + ObjectPool workerGroupPool, ProtocolNegotiator protocolNegotiator, List streamTracerFactories, TransportTracer.Factory transportTracerFactory, @@ -113,12 +113,12 @@ class NettyServer implements InternalServer, InternalWithLogId { this.channelType = checkNotNull(channelType, "channelType"); checkNotNull(channelOptions, "channelOptions"); this.channelOptions = new HashMap, Object>(channelOptions); - this.bossGroup = bossGroup; - this.workerGroup = workerGroup; + this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool"); + this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool"); + this.bossGroup = bossGroupPool.getObject(); + this.workerGroup = workerGroupPool.getObject(); this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator"); this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); - this.usingSharedBossGroup = bossGroup == null; - this.usingSharedWorkerGroup = workerGroup == null; this.transportTracerFactory = transportTracerFactory; this.maxStreamsPerConnection = maxStreamsPerConnection; this.flowControlWindow = flowControlWindow; @@ -154,9 +154,6 @@ class NettyServer implements InternalServer, InternalWithLogId { public void start(ServerListener serverListener) throws IOException { listener = checkNotNull(serverListener, "serverListener"); - // If using the shared groups, get references to them. - allocateSharedGroups(); - ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(channelType); @@ -290,15 +287,6 @@ class NettyServer implements InternalServer, InternalWithLogId { }); } - private void allocateSharedGroups() { - if (bossGroup == null) { - bossGroup = SharedResourceHolder.get(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP); - } - if (workerGroup == null) { - workerGroup = SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); - } - } - @Override public InternalLogId getLogId() { return logId; @@ -316,14 +304,14 @@ class NettyServer implements InternalServer, InternalWithLogId { @Override protected void deallocate() { try { - if (usingSharedBossGroup && bossGroup != null) { - SharedResourceHolder.release(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP, bossGroup); + if (bossGroup != null) { + bossGroupPool.returnObject(bossGroup); } } finally { bossGroup = null; try { - if (usingSharedWorkerGroup && workerGroup != null) { - SharedResourceHolder.release(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP, workerGroup); + if (workerGroup != null) { + workerGroupPool.returnObject(workerGroup); } } finally { workerGroup = null; diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 6a694ad132..b946622be4 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -23,14 +23,18 @@ import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.grpc.ExperimentalApi; import io.grpc.Internal; import io.grpc.ServerStreamTracer; import io.grpc.internal.AbstractServerImplBuilder; +import io.grpc.internal.FixedObjectPool; import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.SharedResourcePool; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; @@ -46,6 +50,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; import javax.net.ssl.SSLException; @@ -56,6 +62,8 @@ import javax.net.ssl.SSLException; @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1784") @CanIgnoreReturnValue public final class NettyServerBuilder extends AbstractServerImplBuilder { + private static final Logger logger = Logger.getLogger(NettyServerBuilder.class.getName()); + public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE; @@ -67,14 +75,18 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL = + SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP); + private static final ObjectPool DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL = + SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); private final List listenAddresses = new ArrayList<>(); - private Class channelType = NioServerSocketChannel.class; + private Class channelType = Utils.DEFAULT_SERVER_CHANNEL_TYPE; private final Map, Object> channelOptions = new HashMap<>(); - @Nullable - private EventLoopGroup bossEventLoopGroup; - @Nullable - private EventLoopGroup workerEventLoopGroup; + private ObjectPool bossEventLoopGroupPool = + DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL; + private ObjectPool workerEventLoopGroupPool = + DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL; private SslContext sslContext; private ProtocolNegotiator protocolNegotiator; private int maxConcurrentCallsPerConnection = Integer.MAX_VALUE; @@ -132,7 +144,13 @@ public final class NettyServerBuilder extends AbstractServerImplBuilderYou must also provide corresponding {@link EventLoopGroup} using {@link + * #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For + * example, {@link NioServerSocketChannel} must use {@link + * io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start. */ public NettyServerBuilder channelType(Class channelType) { this.channelType = Preconditions.checkNotNull(channelType, "channelType"); @@ -156,6 +174,11 @@ public final class NettyServerBuilder extends AbstractServerImplBuilderIt's an optional parameter. If the user has not provided one when the server is built, the * builder will use the default one which is static. * + *

You must also provide corresponding {@link io.netty.channel.Channel} type using {@link + * #channelType(Class)} and {@link #workerEventLoopGroup(EventLoopGroup)}. For example, {@link + * NioServerSocketChannel} must use {@link io.netty.channel.nio.NioEventLoopGroup} for both boss + * and worker {@link EventLoopGroup}, otherwise your server won't start. + * *

The server won't take ownership of the given EventLoopGroup. It's caller's responsibility * to shut it down when it's desired. * @@ -169,7 +192,11 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder(group); + } else { + this.bossEventLoopGroupPool = DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL; + } return this; } @@ -179,6 +206,11 @@ public final class NettyServerBuilder extends AbstractServerImplBuilderIt's an optional parameter. If the user has not provided one when the server is built, the * builder will create one. * + *

You must also provide corresponding {@link io.netty.channel.Channel} type using {@link + * #channelType(Class)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For example, {@link + * NioServerSocketChannel} must use {@link io.netty.channel.nio.NioEventLoopGroup} for both boss + * and worker {@link EventLoopGroup}, otherwise your server won't start. + * *

The server won't take ownership of the given EventLoopGroup. It's caller's responsibility * to shut it down when it's desired. * @@ -192,7 +224,11 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder(group); + } else { + this.workerEventLoopGroupPool = DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL; + } return this; } @@ -456,21 +492,62 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder resolvedChannelType = channelType; + ObjectPool resolvedBossGroupPool = bossEventLoopGroupPool; + ObjectPool resolvedWorkerGroupPool = workerEventLoopGroupPool; + + if (shouldFallBackToNio()) { + // TODO(jihuncho) throw exception if not groupOrChannelProvided after 1.22.0 + // Use NIO based channel type and eventloop group for backward compatibility reason + logger.log( + Level.WARNING, + "All of BossEventLoopGroup, WorkerEventLoopGroup and ChannelType should be provided or " + + "neither should be, otherwise server may not start. Missing values will use Nio " + + "(NioServerSocketChannel, NioEventLoopGroup) for backward compatibility. " + + "This will cause an Exception in the future."); + if (channelType == Utils.DEFAULT_SERVER_CHANNEL_TYPE) { + resolvedChannelType = NioServerSocketChannel.class; + logger.log(Level.FINE, "One or more EventLoopGroup is provided, but Channel type is " + + "missing. Fall back to NioServerSocketChannel."); + } + if (bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL) { + resolvedBossGroupPool = SharedResourcePool.forResource(Utils.NIO_BOSS_EVENT_LOOP_GROUP); + logger.log(Level.FINE, "Channel type and/or WorkerEventLoopGroup is provided, but " + + "BossEventLoopGroup is missing. Fall back to NioEventLoopGroup."); + } + if (workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL) { + resolvedWorkerGroupPool = SharedResourcePool.forResource(Utils.NIO_WORKER_EVENT_LOOP_GROUP); + logger.log(Level.FINE, "Channel type and/or BossEventLoopGroup is provided, but " + + "BossEventLoopGroup is missing. Fall back to NioEventLoopGroup."); + } + } + List transportServers = new ArrayList<>(listenAddresses.size()); for (SocketAddress listenAddress : listenAddresses) { NettyServer transportServer = new NettyServer( - listenAddress, channelType, channelOptions, bossEventLoopGroup, workerEventLoopGroup, - negotiator, streamTracerFactories, getTransportTracerFactory(), - maxConcurrentCallsPerConnection, flowControlWindow, + listenAddress, resolvedChannelType, channelOptions, resolvedBossGroupPool, + resolvedWorkerGroupPool, negotiator, streamTracerFactories, + getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, - maxConnectionIdleInNanos, - maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, + maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz()); transportServers.add(transportServer); } return Collections.unmodifiableList(transportServers); } + @VisibleForTesting + boolean shouldFallBackToNio() { + boolean hasNonDefault = channelType != Utils.DEFAULT_SERVER_CHANNEL_TYPE + || bossEventLoopGroupPool != DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL + || workerEventLoopGroupPool != DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL; + boolean hasDefault = channelType == Utils.DEFAULT_SERVER_CHANNEL_TYPE + || bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL + || workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL; + return hasNonDefault && hasDefault; + } + @Override public NettyServerBuilder useTransportSecurity(File certChain, File privateKey) { try { diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index f0e59e108e..db52284f87 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -23,6 +23,7 @@ import static io.netty.channel.ChannelOption.SO_LINGER; import static io.netty.channel.ChannelOption.SO_TIMEOUT; import static io.netty.util.CharsetUtil.UTF_8; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.grpc.InternalChannelz; import io.grpc.InternalMetadata; @@ -36,7 +37,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Headers; import io.netty.util.AsciiString; @@ -47,12 +51,15 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.CheckReturnValue; /** * Common utility methods. */ class Utils { + private static final Logger logger = Logger.getLogger(Utils.class.getName()); public static final AsciiString STATUS_OK = AsciiString.of("200"); public static final AsciiString HTTP_METHOD = AsciiString.of(GrpcUtil.HTTP_METHOD); @@ -65,11 +72,34 @@ class Utils { public static final AsciiString TE_TRAILERS = AsciiString.of(GrpcUtil.TE_TRAILERS); public static final AsciiString USER_AGENT = AsciiString.of(GrpcUtil.USER_AGENT_KEY.name()); - public static final Resource DEFAULT_BOSS_EVENT_LOOP_GROUP = - new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG"); + public static final Resource NIO_BOSS_EVENT_LOOP_GROUP + = new DefaultEventLoopGroupResource(1, "grpc-nio-boss-ELG", NioEventLoopGroup.class); + public static final Resource NIO_WORKER_EVENT_LOOP_GROUP + = new DefaultEventLoopGroupResource(0, "grpc-nio-worker-ELG", NioEventLoopGroup.class); + public static final Resource DEFAULT_BOSS_EVENT_LOOP_GROUP; + public static final Resource DEFAULT_WORKER_EVENT_LOOP_GROUP; - public static final Resource DEFAULT_WORKER_EVENT_LOOP_GROUP = - new DefaultEventLoopGroupResource(0, "grpc-default-worker-ELG"); + public static final Class DEFAULT_SERVER_CHANNEL_TYPE; + public static final Class DEFAULT_CLIENT_CHANNEL_TYPE; + + static { + // Decide default channel types and EventLoopGroup based on Epoll availability + if (isEpollAvailable()) { + DEFAULT_SERVER_CHANNEL_TYPE = epollServerChannelType(); + DEFAULT_CLIENT_CHANNEL_TYPE = epollChannelType(); + Class eventLoopGroupType = epollEventLoopGroupType(); + DEFAULT_BOSS_EVENT_LOOP_GROUP + = new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG", eventLoopGroupType); + DEFAULT_WORKER_EVENT_LOOP_GROUP + = new DefaultEventLoopGroupResource(0,"grpc-default-worker-ELG", eventLoopGroupType); + } else { + logger.log(Level.FINE, "Epoll is not available, using Nio.", getEpollUnavailabilityCause()); + DEFAULT_SERVER_CHANNEL_TYPE = NioServerSocketChannel.class; + DEFAULT_CLIENT_CHANNEL_TYPE = NioSocketChannel.class; + DEFAULT_BOSS_EVENT_LOOP_GROUP = NIO_BOSS_EVENT_LOOP_GROUP; + DEFAULT_WORKER_EVENT_LOOP_GROUP = NIO_WORKER_EVENT_LOOP_GROUP; + } + } public static Metadata convertHeaders(Http2Headers http2Headers) { if (http2Headers instanceof GrpcHttp2InboundHeaders) { @@ -176,20 +206,98 @@ class Utils { return s; } + @VisibleForTesting + static boolean isEpollAvailable() { + try { + return (boolean) (Boolean) + Class + .forName("io.netty.channel.epoll.Epoll") + .getDeclaredMethod("isAvailable") + .invoke(null); + } catch (ClassNotFoundException e) { + // this is normal if netty-epoll runtime dependency doesn't exist. + return false; + } catch (Exception e) { + throw new RuntimeException("Exception while checking Epoll availability", e); + } + } + + private static Throwable getEpollUnavailabilityCause() { + try { + return (Throwable) + Class + .forName("io.netty.channel.epoll.Epoll") + .getDeclaredMethod("unavailabilityCause") + .invoke(null); + } catch (Exception e) { + return e; + } + } + + // Must call when epoll is available + private static Class epollChannelType() { + try { + Class channelType = Class + .forName("io.netty.channel.epoll.EpollSocketChannel").asSubclass(Channel.class); + return channelType; + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot load EpollSocketChannel", e); + } + } + + // Must call when epoll is available + private static Class epollEventLoopGroupType() { + try { + return Class + .forName("io.netty.channel.epoll.EpollEventLoopGroup").asSubclass(EventLoopGroup.class); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot load EpollEventLoopGroup", e); + } + } + + // Must call when epoll is available + private static Class epollServerChannelType() { + try { + Class serverSocketChannel = + Class + .forName("io.netty.channel.epoll.EpollServerSocketChannel") + .asSubclass(ServerChannel.class); + return serverSocketChannel; + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot load EpollServerSocketChannel", e); + } + } + + private static EventLoopGroup createEventLoopGroup( + Class eventLoopGroupType, + int parallelism, + ThreadFactory threadFactory) { + try { + return eventLoopGroupType + .getConstructor(Integer.TYPE, ThreadFactory.class) + .newInstance(parallelism, threadFactory); + } catch (Exception e) { + throw new RuntimeException("Cannot create EventLoopGroup for " + eventLoopGroupType, e); + } + } + private static final class DefaultEventLoopGroupResource implements Resource { private final String name; private final int numEventLoops; + private final Class eventLoopGroupType; - DefaultEventLoopGroupResource(int numEventLoops, String name) { + DefaultEventLoopGroupResource( + int numEventLoops, String name, Class eventLoopGroupType) { this.name = name; this.numEventLoops = numEventLoops; + this.eventLoopGroupType = eventLoopGroupType; } @Override public EventLoopGroup create() { // Use Netty's DefaultThreadFactory in order to get the benefit of FastThreadLocal. ThreadFactory threadFactory = new DefaultThreadFactory(name, /* daemon= */ true); - return new NioEventLoopGroup(numEventLoops, threadFactory); + return createEventLoopGroup(eventLoopGroupType, numEventLoops, threadFactory); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java b/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java index 5ac6b4742d..b5d406f2dc 100644 --- a/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyChannelBuilderTest.java @@ -17,11 +17,16 @@ package io.grpc.netty; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import io.grpc.ManagedChannel; import io.grpc.netty.InternalNettyChannelBuilder.OverrideAuthorityChecker; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.local.LocalChannel; import io.netty.handler.ssl.SslContext; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -194,4 +199,53 @@ public class NettyChannelBuilderTest { thrown.expectMessage("keepalive timeout must be positive"); builder.keepAliveTimeout(-1L, TimeUnit.HOURS); } + + @Test + public void shouldFallBackToNio_onlyGroupProvided() { + NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget"); + + builder.eventLoopGroup(mock(EventLoopGroup.class)); + + assertTrue(builder.shouldFallBackToNio()); + } + + @Test + public void shouldFallBackToNio_onlyTypeProvided() { + NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget"); + + builder.channelType(LocalChannel.class); + + assertTrue(builder.shouldFallBackToNio()); + } + + @Test + public void shouldFallBackToNio_onlyFactoryProvided() { + NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget"); + + builder.channelFactory(new ChannelFactory() { + @Override + public Channel newChannel() { + return null; + } + }); + + assertTrue(builder.shouldFallBackToNio()); + } + + @Test + public void shouldFallBackToNio_usingDefault() { + NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget"); + + assertFalse(builder.shouldFallBackToNio()); + } + + @Test + public void shouldFallBackToNio_bothProvided() { + NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget"); + + builder.eventLoopGroup(mock(EventLoopGroup.class)); + builder.channelType(LocalChannel.class); + + assertFalse(builder.shouldFallBackToNio()); + } } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 6b51256458..f0a557ab9f 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -53,6 +53,7 @@ import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransport; import io.grpc.internal.FakeClock; +import io.grpc.internal.FixedObjectPool; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ServerListener; @@ -660,7 +661,7 @@ public class NettyClientTransportTest { TestUtils.testServerAddress(new InetSocketAddress(0)), NioServerSocketChannel.class, new HashMap, Object>(), - group, group, negotiator, + new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator, Collections.emptyList(), TransportTracer.getDefaultFactory(), maxStreamsPerConnection, diff --git a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java index d5e906a0a8..0d74eb862d 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java @@ -16,12 +16,16 @@ package io.grpc.netty; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import com.google.common.truth.Truth; import io.grpc.ServerStreamTracer.Factory; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.local.LocalServerChannel; import io.netty.handler.ssl.SslContext; import java.net.InetSocketAddress; import java.util.List; @@ -128,4 +132,45 @@ public class NettyServerBuilderTest { builder.permitKeepAliveTime(-1, TimeUnit.HOURS); } + + @Test + public void shouldFallBackToNio_onlyBossGroupProvided() { + EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class); + + builder.bossEventLoopGroup(mockEventLoopGroup); + + assertTrue(builder.shouldFallBackToNio()); + } + + @Test + public void shouldFallBackToNio_onlyWorkerGroupProvided() { + EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class); + + builder.workerEventLoopGroup(mockEventLoopGroup); + + assertTrue(builder.shouldFallBackToNio()); + } + + @Test + public void shouldFallBackToNio_onlyTypeProvided() { + builder.channelType(LocalServerChannel.class); + + assertTrue(builder.shouldFallBackToNio()); + } + + @Test + public void shouldFallBackToNio_usingDefault() { + assertFalse(builder.shouldFallBackToNio()); + } + + @Test + public void shouldFallBackToNio_allProvided() { + EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class); + + builder.bossEventLoopGroup(mockEventLoopGroup); + builder.workerEventLoopGroup(mockEventLoopGroup); + builder.channelType(LocalServerChannel.class); + + assertFalse(builder.shouldFallBackToNio()); + } } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index e1ea382f9f..b4fa966c49 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -33,11 +33,11 @@ import io.grpc.internal.ServerListener; import io.grpc.internal.ServerStream; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TransportTracer; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.WriteBufferWaterMark; -import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Collections; @@ -58,10 +58,10 @@ public class NettyServerTest { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - NioServerSocketChannel.class, + Utils.DEFAULT_SERVER_CHANNEL_TYPE, new HashMap, Object>(), - null, // no boss group - null, // no event group + SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -96,10 +96,10 @@ public class NettyServerTest { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - NioServerSocketChannel.class, + Utils.DEFAULT_SERVER_CHANNEL_TYPE, new HashMap, Object>(), - null, // no boss group - null, // no event group + SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -134,10 +134,10 @@ public class NettyServerTest { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - NioServerSocketChannel.class, + Utils.DEFAULT_SERVER_CHANNEL_TYPE, channelOptions, - null, // no boss group - null, // no event group + SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -184,10 +184,10 @@ public class NettyServerTest { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - NioServerSocketChannel.class, + Utils.DEFAULT_SERVER_CHANNEL_TYPE, new HashMap, Object>(), - null, // no boss group - null, // no event group + SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), diff --git a/netty/src/test/java/io/grpc/netty/UtilsTest.java b/netty/src/test/java/io/grpc/netty/UtilsTest.java index 9575d054ee..d9dc4dd2a0 100644 --- a/netty/src/test/java/io/grpc/netty/UtilsTest.java +++ b/netty/src/test/java/io/grpc/netty/UtilsTest.java @@ -16,12 +16,13 @@ package io.grpc.netty; +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.TruthJUnit.assume; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import com.google.common.base.MoreObjects; -import com.google.common.truth.Truth; import io.grpc.InternalChannelz; import io.grpc.InternalChannelz.SocketOptions; import io.grpc.Metadata; @@ -30,6 +31,7 @@ import io.grpc.internal.GrpcUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.EventLoopGroup; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.socket.nio.NioSocketChannel; @@ -170,8 +172,44 @@ public class UtilsTest { private static void assertStatusEquals(Status expected, Status actual) { assertEquals(expected.getCode(), actual.getCode()); - Truth.assertThat(MoreObjects.firstNonNull(actual.getDescription(), "")) + assertThat(MoreObjects.firstNonNull(actual.getDescription(), "")) .contains(MoreObjects.firstNonNull(expected.getDescription(), "")); assertEquals(expected.getCause(), actual.getCause()); } + + @Test + public void defaultEventLoopGroup_whenEpollIsAvailable() { + assume().that(Utils.isEpollAvailable()).isTrue(); + + EventLoopGroup defaultBossGroup = Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP.create(); + EventLoopGroup defaultWorkerGroup = Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP.create(); + + assertThat(defaultBossGroup.getClass().getName()) + .isEqualTo("io.netty.channel.epoll.EpollEventLoopGroup"); + assertThat(defaultWorkerGroup.getClass().getName()) + .isEqualTo("io.netty.channel.epoll.EpollEventLoopGroup"); + + defaultBossGroup.shutdownGracefully(); + defaultWorkerGroup.shutdownGracefully(); + } + + @Test + public void defaultClientChannelType_whenEpollIsAvailable() { + assume().that(Utils.isEpollAvailable()).isTrue(); + + Class clientChannelType = Utils.DEFAULT_CLIENT_CHANNEL_TYPE; + + assertThat(clientChannelType.getName()) + .isEqualTo("io.netty.channel.epoll.EpollSocketChannel"); + } + + @Test + public void defaultServerChannelType_whenEpollIsAvailable() { + assume().that(Utils.isEpollAvailable()).isTrue(); + + Class clientChannelType = Utils.DEFAULT_SERVER_CHANNEL_TYPE; + + assertThat(clientChannelType.getName()) + .isEqualTo("io.netty.channel.epoll.EpollServerSocketChannel"); + } }