Move decompressor setting to the AbstractServerStream

This commit is contained in:
Carl Mastrangelo 2015-11-18 15:11:58 -08:00
parent 602473d786
commit 7de6c04d14
4 changed files with 30 additions and 16 deletions

View File

@ -99,10 +99,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
} }
/** /**
* Called by transport implementations when they receive headers. When receiving headers * Called by transport implementations when they receive headers.
* a transport may determine that there is an error in the protocol at this phase which is
* why this method takes an error {@link Status}. If a transport reports an
* {@link io.grpc.Status.Code#INTERNAL} error
* *
* @param headers the parsed headers * @param headers the parsed headers
*/ */

View File

@ -129,6 +129,28 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
} }
} }
/**
* Called by transport implementations when they receive headers.
*
* @param headers the parsed headers
*/
protected void inboundHeadersReceived(Metadata headers) {
if (headers.containsKey(GrpcUtil.MESSAGE_ENCODING_KEY)) {
String messageEncoding = headers.get(GrpcUtil.MESSAGE_ENCODING_KEY);
try {
setDecompressor(messageEncoding);
} catch (IllegalArgumentException e) {
Status status = Status.INVALID_ARGUMENT
.withDescription("Unable to decompress encoding " + messageEncoding)
.withCause(e);
abortStream(status, true);
return;
}
}
inboundPhase(Phase.MESSAGE);
}
/** /**
* Called in the network thread to process the content of an inbound DATA frame from the client. * Called in the network thread to process the content of an inbound DATA frame from the client.
* *

View File

@ -151,19 +151,9 @@ class NettyServerHandler extends AbstractNettyHandler {
NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this, NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this,
maxMessageSize); maxMessageSize);
Metadata metadata = Utils.convertHeaders(headers);
String messageEncoding = metadata.get(GrpcUtil.MESSAGE_ENCODING_KEY); Metadata metadata = Utils.convertHeaders(headers);
if (messageEncoding != null) { stream.inboundHeadersReceived(metadata);
try {
stream.setDecompressor(messageEncoding);
} catch (IllegalArgumentException e) {
throw Status.INVALID_ARGUMENT
.withDescription("Unable to decompress message with encoding: " + messageEncoding)
.withCause(e)
.asRuntimeException();
}
}
ServerStreamListener listener = ServerStreamListener listener =
transportListener.streamCreated(stream, method, metadata); transportListener.streamCreated(stream, method, metadata);

View File

@ -68,6 +68,11 @@ class NettyServerStream extends AbstractServerStream<Integer> {
return http2Stream.id(); return http2Stream.id();
} }
@Override
protected void inboundHeadersReceived(Metadata headers) {
super.inboundHeadersReceived(headers);
}
void inboundDataReceived(ByteBuf frame, boolean endOfStream) { void inboundDataReceived(ByteBuf frame, boolean endOfStream) {
super.inboundDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream); super.inboundDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream);
} }