xds: Use acceptResolvedAddresses() for PriorityLb children

PriorityLb should propagate config problems up to the name resolver so
it can refresh.
This commit is contained in:
Eric Anderson 2025-02-20 15:13:53 -08:00
parent f207be39a9
commit 110c1ff0d6
2 changed files with 207 additions and 33 deletions

View File

@ -91,6 +91,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
checkNotNull(config, "missing priority lb config");
priorityNames = config.priorities;
priorityConfigs = config.childConfigs;
Status status = Status.OK;
Set<String> prioritySet = new HashSet<>(config.priorities);
ArrayList<String> childKeys = new ArrayList<>(children.keySet());
for (String priority : childKeys) {
@ -105,12 +106,18 @@ final class PriorityLoadBalancer extends LoadBalancer {
for (String priority : priorityNames) {
ChildLbState childLbState = children.get(priority);
if (childLbState != null) {
childLbState.updateResolvedAddresses();
Status newStatus = childLbState.updateResolvedAddresses();
if (!newStatus.isOk()) {
status = newStatus;
}
}
}
handlingResolvedAddresses = false;
tryNextPriority();
return Status.OK;
Status newStatus = tryNextPriority();
if (!newStatus.isOk()) {
status = newStatus;
}
return status;
}
@Override
@ -140,7 +147,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
children.clear();
}
private void tryNextPriority() {
private Status tryNextPriority() {
for (int i = 0; i < priorityNames.size(); i++) {
String priority = priorityNames.get(i);
if (!children.containsKey(priority)) {
@ -151,8 +158,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
// Calling the child's updateResolvedAddresses() can result in tryNextPriority() being
// called recursively. We need to be sure to be done with processing here before it is
// called.
child.updateResolvedAddresses();
return; // Give priority i time to connect.
return child.updateResolvedAddresses(); // Give priority i time to connect.
}
ChildLbState child = children.get(priority);
child.reactivate();
@ -165,16 +171,16 @@ final class PriorityLoadBalancer extends LoadBalancer {
children.get(p).deactivate();
}
}
return;
return Status.OK;
}
if (child.failOverTimer != null && child.failOverTimer.isPending()) {
updateOverallState(priority, child.connectivityState, child.picker);
return; // Give priority i time to connect.
return Status.OK; // Give priority i time to connect.
}
if (priority.equals(currentPriority) && child.connectivityState != TRANSIENT_FAILURE) {
// If the current priority is not changed into TRANSIENT_FAILURE, keep using it.
updateOverallState(priority, child.connectivityState, child.picker);
return;
return Status.OK;
}
}
// TODO(zdapeng): Include error details of each priority.
@ -182,6 +188,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
String lastPriority = priorityNames.get(priorityNames.size() - 1);
SubchannelPicker errorPicker = children.get(lastPriority).picker;
updateOverallState(lastPriority, TRANSIENT_FAILURE, errorPicker);
return Status.OK;
}
private void updateOverallState(
@ -228,7 +235,11 @@ final class PriorityLoadBalancer extends LoadBalancer {
Status.UNAVAILABLE.withDescription("Connection timeout for priority " + priority)));
logger.log(XdsLogLevel.DEBUG, "Priority {0} failed over to next", priority);
currentPriority = null; // reset currentPriority to guarantee failover happen
tryNextPriority();
Status status = tryNextPriority();
if (!status.isOk()) {
// A child had a problem with the addresses/config. Request it to be refreshed
helper.refreshNameResolution();
}
}
}
@ -279,10 +290,10 @@ final class PriorityLoadBalancer extends LoadBalancer {
* resolvedAddresses}, or when priority lb receives a new resolved addresses while the child
* already exists.
*/
void updateResolvedAddresses() {
Status updateResolvedAddresses() {
PriorityLbConfig config =
(PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
lb.handleResolvedAddresses(
return lb.acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setAddresses(AddressFilter.filter(resolvedAddresses.getAddresses(), priority))
.setLoadBalancingPolicyConfig(config.childConfigs.get(priority).childConfig)
@ -331,7 +342,11 @@ final class PriorityLoadBalancer extends LoadBalancer {
// If we are currently handling newly resolved addresses, let's not try to reconfigure as
// the address handling process will take care of that to provide an atomic config update.
if (!handlingResolvedAddresses) {
tryNextPriority();
Status status = tryNextPriority();
if (!status.isOk()) {
// A child had a problem with the addresses/config. Request it to be refreshed
helper.refreshNameResolution();
}
}
}

View File

@ -33,6 +33,7 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -97,6 +98,8 @@ public class PriorityLoadBalancerTest {
public LoadBalancer newLoadBalancer(Helper helper) {
fooHelpers.add(helper);
LoadBalancer childBalancer = mock(LoadBalancer.class);
when(childBalancer.acceptResolvedAddresses(any(ResolvedAddresses.class)))
.thenReturn(Status.OK);
fooBalancers.add(childBalancer);
return childBalancer;
}
@ -107,6 +110,8 @@ public class PriorityLoadBalancerTest {
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
LoadBalancer childBalancer = mock(LoadBalancer.class);
when(childBalancer.acceptResolvedAddresses(any(ResolvedAddresses.class)))
.thenReturn(Status.OK);
barBalancers.add(childBalancer);
return childBalancer;
}
@ -141,7 +146,7 @@ public class PriorityLoadBalancerTest {
}
@Test
public void handleResolvedAddresses() {
public void acceptResolvedAddresses() {
SocketAddress socketAddress = new InetSocketAddress(8080);
EquivalentAddressGroup eag = new EquivalentAddressGroup(socketAddress);
eag = AddressFilter.setPathFilter(eag, ImmutableList.of("p1"));
@ -162,16 +167,17 @@ public class PriorityLoadBalancerTest {
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1,
"p2", priorityChildConfig2),
ImmutableList.of("p0", "p1", "p2"));
priorityLb.handleResolvedAddresses(
Status status = priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(addresses)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
assertThat(status.getCode()).isEqualTo(Status.Code.OK);
assertThat(fooBalancers).hasSize(1);
assertThat(barBalancers).isEmpty();
LoadBalancer fooBalancer0 = Iterables.getOnlyElement(fooBalancers);
verify(fooBalancer0).handleResolvedAddresses(resolvedAddressesCaptor.capture());
verify(fooBalancer0).acceptResolvedAddresses(resolvedAddressesCaptor.capture());
ResolvedAddresses addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(addressesReceived.getAddresses()).isEmpty();
assertThat(addressesReceived.getAttributes()).isEqualTo(attributes);
@ -182,7 +188,7 @@ public class PriorityLoadBalancerTest {
assertThat(fooBalancers).hasSize(1);
assertThat(barBalancers).hasSize(1);
LoadBalancer barBalancer0 = Iterables.getOnlyElement(barBalancers);
verify(barBalancer0).handleResolvedAddresses(resolvedAddressesCaptor.capture());
verify(barBalancer0).acceptResolvedAddresses(resolvedAddressesCaptor.capture());
addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(Iterables.getOnlyElement(addressesReceived.getAddresses()).getAddresses())
.containsExactly(socketAddress);
@ -194,7 +200,7 @@ public class PriorityLoadBalancerTest {
assertThat(fooBalancers).hasSize(2);
assertThat(barBalancers).hasSize(1);
LoadBalancer fooBalancer1 = Iterables.getLast(fooBalancers);
verify(fooBalancer1).handleResolvedAddresses(resolvedAddressesCaptor.capture());
verify(fooBalancer1).acceptResolvedAddresses(resolvedAddressesCaptor.capture());
addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(addressesReceived.getAddresses()).isEmpty();
assertThat(addressesReceived.getAttributes()).isEqualTo(attributes);
@ -211,14 +217,15 @@ public class PriorityLoadBalancerTest {
ImmutableMap.of("p1",
new PriorityChildConfig(newChildConfig(barLbProvider, newBarConfig), true)),
ImmutableList.of("p1"));
priorityLb.handleResolvedAddresses(
status = priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(newAddresses)
.setLoadBalancingPolicyConfig(newPriorityLbConfig)
.build());
assertThat(status.getCode()).isEqualTo(Status.Code.OK);
assertThat(fooBalancers).hasSize(2);
assertThat(barBalancers).hasSize(1);
verify(barBalancer0, times(2)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
verify(barBalancer0, times(2)).acceptResolvedAddresses(resolvedAddressesCaptor.capture());
addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(Iterables.getOnlyElement(addressesReceived.getAddresses()).getAddresses())
.containsExactly(newSocketAddress);
@ -232,6 +239,60 @@ public class PriorityLoadBalancerTest {
verify(barBalancer0, never()).shutdown();
}
@Test
public void acceptResolvedAddresses_propagatesChildFailures() {
LoadBalancerProvider lbProvider = new CannedLoadBalancer.Provider();
CannedLoadBalancer.Config internalTf = new CannedLoadBalancer.Config(
Status.INTERNAL, TRANSIENT_FAILURE);
CannedLoadBalancer.Config okTf = new CannedLoadBalancer.Config(Status.OK, TRANSIENT_FAILURE);
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.of())
.setAttributes(Attributes.EMPTY)
.build();
// tryNewPriority() propagates status
Status status = priorityLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(new PriorityLbConfig(
ImmutableMap.of(
"p0", newPriorityChildConfig(lbProvider, internalTf, true)),
ImmutableList.of("p0")))
.build());
assertThat(status.getCode()).isNotEqualTo(Status.Code.OK);
// Updating a child propagates status
status = priorityLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(new PriorityLbConfig(
ImmutableMap.of(
"p0", newPriorityChildConfig(lbProvider, internalTf, true)),
ImmutableList.of("p0")))
.build());
assertThat(status.getCode()).isNotEqualTo(Status.Code.OK);
// A single pre-existing child failure propagates
status = priorityLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(new PriorityLbConfig(
ImmutableMap.of(
"p0", newPriorityChildConfig(lbProvider, okTf, true),
"p1", newPriorityChildConfig(lbProvider, okTf, true),
"p2", newPriorityChildConfig(lbProvider, okTf, true)),
ImmutableList.of("p0", "p1", "p2")))
.build());
assertThat(status.getCode()).isEqualTo(Status.Code.OK);
status = priorityLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(new PriorityLbConfig(
ImmutableMap.of(
"p0", newPriorityChildConfig(lbProvider, okTf, true),
"p1", newPriorityChildConfig(lbProvider, internalTf, true),
"p2", newPriorityChildConfig(lbProvider, okTf, true)),
ImmutableList.of("p0", "p1", "p2")))
.build());
assertThat(status.getCode()).isNotEqualTo(Status.Code.OK);
}
@Test
public void handleNameResolutionError() {
Object fooConfig0 = new Object();
@ -243,7 +304,7 @@ public class PriorityLoadBalancerTest {
PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -255,7 +316,7 @@ public class PriorityLoadBalancerTest {
priorityLbConfig =
new PriorityLbConfig(ImmutableMap.of("p1", priorityChildConfig1), ImmutableList.of("p1"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -286,7 +347,7 @@ public class PriorityLoadBalancerTest {
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1,
"p2", priorityChildConfig2, "p3", priorityChildConfig3),
ImmutableList.of("p0", "p1", "p2", "p3"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -419,7 +480,7 @@ public class PriorityLoadBalancerTest {
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -455,7 +516,7 @@ public class PriorityLoadBalancerTest {
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -497,7 +558,7 @@ public class PriorityLoadBalancerTest {
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -530,7 +591,7 @@ public class PriorityLoadBalancerTest {
// resolution update without priority change does not trigger failover
Attributes.Key<String> fooKey = Attributes.Key.create("fooKey");
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -559,7 +620,7 @@ public class PriorityLoadBalancerTest {
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1,
"p2", priorityChildConfig2, "p3", priorityChildConfig3),
ImmutableList.of("p0", "p1", "p2", "p3"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -652,6 +713,55 @@ public class PriorityLoadBalancerTest {
verify(balancer3).shutdown();
}
@Test
public void failover_propagatesChildFailures() {
LoadBalancerProvider lbProvider = new CannedLoadBalancer.Provider();
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.of())
.setAttributes(Attributes.EMPTY)
.build();
Status status = priorityLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(new PriorityLbConfig(
ImmutableMap.of(
"p0", newPriorityChildConfig(
lbProvider, new CannedLoadBalancer.Config(Status.OK, TRANSIENT_FAILURE), true),
"p1", newPriorityChildConfig(
lbProvider, new CannedLoadBalancer.Config(Status.INTERNAL, CONNECTING), true)),
ImmutableList.of("p0", "p1")))
.build());
// Since P1's activation wasn't noticed by the result status, it triggered name resolution
assertThat(status.getCode()).isEqualTo(Status.Code.OK);
verify(helper).refreshNameResolution();
}
@Test
public void failoverTimer_propagatesChildFailures() {
LoadBalancerProvider lbProvider = new CannedLoadBalancer.Provider();
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.of())
.setAttributes(Attributes.EMPTY)
.build();
Status status = priorityLb.acceptResolvedAddresses(
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(new PriorityLbConfig(
ImmutableMap.of(
"p0", newPriorityChildConfig(
lbProvider, new CannedLoadBalancer.Config(Status.OK, CONNECTING), true),
"p1", newPriorityChildConfig(
lbProvider, new CannedLoadBalancer.Config(Status.INTERNAL, CONNECTING), true)),
ImmutableList.of("p0", "p1")))
.build());
assertThat(status.getCode()).isEqualTo(Status.Code.OK);
// P1's activation will refresh name resolution
verify(helper, never()).refreshNameResolution();
fakeClock.forwardTime(10, TimeUnit.SECONDS);
verify(helper).refreshNameResolution();
}
@Test
public void bypassReresolutionRequestsIfConfiged() {
PriorityChildConfig priorityChildConfig0 =
@ -662,7 +772,7 @@ public class PriorityLoadBalancerTest {
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -690,7 +800,7 @@ public class PriorityLoadBalancerTest {
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -717,7 +827,7 @@ public class PriorityLoadBalancerTest {
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0),
ImmutableList.of("p0"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -727,7 +837,7 @@ public class PriorityLoadBalancerTest {
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1),
ImmutableList.of("p0", "p1"));
priorityLb.handleResolvedAddresses(
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
@ -769,6 +879,11 @@ public class PriorityLoadBalancerTest {
return GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(provider, config);
}
private PriorityChildConfig newPriorityChildConfig(
LoadBalancerProvider provider, Object config, boolean ignoreRefresh) {
return new PriorityChildConfig(newChildConfig(provider, config), ignoreRefresh);
}
private static class FakeLoadBalancerProvider extends LoadBalancerProvider {
@Override
@ -801,9 +916,10 @@ public class PriorityLoadBalancerTest {
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
helper.updateBalancingState(
TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(Status.INTERNAL)));
return Status.OK;
}
@Override
@ -814,4 +930,47 @@ public class PriorityLoadBalancerTest {
public void shutdown() {
}
}
static final class CannedLoadBalancer extends LoadBalancer {
private final Helper helper;
private CannedLoadBalancer(Helper helper) {
this.helper = helper;
}
@Override
public Status acceptResolvedAddresses(ResolvedAddresses addresses) {
Config config = (Config) addresses.getLoadBalancingPolicyConfig();
helper.updateBalancingState(
config.state, new FixedResultPicker(PickResult.withError(Status.INTERNAL)));
return config.resolvedAddressesResult;
}
@Override
public void handleNameResolutionError(Status status) {}
@Override
public void shutdown() {}
static final class Provider extends StandardLoadBalancerProvider {
public Provider() {
super("echo");
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new CannedLoadBalancer(helper);
}
}
static final class Config {
final Status resolvedAddressesResult;
final ConnectivityState state;
public Config(Status resolvedAddressesResult, ConnectivityState state) {
this.resolvedAddressesResult = resolvedAddressesResult;
this.state = state;
}
}
}
}