mirror of https://github.com/grpc/grpc-java.git
xds: implement XdsRoutingLoadBalancer
This commit is contained in:
parent
8d4240f480
commit
6a64951005
|
|
@ -24,6 +24,11 @@ import static org.mockito.Mockito.when;
|
|||
import io.grpc.CallOptions;
|
||||
import io.grpc.ChannelLogger;
|
||||
import io.grpc.InternalLogId;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.LoadBalancer.PickSubchannelArgs;
|
||||
import io.grpc.LoadBalancer.Subchannel;
|
||||
import io.grpc.LoadBalancer.SubchannelPicker;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import java.net.SocketAddress;
|
||||
|
|
@ -36,7 +41,41 @@ import org.mockito.stubbing.Answer;
|
|||
/**
|
||||
* Common utility methods for tests.
|
||||
*/
|
||||
final class TestUtils {
|
||||
public final class TestUtils {
|
||||
|
||||
/** Base class for a standard LoadBalancerProvider implementation. */
|
||||
public abstract static class StandardLoadBalancerProvider extends LoadBalancerProvider {
|
||||
private final String policyName;
|
||||
|
||||
public StandardLoadBalancerProvider(String policyName) {
|
||||
this.policyName = policyName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAvailable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return 5;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String getPolicyName() {
|
||||
return policyName;
|
||||
}
|
||||
}
|
||||
|
||||
/** Creates a {@link SubchannelPicker} that returns the given {@link Subchannel} on every pick. */
|
||||
public static SubchannelPicker pickerOf(final Subchannel subchannel) {
|
||||
return new SubchannelPicker() {
|
||||
@Override
|
||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||
return PickResult.withSubchannel(subchannel);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
static class MockClientTransportInfo {
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -16,18 +16,205 @@
|
|||
|
||||
package io.grpc.xds;
|
||||
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.Status;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static io.grpc.ConnectivityState.CONNECTING;
|
||||
import static io.grpc.ConnectivityState.IDLE;
|
||||
import static io.grpc.ConnectivityState.READY;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.InternalLogId;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
|
||||
import io.grpc.util.ForwardingLoadBalancerHelper;
|
||||
import io.grpc.util.GracefulSwitchLoadBalancer;
|
||||
import io.grpc.xds.XdsLogger.XdsLogLevel;
|
||||
import io.grpc.xds.XdsRoutingLoadBalancerProvider.MethodName;
|
||||
import io.grpc.xds.XdsRoutingLoadBalancerProvider.Route;
|
||||
import io.grpc.xds.XdsRoutingLoadBalancerProvider.XdsRoutingConfig;
|
||||
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
// TODO(zdapeng): Implementation.
|
||||
/** Load balancer for xds_routing policy. */
|
||||
final class XdsRoutingLoadBalancer extends LoadBalancer {
|
||||
|
||||
private final XdsLogger logger;
|
||||
private final Helper helper;
|
||||
private final Map<String, GracefulSwitchLoadBalancer> routeBalancers = new HashMap<>();
|
||||
private final Map<String, RouteHelper> routeHelpers = new HashMap<>();
|
||||
|
||||
private Map<String, PolicySelection> actions = ImmutableMap.of();
|
||||
private List<Route> routes = ImmutableList.of();
|
||||
|
||||
XdsRoutingLoadBalancer(Helper helper) {
|
||||
this.helper = checkNotNull(helper, "helper");
|
||||
logger = XdsLogger.withLogId(
|
||||
InternalLogId.allocate("xds-routing-lb", helper.getAuthority()));
|
||||
logger.log(XdsLogLevel.INFO, "Created");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
||||
XdsRoutingConfig xdsRoutingConfig =
|
||||
(XdsRoutingConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
checkNotNull(xdsRoutingConfig, "Missing xds_routing lb config");
|
||||
|
||||
Map<String, PolicySelection> newActions = xdsRoutingConfig.actions;
|
||||
for (String actionName : newActions.keySet()) {
|
||||
PolicySelection action = newActions.get(actionName);
|
||||
if (!actions.containsKey(actionName)) {
|
||||
RouteHelper routeHelper = new RouteHelper();
|
||||
GracefulSwitchLoadBalancer routeBalancer = new GracefulSwitchLoadBalancer(routeHelper);
|
||||
routeBalancer.switchTo(action.getProvider());
|
||||
routeHelpers.put(actionName, routeHelper);
|
||||
routeBalancers.put(actionName, routeBalancer);
|
||||
} else if (!action.getProvider().equals(actions.get(actionName).getProvider())) {
|
||||
routeBalancers.get(actionName).switchTo(action.getProvider());
|
||||
}
|
||||
}
|
||||
|
||||
this.routes = xdsRoutingConfig.routes;
|
||||
this.actions = newActions;
|
||||
|
||||
for (String actionName : actions.keySet()) {
|
||||
routeBalancers.get(actionName).handleResolvedAddresses(
|
||||
resolvedAddresses.toBuilder()
|
||||
.setLoadBalancingPolicyConfig(actions.get(actionName).getConfig())
|
||||
.build());
|
||||
}
|
||||
|
||||
// Cleanup removed actions.
|
||||
// TODO(zdapeng): cache removed actions for 15 minutes.
|
||||
for (String actionName : routeBalancers.keySet()) {
|
||||
if (!actions.containsKey(actionName)) {
|
||||
routeBalancers.get(actionName).shutdown();
|
||||
}
|
||||
}
|
||||
routeBalancers.keySet().retainAll(actions.keySet());
|
||||
routeHelpers.keySet().retainAll(actions.keySet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleNameResolutionError(Status error) {
|
||||
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
|
||||
if (routeBalancers.isEmpty()) {
|
||||
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
|
||||
}
|
||||
for (LoadBalancer routeBalancer : routeBalancers.values()) {
|
||||
routeBalancer.handleNameResolutionError(error);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
logger.log(XdsLogLevel.INFO, "Shutdown");
|
||||
for (LoadBalancer routeBalancer : routeBalancers.values()) {
|
||||
routeBalancer.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canHandleEmptyAddressListFromNameResolution() {
|
||||
return true;
|
||||
}
|
||||
|
||||
private void updateOverallBalancingState() {
|
||||
ConnectivityState overallState = null;
|
||||
// Use LinkedHashMap to preserve the order of routes.
|
||||
Map<MethodName, SubchannelPicker> routePickers = new LinkedHashMap<>();
|
||||
for (Route route : routes) {
|
||||
RouteHelper routeHelper = routeHelpers.get(route.actionName);
|
||||
routePickers.put(route.methodName, routeHelper.currentPicker);
|
||||
ConnectivityState routeState = routeHelper.currentState;
|
||||
overallState = aggregateState(overallState, routeState);
|
||||
}
|
||||
if (overallState != null) {
|
||||
SubchannelPicker picker = new PathMatchingSubchannelPicker(routePickers);
|
||||
helper.updateBalancingState(overallState, picker);
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static ConnectivityState aggregateState(
|
||||
@Nullable ConnectivityState overallState, ConnectivityState childState) {
|
||||
if (overallState == null) {
|
||||
return childState;
|
||||
}
|
||||
if (overallState == READY || childState == READY) {
|
||||
return READY;
|
||||
}
|
||||
if (overallState == CONNECTING || childState == CONNECTING) {
|
||||
return CONNECTING;
|
||||
}
|
||||
if (overallState == IDLE || childState == IDLE) {
|
||||
return IDLE;
|
||||
}
|
||||
return overallState;
|
||||
}
|
||||
|
||||
/**
|
||||
* The lb helper for a single route balancer.
|
||||
*/
|
||||
private final class RouteHelper extends ForwardingLoadBalancerHelper {
|
||||
ConnectivityState currentState = CONNECTING;
|
||||
SubchannelPicker currentPicker = BUFFER_PICKER;
|
||||
|
||||
@Override
|
||||
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
|
||||
currentState = newState;
|
||||
currentPicker = newPicker;
|
||||
updateOverallBalancingState();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Helper delegate() {
|
||||
return helper;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class PathMatchingSubchannelPicker extends SubchannelPicker {
|
||||
|
||||
final Map<MethodName, SubchannelPicker> routePickers;
|
||||
|
||||
/**
|
||||
* Constructs a picker that will match the path of PickSubchannelArgs with the given map.
|
||||
* The order of the map entries matters. First match will be picked even if second match is an
|
||||
* exact (service + method) path match.
|
||||
*/
|
||||
PathMatchingSubchannelPicker(Map<MethodName, SubchannelPicker> routePickers) {
|
||||
this.routePickers = routePickers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||
for (MethodName methodName : routePickers.keySet()) {
|
||||
if (match(args.getMethodDescriptor(), methodName)) {
|
||||
return routePickers.get(methodName).pickSubchannel(args);
|
||||
}
|
||||
}
|
||||
// At least the default route should match, otherwise there is a bug.
|
||||
throw new IllegalStateException("PathMatchingSubchannelPicker: error in matching path");
|
||||
}
|
||||
|
||||
boolean match(MethodDescriptor<?, ?> methodDescriptor, MethodName methodName) {
|
||||
if (methodName.service.isEmpty() && methodName.method.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
if (methodName.method.isEmpty()) {
|
||||
return methodName.service.equals(methodDescriptor.getServiceName());
|
||||
}
|
||||
return (methodName.service + '/' + methodName.method)
|
||||
.equals(methodDescriptor.getFullMethodName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,10 +64,6 @@ public final class XdsRoutingLoadBalancerProvider extends LoadBalancerProvider {
|
|||
this.lbRegistry = lbRegistry;
|
||||
}
|
||||
|
||||
private LoadBalancerRegistry loadBalancerRegistry() {
|
||||
return lbRegistry == null ? LoadBalancerRegistry.getDefaultRegistry() : lbRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAvailable() {
|
||||
return true;
|
||||
|
|
@ -85,8 +81,7 @@ public final class XdsRoutingLoadBalancerProvider extends LoadBalancerProvider {
|
|||
|
||||
@Override
|
||||
public LoadBalancer newLoadBalancer(Helper helper) {
|
||||
// TODO(zdapeng): pass helper and loadBalancerRegistry() to constructor args.
|
||||
return new XdsRoutingLoadBalancer();
|
||||
return new XdsRoutingLoadBalancer(helper);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -112,8 +107,10 @@ public final class XdsRoutingLoadBalancerProvider extends LoadBalancerProvider {
|
|||
+ rawConfig));
|
||||
}
|
||||
|
||||
LoadBalancerRegistry lbRegistry =
|
||||
this.lbRegistry == null ? LoadBalancerRegistry.getDefaultRegistry() : this.lbRegistry;
|
||||
ConfigOrError selectedConfigOrError =
|
||||
ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, loadBalancerRegistry());
|
||||
ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, lbRegistry);
|
||||
if (selectedConfigOrError.getError() != null) {
|
||||
return selectedConfigOrError;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,336 @@
|
|||
/*
|
||||
* Copyright 2020 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.grpc.ConnectivityState.READY;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.Attributes.Key;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.ChannelLogger;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancer.PickSubchannelArgs;
|
||||
import io.grpc.LoadBalancer.ResolvedAddresses;
|
||||
import io.grpc.LoadBalancer.Subchannel;
|
||||
import io.grpc.LoadBalancer.SubchannelPicker;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
|
||||
import io.grpc.internal.TestUtils;
|
||||
import io.grpc.internal.TestUtils.StandardLoadBalancerProvider;
|
||||
import io.grpc.testing.TestMethodDescriptors;
|
||||
import io.grpc.xds.XdsRoutingLoadBalancerProvider.MethodName;
|
||||
import io.grpc.xds.XdsRoutingLoadBalancerProvider.Route;
|
||||
import io.grpc.xds.XdsRoutingLoadBalancerProvider.XdsRoutingConfig;
|
||||
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
/** Tests for {@link XdsRoutingLoadBalancer}. */
|
||||
public class XdsRoutingLoadBalancerTest {
|
||||
|
||||
private final List<LoadBalancer> fooBalancers = new ArrayList<>();
|
||||
private final List<LoadBalancer> barBalancers = new ArrayList<>();
|
||||
private final List<LoadBalancer> bazBalancers = new ArrayList<>();
|
||||
private final List<Helper> fooHelpers = new ArrayList<>();
|
||||
private final List<Helper> barHelpers = new ArrayList<>();
|
||||
private final List<Helper> bazHelpers = new ArrayList<>();
|
||||
|
||||
private final LoadBalancerProvider fooLbProvider =
|
||||
new StandardLoadBalancerProvider("foo_policy") {
|
||||
@Override
|
||||
public LoadBalancer newLoadBalancer(Helper helper) {
|
||||
LoadBalancer lb = mock(LoadBalancer.class);
|
||||
fooBalancers.add(lb);
|
||||
fooHelpers.add(helper);
|
||||
return lb;
|
||||
}
|
||||
};
|
||||
private final LoadBalancerProvider barLbProvider =
|
||||
new StandardLoadBalancerProvider("bar_policy") {
|
||||
@Override
|
||||
public LoadBalancer newLoadBalancer(Helper helper) {
|
||||
LoadBalancer lb = mock(LoadBalancer.class);
|
||||
barBalancers.add(lb);
|
||||
barHelpers.add(helper);
|
||||
return lb;
|
||||
}
|
||||
};
|
||||
private final LoadBalancerProvider bazLbProvider =
|
||||
new StandardLoadBalancerProvider("baz_policy") {
|
||||
@Override
|
||||
public LoadBalancer newLoadBalancer(Helper helper) {
|
||||
LoadBalancer lb = mock(LoadBalancer.class);
|
||||
bazBalancers.add(lb);
|
||||
bazHelpers.add(helper);
|
||||
return lb;
|
||||
}
|
||||
};
|
||||
|
||||
@Mock
|
||||
private Helper helper;
|
||||
@Mock
|
||||
private ChannelLogger channelLogger;
|
||||
|
||||
private LoadBalancer xdsRoutingLb;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
doReturn(channelLogger).when(helper).getChannelLogger();
|
||||
xdsRoutingLb = new XdsRoutingLoadBalancer(helper);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
xdsRoutingLb.shutdown();
|
||||
|
||||
for (LoadBalancer balancer : Iterables.concat(fooBalancers, barBalancers, bazBalancers)) {
|
||||
verify(balancer).shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void typicalWorkflow() {
|
||||
// Resolution error.
|
||||
xdsRoutingLb.handleNameResolutionError(Status.UNAUTHENTICATED);
|
||||
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
|
||||
|
||||
// Config update.
|
||||
Attributes attributes =
|
||||
Attributes.newBuilder().set(Key.<String>create("fakeKey"), "fakeVal").build();
|
||||
Object fooConfig1 = new Object();
|
||||
Object barConfig1 = new Object();
|
||||
Object bazConfig1 = new Object();
|
||||
Object fooConfig2 = new Object();
|
||||
XdsRoutingConfig xdsRoutingConfig = new XdsRoutingConfig(
|
||||
ImmutableList.of(
|
||||
new Route("foo_action", new MethodName("service1", "method1")),
|
||||
new Route("foo_action", new MethodName("service2", "method2")),
|
||||
new Route("bar_action", new MethodName("service1", "hello")),
|
||||
new Route("bar_action", new MethodName("service2", "hello")),
|
||||
new Route("foo_action_2", new MethodName("service2", "")),
|
||||
new Route("baz_action", new MethodName("", ""))),
|
||||
ImmutableMap.of(
|
||||
"foo_action",
|
||||
new PolicySelection(fooLbProvider, null, fooConfig1),
|
||||
"foo_action_2",
|
||||
new PolicySelection(fooLbProvider, null, fooConfig2),
|
||||
"bar_action",
|
||||
new PolicySelection(barLbProvider, null, barConfig1),
|
||||
"baz_action",
|
||||
new PolicySelection(bazLbProvider, null, bazConfig1)));
|
||||
xdsRoutingLb.handleResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
|
||||
.setAttributes(attributes)
|
||||
.setLoadBalancingPolicyConfig(xdsRoutingConfig).build());
|
||||
assertThat(fooBalancers).hasSize(2);
|
||||
ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(fooBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
|
||||
ResolvedAddresses resolvedAddressesFoo0 = resolvedAddressesCaptor.getValue();
|
||||
verify(fooBalancers.get(1)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
|
||||
ResolvedAddresses resolvedAddressesFoo1 = resolvedAddressesCaptor.getValue();
|
||||
assertThat(barBalancers).hasSize(1);
|
||||
verify(barBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
|
||||
ResolvedAddresses resolvedAddressesBar = resolvedAddressesCaptor.getValue();
|
||||
assertThat(bazBalancers).hasSize(1);
|
||||
verify(bazBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
|
||||
ResolvedAddresses resolvedAddressesBaz = resolvedAddressesCaptor.getValue();
|
||||
assertThat(resolvedAddressesFoo0.getAttributes()).isEqualTo(attributes);
|
||||
assertThat(resolvedAddressesFoo1.getAttributes()).isEqualTo(attributes);
|
||||
assertThat(resolvedAddressesBar.getAttributes()).isEqualTo(attributes);
|
||||
assertThat(resolvedAddressesBaz.getAttributes()).isEqualTo(attributes);
|
||||
assertThat(
|
||||
Arrays.asList(
|
||||
resolvedAddressesFoo0.getLoadBalancingPolicyConfig(),
|
||||
resolvedAddressesFoo1.getLoadBalancingPolicyConfig()))
|
||||
.containsExactly(fooConfig1, fooConfig2);
|
||||
LoadBalancer fooBalancer1;
|
||||
Helper fooHelper1;
|
||||
Helper fooHelper2;
|
||||
if (resolvedAddressesFoo0.getLoadBalancingPolicyConfig().equals(fooConfig1)) {
|
||||
fooBalancer1 = fooBalancers.get(0);
|
||||
fooHelper1 = fooHelpers.get(0);
|
||||
fooHelper2 = fooHelpers.get(1);
|
||||
} else {
|
||||
fooBalancer1 = fooBalancers.get(1);
|
||||
fooHelper1 = fooHelpers.get(1);
|
||||
fooHelper2 = fooHelpers.get(0);
|
||||
}
|
||||
assertThat(resolvedAddressesBar.getLoadBalancingPolicyConfig()).isEqualTo(barConfig1);
|
||||
assertThat(resolvedAddressesBaz.getLoadBalancingPolicyConfig()).isEqualTo(bazConfig1);
|
||||
Helper barHelper = barHelpers.get(0);
|
||||
Helper bazHelper = bazHelpers.get(0);
|
||||
|
||||
// State update.
|
||||
Subchannel subchannelFoo1 = mock(Subchannel.class);
|
||||
Subchannel subchannelFoo2 = mock(Subchannel.class);
|
||||
fooHelper1.updateBalancingState(READY, TestUtils.pickerOf(subchannelFoo1));
|
||||
fooHelper2.updateBalancingState(READY, TestUtils.pickerOf(subchannelFoo2));
|
||||
barHelper.updateBalancingState(
|
||||
TRANSIENT_FAILURE, new ErrorPicker(Status.ABORTED.withDescription("abort bar")));
|
||||
bazHelper.updateBalancingState(
|
||||
TRANSIENT_FAILURE, new ErrorPicker(Status.DATA_LOSS.withDescription("data loss baz")));
|
||||
ArgumentCaptor<ConnectivityState> connectivityStateCaptor = ArgumentCaptor.forClass(null);
|
||||
ArgumentCaptor<SubchannelPicker> subchannelPickerCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(helper, atLeastOnce()).updateBalancingState(
|
||||
connectivityStateCaptor.capture(), subchannelPickerCaptor.capture());
|
||||
assertThat(connectivityStateCaptor.getValue()).isEqualTo(READY);
|
||||
SubchannelPicker picker = subchannelPickerCaptor.getValue();
|
||||
assertPickerRoutePathToSubchannel(picker, "service1", "method1", subchannelFoo1);
|
||||
assertPickerRoutePathToSubchannel(picker, "service2", "method2", subchannelFoo1);
|
||||
assertPickerRoutePathToError(
|
||||
picker, "service1", "hello", Status.ABORTED.withDescription("abort bar"));
|
||||
assertPickerRoutePathToError(
|
||||
picker, "service2", "hello", Status.ABORTED.withDescription("abort bar"));
|
||||
assertPickerRoutePathToSubchannel(picker, "service2", "otherMethod", subchannelFoo2);
|
||||
assertPickerRoutePathToError(
|
||||
picker, "otherService", "hello", Status.DATA_LOSS.withDescription("data loss baz"));
|
||||
|
||||
// Resolution error.
|
||||
Status error = Status.UNAVAILABLE.withDescription("fake unavailable");
|
||||
xdsRoutingLb.handleNameResolutionError(error);
|
||||
for (LoadBalancer lb : Iterables.concat(fooBalancers, barBalancers, bazBalancers)) {
|
||||
verify(lb).handleNameResolutionError(error);
|
||||
}
|
||||
|
||||
// New config update.
|
||||
Object fooConfig3 = new Object();
|
||||
Object barConfig2 = new Object();
|
||||
Object barConfig3 = new Object();
|
||||
Object bazConfig2 = new Object();
|
||||
xdsRoutingConfig = new XdsRoutingConfig(
|
||||
ImmutableList.of(
|
||||
new Route("foo_action", new MethodName("service1", "method1")),
|
||||
new Route("foo_action", new MethodName("service2", "method3")),
|
||||
new Route("bar_action", new MethodName("service1", "hello")),
|
||||
new Route("bar_action_2", new MethodName("service2", "hello")),
|
||||
new Route("baz_action", new MethodName("", ""))),
|
||||
ImmutableMap.of(
|
||||
"foo_action",
|
||||
new PolicySelection(fooLbProvider, null, fooConfig3),
|
||||
"bar_action",
|
||||
new PolicySelection(barLbProvider, null, barConfig2),
|
||||
"bar_action_2",
|
||||
new PolicySelection(barLbProvider, null, barConfig3),
|
||||
"baz_action",
|
||||
new PolicySelection(bazLbProvider, null, bazConfig2)));
|
||||
xdsRoutingLb.handleResolvedAddresses(
|
||||
ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
|
||||
.setLoadBalancingPolicyConfig(xdsRoutingConfig)
|
||||
.build());
|
||||
verify(fooBalancer1, times(2)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
|
||||
assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig())
|
||||
.isEqualTo(fooConfig3);
|
||||
assertThat(barBalancers).hasSize(2);
|
||||
verify(barBalancers.get(0), times(2))
|
||||
.handleResolvedAddresses(resolvedAddressesCaptor.capture());
|
||||
assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig())
|
||||
.isEqualTo(barConfig2);
|
||||
verify(barBalancers.get(1)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
|
||||
assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig())
|
||||
.isEqualTo(barConfig3);
|
||||
verify(bazBalancers.get(0), times(2))
|
||||
.handleResolvedAddresses(resolvedAddressesCaptor.capture());
|
||||
assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig())
|
||||
.isEqualTo(bazConfig2);
|
||||
|
||||
// New status update.
|
||||
Subchannel subchannelBar2 = mock(Subchannel.class);
|
||||
Helper barHelper2 = barHelpers.get(1);
|
||||
barHelper2.updateBalancingState(READY, TestUtils.pickerOf(subchannelBar2));
|
||||
verify(helper, atLeastOnce()).updateBalancingState(
|
||||
connectivityStateCaptor.capture(), subchannelPickerCaptor.capture());
|
||||
assertThat(connectivityStateCaptor.getValue()).isEqualTo(READY);
|
||||
picker = subchannelPickerCaptor.getValue();
|
||||
assertPickerRoutePathToSubchannel(picker, "service1", "method1", subchannelFoo1);
|
||||
assertPickerRoutePathToError(
|
||||
picker, "service1", "method2", Status.DATA_LOSS.withDescription("data loss baz"));
|
||||
assertPickerRoutePathToSubchannel(picker, "service2", "method3", subchannelFoo1);
|
||||
assertPickerRoutePathToError(
|
||||
picker, "service1", "hello", Status.ABORTED.withDescription("abort bar"));
|
||||
assertPickerRoutePathToSubchannel(picker, "service2", "hello", subchannelBar2);
|
||||
}
|
||||
|
||||
private static PickSubchannelArgs pickSubchannelArgsForMethod(
|
||||
final String service, final String method) {
|
||||
return new PickSubchannelArgs() {
|
||||
|
||||
@Override
|
||||
public CallOptions getCallOptions() {
|
||||
return CallOptions.DEFAULT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Metadata getHeaders() {
|
||||
return new Metadata();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MethodDescriptor<Void, Void> getMethodDescriptor() {
|
||||
return MethodDescriptor.<Void, Void>newBuilder()
|
||||
.setType(MethodType.UNARY)
|
||||
.setFullMethodName(service + "/" + method)
|
||||
.setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
|
||||
.setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
|
||||
.build();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static void assertPickerRoutePathToSubchannel(
|
||||
SubchannelPicker picker, String service, String method, Subchannel expectedSubchannel) {
|
||||
Subchannel actualSubchannel =
|
||||
picker.pickSubchannel(pickSubchannelArgsForMethod(service, method)).getSubchannel();
|
||||
assertThat(actualSubchannel).isEqualTo(expectedSubchannel);
|
||||
}
|
||||
|
||||
private static void assertPickerRoutePathToError(
|
||||
SubchannelPicker picker, String service, String method, Status expectedStatus) {
|
||||
Status actualStatus =
|
||||
picker.pickSubchannel(pickSubchannelArgsForMethod(service, method)).getStatus();
|
||||
assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode());
|
||||
assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription());
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue