core: Avoid locks in SynchronizationContext (#5504)

Similar to what's done in SerializingExecutor, it's easy to make SynchronizationContext non-blocking.
This commit is contained in:
Nick Hill 2019-03-26 13:54:01 -07:00 committed by Kun Zhang
parent 53f4ad21b4
commit 8e41f6e43b
1 changed files with 23 additions and 36 deletions

View File

@ -20,13 +20,13 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy; import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
/** /**
@ -52,13 +52,10 @@ import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe @ThreadSafe
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4984") @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4984")
public final class SynchronizationContext implements Executor { public final class SynchronizationContext implements Executor {
private final Object lock = new Object();
private final UncaughtExceptionHandler uncaughtExceptionHandler; private final UncaughtExceptionHandler uncaughtExceptionHandler;
@GuardedBy("lock") private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
private final Queue<Runnable> queue = new ArrayDeque<>(); private final AtomicReference<Thread> drainingThread = new AtomicReference<>();
@GuardedBy("lock")
private Thread drainingThread;
/** /**
* Creates a SynchronizationContext. * Creates a SynchronizationContext.
@ -80,29 +77,24 @@ public final class SynchronizationContext implements Executor {
* have been or will eventually be run, while not requiring any more calls to {@code drain()}. * have been or will eventually be run, while not requiring any more calls to {@code drain()}.
*/ */
public final void drain() { public final void drain() {
boolean drainLeaseAcquired = false; do {
while (true) { if (!drainingThread.compareAndSet(null, Thread.currentThread())) {
Runnable runnable; return;
synchronized (lock) {
if (!drainLeaseAcquired) {
if (drainingThread != null) {
return;
}
drainingThread = Thread.currentThread();
drainLeaseAcquired = true;
}
runnable = queue.poll();
if (runnable == null) {
drainingThread = null;
break;
}
} }
try { try {
runnable.run(); Runnable runnable;
} catch (Throwable t) { while ((runnable = queue.poll()) != null) {
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t); try {
runnable.run();
} catch (Throwable t) {
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t);
}
}
} finally {
drainingThread.set(null);
} }
} // must check queue again here to catch any added prior to clearing drainingThread
} while (!queue.isEmpty());
} }
/** /**
@ -113,13 +105,11 @@ public final class SynchronizationContext implements Executor {
* #executeLater} in the lock, and call {@link #drain} outside the lock. * #executeLater} in the lock, and call {@link #drain} outside the lock.
*/ */
public final void executeLater(Runnable runnable) { public final void executeLater(Runnable runnable) {
synchronized (lock) { queue.add(checkNotNull(runnable, "runnable is null"));
queue.add(checkNotNull(runnable, "runnable is null"));
}
} }
/** /**
* Adds a task and run it in this synchronization context as soon as poassible. The task may run * Adds a task and run it in this synchronization context as soon as possible. The task may run
* inline. If there are tasks that are previously queued by {@link #executeLater} but have not * inline. If there are tasks that are previously queued by {@link #executeLater} but have not
* been run, this method will trigger them to be run before the given task. This is equivalent to * been run, this method will trigger them to be run before the given task. This is equivalent to
* calling {@link #executeLater} immediately followed by {@link #drain}. * calling {@link #executeLater} immediately followed by {@link #drain}.
@ -135,11 +125,8 @@ public final class SynchronizationContext implements Executor {
* context. * context.
*/ */
public void throwIfNotInThisSynchronizationContext() { public void throwIfNotInThisSynchronizationContext() {
synchronized (lock) { checkState(Thread.currentThread() == drainingThread.get(),
checkState( "Not called from the SynchronizationContext");
Thread.currentThread() == drainingThread,
"Not called from the SynchronizationContext");
}
} }
/** /**