From 6e94cf33db75b652b42fae77d36a4bdd55e9a9dd Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Sat, 23 Jan 2016 09:32:41 -0800 Subject: [PATCH] Require a ScheduledExecutorService to be provided to Context Before the service would be leaked, because when the scheduled future was cancelled the scheduler wouldn't be released. Also, Future isn't powerful enough to signal when we should release when cancelling. Given the nature of Context, it also seems beneficial to not have it own threads. Since any caller of withDeadline*() is required to cancel the Context, it shouldn't be too burdensome for them to manage the lifecycle of the scheduler. --- core/src/main/java/io/grpc/Context.java | 54 +++++---------------- core/src/test/java/io/grpc/ContextTest.java | 53 +++++--------------- 2 files changed, 24 insertions(+), 83 deletions(-) diff --git a/core/src/main/java/io/grpc/Context.java b/core/src/main/java/io/grpc/Context.java index 1a770fac2f..8f2a5305b4 100644 --- a/core/src/main/java/io/grpc/Context.java +++ b/core/src/main/java/io/grpc/Context.java @@ -33,16 +33,12 @@ package io.grpc; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import io.grpc.internal.SharedResourceHolder; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -114,33 +110,6 @@ public class Context { private static final Logger LOG = Logger.getLogger(Context.class.getName()); - /** - * Use a shared resource to retain the {@link ScheduledExecutorService} used to - * implement deadline based context cancellation. This allows the executor to be - * shutdown if its not in use thereby allowing Context to be unloaded. - */ - static final SharedResourceHolder.Resource SCHEDULER = - new SharedResourceHolder.Resource() { - private static final String name = "context-scheduler"; - @Override - public ScheduledExecutorService create() { - return Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() - .setNameFormat(name + "-%d") - .setDaemon(true) - .build()); - } - - @Override - public void close(ScheduledExecutorService instance) { - instance.shutdown(); - } - - @Override - public String toString() { - return name; - } - }; - private static final Object[][] EMPTY_ENTRIES = new Object[0][2]; /** @@ -268,8 +237,9 @@ public class Context { * a received existing deadline. When establishing a new deadline, {@link #withDeadlineAfter} * is the better mechanism. */ - public CancellableContext withDeadlineNanoTime(long deadlineNanoTime) { - return withDeadlineAfter(deadlineNanoTime - System.nanoTime(), TimeUnit.NANOSECONDS); + public CancellableContext withDeadlineNanoTime(long deadlineNanoTime, + ScheduledExecutorService scheduler) { + return withDeadlineAfter(deadlineNanoTime - System.nanoTime(), TimeUnit.NANOSECONDS, scheduler); } /** @@ -280,7 +250,7 @@ public class Context { *

Sample usage: *

    *   Context.CancellableContext withDeadline = Context.current().withDeadlineAfter(5,
-   *       TimeUnit.SECONDS);
+   *       TimeUnit.SECONDS, scheduler);
    *   executorService.execute(withDeadline.wrap(new Runnable() {
    *     public void run() {
    *       Context current = Context.current();
@@ -291,10 +261,12 @@ public class Context {
    *   });
    * 
*/ - public CancellableContext withDeadlineAfter(long duration, TimeUnit unit) { + public CancellableContext withDeadlineAfter(long duration, TimeUnit unit, + ScheduledExecutorService scheduler) { Preconditions.checkArgument(duration >= 0, "duration must be greater than or equal to 0"); Preconditions.checkNotNull(unit, "unit"); - return new CancellableContext(this, unit.toNanos(duration)); + Preconditions.checkNotNull(scheduler, "scheduler"); + return new CancellableContext(this, unit.toNanos(duration), scheduler); } /** @@ -611,17 +583,13 @@ public class Context { /** * Create a cancellable context that has a deadline. */ - private CancellableContext(Context parent, long delayNanos) { + private CancellableContext(Context parent, long delayNanos, + ScheduledExecutorService scheduler) { this(parent); - final ScheduledExecutorService scheduler = SharedResourceHolder.get(SCHEDULER); scheduledFuture = scheduler.schedule(new Runnable() { @Override public void run() { - try { - cancel(new TimeoutException("context timed out")); - } finally { - SharedResourceHolder.release(SCHEDULER, scheduler); - } + cancel(new TimeoutException("context timed out")); } }, delayNanos, TimeUnit.NANOSECONDS); } diff --git a/core/src/test/java/io/grpc/ContextTest.java b/core/src/test/java/io/grpc/ContextTest.java index 19853bd076..77e52ebdc7 100644 --- a/core/src/test/java/io/grpc/ContextTest.java +++ b/core/src/test/java/io/grpc/ContextTest.java @@ -43,8 +43,6 @@ import static org.junit.Assert.fail; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; -import io.grpc.internal.SharedResourceHolder; - import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -57,9 +55,9 @@ import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.Future; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -97,6 +95,7 @@ public class ContextTest { observed = Context.current(); } }; + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); @Before public void setUp() throws Exception { @@ -105,6 +104,7 @@ public class ContextTest { @After public void tearDown() throws Exception { + scheduler.shutdown(); } @Test @@ -611,7 +611,7 @@ public class ContextTest { @Test public void absoluteDeadlineTriggersAndPropagates() throws Exception { Context base = Context.current().withDeadlineNanoTime(System.nanoTime() - + TimeUnit.SECONDS.toNanos(1)); + + TimeUnit.SECONDS.toNanos(1), scheduler); Context child = base.withValue(FOOD, "lasagna"); child.addListener(cancellationListener, MoreExecutors.directExecutor()); assertFalse(base.isCancelled()); @@ -626,12 +626,12 @@ public class ContextTest { @Test(expected = IllegalArgumentException.class) public void negativeDeadlineFails() { - Context.current().withDeadlineAfter(-1, TimeUnit.NANOSECONDS); + Context.current().withDeadlineAfter(-1, TimeUnit.NANOSECONDS, scheduler); } @Test public void relativeDeadlineTriggersAndPropagates() throws Exception { - Context base = Context.current().withDeadlineAfter(1, TimeUnit.SECONDS); + Context base = Context.current().withDeadlineAfter(1, TimeUnit.SECONDS, scheduler); Context child = base.withValue(FOOD, "lasagna"); child.addListener(cancellationListener, MoreExecutors.directExecutor()); assertFalse(base.isCancelled()); @@ -646,8 +646,8 @@ public class ContextTest { @Test public void innerDeadlineCompletesBeforeOuter() throws Exception { - Context base = Context.current().withDeadlineAfter(2, TimeUnit.SECONDS); - Context child = base.withDeadlineAfter(1, TimeUnit.SECONDS); + Context base = Context.current().withDeadlineAfter(2, TimeUnit.SECONDS, scheduler); + Context child = base.withDeadlineAfter(1, TimeUnit.SECONDS, scheduler); child.addListener(cancellationListener, MoreExecutors.directExecutor()); assertFalse(base.isCancelled()); assertFalse(child.isCancelled()); @@ -667,47 +667,20 @@ public class ContextTest { @Test public void cancellationCancelsScheduledTask() { - ScheduledExecutorService service = SharedResourceHolder.get(Context.SCHEDULER); + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); try { - ThreadPoolExecutor executor = (ThreadPoolExecutor) service; - executor.purge(); assertEquals(0, executor.getQueue().size()); - Context.CancellableContext base = Context.current().withDeadlineAfter(1, TimeUnit.DAYS); + Context.CancellableContext base + = Context.current().withDeadlineAfter(1, TimeUnit.DAYS, executor); assertEquals(1, executor.getQueue().size()); base.cancel(null); executor.purge(); assertEquals(0, executor.getQueue().size()); } finally { - SharedResourceHolder.release(Context.SCHEDULER, service); + executor.shutdown(); } } - @Test - public void testScheduler() throws Exception { - assertEquals("context-scheduler", Context.SCHEDULER.toString()); - ScheduledExecutorService service = Context.SCHEDULER.create(); - try { - assertFalse(service.isShutdown()); - final AtomicReference threadName = new AtomicReference(); - final AtomicReference threadIsDaemon = new AtomicReference(); - Future ran = service.submit(new Runnable() { - @Override - public void run() { - threadName.set(Thread.currentThread().getName()); - threadIsDaemon.set(Thread.currentThread().isDaemon()); - } - }); - ran.get(); - assertNotNull(threadName.get()); - assertTrue(threadName.get(), threadName.get().startsWith(Context.SCHEDULER.toString())); - assertNotNull(threadIsDaemon.get()); - assertTrue(threadIsDaemon.get()); - } finally { - Context.SCHEDULER.close(service); - } - assertTrue(service.isShutdown()); - } - @Test public void testKeyEqualsHashCode() { assertTrue(PET.equals(PET));