diff --git a/build.gradle b/build.gradle index 36aae8d913..1ae3996c99 100644 --- a/build.gradle +++ b/build.gradle @@ -39,7 +39,7 @@ subprojects { // TODO: Unreleased dependencies. // These must already be installed in the local maven repository. - netty: 'io.netty:netty-codec-http2:5.0.0.Alpha2-SNAPSHOT', + netty: 'io.netty:netty-codec-http2:5.0.0.Alpha3-SNAPSHOT', // Test dependencies. junit: 'junit:junit:4.11', diff --git a/lib/netty b/lib/netty index 1c6b3307be..44ee2cac43 160000 --- a/lib/netty +++ b/lib/netty @@ -1 +1 @@ -Subproject commit 1c6b3307becc3b0c19fdbac6a7058bd731a4db2c +Subproject commit 44ee2cac433a6f8640d01a70e8b90b70852aeeae diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerHandler.java index ee8883de21..aea8292632 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerHandler.java @@ -294,10 +294,10 @@ class NettyServerHandler extends Http2ConnectionHandler { // Write the GO_AWAY frame to the remote endpoint. int lastKnownStream = connection().remote().lastStreamCreated(); - ChannelFuture future = writeGoAway(ctx, lastKnownStream, errorCode, data, promise); + writeGoAway(ctx, lastKnownStream, errorCode, data, promise); // When the write completes, close this channel. - future.addListener(new ChannelFutureListener() { + promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { ctx.close(); diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java index 5fa746bc3c..7cb5e433df 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java @@ -128,6 +128,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { // Reset the context to clear any interactions resulting from the HTTP/2 // connection preface handshake. mockContext(); + mockFuture(promise, true); } @Test @@ -211,8 +212,12 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { // Send a frame and verify that it was written. handler.write(ctx, new SendGrpcFrameCommand(stream, content, true), promise); verify(promise, never()).setFailure(any(Throwable.class)); - verify(ctx).write(any(ByteBuf.class), eq(promise)); + ByteBuf bufWritten = captureWrite(ctx); verify(ctx).flush(); + int startIndex = bufWritten.readerIndex() + Http2CodecUtil.FRAME_HEADER_LENGTH; + int length = bufWritten.writerIndex() - startIndex; + ByteBuf writtenContent = bufWritten.slice(startIndex, length); + assertEquals(content, writtenContent); } @Test diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/transport/netty/NettyHandlerTestBase.java index a71ee2670c..747891b0dc 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyHandlerTestBase.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyHandlerTestBase.java @@ -33,11 +33,13 @@ package io.grpc.transport.netty; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; @@ -78,6 +80,9 @@ public abstract class NettyHandlerTestBase { @Mock protected ChannelPromise promise; + @Mock + protected ChannelFuture succeededFuture; + @Mock protected Http2FrameListener frameListener; @@ -118,6 +123,7 @@ public abstract class NettyHandlerTestBase { protected final ChannelHandlerContext newContext() { ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); + when(ctx.executor()).thenReturn(eventLoop); return ctx; } @@ -127,8 +133,13 @@ public abstract class NettyHandlerTestBase { protected final ByteBuf captureWrite(ChannelHandlerContext ctx) { ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuf.class); - verify(ctx).write(captor.capture(), any(ChannelPromise.class)); - return captor.getValue(); + verify(ctx, atLeastOnce()).write(captor.capture(), any(ChannelPromise.class)); + CompositeByteBuf composite = Unpooled.compositeBuffer(); + for (ByteBuf buf : captor.getAllValues()) { + composite.addComponent(buf); + composite.writerIndex(composite.writerIndex() + buf.readableBytes()); + } + return composite; } protected final void mockContext() { @@ -141,8 +152,12 @@ public abstract class NettyHandlerTestBase { when(ctx.writeAndFlush(any())).thenReturn(future); when(ctx.writeAndFlush(any(), eq(promise))).thenReturn(future); when(ctx.newPromise()).thenReturn(promise); + when(ctx.newSucceededFuture()).thenReturn(succeededFuture); + when(ctx.executor()).thenReturn(eventLoop); when(channel.eventLoop()).thenReturn(eventLoop); + mockFuture(succeededFuture, true); + doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { @@ -154,6 +169,10 @@ public abstract class NettyHandlerTestBase { } protected final void mockFuture(boolean succeeded) { + mockFuture(future, succeeded); + } + + protected final void mockFuture(final ChannelFuture future, boolean succeeded) { when(future.isDone()).thenReturn(true); when(future.isCancelled()).thenReturn(false); when(future.isSuccess()).thenReturn(succeeded); diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java index cea5ee063d..e6ca745122 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java @@ -42,7 +42,7 @@ import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -135,6 +135,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { // Reset the context to clear any interactions resulting from the HTTP/2 // connection preface handshake. mockContext(); + mockFuture(promise, true); } @Test @@ -145,8 +146,12 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { // Send a frame and verify that it was written. handler.write(ctx, new SendGrpcFrameCommand(stream, content, false), promise); verify(promise, never()).setFailure(any(Throwable.class)); - verify(ctx).write(any(ByteBuf.class), eq(promise)); - assertEquals(0, content.refCnt()); + ByteBuf bufWritten = captureWrite(ctx); + verify(ctx, atLeastOnce()).flush(); + int startIndex = bufWritten.readerIndex() + Http2CodecUtil.FRAME_HEADER_LENGTH; + int length = bufWritten.writerIndex() - startIndex; + ByteBuf writtenContent = bufWritten.slice(startIndex, length); + assertEquals(content, writtenContent); } @Test