netty: NettyChannelBuilder extends a public API class

This commit is contained in:
Sergii Tkachenko 2020-09-02 11:18:10 -04:00 committed by Sergii Tkachenko
parent b0f0ed080e
commit c056979671
8 changed files with 161 additions and 55 deletions

View File

@ -27,6 +27,7 @@ import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.okhttp.OkHttpChannelBuilder;
@ -425,7 +426,9 @@ public class TestServiceClient {
if (fullStreamDecompression) {
nettyBuilder.enableFullStreamDecompression();
}
builder = nettyBuilder;
// Disable the default census stats interceptor, use testing interceptor instead.
InternalNettyChannelBuilder.setStatsEnabled(nettyBuilder, false);
return nettyBuilder.intercept(createCensusStatsClientInterceptor());
} else {
OkHttpChannelBuilder okBuilder = OkHttpChannelBuilder.forAddress(serverHost, serverPort);
if (serverHostOverride != null) {

View File

@ -17,6 +17,7 @@
package io.grpc.testing.integration;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
@ -39,7 +40,7 @@ public class AutoWindowSizingOnTest extends AbstractInteropTest {
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.initialFlowControlWindow(NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW);
// Disable the default census stats interceptor, use testing interceptor instead.
io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false);
InternalNettyChannelBuilder.setStatsEnabled(builder, false);
return builder.intercept(createCensusStatsClientInterceptor());
}
}

View File

@ -17,6 +17,7 @@
package io.grpc.testing.integration;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
@ -57,7 +58,7 @@ public class Http2NettyLocalChannelTest extends AbstractInteropTest {
.flowControlWindow(65 * 1024)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
// Disable the default census stats interceptor, use testing interceptor instead.
io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false);
InternalNettyChannelBuilder.setStatsEnabled(builder, false);
return builder.intercept(createCensusStatsClientInterceptor());
}

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotEquals;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.ClientAuth;
@ -71,7 +72,7 @@ public class Http2NettyTest extends AbstractInteropTest {
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
.build());
// Disable the default census stats interceptor, use testing interceptor instead.
io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false);
InternalNettyChannelBuilder.setStatsEnabled(builder, false);
return builder.intercept(createCensusStatsClientInterceptor());
} catch (Exception ex) {
throw new RuntimeException(ex);

View File

@ -37,6 +37,7 @@ import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.testing.integration.Messages.BoolValue;
@ -165,7 +166,7 @@ public class TransportCompressionTest extends AbstractInteropTest {
})
.usePlaintext();
// Disable the default census stats interceptor, use testing interceptor instead.
io.grpc.internal.TestingAccessor.setStatsEnabled(builder, false);
InternalNettyChannelBuilder.setStatsEnabled(builder, false);
return builder.intercept(createCensusStatsClientInterceptor());
}

View File

@ -31,14 +31,31 @@ public final class InternalNettyChannelBuilder {
/**
* Checks authority upon channel construction. The purpose of this interface is to raise the
* visibility of {@link NettyChannelBuilder.OverrideAuthorityChecker}.
* @deprecated To be removed, use {@link #disableCheckAuthority(NettyChannelBuilder builder)} to
* disable authority check.
*/
@Deprecated
public interface OverrideAuthorityChecker extends NettyChannelBuilder.OverrideAuthorityChecker {}
/**
* Overrides authority checker.
* @deprecated To be removed, use {@link #disableCheckAuthority(NettyChannelBuilder builder)} to
* disable authority check.
*/
@Deprecated
public static void overrideAuthorityChecker(
NettyChannelBuilder channelBuilder, OverrideAuthorityChecker authorityChecker) {
channelBuilder.overrideAuthorityChecker(authorityChecker);
}
public static void disableCheckAuthority(NettyChannelBuilder builder) {
builder.disableCheckAuthority();
}
public static void enableCheckAuthority(NettyChannelBuilder builder) {
builder.enableCheckAuthority();
}
/** A class that provides a Netty handler to control protocol negotiation. */
public interface ProtocolNegotiatorFactory
extends NettyChannelBuilder.ProtocolNegotiatorFactory {
@ -68,6 +85,10 @@ public final class InternalNettyChannelBuilder {
builder.setStatsRecordStartedRpcs(value);
}
public static void setStatsRecordFinishedRpcs(NettyChannelBuilder builder, boolean value) {
builder.setStatsRecordFinishedRpcs(value);
}
public static void setStatsRecordRealTimeMetrics(NettyChannelBuilder builder, boolean value) {
builder.setStatsRecordRealTimeMetrics(value);
}

View File

@ -28,15 +28,19 @@ import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.EquivalentAddressGroup;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingChannelBuilder;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.Internal;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.ManagedChannelBuilder;
import io.grpc.internal.AtomicBackoff;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ManagedChannelImplBuilder;
import io.grpc.internal.ManagedChannelImplBuilder.ChannelBuilderDefaultPortProvider;
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
@ -63,8 +67,7 @@ import javax.net.ssl.SSLException;
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1784")
@CanIgnoreReturnValue
public final class NettyChannelBuilder
extends AbstractManagedChannelImplBuilder<NettyChannelBuilder> {
public final class NettyChannelBuilder extends ForwardingChannelBuilder<NettyChannelBuilder> {
// 1MiB.
public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1024 * 1024;
@ -85,16 +88,16 @@ public final class NettyChannelBuilder
DEFAULT_AUTO_FLOW_CONTROL = Boolean.parseBoolean(autoFlowControl);
}
private final Map<ChannelOption<?>, Object> channelOptions =
new HashMap<>();
private final ManagedChannelImplBuilder managedChannelImplBuilder;
private TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory();
private final Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
private NegotiationType negotiationType = NegotiationType.TLS;
private OverrideAuthorityChecker authorityChecker;
private ChannelFactory<? extends Channel> channelFactory = DEFAULT_CHANNEL_FACTORY;
private ObjectPool<? extends EventLoopGroup> eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL;
private SslContext sslContext;
private boolean autoFlowControl = DEFAULT_AUTO_FLOW_CONTROL;
private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED;
private long keepAliveTimeoutNanos = DEFAULT_KEEPALIVE_TIMEOUT_NANOS;
@ -142,14 +145,39 @@ public final class NettyChannelBuilder
this(GrpcUtil.authorityFromHostAndPort(host, port));
}
private final class NettyChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder {
@Override
public ClientTransportFactory buildClientTransportFactory() {
return buildTransportFactory();
}
}
private final class NettyChannelDefaultPortProvider implements ChannelBuilderDefaultPortProvider {
@Override
public int getDefaultPort() {
return NettyChannelBuilder.this.getDefaultPort();
}
}
@CheckReturnValue
NettyChannelBuilder(String target) {
super(target);
managedChannelImplBuilder = new ManagedChannelImplBuilder(target,
new NettyChannelTransportFactoryBuilder(),
new NettyChannelDefaultPortProvider());
}
@CheckReturnValue
NettyChannelBuilder(SocketAddress address) {
super(address, getAuthorityFromAddress(address));
managedChannelImplBuilder = new ManagedChannelImplBuilder(address,
getAuthorityFromAddress(address),
new NettyChannelTransportFactoryBuilder(),
new NettyChannelDefaultPortProvider());
}
@Internal
@Override
protected ManagedChannelBuilder<?> delegate() {
return managedChannelImplBuilder;
}
@CheckReturnValue
@ -408,10 +436,20 @@ public final class NettyChannelBuilder
}
}
/**
* Sets the maximum message size allowed for a single gRPC frame. If an inbound messages larger
* than this limit is received it will not be processed and the RPC will fail with
* RESOURCE_EXHAUSTED.
*/
@Override
public NettyChannelBuilder maxInboundMessageSize(int max) {
checkArgument(max >= 0, "negative max");
maxInboundMessageSize = max;
return this;
}
@CheckReturnValue
@Internal
protected ClientTransportFactory buildTransportFactory() {
ClientTransportFactory buildTransportFactory() {
assertEventLoopAndChannelType();
ProtocolNegotiator negotiator;
@ -427,12 +465,12 @@ public final class NettyChannelBuilder
}
}
negotiator = createProtocolNegotiatorByType(negotiationType, localSslContext,
this.getOffloadExecutorPool());
this.managedChannelImplBuilder.getOffloadExecutorPool());
}
return new NettyTransportFactory(
negotiator, channelFactory, channelOptions,
eventLoopGroupPool, autoFlowControl, flowControlWindow, maxInboundMessageSize(),
eventLoopGroupPool, autoFlowControl, flowControlWindow, maxInboundMessageSize,
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory, localSocketPicker, useGetForSafeMethods);
}
@ -448,9 +486,8 @@ public final class NettyChannelBuilder
"Both EventLoopGroup and ChannelType should be provided or neither should be");
}
@Override
@CheckReturnValue
protected int getDefaultPort() {
int getDefaultPort() {
switch (negotiationType) {
case PLAINTEXT:
case PLAINTEXT_UPGRADE:
@ -462,10 +499,6 @@ public final class NettyChannelBuilder
}
}
void overrideAuthorityChecker(@Nullable OverrideAuthorityChecker authorityChecker) {
this.authorityChecker = authorityChecker;
}
@VisibleForTesting
@CheckReturnValue
static ProtocolNegotiator createProtocolNegotiatorByType(
@ -484,19 +517,22 @@ public final class NettyChannelBuilder
}
}
@CheckReturnValue
interface OverrideAuthorityChecker {
String checkAuthority(String authority);
@Deprecated
interface OverrideAuthorityChecker extends ManagedChannelImplBuilder.OverrideAuthorityChecker {}
@Deprecated
void overrideAuthorityChecker(@Nullable OverrideAuthorityChecker authorityChecker) {
this.managedChannelImplBuilder.overrideAuthorityChecker(authorityChecker);
}
@Override
@CheckReturnValue
@Internal
protected String checkAuthority(String authority) {
if (authorityChecker != null) {
return authorityChecker.checkAuthority(authority);
}
return super.checkAuthority(authority);
NettyChannelBuilder disableCheckAuthority() {
this.managedChannelImplBuilder.disableCheckAuthority();
return this;
}
NettyChannelBuilder enableCheckAuthority() {
this.managedChannelImplBuilder.enableCheckAuthority();
return this;
}
void protocolNegotiatorFactory(ProtocolNegotiatorFactory protocolNegotiatorFactory) {
@ -504,24 +540,24 @@ public final class NettyChannelBuilder
= checkNotNull(protocolNegotiatorFactory, "protocolNegotiatorFactory");
}
@Override
protected void setTracingEnabled(boolean value) {
super.setTracingEnabled(value);
void setTracingEnabled(boolean value) {
this.managedChannelImplBuilder.setTracingEnabled(value);
}
@Override
protected void setStatsEnabled(boolean value) {
super.setStatsEnabled(value);
void setStatsEnabled(boolean value) {
this.managedChannelImplBuilder.setStatsEnabled(value);
}
@Override
protected void setStatsRecordStartedRpcs(boolean value) {
super.setStatsRecordStartedRpcs(value);
void setStatsRecordStartedRpcs(boolean value) {
this.managedChannelImplBuilder.setStatsRecordStartedRpcs(value);
}
@Override
protected void setStatsRecordRealTimeMetrics(boolean value) {
super.setStatsRecordRealTimeMetrics(value);
void setStatsRecordFinishedRpcs(boolean value) {
this.managedChannelImplBuilder.setStatsRecordFinishedRpcs(value);
}
void setStatsRecordRealTimeMetrics(boolean value) {
this.managedChannelImplBuilder.setStatsRecordRealTimeMetrics(value);
}
@VisibleForTesting

View File

@ -22,7 +22,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import io.grpc.ManagedChannel;
import io.grpc.netty.InternalNettyChannelBuilder.OverrideAuthorityChecker;
import io.grpc.internal.GrpcUtil;
import io.grpc.netty.NettyTestUtil.TrackingObjectPoolForTest;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
@ -44,7 +44,7 @@ public class NettyChannelBuilderTest {
@Rule public final ExpectedException thrown = ExpectedException.none();
private final SslContext noSslContext = null;
private void shutdown(ManagedChannel mc) throws Exception {
mc.shutdownNow();
assertTrue(mc.awaitTermination(1, TimeUnit.SECONDS));
@ -92,14 +92,35 @@ public class NettyChannelBuilderTest {
}
@Test
@Deprecated
public void overrideAllowsInvalidAuthority() {
NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){});
InternalNettyChannelBuilder.overrideAuthorityChecker(builder, new OverrideAuthorityChecker() {
@Override
public String checkAuthority(String authority) {
return authority;
}
});
InternalNettyChannelBuilder.overrideAuthorityChecker(builder,
new io.grpc.netty.InternalNettyChannelBuilder.OverrideAuthorityChecker() {
@Override
public String checkAuthority(String authority) {
return authority;
}
});
Object unused = builder.overrideAuthority("[invalidauthority")
.negotiationType(NegotiationType.PLAINTEXT)
.buildTransportFactory();
}
@Test
@Deprecated
public void overrideFailsInvalidAuthority() {
NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){});
InternalNettyChannelBuilder.overrideAuthorityChecker(builder,
new io.grpc.netty.InternalNettyChannelBuilder.OverrideAuthorityChecker() {
@Override
public String checkAuthority(String authority) {
return GrpcUtil.checkAuthority(authority);
}
});
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid authority:");
Object unused = builder.overrideAuthority("[invalidauthority")
.negotiationType(NegotiationType.PLAINTEXT)
.buildTransportFactory();
@ -115,6 +136,27 @@ public class NettyChannelBuilderTest {
builder.overrideAuthority("[invalidauthority");
}
@Test
public void disableCheckAuthorityAllowsInvalidAuthority() {
NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){})
.disableCheckAuthority();
Object unused = builder.overrideAuthority("[invalidauthority")
.negotiationType(NegotiationType.PLAINTEXT)
.buildTransportFactory();
}
@Test
public void enableCheckAuthorityFailOverrideInvalidAuthority() {
NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){})
.disableCheckAuthority()
.enableCheckAuthority();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid authority:");
builder.overrideAuthority("[invalidauthority");
}
@Test
public void failInvalidAuthority() {
thrown.expect(IllegalArgumentException.class);