diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 72c5c27db6..f0cfea99d7 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -38,6 +38,7 @@ import io.grpc.internal.ServerTransportListener; import io.grpc.internal.TransportTracer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; @@ -66,7 +67,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private final InternalLogId logId; private final SocketAddress address; - private final Class channelType; + private final ChannelFactory channelFactory; private final Map, ?> channelOptions; private final ProtocolNegotiator protocolNegotiator; private final int maxStreamsPerConnection; @@ -95,7 +96,7 @@ class NettyServer implements InternalServer, InternalWithLogId { new AtomicReference<>(); NettyServer( - SocketAddress address, Class channelType, + SocketAddress address, ChannelFactory channelFactory, Map, ?> channelOptions, ObjectPool bossGroupPool, ObjectPool workerGroupPool, @@ -109,7 +110,7 @@ class NettyServer implements InternalServer, InternalWithLogId { boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, InternalChannelz channelz) { this.address = address; - this.channelType = checkNotNull(channelType, "channelType"); + this.channelFactory = checkNotNull(channelFactory, "channelFactory"); checkNotNull(channelOptions, "channelOptions"); this.channelOptions = new HashMap, Object>(channelOptions); this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool"); @@ -155,7 +156,7 @@ class NettyServer implements InternalServer, InternalWithLogId { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); - b.channel(channelType); + b.channelFactory(channelFactory); // For non-socket based channel, the option will be ignored. b.option(SO_BACKLOG, 128); b.childOption(SO_KEEPALIVE, true); @@ -170,7 +171,7 @@ class NettyServer implements InternalServer, InternalWithLogId { b.childHandler(new ChannelInitializer() { @Override - public void initChannel(Channel ch) throws Exception { + public void initChannel(Channel ch) { ChannelPromise channelDone = ch.newPromise(); @@ -217,7 +218,7 @@ class NettyServer implements InternalServer, InternalWithLogId { * Releases the event loop if the channel is "done", possibly due to the channel closing. */ final class LoopReleaser implements ChannelFutureListener { - boolean done; + private boolean done; @Override public void operationComplete(ChannelFuture future) throws Exception { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 94d68f2441..22fab43a2e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -25,7 +25,6 @@ import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.grpc.ExperimentalApi; import io.grpc.Internal; @@ -36,8 +35,10 @@ import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourcePool; +import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ReflectiveChannelFactory; import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.ssl.SslContext; @@ -79,7 +80,9 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder listenAddresses = new ArrayList<>(); - private Class channelType = null; + + private ChannelFactory channelFactory = + Utils.DEFAULT_SERVER_CHANNEL_FACTORY; private final Map, Object> channelOptions = new HashMap<>(); private ObjectPool bossEventLoopGroupPool = DEFAULT_BOSS_EVENT_LOOP_GROUP_POOL; @@ -91,7 +94,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilderYou either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your + * {@link ServerChannel} implementation has no no-args constructor. + * + *

It's an optional parameter. If the user has not provided an Channel type or ChannelFactory + * when the channel is built, the builder will use the default one which is static. * *

You must also provide corresponding {@link EventLoopGroup} using {@link * #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For @@ -151,7 +160,26 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder channelType) { - this.channelType = Preconditions.checkNotNull(channelType, "channelType"); + checkNotNull(channelType, "channelType"); + return channelFactory(new ReflectiveChannelFactory<>(channelType)); + } + + /** + * Specifies the {@link ChannelFactory} to create {@link ServerChannel} instances. This method is + * usually only used if the specific {@code ServerChannel} requires complex logic which requires + * additional information to create the {@code ServerChannel}. Otherwise, recommend to use {@link + * #channelType(Class)}. + * + *

It's an optional parameter. If the user has not provided an Channel type or ChannelFactory + * when the channel is built, the builder will use the default one which is static. + * + *

You must also provide corresponding {@link EventLoopGroup} using {@link + * #workerEventLoopGroup(EventLoopGroup)} and {@link #bossEventLoopGroup(EventLoopGroup)}. For + * example, if the factory creates {@link NioServerSocketChannel} you must use {@link + * io.netty.channel.nio.NioEventLoopGroup}, otherwise your server won't start. + */ + public NettyServerBuilder channelFactory(ChannelFactory channelFactory) { + this.channelFactory = checkNotNull(channelFactory, "channelFactory"); return this; } @@ -499,16 +527,13 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder resolvedChannelType = - channelType == null ? Utils.DEFAULT_SERVER_CHANNEL_TYPE : channelType; - List transportServers = new ArrayList<>(listenAddresses.size()); for (SocketAddress listenAddress : listenAddresses) { NettyServer transportServer = new NettyServer( - listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool, + listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool, workerEventLoopGroupPool, negotiator, streamTracerFactories, getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, @@ -521,10 +546,10 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder DEFAULT_BOSS_EVENT_LOOP_GROUP; public static final Resource DEFAULT_WORKER_EVENT_LOOP_GROUP; - public static final Class DEFAULT_SERVER_CHANNEL_TYPE; + public static final ChannelFactory DEFAULT_SERVER_CHANNEL_FACTORY; public static final Class DEFAULT_CLIENT_CHANNEL_TYPE; @Nullable @@ -90,8 +92,8 @@ class Utils { static { // Decide default channel types and EventLoopGroup based on Epoll availability if (isEpollAvailable()) { - DEFAULT_SERVER_CHANNEL_TYPE = epollServerChannelType(); DEFAULT_CLIENT_CHANNEL_TYPE = epollChannelType(); + DEFAULT_SERVER_CHANNEL_FACTORY = new ReflectiveChannelFactory<>(epollServerChannelType()); EPOLL_EVENT_LOOP_GROUP_CONSTRUCTOR = epollEventLoopGroupConstructor(); DEFAULT_BOSS_EVENT_LOOP_GROUP = new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG", EventLoopGroupType.EPOLL); @@ -99,7 +101,7 @@ class Utils { = new DefaultEventLoopGroupResource(0,"grpc-default-worker-ELG", EventLoopGroupType.EPOLL); } else { logger.log(Level.FINE, "Epoll is not available, using Nio.", getEpollUnavailabilityCause()); - DEFAULT_SERVER_CHANNEL_TYPE = NioServerSocketChannel.class; + DEFAULT_SERVER_CHANNEL_FACTORY = nioServerChannelFactory(); DEFAULT_CLIENT_CHANNEL_TYPE = NioSocketChannel.class; DEFAULT_BOSS_EVENT_LOOP_GROUP = NIO_BOSS_EVENT_LOOP_GROUP; DEFAULT_WORKER_EVENT_LOOP_GROUP = NIO_WORKER_EVENT_LOOP_GROUP; @@ -290,6 +292,15 @@ class Utils { } } + private static ChannelFactory nioServerChannelFactory() { + return new ChannelFactory() { + @Override + public ServerChannel newChannel() { + return new NioServerSocketChannel(); + } + }; + } + /** * Returns TCP_USER_TIMEOUT channel option for Epoll channel if Epoll is available, otherwise * null. diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 75e40bb244..a32f7e06ed 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -719,7 +719,7 @@ public class NettyClientTransportTest { private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException { server = new NettyServer( TestUtils.testServerAddress(new InetSocketAddress(0)), - NioServerSocketChannel.class, + new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator, Collections.emptyList(), diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index b4fa966c49..51f0a50f27 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -58,7 +58,7 @@ public class NettyServerTest { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - Utils.DEFAULT_SERVER_CHANNEL_TYPE, + Utils.DEFAULT_SERVER_CHANNEL_FACTORY, new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), @@ -96,7 +96,7 @@ public class NettyServerTest { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - Utils.DEFAULT_SERVER_CHANNEL_TYPE, + Utils.DEFAULT_SERVER_CHANNEL_FACTORY, new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), @@ -134,7 +134,7 @@ public class NettyServerTest { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - Utils.DEFAULT_SERVER_CHANNEL_TYPE, + Utils.DEFAULT_SERVER_CHANNEL_FACTORY, channelOptions, SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), @@ -184,7 +184,7 @@ public class NettyServerTest { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( addr, - Utils.DEFAULT_SERVER_CHANNEL_TYPE, + Utils.DEFAULT_SERVER_CHANNEL_FACTORY, new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), diff --git a/netty/src/test/java/io/grpc/netty/UtilsTest.java b/netty/src/test/java/io/grpc/netty/UtilsTest.java index bb66d29c2a..c5d5113850 100644 --- a/netty/src/test/java/io/grpc/netty/UtilsTest.java +++ b/netty/src/test/java/io/grpc/netty/UtilsTest.java @@ -29,9 +29,11 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.GrpcUtil; import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelOption; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.socket.nio.NioSocketChannel; @@ -204,13 +206,13 @@ public class UtilsTest { } @Test - public void defaultServerChannelType_whenEpollIsAvailable() { + public void defaultServerChannelFactory_whenEpollIsAvailable() { assume().that(Utils.isEpollAvailable()).isTrue(); - Class clientChannelType = Utils.DEFAULT_SERVER_CHANNEL_TYPE; + ChannelFactory channelFactory = Utils.DEFAULT_SERVER_CHANNEL_FACTORY; - assertThat(clientChannelType.getName()) - .isEqualTo("io.netty.channel.epoll.EpollServerSocketChannel"); + assertThat(channelFactory.toString()) + .isEqualTo("ReflectiveChannelFactory(EpollServerSocketChannel.class)"); } @Test