Remove Guava's Service from server transport

ServerImpl.start() now throws IOException to make the error explicit.
This was previously being papered over by wrapping the exception in
RuntimeException.
This commit is contained in:
Eric Anderson 2015-04-13 18:13:44 -07:00
parent f920badc5e
commit 4f4f8e40bf
13 changed files with 361 additions and 311 deletions

View File

@ -127,7 +127,7 @@ public abstract class AbstractChannelBuilder<BuilderT extends AbstractChannelBui
* Constructor. * Constructor.
* *
* @param transportFactory the created channel uses this factory to create transports * @param transportFactory the created channel uses this factory to create transports
* @param terminationRunnable will be called at the channel's life-cycle events * @param terminationRunnable will be called at the channel termination
*/ */
public ChannelEssentials(ClientTransportFactory transportFactory, public ChannelEssentials(ClientTransportFactory transportFactory,
@Nullable Runnable terminationRunnable) { @Nullable Runnable terminationRunnable) {

View File

@ -34,9 +34,6 @@ package io.grpc;
import static io.grpc.AbstractChannelBuilder.DEFAULT_EXECUTOR; import static io.grpc.AbstractChannelBuilder.DEFAULT_EXECUTOR;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Service;
import io.grpc.transport.ServerListener;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -114,18 +111,39 @@ public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuild
releaseExecutor = true; releaseExecutor = true;
} }
ServerImpl server = new ServerImpl(executor, registry); final ServerEssentials essentials = buildEssentials();
server.setTransportServer(buildTransportServer(server.serverListener())); ServerImpl server = new ServerImpl(executor, registry, essentials.server);
server.setTerminationRunnable(new Runnable() { server.setTerminationRunnable(new Runnable() {
@Override @Override
public void run() { public void run() {
if (releaseExecutor) { if (releaseExecutor) {
SharedResourceHolder.release(DEFAULT_EXECUTOR, executor); SharedResourceHolder.release(DEFAULT_EXECUTOR, executor);
} }
if (essentials.terminationRunnable != null) {
essentials.terminationRunnable.run();
}
} }
}); });
return server; return server;
} }
protected abstract Service buildTransportServer(ServerListener serverListener); protected abstract ServerEssentials buildEssentials();
protected static class ServerEssentials {
final io.grpc.transport.Server server;
@Nullable
final Runnable terminationRunnable;
/**
* Constructor.
*
* @param server the created server uses this server to accept transports
* @param terminationRunnable will be called at the server termination
*/
public ServerEssentials(io.grpc.transport.Server server,
@Nullable Runnable terminationRunnable) {
this.server = Preconditions.checkNotNull(server, "server");
this.terminationRunnable = terminationRunnable;
}
}
} }

View File

@ -33,12 +33,11 @@ package io.grpc;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import io.grpc.transport.ServerListener; import io.grpc.transport.ServerListener;
import io.grpc.transport.ServerStream; import io.grpc.transport.ServerStream;
import io.grpc.transport.ServerStreamListener; import io.grpc.transport.ServerStreamListener;
import io.grpc.transport.ServerTransport;
import io.grpc.transport.ServerTransportListener; import io.grpc.transport.ServerTransportListener;
import java.io.IOException; import java.io.IOException;
@ -55,9 +54,7 @@ import java.util.concurrent.TimeUnit;
* <pre><code>public class TcpTransportServerFactory { * <pre><code>public class TcpTransportServerFactory {
* public static Server newServer(Executor executor, HandlerRegistry registry, * public static Server newServer(Executor executor, HandlerRegistry registry,
* String configuration) { * String configuration) {
* ServerImpl server = new ServerImpl(executor, registry); * return new ServerImpl(executor, registry, new TcpTransportServer(configuration));
* return server.setTransportServer(
* new TcpTransportServer(server.serverListener(), configuration));
* } * }
* }</code></pre> * }</code></pre>
* *
@ -67,8 +64,6 @@ import java.util.concurrent.TimeUnit;
public class ServerImpl implements Server { public class ServerImpl implements Server {
private static final ServerStreamListener NOOP_LISTENER = new NoopListener(); private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
private final ServerListener serverListener = new ServerListenerImpl();
private final ServerTransportListener serverTransportListener = new ServerTransportListenerImpl();
/** Executor for application processing. */ /** Executor for application processing. */
private final Executor executor; private final Executor executor;
private final HandlerRegistry registry; private final HandlerRegistry registry;
@ -77,46 +72,21 @@ public class ServerImpl implements Server {
private boolean terminated; private boolean terminated;
private Runnable terminationRunnable; private Runnable terminationRunnable;
/** Service encapsulating something similar to an accept() socket. */ /** Service encapsulating something similar to an accept() socket. */
private Service transportServer; private final io.grpc.transport.Server transportServer;
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */ /** {@code transportServer} and services encapsulating something similar to a TCP connection. */
private final Collection<Service> transports = new HashSet<Service>(); private final Collection<ServerTransport> transports = new HashSet<ServerTransport>();
/** /**
* Construct a server. {@link #setTransportServer(Service)} must be called before starting the * Construct a server.
* server.
* *
* @param executor to call methods on behalf of remote clients * @param executor to call methods on behalf of remote clients
* @param registry of methods to expose to remote clients. * @param registry of methods to expose to remote clients.
*/ */
public ServerImpl(Executor executor, HandlerRegistry registry) { public ServerImpl(Executor executor, HandlerRegistry registry,
this.executor = Preconditions.checkNotNull(executor); io.grpc.transport.Server transportServer) {
this.registry = Preconditions.checkNotNull(registry); this.executor = Preconditions.checkNotNull(executor, "executor");
} this.registry = Preconditions.checkNotNull(registry, "registry");
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
/**
* Set the transport server for the server. {@code transportServer} should be in state NEW and not
* shared with any other {@code Server}s; it will be started and managed by the newly-created
* server instance. Must be called before starting server.
*
* @return this object
*/
public synchronized ServerImpl setTransportServer(Service transportServer) {
if (shutdown) {
throw new IllegalStateException("Already shutdown");
}
Preconditions.checkState(this.transportServer == null, "transportServer already set");
this.transportServer = Preconditions.checkNotNull(transportServer);
Preconditions.checkArgument(
transportServer.state() == Service.State.NEW, "transport server not in NEW state");
transportServer.addListener(new TransportServiceListener(transportServer),
MoreExecutors.directExecutor());
transports.add(transportServer);
return this;
}
/** Listener to be called by transport factories to notify of new transport instances. */
public ServerListener serverListener() {
return serverListener;
} }
/** Hack to allow executors to auto-shutdown. Not for general use. */ /** Hack to allow executors to auto-shutdown. Not for general use. */
@ -130,22 +100,15 @@ public class ServerImpl implements Server {
* *
* @return {@code this} object * @return {@code this} object
* @throws IllegalStateException if already started * @throws IllegalStateException if already started
* @throws IOException if unable to bind
*/ */
public synchronized ServerImpl start() { public synchronized ServerImpl start() throws IOException {
if (started) { if (started) {
throw new IllegalStateException("Already started"); throw new IllegalStateException("Already started");
} }
// Start and wait for any port to actually be bound.
transportServer.start(new ServerListenerImpl());
started = true; started = true;
try {
// Start and wait for any port to actually be bound.
transportServer.startAsync().awaitRunning();
} catch (IllegalStateException ex) {
Throwable t = transportServer.failureCause();
if (t != null) {
throw Throwables.propagate(t);
}
throw ex;
}
return this; return this;
} }
@ -153,12 +116,11 @@ public class ServerImpl implements Server {
* Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected. * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected.
*/ */
public synchronized ServerImpl shutdown() { public synchronized ServerImpl shutdown() {
shutdown = true; if (shutdown) {
// transports collection can be modified during stopAsync(), even if we hold the lock, due to return this;
// reentrancy.
for (Service transport : transports.toArray(new Service[transports.size()])) {
transport.stopAsync();
} }
transportServer.shutdown();
shutdown = true;
return this; return this;
} }
@ -226,10 +188,15 @@ public class ServerImpl implements Server {
* *
* @param transport service to remove * @param transport service to remove
*/ */
private synchronized void transportClosed(Service transport) { private synchronized void transportClosed(ServerTransport transport) {
if (!transports.remove(transport)) { if (!transports.remove(transport)) {
throw new AssertionError("Transport already removed"); throw new AssertionError("Transport already removed");
} }
checkForTermination();
}
/** Notify of complete shutdown if necessary. */
private synchronized void checkForTermination() {
if (shutdown && transports.isEmpty()) { if (shutdown && transports.isEmpty()) {
terminated = true; terminated = true;
notifyAll(); notifyAll();
@ -241,50 +208,39 @@ public class ServerImpl implements Server {
private class ServerListenerImpl implements ServerListener { private class ServerListenerImpl implements ServerListener {
@Override @Override
public ServerTransportListener transportCreated(Service transport) { public ServerTransportListener transportCreated(ServerTransport transport) {
Service.State transportState = transport.state(); synchronized (ServerImpl.this) {
Preconditions.checkArgument(
transportState == Service.State.STARTING || transportState == Service.State.RUNNING,
"Created transport should be starting or running");
synchronized (this) {
if (shutdown) {
transport.stopAsync();
return serverTransportListener;
}
transports.add(transport); transports.add(transport);
} }
// transports collection can be modified during this call, even if we hold the lock, due to return new ServerTransportListenerImpl(transport);
// reentrancy.
transport.addListener(new TransportServiceListener(transport),
MoreExecutors.directExecutor());
// We assume that transport.state() won't change by another thread before the listener was
// registered.
Preconditions.checkState(
transport.state() == transportState, "transport changed state unexpectedly!");
return serverTransportListener;
}
}
/** Listens for lifecycle changes to a "TCP connection." */
private class TransportServiceListener extends Service.Listener {
private final Service transport;
public TransportServiceListener(Service transport) {
this.transport = transport;
} }
@Override @Override
public void failed(Service.State from, Throwable failure) { public void serverShutdown() {
transportClosed(transport); synchronized (ServerImpl.this) {
} // transports collection can be modified during shutdown(), even if we hold the lock, due
// to reentrancy.
@Override for (ServerTransport transport
public void terminated(Service.State from) { : transports.toArray(new ServerTransport[transports.size()])) {
transportClosed(transport); transport.shutdown();
}
checkForTermination();
}
} }
} }
private class ServerTransportListenerImpl implements ServerTransportListener { private class ServerTransportListenerImpl implements ServerTransportListener {
private final ServerTransport transport;
public ServerTransportListenerImpl(ServerTransport transport) {
this.transport = transport;
}
@Override
public void transportTerminated() {
transportClosed(transport);
}
@Override @Override
public ServerStreamListener streamCreated(final ServerStream stream, final String methodName, public ServerStreamListener streamCreated(final ServerStream stream, final String methodName,
final Metadata.Headers headers) { final Metadata.Headers headers) {

View File

@ -0,0 +1,56 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport;
import java.io.IOException;
/**
* A server accepts new incomming connections. This is would commonly encapsulate a bound socket
* that {@code accept(}}s new connections.
*/
public interface Server {
/**
* Starts transport. Implementations must not call {@code listener} until after {@code start()}
* returns. The method only returns after it has done the equivalent of bind()ing, so it will be
* able to service any connections created after returning.
*
* @param listener non-{@code null} listener of server events
* @throws IOException if unable to bind
*/
void start(ServerListener listener) throws IOException;
/**
* Initiates an orderly shutdown of the server. Existing transports continue, but new transports
* will not be created (once {@link ServerListener#serverShutdown()} callback called).
*/
void shutdown();
}

View File

@ -31,10 +31,9 @@
package io.grpc.transport; package io.grpc.transport;
import com.google.common.util.concurrent.Service;
/** /**
* A listener to a server for transport creation events. * A listener to a server for transport creation events. Notifications must occur from the transport
* thread.
*/ */
public interface ServerListener { public interface ServerListener {
@ -44,5 +43,12 @@ public interface ServerListener {
* @param transport the new transport to be observed. * @param transport the new transport to be observed.
* @return a listener for stream creation events on the transport. * @return a listener for stream creation events on the transport.
*/ */
ServerTransportListener transportCreated(Service transport); ServerTransportListener transportCreated(ServerTransport transport);
/**
* The server is shutting down. No new transports will be processed, but existing streams may
* continue. Shutdown is only caused by a call to {@link Server#shutdown()}. All resources have
* been released.
*/
void serverShutdown();
} }

View File

@ -0,0 +1,42 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport;
/** An inbound connection. */
public interface ServerTransport {
/**
* Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will
* eventually begin failing. New streams "eventually" begin failing because shutdown may need to
* be processed on a separate thread.
*/
void shutdown();
}

View File

@ -34,7 +34,8 @@ package io.grpc.transport;
import io.grpc.Metadata; import io.grpc.Metadata;
/** /**
* A observer of a server-side transport for stream creation events. * A observer of a server-side transport for stream creation events. Notifications must occur from
* the transport thread.
*/ */
public interface ServerTransportListener { public interface ServerTransportListener {
@ -48,4 +49,9 @@ public interface ServerTransportListener {
*/ */
ServerStreamListener streamCreated(ServerStream stream, String method, ServerStreamListener streamCreated(ServerStream stream, String method,
Metadata.Headers headers); Metadata.Headers headers);
/**
* The transport completed shutting down. All resources have been released.
*/
void transportTerminated();
} }

View File

@ -37,6 +37,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.notNull; import static org.mockito.Matchers.notNull;
@ -47,11 +48,11 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import io.grpc.transport.ServerListener;
import io.grpc.transport.ServerStream; import io.grpc.transport.ServerStream;
import io.grpc.transport.ServerStreamListener; import io.grpc.transport.ServerStreamListener;
import io.grpc.transport.ServerTransport;
import io.grpc.transport.ServerTransportListener; import io.grpc.transport.ServerTransportListener;
import org.junit.After; import org.junit.After;
@ -71,7 +72,6 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** Unit tests for {@link ServerImpl}. */ /** Unit tests for {@link ServerImpl}. */
@ -82,9 +82,8 @@ public class ServerImplTest {
private ExecutorService executor = Executors.newSingleThreadExecutor(); private ExecutorService executor = Executors.newSingleThreadExecutor();
private MutableHandlerRegistry registry = new MutableHandlerRegistryImpl(); private MutableHandlerRegistry registry = new MutableHandlerRegistryImpl();
private Service transportServer = new NoopService(); private SimpleServer transportServer = new SimpleServer();
private ServerImpl server = new ServerImpl(executor, registry) private ServerImpl server = new ServerImpl(executor, registry, transportServer);
.setTransportServer(transportServer);
@Mock @Mock
private ServerStream stream; private ServerStream stream;
@ -94,7 +93,7 @@ public class ServerImplTest {
/** Set up for test. */ /** Set up for test. */
@Before @Before
public void startUp() { public void startUp() throws IOException {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
server.start(); server.start();
@ -107,92 +106,62 @@ public class ServerImplTest {
} }
@Test @Test
public void startStopImmediate() throws InterruptedException { public void startStopImmediate() throws IOException {
Service transportServer = new NoopService(); transportServer = new SimpleServer() {
ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer); @Override
assertEquals(Service.State.NEW, transportServer.state()); public void shutdown() {}
};
ServerImpl server = new ServerImpl(executor, registry, transportServer);
server.start(); server.start();
assertEquals(Service.State.RUNNING, transportServer.state());
server.shutdown(); server.shutdown();
assertTrue(server.awaitTerminated(100, TimeUnit.MILLISECONDS)); assertTrue(server.isShutdown());
assertEquals(Service.State.TERMINATED, transportServer.state()); assertFalse(server.isTerminated());
transportServer.listener.serverShutdown();
assertTrue(server.isTerminated());
}
@Test
public void startStopImmediateWithChildTransport() throws IOException {
ServerImpl server = new ServerImpl(executor, registry, transportServer);
server.start();
class DelayedShutdownServerTransport extends SimpleServerTransport {
boolean shutdown;
@Override
public void shutdown() {
shutdown = true;
}
}
DelayedShutdownServerTransport serverTransport = new DelayedShutdownServerTransport();
transportServer.registerNewServerTransport(serverTransport);
server.shutdown();
assertTrue(server.isShutdown());
assertFalse(server.isTerminated());
assertTrue(serverTransport.shutdown);
serverTransport.listener.transportTerminated();
assertTrue(server.isTerminated());
} }
@Test @Test
public void transportServerFailsStartup() { public void transportServerFailsStartup() {
final Exception ex = new RuntimeException(); final IOException ex = new IOException();
class FailingStartupService extends NoopService { class FailingStartupServer extends SimpleServer {
@Override @Override
public void doStart() { public void start(ServerListener listener) throws IOException {
notifyFailed(ex); throw ex;
} }
} }
FailingStartupService transportServer = new FailingStartupService(); ServerImpl server = new ServerImpl(executor, registry, new FailingStartupServer());
ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer);
try { try {
server.start(); server.start();
} catch (Exception e) { fail("expected exception");
} catch (IOException e) {
assertSame(ex, e); assertSame(ex, e);
} }
} }
@Test
public void transportServerFirstToShutdown() {
class ManualStoppedService extends NoopService {
public void doNotifyStopped() {
notifyStopped();
}
@Override
public void doStop() {} // Don't notify.
}
NoopService transportServer = new NoopService();
ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer)
.start();
ManualStoppedService transport = new ManualStoppedService();
transport.startAsync();
server.serverListener().transportCreated(transport);
server.shutdown();
assertEquals(Service.State.STOPPING, transport.state());
assertEquals(Service.State.TERMINATED, transportServer.state());
assertTrue(server.isShutdown());
assertFalse(server.isTerminated());
transport.doNotifyStopped();
assertEquals(Service.State.TERMINATED, transport.state());
assertTrue(server.isTerminated());
}
@Test
public void transportServerLastToShutdown() {
class ManualStoppedService extends NoopService {
public void doNotifyStopped() {
notifyStopped();
}
@Override
public void doStop() {} // Don't notify.
}
ManualStoppedService transportServer = new ManualStoppedService();
ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer)
.start();
Service transport = new NoopService();
transport.startAsync();
server.serverListener().transportCreated(transport);
server.shutdown();
assertEquals(Service.State.TERMINATED, transport.state());
assertEquals(Service.State.STOPPING, transportServer.state());
assertTrue(server.isShutdown());
assertFalse(server.isTerminated());
transportServer.doNotifyStopped();
assertEquals(Service.State.TERMINATED, transportServer.state());
assertTrue(server.isTerminated());
}
@Test @Test
public void basicExchangeSuccessful() throws Exception { public void basicExchangeSuccessful() throws Exception {
final Metadata.Key<Integer> metadataKey final Metadata.Key<Integer> metadataKey
@ -213,7 +182,8 @@ public class ServerImplTest {
return callListener; return callListener;
} }
}).build()); }).build());
ServerTransportListener transportListener = newTransport(server); ServerTransportListener transportListener
= transportServer.registerNewServerTransport(new SimpleServerTransport());
Metadata.Headers headers = new Metadata.Headers(); Metadata.Headers headers = new Metadata.Headers();
headers.put(metadataKey, 0); headers.put(metadataKey, 0);
@ -271,7 +241,8 @@ public class ServerImplTest {
throw status.asRuntimeException(); throw status.asRuntimeException();
} }
}).build()); }).build());
ServerTransportListener transportListener = newTransport(server); ServerTransportListener transportListener
= transportServer.registerNewServerTransport(new SimpleServerTransport());
ServerStreamListener streamListener ServerStreamListener streamListener
= transportListener.streamCreated(stream, "/Waiter/serve", new Metadata.Headers()); = transportListener.streamCreated(stream, "/Waiter/serve", new Metadata.Headers());
@ -284,12 +255,6 @@ public class ServerImplTest {
verifyNoMoreInteractions(stream); verifyNoMoreInteractions(stream);
} }
private static ServerTransportListener newTransport(ServerImpl server) {
Service transport = new NoopService();
transport.startAsync();
return server.serverListener().transportCreated(transport);
}
/** /**
* Useful for plugging a single-threaded executor from processing tasks, or for waiting until a * Useful for plugging a single-threaded executor from processing tasks, or for waiting until a
* single-threaded executor has processed queued tasks. * single-threaded executor has processed queued tasks.
@ -311,15 +276,30 @@ public class ServerImplTest {
return barrier; return barrier;
} }
private static class NoopService extends AbstractService { private static class SimpleServer implements io.grpc.transport.Server {
ServerListener listener;
@Override @Override
protected void doStart() { public void start(ServerListener listener) throws IOException {
notifyStarted(); this.listener = listener;
} }
@Override @Override
protected void doStop() { public void shutdown() {
notifyStopped(); listener.serverShutdown();
}
public ServerTransportListener registerNewServerTransport(SimpleServerTransport transport) {
return transport.listener = listener.transportCreated(transport);
}
}
private static class SimpleServerTransport implements ServerTransport {
ServerTransportListener listener;
@Override
public void shutdown() {
listener.transportTerminated();
} }
} }

View File

@ -80,7 +80,7 @@ public class RouteGuideServer {
} }
/** Start serving requests. */ /** Start serving requests. */
public void start() { public void start() throws IOException {
grpcServer = NettyServerBuilder.forPort(port) grpcServer = NettyServerBuilder.forPort(port)
.addService(RouteGuideGrpc.bindService(new RouteGuideService(features))) .addService(RouteGuideGrpc.bindService(new RouteGuideService(features)))
.build().start(); .build().start();

View File

@ -71,6 +71,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
@ -93,11 +94,14 @@ public abstract class AbstractTransportTest {
protected static void startStaticServer(AbstractServerBuilder<?> builder) { protected static void startStaticServer(AbstractServerBuilder<?> builder) {
testServiceExecutor = Executors.newScheduledThreadPool(2); testServiceExecutor = Executors.newScheduledThreadPool(2);
server = builder builder.addService(ServerInterceptors.intercept(
.addService(ServerInterceptors.intercept( TestServiceGrpc.bindService(new TestServiceImpl(testServiceExecutor)),
TestServiceGrpc.bindService(new TestServiceImpl(testServiceExecutor)), TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY)));
TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY))) try {
.build().start(); server = builder.build().start();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
} }
protected static void stopStaticServer() { protected static void stopStaticServer() {

View File

@ -35,8 +35,8 @@ import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import io.grpc.transport.Server;
import io.grpc.transport.ServerListener; import io.grpc.transport.ServerListener;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -48,53 +48,47 @@ import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
* Implementation of the {@link com.google.common.util.concurrent.Service} interface for a * Netty-based server implementation.
* Netty-based server.
*/ */
public class NettyServer extends AbstractService { public class NettyServer implements Server {
private static final Logger log = Logger.getLogger(Server.class.getName());
private final SocketAddress address; private final SocketAddress address;
private final Class<? extends ServerChannel> channelType; private final Class<? extends ServerChannel> channelType;
private final ChannelInitializer<Channel> channelInitializer;
private final EventLoopGroup bossGroup; private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup; private final EventLoopGroup workerGroup;
private final SslContext sslContext;
private final int maxStreamsPerConnection;
private ServerListener listener;
private Channel channel; private Channel channel;
NettyServer(ServerListener serverListener, SocketAddress address, NettyServer(SocketAddress address, Class<? extends ServerChannel> channelType,
Class<? extends ServerChannel> channelType, EventLoopGroup bossGroup, EventLoopGroup bossGroup, EventLoopGroup workerGroup, int maxStreamsPerConnection) {
EventLoopGroup workerGroup, int maxStreamsPerConnection) { this(address, channelType, bossGroup, workerGroup, null, maxStreamsPerConnection);
this(serverListener, address, channelType, bossGroup, workerGroup, null,
maxStreamsPerConnection);
} }
NettyServer(final ServerListener serverListener, SocketAddress address, NettyServer(SocketAddress address, Class<? extends ServerChannel> channelType,
Class<? extends ServerChannel> channelType, EventLoopGroup bossGroup, EventLoopGroup bossGroup, EventLoopGroup workerGroup, @Nullable SslContext sslContext,
EventLoopGroup workerGroup, @Nullable final SslContext sslContext, int maxStreamsPerConnection) {
final int maxStreamsPerConnection) {
this.address = address; this.address = address;
this.channelType = Preconditions.checkNotNull(channelType, "channelType"); this.channelType = Preconditions.checkNotNull(channelType, "channelType");
this.bossGroup = Preconditions.checkNotNull(bossGroup, "bossGroup"); this.bossGroup = Preconditions.checkNotNull(bossGroup, "bossGroup");
this.workerGroup = Preconditions.checkNotNull(workerGroup, "workerGroup"); this.workerGroup = Preconditions.checkNotNull(workerGroup, "workerGroup");
this.channelInitializer = new ChannelInitializer<Channel>() { this.sslContext = sslContext;
@Override this.maxStreamsPerConnection = maxStreamsPerConnection;
public void initChannel(Channel ch) throws Exception {
NettyServerTransport transport
= new NettyServerTransport(ch, serverListener, sslContext, maxStreamsPerConnection);
// TODO(ejona86): Ideally we wouldn't handle handler registration asyncly and then be forced
// to block for completion on another thread. This should be resolved as part of removing
// Service from server transport.
transport.startAsync().awaitRunning();
// TODO(nmittler): Should we wait for transport shutdown before shutting down server?
}
};
} }
@Override @Override
protected void doStart() { public void start(ServerListener serverListener) throws IOException {
listener = serverListener;
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup); b.group(bossGroup, workerGroup);
b.channel(channelType); b.channel(channelType);
@ -102,36 +96,42 @@ public class NettyServer extends AbstractService {
b.option(SO_BACKLOG, 128); b.option(SO_BACKLOG, 128);
b.childOption(SO_KEEPALIVE, true); b.childOption(SO_KEEPALIVE, true);
} }
b.childHandler(channelInitializer); b.childHandler(new ChannelInitializer<Channel>() {
// Bind and start to accept incoming connections.
b.bind(address).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void initChannel(Channel ch) throws Exception {
if (future.isSuccess()) { NettyServerTransport transport
channel = future.channel(); = new NettyServerTransport(ch, sslContext, maxStreamsPerConnection);
notifyStarted(); transport.start(listener.transportCreated(transport));
} else {
notifyFailed(future.cause());
}
} }
}); });
// Bind and start to accept incoming connections.
ChannelFuture future = b.bind(address);
try {
future.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted waiting for bind");
}
if (!future.isSuccess()) {
throw new IOException("Failed to bind", future.cause());
}
channel = future.channel();
} }
@Override @Override
protected void doStop() { public void shutdown() {
// Wait for the channel to close. if (channel == null || channel.isOpen()) {
if (channel != null && channel.isOpen()) { return;
channel.close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
notifyStopped();
} else {
notifyFailed(future.cause());
}
}
});
} }
channel.close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.log(Level.WARNING, "Error shutting down server", future.cause());
}
listener.serverShutdown();
}
});
} }
} }

View File

@ -32,13 +32,10 @@
package io.grpc.transport.netty; package io.grpc.transport.netty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import io.grpc.AbstractServerBuilder; import io.grpc.AbstractServerBuilder;
import io.grpc.HandlerRegistry; import io.grpc.HandlerRegistry;
import io.grpc.SharedResourceHolder; import io.grpc.SharedResourceHolder;
import io.grpc.transport.ServerListener;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -177,44 +174,25 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB
} }
@Override @Override
protected Service buildTransportServer(ServerListener serverListener) { protected ServerEssentials buildEssentials() {
final EventLoopGroup bossEventLoopGroup = (userBossEventLoopGroup == null) final EventLoopGroup bossEventLoopGroup = (userBossEventLoopGroup == null)
? SharedResourceHolder.get(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP) : userBossEventLoopGroup; ? SharedResourceHolder.get(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP) : userBossEventLoopGroup;
final EventLoopGroup workerEventLoopGroup = (userWorkerEventLoopGroup == null) final EventLoopGroup workerEventLoopGroup = (userWorkerEventLoopGroup == null)
? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP) ? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP)
: userWorkerEventLoopGroup; : userWorkerEventLoopGroup;
NettyServer server = new NettyServer(serverListener, address, channelType, bossEventLoopGroup, NettyServer server = new NettyServer(address, channelType, bossEventLoopGroup,
workerEventLoopGroup, sslContext, maxConcurrentCallsPerConnection); workerEventLoopGroup, sslContext, maxConcurrentCallsPerConnection);
if (userBossEventLoopGroup == null) { Runnable terminationRunnable = new Runnable() {
server.addListener(new ClosureHook() { @Override
@Override public void run() {
protected void onClosed() { if (userBossEventLoopGroup == null) {
SharedResourceHolder.release(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP, bossEventLoopGroup); SharedResourceHolder.release(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP, bossEventLoopGroup);
} }
}, MoreExecutors.directExecutor()); if (userWorkerEventLoopGroup == null) {
}
if (userWorkerEventLoopGroup == null) {
server.addListener(new ClosureHook() {
@Override
protected void onClosed() {
SharedResourceHolder.release(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP, workerEventLoopGroup); SharedResourceHolder.release(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP, workerEventLoopGroup);
} }
}, MoreExecutors.directExecutor()); }
} };
return server; return new ServerEssentials(server, terminationRunnable);
}
private abstract static class ClosureHook extends Service.Listener {
protected abstract void onClosed();
@Override
public void terminated(Service.State from) {
onClosed();
}
@Override
public void failed(Service.State from, Throwable failure) {
onClosed();
}
} }
} }

View File

@ -32,9 +32,8 @@
package io.grpc.transport.netty; package io.grpc.transport.netty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import io.grpc.transport.ServerListener; import io.grpc.transport.ServerTransport;
import io.grpc.transport.ServerTransportListener; import io.grpc.transport.ServerTransportListener;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -51,50 +50,48 @@ import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
* The Netty-based server transport. * The Netty-based server transport.
*/ */
class NettyServerTransport extends AbstractService { class NettyServerTransport implements ServerTransport {
private static final Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG); private static final Logger log = Logger.getLogger(NettyServerTransport.class.getName());
private final Channel channel;
private final ServerListener serverListener;
private final SslContext sslContext;
private NettyServerHandler handler;
private int maxStreams;
NettyServerTransport(Channel channel, ServerListener serverListener, private final Channel channel;
@Nullable SslContext sslContext, int maxStreams) { private final SslContext sslContext;
private final int maxStreams;
private ServerTransportListener listener;
private boolean terminated;
NettyServerTransport(Channel channel, @Nullable SslContext sslContext, int maxStreams) {
this.channel = Preconditions.checkNotNull(channel, "channel"); this.channel = Preconditions.checkNotNull(channel, "channel");
this.serverListener = Preconditions.checkNotNull(serverListener, "serverListener");
this.sslContext = sslContext; this.sslContext = sslContext;
this.maxStreams = maxStreams; this.maxStreams = maxStreams;
} }
@Override public void start(ServerTransportListener listener) {
protected void doStart() { Preconditions.checkState(this.listener == null, "Handler already registered");
Preconditions.checkState(handler == null, "Handler already registered"); this.listener = listener;
// Notify the listener that this transport is being constructed.
ServerTransportListener transportListener = serverListener.transportCreated(this);
// Create the Netty handler for the pipeline. // Create the Netty handler for the pipeline.
handler = createHandler(transportListener); final NettyServerHandler handler = createHandler(listener);
// Notify when the channel closes. // Notify when the channel closes.
channel.closeFuture().addListener(new ChannelFutureListener() { channel.closeFuture().addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
// Close failed. notifyTerminated(future.cause());
notifyFailed(future.cause());
} else if (handler.connectionError() != null) { } else if (handler.connectionError() != null) {
// The handler encountered a connection error. // The handler encountered a connection error.
notifyFailed(handler.connectionError()); notifyTerminated(handler.connectionError());
} else { } else {
// Normal termination of the connection. // Normal termination of the connection.
notifyStopped(); notifyTerminated(null);
} }
} }
}); });
@ -103,24 +100,31 @@ class NettyServerTransport extends AbstractService {
channel.pipeline().addLast(Http2Negotiator.serverTls(sslContext.newEngine(channel.alloc()))); channel.pipeline().addLast(Http2Negotiator.serverTls(sslContext.newEngine(channel.alloc())));
} }
channel.pipeline().addLast(handler); channel.pipeline().addLast(handler);
notifyStarted();
} }
@Override @Override
protected void doStop() { public void shutdown() {
// No explicit call to notifyStopped() here, since this is automatically done when the
// channel closes.
if (channel.isOpen()) { if (channel.isOpen()) {
channel.close(); channel.close();
} }
} }
private void notifyTerminated(Throwable t) {
if (t != null) {
log.log(Level.SEVERE, "Transport failed", t);
}
if (!terminated) {
terminated = true;
listener.transportTerminated();
}
}
/** /**
* Creates the Netty handler to be used in the channel pipeline. * Creates the Netty handler to be used in the channel pipeline.
*/ */
private NettyServerHandler createHandler(ServerTransportListener transportListener) { private NettyServerHandler createHandler(ServerTransportListener transportListener) {
Http2Connection connection = new DefaultHttp2Connection(true); Http2Connection connection = new DefaultHttp2Connection(true);
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG);
Http2FrameReader frameReader = Http2FrameReader frameReader =
new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), frameLogger); new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), frameLogger);
Http2FrameWriter frameWriter = Http2FrameWriter frameWriter =