services: fix HealthCheckingLoadBalancer.shutdown(). (#5848)

HealthCheckingLoadBalancer.shutdown() calls
hcState.onSubchannelState(SHUTDOWN) which removes that hcState from
helper.hcStates.  Therefore, if more than one Subchannels are present,
ConcurrentModificationException will be thrown.

Since HealthCheckingLoadBalancer.shutdown() will clear the hcStates
set after the loop, it's unnecessary to do the deletion within the
loop.  However, when a Subchannel is shutdown by LoadBalancer, its
HcState still needs to be removed.  To do that, change moves the
deletion to Subchannel.shutdown().
This commit is contained in:
Kun Zhang 2019-06-07 09:32:55 -07:00 committed by GitHub
parent 26bd76fa76
commit c6f15162ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 25 deletions

View File

@ -121,7 +121,7 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
HealthCheckState hcState = new HealthCheckState( HealthCheckState hcState = new HealthCheckState(
this, originalSubchannel, syncContext, delegate.getScheduledExecutorService()); this, originalSubchannel, syncContext, delegate.getScheduledExecutorService());
hcStates.add(hcState); hcStates.add(hcState);
Subchannel subchannel = new SubchannelImpl(originalSubchannel, hcState); Subchannel subchannel = new SubchannelImpl(originalSubchannel, this, hcState);
if (healthCheckedService != null) { if (healthCheckedService != null) {
hcState.setServiceName(healthCheckedService); hcState.setServiceName(healthCheckedService);
} }
@ -144,10 +144,12 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
@VisibleForTesting @VisibleForTesting
static final class SubchannelImpl extends ForwardingSubchannel { static final class SubchannelImpl extends ForwardingSubchannel {
final Subchannel delegate; final Subchannel delegate;
final HelperImpl helperImpl;
final HealthCheckState hcState; final HealthCheckState hcState;
SubchannelImpl(Subchannel delegate, HealthCheckState hcState) { SubchannelImpl(Subchannel delegate, HelperImpl helperImpl, HealthCheckState hcState) {
this.delegate = checkNotNull(delegate, "delegate"); this.delegate = checkNotNull(delegate, "delegate");
this.helperImpl = checkNotNull(helperImpl, "helperImpl");
this.hcState = checkNotNull(hcState, "hcState"); this.hcState = checkNotNull(hcState, "hcState");
} }
@ -161,6 +163,13 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
hcState.init(listener); hcState.init(listener);
delegate().start(hcState); delegate().start(hcState);
} }
@Override
public void shutdown() {
helperImpl.getSynchronizationContext().throwIfNotInThisSynchronizationContext();
delegate().shutdown();
helperImpl.hcStates.remove(hcState);
}
} }
private static final class HealthCheckingLoadBalancer extends ForwardingLoadBalancer { private static final class HealthCheckingLoadBalancer extends ForwardingLoadBalancer {
@ -282,9 +291,6 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
// may be available on the new connection. // may be available on the new connection.
disabled = false; disabled = false;
} }
if (Objects.equal(rawState.getState(), SHUTDOWN)) {
helperImpl.hcStates.remove(this);
}
this.rawState = rawState; this.rawState = rawState;
adjustHealthCheck(); adjustHealthCheck();
} }

View File

@ -258,6 +258,7 @@ public class HealthCheckingLoadBalancerFactoryTest {
verify(origHelper, atLeast(0)).getScheduledExecutorService(); verify(origHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(origHelper); verifyNoMoreInteractions(origHelper);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
Subchannel[] wrappedSubchannels = new Subchannel[NUM_SUBCHANNELS];
// Simulate that the orignal LB creates Subchannels // Simulate that the orignal LB creates Subchannels
for (int i = 0; i < NUM_SUBCHANNELS; i++) { for (int i = 0; i < NUM_SUBCHANNELS; i++) {
@ -265,8 +266,8 @@ public class HealthCheckingLoadBalancerFactoryTest {
String subchannelAttrValue = "eag attr " + i; String subchannelAttrValue = "eag attr " + i;
Attributes attrs = Attributes.newBuilder() Attributes attrs = Attributes.newBuilder()
.set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build(); .set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build();
// We don't wrap Subchannels, thus origLb gets the original Subchannels. wrappedSubchannels[i] = createSubchannel(i, attrs);
assertThat(unwrap(createSubchannel(i, attrs))).isSameInstanceAs(subchannels[i]); assertThat(unwrap(wrappedSubchannels[i])).isSameInstanceAs(subchannels[i]);
verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture()); verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture());
assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]); assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]);
assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY)) assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY))
@ -340,9 +341,17 @@ public class HealthCheckingLoadBalancerFactoryTest {
assertThat(serverCall.cancelled).isFalse(); assertThat(serverCall.cancelled).isFalse();
verifyNoMoreInteractions(mockStateListener); verifyNoMoreInteractions(mockStateListener);
assertThat(subchannels[i].isShutdown).isFalse();
final Subchannel wrappedSubchannel = wrappedSubchannels[i];
// Subchannel enters SHUTDOWN state as a response to shutdown(), and that will cancel the // Subchannel enters SHUTDOWN state as a response to shutdown(), and that will cancel the
// health check RPC // health check RPC
subchannel.shutdown(); syncContext.execute(new Runnable() {
@Override
public void run() {
wrappedSubchannel.shutdown();
}
});
assertThat(subchannels[i].isShutdown).isTrue();
assertThat(serverCall.cancelled).isTrue(); assertThat(serverCall.cancelled).isTrue();
verify(mockStateListener).onSubchannelState( verify(mockStateListener).onSubchannelState(
eq(ConnectivityStateInfo.forNonError(SHUTDOWN))); eq(ConnectivityStateInfo.forNonError(SHUTDOWN)));
@ -1004,34 +1013,38 @@ public class HealthCheckingLoadBalancerFactoryTest {
verify(origLb).handleResolvedAddresses(result); verify(origLb).handleResolvedAddresses(result);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
ServerSideCall[] serverCalls = new ServerSideCall[NUM_SUBCHANNELS];
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); for (int i = 0; i < NUM_SUBCHANNELS; i++) {
SubchannelStateListener mockListener = mockStateListeners[0]; Subchannel subchannel = createSubchannel(i, Attributes.EMPTY);
assertThat(unwrap(subchannel)).isSameInstanceAs(subchannels[0]); SubchannelStateListener mockListener = mockStateListeners[i];
assertThat(unwrap(subchannel)).isSameInstanceAs(subchannels[i]);
// Trigger the health check // Trigger the health check
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(i, ConnectivityStateInfo.forNonError(READY));
HealthImpl healthImpl = healthImpls[0]; HealthImpl healthImpl = healthImpls[i];
assertThat(healthImpl.calls).hasSize(1); assertThat(healthImpl.calls).hasSize(1);
ServerSideCall serverCall = healthImpl.calls.poll(); serverCalls[i] = healthImpl.calls.poll();
assertThat(serverCall.cancelled).isFalse(); assertThat(serverCalls[i].cancelled).isFalse();
verify(mockListener).onSubchannelState( verify(mockListener).onSubchannelState(
eq(ConnectivityStateInfo.forNonError(CONNECTING))); eq(ConnectivityStateInfo.forNonError(CONNECTING)));
}
// Shut down the balancer // Shut down the balancer
hcLbEventDelivery.shutdown(); hcLbEventDelivery.shutdown();
verify(origLb).shutdown(); verify(origLb).shutdown();
// Health check stream should be cancelled // Health check stream should be cancelled
assertThat(serverCall.cancelled).isTrue(); for (int i = 0; i < NUM_SUBCHANNELS; i++) {
assertThat(serverCalls[i].cancelled).isTrue();
// LoadBalancer API requires no more callbacks on LoadBalancer after shutdown() is called.
verifyNoMoreInteractions(origLb, mockStateListeners[i]);
// No more health check call is made or scheduled
assertThat(healthImpls[i].calls).isEmpty();
}
// LoadBalancer API requires no more callbacks on LoadBalancer after shutdown() is called.
verifyNoMoreInteractions(origLb, mockListener);
// No more health check call is made or scheduled
assertThat(healthImpl.calls).isEmpty();
assertThat(clock.getPendingTasks()).isEmpty(); assertThat(clock.getPendingTasks()).isEmpty();
} }
@ -1156,6 +1169,7 @@ public class HealthCheckingLoadBalancerFactoryTest {
final ArrayList<String> logs = new ArrayList<>(); final ArrayList<String> logs = new ArrayList<>();
final int index; final int index;
SubchannelStateListener listener; SubchannelStateListener listener;
boolean isShutdown;
private final ChannelLogger logger = new ChannelLogger() { private final ChannelLogger logger = new ChannelLogger() {
@Override @Override
public void log(ChannelLogLevel level, String msg) { public void log(ChannelLogLevel level, String msg) {
@ -1183,6 +1197,7 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Override @Override
public void shutdown() { public void shutdown() {
isShutdown = true;
deliverSubchannelState(index, ConnectivityStateInfo.forNonError(SHUTDOWN)); deliverSubchannelState(index, ConnectivityStateInfo.forNonError(SHUTDOWN));
} }