mirror of https://github.com/grpc/grpc-java.git
netty: use singleton for the custom allocator. (#6526)
The allocator has a circular reference that prevents it from GC'ed, thus causes memory leak if gRPC Channels are created and shutdown (even cleanly) on a regular basis. See https://github.com/netty/netty/issues/6891#issuecomment-457809308 and internal b/146074696.
This commit is contained in:
parent
16ba163291
commit
ada575dd24
|
|
@ -40,7 +40,6 @@ import io.grpc.internal.KeepAliveManager;
|
||||||
import io.grpc.internal.ObjectPool;
|
import io.grpc.internal.ObjectPool;
|
||||||
import io.grpc.internal.SharedResourcePool;
|
import io.grpc.internal.SharedResourcePool;
|
||||||
import io.grpc.internal.TransportTracer;
|
import io.grpc.internal.TransportTracer;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFactory;
|
import io.netty.channel.ChannelFactory;
|
||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
|
|
@ -75,8 +74,6 @@ public final class NettyChannelBuilder
|
||||||
new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
|
new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
|
||||||
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL =
|
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL =
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
|
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
|
||||||
private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
|
|
||||||
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);
|
|
||||||
|
|
||||||
private final Map<ChannelOption<?>, Object> channelOptions =
|
private final Map<ChannelOption<?>, Object> channelOptions =
|
||||||
new HashMap<>();
|
new HashMap<>();
|
||||||
|
|
@ -406,7 +403,7 @@ public final class NettyChannelBuilder
|
||||||
|
|
||||||
return new NettyTransportFactory(
|
return new NettyTransportFactory(
|
||||||
negotiator, channelFactory, channelOptions,
|
negotiator, channelFactory, channelOptions,
|
||||||
eventLoopGroupPool, ALLOCATOR_POOL, flowControlWindow, maxInboundMessageSize(),
|
eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(),
|
||||||
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
|
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
|
||||||
transportTracerFactory, localSocketPicker, useGetForSafeMethods);
|
transportTracerFactory, localSocketPicker, useGetForSafeMethods);
|
||||||
}
|
}
|
||||||
|
|
@ -521,8 +518,6 @@ public final class NettyChannelBuilder
|
||||||
private final Map<ChannelOption<?>, ?> channelOptions;
|
private final Map<ChannelOption<?>, ?> channelOptions;
|
||||||
private final ObjectPool<? extends EventLoopGroup> groupPool;
|
private final ObjectPool<? extends EventLoopGroup> groupPool;
|
||||||
private final EventLoopGroup group;
|
private final EventLoopGroup group;
|
||||||
private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
|
|
||||||
private final ByteBufAllocator allocator;
|
|
||||||
private final int flowControlWindow;
|
private final int flowControlWindow;
|
||||||
private final int maxMessageSize;
|
private final int maxMessageSize;
|
||||||
private final int maxHeaderListSize;
|
private final int maxHeaderListSize;
|
||||||
|
|
@ -538,7 +533,6 @@ public final class NettyChannelBuilder
|
||||||
NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
|
NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
|
||||||
ChannelFactory<? extends Channel> channelFactory,
|
ChannelFactory<? extends Channel> channelFactory,
|
||||||
Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
|
Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
|
||||||
ObjectPool<? extends ByteBufAllocator> allocatorPool,
|
|
||||||
int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
|
int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
|
||||||
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
|
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
|
||||||
TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
|
TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
|
||||||
|
|
@ -548,8 +542,6 @@ public final class NettyChannelBuilder
|
||||||
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
|
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
|
||||||
this.groupPool = groupPool;
|
this.groupPool = groupPool;
|
||||||
this.group = groupPool.getObject();
|
this.group = groupPool.getObject();
|
||||||
this.allocatorPool = allocatorPool;
|
|
||||||
this.allocator = allocatorPool.getObject();
|
|
||||||
this.flowControlWindow = flowControlWindow;
|
this.flowControlWindow = flowControlWindow;
|
||||||
this.maxMessageSize = maxMessageSize;
|
this.maxMessageSize = maxMessageSize;
|
||||||
this.maxHeaderListSize = maxHeaderListSize;
|
this.maxHeaderListSize = maxHeaderListSize;
|
||||||
|
|
@ -588,7 +580,7 @@ public final class NettyChannelBuilder
|
||||||
|
|
||||||
// TODO(carl-mastrangelo): Pass channelLogger in.
|
// TODO(carl-mastrangelo): Pass channelLogger in.
|
||||||
NettyClientTransport transport = new NettyClientTransport(
|
NettyClientTransport transport = new NettyClientTransport(
|
||||||
serverAddress, channelFactory, channelOptions, group, allocator,
|
serverAddress, channelFactory, channelOptions, group,
|
||||||
localNegotiator, flowControlWindow,
|
localNegotiator, flowControlWindow,
|
||||||
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
|
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
|
||||||
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
|
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
|
||||||
|
|
@ -609,7 +601,6 @@ public final class NettyChannelBuilder
|
||||||
}
|
}
|
||||||
closed = true;
|
closed = true;
|
||||||
|
|
||||||
allocatorPool.returnObject(allocator);
|
|
||||||
protocolNegotiator.close();
|
protocolNegotiator.close();
|
||||||
groupPool.returnObject(group);
|
groupPool.returnObject(group);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,6 @@ import io.grpc.internal.StatsTraceContext;
|
||||||
import io.grpc.internal.TransportTracer;
|
import io.grpc.internal.TransportTracer;
|
||||||
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
|
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFactory;
|
import io.netty.channel.ChannelFactory;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
|
|
@ -76,7 +75,6 @@ class NettyClientTransport implements ConnectionClientTransport {
|
||||||
private final SocketAddress remoteAddress;
|
private final SocketAddress remoteAddress;
|
||||||
private final ChannelFactory<? extends Channel> channelFactory;
|
private final ChannelFactory<? extends Channel> channelFactory;
|
||||||
private final EventLoopGroup group;
|
private final EventLoopGroup group;
|
||||||
private final ByteBufAllocator allocator;
|
|
||||||
private final ProtocolNegotiator negotiator;
|
private final ProtocolNegotiator negotiator;
|
||||||
private final String authorityString;
|
private final String authorityString;
|
||||||
private final AsciiString authority;
|
private final AsciiString authority;
|
||||||
|
|
@ -108,7 +106,6 @@ class NettyClientTransport implements ConnectionClientTransport {
|
||||||
NettyClientTransport(
|
NettyClientTransport(
|
||||||
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
|
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
|
||||||
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
|
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
|
||||||
ByteBufAllocator allocator,
|
|
||||||
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
|
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
|
||||||
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
|
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
|
||||||
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
|
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
|
||||||
|
|
@ -119,7 +116,6 @@ class NettyClientTransport implements ConnectionClientTransport {
|
||||||
this.negotiationScheme = this.negotiator.scheme();
|
this.negotiationScheme = this.negotiator.scheme();
|
||||||
this.remoteAddress = Preconditions.checkNotNull(address, "address");
|
this.remoteAddress = Preconditions.checkNotNull(address, "address");
|
||||||
this.group = Preconditions.checkNotNull(group, "group");
|
this.group = Preconditions.checkNotNull(group, "group");
|
||||||
this.allocator = Preconditions.checkNotNull(allocator, "allocator");
|
|
||||||
this.channelFactory = channelFactory;
|
this.channelFactory = channelFactory;
|
||||||
this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
|
this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
|
||||||
this.flowControlWindow = flowControlWindow;
|
this.flowControlWindow = flowControlWindow;
|
||||||
|
|
@ -230,7 +226,7 @@ class NettyClientTransport implements ConnectionClientTransport {
|
||||||
ChannelHandler negotiationHandler = negotiator.newHandler(handler);
|
ChannelHandler negotiationHandler = negotiator.newHandler(handler);
|
||||||
|
|
||||||
Bootstrap b = new Bootstrap();
|
Bootstrap b = new Bootstrap();
|
||||||
b.option(ALLOCATOR, allocator);
|
b.option(ALLOCATOR, Utils.getByteBufAllocator());
|
||||||
b.attr(LOGGER_KEY, channelLogger);
|
b.attr(LOGGER_KEY, channelLogger);
|
||||||
b.group(eventLoop);
|
b.group(eventLoop);
|
||||||
b.channelFactory(channelFactory);
|
b.channelFactory(channelFactory);
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,6 @@ import io.grpc.internal.ServerListener;
|
||||||
import io.grpc.internal.ServerTransportListener;
|
import io.grpc.internal.ServerTransportListener;
|
||||||
import io.grpc.internal.TransportTracer;
|
import io.grpc.internal.TransportTracer;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFactory;
|
import io.netty.channel.ChannelFactory;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
|
|
@ -75,10 +74,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
|
||||||
private final int maxStreamsPerConnection;
|
private final int maxStreamsPerConnection;
|
||||||
private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
|
private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
|
||||||
private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
|
private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
|
||||||
private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
|
|
||||||
private EventLoopGroup bossGroup;
|
private EventLoopGroup bossGroup;
|
||||||
private EventLoopGroup workerGroup;
|
private EventLoopGroup workerGroup;
|
||||||
private ByteBufAllocator allocator;
|
|
||||||
private ServerListener listener;
|
private ServerListener listener;
|
||||||
private Channel channel;
|
private Channel channel;
|
||||||
private final int flowControlWindow;
|
private final int flowControlWindow;
|
||||||
|
|
@ -105,7 +102,6 @@ class NettyServer implements InternalServer, InternalWithLogId {
|
||||||
Map<ChannelOption<?>, ?> channelOptions,
|
Map<ChannelOption<?>, ?> channelOptions,
|
||||||
ObjectPool<? extends EventLoopGroup> bossGroupPool,
|
ObjectPool<? extends EventLoopGroup> bossGroupPool,
|
||||||
ObjectPool<? extends EventLoopGroup> workerGroupPool,
|
ObjectPool<? extends EventLoopGroup> workerGroupPool,
|
||||||
ObjectPool<? extends ByteBufAllocator> allocatorPool,
|
|
||||||
ProtocolNegotiator protocolNegotiator,
|
ProtocolNegotiator protocolNegotiator,
|
||||||
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
|
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
|
||||||
TransportTracer.Factory transportTracerFactory,
|
TransportTracer.Factory transportTracerFactory,
|
||||||
|
|
@ -121,10 +117,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
|
||||||
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
|
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
|
||||||
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
|
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
|
||||||
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
|
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
|
||||||
this.allocatorPool = checkNotNull(allocatorPool, "allocatorPool");
|
|
||||||
this.bossGroup = bossGroupPool.getObject();
|
this.bossGroup = bossGroupPool.getObject();
|
||||||
this.workerGroup = workerGroupPool.getObject();
|
this.workerGroup = workerGroupPool.getObject();
|
||||||
this.allocator = allocatorPool.getObject();
|
|
||||||
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
|
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
|
||||||
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
|
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
|
||||||
this.transportTracerFactory = transportTracerFactory;
|
this.transportTracerFactory = transportTracerFactory;
|
||||||
|
|
@ -163,8 +157,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
|
||||||
listener = checkNotNull(serverListener, "serverListener");
|
listener = checkNotNull(serverListener, "serverListener");
|
||||||
|
|
||||||
ServerBootstrap b = new ServerBootstrap();
|
ServerBootstrap b = new ServerBootstrap();
|
||||||
b.option(ALLOCATOR, allocator);
|
b.option(ALLOCATOR, Utils.getByteBufAllocator());
|
||||||
b.childOption(ALLOCATOR, allocator);
|
b.childOption(ALLOCATOR, Utils.getByteBufAllocator());
|
||||||
b.group(bossGroup, workerGroup);
|
b.group(bossGroup, workerGroup);
|
||||||
b.channelFactory(channelFactory);
|
b.channelFactory(channelFactory);
|
||||||
// For non-socket based channel, the option will be ignored.
|
// For non-socket based channel, the option will be ignored.
|
||||||
|
|
@ -331,13 +325,6 @@ class NettyServer implements InternalServer, InternalWithLogId {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
workerGroup = null;
|
workerGroup = null;
|
||||||
try {
|
|
||||||
if (allocator != null) {
|
|
||||||
allocatorPool.returnObject(allocator);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
allocator = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,6 @@ import io.grpc.internal.GrpcUtil;
|
||||||
import io.grpc.internal.KeepAliveManager;
|
import io.grpc.internal.KeepAliveManager;
|
||||||
import io.grpc.internal.ObjectPool;
|
import io.grpc.internal.ObjectPool;
|
||||||
import io.grpc.internal.SharedResourcePool;
|
import io.grpc.internal.SharedResourcePool;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
|
||||||
import io.netty.channel.ChannelFactory;
|
import io.netty.channel.ChannelFactory;
|
||||||
import io.netty.channel.ChannelOption;
|
import io.netty.channel.ChannelOption;
|
||||||
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.EventLoopGroup;
|
||||||
|
|
@ -80,8 +79,6 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP);
|
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP);
|
||||||
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL =
|
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL =
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
|
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
|
||||||
private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
|
|
||||||
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);
|
|
||||||
|
|
||||||
private final List<SocketAddress> listenAddresses = new ArrayList<>();
|
private final List<SocketAddress> listenAddresses = new ArrayList<>();
|
||||||
|
|
||||||
|
|
@ -544,7 +541,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
|
||||||
for (SocketAddress listenAddress : listenAddresses) {
|
for (SocketAddress listenAddress : listenAddresses) {
|
||||||
NettyServer transportServer = new NettyServer(
|
NettyServer transportServer = new NettyServer(
|
||||||
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
|
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
|
||||||
workerEventLoopGroupPool, ALLOCATOR_POOL, negotiator, streamTracerFactories,
|
workerEventLoopGroupPool, negotiator, streamTracerFactories,
|
||||||
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
|
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
|
||||||
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
|
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
|
||||||
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
|
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
|
||||||
|
|
|
||||||
|
|
@ -86,41 +86,37 @@ class Utils {
|
||||||
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
|
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
|
||||||
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;
|
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;
|
||||||
|
|
||||||
public static final Resource<ByteBufAllocator> BYTE_BUF_ALLOCATOR =
|
// This class is initialized on first use, thus provides delayed allocator creation.
|
||||||
new Resource<ByteBufAllocator>() {
|
private static final class ByteBufAllocatorHolder {
|
||||||
@Override
|
private static final ByteBufAllocator allocator;
|
||||||
public ByteBufAllocator create() {
|
|
||||||
if (Boolean.parseBoolean(
|
|
||||||
System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
|
|
||||||
int maxOrder;
|
|
||||||
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
|
|
||||||
// See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
|
|
||||||
// 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be
|
|
||||||
// 2MiB, thus reducing the maxOrder to 8.
|
|
||||||
maxOrder = 8;
|
|
||||||
} else {
|
|
||||||
maxOrder = PooledByteBufAllocator.defaultMaxOrder();
|
|
||||||
}
|
|
||||||
return new PooledByteBufAllocator(
|
|
||||||
PooledByteBufAllocator.defaultPreferDirect(),
|
|
||||||
PooledByteBufAllocator.defaultNumHeapArena(),
|
|
||||||
PooledByteBufAllocator.defaultNumDirectArena(),
|
|
||||||
PooledByteBufAllocator.defaultPageSize(),
|
|
||||||
maxOrder,
|
|
||||||
PooledByteBufAllocator.defaultTinyCacheSize(),
|
|
||||||
PooledByteBufAllocator.defaultSmallCacheSize(),
|
|
||||||
PooledByteBufAllocator.defaultNormalCacheSize(),
|
|
||||||
PooledByteBufAllocator.defaultUseCacheForAllThreads());
|
|
||||||
} else {
|
|
||||||
return ByteBufAllocator.DEFAULT;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
static {
|
||||||
public void close(ByteBufAllocator allocator) {
|
if (Boolean.parseBoolean(
|
||||||
// PooledByteBufAllocator doesn't provide a shutdown method. Leaving it to GC.
|
System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
|
||||||
|
int maxOrder;
|
||||||
|
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
|
||||||
|
// See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
|
||||||
|
// 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be
|
||||||
|
// 2MiB, thus reducing the maxOrder to 8.
|
||||||
|
maxOrder = 8;
|
||||||
|
} else {
|
||||||
|
maxOrder = PooledByteBufAllocator.defaultMaxOrder();
|
||||||
}
|
}
|
||||||
};
|
allocator = new PooledByteBufAllocator(
|
||||||
|
PooledByteBufAllocator.defaultPreferDirect(),
|
||||||
|
PooledByteBufAllocator.defaultNumHeapArena(),
|
||||||
|
PooledByteBufAllocator.defaultNumDirectArena(),
|
||||||
|
PooledByteBufAllocator.defaultPageSize(),
|
||||||
|
maxOrder,
|
||||||
|
PooledByteBufAllocator.defaultTinyCacheSize(),
|
||||||
|
PooledByteBufAllocator.defaultSmallCacheSize(),
|
||||||
|
PooledByteBufAllocator.defaultNormalCacheSize(),
|
||||||
|
PooledByteBufAllocator.defaultUseCacheForAllThreads());
|
||||||
|
} else {
|
||||||
|
allocator = ByteBufAllocator.DEFAULT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
|
public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
|
||||||
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;
|
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;
|
||||||
|
|
@ -148,6 +144,10 @@ class Utils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ByteBufAllocator getByteBufAllocator() {
|
||||||
|
return ByteBufAllocatorHolder.allocator;
|
||||||
|
}
|
||||||
|
|
||||||
public static Metadata convertHeaders(Http2Headers http2Headers) {
|
public static Metadata convertHeaders(Http2Headers http2Headers) {
|
||||||
if (http2Headers instanceof GrpcHttp2InboundHeaders) {
|
if (http2Headers instanceof GrpcHttp2InboundHeaders) {
|
||||||
GrpcHttp2InboundHeaders h = (GrpcHttp2InboundHeaders) http2Headers;
|
GrpcHttp2InboundHeaders h = (GrpcHttp2InboundHeaders) http2Headers;
|
||||||
|
|
|
||||||
|
|
@ -62,12 +62,9 @@ import io.grpc.internal.ServerStream;
|
||||||
import io.grpc.internal.ServerStreamListener;
|
import io.grpc.internal.ServerStreamListener;
|
||||||
import io.grpc.internal.ServerTransport;
|
import io.grpc.internal.ServerTransport;
|
||||||
import io.grpc.internal.ServerTransportListener;
|
import io.grpc.internal.ServerTransportListener;
|
||||||
import io.grpc.internal.SharedResourceHolder;
|
|
||||||
import io.grpc.internal.SharedResourcePool;
|
|
||||||
import io.grpc.internal.TransportTracer;
|
import io.grpc.internal.TransportTracer;
|
||||||
import io.grpc.internal.testing.TestUtils;
|
import io.grpc.internal.testing.TestUtils;
|
||||||
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
|
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelConfig;
|
import io.netty.channel.ChannelConfig;
|
||||||
import io.netty.channel.ChannelDuplexHandler;
|
import io.netty.channel.ChannelDuplexHandler;
|
||||||
|
|
@ -126,7 +123,6 @@ public class NettyClientTransportTest {
|
||||||
private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
|
private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
|
||||||
new LinkedBlockingQueue<>();
|
new LinkedBlockingQueue<>();
|
||||||
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
|
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
|
||||||
private final ByteBufAllocator allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR);
|
|
||||||
private final EchoServerListener serverListener = new EchoServerListener();
|
private final EchoServerListener serverListener = new EchoServerListener();
|
||||||
private final InternalChannelz channelz = new InternalChannelz();
|
private final InternalChannelz channelz = new InternalChannelz();
|
||||||
private Runnable tooManyPingsRunnable = new Runnable() {
|
private Runnable tooManyPingsRunnable = new Runnable() {
|
||||||
|
|
@ -157,7 +153,6 @@ public class NettyClientTransportTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
|
group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
|
||||||
SharedResourceHolder.release(Utils.BYTE_BUF_ALLOCATOR, allocator);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -195,7 +190,7 @@ public class NettyClientTransportTest {
|
||||||
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
|
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
|
||||||
NettyClientTransport transport = new NettyClientTransport(
|
NettyClientTransport transport = new NettyClientTransport(
|
||||||
address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group,
|
address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group,
|
||||||
allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
|
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
|
||||||
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
|
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
|
||||||
null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
|
null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
|
||||||
new SocketPicker(), new FakeChannelLogger(), false);
|
new SocketPicker(), new FakeChannelLogger(), false);
|
||||||
|
|
@ -440,7 +435,7 @@ public class NettyClientTransportTest {
|
||||||
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
|
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
|
||||||
NettyClientTransport transport = new NettyClientTransport(
|
NettyClientTransport transport = new NettyClientTransport(
|
||||||
address, new ReflectiveChannelFactory<>(CantConstructChannel.class),
|
address, new ReflectiveChannelFactory<>(CantConstructChannel.class),
|
||||||
new HashMap<ChannelOption<?>, Object>(), group, allocator,
|
new HashMap<ChannelOption<?>, Object>(), group,
|
||||||
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
|
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
|
||||||
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
|
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
|
||||||
null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(),
|
null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(),
|
||||||
|
|
@ -710,7 +705,7 @@ public class NettyClientTransportTest {
|
||||||
keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
|
keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
|
||||||
}
|
}
|
||||||
NettyClientTransport transport = new NettyClientTransport(
|
NettyClientTransport transport = new NettyClientTransport(
|
||||||
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group, allocator,
|
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group,
|
||||||
negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
|
negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
|
||||||
keepAliveTimeNano, keepAliveTimeoutNano,
|
keepAliveTimeNano, keepAliveTimeoutNano,
|
||||||
false, authority, userAgent, tooManyPingsRunnable,
|
false, authority, userAgent, tooManyPingsRunnable,
|
||||||
|
|
@ -728,8 +723,7 @@ public class NettyClientTransportTest {
|
||||||
TestUtils.testServerAddress(new InetSocketAddress(0)),
|
TestUtils.testServerAddress(new InetSocketAddress(0)),
|
||||||
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
|
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
|
||||||
new HashMap<ChannelOption<?>, Object>(),
|
new HashMap<ChannelOption<?>, Object>(),
|
||||||
new FixedObjectPool<>(group), new FixedObjectPool<>(group),
|
new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
|
||||||
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), negotiator,
|
|
||||||
Collections.<ServerStreamTracer.Factory>emptyList(),
|
Collections.<ServerStreamTracer.Factory>emptyList(),
|
||||||
TransportTracer.getDefaultFactory(),
|
TransportTracer.getDefaultFactory(),
|
||||||
maxStreamsPerConnection,
|
maxStreamsPerConnection,
|
||||||
|
|
|
||||||
|
|
@ -83,7 +83,6 @@ public class NettyServerTest {
|
||||||
new HashMap<ChannelOption<?>, Object>(),
|
new HashMap<ChannelOption<?>, Object>(),
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
|
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
|
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
|
||||||
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
|
|
||||||
protocolNegotiator,
|
protocolNegotiator,
|
||||||
Collections.<ServerStreamTracer.Factory>emptyList(),
|
Collections.<ServerStreamTracer.Factory>emptyList(),
|
||||||
TransportTracer.getDefaultFactory(),
|
TransportTracer.getDefaultFactory(),
|
||||||
|
|
@ -128,7 +127,6 @@ public class NettyServerTest {
|
||||||
new HashMap<ChannelOption<?>, Object>(),
|
new HashMap<ChannelOption<?>, Object>(),
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
|
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
|
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
|
||||||
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
|
|
||||||
ProtocolNegotiators.plaintext(),
|
ProtocolNegotiators.plaintext(),
|
||||||
Collections.<ServerStreamTracer.Factory>emptyList(),
|
Collections.<ServerStreamTracer.Factory>emptyList(),
|
||||||
TransportTracer.getDefaultFactory(),
|
TransportTracer.getDefaultFactory(),
|
||||||
|
|
@ -167,7 +165,6 @@ public class NettyServerTest {
|
||||||
channelOptions,
|
channelOptions,
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
|
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
|
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
|
||||||
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
|
|
||||||
ProtocolNegotiators.plaintext(),
|
ProtocolNegotiators.plaintext(),
|
||||||
Collections.<ServerStreamTracer.Factory>emptyList(),
|
Collections.<ServerStreamTracer.Factory>emptyList(),
|
||||||
TransportTracer.getDefaultFactory(),
|
TransportTracer.getDefaultFactory(),
|
||||||
|
|
@ -218,7 +215,6 @@ public class NettyServerTest {
|
||||||
new HashMap<ChannelOption<?>, Object>(),
|
new HashMap<ChannelOption<?>, Object>(),
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
|
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
|
||||||
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
|
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
|
||||||
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
|
|
||||||
ProtocolNegotiators.plaintext(),
|
ProtocolNegotiators.plaintext(),
|
||||||
Collections.<ServerStreamTracer.Factory>emptyList(),
|
Collections.<ServerStreamTracer.Factory>emptyList(),
|
||||||
TransportTracer.getDefaultFactory(),
|
TransportTracer.getDefaultFactory(),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue