diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java index a422506f95..7922ebd21a 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java @@ -82,11 +82,21 @@ final class InProcessServer implements InternalServer { return new InProcessSocketAddress(name); } + @Override + public List getListenSocketAddresses() { + return Collections.singletonList(getListenSocketAddress()); + } + @Override public InternalInstrumented getListenSocketStats() { return null; } + @Override + public List> getListenSocketStatsList() { + return null; + } + @Override public void shutdown() { if (!registry.remove(name, this)) { diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index f768c608b1..25291c2848 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -33,7 +33,6 @@ import io.grpc.internal.ServerImplBuilder; import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder; import io.grpc.internal.SharedResourcePool; import java.io.File; -import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; @@ -109,7 +108,7 @@ public final class InProcessServerBuilder extends final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder { @Override - public List buildClientTransportServers( + public InternalServer buildClientTransportServers( List streamTracerFactories) { return buildTransportServers(streamTracerFactories); } @@ -187,9 +186,9 @@ public final class InProcessServerBuilder extends return this; } - List buildTransportServers( + InProcessServer buildTransportServers( List streamTracerFactories) { - return Collections.singletonList(new InProcessServer(this, streamTracerFactories)); + return new InProcessServer(this, streamTracerFactories); } @Override diff --git a/core/src/main/java/io/grpc/internal/InternalServer.java b/core/src/main/java/io/grpc/internal/InternalServer.java index 389b24b45a..0445ae3dfa 100644 --- a/core/src/main/java/io/grpc/internal/InternalServer.java +++ b/core/src/main/java/io/grpc/internal/InternalServer.java @@ -20,12 +20,13 @@ import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalInstrumented; import java.io.IOException; import java.net.SocketAddress; +import java.util.List; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; /** - * An object that accepts new incoming connections. This would commonly encapsulate a bound socket - * that {@code accept()}s new connections. + * An object that accepts new incoming connections on one or more listening socket addresses. + * This would commonly encapsulate a bound socket that {@code accept()}s new connections. */ @ThreadSafe public interface InternalServer { @@ -49,13 +50,25 @@ public interface InternalServer { void shutdown(); /** - * Returns the listening socket address. May change after {@link start(ServerListener)} is + * Returns the first listening socket address. May change after {@link start(ServerListener)} is * called. */ SocketAddress getListenSocketAddress(); /** - * Returns the listen socket stats of this server. May return {@code null}. + * Returns the first listen socket stats of this server. May return {@code null}. */ @Nullable InternalInstrumented getListenSocketStats(); + + /** + * Returns a list of listening socket addresses. May change after {@link start(ServerListener)} + * is called. + */ + List getListenSocketAddresses(); + + /** + * Returns a list of listen socket stats of this server. May return {@code null}. + */ + @Nullable List> getListenSocketStatsList(); + } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index a400fca033..6c66aac07a 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -110,12 +110,11 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @GuardedBy("lock") private boolean serverShutdownCallbackInvoked; @GuardedBy("lock") private boolean terminated; /** Service encapsulating something similar to an accept() socket. */ - private final List transportServers; + private final InternalServer transportServer; private final Object lock = new Object(); @GuardedBy("lock") private boolean transportServersTerminated; /** {@code transportServer} and services encapsulating something similar to a TCP connection. */ @GuardedBy("lock") private final Set transports = new HashSet<>(); - @GuardedBy("lock") private int activeTransportServers; private final Context rootContext; @@ -131,20 +130,18 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume * Construct a server. * * @param builder builder with configuration for server - * @param transportServers transport servers that will create new incoming transports + * @param transportServer transport servers that will create new incoming transports * @param rootContext context that callbacks for new RPCs should be derived from */ ServerImpl( ServerImplBuilder builder, - List transportServers, + InternalServer transportServer, Context rootContext) { this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool"); this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder"); this.fallbackRegistry = Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); - Preconditions.checkNotNull(transportServers, "transportServers"); - Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided"); - this.transportServers = new ArrayList<>(transportServers); + this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer"); this.logId = InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle())); // Fork from the passed in context so that it does not propagate cancellation, it only @@ -179,10 +176,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume // Start and wait for any ports to actually be bound. ServerListenerImpl listener = new ServerListenerImpl(); - for (InternalServer ts : transportServers) { - ts.start(listener); - activeTransportServers++; - } + transportServer.start(listener); executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); started = true; return this; @@ -195,8 +189,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume synchronized (lock) { checkState(started, "Not started"); checkState(!terminated, "Already terminated"); - for (InternalServer ts : transportServers) { - SocketAddress addr = ts.getListenSocketAddress(); + for (SocketAddress addr: transportServer.getListenSocketAddresses()) { if (addr instanceof InetSocketAddress) { return ((InetSocketAddress) addr).getPort(); } @@ -216,11 +209,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume private List getListenSocketsIgnoringLifecycle() { synchronized (lock) { - List addrs = new ArrayList<>(transportServers.size()); - for (InternalServer ts : transportServers) { - addrs.add(ts.getListenSocketAddress()); - } - return Collections.unmodifiableList(addrs); + return Collections.unmodifiableList(transportServer.getListenSocketAddresses()); } } @@ -268,9 +257,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume } } if (shutdownTransportServers) { - for (InternalServer ts : transportServers) { - ts.shutdown(); - } + transportServer.shutdown(); } return this; } @@ -388,8 +375,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume ArrayList copiedTransports; Status shutdownNowStatusCopy; synchronized (lock) { - activeTransportServers--; - if (activeTransportServers != 0) { + if (serverShutdownCallbackInvoked) { return; } @@ -662,12 +648,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public ListenableFuture getStats() { ServerStats.Builder builder = new ServerStats.Builder(); - for (InternalServer ts : transportServers) { - // TODO(carl-mastrangelo): remove the list and just add directly. - InternalInstrumented stats = ts.getListenSocketStats(); - if (stats != null ) { - builder.addListenSockets(Collections.singletonList(stats)); - } + List> stats = transportServer.getListenSocketStatsList(); + if (stats != null ) { + builder.addListenSockets(stats); } serverCallTracer.updateBuilder(builder); SettableFuture ret = SettableFuture.create(); @@ -679,7 +662,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume public String toString() { return MoreObjects.toStringHelper(this) .add("logId", logId.getId()) - .add("transportServers", transportServers) + .add("transportServer", transportServer) .toString(); } diff --git a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java index 9208394183..04e6059d13 100644 --- a/core/src/main/java/io/grpc/internal/ServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/ServerImplBuilder.java @@ -97,7 +97,7 @@ public final class ServerImplBuilder extends ServerBuilder { * is meant for Transport implementors and should not be used by normal users. */ public interface ClientTransportServersBuilder { - List buildClientTransportServers( + InternalServer buildClientTransportServers( List streamTracerFactories); } diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java index 9f56a3562d..6095e29be7 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; -import com.google.common.collect.Iterables; import io.grpc.ServerStreamTracer; import io.grpc.internal.FakeClock; import io.grpc.internal.ObjectPool; @@ -55,8 +54,8 @@ public class InProcessServerBuilderTest { @Test public void scheduledExecutorService_default() { InProcessServerBuilder builder = InProcessServerBuilder.forName("foo"); - InProcessServer server = Iterables.getOnlyElement( - builder.buildTransportServers(new ArrayList())); + InProcessServer server = + builder.buildTransportServers(new ArrayList()); ObjectPool scheduledExecutorServicePool = server.getScheduledExecutorServicePool(); @@ -80,8 +79,8 @@ public class InProcessServerBuilderTest { InProcessServerBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService); assertSame(builder, builder1); - InProcessServer server = Iterables.getOnlyElement( - builder1.buildTransportServers(new ArrayList())); + InProcessServer server = + builder1.buildTransportServers(new ArrayList()); ObjectPool scheduledExecutorServicePool = server.getScheduledExecutorServicePool(); diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index f7e325ad5a..a987937592 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -19,7 +19,6 @@ package io.grpc.inprocess; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import com.google.common.collect.ImmutableList; import io.grpc.CallOptions; import io.grpc.ManagedChannel; import io.grpc.Metadata; @@ -55,16 +54,16 @@ public class InProcessTransportTest extends AbstractTransportTest { public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); @Override - protected List newServer( + protected InternalServer newServer( List streamTracerFactories) { InProcessServerBuilder builder = InProcessServerBuilder .forName(TRANSPORT_NAME) .maxInboundMetadataSize(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE); - return ImmutableList.of(new InProcessServer(builder, streamTracerFactories)); + return new InProcessServer(builder, streamTracerFactories); } @Override - protected List newServer( + protected InternalServer newServer( int port, List streamTracerFactories) { return newServer(streamTracerFactories); } diff --git a/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java index 6d8f3a1ca4..152fdf2252 100644 --- a/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java @@ -16,7 +16,6 @@ package io.grpc.inprocess; -import com.google.common.collect.ImmutableList; import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalInstrumented; import io.grpc.ServerStreamTracer; @@ -31,6 +30,7 @@ import io.grpc.internal.ServerTransportListener; import io.grpc.internal.SharedResourcePool; import java.io.IOException; import java.net.SocketAddress; +import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import javax.annotation.Nullable; @@ -52,13 +52,13 @@ public final class StandaloneInProcessTransportTest extends AbstractTransportTes private TestServer currentServer; @Override - protected List newServer( + protected InternalServer newServer( List streamTracerFactories) { - return ImmutableList.of(new TestServer(streamTracerFactories)); + return new TestServer(streamTracerFactories); } @Override - protected List newServer( + protected InternalServer newServer( int port, List streamTracerFactories) { return newServer(streamTracerFactories); } @@ -126,11 +126,22 @@ public final class StandaloneInProcessTransportTest extends AbstractTransportTes return new SocketAddress() {}; } + @Override + public List getListenSocketAddresses() { + return Collections.singletonList(getListenSocketAddress()); + } + @Override @Nullable public InternalInstrumented getListenSocketStats() { return null; } + + @Override + @Nullable + public List> getListenSocketStatsList() { + return null; + } } /** Wraps the server listener to ensure we don't accept new transports after shutdown. */ diff --git a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java index 5c2b9c5691..e19db3e963 100644 --- a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java @@ -40,7 +40,6 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import com.google.common.base.Objects; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.MoreExecutors; @@ -118,13 +117,13 @@ public abstract class AbstractTransportTest { * Returns a new server that when started will be able to be connected to from the client. Each * returned instance should be new and yet be accessible by new client transports. */ - protected abstract List newServer( + protected abstract InternalServer newServer( List streamTracerFactories); /** * Builds a new server that is listening on the same port as the given server instance does. */ - protected abstract List newServer( + protected abstract InternalServer newServer( int port, List streamTracerFactories); /** @@ -230,7 +229,7 @@ public abstract class AbstractTransportTest { @Before public void setUp() { - server = Iterables.getOnlyElement(newServer(Arrays.asList(serverStreamTracerFactory))); + server = newServer(Arrays.asList(serverStreamTracerFactory)); callOptions = CallOptions.DEFAULT.withStreamTracerFactory(clientStreamTracerFactory); } @@ -401,8 +400,7 @@ public abstract class AbstractTransportTest { if (addr instanceof InetSocketAddress) { port = ((InetSocketAddress) addr).getPort(); } - InternalServer server2 = - Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory))); + InternalServer server2 = newServer(port, Arrays.asList(serverStreamTracerFactory)); thrown.expect(IOException.class); server2.start(new MockServerListener()); } @@ -421,7 +419,7 @@ public abstract class AbstractTransportTest { assumeTrue("transport is not using InetSocketAddress", port != -1); server.shutdown(); - server = Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory))); + server = newServer(port, Arrays.asList(serverStreamTracerFactory)); boolean success; Thread.currentThread().interrupt(); try { @@ -473,7 +471,7 @@ public abstract class AbstractTransportTest { // resources. There may be cases this is impossible in the future, but for now it is a useful // property. serverListener = new MockServerListener(); - server = Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory))); + server = newServer(port, Arrays.asList(serverStreamTracerFactory)); server.start(serverListener); // Try to "flush" out any listener notifications on client and server. This also ensures that diff --git a/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java index 1d6a2ec9f7..ad8cf41598 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java @@ -46,7 +46,7 @@ public class ServerImplBuilderTest { builder = new ServerImplBuilder( new ClientTransportServersBuilder() { @Override - public List buildClientTransportServers( + public InternalServer buildClientTransportServers( List streamTracerFactories) { throw new UnsupportedOperationException(); } diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 5ca3685ee8..5b5f5384d3 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -44,7 +44,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -90,7 +89,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -205,7 +203,7 @@ public class ServerImplTest { builder = new ServerImplBuilder( new ClientTransportServersBuilder() { @Override - public List buildClientTransportServers( + public InternalServer buildClientTransportServers( List streamTracerFactories) { throw new UnsupportedOperationException(); } @@ -226,39 +224,19 @@ public class ServerImplTest { } @Test - public void multiport() throws Exception { - final CountDownLatch starts = new CountDownLatch(2); - final CountDownLatch shutdowns = new CountDownLatch(2); - - final class Serv extends SimpleServer { + public void getListenSockets() throws Exception { + int port = 800; + final List addresses = + Collections.singletonList(new InetSocketAddress(800)); + transportServer = new SimpleServer() { @Override - public void start(ServerListener listener) throws IOException { - super.start(listener); - starts.countDown(); + public List getListenSocketAddresses() { + return addresses; } - - @Override - public void shutdown() { - super.shutdown(); - shutdowns.countDown(); - } - } - - SimpleServer transportServer1 = new Serv(); - SimpleServer transportServer2 = new Serv(); - assertNull(server); - builder.fallbackHandlerRegistry(fallbackRegistry); - builder.executorPool = executorPool; - server = new ServerImpl( - builder, ImmutableList.of(transportServer1, transportServer2), SERVER_CONTEXT); - - server.start(); - assertTrue(starts.await(1, TimeUnit.SECONDS)); - assertEquals(2, shutdowns.getCount()); - - server.shutdown(); - assertTrue(shutdowns.await(1, TimeUnit.SECONDS)); - assertTrue(server.awaitTermination(1, TimeUnit.SECONDS)); + }; + createAndStartServer(); + assertEquals(port, server.getPort()); + assertThat(server.getListenSockets()).isEqualTo(addresses); } @Test @@ -1131,15 +1109,22 @@ public class ServerImplTest { @Test public void getPort() throws Exception { final InetSocketAddress addr = new InetSocketAddress(65535); + final List addrs = Collections.singletonList(addr); transportServer = new SimpleServer() { @Override - public SocketAddress getListenSocketAddress() { + public InetSocketAddress getListenSocketAddress() { return addr; } + + @Override + public List getListenSocketAddresses() { + return addrs; + } }; createAndStartServer(); assertThat(server.getPort()).isEqualTo(addr.getPort()); + assertThat(server.getListenSockets()).isEqualTo(addrs); } @Test @@ -1431,7 +1416,7 @@ public class ServerImplTest { builder.fallbackHandlerRegistry(fallbackRegistry); builder.executorPool = executorPool; - server = new ServerImpl(builder, Collections.singletonList(transportServer), SERVER_CONTEXT); + server = new ServerImpl(builder, transportServer, SERVER_CONTEXT); } private void verifyExecutorsAcquired() { @@ -1469,11 +1454,21 @@ public class ServerImplTest { return new InetSocketAddress(12345); } + @Override + public List getListenSocketAddresses() { + return Collections.singletonList(new InetSocketAddress(12345)); + } + @Override public InternalInstrumented getListenSocketStats() { return null; } + @Override + public List> getListenSocketStatsList() { + return null; + } + @Override public void shutdown() { listener.serverShutdown(); diff --git a/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java index 915de274af..876c55ebfc 100644 --- a/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java @@ -30,7 +30,7 @@ import java.util.List; */ @Internal public final class InternalNettyServerBuilder { - public static List buildTransportServers(NettyServerBuilder builder, + public static NettyServer buildTransportServers(NettyServerBuilder builder, List streamTracerFactories) { return builder.buildTransportServers(streamTracerFactories); } diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 5962207873..4c16ac50a2 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -45,17 +45,26 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; +import io.netty.channel.group.ChannelGroupFutureListener; +import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.logging.Level; import java.util.logging.Logger; @@ -66,7 +75,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private static final Logger log = Logger.getLogger(InternalServer.class.getName()); private final InternalLogId logId; - private final SocketAddress address; + private final List addresses; private final ChannelFactory channelFactory; private final Map, ?> channelOptions; private final Map, ?> childChannelOptions; @@ -78,7 +87,7 @@ class NettyServer implements InternalServer, InternalWithLogId { private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerListener listener; - private Channel channel; + private final ChannelGroup channelGroup; private final boolean autoFlowControl; private final int flowControlWindow; private final int maxMessageSize; @@ -96,11 +105,14 @@ class NettyServer implements InternalServer, InternalWithLogId { private final List streamTracerFactories; private final TransportTracer.Factory transportTracerFactory; private final InternalChannelz channelz; - // Only modified in event loop but safe to read any time. - private volatile InternalInstrumented listenSocketStats; + private volatile List> listenSocketStatsList = + Collections.emptyList(); + private volatile boolean terminated; + private final EventLoop bossExecutor; NettyServer( - SocketAddress address, ChannelFactory channelFactory, + List addresses, + ChannelFactory channelFactory, Map, ?> channelOptions, Map, ?> childChannelOptions, ObjectPool bossGroupPool, @@ -116,7 +128,7 @@ class NettyServer implements InternalServer, InternalWithLogId { long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, Attributes eagAttributes, InternalChannelz channelz) { - this.address = address; + this.addresses = checkNotNull(addresses, "addresses"); this.channelFactory = checkNotNull(channelFactory, "channelFactory"); checkNotNull(channelOptions, "channelOptions"); this.channelOptions = new HashMap, Object>(channelOptions); @@ -126,6 +138,8 @@ class NettyServer implements InternalServer, InternalWithLogId { this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool"); this.forceHeapBuffer = forceHeapBuffer; this.bossGroup = bossGroupPool.getObject(); + this.bossExecutor = bossGroup.next(); + this.channelGroup = new DefaultChannelGroup(this.bossExecutor); this.workerGroup = workerGroupPool.getObject(); this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator"); this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); @@ -144,32 +158,53 @@ class NettyServer implements InternalServer, InternalWithLogId { this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes"); this.channelz = Preconditions.checkNotNull(channelz); - this.logId = - InternalLogId.allocate(getClass(), address != null ? address.toString() : "No address"); + this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" : + String.valueOf(addresses)); } @Override public SocketAddress getListenSocketAddress() { - if (channel == null) { + Iterator it = channelGroup.iterator(); + if (it.hasNext()) { + return it.next().localAddress(); + } else { // server is not listening/bound yet, just return the original port. - return address; + return addresses.isEmpty() ? null : addresses.get(0); } - return channel.localAddress(); + } + + @Override + public List getListenSocketAddresses() { + List listenSocketAddresses = new ArrayList<>(); + for (Channel c: channelGroup) { + listenSocketAddresses.add(c.localAddress()); + } + // server is not listening/bound yet, just return the original ports. + if (listenSocketAddresses.isEmpty()) { + listenSocketAddresses.addAll(addresses); + } + return listenSocketAddresses; } @Override public InternalInstrumented getListenSocketStats() { - return listenSocketStats; + List> savedListenSocketStatsList = listenSocketStatsList; + return savedListenSocketStatsList.isEmpty() ? null : savedListenSocketStatsList.get(0); + } + + @Override + public List> getListenSocketStatsList() { + return listenSocketStatsList; } @Override public void start(ServerListener serverListener) throws IOException { listener = checkNotNull(serverListener, "serverListener"); - ServerBootstrap b = new ServerBootstrap(); + final ServerBootstrap b = new ServerBootstrap(); b.option(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer)); b.childOption(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer)); - b.group(bossGroup, workerGroup); + b.group(bossExecutor, workerGroup); b.channelFactory(channelFactory); // For non-socket based channel, the option will be ignored. b.childOption(SO_KEEPALIVE, true); @@ -226,8 +261,8 @@ class NettyServer implements InternalServer, InternalWithLogId { ServerTransportListener transportListener; // This is to order callbacks on the listener, not to guard access to channel. synchronized (NettyServer.this) { - if (channel != null && !channel.isOpen()) { - // Server already shutdown. + if (terminated) { + // Server already terminated. ch.close(); return; } @@ -258,51 +293,77 @@ class NettyServer implements InternalServer, InternalWithLogId { ch.closeFuture().addListener(loopReleaser); } }); - // Bind and start to accept incoming connections. - ChannelFuture future = b.bind(address); - // We'd love to observe interruption, but if interrupted we will need to close the channel, - // which itself would need an await() to guarantee the port is not used when the method returns. - // See #6850 - future.awaitUninterruptibly(); - if (!future.isSuccess()) { - throw new IOException(String.format("Failed to bind to address %s", address), future.cause()); + Future> bindCallFuture = + bossExecutor.submit( + new Callable>() { + @Override + public Map call() { + Map bindFutures = new HashMap<>(); + for (SocketAddress address: addresses) { + ChannelFuture future = b.bind(address); + channelGroup.add(future.channel()); + bindFutures.put(future, address); + } + return bindFutures; + } + } + ); + Map channelFutures = + bindCallFuture.awaitUninterruptibly().getNow(); + + if (!bindCallFuture.isSuccess()) { + channelGroup.close().awaitUninterruptibly(); + throw new IOException(String.format("Failed to bind to addresses %s", + addresses), bindCallFuture.cause()); } - channel = future.channel(); - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { - listenSocketStats = new ListenSocket(channel); - channelz.addListenSocket(listenSocketStats); + final List> socketStats = new ArrayList<>(); + for (Map.Entry entry: channelFutures.entrySet()) { + // We'd love to observe interruption, but if interrupted we will need to close the channel, + // which itself would need an await() to guarantee the port is not used when the method + // returns. See #6850 + final ChannelFuture future = entry.getKey(); + if (!future.awaitUninterruptibly().isSuccess()) { + channelGroup.close().awaitUninterruptibly(); + throw new IOException(String.format("Failed to bind to address %s", + entry.getValue()), future.cause()); } - }); + final InternalInstrumented listenSocketStats = + new ListenSocket(future.channel()); + channelz.addListenSocket(listenSocketStats); + socketStats.add(listenSocketStats); + future.channel().closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + channelz.removeListenSocket(listenSocketStats); + } + }); + } + listenSocketStatsList = Collections.unmodifiableList(socketStats); } @Override public void shutdown() { - if (channel == null || !channel.isOpen()) { - // Already closed. + if (terminated) { return; } - channel.close().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - log.log(Level.WARNING, "Error shutting down server", future.cause()); - } - InternalInstrumented stats = listenSocketStats; - listenSocketStats = null; - if (stats != null) { - channelz.removeListenSocket(stats); - } - sharedResourceReferenceCounter.release(); - protocolNegotiator.close(); - synchronized (NettyServer.this) { - listener.serverShutdown(); - } - } - }); + ChannelGroupFuture groupFuture = channelGroup.close() + .addListener(new ChannelGroupFutureListener() { + @Override + public void operationComplete(ChannelGroupFuture future) throws Exception { + if (!future.isSuccess()) { + log.log(Level.WARNING, "Error closing server channel group", future.cause()); + } + sharedResourceReferenceCounter.release(); + protocolNegotiator.close(); + listenSocketStatsList = Collections.emptyList(); + synchronized (NettyServer.this) { + listener.serverShutdown(); + terminated = true; + } + } + }); try { - channel.closeFuture().await(); + groupFuture.await(); } catch (InterruptedException e) { log.log(Level.FINE, "Interrupted while shutting down", e); Thread.currentThread().interrupt(); @@ -318,7 +379,7 @@ class NettyServer implements InternalServer, InternalWithLogId { public String toString() { return MoreObjects.toStringHelper(this) .add("logId", logId.getId()) - .add("address", address) + .add("addresses", addresses) .toString(); } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index f243c1067a..9b67cd21f1 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -54,7 +54,6 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -165,7 +164,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder buildClientTransportServers( + public InternalServer buildClientTransportServers( List streamTracerFactories) { return buildTransportServers(streamTracerFactories); } @@ -623,27 +622,22 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder buildTransportServers( + NettyServer buildTransportServers( List streamTracerFactories) { assertEventLoopsAndChannelType(); ProtocolNegotiator negotiator = protocolNegotiatorFactory.newNegotiator( this.serverImplBuilder.getExecutorPool()); - List transportServers = new ArrayList<>(listenAddresses.size()); - for (SocketAddress listenAddress : listenAddresses) { - NettyServer transportServer = new NettyServer( - listenAddress, channelFactory, channelOptions, childChannelOptions, - bossEventLoopGroupPool, workerEventLoopGroupPool, forceHeapBuffer, negotiator, - streamTracerFactories, transportTracerFactory, maxConcurrentCallsPerConnection, - autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, - keepAliveTimeInNanos, keepAliveTimeoutInNanos, - maxConnectionIdleInNanos, maxConnectionAgeInNanos, - maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, - eagAttributes, this.serverImplBuilder.getChannelz()); - transportServers.add(transportServer); - } - return Collections.unmodifiableList(transportServers); + return new NettyServer( + listenAddresses, channelFactory, channelOptions, childChannelOptions, + bossEventLoopGroupPool, workerEventLoopGroupPool, forceHeapBuffer, negotiator, + streamTracerFactories, transportTracerFactory, maxConcurrentCallsPerConnection, + autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, + keepAliveTimeInNanos, keepAliveTimeoutInNanos, + maxConnectionIdleInNanos, maxConnectionAgeInNanos, + maxConnectionAgeGraceInNanos, permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, + eagAttributes, this.serverImplBuilder.getChannelz()); } @VisibleForTesting diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 7bff844987..fe3e604b9e 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -773,7 +773,7 @@ public class NettyClientTransportTest { private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException { server = new NettyServer( - TestUtils.testServerAddress(new InetSocketAddress(0)), + TestUtils.testServerAddresses(new InetSocketAddress(0)), new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), new HashMap, Object>(), diff --git a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java index 44e13d80c3..6d8192322a 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerBuilderTest.java @@ -26,7 +26,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalServerChannel; import io.netty.handler.ssl.SslContext; import java.net.InetSocketAddress; -import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.Rule; import org.junit.Test; @@ -46,12 +45,12 @@ public class NettyServerBuilderTest { private NettyServerBuilder builder = NettyServerBuilder.forPort(8080); @Test - public void createMultipleServers() { + public void addMultipleListenAddresses() { builder.addListenAddress(new InetSocketAddress(8081)); - List servers = + NettyServer server = builder.buildTransportServers(ImmutableList.of()); - Truth.assertThat(servers).hasSize(2); + Truth.assertThat(server.getListenSocketAddresses()).hasSize(2); } @Test diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index 94c42cff03..3f277ed435 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -19,9 +19,18 @@ package io.grpc.netty; import static com.google.common.truth.Truth.assertThat; import static io.grpc.InternalChannelz.id; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.InternalChannelz; @@ -36,31 +45,63 @@ import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.TransportTracer; import io.netty.channel.Channel; +import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; import io.netty.channel.ReflectiveChannelFactory; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.AsciiString; +import io.netty.util.concurrent.Future; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketAddress; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class NettyServerTest { private final InternalChannelz channelz = new InternalChannelz(); private final NioEventLoopGroup eventLoop = new NioEventLoopGroup(1); + private final ChannelFactory channelFactory = + new ReflectiveChannelFactory<>(NioServerSocketChannel.class); + + @Mock + EventLoopGroup mockEventLoopGroup; + @Mock + EventLoop mockEventLoop; + @Mock + Future> bindFuture; + + @Before + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + when(mockEventLoopGroup.next()).thenReturn(mockEventLoop); + when(mockEventLoop + .submit(ArgumentMatchers.>>any())) + .thenReturn(bindFuture); + } @After public void tearDown() throws Exception { @@ -90,7 +131,7 @@ public class NettyServerTest { NoHandlerProtocolNegotiator protocolNegotiator = new NoHandlerProtocolNegotiator(); NettyServer ns = new NettyServer( - addr, + Arrays.asList(addr), new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), new HashMap, Object>(), @@ -134,11 +175,147 @@ public class NettyServerTest { assertThat(protocolNegotiator.closed).isTrue(); } + @Test + public void multiPortStartStopGet() throws Exception { + InetSocketAddress addr1 = new InetSocketAddress(0); + InetSocketAddress addr2 = new InetSocketAddress(0); + + NettyServer ns = new NettyServer( + Arrays.asList(addr1, addr2), + new ReflectiveChannelFactory<>(NioServerSocketChannel.class), + new HashMap, Object>(), + new HashMap, Object>(), + new FixedObjectPool<>(eventLoop), + new FixedObjectPool<>(eventLoop), + false, + ProtocolNegotiators.plaintext(), + Collections.emptyList(), + TransportTracer.getDefaultFactory(), + 1, // ignore + false, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, 1, // ignore + 1, 1, // ignore + true, 0, // ignore + Attributes.EMPTY, + channelz); + final SettableFuture shutdownCompleted = SettableFuture.create(); + ns.start(new ServerListener() { + @Override + public ServerTransportListener transportCreated(ServerTransport transport) { + return new NoopServerTransportListener(); + } + + @Override + public void serverShutdown() { + shutdownCompleted.set(null); + } + }); + + // SocketStats won't be available until the event loop task of adding SocketStats created by + // ns.start() complete. So submit a noop task and await until it's drained. + eventLoop.submit(new Runnable() { + @Override + public void run() {} + }).await(5, TimeUnit.SECONDS); + + assertEquals(2, ns.getListenSocketAddresses().size()); + for (SocketAddress address: ns.getListenSocketAddresses()) { + assertThat(((InetSocketAddress) address).getPort()).isGreaterThan(0); + } + + List> stats = ns.getListenSocketStatsList(); + assertEquals(2, ns.getListenSocketStatsList().size()); + for (InternalInstrumented listenSocket : stats) { + assertSame(listenSocket, channelz.getSocket(id(listenSocket))); + // very basic sanity check of the contents + SocketStats socketStats = listenSocket.getStats().get(); + assertThat(ns.getListenSocketAddresses()).contains(socketStats.local); + assertNull(socketStats.remote); + } + + // Cleanup + ns.shutdown(); + shutdownCompleted.get(); + + // listen socket is removed + for (InternalInstrumented listenSocket : stats) { + assertNull(channelz.getSocket(id(listenSocket))); + } + } + + @Test(timeout = 60000) + public void multiPortConnections() throws Exception { + InetSocketAddress addr1 = new InetSocketAddress(0); + InetSocketAddress addr2 = new InetSocketAddress(0); + final CountDownLatch allPortsConnectedCountDown = new CountDownLatch(2); + + NettyServer ns = new NettyServer( + Arrays.asList(addr1, addr2), + new ReflectiveChannelFactory<>(NioServerSocketChannel.class), + new HashMap, Object>(), + new HashMap, Object>(), + new FixedObjectPool<>(eventLoop), + new FixedObjectPool<>(eventLoop), + false, + ProtocolNegotiators.plaintext(), + Collections.emptyList(), + TransportTracer.getDefaultFactory(), + 1, // ignore + false, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, 1, // ignore + 1, 1, // ignore + true, 0, // ignore + Attributes.EMPTY, + channelz); + final SettableFuture shutdownCompleted = SettableFuture.create(); + ns.start(new ServerListener() { + @Override + public ServerTransportListener transportCreated(ServerTransport transport) { + allPortsConnectedCountDown.countDown(); + return new NoopServerTransportListener(); + } + + @Override + public void serverShutdown() { + shutdownCompleted.set(null); + } + }); + + // SocketStats won't be available until the event loop task of adding SocketStats created by + // ns.start() complete. So submit a noop task and await until it's drained. + eventLoop.submit(new Runnable() { + @Override + public void run() {} + }).await(5, TimeUnit.SECONDS); + + List serverSockets = ns.getListenSocketAddresses(); + assertEquals(2, serverSockets.size()); + + for (int i = 0; i < 2; i++) { + Socket socket = new Socket(); + socket.connect(serverSockets.get(i), /* timeout= */ 8000); + socket.close(); + } + allPortsConnectedCountDown.await(); + // Cleanup + ns.shutdown(); + shutdownCompleted.get(); + } + @Test public void getPort_notStarted() { InetSocketAddress addr = new InetSocketAddress(0); + List addresses = Collections.singletonList(addr); NettyServer ns = new NettyServer( - addr, + addresses, new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), new HashMap, Object>(), @@ -161,6 +338,7 @@ public class NettyServerTest { channelz); assertThat(ns.getListenSocketAddress()).isEqualTo(addr); + assertThat(ns.getListenSocketAddresses()).isEqualTo(addresses); } @Test(timeout = 60000) @@ -211,7 +389,7 @@ public class NettyServerTest { TestProtocolNegotiator protocolNegotiator = new TestProtocolNegotiator(); InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( - addr, + Arrays.asList(addr), new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), childChannelOptions, @@ -258,7 +436,7 @@ public class NettyServerTest { public void channelzListenSocket() throws Exception { InetSocketAddress addr = new InetSocketAddress(0); NettyServer ns = new NettyServer( - addr, + Arrays.asList(addr), new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), new HashMap, Object>(), @@ -320,6 +498,110 @@ public class NettyServerTest { assertNull(channelz.getSocket(id(listenSocket))); } + @Test + @SuppressWarnings("unchecked") + public void testBindScheduleFailure() throws Exception { + when(bindFuture.awaitUninterruptibly()).thenReturn(bindFuture); + when(bindFuture.isSuccess()).thenReturn(false); + when(bindFuture.getNow()).thenReturn(null); + Throwable mockCause = mock(Throwable.class); + when(bindFuture.cause()).thenReturn(mockCause); + Future mockFuture = (Future) mock(Future.class); + doReturn(mockFuture).when(mockEventLoopGroup).submit(any(Runnable.class)); + SocketAddress addr = new InetSocketAddress(0); + verifyServerNotStart(Collections.singletonList(addr), mockEventLoopGroup, + IOException.class, "Failed to bind to addresses " + Arrays.asList(addr)); + } + + @Test + @SuppressWarnings("unchecked") + public void testBindFailure() throws Exception { + when(bindFuture.awaitUninterruptibly()).thenReturn(bindFuture); + ChannelFuture future = mock(ChannelFuture.class); + when(future.awaitUninterruptibly()).thenReturn(future); + when(future.isSuccess()).thenReturn(false); + Channel channel = channelFactory.newChannel(); + eventLoop.register(channel); + when(future.channel()).thenReturn(channel); + Throwable mockCause = mock(Throwable.class); + when(future.cause()).thenReturn(mockCause); + SocketAddress addr = new InetSocketAddress(0); + Map map = ImmutableMap.of(future, addr); + when(bindFuture.getNow()).thenReturn(map); + when(bindFuture.isSuccess()).thenReturn(true); + Future mockFuture = (Future) mock(Future.class); + doReturn(mockFuture).when(mockEventLoopGroup).submit(any(Runnable.class)); + verifyServerNotStart(Collections.singletonList(addr), mockEventLoopGroup, + IOException.class, "Failed to bind to address " + addr); + } + + @Test + public void testBindPartialFailure() throws Exception { + SocketAddress add1 = new InetSocketAddress(0); + SocketAddress add2 = new InetSocketAddress(2); + SocketAddress add3 = new InetSocketAddress(2); + verifyServerNotStart(ImmutableList.of(add1, add2, add3), eventLoop, + IOException.class, "Failed to bind to address " + add3); + } + + private void verifyServerNotStart(List addr, EventLoopGroup ev, + Class expectedException, String expectedMessage) + throws Exception { + NettyServer ns = getServer(addr, ev); + try { + ns.start(new ServerListener() { + @Override + public ServerTransportListener transportCreated(ServerTransport transport) { + return new NoopServerTransportListener(); + } + + @Override + public void serverShutdown() { + } + }); + } catch (Exception ex) { + assertTrue(expectedException.isInstance(ex)); + assertThat(ex.getMessage()).isEqualTo(expectedMessage); + assertFalse(addr.isEmpty()); + // Listener tasks are executed on the event loop, so await until noop task is drained. + ev.submit(new Runnable() { + @Override + public void run() {} + }).await(5, TimeUnit.SECONDS); + assertThat(ns.getListenSocketAddress()).isEqualTo(addr.get(0)); + assertThat(ns.getListenSocketAddresses()).isEqualTo(addr); + assertTrue(ns.getListenSocketStatsList().isEmpty()); + assertNull(ns.getListenSocketStats()); + return; + } + fail(); + } + + private NettyServer getServer(List addr, EventLoopGroup ev) { + return new NettyServer( + addr, + new ReflectiveChannelFactory<>(NioServerSocketChannel.class), + new HashMap, Object>(), + new HashMap, Object>(), + new FixedObjectPool<>(ev), + new FixedObjectPool<>(ev), + false, + ProtocolNegotiators.plaintext(), + Collections.emptyList(), + TransportTracer.getDefaultFactory(), + 1, // ignore + false, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, 1, // ignore + 1, 1, // ignore + true, 0, // ignore + Attributes.EMPTY, + channelz); + } + private static class NoopServerTransportListener implements ServerTransportListener { @Override public void streamCreated(ServerStream stream, String method, Metadata headers) {} diff --git a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java index 7280ba6494..6aa6ca1b32 100644 --- a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java @@ -63,7 +63,7 @@ public class NettyTransportTest extends AbstractTransportTest { } @Override - protected List newServer( + protected InternalServer newServer( List streamTracerFactories) { return NettyServerBuilder .forAddress(new InetSocketAddress("localhost", 0)) @@ -73,7 +73,7 @@ public class NettyTransportTest extends AbstractTransportTest { } @Override - protected List newServer( + protected InternalServer newServer( int port, List streamTracerFactories) { return NettyServerBuilder .forAddress(new InetSocketAddress("localhost", port)) diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java index 1f812fe064..44e493c259 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpTransportTest.java @@ -51,7 +51,7 @@ public class OkHttpTransportTest extends AbstractTransportTest { } @Override - protected List newServer( + protected InternalServer newServer( List streamTracerFactories) { NettyServerBuilder builder = NettyServerBuilder .forPort(0) @@ -61,7 +61,7 @@ public class OkHttpTransportTest extends AbstractTransportTest { } @Override - protected List newServer( + protected InternalServer newServer( int port, List streamTracerFactories) { NettyServerBuilder builder = NettyServerBuilder .forAddress(new InetSocketAddress(port)) diff --git a/testing/src/main/java/io/grpc/internal/testing/TestUtils.java b/testing/src/main/java/io/grpc/internal/testing/TestUtils.java index 6b467eff52..e715b19202 100644 --- a/testing/src/main/java/io/grpc/internal/testing/TestUtils.java +++ b/testing/src/main/java/io/grpc/internal/testing/TestUtils.java @@ -77,6 +77,24 @@ public class TestUtils { } } + /** + * Creates a new list of {@link InetSocketAddress} on localhost that overrides the host with + * {@link #TEST_SERVER_HOST}. + */ + public static List testServerAddresses(InetSocketAddress... originalSockAddr) { + try { + InetAddress inetAddress = InetAddress.getByName("localhost"); + inetAddress = InetAddress.getByAddress(TEST_SERVER_HOST, inetAddress.getAddress()); + List addresses = new ArrayList<>(); + for (InetSocketAddress orig: originalSockAddr) { + addresses.add(new InetSocketAddress(inetAddress, orig.getPort())); + } + return addresses; + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + /** * Returns the ciphers preferred to use during tests. They may be chosen because they are widely * available or because they are fast. There is no requirement that they provide confidentiality