From 6a64951005af01617a2384f9464883d60e9006ae Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Mon, 23 Mar 2020 14:42:44 -0700 Subject: [PATCH] xds: implement XdsRoutingLoadBalancer --- .../test/java/io/grpc/internal/TestUtils.java | 41 ++- .../io/grpc/xds/XdsRoutingLoadBalancer.java | 193 +++++++++- .../xds/XdsRoutingLoadBalancerProvider.java | 11 +- .../grpc/xds/XdsRoutingLoadBalancerTest.java | 336 ++++++++++++++++++ 4 files changed, 570 insertions(+), 11 deletions(-) create mode 100644 xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java diff --git a/core/src/test/java/io/grpc/internal/TestUtils.java b/core/src/test/java/io/grpc/internal/TestUtils.java index bb07c84e8f..8ea9afc28a 100644 --- a/core/src/test/java/io/grpc/internal/TestUtils.java +++ b/core/src/test/java/io/grpc/internal/TestUtils.java @@ -24,6 +24,11 @@ import static org.mockito.Mockito.when; import io.grpc.CallOptions; import io.grpc.ChannelLogger; import io.grpc.InternalLogId; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancerProvider; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import java.net.SocketAddress; @@ -36,7 +41,41 @@ import org.mockito.stubbing.Answer; /** * Common utility methods for tests. */ -final class TestUtils { +public final class TestUtils { + + /** Base class for a standard LoadBalancerProvider implementation. */ + public abstract static class StandardLoadBalancerProvider extends LoadBalancerProvider { + private final String policyName; + + public StandardLoadBalancerProvider(String policyName) { + this.policyName = policyName; + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public final String getPolicyName() { + return policyName; + } + } + + /** Creates a {@link SubchannelPicker} that returns the given {@link Subchannel} on every pick. */ + public static SubchannelPicker pickerOf(final Subchannel subchannel) { + return new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withSubchannel(subchannel); + } + }; + } static class MockClientTransportInfo { /** diff --git a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java index 5fad2d49ec..10050618e2 100644 --- a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java @@ -16,18 +16,205 @@ package io.grpc.xds; -import io.grpc.LoadBalancer; -import io.grpc.Status; +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.grpc.ConnectivityState; +import io.grpc.InternalLogId; +import io.grpc.LoadBalancer; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.util.ForwardingLoadBalancerHelper; +import io.grpc.util.GracefulSwitchLoadBalancer; +import io.grpc.xds.XdsLogger.XdsLogLevel; +import io.grpc.xds.XdsRoutingLoadBalancerProvider.MethodName; +import io.grpc.xds.XdsRoutingLoadBalancerProvider.Route; +import io.grpc.xds.XdsRoutingLoadBalancerProvider.XdsRoutingConfig; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; -// TODO(zdapeng): Implementation. /** Load balancer for xds_routing policy. */ final class XdsRoutingLoadBalancer extends LoadBalancer { + private final XdsLogger logger; + private final Helper helper; + private final Map routeBalancers = new HashMap<>(); + private final Map routeHelpers = new HashMap<>(); + + private Map actions = ImmutableMap.of(); + private List routes = ImmutableList.of(); + + XdsRoutingLoadBalancer(Helper helper) { + this.helper = checkNotNull(helper, "helper"); + logger = XdsLogger.withLogId( + InternalLogId.allocate("xds-routing-lb", helper.getAuthority())); + logger.log(XdsLogLevel.INFO, "Created"); + } + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); + XdsRoutingConfig xdsRoutingConfig = + (XdsRoutingConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + checkNotNull(xdsRoutingConfig, "Missing xds_routing lb config"); + + Map newActions = xdsRoutingConfig.actions; + for (String actionName : newActions.keySet()) { + PolicySelection action = newActions.get(actionName); + if (!actions.containsKey(actionName)) { + RouteHelper routeHelper = new RouteHelper(); + GracefulSwitchLoadBalancer routeBalancer = new GracefulSwitchLoadBalancer(routeHelper); + routeBalancer.switchTo(action.getProvider()); + routeHelpers.put(actionName, routeHelper); + routeBalancers.put(actionName, routeBalancer); + } else if (!action.getProvider().equals(actions.get(actionName).getProvider())) { + routeBalancers.get(actionName).switchTo(action.getProvider()); + } + } + + this.routes = xdsRoutingConfig.routes; + this.actions = newActions; + + for (String actionName : actions.keySet()) { + routeBalancers.get(actionName).handleResolvedAddresses( + resolvedAddresses.toBuilder() + .setLoadBalancingPolicyConfig(actions.get(actionName).getConfig()) + .build()); + } + + // Cleanup removed actions. + // TODO(zdapeng): cache removed actions for 15 minutes. + for (String actionName : routeBalancers.keySet()) { + if (!actions.containsKey(actionName)) { + routeBalancers.get(actionName).shutdown(); + } + } + routeBalancers.keySet().retainAll(actions.keySet()); + routeHelpers.keySet().retainAll(actions.keySet()); + } + @Override public void handleNameResolutionError(Status error) { + logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error); + if (routeBalancers.isEmpty()) { + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); + } + for (LoadBalancer routeBalancer : routeBalancers.values()) { + routeBalancer.handleNameResolutionError(error); + } } @Override public void shutdown() { + logger.log(XdsLogLevel.INFO, "Shutdown"); + for (LoadBalancer routeBalancer : routeBalancers.values()) { + routeBalancer.shutdown(); + } + } + + @Override + public boolean canHandleEmptyAddressListFromNameResolution() { + return true; + } + + private void updateOverallBalancingState() { + ConnectivityState overallState = null; + // Use LinkedHashMap to preserve the order of routes. + Map routePickers = new LinkedHashMap<>(); + for (Route route : routes) { + RouteHelper routeHelper = routeHelpers.get(route.actionName); + routePickers.put(route.methodName, routeHelper.currentPicker); + ConnectivityState routeState = routeHelper.currentState; + overallState = aggregateState(overallState, routeState); + } + if (overallState != null) { + SubchannelPicker picker = new PathMatchingSubchannelPicker(routePickers); + helper.updateBalancingState(overallState, picker); + } + } + + @Nullable + private static ConnectivityState aggregateState( + @Nullable ConnectivityState overallState, ConnectivityState childState) { + if (overallState == null) { + return childState; + } + if (overallState == READY || childState == READY) { + return READY; + } + if (overallState == CONNECTING || childState == CONNECTING) { + return CONNECTING; + } + if (overallState == IDLE || childState == IDLE) { + return IDLE; + } + return overallState; + } + + /** + * The lb helper for a single route balancer. + */ + private final class RouteHelper extends ForwardingLoadBalancerHelper { + ConnectivityState currentState = CONNECTING; + SubchannelPicker currentPicker = BUFFER_PICKER; + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + currentState = newState; + currentPicker = newPicker; + updateOverallBalancingState(); + } + + @Override + protected Helper delegate() { + return helper; + } + } + + private static final class PathMatchingSubchannelPicker extends SubchannelPicker { + + final Map routePickers; + + /** + * Constructs a picker that will match the path of PickSubchannelArgs with the given map. + * The order of the map entries matters. First match will be picked even if second match is an + * exact (service + method) path match. + */ + PathMatchingSubchannelPicker(Map routePickers) { + this.routePickers = routePickers; + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + for (MethodName methodName : routePickers.keySet()) { + if (match(args.getMethodDescriptor(), methodName)) { + return routePickers.get(methodName).pickSubchannel(args); + } + } + // At least the default route should match, otherwise there is a bug. + throw new IllegalStateException("PathMatchingSubchannelPicker: error in matching path"); + } + + boolean match(MethodDescriptor methodDescriptor, MethodName methodName) { + if (methodName.service.isEmpty() && methodName.method.isEmpty()) { + return true; + } + if (methodName.method.isEmpty()) { + return methodName.service.equals(methodDescriptor.getServiceName()); + } + return (methodName.service + '/' + methodName.method) + .equals(methodDescriptor.getFullMethodName()); + } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancerProvider.java index f82487342b..2a08ad0969 100644 --- a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancerProvider.java @@ -64,10 +64,6 @@ public final class XdsRoutingLoadBalancerProvider extends LoadBalancerProvider { this.lbRegistry = lbRegistry; } - private LoadBalancerRegistry loadBalancerRegistry() { - return lbRegistry == null ? LoadBalancerRegistry.getDefaultRegistry() : lbRegistry; - } - @Override public boolean isAvailable() { return true; @@ -85,8 +81,7 @@ public final class XdsRoutingLoadBalancerProvider extends LoadBalancerProvider { @Override public LoadBalancer newLoadBalancer(Helper helper) { - // TODO(zdapeng): pass helper and loadBalancerRegistry() to constructor args. - return new XdsRoutingLoadBalancer(); + return new XdsRoutingLoadBalancer(helper); } @Override @@ -112,8 +107,10 @@ public final class XdsRoutingLoadBalancerProvider extends LoadBalancerProvider { + rawConfig)); } + LoadBalancerRegistry lbRegistry = + this.lbRegistry == null ? LoadBalancerRegistry.getDefaultRegistry() : this.lbRegistry; ConfigOrError selectedConfigOrError = - ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, loadBalancerRegistry()); + ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, lbRegistry); if (selectedConfigOrError.getError() != null) { return selectedConfigOrError; } diff --git a/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java new file mode 100644 index 0000000000..a22fa4a935 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java @@ -0,0 +1,336 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import io.grpc.Attributes; +import io.grpc.Attributes.Key; +import io.grpc.CallOptions; +import io.grpc.ChannelLogger; +import io.grpc.ConnectivityState; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancerProvider; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.Status; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.internal.TestUtils; +import io.grpc.internal.TestUtils.StandardLoadBalancerProvider; +import io.grpc.testing.TestMethodDescriptors; +import io.grpc.xds.XdsRoutingLoadBalancerProvider.MethodName; +import io.grpc.xds.XdsRoutingLoadBalancerProvider.Route; +import io.grpc.xds.XdsRoutingLoadBalancerProvider.XdsRoutingConfig; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link XdsRoutingLoadBalancer}. */ +public class XdsRoutingLoadBalancerTest { + + private final List fooBalancers = new ArrayList<>(); + private final List barBalancers = new ArrayList<>(); + private final List bazBalancers = new ArrayList<>(); + private final List fooHelpers = new ArrayList<>(); + private final List barHelpers = new ArrayList<>(); + private final List bazHelpers = new ArrayList<>(); + + private final LoadBalancerProvider fooLbProvider = + new StandardLoadBalancerProvider("foo_policy") { + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + LoadBalancer lb = mock(LoadBalancer.class); + fooBalancers.add(lb); + fooHelpers.add(helper); + return lb; + } + }; + private final LoadBalancerProvider barLbProvider = + new StandardLoadBalancerProvider("bar_policy") { + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + LoadBalancer lb = mock(LoadBalancer.class); + barBalancers.add(lb); + barHelpers.add(helper); + return lb; + } + }; + private final LoadBalancerProvider bazLbProvider = + new StandardLoadBalancerProvider("baz_policy") { + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + LoadBalancer lb = mock(LoadBalancer.class); + bazBalancers.add(lb); + bazHelpers.add(helper); + return lb; + } + }; + + @Mock + private Helper helper; + @Mock + private ChannelLogger channelLogger; + + private LoadBalancer xdsRoutingLb; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + doReturn(channelLogger).when(helper).getChannelLogger(); + xdsRoutingLb = new XdsRoutingLoadBalancer(helper); + } + + @After + public void tearDown() { + xdsRoutingLb.shutdown(); + + for (LoadBalancer balancer : Iterables.concat(fooBalancers, barBalancers, bazBalancers)) { + verify(balancer).shutdown(); + } + } + + @Test + public void typicalWorkflow() { + // Resolution error. + xdsRoutingLb.handleNameResolutionError(Status.UNAUTHENTICATED); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + + // Config update. + Attributes attributes = + Attributes.newBuilder().set(Key.create("fakeKey"), "fakeVal").build(); + Object fooConfig1 = new Object(); + Object barConfig1 = new Object(); + Object bazConfig1 = new Object(); + Object fooConfig2 = new Object(); + XdsRoutingConfig xdsRoutingConfig = new XdsRoutingConfig( + ImmutableList.of( + new Route("foo_action", new MethodName("service1", "method1")), + new Route("foo_action", new MethodName("service2", "method2")), + new Route("bar_action", new MethodName("service1", "hello")), + new Route("bar_action", new MethodName("service2", "hello")), + new Route("foo_action_2", new MethodName("service2", "")), + new Route("baz_action", new MethodName("", ""))), + ImmutableMap.of( + "foo_action", + new PolicySelection(fooLbProvider, null, fooConfig1), + "foo_action_2", + new PolicySelection(fooLbProvider, null, fooConfig2), + "bar_action", + new PolicySelection(barLbProvider, null, barConfig1), + "baz_action", + new PolicySelection(bazLbProvider, null, bazConfig1))); + xdsRoutingLb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setAttributes(attributes) + .setLoadBalancingPolicyConfig(xdsRoutingConfig).build()); + assertThat(fooBalancers).hasSize(2); + ArgumentCaptor resolvedAddressesCaptor = ArgumentCaptor.forClass(null); + verify(fooBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + ResolvedAddresses resolvedAddressesFoo0 = resolvedAddressesCaptor.getValue(); + verify(fooBalancers.get(1)).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + ResolvedAddresses resolvedAddressesFoo1 = resolvedAddressesCaptor.getValue(); + assertThat(barBalancers).hasSize(1); + verify(barBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + ResolvedAddresses resolvedAddressesBar = resolvedAddressesCaptor.getValue(); + assertThat(bazBalancers).hasSize(1); + verify(bazBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + ResolvedAddresses resolvedAddressesBaz = resolvedAddressesCaptor.getValue(); + assertThat(resolvedAddressesFoo0.getAttributes()).isEqualTo(attributes); + assertThat(resolvedAddressesFoo1.getAttributes()).isEqualTo(attributes); + assertThat(resolvedAddressesBar.getAttributes()).isEqualTo(attributes); + assertThat(resolvedAddressesBaz.getAttributes()).isEqualTo(attributes); + assertThat( + Arrays.asList( + resolvedAddressesFoo0.getLoadBalancingPolicyConfig(), + resolvedAddressesFoo1.getLoadBalancingPolicyConfig())) + .containsExactly(fooConfig1, fooConfig2); + LoadBalancer fooBalancer1; + Helper fooHelper1; + Helper fooHelper2; + if (resolvedAddressesFoo0.getLoadBalancingPolicyConfig().equals(fooConfig1)) { + fooBalancer1 = fooBalancers.get(0); + fooHelper1 = fooHelpers.get(0); + fooHelper2 = fooHelpers.get(1); + } else { + fooBalancer1 = fooBalancers.get(1); + fooHelper1 = fooHelpers.get(1); + fooHelper2 = fooHelpers.get(0); + } + assertThat(resolvedAddressesBar.getLoadBalancingPolicyConfig()).isEqualTo(barConfig1); + assertThat(resolvedAddressesBaz.getLoadBalancingPolicyConfig()).isEqualTo(bazConfig1); + Helper barHelper = barHelpers.get(0); + Helper bazHelper = bazHelpers.get(0); + + // State update. + Subchannel subchannelFoo1 = mock(Subchannel.class); + Subchannel subchannelFoo2 = mock(Subchannel.class); + fooHelper1.updateBalancingState(READY, TestUtils.pickerOf(subchannelFoo1)); + fooHelper2.updateBalancingState(READY, TestUtils.pickerOf(subchannelFoo2)); + barHelper.updateBalancingState( + TRANSIENT_FAILURE, new ErrorPicker(Status.ABORTED.withDescription("abort bar"))); + bazHelper.updateBalancingState( + TRANSIENT_FAILURE, new ErrorPicker(Status.DATA_LOSS.withDescription("data loss baz"))); + ArgumentCaptor connectivityStateCaptor = ArgumentCaptor.forClass(null); + ArgumentCaptor subchannelPickerCaptor = ArgumentCaptor.forClass(null); + verify(helper, atLeastOnce()).updateBalancingState( + connectivityStateCaptor.capture(), subchannelPickerCaptor.capture()); + assertThat(connectivityStateCaptor.getValue()).isEqualTo(READY); + SubchannelPicker picker = subchannelPickerCaptor.getValue(); + assertPickerRoutePathToSubchannel(picker, "service1", "method1", subchannelFoo1); + assertPickerRoutePathToSubchannel(picker, "service2", "method2", subchannelFoo1); + assertPickerRoutePathToError( + picker, "service1", "hello", Status.ABORTED.withDescription("abort bar")); + assertPickerRoutePathToError( + picker, "service2", "hello", Status.ABORTED.withDescription("abort bar")); + assertPickerRoutePathToSubchannel(picker, "service2", "otherMethod", subchannelFoo2); + assertPickerRoutePathToError( + picker, "otherService", "hello", Status.DATA_LOSS.withDescription("data loss baz")); + + // Resolution error. + Status error = Status.UNAVAILABLE.withDescription("fake unavailable"); + xdsRoutingLb.handleNameResolutionError(error); + for (LoadBalancer lb : Iterables.concat(fooBalancers, barBalancers, bazBalancers)) { + verify(lb).handleNameResolutionError(error); + } + + // New config update. + Object fooConfig3 = new Object(); + Object barConfig2 = new Object(); + Object barConfig3 = new Object(); + Object bazConfig2 = new Object(); + xdsRoutingConfig = new XdsRoutingConfig( + ImmutableList.of( + new Route("foo_action", new MethodName("service1", "method1")), + new Route("foo_action", new MethodName("service2", "method3")), + new Route("bar_action", new MethodName("service1", "hello")), + new Route("bar_action_2", new MethodName("service2", "hello")), + new Route("baz_action", new MethodName("", ""))), + ImmutableMap.of( + "foo_action", + new PolicySelection(fooLbProvider, null, fooConfig3), + "bar_action", + new PolicySelection(barLbProvider, null, barConfig2), + "bar_action_2", + new PolicySelection(barLbProvider, null, barConfig3), + "baz_action", + new PolicySelection(bazLbProvider, null, bazConfig2))); + xdsRoutingLb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(xdsRoutingConfig) + .build()); + verify(fooBalancer1, times(2)).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig()) + .isEqualTo(fooConfig3); + assertThat(barBalancers).hasSize(2); + verify(barBalancers.get(0), times(2)) + .handleResolvedAddresses(resolvedAddressesCaptor.capture()); + assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig()) + .isEqualTo(barConfig2); + verify(barBalancers.get(1)).handleResolvedAddresses(resolvedAddressesCaptor.capture()); + assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig()) + .isEqualTo(barConfig3); + verify(bazBalancers.get(0), times(2)) + .handleResolvedAddresses(resolvedAddressesCaptor.capture()); + assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig()) + .isEqualTo(bazConfig2); + + // New status update. + Subchannel subchannelBar2 = mock(Subchannel.class); + Helper barHelper2 = barHelpers.get(1); + barHelper2.updateBalancingState(READY, TestUtils.pickerOf(subchannelBar2)); + verify(helper, atLeastOnce()).updateBalancingState( + connectivityStateCaptor.capture(), subchannelPickerCaptor.capture()); + assertThat(connectivityStateCaptor.getValue()).isEqualTo(READY); + picker = subchannelPickerCaptor.getValue(); + assertPickerRoutePathToSubchannel(picker, "service1", "method1", subchannelFoo1); + assertPickerRoutePathToError( + picker, "service1", "method2", Status.DATA_LOSS.withDescription("data loss baz")); + assertPickerRoutePathToSubchannel(picker, "service2", "method3", subchannelFoo1); + assertPickerRoutePathToError( + picker, "service1", "hello", Status.ABORTED.withDescription("abort bar")); + assertPickerRoutePathToSubchannel(picker, "service2", "hello", subchannelBar2); + } + + private static PickSubchannelArgs pickSubchannelArgsForMethod( + final String service, final String method) { + return new PickSubchannelArgs() { + + @Override + public CallOptions getCallOptions() { + return CallOptions.DEFAULT; + } + + @Override + public Metadata getHeaders() { + return new Metadata(); + } + + @Override + public MethodDescriptor getMethodDescriptor() { + return MethodDescriptor.newBuilder() + .setType(MethodType.UNARY) + .setFullMethodName(service + "/" + method) + .setRequestMarshaller(TestMethodDescriptors.voidMarshaller()) + .setResponseMarshaller(TestMethodDescriptors.voidMarshaller()) + .build(); + } + }; + } + + private static void assertPickerRoutePathToSubchannel( + SubchannelPicker picker, String service, String method, Subchannel expectedSubchannel) { + Subchannel actualSubchannel = + picker.pickSubchannel(pickSubchannelArgsForMethod(service, method)).getSubchannel(); + assertThat(actualSubchannel).isEqualTo(expectedSubchannel); + } + + private static void assertPickerRoutePathToError( + SubchannelPicker picker, String service, String method, Status expectedStatus) { + Status actualStatus = + picker.pickSubchannel(pickSubchannelArgsForMethod(service, method)).getStatus(); + assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode()); + assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription()); + } +}