diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java index 127d5a2ba6..7b65733841 100644 --- a/core/src/main/java/io/grpc/LoadBalancer.java +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -569,6 +569,15 @@ public abstract class LoadBalancer { public abstract void updateBalancingState( @Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker); + /** + * Call {@link NameResolver#refresh} on the channel's resolver. + * + * @since 1.18.0 + */ + public void refreshNameResolution() { + throw new UnsupportedOperationException(); + } + /** * Schedule a task to be run in the Synchronization Context, which serializes the task with the * callback methods on the {@link LoadBalancer} interface. diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index d355e57c7e..6eb62fca56 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -136,7 +136,8 @@ final class ManagedChannelImpl extends ManagedChannel implements private final TimeProvider timeProvider; private final int maxTraceEvents; - private final SynchronizationContext syncContext = new SynchronizationContext( + @VisibleForTesting + final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { @@ -348,13 +349,13 @@ final class ManagedChannelImpl extends ManagedChannel implements return; } channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode"); - LbHelperImpl lbHelper = new LbHelperImpl(nameResolver); + LbHelperImpl lbHelper = new LbHelperImpl(); lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper); // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and // may throw. We don't want to confuse our state, even if we will enter panic mode. this.lbHelper = lbHelper; - NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper); + NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper, nameResolver); try { nameResolver.start(listener); nameResolverStarted = true; @@ -394,10 +395,10 @@ final class ManagedChannelImpl extends ManagedChannel implements // Run from syncContext @VisibleForTesting - class NameResolverRefresh implements Runnable { + class DelayedNameResolverRefresh implements Runnable { @Override public void run() { - nameResolverRefresh = null; + scheduledNameResolverRefresh = null; if (nameResolver != null) { nameResolver.refresh(); } @@ -405,20 +406,30 @@ final class ManagedChannelImpl extends ManagedChannel implements } // Must be used from syncContext - @Nullable private ScheduledHandle nameResolverRefresh; + @Nullable private ScheduledHandle scheduledNameResolverRefresh; // The policy to control backoff between name resolution attempts. Non-null when an attempt is // scheduled. Must be used from syncContext @Nullable private BackoffPolicy nameResolverBackoffPolicy; // Must be run from syncContext private void cancelNameResolverBackoff() { - if (nameResolverRefresh != null) { - nameResolverRefresh.cancel(); - nameResolverRefresh = null; + syncContext.throwIfNotInThisSynchronizationContext(); + if (scheduledNameResolverRefresh != null) { + scheduledNameResolverRefresh.cancel(); + scheduledNameResolverRefresh = null; nameResolverBackoffPolicy = null; } } + // Must be run from syncContext + private void refreshNameResolutionNow() { + syncContext.throwIfNotInThisSynchronizationContext(); + cancelNameResolverBackoff(); + if (nameResolver != null) { + nameResolver.refresh(); + } + } + private final class ChannelTransportProvider implements ClientTransportProvider { @Override public ClientTransport get(PickSubchannelArgs args) { @@ -857,10 +868,9 @@ final class ManagedChannelImpl extends ManagedChannel implements if (shutdown.get()) { return; } - if (nameResolverRefresh != null && nameResolverRefresh.isPending()) { + if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) { checkState(nameResolverStarted, "name resolver must be started"); - cancelNameResolverBackoff(); - nameResolver.refresh(); + refreshNameResolutionNow(); } for (InternalSubchannel subchannel : subchannels) { subchannel.resetConnectBackoff(); @@ -975,16 +985,11 @@ final class ManagedChannelImpl extends ManagedChannel implements private class LbHelperImpl extends LoadBalancer.Helper { LoadBalancer lb; - final NameResolver nr; - - LbHelperImpl(NameResolver nr) { - this.nr = checkNotNull(nr, "NameResolver"); - } // Must be called from syncContext private void handleInternalSubchannelState(ConnectivityStateInfo newState) { if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { - nr.refresh(); + refreshNameResolutionNow(); } } @@ -1111,6 +1116,18 @@ final class ManagedChannelImpl extends ManagedChannel implements syncContext.execute(new UpdateBalancingState()); } + @Override + public void refreshNameResolution() { + final class LoadBalancerRefreshNameResolution implements Runnable { + @Override + public void run() { + refreshNameResolutionNow(); + } + } + + syncContext.execute(new LoadBalancerRefreshNameResolution()); + } + @Override public void updateSubchannelAddresses( LoadBalancer.Subchannel subchannel, List addrs) { @@ -1231,16 +1248,18 @@ final class ManagedChannelImpl extends ManagedChannel implements private class NameResolverListenerImpl implements NameResolver.Listener { final LbHelperImpl helper; + final NameResolver resolver; - NameResolverListenerImpl(LbHelperImpl helperImpl) { - this.helper = helperImpl; + NameResolverListenerImpl(LbHelperImpl helperImpl, NameResolver resolver) { + this.helper = checkNotNull(helperImpl, "helperImpl"); + this.resolver = checkNotNull(resolver, "resolver"); } @Override public void onAddresses(final List servers, final Attributes config) { if (servers.isEmpty()) { onError(Status.UNAVAILABLE.withDescription( - "Name resolver " + helper.nr + " returned an empty list")); + "Name resolver " + resolver + " returned an empty list")); return; } channelLogger.log( @@ -1305,7 +1324,7 @@ final class ManagedChannelImpl extends ManagedChannel implements return; } helper.lb.handleNameResolutionError(error); - if (nameResolverRefresh != null && nameResolverRefresh.isPending()) { + if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) { // The name resolver may invoke onError multiple times, but we only want to // schedule one backoff attempt // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we @@ -1319,9 +1338,9 @@ final class ManagedChannelImpl extends ManagedChannel implements channelLogger.log( ChannelLogLevel.DEBUG, "Scheduling DNS resolution backoff for {0} ns", delayNanos); - nameResolverRefresh = + scheduledNameResolverRefresh = syncContext.schedule( - new NameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS, + new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS, transportFactory .getScheduledExecutorService()); } } diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java index 73e168d881..c9f9e320f8 100644 --- a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java +++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java @@ -65,6 +65,11 @@ public abstract class ForwardingLoadBalancerHelper extends LoadBalancer.Helper { delegate().updateBalancingState(newState, newPicker); } + @Override + public void refreshNameResolution() { + delegate().refreshNameResolution(); + } + @Override @Deprecated public void runSerialized(Runnable task) { diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 12b72ef582..485368e59b 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -176,7 +176,7 @@ public class ManagedChannelImplTest { @Override public boolean shouldAccept(Runnable command) { return command.toString().contains( - ManagedChannelImpl.NameResolverRefresh.class.getName()); + ManagedChannelImpl.DelayedNameResolverRefresh.class.getName()); } }; @@ -1867,7 +1867,7 @@ public class ManagedChannelImplTest { assertEquals(1, nameResolverFactory.resolvers.size()); FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0); - Throwable panicReason = new Exception("Simulated uncaught exception"); + final Throwable panicReason = new Exception("Simulated uncaught exception"); if (initialState == IDLE) { timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); } else { @@ -1889,7 +1889,13 @@ public class ManagedChannelImplTest { } // Make channel panic! - channel.panic(panicReason); + channel.syncContext.execute( + new Runnable() { + @Override + public void run() { + channel.panic(panicReason); + } + }); // Calls buffered in delayedTransport will fail @@ -1946,8 +1952,14 @@ public class ManagedChannelImplTest { verifyZeroInteractions(mockCallListener, mockCallListener2); // Enter panic - Throwable panicReason = new Exception("Simulated uncaught exception"); - channel.panic(panicReason); + final Throwable panicReason = new Exception("Simulated uncaught exception"); + channel.syncContext.execute( + new Runnable() { + @Override + public void run() { + channel.panic(panicReason); + } + }); // Buffered RPCs fail immediately executor.runDueTasks(); @@ -2191,6 +2203,19 @@ public class ManagedChannelImplTest { verify(onStateChanged, never()).run(); } + @Test + public void balancerRefreshNameResolution() { + FakeNameResolverFactory nameResolverFactory = + new FakeNameResolverFactory.Builder(expectedUri).build(); + channelBuilder.nameResolverFactory(nameResolverFactory); + createChannel(); + + FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); + int initialRefreshCount = resolver.refreshCalled; + helper.refreshNameResolution(); + assertEquals(initialRefreshCount + 1, resolver.refreshCalled); + } + @Test public void resetConnectBackoff() { // Start with a name resolution failure to trigger backoff attempts