From bb3365731fa3cd6c082e6346b81d0ea14646ffc1 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Fri, 11 Feb 2022 15:08:11 -0800 Subject: [PATCH] stub: Have ClientCalls.ThreadlessExecutor reject Runnables after end of RPC (#8847) Changes originally proposed as part of #7106. Fixes #3557 Co-authored-by: Nick Hill --- .../main/java/io/grpc/stub/ClientCalls.java | 42 +++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 04ed83f083..84417ee7eb 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.LockSupport; import java.util.logging.Level; import java.util.logging.Logger; @@ -161,6 +162,7 @@ public final class ClientCalls { // Something very bad happened. All bets are off; it may be dangerous to wait for onClose(). throw cancelThrow(call, e); } finally { + executor.shutdown(); if (interrupt) { Thread.currentThread().interrupt(); } @@ -626,6 +628,9 @@ public final class ClientCalls { // Now wait for onClose() to be called, so interceptors can clean up } } + if (next == this || next instanceof StatusRuntimeException) { + threadless.shutdown(); + } return next; } } finally { @@ -712,7 +717,10 @@ public final class ClientCalls { implements Executor { private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName()); - private volatile Thread waiter; + private static final Object SHUTDOWN = new Object(); // sentinel + + // Set to the calling thread while it's parked, SHUTDOWN on RPC completion + private volatile Object waiter; // Non private to avoid synthetic class ThreadlessExecutor() {} @@ -736,14 +744,29 @@ public final class ClientCalls { } } do { - try { - runnable.run(); - } catch (Throwable t) { - log.log(Level.WARNING, "Runnable threw exception", t); - } + runQuietly(runnable); } while ((runnable = poll()) != null); } + /** + * Called after final call to {@link #waitAndDrain()}, from same thread. + */ + public void shutdown() { + waiter = SHUTDOWN; + Runnable runnable; + while ((runnable = poll()) != null) { + runQuietly(runnable); + } + } + + private static void runQuietly(Runnable runnable) { + try { + runnable.run(); + } catch (Throwable t) { + log.log(Level.WARNING, "Runnable threw exception", t); + } + } + private static void throwIfInterrupted() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); @@ -753,7 +776,12 @@ public final class ClientCalls { @Override public void execute(Runnable runnable) { add(runnable); - LockSupport.unpark(waiter); // no-op if null + Object waiter = this.waiter; + if (waiter != SHUTDOWN) { + LockSupport.unpark((Thread) waiter); // no-op if null + } else if (remove(runnable)) { + throw new RejectedExecutionException(); + } } }