From 529b14c07b1aae1325386c51e6084c4e4ec52a64 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Thu, 3 Dec 2015 15:26:39 -0800 Subject: [PATCH] Add compressor registry, and auto negotiate compression --- core/src/main/java/io/grpc/CallOptions.java | 25 ------- .../main/java/io/grpc/CompressorRegistry.java | 73 +++++++++++++++++++ .../io/grpc/inprocess/InProcessTransport.java | 29 ++++---- .../grpc/internal/AbstractServerStream.java | 5 ++ .../java/io/grpc/internal/AbstractStream.java | 35 +++++---- .../java/io/grpc/internal/ClientCallImpl.java | 54 +++++++++++--- .../java/io/grpc/internal/DelayedStream.java | 29 ++++++-- .../main/java/io/grpc/internal/GrpcUtil.java | 3 + .../io/grpc/internal/ManagedChannelImpl.java | 45 +++++------- .../io/grpc/internal/NoopClientStream.java | 13 ++-- .../main/java/io/grpc/internal/Stream.java | 23 ++++-- .../test/java/io/grpc/CallOptionsTest.java | 30 +------- .../io/grpc/internal/ClientCallImplTest.java | 47 +++++++----- .../io/grpc/internal/DelayedStreamTest.java | 22 ++---- .../grpc/internal/ManagedChannelImplTest.java | 2 + .../CompressingHelloWorldClient.java | 3 +- .../main/java/io/grpc/stub/AbstractStub.java | 10 --- 17 files changed, 270 insertions(+), 178 deletions(-) create mode 100644 core/src/main/java/io/grpc/CompressorRegistry.java diff --git a/core/src/main/java/io/grpc/CallOptions.java b/core/src/main/java/io/grpc/CallOptions.java index b7ba679c29..e5a1136575 100644 --- a/core/src/main/java/io/grpc/CallOptions.java +++ b/core/src/main/java/io/grpc/CallOptions.java @@ -55,10 +55,6 @@ public final class CallOptions { // unnamed arguments, which is undesirable. private Long deadlineNanoTime; - - @Nullable - private Compressor compressor; - @Nullable private String authority; @@ -114,25 +110,6 @@ public final class CallOptions { return deadlineNanoTime; } - /** - * Returns the compressor, or {@code null} if none is set. - */ - @Nullable - @ExperimentalApi("https://github.com/grpc/grpc-java/issues/492") - public Compressor getCompressor() { - return compressor; - } - - /** - * Use the desired compression. - */ - @ExperimentalApi("https://github.com/grpc/grpc-java/issues/492") - public CallOptions withCompressor(@Nullable Compressor compressor) { - CallOptions newOptions = new CallOptions(this); - newOptions.compressor = compressor; - return newOptions; - } - /** * Returns a new {@code CallOptions} with a request key for affinity-based routing. */ @@ -175,7 +152,6 @@ public final class CallOptions { */ private CallOptions(CallOptions other) { deadlineNanoTime = other.deadlineNanoTime; - compressor = other.compressor; authority = other.authority; requestKey = other.requestKey; } @@ -188,7 +164,6 @@ public final class CallOptions { long remainingNanos = deadlineNanoTime - System.nanoTime(); toStringHelper.addValue(remainingNanos + " ns from now"); } - toStringHelper.add("compressor", compressor); toStringHelper.add("authority", authority); return toStringHelper.toString(); diff --git a/core/src/main/java/io/grpc/CompressorRegistry.java b/core/src/main/java/io/grpc/CompressorRegistry.java new file mode 100644 index 0000000000..0eda388c7e --- /dev/null +++ b/core/src/main/java/io/grpc/CompressorRegistry.java @@ -0,0 +1,73 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Encloses classes related to the compression and decompression of messages. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/492") +@ThreadSafe +public final class CompressorRegistry { + private static final CompressorRegistry DEFAULT_INSTANCE = new CompressorRegistry( + new Codec.Gzip()); + + public static CompressorRegistry getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + public static CompressorRegistry newEmptyInstance() { + return new CompressorRegistry(); + } + + private final ConcurrentMap compressors; + + @VisibleForTesting + CompressorRegistry(Compressor ...cs) { + compressors = new ConcurrentHashMap(); + for (Compressor c : cs) { + compressors.put(c.getMessageEncoding(), c); + } + } + + @Nullable + public Compressor lookupCompressor(String compressorName) { + return compressors.get(compressorName); + } +} diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 6971d1e894..c50e880911 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -31,7 +31,7 @@ package io.grpc.inprocess; -import io.grpc.Compressor; +import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -215,12 +215,6 @@ class InProcessTransport implements ServerTransport, ClientTransport { clientStreamListener = listener; } - @Override - public void setCompressor(Compressor c) { - // I don't *think* there is any good reason to do this, so just throw away the compressor - // intentional nop - } - @Override public void setDecompressionRegistry(DecompressorRegistry registry) {} @@ -334,6 +328,12 @@ class InProcessTransport implements ServerTransport, ClientTransport { public void setMessageCompression(boolean enable) { // noop } + + @Override + public void pickCompressor(Iterable messageEncodings) {} + + @Override + public void setCompressionRegistry(CompressorRegistry registry) {} } private class InProcessClientStream implements ClientStream { @@ -440,18 +440,17 @@ class InProcessTransport implements ServerTransport, ClientTransport { } } - @Override - public void setCompressor(Compressor c) { - // nop - } - @Override public void setDecompressionRegistry(DecompressorRegistry registry) {} @Override - public void setMessageCompression(boolean enable) { - // noop - } + public void setMessageCompression(boolean enable) {} + + @Override + public void pickCompressor(Iterable messageEncodings) {} + + @Override + public void setCompressionRegistry(CompressorRegistry registry) {} } } } diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index 67c63d5ae1..242ce0a736 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -32,6 +32,7 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER; import com.google.common.base.Preconditions; @@ -147,6 +148,10 @@ public abstract class AbstractServerStream extends AbstractStream return; } } + if (headers.containsKey(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY)) { + pickCompressor( + ACCEPT_ENCODING_SPLITER.split(headers.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY))); + } inboundPhase(Phase.MESSAGE); } diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index 33aae2c5bc..23eb082855 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -40,6 +40,7 @@ import com.google.common.base.MoreObjects; import io.grpc.Codec; import io.grpc.Compressor; +import io.grpc.CompressorRegistry; import io.grpc.Decompressor; import io.grpc.DecompressorRegistry; @@ -102,6 +103,8 @@ public abstract class AbstractStream implements Stream { private final Object onReadyLock = new Object(); private volatile DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); + private volatile CompressorRegistry compressorRegistry = + CompressorRegistry.getDefaultInstance(); @VisibleForTesting class FramerSink implements MessageFramer.Sink { @@ -302,15 +305,6 @@ public abstract class AbstractStream implements Stream { } } - /** - * Set the decompressor for this stream. This may be called at most once. Typically this is set - * after the message encoding header is provided by the remote host, but before any messages are - * received. - */ - protected final void setDecompressor(Decompressor d) { - deframer.setDecompressor(d); - } - /** * Looks up the decompressor by its message encoding name, and sets it for this stream. * Decompressors are registered with {@link DecompressorRegistry#register}. @@ -318,11 +312,11 @@ public abstract class AbstractStream implements Stream { * @param messageEncoding the name of the encoding provided by the remote host * @throws IllegalArgumentException if the provided message encoding cannot be found. */ - public final void setDecompressor(String messageEncoding) { + protected final void setDecompressor(String messageEncoding) { Decompressor d = decompressorRegistry.lookupDecompressor(messageEncoding); checkArgument(d != null, "Unable to find decompressor for message encoding %s", messageEncoding); - setDecompressor(d); + deframer.setDecompressor(d); } @Override @@ -331,10 +325,21 @@ public abstract class AbstractStream implements Stream { } @Override - public void setCompressor(Compressor c) { - // TODO(carl-mastrangelo): check that headers haven't already been sent. I can't find where - // the client stream changes outbound phase correctly, so I am ignoring it. - framer.setCompressor(c); + public final void setCompressionRegistry(CompressorRegistry registry) { + compressorRegistry = checkNotNull(registry); + } + + @Override + public final void pickCompressor(Iterable messageEncodings) { + for (String messageEncoding : messageEncodings) { + Compressor c = compressorRegistry.lookupCompressor(messageEncoding); + if (c != null) { + // TODO(carl-mastrangelo): check that headers haven't already been sent. I can't find where + // the client stream changes outbound phase correctly, so I am ignoring it. + framer.setCompressor(c); + break; + } + } } /** diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 7aed8a8742..505ed545d8 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -31,8 +31,11 @@ package io.grpc.internal; +import static com.google.common.collect.Iterables.addAll; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER; import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY; +import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY; @@ -50,6 +53,7 @@ import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.Codec; import io.grpc.Compressor; +import io.grpc.CompressorRegistry; import io.grpc.Context; import io.grpc.DecompressorRegistry; import io.grpc.Metadata; @@ -58,6 +62,8 @@ import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; import java.io.InputStream; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -82,7 +88,9 @@ final class ClientCallImpl extends ClientCall private final ClientTransportProvider clientTransportProvider; private String userAgent; private ScheduledExecutorService deadlineCancellationExecutor; + private Set knownMessageEncodingRegistry; private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); + private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance(); ClientCallImpl(MethodDescriptor method, Executor executor, CallOptions callOptions, ClientTransportProvider clientTransportProvider, @@ -129,9 +137,24 @@ final class ClientCallImpl extends ClientCall return this; } + ClientCallImpl setCompressorRegistry(CompressorRegistry compressorRegistry) { + this.compressorRegistry = compressorRegistry; + return this; + } + + /** + * Sets encodings known to be supported by the server. This set MUST be thread safe, and MAY be + * modified by any code as it learns about new supported encodings. + */ + ClientCallImpl setKnownMessageEncodingRegistry(Set knownMessageEncodings) { + this.knownMessageEncodingRegistry = knownMessageEncodings; + return this; + } + @VisibleForTesting static void prepareHeaders(Metadata headers, CallOptions callOptions, String userAgent, - DecompressorRegistry decompressorRegistry) { + Set knownMessageEncodings, DecompressorRegistry decompressorRegistry, + CompressorRegistry compressorRegistry) { // Hack to propagate authority. This should be properly pass to the transport.newStream // somehow. headers.removeAll(AUTHORITY_KEY); @@ -146,16 +169,19 @@ final class ClientCallImpl extends ClientCall } headers.removeAll(MESSAGE_ENCODING_KEY); - Compressor compressor = callOptions.getCompressor(); - if (compressor != null && compressor != Codec.Identity.NONE) { - headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding()); + for (String messageEncoding : knownMessageEncodings) { + Compressor compressor = compressorRegistry.lookupCompressor(messageEncoding); + if (compressor != null && compressor != Codec.Identity.NONE) { + headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding()); + break; + } } - headers.removeAll(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY); + headers.removeAll(MESSAGE_ACCEPT_ENCODING_KEY); if (!decompressorRegistry.getAdvertisedMessageEncodings().isEmpty()) { String acceptEncoding = Joiner.on(',').join(decompressorRegistry.getAdvertisedMessageEncodings()); - headers.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, acceptEncoding); + headers.put(MESSAGE_ACCEPT_ENCODING_KEY, acceptEncoding); } } @@ -175,7 +201,8 @@ final class ClientCallImpl extends ClientCall }); return; } - prepareHeaders(headers, callOptions, userAgent, decompressorRegistry); + prepareHeaders(headers, callOptions, userAgent, + knownMessageEncodingRegistry, decompressorRegistry, compressorRegistry); ClientStreamListener listener = new ClientStreamListenerImpl(observer); ListenableFuture transportFuture = clientTransportProvider.get(callOptions); @@ -203,9 +230,9 @@ final class ClientCallImpl extends ClientCall } stream.setDecompressionRegistry(decompressorRegistry); - Compressor compressor = callOptions.getCompressor(); - if (compressor != null) { - stream.setCompressor(compressor); + stream.setCompressionRegistry(compressorRegistry); + if (headers.containsKey(MESSAGE_ENCODING_KEY)) { + stream.pickCompressor(Collections.singleton(headers.get(MESSAGE_ENCODING_KEY))); // TODO(carl-mastrangelo): move this to ClientCall. stream.setMessageCompression(true); } @@ -337,6 +364,13 @@ final class ClientCallImpl extends ClientCall @Override public void headersRead(final Metadata headers) { + if (headers.containsKey(MESSAGE_ACCEPT_ENCODING_KEY)) { + // TODO(carl-mastrangelo): after the first time we contact the server, it almost certainly + // won't change. It might be possible to recover performance by not adding to the known + // encodings if it isn't empty. + String serverAcceptEncodings = headers.get(MESSAGE_ACCEPT_ENCODING_KEY); + addAll(knownMessageEncodingRegistry, ACCEPT_ENCODING_SPLITER.split(serverAcceptEncodings)); + } callExecutor.execute(new ContextRunnable(context) { @Override public final void runInContext() { diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 616cce6573..5343fb475c 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -34,6 +34,7 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkState; import io.grpc.Compressor; +import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import io.grpc.Metadata; import io.grpc.Status; @@ -61,6 +62,9 @@ class DelayedStream implements ClientStream { @GuardedBy("this") private Compressor compressor; + + @GuardedBy("this") + private Iterable compressionMessageEncodings; // Can be either a Decompressor or a String @GuardedBy("this") private Object decompressor; @@ -75,6 +79,8 @@ class DelayedStream implements ClientStream { private int pendingFlowControlRequests; @GuardedBy("this") private boolean pendingFlush; + @GuardedBy("lock") + private CompressorRegistry compressionRegistry; static final class PendingMessage { final InputStream message; @@ -104,12 +110,15 @@ class DelayedStream implements ClientStream { } checkState(realStream == null, "Stream already created: %s", realStream); realStream = stream; - if (compressor != null) { - realStream.setCompressor(compressor); + if (compressionMessageEncodings != null) { + realStream.pickCompressor(compressionMessageEncodings); } if (this.decompressionRegistry != null) { realStream.setDecompressionRegistry(this.decompressionRegistry); } + if (this.compressionRegistry != null) { + realStream.setCompressionRegistry(this.compressionRegistry); + } for (PendingMessage message : pendingMessages) { realStream.setMessageCompression(message.shouldBeCompressed); realStream.writeMessage(message.message); @@ -206,11 +215,21 @@ class DelayedStream implements ClientStream { } @Override - public void setCompressor(Compressor c) { + public void pickCompressor(Iterable messageEncodings) { synchronized (this) { - compressor = c; + compressionMessageEncodings = messageEncodings; if (realStream != null) { - realStream.setCompressor(c); + realStream.pickCompressor(messageEncodings); + } + } + } + + @Override + public void setCompressionRegistry(CompressorRegistry registry) { + synchronized (this) { + this.compressionRegistry = registry; + if (realStream != null) { + realStream.setCompressionRegistry(registry); } } } diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 1227510575..0214ba3f81 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -37,6 +37,7 @@ import static io.grpc.Status.Code.DEADLINE_EXCEEDED; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.grpc.Metadata; @@ -156,6 +157,8 @@ public final class GrpcUtil { public static final Set CANCEL_REASONS = EnumSet.of(CANCELLED, DEADLINE_EXCEEDED, Status.Code.INTERNAL, Status.Code.UNKNOWN); + public static final Splitter ACCEPT_ENCODING_SPLITER = Splitter.on(',').trimResults(); + /** * Maps HTTP error response status codes to transport codes. */ diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 4001edfd10..24212f39ac 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -44,11 +44,9 @@ import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; -import io.grpc.Codec; -import io.grpc.Compressor; +import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import io.grpc.EquivalentAddressGroup; -import io.grpc.ExperimentalApi; import io.grpc.LoadBalancer; import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; @@ -62,9 +60,12 @@ import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -95,8 +96,21 @@ public final class ManagedChannelImpl extends ManagedChannel { private final String userAgent; private final Object lock = new Object(); + /* Compression related */ + /** + * When a client connects to a server, it does not know what encodings are supported. This set + * is the union of all accept-encoding headers the server has sent. It is used to pick an + * encoding when contacting the server again. One problem with the gRPC protocol is that if + * there is only one RPC made (perhaps streaming, or otherwise long lived) an encoding will not + * be selected. To combat this you can preflight a request to the server to fill in the mapping + * for the next one. A better solution is if you have prior knowledge that the server supports + * an encoding, and fill this structure before the request. + */ + private final Set knownAcceptEncodingRegistry = + Collections.newSetFromMap(new ConcurrentHashMap()); private final DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); + private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance(); /** * Executor that runs deadline timers for requests. @@ -126,8 +140,6 @@ public final class ManagedChannelImpl extends ManagedChannel { @GuardedBy("lock") private boolean terminated; - private volatile Compressor defaultCompressor; - private final ClientTransportProvider transportProvider = new ClientTransportProvider() { @Override public ListenableFuture get(CallOptions callOptions) { @@ -220,21 +232,6 @@ public final class ManagedChannelImpl extends ManagedChannel { target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors.toString() + ")" : "")); } - - /** - * Sets the default compression method for this Channel. By default, new calls will use the - * provided compressor. Each individual Call can override this by specifying it in CallOptions. - * If the remote host does not support the message encoding, the call will likely break. There - * is currently no provided way to discover what message encodings the remote host supports. - * @param c The compressor to use. If {@code null} no compression will by performed. This is - * equivalent to using {@code Codec.Identity.NONE}. If not null, the Compressor must be - * threadsafe. - */ - @ExperimentalApi("https://github.com/grpc/grpc-java/issues/492") - public void setDefaultCompressor(@Nullable Compressor c) { - defaultCompressor = (c != null) ? c : Codec.Identity.NONE; - } - /** * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately * cancelled. @@ -311,10 +308,6 @@ public final class ManagedChannelImpl extends ManagedChannel { @Override public ClientCall newCall(MethodDescriptor method, CallOptions callOptions) { - boolean hasCodecOverride = callOptions.getCompressor() != null; - if (!hasCodecOverride && defaultCompressor != Codec.Identity.NONE) { - callOptions = callOptions.withCompressor(defaultCompressor); - } return interceptorChannel.newCall(method, callOptions); } @@ -334,7 +327,9 @@ public final class ManagedChannelImpl extends ManagedChannel { transportProvider, scheduledExecutor) .setUserAgent(userAgent) - .setDecompressorRegistry(decompressorRegistry); + .setDecompressorRegistry(decompressorRegistry) + .setCompressorRegistry(compressorRegistry) + .setKnownMessageEncodingRegistry(knownAcceptEncodingRegistry); } @Override diff --git a/core/src/main/java/io/grpc/internal/NoopClientStream.java b/core/src/main/java/io/grpc/internal/NoopClientStream.java index 4194aae6f0..0e2db85a4e 100644 --- a/core/src/main/java/io/grpc/internal/NoopClientStream.java +++ b/core/src/main/java/io/grpc/internal/NoopClientStream.java @@ -31,7 +31,7 @@ package io.grpc.internal; -import io.grpc.Compressor; +import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import io.grpc.Status; @@ -63,11 +63,6 @@ public class NoopClientStream implements ClientStream { @Override public void halfClose() {} - @Override - public void setCompressor(Compressor c) { - // very much a nop - } - @Override public void setDecompressionRegistry(DecompressorRegistry registry) {} @@ -75,4 +70,10 @@ public class NoopClientStream implements ClientStream { public void setMessageCompression(boolean enable) { // noop } + + @Override + public void pickCompressor(Iterable messageEncodings) {} + + @Override + public void setCompressionRegistry(CompressorRegistry registry) {} } diff --git a/core/src/main/java/io/grpc/internal/Stream.java b/core/src/main/java/io/grpc/internal/Stream.java index beb6f3f4de..8fac4b24e9 100644 --- a/core/src/main/java/io/grpc/internal/Stream.java +++ b/core/src/main/java/io/grpc/internal/Stream.java @@ -31,7 +31,7 @@ package io.grpc.internal; -import io.grpc.Compressor; +import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import java.io.InputStream; @@ -80,10 +80,13 @@ public interface Stream { boolean isReady(); /** - * Sets the default message encoder for messages on this stream. - * @param c the compressor + * Picks a compressor for for this stream. If no message encodings are acceptable, compression is + * not used. + * + * @param messageEncodings a group of message encoding names that the remote endpoint is known + * to support. */ - void setCompressor(Compressor c); + void pickCompressor(Iterable messageEncodings); /** * Enables per-message compression, if an encoding type has been negotiated. If no message @@ -92,7 +95,7 @@ public interface Stream { void setMessageCompression(boolean enable); /** - * Sets the decompressor registry to use when resolving {@link #setDecompressor(String)}. If + * Sets the decompressor registry to use when resolving {@code #setDecompressor(String)}. If * unset, the default DecompressorRegistry will be used. * * @see DecompressorRegistry#getDefaultInstance() @@ -100,4 +103,14 @@ public interface Stream { * @param registry the decompressors to use. */ void setDecompressionRegistry(DecompressorRegistry registry); + + /** + * Sets the compressor registry to use when resolving {@link #pickCompressor}. If + * unset, the default CompressorRegistry will be used. + * + * @see CompressorRegistry#getDefaultInstance() + * + * @param registry the compressors to use. + */ + void setCompressionRegistry(CompressorRegistry registry); } diff --git a/core/src/test/java/io/grpc/CallOptionsTest.java b/core/src/test/java/io/grpc/CallOptionsTest.java index 5e917e4188..b73b623a48 100644 --- a/core/src/test/java/io/grpc/CallOptionsTest.java +++ b/core/src/test/java/io/grpc/CallOptionsTest.java @@ -42,7 +42,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.io.OutputStream; import java.util.concurrent.TimeUnit; /** Unit tests for {@link CallOptions}. */ @@ -50,18 +49,15 @@ import java.util.concurrent.TimeUnit; public class CallOptionsTest { private String sampleAuthority = "authority"; private Long sampleDeadlineNanoTime = 1L; - private Compressor sampleCompressor = new Codec.Gzip(); private RequestKey sampleRequestKey = new RequestKey(); private CallOptions allSet = CallOptions.DEFAULT .withAuthority(sampleAuthority) .withDeadlineNanoTime(sampleDeadlineNanoTime) - .withCompressor(sampleCompressor) .withRequestKey(sampleRequestKey); @Test public void defaultsAreAllNull() { assertNull(CallOptions.DEFAULT.getDeadlineNanoTime()); - assertNull(CallOptions.DEFAULT.getCompressor()); assertNull(CallOptions.DEFAULT.getAuthority()); assertNull(CallOptions.DEFAULT.getRequestKey()); } @@ -70,7 +66,6 @@ public class CallOptionsTest { public void allWiths() { assertSame(sampleAuthority, allSet.getAuthority()); assertSame(sampleDeadlineNanoTime, allSet.getDeadlineNanoTime()); - assertSame(sampleCompressor, allSet.getCompressor()); assertSame(sampleRequestKey, allSet.getRequestKey()); } @@ -80,8 +75,6 @@ public class CallOptionsTest { allSet.withAuthority("blah").withAuthority(sampleAuthority))); assertTrue(equal(allSet, allSet.withDeadlineNanoTime(314L).withDeadlineNanoTime(sampleDeadlineNanoTime))); - assertTrue(equal(allSet, - allSet.withCompressor(Codec.Identity.NONE).withCompressor(sampleCompressor))); assertTrue(equal(allSet, allSet.withRequestKey(new RequestKey()).withRequestKey(sampleRequestKey))); } @@ -109,33 +102,16 @@ public class CallOptionsTest { @Test public void testToString() { - Compressor gzip = new Compressor() { - @Override - public String toString() { - return "GziP"; - } - - @Override - public String getMessageEncoding() { - throw new UnsupportedOperationException(); - } - - @Override - public OutputStream compress(OutputStream os) { - throw new UnsupportedOperationException(); - } - }; - assertEquals("CallOptions{deadlineNanoTime=null, compressor=null, authority=null}", + assertEquals("CallOptions{deadlineNanoTime=null, authority=null}", CallOptions.DEFAULT.toString()); // Deadline makes it hard to check string for equality. - assertEquals("CallOptions{deadlineNanoTime=null, compressor=GziP, authority=authority}", - allSet.withCompressor(gzip).withDeadlineNanoTime(null).toString()); + assertEquals("CallOptions{deadlineNanoTime=null, authority=authority}", + allSet.withDeadlineNanoTime(null).toString()); assertTrue(allSet.toString().contains("deadlineNanoTime=" + sampleDeadlineNanoTime + ",")); } private static boolean equal(CallOptions o1, CallOptions o2) { return Objects.equal(o1.getDeadlineNanoTime(), o2.getDeadlineNanoTime()) - && Objects.equal(o1.getCompressor(), o2.getCompressor()) && Objects.equal(o1.getAuthority(), o2.getAuthority()) && Objects.equal(o1.getRequestKey(), o2.getRequestKey()); } diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 324bcc8113..de23c069cc 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -31,6 +31,7 @@ package io.grpc.internal; +import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -48,7 +49,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.google.common.base.Splitter; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -58,6 +58,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.Codec; +import io.grpc.CompressorRegistry; import io.grpc.Context; import io.grpc.Decompressor; import io.grpc.DecompressorRegistry; @@ -82,6 +83,7 @@ import org.mockito.MockitoAnnotations; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -103,6 +105,8 @@ public class ClientCallImplTest { private final ScheduledExecutorService deadlineCancellationExecutor = Executors.newScheduledThreadPool(0); + private final Set knownMessageEncodings = new HashSet(); + private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance(); private final DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); private final MethodDescriptor method = MethodDescriptor.create( @@ -164,7 +168,9 @@ public class ClientCallImplTest { CallOptions.DEFAULT, provider, deadlineCancellationExecutor) - .setDecompressorRegistry(decompressorRegistry); + .setDecompressorRegistry(decompressorRegistry) + .setCompressorRegistry(compressorRegistry) + .setKnownMessageEncodingRegistry(knownMessageEncodings); call.start(new TestClientCallListener(), new Metadata()); @@ -182,8 +188,8 @@ public class ClientCallImplTest { public void prepareHeaders_authorityAdded() { Metadata m = new Metadata(); CallOptions callOptions = CallOptions.DEFAULT.withAuthority("auth"); - ClientCallImpl.prepareHeaders( - m, callOptions, "user agent", DecompressorRegistry.getDefaultInstance()); + ClientCallImpl.prepareHeaders(m, callOptions, "user agent", knownMessageEncodings, + decompressorRegistry, compressorRegistry); assertEquals(m.get(GrpcUtil.AUTHORITY_KEY), "auth"); } @@ -191,8 +197,8 @@ public class ClientCallImplTest { @Test public void prepareHeaders_userAgentAdded() { Metadata m = new Metadata(); - ClientCallImpl.prepareHeaders( - m, CallOptions.DEFAULT, "user agent", DecompressorRegistry.getDefaultInstance()); + ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, "user agent", knownMessageEncodings, + decompressorRegistry, compressorRegistry); assertEquals(m.get(GrpcUtil.USER_AGENT_KEY), "user agent"); } @@ -200,9 +206,9 @@ public class ClientCallImplTest { @Test public void prepareHeaders_messageEncodingAdded() { Metadata m = new Metadata(); - CallOptions callOptions = CallOptions.DEFAULT.withCompressor(new Codec.Gzip()); - ClientCallImpl.prepareHeaders( - m, callOptions, "user agent", DecompressorRegistry.getDefaultInstance()); + knownMessageEncodings.add(new Codec.Gzip().getMessageEncoding()); + ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, "user agent", knownMessageEncodings, + decompressorRegistry, compressorRegistry); assertEquals(m.get(GrpcUtil.MESSAGE_ENCODING_KEY), new Codec.Gzip().getMessageEncoding()); } @@ -210,9 +216,9 @@ public class ClientCallImplTest { @Test public void prepareHeaders_ignoreIdentityEncoding() { Metadata m = new Metadata(); - CallOptions callOptions = CallOptions.DEFAULT.withCompressor(Codec.Identity.NONE); - ClientCallImpl.prepareHeaders( - m, callOptions, "user agent", DecompressorRegistry.getDefaultInstance()); + knownMessageEncodings.add(Codec.Identity.NONE.getMessageEncoding()); + ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, "user agent", knownMessageEncodings, + decompressorRegistry, compressorRegistry); assertNull(m.get(GrpcUtil.MESSAGE_ENCODING_KEY)); } @@ -255,10 +261,11 @@ public class ClientCallImplTest { } }, false); // not advertised - ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, "user agent", customRegistry); + ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, "user agent", knownMessageEncodings, + customRegistry, compressorRegistry); Iterable acceptedEncodings = - Splitter.on(',').split(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY)); + ACCEPT_ENCODING_SPLITER.split(m.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY)); // Order may be different, since decoder priorities have not yet been implemented. assertEquals(ImmutableSet.of("b", "a"), ImmutableSet.copyOf(acceptedEncodings)); @@ -272,8 +279,8 @@ public class ClientCallImplTest { m.put(GrpcUtil.MESSAGE_ENCODING_KEY, "gzip"); m.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, "gzip"); - ClientCallImpl.prepareHeaders( - m, CallOptions.DEFAULT, null, DecompressorRegistry.newEmptyInstance()); + ClientCallImpl.prepareHeaders(m, CallOptions.DEFAULT, null, knownMessageEncodings, + DecompressorRegistry.newEmptyInstance(), compressorRegistry); assertNull(m.get(GrpcUtil.AUTHORITY_KEY)); assertNull(m.get(GrpcUtil.USER_AGENT_KEY)); @@ -296,7 +303,9 @@ public class ClientCallImplTest { CallOptions.DEFAULT, provider, deadlineCancellationExecutor) - .setDecompressorRegistry(decompressorRegistry); + .setDecompressorRegistry(decompressorRegistry) + .setCompressorRegistry(compressorRegistry) + .setKnownMessageEncodingRegistry(knownMessageEncodings); Context.ROOT.attach(); @@ -371,7 +380,9 @@ public class ClientCallImplTest { CallOptions.DEFAULT, provider, deadlineCancellationExecutor) - .setDecompressorRegistry(decompressorRegistry); + .setDecompressorRegistry(decompressorRegistry) + .setCompressorRegistry(compressorRegistry) + .setKnownMessageEncodingRegistry(knownMessageEncodings); previous.attach(); diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index e78a8b5bdf..e93c2fa451 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -36,12 +36,9 @@ import static org.mockito.Matchers.isA; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import io.grpc.Codec; +import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; -import io.grpc.IntegerMarshaller; import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; import org.junit.Before; @@ -71,10 +68,6 @@ public class DelayedStreamTest { @Mock private ClientStream realStream; @Captor private ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); private DelayedStream stream; - private Metadata headers = new Metadata(); - - private MethodDescriptor method = MethodDescriptor.create( - MethodType.UNARY, "service/method", new IntegerMarshaller(), new IntegerMarshaller()); @Before public void setUp() { @@ -85,10 +78,10 @@ public class DelayedStreamTest { @Test public void setStream_sendsAllMessages() { - stream.setCompressor(Codec.Identity.NONE); - - DecompressorRegistry registry = DecompressorRegistry.newEmptyInstance(); - stream.setDecompressionRegistry(registry); + DecompressorRegistry decompressors = DecompressorRegistry.newEmptyInstance(); + CompressorRegistry compressors = CompressorRegistry.newEmptyInstance(); + stream.setDecompressionRegistry(decompressors); + stream.setCompressionRegistry(compressors); stream.setMessageCompression(true); InputStream message = new ByteArrayInputStream(new byte[]{'a'}); @@ -98,9 +91,8 @@ public class DelayedStreamTest { stream.setStream(realStream); - - verify(realStream).setCompressor(Codec.Identity.NONE); - verify(realStream).setDecompressionRegistry(registry); + verify(realStream).setDecompressionRegistry(decompressors); + verify(realStream).setCompressionRegistry(compressors); // Verify that the order was correct, even though they should be interleaved with the // writeMessage calls diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index fdd9367e70..32208f2d7f 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -54,6 +54,7 @@ import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import io.grpc.IntegerMarshaller; import io.grpc.LoadBalancer; @@ -194,6 +195,7 @@ public class ManagedChannelImplTest { verify(mockTransport, timeout(1000)) .newStream(same(method), same(headers), streamListenerCaptor.capture()); verify(mockStream).setDecompressionRegistry(isA(DecompressorRegistry.class)); + verify(mockStream).setCompressionRegistry(isA(CompressorRegistry.class)); ClientStreamListener streamListener = streamListenerCaptor.getValue(); // Second call diff --git a/examples/src/main/java/io/grpc/examples/experimental/CompressingHelloWorldClient.java b/examples/src/main/java/io/grpc/examples/experimental/CompressingHelloWorldClient.java index 658b016cec..3de4631228 100644 --- a/examples/src/main/java/io/grpc/examples/experimental/CompressingHelloWorldClient.java +++ b/examples/src/main/java/io/grpc/examples/experimental/CompressingHelloWorldClient.java @@ -31,7 +31,6 @@ package io.grpc.examples.experimental; -import io.grpc.Codec; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.examples.helloworld.GreeterGrpc; @@ -61,7 +60,7 @@ public class CompressingHelloWorldClient { channel = ManagedChannelBuilder.forAddress(host, port) .usePlaintext(true) .build(); - blockingStub = GreeterGrpc.newBlockingStub(channel).withCompressor(new Codec.Gzip()); + blockingStub = GreeterGrpc.newBlockingStub(channel); } public void shutdown() throws InterruptedException { diff --git a/stub/src/main/java/io/grpc/stub/AbstractStub.java b/stub/src/main/java/io/grpc/stub/AbstractStub.java index e7e3639847..007c60c735 100644 --- a/stub/src/main/java/io/grpc/stub/AbstractStub.java +++ b/stub/src/main/java/io/grpc/stub/AbstractStub.java @@ -35,8 +35,6 @@ import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; -import io.grpc.Compressor; -import io.grpc.ExperimentalApi; import java.util.concurrent.TimeUnit; @@ -125,14 +123,6 @@ public abstract class AbstractStub> { return build(newChannel, callOptions); } - /** - * Returns a new stub that uses the given compressor. - */ - @ExperimentalApi("https://github.com/grpc/grpc-java/issues/492") - public final S withCompressor(Compressor c) { - return build(channel, callOptions.withCompressor(c)); - } - /** * Returns a new stub that has the given interceptors attached to the underlying channel. */