core: fix retriablestream deadlock (#10386)

This commit is contained in:
yifeizhuang 2023-07-21 13:48:37 -07:00 committed by GitHub
parent afa4d6dac8
commit e179212672
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 135 additions and 4 deletions

View File

@ -212,13 +212,23 @@ abstract class RetriableStream<ReqT> implements ClientStream {
abstract void postCommit();
/**
 * Calls commit() and, if it hands back a post commit task, schedules that task on the
 * callExecutor.
 *
 * <p>The post commit task will be non-null only once. It cancels the other non-winning streams,
 * which may belong to separate transport threads, so it must be run on the callExecutor rather
 * than inline on the calling transport thread; running it inline can deadlock multiple stream
 * transports against each other (issues/10314).
 *
 * <p>This method should be called only in subListener callbacks. This guarantees that the
 * callExecutor schedules the task before the master listener closes, which is protected by the
 * inFlightSubStreams bookkeeping. That is because:
 * <ul>
 *   <li>For a successful winning stream, other streams won't attempt to close the master
 *       listener.
 *   <li>For a cancelled winning stream (noop), other streams won't attempt to close the master
 *       listener.
 *   <li>For a failed/closed winning stream, the last closed stream closes the master listener,
 *       and callExecutor scheduling happens-before that.
 * </ul>
 *
 * @param winningSubstream the substream to commit to
 */
private void commitAndRun(Substream winningSubstream) {
  Runnable postCommitTask = commit(winningSubstream);
  if (postCommitTask != null) {
    // Hand off to the callExecutor exactly once. Do NOT also call postCommitTask.run() here:
    // that would execute the task twice and on the transport thread, which is the deadlock
    // this method exists to avoid.
    callExecutor.execute(postCommitTask);
  }
}

View File

@ -65,10 +65,12 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Test;
@ -190,7 +192,7 @@ public class RetriableStreamTest {
private RetriableStream<String> retriableStream =
newThrottledRetriableStream(null /* throttle */);
private final RetriableStream<String> hedgingStream =
private RetriableStream<String> hedgingStream =
newThrottledHedgingStream(null /* throttle */);
private ClientStreamTracer bufferSizeTracer;
@ -206,9 +208,13 @@ public class RetriableStreamTest {
}
/**
 * Convenience overload: builds a hedging stream using Guava's direct executor and delegates to
 * the two-argument variant.
 */
private RetriableStream<String> newThrottledHedgingStream(Throttle throttle) {
  Executor directExecutor = MoreExecutors.directExecutor();
  return newThrottledHedgingStream(throttle, directExecutor);
}
/**
 * Builds a {@code RecordedRetriableStream} configured with {@code HEDGING_POLICY}.
 *
 * @param throttle the throttle handed to the stream; callers in this file pass {@code null}
 *     for "no throttle"
 * @param executor the executor handed to the stream in place of the previously hard-coded
 *     direct executor, so tests can control when its tasks run
 */
private RetriableStream<String> newThrottledHedgingStream(Throttle throttle, Executor executor) {
  // Pass the caller-supplied executor only; the old hard-coded
  // MoreExecutors.directExecutor() argument line must not be kept alongside it.
  return new RecordedRetriableStream(
      method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
      executor, fakeClock.getScheduledExecutorService(),
      null, HEDGING_POLICY, throttle);
}
@ -2482,6 +2488,121 @@ public class RetriableStreamTest {
inOrder.verifyNoMoreInteractions();
}
// Regression test for a hedging deadlock that occurs when multiple in-flight streams race as
// their transports call back, particularly with OkHttp:
// e.g. stream1's subListener gets closed() and in turn creates another stream. The transport1
// thread then holds its own lock while waiting for the transport2 lock to create the new stream.
// Meanwhile stream2's subListener gets headersRead() and then tries to commit and cancel all
// other drained streams, including the ones on transport1. The transport2 thread then holds its
// own lock while waiting for the transport1 lock (cancelling a stream requires the lock). Thus
// deadlock.
// A deadlock could also happen when two streams both get headersRead() at the same time.
// It is believed that retry does not have this issue because streams are created sequentially.
//
// Note: in the test body below the roles are mirrored relative to the example above: the
// transport1 thread delivers headersRead() and the transport2 thread delivers closed().
@Test(timeout = 15000)
public void hedging_deadlock() throws Exception {
ClientStream mockStream1 = mock(ClientStream.class); //on transport1
ClientStream mockStream2 = mock(ClientStream.class); //on transport2
ClientStream mockStream3 = mock(ClientStream.class); //on transport2
ClientStream mockStream4 = mock(ClientStream.class); //on transport1
// Stand-ins for the per-transport locks that real transports hold while calling back.
ReentrantLock transport1Lock = new ReentrantLock();
ReentrantLock transport2Lock = new ReentrantLock();
InOrder inOrder = inOrder(
mockStream1, mockStream2, mockStream3,
retriableStreamRecorder, masterListener);
// The first three substreams are handed out directly; creating the fourth one (on transport1)
// first acquires transport1Lock, modelling "creating a stream requires the transport lock".
when(retriableStreamRecorder.newSubstream(anyInt()))
.thenReturn(mockStream1)
.thenReturn(mockStream2)
.thenReturn(mockStream3)
.thenAnswer(new Answer<ClientStream>() {
@Override
public ClientStream answer(InvocationOnMock invocation) throws Throwable {
// Acquired on whichever thread asks for the fourth substream; not released here.
transport1Lock.lock();
return mockStream4;
}
});
// Use the fake clock's executor instead of a direct executor, so tasks handed to the stream's
// executor only run when the test drives the fake clock (see runDueTasks() below).
hedgingStream = newThrottledHedgingStream(null, fakeClock.getScheduledExecutorService());
hedgingStream.start(masterListener);
// One task is pending after start -- presumably the timer for the next hedge; confirm against
// RetriableStream.
assertEquals(1, fakeClock.numPendingTasks());
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream1).start(sublistenerCaptor1.capture());
inOrder.verify(mockStream1).isReady();
inOrder.verifyNoMoreInteractions();
// Advance by one hedging delay: the second substream is started.
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream2).start(sublistenerCaptor2.capture());
inOrder.verify(mockStream2).isReady();
inOrder.verifyNoMoreInteractions();
// Advance by another hedging delay: the third substream is started.
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
assertEquals(1, fakeClock.numPendingTasks());
ArgumentCaptor<ClientStreamListener> sublistenerCaptor3 =
ArgumentCaptor.forClass(ClientStreamListener.class);
inOrder.verify(mockStream3).start(sublistenerCaptor3.capture());
inOrder.verify(mockStream3).isReady();
inOrder.verifyNoMoreInteractions();
// Cancelling either transport2 stream needs transport2Lock (acquired and released at once),
// modelling "cancel stream requires the transport lock".
doAnswer(new Answer<Void>() {
@Override
@SuppressWarnings("LockNotBeforeTry")
public Void answer(InvocationOnMock invocation) throws Throwable {
transport2Lock.lock();
transport2Lock.unlock();
return null;
}
}).when(mockStream3).cancel(any(Status.class));
doAnswer(new Answer<Void>() {
@Override
@SuppressWarnings("LockNotBeforeTry")
public Void answer(InvocationOnMock invocation) throws Throwable {
transport2Lock.lock();
transport2Lock.unlock();
return null;
}
}).when(mockStream2).cancel(any(Status.class));
// Signals that transport1 has finished delivering headersRead() to stream1's listener.
CountDownLatch latch = new CountDownLatch(1);
// Transport1 callback thread: while holding transport1Lock, deliver headers for stream1.
// Presumably this commits stream1 as the winner; the resulting cancellation of the transport2
// streams must not happen on this thread, or it would block on transport2Lock.
Thread transport1Activity = new Thread(new Runnable() {
@Override
public void run() {
transport1Lock.lock();
try {
sublistenerCaptor1.getValue().headersRead(new Metadata());
latch.countDown();
} finally {
transport1Lock.unlock();
}
}
}, "Thread-transport1");
transport1Activity.start();
// Transport2 callback thread: while holding transport2Lock, close stream2 with a non-fatal
// status. Presumably this may request the fourth substream, whose creation (see the Answer
// above) acquires transport1Lock from this thread.
Thread transport2Activity = new Thread(new Runnable() {
@Override
public void run() {
transport2Lock.lock();
try {
sublistenerCaptor2.getValue()
.closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), REFUSED, new Metadata());
} finally {
transport2Lock.unlock();
// Non-blocking probe of transport1Lock; if obtained, it is released again immediately.
if (transport1Lock.tryLock()) {
transport1Lock.unlock();
}
}
}
}, "Thread-transport2");
transport2Activity.start();
// Wait until transport1 has delivered headers, then run whatever was queued on the fake
// clock's executor from the test thread, i.e. outside both transport locks.
latch.await();
fakeClock.runDueTasks();
// Both transport threads must terminate; a deadlock would trip the @Test timeout instead.
transport2Activity.join();
transport1Activity.join();
}
@Test
public void hedging_pushback_positive() {
ClientStream mockStream1 = mock(ClientStream.class);