From c436d93f07d147f04a3953182f9a532b6a52870d Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Fri, 13 Jan 2017 09:00:32 -0800 Subject: [PATCH] core: ServerImpl returns shared resources at termination (#2605) Previously it does it at shutdown, which was wrong because executor may still be used before the server is terminated. Resolves #2034 Uses ObjectPool to make this change testable. Cleans up test and makes it mostly single-threaded, except for two deadlock tests that have to be multi-threaded. --- .../internal/AbstractServerImplBuilder.java | 21 +- .../java/io/grpc/internal/ObjectPool.java | 3 + .../java/io/grpc/internal/ServerImpl.java | 31 +-- .../java/io/grpc/internal/ServerImplTest.java | 252 +++++++++--------- 4 files changed, 170 insertions(+), 137 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index e14d64f29c..78f2ca0a24 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -165,7 +165,8 @@ public abstract class AbstractServerImplBuilder getExecutorPool() { + final Executor savedExecutor = executor; + if (savedExecutor == null) { + return SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR); + } + return new ObjectPool() { + @Override + public Executor getObject() { + return savedExecutor; + } + + @Override + public Executor returnObject(Object object) { + return null; + } + }; + } + /** * Children of AbstractServerBuilder should override this method to provide transport specific * information for the server. This method is mean for Transport implementors and should not be diff --git a/core/src/main/java/io/grpc/internal/ObjectPool.java b/core/src/main/java/io/grpc/internal/ObjectPool.java index 20ae7191e4..b66e101e5f 100644 --- a/core/src/main/java/io/grpc/internal/ObjectPool.java +++ b/core/src/main/java/io/grpc/internal/ObjectPool.java @@ -31,6 +31,9 @@ package io.grpc.internal; +import javax.annotation.concurrent.ThreadSafe; + +@ThreadSafe public interface ObjectPool { /** * Get an object from the pool. diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 8c22bbb666..1b34d9d14c 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -36,7 +36,6 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.grpc.Contexts.statusFromCancelled; import static io.grpc.Status.DEADLINE_EXCEEDED; import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; -import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; import static java.util.concurrent.TimeUnit.NANOSECONDS; import com.google.common.base.Preconditions; @@ -64,7 +63,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -88,10 +86,9 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { private static final ServerStreamListener NOOP_LISTENER = new NoopListener(); private final LogId logId = LogId.allocate(getClass().getName()); + private final ObjectPool executorPool; /** Executor for application processing. Safe to read after {@link #start()}. */ private Executor executor; - /** Safe to read after {@link #start()}. */ - private boolean usingSharedExecutor; private final InternalHandlerRegistry registry; private final HandlerRegistry fallbackRegistry; private final List transportFilters; @@ -111,7 +108,8 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { @GuardedBy("lock") private final Collection transports = new HashSet(); - private final ScheduledExecutorService timeoutService = SharedResourceHolder.get(TIMER_SERVICE); + private final ObjectPool timeoutServicePool; + private ScheduledExecutorService timeoutService; private final Context rootContext; private final DecompressorRegistry decompressorRegistry; @@ -126,12 +124,15 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { * doesn't have the method * @param executor to call methods on behalf of remote clients */ - ServerImpl(Executor executor, InternalHandlerRegistry registry, HandlerRegistry fallbackRegistry, + ServerImpl(ObjectPool executorPool, + ObjectPool timeoutServicePool, + InternalHandlerRegistry registry, HandlerRegistry fallbackRegistry, InternalServer transportServer, Context rootContext, DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry, List transportFilters, StatsContextFactory statsFactory, Supplier stopwatchSupplier) { - this.executor = executor; + this.executorPool = Preconditions.checkNotNull(executorPool, "executorPool"); + this.timeoutServicePool = Preconditions.checkNotNull(timeoutServicePool, "timeoutServicePool"); this.registry = Preconditions.checkNotNull(registry, "registry"); this.fallbackRegistry = Preconditions.checkNotNull(fallbackRegistry, "fallbackRegistry"); this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer"); @@ -158,12 +159,10 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { synchronized (lock) { checkState(!started, "Already started"); checkState(!shutdown, "Shutting down"); - usingSharedExecutor = executor == null; - if (usingSharedExecutor) { - executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR); - } // Start and wait for any port to actually be bound. transportServer.start(new ServerListenerImpl()); + timeoutService = Preconditions.checkNotNull(timeoutServicePool.getObject(), "timeoutService"); + executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); started = true; return this; } @@ -214,10 +213,6 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { if (shutdownTransportServer) { transportServer.shutdown(); } - SharedResourceHolder.release(TIMER_SERVICE, timeoutService); - if (usingSharedExecutor) { - SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, (ExecutorService) executor); - } return this; } @@ -307,6 +302,12 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { throw new AssertionError("Server already terminated"); } terminated = true; + if (timeoutService != null) { + timeoutService = timeoutServicePool.returnObject(timeoutService); + } + if (executor != null) { + executor = executorPool.returnObject(executor); + } // TODO(carl-mastrangelo): move this outside the synchronized block. lock.notifyAll(); } diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index aceb08ea64..dda7d17a6f 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -46,7 +46,7 @@ import static org.mockito.Matchers.notNull; import static org.mockito.Matchers.same; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -99,13 +99,11 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.SocketAddress; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CountDownLatch; +import java.util.List; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -133,13 +131,17 @@ public class ServerImplTest { SERVER_CONTEXT.cancel(null); } - private ExecutorService executor = Executors.newSingleThreadExecutor(); + private final FakeClock executor = new FakeClock(); + private final FakeClock timer = new FakeClock(); + @Mock + private ObjectPool executorPool; + @Mock + private ObjectPool timerPool; private InternalHandlerRegistry registry = new InternalHandlerRegistry.Builder().build(); - private MutableHandlerRegistry fallbackRegistry = new MutableHandlerRegistry(); + private MutableHandlerRegistry mutableFallbackRegistry = new MutableHandlerRegistry(); + private HandlerRegistry fallbackRegistry = mutableFallbackRegistry; private SimpleServer transportServer = new SimpleServer(); - private ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); + private ServerImpl server; @Captor private ArgumentCaptor statusCaptor; @@ -157,14 +159,14 @@ public class ServerImplTest { @Before public void startUp() throws IOException { MockitoAnnotations.initMocks(this); - - server.start(); + when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); + when(timerPool.getObject()).thenReturn(timer.getScheduledExecutorService()); } - /** Tear down after test. */ @After - public void tearDown() { - executor.shutdownNow(); + public void noPendingTasks() { + assertEquals(0, executor.numPendingTasks()); + assertEquals(0, timer.numPendingTasks()); } @Test @@ -173,10 +175,7 @@ public class ServerImplTest { @Override public void shutdown() {} }; - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); - server.start(); + createAndStartServer(NO_FILTERS); server.shutdown(); assertTrue(server.isShutdown()); assertFalse(server.isTerminated()); @@ -185,27 +184,25 @@ public class ServerImplTest { } @Test - public void stopImmediate() { + public void stopImmediate() throws IOException { transportServer = new SimpleServer() { @Override public void shutdown() { throw new AssertionError("Should not be called, because wasn't started"); } }; - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); + createServer(NO_FILTERS); server.shutdown(); assertTrue(server.isShutdown()); assertTrue(server.isTerminated()); + verifyNoMoreInteractions(executorPool); + verifyNoMoreInteractions(timerPool); } @Test public void startStopImmediateWithChildTransport() throws IOException { - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); - server.start(); + createAndStartServer(NO_FILTERS); + verifyExecutorsAcquired(); class DelayedShutdownServerTransport extends SimpleServerTransport { boolean shutdown; @@ -221,16 +218,17 @@ public class ServerImplTest { assertTrue(server.isShutdown()); assertFalse(server.isTerminated()); assertTrue(serverTransport.shutdown); + verifyExecutorsNotReturned(); + serverTransport.listener.transportTerminated(); assertTrue(server.isTerminated()); + verifyExecutorsReturned(); } @Test public void startShutdownNowImmediateWithChildTransport() throws IOException { - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); - server.start(); + createAndStartServer(NO_FILTERS); + verifyExecutorsAcquired(); class DelayedShutdownServerTransport extends SimpleServerTransport { boolean shutdown; @@ -249,16 +247,17 @@ public class ServerImplTest { assertTrue(server.isShutdown()); assertFalse(server.isTerminated()); assertTrue(serverTransport.shutdown); + verifyExecutorsNotReturned(); + serverTransport.listener.transportTerminated(); assertTrue(server.isTerminated()); + verifyExecutorsReturned(); } @Test public void shutdownNowAfterShutdown() throws IOException { - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); - server.start(); + createAndStartServer(NO_FILTERS); + verifyExecutorsAcquired(); class DelayedShutdownServerTransport extends SimpleServerTransport { boolean shutdown; @@ -278,22 +277,23 @@ public class ServerImplTest { server.shutdownNow(); assertFalse(server.isTerminated()); assertTrue(serverTransport.shutdown); + verifyExecutorsNotReturned(); + serverTransport.listener.transportTerminated(); assertTrue(server.isTerminated()); + verifyExecutorsReturned(); } @Test public void shutdownNowAfterSlowShutdown() throws IOException { - SimpleServer transportServer = new SimpleServer() { + transportServer = new SimpleServer() { @Override public void shutdown() { // Don't call super which calls listener.serverShutdown(). We'll call it manually. } }; - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); - server.start(); + createAndStartServer(NO_FILTERS); + verifyExecutorsAcquired(); class DelayedShutdownServerTransport extends SimpleServerTransport { boolean shutdown; @@ -313,7 +313,10 @@ public class ServerImplTest { transportServer.listener.serverShutdown(); assertTrue(server.isShutdown()); assertFalse(server.isTerminated()); + + verifyExecutorsNotReturned(); serverTransport.listener.transportTerminated(); + verifyExecutorsReturned(); assertTrue(server.isTerminated()); } @@ -327,19 +330,21 @@ public class ServerImplTest { } } - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, - new FailingStartupServer(), SERVER_CONTEXT, decompressorRegistry, compressorRegistry, - NO_FILTERS, statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER); + transportServer = new FailingStartupServer(); + createServer(NO_FILTERS); try { server.start(); fail("expected exception"); } catch (IOException e) { assertSame(ex, e); } + verifyNoMoreInteractions(executorPool); + verifyNoMoreInteractions(timerPool); } @Test public void methodNotFound() throws Exception { + createAndStartServer(NO_FILTERS); ServerTransportListener transportListener = transportServer.registerNewServerTransport(new SimpleServerTransport()); Metadata requestHeaders = new Metadata(); @@ -350,7 +355,7 @@ public class ServerImplTest { verify(stream).setListener(isA(ServerStreamListener.class)); verify(stream, atLeast(1)).statsTraceContext(); - executeBarrier(executor).await(); + assertEquals(1, executor.runDueTasks()); verify(stream).close(statusCaptor.capture(), any(Metadata.class)); Status status = statusCaptor.getValue(); assertEquals(Status.Code.UNIMPLEMENTED, status.getCode()); @@ -368,6 +373,7 @@ public class ServerImplTest { @Test public void basicExchangeSuccessful() throws Exception { + createAndStartServer(NO_FILTERS); final Metadata.Key metadataKey = Metadata.Key.of("inception", Metadata.ASCII_STRING_MARSHALLER); final Metadata.Key statsHeaderKey @@ -376,7 +382,7 @@ public class ServerImplTest { = new AtomicReference>(); MethodDescriptor method = MethodDescriptor.create( MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER); - fallbackRegistry.addService(ServerServiceDefinition.builder( + mutableFallbackRegistry.addService(ServerServiceDefinition.builder( new ServiceDescriptor("Waiter", method)) .addMethod( method, @@ -412,13 +418,14 @@ public class ServerImplTest { assertNotNull(streamListener); verify(stream, atLeast(1)).statsTraceContext(); - executeBarrier(executor).await(); + assertEquals(1, executor.runDueTasks()); ServerCall call = callReference.get(); assertNotNull(call); String order = "Lots of pizza, please"; streamListener.messageRead(STRING_MARSHALLER.stream(order)); - verify(callListener, timeout(2000)).onMessage(order); + assertEquals(1, executor.runDueTasks()); + verify(callListener).onMessage(order); Metadata responseHeaders = new Metadata(); responseHeaders.put(metadataKey, "response value"); @@ -433,7 +440,7 @@ public class ServerImplTest { assertEquals(314, INTEGER_MARSHALLER.parse(inputCaptor.getValue()).intValue()); streamListener.halfClosed(); // All full; no dessert. - executeBarrier(executor).await(); + assertEquals(1, executor.runDueTasks()); verify(callListener).onHalfClose(); call.sendMessage(50); @@ -448,7 +455,7 @@ public class ServerImplTest { verify(stream).close(status, trailers); streamListener.closed(Status.OK); - executeBarrier(executor).await(); + assertEquals(1, executor.runDueTasks()); verify(callListener).onComplete(); verify(stream, atLeast(1)).statsTraceContext(); @@ -540,10 +547,7 @@ public class ServerImplTest { .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, remoteAddr) .build(); - ServerImpl server = new ServerImpl(MoreExecutors.directExecutor(), registry, fallbackRegistry, - transportServer, SERVER_CONTEXT, decompressorRegistry, compressorRegistry, - ImmutableList.of(filter1, filter2), statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER); - server.start(); + createAndStartServer(ImmutableList.of(filter1, filter2)); ServerTransportListener transportListener = transportServer.registerNewServerTransport(new SimpleServerTransport()); Attributes transportAttrs = transportListener.transportReady(Attributes.newBuilder() @@ -562,12 +566,12 @@ public class ServerImplTest { @Test public void exceptionInStartCallPropagatesToStream() throws Exception { - CyclicBarrier barrier = executeBarrier(executor); + createAndStartServer(NO_FILTERS); final Status status = Status.ABORTED.withDescription("Oh, no!"); MethodDescriptor method = MethodDescriptor .create(MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER); - fallbackRegistry.addService(ServerServiceDefinition.builder( + mutableFallbackRegistry.addService(ServerServiceDefinition.builder( new ServiceDescriptor("Waiter", method)) .addMethod(method, new ServerCallHandler() { @@ -594,8 +598,7 @@ public class ServerImplTest { verify(stream, atLeast(1)).statsTraceContext(); verifyNoMoreInteractions(stream); - barrier.await(); - executeBarrier(executor).await(); + assertEquals(1, executor.runDueTasks()); verify(stream).close(same(status), notNull(Metadata.class)); verify(stream, atLeast(1)).statsTraceContext(); verifyNoMoreInteractions(stream); @@ -622,10 +625,7 @@ public class ServerImplTest { } transportServer = new MaybeDeadlockingServer(); - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); - server.start(); + createAndStartServer(NO_FILTERS); new Thread() { @Override public void run() { @@ -645,6 +645,7 @@ public class ServerImplTest { @Test public void testNoDeadlockOnTransportShutdown() throws Exception { + createAndStartServer(NO_FILTERS); final Object lock = new Object(); final CyclicBarrier barrier = new CyclicBarrier(2); class MaybeDeadlockingServerTransport extends SimpleServerTransport { @@ -685,13 +686,14 @@ public class ServerImplTest { @Test public void testCallContextIsBoundInListenerCallbacks() throws Exception { + createAndStartServer(NO_FILTERS); MethodDescriptor method = MethodDescriptor.create( MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER); - final CountDownLatch onReadyCalled = new CountDownLatch(1); - final CountDownLatch onMessageCalled = new CountDownLatch(1); - final CountDownLatch onHalfCloseCalled = new CountDownLatch(1); - final CountDownLatch onCancelCalled = new CountDownLatch(1); - fallbackRegistry.addService(ServerServiceDefinition.builder( + final AtomicBoolean onReadyCalled = new AtomicBoolean(false); + final AtomicBoolean onMessageCalled = new AtomicBoolean(false); + final AtomicBoolean onHalfCloseCalled = new AtomicBoolean(false); + final AtomicBoolean onCancelCalled = new AtomicBoolean(false); + mutableFallbackRegistry.addService(ServerServiceDefinition.builder( new ServiceDescriptor("Waiter", method)) .addMethod( method, @@ -710,25 +712,25 @@ public class ServerImplTest { @Override public void onReady() { checkContext(); - onReadyCalled.countDown(); + onReadyCalled.set(true); } @Override public void onMessage(String message) { checkContext(); - onMessageCalled.countDown(); + onMessageCalled.set(true); } @Override public void onHalfClose() { checkContext(); - onHalfCloseCalled.countDown(); + onHalfCloseCalled.set(true); } @Override public void onCancel() { checkContext(); - onCancelCalled.countDown(); + onCancelCalled.set(true); } @Override @@ -758,14 +760,20 @@ public class ServerImplTest { assertNotNull(streamListener); streamListener.onReady(); - streamListener.messageRead(new ByteArrayInputStream(new byte[0])); - streamListener.halfClosed(); - streamListener.closed(Status.CANCELLED); + assertEquals(1, executor.runDueTasks()); + assertTrue(onReadyCalled.get()); - assertTrue(onReadyCalled.await(5, TimeUnit.SECONDS)); - assertTrue(onMessageCalled.await(5, TimeUnit.SECONDS)); - assertTrue(onHalfCloseCalled.await(5, TimeUnit.SECONDS)); - assertTrue(onCancelCalled.await(5, TimeUnit.SECONDS)); + streamListener.messageRead(new ByteArrayInputStream(new byte[0])); + assertEquals(1, executor.runDueTasks()); + assertTrue(onMessageCalled.get()); + + streamListener.halfClosed(); + assertEquals(1, executor.runDueTasks()); + assertTrue(onHalfCloseCalled.get()); + + streamListener.closed(Status.CANCELLED); + assertEquals(1, executor.runDueTasks()); + assertTrue(onCancelCalled.get()); // Close should never be called if asserts in listener pass. verify(stream, times(0)).close(isA(Status.class), isNotNull(Metadata.class)); @@ -773,14 +781,15 @@ public class ServerImplTest { @Test public void testClientCancelTriggersContextCancellation() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); + createAndStartServer(NO_FILTERS); + final AtomicBoolean contextCancelled = new AtomicBoolean(false); callListener = new ServerCall.Listener() { @Override public void onReady() { Context.current().addListener(new Context.CancellationListener() { @Override public void cancelled(Context context) { - latch.countDown(); + contextCancelled.set(true); } }, MoreExecutors.directExecutor()); } @@ -790,7 +799,7 @@ public class ServerImplTest { = new AtomicReference>(); MethodDescriptor method = MethodDescriptor.create( MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER); - fallbackRegistry.addService(ServerServiceDefinition.builder( + mutableFallbackRegistry.addService(ServerServiceDefinition.builder( new ServiceDescriptor("Waiter", method)) .addMethod(method, new ServerCallHandler() { @@ -817,7 +826,8 @@ public class ServerImplTest { streamListener.onReady(); streamListener.closed(Status.CANCELLED); - assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertEquals(1, executor.runDueTasks()); + assertTrue(contextCancelled.get()); } @Test @@ -828,10 +838,7 @@ public class ServerImplTest { return 65535; } }; - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); - server.start(); + createAndStartServer(NO_FILTERS); Truth.assertThat(server.getPort()).isEqualTo(65535); } @@ -839,9 +846,7 @@ public class ServerImplTest { @Test public void getPortBeforeStartedFails() { transportServer = new SimpleServer(); - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); + createServer(NO_FILTERS); thrown.expect(IllegalStateException.class); thrown.expectMessage("started"); server.getPort(); @@ -850,10 +855,7 @@ public class ServerImplTest { @Test public void getPortAfterTerminationFails() throws Exception { transportServer = new SimpleServer(); - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); - server.start(); + createAndStartServer(NO_FILTERS); server.shutdown(); server.awaitTermination(); thrown.expect(IllegalStateException.class); @@ -863,7 +865,7 @@ public class ServerImplTest { @Test public void handlerRegistryPriorities() throws Exception { - HandlerRegistry fallbackRegistry = mock(HandlerRegistry.class); + fallbackRegistry = mock(HandlerRegistry.class); MethodDescriptor method1 = MethodDescriptor.create( MethodType.UNKNOWN, "Service1/Method1", STRING_MARSHALLER, INTEGER_MARSHALLER); registry = new InternalHandlerRegistry.Builder() @@ -871,10 +873,7 @@ public class ServerImplTest { .addMethod(method1, callHandler).build()) .build(); transportServer = new SimpleServer(); - ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer, - SERVER_CONTEXT, decompressorRegistry, compressorRegistry, NO_FILTERS, statsCtxFactory, - GrpcUtil.STOPWATCH_SUPPLIER); - server.start(); + createAndStartServer(NO_FILTERS); ServerTransportListener transportListener = transportServer.registerNewServerTransport(new SimpleServerTransport()); @@ -886,37 +885,48 @@ public class ServerImplTest { // This call will be handled by callHandler from the internal registry transportListener.streamCreated(stream, "Service1/Method1", requestHeaders); + assertEquals(1, executor.runDueTasks()); + verify(callHandler).startCall(Matchers.>anyObject(), + Matchers.anyObject()); // This call will be handled by the fallbackRegistry because it's not registred in the internal // registry. transportListener.streamCreated(stream, "Service1/Method2", requestHeaders); + assertEquals(1, executor.runDueTasks()); + verify(fallbackRegistry).lookupMethod("Service1/Method2", null); - verify(callHandler, timeout(2000)).startCall(Matchers.>anyObject(), - Matchers.anyObject()); - verify(fallbackRegistry, timeout(2000)).lookupMethod("Service1/Method2", null); verifyNoMoreInteractions(callHandler); verifyNoMoreInteractions(fallbackRegistry); } - /** - * Useful for plugging a single-threaded executor from processing tasks, or for waiting until a - * single-threaded executor has processed queued tasks. - */ - private static CyclicBarrier executeBarrier(Executor executor) { - final CyclicBarrier barrier = new CyclicBarrier(2); - executor.execute(new Runnable() { - @Override - public void run() { - try { - barrier.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new RuntimeException(ex); - } catch (BrokenBarrierException ex) { - throw new RuntimeException(ex); - } - } - }); - return barrier; + private void createAndStartServer(List filters) throws IOException { + createServer(filters); + server.start(); + } + + private void createServer(List filters) { + assertNull(server); + server = new ServerImpl(executorPool, timerPool, registry, fallbackRegistry, + transportServer, SERVER_CONTEXT, decompressorRegistry, compressorRegistry, filters, + statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER); + } + + private void verifyExecutorsAcquired() { + verify(executorPool).getObject(); + verify(timerPool).getObject(); + verifyNoMoreInteractions(executorPool); + verifyNoMoreInteractions(timerPool); + } + + private void verifyExecutorsNotReturned() { + verify(executorPool, never()).returnObject(any(Executor.class)); + verify(timerPool, never()).returnObject(any(ScheduledExecutorService.class)); + } + + private void verifyExecutorsReturned() { + verify(executorPool).returnObject(same(executor.getScheduledExecutorService())); + verify(timerPool).returnObject(same(timer.getScheduledExecutorService())); + verifyNoMoreInteractions(executorPool); + verifyNoMoreInteractions(timerPool); } private static class SimpleServer implements io.grpc.internal.InternalServer {