services: cancel health-check when LoadBalancer.shutdown() is called. (#5051)

The health checking balancer won't receive an update about Subchannel
shutdown via handleSubchannelState(), because no more callback will be
called after LoadBalancer.shutdown() is called.
This commit is contained in:
Kun Zhang 2018-11-09 14:51:36 -08:00 committed by GitHub
parent bff008fbc8
commit 31af0657d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 5 deletions

View File

@ -95,6 +95,7 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
private LoadBalancer delegateBalancer; private LoadBalancer delegateBalancer;
@Nullable String healthCheckedService; @Nullable String healthCheckedService;
private boolean balancerShutdown;
final HashSet<HealthCheckState> hcStates = new HashSet<HealthCheckState>(); final HashSet<HealthCheckState> hcStates = new HashSet<HealthCheckState>();
@ -119,7 +120,7 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
// createSubchannel() from the SynchronizationContext. // createSubchannel() from the SynchronizationContext.
syncContext.throwIfNotInThisSynchronizationContext(); syncContext.throwIfNotInThisSynchronizationContext();
HealthCheckState hcState = new HealthCheckState( HealthCheckState hcState = new HealthCheckState(
delegateBalancer, syncContext, delegate.getScheduledExecutorService()); this, delegateBalancer, syncContext, delegate.getScheduledExecutorService());
hcStates.add(hcState); hcStates.add(hcState);
Subchannel subchannel = super.createSubchannel( Subchannel subchannel = super.createSubchannel(
addrs, attrs.toBuilder().set(KEY_HEALTH_CHECK_STATE, hcState).build()); addrs, attrs.toBuilder().set(KEY_HEALTH_CHECK_STATE, hcState).build());
@ -183,6 +184,19 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
} }
} }
@Override
public void shutdown() {
super.shutdown();
helper.balancerShutdown = true;
for (HealthCheckState hcState : helper.hcStates) {
// ManagedChannel will stop calling handleSubchannelState() after shutdown() is called,
// which is required by LoadBalancer API semantics. We need to deliver the final SHUTDOWN
// signal to health checkers so that they can cancel the streams.
hcState.updateRawState(ConnectivityStateInfo.forNonError(SHUTDOWN));
}
helper.hcStates.clear();
}
@Override @Override
public String toString() { public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
@ -202,6 +216,7 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
private final LoadBalancer delegate; private final LoadBalancer delegate;
private final SynchronizationContext syncContext; private final SynchronizationContext syncContext;
private final ScheduledExecutorService timerService; private final ScheduledExecutorService timerService;
private final HelperImpl helperImpl;
private Subchannel subchannel; private Subchannel subchannel;
private ChannelLogger subchannelLogger; private ChannelLogger subchannelLogger;
@ -225,8 +240,10 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
private ScheduledHandle retryTimer; private ScheduledHandle retryTimer;
HealthCheckState( HealthCheckState(
HelperImpl helperImpl,
LoadBalancer delegate, SynchronizationContext syncContext, LoadBalancer delegate, SynchronizationContext syncContext,
ScheduledExecutorService timerService) { ScheduledExecutorService timerService) {
this.helperImpl = checkNotNull(helperImpl, "helperImpl");
this.delegate = checkNotNull(delegate, "delegate"); this.delegate = checkNotNull(delegate, "delegate");
this.syncContext = checkNotNull(syncContext, "syncContext"); this.syncContext = checkNotNull(syncContext, "syncContext");
this.timerService = checkNotNull(timerService, "timerService"); this.timerService = checkNotNull(timerService, "timerService");
@ -315,7 +332,7 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
private void gotoState(ConnectivityStateInfo newState) { private void gotoState(ConnectivityStateInfo newState) {
checkState(subchannel != null, "init() not called"); checkState(subchannel != null, "init() not called");
if (!Objects.equal(concludedState, newState)) { if (!helperImpl.balancerShutdown && !Objects.equal(concludedState, newState)) {
concludedState = newState; concludedState = newState;
delegate.handleSubchannelState(subchannel, concludedState); delegate.handleSubchannelState(subchannel, concludedState);
} }

View File

@ -183,13 +183,18 @@ public class HealthCheckingLoadBalancerFactoryTest {
hcLb = hcLbFactory.newLoadBalancer(origHelper); hcLb = hcLbFactory.newLoadBalancer(origHelper);
// Make sure all calls into the hcLb is from the syncContext // Make sure all calls into the hcLb is from the syncContext
hcLbEventDelivery = new LoadBalancer() { hcLbEventDelivery = new LoadBalancer() {
// Per LoadBalancer API, no more callbacks will be called after shutdown() is called.
boolean shutdown;
@Override @Override
public void handleResolvedAddressGroups( public void handleResolvedAddressGroups(
final List<EquivalentAddressGroup> servers, final Attributes attributes) { final List<EquivalentAddressGroup> servers, final Attributes attributes) {
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
hcLb.handleResolvedAddressGroups(servers, attributes); if (!shutdown) {
hcLb.handleResolvedAddressGroups(servers, attributes);
}
} }
}); });
} }
@ -200,7 +205,9 @@ public class HealthCheckingLoadBalancerFactoryTest {
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
hcLb.handleSubchannelState(subchannel, stateInfo); if (!shutdown) {
hcLb.handleSubchannelState(subchannel, stateInfo);
}
} }
}); });
} }
@ -212,7 +219,15 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Override @Override
public void shutdown() { public void shutdown() {
throw new AssertionError("Not supposed to be called"); syncContext.execute(new Runnable() {
@Override
public void run() {
if (!shutdown) {
shutdown = true;
hcLb.shutdown();
}
}
});
} }
}; };
verify(origLbFactory).newLoadBalancer(any(Helper.class)); verify(origLbFactory).newLoadBalancer(any(Helper.class));
@ -942,6 +957,43 @@ public class HealthCheckingLoadBalancerFactoryTest {
.isEqualTo("FooService"); .isEqualTo("FooService");
} }
@Test
public void balancerShutdown() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
assertThat(subchannel).isSameAs(subchannels[0]);
// Trigger the health check
hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
HealthImpl healthImpl = healthImpls[0];
assertThat(healthImpl.calls).hasSize(1);
ServerSideCall serverCall = healthImpl.calls.poll();
assertThat(serverCall.cancelled).isFalse();
verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
// Shut down the balancer
hcLbEventDelivery.shutdown();
verify(origLb).shutdown();
// Health check stream should be cancelled
assertThat(serverCall.cancelled).isTrue();
// LoadBalancer API requires no more callbacks on LoadBalancer after shutdown() is called.
verifyNoMoreInteractions(origLb);
// No more health check call is made or scheduled
assertThat(healthImpl.calls).isEmpty();
assertThat(clock.getPendingTasks()).isEmpty();
}
@Test @Test
public void util_newHealthCheckingLoadBalancer() { public void util_newHealthCheckingLoadBalancer() {
Factory hcFactory = Factory hcFactory =