From 7de6c04d14cfaa9ffdcecd22aa8a233479e7461c Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Wed, 18 Nov 2015 15:11:58 -0800 Subject: [PATCH] Move decompressor setting to the AbstractServerStream --- .../grpc/internal/AbstractClientStream.java | 5 +---- .../grpc/internal/AbstractServerStream.java | 22 +++++++++++++++++++ .../io/grpc/netty/NettyServerHandler.java | 14 ++---------- .../java/io/grpc/netty/NettyServerStream.java | 5 +++++ 4 files changed, 30 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 182523b50c..3e83a14d6d 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -99,10 +99,7 @@ public abstract class AbstractClientStream extends AbstractStream } /** - * Called by transport implementations when they receive headers. When receiving headers - * a transport may determine that there is an error in the protocol at this phase which is - * why this method takes an error {@link Status}. If a transport reports an - * {@link io.grpc.Status.Code#INTERNAL} error + * Called by transport implementations when they receive headers. * * @param headers the parsed headers */ diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index b28ea64266..67c63d5ae1 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -129,6 +129,28 @@ public abstract class AbstractServerStream extends AbstractStream } } + /** + * Called by transport implementations when they receive headers. + * + * @param headers the parsed headers + */ + protected void inboundHeadersReceived(Metadata headers) { + if (headers.containsKey(GrpcUtil.MESSAGE_ENCODING_KEY)) { + String messageEncoding = headers.get(GrpcUtil.MESSAGE_ENCODING_KEY); + try { + setDecompressor(messageEncoding); + } catch (IllegalArgumentException e) { + Status status = Status.INVALID_ARGUMENT + .withDescription("Unable to decompress encoding " + messageEncoding) + .withCause(e); + abortStream(status, true); + return; + } + } + + inboundPhase(Phase.MESSAGE); + } + /** * Called in the network thread to process the content of an inbound DATA frame from the client. * diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index a813b44350..7af4790f0b 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -151,19 +151,9 @@ class NettyServerHandler extends AbstractNettyHandler { NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this, maxMessageSize); - Metadata metadata = Utils.convertHeaders(headers); - String messageEncoding = metadata.get(GrpcUtil.MESSAGE_ENCODING_KEY); - if (messageEncoding != null) { - try { - stream.setDecompressor(messageEncoding); - } catch (IllegalArgumentException e) { - throw Status.INVALID_ARGUMENT - .withDescription("Unable to decompress message with encoding: " + messageEncoding) - .withCause(e) - .asRuntimeException(); - } - } + Metadata metadata = Utils.convertHeaders(headers); + stream.inboundHeadersReceived(metadata); ServerStreamListener listener = transportListener.streamCreated(stream, method, metadata); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 94e976a3ad..53474b85e2 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -68,6 +68,11 @@ class NettyServerStream extends AbstractServerStream { return http2Stream.id(); } + @Override + protected void inboundHeadersReceived(Metadata headers) { + super.inboundHeadersReceived(headers); + } + void inboundDataReceived(ByteBuf frame, boolean endOfStream) { super.inboundDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream); }