xds: implement the top-level LB policy (#7203)

The top-level LB policy, which is an aggregator for CDS policies. It maintains the lifecycle of CDS LB policy instances. The pick argument taken from the Channel contains the information to determine which child CDS policy instance should the picking operation be delegated to.

The implementation is similar to the action part of what we currently have in the routing policy. The existing routing policy will be refactored to two parts, with the route match part moved into ConfigSelector and action part being this top-level LB policy.
This commit is contained in:
Chengyuan Zhang 2020-07-20 19:12:59 +00:00 committed by GitHub
parent e7cd2299c4
commit b9d067677c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 918 additions and 0 deletions

View File

@ -0,0 +1,265 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.ConnectivityState;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* The top-level load balancing policy.
*/
class ClusterManagerLoadBalancer extends LoadBalancer {
@VisibleForTesting
static final int DELAYED_CHILD_DELETION_TIME_MINUTES = 15;
@VisibleForTesting
static final CallOptions.Key<String> ROUTING_CLUSTER_NAME_KEY =
CallOptions.Key.create("io.grpc.xds.ROUTING_CLUSTER_NAME_KEY");
private final Map<String, ChildLbState> childLbStates = new HashMap<>();
private final Helper helper;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
private final XdsLogger logger;
ClusterManagerLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
logger = XdsLogger.withLogId(
InternalLogId.allocate("cluster_manager-lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
ClusterManagerConfig config = (ClusterManagerConfig)
resolvedAddresses.getLoadBalancingPolicyConfig();
Map<String, PolicySelection> newChildPolicies = config.childPolicies;
logger.log(
XdsLogLevel.INFO,
"Received cluster_manager lb config: child names={0}", newChildPolicies.keySet());
for (Map.Entry<String, PolicySelection> entry : newChildPolicies.entrySet()) {
final String name = entry.getKey();
LoadBalancerProvider childPolicyProvider = entry.getValue().getProvider();
Object childConfig = entry.getValue().getConfig();
if (!childLbStates.containsKey(name)) {
childLbStates.put(name, new ChildLbState(name, childPolicyProvider));
} else {
childLbStates.get(name).reactivate(childPolicyProvider);
}
final LoadBalancer childLb = childLbStates.get(name).lb;
final ResolvedAddresses childAddresses =
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(childConfig).build();
syncContext.execute(new Runnable() {
@Override
public void run() {
childLb.handleResolvedAddresses(childAddresses);
}
});
}
for (String name : childLbStates.keySet()) {
if (!newChildPolicies.containsKey(name)) {
childLbStates.get(name).deactivate();
}
}
updateOverallBalancingState();
}
@Override
public void handleNameResolutionError(Status error) {
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
boolean gotoTransientFailure = true;
for (ChildLbState state : childLbStates.values()) {
if (!state.deactivated) {
gotoTransientFailure = false;
state.lb.handleNameResolutionError(error);
}
}
if (gotoTransientFailure) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
@Override
public boolean canHandleEmptyAddressListFromNameResolution() {
return true;
}
@Override
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
for (ChildLbState state : childLbStates.values()) {
state.shutdown();
}
}
private void updateOverallBalancingState() {
ConnectivityState overallState = null;
final Map<String, SubchannelPicker> childPickers = new HashMap<>();
for (ChildLbState childLbState : childLbStates.values()) {
if (childLbState.deactivated) {
continue;
}
childPickers.put(childLbState.name, childLbState.currentPicker);
overallState = aggregateState(overallState, childLbState.currentState);
}
if (overallState != null) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
String clusterName = args.getCallOptions().getOption(ROUTING_CLUSTER_NAME_KEY);
SubchannelPicker delegate = childPickers.get(clusterName);
if (delegate == null) {
return
PickResult.withError(
Status.UNAVAILABLE.withDescription("Unable to find cluster " + clusterName));
}
return delegate.pickSubchannel(args);
}
};
helper.updateBalancingState(overallState, picker);
}
}
@Nullable
private static ConnectivityState aggregateState(
@Nullable ConnectivityState overallState, ConnectivityState childState) {
if (overallState == null) {
return childState;
}
if (overallState == READY || childState == READY) {
return READY;
}
if (overallState == CONNECTING || childState == CONNECTING) {
return CONNECTING;
}
if (overallState == IDLE || childState == IDLE) {
return IDLE;
}
return overallState;
}
private final class ChildLbState {
private final String name;
private final GracefulSwitchLoadBalancer lb;
private LoadBalancerProvider policyProvider;
private ConnectivityState currentState = CONNECTING;
private SubchannelPicker currentPicker = BUFFER_PICKER;
private boolean deactivated;
@Nullable
ScheduledHandle deletionTimer;
ChildLbState(String name, LoadBalancerProvider policyProvider) {
this.name = name;
this.policyProvider = policyProvider;
lb = new GracefulSwitchLoadBalancer(new ChildLbStateHelper());
lb.switchTo(policyProvider);
}
void deactivate() {
if (deactivated) {
return;
}
class DeletionTask implements Runnable {
@Override
public void run() {
shutdown();
childLbStates.remove(name);
}
}
deletionTimer =
syncContext.schedule(
new DeletionTask(),
DELAYED_CHILD_DELETION_TIME_MINUTES,
TimeUnit.MINUTES,
timeService);
deactivated = true;
logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deactivated", name);
}
void reactivate(LoadBalancerProvider policyProvider) {
if (deletionTimer != null && deletionTimer.isPending()) {
deletionTimer.cancel();
deactivated = false;
logger.log(XdsLogLevel.DEBUG, "Child balancer {0} reactivated", name);
}
if (!this.policyProvider.getPolicyName().equals(policyProvider.getPolicyName())) {
logger.log(
XdsLogLevel.DEBUG,
"Child balancer {0} switching policy from {1} to {2}",
name, this.policyProvider.getPolicyName(), policyProvider.getPolicyName());
lb.switchTo(policyProvider);
this.policyProvider = policyProvider;
}
}
void shutdown() {
deactivated = true;
if (deletionTimer != null && deletionTimer.isPending()) {
deletionTimer.cancel();
}
lb.shutdown();
logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deleted", name);
}
private final class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
currentState = newState;
currentPicker = newPicker;
if (!deactivated) {
updateOverallBalancingState();
}
}
@Override
protected Helper delegate() {
return helper;
}
}
}
}

View File

@ -0,0 +1,155 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.JsonUtil;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
/**
* The provider for the cluster_manager load balancing policy. This class should not be directly
* referenced in code. The policy should be accessed through
* {@link LoadBalancerRegistry#getProvider} with the name "cluster_manager_experimental".
*/
@Internal
public class ClusterManagerLoadBalancerProvider extends LoadBalancerProvider {
@Nullable
private final LoadBalancerRegistry lbRegistry;
public ClusterManagerLoadBalancerProvider() {
this(null);
}
@VisibleForTesting
ClusterManagerLoadBalancerProvider(@Nullable LoadBalancerRegistry lbRegistry) {
this.lbRegistry = lbRegistry;
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME;
}
@Override
public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawConfig) {
Map<String, PolicySelection> parsedChildPolicies = new LinkedHashMap<>();
try {
Map<String, ?> childPolicies = JsonUtil.getObject(rawConfig, "childPolicy");
if (childPolicies == null || childPolicies.isEmpty()) {
return ConfigOrError.fromError(Status.INTERNAL.withDescription(
"No child policy provided for cluster_manager LB policy: " + rawConfig));
}
for (String name : childPolicies.keySet()) {
Map<String, ?> childPolicy = JsonUtil.getObject(childPolicies, name);
if (childPolicy == null) {
return ConfigOrError.fromError(Status.INTERNAL.withDescription(
"No config for child " + name + " in cluster_manager LB policy: " + rawConfig));
}
List<LbConfig> childConfigCandidates =
ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(childPolicy, "lbPolicy"));
if (childConfigCandidates == null || childConfigCandidates.isEmpty()) {
return ConfigOrError.fromError(Status.INTERNAL.withDescription(
"No config specified for child " + name + " in cluster_manager Lb policy: "
+ rawConfig));
}
LoadBalancerRegistry registry =
lbRegistry != null ? lbRegistry : LoadBalancerRegistry.getDefaultRegistry();
ConfigOrError selectedConfig =
ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, registry);
if (selectedConfig.getError() != null) {
Status error = selectedConfig.getError();
return ConfigOrError.fromError(
Status.INTERNAL
.withCause(error.getCause())
.withDescription(error.getDescription())
.augmentDescription("Failed to select config for child " + name));
}
parsedChildPolicies.put(name, (PolicySelection) selectedConfig.getConfig());
}
} catch (RuntimeException e) {
return ConfigOrError.fromError(
Status.fromThrowable(e).withDescription(
"Failed to parse cluster_manager LB config: " + rawConfig));
}
return ConfigOrError.fromConfig(new ClusterManagerConfig(parsedChildPolicies));
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new ClusterManagerLoadBalancer(helper);
}
static class ClusterManagerConfig {
final Map<String, PolicySelection> childPolicies;
ClusterManagerConfig(Map<String, PolicySelection> childPolicies) {
this.childPolicies = Collections.unmodifiableMap(childPolicies);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ClusterManagerConfig)) {
return false;
}
ClusterManagerConfig config = (ClusterManagerConfig) o;
return Objects.equals(childPolicies, config.childPolicies);
}
@Override
public int hashCode() {
return Objects.hash(childPolicies);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("childPolicies", childPolicies)
.toString();
}
}
}

View File

@ -17,6 +17,7 @@
package io.grpc.xds;
final class XdsLbPolicies {
static final String CLUSTER_MANAGER_POLICY_NAME = "cluster_manager_experimental";
static final String CDS_POLICY_NAME = "cds_experimental";
static final String EDS_POLICY_NAME = "eds_experimental";
static final String WEIGHTED_TARGET_POLICY_NAME = "weighted_target_experimental";

View File

@ -2,3 +2,4 @@ io.grpc.xds.CdsLoadBalancerProvider
io.grpc.xds.EdsLoadBalancerProvider
io.grpc.xds.WeightedTargetLoadBalancerProvider
io.grpc.xds.XdsRoutingLoadBalancerProvider
io.grpc.xds.ClusterManagerLoadBalancerProvider

View File

@ -0,0 +1,145 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.internal.JsonParser;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.junit.Test;
/** Tests for {@link ClusterManagerLoadBalancerProvider}. */
public class ClusterManagerLoadBalancerProviderTest {
@Test
public void parseClusterManagerLoadBalancingPolicyConfig() throws IOException {
LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
ClusterManagerLoadBalancerProvider provider =
new ClusterManagerLoadBalancerProvider(lbRegistry);
final Object fooConfig = new Object();
LoadBalancerProvider lbProviderFoo = new LoadBalancerProvider() {
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return "foo_policy";
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
throw new UnsupportedOperationException("Should not be called");
}
@Override
public ConfigOrError parseLoadBalancingPolicyConfig(
Map<String, ?> rawLoadBalancingPolicyConfig) {
return ConfigOrError.fromConfig(fooConfig);
}
};
final Object barConfig = new Object();
LoadBalancerProvider lbProviderBar = new LoadBalancerProvider() {
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return "bar_policy";
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
throw new UnsupportedOperationException("Should not be called");
}
@Override
public ConfigOrError parseLoadBalancingPolicyConfig(
Map<String, ?> rawLoadBalancingPolicyConfig) {
return ConfigOrError.fromConfig(barConfig);
}
};
lbRegistry.register(lbProviderFoo);
lbRegistry.register(lbProviderBar);
String clusterManagerConfigJson = "{\n"
+ " \"childPolicy\": {\n"
+ " \"child1\": {\n"
+ " \"lbPolicy\": [\n"
+ " {\n"
+ " \"foo_policy\": {"
+ " \"config_name\": \"config_value\"\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"child2\": {\n"
+ " \"lbPolicy\": [\n"
+ " {\n"
+ " \"bar_policy\": {}\n"
+ " }, {\n"
+ " \"unsupported\": {}\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ "}";
@SuppressWarnings("unchecked")
Map<String, ?> rawLbConfigMap = (Map<String, ?>) JsonParser.parse(clusterManagerConfigJson);
ConfigOrError configOrError = provider.parseLoadBalancingPolicyConfig(rawLbConfigMap);
assertThat(configOrError.getConfig()).isNotNull();
ClusterManagerConfig config = (ClusterManagerConfig) configOrError.getConfig();
assertThat(config.childPolicies)
.containsExactly(
"child1",
new PolicySelection(
lbProviderFoo, Collections.singletonMap("config_name", "config_value"), fooConfig),
"child2",
new PolicySelection(lbProviderBar, Collections.<String, Object>emptyMap(), barConfig));
}
@Test
public void registered() {
LoadBalancerProvider provider =
LoadBalancerRegistry
.getDefaultRegistry()
.getProvider("cluster_manager_experimental");
assertThat(provider).isInstanceOf(ClusterManagerLoadBalancerProvider.class);
}
}

View File

@ -0,0 +1,351 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.grpc.CallOptions;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.internal.PickSubchannelArgsImpl;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/** Tests for {@link ClusterManagerLoadBalancer}. */
@RunWith(JUnit4.class)
public class ClusterManagerLoadBalancerTest {
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
private final FakeClock fakeClock = new FakeClock();
@Captor
ArgumentCaptor<SubchannelPicker> pickerCaptor;
@Mock
private LoadBalancer.Helper helper;
private final Map<String, Object> lbConfigInventory = new HashMap<>();
private final List<FakeLoadBalancer> childBalancers = new ArrayList<>();
private LoadBalancer clusterManagerLoadBalancer;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
when(helper.getSynchronizationContext()).thenReturn(syncContext);
when(helper.getScheduledExecutorService()).thenReturn(fakeClock.getScheduledExecutorService());
lbConfigInventory.put("childA", new Object());
lbConfigInventory.put("childB", new Object());
lbConfigInventory.put("childC", null);
clusterManagerLoadBalancer = new ClusterManagerLoadBalancer(helper);
}
@After
public void tearDown() {
clusterManagerLoadBalancer.shutdown();
for (FakeLoadBalancer childLb : childBalancers) {
assertThat(childLb.shutdown).isTrue();
}
}
@Test
public void handleResolvedAddressesUpdatesChannelPicker() {
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
verify(helper, atLeastOnce()).updateBalancingState(
eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
assertThat(pickSubchannel(picker, "childA")).isEqualTo(PickResult.withNoResult());
assertThat(pickSubchannel(picker, "childB")).isEqualTo(PickResult.withNoResult());
assertThat(childBalancers).hasSize(2);
FakeLoadBalancer childBalancer1 = childBalancers.get(0);
FakeLoadBalancer childBalancer2 = childBalancers.get(1);
assertThat(childBalancer1.name).isEqualTo("policy_a");
assertThat(childBalancer2.name).isEqualTo("policy_b");
assertThat(childBalancer1.config).isEqualTo(lbConfigInventory.get("childA"));
assertThat(childBalancer2.config).isEqualTo(lbConfigInventory.get("childB"));
// Receive an updated config.
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childC", "policy_c"));
verify(helper, atLeast(2))
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
picker = pickerCaptor.getValue();
assertThat(pickSubchannel(picker, "childA")).isEqualTo(PickResult.withNoResult());
assertThat(pickSubchannel(picker, "childC")).isEqualTo(PickResult.withNoResult());
Status status = pickSubchannel(picker, "childB").getStatus();
assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(status.getDescription()).isEqualTo("Unable to find cluster childB");
assertThat(fakeClock.numPendingTasks())
.isEqualTo(1); // (delayed) shutdown because "childB" is removed
assertThat(childBalancer1.shutdown).isFalse();
assertThat(childBalancer2.shutdown).isFalse();
assertThat(childBalancers).hasSize(3);
FakeLoadBalancer childBalancer3 = childBalancers.get(2);
assertThat(childBalancer3.name).isEqualTo("policy_c");
assertThat(childBalancer3.config).isEqualTo(lbConfigInventory.get("childC"));
fakeClock.forwardTime(
ClusterManagerLoadBalancer.DELAYED_CHILD_DELETION_TIME_MINUTES, TimeUnit.MINUTES);
assertThat(childBalancer2.shutdown).isTrue();
}
@Test
public void updateBalancingStateFromChildBalancers() {
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
assertThat(childBalancers).hasSize(2);
FakeLoadBalancer childBalancer1 = childBalancers.get(0);
FakeLoadBalancer childBalancer2 = childBalancers.get(1);
Subchannel subchannel1 = mock(Subchannel.class);
Subchannel subchannel2 = mock(Subchannel.class);
childBalancer1.deliverSubchannelState(subchannel1, ConnectivityState.READY);
verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
assertThat(pickSubchannel(picker, "childA").getSubchannel()).isEqualTo(subchannel1);
assertThat(pickSubchannel(picker, "childB")).isEqualTo(PickResult.withNoResult());
childBalancer2.deliverSubchannelState(subchannel2, ConnectivityState.READY);
verify(helper, times(2))
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
assertThat(pickSubchannel(pickerCaptor.getValue(), "childB").getSubchannel())
.isEqualTo(subchannel2);
}
@Test
public void updateBalancingStateFromDeactivatedChildBalancer() {
FakeLoadBalancer balancer = deliverAddressesAndUpdateToRemoveChildPolicy("childA", "policy_a");
Subchannel subchannel = mock(Subchannel.class);
balancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
verify(helper, never()).updateBalancingState(
eq(ConnectivityState.READY), any(SubchannelPicker.class));
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a"));
verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
assertThat(pickSubchannel(pickerCaptor.getValue(), "childA").getSubchannel())
.isEqualTo(subchannel);
}
@Test
public void errorPropagation() {
Status error = Status.UNAVAILABLE.withDescription("resolver error");
clusterManagerLoadBalancer.handleNameResolutionError(error);
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription()).isEqualTo("resolver error");
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
assertThat(childBalancers).hasSize(2);
FakeLoadBalancer childBalancer1 = childBalancers.get(0);
FakeLoadBalancer childBalancer2 = childBalancers.get(1);
clusterManagerLoadBalancer.handleNameResolutionError(error);
assertThat(childBalancer1.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(childBalancer1.upstreamError.getDescription()).isEqualTo("resolver error");
assertThat(childBalancer2.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(childBalancer2.upstreamError.getDescription()).isEqualTo("resolver error");
}
@Test
public void errorPropagationToDeactivatedChildBalancer() {
FakeLoadBalancer balancer = deliverAddressesAndUpdateToRemoveChildPolicy("childA", "policy_a");
clusterManagerLoadBalancer.handleNameResolutionError(
Status.UNKNOWN.withDescription("unknown error"));
assertThat(balancer.upstreamError).isNull();
}
private FakeLoadBalancer deliverAddressesAndUpdateToRemoveChildPolicy(
String childName, String childPolicyName) {
lbConfigInventory.put("childFoo", null);
deliverResolvedAddresses(
ImmutableMap.of(childName, childPolicyName, "childFoo", "policy_foo"));
verify(helper, atLeastOnce()).updateBalancingState(
eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class));
assertThat(childBalancers).hasSize(2);
FakeLoadBalancer balancer = childBalancers.get(0);
deliverResolvedAddresses(ImmutableMap.of("childFoo", "policy_foo"));
verify(helper, atLeast(2)).updateBalancingState(
eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class));
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.MINUTES))
.isEqualTo(ClusterManagerLoadBalancer.DELAYED_CHILD_DELETION_TIME_MINUTES);
return balancer;
}
private void deliverResolvedAddresses(final Map<String, String> childPolicies) {
syncContext.execute(new Runnable() {
@Override
public void run() {
clusterManagerLoadBalancer
.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setLoadBalancingPolicyConfig(buildConfig(childPolicies))
.build());
}
});
}
private ClusterManagerConfig buildConfig(Map<String, String> childPolicies) {
Map<String, PolicySelection> childPolicySelections = new LinkedHashMap<>();
for (String name : childPolicies.keySet()) {
String childPolicyName = childPolicies.get(name);
Object childConfig = lbConfigInventory.get(name);
PolicySelection policy =
new PolicySelection(new FakeLoadBalancerProvider(childPolicyName), null, childConfig);
childPolicySelections.put(name, policy);
}
return new ClusterManagerConfig(childPolicySelections);
}
private static PickResult pickSubchannel(SubchannelPicker picker, String name) {
PickSubchannelArgs args =
new PickSubchannelArgsImpl(
MethodDescriptor.<Void, Void>newBuilder()
.setType(MethodType.UNARY)
.setFullMethodName("/service/method")
.setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
.setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
.build(),
new Metadata(),
CallOptions.DEFAULT.withOption(
ClusterManagerLoadBalancer.ROUTING_CLUSTER_NAME_KEY, name));
return picker.pickSubchannel(args);
}
private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
private final String policyName;
FakeLoadBalancerProvider(String policyName) {
this.policyName = policyName;
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
FakeLoadBalancer balancer = new FakeLoadBalancer(policyName, helper);
childBalancers.add(balancer);
return balancer;
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 0; // doesn't matter
}
@Override
public String getPolicyName() {
return policyName;
}
}
private final class FakeLoadBalancer extends LoadBalancer {
private final String name;
private final Helper helper;
private Object config;
private Status upstreamError;
private boolean shutdown;
FakeLoadBalancer(String name, Helper helper) {
this.name = name;
this.helper = helper;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
config = resolvedAddresses.getLoadBalancingPolicyConfig();
}
@Override
public void handleNameResolutionError(Status error) {
upstreamError = error;
}
@Override
public void shutdown() {
shutdown = true;
childBalancers.remove(this);
}
void deliverSubchannelState(final Subchannel subchannel, ConnectivityState state) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel);
}
};
helper.updateBalancingState(state, picker);
}
}
}