core: fix race condition for TransportSet scheduleBackoff

Trying to fix issue #2188.
- Keep avoiding the lock-ordering issue #2152 while also avoiding the race condition in #2188.
- Add `checkState` to `endBackoff()`. This could help surface and identify any potential issue related to #2188.
- Make sure `startBackoff()` and `endBackoff()` are invoked in the right order.
- Do not schedule endBackoff if the TransportSet has been shut down.
This commit is contained in:
ZHANG Dapeng 2016-09-20 11:32:38 -07:00 committed by GitHub
parent 141eed5ed0
commit 40c5700cc3
4 changed files with 84 additions and 30 deletions

View File

@ -322,7 +322,11 @@ class DelayedClientTransport implements ManagedClientTransport {
*/
void startBackoff(final Status status) {
synchronized (lock) {
Preconditions.checkState(backoffStatus == null);
if (shutdown) {
return;
}
Preconditions.checkState(backoffStatus == null,
"Error when calling startBackoff: transport is already in backoff period");
backoffStatus = Status.UNAVAILABLE.withDescription("Channel in TRANSIENT_FAILURE state")
.withCause(status.asRuntimeException());
final ArrayList<PendingStream> failFastPendingStreams = new ArrayList<PendingStream>();
@ -335,14 +339,17 @@ class DelayedClientTransport implements ManagedClientTransport {
it.remove();
}
}
streamCreationExecutor.execute(new Runnable() {
class FailTheFailFastPendingStreams implements Runnable {
@Override
public void run() {
for (PendingStream stream : failFastPendingStreams) {
stream.setStream(new FailingClientStream(status));
}
}
});
}
streamCreationExecutor.execute(new FailTheFailFastPendingStreams());
}
}
}
@ -353,6 +360,8 @@ class DelayedClientTransport implements ManagedClientTransport {
*/
void endBackoff() {
// backoffStatus is read and written only while holding the transport-level lock.
synchronized (lock) {
// Fail loudly if the transport is not currently in a backoff period, i.e. endBackoff()
// was called without a matching startBackoff() having set backoffStatus. This turns an
// out-of-order call into an immediate IllegalStateException rather than a silent no-op.
// NOTE(review): startBackoff() returns early without setting backoffStatus when the
// transport is already shut down, so callers must not pair that call with endBackoff().
Preconditions.checkState(backoffStatus != null,
"Error when calling endBackoff: transport is not in backoff period");
// Clearing the status marks the end of the backoff period.
backoffStatus = null;
}
}

View File

@ -227,21 +227,14 @@ final class TransportSet implements WithLogId {
* @param status the causal status when the channel begins transition to
* TRANSIENT_FAILURE.
*/
@CheckReturnValue
@GuardedBy("lock")
private Runnable scheduleBackoff(
private void scheduleBackoff(
final DelayedClientTransport delayedTransport, final Status status) {
Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
// This must be run outside of lock. The TransportSet lock is a channel level lock.
// startBackoff() will acquire the delayed transport lock, which is a transport level
// lock. Our lock ordering mandates transport lock > channel lock. Otherwise a deadlock
// could happen (https://github.com/grpc/grpc-java/issues/2152).
delayedTransport.startBackoff(status);
if (reconnectPolicy == null) {
reconnectPolicy = backoffPolicyProvider.get();
}
long delayMillis =
reconnectPolicy.nextBackoffMillis() - connectingTimer.elapsed(TimeUnit.MILLISECONDS);
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms",
new Object[]{getLogId(), delayMillis});
}
class EndOfCurrentBackoff implements Runnable {
@Override
public void run() {
@ -285,18 +278,25 @@ final class TransportSet implements WithLogId {
}
}
reconnectTask = scheduledExecutor.schedule(
new LogExceptionRunnable(new EndOfCurrentBackoff()), delayMillis, TimeUnit.MILLISECONDS);
return new Runnable() {
@Override
public void run() {
// This must be run outside of lock. The TransportSet lock is a channel level lock.
// startBackoff() will acquire the delayed transport lock, which is a transport level
// lock. Our lock ordering mandates transport lock > channel lock. Otherwise a deadlock
// could happen (https://github.com/grpc/grpc-java/issues/2152).
delayedTransport.startBackoff(status);
synchronized (lock) {
if (shutdown) {
return;
}
};
if (reconnectPolicy == null) {
reconnectPolicy = backoffPolicyProvider.get();
}
long delayMillis =
reconnectPolicy.nextBackoffMillis() - connectingTimer.elapsed(TimeUnit.MILLISECONDS);
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms",
new Object[]{getLogId(), delayMillis});
}
Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
reconnectTask = scheduledExecutor.schedule(
new LogExceptionRunnable(new EndOfCurrentBackoff()),
delayMillis,
TimeUnit.MILLISECONDS);
}
}
/**
@ -464,15 +464,17 @@ final class TransportSet implements WithLogId {
// Continue reconnect if there are still addresses to try.
if (nextAddressIndex == 0) {
allAddressesFailed = true;
// Initiate backoff
// Transition to TRANSIENT_FAILURE
runnable = scheduleBackoff(delayedTransport, s);
} else {
// Still CONNECTING
runnable = startNewTransport(delayedTransport);
}
}
}
if (allAddressesFailed) {
// Initiate backoff
// Transition to TRANSIENT_FAILURE
scheduleBackoff(delayedTransport, s);
}
if (runnable != null) {
runnable.run();
}

View File

@ -324,4 +324,13 @@ public class DelayedClientTransportTest {
delayedTransport.newStream(method, headers, waitForReadyCallOptions);
assertEquals(1, delayedTransport.getPendingStreamsCount());
}
// Verifies that startBackoff() is a no-op once the delayed transport has been shut down.
@Test
public void startBackoff_DoNothingIfAlreadyShutDown() {
// Shut down first so that the subsequent startBackoff() hits the shutdown guard.
delayedTransport.shutdown();
delayedTransport.startBackoff(
Status.UNAVAILABLE.withDescription("some error when connecting"));
// The transport must not have entered a backoff period.
assertFalse(delayedTransport.isInBackoffPeriod());
}
}

View File

@ -69,6 +69,7 @@ import org.mockito.MockitoAnnotations;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
/**
* Unit tests for {@link TransportSet}.
@ -671,6 +672,39 @@ public class TransportSetTest {
verify(mockTransportSetCallback, times(inUse)).onInUse(transportSet);
}
// Verifies that when the TransportSet is shut down while startBackoff() is in progress,
// scheduleBackoff() does not go on to schedule the end-of-backoff task.
@Test
public void scheduleBackoff_DoNotScheduleEndOfBackoffIfAlreadyShutdown() {
// Setup
// Single-element array so the anonymous Executor below can record that it ran.
final boolean[] startBackoffAndShutdownAreCalled = {false};
// Wrapping executor: it recognises the task that startBackoff() submits (matched by class
// name) and shuts the TransportSet down at exactly that moment, before delegating the task.
Executor executor = new Executor() {
@Override
public void execute(Runnable command) {
if (command.getClass().getName().contains("FailTheFailFastPendingStreams")) {
// shutdown during startBackoff
transportSet.shutdown();
startBackoffAndShutdownAreCalled[0] = true;
}
// Every task, intercepted or not, is still handed to the fake executor.
fakeExecutor.scheduledExecutorService.execute(command);
}
};
// A single address, so one failed connection attempt exhausts all addresses.
SocketAddress addr = mock(SocketAddress.class);
addressGroup = new EquivalentAddressGroup(Arrays.asList(addr));
// Built directly (not via createTransportSet) so the intercepting executor can be injected.
transportSet = new TransportSet(addressGroup, authority, userAgent, mockLoadBalancer,
mockBackoffPolicyProvider, mockTransportFactory, fakeClock.scheduledExecutorService,
fakeClock.stopwatchSupplier, executor, mockTransportSetCallback);
// Attempt and fail, scheduleBackoff should be triggered,
// and transportSet.shutdown should be triggered by setup
transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportSetCallback, times(1)).onAllAddressesFailed();
// Confirms the interception actually happened; otherwise the final check would be vacuous.
assertTrue(startBackoffAndShutdownAreCalled[0]);
fakeExecutor.runDueTasks();
// verify endOfBackoff not scheduled
// The backoff delay is only requested from the policy when the end-of-backoff task is about
// to be scheduled, so "never asked for a delay" implies "nothing was scheduled".
// NOTE(review): presumably mockBackoffPolicy1 is the policy returned by
// mockBackoffPolicyProvider — confirm against the test fixture setup.
verify(mockBackoffPolicy1, never()).nextBackoffMillis();
}
private void createTransportSet(SocketAddress ... addrs) {
addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs));
transportSet = new TransportSet(addressGroup, authority, userAgent, mockLoadBalancer,