Adding consistent exception handling for Netty

NettyClientHandler currently handles non-HTTP/2 exceptions properly by forcing a shutdown of the connection.  We need to do the server-side as well.

Fixes #1097
This commit is contained in:
nmittler 2015-10-05 11:05:29 -07:00
parent 2d067edf85
commit 6e04c3100e
5 changed files with 154 additions and 88 deletions

View File

@ -0,0 +1,107 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.netty;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream;
/**
* Base class for all Netty gRPC handlers. This class standardizes exception handling (always
* shutdown the connection) as well as sending the initial connection window at startup.
*/
abstract class AbstractNettyHandler extends Http2ConnectionHandler {
private int initialConnectionWindow;
private ChannelHandlerContext ctx;
AbstractNettyHandler(Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);
// If a stream window was specified, update the connection window to match it.
this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
initialSettings.initialWindowSize();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
// Sends the connection preface if we haven't already.
super.handlerAdded(ctx);
sendInitialConnectionWindow();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Sends connection preface if we haven't already.
super.channelActive(ctx);
sendInitialConnectionWindow();
}
@Override
public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Http2Exception embedded = getEmbeddedHttp2Exception(cause);
if (embedded == null) {
// Kill the connection instead of propagating the exceptionCaught(). Http2ConnectionHandler
// only handles Http2Exceptions and propagates everything else.
cause = Http2Exception.connectionError(Http2Error.INTERNAL_ERROR, cause, cause.getMessage());
}
super.exceptionCaught(ctx, cause);
}
protected final ChannelHandlerContext ctx() {
return ctx;
}
/**
* Sends initial connection window to the remote endpoint if necessary.
*/
private void sendInitialConnectionWindow() throws Http2Exception {
if (ctx.channel().isActive() && initialConnectionWindow > 0) {
Http2Stream connectionStream = connection().connectionStream();
int currentSize = connection().local().flowController().windowSize(connectionStream);
int delta = initialConnectionWindow - currentSize;
decoder().flowController().incrementWindowSize(connectionStream, delta);
initialConnectionWindow = -1;
ctx.flush();
}
}
}

View File

@ -31,7 +31,6 @@
package io.grpc.netty;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
import static io.netty.util.CharsetUtil.UTF_8;
import com.google.common.annotations.VisibleForTesting;
@ -55,7 +54,6 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameAdapter;
@ -76,7 +74,7 @@ import javax.annotation.Nullable;
* Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
* the context of the Netty Channel thread.
*/
class NettyClientHandler extends Http2ConnectionHandler {
class NettyClientHandler extends AbstractNettyHandler {
private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
/**
@ -95,17 +93,14 @@ class NettyClientHandler extends Http2ConnectionHandler {
private final Ticker ticker;
private final Random random = new Random();
private WriteQueue clientWriteQueue;
private int initialConnectionWindow;
private Http2Ping ping;
private Status goAwayStatus;
private ChannelHandlerContext ctx;
private int nextStreamId;
public NettyClientHandler(BufferingHttp2ConnectionEncoder encoder, Http2Connection connection,
Http2FrameReader frameReader,
int flowControlWindow) {
this(encoder, connection, frameReader, flowControlWindow,
Ticker.systemTicker());
this(encoder, connection, frameReader, flowControlWindow, Ticker.systemTicker());
}
@VisibleForTesting
@ -114,7 +109,6 @@ class NettyClientHandler extends Http2ConnectionHandler {
super(new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader,
new LazyFrameListener()), encoder, createInitialSettings(flowControlWindow));
this.ticker = ticker;
this.initialConnectionWindow = flowControlWindow;
initListener();
@ -144,21 +138,6 @@ class NettyClientHandler extends Http2ConnectionHandler {
return goAwayStatus;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
// Sends the connection preface if we haven't already.
super.handlerAdded(ctx);
sendInitialConnectionWindow();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Sends connection preface if we haven't already.
super.channelActive(ctx);
sendInitialConnectionWindow();
}
/**
* Handler for commands sent from the stream.
*/
@ -257,17 +236,6 @@ class NettyClientHandler extends Http2ConnectionHandler {
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (getEmbeddedHttp2Exception(cause) == null) {
// Kill the connection instead of propagating the exceptionCaught(). Http2ConnectionHandler
// only handles Http2Exceptions and propagates everything else.
goAwayStatus(Status.fromThrowable(cause));
cause = new Http2Exception(Http2Error.INTERNAL_ERROR, null, cause);
}
super.exceptionCaught(ctx, cause);
}
@Override
protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause,
Http2Exception http2Ex) {
@ -319,7 +287,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
if (!connection().goAwaySent()) {
logger.fine("Stream IDs have been exhausted for this connection. "
+ "Initiating graceful shutdown of the connection.");
super.close(ctx, ctx.newPromise());
super.close(ctx(), ctx().newPromise());
}
return;
}
@ -327,7 +295,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
final NettyClientStream stream = command.stream();
final Http2Headers headers = command.headers();
stream.id(streamId);
encoder().writeHeaders(ctx, streamId, headers, 0, false, promise)
encoder().writeHeaders(ctx(), streamId, headers, 0, false, promise)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -501,20 +469,6 @@ class NettyClientHandler extends Http2ConnectionHandler {
return stream;
}
/**
* Sends initial connection window to the remote endpoint if necessary.
*/
private void sendInitialConnectionWindow() throws Http2Exception {
if (ctx.channel().isActive() && initialConnectionWindow > 0) {
Http2Stream connectionStream = connection().connectionStream();
int currentSize = connection().local().flowController().windowSize(connectionStream);
int delta = initialConnectionWindow - currentSize;
decoder().flowController().incrementWindowSize(connectionStream, delta);
initialConnectionWindow = -1;
ctx.flush();
}
}
private static class LazyFrameListener extends Http2FrameAdapter {
private NettyClientHandler handler;

View File

@ -51,8 +51,10 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
@ -75,7 +77,7 @@ import javax.annotation.Nullable;
* Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
* the context of the Netty Channel thread.
*/
class NettyServerHandler extends Http2ConnectionHandler {
class NettyServerHandler extends AbstractNettyHandler {
private static Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
@ -85,25 +87,30 @@ class NettyServerHandler extends Http2ConnectionHandler {
private final ServerTransportListener transportListener;
private final int maxMessageSize;
private Throwable connectionError;
private ChannelHandlerContext ctx;
private boolean teWarningLogged;
private int connectionWindow;
private WriteQueue serverWriteQueue;
NettyServerHandler(ServerTransportListener transportListener,
Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
int maxStreams,
int flowControlWindow,
int maxMessageSize) {
super(connection, frameReader, frameWriter, new LazyFrameListener(),
createInitialSettings(flowControlWindow, maxStreams));
this.connectionWindow = flowControlWindow;
Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
int maxStreams,
int flowControlWindow,
int maxMessageSize) {
this(transportListener, new DefaultHttp2ConnectionEncoder(connection, frameWriter), frameReader,
createInitialSettings(flowControlWindow, maxStreams), maxMessageSize);
}
private NettyServerHandler(ServerTransportListener transportListener,
Http2ConnectionEncoder encoder,
Http2FrameReader frameReader, Http2Settings settings,
int maxMessageSize) {
super(new DefaultHttp2ConnectionDecoder(encoder.connection(), encoder, frameReader,
new LazyFrameListener()), encoder, settings);
checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0");
this.maxMessageSize = maxMessageSize;
streamKey = connection.newKey();
streamKey = encoder.connection().newKey();
this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
initListener();
}
@ -127,17 +134,8 @@ class NettyServerHandler extends Http2ConnectionHandler {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
serverWriteQueue = new WriteQueue(ctx.channel());
super.handlerAdded(ctx);
sendInitialConnectionWindow();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Sends connection preface if we haven't already.
super.channelActive(ctx);
sendInitialConnectionWindow();
}
private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
@ -383,21 +381,6 @@ class NettyServerHandler extends Http2ConnectionHandler {
streamId, Http2Error.INTERNAL_ERROR, cause, cause.getMessage());
}
/**
* Sends initial connection window to the remote endpoint, if necessary.
*/
private void sendInitialConnectionWindow() throws Http2Exception {
// Send the initial connection window if different than the default.
if (ctx.channel().isActive() && connectionWindow > 0) {
Http2Stream connectionStream = connection().connectionStream();
int currentSize = connection().local().flowController().windowSize(connectionStream);
int delta = connectionWindow - currentSize;
decoder().flowController().incrementWindowSize(connectionStream, delta);
connectionWindow = -1;
ctx.flush();
}
}
private static class LazyFrameListener extends Http2FrameAdapter {
private NettyServerHandler handler;

View File

@ -425,6 +425,17 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
((StatusException) callback.failureCause).getStatus().getCode());
}
@Test
public void exceptionCaughtShouldCloseConnection() throws Exception {
handler().exceptionCaught(ctx(), new RuntimeException("fake exception"));
// TODO(nmittler): EmbeddedChannel does not currently invoke the channelInactive processing,
// so exceptionCaught() will not close streams properly in this test.
// Once https://github.com/netty/netty/issues/4316 is resolved, we should also verify that
// any open streams are closed properly.
assertFalse(channel().isOpen());
}
private ChannelFuture sendPing(PingCallback callback) {
return enqueue(new SendPingCommand(callback, MoreExecutors.directExecutor()));
}

View File

@ -228,6 +228,17 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
assertFalse(channel().isOpen());
}
@Test
public void exceptionCaughtShouldCloseConnection() throws Exception {
handler().exceptionCaught(ctx(), new RuntimeException("fake exception"));
// TODO(nmittler): EmbeddedChannel does not currently invoke the channelInactive processing,
// so exceptionCaught() will not close streams properly in this test.
// Once https://github.com/netty/netty/issues/4316 is resolved, we should also verify that
// any open streams are closed properly.
assertFalse(channel().isOpen());
}
@Test
public void shouldAdvertiseMaxConcurrentStreams() throws Exception {
maxConcurrentStreams = 314;