diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index ae0160dbbf..915ce2081a 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -32,7 +32,6 @@ package io.grpc.inprocess; import io.grpc.Compressor; -import io.grpc.Decompressor; import io.grpc.DecompressorRegistry; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -221,12 +220,6 @@ class InProcessTransport implements ServerTransport, ClientTransport { // intentional nop } - @Override - public void setDecompressor(Decompressor d) {} - - @Override - public void setDecompressor(String messageEncoding) {} - @Override public void setDecompressionRegistry(DecompressorRegistry registry) {} @@ -446,16 +439,6 @@ class InProcessTransport implements ServerTransport, ClientTransport { // nop } - @Override - public void setDecompressor(Decompressor d) { - // nop - } - - @Override - public void setDecompressor(String messageEncoding) { - // nop - } - @Override public void setDecompressionRegistry(DecompressorRegistry registry) {} } @@ -487,16 +470,6 @@ class InProcessTransport implements ServerTransport, ClientTransport { // very much a nop } - @Override - public void setDecompressor(Decompressor d) { - // nop - } - - @Override - public void setDecompressor(String messageEncoding) { - // nop - } - @Override public void setDecompressionRegistry(DecompressorRegistry registry) {} } diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index 3611ef251b..a17de176d1 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -293,8 +293,7 @@ public abstract class AbstractStream implements Stream { * after the message encoding header is provided by the remote host, but before any messages are * received. */ - @Override - public final void setDecompressor(Decompressor d) { + protected final void setDecompressor(Decompressor d) { deframer.setDecompressor(d); } @@ -305,7 +304,6 @@ public abstract class AbstractStream implements Stream { * @param messageEncoding the name of the encoding provided by the remote host * @throws IllegalArgumentException if the provided message encoding cannot be found. */ - @Override public final void setDecompressor(String messageEncoding) { Decompressor d = decompressorRegistry.lookupDecompressor(messageEncoding); checkArgument(d != null, diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 7ffc79f407..27fd3ed34b 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -43,7 +43,6 @@ import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.Codec; import io.grpc.Compressor; -import io.grpc.Decompressor; import io.grpc.DecompressorRegistry; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -371,12 +370,6 @@ final class ClientCallImpl extends ClientCall { return false; } - @Override - public void setDecompressor(Decompressor d) {} - - @Override - public void setDecompressor(String messageEncoding) {} - @Override public void setDecompressionRegistry(DecompressorRegistry registry) {} } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 8cc12f72cb..6de6dffdfd 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -325,19 +325,6 @@ public final class ServerImpl extends io.grpc.Server { private ServerStreamListener startCall(ServerStream stream, String fullMethodName, ServerMethodDefinition methodDef, Future timeout, Metadata headers) { - - String messageEncoding = headers.get(GrpcUtil.MESSAGE_ENCODING_KEY); - if (messageEncoding != null) { - try { - stream.setDecompressor(messageEncoding); - } catch (IllegalArgumentException e) { - throw Status.INVALID_ARGUMENT - .withDescription("Unable to decompress message with encoding: " + messageEncoding) - .withCause(e) - .asRuntimeException(); - } - } - // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? ServerCallImpl call = new ServerCallImpl( stream, methodDef.getMethodDescriptor()); diff --git a/core/src/main/java/io/grpc/internal/Stream.java b/core/src/main/java/io/grpc/internal/Stream.java index 1edf147976..efb9abd8a7 100644 --- a/core/src/main/java/io/grpc/internal/Stream.java +++ b/core/src/main/java/io/grpc/internal/Stream.java @@ -32,7 +32,6 @@ package io.grpc.internal; import io.grpc.Compressor; -import io.grpc.Decompressor; import io.grpc.DecompressorRegistry; import java.io.InputStream; @@ -86,22 +85,6 @@ public interface Stream { */ void setCompressor(Compressor c); - /** - * Set the decompressor for this stream. This may be called at most once. Typically this is set - * after the message encoding header is provided by the remote host, but before any messages are - * received. - */ - void setDecompressor(Decompressor d); - - /** - * Looks up the decompressor by its message encoding name, and sets it for this stream. - * Decompressors are registered with {@link io.grpc.DecompressorRegistry#register}. - * - * @param messageEncoding the name of the encoding provided by the remote host - * @throws IllegalArgumentException if the provided message encoding cannot be found. - */ - void setDecompressor(String messageEncoding); - /** * Sets the decompressor registry to use when resolving {@link #setDecompressor(String)}. If * unset, the default DecompressorRegistry will be used. diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 72f0346e13..9a8de2a583 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -161,6 +161,19 @@ class NettyServerHandler extends Http2ConnectionHandler { NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this, maxMessageSize); Metadata metadata = Utils.convertHeaders(headers); + + String messageEncoding = metadata.get(GrpcUtil.MESSAGE_ENCODING_KEY); + if (messageEncoding != null) { + try { + stream.setDecompressor(messageEncoding); + } catch (IllegalArgumentException e) { + throw Status.INVALID_ARGUMENT + .withDescription("Unable to decompress message with encoding: " + messageEncoding) + .withCause(e) + .asRuntimeException(); + } + } + ServerStreamListener listener = transportListener.streamCreated(stream, method, metadata); stream.setListener(listener); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 861e0bf524..579005b42e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -128,8 +128,4 @@ class NettyServerStream extends AbstractServerStream { public void cancel(Status status) { writeQueue.enqueue(new CancelServerStreamCommand(this, status), true); } - - void useDecompressor(String messageEncoding) { - setDecompressor(messageEncoding); - } }