diff --git a/core/src/main/java/io/grpc/SynchronizationContext.java b/core/src/main/java/io/grpc/SynchronizationContext.java index 7a41ed2d2d..b577d18e6a 100644 --- a/core/src/main/java/io/grpc/SynchronizationContext.java +++ b/core/src/main/java/io/grpc/SynchronizationContext.java @@ -20,13 +20,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import java.lang.Thread.UncaughtExceptionHandler; -import java.util.ArrayDeque; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import javax.annotation.concurrent.GuardedBy; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.ThreadSafe; /** @@ -52,13 +52,10 @@ import javax.annotation.concurrent.ThreadSafe; @ThreadSafe @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4984") public final class SynchronizationContext implements Executor { - private final Object lock = new Object(); private final UncaughtExceptionHandler uncaughtExceptionHandler; - @GuardedBy("lock") - private final Queue queue = new ArrayDeque<>(); - @GuardedBy("lock") - private Thread drainingThread; + private final Queue queue = new ConcurrentLinkedQueue<>(); + private final AtomicReference drainingThread = new AtomicReference<>(); /** * Creates a SynchronizationContext. @@ -80,29 +77,24 @@ public final class SynchronizationContext implements Executor { * have been or will eventually be run, while not requiring any more calls to {@code drain()}. */ public final void drain() { - boolean drainLeaseAcquired = false; - while (true) { - Runnable runnable; - synchronized (lock) { - if (!drainLeaseAcquired) { - if (drainingThread != null) { - return; - } - drainingThread = Thread.currentThread(); - drainLeaseAcquired = true; - } - runnable = queue.poll(); - if (runnable == null) { - drainingThread = null; - break; - } + do { + if (!drainingThread.compareAndSet(null, Thread.currentThread())) { + return; } try { - runnable.run(); - } catch (Throwable t) { - uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t); + Runnable runnable; + while ((runnable = queue.poll()) != null) { + try { + runnable.run(); + } catch (Throwable t) { + uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t); + } + } + } finally { + drainingThread.set(null); } - } + // must check queue again here to catch any added prior to clearing drainingThread + } while (!queue.isEmpty()); } /** @@ -113,13 +105,11 @@ public final class SynchronizationContext implements Executor { * #executeLater} in the lock, and call {@link #drain} outside the lock. */ public final void executeLater(Runnable runnable) { - synchronized (lock) { - queue.add(checkNotNull(runnable, "runnable is null")); - } + queue.add(checkNotNull(runnable, "runnable is null")); } /** - * Adds a task and run it in this synchronization context as soon as poassible. The task may run + * Adds a task and run it in this synchronization context as soon as possible. The task may run * inline. If there are tasks that are previously queued by {@link #executeLater} but have not * been run, this method will trigger them to be run before the given task. This is equivalent to * calling {@link #executeLater} immediately followed by {@link #drain}. @@ -135,11 +125,8 @@ public final class SynchronizationContext implements Executor { * context. */ public void throwIfNotInThisSynchronizationContext() { - synchronized (lock) { - checkState( - Thread.currentThread() == drainingThread, - "Not called from the SynchronizationContext"); - } + checkState(Thread.currentThread() == drainingThread.get(), + "Not called from the SynchronizationContext"); } /**