diff --git a/core/src/main/java/io/grpc/AbstractChannelBuilder.java b/core/src/main/java/io/grpc/AbstractChannelBuilder.java index 5c263798df..d137e26a41 100644 --- a/core/src/main/java/io/grpc/AbstractChannelBuilder.java +++ b/core/src/main/java/io/grpc/AbstractChannelBuilder.java @@ -127,7 +127,7 @@ public abstract class AbstractChannelBuilderpublic class TcpTransportServerFactory { * public static Server newServer(Executor executor, HandlerRegistry registry, * String configuration) { - * ServerImpl server = new ServerImpl(executor, registry); - * return server.setTransportServer( - * new TcpTransportServer(server.serverListener(), configuration)); + * return new ServerImpl(executor, registry, new TcpTransportServer(configuration)); * } * } * @@ -67,8 +64,6 @@ import java.util.concurrent.TimeUnit; public class ServerImpl implements Server { private static final ServerStreamListener NOOP_LISTENER = new NoopListener(); - private final ServerListener serverListener = new ServerListenerImpl(); - private final ServerTransportListener serverTransportListener = new ServerTransportListenerImpl(); /** Executor for application processing. */ private final Executor executor; private final HandlerRegistry registry; @@ -77,46 +72,21 @@ public class ServerImpl implements Server { private boolean terminated; private Runnable terminationRunnable; /** Service encapsulating something similar to an accept() socket. */ - private Service transportServer; + private final io.grpc.transport.Server transportServer; /** {@code transportServer} and services encapsulating something similar to a TCP connection. */ - private final Collection transports = new HashSet(); + private final Collection transports = new HashSet(); /** - * Construct a server. {@link #setTransportServer(Service)} must be called before starting the - * server. + * Construct a server. * * @param executor to call methods on behalf of remote clients * @param registry of methods to expose to remote clients. */ - public ServerImpl(Executor executor, HandlerRegistry registry) { - this.executor = Preconditions.checkNotNull(executor); - this.registry = Preconditions.checkNotNull(registry); - } - - /** - * Set the transport server for the server. {@code transportServer} should be in state NEW and not - * shared with any other {@code Server}s; it will be started and managed by the newly-created - * server instance. Must be called before starting server. - * - * @return this object - */ - public synchronized ServerImpl setTransportServer(Service transportServer) { - if (shutdown) { - throw new IllegalStateException("Already shutdown"); - } - Preconditions.checkState(this.transportServer == null, "transportServer already set"); - this.transportServer = Preconditions.checkNotNull(transportServer); - Preconditions.checkArgument( - transportServer.state() == Service.State.NEW, "transport server not in NEW state"); - transportServer.addListener(new TransportServiceListener(transportServer), - MoreExecutors.directExecutor()); - transports.add(transportServer); - return this; - } - - /** Listener to be called by transport factories to notify of new transport instances. */ - public ServerListener serverListener() { - return serverListener; + public ServerImpl(Executor executor, HandlerRegistry registry, + io.grpc.transport.Server transportServer) { + this.executor = Preconditions.checkNotNull(executor, "executor"); + this.registry = Preconditions.checkNotNull(registry, "registry"); + this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer"); } /** Hack to allow executors to auto-shutdown. Not for general use. */ @@ -130,22 +100,15 @@ public class ServerImpl implements Server { * * @return {@code this} object * @throws IllegalStateException if already started + * @throws IOException if unable to bind */ - public synchronized ServerImpl start() { + public synchronized ServerImpl start() throws IOException { if (started) { throw new IllegalStateException("Already started"); } + // Start and wait for any port to actually be bound. + transportServer.start(new ServerListenerImpl()); started = true; - try { - // Start and wait for any port to actually be bound. - transportServer.startAsync().awaitRunning(); - } catch (IllegalStateException ex) { - Throwable t = transportServer.failureCause(); - if (t != null) { - throw Throwables.propagate(t); - } - throw ex; - } return this; } @@ -153,12 +116,11 @@ public class ServerImpl implements Server { * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected. */ public synchronized ServerImpl shutdown() { - shutdown = true; - // transports collection can be modified during stopAsync(), even if we hold the lock, due to - // reentrancy. - for (Service transport : transports.toArray(new Service[transports.size()])) { - transport.stopAsync(); + if (shutdown) { + return this; } + transportServer.shutdown(); + shutdown = true; return this; } @@ -226,10 +188,15 @@ public class ServerImpl implements Server { * * @param transport service to remove */ - private synchronized void transportClosed(Service transport) { + private synchronized void transportClosed(ServerTransport transport) { if (!transports.remove(transport)) { throw new AssertionError("Transport already removed"); } + checkForTermination(); + } + + /** Notify of complete shutdown if necessary. */ + private synchronized void checkForTermination() { if (shutdown && transports.isEmpty()) { terminated = true; notifyAll(); @@ -241,50 +208,39 @@ public class ServerImpl implements Server { private class ServerListenerImpl implements ServerListener { @Override - public ServerTransportListener transportCreated(Service transport) { - Service.State transportState = transport.state(); - Preconditions.checkArgument( - transportState == Service.State.STARTING || transportState == Service.State.RUNNING, - "Created transport should be starting or running"); - synchronized (this) { - if (shutdown) { - transport.stopAsync(); - return serverTransportListener; - } + public ServerTransportListener transportCreated(ServerTransport transport) { + synchronized (ServerImpl.this) { transports.add(transport); } - // transports collection can be modified during this call, even if we hold the lock, due to - // reentrancy. - transport.addListener(new TransportServiceListener(transport), - MoreExecutors.directExecutor()); - // We assume that transport.state() won't change by another thread before the listener was - // registered. - Preconditions.checkState( - transport.state() == transportState, "transport changed state unexpectedly!"); - return serverTransportListener; - } - } - - /** Listens for lifecycle changes to a "TCP connection." */ - private class TransportServiceListener extends Service.Listener { - private final Service transport; - - public TransportServiceListener(Service transport) { - this.transport = transport; + return new ServerTransportListenerImpl(transport); } @Override - public void failed(Service.State from, Throwable failure) { - transportClosed(transport); - } - - @Override - public void terminated(Service.State from) { - transportClosed(transport); + public void serverShutdown() { + synchronized (ServerImpl.this) { + // transports collection can be modified during shutdown(), even if we hold the lock, due + // to reentrancy. + for (ServerTransport transport + : transports.toArray(new ServerTransport[transports.size()])) { + transport.shutdown(); + } + checkForTermination(); + } } } private class ServerTransportListenerImpl implements ServerTransportListener { + private final ServerTransport transport; + + public ServerTransportListenerImpl(ServerTransport transport) { + this.transport = transport; + } + + @Override + public void transportTerminated() { + transportClosed(transport); + } + @Override public ServerStreamListener streamCreated(final ServerStream stream, final String methodName, final Metadata.Headers headers) { diff --git a/core/src/main/java/io/grpc/transport/Server.java b/core/src/main/java/io/grpc/transport/Server.java new file mode 100644 index 0000000000..de4dccbaca --- /dev/null +++ b/core/src/main/java/io/grpc/transport/Server.java @@ -0,0 +1,56 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport; + +import java.io.IOException; + +/** + * A server accepts new incomming connections. This is would commonly encapsulate a bound socket + * that {@code accept(}}s new connections. + */ +public interface Server { + /** + * Starts transport. Implementations must not call {@code listener} until after {@code start()} + * returns. The method only returns after it has done the equivalent of bind()ing, so it will be + * able to service any connections created after returning. + * + * @param listener non-{@code null} listener of server events + * @throws IOException if unable to bind + */ + void start(ServerListener listener) throws IOException; + + /** + * Initiates an orderly shutdown of the server. Existing transports continue, but new transports + * will not be created (once {@link ServerListener#serverShutdown()} callback called). + */ + void shutdown(); +} diff --git a/core/src/main/java/io/grpc/transport/ServerListener.java b/core/src/main/java/io/grpc/transport/ServerListener.java index 43d793ff4b..8c49e892f4 100644 --- a/core/src/main/java/io/grpc/transport/ServerListener.java +++ b/core/src/main/java/io/grpc/transport/ServerListener.java @@ -31,10 +31,9 @@ package io.grpc.transport; -import com.google.common.util.concurrent.Service; - /** - * A listener to a server for transport creation events. + * A listener to a server for transport creation events. Notifications must occur from the transport + * thread. */ public interface ServerListener { @@ -44,5 +43,12 @@ public interface ServerListener { * @param transport the new transport to be observed. * @return a listener for stream creation events on the transport. */ - ServerTransportListener transportCreated(Service transport); + ServerTransportListener transportCreated(ServerTransport transport); + + /** + * The server is shutting down. No new transports will be processed, but existing streams may + * continue. Shutdown is only caused by a call to {@link Server#shutdown()}. All resources have + * been released. + */ + void serverShutdown(); } diff --git a/core/src/main/java/io/grpc/transport/ServerTransport.java b/core/src/main/java/io/grpc/transport/ServerTransport.java new file mode 100644 index 0000000000..ab2d88e6f2 --- /dev/null +++ b/core/src/main/java/io/grpc/transport/ServerTransport.java @@ -0,0 +1,42 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport; + +/** An inbound connection. */ +public interface ServerTransport { + /** + * Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will + * eventually begin failing. New streams "eventually" begin failing because shutdown may need to + * be processed on a separate thread. + */ + void shutdown(); +} diff --git a/core/src/main/java/io/grpc/transport/ServerTransportListener.java b/core/src/main/java/io/grpc/transport/ServerTransportListener.java index f5a37185d6..a0af53ef13 100644 --- a/core/src/main/java/io/grpc/transport/ServerTransportListener.java +++ b/core/src/main/java/io/grpc/transport/ServerTransportListener.java @@ -34,7 +34,8 @@ package io.grpc.transport; import io.grpc.Metadata; /** - * A observer of a server-side transport for stream creation events. + * A observer of a server-side transport for stream creation events. Notifications must occur from + * the transport thread. */ public interface ServerTransportListener { @@ -48,4 +49,9 @@ public interface ServerTransportListener { */ ServerStreamListener streamCreated(ServerStream stream, String method, Metadata.Headers headers); + + /** + * The transport completed shutting down. All resources have been released. + */ + void transportTerminated(); } diff --git a/core/src/test/java/io/grpc/ServerImplTest.java b/core/src/test/java/io/grpc/ServerImplTest.java index 9ed2e15c4c..fe7a072a93 100644 --- a/core/src/test/java/io/grpc/ServerImplTest.java +++ b/core/src/test/java/io/grpc/ServerImplTest.java @@ -37,6 +37,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.notNull; @@ -47,11 +48,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.io.ByteStreams; -import com.google.common.util.concurrent.AbstractService; -import com.google.common.util.concurrent.Service; +import io.grpc.transport.ServerListener; import io.grpc.transport.ServerStream; import io.grpc.transport.ServerStreamListener; +import io.grpc.transport.ServerTransport; import io.grpc.transport.ServerTransportListener; import org.junit.After; @@ -71,7 +72,6 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** Unit tests for {@link ServerImpl}. */ @@ -82,9 +82,8 @@ public class ServerImplTest { private ExecutorService executor = Executors.newSingleThreadExecutor(); private MutableHandlerRegistry registry = new MutableHandlerRegistryImpl(); - private Service transportServer = new NoopService(); - private ServerImpl server = new ServerImpl(executor, registry) - .setTransportServer(transportServer); + private SimpleServer transportServer = new SimpleServer(); + private ServerImpl server = new ServerImpl(executor, registry, transportServer); @Mock private ServerStream stream; @@ -94,7 +93,7 @@ public class ServerImplTest { /** Set up for test. */ @Before - public void startUp() { + public void startUp() throws IOException { MockitoAnnotations.initMocks(this); server.start(); @@ -107,92 +106,62 @@ public class ServerImplTest { } @Test - public void startStopImmediate() throws InterruptedException { - Service transportServer = new NoopService(); - ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer); - assertEquals(Service.State.NEW, transportServer.state()); + public void startStopImmediate() throws IOException { + transportServer = new SimpleServer() { + @Override + public void shutdown() {} + }; + ServerImpl server = new ServerImpl(executor, registry, transportServer); server.start(); - assertEquals(Service.State.RUNNING, transportServer.state()); server.shutdown(); - assertTrue(server.awaitTerminated(100, TimeUnit.MILLISECONDS)); - assertEquals(Service.State.TERMINATED, transportServer.state()); + assertTrue(server.isShutdown()); + assertFalse(server.isTerminated()); + transportServer.listener.serverShutdown(); + assertTrue(server.isTerminated()); + } + + @Test + public void startStopImmediateWithChildTransport() throws IOException { + ServerImpl server = new ServerImpl(executor, registry, transportServer); + server.start(); + class DelayedShutdownServerTransport extends SimpleServerTransport { + boolean shutdown; + + @Override + public void shutdown() { + shutdown = true; + } + } + + DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport(); + transportServer.registerNewServerTransport(serverTransport); + server.shutdown(); + assertTrue(server.isShutdown()); + assertFalse(server.isTerminated()); + assertTrue(serverTransport.shutdown); + serverTransport.listener.transportTerminated(); + assertTrue(server.isTerminated()); } @Test public void transportServerFailsStartup() { - final Exception ex = new RuntimeException(); - class FailingStartupService extends NoopService { + final IOException ex = new IOException(); + class FailingStartupServer extends SimpleServer { @Override - public void doStart() { - notifyFailed(ex); + public void start(ServerListener listener) throws IOException { + throw ex; } } - FailingStartupService transportServer = new FailingStartupService(); - ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer); + ServerImpl server = new ServerImpl(executor, registry, new FailingStartupServer()); try { server.start(); - } catch (Exception e) { + fail("expected exception"); + } catch (IOException e) { assertSame(ex, e); } } - @Test - public void transportServerFirstToShutdown() { - class ManualStoppedService extends NoopService { - public void doNotifyStopped() { - notifyStopped(); - } - - @Override - public void doStop() {} // Don't notify. - } - - NoopService transportServer = new NoopService(); - ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer) - .start(); - ManualStoppedService transport = new ManualStoppedService(); - transport.startAsync(); - server.serverListener().transportCreated(transport); - server.shutdown(); - assertEquals(Service.State.STOPPING, transport.state()); - assertEquals(Service.State.TERMINATED, transportServer.state()); - assertTrue(server.isShutdown()); - assertFalse(server.isTerminated()); - - transport.doNotifyStopped(); - assertEquals(Service.State.TERMINATED, transport.state()); - assertTrue(server.isTerminated()); - } - - @Test - public void transportServerLastToShutdown() { - class ManualStoppedService extends NoopService { - public void doNotifyStopped() { - notifyStopped(); - } - - @Override - public void doStop() {} // Don't notify. - } - - ManualStoppedService transportServer = new ManualStoppedService(); - ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer) - .start(); - Service transport = new NoopService(); - transport.startAsync(); - server.serverListener().transportCreated(transport); - server.shutdown(); - assertEquals(Service.State.TERMINATED, transport.state()); - assertEquals(Service.State.STOPPING, transportServer.state()); - assertTrue(server.isShutdown()); - assertFalse(server.isTerminated()); - - transportServer.doNotifyStopped(); - assertEquals(Service.State.TERMINATED, transportServer.state()); - assertTrue(server.isTerminated()); - } - @Test public void basicExchangeSuccessful() throws Exception { final Metadata.Key metadataKey @@ -213,7 +182,8 @@ public class ServerImplTest { return callListener; } }).build()); - ServerTransportListener transportListener = newTransport(server); + ServerTransportListener transportListener + = transportServer.registerNewServerTransport(new SimpleServerTransport()); Metadata.Headers headers = new Metadata.Headers(); headers.put(metadataKey, 0); @@ -271,7 +241,8 @@ public class ServerImplTest { throw status.asRuntimeException(); } }).build()); - ServerTransportListener transportListener = newTransport(server); + ServerTransportListener transportListener + = transportServer.registerNewServerTransport(new SimpleServerTransport()); ServerStreamListener streamListener = transportListener.streamCreated(stream, "/Waiter/serve", new Metadata.Headers()); @@ -284,12 +255,6 @@ public class ServerImplTest { verifyNoMoreInteractions(stream); } - private static ServerTransportListener newTransport(ServerImpl server) { - Service transport = new NoopService(); - transport.startAsync(); - return server.serverListener().transportCreated(transport); - } - /** * Useful for plugging a single-threaded executor from processing tasks, or for waiting until a * single-threaded executor has processed queued tasks. @@ -311,15 +276,30 @@ public class ServerImplTest { return barrier; } - private static class NoopService extends AbstractService { + private static class SimpleServer implements io.grpc.transport.Server { + ServerListener listener; + @Override - protected void doStart() { - notifyStarted(); + public void start(ServerListener listener) throws IOException { + this.listener = listener; } @Override - protected void doStop() { - notifyStopped(); + public void shutdown() { + listener.serverShutdown(); + } + + public ServerTransportListener registerNewServerTransport(SimpleServerTransport transport) { + return transport.listener = listener.transportCreated(transport); + } + } + + private static class SimpleServerTransport implements ServerTransport { + ServerTransportListener listener; + + @Override + public void shutdown() { + listener.transportTerminated(); } } diff --git a/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java b/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java index 638b3d8485..bcbd768808 100644 --- a/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java +++ b/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java @@ -80,7 +80,7 @@ public class RouteGuideServer { } /** Start serving requests. */ - public void start() { + public void start() throws IOException { grpcServer = NettyServerBuilder.forPort(port) .addService(RouteGuideGrpc.bindService(new RouteGuideService(features))) .build().start(); diff --git a/integration-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java b/integration-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java index 1f0e5adfba..d11c2c7cff 100644 --- a/integration-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java +++ b/integration-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java @@ -71,6 +71,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; @@ -93,11 +94,14 @@ public abstract class AbstractTransportTest { protected static void startStaticServer(AbstractServerBuilder builder) { testServiceExecutor = Executors.newScheduledThreadPool(2); - server = builder - .addService(ServerInterceptors.intercept( - TestServiceGrpc.bindService(new TestServiceImpl(testServiceExecutor)), - TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY))) - .build().start(); + builder.addService(ServerInterceptors.intercept( + TestServiceGrpc.bindService(new TestServiceImpl(testServiceExecutor)), + TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY))); + try { + server = builder.build().start(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } } protected static void stopStaticServer() { diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServer.java b/netty/src/main/java/io/grpc/transport/netty/NettyServer.java index 140cb1e8c2..e2f2b9fed3 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServer.java @@ -35,8 +35,8 @@ import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.AbstractService; +import io.grpc.transport.Server; import io.grpc.transport.ServerListener; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; @@ -48,53 +48,47 @@ import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.ssl.SslContext; +import java.io.IOException; import java.net.SocketAddress; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; /** - * Implementation of the {@link com.google.common.util.concurrent.Service} interface for a - * Netty-based server. + * Netty-based server implementation. */ -public class NettyServer extends AbstractService { +public class NettyServer implements Server { + private static final Logger log = Logger.getLogger(Server.class.getName()); + private final SocketAddress address; private final Class channelType; - private final ChannelInitializer channelInitializer; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; + private final SslContext sslContext; + private final int maxStreamsPerConnection; + private ServerListener listener; private Channel channel; - NettyServer(ServerListener serverListener, SocketAddress address, - Class channelType, EventLoopGroup bossGroup, - EventLoopGroup workerGroup, int maxStreamsPerConnection) { - this(serverListener, address, channelType, bossGroup, workerGroup, null, - maxStreamsPerConnection); + NettyServer(SocketAddress address, Class channelType, + EventLoopGroup bossGroup, EventLoopGroup workerGroup, int maxStreamsPerConnection) { + this(address, channelType, bossGroup, workerGroup, null, maxStreamsPerConnection); } - NettyServer(final ServerListener serverListener, SocketAddress address, - Class channelType, EventLoopGroup bossGroup, - EventLoopGroup workerGroup, @Nullable final SslContext sslContext, - final int maxStreamsPerConnection) { + NettyServer(SocketAddress address, Class channelType, + EventLoopGroup bossGroup, EventLoopGroup workerGroup, @Nullable SslContext sslContext, + int maxStreamsPerConnection) { this.address = address; this.channelType = Preconditions.checkNotNull(channelType, "channelType"); this.bossGroup = Preconditions.checkNotNull(bossGroup, "bossGroup"); this.workerGroup = Preconditions.checkNotNull(workerGroup, "workerGroup"); - this.channelInitializer = new ChannelInitializer() { - @Override - public void initChannel(Channel ch) throws Exception { - NettyServerTransport transport - = new NettyServerTransport(ch, serverListener, sslContext, maxStreamsPerConnection); - // TODO(ejona86): Ideally we wouldn't handle handler registration asyncly and then be forced - // to block for completion on another thread. This should be resolved as part of removing - // Service from server transport. - transport.startAsync().awaitRunning(); - // TODO(nmittler): Should we wait for transport shutdown before shutting down server? - } - }; + this.sslContext = sslContext; + this.maxStreamsPerConnection = maxStreamsPerConnection; } @Override - protected void doStart() { + public void start(ServerListener serverListener) throws IOException { + listener = serverListener; ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(channelType); @@ -102,36 +96,42 @@ public class NettyServer extends AbstractService { b.option(SO_BACKLOG, 128); b.childOption(SO_KEEPALIVE, true); } - b.childHandler(channelInitializer); - - // Bind and start to accept incoming connections. - b.bind(address).addListener(new ChannelFutureListener() { + b.childHandler(new ChannelInitializer() { @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - channel = future.channel(); - notifyStarted(); - } else { - notifyFailed(future.cause()); - } + public void initChannel(Channel ch) throws Exception { + NettyServerTransport transport + = new NettyServerTransport(ch, sslContext, maxStreamsPerConnection); + transport.start(listener.transportCreated(transport)); } }); + + // Bind and start to accept incoming connections. + ChannelFuture future = b.bind(address); + try { + future.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted waiting for bind"); + } + if (!future.isSuccess()) { + throw new IOException("Failed to bind", future.cause()); + } + channel = future.channel(); } @Override - protected void doStop() { - // Wait for the channel to close. - if (channel != null && channel.isOpen()) { - channel.close().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - notifyStopped(); - } else { - notifyFailed(future.cause()); - } - } - }); + public void shutdown() { + if (channel == null || channel.isOpen()) { + return; } + channel.close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + log.log(Level.WARNING, "Error shutting down server", future.cause()); + } + listener.serverShutdown(); + } + }); } } diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerBuilder.java index 0ea7ec1f0c..9fbc6ce80d 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerBuilder.java @@ -32,13 +32,10 @@ package io.grpc.transport.netty; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.Service; import io.grpc.AbstractServerBuilder; import io.grpc.HandlerRegistry; import io.grpc.SharedResourceHolder; -import io.grpc.transport.ServerListener; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -177,44 +174,25 @@ public final class NettyServerBuilder extends AbstractServerBuilder