Expose compression on ClientCall and Server Call

This commit is contained in:
Carl Mastrangelo 2015-12-07 14:40:11 -08:00
parent 3fef40368d
commit 82a79d8f93
17 changed files with 192 additions and 28 deletions

View File

@ -183,4 +183,13 @@ public abstract class ClientCall<ReqT, RespT> {
public boolean isReady() {
return true;
}
/**
* Enables per-message compression, if an encoding type has been negotiated. If no message
* encoding has been negotiated, this is a no-op.
*/
@ExperimentalApi
public void setMessageCompression(boolean enabled) {
// noop
}
}

View File

@ -54,7 +54,7 @@ import javax.annotation.concurrent.ThreadSafe;
public final class DecompressorRegistry {
private static final DecompressorRegistry DEFAULT_INSTANCE = new DecompressorRegistry(
new DecompressorInfo(new Codec.Gzip(), false),
new DecompressorInfo(new Codec.Gzip(), true),
new DecompressorInfo(Codec.Identity.NONE, false));
public static DecompressorRegistry getDefaultInstance() {

View File

@ -65,6 +65,11 @@ public abstract class ForwardingClientCall<ReqT, RespT> extends ClientCall<ReqT,
delegate().sendMessage(message);
}
@Override
public void setMessageCompression(boolean enabled) {
delegate().setMessageCompression(enabled);
}
@Override
public boolean isReady() {
return delegate().isReady();

View File

@ -163,6 +163,22 @@ public abstract class ManagedChannelBuilder<T extends ManagedChannelBuilder<T>>
@ExperimentalApi
public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);
/**
* Set the decompression registry for use in the channel. This is an advanced API call and
* shouldn't be used unless you are using custom message encoding. The default supported
* decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
*/
@ExperimentalApi
public abstract T decompressorRegistry(DecompressorRegistry registry);
/**
* Set the compression registry for use in the channel. This is an advanced API call and
* shouldn't be used unless you are using custom message encoding. The default supported
* compressors are in {@code CompressorRegistry.getDefaultInstance}.
*/
@ExperimentalApi
public abstract T compressorRegistry(CompressorRegistry registry);
/**
* Builds a channel using the given parameters.
*/

View File

@ -87,6 +87,22 @@ public abstract class ServerBuilder<T extends ServerBuilder<T>> {
*/
public abstract T useTransportSecurity(File certChain, File privateKey);
/**
* Set the decompression registry for use in the channel. This is an advanced API call and
* shouldn't be used unless you are using custom message encoding. The default supported
* decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
*/
@ExperimentalApi
public abstract T decompressorRegistry(DecompressorRegistry registry);
/**
* Set the compression registry for use in the channel. This is an advanced API call and
* shouldn't be used unless you are using custom message encoding. The default supported
* compressors are in {@code CompressorRegistry.getDefaultInstance}.
*/
@ExperimentalApi
public abstract T compressorRegistry(CompressorRegistry registry);
/**
* Builds a server using the given parameters.
*

View File

@ -31,11 +31,16 @@
package io.grpc.internal;
import static com.google.common.base.MoreObjects.firstNonNull;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
@ -82,6 +87,12 @@ public abstract class AbstractManagedChannelImplBuilder
@Nullable
private LoadBalancer.Factory loadBalancerFactory;
@Nullable
private DecompressorRegistry decompressorRegistry;
@Nullable
private CompressorRegistry compressorRegistry;
protected AbstractManagedChannelImplBuilder(String target) {
this.target = Preconditions.checkNotNull(target);
this.directServerAddress = null;
@ -133,6 +144,20 @@ public abstract class AbstractManagedChannelImplBuilder
return thisT();
}
@Override
@ExperimentalApi
public final T decompressorRegistry(DecompressorRegistry registry) {
this.decompressorRegistry = registry;
return thisT();
}
@Override
@ExperimentalApi
public final T compressorRegistry(CompressorRegistry registry) {
this.compressorRegistry = registry;
return thisT();
}
private T thisT() {
@SuppressWarnings("unchecked")
T thisT = (T) this;
@ -168,12 +193,13 @@ public abstract class AbstractManagedChannelImplBuilder
target,
// TODO(carl-mastrangelo): Allow clients to pass this in
new ExponentialBackoffPolicy.Provider(),
nameResolverFactory == null ? NameResolverRegistry.getDefaultRegistry()
: nameResolverFactory,
firstNonNull(nameResolverFactory, NameResolverRegistry.getDefaultRegistry()),
getNameResolverParams(),
loadBalancerFactory == null ? SimpleLoadBalancerFactory.getInstance()
: loadBalancerFactory,
transportFactory, executor, userAgent, interceptors);
firstNonNull(loadBalancerFactory, SimpleLoadBalancerFactory.getInstance()),
transportFactory,
firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
executor, userAgent, interceptors);
}
/**
@ -186,7 +212,7 @@ public abstract class AbstractManagedChannelImplBuilder
/**
* Subclasses can override this method to provide additional parameters to {@link
* NameResolver.Factory#newNameResolver}. The default implementation returns {@link
* Attributes.EMPTY}.
* Attributes#EMPTY}.
*/
protected Attributes getNameResolverParams() {
return Attributes.EMPTY;

View File

@ -34,7 +34,9 @@ package io.grpc.internal;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.Internal;
import io.grpc.MutableHandlerRegistry;
@ -58,6 +60,12 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
@Nullable
private Executor executor;
@Nullable
private DecompressorRegistry decompressorRegistry;
@Nullable
private CompressorRegistry compressorRegistry;
/**
* Constructs using a given handler registry.
*/
@ -98,6 +106,26 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
throw new UnsupportedOperationException("Underlying HandlerRegistry is not mutable");
}
@Override
public final T decompressorRegistry(DecompressorRegistry registry) {
decompressorRegistry = registry;
return thisT();
}
protected final DecompressorRegistry decompressorRegistry() {
return decompressorRegistry;
}
@Override
public final T compressorRegistry(CompressorRegistry registry) {
compressorRegistry = registry;
return thisT();
}
protected final CompressorRegistry compressorRegistry() {
return compressorRegistry;
}
@Override
public ServerImpl build() {
io.grpc.internal.Server transportServer = buildTransportServer();

View File

@ -31,6 +31,7 @@
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.addAll;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER;
@ -337,6 +338,12 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
}
}
@Override
public void setMessageCompression(boolean enabled) {
checkState(stream != null, "Not started");
stream.setMessageCompression(enabled);
}
@Override
public boolean isReady() {
return stream.isReady();

View File

@ -108,9 +108,8 @@ public final class ManagedChannelImpl extends ManagedChannel {
*/
private final Set<String> knownAcceptEncodingRegistry =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final DecompressorRegistry decompressorRegistry =
DecompressorRegistry.getDefaultInstance();
private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
private final DecompressorRegistry decompressorRegistry;
private final CompressorRegistry compressorRegistry;
/**
* Executor that runs deadline timers for requests.
@ -154,9 +153,10 @@ public final class ManagedChannelImpl extends ManagedChannel {
ManagedChannelImpl(String target, BackoffPolicy.Provider backoffPolicyProvider,
NameResolver.Factory nameResolverFactory, Attributes nameResolverParams,
LoadBalancer.Factory loadBalancerFactory,
ClientTransportFactory transportFactory, @Nullable Executor executor,
@Nullable String userAgent, List<ClientInterceptor> interceptors) {
LoadBalancer.Factory loadBalancerFactory, ClientTransportFactory transportFactory,
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
@Nullable Executor executor, @Nullable String userAgent,
List<ClientInterceptor> interceptors) {
if (executor == null) {
usingSharedExecutor = true;
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
@ -171,6 +171,8 @@ public final class ManagedChannelImpl extends ManagedChannel {
this.userAgent = userAgent;
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
this.decompressorRegistry = decompressorRegistry;
this.compressorRegistry = compressorRegistry;
this.nameResolver.start(new NameResolver.Listener() {
@Override

View File

@ -134,7 +134,8 @@ public class ManagedChannelImplTest {
NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
return new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(),
nameResolverFactory, NAME_RESOLVER_PARAMS, loadBalancerFactory,
mockTransportFactory, executor, null, interceptors);
mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(), executor, null, interceptors);
}
@Before

View File

@ -48,6 +48,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.NameResolver;
@ -119,7 +121,9 @@ public class ManagedChannelImplTransportManagerTest {
channel = new ManagedChannelImpl("fake://target", mockBackoffPolicyProvider,
nameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory,
mockTransportFactory, executor, null, Collections.<ClientInterceptor>emptyList());
mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(), executor, null,
Collections.<ClientInterceptor>emptyList());
ArgumentCaptor<TransportManager> tmCaptor = ArgumentCaptor.forClass(TransportManager.class);
verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture());

View File

@ -35,6 +35,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.internal.Server;
import io.grpc.internal.ServerListener;
import io.grpc.internal.SharedResourceHolder;
@ -72,6 +74,8 @@ class NettyServer implements Server {
private EventLoopGroup workerGroup;
private ServerListener listener;
private Channel channel;
private final DecompressorRegistry decompressorRegistry;
private final CompressorRegistry compressorRegistry;
private final int flowControlWindow;
private final int maxMessageSize;
private final int maxHeaderListSize;
@ -79,7 +83,8 @@ class NettyServer implements Server {
NettyServer(SocketAddress address, Class<? extends ServerChannel> channelType,
@Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
ProtocolNegotiator protocolNegotiator, int maxStreamsPerConnection,
ProtocolNegotiator protocolNegotiator, DecompressorRegistry decompressorRegistry,
CompressorRegistry compressorRegistry, int maxStreamsPerConnection,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize) {
this.address = address;
this.channelType = checkNotNull(channelType, "channelType");
@ -92,6 +97,8 @@ class NettyServer implements Server {
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry");
this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry");
}
@Override
@ -113,13 +120,15 @@ class NettyServer implements Server {
public void initChannel(Channel ch) throws Exception {
eventLoopReferenceCounter.retain();
ch.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
eventLoopReferenceCounter.release();
}
});
NettyServerTransport transport
= new NettyServerTransport(ch, protocolNegotiator, maxStreamsPerConnection,
flowControlWindow, maxMessageSize, maxHeaderListSize);
= new NettyServerTransport(ch, protocolNegotiator, decompressorRegistry,
compressorRegistry, maxStreamsPerConnection, flowControlWindow, maxMessageSize,
maxHeaderListSize);
transport.start(listener.transportCreated(transport));
}
});

View File

@ -31,11 +31,14 @@
package io.grpc.netty;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import com.google.common.base.Preconditions;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.ExperimentalApi;
import io.grpc.HandlerRegistry;
import io.grpc.Internal;
@ -242,7 +245,10 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
ProtocolNegotiators.serverPlaintext();
}
return new NettyServer(address, channelType, bossEventLoopGroup,
workerEventLoopGroup, negotiator, maxConcurrentCallsPerConnection, flowControlWindow,
workerEventLoopGroup, negotiator,
firstNonNull(decompressorRegistry(), DecompressorRegistry.getDefaultInstance()),
firstNonNull(compressorRegistry(), CompressorRegistry.getDefaultInstance()),
maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize);
}

View File

@ -32,6 +32,7 @@
package io.grpc.netty;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.netty.Utils.CONTENT_TYPE_GRPC;
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
import static io.grpc.netty.Utils.HTTP_METHOD;
@ -42,6 +43,8 @@ import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAU
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
@ -94,6 +97,8 @@ class NettyServerHandler extends AbstractNettyHandler {
private static final Status GOAWAY_STATUS = Status.UNAVAILABLE;
private final DecompressorRegistry decompressorRegistry;
private final CompressorRegistry compressorRegistry;
private final Http2Connection.PropertyKey streamKey;
private final ServerTransportListener transportListener;
private final int maxMessageSize;
@ -102,6 +107,8 @@ class NettyServerHandler extends AbstractNettyHandler {
private WriteQueue serverWriteQueue;
static NettyServerHandler newHandler(ServerTransportListener transportListener,
DecompressorRegistry decompressorRegistry,
CompressorRegistry compressorRegistry,
int maxStreams,
int flowControlWindow,
int maxHeaderListSize,
@ -114,13 +121,15 @@ class NettyServerHandler extends AbstractNettyHandler {
new DefaultHttp2FrameReader(headersDecoder), frameLogger);
Http2FrameWriter frameWriter =
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
return newHandler(frameReader, frameWriter, transportListener, maxStreams, flowControlWindow,
maxMessageSize);
return newHandler(frameReader, frameWriter, transportListener, decompressorRegistry,
compressorRegistry, maxStreams, flowControlWindow, maxMessageSize);
}
@VisibleForTesting
static NettyServerHandler newHandler(Http2FrameReader frameReader, Http2FrameWriter frameWriter,
ServerTransportListener transportListener,
DecompressorRegistry decompressorRegistry,
CompressorRegistry compressorRegistry,
int maxStreams,
int flowControlWindow,
int maxMessageSize) {
@ -142,19 +151,24 @@ class NettyServerHandler extends AbstractNettyHandler {
settings.initialWindowSize(flowControlWindow);
settings.maxConcurrentStreams(maxStreams);
return new NettyServerHandler(transportListener, decoder, encoder, settings, maxMessageSize);
return new NettyServerHandler(transportListener, decoder, encoder, settings,
decompressorRegistry, compressorRegistry, maxMessageSize);
}
private NettyServerHandler(ServerTransportListener transportListener,
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder, Http2Settings settings,
DecompressorRegistry decompressorRegistry,
CompressorRegistry compressorRegistry,
int maxMessageSize) {
super(decoder, encoder, settings);
checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0");
this.maxMessageSize = maxMessageSize;
this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry");
this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry");
streamKey = encoder.connection().newKey();
this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
this.transportListener = checkNotNull(transportListener, "transportListener");
// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
@ -192,6 +206,11 @@ class NettyServerHandler extends AbstractNettyHandler {
NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this,
maxMessageSize);
// These must be called before inboundHeadersReceived, because the framers depend on knowing
// the compression algorithms available before negotiation.
stream.setDecompressionRegistry(decompressorRegistry);
stream.setCompressionRegistry(compressorRegistry);
Metadata metadata = Utils.convertHeaders(headers);
stream.inboundHeadersReceived(metadata);

View File

@ -31,8 +31,12 @@
package io.grpc.netty;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Preconditions;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.netty.channel.Channel;
@ -51,6 +55,8 @@ class NettyServerTransport implements ServerTransport {
private final Channel channel;
private final ProtocolNegotiator protocolNegotiator;
private final DecompressorRegistry decompressorRegistry;
private final CompressorRegistry compressorRegistry;
private final int maxStreams;
private ServerTransportListener listener;
private boolean terminated;
@ -58,14 +64,18 @@ class NettyServerTransport implements ServerTransport {
private final int maxMessageSize;
private final int maxHeaderListSize;
NettyServerTransport(Channel channel, ProtocolNegotiator protocolNegotiator, int maxStreams,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize) {
NettyServerTransport(Channel channel, ProtocolNegotiator protocolNegotiator,
DecompressorRegistry decompressorRegistry,
CompressorRegistry compressorRegistry, int maxStreams, int flowControlWindow,
int maxMessageSize, int maxHeaderListSize) {
this.channel = Preconditions.checkNotNull(channel, "channel");
this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
this.maxStreams = maxStreams;
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry");
this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry");
}
public void start(ServerTransportListener listener) {
@ -115,7 +125,7 @@ class NettyServerTransport implements ServerTransport {
* Creates the Netty handler to be used in the channel pipeline.
*/
private NettyServerHandler createHandler(ServerTransportListener transportListener) {
return NettyServerHandler.newHandler(transportListener, maxStreams, flowControlWindow,
maxHeaderListSize, maxMessageSize);
return NettyServerHandler.newHandler(transportListener, decompressorRegistry,
compressorRegistry, maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize);
}
}

View File

@ -43,6 +43,8 @@ import static org.junit.Assert.fail;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
@ -312,7 +314,8 @@ public class NettyClientTransportTest {
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build();
ProtocolNegotiator negotiator = ProtocolNegotiators.serverTls(serverContext);
server = new NettyServer(address, NioServerSocketChannel.class,
group, group, negotiator, maxStreamsPerConnection,
group, group, negotiator, DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(), maxStreamsPerConnection,
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize);
server.start(serverListener);
}

View File

@ -54,6 +54,8 @@ import static org.mockito.Mockito.when;
import com.google.common.io.ByteStreams;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
@ -345,6 +347,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
@Override
protected NettyServerHandler newHandler() {
return NettyServerHandler.newHandler(frameReader(), frameWriter(), transportListener,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
maxConcurrentStreams, flowControlWindow, DEFAULT_MAX_MESSAGE_SIZE);
}