Move decompressor setting down into the AbstractStream

This commit is contained in:
Carl Mastrangelo 2015-09-28 10:25:58 -07:00
parent f7f57b79c9
commit 46c18ea237
7 changed files with 14 additions and 71 deletions

View File

@ -32,7 +32,6 @@
package io.grpc.inprocess;
import io.grpc.Compressor;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@ -221,12 +220,6 @@ class InProcessTransport implements ServerTransport, ClientTransport {
// intentional nop
}
@Override
public void setDecompressor(Decompressor d) {}
@Override
public void setDecompressor(String messageEncoding) {}
@Override
public void setDecompressionRegistry(DecompressorRegistry registry) {}
@ -446,16 +439,6 @@ class InProcessTransport implements ServerTransport, ClientTransport {
// nop
}
@Override
public void setDecompressor(Decompressor d) {
// nop
}
@Override
public void setDecompressor(String messageEncoding) {
// nop
}
@Override
public void setDecompressionRegistry(DecompressorRegistry registry) {}
}
@ -487,16 +470,6 @@ class InProcessTransport implements ServerTransport, ClientTransport {
// very much a nop
}
@Override
public void setDecompressor(Decompressor d) {
// nop
}
@Override
public void setDecompressor(String messageEncoding) {
// nop
}
@Override
public void setDecompressionRegistry(DecompressorRegistry registry) {}
}

View File

@ -293,8 +293,7 @@ public abstract class AbstractStream<IdT> implements Stream {
* after the message encoding header is provided by the remote host, but before any messages are
* received.
*/
@Override
public final void setDecompressor(Decompressor d) {
protected final void setDecompressor(Decompressor d) {
deframer.setDecompressor(d);
}
@ -305,7 +304,6 @@ public abstract class AbstractStream<IdT> implements Stream {
* @param messageEncoding the name of the encoding provided by the remote host
* @throws IllegalArgumentException if the provided message encoding cannot be found.
*/
@Override
public final void setDecompressor(String messageEncoding) {
Decompressor d = decompressorRegistry.lookupDecompressor(messageEncoding);
checkArgument(d != null,

View File

@ -43,7 +43,6 @@ import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@ -371,12 +370,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
return false;
}
@Override
public void setDecompressor(Decompressor d) {}
@Override
public void setDecompressor(String messageEncoding) {}
@Override
public void setDecompressionRegistry(DecompressorRegistry registry) {}
}

View File

@ -325,19 +325,6 @@ public final class ServerImpl extends io.grpc.Server {
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Future<?> timeout,
Metadata headers) {
String messageEncoding = headers.get(GrpcUtil.MESSAGE_ENCODING_KEY);
if (messageEncoding != null) {
try {
stream.setDecompressor(messageEncoding);
} catch (IllegalArgumentException e) {
throw Status.INVALID_ARGUMENT
.withDescription("Unable to decompress message with encoding: " + messageEncoding)
.withCause(e)
.asRuntimeException();
}
}
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
stream, methodDef.getMethodDescriptor());

View File

@ -32,7 +32,6 @@
package io.grpc.internal;
import io.grpc.Compressor;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import java.io.InputStream;
@ -86,22 +85,6 @@ public interface Stream {
*/
void setCompressor(Compressor c);
/**
* Set the decompressor for this stream. This may be called at most once. Typically this is set
* after the message encoding header is provided by the remote host, but before any messages are
* received.
*/
void setDecompressor(Decompressor d);
/**
* Looks up the decompressor by its message encoding name, and sets it for this stream.
* Decompressors are registered with {@link io.grpc.DecompressorRegistry#register}.
*
* @param messageEncoding the name of the encoding provided by the remote host
* @throws IllegalArgumentException if the provided message encoding cannot be found.
*/
void setDecompressor(String messageEncoding);
/**
* Sets the decompressor registry to use when resolving {@link #setDecompressor(String)}. If
* unset, the default DecompressorRegistry will be used.

View File

@ -161,6 +161,19 @@ class NettyServerHandler extends Http2ConnectionHandler {
NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this,
maxMessageSize);
Metadata metadata = Utils.convertHeaders(headers);
String messageEncoding = metadata.get(GrpcUtil.MESSAGE_ENCODING_KEY);
if (messageEncoding != null) {
try {
stream.setDecompressor(messageEncoding);
} catch (IllegalArgumentException e) {
throw Status.INVALID_ARGUMENT
.withDescription("Unable to decompress message with encoding: " + messageEncoding)
.withCause(e)
.asRuntimeException();
}
}
ServerStreamListener listener =
transportListener.streamCreated(stream, method, metadata);
stream.setListener(listener);

View File

@ -128,8 +128,4 @@ class NettyServerStream extends AbstractServerStream<Integer> {
public void cancel(Status status) {
writeQueue.enqueue(new CancelServerStreamCommand(this, status), true);
}
void useDecompressor(String messageEncoding) {
setDecompressor(messageEncoding);
}
}