diff --git a/core/src/main/java/io/grpc/Context.java b/core/src/main/java/io/grpc/Context.java index 1a770fac2f..8f2a5305b4 100644 --- a/core/src/main/java/io/grpc/Context.java +++ b/core/src/main/java/io/grpc/Context.java @@ -33,16 +33,12 @@ package io.grpc; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import io.grpc.internal.SharedResourceHolder; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -114,33 +110,6 @@ public class Context { private static final Logger LOG = Logger.getLogger(Context.class.getName()); - /** - * Use a shared resource to retain the {@link ScheduledExecutorService} used to - * implement deadline based context cancellation. This allows the executor to be - * shutdown if its not in use thereby allowing Context to be unloaded. - */ - static final SharedResourceHolder.Resource SCHEDULER = - new SharedResourceHolder.Resource() { - private static final String name = "context-scheduler"; - @Override - public ScheduledExecutorService create() { - return Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() - .setNameFormat(name + "-%d") - .setDaemon(true) - .build()); - } - - @Override - public void close(ScheduledExecutorService instance) { - instance.shutdown(); - } - - @Override - public String toString() { - return name; - } - }; - private static final Object[][] EMPTY_ENTRIES = new Object[0][2]; /** @@ -268,8 +237,9 @@ public class Context { * a received existing deadline. When establishing a new deadline, {@link #withDeadlineAfter} * is the better mechanism. */ - public CancellableContext withDeadlineNanoTime(long deadlineNanoTime) { - return withDeadlineAfter(deadlineNanoTime - System.nanoTime(), TimeUnit.NANOSECONDS); + public CancellableContext withDeadlineNanoTime(long deadlineNanoTime, + ScheduledExecutorService scheduler) { + return withDeadlineAfter(deadlineNanoTime - System.nanoTime(), TimeUnit.NANOSECONDS, scheduler); } /** @@ -280,7 +250,7 @@ public class Context { *

Sample usage: *

    *   Context.CancellableContext withDeadline = Context.current().withDeadlineAfter(5,
-   *       TimeUnit.SECONDS);
+   *       TimeUnit.SECONDS, scheduler);
    *   executorService.execute(withDeadline.wrap(new Runnable() {
    *     public void run() {
    *       Context current = Context.current();
@@ -291,10 +261,12 @@ public class Context {
    *   });
    * 
*/ - public CancellableContext withDeadlineAfter(long duration, TimeUnit unit) { + public CancellableContext withDeadlineAfter(long duration, TimeUnit unit, + ScheduledExecutorService scheduler) { Preconditions.checkArgument(duration >= 0, "duration must be greater than or equal to 0"); Preconditions.checkNotNull(unit, "unit"); - return new CancellableContext(this, unit.toNanos(duration)); + Preconditions.checkNotNull(scheduler, "scheduler"); + return new CancellableContext(this, unit.toNanos(duration), scheduler); } /** @@ -611,17 +583,13 @@ public class Context { /** * Create a cancellable context that has a deadline. */ - private CancellableContext(Context parent, long delayNanos) { + private CancellableContext(Context parent, long delayNanos, + ScheduledExecutorService scheduler) { this(parent); - final ScheduledExecutorService scheduler = SharedResourceHolder.get(SCHEDULER); scheduledFuture = scheduler.schedule(new Runnable() { @Override public void run() { - try { - cancel(new TimeoutException("context timed out")); - } finally { - SharedResourceHolder.release(SCHEDULER, scheduler); - } + cancel(new TimeoutException("context timed out")); } }, delayNanos, TimeUnit.NANOSECONDS); } diff --git a/core/src/test/java/io/grpc/ContextTest.java b/core/src/test/java/io/grpc/ContextTest.java index 19853bd076..77e52ebdc7 100644 --- a/core/src/test/java/io/grpc/ContextTest.java +++ b/core/src/test/java/io/grpc/ContextTest.java @@ -43,8 +43,6 @@ import static org.junit.Assert.fail; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; -import io.grpc.internal.SharedResourceHolder; - import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -57,9 +55,9 @@ import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.Future; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -97,6 +95,7 @@ public class ContextTest { observed = Context.current(); } }; + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); @Before public void setUp() throws Exception { @@ -105,6 +104,7 @@ public class ContextTest { @After public void tearDown() throws Exception { + scheduler.shutdown(); } @Test @@ -611,7 +611,7 @@ public class ContextTest { @Test public void absoluteDeadlineTriggersAndPropagates() throws Exception { Context base = Context.current().withDeadlineNanoTime(System.nanoTime() - + TimeUnit.SECONDS.toNanos(1)); + + TimeUnit.SECONDS.toNanos(1), scheduler); Context child = base.withValue(FOOD, "lasagna"); child.addListener(cancellationListener, MoreExecutors.directExecutor()); assertFalse(base.isCancelled()); @@ -626,12 +626,12 @@ public class ContextTest { @Test(expected = IllegalArgumentException.class) public void negativeDeadlineFails() { - Context.current().withDeadlineAfter(-1, TimeUnit.NANOSECONDS); + Context.current().withDeadlineAfter(-1, TimeUnit.NANOSECONDS, scheduler); } @Test public void relativeDeadlineTriggersAndPropagates() throws Exception { - Context base = Context.current().withDeadlineAfter(1, TimeUnit.SECONDS); + Context base = Context.current().withDeadlineAfter(1, TimeUnit.SECONDS, scheduler); Context child = base.withValue(FOOD, "lasagna"); child.addListener(cancellationListener, MoreExecutors.directExecutor()); assertFalse(base.isCancelled()); @@ -646,8 +646,8 @@ public class ContextTest { @Test public void innerDeadlineCompletesBeforeOuter() throws Exception { - Context base = Context.current().withDeadlineAfter(2, TimeUnit.SECONDS); - Context child = base.withDeadlineAfter(1, TimeUnit.SECONDS); + Context base = Context.current().withDeadlineAfter(2, TimeUnit.SECONDS, scheduler); + Context child = base.withDeadlineAfter(1, TimeUnit.SECONDS, scheduler); child.addListener(cancellationListener, MoreExecutors.directExecutor()); assertFalse(base.isCancelled()); assertFalse(child.isCancelled()); @@ -667,47 +667,20 @@ public class ContextTest { @Test public void cancellationCancelsScheduledTask() { - ScheduledExecutorService service = SharedResourceHolder.get(Context.SCHEDULER); + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); try { - ThreadPoolExecutor executor = (ThreadPoolExecutor) service; - executor.purge(); assertEquals(0, executor.getQueue().size()); - Context.CancellableContext base = Context.current().withDeadlineAfter(1, TimeUnit.DAYS); + Context.CancellableContext base + = Context.current().withDeadlineAfter(1, TimeUnit.DAYS, executor); assertEquals(1, executor.getQueue().size()); base.cancel(null); executor.purge(); assertEquals(0, executor.getQueue().size()); } finally { - SharedResourceHolder.release(Context.SCHEDULER, service); + executor.shutdown(); } } - @Test - public void testScheduler() throws Exception { - assertEquals("context-scheduler", Context.SCHEDULER.toString()); - ScheduledExecutorService service = Context.SCHEDULER.create(); - try { - assertFalse(service.isShutdown()); - final AtomicReference threadName = new AtomicReference(); - final AtomicReference threadIsDaemon = new AtomicReference(); - Future ran = service.submit(new Runnable() { - @Override - public void run() { - threadName.set(Thread.currentThread().getName()); - threadIsDaemon.set(Thread.currentThread().isDaemon()); - } - }); - ran.get(); - assertNotNull(threadName.get()); - assertTrue(threadName.get(), threadName.get().startsWith(Context.SCHEDULER.toString())); - assertNotNull(threadIsDaemon.get()); - assertTrue(threadIsDaemon.get()); - } finally { - Context.SCHEDULER.close(service); - } - assertTrue(service.isShutdown()); - } - @Test public void testKeyEqualsHashCode() { assertTrue(PET.equals(PET));