diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java index 5b88ee5df4..1e47b1e5d0 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java @@ -16,12 +16,17 @@ package io.grpc.inprocess; +import com.google.common.annotations.VisibleForTesting; import io.grpc.internal.InternalServer; +import io.grpc.internal.ObjectPool; import io.grpc.internal.ServerListener; import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.SharedResourceHolder.Resource; +import io.grpc.internal.SharedResourcePool; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; import javax.annotation.concurrent.ThreadSafe; @ThreadSafe @@ -36,14 +41,28 @@ final class InProcessServer implements InternalServer { private final String name; private ServerListener listener; private boolean shutdown; + /** Expected to be a SharedResourcePool except in testing. */ + private final ObjectPool schedulerPool; + /** + * Only used to make sure the scheduler has at least one reference. Since child transports can + * outlive this server, they must get their own reference. + */ + private ScheduledExecutorService scheduler; - InProcessServer(String name) { + InProcessServer(String name, Resource schedulerResource) { + this(name, SharedResourcePool.forResource(schedulerResource)); + } + + @VisibleForTesting + InProcessServer(String name, ObjectPool schedulerPool) { this.name = name; + this.schedulerPool = schedulerPool; } @Override public void start(ServerListener serverListener) throws IOException { this.listener = serverListener; + this.scheduler = schedulerPool.getObject(); // Must be last, as channels can start connecting after this point. if (registry.putIfAbsent(name, this) != null) { throw new IOException("name already registered: " + name); @@ -60,6 +79,7 @@ final class InProcessServer implements InternalServer { if (!registry.remove(name, this)) { throw new AssertionError(); } + scheduler = schedulerPool.returnObject(scheduler); synchronized (this) { shutdown = true; listener.serverShutdown(); @@ -72,4 +92,8 @@ final class InProcessServer implements InternalServer { } return listener.transportCreated(transport); } + + ObjectPool getScheduledExecutorServicePool() { + return schedulerPool; + } } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index ace68fbd13..8f5f8d5f15 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import io.grpc.ExperimentalApi; import io.grpc.ServerStreamTracer; import io.grpc.internal.AbstractServerImplBuilder; +import io.grpc.internal.GrpcUtil; import java.io.File; import java.util.List; @@ -79,7 +80,7 @@ public final class InProcessServerBuilder // TODO(zhangkun83): InProcessTransport by-passes framer and deframer, thus message sizses are // not counted. Therefore, we disable stats for now. // (https://github.com/grpc/grpc-java/issues/2284) - return new InProcessServer(name); + return new InProcessServer(name, GrpcUtil.TIMER_SERVICE); } @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 7a2152133d..ec4be39568 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -33,6 +33,7 @@ import io.grpc.internal.ConnectionClientTransport; import io.grpc.internal.LogId; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.NoopClientStream; +import io.grpc.internal.ObjectPool; import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransport; @@ -45,6 +46,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.CheckReturnValue; @@ -58,6 +60,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private final LogId logId = LogId.allocate(getClass().getName()); private final String name; private final String authority; + private ObjectPool serverSchedulerPool; + private ScheduledExecutorService serverScheduler; private ServerTransportListener serverTransportListener; private Attributes serverStreamAttributes; private ManagedClientTransport.Listener clientTransportListener; @@ -70,10 +74,6 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans @GuardedBy("this") private Set streams = new HashSet(); - public InProcessTransport(String name) { - this(name, null); - } - public InProcessTransport(String name, String authority) { this.name = name; this.authority = authority; @@ -85,6 +85,9 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans this.clientTransportListener = listener; InProcessServer server = InProcessServer.findServer(name); if (server != null) { + serverSchedulerPool = server.getScheduledExecutorServicePool(); + serverScheduler = serverSchedulerPool.getObject(); + // Must be semi-initialized; past this point, can begin receiving requests serverTransportListener = server.register(this); } if (serverTransportListener == null) { @@ -200,6 +203,11 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans return Attributes.EMPTY; } + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return serverScheduler; + } + private synchronized void notifyShutdown(Status s) { if (shutdown) { return; @@ -213,6 +221,9 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans return; } terminated = true; + if (serverScheduler != null) { + serverScheduler = serverSchedulerPool.returnObject(serverScheduler); + } clientTransportListener.transportTerminated(); if (serverTransportListener != null) { serverTransportListener.transportTerminated(); diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 12a972830e..c17e842fc2 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -185,7 +185,6 @@ public abstract class AbstractServerImplBuilder transports = new HashSet(); - private final ObjectPool timeoutServicePool; - private ScheduledExecutorService timeoutService; private final Context rootContext; private final DecompressorRegistry decompressorRegistry; @@ -105,18 +102,15 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { /** * Construct a server. * - * @param executorPool provides an executor to call methods on behalf of remote clients - * @param registry the primary method registry - * @param fallbackRegistry the secondary method registry, used only if the primary registry - * doesn't have the method + * @param builder builder with configuration for server + * @param transportServer transport server that will create new incoming transports + * @param rootContext context that callbacks for new RPCs should be derived from */ ServerImpl( AbstractServerImplBuilder builder, - ObjectPool timeoutServicePool, InternalServer transportServer, Context rootContext) { this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool"); - this.timeoutServicePool = Preconditions.checkNotNull(timeoutServicePool, "timeoutServicePool"); this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder"); this.fallbackRegistry = Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); @@ -146,7 +140,6 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { checkState(!shutdown, "Shutting down"); // Start and wait for any port to actually be bound. transportServer.start(new ServerListenerImpl()); - timeoutService = Preconditions.checkNotNull(timeoutServicePool.getObject(), "timeoutService"); executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); started = true; return this; @@ -297,9 +290,6 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { throw new AssertionError("Server already terminated"); } terminated = true; - if (timeoutService != null) { - timeoutService = timeoutServicePool.returnObject(timeoutService); - } if (executor != null) { executor = executorPool.returnObject(executor); } @@ -452,8 +442,8 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { return baseContext.withCancellation(); } - Context.CancellableContext context = - baseContext.withDeadlineAfter(timeoutNanos, NANOSECONDS, timeoutService); + Context.CancellableContext context = baseContext.withDeadlineAfter( + timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService()); context.addListener(new Context.CancellationListener() { @Override public void cancelled(Context context) { diff --git a/core/src/main/java/io/grpc/internal/ServerTransport.java b/core/src/main/java/io/grpc/internal/ServerTransport.java index 3f89f01779..27d3da63b3 100644 --- a/core/src/main/java/io/grpc/internal/ServerTransport.java +++ b/core/src/main/java/io/grpc/internal/ServerTransport.java @@ -17,6 +17,7 @@ package io.grpc.internal; import io.grpc.Status; +import java.util.concurrent.ScheduledExecutorService; /** An inbound connection. */ public interface ServerTransport extends WithLogId { @@ -32,4 +33,13 @@ public interface ServerTransport extends WithLogId { * should be closed with the provided {@code reason}. */ void shutdownNow(Status reason); + + /** + * Returns an executor for scheduling provided by the transport. The service should be configured + * to allow cancelled scheduled runnables to be GCed. + * + *

The executor may not be used after the transport terminates. The caller should ensure any + * outstanding tasks are cancelled when the transport terminates. + */ + ScheduledExecutorService getScheduledExecutorService(); } diff --git a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java index 1b631613c1..3eb15548d1 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessServerTest.java @@ -17,6 +17,13 @@ package io.grpc.inprocess; import com.google.common.truth.Truth; +import io.grpc.internal.FakeClock; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.ServerListener; +import io.grpc.internal.ServerTransport; +import io.grpc.internal.ServerTransportListener; +import java.util.concurrent.ScheduledExecutorService; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -26,9 +33,43 @@ public class InProcessServerTest { @Test public void getPort_notStarted() throws Exception { - InProcessServer s = new InProcessServer("name"); + InProcessServer s = new InProcessServer("name", GrpcUtil.TIMER_SERVICE); Truth.assertThat(s.getPort()).isEqualTo(-1); } + + @Test + public void serverHoldsRefToScheduler() throws Exception { + final ScheduledExecutorService ses = new FakeClock().getScheduledExecutorService(); + class RefCountingObjectPool implements ObjectPool { + private int count; + + @Override + public ScheduledExecutorService getObject() { + count++; + return ses; + } + + @Override + public ScheduledExecutorService returnObject(Object returned) { + count--; + return null; + } + } + + RefCountingObjectPool pool = new RefCountingObjectPool(); + InProcessServer s = new InProcessServer("name", pool); + Truth.assertThat(pool.count).isEqualTo(0); + s.start(new ServerListener() { + @Override public ServerTransportListener transportCreated(ServerTransport transport) { + throw new UnsupportedOperationException(); + } + + @Override public void serverShutdown() {} + }); + Truth.assertThat(pool.count).isEqualTo(1); + s.shutdown(); + Truth.assertThat(pool.count).isEqualTo(0); + } } diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index c35f93b09e..c13545470e 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -17,6 +17,7 @@ package io.grpc.inprocess; import io.grpc.ServerStreamTracer; +import io.grpc.internal.GrpcUtil; import io.grpc.internal.InternalServer; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.testing.AbstractTransportTest; @@ -32,7 +33,7 @@ public class InProcessTransportTest extends AbstractTransportTest { @Override protected InternalServer newServer(List streamTracerFactories) { - return new InProcessServer(TRANSPORT_NAME); + return new InProcessServer(TRANSPORT_NAME, GrpcUtil.TIMER_SERVICE); } @Override diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 4e602a89da..4d2fc56417 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -137,8 +137,6 @@ public class ServerImplTest { }); @Mock private ObjectPool executorPool; - @Mock - private ObjectPool timerPool; private Builder builder = new Builder(); private MutableHandlerRegistry mutableFallbackRegistry = new MutableHandlerRegistry(); private HandlerRegistry fallbackRegistry = mutableFallbackRegistry; @@ -165,7 +163,6 @@ public class ServerImplTest { MockitoAnnotations.initMocks(this); streamTracerFactories = Arrays.asList(streamTracerFactory); when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); - when(timerPool.getObject()).thenReturn(timer.getScheduledExecutorService()); when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) .thenReturn(streamTracer); } @@ -203,7 +200,6 @@ public class ServerImplTest { assertTrue(server.isShutdown()); assertTrue(server.isTerminated()); verifyNoMoreInteractions(executorPool); - verifyNoMoreInteractions(timerPool); } @Test @@ -346,7 +342,6 @@ public class ServerImplTest { assertSame(ex, e); } verifyNoMoreInteractions(executorPool); - verifyNoMoreInteractions(timerPool); } @Test @@ -1168,26 +1163,21 @@ public class ServerImplTest { builder.fallbackHandlerRegistry(fallbackRegistry); builder.executorPool = executorPool; - server = new ServerImpl(builder, timerPool, transportServer, SERVER_CONTEXT); + server = new ServerImpl(builder, transportServer, SERVER_CONTEXT); } private void verifyExecutorsAcquired() { verify(executorPool).getObject(); - verify(timerPool).getObject(); verifyNoMoreInteractions(executorPool); - verifyNoMoreInteractions(timerPool); } private void verifyExecutorsNotReturned() { verify(executorPool, never()).returnObject(any(Executor.class)); - verify(timerPool, never()).returnObject(any(ScheduledExecutorService.class)); } private void verifyExecutorsReturned() { verify(executorPool).returnObject(same(executor.getScheduledExecutorService())); - verify(timerPool).returnObject(same(timer.getScheduledExecutorService())); verifyNoMoreInteractions(executorPool); - verifyNoMoreInteractions(timerPool); } private void ensureServerStateNotLeaked() { @@ -1220,7 +1210,7 @@ public class ServerImplTest { } } - private static class SimpleServerTransport implements ServerTransport { + private class SimpleServerTransport implements ServerTransport { ServerTransportListener listener; @Override @@ -1237,6 +1227,11 @@ public class ServerImplTest { public LogId getLogId() { throw new UnsupportedOperationException(); } + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return timer.getScheduledExecutorService(); + } } private static class Builder extends AbstractServerImplBuilder { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index 5df54c71dd..6f2c308d54 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -30,6 +30,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import java.io.IOException; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -106,6 +107,11 @@ class NettyServerTransport implements ServerTransport { channel.pipeline().addLast(negotiationHandler); } + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return channel.eventLoop(); + } + @Override public void shutdown() { if (channel.isOpen()) {