diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java new file mode 100644 index 0000000000..36d8854944 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -0,0 +1,291 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; + +import io.grpc.ConnectivityState; +import io.grpc.InternalLogId; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancerProvider; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.util.ForwardingLoadBalancerHelper; +import io.grpc.util.GracefulSwitchLoadBalancer; +import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; +import io.grpc.xds.XdsLogger.XdsLogLevel; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** Load balancer for priority policy. */ +final class PriorityLoadBalancer extends LoadBalancer { + private final Helper helper; + private final SynchronizationContext syncContext; + private final ScheduledExecutorService executor; + private final XdsLogger logger; + + // Includes all active and deactivated children. Mutable. New entries are only added from priority + // 0 up to the selected priority. An entry is only deleted 15 minutes after the its deactivation. + private final Map children = new HashMap<>(); + + // Following fields are only null initially. + private ResolvedAddresses resolvedAddresses; + private List priorityNames; + private Map priorityNameToIndex; + private ConnectivityState currentConnectivityState; + private SubchannelPicker currentPicker; + + PriorityLoadBalancer(Helper helper) { + this.helper = checkNotNull(helper, "helper"); + syncContext = helper.getSynchronizationContext(); + executor = helper.getScheduledExecutorService(); + InternalLogId logId = InternalLogId.allocate("priority-lb", helper.getAuthority()); + logger = XdsLogger.withLogId(logId); + logger.log(XdsLogLevel.INFO, "Created"); + } + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); + this.resolvedAddresses = resolvedAddresses; + PriorityLbConfig config = (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + checkNotNull(config, "missing priority lb config"); + priorityNames = config.priorities; + Map pToI = new HashMap<>(); + for (int i = 0; i < priorityNames.size(); i++) { + pToI.put(priorityNames.get(i), i); + } + priorityNameToIndex = Collections.unmodifiableMap(pToI); + for (String priority : children.keySet()) { + if (!priorityNameToIndex.containsKey(priority)) { + children.get(priority).deactivate(); + } + } + for (String priority : priorityNames) { + if (children.containsKey(priority)) { + children.get(priority).updateResolvedAddresses(); + } + } + // Not to report connecting in case a pending priority bumps up on top of the current READY + // priority. + tryNextPriority(false); + } + + @Override + public void handleNameResolutionError(Status error) { + logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error); + if (children.isEmpty()) { + updateOverallState(TRANSIENT_FAILURE, new ErrorPicker(error)); + } + for (ChildLbState child : children.values()) { + child.lb.handleNameResolutionError(error); + } + } + + @Override + public void shutdown() { + logger.log(XdsLogLevel.INFO, "Shutdown"); + for (ChildLbState child : children.values()) { + child.tearDown(); + } + } + + private void tryNextPriority(boolean reportConnecting) { + for (int i = 0; i < priorityNames.size(); i++) { + String priority = priorityNames.get(i); + if (!children.containsKey(priority)) { + ChildLbState child = new ChildLbState(priority); + children.put(priority, child); + child.updateResolvedAddresses(); + updateOverallState(CONNECTING, BUFFER_PICKER); + return; // Give priority i time to connect. + } + ChildLbState child = children.get(priority); + child.reactivate(); + if (child.connectivityState.equals(READY) || child.connectivityState.equals(IDLE)) { + logger.log(XdsLogLevel.DEBUG, "Shifted to priority {0}", priority); + updateOverallState(child.connectivityState, child.picker); + for (int j = i + 1; j < priorityNames.size(); j++) { + String p = priorityNames.get(j); + if (children.containsKey(p)) { + children.get(p).deactivate(); + } + } + return; + } + if (child.failOverTimer != null && child.failOverTimer.isPending()) { + if (reportConnecting) { + updateOverallState(CONNECTING, BUFFER_PICKER); + } + return; // Give priority i time to connect. + } + } + // TODO(zdapeng): Include error details of each priority. + logger.log(XdsLogLevel.DEBUG, "All priority failed"); + updateOverallState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE)); + } + + private void updateOverallState(ConnectivityState state, SubchannelPicker picker) { + if (!state.equals(currentConnectivityState) || !picker.equals(currentPicker)) { + currentConnectivityState = state; + currentPicker = picker; + helper.updateBalancingState(state, picker); + } + } + + private final class ChildLbState { + final String priority; + final ChildHelper childHelper; + final GracefulSwitchLoadBalancer lb; + // Timer to fail over to the next priority if not connected in 10 sec. Scheduled only once at + // child initialization. + final ScheduledHandle failOverTimer; + // Timer to delay shutdown and deletion of the priority. Scheduled whenever the child is + // deactivated. + @Nullable ScheduledHandle deletionTimer; + @Nullable String policy; + ConnectivityState connectivityState = CONNECTING; + SubchannelPicker picker = BUFFER_PICKER; + + ChildLbState(final String priority) { + this.priority = priority; + childHelper = new ChildHelper(); + lb = new GracefulSwitchLoadBalancer(childHelper); + + class FailOverTask implements Runnable { + @Override + public void run() { + if (deletionTimer != null && deletionTimer.isPending()) { + // The child is deactivated. + return; + } + logger.log(XdsLogLevel.DEBUG, "Priority {0} failed over to next", priority); + tryNextPriority(true); + } + } + + failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS, executor); + logger.log(XdsLogLevel.DEBUG, "Priority created: {0}", priority); + } + + /** + * Called when the child becomes a priority that is or appears before the first READY one in the + * {@code priorities} list, due to either config update or balancing state update. + */ + void reactivate() { + if (deletionTimer != null && deletionTimer.isPending()) { + deletionTimer.cancel(); + logger.log(XdsLogLevel.DEBUG, "Priority reactivated: {0}", priority); + } + } + + /** + * Called when either the child is removed by config update, or a higher priority becomes READY. + */ + void deactivate() { + if (deletionTimer != null && deletionTimer.isPending()) { + return; + } + + class DeletionTask implements Runnable { + @Override + public void run() { + tearDown(); + children.remove(priority); + } + } + + deletionTimer = syncContext.schedule(new DeletionTask(), 15, TimeUnit.MINUTES, executor); + logger.log(XdsLogLevel.DEBUG, "Priority deactivated: {0}", priority); + } + + void tearDown() { + if (failOverTimer.isPending()) { + failOverTimer.cancel(); + } + if (deletionTimer != null && deletionTimer.isPending()) { + deletionTimer.cancel(); + } + lb.shutdown(); + logger.log(XdsLogLevel.DEBUG, "Priority deleted: {0}", priority); + } + + /** + * Called either when the child is just created and in this case updated with the cached {@code + * resolvedAddresses}, or when priority lb receives a new resolved addresses while the child + * already exists. + */ + void updateResolvedAddresses() { + final ResolvedAddresses addresses = resolvedAddresses; + syncContext.execute( + new Runnable() { + @Override + public void run() { + PriorityLbConfig config = (PriorityLbConfig) addresses.getLoadBalancingPolicyConfig(); + PolicySelection childPolicySelection = config.childConfigs.get(priority); + LoadBalancerProvider lbProvider = childPolicySelection.getProvider(); + String newPolicy = lbProvider.getPolicyName(); + if (!newPolicy.equals(policy)) { + policy = newPolicy; + lb.switchTo(lbProvider); + } + // TODO(zdapeng): Implement address filtering. + lb.handleResolvedAddresses( + addresses + .toBuilder() + .setLoadBalancingPolicyConfig(childPolicySelection.getConfig()) + .build()); + } + }); + } + + final class ChildHelper extends ForwardingLoadBalancerHelper { + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + connectivityState = newState; + picker = newPicker; + if (deletionTimer != null && deletionTimer.isPending()) { + return; + } + if (failOverTimer.isPending()) { + if (newState.equals(READY) || newState.equals(TRANSIENT_FAILURE)) { + failOverTimer.cancel(); + } + } + tryNextPriority(true); + } + + @Override + protected Helper delegate() { + return helper; + } + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancerProvider.java new file mode 100644 index 0000000000..76afe210c9 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancerProvider.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +/** Provider for priority load balancing policy. */ +final class PriorityLoadBalancerProvider extends LoadBalancerProvider { + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "priority_experimental"; + } + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new PriorityLoadBalancer(helper); + } + + @Override + public ConfigOrError parseLoadBalancingPolicyConfig(Map rawConfig) { + throw new UnsupportedOperationException(); + } + + static final class PriorityLbConfig { + final Map childConfigs; + final List priorities; + + PriorityLbConfig(Map childConfigs, List priorities) { + this.childConfigs = Collections.unmodifiableMap(checkNotNull(childConfigs, "childConfigs")); + this.priorities = Collections.unmodifiableList(checkNotNull(priorities, "priorities")); + checkArgument(!priorities.isEmpty(), "priority list is empty"); + checkArgument( + childConfigs.keySet().containsAll(priorities), + "missing child config for at lease one of the priorities"); + checkArgument( + priorities.size() == new HashSet<>(priorities).size(), + "duplicate names in priorities"); + checkArgument( + priorities.size() == childConfigs.keySet().size(), + "some names in childConfigs are not referenced by priorities"); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("childConfigs", childConfigs) + .add("priorities", priorities) + .toString(); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerProviderTest.java new file mode 100644 index 0000000000..34cdd8a47b --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerProviderTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static org.mockito.Mockito.mock; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.grpc.LoadBalancerProvider; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; +import java.util.List; +import java.util.Map; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link PriorityLoadBalancerProvider}. */ +@RunWith(JUnit4.class) +public class PriorityLoadBalancerProviderTest { + @Rule public final ExpectedException thrown = ExpectedException.none(); + + @Test + public void priorityLbConfig_emptyPriorities() { + Map childConfigs = + ImmutableMap.of("p0", new PolicySelection(mock(LoadBalancerProvider.class), null, null)); + List priorities = ImmutableList.of(); + + thrown.expect(IllegalArgumentException.class); + new PriorityLbConfig(childConfigs, priorities); + } + + @Test + public void priorityLbConfig_missingChildConfig() { + Map childConfigs = + ImmutableMap.of("p1", new PolicySelection(mock(LoadBalancerProvider.class), null, null)); + List priorities = ImmutableList.of("p0", "p1"); + + thrown.expect(IllegalArgumentException.class); + new PriorityLbConfig(childConfigs, priorities); + } +} diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java new file mode 100644 index 0000000000..e41020edd5 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -0,0 +1,345 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancerProvider; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.internal.FakeClock; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.internal.TestUtils.StandardLoadBalancerProvider; +import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +/** Tests for {@link PriorityLoadBalancer}. */ +@RunWith(JUnit4.class) +public class PriorityLoadBalancerTest { + private final List fooBalancers = new ArrayList<>(); + private final List barBalancers = new ArrayList<>(); + private final List fooHelpers = new ArrayList<>(); + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final FakeClock fakeClock = new FakeClock(); + + private final LoadBalancerProvider fooLbProvider = + new StandardLoadBalancerProvider("foo_policy") { + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + fooHelpers.add(helper); + LoadBalancer childBalancer = mock(LoadBalancer.class); + fooBalancers.add(childBalancer); + return childBalancer; + } + }; + + private final LoadBalancerProvider barLbProvider = + new StandardLoadBalancerProvider("bar_policy") { + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + LoadBalancer childBalancer = mock(LoadBalancer.class); + barBalancers.add(childBalancer); + return childBalancer; + } + }; + + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + @Mock private Helper helper; + @Captor ArgumentCaptor resolvedAddressesCaptor; + @Captor ArgumentCaptor connectivityStateCaptor; + @Captor ArgumentCaptor pickerCaptor; + + private PriorityLoadBalancer priorityLb; + + @Before + public void setUp() { + doReturn(syncContext).when(helper).getSynchronizationContext(); + doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); + priorityLb = new PriorityLoadBalancer(helper); + } + + @After + public void tearDown() { + priorityLb.shutdown(); + for (LoadBalancer lb : fooBalancers) { + verify(lb).shutdown(); + } + for (LoadBalancer lb : barBalancers) { + verify(lb).shutdown(); + } + assertThat(fakeClock.getPendingTasks()).isEmpty(); + } + + @Test + public void handleResolvedAddresses() { + List addresses = + ImmutableList.of(new EquivalentAddressGroup(new InetSocketAddress(8080))); + Attributes attributes = + Attributes.newBuilder().set(Attributes.Key.create("fakeKey"), "fakeValue").build(); + Object fooConfig0 = new Object(); + PolicySelection fooPolicy0 = new PolicySelection(fooLbProvider, null, fooConfig0); + Object barConfig0 = new Object(); + PolicySelection barPolicy0 = new PolicySelection(barLbProvider, null, barConfig0); + Object fooConfig1 = new Object(); + PolicySelection fooPolicy1 = new PolicySelection(fooLbProvider, null, fooConfig1); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig( + ImmutableMap.of("p0", fooPolicy0, "p1", barPolicy0, "p2", fooPolicy1), + ImmutableList.of("p0", "p1", "p2")); + priorityLb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(addresses) + .setAttributes(attributes) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + assertThat(fooBalancers).hasSize(1); + assertThat(barBalancers).isEmpty(); + LoadBalancer fooBalancer0 = Iterables.getOnlyElement(fooBalancers); + verify(fooBalancer0).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + ResolvedAddresses addressesReceived = resolvedAddressesCaptor.getValue(); + assertThat(addressesReceived.getAddresses()).containsAtLeastElementsIn(addresses); + assertThat(addressesReceived.getAttributes()).isEqualTo(attributes); + assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(fooConfig0); + + // Fail over to p1. + fakeClock.forwardTime(10, TimeUnit.SECONDS); + assertThat(fooBalancers).hasSize(1); + assertThat(barBalancers).hasSize(1); + LoadBalancer barBalancer0 = Iterables.getOnlyElement(barBalancers); + verify(barBalancer0).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + addressesReceived = resolvedAddressesCaptor.getValue(); + assertThat(addressesReceived.getAddresses()).containsAtLeastElementsIn(addresses); + assertThat(addressesReceived.getAttributes()).isEqualTo(attributes); + assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(barConfig0); + + // Fail over to p2. + fakeClock.forwardTime(10, TimeUnit.SECONDS); + assertThat(fooBalancers).hasSize(2); + assertThat(barBalancers).hasSize(1); + LoadBalancer fooBalancer1 = Iterables.getLast(fooBalancers); + verify(fooBalancer1).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + addressesReceived = resolvedAddressesCaptor.getValue(); + assertThat(addressesReceived.getAddresses()).containsAtLeastElementsIn(addresses); + assertThat(addressesReceived.getAttributes()).isEqualTo(attributes); + assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(fooConfig1); + + // New update: p0 and p2 deleted; p1 config changed. + List newAddresses = + ImmutableList.of(new EquivalentAddressGroup(new InetSocketAddress(8081))); + Object newBarConfig = new Object(); + PolicySelection newBarPolicy = new PolicySelection(barLbProvider, null, newBarConfig); + PriorityLbConfig newPriorityLbConfig = + new PriorityLbConfig(ImmutableMap.of("p1", newBarPolicy), ImmutableList.of("p1")); + priorityLb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(newAddresses) + .setLoadBalancingPolicyConfig(newPriorityLbConfig) + .build()); + assertThat(fooBalancers).hasSize(2); + assertThat(barBalancers).hasSize(1); + verify(barBalancer0, times(2)).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + addressesReceived = resolvedAddressesCaptor.getValue(); + assertThat(addressesReceived.getAddresses()).containsAtLeastElementsIn(newAddresses); + assertThat(addressesReceived.getAttributes()).isEqualTo(Attributes.EMPTY); + assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(newBarConfig); + verify(fooBalancer0, never()).shutdown(); + verify(fooBalancer1, never()).shutdown(); + fakeClock.forwardTime(15, TimeUnit.MINUTES); + verify(fooBalancer0).shutdown(); + verify(fooBalancer1).shutdown(); + verify(barBalancer0, never()).shutdown(); + } + + @Test + public void typicalPriorityFailOverFlow() { + PolicySelection policy0 = new PolicySelection(fooLbProvider, null, new Object()); + PolicySelection policy1 = new PolicySelection(fooLbProvider, null, new Object()); + PolicySelection policy2 = new PolicySelection(fooLbProvider, null, new Object()); + PolicySelection policy3 = new PolicySelection(fooLbProvider, null, new Object()); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig( + ImmutableMap.of("p0", policy0, "p1", policy1, "p2", policy2, "p3", policy3), + ImmutableList.of("p0", "p1", "p2", "p3")); + priorityLb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + assertThat(fooBalancers).hasSize(1); + assertThat(fooHelpers).hasSize(1); + LoadBalancer balancer0 = Iterables.getLast(fooBalancers); + Helper helper0 = Iterables.getOnlyElement(fooHelpers); + + // p0 gets READY. + final Subchannel subchannel0 = mock(Subchannel.class); + helper0.updateBalancingState( + READY, + new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withSubchannel(subchannel0); + } + }); + assertLatestSubchannelPicker(subchannel0); + + // p0 fails over to p1 immediately. + helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.ABORTED)); + assertLatestConnectivityState(CONNECTING); + assertThat(fooBalancers).hasSize(2); + assertThat(fooHelpers).hasSize(2); + LoadBalancer balancer1 = Iterables.getLast(fooBalancers); + + // p1 timeout, and fails over to p2 + fakeClock.forwardTime(10, TimeUnit.SECONDS); + assertLatestConnectivityState(CONNECTING); + assertThat(fooBalancers).hasSize(3); + assertThat(fooHelpers).hasSize(3); + LoadBalancer balancer2 = Iterables.getLast(fooBalancers); + Helper helper2 = Iterables.getLast(fooHelpers); + + // p2 gets READY + final Subchannel subchannel1 = mock(Subchannel.class); + helper2.updateBalancingState( + READY, + new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withSubchannel(subchannel1); + } + }); + assertLatestSubchannelPicker(subchannel1); + + // p0 gets back to READY + final Subchannel subchannel2 = mock(Subchannel.class); + helper0.updateBalancingState( + READY, + new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withSubchannel(subchannel2); + } + }); + assertLatestSubchannelPicker(subchannel2); + + // p2 fails but does not affect overall picker + helper2.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE)); + assertLatestSubchannelPicker(subchannel2); + + // p0 fails over to p3 immediately since p1 already timeout and p2 already in TRANSIENT_FAILURE. + helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE)); + assertLatestConnectivityState(CONNECTING); + assertThat(fooBalancers).hasSize(4); + assertThat(fooHelpers).hasSize(4); + LoadBalancer balancer3 = Iterables.getLast(fooBalancers); + Helper helper3 = Iterables.getLast(fooHelpers); + + // p3 fails then the channel should go to TRANSIENT_FAILURE + helper3.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE)); + assertLatestConnectivityState(TRANSIENT_FAILURE); + + // p2 gets back to READY + final Subchannel subchannel3 = mock(Subchannel.class); + helper2.updateBalancingState( + READY, + new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withSubchannel(subchannel3); + } + }); + assertLatestSubchannelPicker(subchannel3); + + // p0 gets back to READY + final Subchannel subchannel4 = mock(Subchannel.class); + helper0.updateBalancingState( + READY, + new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withSubchannel(subchannel4); + } + }); + assertLatestSubchannelPicker(subchannel4); + + // p0 fails over to p2 and picker is updated to p2's existing picker. + helper0.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.UNAVAILABLE)); + assertLatestSubchannelPicker(subchannel3); + + // Deactivate child balancer get deleted. + fakeClock.forwardTime(15, TimeUnit.MINUTES); + verify(balancer0, never()).shutdown(); + verify(balancer1, never()).shutdown(); + verify(balancer2, never()).shutdown(); + verify(balancer3).shutdown(); + } + + private void assertLatestConnectivityState(ConnectivityState expectedState) { + verify(helper, atLeastOnce()) + .updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); + assertThat(connectivityStateCaptor.getValue()).isEqualTo(expectedState); + } + + private void assertLatestSubchannelPicker(Subchannel expectedSubchannelToPick) { + assertLatestConnectivityState(READY); + assertThat( + pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)).getSubchannel()) + .isEqualTo(expectedSubchannelToPick); + } +}