From 994f200d153f3475157ba471ece6877dacf8eb68 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 19 Jul 2017 10:31:26 -0700 Subject: [PATCH] core: Server uses transport's ScheduledExecutorService For Netty, this reduces the number of threads necessary for servers (although until channel is converted, actual number of threads isn't impacted) and naturally reduces contention and timeout latency. For InProcess, this gets us closer to allowing applications to provide all executors, which is especially useful during tests. --- .../io/grpc/inprocess/InProcessServer.java | 26 ++++++++++- .../inprocess/InProcessServerBuilder.java | 3 +- .../io/grpc/inprocess/InProcessTransport.java | 19 ++++++-- .../internal/AbstractServerImplBuilder.java | 1 - .../java/io/grpc/internal/ServerImpl.java | 20 +++------ .../io/grpc/internal/ServerTransport.java | 10 +++++ .../grpc/inprocess/InProcessServerTest.java | 43 ++++++++++++++++++- .../inprocess/InProcessTransportTest.java | 3 +- .../java/io/grpc/internal/ServerImplTest.java | 19 +++----- .../io/grpc/netty/NettyServerTransport.java | 6 +++ 10 files changed, 114 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java index 5b88ee5df4..1e47b1e5d0 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java @@ -16,12 +16,17 @@ package io.grpc.inprocess; +import com.google.common.annotations.VisibleForTesting; import io.grpc.internal.InternalServer; +import io.grpc.internal.ObjectPool; import io.grpc.internal.ServerListener; import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.SharedResourceHolder.Resource; +import io.grpc.internal.SharedResourcePool; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; import javax.annotation.concurrent.ThreadSafe; @ThreadSafe @@ -36,14 +41,28 @@ final class InProcessServer implements InternalServer { private final String name; private ServerListener listener; private boolean shutdown; + /** Expected to be a SharedResourcePool except in testing. */ + private final ObjectPool schedulerPool; + /** + * Only used to make sure the scheduler has at least one reference. Since child transports can + * outlive this server, they must get their own reference. + */ + private ScheduledExecutorService scheduler; - InProcessServer(String name) { + InProcessServer(String name, Resource schedulerResource) { + this(name, SharedResourcePool.forResource(schedulerResource)); + } + + @VisibleForTesting + InProcessServer(String name, ObjectPool schedulerPool) { this.name = name; + this.schedulerPool = schedulerPool; } @Override public void start(ServerListener serverListener) throws IOException { this.listener = serverListener; + this.scheduler = schedulerPool.getObject(); // Must be last, as channels can start connecting after this point. if (registry.putIfAbsent(name, this) != null) { throw new IOException("name already registered: " + name); @@ -60,6 +79,7 @@ final class InProcessServer implements InternalServer { if (!registry.remove(name, this)) { throw new AssertionError(); } + scheduler = schedulerPool.returnObject(scheduler); synchronized (this) { shutdown = true; listener.serverShutdown(); @@ -72,4 +92,8 @@ final class InProcessServer implements InternalServer { } return listener.transportCreated(transport); } + + ObjectPool getScheduledExecutorServicePool() { + return schedulerPool; + } } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index ace68fbd13..8f5f8d5f15 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import io.grpc.ExperimentalApi; import io.grpc.ServerStreamTracer; import io.grpc.internal.AbstractServerImplBuilder; +import io.grpc.internal.GrpcUtil; import java.io.File; import java.util.List; @@ -79,7 +80,7 @@ public final class InProcessServerBuilder // TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizses are // not counted. Therefore, we disable stats for now. // (https://github.com/grpc/grpc-java/issues/2284) - return new InProcessServer(name); + return new InProcessServer(name, GrpcUtil.TIMER_SERVICE); } @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 7a2152133d..ec4be39568 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -33,6 +33,7 @@ import io.grpc.internal.ConnectionClientTransport; import io.grpc.internal.LogId; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.NoopClientStream; +import io.grpc.internal.ObjectPool; import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransport; @@ -45,6 +46,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.CheckReturnValue; @@ -58,6 +60,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private final LogId logId = LogId.allocate(getClass().getName()); private final String name; private final String authority; + private ObjectPool serverSchedulerPool; + private ScheduledExecutorService serverScheduler; private ServerTransportListener serverTransportListener; private Attributes serverStreamAttributes; private ManagedClientTransport.Listener clientTransportListener; @@ -70,10 +74,6 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans @GuardedBy("this") private Set streams = new HashSet(); - public InProcessTransport(String name) { - this(name, null); - } - public InProcessTransport(String name, String authority) { this.name = name; this.authority = authority; @@ -85,6 +85,9 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans this.clientTransportListener = listener; InProcessServer server = InProcessServer.findServer(name); if (server != null) { + serverSchedulerPool = server.getScheduledExecutorServicePool(); + serverScheduler = serverSchedulerPool.getObject(); + // Must be semi-initialized; past this point, can begin receiving requests serverTransportListener = server.register(this); } if (serverTransportListener == null) { @@ -200,6 +203,11 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans return Attributes.EMPTY; } + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return serverScheduler; + } + private synchronized void notifyShutdown(Status s) { if (shutdown) { return; @@ -213,6 +221,9 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans return; } terminated = true; + if (serverScheduler != null) { + serverScheduler = serverSchedulerPool.returnObject(serverScheduler); + } clientTransportListener.transportTerminated(); if (serverTransportListener != null) { serverTransportListener.transportTerminated(); diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 12a972830e..c17e842fc2 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -185,7 +185,6 @@ public abstract class AbstractServerImplBuilder transports = new HashSet(); - private final ObjectPool timeoutServicePool; - private ScheduledExecutorService timeoutService; private final Context rootContext; private final DecompressorRegistry decompressorRegistry; @@ -105,18 +102,15 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { /** * Construct a server. * - * @param executorPool provides an executor to call methods on behalf of remote clients - * @param registry the primary method registry - * @param fallbackRegistry the secondary method registry, used only if the primary registry - * doesn't have the method + * @param builder builder with configuration for server + * @param transportServer transport server that will create new incoming transports + * @param rootContext context that callbacks for new RPCs should be derived from */ ServerImpl( AbstractServerImplBuilder builder, - ObjectPool timeoutServicePool, InternalServer transportServer, Context rootContext) { this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool"); - this.timeoutServicePool = Preconditions.checkNotNull(timeoutServicePool, "timeoutServicePool"); this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder"); this.fallbackRegistry = Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); @@ -146,7 +140,6 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { checkState(!shutdown, "Shutting down"); // Start and wait for any port to actually be bound. transportServer.start(new ServerListenerImpl()); - timeoutService = Preconditions.checkNotNull(timeoutServicePool.getObject(), "timeoutService"); executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); started = true; return this; @@ -297,9 +290,6 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { throw new AssertionError("Server already terminated"); } terminated = true; - if (timeoutService != null) { - timeoutService = timeoutServicePool.returnObject(timeoutService); - } if (executor != null) { executor = executorPool.returnObject(executor); } @@ -452,8 +442,8 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { return baseContext.withCancellation(); } - Context.CancellableContext context = - baseContext.withDeadlineAfter(timeoutNanos, NANOSECONDS, timeoutService); + Context.CancellableContext context = baseContext.withDeadlineAfter( + timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService()); context.addListener(new Context.CancellationListener() { @Override public void cancelled(Context context) { diff --git a/core/src/main/java/io/grpc/internal/ServerTransport.java b/core/src/main/java/io/grpc/internal/ServerTransport.java index 3f89f01779..27d3da63b3 100644 --- a/core/src/main/java/io/grpc/internal/ServerTransport.java +++ b/core/src/main/java/io/grpc/internal/ServerTransport.java @@ -17,6 +17,7 @@ package io.grpc.internal; import io.grpc.Status; +import java.util.concurrent.ScheduledExecutorService; /** An inbound connection. */ public interface ServerTransport extends WithLogId { @@ -32,4 +33,13 @@ public interface ServerTransport extends WithLogId { * should be closed with the provided {@code reason}. */ void shutdownNow(Status reason); + + /** + * Returns an executor for scheduling provided by the transport. The service should be configured + * to allow cancelled scheduled runnables to be GCed. + * + *

The executor may not be used after the transport terminates. The caller should ensure any + * outstanding tasks are cancelled when the transport terminates. + */ + ScheduledExecutorService getScheduledExecutorService(); } diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java index 1b631613c1..3eb15548d1 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java @@ -17,6 +17,13 @@ package io.grpc.inprocess; import com.google.common.truth.Truth; +import io.grpc.internal.FakeClock; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.ServerListener; +import io.grpc.internal.ServerTransport; +import io.grpc.internal.ServerTransportListener; +import java.util.concurrent.ScheduledExecutorService; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -26,9 +33,43 @@ public class InProcessServerTest { @Test public void getPort_notStarted() throws Exception { - InProcessServer s = new InProcessServer("name"); + InProcessServer s = new InProcessServer("name", GrpcUtil.TIMER_SERVICE); Truth.assertThat(s.getPort()).isEqualTo(-1); } + + @Test + public void serverHoldsRefToScheduler() throws Exception { + final ScheduledExecutorService ses = new FakeClock().getScheduledExecutorService(); + class RefCountingObjectPool implements ObjectPool { + private int count; + + @Override + public ScheduledExecutorService getObject() { + count++; + return ses; + } + + @Override + public ScheduledExecutorService returnObject(Object returned) { + count--; + return null; + } + } + + RefCountingObjectPool pool = new RefCountingObjectPool(); + InProcessServer s = new InProcessServer("name", pool); + Truth.assertThat(pool.count).isEqualTo(0); + s.start(new ServerListener() { + @Override public ServerTransportListener transportCreated(ServerTransport transport) { + throw new UnsupportedOperationException(); + } + + @Override public void serverShutdown() {} + }); + Truth.assertThat(pool.count).isEqualTo(1); + s.shutdown(); + Truth.assertThat(pool.count).isEqualTo(0); + } } diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index c35f93b09e..c13545470e 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -17,6 +17,7 @@ package io.grpc.inprocess; import io.grpc.ServerStreamTracer; +import io.grpc.internal.GrpcUtil; import io.grpc.internal.InternalServer; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.testing.AbstractTransportTest; @@ -32,7 +33,7 @@ public class InProcessTransportTest extends AbstractTransportTest { @Override protected InternalServer newServer(List streamTracerFactories) { - return new InProcessServer(TRANSPORT_NAME); + return new InProcessServer(TRANSPORT_NAME, GrpcUtil.TIMER_SERVICE); } @Override diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 4e602a89da..4d2fc56417 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -137,8 +137,6 @@ public class ServerImplTest { }); @Mock private ObjectPool executorPool; - @Mock - private ObjectPool timerPool; private Builder builder = new Builder(); private MutableHandlerRegistry mutableFallbackRegistry = new MutableHandlerRegistry(); private HandlerRegistry fallbackRegistry = mutableFallbackRegistry; @@ -165,7 +163,6 @@ public class ServerImplTest { MockitoAnnotations.initMocks(this); streamTracerFactories = Arrays.asList(streamTracerFactory); when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); - when(timerPool.getObject()).thenReturn(timer.getScheduledExecutorService()); when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) .thenReturn(streamTracer); } @@ -203,7 +200,6 @@ public class ServerImplTest { assertTrue(server.isShutdown()); assertTrue(server.isTerminated()); verifyNoMoreInteractions(executorPool); - verifyNoMoreInteractions(timerPool); } @Test @@ -346,7 +342,6 @@ public class ServerImplTest { assertSame(ex, e); } verifyNoMoreInteractions(executorPool); - verifyNoMoreInteractions(timerPool); } @Test @@ -1168,26 +1163,21 @@ public class ServerImplTest { builder.fallbackHandlerRegistry(fallbackRegistry); builder.executorPool = executorPool; - server = new ServerImpl(builder, timerPool, transportServer, SERVER_CONTEXT); + server = new ServerImpl(builder, transportServer, SERVER_CONTEXT); } private void verifyExecutorsAcquired() { verify(executorPool).getObject(); - verify(timerPool).getObject(); verifyNoMoreInteractions(executorPool); - verifyNoMoreInteractions(timerPool); } private void verifyExecutorsNotReturned() { verify(executorPool, never()).returnObject(any(Executor.class)); - verify(timerPool, never()).returnObject(any(ScheduledExecutorService.class)); } private void verifyExecutorsReturned() { verify(executorPool).returnObject(same(executor.getScheduledExecutorService())); - verify(timerPool).returnObject(same(timer.getScheduledExecutorService())); verifyNoMoreInteractions(executorPool); - verifyNoMoreInteractions(timerPool); } private void ensureServerStateNotLeaked() { @@ -1220,7 +1210,7 @@ public class ServerImplTest { } } - private static class SimpleServerTransport implements ServerTransport { + private class SimpleServerTransport implements ServerTransport { ServerTransportListener listener; @Override @@ -1237,6 +1227,11 @@ public class ServerImplTest { public LogId getLogId() { throw new UnsupportedOperationException(); } + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return timer.getScheduledExecutorService(); + } } private static class Builder extends AbstractServerImplBuilder { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 5df54c71dd..6f2c308d54 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -30,6 +30,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import java.io.IOException; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -106,6 +107,11 @@ class NettyServerTransport implements ServerTransport { channel.pipeline().addLast(negotiationHandler); } + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return channel.eventLoop(); + } + @Override public void shutdown() { if (channel.isOpen()) {