core: Server uses transport's ScheduledExecutorService

For Netty, this reduces the number of threads necessary for servers (although
until channel is converted, actual number of threads isn't impacted) and
naturally reduces contention and timeout latency.

For InProcess, this gets us closer to allowing applications to provide all
executors, which is especially useful during tests.
This commit is contained in:
Eric Anderson 2017-07-19 10:31:26 -07:00 committed by GitHub
parent d325919f62
commit 994f200d15
10 changed files with 114 additions and 36 deletions

View File

@ -16,12 +16,17 @@
package io.grpc.inprocess; package io.grpc.inprocess;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.internal.InternalServer; import io.grpc.internal.InternalServer;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener; import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransportListener; import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.internal.SharedResourcePool;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe @ThreadSafe
@ -36,14 +41,28 @@ final class InProcessServer implements InternalServer {
private final String name; private final String name;
private ServerListener listener; private ServerListener listener;
private boolean shutdown; private boolean shutdown;
/** Expected to be a SharedResourcePool except in testing. */
private final ObjectPool<ScheduledExecutorService> schedulerPool;
/**
* Only used to make sure the scheduler has at least one reference. Since child transports can
* outlive this server, they must get their own reference.
*/
private ScheduledExecutorService scheduler;
InProcessServer(String name) { InProcessServer(String name, Resource<ScheduledExecutorService> schedulerResource) {
this(name, SharedResourcePool.forResource(schedulerResource));
}
@VisibleForTesting
InProcessServer(String name, ObjectPool<ScheduledExecutorService> schedulerPool) {
this.name = name; this.name = name;
this.schedulerPool = schedulerPool;
} }
@Override @Override
public void start(ServerListener serverListener) throws IOException { public void start(ServerListener serverListener) throws IOException {
this.listener = serverListener; this.listener = serverListener;
this.scheduler = schedulerPool.getObject();
// Must be last, as channels can start connecting after this point. // Must be last, as channels can start connecting after this point.
if (registry.putIfAbsent(name, this) != null) { if (registry.putIfAbsent(name, this) != null) {
throw new IOException("name already registered: " + name); throw new IOException("name already registered: " + name);
@ -60,6 +79,7 @@ final class InProcessServer implements InternalServer {
if (!registry.remove(name, this)) { if (!registry.remove(name, this)) {
throw new AssertionError(); throw new AssertionError();
} }
scheduler = schedulerPool.returnObject(scheduler);
synchronized (this) { synchronized (this) {
shutdown = true; shutdown = true;
listener.serverShutdown(); listener.serverShutdown();
@ -72,4 +92,8 @@ final class InProcessServer implements InternalServer {
} }
return listener.transportCreated(transport); return listener.transportCreated(transport);
} }
ObjectPool<ScheduledExecutorService> getScheduledExecutorServicePool() {
return schedulerPool;
}
} }

View File

@ -20,6 +20,7 @@ import com.google.common.base.Preconditions;
import io.grpc.ExperimentalApi; import io.grpc.ExperimentalApi;
import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractServerImplBuilder; import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.GrpcUtil;
import java.io.File; import java.io.File;
import java.util.List; import java.util.List;
@ -79,7 +80,7 @@ public final class InProcessServerBuilder
// TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizses are // TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizses are
// not counted. Therefore, we disable stats for now. // not counted. Therefore, we disable stats for now.
// (https://github.com/grpc/grpc-java/issues/2284) // (https://github.com/grpc/grpc-java/issues/2284)
return new InProcessServer(name); return new InProcessServer(name, GrpcUtil.TIMER_SERVICE);
} }
@Override @Override

View File

@ -33,6 +33,7 @@ import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.LogId; import io.grpc.internal.LogId;
import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.NoopClientStream; import io.grpc.internal.NoopClientStream;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransport;
@ -45,6 +46,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.CheckReturnValue; import javax.annotation.CheckReturnValue;
@ -58,6 +60,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private final LogId logId = LogId.allocate(getClass().getName()); private final LogId logId = LogId.allocate(getClass().getName());
private final String name; private final String name;
private final String authority; private final String authority;
private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
private ScheduledExecutorService serverScheduler;
private ServerTransportListener serverTransportListener; private ServerTransportListener serverTransportListener;
private Attributes serverStreamAttributes; private Attributes serverStreamAttributes;
private ManagedClientTransport.Listener clientTransportListener; private ManagedClientTransport.Listener clientTransportListener;
@ -70,10 +74,6 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
@GuardedBy("this") @GuardedBy("this")
private Set<InProcessStream> streams = new HashSet<InProcessStream>(); private Set<InProcessStream> streams = new HashSet<InProcessStream>();
public InProcessTransport(String name) {
this(name, null);
}
public InProcessTransport(String name, String authority) { public InProcessTransport(String name, String authority) {
this.name = name; this.name = name;
this.authority = authority; this.authority = authority;
@ -85,6 +85,9 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
this.clientTransportListener = listener; this.clientTransportListener = listener;
InProcessServer server = InProcessServer.findServer(name); InProcessServer server = InProcessServer.findServer(name);
if (server != null) { if (server != null) {
serverSchedulerPool = server.getScheduledExecutorServicePool();
serverScheduler = serverSchedulerPool.getObject();
// Must be semi-initialized; past this point, can begin receiving requests
serverTransportListener = server.register(this); serverTransportListener = server.register(this);
} }
if (serverTransportListener == null) { if (serverTransportListener == null) {
@ -200,6 +203,11 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
return Attributes.EMPTY; return Attributes.EMPTY;
} }
@Override
public ScheduledExecutorService getScheduledExecutorService() {
return serverScheduler;
}
private synchronized void notifyShutdown(Status s) { private synchronized void notifyShutdown(Status s) {
if (shutdown) { if (shutdown) {
return; return;
@ -213,6 +221,9 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
return; return;
} }
terminated = true; terminated = true;
if (serverScheduler != null) {
serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
}
clientTransportListener.transportTerminated(); clientTransportListener.transportTerminated();
if (serverTransportListener != null) { if (serverTransportListener != null) {
serverTransportListener.transportTerminated(); serverTransportListener.transportTerminated();

View File

@ -185,7 +185,6 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
public Server build() { public Server build() {
ServerImpl server = new ServerImpl( ServerImpl server = new ServerImpl(
this, this,
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
buildTransportServer(Collections.unmodifiableList(getTracerFactories())), buildTransportServer(Collections.unmodifiableList(getTracerFactories())),
Context.ROOT); Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) { for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {

View File

@ -49,7 +49,6 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
@ -95,8 +94,6 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
@GuardedBy("lock") private final Collection<ServerTransport> transports = @GuardedBy("lock") private final Collection<ServerTransport> transports =
new HashSet<ServerTransport>(); new HashSet<ServerTransport>();
private final ObjectPool<ScheduledExecutorService> timeoutServicePool;
private ScheduledExecutorService timeoutService;
private final Context rootContext; private final Context rootContext;
private final DecompressorRegistry decompressorRegistry; private final DecompressorRegistry decompressorRegistry;
@ -105,18 +102,15 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
/** /**
* Construct a server. * Construct a server.
* *
* @param executorPool provides an executor to call methods on behalf of remote clients * @param builder builder with configuration for server
* @param registry the primary method registry * @param transportServer transport server that will create new incoming transports
* @param fallbackRegistry the secondary method registry, used only if the primary registry * @param rootContext context that callbacks for new RPCs should be derived from
* doesn't have the method
*/ */
ServerImpl( ServerImpl(
AbstractServerImplBuilder<?> builder, AbstractServerImplBuilder<?> builder,
ObjectPool<ScheduledExecutorService> timeoutServicePool,
InternalServer transportServer, InternalServer transportServer,
Context rootContext) { Context rootContext) {
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool"); this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
this.timeoutServicePool = Preconditions.checkNotNull(timeoutServicePool, "timeoutServicePool");
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder"); this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
this.fallbackRegistry = this.fallbackRegistry =
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
@ -146,7 +140,6 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
checkState(!shutdown, "Shutting down"); checkState(!shutdown, "Shutting down");
// Start and wait for any port to actually be bound. // Start and wait for any port to actually be bound.
transportServer.start(new ServerListenerImpl()); transportServer.start(new ServerListenerImpl());
timeoutService = Preconditions.checkNotNull(timeoutServicePool.getObject(), "timeoutService");
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
started = true; started = true;
return this; return this;
@ -297,9 +290,6 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
throw new AssertionError("Server already terminated"); throw new AssertionError("Server already terminated");
} }
terminated = true; terminated = true;
if (timeoutService != null) {
timeoutService = timeoutServicePool.returnObject(timeoutService);
}
if (executor != null) { if (executor != null) {
executor = executorPool.returnObject(executor); executor = executorPool.returnObject(executor);
} }
@ -452,8 +442,8 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
return baseContext.withCancellation(); return baseContext.withCancellation();
} }
Context.CancellableContext context = Context.CancellableContext context = baseContext.withDeadlineAfter(
baseContext.withDeadlineAfter(timeoutNanos, NANOSECONDS, timeoutService); timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService());
context.addListener(new Context.CancellationListener() { context.addListener(new Context.CancellationListener() {
@Override @Override
public void cancelled(Context context) { public void cancelled(Context context) {

View File

@ -17,6 +17,7 @@
package io.grpc.internal; package io.grpc.internal;
import io.grpc.Status; import io.grpc.Status;
import java.util.concurrent.ScheduledExecutorService;
/** An inbound connection. */ /** An inbound connection. */
public interface ServerTransport extends WithLogId { public interface ServerTransport extends WithLogId {
@ -32,4 +33,13 @@ public interface ServerTransport extends WithLogId {
* should be closed with the provided {@code reason}. * should be closed with the provided {@code reason}.
*/ */
void shutdownNow(Status reason); void shutdownNow(Status reason);
/**
* Returns an executor for scheduling provided by the transport. The service should be configured
* to allow cancelled scheduled runnables to be GCed.
*
* <p>The executor may not be used after the transport terminates. The caller should ensure any
* outstanding tasks are cancelled when the transport terminates.
*/
ScheduledExecutorService getScheduledExecutorService();
} }

View File

@ -17,6 +17,13 @@
package io.grpc.inprocess; package io.grpc.inprocess;
import com.google.common.truth.Truth; import com.google.common.truth.Truth;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
@ -26,9 +33,43 @@ public class InProcessServerTest {
@Test @Test
public void getPort_notStarted() throws Exception { public void getPort_notStarted() throws Exception {
InProcessServer s = new InProcessServer("name"); InProcessServer s = new InProcessServer("name", GrpcUtil.TIMER_SERVICE);
Truth.assertThat(s.getPort()).isEqualTo(-1); Truth.assertThat(s.getPort()).isEqualTo(-1);
} }
@Test
public void serverHoldsRefToScheduler() throws Exception {
final ScheduledExecutorService ses = new FakeClock().getScheduledExecutorService();
class RefCountingObjectPool implements ObjectPool<ScheduledExecutorService> {
private int count;
@Override
public ScheduledExecutorService getObject() {
count++;
return ses;
}
@Override
public ScheduledExecutorService returnObject(Object returned) {
count--;
return null;
}
}
RefCountingObjectPool pool = new RefCountingObjectPool();
InProcessServer s = new InProcessServer("name", pool);
Truth.assertThat(pool.count).isEqualTo(0);
s.start(new ServerListener() {
@Override public ServerTransportListener transportCreated(ServerTransport transport) {
throw new UnsupportedOperationException();
}
@Override public void serverShutdown() {}
});
Truth.assertThat(pool.count).isEqualTo(1);
s.shutdown();
Truth.assertThat(pool.count).isEqualTo(0);
}
} }

View File

@ -17,6 +17,7 @@
package io.grpc.inprocess; package io.grpc.inprocess;
import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer; import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.testing.AbstractTransportTest; import io.grpc.internal.testing.AbstractTransportTest;
@ -32,7 +33,7 @@ public class InProcessTransportTest extends AbstractTransportTest {
@Override @Override
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) { protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
return new InProcessServer(TRANSPORT_NAME); return new InProcessServer(TRANSPORT_NAME, GrpcUtil.TIMER_SERVICE);
} }
@Override @Override

View File

@ -137,8 +137,6 @@ public class ServerImplTest {
}); });
@Mock @Mock
private ObjectPool<Executor> executorPool; private ObjectPool<Executor> executorPool;
@Mock
private ObjectPool<ScheduledExecutorService> timerPool;
private Builder builder = new Builder(); private Builder builder = new Builder();
private MutableHandlerRegistry mutableFallbackRegistry = new MutableHandlerRegistry(); private MutableHandlerRegistry mutableFallbackRegistry = new MutableHandlerRegistry();
private HandlerRegistry fallbackRegistry = mutableFallbackRegistry; private HandlerRegistry fallbackRegistry = mutableFallbackRegistry;
@ -165,7 +163,6 @@ public class ServerImplTest {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
streamTracerFactories = Arrays.asList(streamTracerFactory); streamTracerFactories = Arrays.asList(streamTracerFactory);
when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
when(timerPool.getObject()).thenReturn(timer.getScheduledExecutorService());
when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class)))
.thenReturn(streamTracer); .thenReturn(streamTracer);
} }
@ -203,7 +200,6 @@ public class ServerImplTest {
assertTrue(server.isShutdown()); assertTrue(server.isShutdown());
assertTrue(server.isTerminated()); assertTrue(server.isTerminated());
verifyNoMoreInteractions(executorPool); verifyNoMoreInteractions(executorPool);
verifyNoMoreInteractions(timerPool);
} }
@Test @Test
@ -346,7 +342,6 @@ public class ServerImplTest {
assertSame(ex, e); assertSame(ex, e);
} }
verifyNoMoreInteractions(executorPool); verifyNoMoreInteractions(executorPool);
verifyNoMoreInteractions(timerPool);
} }
@Test @Test
@ -1168,26 +1163,21 @@ public class ServerImplTest {
builder.fallbackHandlerRegistry(fallbackRegistry); builder.fallbackHandlerRegistry(fallbackRegistry);
builder.executorPool = executorPool; builder.executorPool = executorPool;
server = new ServerImpl(builder, timerPool, transportServer, SERVER_CONTEXT); server = new ServerImpl(builder, transportServer, SERVER_CONTEXT);
} }
private void verifyExecutorsAcquired() { private void verifyExecutorsAcquired() {
verify(executorPool).getObject(); verify(executorPool).getObject();
verify(timerPool).getObject();
verifyNoMoreInteractions(executorPool); verifyNoMoreInteractions(executorPool);
verifyNoMoreInteractions(timerPool);
} }
private void verifyExecutorsNotReturned() { private void verifyExecutorsNotReturned() {
verify(executorPool, never()).returnObject(any(Executor.class)); verify(executorPool, never()).returnObject(any(Executor.class));
verify(timerPool, never()).returnObject(any(ScheduledExecutorService.class));
} }
private void verifyExecutorsReturned() { private void verifyExecutorsReturned() {
verify(executorPool).returnObject(same(executor.getScheduledExecutorService())); verify(executorPool).returnObject(same(executor.getScheduledExecutorService()));
verify(timerPool).returnObject(same(timer.getScheduledExecutorService()));
verifyNoMoreInteractions(executorPool); verifyNoMoreInteractions(executorPool);
verifyNoMoreInteractions(timerPool);
} }
private void ensureServerStateNotLeaked() { private void ensureServerStateNotLeaked() {
@ -1220,7 +1210,7 @@ public class ServerImplTest {
} }
} }
private static class SimpleServerTransport implements ServerTransport { private class SimpleServerTransport implements ServerTransport {
ServerTransportListener listener; ServerTransportListener listener;
@Override @Override
@ -1237,6 +1227,11 @@ public class ServerImplTest {
public LogId getLogId() { public LogId getLogId() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public ScheduledExecutorService getScheduledExecutorService() {
return timer.getScheduledExecutorService();
}
} }
private static class Builder extends AbstractServerImplBuilder<Builder> { private static class Builder extends AbstractServerImplBuilder<Builder> {

View File

@ -30,6 +30,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -106,6 +107,11 @@ class NettyServerTransport implements ServerTransport {
channel.pipeline().addLast(negotiationHandler); channel.pipeline().addLast(negotiationHandler);
} }
@Override
public ScheduledExecutorService getScheduledExecutorService() {
return channel.eventLoop();
}
@Override @Override
public void shutdown() { public void shutdown() {
if (channel.isOpen()) { if (channel.isOpen()) {