core: fix bug RetriableStream cancel() racing with start() (#8386)

There is a bug in the scenario of the following sequence of events:

- `call.start()` 
- received retryable status and about to retry
- The retry attempt Substream is created but not yet `drain()` 
- `call.cancel()`

But `stream.cancel()` cannot be called prior to `stream.start()`, otherwise retry will cause a failure with IllegalStateException. The current RetriableStream code must be fixed to not cancel a stream until `start()` is called in `drain()`.
This commit is contained in:
ZHANG Dapeng 2021-08-05 18:22:37 -07:00 committed by GitHub
parent 9dd0c66929
commit 3668f2e52c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 29 additions and 14 deletions

View File

@ -104,6 +104,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
@GuardedBy("lock")
private FutureCanceller scheduledHedging;
private long nextBackoffIntervalNanos;
private Status cancellationStatus;
RetriableStream(
MethodDescriptor<ReqT, ?> method, Metadata headers,
@ -244,14 +245,16 @@ abstract class RetriableStream<ReqT> implements ClientStream {
int index = 0;
int chunk = 0x80;
List<BufferEntry> list = null;
boolean streamStarted = false;
while (true) {
State savedState;
synchronized (lock) {
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me
if (savedState.winningSubstream != null && savedState.winningSubstream != substream
&& streamStarted) {
// committed but not me, to be cancelled
break;
}
if (index == savedState.buffer.size()) { // I'm drained
@ -275,17 +278,22 @@ abstract class RetriableStream<ReqT> implements ClientStream {
for (BufferEntry bufferEntry : list) {
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me
if (savedState.winningSubstream != null && savedState.winningSubstream != substream
&& streamStarted) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
if (savedState.cancelled && streamStarted) {
checkState(
savedState.winningSubstream == substream,
"substream should be CANCELLED_BECAUSE_COMMITTED already");
substream.stream.cancel(cancellationStatus);
return;
}
bufferEntry.runWith(substream);
if (bufferEntry instanceof RetriableStream.StartEntry) {
streamStarted = true;
}
}
}
@ -299,6 +307,13 @@ abstract class RetriableStream<ReqT> implements ClientStream {
@Nullable
abstract Status prestart();
class StartEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.start(new Sublistener(substream));
}
}
/** Starts the first PRC attempt. */
@Override
public final void start(ClientStreamListener listener) {
@ -311,13 +326,6 @@ abstract class RetriableStream<ReqT> implements ClientStream {
return;
}
class StartEntry implements BufferEntry {
@Override
public void runWith(Substream substream) {
substream.stream.start(new Sublistener(substream));
}
}
synchronized (lock) {
state.buffer.add(new StartEntry());
}
@ -450,11 +458,18 @@ abstract class RetriableStream<ReqT> implements ClientStream {
return;
}
state.winningSubstream.stream.cancel(reason);
Substream winningSubstreamToCancel = null;
synchronized (lock) {
// This is not required, but causes a short-circuit in the draining process.
if (state.drainedSubstreams.contains(state.winningSubstream)) {
winningSubstreamToCancel = state.winningSubstream;
} else { // the winningSubstream will be cancelled while draining
cancellationStatus = reason;
}
state = state.cancelled();
}
if (winningSubstreamToCancel != null) {
winningSubstreamToCancel.stream.cancel(reason);
}
}
private void delayOrExecute(BufferEntry bufferEntry) {