core: Fix racy creation/set of the retry Future

This fixes two races: a data race where scheduledRetry is accessed
by cancel() and confusion where scheduledRetry could be set to null by
the schedule()d runnable before it is set by the schedule() return
value.

Although it seems these races can't actually cause problems due to other
conditions/constraints, it's hard to reason about. So let's plug these
preemptively, even if we can't add tests that trigger them.

ScheduledHedging was not specific to hedging, so can now be reused for
this retry case. It was renamed to avoid being misleading.
This commit is contained in:
Eric Anderson 2019-01-31 14:57:45 -08:00
parent d0ecc08705
commit c75d9bc19f
1 changed files with 31 additions and 22 deletions

View File

@ -96,9 +96,10 @@ abstract class RetriableStream<ReqT> implements ClientStream {
private long perRpcBufferUsed;
private ClientStreamListener masterListener;
private Future<?> scheduledRetry;
@GuardedBy("lock")
private ScheduledHedging scheduledHedging;
private FutureCanceller scheduledRetry;
@GuardedBy("lock")
private FutureCanceller scheduledHedging;
private long nextBackoffIntervalNanos;
RetriableStream(
@ -134,7 +135,13 @@ abstract class RetriableStream<ReqT> implements ClientStream {
// subtract the share of this RPC from channelBufferUsed.
channelBufferUsed.addAndGet(-perRpcBufferUsed);
// For hedging only, not needed for normal retry
final Future<?> retryFuture;
if (scheduledRetry != null) {
retryFuture = scheduledRetry.markCancelled();
scheduledRetry = null;
} else {
retryFuture = null;
}
// cancel the scheduled hedging if it is scheduled prior to the commitment
final Future<?> hedgingFuture;
if (scheduledHedging != null) {
@ -153,6 +160,9 @@ abstract class RetriableStream<ReqT> implements ClientStream {
substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
}
}
if (retryFuture != null) {
retryFuture.cancel(false);
}
if (hedgingFuture != null) {
hedgingFuture.cancel(false);
}
@ -304,13 +314,13 @@ abstract class RetriableStream<ReqT> implements ClientStream {
isHedging = true;
retryPolicy = RetryPolicy.DEFAULT;
ScheduledHedging scheduledHedgingRef = null;
FutureCanceller scheduledHedgingRef = null;
synchronized (lock) {
state = state.addActiveHedge(substream);
if (hasPotentialHedging(state)
&& (throttle == null || throttle.isAboveThreshold())) {
scheduledHedging = scheduledHedgingRef = new ScheduledHedging(lock);
scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
}
}
@ -336,7 +346,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
}
// Cancels the current scheduledHedging and reschedules a new one.
ScheduledHedging future;
FutureCanceller future;
Future<?> futureToBeCancelled;
synchronized (lock) {
@ -345,7 +355,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
}
futureToBeCancelled = scheduledHedging.markCancelled();
scheduledHedging = future = new ScheduledHedging(lock);
scheduledHedging = future = new FutureCanceller(lock);
}
if (futureToBeCancelled != null) {
@ -357,11 +367,11 @@ abstract class RetriableStream<ReqT> implements ClientStream {
private final class HedgingRunnable implements Runnable {
// Need to hold a ref to the ScheduledHedging in case RetriableStrea.scheduledHedging is renewed
// Need to hold a ref to the FutureCanceller in case RetriableStrea.scheduledHedging is renewed
// by a positive push-back just after newSubstream is instantiated, so that we can double check.
final ScheduledHedging scheduledHedgingRef;
final FutureCanceller scheduledHedgingRef;
HedgingRunnable(ScheduledHedging scheduledHedging) {
HedgingRunnable(FutureCanceller scheduledHedging) {
scheduledHedgingRef = scheduledHedging;
}
@ -377,7 +387,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
// change.
Substream newSubstream = createSubstream(state.hedgingAttemptCount);
boolean cancelled = false;
ScheduledHedging future = null;
FutureCanceller future = null;
synchronized (lock) {
if (scheduledHedgingRef.isCancelled()) {
@ -386,7 +396,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
state = state.addActiveHedge(newSubstream);
if (hasPotentialHedging(state)
&& (throttle == null || throttle.isAboveThreshold())) {
scheduledHedging = future = new ScheduledHedging(lock);
scheduledHedging = future = new FutureCanceller(lock);
} else {
state = state.freezeHedging();
scheduledHedging = null;
@ -418,11 +428,6 @@ abstract class RetriableStream<ReqT> implements ClientStream {
Runnable runnable = commit(noopSubstream);
if (runnable != null) {
Future<?> savedScheduledRetry = scheduledRetry;
if (savedScheduledRetry != null) {
savedScheduledRetry.cancel(false);
scheduledRetry = null;
}
masterListener.closed(reason, new Metadata());
runnable.run();
return;
@ -775,11 +780,14 @@ abstract class RetriableStream<ReqT> implements ClientStream {
if (retryPlan.shouldRetry) {
// The check state.winningSubstream == null, checking if is not already committed, is
// racy, but is still safe b/c the retry will also handle committed/cancellation
scheduledRetry = scheduledExecutorService.schedule(
FutureCanceller scheduledRetryCopy;
synchronized (lock) {
scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
}
scheduledRetryCopy.setFuture(scheduledExecutorService.schedule(
new Runnable() {
@Override
public void run() {
scheduledRetry = null;
callExecutor.execute(new Runnable() {
@Override
public void run() {
@ -792,7 +800,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
}
},
retryPlan.backoffNanos,
TimeUnit.NANOSECONDS);
TimeUnit.NANOSECONDS));
return;
}
isFatal = retryPlan.isFatal;
@ -1307,7 +1315,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
}
}
private static final class ScheduledHedging {
/** Allows cancelling a Future without racing with setting the future. */
private static final class FutureCanceller {
final Object lock;
@GuardedBy("lock")
@ -1315,7 +1324,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
@GuardedBy("lock")
boolean cancelled;
ScheduledHedging(Object lock) {
FutureCanceller(Object lock) {
this.lock = lock;
}