diff --git a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java new file mode 100644 index 0000000000..589a42e04c --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java @@ -0,0 +1,265 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.CallOptions; +import io.grpc.ConnectivityState; +import io.grpc.InternalLogId; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancerProvider; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.util.ForwardingLoadBalancerHelper; +import io.grpc.util.GracefulSwitchLoadBalancer; +import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig; +import io.grpc.xds.XdsLogger.XdsLogLevel; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * The top-level load balancing policy. + */ +class ClusterManagerLoadBalancer extends LoadBalancer { + + @VisibleForTesting + static final int DELAYED_CHILD_DELETION_TIME_MINUTES = 15; + @VisibleForTesting + static final CallOptions.Key ROUTING_CLUSTER_NAME_KEY = + CallOptions.Key.create("io.grpc.xds.ROUTING_CLUSTER_NAME_KEY"); + + private final Map childLbStates = new HashMap<>(); + private final Helper helper; + private final SynchronizationContext syncContext; + private final ScheduledExecutorService timeService; + private final XdsLogger logger; + + ClusterManagerLoadBalancer(Helper helper) { + this.helper = checkNotNull(helper, "helper"); + this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); + this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); + logger = XdsLogger.withLogId( + InternalLogId.allocate("cluster_manager-lb", helper.getAuthority())); + logger.log(XdsLogLevel.INFO, "Created"); + } + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); + ClusterManagerConfig config = (ClusterManagerConfig) + resolvedAddresses.getLoadBalancingPolicyConfig(); + Map newChildPolicies = config.childPolicies; + logger.log( + XdsLogLevel.INFO, + "Received cluster_manager lb config: child names={0}", newChildPolicies.keySet()); + for (Map.Entry entry : newChildPolicies.entrySet()) { + final String name = entry.getKey(); + LoadBalancerProvider childPolicyProvider = entry.getValue().getProvider(); + Object childConfig = entry.getValue().getConfig(); + if (!childLbStates.containsKey(name)) { + childLbStates.put(name, new ChildLbState(name, childPolicyProvider)); + } else { + childLbStates.get(name).reactivate(childPolicyProvider); + } + final LoadBalancer childLb = childLbStates.get(name).lb; + final ResolvedAddresses childAddresses = + resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(childConfig).build(); + syncContext.execute(new Runnable() { + @Override + public void run() { + childLb.handleResolvedAddresses(childAddresses); + } + }); + } + for (String name : childLbStates.keySet()) { + if (!newChildPolicies.containsKey(name)) { + childLbStates.get(name).deactivate(); + } + } + updateOverallBalancingState(); + } + + @Override + public void handleNameResolutionError(Status error) { + logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error); + boolean gotoTransientFailure = true; + for (ChildLbState state : childLbStates.values()) { + if (!state.deactivated) { + gotoTransientFailure = false; + state.lb.handleNameResolutionError(error); + } + } + if (gotoTransientFailure) { + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); + } + } + + @Override + public boolean canHandleEmptyAddressListFromNameResolution() { + return true; + } + + @Override + public void shutdown() { + logger.log(XdsLogLevel.INFO, "Shutdown"); + for (ChildLbState state : childLbStates.values()) { + state.shutdown(); + } + } + + private void updateOverallBalancingState() { + ConnectivityState overallState = null; + final Map childPickers = new HashMap<>(); + for (ChildLbState childLbState : childLbStates.values()) { + if (childLbState.deactivated) { + continue; + } + childPickers.put(childLbState.name, childLbState.currentPicker); + overallState = aggregateState(overallState, childLbState.currentState); + } + if (overallState != null) { + SubchannelPicker picker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + String clusterName = args.getCallOptions().getOption(ROUTING_CLUSTER_NAME_KEY); + SubchannelPicker delegate = childPickers.get(clusterName); + if (delegate == null) { + return + PickResult.withError( + Status.UNAVAILABLE.withDescription("Unable to find cluster " + clusterName)); + } + return delegate.pickSubchannel(args); + } + }; + helper.updateBalancingState(overallState, picker); + } + } + + @Nullable + private static ConnectivityState aggregateState( + @Nullable ConnectivityState overallState, ConnectivityState childState) { + if (overallState == null) { + return childState; + } + if (overallState == READY || childState == READY) { + return READY; + } + if (overallState == CONNECTING || childState == CONNECTING) { + return CONNECTING; + } + if (overallState == IDLE || childState == IDLE) { + return IDLE; + } + return overallState; + } + + private final class ChildLbState { + private final String name; + private final GracefulSwitchLoadBalancer lb; + private LoadBalancerProvider policyProvider; + private ConnectivityState currentState = CONNECTING; + private SubchannelPicker currentPicker = BUFFER_PICKER; + private boolean deactivated; + @Nullable + ScheduledHandle deletionTimer; + + ChildLbState(String name, LoadBalancerProvider policyProvider) { + this.name = name; + this.policyProvider = policyProvider; + lb = new GracefulSwitchLoadBalancer(new ChildLbStateHelper()); + lb.switchTo(policyProvider); + } + + void deactivate() { + if (deactivated) { + return; + } + + class DeletionTask implements Runnable { + @Override + public void run() { + shutdown(); + childLbStates.remove(name); + } + } + + deletionTimer = + syncContext.schedule( + new DeletionTask(), + DELAYED_CHILD_DELETION_TIME_MINUTES, + TimeUnit.MINUTES, + timeService); + deactivated = true; + logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deactivated", name); + } + + void reactivate(LoadBalancerProvider policyProvider) { + if (deletionTimer != null && deletionTimer.isPending()) { + deletionTimer.cancel(); + deactivated = false; + logger.log(XdsLogLevel.DEBUG, "Child balancer {0} reactivated", name); + } + if (!this.policyProvider.getPolicyName().equals(policyProvider.getPolicyName())) { + logger.log( + XdsLogLevel.DEBUG, + "Child balancer {0} switching policy from {1} to {2}", + name, this.policyProvider.getPolicyName(), policyProvider.getPolicyName()); + lb.switchTo(policyProvider); + this.policyProvider = policyProvider; + } + } + + void shutdown() { + deactivated = true; + if (deletionTimer != null && deletionTimer.isPending()) { + deletionTimer.cancel(); + } + lb.shutdown(); + logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deleted", name); + } + + private final class ChildLbStateHelper extends ForwardingLoadBalancerHelper { + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + currentState = newState; + currentPicker = newPicker; + if (!deactivated) { + updateOverallBalancingState(); + } + } + + @Override + protected Helper delegate() { + return helper; + } + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancerProvider.java new file mode 100644 index 0000000000..e219c0467a --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancerProvider.java @@ -0,0 +1,155 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import io.grpc.Internal; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import io.grpc.internal.JsonUtil; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; + +/** + * The provider for the cluster_manager load balancing policy. This class should not be directly + * referenced in code. The policy should be accessed through + * {@link LoadBalancerRegistry#getProvider} with the name "cluster_manager_experimental". + */ +@Internal +public class ClusterManagerLoadBalancerProvider extends LoadBalancerProvider { + + @Nullable + private final LoadBalancerRegistry lbRegistry; + + public ClusterManagerLoadBalancerProvider() { + this(null); + } + + @VisibleForTesting + ClusterManagerLoadBalancerProvider(@Nullable LoadBalancerRegistry lbRegistry) { + this.lbRegistry = lbRegistry; + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME; + } + + @Override + public ConfigOrError parseLoadBalancingPolicyConfig(Map rawConfig) { + Map parsedChildPolicies = new LinkedHashMap<>(); + try { + Map childPolicies = JsonUtil.getObject(rawConfig, "childPolicy"); + if (childPolicies == null || childPolicies.isEmpty()) { + return ConfigOrError.fromError(Status.INTERNAL.withDescription( + "No child policy provided for cluster_manager LB policy: " + rawConfig)); + } + for (String name : childPolicies.keySet()) { + Map childPolicy = JsonUtil.getObject(childPolicies, name); + if (childPolicy == null) { + return ConfigOrError.fromError(Status.INTERNAL.withDescription( + "No config for child " + name + " in cluster_manager LB policy: " + rawConfig)); + } + List childConfigCandidates = + ServiceConfigUtil.unwrapLoadBalancingConfigList( + JsonUtil.getListOfObjects(childPolicy, "lbPolicy")); + if (childConfigCandidates == null || childConfigCandidates.isEmpty()) { + return ConfigOrError.fromError(Status.INTERNAL.withDescription( + "No config specified for child " + name + " in cluster_manager Lb policy: " + + rawConfig)); + } + LoadBalancerRegistry registry = + lbRegistry != null ? lbRegistry : LoadBalancerRegistry.getDefaultRegistry(); + ConfigOrError selectedConfig = + ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, registry); + if (selectedConfig.getError() != null) { + Status error = selectedConfig.getError(); + return ConfigOrError.fromError( + Status.INTERNAL + .withCause(error.getCause()) + .withDescription(error.getDescription()) + .augmentDescription("Failed to select config for child " + name)); + } + parsedChildPolicies.put(name, (PolicySelection) selectedConfig.getConfig()); + } + } catch (RuntimeException e) { + return ConfigOrError.fromError( + Status.fromThrowable(e).withDescription( + "Failed to parse cluster_manager LB config: " + rawConfig)); + } + return ConfigOrError.fromConfig(new ClusterManagerConfig(parsedChildPolicies)); + } + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new ClusterManagerLoadBalancer(helper); + } + + static class ClusterManagerConfig { + final Map childPolicies; + + ClusterManagerConfig(Map childPolicies) { + this.childPolicies = Collections.unmodifiableMap(childPolicies); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ClusterManagerConfig)) { + return false; + } + ClusterManagerConfig config = (ClusterManagerConfig) o; + return Objects.equals(childPolicies, config.childPolicies); + } + + @Override + public int hashCode() { + return Objects.hash(childPolicies); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("childPolicies", childPolicies) + .toString(); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/XdsLbPolicies.java b/xds/src/main/java/io/grpc/xds/XdsLbPolicies.java index 95fa120b0b..06ec45dc91 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLbPolicies.java +++ b/xds/src/main/java/io/grpc/xds/XdsLbPolicies.java @@ -17,6 +17,7 @@ package io.grpc.xds; final class XdsLbPolicies { + static final String CLUSTER_MANAGER_POLICY_NAME = "cluster_manager_experimental"; static final String CDS_POLICY_NAME = "cds_experimental"; static final String EDS_POLICY_NAME = "eds_experimental"; static final String WEIGHTED_TARGET_POLICY_NAME = "weighted_target_experimental"; diff --git a/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider index 9f83a30598..a3baf7399e 100644 --- a/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider +++ b/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider @@ -2,3 +2,4 @@ io.grpc.xds.CdsLoadBalancerProvider io.grpc.xds.EdsLoadBalancerProvider io.grpc.xds.WeightedTargetLoadBalancerProvider io.grpc.xds.XdsRoutingLoadBalancerProvider +io.grpc.xds.ClusterManagerLoadBalancerProvider diff --git a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerProviderTest.java new file mode 100644 index 0000000000..3621761b71 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerProviderTest.java @@ -0,0 +1,145 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; + +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.internal.JsonParser; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.junit.Test; + +/** Tests for {@link ClusterManagerLoadBalancerProvider}. */ +public class ClusterManagerLoadBalancerProviderTest { + + @Test + public void parseClusterManagerLoadBalancingPolicyConfig() throws IOException { + LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); + ClusterManagerLoadBalancerProvider provider = + new ClusterManagerLoadBalancerProvider(lbRegistry); + final Object fooConfig = new Object(); + LoadBalancerProvider lbProviderFoo = new LoadBalancerProvider() { + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "foo_policy"; + } + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public ConfigOrError parseLoadBalancingPolicyConfig( + Map rawLoadBalancingPolicyConfig) { + return ConfigOrError.fromConfig(fooConfig); + } + }; + final Object barConfig = new Object(); + LoadBalancerProvider lbProviderBar = new LoadBalancerProvider() { + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "bar_policy"; + } + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + throw new UnsupportedOperationException("Should not be called"); + } + + @Override + public ConfigOrError parseLoadBalancingPolicyConfig( + Map rawLoadBalancingPolicyConfig) { + return ConfigOrError.fromConfig(barConfig); + } + }; + lbRegistry.register(lbProviderFoo); + lbRegistry.register(lbProviderBar); + + String clusterManagerConfigJson = "{\n" + + " \"childPolicy\": {\n" + + " \"child1\": {\n" + + " \"lbPolicy\": [\n" + + " {\n" + + " \"foo_policy\": {" + + " \"config_name\": \"config_value\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"child2\": {\n" + + " \"lbPolicy\": [\n" + + " {\n" + + " \"bar_policy\": {}\n" + + " }, {\n" + + " \"unsupported\": {}\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n" + + "}"; + @SuppressWarnings("unchecked") + Map rawLbConfigMap = (Map) JsonParser.parse(clusterManagerConfigJson); + ConfigOrError configOrError = provider.parseLoadBalancingPolicyConfig(rawLbConfigMap); + assertThat(configOrError.getConfig()).isNotNull(); + ClusterManagerConfig config = (ClusterManagerConfig) configOrError.getConfig(); + assertThat(config.childPolicies) + .containsExactly( + "child1", + new PolicySelection( + lbProviderFoo, Collections.singletonMap("config_name", "config_value"), fooConfig), + "child2", + new PolicySelection(lbProviderBar, Collections.emptyMap(), barConfig)); + } + + @Test + public void registered() { + LoadBalancerProvider provider = + LoadBalancerRegistry + .getDefaultRegistry() + .getProvider("cluster_manager_experimental"); + assertThat(provider).isInstanceOf(ClusterManagerLoadBalancerProvider.class); + } +} diff --git a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java new file mode 100644 index 0000000000..257ee3e181 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java @@ -0,0 +1,351 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import io.grpc.CallOptions; +import io.grpc.ConnectivityState; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancerProvider; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; +import io.grpc.internal.FakeClock; +import io.grpc.internal.PickSubchannelArgsImpl; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.testing.TestMethodDescriptors; +import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link ClusterManagerLoadBalancer}. */ +@RunWith(JUnit4.class) +public class ClusterManagerLoadBalancerTest { + + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final FakeClock fakeClock = new FakeClock(); + + @Captor + ArgumentCaptor pickerCaptor; + @Mock + private LoadBalancer.Helper helper; + + private final Map lbConfigInventory = new HashMap<>(); + private final List childBalancers = new ArrayList<>(); + private LoadBalancer clusterManagerLoadBalancer; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + when(helper.getSynchronizationContext()).thenReturn(syncContext); + when(helper.getScheduledExecutorService()).thenReturn(fakeClock.getScheduledExecutorService()); + lbConfigInventory.put("childA", new Object()); + lbConfigInventory.put("childB", new Object()); + lbConfigInventory.put("childC", null); + clusterManagerLoadBalancer = new ClusterManagerLoadBalancer(helper); + } + + @After + public void tearDown() { + clusterManagerLoadBalancer.shutdown(); + for (FakeLoadBalancer childLb : childBalancers) { + assertThat(childLb.shutdown).isTrue(); + } + } + + @Test + public void handleResolvedAddressesUpdatesChannelPicker() { + deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); + + verify(helper, atLeastOnce()).updateBalancingState( + eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertThat(pickSubchannel(picker, "childA")).isEqualTo(PickResult.withNoResult()); + assertThat(pickSubchannel(picker, "childB")).isEqualTo(PickResult.withNoResult()); + assertThat(childBalancers).hasSize(2); + FakeLoadBalancer childBalancer1 = childBalancers.get(0); + FakeLoadBalancer childBalancer2 = childBalancers.get(1); + assertThat(childBalancer1.name).isEqualTo("policy_a"); + assertThat(childBalancer2.name).isEqualTo("policy_b"); + assertThat(childBalancer1.config).isEqualTo(lbConfigInventory.get("childA")); + assertThat(childBalancer2.config).isEqualTo(lbConfigInventory.get("childB")); + + // Receive an updated config. + deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childC", "policy_c")); + + verify(helper, atLeast(2)) + .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + picker = pickerCaptor.getValue(); + assertThat(pickSubchannel(picker, "childA")).isEqualTo(PickResult.withNoResult()); + assertThat(pickSubchannel(picker, "childC")).isEqualTo(PickResult.withNoResult()); + Status status = pickSubchannel(picker, "childB").getStatus(); + assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(status.getDescription()).isEqualTo("Unable to find cluster childB"); + assertThat(fakeClock.numPendingTasks()) + .isEqualTo(1); // (delayed) shutdown because "childB" is removed + assertThat(childBalancer1.shutdown).isFalse(); + assertThat(childBalancer2.shutdown).isFalse(); + + assertThat(childBalancers).hasSize(3); + FakeLoadBalancer childBalancer3 = childBalancers.get(2); + assertThat(childBalancer3.name).isEqualTo("policy_c"); + assertThat(childBalancer3.config).isEqualTo(lbConfigInventory.get("childC")); + + fakeClock.forwardTime( + ClusterManagerLoadBalancer.DELAYED_CHILD_DELETION_TIME_MINUTES, TimeUnit.MINUTES); + assertThat(childBalancer2.shutdown).isTrue(); + } + + @Test + public void updateBalancingStateFromChildBalancers() { + deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); + + assertThat(childBalancers).hasSize(2); + FakeLoadBalancer childBalancer1 = childBalancers.get(0); + FakeLoadBalancer childBalancer2 = childBalancers.get(1); + Subchannel subchannel1 = mock(Subchannel.class); + Subchannel subchannel2 = mock(Subchannel.class); + childBalancer1.deliverSubchannelState(subchannel1, ConnectivityState.READY); + + verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertThat(pickSubchannel(picker, "childA").getSubchannel()).isEqualTo(subchannel1); + assertThat(pickSubchannel(picker, "childB")).isEqualTo(PickResult.withNoResult()); + + childBalancer2.deliverSubchannelState(subchannel2, ConnectivityState.READY); + verify(helper, times(2)) + .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + assertThat(pickSubchannel(pickerCaptor.getValue(), "childB").getSubchannel()) + .isEqualTo(subchannel2); + } + + @Test + public void updateBalancingStateFromDeactivatedChildBalancer() { + FakeLoadBalancer balancer = deliverAddressesAndUpdateToRemoveChildPolicy("childA", "policy_a"); + Subchannel subchannel = mock(Subchannel.class); + balancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + verify(helper, never()).updateBalancingState( + eq(ConnectivityState.READY), any(SubchannelPicker.class)); + + deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a")); + verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + assertThat(pickSubchannel(pickerCaptor.getValue(), "childA").getSubchannel()) + .isEqualTo(subchannel); + } + + @Test + public void errorPropagation() { + Status error = Status.UNAVAILABLE.withDescription("resolver error"); + clusterManagerLoadBalancer.handleNameResolutionError(error); + verify(helper).updateBalancingState( + eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()).isEqualTo("resolver error"); + + deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); + + assertThat(childBalancers).hasSize(2); + FakeLoadBalancer childBalancer1 = childBalancers.get(0); + FakeLoadBalancer childBalancer2 = childBalancers.get(1); + + clusterManagerLoadBalancer.handleNameResolutionError(error); + assertThat(childBalancer1.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(childBalancer1.upstreamError.getDescription()).isEqualTo("resolver error"); + assertThat(childBalancer2.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(childBalancer2.upstreamError.getDescription()).isEqualTo("resolver error"); + } + + @Test + public void errorPropagationToDeactivatedChildBalancer() { + FakeLoadBalancer balancer = deliverAddressesAndUpdateToRemoveChildPolicy("childA", "policy_a"); + clusterManagerLoadBalancer.handleNameResolutionError( + Status.UNKNOWN.withDescription("unknown error")); + assertThat(balancer.upstreamError).isNull(); + } + + private FakeLoadBalancer deliverAddressesAndUpdateToRemoveChildPolicy( + String childName, String childPolicyName) { + lbConfigInventory.put("childFoo", null); + deliverResolvedAddresses( + ImmutableMap.of(childName, childPolicyName, "childFoo", "policy_foo")); + + verify(helper, atLeastOnce()).updateBalancingState( + eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + assertThat(childBalancers).hasSize(2); + FakeLoadBalancer balancer = childBalancers.get(0); + + deliverResolvedAddresses(ImmutableMap.of("childFoo", "policy_foo")); + verify(helper, atLeast(2)).updateBalancingState( + eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.MINUTES)) + .isEqualTo(ClusterManagerLoadBalancer.DELAYED_CHILD_DELETION_TIME_MINUTES); + return balancer; + } + + private void deliverResolvedAddresses(final Map childPolicies) { + syncContext.execute(new Runnable() { + @Override + public void run() { + clusterManagerLoadBalancer + .handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setLoadBalancingPolicyConfig(buildConfig(childPolicies)) + .build()); + } + }); + } + + private ClusterManagerConfig buildConfig(Map childPolicies) { + Map childPolicySelections = new LinkedHashMap<>(); + for (String name : childPolicies.keySet()) { + String childPolicyName = childPolicies.get(name); + Object childConfig = lbConfigInventory.get(name); + PolicySelection policy = + new PolicySelection(new FakeLoadBalancerProvider(childPolicyName), null, childConfig); + childPolicySelections.put(name, policy); + } + return new ClusterManagerConfig(childPolicySelections); + } + + private static PickResult pickSubchannel(SubchannelPicker picker, String name) { + PickSubchannelArgs args = + new PickSubchannelArgsImpl( + MethodDescriptor.newBuilder() + .setType(MethodType.UNARY) + .setFullMethodName("/service/method") + .setRequestMarshaller(TestMethodDescriptors.voidMarshaller()) + .setResponseMarshaller(TestMethodDescriptors.voidMarshaller()) + .build(), + new Metadata(), + CallOptions.DEFAULT.withOption( + ClusterManagerLoadBalancer.ROUTING_CLUSTER_NAME_KEY, name)); + return picker.pickSubchannel(args); + } + + private final class FakeLoadBalancerProvider extends LoadBalancerProvider { + private final String policyName; + + FakeLoadBalancerProvider(String policyName) { + this.policyName = policyName; + } + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + FakeLoadBalancer balancer = new FakeLoadBalancer(policyName, helper); + childBalancers.add(balancer); + return balancer; + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 0; // doesn't matter + } + + @Override + public String getPolicyName() { + return policyName; + } + } + + private final class FakeLoadBalancer extends LoadBalancer { + private final String name; + private final Helper helper; + private Object config; + private Status upstreamError; + private boolean shutdown; + + FakeLoadBalancer(String name, Helper helper) { + this.name = name; + this.helper = helper; + } + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + config = resolvedAddresses.getLoadBalancingPolicyConfig(); + } + + @Override + public void handleNameResolutionError(Status error) { + upstreamError = error; + } + + @Override + public void shutdown() { + shutdown = true; + childBalancers.remove(this); + } + + void deliverSubchannelState(final Subchannel subchannel, ConnectivityState state) { + SubchannelPicker picker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withSubchannel(subchannel); + } + }; + helper.updateBalancingState(state, picker); + } + } +}