diff --git a/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java index b2fc2af880..5ed01051d8 100644 --- a/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java +++ b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java @@ -95,6 +95,7 @@ final class HealthCheckingLoadBalancerFactory extends Factory { private LoadBalancer delegateBalancer; @Nullable String healthCheckedService; + private boolean balancerShutdown; final HashSet hcStates = new HashSet(); @@ -119,7 +120,7 @@ final class HealthCheckingLoadBalancerFactory extends Factory { // createSubchannel() from the SynchronizationContext. syncContext.throwIfNotInThisSynchronizationContext(); HealthCheckState hcState = new HealthCheckState( - delegateBalancer, syncContext, delegate.getScheduledExecutorService()); + this, delegateBalancer, syncContext, delegate.getScheduledExecutorService()); hcStates.add(hcState); Subchannel subchannel = super.createSubchannel( addrs, attrs.toBuilder().set(KEY_HEALTH_CHECK_STATE, hcState).build()); @@ -183,6 +184,19 @@ final class HealthCheckingLoadBalancerFactory extends Factory { } } + @Override + public void shutdown() { + super.shutdown(); + helper.balancerShutdown = true; + for (HealthCheckState hcState : helper.hcStates) { + // ManagedChannel will stop calling handleSubchannelState() after shutdown() is called, + // which is required by LoadBalancer API semantics. We need to deliver the final SHUTDOWN + // signal to health checkers so that they can cancel the streams. + hcState.updateRawState(ConnectivityStateInfo.forNonError(SHUTDOWN)); + } + helper.hcStates.clear(); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); @@ -202,6 +216,7 @@ final class HealthCheckingLoadBalancerFactory extends Factory { private final LoadBalancer delegate; private final SynchronizationContext syncContext; private final ScheduledExecutorService timerService; + private final HelperImpl helperImpl; private Subchannel subchannel; private ChannelLogger subchannelLogger; @@ -225,8 +240,10 @@ final class HealthCheckingLoadBalancerFactory extends Factory { private ScheduledHandle retryTimer; HealthCheckState( + HelperImpl helperImpl, LoadBalancer delegate, SynchronizationContext syncContext, ScheduledExecutorService timerService) { + this.helperImpl = checkNotNull(helperImpl, "helperImpl"); this.delegate = checkNotNull(delegate, "delegate"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.timerService = checkNotNull(timerService, "timerService"); @@ -315,7 +332,7 @@ final class HealthCheckingLoadBalancerFactory extends Factory { private void gotoState(ConnectivityStateInfo newState) { checkState(subchannel != null, "init() not called"); - if (!Objects.equal(concludedState, newState)) { + if (!helperImpl.balancerShutdown && !Objects.equal(concludedState, newState)) { concludedState = newState; delegate.handleSubchannelState(subchannel, concludedState); } diff --git a/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java index e1d4fd4b8e..f3f0a62d85 100644 --- a/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java +++ b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java @@ -183,13 +183,18 @@ public class HealthCheckingLoadBalancerFactoryTest { hcLb = hcLbFactory.newLoadBalancer(origHelper); // Make sure all calls into the hcLb is from the syncContext hcLbEventDelivery = new LoadBalancer() { + // Per LoadBalancer API, no more callbacks will be called after shutdown() is called. + boolean shutdown; + @Override public void handleResolvedAddressGroups( final List servers, final Attributes attributes) { syncContext.execute(new Runnable() { @Override public void run() { - hcLb.handleResolvedAddressGroups(servers, attributes); + if (!shutdown) { + hcLb.handleResolvedAddressGroups(servers, attributes); + } } }); } @@ -200,7 +205,9 @@ public class HealthCheckingLoadBalancerFactoryTest { syncContext.execute(new Runnable() { @Override public void run() { - hcLb.handleSubchannelState(subchannel, stateInfo); + if (!shutdown) { + hcLb.handleSubchannelState(subchannel, stateInfo); + } } }); } @@ -212,7 +219,15 @@ public class HealthCheckingLoadBalancerFactoryTest { @Override public void shutdown() { - throw new AssertionError("Not supposed to be called"); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (!shutdown) { + shutdown = true; + hcLb.shutdown(); + } + } + }); } }; verify(origLbFactory).newLoadBalancer(any(Helper.class)); @@ -942,6 +957,43 @@ public class HealthCheckingLoadBalancerFactoryTest { .isEqualTo("FooService"); } + @Test + public void balancerShutdown() { + Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); + assertThat(subchannel).isSameAs(subchannels[0]); + + // Trigger the health check + hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + + HealthImpl healthImpl = healthImpls[0]; + assertThat(healthImpl.calls).hasSize(1); + ServerSideCall serverCall = healthImpl.calls.poll(); + assertThat(serverCall.cancelled).isFalse(); + + verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + + // Shut down the balancer + hcLbEventDelivery.shutdown(); + verify(origLb).shutdown(); + + // Health check stream should be cancelled + assertThat(serverCall.cancelled).isTrue(); + + // LoadBalancer API requires no more callbacks on LoadBalancer after shutdown() is called. + verifyNoMoreInteractions(origLb); + + // No more health check call is made or scheduled + assertThat(healthImpl.calls).isEmpty(); + assertThat(clock.getPendingTasks()).isEmpty(); + } + @Test public void util_newHealthCheckingLoadBalancer() { Factory hcFactory =