Fix a retry race condition that could decrement inFlightSubStreams twice and thereby skip the call to masterListener.closed() (#11026)

This commit is contained in:
Larry Safran 2024-03-22 17:01:58 +00:00 committed by GitHub
parent b3ffb5078d
commit bdb623031f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 17 deletions

View File

@ -149,11 +149,10 @@ abstract class RetriableStream<ReqT> implements ClientStream {
this.throttle = throttle; this.throttle = throttle;
} }
@SuppressWarnings("GuardedBy") @SuppressWarnings("GuardedBy") // TODO(b/145386688): this.lock == FutureCanceller.lock, so this access is safe
@Nullable // null if already committed @Nullable // null if already committed
@CheckReturnValue @CheckReturnValue
private Runnable commit(final Substream winningSubstream) { private Runnable commit(final Substream winningSubstream) {
synchronized (lock) { synchronized (lock) {
if (state.winningSubstream != null) { if (state.winningSubstream != null) {
return null; return null;
@ -165,10 +164,9 @@ abstract class RetriableStream<ReqT> implements ClientStream {
// subtract the share of this RPC from channelBufferUsed. // subtract the share of this RPC from channelBufferUsed.
channelBufferUsed.addAndGet(-perRpcBufferUsed); channelBufferUsed.addAndGet(-perRpcBufferUsed);
final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
final Future<?> retryFuture; final Future<?> retryFuture;
if (scheduledRetry != null) { if (scheduledRetry != null) {
// TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead
// found: 'this.lock'
retryFuture = scheduledRetry.markCancelled(); retryFuture = scheduledRetry.markCancelled();
scheduledRetry = null; scheduledRetry = null;
} else { } else {
@ -177,8 +175,6 @@ abstract class RetriableStream<ReqT> implements ClientStream {
// cancel the scheduled hedging if it is scheduled prior to the commitment // cancel the scheduled hedging if it is scheduled prior to the commitment
final Future<?> hedgingFuture; final Future<?> hedgingFuture;
if (scheduledHedging != null) { if (scheduledHedging != null) {
// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
// found: 'this.lock'
hedgingFuture = scheduledHedging.markCancelled(); hedgingFuture = scheduledHedging.markCancelled();
scheduledHedging = null; scheduledHedging = null;
} else { } else {
@ -196,7 +192,21 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
if (retryFuture != null) { if (retryFuture != null) {
retryFuture.cancel(false); retryFuture.cancel(false);
if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
assert savedCloseMasterListenerReason != null;
listenerSerializeExecutor.execute(
new Runnable() {
@Override
public void run() {
isClosed = true;
masterListener.closed(savedCloseMasterListenerReason.status,
savedCloseMasterListenerReason.progress,
savedCloseMasterListenerReason.metadata);
} }
});
}
}
if (hedgingFuture != null) { if (hedgingFuture != null) {
hedgingFuture.cancel(false); hedgingFuture.cancel(false);
} }
@ -415,7 +425,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
drain(substream); drain(substream);
} }
@SuppressWarnings("GuardedBy") @SuppressWarnings("GuardedBy") // TODO(b/145386688): this.lock == FutureCanceller.lock, so this access is safe
private void pushbackHedging(@Nullable Integer delayMillis) { private void pushbackHedging(@Nullable Integer delayMillis) {
if (delayMillis == null) { if (delayMillis == null) {
return; return;
@ -434,8 +444,6 @@ abstract class RetriableStream<ReqT> implements ClientStream {
return; return;
} }
// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
// found: 'this.lock'
futureToBeCancelled = scheduledHedging.markCancelled(); futureToBeCancelled = scheduledHedging.markCancelled();
scheduledHedging = future = new FutureCanceller(lock); scheduledHedging = future = new FutureCanceller(lock);
} }
@ -469,16 +477,13 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
callExecutor.execute( callExecutor.execute(
new Runnable() { new Runnable() {
@SuppressWarnings("GuardedBy") @SuppressWarnings("GuardedBy") // TODO(b/145386688): lock == FutureCanceller.lock, so this access is safe
@Override @Override
public void run() { public void run() {
boolean cancelled = false; boolean cancelled = false;
FutureCanceller future = null; FutureCanceller future = null;
synchronized (lock) { synchronized (lock) {
// TODO(b/145386688): This access should be guarded by
// 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found:
// 'RetriableStream.this.lock'
if (scheduledHedgingRef.isCancelled()) { if (scheduledHedgingRef.isCancelled()) {
cancelled = true; cancelled = true;
} else { } else {
@ -810,13 +815,11 @@ abstract class RetriableStream<ReqT> implements ClientStream {
&& !state.hedgingFrozen; && !state.hedgingFrozen;
} }
@SuppressWarnings("GuardedBy") @SuppressWarnings("GuardedBy") // TODO(b/145386688): this.lock == FutureCanceller.lock, so this access is safe
private void freezeHedging() { private void freezeHedging() {
Future<?> futureToBeCancelled = null; Future<?> futureToBeCancelled = null;
synchronized (lock) { synchronized (lock) {
if (scheduledHedging != null) { if (scheduledHedging != null) {
// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
// found: 'this.lock'
futureToBeCancelled = scheduledHedging.markCancelled(); futureToBeCancelled = scheduledHedging.markCancelled();
scheduledHedging = null; scheduledHedging = null;
} }
@ -999,9 +1002,19 @@ abstract class RetriableStream<ReqT> implements ClientStream {
synchronized (lock) { synchronized (lock) {
scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock); scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
} }
class RetryBackoffRunnable implements Runnable { class RetryBackoffRunnable implements Runnable {
@Override @Override
@SuppressWarnings("FutureReturnValueIgnored")
public void run() { public void run() {
synchronized (scheduledRetryCopy.lock) {
if (scheduledRetryCopy.isCancelled()) {
return;
} else {
scheduledRetryCopy.markCancelled();
}
}
callExecutor.execute( callExecutor.execute(
new Runnable() { new Runnable() {
@Override @Override
@ -1563,11 +1576,16 @@ abstract class RetriableStream<ReqT> implements ClientStream {
} }
void setFuture(Future<?> future) { void setFuture(Future<?> future) {
boolean wasCancelled;
synchronized (lock) { synchronized (lock) {
if (!cancelled) { wasCancelled = cancelled;
if (!wasCancelled) {
this.future = future; this.future = future;
} }
} }
if (wasCancelled) {
future.cancel(false);
}
} }
@GuardedBy("lock") @GuardedBy("lock")

View File

@ -705,6 +705,7 @@ public class RetriableStreamTest {
// cancel // cancel
retriableStream.cancel(Status.CANCELLED); retriableStream.cancel(Status.CANCELLED);
inOrder.verify(retriableStreamRecorder, never()).postCommit(); inOrder.verify(retriableStreamRecorder, never()).postCommit();
verify(masterListener, times(1)).closed(any(), any(), any());
} }
@Test @Test
@ -733,6 +734,7 @@ public class RetriableStreamTest {
verifyNoMoreInteractions(mockStream1); verifyNoMoreInteractions(mockStream1);
verifyNoMoreInteractions(mockStream2); verifyNoMoreInteractions(mockStream2);
verify(masterListener, times(1)).closed(any(), any(), any());
} }
@Test @Test