diff --git a/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java new file mode 100644 index 0000000000..4919a17e34 --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/AbstractNettyHandler.java @@ -0,0 +1,107 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.netty; + +import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.Http2ConnectionDecoder; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; +import io.netty.handler.codec.http2.Http2ConnectionHandler; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.codec.http2.Http2Stream; + +/** + * Base class for all Netty gRPC handlers. This class standardizes exception handling (always + * shutdown the connection) as well as sending the initial connection window at startup. + */ +abstract class AbstractNettyHandler extends Http2ConnectionHandler { + + private int initialConnectionWindow; + private ChannelHandlerContext ctx; + + AbstractNettyHandler(Http2ConnectionDecoder decoder, + Http2ConnectionEncoder encoder, + Http2Settings initialSettings) { + super(decoder, encoder, initialSettings); + + // If a stream window was specified, update the connection window to match it. + this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 : + initialSettings.initialWindowSize(); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + // Sends the connection preface if we haven't already. + super.handlerAdded(ctx); + sendInitialConnectionWindow(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // Sends connection preface if we haven't already. + super.channelActive(ctx); + sendInitialConnectionWindow(); + } + + @Override + public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Http2Exception embedded = getEmbeddedHttp2Exception(cause); + if (embedded == null) { + // Kill the connection instead of propagating the exceptionCaught(). Http2ConnectionHandler + // only handles Http2Exceptions and propagates everything else. + cause = Http2Exception.connectionError(Http2Error.INTERNAL_ERROR, cause, cause.getMessage()); + } + super.exceptionCaught(ctx, cause); + } + + protected final ChannelHandlerContext ctx() { + return ctx; + } + + /** + * Sends initial connection window to the remote endpoint if necessary. + */ + private void sendInitialConnectionWindow() throws Http2Exception { + if (ctx.channel().isActive() && initialConnectionWindow > 0) { + Http2Stream connectionStream = connection().connectionStream(); + int currentSize = connection().local().flowController().windowSize(connectionStream); + int delta = initialConnectionWindow - currentSize; + decoder().flowController().incrementWindowSize(connectionStream, delta); + initialConnectionWindow = -1; + ctx.flush(); + } + } +} diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index bc59dc36fb..be4552a326 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -31,7 +31,6 @@ package io.grpc.netty; -import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; import static io.netty.util.CharsetUtil.UTF_8; import com.google.common.annotations.VisibleForTesting; @@ -55,7 +54,6 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionAdapter; -import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2FrameAdapter; @@ -76,7 +74,7 @@ import javax.annotation.Nullable; * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within * the context of the Netty Channel thread. */ -class NettyClientHandler extends Http2ConnectionHandler { +class NettyClientHandler extends AbstractNettyHandler { private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName()); /** @@ -95,17 +93,14 @@ class NettyClientHandler extends Http2ConnectionHandler { private final Ticker ticker; private final Random random = new Random(); private WriteQueue clientWriteQueue; - private int initialConnectionWindow; private Http2Ping ping; private Status goAwayStatus; - private ChannelHandlerContext ctx; private int nextStreamId; public NettyClientHandler(BufferingHttp2ConnectionEncoder encoder, Http2Connection connection, Http2FrameReader frameReader, int flowControlWindow) { - this(encoder, connection, frameReader, flowControlWindow, - Ticker.systemTicker()); + this(encoder, connection, frameReader, flowControlWindow, Ticker.systemTicker()); } @VisibleForTesting @@ -114,7 +109,6 @@ class NettyClientHandler extends Http2ConnectionHandler { super(new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader, new LazyFrameListener()), encoder, createInitialSettings(flowControlWindow)); this.ticker = ticker; - this.initialConnectionWindow = flowControlWindow; initListener(); @@ -144,21 +138,6 @@ class NettyClientHandler extends Http2ConnectionHandler { return goAwayStatus; } - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; - // Sends the connection preface if we haven't already. - super.handlerAdded(ctx); - sendInitialConnectionWindow(); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - // Sends connection preface if we haven't already. - super.channelActive(ctx); - sendInitialConnectionWindow(); - } - /** * Handler for commands sent from the stream. */ @@ -257,17 +236,6 @@ class NettyClientHandler extends Http2ConnectionHandler { } } - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (getEmbeddedHttp2Exception(cause) == null) { - // Kill the connection instead of propagating the exceptionCaught(). Http2ConnectionHandler - // only handles Http2Exceptions and propagates everything else. - goAwayStatus(Status.fromThrowable(cause)); - cause = new Http2Exception(Http2Error.INTERNAL_ERROR, null, cause); - } - super.exceptionCaught(ctx, cause); - } - @Override protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Http2Exception http2Ex) { @@ -319,7 +287,7 @@ class NettyClientHandler extends Http2ConnectionHandler { if (!connection().goAwaySent()) { logger.fine("Stream IDs have been exhausted for this connection. " + "Initiating graceful shutdown of the connection."); - super.close(ctx, ctx.newPromise()); + super.close(ctx(), ctx().newPromise()); } return; } @@ -327,7 +295,7 @@ class NettyClientHandler extends Http2ConnectionHandler { final NettyClientStream stream = command.stream(); final Http2Headers headers = command.headers(); stream.id(streamId); - encoder().writeHeaders(ctx, streamId, headers, 0, false, promise) + encoder().writeHeaders(ctx(), streamId, headers, 0, false, promise) .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -501,20 +469,6 @@ class NettyClientHandler extends Http2ConnectionHandler { return stream; } - /** - * Sends initial connection window to the remote endpoint if necessary. - */ - private void sendInitialConnectionWindow() throws Http2Exception { - if (ctx.channel().isActive() && initialConnectionWindow > 0) { - Http2Stream connectionStream = connection().connectionStream(); - int currentSize = connection().local().flowController().windowSize(connectionStream); - int delta = initialConnectionWindow - currentSize; - decoder().flowController().incrementWindowSize(connectionStream, delta); - initialConnectionWindow = -1; - ctx.flush(); - } - } - private static class LazyFrameListener extends Http2FrameAdapter { private NettyClientHandler handler; diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 9a8de2a583..7466d82253 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -51,8 +51,10 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Connection; -import io.netty.handler.codec.http2.Http2ConnectionHandler; +import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception.StreamException; @@ -75,7 +77,7 @@ import javax.annotation.Nullable; * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within * the context of the Netty Channel thread. */ -class NettyServerHandler extends Http2ConnectionHandler { +class NettyServerHandler extends AbstractNettyHandler { private static Logger logger = Logger.getLogger(NettyServerHandler.class.getName()); @@ -85,25 +87,30 @@ class NettyServerHandler extends Http2ConnectionHandler { private final ServerTransportListener transportListener; private final int maxMessageSize; private Throwable connectionError; - private ChannelHandlerContext ctx; private boolean teWarningLogged; - private int connectionWindow; private WriteQueue serverWriteQueue; NettyServerHandler(ServerTransportListener transportListener, - Http2Connection connection, - Http2FrameReader frameReader, - Http2FrameWriter frameWriter, - int maxStreams, - int flowControlWindow, - int maxMessageSize) { - super(connection, frameReader, frameWriter, new LazyFrameListener(), - createInitialSettings(flowControlWindow, maxStreams)); - this.connectionWindow = flowControlWindow; + Http2Connection connection, + Http2FrameReader frameReader, + Http2FrameWriter frameWriter, + int maxStreams, + int flowControlWindow, + int maxMessageSize) { + this(transportListener, new DefaultHttp2ConnectionEncoder(connection, frameWriter), frameReader, + createInitialSettings(flowControlWindow, maxStreams), maxMessageSize); + } + + private NettyServerHandler(ServerTransportListener transportListener, + Http2ConnectionEncoder encoder, + Http2FrameReader frameReader, Http2Settings settings, + int maxMessageSize) { + super(new DefaultHttp2ConnectionDecoder(encoder.connection(), encoder, frameReader, + new LazyFrameListener()), encoder, settings); checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0"); this.maxMessageSize = maxMessageSize; - streamKey = connection.newKey(); + streamKey = encoder.connection().newKey(); this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener"); initListener(); } @@ -127,17 +134,8 @@ class NettyServerHandler extends Http2ConnectionHandler { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; serverWriteQueue = new WriteQueue(ctx.channel()); super.handlerAdded(ctx); - sendInitialConnectionWindow(); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - // Sends connection preface if we haven't already. - super.channelActive(ctx); - sendInitialConnectionWindow(); } private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers) @@ -383,21 +381,6 @@ class NettyServerHandler extends Http2ConnectionHandler { streamId, Http2Error.INTERNAL_ERROR, cause, cause.getMessage()); } - /** - * Sends initial connection window to the remote endpoint, if necessary. - */ - private void sendInitialConnectionWindow() throws Http2Exception { - // Send the initial connection window if different than the default. - if (ctx.channel().isActive() && connectionWindow > 0) { - Http2Stream connectionStream = connection().connectionStream(); - int currentSize = connection().local().flowController().windowSize(connectionStream); - int delta = connectionWindow - currentSize; - decoder().flowController().incrementWindowSize(connectionStream, delta); - connectionWindow = -1; - ctx.flush(); - } - } - private static class LazyFrameListener extends Http2FrameAdapter { private NettyServerHandler handler; diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 059f3717ea..ca3e76d2f7 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -425,6 +425,17 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase