Move everything related to cancellation to CancellableContext.

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2020-04-17 13:23:47 -07:00 committed by Eric Anderson
parent a649737e3a
commit 6bcc182b1b
2 changed files with 160 additions and 113 deletions

View File

@ -183,10 +183,6 @@ public class Context {
return current;
}
private ArrayList<ExecutableListener> listeners;
// parentListener is initialized when listeners is initialized, and uninitialized when
// listeners is uninitialized.
private ParentListener parentListener;
final CancellableContext cancellableAncestor;
final Node<Key<?>, Object> keyValueEntries;
// The number parents between this context and the root context.
@ -415,10 +411,6 @@ public class Context {
return new Context(keyValueEntries, generation + 1);
}
boolean canBeCancelled() {
return cancellableAncestor != null;
}
/**
* Attach this context, thus enter a new scope within which this context is {@link #current}. The
* previously current context is returned. It is allowed to attach contexts where {@link
@ -515,103 +507,30 @@ public class Context {
final Executor executor) {
checkNotNull(cancellationListener, "cancellationListener");
checkNotNull(executor, "executor");
if (canBeCancelled()) {
ExecutableListener executableListener =
new ExecutableListener(executor, cancellationListener);
synchronized (this) {
if (isCancelled()) {
executableListener.deliver();
} else {
if (listeners == null) {
// Now that we have a listener we need to listen to our parent so
// we can cascade listener notification.
listeners = new ArrayList<>();
listeners.add(executableListener);
if (cancellableAncestor != null) {
parentListener = new ParentListener();
cancellableAncestor.addListener(parentListener, DirectExecutor.INSTANCE);
}
} else {
listeners.add(executableListener);
}
}
}
if (cancellableAncestor == null) {
return;
}
cancellableAncestor.addListenerInternal(
new ExecutableListener(executor, cancellationListener, this));
}
/**
* Remove a {@link CancellationListener}.
*/
public void removeListener(CancellationListener cancellationListener) {
if (!canBeCancelled()) {
if (cancellableAncestor == null) {
return;
}
synchronized (this) {
if (listeners != null) {
for (int i = listeners.size() - 1; i >= 0; i--) {
if (listeners.get(i).listener == cancellationListener) {
listeners.remove(i);
// Just remove the first matching listener, given that we allow duplicate
// adds we should allow for duplicates after remove.
break;
}
}
// We have no listeners so no need to listen to our parent
if (listeners.isEmpty()) {
if (cancellableAncestor != null) {
cancellableAncestor.removeListener(parentListener);
}
parentListener = null;
listeners = null;
}
}
}
}
/**
* Notify all listeners that this context has been cancelled and immediately release
* any reference to them so that they may be garbage collected.
*/
void notifyAndClearListeners() {
if (!canBeCancelled()) {
return;
}
ArrayList<ExecutableListener> tmpListeners;
ParentListener tmpParentListener;
synchronized (this) {
if (listeners == null) {
return;
}
tmpParentListener = parentListener;
parentListener = null;
tmpListeners = listeners;
listeners = null;
}
// Deliver events to non-child context listeners before we notify child contexts. We do this
// to cancel higher level units of work before child units. This allows for a better error
// handling paradigm where the higher level unit of work knows it is cancelled and so can
// ignore errors that bubble up as a result of cancellation of lower level units.
for (int i = 0; i < tmpListeners.size(); i++) {
if (!(tmpListeners.get(i).listener instanceof ParentListener)) {
tmpListeners.get(i).deliver();
}
}
for (int i = 0; i < tmpListeners.size(); i++) {
if (tmpListeners.get(i).listener instanceof ParentListener) {
tmpListeners.get(i).deliver();
}
}
if (cancellableAncestor != null) {
cancellableAncestor.removeListener(tmpParentListener);
}
cancellableAncestor.removeListenerInternal(cancellationListener, this);
}
// Used in tests to ensure that listeners are defined and released when cancellation cascades.
// It's very important to ensure that we do not accidentally retain listeners.
int listenerCount() {
synchronized (this) {
return listeners == null ? 0 : listeners.size();
if (cancellableAncestor == null) {
return 0;
}
return cancellableAncestor.listenerCount();
}
/**
@ -747,9 +666,13 @@ public class Context {
private final Deadline deadline;
private final Context uncancellableSurrogate;
private boolean cancelled;
private ArrayList<ExecutableListener> listeners;
// parentListener is initialized when listeners is initialized (only if there is a
// cancellable ancestor), and uninitialized when listeners is uninitialized.
private CancellationListener parentListener;
private Throwable cancellationCause;
private ScheduledFuture<?> pendingDeadline;
private boolean cancelled;
/**
* Create a cancellable context that does not have a deadline.
@ -803,6 +726,73 @@ public class Context {
uncancellableSurrogate.detach(toAttach);
}
@Override
public void addListener(
final CancellationListener cancellationListener, final Executor executor) {
checkNotNull(cancellationListener, "cancellationListener");
checkNotNull(executor, "executor");
addListenerInternal(new ExecutableListener(executor, cancellationListener, this));
}
private void addListenerInternal(ExecutableListener executableListener) {
synchronized (this) {
if (isCancelled()) {
executableListener.deliver();
} else {
if (listeners == null) {
// Now that we have a listener we need to listen to our parent so
// we can cascade listener notification.
listeners = new ArrayList<>();
listeners.add(executableListener);
if (cancellableAncestor != null) {
parentListener =
new CancellationListener() {
@Override
public void cancelled(Context context) {
CancellableContext.this.cancel(context.cancellationCause());
}
};
cancellableAncestor.addListenerInternal(
new ExecutableListener(DirectExecutor.INSTANCE, parentListener, this));
}
} else {
listeners.add(executableListener);
}
}
}
}
@Override
public void removeListener(CancellationListener cancellationListener) {
removeListenerInternal(cancellationListener, this);
}
private void removeListenerInternal(CancellationListener cancellationListener,
Context context) {
synchronized (this) {
if (listeners != null) {
for (int i = listeners.size() - 1; i >= 0; i--) {
ExecutableListener executableListener = listeners.get(i);
if (executableListener.listener == cancellationListener
&& executableListener.context == context) {
listeners.remove(i);
// Just remove the first matching listener, given that we allow duplicate
// adds we should allow for duplicates after remove.
break;
}
}
// We have no listeners so no need to listen to our parent
if (listeners.isEmpty()) {
if (cancellableAncestor != null) {
cancellableAncestor.removeListener(parentListener);
}
parentListener = null;
listeners = null;
}
}
}
}
/**
* Returns true if the Context is the current context.
*
@ -848,6 +838,48 @@ public class Context {
return triggeredCancel;
}
/**
* Notify all listeners that this context has been cancelled and immediately release
* any reference to them so that they may be garbage collected.
*/
private void notifyAndClearListeners() {
ArrayList<ExecutableListener> tmpListeners;
CancellationListener tmpParentListener;
synchronized (this) {
if (listeners == null) {
return;
}
tmpParentListener = parentListener;
parentListener = null;
tmpListeners = listeners;
listeners = null;
}
// Deliver events to this context listeners before we notify child contexts. We do this
// to cancel higher level units of work before child units. This allows for a better error
// handling paradigm where the higher level unit of work knows it is cancelled and so can
// ignore errors that bubble up as a result of cancellation of lower level units.
for (ExecutableListener tmpListener : tmpListeners) {
if (tmpListener.context == this) {
tmpListener.deliver();
}
}
for (ExecutableListener tmpListener : tmpListeners) {
if (!(tmpListener.context == this)) {
tmpListener.deliver();
}
}
if (cancellableAncestor != null) {
cancellableAncestor.removeListener(tmpParentListener);
}
}
@Override
int listenerCount() {
synchronized (this) {
return listeners == null ? 0 : listeners.size();
}
}
/**
* Cancel this context and detach it as the current context.
*
@ -891,11 +923,6 @@ public class Context {
return deadline;
}
@Override
boolean canBeCancelled() {
return true;
}
/**
* Cleans up this object by calling {@code cancel(null)}.
*/
@ -1025,13 +1052,15 @@ public class Context {
/**
* Stores listener and executor pair.
*/
private final class ExecutableListener implements Runnable {
private static final class ExecutableListener implements Runnable {
private final Executor executor;
final CancellationListener listener;
private final Context context;
ExecutableListener(Executor executor, CancellationListener listener) {
ExecutableListener(Executor executor, CancellationListener listener, Context context) {
this.executor = executor;
this.listener = listener;
this.context = context;
}
void deliver() {
@ -1044,19 +1073,7 @@ public class Context {
@Override
public void run() {
listener.cancelled(Context.this);
}
}
private final class ParentListener implements CancellationListener {
@Override
public void cancelled(Context context) {
if (Context.this instanceof CancellableContext) {
// Record cancellation with its cancellationCause.
((CancellableContext) Context.this).cancel(context.cancellationCause());
} else {
notifyAndClearListeners();
}
listener.cancelled(context);
}
}

View File

@ -33,6 +33,9 @@ import com.google.common.util.concurrent.SettableFuture;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@ -300,6 +303,35 @@ public class ContextTest {
assertSame(base, observed3.get());
}
@Test
public void removeListenersFromContextAndChildContext() {
class SetContextCancellationListener implements Context.CancellationListener {
private final List<Context> observedContexts;
SetContextCancellationListener() {
this.observedContexts = Collections.synchronizedList(new ArrayList<Context>());
}
@Override
public void cancelled(Context context) {
observedContexts.add(context);
}
}
Context.CancellableContext base = Context.current().withCancellation();
Context child = base.withValue(PET, "tiger");
Context childOfChild = base.withValue(PET, "lion");
final SetContextCancellationListener listener = new SetContextCancellationListener();
base.addListener(listener, MoreExecutors.directExecutor());
child.addListener(listener, MoreExecutors.directExecutor());
childOfChild.addListener(listener, MoreExecutors.directExecutor());
base.removeListener(listener);
childOfChild.removeListener(listener);
base.cancel(null);
assertEquals(1, listener.observedContexts.size());
assertSame(child, listener.observedContexts.get(0));
}
@Test
public void exceptionOfExecutorDoesntThrow() {
final AtomicReference<Throwable> loggedThrowable = new AtomicReference<>();
@ -404,7 +436,6 @@ public class ContextTest {
assertSame("fish", FOOD.get());
assertFalse(attached.isCancelled());
assertNull(attached.cancellationCause());
assertTrue(attached.canBeCancelled());
assertTrue(attached.isCurrent());
assertTrue(base.isCurrent());
@ -892,7 +923,6 @@ public class ContextTest {
@Test
public void cancellableAncestorTest() {
Context c = Context.current();
assertFalse(c.canBeCancelled());
assertNull(cancellableAncestor(c));
Context.CancellableContext withCancellation = c.withCancellation();