mirror of https://github.com/grpc/grpc-java.git
stub: Have ClientCalls.ThreadlessExecutor reject Runnables after end of RPC (#8847)
Changes originally proposed as part of #7106. Fixes #3557 Co-authored-by: Nick Hill <nickhill@us.ibm.com>
This commit is contained in:
parent
7eeb411b1f
commit
bb3365731f
|
|
@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.locks.LockSupport;
|
import java.util.concurrent.locks.LockSupport;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
@ -161,6 +162,7 @@ public final class ClientCalls {
|
||||||
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
|
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
|
||||||
throw cancelThrow(call, e);
|
throw cancelThrow(call, e);
|
||||||
} finally {
|
} finally {
|
||||||
|
executor.shutdown();
|
||||||
if (interrupt) {
|
if (interrupt) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
|
|
@ -626,6 +628,9 @@ public final class ClientCalls {
|
||||||
// Now wait for onClose() to be called, so interceptors can clean up
|
// Now wait for onClose() to be called, so interceptors can clean up
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (next == this || next instanceof StatusRuntimeException) {
|
||||||
|
threadless.shutdown();
|
||||||
|
}
|
||||||
return next;
|
return next;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
@ -712,7 +717,10 @@ public final class ClientCalls {
|
||||||
implements Executor {
|
implements Executor {
|
||||||
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());
|
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());
|
||||||
|
|
||||||
private volatile Thread waiter;
|
private static final Object SHUTDOWN = new Object(); // sentinel
|
||||||
|
|
||||||
|
// Set to the calling thread while it's parked, SHUTDOWN on RPC completion
|
||||||
|
private volatile Object waiter;
|
||||||
|
|
||||||
// Non private to avoid synthetic class
|
// Non private to avoid synthetic class
|
||||||
ThreadlessExecutor() {}
|
ThreadlessExecutor() {}
|
||||||
|
|
@ -736,14 +744,29 @@ public final class ClientCalls {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
do {
|
do {
|
||||||
try {
|
runQuietly(runnable);
|
||||||
runnable.run();
|
|
||||||
} catch (Throwable t) {
|
|
||||||
log.log(Level.WARNING, "Runnable threw exception", t);
|
|
||||||
}
|
|
||||||
} while ((runnable = poll()) != null);
|
} while ((runnable = poll()) != null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after final call to {@link #waitAndDrain()}, from same thread.
|
||||||
|
*/
|
||||||
|
public void shutdown() {
|
||||||
|
waiter = SHUTDOWN;
|
||||||
|
Runnable runnable;
|
||||||
|
while ((runnable = poll()) != null) {
|
||||||
|
runQuietly(runnable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void runQuietly(Runnable runnable) {
|
||||||
|
try {
|
||||||
|
runnable.run();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.log(Level.WARNING, "Runnable threw exception", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void throwIfInterrupted() throws InterruptedException {
|
private static void throwIfInterrupted() throws InterruptedException {
|
||||||
if (Thread.interrupted()) {
|
if (Thread.interrupted()) {
|
||||||
throw new InterruptedException();
|
throw new InterruptedException();
|
||||||
|
|
@ -753,7 +776,12 @@ public final class ClientCalls {
|
||||||
@Override
|
@Override
|
||||||
public void execute(Runnable runnable) {
|
public void execute(Runnable runnable) {
|
||||||
add(runnable);
|
add(runnable);
|
||||||
LockSupport.unpark(waiter); // no-op if null
|
Object waiter = this.waiter;
|
||||||
|
if (waiter != SHUTDOWN) {
|
||||||
|
LockSupport.unpark((Thread) waiter); // no-op if null
|
||||||
|
} else if (remove(runnable)) {
|
||||||
|
throw new RejectedExecutionException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue