mirror of https://github.com/grpc/grpc-java.git
xds: cluster manager to delay picker updates (#9365)
Do not perform picker updates while handling new addresses even if child LBs request it. Assure that a single picker update is done.
This commit is contained in:
parent
eb25807d43
commit
49f555192d
|
|
@ -57,6 +57,8 @@ class ClusterManagerLoadBalancer extends LoadBalancer {
|
|||
private final SynchronizationContext syncContext;
|
||||
private final ScheduledExecutorService timeService;
|
||||
private final XdsLogger logger;
|
||||
// Set to true if currently in the process of handling resolved addresses.
|
||||
private boolean resolvingAddresses;
|
||||
|
||||
ClusterManagerLoadBalancer(Helper helper) {
|
||||
this.helper = checkNotNull(helper, "helper");
|
||||
|
|
@ -69,6 +71,15 @@ class ClusterManagerLoadBalancer extends LoadBalancer {
|
|||
|
||||
@Override
|
||||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
try {
|
||||
resolvingAddresses = true;
|
||||
handleResolvedAddressesInternal(resolvedAddresses);
|
||||
} finally {
|
||||
resolvingAddresses = false;
|
||||
}
|
||||
}
|
||||
|
||||
public void handleResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) {
|
||||
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
||||
ClusterManagerConfig config = (ClusterManagerConfig)
|
||||
resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
|
|
@ -251,21 +262,18 @@ class ClusterManagerLoadBalancer extends LoadBalancer {
|
|||
@Override
|
||||
public void updateBalancingState(final ConnectivityState newState,
|
||||
final SubchannelPicker newPicker) {
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (!childLbStates.containsKey(name)) {
|
||||
return;
|
||||
}
|
||||
// Subchannel picker and state are saved, but will only be propagated to the channel
|
||||
// when the child instance exits deactivated state.
|
||||
currentState = newState;
|
||||
currentPicker = newPicker;
|
||||
if (!deactivated) {
|
||||
updateOverallBalancingState();
|
||||
}
|
||||
}
|
||||
});
|
||||
// If we are already in the process of resolving addresses, the overall balancing state
|
||||
// will be updated at the end of it, and we don't need to trigger that update here.
|
||||
if (resolvingAddresses || !childLbStates.containsKey(name)) {
|
||||
return;
|
||||
}
|
||||
// Subchannel picker and state are saved, but will only be propagated to the channel
|
||||
// when the child instance exits deactivated state.
|
||||
currentState = newState;
|
||||
currentPicker = newPicker;
|
||||
if (!deactivated) {
|
||||
updateOverallBalancingState();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
|
|
@ -52,6 +53,7 @@ import io.grpc.internal.PickSubchannelArgsImpl;
|
|||
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
|
||||
import io.grpc.testing.TestMethodDescriptors;
|
||||
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
|
||||
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -249,6 +251,15 @@ public class ClusterManagerLoadBalancerTest {
|
|||
assertThat(childBalancer2.upstreamError.getDescription()).isEqualTo("unknown error");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noDuplicateOverallBalancingStateUpdate() {
|
||||
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
|
||||
|
||||
// The test child LBs would have triggered state updates, let's make sure the overall balancing
|
||||
// state was only updated once.
|
||||
verify(helper, times(1)).updateBalancingState(any(), any());
|
||||
}
|
||||
|
||||
private void deliverResolvedAddresses(final Map<String, String> childPolicies) {
|
||||
clusterManagerLoadBalancer.handleResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
|
|
@ -329,6 +340,10 @@ public class ClusterManagerLoadBalancerTest {
|
|||
@Override
|
||||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
config = resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
|
||||
// Update balancing state here so that concurrent child state changes can be easily tested.
|
||||
// Most tests ignore this and trigger separate child LB updates.
|
||||
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in New Issue