From c75d9bc19fa83d2a925f05d04a8a4c12378466ff Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 31 Jan 2019 14:57:45 -0800 Subject: [PATCH] core: Fix racy creation/set of the retry Future This fixes two races: a data race where scheduledRetry is accessed by cancel() and confusion where scheduledRetry could be set to null by the schedule()d runnable before it is set by the schedule() return value. Although it seems these races can't actually cause problems due to other conditions/constraints, it's hard to reason about. So let's plug these preemptively, even if we can't add tests that trigger them. ScheduledHedging was not specific to hedging, so can now be reused for this retry case. It was renamed to avoid being misleading. --- .../io/grpc/internal/RetriableStream.java | 53 +++++++++++-------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 1c4452049c..6eee2919c1 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -96,9 +96,10 @@ abstract class RetriableStream implements ClientStream { private long perRpcBufferUsed; private ClientStreamListener masterListener; - private Future scheduledRetry; @GuardedBy("lock") - private ScheduledHedging scheduledHedging; + private FutureCanceller scheduledRetry; + @GuardedBy("lock") + private FutureCanceller scheduledHedging; private long nextBackoffIntervalNanos; RetriableStream( @@ -134,7 +135,13 @@ abstract class RetriableStream implements ClientStream { // subtract the share of this RPC from channelBufferUsed. 
channelBufferUsed.addAndGet(-perRpcBufferUsed); - // For hedging only, not needed for normal retry + final Future retryFuture; + if (scheduledRetry != null) { + retryFuture = scheduledRetry.markCancelled(); + scheduledRetry = null; + } else { + retryFuture = null; + } // cancel the scheduled hedging if it is scheduled prior to the commitment final Future hedgingFuture; if (scheduledHedging != null) { @@ -153,6 +160,9 @@ abstract class RetriableStream implements ClientStream { substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED); } } + if (retryFuture != null) { + retryFuture.cancel(false); + } if (hedgingFuture != null) { hedgingFuture.cancel(false); } @@ -304,13 +314,13 @@ abstract class RetriableStream implements ClientStream { isHedging = true; retryPolicy = RetryPolicy.DEFAULT; - ScheduledHedging scheduledHedgingRef = null; + FutureCanceller scheduledHedgingRef = null; synchronized (lock) { state = state.addActiveHedge(substream); if (hasPotentialHedging(state) && (throttle == null || throttle.isAboveThreshold())) { - scheduledHedging = scheduledHedgingRef = new ScheduledHedging(lock); + scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock); } } @@ -336,7 +346,7 @@ abstract class RetriableStream implements ClientStream { } // Cancels the current scheduledHedging and reschedules a new one. 
- ScheduledHedging future; + FutureCanceller future; Future futureToBeCancelled; synchronized (lock) { @@ -345,7 +355,7 @@ abstract class RetriableStream implements ClientStream { } futureToBeCancelled = scheduledHedging.markCancelled(); - scheduledHedging = future = new ScheduledHedging(lock); + scheduledHedging = future = new FutureCanceller(lock); } if (futureToBeCancelled != null) { @@ -357,11 +367,11 @@ abstract class RetriableStream implements ClientStream { private final class HedgingRunnable implements Runnable { - // Need to hold a ref to the ScheduledHedging in case RetriableStrea.scheduledHedging is renewed + // Need to hold a ref to the FutureCanceller in case RetriableStream.scheduledHedging is renewed // by a positive push-back just after newSubstream is instantiated, so that we can double check. - final ScheduledHedging scheduledHedgingRef; + final FutureCanceller scheduledHedgingRef; - HedgingRunnable(ScheduledHedging scheduledHedging) { + HedgingRunnable(FutureCanceller scheduledHedging) { scheduledHedgingRef = scheduledHedging; } @@ -377,7 +387,7 @@ abstract class RetriableStream implements ClientStream { // change.
Substream newSubstream = createSubstream(state.hedgingAttemptCount); boolean cancelled = false; - ScheduledHedging future = null; + FutureCanceller future = null; synchronized (lock) { if (scheduledHedgingRef.isCancelled()) { @@ -386,7 +396,7 @@ abstract class RetriableStream implements ClientStream { state = state.addActiveHedge(newSubstream); if (hasPotentialHedging(state) && (throttle == null || throttle.isAboveThreshold())) { - scheduledHedging = future = new ScheduledHedging(lock); + scheduledHedging = future = new FutureCanceller(lock); } else { state = state.freezeHedging(); scheduledHedging = null; @@ -418,11 +428,6 @@ abstract class RetriableStream implements ClientStream { Runnable runnable = commit(noopSubstream); if (runnable != null) { - Future savedScheduledRetry = scheduledRetry; - if (savedScheduledRetry != null) { - savedScheduledRetry.cancel(false); - scheduledRetry = null; - } masterListener.closed(reason, new Metadata()); runnable.run(); return; @@ -775,11 +780,14 @@ abstract class RetriableStream implements ClientStream { if (retryPlan.shouldRetry) { // The check state.winningSubstream == null, checking if is not already committed, is // racy, but is still safe b/c the retry will also handle committed/cancellation - scheduledRetry = scheduledExecutorService.schedule( + FutureCanceller scheduledRetryCopy; + synchronized (lock) { + scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); + } + scheduledRetryCopy.setFuture(scheduledExecutorService.schedule( new Runnable() { @Override public void run() { - scheduledRetry = null; callExecutor.execute(new Runnable() { @Override public void run() { @@ -792,7 +800,7 @@ abstract class RetriableStream implements ClientStream { } }, retryPlan.backoffNanos, - TimeUnit.NANOSECONDS); + TimeUnit.NANOSECONDS)); return; } isFatal = retryPlan.isFatal; @@ -1307,7 +1315,8 @@ abstract class RetriableStream implements ClientStream { } } - private static final class ScheduledHedging { + /** Allows cancelling 
a Future without racing with setting the future. */ + private static final class FutureCanceller { final Object lock; @GuardedBy("lock") @@ -1315,7 +1324,7 @@ abstract class RetriableStream implements ClientStream { @GuardedBy("lock") boolean cancelled; - ScheduledHedging(Object lock) { + FutureCanceller(Object lock) { this.lock = lock; }