mirror of https://github.com/grpc/grpc-java.git
xds: weighted target to delay picker updates while updating children (#9306)
This commit is contained in:
parent
7b80954fd5
commit
79c607e5ac
|
|
@ -28,7 +28,6 @@ import io.grpc.ConnectivityState;
|
||||||
import io.grpc.InternalLogId;
|
import io.grpc.InternalLogId;
|
||||||
import io.grpc.LoadBalancer;
|
import io.grpc.LoadBalancer;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.SynchronizationContext;
|
|
||||||
import io.grpc.util.ForwardingLoadBalancerHelper;
|
import io.grpc.util.ForwardingLoadBalancerHelper;
|
||||||
import io.grpc.util.GracefulSwitchLoadBalancer;
|
import io.grpc.util.GracefulSwitchLoadBalancer;
|
||||||
import io.grpc.xds.WeightedRandomPicker.WeightedChildPicker;
|
import io.grpc.xds.WeightedRandomPicker.WeightedChildPicker;
|
||||||
|
|
@ -49,13 +48,13 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
|
||||||
private final Map<String, GracefulSwitchLoadBalancer> childBalancers = new HashMap<>();
|
private final Map<String, GracefulSwitchLoadBalancer> childBalancers = new HashMap<>();
|
||||||
private final Map<String, ChildHelper> childHelpers = new HashMap<>();
|
private final Map<String, ChildHelper> childHelpers = new HashMap<>();
|
||||||
private final Helper helper;
|
private final Helper helper;
|
||||||
private final SynchronizationContext syncContext;
|
|
||||||
|
|
||||||
private Map<String, WeightedPolicySelection> targets = ImmutableMap.of();
|
private Map<String, WeightedPolicySelection> targets = ImmutableMap.of();
|
||||||
|
// Set to true if currently in the process of handling resolved addresses.
|
||||||
|
private boolean resolvingAddresses;
|
||||||
|
|
||||||
WeightedTargetLoadBalancer(Helper helper) {
|
WeightedTargetLoadBalancer(Helper helper) {
|
||||||
this.helper = checkNotNull(helper, "helper");
|
this.helper = checkNotNull(helper, "helper");
|
||||||
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
|
|
||||||
logger = XdsLogger.withLogId(
|
logger = XdsLogger.withLogId(
|
||||||
InternalLogId.allocate("weighted-target-lb", helper.getAuthority()));
|
InternalLogId.allocate("weighted-target-lb", helper.getAuthority()));
|
||||||
logger.log(XdsLogLevel.INFO, "Created");
|
logger.log(XdsLogLevel.INFO, "Created");
|
||||||
|
|
@ -63,6 +62,15 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||||
|
try {
|
||||||
|
resolvingAddresses = true;
|
||||||
|
handleResolvedAddressesInternal(resolvedAddresses);
|
||||||
|
} finally {
|
||||||
|
resolvingAddresses = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handleResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) {
|
||||||
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
||||||
Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig();
|
Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||||
checkNotNull(lbConfig, "missing weighted_target lb config");
|
checkNotNull(lbConfig, "missing weighted_target lb config");
|
||||||
|
|
@ -191,17 +199,14 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
|
||||||
@Override
|
@Override
|
||||||
public void updateBalancingState(final ConnectivityState newState,
|
public void updateBalancingState(final ConnectivityState newState,
|
||||||
final SubchannelPicker newPicker) {
|
final SubchannelPicker newPicker) {
|
||||||
syncContext.execute(new Runnable() {
|
currentState = newState;
|
||||||
@Override
|
currentPicker = newPicker;
|
||||||
public void run() {
|
|
||||||
if (!childBalancers.containsKey(name)) {
|
// If we are already in the process of resolving addresses, the overall balancing state
|
||||||
return;
|
// will be updated at the end of it, and we don't need to trigger that update here.
|
||||||
}
|
if (!resolvingAddresses && childBalancers.containsKey(name)) {
|
||||||
currentState = newState;
|
updateOverallBalancingState();
|
||||||
currentPicker = newPicker;
|
}
|
||||||
updateOverallBalancingState();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -402,4 +402,73 @@ public class WeightedTargetLoadBalancerTest {
|
||||||
weightedChildHelper0.updateBalancingState(READY, mock(SubchannelPicker.class));
|
weightedChildHelper0.updateBalancingState(READY, mock(SubchannelPicker.class));
|
||||||
verifyNoMoreInteractions(helper);
|
verifyNoMoreInteractions(helper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// When the ChildHelper is asked to update the overall balancing state, it should not do that if
|
||||||
|
// the update was triggered by the parent LB that will handle triggering the overall state update.
|
||||||
|
@Test
|
||||||
|
public void noDuplicateOverallBalancingStateUpdate() {
|
||||||
|
FakeLoadBalancerProvider fakeLbProvider = new FakeLoadBalancerProvider();
|
||||||
|
|
||||||
|
Map<String, WeightedPolicySelection> targets = ImmutableMap.of(
|
||||||
|
"target0", new WeightedPolicySelection(
|
||||||
|
weights[0], new PolicySelection(fakeLbProvider, configs[0])),
|
||||||
|
"target3", new WeightedPolicySelection(
|
||||||
|
weights[3], new PolicySelection(fakeLbProvider, configs[3])));
|
||||||
|
weightedTargetLb.handleResolvedAddresses(
|
||||||
|
ResolvedAddresses.newBuilder()
|
||||||
|
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
|
||||||
|
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
|
||||||
|
.build());
|
||||||
|
|
||||||
|
// Both of the two child LB policies will call the helper to update the balancing state.
|
||||||
|
// But since those calls happen during the handling of teh resolved addresses of the parent
|
||||||
|
// WeightedTargetLLoadBalancer, the overall balancing state should only be updated once.
|
||||||
|
verify(helper, times(1)).updateBalancingState(any(), any());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class FakeLoadBalancerProvider extends LoadBalancerProvider {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAvailable() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getPriority() {
|
||||||
|
return 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getPolicyName() {
|
||||||
|
return "foo";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LoadBalancer newLoadBalancer(Helper helper) {
|
||||||
|
return new FakeLoadBalancer(helper);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class FakeLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
|
private Helper helper;
|
||||||
|
|
||||||
|
FakeLoadBalancer(Helper helper) {
|
||||||
|
this.helper = helper;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||||
|
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleNameResolutionError(Status error) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue