mirror of https://github.com/grpc/grpc-java.git
parent
19052499f7
commit
aeeebb7cdb
|
|
@ -31,12 +31,13 @@
|
|||
|
||||
package com.google.net.stubby;
|
||||
|
||||
import static com.google.net.stubby.AbstractServiceBuilder.DEFAULT_EXECUTOR;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.net.stubby.SharedResourceHolder.Resource;
|
||||
import com.google.net.stubby.transport.ClientTransportFactory;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
|
|
@ -46,6 +47,26 @@ import javax.annotation.Nullable;
|
|||
* @param <BuilderT> The concrete type of this builder.
|
||||
*/
|
||||
public abstract class AbstractChannelBuilder<BuilderT extends AbstractChannelBuilder<BuilderT>> {
|
||||
static final Resource<ExecutorService> DEFAULT_EXECUTOR =
|
||||
new Resource<ExecutorService>() {
|
||||
private static final String name = "grpc-default-executor";
|
||||
@Override
|
||||
public ExecutorService create() {
|
||||
return Executors.newCachedThreadPool(new ThreadFactoryBuilder()
|
||||
.setNameFormat(name + "-%d").build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(ExecutorService instance) {
|
||||
instance.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name;
|
||||
}
|
||||
};
|
||||
|
||||
@Nullable
|
||||
private ExecutorService userExecutor;
|
||||
|
||||
|
|
|
|||
|
|
@ -31,21 +31,26 @@
|
|||
|
||||
package com.google.net.stubby;
|
||||
|
||||
import static com.google.net.stubby.AbstractChannelBuilder.DEFAULT_EXECUTOR;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.Service;
|
||||
import com.google.net.stubby.transport.ServerListener;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* The base class for server builders.
|
||||
*
|
||||
* @param <BuilderT> The concrete type for this builder.
|
||||
*/
|
||||
public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuilder<BuilderT>>
|
||||
extends AbstractServiceBuilder<ServerImpl, BuilderT> {
|
||||
public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuilder<BuilderT>> {
|
||||
|
||||
private final HandlerRegistry registry;
|
||||
@Nullable
|
||||
private ExecutorService userExecutor;
|
||||
|
||||
/**
|
||||
* Constructs using a given handler registry.
|
||||
|
|
@ -61,6 +66,21 @@ public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuild
|
|||
this.registry = new MutableHandlerRegistryImpl();
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides a custom executor.
|
||||
*
|
||||
* <p>It's an optional parameter. If the user has not provided an executor when the server is
|
||||
* built, the builder will use a static cached thread pool.
|
||||
*
|
||||
* <p>The server won't take ownership of the given executor. It's caller's responsibility to
|
||||
* shut down the executor when it's desired.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final BuilderT executor(ExecutorService executor) {
|
||||
userExecutor = executor;
|
||||
return (BuilderT) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a service implementation to the handler registry.
|
||||
*
|
||||
|
|
@ -76,10 +96,33 @@ public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuild
|
|||
throw new UnsupportedOperationException("Underlying HandlerRegistry is not mutable");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final ServerImpl buildImpl(ExecutorService executor) {
|
||||
/**
|
||||
* Builds a server using the given parameters.
|
||||
*
|
||||
* <p>The returned service will not been started or be bound a port. You will need to start it
|
||||
* with {@link ServerImpl#start()}.
|
||||
*/
|
||||
public ServerImpl build() {
|
||||
final ExecutorService executor;
|
||||
final boolean releaseExecutor;
|
||||
if (userExecutor != null) {
|
||||
executor = userExecutor;
|
||||
releaseExecutor = false;
|
||||
} else {
|
||||
executor = SharedResourceHolder.get(DEFAULT_EXECUTOR);
|
||||
releaseExecutor = true;
|
||||
}
|
||||
|
||||
ServerImpl server = new ServerImpl(executor, registry);
|
||||
server.setTransportServer(buildTransportServer(server.serverListener()));
|
||||
server.setTerminationRunnable(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (releaseExecutor) {
|
||||
SharedResourceHolder.release(DEFAULT_EXECUTOR, executor);
|
||||
}
|
||||
}
|
||||
});
|
||||
return server;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,194 +0,0 @@
|
|||
/*
|
||||
* Copyright 2014, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package com.google.net.stubby;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.Service;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.net.stubby.SharedResourceHolder.Resource;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Base class for channel builders and server builders.
|
||||
*
|
||||
* <p>The ownership rule: a builder generally does not take ownership of any objects passed to it.
|
||||
* The caller is responsible for closing them if needed. The builder is only responsible for the
|
||||
* life-cycle of objects created inside.
|
||||
*
|
||||
* @param <ProductT> The product that is built by this builder.
|
||||
* @param <BuilderT> The concrete type of this builder.
|
||||
*/
|
||||
abstract class AbstractServiceBuilder<ProductT extends Service,
|
||||
BuilderT extends AbstractServiceBuilder<ProductT, BuilderT>> {
|
||||
|
||||
static final Resource<ExecutorService> DEFAULT_EXECUTOR =
|
||||
new Resource<ExecutorService>() {
|
||||
private static final String name = "grpc-default-executor";
|
||||
@Override
|
||||
public ExecutorService create() {
|
||||
return Executors.newCachedThreadPool(new ThreadFactoryBuilder()
|
||||
.setNameFormat(name + "-%d").build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(ExecutorService instance) {
|
||||
instance.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name;
|
||||
}
|
||||
};
|
||||
|
||||
@Nullable
|
||||
private ExecutorService userExecutor;
|
||||
|
||||
/**
|
||||
* Provides a custom executor.
|
||||
*
|
||||
* <p>It's an optional parameter. If the user has not provided an executor when the service is
|
||||
* built, the builder will use a static cached thread pool.
|
||||
*
|
||||
* <p>The service won't take ownership of the given executor. It's caller's responsibility to
|
||||
* shut down the executor when it's desired.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final BuilderT executor(ExecutorService executor) {
|
||||
userExecutor = executor;
|
||||
return (BuilderT) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a service using the given parameters.
|
||||
*
|
||||
* <p>The returned service has not been started at this point. You will need to start it by
|
||||
* yourself or use {@link #buildAndStart()}.
|
||||
*/
|
||||
public ProductT build() {
|
||||
final ExecutorService executor = (userExecutor == null)
|
||||
? SharedResourceHolder.get(DEFAULT_EXECUTOR) : userExecutor;
|
||||
ProductT service = buildImpl(executor);
|
||||
// We shut down the executor only if we created it.
|
||||
if (userExecutor == null) {
|
||||
service.addListener(new ClosureHook() {
|
||||
@Override
|
||||
protected void onClosed() {
|
||||
SharedResourceHolder.release(DEFAULT_EXECUTOR, executor);
|
||||
}
|
||||
}, MoreExecutors.directExecutor());
|
||||
}
|
||||
return service;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds and starts a service.
|
||||
*
|
||||
* <p>The service may not be running when this method returns. If you want to wait until it's up
|
||||
* and running, either use {@link Service#awaitRunning()} or {@link #buildAndWaitForRunning()}.
|
||||
*
|
||||
* @return the service that has just been built and started
|
||||
*/
|
||||
public final ProductT buildAndStart() {
|
||||
ProductT service = build();
|
||||
service.startAsync();
|
||||
return service;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds and starts a service, and wait until it's up and running.
|
||||
*
|
||||
* @return the service that has just been built and is now running.
|
||||
*/
|
||||
public final ProductT buildAndWaitForRunning() {
|
||||
ProductT service = buildAndStart();
|
||||
try {
|
||||
service.awaitRunning();
|
||||
} catch (Exception e) {
|
||||
service.stopAsync();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
return service;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds and starts a service, and wait until it's up and running, with a timeout.
|
||||
*
|
||||
* @return the service that has just been built and is now running.
|
||||
* @throws TimeoutException if the service didn't become running within the given timeout.
|
||||
*/
|
||||
public final ProductT buildAndWaitForRunning(long timeout, TimeUnit unit)
|
||||
throws TimeoutException {
|
||||
ProductT service = buildAndStart();
|
||||
try {
|
||||
service.awaitRunning(timeout, unit);
|
||||
} catch (Exception e) {
|
||||
service.stopAsync();
|
||||
if (e instanceof TimeoutException) {
|
||||
throw (TimeoutException) e;
|
||||
} else {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
return service;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses may use this as a convenient listener for cleaning up after the built service.
|
||||
*/
|
||||
protected abstract static class ClosureHook extends Service.Listener {
|
||||
protected abstract void onClosed();
|
||||
|
||||
@Override
|
||||
public void terminated(Service.State from) {
|
||||
onClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Service.State from, Throwable failure) {
|
||||
onClosed();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implemented by subclasses to build the actual service object. The given executor is owned by
|
||||
* this base class.
|
||||
*/
|
||||
protected abstract ProductT buildImpl(ExecutorService executor);
|
||||
}
|
||||
|
|
@ -31,8 +31,6 @@
|
|||
|
||||
package com.google.net.stubby;
|
||||
|
||||
import com.google.common.util.concurrent.Service;
|
||||
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
/**
|
||||
|
|
@ -40,4 +38,4 @@ import javax.annotation.concurrent.ThreadSafe;
|
|||
* not expected to be implemented by application code or interceptors.
|
||||
*/
|
||||
@ThreadSafe
|
||||
public interface Server extends Service {}
|
||||
public interface Server {}
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@ package com.google.net.stubby;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.util.concurrent.AbstractService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.Service;
|
||||
import com.google.net.stubby.transport.ServerListener;
|
||||
|
|
@ -47,6 +46,7 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Default implementation of {@link Server}, for creation by transports.
|
||||
|
|
@ -64,7 +64,7 @@ import java.util.concurrent.Executor;
|
|||
* <p>Starting the server starts the underlying transport for servicing requests. Stopping the
|
||||
* server stops servicing new requests and waits for all connections to terminate.
|
||||
*/
|
||||
public class ServerImpl extends AbstractService implements Server {
|
||||
public class ServerImpl implements Server {
|
||||
private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
|
||||
|
||||
private final ServerListener serverListener = new ServerListenerImpl();
|
||||
|
|
@ -72,11 +72,14 @@ public class ServerImpl extends AbstractService implements Server {
|
|||
/** Executor for application processing. */
|
||||
private final Executor executor;
|
||||
private final HandlerRegistry registry;
|
||||
private boolean started;
|
||||
private boolean shutdown;
|
||||
private boolean terminated;
|
||||
private Runnable terminationRunnable;
|
||||
/** Service encapsulating something similar to an accept() socket. */
|
||||
private Service transportServer;
|
||||
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
|
||||
private final Collection<Service> transports
|
||||
= Collections.synchronizedSet(new HashSet<Service>());
|
||||
private final Collection<Service> transports = new HashSet<Service>();
|
||||
|
||||
/**
|
||||
* Construct a server. {@link #setTransportServer(Service)} must be called before starting the
|
||||
|
|
@ -96,18 +99,17 @@ public class ServerImpl extends AbstractService implements Server {
|
|||
*
|
||||
* @return this object
|
||||
*/
|
||||
public ServerImpl setTransportServer(Service transportServer) {
|
||||
Preconditions.checkState(state() == Server.State.NEW, "server must be in NEW state");
|
||||
public synchronized ServerImpl setTransportServer(Service transportServer) {
|
||||
if (shutdown) {
|
||||
throw new IllegalStateException("Already shutdown");
|
||||
}
|
||||
Preconditions.checkState(this.transportServer == null, "transportServer already set");
|
||||
this.transportServer = Preconditions.checkNotNull(transportServer);
|
||||
Preconditions.checkArgument(
|
||||
transportServer.state() == Server.State.NEW, "transport server not in NEW state");
|
||||
transportServer.addListener(new TransportLifecycleListener(), MoreExecutors.directExecutor());
|
||||
transportServer.state() == Service.State.NEW, "transport server not in NEW state");
|
||||
transportServer.addListener(new TransportServiceListener(transportServer),
|
||||
MoreExecutors.directExecutor());
|
||||
transports.add(transportServer);
|
||||
// We assume that transport.state() won't change by another thread before we return from this
|
||||
// call.
|
||||
Preconditions.checkState(
|
||||
transportServer.state() == Server.State.NEW, "transport server changed state!");
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -116,64 +118,114 @@ public class ServerImpl extends AbstractService implements Server {
|
|||
return serverListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
Preconditions.checkState(transportServer != null, "setTransportServer not called");
|
||||
transportServer.startAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
stopTransports();
|
||||
/** Hack to allow executors to auto-shutdown. Not for general use. */
|
||||
// TODO(ejona): Replace with a real API.
|
||||
synchronized void setTerminationRunnable(Runnable runnable) {
|
||||
this.terminationRunnable = runnable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove transport service from accounting list and notify of complete shutdown if necessary.
|
||||
* Bind and start the server.
|
||||
*
|
||||
* @return {@code this} object
|
||||
* @throws IllegalStateException if already started
|
||||
*/
|
||||
public synchronized ServerImpl start() {
|
||||
if (started) {
|
||||
throw new IllegalStateException("Already started");
|
||||
}
|
||||
started = true;
|
||||
try {
|
||||
// Start and wait for any port to actually be bound.
|
||||
transportServer.startAsync().awaitRunning();
|
||||
} catch (IllegalStateException ex) {
|
||||
Throwable t = transportServer.failureCause();
|
||||
if (t != null) {
|
||||
throw Throwables.propagate(t);
|
||||
}
|
||||
throw ex;
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected.
|
||||
*/
|
||||
public synchronized ServerImpl shutdown() {
|
||||
shutdown = true;
|
||||
// transports collection can be modified during stopAsync(), even if we hold the lock, due to
|
||||
// reentrancy.
|
||||
for (Service transport : transports.toArray(new Service[transports.size()])) {
|
||||
transport.stopAsync();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiates a forceful shutdown in which preexisting and new calls are rejected. Although
|
||||
* forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
|
||||
* return {@code false} immediately after this method returns.
|
||||
*
|
||||
* <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
|
||||
*/
|
||||
// TODO(ejona): cancel preexisting calls.
|
||||
public synchronized ServerImpl shutdownNow() {
|
||||
shutdown();
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the server is shutdown. Shutdown servers reject any new calls, but may still
|
||||
* have some calls being processed.
|
||||
*
|
||||
* @see #shutdown()
|
||||
* @see #isTerminated()
|
||||
*/
|
||||
public synchronized boolean isShutdown() {
|
||||
return shutdown;
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for the server to become terminated, giving up if the timeout is reached.
|
||||
*
|
||||
* @return whether the server is terminated, as would be done by {@link #isTerminated()}.
|
||||
*/
|
||||
public synchronized boolean awaitTerminated(long timeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
long timeoutNanos = unit.toNanos(timeout);
|
||||
long endTimeNanos = System.nanoTime() + timeoutNanos;
|
||||
while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) {
|
||||
TimeUnit.NANOSECONDS.timedWait(this, timeoutNanos);
|
||||
}
|
||||
return terminated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the server is terminated. Terminated servers have no running calls and
|
||||
* relevant resources released (like TCP connections).
|
||||
*
|
||||
* @see #isShutdown()
|
||||
*/
|
||||
public synchronized boolean isTerminated() {
|
||||
return terminated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove transport service from accounting collection and notify of complete shutdown if
|
||||
* necessary.
|
||||
*
|
||||
* @param transport service to remove
|
||||
* @return {@code true} if shutting down and it is now complete
|
||||
*/
|
||||
private boolean transportClosed(Service transport) {
|
||||
boolean shutdownComplete;
|
||||
synchronized (transports) {
|
||||
if (!transports.remove(transport)) {
|
||||
throw new AssertionError("Transport already removed");
|
||||
private synchronized void transportClosed(Service transport) {
|
||||
if (!transports.remove(transport)) {
|
||||
throw new AssertionError("Transport already removed");
|
||||
}
|
||||
if (shutdown && transports.isEmpty()) {
|
||||
terminated = true;
|
||||
notifyAll();
|
||||
if (terminationRunnable != null) {
|
||||
terminationRunnable.run();
|
||||
}
|
||||
shutdownComplete = transports.isEmpty();
|
||||
}
|
||||
if (shutdownComplete) {
|
||||
Service.State state = state();
|
||||
if (state == Service.State.STOPPING) {
|
||||
notifyStopped();
|
||||
} else if (state == Service.State.FAILED) {
|
||||
// NOOP: already failed
|
||||
} else {
|
||||
notifyFailed(new IllegalStateException("server transport terminated unexpectedly"));
|
||||
}
|
||||
}
|
||||
return shutdownComplete;
|
||||
}
|
||||
|
||||
/**
|
||||
* The transport server closed, so cleanup its resources and start shutdown.
|
||||
*/
|
||||
private void transportServerClosed() {
|
||||
boolean shutdownComplete = transportClosed(transportServer);
|
||||
if (shutdownComplete) {
|
||||
return;
|
||||
}
|
||||
stopTransports();
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown all transports (including transportServer). Safe to be called even if previously
|
||||
* called.
|
||||
*/
|
||||
private void stopTransports() {
|
||||
for (Service transport : transports.toArray(new Service[0])) {
|
||||
// transports list can be modified during this call, even if we hold the lock, due to
|
||||
// reentrancy.
|
||||
transport.stopAsync();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -184,12 +236,14 @@ public class ServerImpl extends AbstractService implements Server {
|
|||
Preconditions.checkArgument(
|
||||
transportState == Service.State.STARTING || transportState == Service.State.RUNNING,
|
||||
"Created transport should be starting or running");
|
||||
if (state() != Server.State.RUNNING) {
|
||||
transport.stopAsync();
|
||||
return serverTransportListener;
|
||||
synchronized (this) {
|
||||
if (shutdown) {
|
||||
transport.stopAsync();
|
||||
return serverTransportListener;
|
||||
}
|
||||
transports.add(transport);
|
||||
}
|
||||
transports.add(transport);
|
||||
// transports list can be modified during this call, even if we hold the lock, due to
|
||||
// transports collection can be modified during this call, even if we hold the lock, due to
|
||||
// reentrancy.
|
||||
transport.addListener(new TransportServiceListener(transport),
|
||||
MoreExecutors.directExecutor());
|
||||
|
|
@ -201,28 +255,6 @@ public class ServerImpl extends AbstractService implements Server {
|
|||
}
|
||||
}
|
||||
|
||||
/** Listens for lifecycle changes to the "accept() socket." */
|
||||
private class TransportLifecycleListener extends Service.Listener {
|
||||
@Override
|
||||
public void running() {
|
||||
notifyStarted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void terminated(Service.State from) {
|
||||
transportServerClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Service.State from, Throwable failure) {
|
||||
// TODO(ejona): Ideally we would want to force-stop transports before notifying application of
|
||||
// failure, but that would cause us to have an unrepresentative state since we would be
|
||||
// RUNNING but not accepting connections.
|
||||
notifyFailed(failure);
|
||||
transportServerClosed();
|
||||
}
|
||||
}
|
||||
|
||||
/** Listens for lifecycle changes to a "TCP connection." */
|
||||
private class TransportServiceListener extends Service.Listener {
|
||||
private final Service transport;
|
||||
|
|
|
|||
|
|
@ -33,7 +33,10 @@ package com.google.net.stubby;
|
|||
|
||||
import static com.google.common.base.Charsets.UTF_8;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.isNull;
|
||||
import static org.mockito.Matchers.notNull;
|
||||
|
|
@ -68,6 +71,7 @@ import java.util.concurrent.CyclicBarrier;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/** Unit tests for {@link ServerImpl}. */
|
||||
|
|
@ -92,8 +96,7 @@ public class ServerImplTest {
|
|||
public void startup() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
|
||||
server.startAsync();
|
||||
server.awaitRunning();
|
||||
server.start();
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
@ -102,50 +105,33 @@ public class ServerImplTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void startStopImmediate() {
|
||||
public void startStopImmediate() throws InterruptedException {
|
||||
Service transportServer = new NoopService();
|
||||
Server server = new ServerImpl(executor, registry).setTransportServer(transportServer);
|
||||
assertEquals(Service.State.NEW, server.state());
|
||||
ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer);
|
||||
assertEquals(Service.State.NEW, transportServer.state());
|
||||
server.startAsync();
|
||||
server.awaitRunning();
|
||||
assertEquals(Service.State.RUNNING, server.state());
|
||||
server.start();
|
||||
assertEquals(Service.State.RUNNING, transportServer.state());
|
||||
server.stopAsync();
|
||||
server.awaitTerminated();
|
||||
assertEquals(Service.State.TERMINATED, server.state());
|
||||
server.shutdown();
|
||||
assertTrue(server.awaitTerminated(100, TimeUnit.MILLISECONDS));
|
||||
assertEquals(Service.State.TERMINATED, transportServer.state());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void transportServerFailureFailsServer() {
|
||||
class FailableService extends NoopService {
|
||||
public void doNotifyFailed(Throwable cause) {
|
||||
notifyFailed(cause);
|
||||
}
|
||||
}
|
||||
FailableService transportServer = new FailableService();
|
||||
Server server = new ServerImpl(executor, registry).setTransportServer(transportServer);
|
||||
server.startAsync();
|
||||
server.awaitRunning();
|
||||
RuntimeException ex = new RuntimeException("force failure");
|
||||
transportServer.doNotifyFailed(ex);
|
||||
assertEquals(Service.State.FAILED, server.state());
|
||||
assertEquals(ex, server.failureCause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void transportServerFailsStartup() {
|
||||
final Exception ex = new RuntimeException();
|
||||
class FailingStartupService extends NoopService {
|
||||
@Override
|
||||
public void doStart() {
|
||||
notifyFailed(new RuntimeException());
|
||||
notifyFailed(ex);
|
||||
}
|
||||
}
|
||||
FailingStartupService transportServer = new FailingStartupService();
|
||||
Server server = new ServerImpl(executor, registry).setTransportServer(transportServer);
|
||||
server.startAsync();
|
||||
assertEquals(Service.State.FAILED, server.state());
|
||||
ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer);
|
||||
try {
|
||||
server.start();
|
||||
} catch (Exception e) {
|
||||
assertSame(ex, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -159,20 +145,20 @@ public class ServerImplTest {
|
|||
public void doStop() {} // Don't notify.
|
||||
}
|
||||
NoopService transportServer = new NoopService();
|
||||
ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer);
|
||||
server.startAsync();
|
||||
server.awaitRunning();
|
||||
ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer)
|
||||
.start();
|
||||
ManualStoppedService transport = new ManualStoppedService();
|
||||
transport.startAsync();
|
||||
server.serverListener().transportCreated(transport);
|
||||
server.stopAsync();
|
||||
server.shutdown();
|
||||
assertEquals(Service.State.STOPPING, transport.state());
|
||||
assertEquals(Service.State.TERMINATED, transportServer.state());
|
||||
assertEquals(Service.State.STOPPING, server.state());
|
||||
assertTrue(server.isShutdown());
|
||||
assertFalse(server.isTerminated());
|
||||
|
||||
transport.doNotifyStopped();
|
||||
assertEquals(Service.State.TERMINATED, transport.state());
|
||||
assertEquals(Service.State.TERMINATED, server.state());
|
||||
assertTrue(server.isTerminated());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -186,20 +172,20 @@ public class ServerImplTest {
|
|||
public void doStop() {} // Don't notify.
|
||||
}
|
||||
ManualStoppedService transportServer = new ManualStoppedService();
|
||||
ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer);
|
||||
server.startAsync();
|
||||
server.awaitRunning();
|
||||
ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer)
|
||||
.start();
|
||||
Service transport = new NoopService();
|
||||
transport.startAsync();
|
||||
server.serverListener().transportCreated(transport);
|
||||
server.stopAsync();
|
||||
server.shutdown();
|
||||
assertEquals(Service.State.TERMINATED, transport.state());
|
||||
assertEquals(Service.State.STOPPING, transportServer.state());
|
||||
assertEquals(Service.State.STOPPING, server.state());
|
||||
assertTrue(server.isShutdown());
|
||||
assertFalse(server.isTerminated());
|
||||
|
||||
transportServer.doNotifyStopped();
|
||||
assertEquals(Service.State.TERMINATED, transportServer.state());
|
||||
assertEquals(Service.State.TERMINATED, server.state());
|
||||
assertTrue(server.isTerminated());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -99,11 +99,11 @@ public abstract class AbstractTransportTest {
|
|||
.addService(ServerInterceptors.intercept(
|
||||
TestServiceGrpc.bindService(new TestServiceImpl(testServiceExecutor)),
|
||||
TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY)))
|
||||
.buildAndWaitForRunning();
|
||||
.build().start();
|
||||
}
|
||||
|
||||
protected static void stopStaticServer() {
|
||||
server.stopAsync();
|
||||
server.shutdownNow();
|
||||
testServiceExecutor.shutdown();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -138,14 +138,14 @@ public class TestServiceServer {
|
|||
.addService(ServerInterceptors.intercept(
|
||||
TestServiceGrpc.bindService(new TestServiceImpl(executor)),
|
||||
TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY)))
|
||||
.build();
|
||||
server.startAsync();
|
||||
server.awaitRunning(5, TimeUnit.SECONDS);
|
||||
.build().start();
|
||||
}
|
||||
|
||||
private void stop() throws Exception {
|
||||
server.stopAsync();
|
||||
server.awaitTerminated();
|
||||
server.shutdownNow();
|
||||
if (!server.awaitTerminated(5, TimeUnit.SECONDS)) {
|
||||
System.err.println("Timed out waiting for server shutdown");
|
||||
}
|
||||
MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -143,4 +143,18 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB
|
|||
}
|
||||
return server;
|
||||
}
|
||||
|
||||
private abstract static class ClosureHook extends Service.Listener {
|
||||
protected abstract void onClosed();
|
||||
|
||||
@Override
|
||||
public void terminated(Service.State from) {
|
||||
onClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Service.State from, Throwable failure) {
|
||||
onClosed();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue