mirror of https://github.com/grpc/grpc-java.git
netty: Fix NPE in NettyClientTransport
Fixes NPE when keepalive is enabled. * Move creation of keepAliveManager to the bottom of start() * Enable keepAlive in NettyClientTransportTest * Add test cases checking if keepalive is enabled/disabled, specifically. Fixes #2726
This commit is contained in:
parent
72923dca87
commit
d116cc9875
|
|
@ -165,11 +165,6 @@ class NettyClientTransport implements ConnectionClientTransport {
|
|||
lifecycleManager = new ClientTransportLifecycleManager(
|
||||
Preconditions.checkNotNull(transportListener, "listener"));
|
||||
|
||||
if (enableKeepAlive) {
|
||||
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
|
||||
keepAliveTimeoutNanos);
|
||||
}
|
||||
|
||||
handler = newHandler();
|
||||
HandlerSettings.setAutoWindow(handler);
|
||||
|
||||
|
|
@ -234,6 +229,12 @@ class NettyClientTransport implements ConnectionClientTransport {
|
|||
Status.INTERNAL.withDescription("Connection closed with unknown cause"));
|
||||
}
|
||||
});
|
||||
|
||||
if (enableKeepAlive) {
|
||||
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
|
||||
keepAliveTimeoutNanos);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -276,6 +277,11 @@ class NettyClientTransport implements ConnectionClientTransport {
|
|||
return channel;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
KeepAliveManager keepAliveManager() {
|
||||
return keepAliveManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
|
||||
* from the pipeline when the channel is closed. Since handlers are removed, you may get an
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
|
@ -179,7 +180,7 @@ public class NettyClientTransportTest {
|
|||
public void overrideDefaultUserAgent() throws Exception {
|
||||
startServer();
|
||||
NettyClientTransport transport = newTransport(newNegotiator(),
|
||||
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent");
|
||||
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true);
|
||||
callMeMaybe(transport.start(clientTransportListener));
|
||||
|
||||
new Rpc(transport, new Metadata()).halfClose().waitForResponse();
|
||||
|
|
@ -196,7 +197,7 @@ public class NettyClientTransportTest {
|
|||
startServer();
|
||||
// Allow the response payloads of up to 1 byte.
|
||||
NettyClientTransport transport = newTransport(newNegotiator(),
|
||||
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null);
|
||||
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true);
|
||||
callMeMaybe(transport.start(clientTransportListener));
|
||||
|
||||
try {
|
||||
|
|
@ -278,7 +279,7 @@ public class NettyClientTransportTest {
|
|||
startServer();
|
||||
|
||||
NettyClientTransport transport =
|
||||
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null);
|
||||
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true);
|
||||
callMeMaybe(transport.start(clientTransportListener));
|
||||
|
||||
try {
|
||||
|
|
@ -344,6 +345,30 @@ public class NettyClientTransportTest {
|
|||
assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void keepAliveEnabled() throws Exception {
|
||||
startServer();
|
||||
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
|
||||
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */);
|
||||
callMeMaybe(transport.start(clientTransportListener));
|
||||
Rpc rpc = new Rpc(transport).halfClose();
|
||||
rpc.waitForResponse();
|
||||
|
||||
assertNotNull(transport.keepAliveManager());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void keepAliveDisabled() throws Exception {
|
||||
startServer();
|
||||
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
|
||||
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */);
|
||||
callMeMaybe(transport.start(clientTransportListener));
|
||||
Rpc rpc = new Rpc(transport).halfClose();
|
||||
rpc.waitForResponse();
|
||||
|
||||
assertNull(transport.keepAliveManager());
|
||||
}
|
||||
|
||||
private Throwable getRootCause(Throwable t) {
|
||||
if (t.getCause() == null) {
|
||||
return t;
|
||||
|
|
@ -359,15 +384,18 @@ public class NettyClientTransportTest {
|
|||
}
|
||||
|
||||
private NettyClientTransport newTransport(ProtocolNegotiator negotiator) {
|
||||
return newTransport(negotiator,
|
||||
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */);
|
||||
return newTransport(negotiator, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
|
||||
null /* user agent */, true /* keep alive */);
|
||||
}
|
||||
|
||||
private NettyClientTransport newTransport(
|
||||
ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent) {
|
||||
private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
|
||||
int maxHeaderListSize, String userAgent, boolean enableKeepAlive) {
|
||||
NettyClientTransport transport = new NettyClientTransport(
|
||||
address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
|
||||
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, authority, userAgent);
|
||||
if (enableKeepAlive) {
|
||||
transport.enableKeepAlive(true, 1000, 1000);
|
||||
}
|
||||
transports.add(transport);
|
||||
return transport;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue