netty: refactor how internal netty channel works

This commit is contained in:
Carl Mastrangelo 2016-12-16 10:36:12 -08:00 committed by GitHub
parent e36eb10728
commit cb6cf1ae2f
2 changed files with 107 additions and 41 deletions

View File

@ -32,9 +32,6 @@
package io.grpc.netty; package io.grpc.netty;
import io.grpc.Internal; import io.grpc.Internal;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.netty.NettyChannelBuilder.NettyTransportFactory;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -57,14 +54,26 @@ public final class InternalNettyChannelBuilder {
} }
/** /**
* Creates a custom client transport that allows overriding the protocol negotiator. * Interface to create netty dynamic parameters.
*/ */
public static ConnectionClientTransport newClientTransport( public interface TransportCreationParamsFilterFactory
ClientTransportFactory transportFactory, SocketAddress serverAddress, String authority, extends NettyChannelBuilder.TransportCreationParamsFilterFactory {
String userAgent, ProtocolNegotiator negotiator) { @Override
// Casting to avoid making {@link NettyTransportFactory} public. TransportCreationParamsFilter create(
return ((NettyTransportFactory) transportFactory) SocketAddress targetServerAddress, String authority, String userAgent);
.newClientTransport(serverAddress, authority, userAgent, negotiator); }
/**
* {@link TransportCreationParamsFilter} are those that may depend on late-known information about
* a client transport. This interface can be used to dynamically alter params based on the
* params of {@code ClientTransportFactory#newClientTransport}.
*/
public interface TransportCreationParamsFilter
extends NettyChannelBuilder.TransportCreationParamsFilter {}
public static void setDynamicTransportParamsFactory(
NettyChannelBuilder builder, TransportCreationParamsFilterFactory factory) {
builder.setDynamicParamsFactory(factory);
} }
private InternalNettyChannelBuilder() {} private InternalNettyChannelBuilder() {}

View File

@ -32,7 +32,8 @@
package io.grpc.netty; package io.grpc.netty;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_DELAY_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_DELAY_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS;
@ -85,6 +86,8 @@ public final class NettyChannelBuilder
private boolean enableKeepAlive; private boolean enableKeepAlive;
private long keepAliveDelayNanos; private long keepAliveDelayNanos;
private long keepAliveTimeoutNanos; private long keepAliveTimeoutNanos;
private TransportCreationParamsFilterFactory dynamicParamsFactory =
DynamicNettyTransportParams.FACTORY;
/** /**
* Creates a new builder with the given server address. This factory method is primarily intended * Creates a new builder with the given server address. This factory method is primarily intended
@ -260,10 +263,9 @@ public final class NettyChannelBuilder
@Override @Override
protected ClientTransportFactory buildTransportFactory() { protected ClientTransportFactory buildTransportFactory() {
return new NettyTransportFactory( return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions,
channelType, channelOptions, negotiationType, null /* protocolNegotiator */, sslContext, negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(),
eventLoopGroup, flowControlWindow, maxInboundMessageSize(), maxHeaderListSize, maxHeaderListSize, enableKeepAlive, keepAliveDelayNanos, keepAliveTimeoutNanos);
enableKeepAlive, keepAliveDelayNanos, keepAliveTimeoutNanos);
} }
@Override @Override
@ -324,36 +326,99 @@ public final class NettyChannelBuilder
return super.checkAuthority(authority); return super.checkAuthority(authority);
} }
void setDynamicParamsFactory(TransportCreationParamsFilterFactory factory) {
this.dynamicParamsFactory = checkNotNull(factory, "factory");
}
interface TransportCreationParamsFilterFactory {
TransportCreationParamsFilter create(
SocketAddress targetServerAddress, String authority, String userAgent);
}
interface TransportCreationParamsFilter {
SocketAddress getTargetServerAddress();
String getAuthority();
@Nullable String getUserAgent();
ProtocolNegotiator getProtocolNegotiator(
NegotiationType negotiationType, SslContext sslContext);
}
private static final class DynamicNettyTransportParams implements TransportCreationParamsFilter {
private static final TransportCreationParamsFilterFactory FACTORY =
new TransportCreationParamsFilterFactory() {
@Override
public TransportCreationParamsFilter create(
SocketAddress targetServerAddress, String authority, String userAgent) {
return new DynamicNettyTransportParams(targetServerAddress, authority, userAgent);
}
};
private final SocketAddress targetServerAddress;
private final String authority;
@Nullable private final String userAgent;
private DynamicNettyTransportParams(
SocketAddress targetServerAddress, String authority, String userAgent) {
this.targetServerAddress = targetServerAddress;
this.authority = authority;
this.userAgent = userAgent;
}
@Override
public SocketAddress getTargetServerAddress() {
return targetServerAddress;
}
@Override
public String getAuthority() {
return authority;
}
@Override
public String getUserAgent() {
return userAgent;
}
@Override
public ProtocolNegotiator getProtocolNegotiator(
NegotiationType negotiationType, SslContext sslContext) {
return createProtocolNegotiator(authority, negotiationType, sslContext);
}
}
/** /**
* Creates Netty transports. Exposed for internal use, as it should be private. * Creates Netty transports. Exposed for internal use, as it should be private.
*/ */
static class NettyTransportFactory implements ClientTransportFactory { private static final class NettyTransportFactory implements ClientTransportFactory {
private final TransportCreationParamsFilterFactory dynamicParams;
private final Class<? extends Channel> channelType; private final Class<? extends Channel> channelType;
private final Map<ChannelOption<?>, ?> channelOptions; private final Map<ChannelOption<?>, ?> channelOptions;
private final NegotiationType negotiationType; private final NegotiationType negotiationType;
private final ProtocolNegotiator protocolNegotiator;
private final SslContext sslContext; private final SslContext sslContext;
private final EventLoopGroup group; private final EventLoopGroup group;
private final boolean usingSharedGroup; private final boolean usingSharedGroup;
private final int flowControlWindow; private final int flowControlWindow;
private final int maxMessageSize; private final int maxMessageSize;
private final int maxHeaderListSize; private final int maxHeaderListSize;
private boolean enableKeepAlive; private final boolean enableKeepAlive;
private long keepAliveDelayNanos; private final long keepAliveDelayNanos;
private long keepAliveTimeoutNanos; private final long keepAliveTimeoutNanos;
private boolean closed; private boolean closed;
private NettyTransportFactory( NettyTransportFactory(TransportCreationParamsFilterFactory dynamicParams,
Class<? extends Channel> channelType, Map<ChannelOption<?>, ?> channelOptions, Class<? extends Channel> channelType, Map<ChannelOption<?>, ?> channelOptions,
NegotiationType negotiationType, ProtocolNegotiator protocolNegotiator, NegotiationType negotiationType, SslContext sslContext, EventLoopGroup group,
SslContext sslContext, EventLoopGroup group, int flowControlWindow, int maxMessageSize, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, boolean enableKeepAlive,
int maxHeaderListSize, boolean enableKeepAlive, long keepAliveDelayNanos, long keepAliveDelayNanos, long keepAliveTimeoutNanos) {
long keepAliveTimeoutNanos) { this.dynamicParams = dynamicParams;
this.channelType = channelType; this.channelType = channelType;
this.negotiationType = negotiationType; this.negotiationType = negotiationType;
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions); this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.protocolNegotiator = protocolNegotiator;
this.sslContext = sslContext; this.sslContext = sslContext;
this.flowControlWindow = flowControlWindow; this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize; this.maxMessageSize = maxMessageSize;
@ -373,23 +438,15 @@ public final class NettyChannelBuilder
@Override @Override
public ConnectionClientTransport newClientTransport( public ConnectionClientTransport newClientTransport(
SocketAddress serverAddress, String authority, @Nullable String userAgent) { SocketAddress serverAddress, String authority, @Nullable String userAgent) {
if (closed) { checkState(!closed, "The transport factory is closed.");
throw new IllegalStateException("The transport factory is closed.");
} TransportCreationParamsFilter dparams =
ProtocolNegotiator negotiator = protocolNegotiator != null ? protocolNegotiator : dynamicParams.create(serverAddress, authority, userAgent);
createProtocolNegotiator(authority, negotiationType, sslContext);
return newClientTransport(serverAddress, authority, userAgent, negotiator);
}
ConnectionClientTransport newClientTransport(
SocketAddress serverAddress, String authority, String userAgent,
ProtocolNegotiator negotiator) {
if (closed) {
throw new IllegalStateException("The transport factory is closed.");
}
NettyClientTransport transport = new NettyClientTransport( NettyClientTransport transport = new NettyClientTransport(
serverAddress, channelType, channelOptions, group, negotiator, flowControlWindow, dparams.getTargetServerAddress(), channelType, channelOptions, group,
maxMessageSize, maxHeaderListSize, authority, userAgent); dparams.getProtocolNegotiator(negotiationType, sslContext), flowControlWindow,
maxMessageSize, maxHeaderListSize, dparams.getAuthority(), dparams.getUserAgent());
if (enableKeepAlive) { if (enableKeepAlive) {
transport.enableKeepAlive(true, keepAliveDelayNanos, keepAliveTimeoutNanos); transport.enableKeepAlive(true, keepAliveDelayNanos, keepAliveTimeoutNanos);
} }