mirror of https://github.com/grpc/grpc-java.git
netty: Fix NPE in NettyClientTransport
Fixes NPE when keepalive is enabled. * Move creation of keepAliveManager to the bottom of start() * Enable keepAlive in NettyClientTransportTest * Add test cases checking if keepalive is enabled/disabled, specifically. Fixes #2726
This commit is contained in:
parent
72923dca87
commit
d116cc9875
|
|
@ -165,11 +165,6 @@ class NettyClientTransport implements ConnectionClientTransport {
|
||||||
lifecycleManager = new ClientTransportLifecycleManager(
|
lifecycleManager = new ClientTransportLifecycleManager(
|
||||||
Preconditions.checkNotNull(transportListener, "listener"));
|
Preconditions.checkNotNull(transportListener, "listener"));
|
||||||
|
|
||||||
if (enableKeepAlive) {
|
|
||||||
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
|
|
||||||
keepAliveTimeoutNanos);
|
|
||||||
}
|
|
||||||
|
|
||||||
handler = newHandler();
|
handler = newHandler();
|
||||||
HandlerSettings.setAutoWindow(handler);
|
HandlerSettings.setAutoWindow(handler);
|
||||||
|
|
||||||
|
|
@ -234,6 +229,12 @@ class NettyClientTransport implements ConnectionClientTransport {
|
||||||
Status.INTERNAL.withDescription("Connection closed with unknown cause"));
|
Status.INTERNAL.withDescription("Connection closed with unknown cause"));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (enableKeepAlive) {
|
||||||
|
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
|
||||||
|
keepAliveTimeoutNanos);
|
||||||
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -276,6 +277,11 @@ class NettyClientTransport implements ConnectionClientTransport {
|
||||||
return channel;
|
return channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
KeepAliveManager keepAliveManager() {
|
||||||
|
return keepAliveManager;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
|
* Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
|
||||||
* from the pipeline when the channel is closed. Since handlers are removed, you may get an
|
* from the pipeline when the channel is closed. Since handlers are removed, you may get an
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
|
@ -179,7 +180,7 @@ public class NettyClientTransportTest {
|
||||||
public void overrideDefaultUserAgent() throws Exception {
|
public void overrideDefaultUserAgent() throws Exception {
|
||||||
startServer();
|
startServer();
|
||||||
NettyClientTransport transport = newTransport(newNegotiator(),
|
NettyClientTransport transport = newTransport(newNegotiator(),
|
||||||
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent");
|
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
|
||||||
new Rpc(transport, new Metadata()).halfClose().waitForResponse();
|
new Rpc(transport, new Metadata()).halfClose().waitForResponse();
|
||||||
|
|
@ -196,7 +197,7 @@ public class NettyClientTransportTest {
|
||||||
startServer();
|
startServer();
|
||||||
// Allow the response payloads of up to 1 byte.
|
// Allow the response payloads of up to 1 byte.
|
||||||
NettyClientTransport transport = newTransport(newNegotiator(),
|
NettyClientTransport transport = newTransport(newNegotiator(),
|
||||||
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null);
|
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -278,7 +279,7 @@ public class NettyClientTransportTest {
|
||||||
startServer();
|
startServer();
|
||||||
|
|
||||||
NettyClientTransport transport =
|
NettyClientTransport transport =
|
||||||
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null);
|
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true);
|
||||||
callMeMaybe(transport.start(clientTransportListener));
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
@ -344,6 +345,30 @@ public class NettyClientTransportTest {
|
||||||
assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
|
assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void keepAliveEnabled() throws Exception {
|
||||||
|
startServer();
|
||||||
|
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
|
||||||
|
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */);
|
||||||
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
Rpc rpc = new Rpc(transport).halfClose();
|
||||||
|
rpc.waitForResponse();
|
||||||
|
|
||||||
|
assertNotNull(transport.keepAliveManager());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void keepAliveDisabled() throws Exception {
|
||||||
|
startServer();
|
||||||
|
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
|
||||||
|
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */);
|
||||||
|
callMeMaybe(transport.start(clientTransportListener));
|
||||||
|
Rpc rpc = new Rpc(transport).halfClose();
|
||||||
|
rpc.waitForResponse();
|
||||||
|
|
||||||
|
assertNull(transport.keepAliveManager());
|
||||||
|
}
|
||||||
|
|
||||||
private Throwable getRootCause(Throwable t) {
|
private Throwable getRootCause(Throwable t) {
|
||||||
if (t.getCause() == null) {
|
if (t.getCause() == null) {
|
||||||
return t;
|
return t;
|
||||||
|
|
@ -359,15 +384,18 @@ public class NettyClientTransportTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private NettyClientTransport newTransport(ProtocolNegotiator negotiator) {
|
private NettyClientTransport newTransport(ProtocolNegotiator negotiator) {
|
||||||
return newTransport(negotiator,
|
return newTransport(negotiator, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
|
||||||
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */);
|
null /* user agent */, true /* keep alive */);
|
||||||
}
|
}
|
||||||
|
|
||||||
private NettyClientTransport newTransport(
|
private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
|
||||||
ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent) {
|
int maxHeaderListSize, String userAgent, boolean enableKeepAlive) {
|
||||||
NettyClientTransport transport = new NettyClientTransport(
|
NettyClientTransport transport = new NettyClientTransport(
|
||||||
address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
|
address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
|
||||||
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, authority, userAgent);
|
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, authority, userAgent);
|
||||||
|
if (enableKeepAlive) {
|
||||||
|
transport.enableKeepAlive(true, 1000, 1000);
|
||||||
|
}
|
||||||
transports.add(transport);
|
transports.add(transport);
|
||||||
return transport;
|
return transport;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue