diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index d745409a0b..0be1e1f509 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -165,11 +165,6 @@ class NettyClientTransport implements ConnectionClientTransport { lifecycleManager = new ClientTransportLifecycleManager( Preconditions.checkNotNull(transportListener, "listener")); - if (enableKeepAlive) { - keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos, - keepAliveTimeoutNanos); - } - handler = newHandler(); HandlerSettings.setAutoWindow(handler); @@ -234,6 +229,12 @@ class NettyClientTransport implements ConnectionClientTransport { Status.INTERNAL.withDescription("Connection closed with unknown cause")); } }); + + if (enableKeepAlive) { + keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos, + keepAliveTimeoutNanos); + } + return null; } @@ -276,6 +277,11 @@ class NettyClientTransport implements ConnectionClientTransport { return channel; } + @VisibleForTesting + KeepAliveManager keepAliveManager() { + return keepAliveManager; + } + /** * Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed * from the pipeline when the channel is closed. Since handlers are removed, you may get an diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 464f548d9d..ed8827d776 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -39,6 +39,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -179,7 +180,7 @@ public class NettyClientTransportTest { public void overrideDefaultUserAgent() throws Exception { startServer(); NettyClientTransport transport = newTransport(newNegotiator(), - DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent"); + DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true); callMeMaybe(transport.start(clientTransportListener)); new Rpc(transport, new Metadata()).halfClose().waitForResponse(); @@ -196,7 +197,7 @@ public class NettyClientTransportTest { startServer(); // Allow the response payloads of up to 1 byte. NettyClientTransport transport = newTransport(newNegotiator(), - 1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null); + 1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true); callMeMaybe(transport.start(clientTransportListener)); try { @@ -278,7 +279,7 @@ public class NettyClientTransportTest { startServer(); NettyClientTransport transport = - newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null); + newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true); callMeMaybe(transport.start(clientTransportListener)); try { @@ -344,6 +345,30 @@ public class NettyClientTransportTest { assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); } + @Test + public void keepAliveEnabled() throws Exception { + startServer(); + NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, + GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */); + callMeMaybe(transport.start(clientTransportListener)); + Rpc rpc = new Rpc(transport).halfClose(); + rpc.waitForResponse(); + + assertNotNull(transport.keepAliveManager()); + } + + @Test + public void keepAliveDisabled() throws Exception { + startServer(); + NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, + GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */); + callMeMaybe(transport.start(clientTransportListener)); + Rpc rpc = new Rpc(transport).halfClose(); + rpc.waitForResponse(); + + assertNull(transport.keepAliveManager()); + } + private Throwable getRootCause(Throwable t) { if (t.getCause() == null) { return t; @@ -359,15 +384,18 @@ public class NettyClientTransportTest { } private NettyClientTransport newTransport(ProtocolNegotiator negotiator) { - return newTransport(negotiator, - DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */); + return newTransport(negotiator, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + null /* user agent */, true /* keep alive */); } - private NettyClientTransport newTransport( - ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent) { + private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize, + int maxHeaderListSize, String userAgent, boolean enableKeepAlive) { NettyClientTransport transport = new NettyClientTransport( address, NioSocketChannel.class, new HashMap, Object>(), group, negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, authority, userAgent); + if (enableKeepAlive) { + transport.enableKeepAlive(true, 1000, 1000); + } transports.add(transport); return transport; }