xds: ignore balancing state update from downstream after LB shutdown (#8134)

LoadBalancers should not propagate balancing state updates after itself being shutdown.

For LB policies that maintain a group of child LB policies with each having its independent lifetime, balancing state update propagations from each child LB policy can go out of the lifetime of its parent easily, especially for cases that balancing state update is put to the back of the queue and not propagated up inline.

For LBs that are simple pass-through in the middle of the LB tree structure, it isn't a big issue as its lifecycle would be the same as its child. Transitively, It would behave correctly as long as its downstream is doing in the right way.

This change is a sanity cleanup for LB policies that maintain multiple child LB policies to preserve the invariant that further balancing state updates from their child policies will not get propagated.
This commit is contained in:
Chengyuan Zhang 2021-05-04 15:56:56 -07:00 committed by GitHub
parent ee000f0dc1
commit fcaf9a9583
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 92 additions and 5 deletions

View File

@ -126,6 +126,7 @@ class ClusterManagerLoadBalancer extends LoadBalancer {
for (ChildLbState state : childLbStates.values()) {
state.shutdown();
}
childLbStates.clear();
}
private void updateOverallBalancingState() {
@ -237,7 +238,6 @@ class ClusterManagerLoadBalancer extends LoadBalancer {
}
void shutdown() {
deactivated = true;
if (deletionTimer != null && deletionTimer.isPending()) {
deletionTimer.cancel();
}
@ -253,10 +253,13 @@ class ClusterManagerLoadBalancer extends LoadBalancer {
syncContext.execute(new Runnable() {
@Override
public void run() {
currentState = newState;
currentPicker = newPicker;
if (!childLbStates.containsKey(name)) {
return;
}
// Subchannel picker and state are saved, but will only be propagated to the channel
// when the child instance exits deactivated state.
currentState = newState;
currentPicker = newPicker;
if (!deactivated) {
updateOverallBalancingState();
}

View File

@ -123,6 +123,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
for (ChildLbState child : children.values()) {
child.tearDown();
}
children.clear();
}
private void tryNextPriority(boolean reportConnecting) {
@ -292,6 +293,9 @@ final class PriorityLoadBalancer extends LoadBalancer {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!children.containsKey(priority)) {
return;
}
connectivityState = newState;
picker = newPicker;
if (deletionTimer != null && deletionTimer.isPending()) {

View File

@ -71,7 +71,7 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
for (String targetName : newTargets.keySet()) {
WeightedPolicySelection weightedChildLbConfig = newTargets.get(targetName);
if (!targets.containsKey(targetName)) {
ChildHelper childHelper = new ChildHelper();
ChildHelper childHelper = new ChildHelper(targetName);
GracefulSwitchLoadBalancer childBalancer = new GracefulSwitchLoadBalancer(childHelper);
childBalancer.switchTo(weightedChildLbConfig.policySelection.getProvider());
childHelpers.put(targetName, childHelper);
@ -125,6 +125,7 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
for (LoadBalancer childBalancer : childBalancers.values()) {
childBalancer.shutdown();
}
childBalancers.clear();
}
private void updateOverallBalancingState() {
@ -179,15 +180,23 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
}
private final class ChildHelper extends ForwardingLoadBalancerHelper {
String name;
ConnectivityState currentState = CONNECTING;
SubchannelPicker currentPicker = BUFFER_PICKER;
private ChildHelper(String name) {
this.name = name;
}
@Override
public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!childBalancers.containsKey(name)) {
return;
}
currentState = newState;
currentPicker = newPicker;
updateOverallBalancingState();

View File

@ -21,10 +21,12 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
@ -98,6 +100,7 @@ public class ClusterManagerLoadBalancerTest {
lbConfigInventory.put("childB", new Object());
lbConfigInventory.put("childC", null);
clusterManagerLoadBalancer = new ClusterManagerLoadBalancer(helper);
clearInvocations(helper);
}
@After
@ -185,13 +188,28 @@ public class ClusterManagerLoadBalancerTest {
verify(helper, never()).updateBalancingState(
eq(ConnectivityState.READY), any(SubchannelPicker.class));
// reactivate policy_a
// Reactivate policy_a, balancing state update reflects the latest connectivity state and
// picker.
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
assertThat(pickSubchannel(pickerCaptor.getValue(), "childA").getSubchannel())
.isEqualTo(subchannel);
}
@Test
public void raceBetweenShutdownAndChildLbBalancingStateUpdate() {
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
verify(helper).updateBalancingState(
eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class));
FakeLoadBalancer childBalancer = childBalancers.iterator().next();
// LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first,
// any further balancing state update should be ignored.
clusterManagerLoadBalancer.shutdown();
childBalancer.deliverSubchannelState(mock(Subchannel.class), ConnectivityState.READY);
verifyNoMoreInteractions(helper);
}
@Test
public void handleNameResolutionError_beforeChildLbsInstantiated_returnErrorPicker() {
clusterManagerLoadBalancer.handleNameResolutionError(

View File

@ -20,12 +20,16 @@ import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -115,6 +119,7 @@ public class PriorityLoadBalancerTest {
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
priorityLb = new PriorityLoadBalancer(helper);
clearInvocations(helper);
}
@After
@ -420,6 +425,31 @@ public class PriorityLoadBalancerTest {
verify(helper).refreshNameResolution();
}
@Test
public void raceBetweenShutdownAndChildLbBalancingStateUpdate() {
PriorityChildConfig priorityChildConfig0 =
new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), true);
PriorityChildConfig priorityChildConfig1 =
new PriorityChildConfig(new PolicySelection(fooLbProvider, new Object()), false);
PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));
// LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first,
// any further balancing state update should be ignored.
priorityLb.shutdown();
Helper priorityHelper0 = Iterables.getOnlyElement(fooHelpers); // priority p0
priorityHelper0.updateBalancingState(READY, mock(SubchannelPicker.class));
verifyNoMoreInteractions(helper);
}
private void assertLatestConnectivityState(ConnectivityState expectedState) {
verify(helper, atLeastOnce())
.updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture());

View File

@ -25,10 +25,12 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
@ -164,6 +166,7 @@ public class WeightedTargetLoadBalancerTest {
lbRegistry.register(barLbProvider);
weightedTargetLb = new WeightedTargetLoadBalancer(helper);
clearInvocations(helper);
}
@After
@ -379,4 +382,24 @@ public class WeightedTargetLoadBalancerTest {
new WeightedChildPicker(weights[2], failurePickers[2]),
new WeightedChildPicker(weights[3], failurePickers[3]));
}
@Test
public void raceBetweenShutdownAndChildLbBalancingStateUpdate() {
Map<String, WeightedPolicySelection> targets = ImmutableMap.of(
"target0", weightedLbConfig0,
"target1", weightedLbConfig1);
weightedTargetLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
.build());
verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));
// LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first,
// any further balancing state update should be ignored.
weightedTargetLb.shutdown();
Helper weightedChildHelper0 = childHelpers.iterator().next();
weightedChildHelper0.updateBalancingState(READY, mock(SubchannelPicker.class));
verifyNoMoreInteractions(helper);
}
}