From b31db3cc9b769f1123dce4a0f3514bb75cf1d7f1 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Fri, 3 Nov 2017 13:59:50 -0700 Subject: [PATCH] core: add resetConnectBackoff() method to ManagedChannel --- .../src/main/java/io/grpc/ManagedChannel.java | 16 +++++ .../io/grpc/internal/InternalSubchannel.java | 35 +++++++++-- .../io/grpc/internal/ManagedChannelImpl.java | 28 +++++++++ .../grpc/internal/InternalSubchannelTest.java | 58 +++++++++++++++++++ .../grpc/internal/ManagedChannelImplTest.java | 38 ++++++++++++ 5 files changed, 171 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/io/grpc/ManagedChannel.java b/core/src/main/java/io/grpc/ManagedChannel.java index e0db45c189..1c89ca4414 100644 --- a/core/src/main/java/io/grpc/ManagedChannel.java +++ b/core/src/main/java/io/grpc/ManagedChannel.java @@ -104,4 +104,20 @@ public abstract class ManagedChannel extends Channel { public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) { throw new UnsupportedOperationException("Not implemented"); } + + /** + * For subchannels that are in TRANSIENT_FAILURE state, short-circuit the backoff timer and make + * them reconnect immediately. May also attempt to invoke {@link NameResolver#refresh}. + * + *

This is primarily intended for Android users, where the network may experience frequent + * temporary drops. Rather than waiting for gRPC's name resolution and reconnect timers to elapse + * before reconnecting, the app may use this method as a mechanism to notify gRPC that the network + * is now available and a reconnection attempt may occur immediately. + * + *

No-op if not supported by the implementation. + * + * @since 1.8.0 + */ + @ExperimentalApi + public void resetConnectBackoff() {} } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 9b5d98a449..6f402ca84d 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -20,6 +20,7 @@ import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.SHUTDOWN; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -82,7 +83,8 @@ final class InternalSubchannel implements WithLogId { private int addressIndex; /** - * The policy to control back off between reconnects. Non-{@code null} when last connect failed. + * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is + * scheduled. */ @GuardedBy("lock") private BackoffPolicy reconnectPolicy; @@ -97,6 +99,9 @@ final class InternalSubchannel implements WithLogId { @Nullable private ScheduledFuture reconnectTask; + @GuardedBy("lock") + private boolean reconnectCanceled; + /** * All transports that are not terminated. At the very least the value of {@link #activeTransport} * will be present, but previously used transports that still have streams or are stopping may @@ -227,9 +232,9 @@ final class InternalSubchannel implements WithLogId { try { synchronized (lock) { reconnectTask = null; - if (state.getState() == SHUTDOWN) { - // Even though shutdown() will cancel this task, the task may have already started - // when it's being cancelled. + if (reconnectCanceled) { + // Even though cancelReconnectTask() will cancel this task, the task may have already + // started when it's being canceled. return; } gotoNonErrorState(CONNECTING); @@ -253,12 +258,32 @@ final class InternalSubchannel implements WithLogId { log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ns", new Object[]{logId, delayNanos}); } Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done"); + reconnectCanceled = false; reconnectTask = scheduledExecutor.schedule( new LogExceptionRunnable(new EndOfCurrentBackoff()), delayNanos, TimeUnit.NANOSECONDS); } + /** + * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise this + * method has no effect. + */ + void resetConnectBackoff() { + try { + synchronized (lock) { + if (state.getState() != TRANSIENT_FAILURE) { + return; + } + cancelReconnectTask(); + gotoNonErrorState(CONNECTING); + startNewTransport(); + } + } finally { + channelExecutor.drain(); + } + } + @GuardedBy("lock") private void gotoNonErrorState(ConnectivityState newState) { gotoState(ConnectivityStateInfo.forNonError(newState)); @@ -400,7 +425,9 @@ final class InternalSubchannel implements WithLogId { private void cancelReconnectTask() { if (reconnectTask != null) { reconnectTask.cancel(false); + reconnectCanceled = true; reconnectTask = null; + reconnectPolicy = null; } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 2f099df56f..af34c5183b 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -135,6 +135,9 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI private final ProxyDetector proxyDetector; + // Must be accessed from the channelExecutor. + private boolean nameResolverStarted; + // null when channel is in idle mode. Must be assigned from channelExecutor. @Nullable private LbHelperImpl lbHelper; @@ -210,6 +213,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI if (nameResolver != null) { nameResolver.shutdown(); nameResolver = null; + nameResolverStarted = false; } // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them @@ -266,6 +270,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown() // did not cancel idleModeTimer, both of which are bugs. nameResolver.shutdown(); + nameResolverStarted = false; nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); lbHelper.lb.shutdown(); lbHelper = null; @@ -312,6 +317,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper); try { nameResolver.start(listener); + nameResolverStarted = true; } catch (Throwable t) { listener.onError(Status.fromThrowable(t)); } @@ -631,6 +637,28 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI }).drain(); } + @Override + public void resetConnectBackoff() { + channelExecutor.executeLater( + new Runnable() { + @Override + public void run() { + if (shutdown.get()) { + return; + } + if (nameResolverStarted) { + nameResolver.refresh(); + } + for (InternalSubchannel subchannel : subchannels) { + subchannel.resetConnectBackoff(); + } + for (InternalSubchannel oobChannel : oobChannels) { + oobChannel.resetConnectBackoff(); + } + } + }).drain(); + } + private class LbHelperImpl extends LoadBalancer.Helper { LoadBalancer lb; final NameResolver nr; diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index 30c757e362..40cab8339f 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -884,6 +884,64 @@ public class InternalSubchannelTest { eq(addr1), eq(AUTHORITY), eq(USER_AGENT), eq(proxy)); } + @Test + public void resetConnectBackoff() throws Exception { + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + + // Move into TRANSIENT_FAILURE to schedule reconnect + internalSubchannel.obtainActiveTransport(); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + verify(mockTransportFactory).newClientTransport(addr, AUTHORITY, USER_AGENT, NO_PROXY); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); + + // Save the reconnectTask + FakeClock.ScheduledTask reconnectTask = null; + for (FakeClock.ScheduledTask task : fakeClock.getPendingTasks()) { + if (task.command.toString().contains("EndOfCurrentBackoff")) { + assertNull("There shouldn't be more than one reconnectTask", reconnectTask); + assertFalse(task.isDone()); + reconnectTask = task; + } + } + assertNotNull("There should be at least one reconnectTask", reconnectTask); + + internalSubchannel.resetConnectBackoff(); + + verify(mockTransportFactory, times(2)) + .newClientTransport(addr, AUTHORITY, USER_AGENT, NO_PROXY); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertTrue(reconnectTask.isCancelled()); + + // Simulate a race between cancel and the task scheduler. Should be a no-op. + reconnectTask.command.run(); + assertNoCallbackInvoke(); + verify(mockTransportFactory, times(2)) + .newClientTransport(addr, AUTHORITY, USER_AGENT, NO_PROXY); + verify(mockBackoffPolicyProvider, times(1)).get(); + + // Fail the reconnect attempt to verify that a fresh reconnect policy is generated after + // invoking resetConnectBackoff() + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); + assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); + verify(mockBackoffPolicyProvider, times(2)).get(); + fakeClock.forwardNanos(10); + assertExactCallbackInvokes("onStateChange:CONNECTING"); + assertEquals(CONNECTING, internalSubchannel.getState()); + } + + @Test + public void resetConnectBackoff_noopOnIdleTransport() throws Exception { + SocketAddress addr = mock(SocketAddress.class); + createInternalSubchannel(addr); + assertEquals(IDLE, internalSubchannel.getState()); + + internalSubchannel.resetConnectBackoff(); + + assertNoCallbackInvoke(); + } + private void createInternalSubchannel(SocketAddress ... addrs) { createInternalSubChannelWithProxy(ProxyDetector.NOOP_INSTANCE, addrs); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 672e24ef26..bebb2e442e 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -1540,6 +1540,43 @@ public class ManagedChannelImplTest { verify(onStateChanged, never()).run(); } + @Test + public void resetConnectBackoff_refreshesNameResolver() { + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0); + assertEquals(0, nameResolver.refreshCalled); + + channel.resetConnectBackoff(); + + assertEquals(1, nameResolver.refreshCalled); + } + + @Test + public void resetConnectBackoff_noOpWhenChannelShutdown() { + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + + channel.shutdown(); + assertTrue(channel.isShutdown()); + channel.resetConnectBackoff(); + + FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0); + assertEquals(0, nameResolver.refreshCalled); + } + + @Test + public void resetConnectBackoff_noOpWhenNameResolverNotStarted() { + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true); + createChannel(nameResolverFactory, NO_INTERCEPTOR, false /* requestConnection */, + ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE); + + channel.resetConnectBackoff(); + + FakeNameResolverFactory.FakeNameResolver nameResolver = nameResolverFactory.resolvers.get(0); + assertEquals(0, nameResolver.refreshCalled); + } + @Test public void orphanedChannelsAreLogged() throws Exception { int remaining = unterminatedChannels; @@ -1679,6 +1716,7 @@ public class ManagedChannelImplTest { } @Override public void refresh() { + assertNotNull(listener); refreshCalled++; }