core: add resetConnectBackoff() method to ManagedChannel

This commit is contained in:
Eric Gribkoff 2017-11-03 13:59:50 -07:00 committed by GitHub
parent 34555e497a
commit b31db3cc9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 171 additions and 4 deletions

View File

@@ -104,4 +104,20 @@ public abstract class ManagedChannel extends Channel {
/**
 * Registers a one-off callback for a connectivity-state change away from {@code source}.
 *
 * <p>The base class does not support state monitoring: this default implementation always
 * throws. Channel implementations that support it override this method.
 *
 * @throws UnsupportedOperationException always, unless overridden by the implementation
 */
public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
throw new UnsupportedOperationException("Not implemented");
}
/**
 * For subchannels that are in TRANSIENT_FAILURE state, short-circuit the backoff timer and make
 * them reconnect immediately. May also attempt to invoke {@link NameResolver#refresh}.
 *
 * <p>This is primarily intended for Android users, where the network may experience frequent
 * temporary drops. Rather than waiting for gRPC's name resolution and reconnect timers to elapse
 * before reconnecting, the app may use this method as a mechanism to notify gRPC that the network
 * is now available and a reconnection attempt may occur immediately.
 *
 * <p>No-op if not supported by the implementation. The default implementation here does nothing;
 * implementations that support backoff reset override this method.
 *
 * @since 1.8.0
 */
@ExperimentalApi
public void resetConnectBackoff() {}
}

View File

@@ -20,6 +20,7 @@ import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -82,7 +83,8 @@ final class InternalSubchannel implements WithLogId {
private int addressIndex;
/**
* The policy to control back off between reconnects. Non-{@code null} when last connect failed.
* The policy to control back off between reconnects. Non-{@code null} when a reconnect task is
* scheduled.
*/
@GuardedBy("lock")
private BackoffPolicy reconnectPolicy;
@@ -97,6 +99,9 @@ final class InternalSubchannel implements WithLogId {
@Nullable
private ScheduledFuture<?> reconnectTask;
@GuardedBy("lock")
private boolean reconnectCanceled;
/**
* All transports that are not terminated. At the very least the value of {@link #activeTransport}
* will be present, but previously used transports that still have streams or are stopping may
@@ -227,9 +232,9 @@ final class InternalSubchannel implements WithLogId {
try {
synchronized (lock) {
reconnectTask = null;
if (state.getState() == SHUTDOWN) {
// Even though shutdown() will cancel this task, the task may have already started
// when it's being cancelled.
if (reconnectCanceled) {
// Even though cancelReconnectTask() will cancel this task, the task may have already
// started when it's being canceled.
return;
}
gotoNonErrorState(CONNECTING);
@@ -253,12 +258,32 @@ final class InternalSubchannel implements WithLogId {
log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ns", new Object[]{logId, delayNanos});
}
Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
reconnectCanceled = false;
reconnectTask = scheduledExecutor.schedule(
new LogExceptionRunnable(new EndOfCurrentBackoff()),
delayNanos,
TimeUnit.NANOSECONDS);
}
/**
 * Short-circuits a pending backoff: when this subchannel is currently in TRANSIENT_FAILURE,
 * cancels the scheduled reconnect task, moves to CONNECTING, and starts a new transport right
 * away. In any other state this call changes nothing.
 */
void resetConnectBackoff() {
  try {
    synchronized (lock) {
      // Only a subchannel sitting out a backoff period has anything to reset.
      if (state.getState() == TRANSIENT_FAILURE) {
        cancelReconnectTask();
        gotoNonErrorState(CONNECTING);
        startNewTransport();
      }
    }
  } finally {
    // Always drain, even on the no-op path, mirroring how the lock is used elsewhere here.
    channelExecutor.drain();
  }
}
@GuardedBy("lock")
private void gotoNonErrorState(ConnectivityState newState) {
gotoState(ConnectivityStateInfo.forNonError(newState));
@@ -400,7 +425,9 @@ final class InternalSubchannel implements WithLogId {
/** Cancels any scheduled reconnect task and clears the backoff bookkeeping. */
private void cancelReconnectTask() {
  if (reconnectTask == null) {
    return; // Nothing scheduled, nothing to cancel.
  }
  // cancel(false): a task that has already begun running is allowed to proceed;
  // the reconnectCanceled flag makes such a task bail out as a no-op.
  reconnectTask.cancel(false);
  reconnectCanceled = true;
  reconnectTask = null;
  reconnectPolicy = null;
}

View File

@@ -135,6 +135,9 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
private final ProxyDetector proxyDetector;
// Must be accessed from the channelExecutor.
private boolean nameResolverStarted;
// null when channel is in idle mode. Must be assigned from channelExecutor.
@Nullable
private LbHelperImpl lbHelper;
@@ -210,6 +213,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
if (nameResolver != null) {
nameResolver.shutdown();
nameResolver = null;
nameResolverStarted = false;
}
// Until LoadBalancer is shutdown, it may still create new subchannels. We catch them
@@ -266,6 +270,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
// either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
// did not cancel idleModeTimer, both of which are bugs.
nameResolver.shutdown();
nameResolverStarted = false;
nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
lbHelper.lb.shutdown();
lbHelper = null;
@@ -312,6 +317,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper);
try {
nameResolver.start(listener);
nameResolverStarted = true;
} catch (Throwable t) {
listener.onError(Status.fromThrowable(t));
}
@@ -631,6 +637,28 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
}).drain();
}
/**
 * Asks every live subchannel (regular and OOB) to short-circuit its backoff timer, and — when
 * the name resolver has been started — refreshes name resolution first. All work is posted to
 * the channel executor; the call is a no-op once the channel has been shut down.
 */
@Override
public void resetConnectBackoff() {
  final class ResetBackoffTask implements Runnable {
    @Override
    public void run() {
      if (shutdown.get()) {
        return; // Channel is shutting down; nothing left to reset.
      }
      // Refresh only if start() has actually been called on the resolver.
      if (nameResolverStarted) {
        nameResolver.refresh();
      }
      for (InternalSubchannel sc : subchannels) {
        sc.resetConnectBackoff();
      }
      for (InternalSubchannel oob : oobChannels) {
        oob.resetConnectBackoff();
      }
    }
  }

  channelExecutor.executeLater(new ResetBackoffTask()).drain();
}
private class LbHelperImpl extends LoadBalancer.Helper {
LoadBalancer lb;
final NameResolver nr;

View File

@@ -884,6 +884,64 @@ public class InternalSubchannelTest {
eq(addr1), eq(AUTHORITY), eq(USER_AGENT), eq(proxy));
}
// Verifies the full reset flow: a pending backoff task is cancelled, reconnection happens
// immediately, a cancelled-but-racing task is a no-op, and the backoff policy is regenerated
// on the next failure.
@Test
public void resetConnectBackoff() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
// Move into TRANSIENT_FAILURE to schedule reconnect
internalSubchannel.obtainActiveTransport();
assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory).newClientTransport(addr, AUTHORITY, USER_AGENT, NO_PROXY);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
// Save the reconnectTask. Identified by class name since the task is wrapped in
// LogExceptionRunnable; there must be exactly one.
FakeClock.ScheduledTask reconnectTask = null;
for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) {
if (task.command.toString().contains("EndOfCurrentBackoff")) {
assertNull("There shouldn't be more than one reconnectTask", reconnectTask);
assertFalse(task.isDone());
reconnectTask = task;
}
}
assertNotNull("There should be at least one reconnectTask", reconnectTask);
// Resetting must cancel the pending task and open a second transport immediately.
internalSubchannel.resetConnectBackoff();
verify(mockTransportFactory, times(2))
.newClientTransport(addr, AUTHORITY, USER_AGENT, NO_PROXY);
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertTrue(reconnectTask.isCancelled());
// Simulate a race between cancel and the task scheduler. Should be a no-op.
reconnectTask.command.run();
assertNoCallbackInvoke();
verify(mockTransportFactory, times(2))
.newClientTransport(addr, AUTHORITY, USER_AGENT, NO_PROXY);
verify(mockBackoffPolicyProvider, times(1)).get();
// Fail the reconnect attempt to verify that a fresh reconnect policy is generated after
// invoking resetConnectBackoff()
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
// A second BackoffPolicy is requested, proving the old policy was discarded by the reset.
verify(mockBackoffPolicyProvider, times(2)).get();
fakeClock.forwardNanos(10);
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(CONNECTING, internalSubchannel.getState());
}
// A subchannel that never left IDLE has no backoff pending, so a reset must do nothing.
@Test
public void resetConnectBackoff_noopOnIdleTransport() throws Exception {
  SocketAddress address = mock(SocketAddress.class);
  createInternalSubchannel(address);
  assertEquals(IDLE, internalSubchannel.getState());

  // No state-change callbacks may fire for a reset issued outside TRANSIENT_FAILURE.
  internalSubchannel.resetConnectBackoff();
  assertNoCallbackInvoke();
}
// Convenience wrapper: builds the subchannel under test with proxy detection disabled.
private void createInternalSubchannel(SocketAddress ... addrs) {
createInternalSubChannelWithProxy(ProxyDetector.NOOP_INSTANCE, addrs);
}

View File

@@ -1540,6 +1540,43 @@ public class ManagedChannelImplTest {
verify(onStateChanged, never()).run();
}
// On a started channel, resetConnectBackoff() must trigger exactly one resolver refresh.
@Test
public void resetConnectBackoff_refreshesNameResolver() {
  FakeNameResolverFactory factory = new FakeNameResolverFactory(true);
  createChannel(factory, NO_INTERCEPTOR);
  FakeNameResolverFactory.FakeNameResolver resolver = factory.resolvers.get(0);

  // Sanity check: channel creation alone does not refresh.
  assertEquals(0, resolver.refreshCalled);

  channel.resetConnectBackoff();
  assertEquals(1, resolver.refreshCalled);
}
// Once the channel is shut down, resetConnectBackoff() must not reach the name resolver.
@Test
public void resetConnectBackoff_noOpWhenChannelShutdown() {
  FakeNameResolverFactory factory = new FakeNameResolverFactory(true);
  createChannel(factory, NO_INTERCEPTOR);

  channel.shutdown();
  assertTrue(channel.isShutdown());

  channel.resetConnectBackoff();
  FakeNameResolverFactory.FakeNameResolver resolver = factory.resolvers.get(0);
  assertEquals(0, resolver.refreshCalled);
}
// If the resolver was never started (idle channel, no connection requested), a reset must
// not call refresh() on it.
@Test
public void resetConnectBackoff_noOpWhenNameResolverNotStarted() {
  FakeNameResolverFactory factory = new FakeNameResolverFactory(true);
  createChannel(factory, NO_INTERCEPTOR, false /* requestConnection */,
      ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE);

  channel.resetConnectBackoff();

  FakeNameResolverFactory.FakeNameResolver resolver = factory.resolvers.get(0);
  assertEquals(0, resolver.refreshCalled);
}
@Test
public void orphanedChannelsAreLogged() throws Exception {
int remaining = unterminatedChannels;
@@ -1679,6 +1716,7 @@ public class ManagedChannelImplTest {
}
// Test double: counts refresh() calls, asserting start() happened first (listener is set
// when the resolver is started).
@Override public void refresh() {
assertNotNull(listener);
refreshCalled++;
}