diff --git a/context/src/main/java/io/grpc/Context.java b/context/src/main/java/io/grpc/Context.java index 27f72a8248..5a4a7bf444 100644 --- a/context/src/main/java/io/grpc/Context.java +++ b/context/src/main/java/io/grpc/Context.java @@ -183,10 +183,6 @@ public class Context { return current; } - private ArrayList listeners; - // parentListener is initialized when listeners is initialized, and uninitialized when - // listeners is uninitialized. - private ParentListener parentListener; final CancellableContext cancellableAncestor; final Node, Object> keyValueEntries; // The number parents between this context and the root context. @@ -415,10 +411,6 @@ public class Context { return new Context(keyValueEntries, generation + 1); } - boolean canBeCancelled() { - return cancellableAncestor != null; - } - /** * Attach this context, thus enter a new scope within which this context is {@link #current}. The * previously current context is returned. It is allowed to attach contexts where {@link @@ -515,103 +507,30 @@ public class Context { final Executor executor) { checkNotNull(cancellationListener, "cancellationListener"); checkNotNull(executor, "executor"); - if (canBeCancelled()) { - ExecutableListener executableListener = - new ExecutableListener(executor, cancellationListener); - synchronized (this) { - if (isCancelled()) { - executableListener.deliver(); - } else { - if (listeners == null) { - // Now that we have a listener we need to listen to our parent so - // we can cascade listener notification. - listeners = new ArrayList<>(); - listeners.add(executableListener); - if (cancellableAncestor != null) { - parentListener = new ParentListener(); - cancellableAncestor.addListener(parentListener, DirectExecutor.INSTANCE); - } - } else { - listeners.add(executableListener); - } - } - } + if (cancellableAncestor == null) { + return; } + cancellableAncestor.addListenerInternal( + new ExecutableListener(executor, cancellationListener, this)); } /** * Remove a {@link CancellationListener}. */ public void removeListener(CancellationListener cancellationListener) { - if (!canBeCancelled()) { + if (cancellableAncestor == null) { return; } - synchronized (this) { - if (listeners != null) { - for (int i = listeners.size() - 1; i >= 0; i--) { - if (listeners.get(i).listener == cancellationListener) { - listeners.remove(i); - // Just remove the first matching listener, given that we allow duplicate - // adds we should allow for duplicates after remove. - break; - } - } - // We have no listeners so no need to listen to our parent - if (listeners.isEmpty()) { - if (cancellableAncestor != null) { - cancellableAncestor.removeListener(parentListener); - } - parentListener = null; - listeners = null; - } - } - } - } - - /** - * Notify all listeners that this context has been cancelled and immediately release - * any reference to them so that they may be garbage collected. - */ - void notifyAndClearListeners() { - if (!canBeCancelled()) { - return; - } - ArrayList tmpListeners; - ParentListener tmpParentListener; - synchronized (this) { - if (listeners == null) { - return; - } - tmpParentListener = parentListener; - parentListener = null; - tmpListeners = listeners; - listeners = null; - } - // Deliver events to non-child context listeners before we notify child contexts. We do this - // to cancel higher level units of work before child units. This allows for a better error - // handling paradigm where the higher level unit of work knows it is cancelled and so can - // ignore errors that bubble up as a result of cancellation of lower level units. - for (int i = 0; i < tmpListeners.size(); i++) { - if (!(tmpListeners.get(i).listener instanceof ParentListener)) { - tmpListeners.get(i).deliver(); - } - } - for (int i = 0; i < tmpListeners.size(); i++) { - if (tmpListeners.get(i).listener instanceof ParentListener) { - tmpListeners.get(i).deliver(); - } - } - if (cancellableAncestor != null) { - cancellableAncestor.removeListener(tmpParentListener); - } + cancellableAncestor.removeListenerInternal(cancellationListener, this); } // Used in tests to ensure that listeners are defined and released when cancellation cascades. // It's very important to ensure that we do not accidentally retain listeners. int listenerCount() { - synchronized (this) { - return listeners == null ? 0 : listeners.size(); + if (cancellableAncestor == null) { + return 0; } + return cancellableAncestor.listenerCount(); } /** @@ -747,9 +666,13 @@ public class Context { private final Deadline deadline; private final Context uncancellableSurrogate; - private boolean cancelled; + private ArrayList listeners; + // parentListener is initialized when listeners is initialized (only if there is a + // cancellable ancestor), and uninitialized when listeners is uninitialized. + private CancellationListener parentListener; private Throwable cancellationCause; private ScheduledFuture pendingDeadline; + private boolean cancelled; /** * Create a cancellable context that does not have a deadline. @@ -803,6 +726,73 @@ public class Context { uncancellableSurrogate.detach(toAttach); } + @Override + public void addListener( + final CancellationListener cancellationListener, final Executor executor) { + checkNotNull(cancellationListener, "cancellationListener"); + checkNotNull(executor, "executor"); + addListenerInternal(new ExecutableListener(executor, cancellationListener, this)); + } + + private void addListenerInternal(ExecutableListener executableListener) { + synchronized (this) { + if (isCancelled()) { + executableListener.deliver(); + } else { + if (listeners == null) { + // Now that we have a listener we need to listen to our parent so + // we can cascade listener notification. + listeners = new ArrayList<>(); + listeners.add(executableListener); + if (cancellableAncestor != null) { + parentListener = + new CancellationListener() { + @Override + public void cancelled(Context context) { + CancellableContext.this.cancel(context.cancellationCause()); + } + }; + cancellableAncestor.addListenerInternal( + new ExecutableListener(DirectExecutor.INSTANCE, parentListener, this)); + } + } else { + listeners.add(executableListener); + } + } + } + } + + @Override + public void removeListener(CancellationListener cancellationListener) { + removeListenerInternal(cancellationListener, this); + } + + private void removeListenerInternal(CancellationListener cancellationListener, + Context context) { + synchronized (this) { + if (listeners != null) { + for (int i = listeners.size() - 1; i >= 0; i--) { + ExecutableListener executableListener = listeners.get(i); + if (executableListener.listener == cancellationListener + && executableListener.context == context) { + listeners.remove(i); + // Just remove the first matching listener, given that we allow duplicate + // adds we should allow for duplicates after remove. + break; + } + } + // We have no listeners so no need to listen to our parent + if (listeners.isEmpty()) { + if (cancellableAncestor != null) { + cancellableAncestor.removeListener(parentListener); + } + parentListener = null; + listeners = null; + } + } + } + } + /** * Returns true if the Context is the current context. * @@ -848,6 +838,48 @@ public class Context { return triggeredCancel; } + /** + * Notify all listeners that this context has been cancelled and immediately release + * any reference to them so that they may be garbage collected. + */ + private void notifyAndClearListeners() { + ArrayList tmpListeners; + CancellationListener tmpParentListener; + synchronized (this) { + if (listeners == null) { + return; + } + tmpParentListener = parentListener; + parentListener = null; + tmpListeners = listeners; + listeners = null; + } + // Deliver events to this context listeners before we notify child contexts. We do this + // to cancel higher level units of work before child units. This allows for a better error + // handling paradigm where the higher level unit of work knows it is cancelled and so can + // ignore errors that bubble up as a result of cancellation of lower level units. + for (ExecutableListener tmpListener : tmpListeners) { + if (tmpListener.context == this) { + tmpListener.deliver(); + } + } + for (ExecutableListener tmpListener : tmpListeners) { + if (!(tmpListener.context == this)) { + tmpListener.deliver(); + } + } + if (cancellableAncestor != null) { + cancellableAncestor.removeListener(tmpParentListener); + } + } + + @Override + int listenerCount() { + synchronized (this) { + return listeners == null ? 0 : listeners.size(); + } + } + /** * Cancel this context and detach it as the current context. * @@ -891,11 +923,6 @@ public class Context { return deadline; } - @Override - boolean canBeCancelled() { - return true; - } - /** * Cleans up this object by calling {@code cancel(null)}. */ @@ -1025,13 +1052,15 @@ public class Context { /** * Stores listener and executor pair. */ - private final class ExecutableListener implements Runnable { + private static final class ExecutableListener implements Runnable { private final Executor executor; final CancellationListener listener; + private final Context context; - ExecutableListener(Executor executor, CancellationListener listener) { + ExecutableListener(Executor executor, CancellationListener listener, Context context) { this.executor = executor; this.listener = listener; + this.context = context; } void deliver() { @@ -1044,19 +1073,7 @@ public class Context { @Override public void run() { - listener.cancelled(Context.this); - } - } - - private final class ParentListener implements CancellationListener { - @Override - public void cancelled(Context context) { - if (Context.this instanceof CancellableContext) { - // Record cancellation with its cancellationCause. - ((CancellableContext) Context.this).cancel(context.cancellationCause()); - } else { - notifyAndClearListeners(); - } + listener.cancelled(context); } } diff --git a/context/src/test/java/io/grpc/ContextTest.java b/context/src/test/java/io/grpc/ContextTest.java index 8e783baaf2..bf078fcff9 100644 --- a/context/src/test/java/io/grpc/ContextTest.java +++ b/context/src/test/java/io/grpc/ContextTest.java @@ -33,6 +33,9 @@ import com.google.common.util.concurrent.SettableFuture; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -300,6 +303,35 @@ public class ContextTest { assertSame(base, observed3.get()); } + @Test + public void removeListenersFromContextAndChildContext() { + class SetContextCancellationListener implements Context.CancellationListener { + private final List observedContexts; + + SetContextCancellationListener() { + this.observedContexts = Collections.synchronizedList(new ArrayList()); + } + + @Override + public void cancelled(Context context) { + observedContexts.add(context); + } + } + + Context.CancellableContext base = Context.current().withCancellation(); + Context child = base.withValue(PET, "tiger"); + Context childOfChild = base.withValue(PET, "lion"); + final SetContextCancellationListener listener = new SetContextCancellationListener(); + base.addListener(listener, MoreExecutors.directExecutor()); + child.addListener(listener, MoreExecutors.directExecutor()); + childOfChild.addListener(listener, MoreExecutors.directExecutor()); + base.removeListener(listener); + childOfChild.removeListener(listener); + base.cancel(null); + assertEquals(1, listener.observedContexts.size()); + assertSame(child, listener.observedContexts.get(0)); + } + @Test public void exceptionOfExecutorDoesntThrow() { final AtomicReference loggedThrowable = new AtomicReference<>(); @@ -404,7 +436,6 @@ public class ContextTest { assertSame("fish", FOOD.get()); assertFalse(attached.isCancelled()); assertNull(attached.cancellationCause()); - assertTrue(attached.canBeCancelled()); assertTrue(attached.isCurrent()); assertTrue(base.isCurrent()); @@ -892,7 +923,6 @@ public class ContextTest { @Test public void cancellableAncestorTest() { Context c = Context.current(); - assertFalse(c.canBeCancelled()); assertNull(cancellableAncestor(c)); Context.CancellableContext withCancellation = c.withCancellation();