netty: change default transport to Epoll if available, otherwise using Nio (#5581)

Motivation:
To support TCP_USER_TIMEOUT(proposal). Nio doesn't support TCP_USER_TIMEOUT while Epoll and KQueue supports TCP_USER_TIME. Since most users/servers are using linux based system, adding Epoll is necessary. KQueue maybe supported later, but not in this PR.

To make it backward compatible for cases where channelType and eventLoop is mixed in with default and user provided object(s), we will fallback to Nio (NioSocketChannel, NioEventLoop). This ensures not breaking existing code (same as existing behavior). Users not specified both channelType and EventLoops will be affect by this Epoll change if netty-epoll is available.
In later version (possibly 1.22.0), the backward compatible behavior will be removed and it will start to throw exception since this is error prone.
This commit is contained in:
Jihun Cho 2019-04-15 17:53:14 -07:00 committed by GitHub
parent 71f32bb700
commit a48ebb1616
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 458 additions and 86 deletions

View File

@ -42,6 +42,7 @@ dependencies {
libraries.mockito,
libraries.truth
testRuntime libraries.netty_tcnative,
libraries.netty_epoll,
libraries.conscrypt
signature 'org.codehaus.mojo.signature:java17:1.0@signature'
}

View File

@ -9,7 +9,8 @@ dependencies {
project(':grpc-testing'),
project(':grpc-testing-proto')
testRuntime libraries.netty_tcnative,
libraries.conscrypt
libraries.conscrypt,
libraries.netty_epoll
signature "org.codehaus.mojo.signature:java17:1.0@signature"
}

View File

@ -11,7 +11,8 @@ sourceSets { testShadow {} }
dependencies {
compile project(':grpc-netty')
runtime libraries.netty_tcnative
runtime libraries.netty_tcnative,
libraries.netty_epoll
testShadowCompile files(shadowJar),
configurations.shadow,
project(':grpc-testing-proto'),

View File

@ -35,9 +35,11 @@ import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.AtomicBackoff;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
@ -52,6 +54,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
@ -63,20 +67,24 @@ import javax.net.ssl.SSLException;
@CanIgnoreReturnValue
public final class NettyChannelBuilder
extends AbstractManagedChannelImplBuilder<NettyChannelBuilder> {
private static final Logger logger = Logger.getLogger(NettyChannelBuilder.class.getName());
public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB
private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
private static final ChannelFactory<? extends Channel> DEFAULT_CHANNEL_FACTORY =
new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private final Map<ChannelOption<?>, Object> channelOptions =
new HashMap<>();
private NegotiationType negotiationType = NegotiationType.TLS;
private OverrideAuthorityChecker authorityChecker;
private ChannelFactory<? extends Channel> channelFactory =
new ReflectiveChannelFactory<>(NioSocketChannel.class);
@Nullable
private EventLoopGroup eventLoopGroup;
private ChannelFactory<? extends Channel> channelFactory = DEFAULT_CHANNEL_FACTORY;
private ObjectPool<? extends EventLoopGroup> eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL;
private SslContext sslContext;
private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
@ -140,10 +148,18 @@ public final class NettyChannelBuilder
}
/**
* Specifies the channel type to use, by default we use {@link NioSocketChannel}.
* Specifies the channel type to use, by default we use {@code EpollSocketChannel} if available,
* otherwise using {@link NioSocketChannel}.
*
* <p>You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*
* <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
* when the channel is built, the builder will use the default one which is static.
*
* <p>You must also provide corresponding {@link #eventLoopGroup(EventLoopGroup)}. For example,
* {@link NioSocketChannel} must use {@link io.netty.channel.nio.NioEventLoopGroup}, otherwise
* your application won't start.
*/
public NettyChannelBuilder channelType(Class<? extends Channel> channelType) {
checkNotNull(channelType, "channelType");
@ -155,6 +171,13 @@ public final class NettyChannelBuilder
* usually only used if the specific {@code Channel} requires complex logic which requires
* additional information to create the {@code Channel}. Otherwise, recommend to use {@link
* #channelType(Class)}.
*
* <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
* when the channel is built, the builder will use the default one which is static.
*
* <p>You must also provide corresponding {@link #eventLoopGroup(EventLoopGroup)}. For example,
* {@link NioSocketChannel} based {@link ChannelFactory} must use {@link
* io.netty.channel.nio.NioEventLoopGroup}, otherwise your application won't start.
*/
public NettyChannelBuilder channelFactory(ChannelFactory<? extends Channel> channelFactory) {
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
@ -186,11 +209,19 @@ public final class NettyChannelBuilder
* <p>It's an optional parameter. If the user has not provided an EventGroupLoop when the channel
* is built, the builder will use the default one which is static.
*
* <p>You must also provide corresponding {@link #channelType(Class)} or {@link
* #channelFactory(ChannelFactory)} corresponding to the given {@code EventLoopGroup}. For
* example, {@link io.netty.channel.nio.NioEventLoopGroup} requires {@link NioSocketChannel}
*
* <p>The channel won't take ownership of the given EventLoopGroup. It's caller's responsibility
* to shut it down when it's desired.
*/
public NettyChannelBuilder eventLoopGroup(@Nullable EventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup;
if (eventLoopGroup != null) {
this.eventLoopGroupPool = new FixedObjectPool<>(eventLoopGroup);
} else {
this.eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL;
}
return this;
}
@ -406,13 +437,47 @@ public final class NettyChannelBuilder
}
negotiator = createProtocolNegotiatorByType(negotiationType, localSslContext);
}
// TODO(jihuncho) throw exception if not groupOrChannelProvided after 1.22.0
ObjectPool<? extends EventLoopGroup> resolvedEventLoopGroupPool = eventLoopGroupPool;
ChannelFactory<? extends Channel> resolvedChannelFactory = channelFactory;
if (shouldFallBackToNio()) {
logger.log(
Level.WARNING,
"Both EventLoopGroup and ChannelType should be provided or neither should be, "
+ "otherwise client may not start. Not provided values will use Nio "
+ "(NioSocketChannel, NioEventLoopGroup) for compatibility. This will cause an "
+ "Exception in the future.");
if (eventLoopGroupPool == DEFAULT_EVENT_LOOP_GROUP_POOL) {
resolvedEventLoopGroupPool =
SharedResourcePool.forResource(Utils.NIO_WORKER_EVENT_LOOP_GROUP);
logger.log(Level.FINE, "Channel type or ChannelFactory is provided, but EventLoopGroup is "
+ "missing. Fall back to NioEventLoopGroup.");
}
if (channelFactory == DEFAULT_CHANNEL_FACTORY) {
resolvedChannelFactory = new ReflectiveChannelFactory<>(NioSocketChannel.class);
logger.log(
Level.FINE, "EventLoopGroup is provided, but Channel type or ChannelFactory is missing."
+ " Fall back to NioSocketChannel.");
}
}
return new NettyTransportFactory(
negotiator, channelFactory, channelOptions,
eventLoopGroup, flowControlWindow, maxInboundMessageSize(),
negotiator, resolvedChannelFactory, channelOptions,
resolvedEventLoopGroupPool, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory.create(), localSocketPicker);
}
@VisibleForTesting
boolean shouldFallBackToNio() {
return (channelFactory != DEFAULT_CHANNEL_FACTORY
&& eventLoopGroupPool == DEFAULT_EVENT_LOOP_GROUP_POOL)
|| (channelFactory == DEFAULT_CHANNEL_FACTORY
&& eventLoopGroupPool != DEFAULT_EVENT_LOOP_GROUP_POOL);
}
@Override
@CheckReturnValue
protected int getDefaultPort() {
@ -510,8 +575,8 @@ public final class NettyChannelBuilder
private final ProtocolNegotiator protocolNegotiator;
private final ChannelFactory<? extends Channel> channelFactory;
private final Map<ChannelOption<?>, ?> channelOptions;
private final ObjectPool<? extends EventLoopGroup> groupPool;
private final EventLoopGroup group;
private final boolean usingSharedGroup;
private final int flowControlWindow;
private final int maxMessageSize;
private final int maxHeaderListSize;
@ -524,13 +589,16 @@ public final class NettyChannelBuilder
private boolean closed;
NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
ChannelFactory<? extends Channel> channelFactory, Map<ChannelOption<?>, ?> channelOptions,
EventLoopGroup group, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
TransportTracer transportTracer, LocalSocketPicker localSocketPicker) {
this.protocolNegotiator = protocolNegotiator;
this.channelFactory = channelFactory;
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.groupPool = groupPool;
this.group = groupPool.getObject();
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
@ -540,14 +608,6 @@ public final class NettyChannelBuilder
this.transportTracer = transportTracer;
this.localSocketPicker =
localSocketPicker != null ? localSocketPicker : new LocalSocketPicker();
usingSharedGroup = group == null;
if (usingSharedGroup) {
// The group was unspecified, using the shared group.
this.group = SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
} else {
this.group = group;
}
}
@Override
@ -598,9 +658,7 @@ public final class NettyChannelBuilder
closed = true;
protocolNegotiator.close();
if (usingSharedGroup) {
SharedResourceHolder.release(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP, group);
}
groupPool.returnObject(group);
}
}
}

View File

@ -32,9 +32,9 @@ import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.TransportTracer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
@ -58,7 +58,6 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Netty-based server implementation.
@ -72,8 +71,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final Map<ChannelOption<?>, ?> channelOptions;
private final ProtocolNegotiator protocolNegotiator;
private final int maxStreamsPerConnection;
private final boolean usingSharedBossGroup;
private final boolean usingSharedWorkerGroup;
private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerListener listener;
@ -99,7 +98,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType,
Map<ChannelOption<?>, ?> channelOptions,
@Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
ObjectPool<? extends EventLoopGroup> workerGroupPool,
ProtocolNegotiator protocolNegotiator,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
@ -113,12 +113,12 @@ class NettyServer implements InternalServer, InternalWithLogId {
this.channelType = checkNotNull(channelType, "channelType");
checkNotNull(channelOptions, "channelOptions");
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
this.bossGroup = bossGroupPool.getObject();
this.workerGroup = workerGroupPool.getObject();
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.usingSharedBossGroup = bossGroup == null;
this.usingSharedWorkerGroup = workerGroup == null;
this.transportTracerFactory = transportTracerFactory;
this.maxStreamsPerConnection = maxStreamsPerConnection;
this.flowControlWindow = flowControlWindow;
@ -154,9 +154,6 @@ class NettyServer implements InternalServer, InternalWithLogId {
public void start(ServerListener serverListener) throws IOException {
listener = checkNotNull(serverListener, "serverListener");
// If using the shared groups, get references to them.
allocateSharedGroups();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(channelType);
@ -290,15 +287,6 @@ class NettyServer implements InternalServer, InternalWithLogId {
});
}
private void allocateSharedGroups() {
if (bossGroup == null) {
bossGroup = SharedResourceHolder.get(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP);
}
if (workerGroup == null) {
workerGroup = SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
}
}
@Override
public InternalLogId getLogId() {
return logId;
@ -316,14 +304,14 @@ class NettyServer implements InternalServer, InternalWithLogId {
@Override
protected void deallocate() {
try {
if (usingSharedBossGroup && bossGroup != null) {
SharedResourceHolder.release(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP, bossGroup);
if (bossGroup != null) {
bossGroupPool.returnObject(bossGroup);
}
} finally {
bossGroup = null;
try {
if (usingSharedWorkerGroup && workerGroup != null) {
SharedResourceHolder.release(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP, workerGroup);
if (workerGroup != null) {
workerGroupPool.returnObject(workerGroup);
}
} finally {
workerGroup = null;

View File

@ -23,14 +23,18 @@ import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.ExperimentalApi;
import io.grpc.Internal;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
@ -46,6 +50,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
@ -56,6 +62,8 @@ import javax.net.ssl.SSLException;
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1784")
@CanIgnoreReturnValue
public final class NettyServerBuilder extends AbstractServerImplBuilder<NettyServerBuilder> {
private static final Logger logger = Logger.getLogger(NettyServerBuilder.class.getName());
public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB
static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE;
@ -67,14 +75,18 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);
private static final long MIN_MAX_CONNECTION_AGE_NANO = TimeUnit.SECONDS.toNanos(1L);
private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private final List<SocketAddress> listenAddresses = new ArrayList<>();
private Class<? extends ServerChannel> channelType = NioServerSocketChannel.class;
private Class<? extends ServerChannel> channelType = Utils.DEFAULT_SERVER_CHANNEL_TYPE;
private final Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
@Nullable
private EventLoopGroup bossEventLoopGroup;
@Nullable
private EventLoopGroup workerEventLoopGroup;
private ObjectPool<? extends EventLoopGroup> bossEventLoopGroupPool =
DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL;
private ObjectPool<? extends EventLoopGroup> workerEventLoopGroupPool =
DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
private SslContext sslContext;
private ProtocolNegotiator protocolNegotiator;
private int maxConcurrentCallsPerConnection = Integer.MAX_VALUE;
@ -132,7 +144,13 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
}
/**
* Specify the channel type to use, by default we use {@link NioServerSocketChannel}.
* Specify the channel type to use, by default we use {@link NioServerSocketChannel} or {@code
* EpollServerSocketChannel}.
*
* <p>You must also provide corresponding {@link EventLoopGroup} using {@link
* #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For
* example, {@link NioServerSocketChannel} must use {@link
* io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start.
*/
public NettyServerBuilder channelType(Class<? extends ServerChannel> channelType) {
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
@ -156,6 +174,11 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
* <p>It's an optional parameter. If the user has not provided one when the server is built, the
* builder will use the default one which is static.
*
* <p>You must also provide corresponding {@link io.netty.channel.Channel} type using {@link
* #channelType(Class)} and {@link #workerEventLoopGroup(EventLoopGroup)}. For example, {@link
* NioServerSocketChannel} must use {@link io.netty.channel.nio.NioEventLoopGroup} for both boss
* and worker {@link EventLoopGroup}, otherwise your server won't start.
*
* <p>The server won't take ownership of the given EventLoopGroup. It's caller's responsibility
* to shut it down when it's desired.
*
@ -169,7 +192,11 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
* keep the main thread alive until the server has terminated.
*/
public NettyServerBuilder bossEventLoopGroup(EventLoopGroup group) {
this.bossEventLoopGroup = group;
if (group != null) {
this.bossEventLoopGroupPool = new FixedObjectPool<>(group);
} else {
this.bossEventLoopGroupPool = DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL;
}
return this;
}
@ -179,6 +206,11 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
* <p>It's an optional parameter. If the user has not provided one when the server is built, the
* builder will create one.
*
* <p>You must also provide corresponding {@link io.netty.channel.Channel} type using {@link
* #channelType(Class)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For example, {@link
* NioServerSocketChannel} must use {@link io.netty.channel.nio.NioEventLoopGroup} for both boss
* and worker {@link EventLoopGroup}, otherwise your server won't start.
*
* <p>The server won't take ownership of the given EventLoopGroup. It's caller's responsibility
* to shut it down when it's desired.
*
@ -192,7 +224,11 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
* keep the main thread alive until the server has terminated.
*/
public NettyServerBuilder workerEventLoopGroup(EventLoopGroup group) {
this.workerEventLoopGroup = group;
if (group != null) {
this.workerEventLoopGroupPool = new FixedObjectPool<>(group);
} else {
this.workerEventLoopGroupPool = DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
}
return this;
}
@ -456,21 +492,62 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
ProtocolNegotiators.serverPlaintext();
}
Class<? extends ServerChannel> resolvedChannelType = channelType;
ObjectPool<? extends EventLoopGroup> resolvedBossGroupPool = bossEventLoopGroupPool;
ObjectPool<? extends EventLoopGroup> resolvedWorkerGroupPool = workerEventLoopGroupPool;
if (shouldFallBackToNio()) {
// TODO(jihuncho) throw exception if not groupOrChannelProvided after 1.22.0
// Use NIO based channel type and eventloop group for backward compatibility reason
logger.log(
Level.WARNING,
"All of BossEventLoopGroup, WorkerEventLoopGroup and ChannelType should be provided or "
+ "neither should be, otherwise server may not start. Missing values will use Nio "
+ "(NioServerSocketChannel, NioEventLoopGroup) for backward compatibility. "
+ "This will cause an Exception in the future.");
if (channelType == Utils.DEFAULT_SERVER_CHANNEL_TYPE) {
resolvedChannelType = NioServerSocketChannel.class;
logger.log(Level.FINE, "One or more EventLoopGroup is provided, but Channel type is "
+ "missing. Fall back to NioServerSocketChannel.");
}
if (bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL) {
resolvedBossGroupPool = SharedResourcePool.forResource(Utils.NIO_BOSS_EVENT_LOOP_GROUP);
logger.log(Level.FINE, "Channel type and/or WorkerEventLoopGroup is provided, but "
+ "BossEventLoopGroup is missing. Fall back to NioEventLoopGroup.");
}
if (workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL) {
resolvedWorkerGroupPool = SharedResourcePool.forResource(Utils.NIO_WORKER_EVENT_LOOP_GROUP);
logger.log(Level.FINE, "Channel type and/or BossEventLoopGroup is provided, but "
+ "BossEventLoopGroup is missing. Fall back to NioEventLoopGroup.");
}
}
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, channelType, channelOptions, bossEventLoopGroup, workerEventLoopGroup,
negotiator, streamTracerFactories, getTransportTracerFactory(),
maxConcurrentCallsPerConnection, flowControlWindow,
listenAddress, resolvedChannelType, channelOptions, resolvedBossGroupPool,
resolvedWorkerGroupPool, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
transportServers.add(transportServer);
}
return Collections.unmodifiableList(transportServers);
}
@VisibleForTesting
boolean shouldFallBackToNio() {
boolean hasNonDefault = channelType != Utils.DEFAULT_SERVER_CHANNEL_TYPE
|| bossEventLoopGroupPool != DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
|| workerEventLoopGroupPool != DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
boolean hasDefault = channelType == Utils.DEFAULT_SERVER_CHANNEL_TYPE
|| bossEventLoopGroupPool == DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL
|| workerEventLoopGroupPool == DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL;
return hasNonDefault && hasDefault;
}
@Override
public NettyServerBuilder useTransportSecurity(File certChain, File privateKey) {
try {

View File

@ -23,6 +23,7 @@ import static io.netty.channel.ChannelOption.SO_LINGER;
import static io.netty.channel.ChannelOption.SO_TIMEOUT;
import static io.netty.util.CharsetUtil.UTF_8;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.InternalChannelz;
import io.grpc.InternalMetadata;
@ -36,7 +37,10 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.AsciiString;
@ -47,12 +51,15 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
/**
* Common utility methods.
*/
class Utils {
private static final Logger logger = Logger.getLogger(Utils.class.getName());
public static final AsciiString STATUS_OK = AsciiString.of("200");
public static final AsciiString HTTP_METHOD = AsciiString.of(GrpcUtil.HTTP_METHOD);
@ -65,11 +72,34 @@ class Utils {
public static final AsciiString TE_TRAILERS = AsciiString.of(GrpcUtil.TE_TRAILERS);
public static final AsciiString USER_AGENT = AsciiString.of(GrpcUtil.USER_AGENT_KEY.name());
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP =
new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG");
public static final Resource<EventLoopGroup> NIO_BOSS_EVENT_LOOP_GROUP
= new DefaultEventLoopGroupResource(1, "grpc-nio-boss-ELG", NioEventLoopGroup.class);
public static final Resource<EventLoopGroup> NIO_WORKER_EVENT_LOOP_GROUP
= new DefaultEventLoopGroupResource(0, "grpc-nio-worker-ELG", NioEventLoopGroup.class);
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP =
new DefaultEventLoopGroupResource(0, "grpc-default-worker-ELG");
public static final Class<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_TYPE;
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;
static {
// Decide default channel types and EventLoopGroup based on Epoll availability
if (isEpollAvailable()) {
DEFAULT_SERVER_CHANNEL_TYPE = epollServerChannelType();
DEFAULT_CLIENT_CHANNEL_TYPE = epollChannelType();
Class<? extends EventLoopGroup> eventLoopGroupType = epollEventLoopGroupType();
DEFAULT_BOSS_EVENT_LOOP_GROUP
= new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG", eventLoopGroupType);
DEFAULT_WORKER_EVENT_LOOP_GROUP
= new DefaultEventLoopGroupResource(0,"grpc-default-worker-ELG", eventLoopGroupType);
} else {
logger.log(Level.FINE, "Epoll is not available, using Nio.", getEpollUnavailabilityCause());
DEFAULT_SERVER_CHANNEL_TYPE = NioServerSocketChannel.class;
DEFAULT_CLIENT_CHANNEL_TYPE = NioSocketChannel.class;
DEFAULT_BOSS_EVENT_LOOP_GROUP = NIO_BOSS_EVENT_LOOP_GROUP;
DEFAULT_WORKER_EVENT_LOOP_GROUP = NIO_WORKER_EVENT_LOOP_GROUP;
}
}
public static Metadata convertHeaders(Http2Headers http2Headers) {
if (http2Headers instanceof GrpcHttp2InboundHeaders) {
@ -176,20 +206,98 @@ class Utils {
return s;
}
@VisibleForTesting
static boolean isEpollAvailable() {
try {
return (boolean) (Boolean)
Class
.forName("io.netty.channel.epoll.Epoll")
.getDeclaredMethod("isAvailable")
.invoke(null);
} catch (ClassNotFoundException e) {
// this is normal if netty-epoll runtime dependency doesn't exist.
return false;
} catch (Exception e) {
throw new RuntimeException("Exception while checking Epoll availability", e);
}
}
private static Throwable getEpollUnavailabilityCause() {
try {
return (Throwable)
Class
.forName("io.netty.channel.epoll.Epoll")
.getDeclaredMethod("unavailabilityCause")
.invoke(null);
} catch (Exception e) {
return e;
}
}
// Must call when epoll is available
private static Class<? extends Channel> epollChannelType() {
try {
Class<? extends Channel> channelType = Class
.forName("io.netty.channel.epoll.EpollSocketChannel").asSubclass(Channel.class);
return channelType;
} catch (ClassNotFoundException e) {
throw new RuntimeException("Cannot load EpollSocketChannel", e);
}
}
// Must call when epoll is available
private static Class<? extends EventLoopGroup> epollEventLoopGroupType() {
try {
return Class
.forName("io.netty.channel.epoll.EpollEventLoopGroup").asSubclass(EventLoopGroup.class);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Cannot load EpollEventLoopGroup", e);
}
}
// Must call when epoll is available
private static Class<? extends ServerChannel> epollServerChannelType() {
try {
Class<? extends ServerChannel> serverSocketChannel =
Class
.forName("io.netty.channel.epoll.EpollServerSocketChannel")
.asSubclass(ServerChannel.class);
return serverSocketChannel;
} catch (ClassNotFoundException e) {
throw new RuntimeException("Cannot load EpollServerSocketChannel", e);
}
}
private static EventLoopGroup createEventLoopGroup(
Class<? extends EventLoopGroup> eventLoopGroupType,
int parallelism,
ThreadFactory threadFactory) {
try {
return eventLoopGroupType
.getConstructor(Integer.TYPE, ThreadFactory.class)
.newInstance(parallelism, threadFactory);
} catch (Exception e) {
throw new RuntimeException("Cannot create EventLoopGroup for " + eventLoopGroupType, e);
}
}
private static final class DefaultEventLoopGroupResource implements Resource<EventLoopGroup> {
private final String name;
private final int numEventLoops;
private final Class<? extends EventLoopGroup> eventLoopGroupType;
DefaultEventLoopGroupResource(int numEventLoops, String name) {
DefaultEventLoopGroupResource(
int numEventLoops, String name, Class<? extends EventLoopGroup> eventLoopGroupType) {
this.name = name;
this.numEventLoops = numEventLoops;
this.eventLoopGroupType = eventLoopGroupType;
}
@Override
public EventLoopGroup create() {
// Use Netty's DefaultThreadFactory in order to get the benefit of FastThreadLocal.
ThreadFactory threadFactory = new DefaultThreadFactory(name, /* daemon= */ true);
return new NioEventLoopGroup(numEventLoops, threadFactory);
return createEventLoopGroup(eventLoopGroupType, numEventLoops, threadFactory);
}
@Override

View File

@ -17,11 +17,16 @@
package io.grpc.netty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import io.grpc.ManagedChannel;
import io.grpc.netty.InternalNettyChannelBuilder.OverrideAuthorityChecker;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalChannel;
import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -194,4 +199,53 @@ public class NettyChannelBuilderTest {
thrown.expectMessage("keepalive timeout must be positive");
builder.keepAliveTimeout(-1L, TimeUnit.HOURS);
}
@Test
public void shouldFallBackToNio_onlyGroupProvided() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
builder.eventLoopGroup(mock(EventLoopGroup.class));
assertTrue(builder.shouldFallBackToNio());
}
@Test
public void shouldFallBackToNio_onlyTypeProvided() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
builder.channelType(LocalChannel.class);
assertTrue(builder.shouldFallBackToNio());
}
@Test
public void shouldFallBackToNio_onlyFactoryProvided() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
builder.channelFactory(new ChannelFactory<Channel>() {
@Override
public Channel newChannel() {
return null;
}
});
assertTrue(builder.shouldFallBackToNio());
}
@Test
public void shouldFallBackToNio_usingDefault() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
assertFalse(builder.shouldFallBackToNio());
}
@Test
public void shouldFallBackToNio_bothProvided() {
NettyChannelBuilder builder = NettyChannelBuilder.forTarget("fakeTarget");
builder.eventLoopGroup(mock(EventLoopGroup.class));
builder.channelType(LocalChannel.class);
assertFalse(builder.shouldFallBackToNio());
}
}

View File

@ -53,6 +53,7 @@ import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.FakeClock;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ServerListener;
@ -660,7 +661,7 @@ public class NettyClientTransportTest {
TestUtils.testServerAddress(new InetSocketAddress(0)),
NioServerSocketChannel.class,
new HashMap<ChannelOption<?>, Object>(),
group, group, negotiator,
new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,

View File

@ -16,12 +16,16 @@
package io.grpc.netty;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import io.grpc.ServerStreamTracer.Factory;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress;
import java.util.List;
@ -128,4 +132,45 @@ public class NettyServerBuilderTest {
builder.permitKeepAliveTime(-1, TimeUnit.HOURS);
}
@Test
public void shouldFallBackToNio_onlyBossGroupProvided() {
EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class);
builder.bossEventLoopGroup(mockEventLoopGroup);
assertTrue(builder.shouldFallBackToNio());
}
@Test
public void shouldFallBackToNio_onlyWorkerGroupProvided() {
EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class);
builder.workerEventLoopGroup(mockEventLoopGroup);
assertTrue(builder.shouldFallBackToNio());
}
@Test
public void shouldFallBackToNio_onlyTypeProvided() {
builder.channelType(LocalServerChannel.class);
assertTrue(builder.shouldFallBackToNio());
}
@Test
public void shouldFallBackToNio_usingDefault() {
assertFalse(builder.shouldFallBackToNio());
}
@Test
public void shouldFallBackToNio_allProvided() {
EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class);
builder.bossEventLoopGroup(mockEventLoopGroup);
builder.workerEventLoopGroup(mockEventLoopGroup);
builder.channelType(LocalServerChannel.class);
assertFalse(builder.shouldFallBackToNio());
}
}

View File

@ -33,11 +33,11 @@ import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
@ -58,10 +58,10 @@ public class NettyServerTest {
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
NioServerSocketChannel.class,
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
new HashMap<ChannelOption<?>, Object>(),
null, // no boss group
null, // no event group
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@ -96,10 +96,10 @@ public class NettyServerTest {
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
NioServerSocketChannel.class,
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
new HashMap<ChannelOption<?>, Object>(),
null, // no boss group
null, // no event group
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@ -134,10 +134,10 @@ public class NettyServerTest {
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
NioServerSocketChannel.class,
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
channelOptions,
null, // no boss group
null, // no event group
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
@ -184,10 +184,10 @@ public class NettyServerTest {
InetSocketAddress addr = new InetSocketAddress(0);
NettyServer ns = new NettyServer(
addr,
NioServerSocketChannel.class,
Utils.DEFAULT_SERVER_CHANNEL_TYPE,
new HashMap<ChannelOption<?>, Object>(),
null, // no boss group
null, // no event group
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),

View File

@ -16,12 +16,13 @@
package io.grpc.netty;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import com.google.common.base.MoreObjects;
import com.google.common.truth.Truth;
import io.grpc.InternalChannelz;
import io.grpc.InternalChannelz.SocketOptions;
import io.grpc.Metadata;
@ -30,6 +31,7 @@ import io.grpc.internal.GrpcUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
@ -170,8 +172,44 @@ public class UtilsTest {
private static void assertStatusEquals(Status expected, Status actual) {
assertEquals(expected.getCode(), actual.getCode());
Truth.assertThat(MoreObjects.firstNonNull(actual.getDescription(), ""))
assertThat(MoreObjects.firstNonNull(actual.getDescription(), ""))
.contains(MoreObjects.firstNonNull(expected.getDescription(), ""));
assertEquals(expected.getCause(), actual.getCause());
}
@Test
public void defaultEventLoopGroup_whenEpollIsAvailable() {
assume().that(Utils.isEpollAvailable()).isTrue();
EventLoopGroup defaultBossGroup = Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP.create();
EventLoopGroup defaultWorkerGroup = Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP.create();
assertThat(defaultBossGroup.getClass().getName())
.isEqualTo("io.netty.channel.epoll.EpollEventLoopGroup");
assertThat(defaultWorkerGroup.getClass().getName())
.isEqualTo("io.netty.channel.epoll.EpollEventLoopGroup");
defaultBossGroup.shutdownGracefully();
defaultWorkerGroup.shutdownGracefully();
}
@Test
public void defaultClientChannelType_whenEpollIsAvailable() {
assume().that(Utils.isEpollAvailable()).isTrue();
Class<? extends Channel> clientChannelType = Utils.DEFAULT_CLIENT_CHANNEL_TYPE;
assertThat(clientChannelType.getName())
.isEqualTo("io.netty.channel.epoll.EpollSocketChannel");
}
@Test
public void defaultServerChannelType_whenEpollIsAvailable() {
assume().that(Utils.isEpollAvailable()).isTrue();
Class<? extends Channel> clientChannelType = Utils.DEFAULT_SERVER_CHANNEL_TYPE;
assertThat(clientChannelType.getName())
.isEqualTo("io.netty.channel.epoll.EpollServerSocketChannel");
}
}