diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 1c4452049c..6eee2919c1 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -96,9 +96,10 @@ abstract class RetriableStream implements ClientStream { private long perRpcBufferUsed; private ClientStreamListener masterListener; - private Future scheduledRetry; @GuardedBy("lock") - private ScheduledHedging scheduledHedging; + private FutureCanceller scheduledRetry; + @GuardedBy("lock") + private FutureCanceller scheduledHedging; private long nextBackoffIntervalNanos; RetriableStream( @@ -134,7 +135,13 @@ abstract class RetriableStream implements ClientStream { // subtract the share of this RPC from channelBufferUsed. channelBufferUsed.addAndGet(-perRpcBufferUsed); - // For hedging only, not needed for normal retry + final Future retryFuture; + if (scheduledRetry != null) { + retryFuture = scheduledRetry.markCancelled(); + scheduledRetry = null; + } else { + retryFuture = null; + } // cancel the scheduled hedging if it is scheduled prior to the commitment final Future hedgingFuture; if (scheduledHedging != null) { @@ -153,6 +160,9 @@ abstract class RetriableStream implements ClientStream { substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED); } } + if (retryFuture != null) { + retryFuture.cancel(false); + } if (hedgingFuture != null) { hedgingFuture.cancel(false); } @@ -304,13 +314,13 @@ abstract class RetriableStream implements ClientStream { isHedging = true; retryPolicy = RetryPolicy.DEFAULT; - ScheduledHedging scheduledHedgingRef = null; + FutureCanceller scheduledHedgingRef = null; synchronized (lock) { state = state.addActiveHedge(substream); if (hasPotentialHedging(state) && (throttle == null || throttle.isAboveThreshold())) { - scheduledHedging = scheduledHedgingRef = new ScheduledHedging(lock); + scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock); } } @@ -336,7 +346,7 @@ abstract class RetriableStream implements ClientStream { } // Cancels the current scheduledHedging and reschedules a new one. - ScheduledHedging future; + FutureCanceller future; Future futureToBeCancelled; synchronized (lock) { @@ -345,7 +355,7 @@ abstract class RetriableStream implements ClientStream { } futureToBeCancelled = scheduledHedging.markCancelled(); - scheduledHedging = future = new ScheduledHedging(lock); + scheduledHedging = future = new FutureCanceller(lock); } if (futureToBeCancelled != null) { @@ -357,11 +367,11 @@ abstract class RetriableStream implements ClientStream { private final class HedgingRunnable implements Runnable { - // Need to hold a ref to the ScheduledHedging in case RetriableStrea.scheduledHedging is renewed + // Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed // by a positive push-back just after newSubstream is instantiated, so that we can double check. - final ScheduledHedging scheduledHedgingRef; + final FutureCanceller scheduledHedgingRef; - HedgingRunnable(ScheduledHedging scheduledHedging) { + HedgingRunnable(FutureCanceller scheduledHedging) { scheduledHedgingRef = scheduledHedging; } @@ -377,7 +387,7 @@ abstract class RetriableStream implements ClientStream { // change. Substream newSubstream = createSubstream(state.hedgingAttemptCount); boolean cancelled = false; - ScheduledHedging future = null; + FutureCanceller future = null; synchronized (lock) { if (scheduledHedgingRef.isCancelled()) { @@ -386,7 +396,7 @@ abstract class RetriableStream implements ClientStream { state = state.addActiveHedge(newSubstream); if (hasPotentialHedging(state) && (throttle == null || throttle.isAboveThreshold())) { - scheduledHedging = future = new ScheduledHedging(lock); + scheduledHedging = future = new FutureCanceller(lock); } else { state = state.freezeHedging(); scheduledHedging = null; @@ -418,11 +428,6 @@ abstract class RetriableStream implements ClientStream { Runnable runnable = commit(noopSubstream); if (runnable != null) { - Future savedScheduledRetry = scheduledRetry; - if (savedScheduledRetry != null) { - savedScheduledRetry.cancel(false); - scheduledRetry = null; - } masterListener.closed(reason, new Metadata()); runnable.run(); return; @@ -775,11 +780,14 @@ abstract class RetriableStream implements ClientStream { if (retryPlan.shouldRetry) { // The check state.winningSubstream == null, checking if is not already committed, is // racy, but is still safe b/c the retry will also handle committed/cancellation - scheduledRetry = scheduledExecutorService.schedule( + FutureCanceller scheduledRetryCopy; + synchronized (lock) { + scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); + } + scheduledRetryCopy.setFuture(scheduledExecutorService.schedule( new Runnable() { @Override public void run() { - scheduledRetry = null; callExecutor.execute(new Runnable() { @Override public void run() { @@ -792,7 +800,7 @@ abstract class RetriableStream implements ClientStream { } }, retryPlan.backoffNanos, - TimeUnit.NANOSECONDS); + TimeUnit.NANOSECONDS)); return; } isFatal = retryPlan.isFatal; @@ -1307,7 +1315,8 @@ abstract class RetriableStream implements ClientStream { } } - private static final class ScheduledHedging { + /** Allows cancelling a Future without racing with setting the future. */ + private static final class FutureCanceller { final Object lock; @GuardedBy("lock") @@ -1315,7 +1324,7 @@ abstract class RetriableStream implements ClientStream { @GuardedBy("lock") boolean cancelled; - ScheduledHedging(Object lock) { + FutureCanceller(Object lock) { this.lock = lock; }