util, dual stack: change address based outlier detection to endpoint based (#10939)

This commit is contained in:
yifeizhuang 2024-02-27 10:35:59 -08:00 committed by GitHub
parent 8087977c0b
commit c61fe69803
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 197 additions and 126 deletions

View File

@ -60,12 +60,25 @@ import javax.annotation.Nullable;
*
* <p>This implements the outlier detection gRFC:
* https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md
*
* <p>The implementation maintains two maps. Each endpoint status is tracked using an
* EndpointTracker. E.g. for two endpoints with these address list and their tracker:
* Endpoint e1 : [a1, a2] is tracked with EndpointTracker t1
* Endpoint e2 : [a3] is tracked with EndpointTracker t2
* The two maps are:
* First, addressMap maps from socket address -> endpoint tracker : [a1 -> t1, a2 -> t1, a3 -> t2]
* EndpointTracker has reference to all the subchannels of the corresponding endpoint.
* Second, trackerMap maps from unordered address set -> endpoint tracker.
* Updated upon address updates.
*/
@Internal
public final class OutlierDetectionLoadBalancer extends LoadBalancer {
@VisibleForTesting
final AddressTrackerMap trackerMap;
final EndpointTrackerMap endpointTrackerMap;
@VisibleForTesting
final Map<SocketAddress, EndpointTracker> addressMap = new HashMap<>();
private final SynchronizationContext syncContext;
private final Helper childHelper;
@ -77,8 +90,8 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
private final ChannelLogger logger;
private static final Attributes.Key<AddressTracker> ADDRESS_TRACKER_ATTR_KEY
= Attributes.Key.create("addressTrackerKey");
private static final Attributes.Key<EndpointTracker> ENDPOINT_TRACKER_KEY
= Attributes.Key.create("endpointTrackerKey");
/**
* Creates a new instance of {@link OutlierDetectionLoadBalancer}.
@ -87,7 +100,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
logger = helper.getChannelLogger();
childHelper = new ChildHelper(checkNotNull(helper, "helper"));
switchLb = new GracefulSwitchLoadBalancer(childHelper);
trackerMap = new AddressTrackerMap();
endpointTrackerMap = new EndpointTrackerMap();
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
this.timeProvider = timeProvider;
@ -100,17 +113,32 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
OutlierDetectionLoadBalancerConfig config
= (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
// The map should only retain entries for addresses in this latest update.
ArrayList<SocketAddress> addresses = new ArrayList<>();
// The map should only retain entries for endpoints in this latest update.
Set<Set<SocketAddress>> endpoints = new HashSet<>();
Map<SocketAddress, Set<SocketAddress>> addressEndpointMap = new HashMap<>();
for (EquivalentAddressGroup addressGroup : resolvedAddresses.getAddresses()) {
addresses.addAll(addressGroup.getAddresses());
Set<SocketAddress> endpoint = ImmutableSet.copyOf(addressGroup.getAddresses());
endpoints.add(endpoint);
for (SocketAddress address : addressGroup.getAddresses()) {
if (addressEndpointMap.containsKey(address)) {
logger.log(ChannelLogLevel.WARNING,
"Unexpected duplicated address {0} belongs to multiple endpoints", address);
}
trackerMap.keySet().retainAll(addresses);
addressEndpointMap.put(address, endpoint);
}
}
endpointTrackerMap.keySet().retainAll(endpoints);
trackerMap.updateTrackerConfigs(config);
endpointTrackerMap.updateTrackerConfigs(config);
// Add any new ones.
trackerMap.putNewTrackers(config, addresses);
endpointTrackerMap.putNewTrackers(config, endpoints);
// Update address -> tracker map.
addressMap.clear();
for (Map.Entry<SocketAddress, Set<SocketAddress>> e : addressEndpointMap.entrySet()) {
addressMap.put(e.getKey(), endpointTrackerMap.get(e.getValue()));
}
switchLb.switchTo(config.childPolicy.getProvider());
@ -133,7 +161,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
// for a fresh start.
if (detectionTimerHandle != null) {
detectionTimerHandle.cancel();
trackerMap.resetCallCounters();
endpointTrackerMap.resetCallCounters();
}
detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config, logger),
@ -143,7 +171,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
// uneject any addresses we may have ejected.
detectionTimerHandle.cancel();
detectionTimerStartNanos = null;
trackerMap.cancelTracking();
endpointTrackerMap.cancelTracking();
}
switchLb.handleResolvedAddresses(
@ -180,13 +208,13 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
public void run() {
detectionTimerStartNanos = timeProvider.currentTimeNanos();
trackerMap.swapCounters();
endpointTrackerMap.swapCounters();
for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config, logger)) {
algo.ejectOutliers(trackerMap, detectionTimerStartNanos);
algo.ejectOutliers(endpointTrackerMap, detectionTimerStartNanos);
}
trackerMap.maybeUnejectOutliers(detectionTimerStartNanos);
endpointTrackerMap.maybeUnejectOutliers(detectionTimerStartNanos);
}
}
@ -217,8 +245,8 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
// the subchannel will be added to the map and be included in outlier detection.
List<EquivalentAddressGroup> addressGroups = args.getAddresses();
if (hasSingleAddress(addressGroups)
&& trackerMap.containsKey(addressGroups.get(0).getAddresses().get(0))) {
AddressTracker tracker = trackerMap.get(addressGroups.get(0).getAddresses().get(0));
&& addressMap.containsKey(addressGroups.get(0).getAddresses().get(0))) {
EndpointTracker tracker = addressMap.get(addressGroups.get(0).getAddresses().get(0));
tracker.addSubchannel(subchannel);
// If this address has already been ejected, we need to immediately eject this Subchannel.
@ -239,7 +267,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
class OutlierDetectionSubchannel extends ForwardingSubchannel {
private final Subchannel delegate;
private AddressTracker addressTracker;
private EndpointTracker endpointTracker;
private boolean ejected;
private ConnectivityStateInfo lastSubchannelState;
@ -275,16 +303,16 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
@Override
public void shutdown() {
if (addressTracker != null) {
addressTracker.removeSubchannel(this);
if (endpointTracker != null) {
endpointTracker.removeSubchannel(this);
}
super.shutdown();
}
@Override
public Attributes getAttributes() {
if (addressTracker != null) {
return delegate.getAttributes().toBuilder().set(ADDRESS_TRACKER_ATTR_KEY, addressTracker)
if (endpointTracker != null) {
return delegate.getAttributes().toBuilder().set(ENDPOINT_TRACKER_KEY, endpointTracker)
.build();
} else {
return delegate.getAttributes();
@ -300,22 +328,22 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
// No change in address plurality, we replace the single one with a new one.
if (hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) {
// Remove the current subchannel from the old address it is associated with in the map.
if (trackerMap.containsValue(addressTracker)) {
addressTracker.removeSubchannel(this);
if (endpointTrackerMap.containsValue(endpointTracker)) {
endpointTracker.removeSubchannel(this);
}
// If the map has an entry for the new address, we associate this subchannel with it.
SocketAddress address = addressGroups.get(0).getAddresses().get(0);
if (trackerMap.containsKey(address)) {
trackerMap.get(address).addSubchannel(this);
if (addressMap.containsKey(address)) {
addressMap.get(address).addSubchannel(this);
}
} else if (hasSingleAddress(getAllAddresses()) && !hasSingleAddress(addressGroups)) {
// We go from a single address to having multiple, making this subchannel uneligible for
// outlier detection. Remove it from all trackers and reset the call counters of all the
// associated trackers.
// Remove the current subchannel from the old address it is associated with in the map.
if (trackerMap.containsKey(getAddresses().getAddresses().get(0))) {
AddressTracker tracker = trackerMap.get(getAddresses().getAddresses().get(0));
if (addressMap.containsKey(getAddresses().getAddresses().get(0))) {
EndpointTracker tracker = addressMap.get(getAddresses().getAddresses().get(0));
tracker.removeSubchannel(this);
tracker.resetCallCounters();
}
@ -323,8 +351,8 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
// We go from, previously uneligble, multiple address mode to a single address. If the map
// has an entry for the new address, we associate this subchannel with it.
SocketAddress address = addressGroups.get(0).getAddresses().get(0);
if (trackerMap.containsKey(address)) {
AddressTracker tracker = trackerMap.get(address);
if (addressMap.containsKey(address)) {
EndpointTracker tracker = addressMap.get(address);
tracker.addSubchannel(this);
}
}
@ -337,14 +365,14 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
/**
* If the {@link Subchannel} is considered for outlier detection the associated {@link
* AddressTracker} should be set.
* EndpointTracker} should be set.
*/
void setAddressTracker(AddressTracker addressTracker) {
this.addressTracker = addressTracker;
void setEndpointTracker(EndpointTracker endpointTracker) {
this.endpointTracker = endpointTracker;
}
void clearAddressTracker() {
this.addressTracker = null;
void clearEndpointTracker() {
this.endpointTracker = null;
}
void eject() {
@ -419,7 +447,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
Subchannel subchannel = pickResult.getSubchannel();
if (subchannel != null) {
return PickResult.withSubchannel(subchannel, new ResultCountingClientStreamTracerFactory(
subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY),
subchannel.getAttributes().get(ENDPOINT_TRACKER_KEY),
pickResult.getStreamTracerFactory()));
}
@ -432,12 +460,12 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
*/
class ResultCountingClientStreamTracerFactory extends ClientStreamTracer.Factory {
private final AddressTracker tracker;
private final EndpointTracker tracker;
@Nullable
private final ClientStreamTracer.Factory delegateFactory;
ResultCountingClientStreamTracerFactory(AddressTracker tracker,
ResultCountingClientStreamTracerFactory(EndpointTracker tracker,
@Nullable ClientStreamTracer.Factory delegateFactory) {
this.tracker = tracker;
this.delegateFactory = delegateFactory;
@ -472,10 +500,9 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
}
/**
* Tracks additional information about a set of equivalent addresses needed for outlier
* detection.
* Tracks additional information about the endpoint needed for outlier detection.
*/
static class AddressTracker {
static class EndpointTracker {
private OutlierDetectionLoadBalancerConfig config;
// Marked as volatile to assure that when the inactive counter is swapped in as the new active
@ -486,7 +513,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
private int ejectionTimeMultiplier;
private final Set<OutlierDetectionSubchannel> subchannels = new HashSet<>();
AddressTracker(OutlierDetectionLoadBalancerConfig config) {
EndpointTracker(OutlierDetectionLoadBalancerConfig config) {
this.config = config;
}
@ -506,12 +533,12 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
} else if (!subchannelsEjected() && subchannel.isEjected()) {
subchannel.uneject();
}
subchannel.setAddressTracker(this);
subchannel.setEndpointTracker(this);
return subchannels.add(subchannel);
}
boolean removeSubchannel(OutlierDetectionSubchannel subchannel) {
subchannel.clearAddressTracker();
subchannel.clearEndpointTracker();
return subchannels.remove(subchannel);
}
@ -631,46 +658,42 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
@Override
public String toString() {
return "AddressTracker{"
return "EndpointTracker{"
+ "subchannels=" + subchannels
+ '}';
}
}
/**
* Maintains a mapping from addresses to their trackers.
* Maintains a mapping from endpoint (a set of addresses) to their trackers.
*/
static class AddressTrackerMap extends ForwardingMap<SocketAddress, AddressTracker> {
private final Map<SocketAddress, AddressTracker> trackerMap;
static class EndpointTrackerMap extends ForwardingMap<Set<SocketAddress>, EndpointTracker> {
private final Map<Set<SocketAddress>, EndpointTracker> trackerMap;
AddressTrackerMap() {
EndpointTrackerMap() {
trackerMap = new HashMap<>();
}
@Override
protected Map<SocketAddress, AddressTracker> delegate() {
protected Map<Set<SocketAddress>, EndpointTracker> delegate() {
return trackerMap;
}
void updateTrackerConfigs(OutlierDetectionLoadBalancerConfig config) {
for (AddressTracker tracker: trackerMap.values()) {
for (EndpointTracker tracker: trackerMap.values()) {
tracker.setConfig(config);
}
}
/** Adds a new tracker for every given address. */
void putNewTrackers(OutlierDetectionLoadBalancerConfig config,
Collection<SocketAddress> addresses) {
for (SocketAddress address : addresses) {
if (!trackerMap.containsKey(address)) {
trackerMap.put(address, new AddressTracker(config));
}
}
Set<Set<SocketAddress>> endpoints) {
endpoints.forEach(e -> trackerMap.putIfAbsent(e, new EndpointTracker(config)));
}
/** Resets the call counters for all the trackers in the map. */
void resetCallCounters() {
for (AddressTracker tracker : trackerMap.values()) {
for (EndpointTracker tracker : trackerMap.values()) {
tracker.resetCallCounters();
}
}
@ -680,7 +703,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
* to reset the ejection time multiplier.
*/
void cancelTracking() {
for (AddressTracker tracker : trackerMap.values()) {
for (EndpointTracker tracker : trackerMap.values()) {
if (tracker.subchannelsEjected()) {
tracker.unejectSubchannels();
}
@ -690,7 +713,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
/** Swaps the active and inactive counters for each tracker. */
void swapCounters() {
for (AddressTracker tracker : trackerMap.values()) {
for (EndpointTracker tracker : trackerMap.values()) {
tracker.swapCounters();
}
}
@ -701,7 +724,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
* time allowed.
*/
void maybeUnejectOutliers(Long detectionTimerStartNanos) {
for (AddressTracker tracker : trackerMap.values()) {
for (EndpointTracker tracker : trackerMap.values()) {
if (!tracker.subchannelsEjected()) {
tracker.decrementEjectionTimeMultiplier();
}
@ -720,15 +743,15 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
if (trackerMap.isEmpty()) {
return 0;
}
int totalAddresses = 0;
int ejectedAddresses = 0;
for (AddressTracker tracker : trackerMap.values()) {
totalAddresses++;
int totalEndpoints = 0;
int ejectedEndpoints = 0;
for (EndpointTracker tracker : trackerMap.values()) {
totalEndpoints++;
if (tracker.subchannelsEjected()) {
ejectedAddresses++;
ejectedEndpoints++;
}
}
return ((double)ejectedAddresses / totalAddresses) * 100;
return ((double)ejectedEndpoints / totalEndpoints) * 100;
}
}
@ -739,7 +762,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
interface OutlierEjectionAlgorithm {
/** Eject any outlier addresses. */
void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos);
void ejectOutliers(EndpointTrackerMap trackerMap, long ejectionTimeNanos);
/** Builds a list of algorithms that are enabled in the given config. */
@Nullable
@ -775,12 +798,12 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
}
@Override
public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) {
public void ejectOutliers(EndpointTrackerMap trackerMap, long ejectionTimeNanos) {
// Only consider addresses that have the minimum request volume specified in the config.
List<AddressTracker> trackersWithVolume = trackersWithVolume(trackerMap,
List<EndpointTracker> trackersWithVolume = trackersWithVolume(trackerMap,
config.successRateEjection.requestVolume);
// If we don't have enough addresses with significant volume then there's nothing to do.
// If we don't have enough endpoints with significant volume then there's nothing to do.
if (trackersWithVolume.size() < config.successRateEjection.minimumHosts
|| trackersWithVolume.size() == 0) {
return;
@ -788,7 +811,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
// Calculate mean and standard deviation of the fractions of successful calls.
List<Double> successRates = new ArrayList<>();
for (AddressTracker tracker : trackersWithVolume) {
for (EndpointTracker tracker : trackersWithVolume) {
successRates.add(tracker.successRate());
}
double mean = mean(successRates);
@ -797,7 +820,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
double requiredSuccessRate =
mean - stdev * (config.successRateEjection.stdevFactor / 1000f);
for (AddressTracker tracker : trackersWithVolume) {
for (EndpointTracker tracker : trackersWithVolume) {
// If we are above or equal to the max ejection percentage, don't eject any more. This will
// allow the total ejections to go one above the max, but at the same time it assures at
// least one ejection, which the spec calls for. This behavior matches what Envoy proxy
@ -813,7 +836,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
+ "Parameters: successRate={1}, mean={2}, stdev={3}, "
+ "requiredSuccessRate={4}",
tracker, tracker.successRate(), mean, stdev, requiredSuccessRate);
// Only eject some addresses based on the enforcement percentage.
// Only eject some endpoints based on the enforcement percentage.
if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) {
tracker.ejectSubchannels(ejectionTimeNanos);
}
@ -859,19 +882,19 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
}
@Override
public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) {
public void ejectOutliers(EndpointTrackerMap trackerMap, long ejectionTimeNanos) {
// Only consider addresses that have the minimum request volume specified in the config.
List<AddressTracker> trackersWithVolume = trackersWithVolume(trackerMap,
// Only consider endpoints that have the minimum request volume specified in the config.
List<EndpointTracker> trackersWithVolume = trackersWithVolume(trackerMap,
config.failurePercentageEjection.requestVolume);
// If we don't have enough addresses with significant volume then there's nothing to do.
// If we don't have enough endpoints with significant volume then there's nothing to do.
if (trackersWithVolume.size() < config.failurePercentageEjection.minimumHosts
|| trackersWithVolume.size() == 0) {
return;
}
// If this address does not have enough volume to be considered, skip to the next one.
for (AddressTracker tracker : trackersWithVolume) {
// If this endpoint does not have enough volume to be considered, skip to the next one.
for (EndpointTracker tracker : trackersWithVolume) {
// If we are above or equal to the max ejection percentage, don't eject any more. This will
// allow the total ejections to go one above the max, but at the same time it assures at
// least one ejection, which the spec calls for. This behavior matches what Envoy proxy
@ -900,10 +923,10 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer {
}
/** Returns only the trackers that have the minimum configured volume to be considered. */
private static List<AddressTracker> trackersWithVolume(AddressTrackerMap trackerMap,
private static List<EndpointTracker> trackersWithVolume(EndpointTrackerMap trackerMap,
int volume) {
List<AddressTracker> trackersWithVolume = new ArrayList<>();
for (AddressTracker tracker : trackerMap.values()) {
List<EndpointTracker> trackersWithVolume = new ArrayList<>();
for (EndpointTracker tracker : trackerMap.values()) {
if (tracker.inactiveVolume() >= volume) {
trackersWithVolume.add(tracker);
}

View File

@ -55,7 +55,7 @@ import io.grpc.internal.FakeClock;
import io.grpc.internal.FakeClock.ScheduledTask;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.internal.TestUtils.StandardLoadBalancerProvider;
import io.grpc.util.OutlierDetectionLoadBalancer.AddressTracker;
import io.grpc.util.OutlierDetectionLoadBalancer.EndpointTracker;
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig.FailurePercentageEjection;
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig.SuccessRateEjection;
@ -64,6 +64,7 @@ import io.grpc.util.OutlierDetectionLoadBalancer.SuccessRateOutlierEjectionAlgor
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -302,8 +303,9 @@ public class OutlierDetectionLoadBalancerTest {
loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, servers.get(0)));
assertThat(loadBalancer.trackerMap).hasSize(1);
AddressTracker addressTracker = (AddressTracker) loadBalancer.trackerMap.values().toArray()[0];
assertThat(loadBalancer.endpointTrackerMap).hasSize(1);
EndpointTracker addressTracker =
(EndpointTracker) loadBalancer.endpointTrackerMap.values().toArray()[0];
assertThat(addressTracker).isNotNull();
OutlierDetectionSubchannel trackedSubchannel
= (OutlierDetectionSubchannel) addressTracker.getSubchannels().toArray()[0];
@ -523,7 +525,7 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
// The one subchannel that was returning errors should be ejected.
assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses())));
}
/**
@ -548,7 +550,7 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
// The one subchannel that was returning errors should be ejected.
assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses())));
// New config sets enforcement percentage to 0.
config = new OutlierDetectionLoadBalancerConfig.Builder()
@ -568,7 +570,7 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
// Since we brought enforcement percentage to 0, no additional ejection should have happened.
assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses())));
}
/**
@ -593,7 +595,7 @@ public class OutlierDetectionLoadBalancerTest {
fakeClock.forwardTime(config.intervalNanos + 1, TimeUnit.NANOSECONDS);
// The one subchannel that was returning errors should be ejected.
assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses())));
// Now we produce more load, but the subchannel start working and is no longer an outlier.
generateLoad(ImmutableMap.of(), 12);
@ -711,8 +713,8 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
// The one subchannel that was returning errors should be ejected.
assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0),
servers.get(1).getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.of(servers.get(0).getAddresses().get(0)),
ImmutableSet.of(servers.get(1).getAddresses().get(0))));
}
/**
@ -743,8 +745,8 @@ public class OutlierDetectionLoadBalancerTest {
int totalEjected = 0;
for (EquivalentAddressGroup addressGroup: servers) {
totalEjected +=
loadBalancer.trackerMap.get(addressGroup.getAddresses().get(0)).subchannelsEjected() ? 1
: 0;
loadBalancer.endpointTrackerMap.get(
ImmutableSet.of(addressGroup.getAddresses().get(0))).subchannelsEjected() ? 1 : 0;
}
assertThat(totalEjected).isEqualTo(2);
@ -797,7 +799,7 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
// The one subchannel that was returning errors should be ejected.
assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses())));
}
/**
@ -915,9 +917,9 @@ public class OutlierDetectionLoadBalancerTest {
// Should see thee ejected, success rate cathes the first two, error percentage the
// same two plus the subchannel with the single failure.
assertEjectedSubchannels(ImmutableSet.of(
servers.get(0).getAddresses().get(0),
servers.get(1).getAddresses().get(0),
servers.get(2).getAddresses().get(0)));
ImmutableSet.of(servers.get(0).getAddresses().get(0)),
ImmutableSet.of(servers.get(1).getAddresses().get(0)),
ImmutableSet.of(servers.get(2).getAddresses().get(0))));
}
/**
@ -942,14 +944,15 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
EquivalentAddressGroup oldAddressGroup = servers.get(0);
AddressTracker oldAddressTracker = loadBalancer.trackerMap.get(
oldAddressGroup.getAddresses().get(0));
EndpointTracker oldAddressTracker = loadBalancer.endpointTrackerMap.get(
ImmutableSet.of(oldAddressGroup.getAddresses().get(0)));
EquivalentAddressGroup newAddressGroup = servers.get(1);
AddressTracker newAddressTracker = loadBalancer.trackerMap.get(
newAddressGroup.getAddresses().get(0));
EndpointTracker newAddressTracker = loadBalancer.endpointTrackerMap.get(
ImmutableSet.of(newAddressGroup.getAddresses().get(0)));
// The one subchannel that was returning errors should be ejected.
assertEjectedSubchannels(ImmutableSet.of(oldAddressGroup.getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(
ImmutableSet.of(oldAddressGroup.getAddresses().get(0))));
// The ejected subchannel gets updated with another address in the map that is not ejected
OutlierDetectionSubchannel subchannel = oldAddressTracker.getSubchannels()
@ -967,6 +970,45 @@ public class OutlierDetectionLoadBalancerTest {
assertThat(subchannel.isEjected()).isFalse();
}
@Test
public void multipleAddressesEndpoint() {
OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder()
.setMaxEjectionPercent(50)
.setFailurePercentageEjection(
new FailurePercentageEjection.Builder()
.setMinimumHosts(3)
.setRequestVolume(10).build())
.setChildPolicy(new PolicySelection(fakeLbProvider, null)).build();
loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, servers));
EquivalentAddressGroup manyAddEndpoint = new EquivalentAddressGroup(Arrays.asList(
servers.get(0).getAddresses().get(0), servers.get(1).getAddresses().get(0)));
List<EquivalentAddressGroup> manyAddEndpointServer = ImmutableList.of(manyAddEndpoint);
loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, manyAddEndpointServer));
assertThat(loadBalancer.endpointTrackerMap.size()).isEqualTo(1);
assertThat(loadBalancer.addressMap.size()).isEqualTo(2);
manyAddEndpoint = new EquivalentAddressGroup(Arrays.asList(
servers.get(0).getAddresses().get(0), servers.get(1).getAddresses().get(0)));
EquivalentAddressGroup manyAddEndpoint2 = new EquivalentAddressGroup(Arrays.asList(
servers.get(2).getAddresses().get(0), servers.get(3).getAddresses().get(0)));
EquivalentAddressGroup singleAddressEndpoint = new EquivalentAddressGroup(Arrays.asList(
servers.get(4).getAddresses().get(0)));
manyAddEndpointServer = ImmutableList.of(
manyAddEndpoint, manyAddEndpoint2, singleAddressEndpoint);
loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, manyAddEndpointServer));
assertThat(loadBalancer.endpointTrackerMap.size()).isEqualTo(3);
assertThat(loadBalancer.addressMap.size()).isEqualTo(5);
generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED,
subchannel2, Status.DEADLINE_EXCEEDED), 13);
forwardTime(config);
// eject the first endpoint: (address0, address1)
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.of(
servers.get(0).getAddresses().get(0), servers.get(1).getAddresses().get(0))));
}
/**
* If a single address gets replaced by multiple, the subchannel becomes uneligible for outlier
* detection.
@ -989,8 +1031,8 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
EquivalentAddressGroup oldAddressGroup = servers.get(0);
AddressTracker oldAddressTracker = loadBalancer.trackerMap.get(
oldAddressGroup.getAddresses().get(0));
EndpointTracker oldAddressTracker = loadBalancer.endpointTrackerMap.get(
ImmutableSet.of(oldAddressGroup.getAddresses().get(0)));
EquivalentAddressGroup newAddress1 = servers.get(1);
EquivalentAddressGroup newAddress2 = servers.get(2);
@ -1033,15 +1075,16 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
EquivalentAddressGroup oldAddressGroup = servers.get(0);
AddressTracker oldAddressTracker = loadBalancer.trackerMap.get(
oldAddressGroup.getAddresses().get(0));
EndpointTracker oldAddressTracker = loadBalancer.endpointTrackerMap.get(
ImmutableSet.of(oldAddressGroup.getAddresses().get(0)));
EquivalentAddressGroup newAddressGroup1 = servers.get(1);
AddressTracker newAddressTracker1 = loadBalancer.trackerMap.get(
newAddressGroup1.getAddresses().get(0));
EndpointTracker newAddressTracker1 = loadBalancer.endpointTrackerMap.get(
ImmutableSet.of(newAddressGroup1.getAddresses().get(0)));
EquivalentAddressGroup newAddressGroup2 = servers.get(2);
// The old subchannel was returning errors and should be ejected.
assertEjectedSubchannels(ImmutableSet.of(oldAddressGroup.getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(
ImmutableSet.of(oldAddressGroup.getAddresses().get(0))));
OutlierDetectionSubchannel subchannel = oldAddressTracker.getSubchannels()
.iterator().next();
@ -1122,7 +1165,7 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
// The one subchannel that was returning errors should be ejected.
assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses())));
for (SubchannelStateListener healthListener : healthListeners.values()) {
verifyNoInteractions(healthListener);
}
@ -1151,7 +1194,7 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
// The one subchannel that was returning errors should be ejected.
assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses())));
if (hasHealthConsumer) {
verify(healthListeners.get(servers.get(0))).onSubchannelState(eq(
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)
@ -1183,7 +1226,7 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
// The one subchannel that was returning errors should be ejected.
assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses())));
for (SubchannelStateListener healthListener : healthListeners.values()) {
verifyNoInteractions(healthListener);
}
@ -1212,7 +1255,7 @@ public class OutlierDetectionLoadBalancerTest {
forwardTime(config);
// The one subchannel that was returning errors should be ejected.
assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0)));
assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses())));
if (hasHealthConsumer) {
verify(healthListeners.get(servers.get(0))).onSubchannelState(eq(
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)
@ -1305,8 +1348,9 @@ public class OutlierDetectionLoadBalancerTest {
}
// Asserts that the given addresses are ejected and the rest are not.
void assertEjectedSubchannels(Set<SocketAddress> addresses) {
for (Entry<SocketAddress, AddressTracker> entry : loadBalancer.trackerMap.entrySet()) {
void assertEjectedSubchannels(Collection<Set<SocketAddress>> addresses) {
for (Entry<Set<SocketAddress>, EndpointTracker> entry :
loadBalancer.endpointTrackerMap.entrySet()) {
assertWithMessage("not ejected: " + entry.getKey())
.that(entry.getValue().subchannelsEjected())
.isEqualTo(addresses.contains(entry.getKey()));
@ -1328,17 +1372,21 @@ public class OutlierDetectionLoadBalancerTest {
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
subchannelList = new ArrayList<>();
for (EquivalentAddressGroup eag: resolvedAddresses.getAddresses()) {
CreateSubchannelArgs.Builder args = CreateSubchannelArgs.newBuilder().setAddresses(eag);
for (SocketAddress address : eag.getAddresses()) {
EquivalentAddressGroup constructedEag = new EquivalentAddressGroup(address);
CreateSubchannelArgs.Builder args = CreateSubchannelArgs.newBuilder()
.setAddresses(constructedEag);
if (hasHealthConsumer) {
assertThat(healthListeners.get(eag)).isNotNull();
assertThat(healthListeners.get(constructedEag)).isNotNull();
args.addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY,
healthListeners.get(eag));
healthListeners.get(constructedEag));
}
Subchannel subchannel = helper.createSubchannel(args.build());
subchannelList.add(subchannel);
subchannel.start(mock(SubchannelStateListener.class));
deliverSubchannelState(READY);
}
}
return Status.OK;
}