From 82a79d8f93f7466138dd05c1ca593f837e7699bb Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Mon, 7 Dec 2015 14:40:11 -0800 Subject: [PATCH] Expose compression on ClientCall and Server Call --- core/src/main/java/io/grpc/ClientCall.java | 9 +++++ .../java/io/grpc/DecompressorRegistry.java | 2 +- .../java/io/grpc/ForwardingClientCall.java | 5 +++ .../java/io/grpc/ManagedChannelBuilder.java | 16 ++++++++ core/src/main/java/io/grpc/ServerBuilder.java | 16 ++++++++ .../AbstractManagedChannelImplBuilder.java | 38 ++++++++++++++++--- .../internal/AbstractServerImplBuilder.java | 28 ++++++++++++++ .../java/io/grpc/internal/ClientCallImpl.java | 7 ++++ .../io/grpc/internal/ManagedChannelImpl.java | 14 ++++--- .../grpc/internal/ManagedChannelImplTest.java | 3 +- ...anagedChannelImplTransportManagerTest.java | 6 ++- .../main/java/io/grpc/netty/NettyServer.java | 15 ++++++-- .../io/grpc/netty/NettyServerBuilder.java | 8 +++- .../io/grpc/netty/NettyServerHandler.java | 27 +++++++++++-- .../io/grpc/netty/NettyServerTransport.java | 18 +++++++-- .../grpc/netty/NettyClientTransportTest.java | 5 ++- .../io/grpc/netty/NettyServerHandlerTest.java | 3 ++ 17 files changed, 192 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/io/grpc/ClientCall.java b/core/src/main/java/io/grpc/ClientCall.java index fadd54eb26..f6111a7321 100644 --- a/core/src/main/java/io/grpc/ClientCall.java +++ b/core/src/main/java/io/grpc/ClientCall.java @@ -183,4 +183,13 @@ public abstract class ClientCall { public boolean isReady() { return true; } + + /** + * Enables per-message compression, if an encoding type has been negotiated. If no message + * encoding has been negotiated, this is a no-op. + */ + @ExperimentalApi + public void setMessageCompression(boolean enabled) { + // noop + } } diff --git a/core/src/main/java/io/grpc/DecompressorRegistry.java b/core/src/main/java/io/grpc/DecompressorRegistry.java index 4ed146e395..a11fbf16dd 100644 --- a/core/src/main/java/io/grpc/DecompressorRegistry.java +++ b/core/src/main/java/io/grpc/DecompressorRegistry.java @@ -54,7 +54,7 @@ import javax.annotation.concurrent.ThreadSafe; public final class DecompressorRegistry { private static final DecompressorRegistry DEFAULT_INSTANCE = new DecompressorRegistry( - new DecompressorInfo(new Codec.Gzip(), false), + new DecompressorInfo(new Codec.Gzip(), true), new DecompressorInfo(Codec.Identity.NONE, false)); public static DecompressorRegistry getDefaultInstance() { diff --git a/core/src/main/java/io/grpc/ForwardingClientCall.java b/core/src/main/java/io/grpc/ForwardingClientCall.java index 8d85a72dac..5d6f1fea3c 100644 --- a/core/src/main/java/io/grpc/ForwardingClientCall.java +++ b/core/src/main/java/io/grpc/ForwardingClientCall.java @@ -65,6 +65,11 @@ public abstract class ForwardingClientCall extends ClientCall> @ExperimentalApi public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory); + /** + * Set the decompression registry for use in the channel. This is an advanced API call and + * shouldn't be used unless you are using custom message encoding. The default supported + * decompressors are in {@code DecompressorRegistry.getDefaultInstance}. + */ + @ExperimentalApi + public abstract T decompressorRegistry(DecompressorRegistry registry); + + /** + * Set the compression registry for use in the channel. This is an advanced API call and + * shouldn't be used unless you are using custom message encoding. The default supported + * compressors are in {@code CompressorRegistry.getDefaultInstance}. + */ + @ExperimentalApi + public abstract T compressorRegistry(CompressorRegistry registry); + /** * Builds a channel using the given parameters. */ diff --git a/core/src/main/java/io/grpc/ServerBuilder.java b/core/src/main/java/io/grpc/ServerBuilder.java index 047dfa0f94..6af7f53c33 100644 --- a/core/src/main/java/io/grpc/ServerBuilder.java +++ b/core/src/main/java/io/grpc/ServerBuilder.java @@ -87,6 +87,22 @@ public abstract class ServerBuilder> { */ public abstract T useTransportSecurity(File certChain, File privateKey); + /** + * Set the decompression registry for use in the channel. This is an advanced API call and + * shouldn't be used unless you are using custom message encoding. The default supported + * decompressors are in {@code DecompressorRegistry.getDefaultInstance}. + */ + @ExperimentalApi + public abstract T decompressorRegistry(DecompressorRegistry registry); + + /** + * Set the compression registry for use in the channel. This is an advanced API call and + * shouldn't be used unless you are using custom message encoding. The default supported + * compressors are in {@code CompressorRegistry.getDefaultInstance}. + */ + @ExperimentalApi + public abstract T compressorRegistry(CompressorRegistry registry); + /** * Builds a server using the given parameters. * diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index ea79c4ebf3..a610d6cbe1 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -31,11 +31,16 @@ package io.grpc.internal; +import static com.google.common.base.MoreObjects.firstNonNull; + import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.Attributes; import io.grpc.ClientInterceptor; +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; +import io.grpc.ExperimentalApi; import io.grpc.LoadBalancer; import io.grpc.ManagedChannelBuilder; import io.grpc.NameResolver; @@ -82,6 +87,12 @@ public abstract class AbstractManagedChannelImplBuilder @Nullable private LoadBalancer.Factory loadBalancerFactory; + @Nullable + private DecompressorRegistry decompressorRegistry; + + @Nullable + private CompressorRegistry compressorRegistry; + protected AbstractManagedChannelImplBuilder(String target) { this.target = Preconditions.checkNotNull(target); this.directServerAddress = null; @@ -133,6 +144,20 @@ public abstract class AbstractManagedChannelImplBuilder return thisT(); } + @Override + @ExperimentalApi + public final T decompressorRegistry(DecompressorRegistry registry) { + this.decompressorRegistry = registry; + return thisT(); + } + + @Override + @ExperimentalApi + public final T compressorRegistry(CompressorRegistry registry) { + this.compressorRegistry = registry; + return thisT(); + } + private T thisT() { @SuppressWarnings("unchecked") T thisT = (T) this; @@ -168,12 +193,13 @@ public abstract class AbstractManagedChannelImplBuilder target, // TODO(carl-mastrangelo): Allow clients to pass this in new ExponentialBackoffPolicy.Provider(), - nameResolverFactory == null ? NameResolverRegistry.getDefaultRegistry() - : nameResolverFactory, + firstNonNull(nameResolverFactory, NameResolverRegistry.getDefaultRegistry()), getNameResolverParams(), - loadBalancerFactory == null ? SimpleLoadBalancerFactory.getInstance() - : loadBalancerFactory, - transportFactory, executor, userAgent, interceptors); + firstNonNull(loadBalancerFactory, SimpleLoadBalancerFactory.getInstance()), + transportFactory, + firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()), + firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()), + executor, userAgent, interceptors); } /** @@ -186,7 +212,7 @@ public abstract class AbstractManagedChannelImplBuilder /** * Subclasses can override this method to provide additional parameters to {@link * NameResolver.Factory#newNameResolver}. The default implementation returns {@link - * Attributes.EMPTY}. + * Attributes#EMPTY}. */ protected Attributes getNameResolverParams() { return Attributes.EMPTY; diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 6657734baf..95df85ec42 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -34,7 +34,9 @@ package io.grpc.internal; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.CompressorRegistry; import io.grpc.Context; +import io.grpc.DecompressorRegistry; import io.grpc.HandlerRegistry; import io.grpc.Internal; import io.grpc.MutableHandlerRegistry; @@ -58,6 +60,12 @@ public abstract class AbstractServerImplBuilder extends ClientCall } } + @Override + public void setMessageCompression(boolean enabled) { + checkState(stream != null, "Not started"); + stream.setMessageCompression(enabled); + } + @Override public boolean isReady() { return stream.isReady(); diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 24212f39ac..b473e56e05 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -108,9 +108,8 @@ public final class ManagedChannelImpl extends ManagedChannel { */ private final Set knownAcceptEncodingRegistry = Collections.newSetFromMap(new ConcurrentHashMap()); - private final DecompressorRegistry decompressorRegistry = - DecompressorRegistry.getDefaultInstance(); - private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance(); + private final DecompressorRegistry decompressorRegistry; + private final CompressorRegistry compressorRegistry; /** * Executor that runs deadline timers for requests. @@ -154,9 +153,10 @@ public final class ManagedChannelImpl extends ManagedChannel { ManagedChannelImpl(String target, BackoffPolicy.Provider backoffPolicyProvider, NameResolver.Factory nameResolverFactory, Attributes nameResolverParams, - LoadBalancer.Factory loadBalancerFactory, - ClientTransportFactory transportFactory, @Nullable Executor executor, - @Nullable String userAgent, List interceptors) { + LoadBalancer.Factory loadBalancerFactory, ClientTransportFactory transportFactory, + DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry, + @Nullable Executor executor, @Nullable String userAgent, + List interceptors) { if (executor == null) { usingSharedExecutor = true; this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR); @@ -171,6 +171,8 @@ public final class ManagedChannelImpl extends ManagedChannel { this.userAgent = userAgent; this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors); scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE); + this.decompressorRegistry = decompressorRegistry; + this.compressorRegistry = compressorRegistry; this.nameResolver.start(new NameResolver.Listener() { @Override diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 32208f2d7f..ef3fc936a5 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -134,7 +134,8 @@ public class ManagedChannelImplTest { NameResolver.Factory nameResolverFactory, List interceptors) { return new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(), nameResolverFactory, NAME_RESOLVER_PARAMS, loadBalancerFactory, - mockTransportFactory, executor, null, interceptors); + mockTransportFactory, DecompressorRegistry.getDefaultInstance(), + CompressorRegistry.getDefaultInstance(), executor, null, interceptors); } @Before diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java index 171e5eacd2..bc606e93b4 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java @@ -48,6 +48,8 @@ import com.google.common.util.concurrent.ListenableFuture; import io.grpc.Attributes; import io.grpc.ClientInterceptor; +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.NameResolver; @@ -119,7 +121,9 @@ public class ManagedChannelImplTransportManagerTest { channel = new ManagedChannelImpl("fake://target", mockBackoffPolicyProvider, nameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory, - mockTransportFactory, executor, null, Collections.emptyList()); + mockTransportFactory, DecompressorRegistry.getDefaultInstance(), + CompressorRegistry.getDefaultInstance(), executor, null, + Collections.emptyList()); ArgumentCaptor tmCaptor = ArgumentCaptor.forClass(TransportManager.class); verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture()); diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index f2e9eb70df..f422dc213b 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -35,6 +35,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; import io.grpc.internal.Server; import io.grpc.internal.ServerListener; import io.grpc.internal.SharedResourceHolder; @@ -72,6 +74,8 @@ class NettyServer implements Server { private EventLoopGroup workerGroup; private ServerListener listener; private Channel channel; + private final DecompressorRegistry decompressorRegistry; + private final CompressorRegistry compressorRegistry; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -79,7 +83,8 @@ class NettyServer implements Server { NettyServer(SocketAddress address, Class channelType, @Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup, - ProtocolNegotiator protocolNegotiator, int maxStreamsPerConnection, + ProtocolNegotiator protocolNegotiator, DecompressorRegistry decompressorRegistry, + CompressorRegistry compressorRegistry, int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize) { this.address = address; this.channelType = checkNotNull(channelType, "channelType"); @@ -92,6 +97,8 @@ class NettyServer implements Server { this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; + this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry"); + this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry"); } @Override @@ -113,13 +120,15 @@ class NettyServer implements Server { public void initChannel(Channel ch) throws Exception { eventLoopReferenceCounter.retain(); ch.closeFuture().addListener(new ChannelFutureListener() { + @Override public void operationComplete(ChannelFuture future) { eventLoopReferenceCounter.release(); } }); NettyServerTransport transport - = new NettyServerTransport(ch, protocolNegotiator, maxStreamsPerConnection, - flowControlWindow, maxMessageSize, maxHeaderListSize); + = new NettyServerTransport(ch, protocolNegotiator, decompressorRegistry, + compressorRegistry, maxStreamsPerConnection, flowControlWindow, maxMessageSize, + maxHeaderListSize); transport.start(listener.transportCreated(transport)); } }); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index d4aca99949..449f115a00 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -31,11 +31,14 @@ package io.grpc.netty; +import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import com.google.common.base.Preconditions; +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; import io.grpc.ExperimentalApi; import io.grpc.HandlerRegistry; import io.grpc.Internal; @@ -242,7 +245,10 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder= 0, "maxMessageSize must be >= 0"); this.maxMessageSize = maxMessageSize; + this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry"); + this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry"); streamKey = encoder.connection().newKey(); - this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener"); + this.transportListener = checkNotNull(transportListener, "transportListener"); // Set the frame listener on the decoder. decoder().frameListener(new FrameListener()); @@ -192,6 +206,11 @@ class NettyServerHandler extends AbstractNettyHandler { NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this, maxMessageSize); + // These must be called before inboundHeadersReceived, because the framers depend on knowing + // the compression algorithms available before negotiation. + stream.setDecompressionRegistry(decompressorRegistry); + stream.setCompressionRegistry(compressorRegistry); + Metadata metadata = Utils.convertHeaders(headers); stream.inboundHeadersReceived(metadata); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index e8e5e393a1..7275c3c5b4 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -31,8 +31,12 @@ package io.grpc.netty; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.base.Preconditions; +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; import io.netty.channel.Channel; @@ -51,6 +55,8 @@ class NettyServerTransport implements ServerTransport { private final Channel channel; private final ProtocolNegotiator protocolNegotiator; + private final DecompressorRegistry decompressorRegistry; + private final CompressorRegistry compressorRegistry; private final int maxStreams; private ServerTransportListener listener; private boolean terminated; @@ -58,14 +64,18 @@ class NettyServerTransport implements ServerTransport { private final int maxMessageSize; private final int maxHeaderListSize; - NettyServerTransport(Channel channel, ProtocolNegotiator protocolNegotiator, int maxStreams, - int flowControlWindow, int maxMessageSize, int maxHeaderListSize) { + NettyServerTransport(Channel channel, ProtocolNegotiator protocolNegotiator, + DecompressorRegistry decompressorRegistry, + CompressorRegistry compressorRegistry, int maxStreams, int flowControlWindow, + int maxMessageSize, int maxHeaderListSize) { this.channel = Preconditions.checkNotNull(channel, "channel"); this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); this.maxStreams = maxStreams; this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; + this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry"); + this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry"); } public void start(ServerTransportListener listener) { @@ -115,7 +125,7 @@ class NettyServerTransport implements ServerTransport { * Creates the Netty handler to be used in the channel pipeline. */ private NettyServerHandler createHandler(ServerTransportListener transportListener) { - return NettyServerHandler.newHandler(transportListener, maxStreams, flowControlWindow, - maxHeaderListSize, maxMessageSize); + return NettyServerHandler.newHandler(transportListener, decompressorRegistry, + compressorRegistry, maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize); } } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index fb79187877..c2e171f8af 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -43,6 +43,8 @@ import static org.junit.Assert.fail; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.SettableFuture; +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.Marshaller; @@ -312,7 +314,8 @@ public class NettyClientTransportTest { .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build(); ProtocolNegotiator negotiator = ProtocolNegotiators.serverTls(serverContext); server = new NettyServer(address, NioServerSocketChannel.class, - group, group, negotiator, maxStreamsPerConnection, + group, group, negotiator, DecompressorRegistry.getDefaultInstance(), + CompressorRegistry.getDefaultInstance(), maxStreamsPerConnection, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize); server.start(serverListener); } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 52ec3fd402..9e4c0bbfd8 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -54,6 +54,8 @@ import static org.mockito.Mockito.when; import com.google.common.io.ByteStreams; +import io.grpc.CompressorRegistry; +import io.grpc.DecompressorRegistry; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; @@ -345,6 +347,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase