Cleaning up closing for Netty client/server.

This commit is contained in:
nmittler 2015-05-05 08:32:36 -07:00
parent ef6a2f02d4
commit a45e0a4767
3 changed files with 12 additions and 44 deletions

View File

@ -36,9 +36,6 @@ import static io.grpc.transport.netty.Utils.CONTENT_TYPE_HEADER;
import static io.grpc.transport.netty.Utils.HTTP_METHOD;
import static io.grpc.transport.netty.Utils.TE_HEADER;
import static io.grpc.transport.netty.Utils.TE_TRAILERS;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import com.google.common.base.Preconditions;
@ -135,18 +132,6 @@ class NettyServerHandler extends Http2ConnectionHandler {
initConnectionWindow();
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// Avoid NotYetConnectedException
if (!ctx.channel().isActive()) {
ctx.close(promise);
return;
}
// Write the GO_AWAY frame to the remote endpoint and then shutdown the channel.
goAwayAndClose(ctx, (int) NO_ERROR.code(), EMPTY_BUFFER, promise);
}
private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
throws Http2Exception {
if (!teWarningLogged && !TE_TRAILERS.equals(headers.get(TE_HEADER))) {
@ -199,10 +184,7 @@ class NettyServerHandler extends Http2ConnectionHandler {
Http2Exception http2Ex) {
logger.log(Level.WARNING, "Connection Error", cause);
connectionError = cause;
Http2Error error = http2Ex != null ? http2Ex.error() : Http2Error.INTERNAL_ERROR;
// Write the GO_AWAY frame to the remote endpoint and then shutdown the channel.
goAwayAndClose(ctx, (int) error.code(), toByteBuf(ctx, cause), ctx.newPromise());
super.onConnectionError(ctx, cause, http2Ex);
}
@Override
@ -302,30 +284,6 @@ class NettyServerHandler extends Http2ConnectionHandler {
ctx.flush();
}
/**
* Writes a {@code GO_AWAY} frame to the remote endpoint. When it completes, shuts down the
* channel.
*/
private void goAwayAndClose(final ChannelHandlerContext ctx, int errorCode, ByteBuf data,
ChannelPromise promise) {
if (connection().goAwaySent()) {
// Already sent the GO_AWAY. Do nothing.
return;
}
// Write the GO_AWAY frame to the remote endpoint.
int lastKnownStream = connection().remote().lastStreamCreated();
goAway(ctx, lastKnownStream, errorCode, data, promise);
// When the write completes, close this channel.
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ctx.close();
}
});
}
private Http2Stream requireHttp2Stream(int streamId) {
Http2Stream stream = connection().stream(streamId);
if (stream == null) {

View File

@ -167,6 +167,16 @@ public abstract class NettyHandlerTestBase {
return null;
}
}).when(eventLoop).execute(any(Runnable.class));
// Make all writes complete immediately.
doAnswer(new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock in) throws Throwable {
ChannelPromise promise = (ChannelPromise) in.getArguments()[1];
promise.setSuccess();
return promise;
}
}).when(ctx).write(any(), any(ChannelPromise.class));
}
protected final void mockFuture(boolean succeeded) {

View File

@ -267,7 +267,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
assertEquals(expected, actual);
// Verify that the context was closed.
verify(ctx).close();
verify(ctx).close(promise);
}
@Test