core: add LoadBalancer.Helper.refreshNameResolution() (#5121)

This commit is contained in:
Kun Zhang 2018-12-05 13:11:45 -08:00 committed by GitHub
parent 9111602d7c
commit 3a86a176fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 87 additions and 29 deletions

View File

@ -569,6 +569,15 @@ public abstract class LoadBalancer {
public abstract void updateBalancingState(
@Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker);
/**
* Call {@link NameResolver#refresh} on the channel's resolver.
*
* @since 1.18.0
*/
public void refreshNameResolution() {
throw new UnsupportedOperationException();
}
/**
* Schedule a task to be run in the Synchronization Context, which serializes the task with the
* callback methods on the {@link LoadBalancer} interface.

View File

@ -136,7 +136,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final TimeProvider timeProvider;
private final int maxTraceEvents;
private final SynchronizationContext syncContext = new SynchronizationContext(
@VisibleForTesting
final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
@ -348,13 +349,13 @@ final class ManagedChannelImpl extends ManagedChannel implements
return;
}
channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
LbHelperImpl lbHelper = new LbHelperImpl(nameResolver);
LbHelperImpl lbHelper = new LbHelperImpl();
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
// Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
// may throw. We don't want to confuse our state, even if we will enter panic mode.
this.lbHelper = lbHelper;
NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper);
NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper, nameResolver);
try {
nameResolver.start(listener);
nameResolverStarted = true;
@ -394,10 +395,10 @@ final class ManagedChannelImpl extends ManagedChannel implements
// Run from syncContext
@VisibleForTesting
class NameResolverRefresh implements Runnable {
class DelayedNameResolverRefresh implements Runnable {
@Override
public void run() {
nameResolverRefresh = null;
scheduledNameResolverRefresh = null;
if (nameResolver != null) {
nameResolver.refresh();
}
@ -405,20 +406,30 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
// Must be used from syncContext
@Nullable private ScheduledHandle nameResolverRefresh;
@Nullable private ScheduledHandle scheduledNameResolverRefresh;
// The policy to control backoff between name resolution attempts. Non-null when an attempt is
// scheduled. Must be used from syncContext
@Nullable private BackoffPolicy nameResolverBackoffPolicy;
// Must be run from syncContext
private void cancelNameResolverBackoff() {
if (nameResolverRefresh != null) {
nameResolverRefresh.cancel();
nameResolverRefresh = null;
syncContext.throwIfNotInThisSynchronizationContext();
if (scheduledNameResolverRefresh != null) {
scheduledNameResolverRefresh.cancel();
scheduledNameResolverRefresh = null;
nameResolverBackoffPolicy = null;
}
}
// Must be run from syncContext
private void refreshNameResolutionNow() {
syncContext.throwIfNotInThisSynchronizationContext();
cancelNameResolverBackoff();
if (nameResolver != null) {
nameResolver.refresh();
}
}
private final class ChannelTransportProvider implements ClientTransportProvider {
@Override
public ClientTransport get(PickSubchannelArgs args) {
@ -857,10 +868,9 @@ final class ManagedChannelImpl extends ManagedChannel implements
if (shutdown.get()) {
return;
}
if (nameResolverRefresh != null && nameResolverRefresh.isPending()) {
if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
checkState(nameResolverStarted, "name resolver must be started");
cancelNameResolverBackoff();
nameResolver.refresh();
refreshNameResolutionNow();
}
for (InternalSubchannel subchannel : subchannels) {
subchannel.resetConnectBackoff();
@ -975,16 +985,11 @@ final class ManagedChannelImpl extends ManagedChannel implements
private class LbHelperImpl extends LoadBalancer.Helper {
LoadBalancer lb;
final NameResolver nr;
LbHelperImpl(NameResolver nr) {
this.nr = checkNotNull(nr, "NameResolver");
}
// Must be called from syncContext
private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
nr.refresh();
refreshNameResolutionNow();
}
}
@ -1111,6 +1116,18 @@ final class ManagedChannelImpl extends ManagedChannel implements
syncContext.execute(new UpdateBalancingState());
}
@Override
public void refreshNameResolution() {
final class LoadBalancerRefreshNameResolution implements Runnable {
@Override
public void run() {
refreshNameResolutionNow();
}
}
syncContext.execute(new LoadBalancerRefreshNameResolution());
}
@Override
public void updateSubchannelAddresses(
LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
@ -1231,16 +1248,18 @@ final class ManagedChannelImpl extends ManagedChannel implements
private class NameResolverListenerImpl implements NameResolver.Listener {
final LbHelperImpl helper;
final NameResolver resolver;
NameResolverListenerImpl(LbHelperImpl helperImpl) {
this.helper = helperImpl;
NameResolverListenerImpl(LbHelperImpl helperImpl, NameResolver resolver) {
this.helper = checkNotNull(helperImpl, "helperImpl");
this.resolver = checkNotNull(resolver, "resolver");
}
@Override
public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
if (servers.isEmpty()) {
onError(Status.UNAVAILABLE.withDescription(
"Name resolver " + helper.nr + " returned an empty list"));
"Name resolver " + resolver + " returned an empty list"));
return;
}
channelLogger.log(
@ -1305,7 +1324,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
return;
}
helper.lb.handleNameResolutionError(error);
if (nameResolverRefresh != null && nameResolverRefresh.isPending()) {
if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
// The name resolver may invoke onError multiple times, but we only want to
// schedule one backoff attempt
// TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
@ -1319,9 +1338,9 @@ final class ManagedChannelImpl extends ManagedChannel implements
channelLogger.log(
ChannelLogLevel.DEBUG,
"Scheduling DNS resolution backoff for {0} ns", delayNanos);
nameResolverRefresh =
scheduledNameResolverRefresh =
syncContext.schedule(
new NameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
transportFactory .getScheduledExecutorService());
}
}

View File

@ -65,6 +65,11 @@ public abstract class ForwardingLoadBalancerHelper extends LoadBalancer.Helper {
delegate().updateBalancingState(newState, newPicker);
}
@Override
public void refreshNameResolution() {
delegate().refreshNameResolution();
}
@Override
@Deprecated
public void runSerialized(Runnable task) {

View File

@ -176,7 +176,7 @@ public class ManagedChannelImplTest {
@Override
public boolean shouldAccept(Runnable command) {
return command.toString().contains(
ManagedChannelImpl.NameResolverRefresh.class.getName());
ManagedChannelImpl.DelayedNameResolverRefresh.class.getName());
}
};
@ -1867,7 +1867,7 @@ public class ManagedChannelImplTest {
assertEquals(1, nameResolverFactory.resolvers.size());
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0);
Throwable panicReason = new Exception("Simulated uncaught exception");
final Throwable panicReason = new Exception("Simulated uncaught exception");
if (initialState == IDLE) {
timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
} else {
@ -1889,7 +1889,13 @@ public class ManagedChannelImplTest {
}
// Make channel panic!
channel.syncContext.execute(
new Runnable() {
@Override
public void run() {
channel.panic(panicReason);
}
});
// Calls buffered in delayedTransport will fail
@ -1946,8 +1952,14 @@ public class ManagedChannelImplTest {
verifyZeroInteractions(mockCallListener, mockCallListener2);
// Enter panic
Throwable panicReason = new Exception("Simulated uncaught exception");
final Throwable panicReason = new Exception("Simulated uncaught exception");
channel.syncContext.execute(
new Runnable() {
@Override
public void run() {
channel.panic(panicReason);
}
});
// Buffered RPCs fail immediately
executor.runDueTasks();
@ -2191,6 +2203,19 @@ public class ManagedChannelImplTest {
verify(onStateChanged, never()).run();
}
@Test
public void balancerRefreshNameResolution() {
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).build();
channelBuilder.nameResolverFactory(nameResolverFactory);
createChannel();
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
int initialRefreshCount = resolver.refreshCalled;
helper.refreshNameResolution();
assertEquals(initialRefreshCount + 1, resolver.refreshCalled);
}
@Test
public void resetConnectBackoff() {
// Start with a name resolution failure to trigger backoff attempts