Require a ScheduledExecutorService to be provided to Context

Before the service would be leaked, because when the scheduled future
was cancelled the scheduler wouldn't be released. Also, Future isn't
powerful enough to signal when we should release when cancelling.

Given the nature of Context, it also seems beneficial to not have it own
threads. Since any caller of withDeadline*() is required to cancel the
Context, it shouldn't be too burdensome for them to manage the lifecycle
of the scheduler.
This commit is contained in:
Eric Anderson 2016-01-23 09:32:41 -08:00
parent 4a427b7ac9
commit 6e94cf33db
2 changed files with 24 additions and 83 deletions

View File

@ -33,16 +33,12 @@ package io.grpc;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.internal.SharedResourceHolder;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -114,33 +110,6 @@ public class Context {
private static final Logger LOG = Logger.getLogger(Context.class.getName()); private static final Logger LOG = Logger.getLogger(Context.class.getName());
/**
* Use a shared resource to retain the {@link ScheduledExecutorService} used to
* implement deadline based context cancellation. This allows the executor to be
* shutdown if its not in use thereby allowing Context to be unloaded.
*/
static final SharedResourceHolder.Resource<ScheduledExecutorService> SCHEDULER =
new SharedResourceHolder.Resource<ScheduledExecutorService>() {
private static final String name = "context-scheduler";
@Override
public ScheduledExecutorService create() {
return Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat(name + "-%d")
.setDaemon(true)
.build());
}
@Override
public void close(ScheduledExecutorService instance) {
instance.shutdown();
}
@Override
public String toString() {
return name;
}
};
private static final Object[][] EMPTY_ENTRIES = new Object[0][2]; private static final Object[][] EMPTY_ENTRIES = new Object[0][2];
/** /**
@ -268,8 +237,9 @@ public class Context {
* a received existing deadline. When establishing a new deadline, {@link #withDeadlineAfter} * a received existing deadline. When establishing a new deadline, {@link #withDeadlineAfter}
* is the better mechanism. * is the better mechanism.
*/ */
public CancellableContext withDeadlineNanoTime(long deadlineNanoTime) { public CancellableContext withDeadlineNanoTime(long deadlineNanoTime,
return withDeadlineAfter(deadlineNanoTime - System.nanoTime(), TimeUnit.NANOSECONDS); ScheduledExecutorService scheduler) {
return withDeadlineAfter(deadlineNanoTime - System.nanoTime(), TimeUnit.NANOSECONDS, scheduler);
} }
/** /**
@ -280,7 +250,7 @@ public class Context {
* <p>Sample usage: * <p>Sample usage:
* <pre> * <pre>
* Context.CancellableContext withDeadline = Context.current().withDeadlineAfter(5, * Context.CancellableContext withDeadline = Context.current().withDeadlineAfter(5,
* TimeUnit.SECONDS); * TimeUnit.SECONDS, scheduler);
* executorService.execute(withDeadline.wrap(new Runnable() { * executorService.execute(withDeadline.wrap(new Runnable() {
* public void run() { * public void run() {
* Context current = Context.current(); * Context current = Context.current();
@ -291,10 +261,12 @@ public class Context {
* }); * });
* </pre> * </pre>
*/ */
public CancellableContext withDeadlineAfter(long duration, TimeUnit unit) { public CancellableContext withDeadlineAfter(long duration, TimeUnit unit,
ScheduledExecutorService scheduler) {
Preconditions.checkArgument(duration >= 0, "duration must be greater than or equal to 0"); Preconditions.checkArgument(duration >= 0, "duration must be greater than or equal to 0");
Preconditions.checkNotNull(unit, "unit"); Preconditions.checkNotNull(unit, "unit");
return new CancellableContext(this, unit.toNanos(duration)); Preconditions.checkNotNull(scheduler, "scheduler");
return new CancellableContext(this, unit.toNanos(duration), scheduler);
} }
/** /**
@ -611,17 +583,13 @@ public class Context {
/** /**
* Create a cancellable context that has a deadline. * Create a cancellable context that has a deadline.
*/ */
private CancellableContext(Context parent, long delayNanos) { private CancellableContext(Context parent, long delayNanos,
ScheduledExecutorService scheduler) {
this(parent); this(parent);
final ScheduledExecutorService scheduler = SharedResourceHolder.get(SCHEDULER);
scheduledFuture = scheduler.schedule(new Runnable() { scheduledFuture = scheduler.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
try { cancel(new TimeoutException("context timed out"));
cancel(new TimeoutException("context timed out"));
} finally {
SharedResourceHolder.release(SCHEDULER, scheduler);
}
} }
}, delayNanos, TimeUnit.NANOSECONDS); }, delayNanos, TimeUnit.NANOSECONDS);
} }

View File

@ -43,8 +43,6 @@ import static org.junit.Assert.fail;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import io.grpc.internal.SharedResourceHolder;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -57,9 +55,9 @@ import java.util.Queue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -97,6 +95,7 @@ public class ContextTest {
observed = Context.current(); observed = Context.current();
} }
}; };
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -105,6 +104,7 @@ public class ContextTest {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
scheduler.shutdown();
} }
@Test @Test
@ -611,7 +611,7 @@ public class ContextTest {
@Test @Test
public void absoluteDeadlineTriggersAndPropagates() throws Exception { public void absoluteDeadlineTriggersAndPropagates() throws Exception {
Context base = Context.current().withDeadlineNanoTime(System.nanoTime() Context base = Context.current().withDeadlineNanoTime(System.nanoTime()
+ TimeUnit.SECONDS.toNanos(1)); + TimeUnit.SECONDS.toNanos(1), scheduler);
Context child = base.withValue(FOOD, "lasagna"); Context child = base.withValue(FOOD, "lasagna");
child.addListener(cancellationListener, MoreExecutors.directExecutor()); child.addListener(cancellationListener, MoreExecutors.directExecutor());
assertFalse(base.isCancelled()); assertFalse(base.isCancelled());
@ -626,12 +626,12 @@ public class ContextTest {
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void negativeDeadlineFails() { public void negativeDeadlineFails() {
Context.current().withDeadlineAfter(-1, TimeUnit.NANOSECONDS); Context.current().withDeadlineAfter(-1, TimeUnit.NANOSECONDS, scheduler);
} }
@Test @Test
public void relativeDeadlineTriggersAndPropagates() throws Exception { public void relativeDeadlineTriggersAndPropagates() throws Exception {
Context base = Context.current().withDeadlineAfter(1, TimeUnit.SECONDS); Context base = Context.current().withDeadlineAfter(1, TimeUnit.SECONDS, scheduler);
Context child = base.withValue(FOOD, "lasagna"); Context child = base.withValue(FOOD, "lasagna");
child.addListener(cancellationListener, MoreExecutors.directExecutor()); child.addListener(cancellationListener, MoreExecutors.directExecutor());
assertFalse(base.isCancelled()); assertFalse(base.isCancelled());
@ -646,8 +646,8 @@ public class ContextTest {
@Test @Test
public void innerDeadlineCompletesBeforeOuter() throws Exception { public void innerDeadlineCompletesBeforeOuter() throws Exception {
Context base = Context.current().withDeadlineAfter(2, TimeUnit.SECONDS); Context base = Context.current().withDeadlineAfter(2, TimeUnit.SECONDS, scheduler);
Context child = base.withDeadlineAfter(1, TimeUnit.SECONDS); Context child = base.withDeadlineAfter(1, TimeUnit.SECONDS, scheduler);
child.addListener(cancellationListener, MoreExecutors.directExecutor()); child.addListener(cancellationListener, MoreExecutors.directExecutor());
assertFalse(base.isCancelled()); assertFalse(base.isCancelled());
assertFalse(child.isCancelled()); assertFalse(child.isCancelled());
@ -667,47 +667,20 @@ public class ContextTest {
@Test @Test
public void cancellationCancelsScheduledTask() { public void cancellationCancelsScheduledTask() {
ScheduledExecutorService service = SharedResourceHolder.get(Context.SCHEDULER); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
try { try {
ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
executor.purge();
assertEquals(0, executor.getQueue().size()); assertEquals(0, executor.getQueue().size());
Context.CancellableContext base = Context.current().withDeadlineAfter(1, TimeUnit.DAYS); Context.CancellableContext base
= Context.current().withDeadlineAfter(1, TimeUnit.DAYS, executor);
assertEquals(1, executor.getQueue().size()); assertEquals(1, executor.getQueue().size());
base.cancel(null); base.cancel(null);
executor.purge(); executor.purge();
assertEquals(0, executor.getQueue().size()); assertEquals(0, executor.getQueue().size());
} finally { } finally {
SharedResourceHolder.release(Context.SCHEDULER, service); executor.shutdown();
} }
} }
@Test
public void testScheduler() throws Exception {
assertEquals("context-scheduler", Context.SCHEDULER.toString());
ScheduledExecutorService service = Context.SCHEDULER.create();
try {
assertFalse(service.isShutdown());
final AtomicReference<String> threadName = new AtomicReference<String>();
final AtomicReference<Boolean> threadIsDaemon = new AtomicReference<Boolean>();
Future<?> ran = service.submit(new Runnable() {
@Override
public void run() {
threadName.set(Thread.currentThread().getName());
threadIsDaemon.set(Thread.currentThread().isDaemon());
}
});
ran.get();
assertNotNull(threadName.get());
assertTrue(threadName.get(), threadName.get().startsWith(Context.SCHEDULER.toString()));
assertNotNull(threadIsDaemon.get());
assertTrue(threadIsDaemon.get());
} finally {
Context.SCHEDULER.close(service);
}
assertTrue(service.isShutdown());
}
@Test @Test
public void testKeyEqualsHashCode() { public void testKeyEqualsHashCode() {
assertTrue(PET.equals(PET)); assertTrue(PET.equals(PET));