netty: make server sockets be configurable

This commit is contained in:
Xiaoshuang LU 2017-11-23 08:00:33 +08:00 committed by Carl Mastrangelo
parent 7fd199f32e
commit cdb1f54794
4 changed files with 108 additions and 2 deletions

View File

@ -34,6 +34,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -42,7 +43,9 @@ import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -56,6 +59,7 @@ class NettyServer implements InternalServer, WithLogId {
private final LogId logId = LogId.allocate(getClass().getName());
private final SocketAddress address;
private final Class<? extends ServerChannel> channelType;
private final Map<ChannelOption<?>, ?> channelOptions;
private final ProtocolNegotiator protocolNegotiator;
private final int maxStreamsPerConnection;
private final boolean usingSharedBossGroup;
@ -80,6 +84,7 @@ class NettyServer implements InternalServer, WithLogId {
NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType,
Map<ChannelOption<?>, ?> channelOptions,
@Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
@ -90,6 +95,8 @@ class NettyServer implements InternalServer, WithLogId {
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) {
this.address = address;
this.channelType = checkNotNull(channelType, "channelType");
checkNotNull(channelOptions, "channelOptions");
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
@ -136,6 +143,15 @@ class NettyServer implements InternalServer, WithLogId {
b.option(SO_BACKLOG, 128);
b.childOption(SO_KEEPALIVE, true);
}
if (channelOptions != null) {
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
@SuppressWarnings("unchecked")
ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
b.childOption(key, entry.getValue());
}
}
b.childHandler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {

View File

@ -32,6 +32,7 @@ import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.TransportTracer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -39,7 +40,9 @@ import io.netty.handler.ssl.SslContext;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
@ -65,6 +68,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
private final SocketAddress address;
private Class<? extends ServerChannel> channelType = NioServerSocketChannel.class;
private final Map<ChannelOption<?>, Object> channelOptions =
new HashMap<ChannelOption<?>, Object>();
@Nullable
private EventLoopGroup bossEventLoopGroup;
@Nullable
@ -123,6 +128,17 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
return this;
}
/**
* Specifies a channel option. As the underlying channel as well as network implementation may
* ignore this value applications should consider it a hint.
*
* @since 1.9.0
*/
public <T> NettyServerBuilder withChildOption(ChannelOption<T> option, T value) {
this.channelOptions.put(option, value);
return this;
}
/**
* Provides the boss EventGroupLoop to the server.
*
@ -403,7 +419,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
}
return new NettyServer(
address, channelType, bossEventLoopGroup, workerEventLoopGroup,
address, channelType, channelOptions, bossEventLoopGroup, workerEventLoopGroup,
negotiator, streamTracerFactories, transportTracerFactory,
maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,

View File

@ -556,7 +556,9 @@ public class NettyClientTransportTest {
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
server = new NettyServer(
TestUtils.testServerAddress(0),
NioServerSocketChannel.class, group, group, negotiator,
NioServerSocketChannel.class,
new HashMap<ChannelOption<?>, Object>(),
group, group, negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,

View File

@ -23,9 +23,18 @@ import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@ -39,6 +48,7 @@ public class NettyServerTest {
NettyServer ns = new NettyServer(
addr,
NioServerSocketChannel.class,
new HashMap<ChannelOption<?>, Object>(),
null, // no boss group
null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(),
@ -75,6 +85,7 @@ public class NettyServerTest {
NettyServer ns = new NettyServer(
addr,
NioServerSocketChannel.class,
new HashMap<ChannelOption<?>, Object>(),
null, // no boss group
null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(),
@ -91,4 +102,65 @@ public class NettyServerTest {
assertThat(ns.getPort()).isEqualTo(-1);
}
@Test(timeout = 60000)
public void childChannelOptions() throws Exception {
final int originalLowWaterMark = 2097169;
final int originalHighWaterMark = 2097211;
Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>();
channelOptions.put(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(originalLowWaterMark, originalHighWaterMark));
final AtomicInteger lowWaterMark = new AtomicInteger(0);
final AtomicInteger highWaterMark = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(1);
NettyServer ns = new NettyServer(
new InetSocketAddress(9999),
NioServerSocketChannel.class,
channelOptions,
null, // no boss group
null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
1, // ignore
1, // ignore
1, // ignore
1, // ignore
1, // ignore
1, 1, // ignore
1, 1, // ignore
true, 0); // ignore
ns.start(new ServerListener() {
@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
Channel channel = ((NettyServerTransport)transport).channel();
WriteBufferWaterMark writeBufferWaterMark = channel.config()
.getOption(ChannelOption.WRITE_BUFFER_WATER_MARK);
lowWaterMark.set(writeBufferWaterMark.low());
highWaterMark.set(writeBufferWaterMark.high());
countDownLatch.countDown();
return null;
}
@Override
public void serverShutdown() {}
});
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost", 9999), 8000);
countDownLatch.await();
socket.close();
assertThat(lowWaterMark.get()).isEqualTo(originalLowWaterMark);
assertThat(highWaterMark.get()).isEqualTo(originalHighWaterMark);
ns.shutdown();
}
}