From cb6cf1ae2fe7e0fb7cb4bcd9f062fbe20822c613 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 16 Dec 2016 10:36:12 -0800 Subject: [PATCH] netty: refactor how internal netty channel works --- .../netty/InternalNettyChannelBuilder.java | 29 +++-- .../io/grpc/netty/NettyChannelBuilder.java | 119 +++++++++++++----- 2 files changed, 107 insertions(+), 41 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java index 1f2e80389c..f0d02d965c 100644 --- a/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java @@ -32,9 +32,6 @@ package io.grpc.netty; import io.grpc.Internal; -import io.grpc.internal.ClientTransportFactory; -import io.grpc.internal.ConnectionClientTransport; -import io.grpc.netty.NettyChannelBuilder.NettyTransportFactory; import java.net.SocketAddress; @@ -57,14 +54,26 @@ public final class InternalNettyChannelBuilder { } /** - * Creates a custom client transport that allows overriding the protocol negotiator. + * Interface to create netty dynamic parameters. */ - public static ConnectionClientTransport newClientTransport( - ClientTransportFactory transportFactory, SocketAddress serverAddress, String authority, - String userAgent, ProtocolNegotiator negotiator) { - // Casting to avoid making {@link NettyTransportFactory} public. - return ((NettyTransportFactory) transportFactory) - .newClientTransport(serverAddress, authority, userAgent, negotiator); + public interface TransportCreationParamsFilterFactory + extends NettyChannelBuilder.TransportCreationParamsFilterFactory { + @Override + TransportCreationParamsFilter create( + SocketAddress targetServerAddress, String authority, String userAgent); + } + + /** + * {@link TransportCreationParamsFilter} are those that may depend on late-known information about + * a client transport. This interface can be used to dynamically alter params based on the + * params of {@code ClientTransportFactory#newClientTransport}. + */ + public interface TransportCreationParamsFilter + extends NettyChannelBuilder.TransportCreationParamsFilter {} + + public static void setDynamicTransportParamsFactory( + NettyChannelBuilder builder, TransportCreationParamsFilterFactory factory) { + builder.setDynamicParamsFactory(factory); } private InternalNettyChannelBuilder() {} diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index af3e0e7727..5f656e4501 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -32,7 +32,8 @@ package io.grpc.netty; import static com.google.common.base.Preconditions.checkArgument; - +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_DELAY_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS; @@ -85,6 +86,8 @@ public final class NettyChannelBuilder private boolean enableKeepAlive; private long keepAliveDelayNanos; private long keepAliveTimeoutNanos; + private TransportCreationParamsFilterFactory dynamicParamsFactory = + DynamicNettyTransportParams.FACTORY; /** * Creates a new builder with the given server address. This factory method is primarily intended @@ -260,10 +263,9 @@ public final class NettyChannelBuilder @Override protected ClientTransportFactory buildTransportFactory() { - return new NettyTransportFactory( - channelType, channelOptions, negotiationType, null /* protocolNegotiator */, sslContext, - eventLoopGroup, flowControlWindow, maxInboundMessageSize(), maxHeaderListSize, - enableKeepAlive, keepAliveDelayNanos, keepAliveTimeoutNanos); + return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions, + negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(), + maxHeaderListSize, enableKeepAlive, keepAliveDelayNanos, keepAliveTimeoutNanos); } @Override @@ -324,36 +326,99 @@ public final class NettyChannelBuilder return super.checkAuthority(authority); } + void setDynamicParamsFactory(TransportCreationParamsFilterFactory factory) { + this.dynamicParamsFactory = checkNotNull(factory, "factory"); + } + + interface TransportCreationParamsFilterFactory { + TransportCreationParamsFilter create( + SocketAddress targetServerAddress, String authority, String userAgent); + } + + interface TransportCreationParamsFilter { + SocketAddress getTargetServerAddress(); + + String getAuthority(); + + @Nullable String getUserAgent(); + + ProtocolNegotiator getProtocolNegotiator( + NegotiationType negotiationType, SslContext sslContext); + } + + private static final class DynamicNettyTransportParams implements TransportCreationParamsFilter { + private static final TransportCreationParamsFilterFactory FACTORY = + new TransportCreationParamsFilterFactory() { + + @Override + public TransportCreationParamsFilter create( + SocketAddress targetServerAddress, String authority, String userAgent) { + return new DynamicNettyTransportParams(targetServerAddress, authority, userAgent); + } + }; + + private final SocketAddress targetServerAddress; + private final String authority; + @Nullable private final String userAgent; + + private DynamicNettyTransportParams( + SocketAddress targetServerAddress, String authority, String userAgent) { + this.targetServerAddress = targetServerAddress; + this.authority = authority; + this.userAgent = userAgent; + } + + @Override + public SocketAddress getTargetServerAddress() { + return targetServerAddress; + } + + @Override + public String getAuthority() { + return authority; + } + + @Override + public String getUserAgent() { + return userAgent; + } + + @Override + public ProtocolNegotiator getProtocolNegotiator( + NegotiationType negotiationType, SslContext sslContext) { + return createProtocolNegotiator(authority, negotiationType, sslContext); + } + } + /** * Creates Netty transports. Exposed for internal use, as it should be private. */ - static class NettyTransportFactory implements ClientTransportFactory { + private static final class NettyTransportFactory implements ClientTransportFactory { + private final TransportCreationParamsFilterFactory dynamicParams; private final Class channelType; private final Map, ?> channelOptions; private final NegotiationType negotiationType; - private final ProtocolNegotiator protocolNegotiator; private final SslContext sslContext; private final EventLoopGroup group; private final boolean usingSharedGroup; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; - private boolean enableKeepAlive; - private long keepAliveDelayNanos; - private long keepAliveTimeoutNanos; + private final boolean enableKeepAlive; + private final long keepAliveDelayNanos; + private final long keepAliveTimeoutNanos; private boolean closed; - private NettyTransportFactory( + NettyTransportFactory(TransportCreationParamsFilterFactory dynamicParams, Class channelType, Map, ?> channelOptions, - NegotiationType negotiationType, ProtocolNegotiator protocolNegotiator, - SslContext sslContext, EventLoopGroup group, int flowControlWindow, int maxMessageSize, - int maxHeaderListSize, boolean enableKeepAlive, long keepAliveDelayNanos, - long keepAliveTimeoutNanos) { + NegotiationType negotiationType, SslContext sslContext, EventLoopGroup group, + int flowControlWindow, int maxMessageSize, int maxHeaderListSize, boolean enableKeepAlive, + long keepAliveDelayNanos, long keepAliveTimeoutNanos) { + this.dynamicParams = dynamicParams; this.channelType = channelType; this.negotiationType = negotiationType; this.channelOptions = new HashMap, Object>(channelOptions); - this.protocolNegotiator = protocolNegotiator; this.sslContext = sslContext; this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; @@ -373,23 +438,15 @@ public final class NettyChannelBuilder @Override public ConnectionClientTransport newClientTransport( SocketAddress serverAddress, String authority, @Nullable String userAgent) { - if (closed) { - throw new IllegalStateException("The transport factory is closed."); - } - ProtocolNegotiator negotiator = protocolNegotiator != null ? protocolNegotiator : - createProtocolNegotiator(authority, negotiationType, sslContext); - return newClientTransport(serverAddress, authority, userAgent, negotiator); - } + checkState(!closed, "The transport factory is closed."); + + TransportCreationParamsFilter dparams = + dynamicParams.create(serverAddress, authority, userAgent); - ConnectionClientTransport newClientTransport( - SocketAddress serverAddress, String authority, String userAgent, - ProtocolNegotiator negotiator) { - if (closed) { - throw new IllegalStateException("The transport factory is closed."); - } NettyClientTransport transport = new NettyClientTransport( - serverAddress, channelType, channelOptions, group, negotiator, flowControlWindow, - maxMessageSize, maxHeaderListSize, authority, userAgent); + dparams.getTargetServerAddress(), channelType, channelOptions, group, + dparams.getProtocolNegotiator(negotiationType, sslContext), flowControlWindow, + maxMessageSize, maxHeaderListSize, dparams.getAuthority(), dparams.getUserAgent()); if (enableKeepAlive) { transport.enableKeepAlive(true, keepAliveDelayNanos, keepAliveTimeoutNanos); }