netty: NettyServer should close ProtocolNegotiator

This commit is contained in:
Eric Anderson 2019-12-10 16:46:30 -08:00 committed by GitHub
parent 9485003cf9
commit 9a646518cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 7 deletions

View File

@ -288,10 +288,11 @@ class NettyServer implements InternalServer, InternalWithLogId {
if (stats != null) { if (stats != null) {
channelz.removeListenSocket(stats); channelz.removeListenSocket(stats);
} }
sharedResourceReferenceCounter.release();
protocolNegotiator.close();
synchronized (NettyServer.this) { synchronized (NettyServer.this) {
listener.serverShutdown(); listener.serverShutdown();
} }
sharedResourceReferenceCounter.release();
} }
}); });
try { try {

View File

@ -38,9 +38,10 @@ interface ProtocolNegotiator {
ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler); ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler);
/** /**
* Releases resources held by this negotiator. Called when the Channel transitions to terminated. * Releases resources held by this negotiator. Called when the Channel transitions to terminated
* Is currently only supported on client-side; server-side protocol negotiators will not see this * or when InternalServer is shutdown (depending on client or server). That means handlers
* method called. * returned by {@link #newHandler()} can outlive their parent negotiator on server-side, but not
* on client-side.
*/ */
void close(); void close();
} }

View File

@ -36,14 +36,17 @@ import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourcePool; import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer; import io.grpc.internal.TransportTracer;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.AsciiString;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -54,8 +57,26 @@ public class NettyServerTest {
private final InternalChannelz channelz = new InternalChannelz(); private final InternalChannelz channelz = new InternalChannelz();
@Test @Test
public void getPort() throws Exception { public void startStop() throws Exception {
InetSocketAddress addr = new InetSocketAddress(0); InetSocketAddress addr = new InetSocketAddress(0);
class TestProtocolNegotiator implements ProtocolNegotiator {
boolean closed;
@Override public ChannelHandler newHandler(GrpcHttp2ConnectionHandler handler) {
throw new UnsupportedOperationException();
}
@Override public void close() {
closed = true;
}
@Override public AsciiString scheme() {
return Utils.HTTP;
}
}
TestProtocolNegotiator protocolNegotiator = new TestProtocolNegotiator();
NettyServer ns = new NettyServer( NettyServer ns = new NettyServer(
addr, addr,
Utils.DEFAULT_SERVER_CHANNEL_FACTORY, Utils.DEFAULT_SERVER_CHANNEL_FACTORY,
@ -63,7 +84,7 @@ public class NettyServerTest {
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(), protocolNegotiator,
Collections.<ServerStreamTracer.Factory>emptyList(), Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(), TransportTracer.getDefaultFactory(),
1, // ignore 1, // ignore
@ -75,6 +96,7 @@ public class NettyServerTest {
1, 1, // ignore 1, 1, // ignore
true, 0, // ignore true, 0, // ignore
channelz); channelz);
final SettableFuture<Void> serverShutdownCalled = SettableFuture.create();
ns.start(new ServerListener() { ns.start(new ServerListener() {
@Override @Override
public ServerTransportListener transportCreated(ServerTransport transport) { public ServerTransportListener transportCreated(ServerTransport transport) {
@ -82,7 +104,9 @@ public class NettyServerTest {
} }
@Override @Override
public void serverShutdown() {} public void serverShutdown() {
serverShutdownCalled.set(null);
}
}); });
// Check that we got an actual port. // Check that we got an actual port.
@ -90,6 +114,9 @@ public class NettyServerTest {
// Cleanup // Cleanup
ns.shutdown(); ns.shutdown();
// serverShutdown() signals that resources are freed
serverShutdownCalled.get(1, TimeUnit.SECONDS);
assertThat(protocolNegotiator.closed).isTrue();
} }
@Test @Test