mirror of https://github.com/grpc/grpc-java.git
Notify listeners before notifying child contexts of cancellation
Improve performance of isCancellable check
This commit is contained in:
parent
ece7402dc8
commit
73e2a235e7
|
|
@ -193,17 +193,8 @@ public class Context {
|
||||||
private final Object[][] keyValueEntries;
|
private final Object[][] keyValueEntries;
|
||||||
private final boolean cascadesCancellation;
|
private final boolean cascadesCancellation;
|
||||||
private ArrayList<ExecutableListener> listeners;
|
private ArrayList<ExecutableListener> listeners;
|
||||||
private CancellationListener parentListener = new CancellationListener() {
|
private CancellationListener parentListener = new ParentListener();
|
||||||
@Override
|
private final boolean canBeCancelled;
|
||||||
public void cancelled(Context context) {
|
|
||||||
if (Context.this instanceof CancellableContext) {
|
|
||||||
// Record cancellation with its cause.
|
|
||||||
((CancellableContext) Context.this).cancel(context.cause());
|
|
||||||
} else {
|
|
||||||
notifyAndClearListeners();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a context that cannot be cancelled and will not cascade cancellation from its parent.
|
* Construct a context that cannot be cancelled and will not cascade cancellation from its parent.
|
||||||
|
|
@ -212,6 +203,7 @@ public class Context {
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
keyValueEntries = EMPTY_ENTRIES;
|
keyValueEntries = EMPTY_ENTRIES;
|
||||||
cascadesCancellation = false;
|
cascadesCancellation = false;
|
||||||
|
canBeCancelled = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -222,6 +214,18 @@ public class Context {
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
this.keyValueEntries = keyValueEntries;
|
this.keyValueEntries = keyValueEntries;
|
||||||
cascadesCancellation = true;
|
cascadesCancellation = true;
|
||||||
|
canBeCancelled = this.parent != null && this.parent.canBeCancelled;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a context that can be cancelled and will cascade cancellation from its parent if
|
||||||
|
* it is cancellable.
|
||||||
|
*/
|
||||||
|
private Context(Context parent, Object[][] keyValueEntries, boolean isCancellable) {
|
||||||
|
this.parent = parent;
|
||||||
|
this.keyValueEntries = keyValueEntries;
|
||||||
|
cascadesCancellation = true;
|
||||||
|
canBeCancelled = isCancellable;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -335,8 +339,8 @@ public class Context {
|
||||||
|
|
||||||
boolean canBeCancelled() {
|
boolean canBeCancelled() {
|
||||||
// A context is cancellable if it cascades from its parent and its parent is
|
// A context is cancellable if it cascades from its parent and its parent is
|
||||||
// cancellable.
|
// cancellable or is itself directly cancellable..
|
||||||
return (cascadesCancellation && this.parent != null && this.parent.canBeCancelled());
|
return canBeCancelled;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -407,7 +411,7 @@ public class Context {
|
||||||
final Executor executor) {
|
final Executor executor) {
|
||||||
Preconditions.checkNotNull(cancellationListener);
|
Preconditions.checkNotNull(cancellationListener);
|
||||||
Preconditions.checkNotNull(executor);
|
Preconditions.checkNotNull(executor);
|
||||||
if (canBeCancelled()) {
|
if (canBeCancelled) {
|
||||||
ExecutableListener executableListener =
|
ExecutableListener executableListener =
|
||||||
new ExecutableListener(executor, cancellationListener);
|
new ExecutableListener(executor, cancellationListener);
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
|
@ -425,8 +429,6 @@ public class Context {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// Discussion point: Should we throw or suppress.
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -434,13 +436,16 @@ public class Context {
|
||||||
* Remove a {@link CancellationListener}.
|
* Remove a {@link CancellationListener}.
|
||||||
*/
|
*/
|
||||||
public void removeListener(CancellationListener cancellationListener) {
|
public void removeListener(CancellationListener cancellationListener) {
|
||||||
|
if (!canBeCancelled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (listeners != null) {
|
if (listeners != null) {
|
||||||
for (int i = listeners.size() - 1; i >= 0; i--) {
|
for (int i = listeners.size() - 1; i >= 0; i--) {
|
||||||
if (listeners.get(i).listener == cancellationListener) {
|
if (listeners.get(i).listener == cancellationListener) {
|
||||||
listeners.remove(i);
|
listeners.remove(i);
|
||||||
// Just remove the first matching listener, given that we allow duplicate adds we should
|
// Just remove the first matching listener, given that we allow duplicate
|
||||||
// allow for duplicates after remove.
|
// adds we should allow for duplicates after remove.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -458,6 +463,9 @@ public class Context {
|
||||||
* any reference to them so that they may be garbage collected.
|
* any reference to them so that they may be garbage collected.
|
||||||
*/
|
*/
|
||||||
void notifyAndClearListeners() {
|
void notifyAndClearListeners() {
|
||||||
|
if (!canBeCancelled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
ArrayList<ExecutableListener> tmpListeners;
|
ArrayList<ExecutableListener> tmpListeners;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (listeners == null) {
|
if (listeners == null) {
|
||||||
|
|
@ -466,9 +474,20 @@ public class Context {
|
||||||
tmpListeners = listeners;
|
tmpListeners = listeners;
|
||||||
listeners = null;
|
listeners = null;
|
||||||
}
|
}
|
||||||
|
// Deliver events to non-child context listeners before we notify child contexts. We do this
|
||||||
|
// to cancel higher level units of work before child units. This allows for a better error
|
||||||
|
// handling paradigm where the higher level unit of work knows it is cancelled and so can
|
||||||
|
// ignore errors that bubble up as a result of cancellation of lower level units.
|
||||||
for (int i = 0; i < tmpListeners.size(); i++) {
|
for (int i = 0; i < tmpListeners.size(); i++) {
|
||||||
|
if (!(tmpListeners.get(i).listener instanceof ParentListener)) {
|
||||||
tmpListeners.get(i).deliver();
|
tmpListeners.get(i).deliver();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
for (int i = 0; i < tmpListeners.size(); i++) {
|
||||||
|
if (tmpListeners.get(i).listener instanceof ParentListener) {
|
||||||
|
tmpListeners.get(i).deliver();
|
||||||
|
}
|
||||||
|
}
|
||||||
parent.removeListener(parentListener);
|
parent.removeListener(parentListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -581,7 +600,7 @@ public class Context {
|
||||||
* Create a cancellable context that does not have a deadline.
|
* Create a cancellable context that does not have a deadline.
|
||||||
*/
|
*/
|
||||||
private CancellableContext(Context parent) {
|
private CancellableContext(Context parent) {
|
||||||
super(parent, EMPTY_ENTRIES);
|
super(parent, EMPTY_ENTRIES, true);
|
||||||
// Create a dummy that inherits from this to attach so that you cannot retrieve a
|
// Create a dummy that inherits from this to attach so that you cannot retrieve a
|
||||||
// cancellable context from Context.current()
|
// cancellable context from Context.current()
|
||||||
dummy = new Context(this, EMPTY_ENTRIES);
|
dummy = new Context(this, EMPTY_ENTRIES);
|
||||||
|
|
@ -679,11 +698,6 @@ public class Context {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
boolean canBeCancelled() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isCancelled() {
|
public boolean isCancelled() {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
|
@ -799,4 +813,16 @@ public class Context {
|
||||||
listener.cancelled(Context.this);
|
listener.cancelled(Context.this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class ParentListener implements CancellationListener {
|
||||||
|
@Override
|
||||||
|
public void cancelled(Context context) {
|
||||||
|
if (Context.this instanceof CancellableContext) {
|
||||||
|
// Record cancellation with its cause.
|
||||||
|
((CancellableContext) Context.this).cancel(context.cause());
|
||||||
|
} else {
|
||||||
|
notifyAndClearListeners();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -62,6 +62,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.logging.Handler;
|
import java.util.logging.Handler;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
|
|
@ -726,4 +727,29 @@ public class ContextTest {
|
||||||
runnables.add(r);
|
runnables.add(r);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void childContextListenerNotifiedAfterParentListener() {
|
||||||
|
Context.CancellableContext parent = Context.current().withCancellation();
|
||||||
|
Context child = parent.withValue(COLOR, "red");
|
||||||
|
final AtomicBoolean childAfterParent = new AtomicBoolean();
|
||||||
|
final AtomicBoolean parentCalled = new AtomicBoolean();
|
||||||
|
child.addListener(new Context.CancellationListener() {
|
||||||
|
@Override
|
||||||
|
public void cancelled(Context context) {
|
||||||
|
if (parentCalled.get()) {
|
||||||
|
childAfterParent.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, MoreExecutors.directExecutor());
|
||||||
|
parent.addListener(new Context.CancellationListener() {
|
||||||
|
@Override
|
||||||
|
public void cancelled(Context context) {
|
||||||
|
parentCalled.set(true);
|
||||||
|
}
|
||||||
|
}, MoreExecutors.directExecutor());
|
||||||
|
parent.cancel(null);
|
||||||
|
assertTrue(parentCalled.get());
|
||||||
|
assertTrue(childAfterParent.get());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue