core,netty: expose listening on multiple ports

This commit is contained in:
Carl Mastrangelo 2019-02-06 15:49:59 -08:00 committed by GitHub
parent 6b68d874f5
commit f6ec07d87d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 152 additions and 92 deletions

View File

@ -17,6 +17,7 @@
package io.grpc; package io.grpc;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -41,8 +42,10 @@ public abstract class Server {
/** /**
* Returns the port number the server is listening on. This can return -1 if there is no actual * Returns the port number the server is listening on. This can return -1 if there is no actual
* port or the result otherwise does not make sense. Result is undefined after the server is * port or the result otherwise does not make sense. Result is undefined after the server is
* terminated. * terminated. If there are multiple possible ports, this will return one arbitrarily.
* Implementations are encouraged to return the same port on each call.
* *
* @see #getListenSockets()
* @throws IllegalStateException if the server has not yet been started. * @throws IllegalStateException if the server has not yet been started.
* @since 1.0.0 * @since 1.0.0
*/ */
@ -50,6 +53,19 @@ public abstract class Server {
return -1; return -1;
} }
/**
* Returns a list of listening sockets for this server. May be different than the originally
* requested sockets (e.g. listening on port '0' may end up listening on a different port).
* The list is unmodifiable.
*
* @throws IllegalStateException if the server has not yet been started.
* @since 1.19.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/FIXME")
public List<? extends SocketAddress> getListenSockets() {
throw new UnsupportedOperationException();
}
/** /**
* Returns all services registered with the server, or an empty list if not supported by the * Returns all services registered with the server, or an empty list if not supported by the
* implementation. * implementation.

View File

@ -27,6 +27,7 @@ import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener; import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransportListener; import io.grpc.internal.ServerTransportListener;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -77,13 +78,13 @@ final class InProcessServer implements InternalServer {
} }
@Override @Override
public int getPort() { public SocketAddress getListenSocketAddress() {
return -1; return new InProcessSocketAddress(name);
} }
@Override @Override
public List<InternalInstrumented<SocketStats>> getListenSockets() { public InternalInstrumented<SocketStats> getListenSocketStats() {
return Collections.emptyList(); return null;
} }
@Override @Override

View File

@ -19,7 +19,8 @@ package io.grpc.internal;
import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented; import io.grpc.InternalInstrumented;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.net.SocketAddress;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
/** /**
@ -46,13 +47,13 @@ public interface InternalServer {
void shutdown(); void shutdown();
/** /**
* Returns what underlying port the server is listening on, or -1 if the port number is not * Returns the listening socket address. May change after {@link start(ServerListener)} is
* available or does not make sense. * called.
*/ */
int getPort(); SocketAddress getListenSocketAddress();
/** /**
* Returns the listen sockets of this server. May return an empty list but never returns null. * Returns the listen socket stats of this server. May return {@code null}.
*/ */
List<InternalInstrumented<SocketStats>> getListenSockets(); @Nullable InternalInstrumented<SocketStats> getListenSocketStats();
} }

View File

@ -38,6 +38,7 @@ import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry; import io.grpc.HandlerRegistry;
import io.grpc.InternalChannelz; import io.grpc.InternalChannelz;
import io.grpc.InternalChannelz.ServerStats; import io.grpc.InternalChannelz.ServerStats;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented; import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.InternalServerInterceptors; import io.grpc.InternalServerInterceptors;
@ -51,6 +52,8 @@ import io.grpc.ServerTransportFilter;
import io.grpc.Status; import io.grpc.Status;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -136,9 +139,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
Preconditions.checkNotNull(transportServers, "transportServers"); Preconditions.checkNotNull(transportServers, "transportServers");
Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided"); Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided");
this.transportServers = new ArrayList<>(transportServers); this.transportServers = new ArrayList<>(transportServers);
// TODO(notcarl): concatenate all listening ports in the Log Id.
this.logId = this.logId =
InternalLogId.allocate("Server", String.valueOf(transportServers.get(0).getPort())); InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
// Fork from the passed in context so that it does not propagate cancellation, it only // Fork from the passed in context so that it does not propagate cancellation, it only
// inherits values. // inherits values.
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork(); this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
@ -181,21 +183,41 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
} }
} }
@Override @Override
public int getPort() { public int getPort() {
synchronized (lock) { synchronized (lock) {
checkState(started, "Not started"); checkState(started, "Not started");
checkState(!terminated, "Already terminated"); checkState(!terminated, "Already terminated");
for (InternalServer ts : transportServers) { for (InternalServer ts : transportServers) {
int port = ts.getPort(); SocketAddress addr = ts.getListenSocketAddress();
if (port != -1) { if (addr instanceof InetSocketAddress) {
return port; return ((InetSocketAddress) addr).getPort();
} }
} }
return -1; return -1;
} }
} }
@Override
public List<SocketAddress> getListenSockets() {
synchronized (lock) {
checkState(started, "Not started");
checkState(!terminated, "Already terminated");
return getListenSocketsIgnoringLifecycle();
}
}
private List<SocketAddress> getListenSocketsIgnoringLifecycle() {
synchronized (lock) {
List<SocketAddress> addrs = new ArrayList<>(transportServers.size());
for (InternalServer ts : transportServers) {
addrs.add(ts.getListenSocketAddress());
}
return Collections.unmodifiableList(addrs);
}
}
@Override @Override
public List<ServerServiceDefinition> getServices() { public List<ServerServiceDefinition> getServices() {
List<ServerServiceDefinition> fallbackServices = fallbackRegistry.getServices(); List<ServerServiceDefinition> fallbackServices = fallbackRegistry.getServices();
@ -602,7 +624,11 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
public ListenableFuture<ServerStats> getStats() { public ListenableFuture<ServerStats> getStats() {
ServerStats.Builder builder = new ServerStats.Builder(); ServerStats.Builder builder = new ServerStats.Builder();
for (InternalServer ts : transportServers) { for (InternalServer ts : transportServers) {
builder.addListenSockets(ts.getListenSockets()); // TODO(carl-mastrangelo): remove the list and just add directly.
InternalInstrumented<SocketStats> stats = ts.getListenSocketStats();
if (stats != null ) {
builder.addListenSockets(Collections.singletonList(stats));
}
} }
serverCallTracer.updateBuilder(builder); serverCallTracer.updateBuilder(builder);
SettableFuture<ServerStats> ret = SettableFuture.create(); SettableFuture<ServerStats> ret = SettableFuture.create();

View File

@ -38,7 +38,7 @@ public class InProcessServerTest {
InProcessServer s = InProcessServer s =
new InProcessServer(builder, Collections.<ServerStreamTracer.Factory>emptyList()); new InProcessServer(builder, Collections.<ServerStreamTracer.Factory>emptyList());
Truth.assertThat(s.getPort()).isEqualTo(-1); Truth.assertThat(s.getListenSocketAddress()).isEqualTo(new InProcessSocketAddress("name"));
} }
@Test @Test

View File

@ -83,6 +83,7 @@ import java.io.ByteArrayInputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -1070,15 +1071,16 @@ public class ServerImplTest {
@Test @Test
public void getPort() throws Exception { public void getPort() throws Exception {
final InetSocketAddress addr = new InetSocketAddress(65535);
transportServer = new SimpleServer() { transportServer = new SimpleServer() {
@Override @Override
public int getPort() { public SocketAddress getListenSocketAddress() {
return 65535; return addr;
} }
}; };
createAndStartServer(); createAndStartServer();
assertThat(server.getPort()).isEqualTo(65535); assertThat(server.getPort()).isEqualTo(addr.getPort());
} }
@Test @Test
@ -1397,13 +1399,13 @@ public class ServerImplTest {
} }
@Override @Override
public int getPort() { public SocketAddress getListenSocketAddress() {
return -1; return new InetSocketAddress(12345);
} }
@Override @Override
public List<InternalInstrumented<SocketStats>> getListenSockets() { public InternalInstrumented<SocketStats> getListenSocketStats() {
return Collections.emptyList(); return null;
} }
@Override @Override

View File

@ -246,8 +246,8 @@ public abstract class AbstractInteropTest {
} }
@VisibleForTesting @VisibleForTesting
final int getPort() { final SocketAddress getListenAddress() {
return server.getPort(); return server.getListenSockets().iterator().next();
} }
protected ManagedChannel channel; protected ManagedChannel channel;

View File

@ -43,7 +43,7 @@ public class AutoWindowSizingOnTest extends AbstractInteropTest {
@Override @Override
protected ManagedChannel createChannel() { protected ManagedChannel createChannel() {
NettyChannelBuilder builder = NettyChannelBuilder.forAddress("localhost", getPort()) NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())
.negotiationType(NegotiationType.PLAINTEXT) .negotiationType(NegotiationType.PLAINTEXT)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
io.grpc.internal.TestingAccessor.setStatsImplementation( io.grpc.internal.TestingAccessor.setStatsImplementation(

View File

@ -62,7 +62,7 @@ public class Http2NettyTest extends AbstractInteropTest {
protected ManagedChannel createChannel() { protected ManagedChannel createChannel() {
try { try {
NettyChannelBuilder builder = NettyChannelBuilder NettyChannelBuilder builder = NettyChannelBuilder
.forAddress(TestUtils.testServerAddress(getPort())) .forAddress(TestUtils.testServerAddress((InetSocketAddress) getListenAddress()))
.flowControlWindow(65 * 1024) .flowControlWindow(65 * 1024)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.sslContext(GrpcSslContexts .sslContext(GrpcSslContexts
@ -80,18 +80,18 @@ public class Http2NettyTest extends AbstractInteropTest {
} }
@Test @Test
public void remoteAddr() throws Exception { public void remoteAddr() {
InetSocketAddress isa = (InetSocketAddress) obtainRemoteClientAddr(); InetSocketAddress isa = (InetSocketAddress) obtainRemoteClientAddr();
assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress()); assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress());
// It should not be the same as the server // It should not be the same as the server
assertNotEquals(getPort(), isa.getPort()); assertNotEquals(((InetSocketAddress) getListenAddress()).getPort(), isa.getPort());
} }
@Test @Test
public void localAddr() throws Exception { public void localAddr() throws Exception {
InetSocketAddress isa = (InetSocketAddress) obtainLocalClientAddr(); InetSocketAddress isa = (InetSocketAddress) obtainLocalClientAddr();
assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress()); assertEquals(InetAddress.getLoopbackAddress(), isa.getAddress());
assertEquals(getPort(), isa.getPort()); assertEquals(((InetSocketAddress) getListenAddress()).getPort(), isa.getPort());
} }
@Test @Test

View File

@ -39,6 +39,7 @@ import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter; import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
@ -91,13 +92,14 @@ public class Http2OkHttpTest extends AbstractInteropTest {
} }
private OkHttpChannelBuilder createChannelBuilder() { private OkHttpChannelBuilder createChannelBuilder() {
OkHttpChannelBuilder builder = OkHttpChannelBuilder.forAddress("localhost", getPort()) int port = ((InetSocketAddress) getListenAddress()).getPort();
OkHttpChannelBuilder builder = OkHttpChannelBuilder.forAddress("localhost", port)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.connectionSpec(new ConnectionSpec.Builder(ConnectionSpec.MODERN_TLS) .connectionSpec(new ConnectionSpec.Builder(ConnectionSpec.MODERN_TLS)
.cipherSuites(TestUtils.preferredTestCiphers().toArray(new String[0])) .cipherSuites(TestUtils.preferredTestCiphers().toArray(new String[0]))
.build()) .build())
.overrideAuthority(GrpcUtil.authorityFromHostAndPort( .overrideAuthority(GrpcUtil.authorityFromHostAndPort(
TestUtils.TEST_SERVER_HOST, getPort())); TestUtils.TEST_SERVER_HOST, port));
io.grpc.internal.TestingAccessor.setStatsImplementation( io.grpc.internal.TestingAccessor.setStatsImplementation(
builder, createClientCensusStatsModule()); builder, createClientCensusStatsModule());
try { try {
@ -135,9 +137,10 @@ public class Http2OkHttpTest extends AbstractInteropTest {
@Test @Test
public void wrongHostNameFailHostnameVerification() throws Exception { public void wrongHostNameFailHostnameVerification() throws Exception {
int port = ((InetSocketAddress) getListenAddress()).getPort();
ManagedChannel channel = createChannelBuilder() ManagedChannel channel = createChannelBuilder()
.overrideAuthority(GrpcUtil.authorityFromHostAndPort( .overrideAuthority(GrpcUtil.authorityFromHostAndPort(
BAD_HOSTNAME, getPort())) BAD_HOSTNAME, port))
.build(); .build();
TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.TestServiceBlockingStub blockingStub =
TestServiceGrpc.newBlockingStub(channel); TestServiceGrpc.newBlockingStub(channel);
@ -157,9 +160,10 @@ public class Http2OkHttpTest extends AbstractInteropTest {
@Test @Test
public void hostnameVerifierWithBadHostname() throws Exception { public void hostnameVerifierWithBadHostname() throws Exception {
int port = ((InetSocketAddress) getListenAddress()).getPort();
ManagedChannel channel = createChannelBuilder() ManagedChannel channel = createChannelBuilder()
.overrideAuthority(GrpcUtil.authorityFromHostAndPort( .overrideAuthority(GrpcUtil.authorityFromHostAndPort(
BAD_HOSTNAME, getPort())) BAD_HOSTNAME, port))
.hostnameVerifier(new HostnameVerifier() { .hostnameVerifier(new HostnameVerifier() {
@Override @Override
public boolean verify(String hostname, SSLSession session) { public boolean verify(String hostname, SSLSession session) {
@ -177,9 +181,10 @@ public class Http2OkHttpTest extends AbstractInteropTest {
@Test @Test
public void hostnameVerifierWithCorrectHostname() throws Exception { public void hostnameVerifierWithCorrectHostname() throws Exception {
int port = ((InetSocketAddress) getListenAddress()).getPort();
ManagedChannel channel = createChannelBuilder() ManagedChannel channel = createChannelBuilder()
.overrideAuthority(GrpcUtil.authorityFromHostAndPort( .overrideAuthority(GrpcUtil.authorityFromHostAndPort(
TestUtils.TEST_SERVER_HOST, getPort())) TestUtils.TEST_SERVER_HOST, port))
.hostnameVerifier(new HostnameVerifier() { .hostnameVerifier(new HostnameVerifier() {
@Override @Override
public boolean verify(String hostname, SSLSession session) { public boolean verify(String hostname, SSLSession session) {

View File

@ -123,7 +123,7 @@ public class TransportCompressionTest extends AbstractInteropTest {
@Override @Override
protected ManagedChannel createChannel() { protected ManagedChannel createChannel() {
NettyChannelBuilder builder = NettyChannelBuilder.forAddress("localhost", getPort()) NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.decompressorRegistry(decompressors) .decompressorRegistry(decompressors)
.compressorRegistry(compressors) .compressorRegistry(compressors)

View File

@ -23,7 +23,6 @@ import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import io.grpc.InternalChannelz; import io.grpc.InternalChannelz;
@ -52,11 +51,11 @@ import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -94,9 +93,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final TransportTracer.Factory transportTracerFactory; private final TransportTracer.Factory transportTracerFactory;
private final InternalChannelz channelz; private final InternalChannelz channelz;
// Only modified in event loop but safe to read any time. Set at startup and unset at shutdown. // Only modified in event loop but safe to read any time. Set at startup and unset at shutdown.
// In the future we may have >1 listen socket. private final AtomicReference<InternalInstrumented<SocketStats>> listenSocketStats =
private volatile ImmutableList<InternalInstrumented<SocketStats>> listenSockets new AtomicReference<>();
= ImmutableList.of();
NettyServer( NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType, SocketAddress address, Class<? extends ServerChannel> channelType,
@ -139,20 +137,17 @@ class NettyServer implements InternalServer, InternalWithLogId {
} }
@Override @Override
public int getPort() { public SocketAddress getListenSocketAddress() {
if (channel == null) { if (channel == null) {
return -1; // server is not listening/bound yet, just return the original port.
return address;
} }
SocketAddress localAddr = channel.localAddress(); return channel.localAddress();
if (!(localAddr instanceof InetSocketAddress)) {
return -1;
}
return ((InetSocketAddress) localAddr).getPort();
} }
@Override @Override
public List<InternalInstrumented<SocketStats>> getListenSockets() { public InternalInstrumented<SocketStats> getListenSocketStats() {
return listenSockets; return listenSocketStats.get();
} }
@Override @Override
@ -260,7 +255,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
@Override @Override
public void run() { public void run() {
InternalInstrumented<SocketStats> listenSocket = new ListenSocket(channel); InternalInstrumented<SocketStats> listenSocket = new ListenSocket(channel);
listenSockets = ImmutableList.of(listenSocket); listenSocketStats.set(listenSocket);
channelz.addListenSocket(listenSocket); channelz.addListenSocket(listenSocket);
} }
}); });
@ -283,10 +278,10 @@ class NettyServer implements InternalServer, InternalWithLogId {
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.log(Level.WARNING, "Error shutting down server", future.cause()); log.log(Level.WARNING, "Error shutting down server", future.cause());
} }
for (InternalInstrumented<SocketStats> listenSocket : listenSockets) { InternalInstrumented<SocketStats> stats = listenSocketStats.getAndSet(null);
channelz.removeListenSocket(listenSocket); if (stats != null) {
channelz.removeListenSocket(stats);
} }
listenSockets = null;
synchronized (NettyServer.this) { synchronized (NettyServer.this) {
listener.serverShutdown(); listener.serverShutdown();
} }

View File

@ -17,12 +17,12 @@
package io.grpc.netty; package io.grpc.netty;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED; import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.ExperimentalApi; import io.grpc.ExperimentalApi;
@ -121,11 +121,13 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
this.listenAddresses.add(address); this.listenAddresses.add(address);
} }
/**
// TODO(notcarl): expose this API at a higher level. * Adds an additional address for this server to listen on. Callers must ensure that all socket
@VisibleForTesting * addresses are compatible with the Netty channel type, and that they don't conflict with each
NettyServerBuilder addPort(int port) { * other.
this.listenAddresses.add(new InetSocketAddress(port)); */
public NettyServerBuilder addListenAddress(SocketAddress listenAddress) {
this.listenAddresses.add(checkNotNull(listenAddress, "listenAddress"));
return this; return this;
} }

View File

@ -150,7 +150,7 @@ public class NettyClientTransportTest {
@Test @Test
public void testToString() throws Exception { public void testToString() throws Exception {
address = TestUtils.testServerAddress(12345); address = TestUtils.testServerAddress(new InetSocketAddress(12345));
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
String s = newTransport(newNegotiator()).toString(); String s = newTransport(newNegotiator()).toString();
transports.clear(); transports.clear();
@ -420,7 +420,7 @@ public class NettyClientTransportTest {
@Test @Test
public void failingToConstructChannelShouldFailGracefully() throws Exception { public void failingToConstructChannelShouldFailGracefully() throws Exception {
address = TestUtils.testServerAddress(12345); address = TestUtils.testServerAddress(new InetSocketAddress(12345));
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
NettyClientTransport transport = new NettyClientTransport( NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(CantConstructChannel.class), address, new ReflectiveChannelFactory<>(CantConstructChannel.class),
@ -539,7 +539,7 @@ public class NettyClientTransportTest {
@Test @Test
public void getAttributes_negotiatorHandler() throws Exception { public void getAttributes_negotiatorHandler() throws Exception {
address = TestUtils.testServerAddress(12345); address = TestUtils.testServerAddress(new InetSocketAddress(12345));
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
NettyClientTransport transport = newTransport(new NoopProtocolNegotiator()); NettyClientTransport transport = newTransport(new NoopProtocolNegotiator());
@ -550,7 +550,7 @@ public class NettyClientTransportTest {
@Test @Test
public void getEagAttributes_negotiatorHandler() throws Exception { public void getEagAttributes_negotiatorHandler() throws Exception {
address = TestUtils.testServerAddress(12345); address = TestUtils.testServerAddress(new InetSocketAddress(12345));
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
NoopProtocolNegotiator npn = new NoopProtocolNegotiator(); NoopProtocolNegotiator npn = new NoopProtocolNegotiator();
@ -654,7 +654,7 @@ public class NettyClientTransportTest {
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException { private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
server = new NettyServer( server = new NettyServer(
TestUtils.testServerAddress(0), TestUtils.testServerAddress(new InetSocketAddress(0)),
NioServerSocketChannel.class, NioServerSocketChannel.class,
new HashMap<ChannelOption<?>, Object>(), new HashMap<ChannelOption<?>, Object>(),
group, group, negotiator, group, group, negotiator,
@ -667,7 +667,7 @@ public class NettyClientTransportTest {
MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0, MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0,
channelz); channelz);
server.start(serverListener); server.start(serverListener);
address = TestUtils.testServerAddress(server.getPort()); address = TestUtils.testServerAddress((InetSocketAddress) server.getListenSocketAddress());
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
} }

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth; import com.google.common.truth.Truth;
import io.grpc.ServerStreamTracer.Factory; import io.grpc.ServerStreamTracer.Factory;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.Rule; import org.junit.Rule;
@ -43,7 +44,7 @@ public class NettyServerBuilderTest {
@Test @Test
public void createMultipleServers() { public void createMultipleServers() {
builder.addPort(8081); builder.addListenAddress(new InetSocketAddress(8081));
List<NettyServer> servers = builder.buildTransportServers(ImmutableList.<Factory>of()); List<NettyServer> servers = builder.buildTransportServers(ImmutableList.<Factory>of());
Truth.assertThat(servers).hasSize(2); Truth.assertThat(servers).hasSize(2);

View File

@ -16,7 +16,6 @@
package io.grpc.netty; package io.grpc.netty;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static io.grpc.InternalChannelz.id; import static io.grpc.InternalChannelz.id;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -86,7 +85,7 @@ public class NettyServerTest {
}); });
// Check that we got an actual port. // Check that we got an actual port.
assertThat(ns.getPort()).isGreaterThan(0); assertThat(((InetSocketAddress) ns.getListenSocketAddress()).getPort()).isGreaterThan(0);
// Cleanup // Cleanup
ns.shutdown(); ns.shutdown();
@ -114,7 +113,7 @@ public class NettyServerTest {
true, 0, // ignore true, 0, // ignore
channelz); channelz);
assertThat(ns.getPort()).isEqualTo(-1); assertThat(ns.getListenSocketAddress()).isEqualTo(addr);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
@ -170,7 +169,7 @@ public class NettyServerTest {
}); });
Socket socket = new Socket(); Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost", ns.getPort()), /* timeout= */ 8000); socket.connect(ns.getListenSocketAddress(), /* timeout= */ 8000);
countDownLatch.await(); countDownLatch.await();
socket.close(); socket.close();
@ -213,14 +212,14 @@ public class NettyServerTest {
shutdownCompleted.set(null); shutdownCompleted.set(null);
} }
}); });
assertThat(ns.getPort()).isGreaterThan(0); assertThat(((InetSocketAddress) ns.getListenSocketAddress()).getPort()).isGreaterThan(0);
InternalInstrumented<SocketStats> listenSocket = getOnlyElement(ns.getListenSockets()); InternalInstrumented<SocketStats> listenSocket = ns.getListenSocketStats();
assertSame(listenSocket, channelz.getSocket(id(listenSocket))); assertSame(listenSocket, channelz.getSocket(id(listenSocket)));
// very basic sanity check of the contents // very basic sanity check of the contents
SocketStats socketStats = listenSocket.getStats().get(); SocketStats socketStats = listenSocket.getStats().get();
assertEquals(ns.getPort(), ((InetSocketAddress) socketStats.local).getPort()); assertEquals(ns.getListenSocketAddress(), socketStats.local);
assertNull(socketStats.remote); assertNull(socketStats.remote);
// TODO(zpencer): uncomment when sock options are exposed // TODO(zpencer): uncomment when sock options are exposed

View File

@ -58,7 +58,7 @@ public class NettyTransportTest extends AbstractTransportTest {
protected List<? extends InternalServer> newServer( protected List<? extends InternalServer> newServer(
List<ServerStreamTracer.Factory> streamTracerFactories) { List<ServerStreamTracer.Factory> streamTracerFactories) {
return NettyServerBuilder return NettyServerBuilder
.forPort(0) .forAddress(new InetSocketAddress("localhost", 0))
.flowControlWindow(65 * 1024) .flowControlWindow(65 * 1024)
.setTransportTracerFactory(fakeClockTransportTracer) .setTransportTracerFactory(fakeClockTransportTracer)
.buildTransportServers(streamTracerFactories); .buildTransportServers(streamTracerFactories);
@ -68,7 +68,7 @@ public class NettyTransportTest extends AbstractTransportTest {
protected List<? extends InternalServer> newServer( protected List<? extends InternalServer> newServer(
int port, List<ServerStreamTracer.Factory> streamTracerFactories) { int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
return NettyServerBuilder return NettyServerBuilder
.forPort(port) .forAddress(new InetSocketAddress("localhost", port))
.flowControlWindow(65 * 1024) .flowControlWindow(65 * 1024)
.setTransportTracerFactory(fakeClockTransportTracer) .setTransportTracerFactory(fakeClockTransportTracer)
.buildTransportServers(streamTracerFactories); .buildTransportServers(streamTracerFactories);
@ -76,7 +76,7 @@ public class NettyTransportTest extends AbstractTransportTest {
@Override @Override
protected String testAuthority(InternalServer server) { protected String testAuthority(InternalServer server) {
return "localhost:" + server.getPort(); return "localhost:" + server.getListenSocketAddress();
} }
@Override @Override
@ -91,9 +91,8 @@ public class NettyTransportTest extends AbstractTransportTest {
@Override @Override
protected ManagedClientTransport newClientTransport(InternalServer server) { protected ManagedClientTransport newClientTransport(InternalServer server) {
int port = server.getPort();
return clientFactory.newClientTransport( return clientFactory.newClientTransport(
new InetSocketAddress("localhost", port), server.getListenSocketAddress(),
new ClientTransportFactory.ClientTransportOptions() new ClientTransportFactory.ClientTransportOptions()
.setAuthority(testAuthority(server))); .setAuthority(testAuthority(server)));
} }

View File

@ -66,7 +66,7 @@ public class OkHttpTransportTest extends AbstractTransportTest {
int port, List<ServerStreamTracer.Factory> streamTracerFactories) { int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
return AccessProtectedHack.serverBuilderBuildTransportServer( return AccessProtectedHack.serverBuilderBuildTransportServer(
NettyServerBuilder NettyServerBuilder
.forPort(port) .forAddress(new InetSocketAddress(port))
.flowControlWindow(65 * 1024), .flowControlWindow(65 * 1024),
streamTracerFactories, streamTracerFactories,
fakeClockTransportTracer); fakeClockTransportTracer);
@ -74,12 +74,12 @@ public class OkHttpTransportTest extends AbstractTransportTest {
@Override @Override
protected String testAuthority(InternalServer server) { protected String testAuthority(InternalServer server) {
return "thebestauthority:" + server.getPort(); return "thebestauthority:" + server.getListenSocketAddress();
} }
@Override @Override
protected ManagedClientTransport newClientTransport(InternalServer server) { protected ManagedClientTransport newClientTransport(InternalServer server) {
int port = server.getPort(); int port = ((InetSocketAddress) server.getListenSocketAddress()).getPort();
return clientFactory.newClientTransport( return clientFactory.newClientTransport(
new InetSocketAddress("localhost", port), new InetSocketAddress("localhost", port),
new ClientTransportFactory.ClientTransportOptions() new ClientTransportFactory.ClientTransportOptions()

View File

@ -74,6 +74,7 @@ import io.grpc.internal.TransportTracer;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -364,7 +365,11 @@ public abstract class AbstractTransportTest {
public void serverAlreadyListening() throws Exception { public void serverAlreadyListening() throws Exception {
client = null; client = null;
server.start(serverListener); server.start(serverListener);
int port = server.getPort(); int port = -1;
SocketAddress addr = server.getListenSocketAddress();
if (addr instanceof InetSocketAddress) {
port = ((InetSocketAddress) addr).getPort();
}
InternalServer server2 = InternalServer server2 =
Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory))); Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory)));
thrown.expect(IOException.class); thrown.expect(IOException.class);
@ -374,7 +379,11 @@ public abstract class AbstractTransportTest {
@Test @Test
public void openStreamPreventsTermination() throws Exception { public void openStreamPreventsTermination() throws Exception {
server.start(serverListener); server.start(serverListener);
int port = server.getPort(); int port = -1;
SocketAddress addr = server.getListenSocketAddress();
if (addr instanceof InetSocketAddress) {
port = ((InetSocketAddress) addr).getPort();
}
client = newClientTransport(server); client = newClientTransport(server);
startTransport(client, mockClientTransportListener); startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener MockServerTransportListener serverTransportListener
@ -1800,15 +1809,19 @@ public abstract class AbstractTransportTest {
SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
SocketStats clientSocketStats = client.getStats().get(); SocketStats clientSocketStats = client.getStats().get();
assertEquals(clientAddress, clientSocketStats.local); assertEquals(
assertEquals(serverAddress, clientSocketStats.remote); "clientLocal " + clientStream.getAttributes(), clientAddress, clientSocketStats.local);
assertEquals(
"clientRemote " + clientStream.getAttributes(), serverAddress, clientSocketStats.remote);
// very basic sanity check that socket options are populated // very basic sanity check that socket options are populated
assertNotNull(clientSocketStats.socketOptions.lingerSeconds); assertNotNull(clientSocketStats.socketOptions.lingerSeconds);
assertTrue(clientSocketStats.socketOptions.others.containsKey("SO_SNDBUF")); assertTrue(clientSocketStats.socketOptions.others.containsKey("SO_SNDBUF"));
SocketStats serverSocketStats = serverTransportListener.transport.getStats().get(); SocketStats serverSocketStats = serverTransportListener.transport.getStats().get();
assertEquals(serverAddress, serverSocketStats.local); assertEquals(
assertEquals(clientAddress, serverSocketStats.remote); "serverLocal " + serverStream.getAttributes(), serverAddress, serverSocketStats.local);
assertEquals(
"serverRemote " + serverStream.getAttributes(), clientAddress, serverSocketStats.remote);
// very basic sanity check that socket options are populated // very basic sanity check that socket options are populated
assertNotNull(serverSocketStats.socketOptions.lingerSeconds); assertNotNull(serverSocketStats.socketOptions.lingerSeconds);
assertTrue(serverSocketStats.socketOptions.others.containsKey("SO_SNDBUF")); assertTrue(serverSocketStats.socketOptions.others.containsKey("SO_SNDBUF"));

View File

@ -67,11 +67,11 @@ public class TestUtils {
* Creates a new {@link InetSocketAddress} on localhost that overrides the host with * Creates a new {@link InetSocketAddress} on localhost that overrides the host with
* {@link #TEST_SERVER_HOST}. * {@link #TEST_SERVER_HOST}.
*/ */
public static InetSocketAddress testServerAddress(int port) { public static InetSocketAddress testServerAddress(InetSocketAddress originalSockAddr) {
try { try {
InetAddress inetAddress = InetAddress.getByName("localhost"); InetAddress inetAddress = InetAddress.getByName("localhost");
inetAddress = InetAddress.getByAddress(TEST_SERVER_HOST, inetAddress.getAddress()); inetAddress = InetAddress.getByAddress(TEST_SERVER_HOST, inetAddress.getAddress());
return new InetSocketAddress(inetAddress, port); return new InetSocketAddress(inetAddress, originalSockAddr.getPort());
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }