diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index dfabfeeffb..5c841189d5 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -322,7 +322,11 @@ class DelayedClientTransport implements ManagedClientTransport { */ void startBackoff(final Status status) { synchronized (lock) { - Preconditions.checkState(backoffStatus == null); + if (shutdown) { + return; + } + Preconditions.checkState(backoffStatus == null, + "Error when calling startBackoff: transport is already in backoff period"); backoffStatus = Status.UNAVAILABLE.withDescription("Channel in TRANSIENT_FAILURE state") .withCause(status.asRuntimeException()); final ArrayList failFastPendingStreams = new ArrayList(); @@ -335,14 +339,17 @@ class DelayedClientTransport implements ManagedClientTransport { it.remove(); } } - streamCreationExecutor.execute(new Runnable() { + + class FailTheFailFastPendingStreams implements Runnable { @Override public void run() { for (PendingStream stream : failFastPendingStreams) { stream.setStream(new FailingClientStream(status)); } } - }); + } + + streamCreationExecutor.execute(new FailTheFailFastPendingStreams()); } } } @@ -353,6 +360,8 @@ class DelayedClientTransport implements ManagedClientTransport { */ void endBackoff() { synchronized (lock) { + Preconditions.checkState(backoffStatus != null, + "Error when calling endBackoff: transport is not in backoff period"); backoffStatus = null; } } diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index d89a8ac49c..3941a88d6c 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -227,21 +227,14 @@ final class TransportSet implements WithLogId { * @param status the causal status when the channel begins transition to * TRANSIENT_FAILURE. */ - @CheckReturnValue - @GuardedBy("lock") - private Runnable scheduleBackoff( + private void scheduleBackoff( final DelayedClientTransport delayedTransport, final Status status) { - Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done"); + // This must be run outside of lock. The TransportSet lock is a channel level lock. + // startBackoff() will acquire the delayed transport lock, which is a transport level + // lock. Our lock ordering mandates transport lock > channel lock. Otherwise a deadlock + // could happen (https://github.com/grpc/grpc-java/issues/2152). + delayedTransport.startBackoff(status); - if (reconnectPolicy == null) { - reconnectPolicy = backoffPolicyProvider.get(); - } - long delayMillis = - reconnectPolicy.nextBackoffMillis() - connectingTimer.elapsed(TimeUnit.MILLISECONDS); - if (log.isLoggable(Level.FINE)) { - log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms", - new Object[]{getLogId(), delayMillis}); - } class EndOfCurrentBackoff implements Runnable { @Override public void run() { @@ -285,18 +278,25 @@ final class TransportSet implements WithLogId { } } - reconnectTask = scheduledExecutor.schedule( - new LogExceptionRunnable(new EndOfCurrentBackoff()), delayMillis, TimeUnit.MILLISECONDS); - return new Runnable() { - @Override - public void run() { - // This must be run outside of lock. The TransportSet lock is a channel level lock. - // startBackoff() will acquire the delayed transport lock, which is a transport level - // lock. Our lock ordering mandates transport lock > channel lock. Otherwise a deadlock - // could happen (https://github.com/grpc/grpc-java/issues/2152). - delayedTransport.startBackoff(status); + synchronized (lock) { + if (shutdown) { + return; } - }; + if (reconnectPolicy == null) { + reconnectPolicy = backoffPolicyProvider.get(); + } + long delayMillis = + reconnectPolicy.nextBackoffMillis() - connectingTimer.elapsed(TimeUnit.MILLISECONDS); + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms", + new Object[]{getLogId(), delayMillis}); + } + Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done"); + reconnectTask = scheduledExecutor.schedule( + new LogExceptionRunnable(new EndOfCurrentBackoff()), + delayMillis, + TimeUnit.MILLISECONDS); + } } /** @@ -464,15 +464,17 @@ final class TransportSet implements WithLogId { // Continue reconnect if there are still addresses to try. if (nextAddressIndex == 0) { allAddressesFailed = true; - // Initiate backoff - // Transition to TRANSIENT_FAILURE - runnable = scheduleBackoff(delayedTransport, s); } else { // Still CONNECTING runnable = startNewTransport(delayedTransport); } } } + if (allAddressesFailed) { + // Initiate backoff + // Transition to TRANSIENT_FAILURE + scheduleBackoff(delayedTransport, s); + } if (runnable != null) { runnable.run(); } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index b3c64d4465..1ef82998ff 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -324,4 +324,13 @@ public class DelayedClientTransportTest { delayedTransport.newStream(method, headers, waitForReadyCallOptions); assertEquals(1, delayedTransport.getPendingStreamsCount()); } + + @Test public void startBackoff_DoNothingIfAlreadyShutDown() { + delayedTransport.shutdown(); + + final Status cause = Status.UNAVAILABLE.withDescription("some error when connecting"); + delayedTransport.startBackoff(cause); + + assertFalse(delayedTransport.isInBackoffPeriod()); + } } diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java index ea26822cfd..94ee2052a9 100644 --- a/core/src/test/java/io/grpc/internal/TransportSetTest.java +++ b/core/src/test/java/io/grpc/internal/TransportSetTest.java @@ -69,6 +69,7 @@ import org.mockito.MockitoAnnotations; import java.net.SocketAddress; import java.util.Arrays; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; /** * Unit tests for {@link TransportSet}. @@ -671,6 +672,39 @@ public class TransportSetTest { verify(mockTransportSetCallback, times(inUse)).onInUse(transportSet); } + @Test + public void scheduleBackoff_DoNotScheduleEndOfBackoffIfAlreadyShutdown() { + // Setup + final boolean[] startBackoffAndShutdownAreCalled = {false}; + Executor executor = new Executor() { + @Override + public void execute(Runnable command) { + if (command.getClass().getName().contains("FailTheFailFastPendingStreams")) { + // shutdown during startBackoff + transportSet.shutdown(); + startBackoffAndShutdownAreCalled[0] = true; + } + fakeExecutor.scheduledExecutorService.execute(command); + } + }; + SocketAddress addr = mock(SocketAddress.class); + addressGroup = new EquivalentAddressGroup(Arrays.asList(addr)); + transportSet = new TransportSet(addressGroup, authority, userAgent, mockLoadBalancer, + mockBackoffPolicyProvider, mockTransportFactory, fakeClock.scheduledExecutorService, + fakeClock.stopwatchSupplier, executor, mockTransportSetCallback); + + // Attempt and fail, scheduleBackoff should be triggered, + // and transportSet.shutdown should be triggered by setup + transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + verify(mockTransportSetCallback, times(1)).onAllAddressesFailed(); + assertTrue(startBackoffAndShutdownAreCalled[0]); + + fakeExecutor.runDueTasks(); + // verify endOfBackoff not scheduled + verify(mockBackoffPolicy1, never()).nextBackoffMillis(); + } + private void createTransportSet(SocketAddress ... addrs) { addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs)); transportSet = new TransportSet(addressGroup, authority, userAgent, mockLoadBalancer,