diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index 550d0e1c56..f301eee1f9 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -212,13 +212,23 @@ abstract class RetriableStream implements ClientStream { abstract void postCommit(); /** - * Calls commit() and if successful runs the post commit task. + * Calls commit() and if successful runs the post commit task. Post commit task will be non-null + * for only once. The post commit task cancels other non-winning streams on separate transport + * threads, thus it must be run on the callExecutor to prevent deadlocks between multiple stream + * transports.(issues/10314) + * This method should be called only in subListener callbacks. This guarantees callExecutor + * schedules tasks before master listener closes, which is protected by the inFlightSubStreams + * decorative. That is because: + * For a successful winning stream, other streams won't attempt to close master listener. + * For a cancelled winning stream (noop), other stream won't attempt to close master listener. + * For a failed/closed winning stream, the last closed stream closes the master listener, and + * callExecutor scheduling happens-before that. */ private void commitAndRun(Substream winningSubstream) { Runnable postCommitTask = commit(winningSubstream); if (postCommitTask != null) { - postCommitTask.run(); + callExecutor.execute(postCommitTask); } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index dc3e881717..3487ef02b4 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -65,10 +65,12 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; import org.junit.After; import org.junit.Test; @@ -190,7 +192,7 @@ public class RetriableStreamTest { private RetriableStream retriableStream = newThrottledRetriableStream(null /* throttle */); - private final RetriableStream hedgingStream = + private RetriableStream hedgingStream = newThrottledHedgingStream(null /* throttle */); private ClientStreamTracer bufferSizeTracer; @@ -206,9 +208,13 @@ public class RetriableStreamTest { } private RetriableStream newThrottledHedgingStream(Throttle throttle) { + return newThrottledHedgingStream(throttle, MoreExecutors.directExecutor()); + } + + private RetriableStream newThrottledHedgingStream(Throttle throttle, Executor executor) { return new RecordedRetriableStream( method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT, - MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), + executor, fakeClock.getScheduledExecutorService(), null, HEDGING_POLICY, throttle); } @@ -2482,6 +2488,121 @@ public class RetriableStreamTest { inOrder.verifyNoMoreInteractions(); } + // This is for hedging deadlock when multiple in-flight streams races when transports call back, + // particularly for OkHttp: + // e.g. stream1 subListener gets closed() and in turn creates another stream. This ends up with + // transport1 thread lock is held while waiting for transport2 lock for creating a new stream. + // Stream2 subListener gets headersRead() and then try to commit and cancel all other drained + // streams, including the ones that is on transport1. This causes transport2 thread lock held + // while waiting for transport1 (cancel stream requires lock). Thus deadlock. + // Deadlock could also happen when two streams both gets headerRead() at the same time. + // It is believed that retry does not have the issue because streams are created sequentially. + @Test(timeout = 15000) + public void hedging_deadlock() throws Exception { + ClientStream mockStream1 = mock(ClientStream.class); //on transport1 + ClientStream mockStream2 = mock(ClientStream.class); //on transport2 + ClientStream mockStream3 = mock(ClientStream.class); //on transport2 + ClientStream mockStream4 = mock(ClientStream.class); //on transport1 + + ReentrantLock transport1Lock = new ReentrantLock(); + ReentrantLock transport2Lock = new ReentrantLock(); + InOrder inOrder = inOrder( + mockStream1, mockStream2, mockStream3, + retriableStreamRecorder, masterListener); + when(retriableStreamRecorder.newSubstream(anyInt())) + .thenReturn(mockStream1) + .thenReturn(mockStream2) + .thenReturn(mockStream3) + .thenAnswer(new Answer() { + + @Override + public ClientStream answer(InvocationOnMock invocation) throws Throwable { + transport1Lock.lock(); + return mockStream4; + } + }); + + hedgingStream = newThrottledHedgingStream(null, fakeClock.getScheduledExecutorService()); + hedgingStream.start(masterListener); + assertEquals(1, fakeClock.numPendingTasks()); + ArgumentCaptor sublistenerCaptor1 = + ArgumentCaptor.forClass(ClientStreamListener.class); + inOrder.verify(mockStream1).start(sublistenerCaptor1.capture()); + inOrder.verify(mockStream1).isReady(); + inOrder.verifyNoMoreInteractions(); + + fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); + assertEquals(1, fakeClock.numPendingTasks()); + ArgumentCaptor sublistenerCaptor2 = + ArgumentCaptor.forClass(ClientStreamListener.class); + inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); + inOrder.verify(mockStream2).isReady(); + inOrder.verifyNoMoreInteractions(); + + fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS); + assertEquals(1, fakeClock.numPendingTasks()); + ArgumentCaptor sublistenerCaptor3 = + ArgumentCaptor.forClass(ClientStreamListener.class); + inOrder.verify(mockStream3).start(sublistenerCaptor3.capture()); + inOrder.verify(mockStream3).isReady(); + inOrder.verifyNoMoreInteractions(); + + doAnswer(new Answer() { + @Override + @SuppressWarnings("LockNotBeforeTry") + public Void answer(InvocationOnMock invocation) throws Throwable { + transport2Lock.lock(); + transport2Lock.unlock(); + return null; + } + }).when(mockStream3).cancel(any(Status.class)); + + doAnswer(new Answer() { + @Override + @SuppressWarnings("LockNotBeforeTry") + public Void answer(InvocationOnMock invocation) throws Throwable { + transport2Lock.lock(); + transport2Lock.unlock(); + return null; + } + }).when(mockStream2).cancel(any(Status.class)); + + CountDownLatch latch = new CountDownLatch(1); + Thread transport1Activity = new Thread(new Runnable() { + @Override + public void run() { + transport1Lock.lock(); + try { + sublistenerCaptor1.getValue().headersRead(new Metadata()); + latch.countDown(); + } finally { + transport1Lock.unlock(); + } + } + }, "Thread-transport1"); + transport1Activity.start(); + Thread transport2Activity = new Thread(new Runnable() { + @Override + public void run() { + transport2Lock.lock(); + try { + sublistenerCaptor2.getValue() + .closed(Status.fromCode(NON_FATAL_STATUS_CODE_1), REFUSED, new Metadata()); + } finally { + transport2Lock.unlock(); + if (transport1Lock.tryLock()) { + transport1Lock.unlock(); + } + } + } + }, "Thread-transport2"); + transport2Activity.start(); + latch.await(); + fakeClock.runDueTasks(); + transport2Activity.join(); + transport1Activity.join(); + } + @Test public void hedging_pushback_positive() { ClientStream mockStream1 = mock(ClientStream.class);