diff --git a/xds/src/main/java/io/grpc/xds/InterLocalityPicker.java b/xds/src/main/java/io/grpc/xds/InterLocalityPicker.java index 193c2a8399..48523415fe 100644 --- a/xds/src/main/java/io/grpc/xds/InterLocalityPicker.java +++ b/xds/src/main/java/io/grpc/xds/InterLocalityPicker.java @@ -25,7 +25,6 @@ import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.SubchannelPicker; import java.util.List; -import java.util.concurrent.ThreadLocalRandom; final class InterLocalityPicker extends SubchannelPicker { @@ -54,21 +53,8 @@ final class InterLocalityPicker extends SubchannelPicker { } } - interface ThreadSafeRandom { - int nextInt(int bound); - } - - private static final class ThreadSafeRadomImpl implements ThreadSafeRandom { - static final ThreadSafeRandom instance = new ThreadSafeRadomImpl(); - - @Override - public int nextInt(int bound) { - return ThreadLocalRandom.current().nextInt(bound); - } - } - InterLocalityPicker(List weightedChildPickers) { - this(weightedChildPickers, ThreadSafeRadomImpl.instance); + this(weightedChildPickers, ThreadSafeRandom.ThreadSafeRandomImpl.instance); } @VisibleForTesting diff --git a/xds/src/main/java/io/grpc/xds/LocalityStore.java b/xds/src/main/java/io/grpc/xds/LocalityStore.java index 253bcea366..825b3671af 100644 --- a/xds/src/main/java/io/grpc/xds/LocalityStore.java +++ b/xds/src/main/java/io/grpc/xds/LocalityStore.java @@ -25,12 +25,15 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; import io.grpc.ChannelLogger.ChannelLogLevel; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; @@ -39,6 +42,7 @@ import io.grpc.LoadBalancerRegistry; import io.grpc.Status; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.xds.InterLocalityPicker.WeightedChildPicker; +import io.grpc.xds.XdsComms.DropOverload; import io.grpc.xds.XdsComms.Locality; import io.grpc.xds.XdsComms.LocalityInfo; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; @@ -49,6 +53,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.CheckForNull; +import javax.annotation.Nullable; /** * Manages EAG and locality info for a collection of subchannels, not including subchannels @@ -57,12 +63,12 @@ import java.util.Set; // Must be accessed/run in SynchronizedContext. interface LocalityStore { - boolean hasNonDropBackends(); - void reset(); void updateLocalityStore(Map localityInfoMap); + void updateDropPercentage(@Nullable ImmutableList dropOverloads); + void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState); final class LocalityStoreImpl implements LocalityStore { @@ -71,20 +77,27 @@ interface LocalityStore { private final Helper helper; private final PickerFactory pickerFactory; private final LoadBalancerProvider loadBalancerProvider; + private final ThreadSafeRandom random; private Map localityMap = new HashMap<>(); + @CheckForNull + private ImmutableList dropOverloads; + LocalityStoreImpl(Helper helper, LoadBalancerRegistry lbRegistry) { - this(helper, pickerFactoryImpl, lbRegistry); + this(helper, pickerFactoryImpl, lbRegistry, ThreadSafeRandom.ThreadSafeRandomImpl.instance); } @VisibleForTesting - LocalityStoreImpl(Helper helper, PickerFactory pickerFactory, LoadBalancerRegistry lbRegistry) { + LocalityStoreImpl( + Helper helper, PickerFactory pickerFactory, LoadBalancerRegistry lbRegistry, + ThreadSafeRandom random) { this.helper = helper; this.pickerFactory = pickerFactory; loadBalancerProvider = checkNotNull( lbRegistry.getProvider(ROUND_ROBIN), "Unable to find '%s' LoadBalancer", ROUND_ROBIN); + this.random = random; } @VisibleForTesting // Introduced for testing only. @@ -92,6 +105,33 @@ interface LocalityStore { SubchannelPicker picker(List childPickers); } + private static final class DroppablePicker extends SubchannelPicker { + + final ImmutableList dropOverloads; + final SubchannelPicker delegate; + final ThreadSafeRandom random; + + DroppablePicker( + ImmutableList dropOverloads, SubchannelPicker delegate, + ThreadSafeRandom random) { + this.dropOverloads = dropOverloads; + this.delegate = delegate; + this.random = random; + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + for (DropOverload dropOverload : dropOverloads) { + int rand = random.nextInt(1000_000); + if (rand < dropOverload.dropsPerMillion) { + return PickResult.withDrop(Status.UNAVAILABLE.withDescription( + "dropped by loadbalancer: " + dropOverload.toString())); + } + } + return delegate.pickSubchannel(args); + } + } + private static final PickerFactory pickerFactoryImpl = new PickerFactory() { @Override @@ -100,12 +140,6 @@ interface LocalityStore { } }; - @Override - public boolean hasNonDropBackends() { - // TODO: impl - return false; - } - // This is triggered by xdsLoadbalancer.handleSubchannelState @Override public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { @@ -187,6 +221,11 @@ interface LocalityStore { } + @Override + public void updateDropPercentage(ImmutableList dropOverloads) { + this.dropOverloads = dropOverloads; + } + private static ConnectivityState aggregateState( ConnectivityState overallState, ConnectivityState childState) { if (overallState == null) { @@ -247,6 +286,11 @@ interface LocalityStore { } else { picker = pickerFactory.picker(childPickers); } + + if (dropOverloads != null && !dropOverloads.isEmpty()) { + picker = new DroppablePicker(dropOverloads, picker, random); + } + if (state != null) { helper.getChannelLogger().log( ChannelLogLevel.INFO, "Picker updated - state: {0}, picker: {1}", state, picker); diff --git a/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java b/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java new file mode 100644 index 0000000000..6a01f0a3d7 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java @@ -0,0 +1,37 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.concurrent.ThreadSafe; + +@ThreadSafe // Except for impls/mocks in tests +interface ThreadSafeRandom { + int nextInt(int bound); + + final class ThreadSafeRandomImpl implements ThreadSafeRandom { + + static final ThreadSafeRandom instance = new ThreadSafeRandomImpl(); + + private ThreadSafeRandomImpl() {} + + @Override + public int nextInt(int bound) { + return ThreadLocalRandom.current().nextInt(bound); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/XdsComms.java b/xds/src/main/java/io/grpc/xds/XdsComms.java index c49f6a8e7c..df0619a5cf 100644 --- a/xds/src/main/java/io/grpc/xds/XdsComms.java +++ b/xds/src/main/java/io/grpc/xds/XdsComms.java @@ -16,6 +16,7 @@ package io.grpc.xds; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -33,6 +34,8 @@ import io.envoyproxy.envoy.api.v2.core.Node; import io.envoyproxy.envoy.api.v2.core.SocketAddress; import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints; import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc; +import io.envoyproxy.envoy.type.FractionalPercent; +import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer.Helper; import io.grpc.ManagedChannel; @@ -168,6 +171,41 @@ final class XdsComms { } } + static final class DropOverload { + final String category; + final int dropsPerMillion; + + DropOverload(String category, int dropsPerMillion) { + this.category = category; + this.dropsPerMillion = dropsPerMillion; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DropOverload that = (DropOverload) o; + return dropsPerMillion == that.dropsPerMillion && Objects.equal(category, that.category); + } + + @Override + public int hashCode() { + return Objects.hashCode(category, dropsPerMillion); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("category", category) + .add("dropsPerMillion", dropsPerMillion) + .toString(); + } + } + private final class AdsStream { static final String EDS_TYPE_URL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"; @@ -204,10 +242,32 @@ final class XdsComms { adsStreamCallback.onError(); return; } + if (!firstEdsResponseReceived) { firstEdsResponseReceived = true; adsStreamCallback.onWorking(); } + + ImmutableList dropOverloads = null; + ClusterLoadAssignment.Policy policy = clusterLoadAssignment.getPolicy(); + if (policy != null) { + List dropOverloadsProto = + policy.getDropOverloadsList(); + if (dropOverloadsProto != null) { + ImmutableList.Builder dropOverloadsBuilder + = ImmutableList.builder(); + for (ClusterLoadAssignment.Policy.DropOverload dropOverload + : dropOverloadsProto) { + dropOverloadsBuilder.add(new DropOverload( + dropOverload.getCategory(), + rateInMillion(dropOverload.getDropPercentage()))); + } + + dropOverloads = dropOverloadsBuilder.build(); + } + } + localityStore.updateDropPercentage(dropOverloads); + List localities = clusterLoadAssignment.getEndpointsList(); Map localityEndpointsMapping = new LinkedHashMap<>(); for (LocalityLbEndpoints localityLbEndpoints : localities) { @@ -227,7 +287,6 @@ final class XdsComms { localityEndpointsMapping = Collections.unmodifiableMap(localityEndpointsMapping); - // TODO: parse drop_percentage, and also updateLoacalistyStore with dropPercentage localityStore.updateLocalityStore(localityEndpointsMapping); } } @@ -297,6 +356,31 @@ final class XdsComms { } } + private static int rateInMillion(FractionalPercent fractionalPercent) { + int numerator = fractionalPercent.getNumerator(); + checkArgument(numerator >= 0, "numerator shouldn't be negative in %s", fractionalPercent); + + DenominatorType type = fractionalPercent.getDenominator(); + switch (type) { + case TEN_THOUSAND: + numerator *= 100; + break; + case HUNDRED: + numerator *= 100_00; + break; + case MILLION: + break; + default: + throw new IllegalArgumentException("unknow denominator type of " + fractionalPercent); + } + + if (numerator > 1000_000) { + numerator = 1000_000; + } + + return numerator; + } + /** * Starts a new ADS streaming RPC. */ diff --git a/xds/src/main/java/io/grpc/xds/XdsLbState.java b/xds/src/main/java/io/grpc/xds/XdsLbState.java index 739e981f72..f89121a75b 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLbState.java +++ b/xds/src/main/java/io/grpc/xds/XdsLbState.java @@ -109,9 +109,7 @@ class XdsLbState { } final void handleNameResolutionError(Status error) { - if (!localityStore.hasNonDropBackends()) { - // TODO: maybe update picker with transient failure - } + // NO-OP? } final void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { diff --git a/xds/src/test/java/io/grpc/xds/InterLocalityPickerTest.java b/xds/src/test/java/io/grpc/xds/InterLocalityPickerTest.java index 8235a71a31..4b7f752259 100644 --- a/xds/src/test/java/io/grpc/xds/InterLocalityPickerTest.java +++ b/xds/src/test/java/io/grpc/xds/InterLocalityPickerTest.java @@ -24,7 +24,6 @@ import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.Status; -import io.grpc.xds.InterLocalityPicker.ThreadSafeRandom; import io.grpc.xds.InterLocalityPicker.WeightedChildPicker; import java.util.ArrayList; import java.util.Arrays; diff --git a/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java b/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java index 0c9d52e8e6..93b92907ba 100644 --- a/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java +++ b/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java @@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -43,6 +44,7 @@ import io.grpc.LoadBalancerRegistry; import io.grpc.xds.InterLocalityPicker.WeightedChildPicker; import io.grpc.xds.LocalityStore.LocalityStoreImpl; import io.grpc.xds.LocalityStore.LocalityStoreImpl.PickerFactory; +import io.grpc.xds.XdsComms.DropOverload; import io.grpc.xds.XdsComms.LbEndpoint; import io.grpc.xds.XdsComms.Locality; import io.grpc.xds.XdsComms.LocalityInfo; @@ -159,6 +161,8 @@ public class LocalityStoreTest { private Helper helper; @Mock private PickSubchannelArgs pickSubchannelArgs; + @Mock + private ThreadSafeRandom random; private LocalityStore localityStore; @@ -168,7 +172,7 @@ public class LocalityStoreTest { doReturn(mock(Subchannel.class)).when(helper).createSubchannel( ArgumentMatchers.anyList(), any(Attributes.class)); lbRegistry.register(lbProvider); - localityStore = new LocalityStoreImpl(helper, pickerFactory, lbRegistry); + localityStore = new LocalityStoreImpl(helper, pickerFactory, lbRegistry, random); } @Test @@ -266,6 +270,74 @@ public class LocalityStoreTest { .handleResolvedAddresses(resolvedAddressesCaptor1.capture()); assertThat(resolvedAddressesCaptor1.getValue().getAddresses()).containsExactly(eag11); assertThat(pickerFactory.totalReadyLocalities).isEqualTo(1); + + verify(random, never()).nextInt(1000_000); + } + + @Test + public void updateLoaclityStore_withDrop() { + LocalityInfo localityInfo1 = + new LocalityInfo(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1); + LocalityInfo localityInfo2 = + new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2); + LocalityInfo localityInfo3 = + new LocalityInfo(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3); + Map localityInfoMap = ImmutableMap.of( + locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3); + localityStore.updateLocalityStore(localityInfoMap); + localityStore.updateDropPercentage(ImmutableList.of( + new DropOverload("throttle", 365), + new DropOverload("lb", 1234))); + + assertThat(loadBalancers).hasSize(3); + ArgumentCaptor resolvedAddressesCaptor1 = + ArgumentCaptor.forClass(ResolvedAddresses.class); + verify(loadBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor1.capture()); + assertThat(resolvedAddressesCaptor1.getValue().getAddresses()).containsExactly(eag11, eag12); + ArgumentCaptor resolvedAddressesCaptor2 = + ArgumentCaptor.forClass(ResolvedAddresses.class); + verify(loadBalancers.get(1)).handleResolvedAddresses(resolvedAddressesCaptor2.capture()); + assertThat(resolvedAddressesCaptor2.getValue().getAddresses()).containsExactly(eag21, eag22); + ArgumentCaptor resolvedAddressesCaptor3 = + ArgumentCaptor.forClass(ResolvedAddresses.class); + verify(loadBalancers.get(2)).handleResolvedAddresses(resolvedAddressesCaptor3.capture()); + assertThat(resolvedAddressesCaptor3.getValue().getAddresses()).containsExactly(eag31, eag32); + assertThat(pickerFactory.totalReadyLocalities).isEqualTo(0); + + // subchannel12 goes to CONNECTING + final Subchannel subchannel12 = + helpers.get(0).createSubchannel(ImmutableList.of(eag12), Attributes.EMPTY); + verify(helper).createSubchannel(ImmutableList.of(eag12), Attributes.EMPTY); + SubchannelPicker subchannelPicker12 = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withSubchannel(subchannel12); + } + }; + helpers.get(0).updateBalancingState(CONNECTING, subchannelPicker12); + ArgumentCaptor subchannelPickerCaptor12 = + ArgumentCaptor.forClass(SubchannelPicker.class); + verify(helper).updateBalancingState(same(CONNECTING), subchannelPickerCaptor12.capture()); + + doReturn(365, 1234).when(random).nextInt(1000_000); + assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs)) + .isEqualTo(PickResult.withNoResult()); + verify(random, times(2)).nextInt(1000_000); + + doReturn(366, 1235).when(random).nextInt(1000_000); + assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs)) + .isEqualTo(PickResult.withNoResult()); + verify(random, times(4)).nextInt(1000_000); + + doReturn(364, 1234).when(random).nextInt(1000_000); + assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).isDrop()) + .isTrue(); + verify(random, times(5)).nextInt(1000_000); + + doReturn(365, 1233).when(random).nextInt(1000_000); + assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).isDrop()) + .isTrue(); + verify(random, times(7)).nextInt(1000_000); } @Test diff --git a/xds/src/test/java/io/grpc/xds/XdsCommsTest.java b/xds/src/test/java/io/grpc/xds/XdsCommsTest.java index b1c05d3e55..71ee1fc3c4 100644 --- a/xds/src/test/java/io/grpc/xds/XdsCommsTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsCommsTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.Any; import com.google.protobuf.UInt32Value; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import io.envoyproxy.envoy.api.v2.core.Address; @@ -37,6 +38,8 @@ import io.envoyproxy.envoy.api.v2.endpoint.Endpoint; import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint; import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints; import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; +import io.envoyproxy.envoy.type.FractionalPercent; +import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancerProvider; @@ -50,6 +53,7 @@ import io.grpc.internal.testing.StreamRecorder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.XdsComms.AdsStreamCallback; +import io.grpc.xds.XdsComms.DropOverload; import io.grpc.xds.XdsComms.LocalityInfo; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -186,7 +190,7 @@ public class XdsCommsTest { } @Test - public void standardMode_sendEdsRequest_getEdsResponse() { + public void standardMode_sendEdsRequest_getEdsResponse_withNoDrop() { assertThat(streamRecorder.getValues()).hasSize(1); DiscoveryRequest request = streamRecorder.getValues().get(0); assertThat(request.getTypeUrl()) @@ -257,6 +261,7 @@ public class XdsCommsTest { 2); XdsComms.Locality locality2 = new XdsComms.Locality(localityProto2); + verify(localityStore).updateDropPercentage(ImmutableList.of()); verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture()); assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly( locality1, localityInfo1, locality2, localityInfo2).inOrder(); @@ -279,6 +284,7 @@ public class XdsCommsTest { .build(); responseWriter.onNext(edsResponse); + verify(localityStore, times(2)).updateDropPercentage(ImmutableList.of()); verify(localityStore, times(2)).updateLocalityStore(localityEndpointsMappingCaptor.capture()); assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly( locality2, localityInfo2, locality1, localityInfo1).inOrder(); @@ -286,6 +292,105 @@ public class XdsCommsTest { xdsComms.shutdownChannel(); } + @Test + public void standardMode_sendEdsRequest_getEdsResponse_withDrops() { + Locality localityProto1 = Locality.newBuilder() + .setRegion("region1").setZone("zone1").setSubZone("subzone1").build(); + LbEndpoint endpoint11 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr11").setPortValue(11)))) + .setLoadBalancingWeight(UInt32Value.of(11)) + .build(); + LbEndpoint endpoint12 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr12").setPortValue(12)))) + .setLoadBalancingWeight(UInt32Value.of(12)) + .build(); + Locality localityProto2 = Locality.newBuilder() + .setRegion("region2").setZone("zone2").setSubZone("subzone2").build(); + LbEndpoint endpoint21 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr21").setPortValue(21)))) + .setLoadBalancingWeight(UInt32Value.of(21)) + .build(); + LbEndpoint endpoint22 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr22").setPortValue(22)))) + .setLoadBalancingWeight(UInt32Value.of(22)) + .build(); + + + DiscoveryResponse edsResponseWithDrops = DiscoveryResponse.newBuilder() + .addResources(Any.pack(ClusterLoadAssignment.newBuilder() + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLocality(localityProto2) + .addLbEndpoints(endpoint21) + .setLoadBalancingWeight(UInt32Value.of(2))) + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLocality(localityProto1) + .addLbEndpoints(endpoint11) + .setLoadBalancingWeight(UInt32Value.of(1))) + .setPolicy(Policy.newBuilder() + .addDropOverloads( + io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload + .newBuilder() + .setCategory("throttle") + .setDropPercentage(FractionalPercent.newBuilder() + .setNumerator(123).setDenominator(DenominatorType.MILLION).build()) + .build()) + .addDropOverloads( + io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload + .newBuilder() + .setCategory("lb") + .setDropPercentage(FractionalPercent.newBuilder() + .setNumerator(456).setDenominator(DenominatorType.TEN_THOUSAND).build()) + .build()) + .addDropOverloads( + io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload + .newBuilder() + .setCategory("fake_category") + .setDropPercentage(FractionalPercent.newBuilder() + .setNumerator(78).setDenominator(DenominatorType.HUNDRED).build()) + .build()) + .addDropOverloads( + io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload + .newBuilder() + .setCategory("fake_category_2") + .setDropPercentage(FractionalPercent.newBuilder() + .setNumerator(789).setDenominator(DenominatorType.HUNDRED).build()) + .build()) + .build()) + .build())) + .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .build(); + responseWriter.onNext(edsResponseWithDrops); + + verify(localityStore).updateDropPercentage(ImmutableList.of( + new DropOverload("throttle", 123), + new DropOverload("lb", 456_00), + new DropOverload("fake_category", 78_00_00), + new DropOverload("fake_category_2", 1000_000))); + verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture()); + + XdsComms.Locality locality1 = new XdsComms.Locality(localityProto1); + LocalityInfo localityInfo1 = new LocalityInfo( + ImmutableList.of(new XdsComms.LbEndpoint(endpoint11)), 1); + LocalityInfo localityInfo2 = new LocalityInfo( + ImmutableList.of(new XdsComms.LbEndpoint(endpoint21)), 2); + XdsComms.Locality locality2 = new XdsComms.Locality(localityProto2); + assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly( + locality2, localityInfo2, locality1, localityInfo1).inOrder(); + xdsComms.shutdownChannel(); + } + @Test public void serverOnCompleteShouldFailClient() { responseWriter.onCompleted(); diff --git a/xds/src/test/java/io/grpc/xds/XdsLbStateTest.java b/xds/src/test/java/io/grpc/xds/XdsLbStateTest.java index a2b27b1807..78d950a3a0 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLbStateTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsLbStateTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.collect.ImmutableList; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; @@ -64,6 +65,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -88,6 +90,8 @@ public class XdsLbStateTest { private AdsStreamCallback adsStreamCallback; @Mock private PickSubchannelArgs pickSubchannelArgs; + @Mock + private ThreadSafeRandom random; @Captor private ArgumentCaptor subchannelPickerCaptor; @Captor @@ -172,7 +176,7 @@ public class XdsLbStateTest { doReturn("fake_authority").when(helper).getAuthority(); doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger(); lbRegistry.register(childLbProvider); - localityStore = new LocalityStoreImpl(helper, interLocalityPickerFactory, lbRegistry); + localityStore = new LocalityStoreImpl(helper, interLocalityPickerFactory, lbRegistry, random); String serverName = InProcessServerBuilder.generateName(); @@ -213,6 +217,11 @@ public class XdsLbStateTest { doReturn(channel).when(helper).createResolvingOobChannel(BALANCER_NAME); } + @After + public void tearDown() { + verifyNoMoreInteractions(random); + } + @Test public void shutdownAndReleaseXdsCommsDoesShutdown() { XdsLbState xdsLbState = mock(XdsLbState.class);