diff --git a/util/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java b/util/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java index 193c0c0fcc..c24e238646 100644 --- a/util/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java @@ -60,12 +60,25 @@ import javax.annotation.Nullable; * *

This implements the outlier detection gRFC: * https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md + * + *

The implementation maintains two maps. Each endpoint status is tracked using an + * EndpointTracker. E.g. for two endpoints with these address list and their tracker: + * Endpoint e1 : [a1, a2] is tracked with EndpointTracker t1 + * Endpoint e2 : [a3] is tracked with EndpointTracker t2 + * The two maps are: + * First, addressMap maps from socket address -> endpoint tracker : [a1 -> t1, a2 -> t1, a3 -> t2] + * EndpointTracker has reference to all the subchannels of the corresponding endpoint. + * Second, trackerMap maps from unordered address set -> endpoint tracker. + * Updated upon address updates. */ @Internal public final class OutlierDetectionLoadBalancer extends LoadBalancer { @VisibleForTesting - final AddressTrackerMap trackerMap; + final EndpointTrackerMap endpointTrackerMap; + + @VisibleForTesting + final Map addressMap = new HashMap<>(); private final SynchronizationContext syncContext; private final Helper childHelper; @@ -77,8 +90,8 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { private final ChannelLogger logger; - private static final Attributes.Key ADDRESS_TRACKER_ATTR_KEY - = Attributes.Key.create("addressTrackerKey"); + private static final Attributes.Key ENDPOINT_TRACKER_KEY + = Attributes.Key.create("endpointTrackerKey"); /** * Creates a new instance of {@link OutlierDetectionLoadBalancer}. @@ -87,7 +100,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { logger = helper.getChannelLogger(); childHelper = new ChildHelper(checkNotNull(helper, "helper")); switchLb = new GracefulSwitchLoadBalancer(childHelper); - trackerMap = new AddressTrackerMap(); + endpointTrackerMap = new EndpointTrackerMap(); this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); this.timeProvider = timeProvider; @@ -100,17 +113,32 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { OutlierDetectionLoadBalancerConfig config = (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - // The map should only retain entries for addresses in this latest update. - ArrayList addresses = new ArrayList<>(); + // The map should only retain entries for endpoints in this latest update. + Set> endpoints = new HashSet<>(); + Map> addressEndpointMap = new HashMap<>(); for (EquivalentAddressGroup addressGroup : resolvedAddresses.getAddresses()) { - addresses.addAll(addressGroup.getAddresses()); + Set endpoint = ImmutableSet.copyOf(addressGroup.getAddresses()); + endpoints.add(endpoint); + for (SocketAddress address : addressGroup.getAddresses()) { + if (addressEndpointMap.containsKey(address)) { + logger.log(ChannelLogLevel.WARNING, + "Unexpected duplicated address {0} belongs to multiple endpoints", address); + } + addressEndpointMap.put(address, endpoint); + } } - trackerMap.keySet().retainAll(addresses); + endpointTrackerMap.keySet().retainAll(endpoints); - trackerMap.updateTrackerConfigs(config); + endpointTrackerMap.updateTrackerConfigs(config); // Add any new ones. - trackerMap.putNewTrackers(config, addresses); + endpointTrackerMap.putNewTrackers(config, endpoints); + + // Update address -> tracker map. + addressMap.clear(); + for (Map.Entry> e : addressEndpointMap.entrySet()) { + addressMap.put(e.getKey(), endpointTrackerMap.get(e.getValue())); + } switchLb.switchTo(config.childPolicy.getProvider()); @@ -133,7 +161,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { // for a fresh start. if (detectionTimerHandle != null) { detectionTimerHandle.cancel(); - trackerMap.resetCallCounters(); + endpointTrackerMap.resetCallCounters(); } detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config, logger), @@ -143,7 +171,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { // uneject any addresses we may have ejected. detectionTimerHandle.cancel(); detectionTimerStartNanos = null; - trackerMap.cancelTracking(); + endpointTrackerMap.cancelTracking(); } switchLb.handleResolvedAddresses( @@ -180,13 +208,13 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { public void run() { detectionTimerStartNanos = timeProvider.currentTimeNanos(); - trackerMap.swapCounters(); + endpointTrackerMap.swapCounters(); for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config, logger)) { - algo.ejectOutliers(trackerMap, detectionTimerStartNanos); + algo.ejectOutliers(endpointTrackerMap, detectionTimerStartNanos); } - trackerMap.maybeUnejectOutliers(detectionTimerStartNanos); + endpointTrackerMap.maybeUnejectOutliers(detectionTimerStartNanos); } } @@ -217,8 +245,8 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { // the subchannel will be added to the map and be included in outlier detection. List addressGroups = args.getAddresses(); if (hasSingleAddress(addressGroups) - && trackerMap.containsKey(addressGroups.get(0).getAddresses().get(0))) { - AddressTracker tracker = trackerMap.get(addressGroups.get(0).getAddresses().get(0)); + && addressMap.containsKey(addressGroups.get(0).getAddresses().get(0))) { + EndpointTracker tracker = addressMap.get(addressGroups.get(0).getAddresses().get(0)); tracker.addSubchannel(subchannel); // If this address has already been ejected, we need to immediately eject this Subchannel. @@ -239,7 +267,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { class OutlierDetectionSubchannel extends ForwardingSubchannel { private final Subchannel delegate; - private AddressTracker addressTracker; + private EndpointTracker endpointTracker; private boolean ejected; private ConnectivityStateInfo lastSubchannelState; @@ -275,16 +303,16 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { @Override public void shutdown() { - if (addressTracker != null) { - addressTracker.removeSubchannel(this); + if (endpointTracker != null) { + endpointTracker.removeSubchannel(this); } super.shutdown(); } @Override public Attributes getAttributes() { - if (addressTracker != null) { - return delegate.getAttributes().toBuilder().set(ADDRESS_TRACKER_ATTR_KEY, addressTracker) + if (endpointTracker != null) { + return delegate.getAttributes().toBuilder().set(ENDPOINT_TRACKER_KEY, endpointTracker) .build(); } else { return delegate.getAttributes(); @@ -300,22 +328,22 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { // No change in address plurality, we replace the single one with a new one. if (hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) { // Remove the current subchannel from the old address it is associated with in the map. - if (trackerMap.containsValue(addressTracker)) { - addressTracker.removeSubchannel(this); + if (endpointTrackerMap.containsValue(endpointTracker)) { + endpointTracker.removeSubchannel(this); } // If the map has an entry for the new address, we associate this subchannel with it. SocketAddress address = addressGroups.get(0).getAddresses().get(0); - if (trackerMap.containsKey(address)) { - trackerMap.get(address).addSubchannel(this); + if (addressMap.containsKey(address)) { + addressMap.get(address).addSubchannel(this); } } else if (hasSingleAddress(getAllAddresses()) && !hasSingleAddress(addressGroups)) { // We go from a single address to having multiple, making this subchannel uneligible for // outlier detection. Remove it from all trackers and reset the call counters of all the // associated trackers. // Remove the current subchannel from the old address it is associated with in the map. - if (trackerMap.containsKey(getAddresses().getAddresses().get(0))) { - AddressTracker tracker = trackerMap.get(getAddresses().getAddresses().get(0)); + if (addressMap.containsKey(getAddresses().getAddresses().get(0))) { + EndpointTracker tracker = addressMap.get(getAddresses().getAddresses().get(0)); tracker.removeSubchannel(this); tracker.resetCallCounters(); } @@ -323,8 +351,8 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { // We go from, previously uneligble, multiple address mode to a single address. If the map // has an entry for the new address, we associate this subchannel with it. SocketAddress address = addressGroups.get(0).getAddresses().get(0); - if (trackerMap.containsKey(address)) { - AddressTracker tracker = trackerMap.get(address); + if (addressMap.containsKey(address)) { + EndpointTracker tracker = addressMap.get(address); tracker.addSubchannel(this); } } @@ -337,14 +365,14 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { /** * If the {@link Subchannel} is considered for outlier detection the associated {@link - * AddressTracker} should be set. + * EndpointTracker} should be set. */ - void setAddressTracker(AddressTracker addressTracker) { - this.addressTracker = addressTracker; + void setEndpointTracker(EndpointTracker endpointTracker) { + this.endpointTracker = endpointTracker; } - void clearAddressTracker() { - this.addressTracker = null; + void clearEndpointTracker() { + this.endpointTracker = null; } void eject() { @@ -419,7 +447,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { Subchannel subchannel = pickResult.getSubchannel(); if (subchannel != null) { return PickResult.withSubchannel(subchannel, new ResultCountingClientStreamTracerFactory( - subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY), + subchannel.getAttributes().get(ENDPOINT_TRACKER_KEY), pickResult.getStreamTracerFactory())); } @@ -432,12 +460,12 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { */ class ResultCountingClientStreamTracerFactory extends ClientStreamTracer.Factory { - private final AddressTracker tracker; + private final EndpointTracker tracker; @Nullable private final ClientStreamTracer.Factory delegateFactory; - ResultCountingClientStreamTracerFactory(AddressTracker tracker, + ResultCountingClientStreamTracerFactory(EndpointTracker tracker, @Nullable ClientStreamTracer.Factory delegateFactory) { this.tracker = tracker; this.delegateFactory = delegateFactory; @@ -472,10 +500,9 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { } /** - * Tracks additional information about a set of equivalent addresses needed for outlier - * detection. + * Tracks additional information about the endpoint needed for outlier detection. */ - static class AddressTracker { + static class EndpointTracker { private OutlierDetectionLoadBalancerConfig config; // Marked as volatile to assure that when the inactive counter is swapped in as the new active @@ -486,7 +513,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { private int ejectionTimeMultiplier; private final Set subchannels = new HashSet<>(); - AddressTracker(OutlierDetectionLoadBalancerConfig config) { + EndpointTracker(OutlierDetectionLoadBalancerConfig config) { this.config = config; } @@ -506,12 +533,12 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { } else if (!subchannelsEjected() && subchannel.isEjected()) { subchannel.uneject(); } - subchannel.setAddressTracker(this); + subchannel.setEndpointTracker(this); return subchannels.add(subchannel); } boolean removeSubchannel(OutlierDetectionSubchannel subchannel) { - subchannel.clearAddressTracker(); + subchannel.clearEndpointTracker(); return subchannels.remove(subchannel); } @@ -631,46 +658,42 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { @Override public String toString() { - return "AddressTracker{" + return "EndpointTracker{" + "subchannels=" + subchannels + '}'; } } /** - * Maintains a mapping from addresses to their trackers. + * Maintains a mapping from endpoint (a set of addresses) to their trackers. */ - static class AddressTrackerMap extends ForwardingMap { - private final Map trackerMap; + static class EndpointTrackerMap extends ForwardingMap, EndpointTracker> { + private final Map, EndpointTracker> trackerMap; - AddressTrackerMap() { + EndpointTrackerMap() { trackerMap = new HashMap<>(); } @Override - protected Map delegate() { + protected Map, EndpointTracker> delegate() { return trackerMap; } void updateTrackerConfigs(OutlierDetectionLoadBalancerConfig config) { - for (AddressTracker tracker: trackerMap.values()) { + for (EndpointTracker tracker: trackerMap.values()) { tracker.setConfig(config); } } /** Adds a new tracker for every given address. */ void putNewTrackers(OutlierDetectionLoadBalancerConfig config, - Collection addresses) { - for (SocketAddress address : addresses) { - if (!trackerMap.containsKey(address)) { - trackerMap.put(address, new AddressTracker(config)); - } - } + Set> endpoints) { + endpoints.forEach(e -> trackerMap.putIfAbsent(e, new EndpointTracker(config))); } /** Resets the call counters for all the trackers in the map. */ void resetCallCounters() { - for (AddressTracker tracker : trackerMap.values()) { + for (EndpointTracker tracker : trackerMap.values()) { tracker.resetCallCounters(); } } @@ -680,7 +703,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { * to reset the ejection time multiplier. */ void cancelTracking() { - for (AddressTracker tracker : trackerMap.values()) { + for (EndpointTracker tracker : trackerMap.values()) { if (tracker.subchannelsEjected()) { tracker.unejectSubchannels(); } @@ -690,7 +713,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { /** Swaps the active and inactive counters for each tracker. */ void swapCounters() { - for (AddressTracker tracker : trackerMap.values()) { + for (EndpointTracker tracker : trackerMap.values()) { tracker.swapCounters(); } } @@ -701,7 +724,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { * time allowed. */ void maybeUnejectOutliers(Long detectionTimerStartNanos) { - for (AddressTracker tracker : trackerMap.values()) { + for (EndpointTracker tracker : trackerMap.values()) { if (!tracker.subchannelsEjected()) { tracker.decrementEjectionTimeMultiplier(); } @@ -720,15 +743,15 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { if (trackerMap.isEmpty()) { return 0; } - int totalAddresses = 0; - int ejectedAddresses = 0; - for (AddressTracker tracker : trackerMap.values()) { - totalAddresses++; + int totalEndpoints = 0; + int ejectedEndpoints = 0; + for (EndpointTracker tracker : trackerMap.values()) { + totalEndpoints++; if (tracker.subchannelsEjected()) { - ejectedAddresses++; + ejectedEndpoints++; } } - return ((double)ejectedAddresses / totalAddresses) * 100; + return ((double)ejectedEndpoints / totalEndpoints) * 100; } } @@ -739,7 +762,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { interface OutlierEjectionAlgorithm { /** Eject any outlier addresses. */ - void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos); + void ejectOutliers(EndpointTrackerMap trackerMap, long ejectionTimeNanos); /** Builds a list of algorithms that are enabled in the given config. */ @Nullable @@ -775,12 +798,12 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { } @Override - public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) { + public void ejectOutliers(EndpointTrackerMap trackerMap, long ejectionTimeNanos) { // Only consider addresses that have the minimum request volume specified in the config. - List trackersWithVolume = trackersWithVolume(trackerMap, + List trackersWithVolume = trackersWithVolume(trackerMap, config.successRateEjection.requestVolume); - // If we don't have enough addresses with significant volume then there's nothing to do. + // If we don't have enough endpoints with significant volume then there's nothing to do. if (trackersWithVolume.size() < config.successRateEjection.minimumHosts || trackersWithVolume.size() == 0) { return; @@ -788,7 +811,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { // Calculate mean and standard deviation of the fractions of successful calls. List successRates = new ArrayList<>(); - for (AddressTracker tracker : trackersWithVolume) { + for (EndpointTracker tracker : trackersWithVolume) { successRates.add(tracker.successRate()); } double mean = mean(successRates); @@ -797,7 +820,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { double requiredSuccessRate = mean - stdev * (config.successRateEjection.stdevFactor / 1000f); - for (AddressTracker tracker : trackersWithVolume) { + for (EndpointTracker tracker : trackersWithVolume) { // If we are above or equal to the max ejection percentage, don't eject any more. This will // allow the total ejections to go one above the max, but at the same time it assures at // least one ejection, which the spec calls for. This behavior matches what Envoy proxy @@ -813,7 +836,7 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { + "Parameters: successRate={1}, mean={2}, stdev={3}, " + "requiredSuccessRate={4}", tracker, tracker.successRate(), mean, stdev, requiredSuccessRate); - // Only eject some addresses based on the enforcement percentage. + // Only eject some endpoints based on the enforcement percentage. if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) { tracker.ejectSubchannels(ejectionTimeNanos); } @@ -859,19 +882,19 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { } @Override - public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) { + public void ejectOutliers(EndpointTrackerMap trackerMap, long ejectionTimeNanos) { - // Only consider addresses that have the minimum request volume specified in the config. - List trackersWithVolume = trackersWithVolume(trackerMap, + // Only consider endpoints that have the minimum request volume specified in the config. + List trackersWithVolume = trackersWithVolume(trackerMap, config.failurePercentageEjection.requestVolume); - // If we don't have enough addresses with significant volume then there's nothing to do. + // If we don't have enough endpoints with significant volume then there's nothing to do. if (trackersWithVolume.size() < config.failurePercentageEjection.minimumHosts || trackersWithVolume.size() == 0) { return; } - // If this address does not have enough volume to be considered, skip to the next one. - for (AddressTracker tracker : trackersWithVolume) { + // If this endpoint does not have enough volume to be considered, skip to the next one. + for (EndpointTracker tracker : trackersWithVolume) { // If we are above or equal to the max ejection percentage, don't eject any more. This will // allow the total ejections to go one above the max, but at the same time it assures at // least one ejection, which the spec calls for. This behavior matches what Envoy proxy @@ -900,10 +923,10 @@ public final class OutlierDetectionLoadBalancer extends LoadBalancer { } /** Returns only the trackers that have the minimum configured volume to be considered. */ - private static List trackersWithVolume(AddressTrackerMap trackerMap, - int volume) { - List trackersWithVolume = new ArrayList<>(); - for (AddressTracker tracker : trackerMap.values()) { + private static List trackersWithVolume(EndpointTrackerMap trackerMap, + int volume) { + List trackersWithVolume = new ArrayList<>(); + for (EndpointTracker tracker : trackerMap.values()) { if (tracker.inactiveVolume() >= volume) { trackersWithVolume.add(tracker); } diff --git a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java index be71e476b7..87bd50a58b 100644 --- a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java @@ -55,7 +55,7 @@ import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock.ScheduledTask; import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.internal.TestUtils.StandardLoadBalancerProvider; -import io.grpc.util.OutlierDetectionLoadBalancer.AddressTracker; +import io.grpc.util.OutlierDetectionLoadBalancer.EndpointTracker; import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig.FailurePercentageEjection; import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig.SuccessRateEjection; @@ -64,6 +64,7 @@ import io.grpc.util.OutlierDetectionLoadBalancer.SuccessRateOutlierEjectionAlgor import java.net.SocketAddress; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -302,8 +303,9 @@ public class OutlierDetectionLoadBalancerTest { loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, servers.get(0))); - assertThat(loadBalancer.trackerMap).hasSize(1); - AddressTracker addressTracker = (AddressTracker) loadBalancer.trackerMap.values().toArray()[0]; + assertThat(loadBalancer.endpointTrackerMap).hasSize(1); + EndpointTracker addressTracker = + (EndpointTracker) loadBalancer.endpointTrackerMap.values().toArray()[0]; assertThat(addressTracker).isNotNull(); OutlierDetectionSubchannel trackedSubchannel = (OutlierDetectionSubchannel) addressTracker.getSubchannels().toArray()[0]; @@ -523,7 +525,7 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); // The one subchannel that was returning errors should be ejected. - assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); } /** @@ -548,7 +550,7 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); // The one subchannel that was returning errors should be ejected. - assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); // New config sets enforcement percentage to 0. config = new OutlierDetectionLoadBalancerConfig.Builder() @@ -568,7 +570,7 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); // Since we brought enforcement percentage to 0, no additional ejection should have happened. - assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); } /** @@ -593,7 +595,7 @@ public class OutlierDetectionLoadBalancerTest { fakeClock.forwardTime(config.intervalNanos + 1, TimeUnit.NANOSECONDS); // The one subchannel that was returning errors should be ejected. - assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); // Now we produce more load, but the subchannel start working and is no longer an outlier. generateLoad(ImmutableMap.of(), 12); @@ -711,8 +713,8 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); // The one subchannel that was returning errors should be ejected. - assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0), - servers.get(1).getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.of(servers.get(0).getAddresses().get(0)), + ImmutableSet.of(servers.get(1).getAddresses().get(0)))); } /** @@ -743,8 +745,8 @@ public class OutlierDetectionLoadBalancerTest { int totalEjected = 0; for (EquivalentAddressGroup addressGroup: servers) { totalEjected += - loadBalancer.trackerMap.get(addressGroup.getAddresses().get(0)).subchannelsEjected() ? 1 - : 0; + loadBalancer.endpointTrackerMap.get( + ImmutableSet.of(addressGroup.getAddresses().get(0))).subchannelsEjected() ? 1 : 0; } assertThat(totalEjected).isEqualTo(2); @@ -797,7 +799,7 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); // The one subchannel that was returning errors should be ejected. - assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); } /** @@ -915,9 +917,9 @@ public class OutlierDetectionLoadBalancerTest { // Should see thee ejected, success rate cathes the first two, error percentage the // same two plus the subchannel with the single failure. assertEjectedSubchannels(ImmutableSet.of( - servers.get(0).getAddresses().get(0), - servers.get(1).getAddresses().get(0), - servers.get(2).getAddresses().get(0))); + ImmutableSet.of(servers.get(0).getAddresses().get(0)), + ImmutableSet.of(servers.get(1).getAddresses().get(0)), + ImmutableSet.of(servers.get(2).getAddresses().get(0)))); } /** @@ -942,14 +944,15 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); EquivalentAddressGroup oldAddressGroup = servers.get(0); - AddressTracker oldAddressTracker = loadBalancer.trackerMap.get( - oldAddressGroup.getAddresses().get(0)); + EndpointTracker oldAddressTracker = loadBalancer.endpointTrackerMap.get( + ImmutableSet.of(oldAddressGroup.getAddresses().get(0))); EquivalentAddressGroup newAddressGroup = servers.get(1); - AddressTracker newAddressTracker = loadBalancer.trackerMap.get( - newAddressGroup.getAddresses().get(0)); + EndpointTracker newAddressTracker = loadBalancer.endpointTrackerMap.get( + ImmutableSet.of(newAddressGroup.getAddresses().get(0))); // The one subchannel that was returning errors should be ejected. - assertEjectedSubchannels(ImmutableSet.of(oldAddressGroup.getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of( + ImmutableSet.of(oldAddressGroup.getAddresses().get(0)))); // The ejected subchannel gets updated with another address in the map that is not ejected OutlierDetectionSubchannel subchannel = oldAddressTracker.getSubchannels() @@ -967,6 +970,45 @@ public class OutlierDetectionLoadBalancerTest { assertThat(subchannel.isEjected()).isFalse(); } + @Test + public void multipleAddressesEndpoint() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(fakeLbProvider, null)).build(); + + loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, servers)); + EquivalentAddressGroup manyAddEndpoint = new EquivalentAddressGroup(Arrays.asList( + servers.get(0).getAddresses().get(0), servers.get(1).getAddresses().get(0))); + List manyAddEndpointServer = ImmutableList.of(manyAddEndpoint); + loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, manyAddEndpointServer)); + assertThat(loadBalancer.endpointTrackerMap.size()).isEqualTo(1); + assertThat(loadBalancer.addressMap.size()).isEqualTo(2); + + manyAddEndpoint = new EquivalentAddressGroup(Arrays.asList( + servers.get(0).getAddresses().get(0), servers.get(1).getAddresses().get(0))); + EquivalentAddressGroup manyAddEndpoint2 = new EquivalentAddressGroup(Arrays.asList( + servers.get(2).getAddresses().get(0), servers.get(3).getAddresses().get(0))); + EquivalentAddressGroup singleAddressEndpoint = new EquivalentAddressGroup(Arrays.asList( + servers.get(4).getAddresses().get(0))); + manyAddEndpointServer = ImmutableList.of( + manyAddEndpoint, manyAddEndpoint2, singleAddressEndpoint); + loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, manyAddEndpointServer)); + assertThat(loadBalancer.endpointTrackerMap.size()).isEqualTo(3); + assertThat(loadBalancer.addressMap.size()).isEqualTo(5); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED, + subchannel2, Status.DEADLINE_EXCEEDED), 13); + forwardTime(config); + + // eject the first endpoint: (address0, address1) + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.of( + servers.get(0).getAddresses().get(0), servers.get(1).getAddresses().get(0)))); + } + /** * If a single address gets replaced by multiple, the subchannel becomes uneligible for outlier * detection. @@ -989,8 +1031,8 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); EquivalentAddressGroup oldAddressGroup = servers.get(0); - AddressTracker oldAddressTracker = loadBalancer.trackerMap.get( - oldAddressGroup.getAddresses().get(0)); + EndpointTracker oldAddressTracker = loadBalancer.endpointTrackerMap.get( + ImmutableSet.of(oldAddressGroup.getAddresses().get(0))); EquivalentAddressGroup newAddress1 = servers.get(1); EquivalentAddressGroup newAddress2 = servers.get(2); @@ -1033,15 +1075,16 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); EquivalentAddressGroup oldAddressGroup = servers.get(0); - AddressTracker oldAddressTracker = loadBalancer.trackerMap.get( - oldAddressGroup.getAddresses().get(0)); + EndpointTracker oldAddressTracker = loadBalancer.endpointTrackerMap.get( + ImmutableSet.of(oldAddressGroup.getAddresses().get(0))); EquivalentAddressGroup newAddressGroup1 = servers.get(1); - AddressTracker newAddressTracker1 = loadBalancer.trackerMap.get( - newAddressGroup1.getAddresses().get(0)); + EndpointTracker newAddressTracker1 = loadBalancer.endpointTrackerMap.get( + ImmutableSet.of(newAddressGroup1.getAddresses().get(0))); EquivalentAddressGroup newAddressGroup2 = servers.get(2); // The old subchannel was returning errors and should be ejected. - assertEjectedSubchannels(ImmutableSet.of(oldAddressGroup.getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of( + ImmutableSet.of(oldAddressGroup.getAddresses().get(0)))); OutlierDetectionSubchannel subchannel = oldAddressTracker.getSubchannels() .iterator().next(); @@ -1122,7 +1165,7 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); // The one subchannel that was returning errors should be ejected. - assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); for (SubchannelStateListener healthListener : healthListeners.values()) { verifyNoInteractions(healthListener); } @@ -1151,7 +1194,7 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); // The one subchannel that was returning errors should be ejected. - assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); if (hasHealthConsumer) { verify(healthListeners.get(servers.get(0))).onSubchannelState(eq( ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE) @@ -1183,7 +1226,7 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); // The one subchannel that was returning errors should be ejected. - assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); for (SubchannelStateListener healthListener : healthListeners.values()) { verifyNoInteractions(healthListener); } @@ -1212,7 +1255,7 @@ public class OutlierDetectionLoadBalancerTest { forwardTime(config); // The one subchannel that was returning errors should be ejected. - assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); if (hasHealthConsumer) { verify(healthListeners.get(servers.get(0))).onSubchannelState(eq( ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE) @@ -1305,8 +1348,9 @@ public class OutlierDetectionLoadBalancerTest { } // Asserts that the given addresses are ejected and the rest are not. - void assertEjectedSubchannels(Set addresses) { - for (Entry entry : loadBalancer.trackerMap.entrySet()) { + void assertEjectedSubchannels(Collection> addresses) { + for (Entry, EndpointTracker> entry : + loadBalancer.endpointTrackerMap.entrySet()) { assertWithMessage("not ejected: " + entry.getKey()) .that(entry.getValue().subchannelsEjected()) .isEqualTo(addresses.contains(entry.getKey())); @@ -1328,16 +1372,20 @@ public class OutlierDetectionLoadBalancerTest { public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { subchannelList = new ArrayList<>(); for (EquivalentAddressGroup eag: resolvedAddresses.getAddresses()) { - CreateSubchannelArgs.Builder args = CreateSubchannelArgs.newBuilder().setAddresses(eag); - if (hasHealthConsumer) { - assertThat(healthListeners.get(eag)).isNotNull(); - args.addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, - healthListeners.get(eag)); + for (SocketAddress address : eag.getAddresses()) { + EquivalentAddressGroup constructedEag = new EquivalentAddressGroup(address); + CreateSubchannelArgs.Builder args = CreateSubchannelArgs.newBuilder() + .setAddresses(constructedEag); + if (hasHealthConsumer) { + assertThat(healthListeners.get(constructedEag)).isNotNull(); + args.addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, + healthListeners.get(constructedEag)); + } + Subchannel subchannel = helper.createSubchannel(args.build()); + subchannelList.add(subchannel); + subchannel.start(mock(SubchannelStateListener.class)); + deliverSubchannelState(READY); } - Subchannel subchannel = helper.createSubchannel(args.build()); - subchannelList.add(subchannel); - subchannel.start(mock(SubchannelStateListener.class)); - deliverSubchannelState(READY); } return Status.OK; }