xds: implement priority lb

This commit is contained in:
ZHANG Dapeng 2020-05-29 12:47:46 -07:00 committed by GitHub
parent 62620ccd00
commit 0cb91d97bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 782 additions and 0 deletions

View File

@ -0,0 +1,291 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import io.grpc.ConnectivityState;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/** Load balancer for priority policy. */
final class PriorityLoadBalancer extends LoadBalancer {
private final Helper helper;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService executor;
private final XdsLogger logger;
// Includes all active and deactivated children. Mutable. New entries are only added from priority
// 0 up to the selected priority. An entry is only deleted 15 minutes after the its deactivation.
private final Map<String, ChildLbState> children = new HashMap<>();
// Following fields are only null initially.
private ResolvedAddresses resolvedAddresses;
private List<String> priorityNames;
private Map<String, Integer> priorityNameToIndex;
private ConnectivityState currentConnectivityState;
private SubchannelPicker currentPicker;
PriorityLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
syncContext = helper.getSynchronizationContext();
executor = helper.getScheduledExecutorService();
InternalLogId logId = InternalLogId.allocate("priority-lb", helper.getAuthority());
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
this.resolvedAddresses = resolvedAddresses;
PriorityLbConfig config = (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
checkNotNull(config, "missing priority lb config");
priorityNames = config.priorities;
Map<String, Integer> pToI = new HashMap<>();
for (int i = 0; i < priorityNames.size(); i++) {
pToI.put(priorityNames.get(i), i);
}
priorityNameToIndex = Collections.unmodifiableMap(pToI);
for (String priority : children.keySet()) {
if (!priorityNameToIndex.containsKey(priority)) {
children.get(priority).deactivate();
}
}
for (String priority : priorityNames) {
if (children.containsKey(priority)) {
children.get(priority).updateResolvedAddresses();
}
}
// Not to report connecting in case a pending priority bumps up on top of the current READY
// priority.
tryNextPriority(false);
}
@Override
public void handleNameResolutionError(Status error) {
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
if (children.isEmpty()) {
updateOverallState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
for (ChildLbState child : children.values()) {
child.lb.handleNameResolutionError(error);
}
}
@Override
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
for (ChildLbState child : children.values()) {
child.tearDown();
}
}
private void tryNextPriority(boolean reportConnecting) {
for (int i = 0; i < priorityNames.size(); i++) {
String priority = priorityNames.get(i);
if (!children.containsKey(priority)) {
ChildLbState child = new ChildLbState(priority);
children.put(priority, child);
child.updateResolvedAddresses();
updateOverallState(CONNECTING, BUFFER_PICKER);
return; // Give priority i time to connect.
}
ChildLbState child = children.get(priority);
child.reactivate();
if (child.connectivityState.equals(READY) || child.connectivityState.equals(IDLE)) {
logger.log(XdsLogLevel.DEBUG, "Shifted to priority {0}", priority);
updateOverallState(child.connectivityState, child.picker);
for (int j = i + 1; j < priorityNames.size(); j++) {
String p = priorityNames.get(j);
if (children.containsKey(p)) {
children.get(p).deactivate();
}
}
return;
}
if (child.failOverTimer != null && child.failOverTimer.isPending()) {
if (reportConnecting) {
updateOverallState(CONNECTING, BUFFER_PICKER);
}
return; // Give priority i time to connect.
}
}
// TODO(zdapeng): Include error details of each priority.
logger.log(XdsLogLevel.DEBUG, "All priority failed");
updateOverallState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
}
private void updateOverallState(ConnectivityState state, SubchannelPicker picker) {
if (!state.equals(currentConnectivityState) || !picker.equals(currentPicker)) {
currentConnectivityState = state;
currentPicker = picker;
helper.updateBalancingState(state, picker);
}
}
private final class ChildLbState {
final String priority;
final ChildHelper childHelper;
final GracefulSwitchLoadBalancer lb;
// Timer to fail over to the next priority if not connected in 10 sec. Scheduled only once at
// child initialization.
final ScheduledHandle failOverTimer;
// Timer to delay shutdown and deletion of the priority. Scheduled whenever the child is
// deactivated.
@Nullable ScheduledHandle deletionTimer;
@Nullable String policy;
ConnectivityState connectivityState = CONNECTING;
SubchannelPicker picker = BUFFER_PICKER;
ChildLbState(final String priority) {
this.priority = priority;
childHelper = new ChildHelper();
lb = new GracefulSwitchLoadBalancer(childHelper);
class FailOverTask implements Runnable {
@Override
public void run() {
if (deletionTimer != null && deletionTimer.isPending()) {
// The child is deactivated.
return;
}
logger.log(XdsLogLevel.DEBUG, "Priority {0} failed over to next", priority);
tryNextPriority(true);
}
}
failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS, executor);
logger.log(XdsLogLevel.DEBUG, "Priority created: {0}", priority);
}
/**
* Called when the child becomes a priority that is or appears before the first READY one in the
* {@code priorities} list, due to either config update or balancing state update.
*/
void reactivate() {
if (deletionTimer != null && deletionTimer.isPending()) {
deletionTimer.cancel();
logger.log(XdsLogLevel.DEBUG, "Priority reactivated: {0}", priority);
}
}
/**
* Called when either the child is removed by config update, or a higher priority becomes READY.
*/
void deactivate() {
if (deletionTimer != null && deletionTimer.isPending()) {
return;
}
class DeletionTask implements Runnable {
@Override
public void run() {
tearDown();
children.remove(priority);
}
}
deletionTimer = syncContext.schedule(new DeletionTask(), 15, TimeUnit.MINUTES, executor);
logger.log(XdsLogLevel.DEBUG, "Priority deactivated: {0}", priority);
}
void tearDown() {
if (failOverTimer.isPending()) {
failOverTimer.cancel();
}
if (deletionTimer != null && deletionTimer.isPending()) {
deletionTimer.cancel();
}
lb.shutdown();
logger.log(XdsLogLevel.DEBUG, "Priority deleted: {0}", priority);
}
/**
* Called either when the child is just created and in this case updated with the cached {@code
* resolvedAddresses}, or when priority lb receives a new resolved addresses while the child
* already exists.
*/
void updateResolvedAddresses() {
final ResolvedAddresses addresses = resolvedAddresses;
syncContext.execute(
new Runnable() {
@Override
public void run() {
PriorityLbConfig config = (PriorityLbConfig) addresses.getLoadBalancingPolicyConfig();
PolicySelection childPolicySelection = config.childConfigs.get(priority);
LoadBalancerProvider lbProvider = childPolicySelection.getProvider();
String newPolicy = lbProvider.getPolicyName();
if (!newPolicy.equals(policy)) {
policy = newPolicy;
lb.switchTo(lbProvider);
}
// TODO(zdapeng): Implement address filtering.
lb.handleResolvedAddresses(
addresses
.toBuilder()
.setLoadBalancingPolicyConfig(childPolicySelection.getConfig())
.build());
}
});
}
final class ChildHelper extends ForwardingLoadBalancerHelper {
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
connectivityState = newState;
picker = newPicker;
if (deletionTimer != null && deletionTimer.isPending()) {
return;
}
if (failOverTimer.isPending()) {
if (newState.equals(READY) || newState.equals(TRANSIENT_FAILURE)) {
failOverTimer.cancel();
}
}
tryNextPriority(true);
}
@Override
protected Helper delegate() {
return helper;
}
}
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
/** Provider for priority load balancing policy. */
final class PriorityLoadBalancerProvider extends LoadBalancerProvider {
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return "priority_experimental";
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new PriorityLoadBalancer(helper);
}
@Override
public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawConfig) {
throw new UnsupportedOperationException();
}
static final class PriorityLbConfig {
final Map<String, PolicySelection> childConfigs;
final List<String> priorities;
PriorityLbConfig(Map<String, PolicySelection> childConfigs, List<String> priorities) {
this.childConfigs = Collections.unmodifiableMap(checkNotNull(childConfigs, "childConfigs"));
this.priorities = Collections.unmodifiableList(checkNotNull(priorities, "priorities"));
checkArgument(!priorities.isEmpty(), "priority list is empty");
checkArgument(
childConfigs.keySet().containsAll(priorities),
"missing child config for at lease one of the priorities");
checkArgument(
priorities.size() == new HashSet<>(priorities).size(),
"duplicate names in priorities");
checkArgument(
priorities.size() == childConfigs.keySet().size(),
"some names in childConfigs are not referenced by priorities");
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("childConfigs", childConfigs)
.add("priorities", priorities)
.toString();
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static org.mockito.Mockito.mock;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.LoadBalancerProvider;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
import java.util.List;
import java.util.Map;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link PriorityLoadBalancerProvider}. */
@RunWith(JUnit4.class)
public class PriorityLoadBalancerProviderTest {
@Rule public final ExpectedException thrown = ExpectedException.none();
@Test
public void priorityLbConfig_emptyPriorities() {
Map<String, PolicySelection> childConfigs =
ImmutableMap.of("p0", new PolicySelection(mock(LoadBalancerProvider.class), null, null));
List<String> priorities = ImmutableList.of();
thrown.expect(IllegalArgumentException.class);
new PriorityLbConfig(childConfigs, priorities);
}
@Test
public void priorityLbConfig_missingChildConfig() {
Map<String, PolicySelection> childConfigs =
ImmutableMap.of("p1", new PolicySelection(mock(LoadBalancerProvider.class), null, null));
List<String> priorities = ImmutableList.of("p0", "p1");
thrown.expect(IllegalArgumentException.class);
new PriorityLbConfig(childConfigs, priorities);
}
}

View File

@ -0,0 +1,345 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.internal.TestUtils.StandardLoadBalancerProvider;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/** Tests for {@link PriorityLoadBalancer}. */
@RunWith(JUnit4.class)
public class PriorityLoadBalancerTest {
private final List<LoadBalancer> fooBalancers = new ArrayList<>();
private final List<LoadBalancer> barBalancers = new ArrayList<>();
private final List<Helper> fooHelpers = new ArrayList<>();
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
private final FakeClock fakeClock = new FakeClock();
private final LoadBalancerProvider fooLbProvider =
new StandardLoadBalancerProvider("foo_policy") {
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
fooHelpers.add(helper);
LoadBalancer childBalancer = mock(LoadBalancer.class);
fooBalancers.add(childBalancer);
return childBalancer;
}
};
private final LoadBalancerProvider barLbProvider =
new StandardLoadBalancerProvider("bar_policy") {
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
LoadBalancer childBalancer = mock(LoadBalancer.class);
barBalancers.add(childBalancer);
return childBalancer;
}
};
@Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
@Mock private Helper helper;
@Captor ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor;
@Captor ArgumentCaptor<ConnectivityState> connectivityStateCaptor;
@Captor ArgumentCaptor<SubchannelPicker> pickerCaptor;
private PriorityLoadBalancer priorityLb;
@Before
public void setUp() {
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
priorityLb = new PriorityLoadBalancer(helper);
}
@After
public void tearDown() {
priorityLb.shutdown();
for (LoadBalancer lb : fooBalancers) {
verify(lb).shutdown();
}
for (LoadBalancer lb : barBalancers) {
verify(lb).shutdown();
}
assertThat(fakeClock.getPendingTasks()).isEmpty();
}
@Test
public void handleResolvedAddresses() {
List<EquivalentAddressGroup> addresses =
ImmutableList.of(new EquivalentAddressGroup(new InetSocketAddress(8080)));
Attributes attributes =
Attributes.newBuilder().set(Attributes.Key.create("fakeKey"), "fakeValue").build();
Object fooConfig0 = new Object();
PolicySelection fooPolicy0 = new PolicySelection(fooLbProvider, null, fooConfig0);
Object barConfig0 = new Object();
PolicySelection barPolicy0 = new PolicySelection(barLbProvider, null, barConfig0);
Object fooConfig1 = new Object();
PolicySelection fooPolicy1 = new PolicySelection(fooLbProvider, null, fooConfig1);
PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p0", fooPolicy0, "p1", barPolicy0, "p2", fooPolicy1),
ImmutableList.of("p0", "p1", "p2"));
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(addresses)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
assertThat(fooBalancers).hasSize(1);
assertThat(barBalancers).isEmpty();
LoadBalancer fooBalancer0 = Iterables.getOnlyElement(fooBalancers);
verify(fooBalancer0).handleResolvedAddresses(resolvedAddressesCaptor.capture());
ResolvedAddresses addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(addressesReceived.getAddresses()).containsAtLeastElementsIn(addresses);
assertThat(addressesReceived.getAttributes()).isEqualTo(attributes);
assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(fooConfig0);
// Fail over to p1.
fakeClock.forwardTime(10, TimeUnit.SECONDS);
assertThat(fooBalancers).hasSize(1);
assertThat(barBalancers).hasSize(1);
LoadBalancer barBalancer0 = Iterables.getOnlyElement(barBalancers);
verify(barBalancer0).handleResolvedAddresses(resolvedAddressesCaptor.capture());
addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(addressesReceived.getAddresses()).containsAtLeastElementsIn(addresses);
assertThat(addressesReceived.getAttributes()).isEqualTo(attributes);
assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(barConfig0);
// Fail over to p2.
fakeClock.forwardTime(10, TimeUnit.SECONDS);
assertThat(fooBalancers).hasSize(2);
assertThat(barBalancers).hasSize(1);
LoadBalancer fooBalancer1 = Iterables.getLast(fooBalancers);
verify(fooBalancer1).handleResolvedAddresses(resolvedAddressesCaptor.capture());
addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(addressesReceived.getAddresses()).containsAtLeastElementsIn(addresses);
assertThat(addressesReceived.getAttributes()).isEqualTo(attributes);
assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(fooConfig1);
// New update: p0 and p2 deleted; p1 config changed.
List<EquivalentAddressGroup> newAddresses =
ImmutableList.of(new EquivalentAddressGroup(new InetSocketAddress(8081)));
Object newBarConfig = new Object();
PolicySelection newBarPolicy = new PolicySelection(barLbProvider, null, newBarConfig);
PriorityLbConfig newPriorityLbConfig =
new PriorityLbConfig(ImmutableMap.of("p1", newBarPolicy), ImmutableList.of("p1"));
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(newAddresses)
.setLoadBalancingPolicyConfig(newPriorityLbConfig)
.build());
assertThat(fooBalancers).hasSize(2);
assertThat(barBalancers).hasSize(1);
verify(barBalancer0, times(2)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(addressesReceived.getAddresses()).containsAtLeastElementsIn(newAddresses);
assertThat(addressesReceived.getAttributes()).isEqualTo(Attributes.EMPTY);
assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(newBarConfig);
verify(fooBalancer0, never()).shutdown();
verify(fooBalancer1, never()).shutdown();
fakeClock.forwardTime(15, TimeUnit.MINUTES);
verify(fooBalancer0).shutdown();
verify(fooBalancer1).shutdown();
verify(barBalancer0, never()).shutdown();
}
@Test
public void typicalPriorityFailOverFlow() {
PolicySelection policy0 = new PolicySelection(fooLbProvider, null, new Object());
PolicySelection policy1 = new PolicySelection(fooLbProvider, null, new Object());
PolicySelection policy2 = new PolicySelection(fooLbProvider, null, new Object());
PolicySelection policy3 = new PolicySelection(fooLbProvider, null, new Object());
PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p0", policy0, "p1", policy1, "p2", policy2, "p3", policy3),
ImmutableList.of("p0", "p1", "p2", "p3"));
priorityLb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
assertThat(fooBalancers).hasSize(1);
assertThat(fooHelpers).hasSize(1);
LoadBalancer balancer0 = Iterables.getLast(fooBalancers);
Helper helper0 = Iterables.getOnlyElement(fooHelpers);
// p0 gets READY.
final Subchannel subchannel0 = mock(Subchannel.class);
helper0.updateBalancingState(
READY,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel0);
}
});
assertLatestSubchannelPicker(subchannel0);
// p0 fails over to p1 immediately.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.ABORTED));
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(2);
assertThat(fooHelpers).hasSize(2);
LoadBalancer balancer1 = Iterables.getLast(fooBalancers);
// p1 timeout, and fails over to p2
fakeClock.forwardTime(10, TimeUnit.SECONDS);
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(3);
assertThat(fooHelpers).hasSize(3);
LoadBalancer balancer2 = Iterables.getLast(fooBalancers);
Helper helper2 = Iterables.getLast(fooHelpers);
// p2 gets READY
final Subchannel subchannel1 = mock(Subchannel.class);
helper2.updateBalancingState(
READY,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel1);
}
});
assertLatestSubchannelPicker(subchannel1);
// p0 gets back to READY
final Subchannel subchannel2 = mock(Subchannel.class);
helper0.updateBalancingState(
READY,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel2);
}
});
assertLatestSubchannelPicker(subchannel2);
// p2 fails but does not affect overall picker
helper2.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
assertLatestSubchannelPicker(subchannel2);
// p0 fails over to p3 immediately since p1 already timeout and p2 already in TRANSIENT_FAILURE.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
assertLatestConnectivityState(CONNECTING);
assertThat(fooBalancers).hasSize(4);
assertThat(fooHelpers).hasSize(4);
LoadBalancer balancer3 = Iterables.getLast(fooBalancers);
Helper helper3 = Iterables.getLast(fooHelpers);
// p3 fails then the channel should go to TRANSIENT_FAILURE
helper3.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
assertLatestConnectivityState(TRANSIENT_FAILURE);
// p2 gets back to READY
final Subchannel subchannel3 = mock(Subchannel.class);
helper2.updateBalancingState(
READY,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel3);
}
});
assertLatestSubchannelPicker(subchannel3);
// p0 gets back to READY
final Subchannel subchannel4 = mock(Subchannel.class);
helper0.updateBalancingState(
READY,
new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel4);
}
});
assertLatestSubchannelPicker(subchannel4);
// p0 fails over to p2 and picker is updated to p2's existing picker.
helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE));
assertLatestSubchannelPicker(subchannel3);
// Deactivate child balancer get deleted.
fakeClock.forwardTime(15, TimeUnit.MINUTES);
verify(balancer0, never()).shutdown();
verify(balancer1, never()).shutdown();
verify(balancer2, never()).shutdown();
verify(balancer3).shutdown();
}
private void assertLatestConnectivityState(ConnectivityState expectedState) {
verify(helper, atLeastOnce())
.updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture());
assertThat(connectivityStateCaptor.getValue()).isEqualTo(expectedState);
}
private void assertLatestSubchannelPicker(Subchannel expectedSubchannelToPick) {
assertLatestConnectivityState(READY);
assertThat(
pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)).getSubchannel())
.isEqualTo(expectedSubchannelToPick);
}
}