xds: handle drop percentage for SubchannelPicker (#5765)

* xds: handle drop percentage for SubchannelPicker

* XdsCommsTest

* refactor ThreadSafeRandom

* remove hasNonDropBackends

* fix comments
This commit is contained in:
ZHANG Dapeng 2019-05-23 10:36:59 -07:00 committed by GitHub
parent a1bc92a748
commit 08843f8d59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 367 additions and 33 deletions

View File

@ -25,7 +25,6 @@ import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
final class InterLocalityPicker extends SubchannelPicker {
@ -54,21 +53,8 @@ final class InterLocalityPicker extends SubchannelPicker {
}
}
interface ThreadSafeRandom {
int nextInt(int bound);
}
private static final class ThreadSafeRadomImpl implements ThreadSafeRandom {
static final ThreadSafeRandom instance = new ThreadSafeRadomImpl();
@Override
public int nextInt(int bound) {
return ThreadLocalRandom.current().nextInt(bound);
}
}
InterLocalityPicker(List<WeightedChildPicker> weightedChildPickers) {
this(weightedChildPickers, ThreadSafeRadomImpl.instance);
this(weightedChildPickers, ThreadSafeRandom.ThreadSafeRandomImpl.instance);
}
@VisibleForTesting

View File

@ -25,12 +25,15 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
@ -39,6 +42,7 @@ import io.grpc.LoadBalancerRegistry;
import io.grpc.Status;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.InterLocalityPicker.WeightedChildPicker;
import io.grpc.xds.XdsComms.DropOverload;
import io.grpc.xds.XdsComms.Locality;
import io.grpc.xds.XdsComms.LocalityInfo;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
@ -49,6 +53,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
/**
* Manages EAG and locality info for a collection of subchannels, not including subchannels
@ -57,12 +63,12 @@ import java.util.Set;
// Must be accessed/run in SynchronizedContext.
interface LocalityStore {
boolean hasNonDropBackends();
void reset();
void updateLocalityStore(Map<Locality, LocalityInfo> localityInfoMap);
void updateDropPercentage(@Nullable ImmutableList<DropOverload> dropOverloads);
void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState);
final class LocalityStoreImpl implements LocalityStore {
@ -71,20 +77,27 @@ interface LocalityStore {
private final Helper helper;
private final PickerFactory pickerFactory;
private final LoadBalancerProvider loadBalancerProvider;
private final ThreadSafeRandom random;
private Map<Locality, LocalityLbInfo> localityMap = new HashMap<>();
@CheckForNull
private ImmutableList<DropOverload> dropOverloads;
LocalityStoreImpl(Helper helper, LoadBalancerRegistry lbRegistry) {
this(helper, pickerFactoryImpl, lbRegistry);
this(helper, pickerFactoryImpl, lbRegistry, ThreadSafeRandom.ThreadSafeRandomImpl.instance);
}
@VisibleForTesting
LocalityStoreImpl(Helper helper, PickerFactory pickerFactory, LoadBalancerRegistry lbRegistry) {
LocalityStoreImpl(
Helper helper, PickerFactory pickerFactory, LoadBalancerRegistry lbRegistry,
ThreadSafeRandom random) {
this.helper = helper;
this.pickerFactory = pickerFactory;
loadBalancerProvider = checkNotNull(
lbRegistry.getProvider(ROUND_ROBIN),
"Unable to find '%s' LoadBalancer", ROUND_ROBIN);
this.random = random;
}
@VisibleForTesting // Introduced for testing only.
@ -92,6 +105,33 @@ interface LocalityStore {
SubchannelPicker picker(List<WeightedChildPicker> childPickers);
}
private static final class DroppablePicker extends SubchannelPicker {
final ImmutableList<DropOverload> dropOverloads;
final SubchannelPicker delegate;
final ThreadSafeRandom random;
DroppablePicker(
ImmutableList<DropOverload> dropOverloads, SubchannelPicker delegate,
ThreadSafeRandom random) {
this.dropOverloads = dropOverloads;
this.delegate = delegate;
this.random = random;
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
for (DropOverload dropOverload : dropOverloads) {
int rand = random.nextInt(1000_000);
if (rand < dropOverload.dropsPerMillion) {
return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
"dropped by loadbalancer: " + dropOverload.toString()));
}
}
return delegate.pickSubchannel(args);
}
}
private static final PickerFactory pickerFactoryImpl =
new PickerFactory() {
@Override
@ -100,12 +140,6 @@ interface LocalityStore {
}
};
@Override
public boolean hasNonDropBackends() {
// TODO: impl
return false;
}
// This is triggered by xdsLoadbalancer.handleSubchannelState
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
@ -187,6 +221,11 @@ interface LocalityStore {
}
@Override
public void updateDropPercentage(ImmutableList<DropOverload> dropOverloads) {
this.dropOverloads = dropOverloads;
}
private static ConnectivityState aggregateState(
ConnectivityState overallState, ConnectivityState childState) {
if (overallState == null) {
@ -247,6 +286,11 @@ interface LocalityStore {
} else {
picker = pickerFactory.picker(childPickers);
}
if (dropOverloads != null && !dropOverloads.isEmpty()) {
picker = new DroppablePicker(dropOverloads, picker, random);
}
if (state != null) {
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Picker updated - state: {0}, picker: {1}", state, picker);

View File

@ -0,0 +1,37 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe // Except for impls/mocks in tests
interface ThreadSafeRandom {
int nextInt(int bound);
final class ThreadSafeRandomImpl implements ThreadSafeRandom {
static final ThreadSafeRandom instance = new ThreadSafeRandomImpl();
private ThreadSafeRandomImpl() {}
@Override
public int nextInt(int bound) {
return ThreadLocalRandom.current().nextInt(bound);
}
}
}

View File

@ -16,6 +16,7 @@
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@ -33,6 +34,8 @@ import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.type.FractionalPercent;
import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.Helper;
import io.grpc.ManagedChannel;
@ -168,6 +171,41 @@ final class XdsComms {
}
}
static final class DropOverload {
final String category;
final int dropsPerMillion;
DropOverload(String category, int dropsPerMillion) {
this.category = category;
this.dropsPerMillion = dropsPerMillion;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DropOverload that = (DropOverload) o;
return dropsPerMillion == that.dropsPerMillion && Objects.equal(category, that.category);
}
@Override
public int hashCode() {
return Objects.hashCode(category, dropsPerMillion);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("category", category)
.add("dropsPerMillion", dropsPerMillion)
.toString();
}
}
private final class AdsStream {
static final String EDS_TYPE_URL =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
@ -204,10 +242,32 @@ final class XdsComms {
adsStreamCallback.onError();
return;
}
if (!firstEdsResponseReceived) {
firstEdsResponseReceived = true;
adsStreamCallback.onWorking();
}
ImmutableList<DropOverload> dropOverloads = null;
ClusterLoadAssignment.Policy policy = clusterLoadAssignment.getPolicy();
if (policy != null) {
List<ClusterLoadAssignment.Policy.DropOverload> dropOverloadsProto =
policy.getDropOverloadsList();
if (dropOverloadsProto != null) {
ImmutableList.Builder<DropOverload> dropOverloadsBuilder
= ImmutableList.builder();
for (ClusterLoadAssignment.Policy.DropOverload dropOverload
: dropOverloadsProto) {
dropOverloadsBuilder.add(new DropOverload(
dropOverload.getCategory(),
rateInMillion(dropOverload.getDropPercentage())));
}
dropOverloads = dropOverloadsBuilder.build();
}
}
localityStore.updateDropPercentage(dropOverloads);
List<LocalityLbEndpoints> localities = clusterLoadAssignment.getEndpointsList();
Map<Locality, LocalityInfo> localityEndpointsMapping = new LinkedHashMap<>();
for (LocalityLbEndpoints localityLbEndpoints : localities) {
@ -227,7 +287,6 @@ final class XdsComms {
localityEndpointsMapping = Collections.unmodifiableMap(localityEndpointsMapping);
// TODO: parse drop_percentage, and also updateLoacalistyStore with dropPercentage
localityStore.updateLocalityStore(localityEndpointsMapping);
}
}
@ -297,6 +356,31 @@ final class XdsComms {
}
}
private static int rateInMillion(FractionalPercent fractionalPercent) {
int numerator = fractionalPercent.getNumerator();
checkArgument(numerator >= 0, "numerator shouldn't be negative in %s", fractionalPercent);
DenominatorType type = fractionalPercent.getDenominator();
switch (type) {
case TEN_THOUSAND:
numerator *= 100;
break;
case HUNDRED:
numerator *= 100_00;
break;
case MILLION:
break;
default:
throw new IllegalArgumentException("unknow denominator type of " + fractionalPercent);
}
if (numerator > 1000_000) {
numerator = 1000_000;
}
return numerator;
}
/**
* Starts a new ADS streaming RPC.
*/

View File

@ -109,9 +109,7 @@ class XdsLbState {
}
final void handleNameResolutionError(Status error) {
if (!localityStore.hasNonDropBackends()) {
// TODO: maybe update picker with transient failure
}
// NO-OP?
}
final void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {

View File

@ -24,7 +24,6 @@ import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Status;
import io.grpc.xds.InterLocalityPicker.ThreadSafeRandom;
import io.grpc.xds.InterLocalityPicker.WeightedChildPicker;
import java.util.ArrayList;
import java.util.Arrays;

View File

@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -43,6 +44,7 @@ import io.grpc.LoadBalancerRegistry;
import io.grpc.xds.InterLocalityPicker.WeightedChildPicker;
import io.grpc.xds.LocalityStore.LocalityStoreImpl;
import io.grpc.xds.LocalityStore.LocalityStoreImpl.PickerFactory;
import io.grpc.xds.XdsComms.DropOverload;
import io.grpc.xds.XdsComms.LbEndpoint;
import io.grpc.xds.XdsComms.Locality;
import io.grpc.xds.XdsComms.LocalityInfo;
@ -159,6 +161,8 @@ public class LocalityStoreTest {
private Helper helper;
@Mock
private PickSubchannelArgs pickSubchannelArgs;
@Mock
private ThreadSafeRandom random;
private LocalityStore localityStore;
@ -168,7 +172,7 @@ public class LocalityStoreTest {
doReturn(mock(Subchannel.class)).when(helper).createSubchannel(
ArgumentMatchers.<EquivalentAddressGroup>anyList(), any(Attributes.class));
lbRegistry.register(lbProvider);
localityStore = new LocalityStoreImpl(helper, pickerFactory, lbRegistry);
localityStore = new LocalityStoreImpl(helper, pickerFactory, lbRegistry, random);
}
@Test
@ -266,6 +270,74 @@ public class LocalityStoreTest {
.handleResolvedAddresses(resolvedAddressesCaptor1.capture());
assertThat(resolvedAddressesCaptor1.getValue().getAddresses()).containsExactly(eag11);
assertThat(pickerFactory.totalReadyLocalities).isEqualTo(1);
verify(random, never()).nextInt(1000_000);
}
@Test
public void updateLoaclityStore_withDrop() {
LocalityInfo localityInfo1 =
new LocalityInfo(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1);
LocalityInfo localityInfo2 =
new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2);
LocalityInfo localityInfo3 =
new LocalityInfo(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3);
Map<Locality, LocalityInfo> localityInfoMap = ImmutableMap.of(
locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3);
localityStore.updateLocalityStore(localityInfoMap);
localityStore.updateDropPercentage(ImmutableList.of(
new DropOverload("throttle", 365),
new DropOverload("lb", 1234)));
assertThat(loadBalancers).hasSize(3);
ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor1 =
ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(loadBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor1.capture());
assertThat(resolvedAddressesCaptor1.getValue().getAddresses()).containsExactly(eag11, eag12);
ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor2 =
ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(loadBalancers.get(1)).handleResolvedAddresses(resolvedAddressesCaptor2.capture());
assertThat(resolvedAddressesCaptor2.getValue().getAddresses()).containsExactly(eag21, eag22);
ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor3 =
ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(loadBalancers.get(2)).handleResolvedAddresses(resolvedAddressesCaptor3.capture());
assertThat(resolvedAddressesCaptor3.getValue().getAddresses()).containsExactly(eag31, eag32);
assertThat(pickerFactory.totalReadyLocalities).isEqualTo(0);
// subchannel12 goes to CONNECTING
final Subchannel subchannel12 =
helpers.get(0).createSubchannel(ImmutableList.of(eag12), Attributes.EMPTY);
verify(helper).createSubchannel(ImmutableList.of(eag12), Attributes.EMPTY);
SubchannelPicker subchannelPicker12 = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel12);
}
};
helpers.get(0).updateBalancingState(CONNECTING, subchannelPicker12);
ArgumentCaptor<SubchannelPicker> subchannelPickerCaptor12 =
ArgumentCaptor.forClass(SubchannelPicker.class);
verify(helper).updateBalancingState(same(CONNECTING), subchannelPickerCaptor12.capture());
doReturn(365, 1234).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs))
.isEqualTo(PickResult.withNoResult());
verify(random, times(2)).nextInt(1000_000);
doReturn(366, 1235).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs))
.isEqualTo(PickResult.withNoResult());
verify(random, times(4)).nextInt(1000_000);
doReturn(364, 1234).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
.isTrue();
verify(random, times(5)).nextInt(1000_000);
doReturn(365, 1233).when(random).nextInt(1000_000);
assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
.isTrue();
verify(random, times(7)).nextInt(1000_000);
}
@Test

View File

@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList;
import com.google.protobuf.Any;
import com.google.protobuf.UInt32Value;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.core.Address;
@ -37,6 +38,8 @@ import io.envoyproxy.envoy.api.v2.endpoint.Endpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.type.FractionalPercent;
import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
@ -50,6 +53,7 @@ import io.grpc.internal.testing.StreamRecorder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.XdsComms.AdsStreamCallback;
import io.grpc.xds.XdsComms.DropOverload;
import io.grpc.xds.XdsComms.LocalityInfo;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -186,7 +190,7 @@ public class XdsCommsTest {
}
@Test
public void standardMode_sendEdsRequest_getEdsResponse() {
public void standardMode_sendEdsRequest_getEdsResponse_withNoDrop() {
assertThat(streamRecorder.getValues()).hasSize(1);
DiscoveryRequest request = streamRecorder.getValues().get(0);
assertThat(request.getTypeUrl())
@ -257,6 +261,7 @@ public class XdsCommsTest {
2);
XdsComms.Locality locality2 = new XdsComms.Locality(localityProto2);
verify(localityStore).updateDropPercentage(ImmutableList.<DropOverload>of());
verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly(
locality1, localityInfo1, locality2, localityInfo2).inOrder();
@ -279,6 +284,7 @@ public class XdsCommsTest {
.build();
responseWriter.onNext(edsResponse);
verify(localityStore, times(2)).updateDropPercentage(ImmutableList.<DropOverload>of());
verify(localityStore, times(2)).updateLocalityStore(localityEndpointsMappingCaptor.capture());
assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly(
locality2, localityInfo2, locality1, localityInfo1).inOrder();
@ -286,6 +292,105 @@ public class XdsCommsTest {
xdsComms.shutdownChannel();
}
@Test
public void standardMode_sendEdsRequest_getEdsResponse_withDrops() {
Locality localityProto1 = Locality.newBuilder()
.setRegion("region1").setZone("zone1").setSubZone("subzone1").build();
LbEndpoint endpoint11 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr11").setPortValue(11))))
.setLoadBalancingWeight(UInt32Value.of(11))
.build();
LbEndpoint endpoint12 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr12").setPortValue(12))))
.setLoadBalancingWeight(UInt32Value.of(12))
.build();
Locality localityProto2 = Locality.newBuilder()
.setRegion("region2").setZone("zone2").setSubZone("subzone2").build();
LbEndpoint endpoint21 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr21").setPortValue(21))))
.setLoadBalancingWeight(UInt32Value.of(21))
.build();
LbEndpoint endpoint22 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr22").setPortValue(22))))
.setLoadBalancingWeight(UInt32Value.of(22))
.build();
DiscoveryResponse edsResponseWithDrops = DiscoveryResponse.newBuilder()
.addResources(Any.pack(ClusterLoadAssignment.newBuilder()
.addEndpoints(LocalityLbEndpoints.newBuilder()
.setLocality(localityProto2)
.addLbEndpoints(endpoint21)
.setLoadBalancingWeight(UInt32Value.of(2)))
.addEndpoints(LocalityLbEndpoints.newBuilder()
.setLocality(localityProto1)
.addLbEndpoints(endpoint11)
.setLoadBalancingWeight(UInt32Value.of(1)))
.setPolicy(Policy.newBuilder()
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("throttle")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(123).setDenominator(DenominatorType.MILLION).build())
.build())
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("lb")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(456).setDenominator(DenominatorType.TEN_THOUSAND).build())
.build())
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("fake_category")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(78).setDenominator(DenominatorType.HUNDRED).build())
.build())
.addDropOverloads(
io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload
.newBuilder()
.setCategory("fake_category_2")
.setDropPercentage(FractionalPercent.newBuilder()
.setNumerator(789).setDenominator(DenominatorType.HUNDRED).build())
.build())
.build())
.build()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
responseWriter.onNext(edsResponseWithDrops);
verify(localityStore).updateDropPercentage(ImmutableList.of(
new DropOverload("throttle", 123),
new DropOverload("lb", 456_00),
new DropOverload("fake_category", 78_00_00),
new DropOverload("fake_category_2", 1000_000)));
verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
XdsComms.Locality locality1 = new XdsComms.Locality(localityProto1);
LocalityInfo localityInfo1 = new LocalityInfo(
ImmutableList.of(new XdsComms.LbEndpoint(endpoint11)), 1);
LocalityInfo localityInfo2 = new LocalityInfo(
ImmutableList.of(new XdsComms.LbEndpoint(endpoint21)), 2);
XdsComms.Locality locality2 = new XdsComms.Locality(localityProto2);
assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly(
locality2, localityInfo2, locality1, localityInfo1).inOrder();
xdsComms.shutdownChannel();
}
@Test
public void serverOnCompleteShouldFailClient() {
responseWriter.onCompleted();

View File

@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.collect.ImmutableList;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
@ -64,6 +65,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -88,6 +90,8 @@ public class XdsLbStateTest {
private AdsStreamCallback adsStreamCallback;
@Mock
private PickSubchannelArgs pickSubchannelArgs;
@Mock
private ThreadSafeRandom random;
@Captor
private ArgumentCaptor<SubchannelPicker> subchannelPickerCaptor;
@Captor
@ -172,7 +176,7 @@ public class XdsLbStateTest {
doReturn("fake_authority").when(helper).getAuthority();
doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger();
lbRegistry.register(childLbProvider);
localityStore = new LocalityStoreImpl(helper, interLocalityPickerFactory, lbRegistry);
localityStore = new LocalityStoreImpl(helper, interLocalityPickerFactory, lbRegistry, random);
String serverName = InProcessServerBuilder.generateName();
@ -213,6 +217,11 @@ public class XdsLbStateTest {
doReturn(channel).when(helper).createResolvingOobChannel(BALANCER_NAME);
}
@After
public void tearDown() {
verifyNoMoreInteractions(random);
}
@Test
public void shutdownAndReleaseXdsCommsDoesShutdown() {
XdsLbState xdsLbState = mock(XdsLbState.class);