From a0d8f2eb31c88cf8e9e8d2f213a8cc8ccfb814b0 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Tue, 8 Aug 2023 21:36:27 +0000 Subject: [PATCH] Create a shared LB base class for LBs with multiple children (#10448) * Create a shared LB base class for LBs with multiple children and change ClusgterManagerLoadBalancer to use it. --- api/src/main/java/io/grpc/LoadBalancer.java | 35 +++ .../io/grpc/util/MultiChildLoadBalancer.java | 264 ++++++++++++++++++ .../java/io/grpc/xds/CdsLoadBalancer2.java | 1 - .../io/grpc/xds/ClusterImplLoadBalancer.java | 4 +- .../grpc/xds/ClusterManagerLoadBalancer.java | 247 ++-------------- .../grpc/xds/ClusterResolverLoadBalancer.java | 1 - .../io/grpc/xds/PriorityLoadBalancer.java | 6 +- .../io/grpc/xds/RingHashLoadBalancer.java | 1 - .../grpc/xds/WeightedTargetLoadBalancer.java | 6 +- .../io/grpc/xds/WrrLocalityLoadBalancer.java | 1 - .../io/grpc/xds/XdsSubchannelPickers.java | 63 ----- .../xds/ClusterManagerLoadBalancerTest.java | 39 ++- .../io/grpc/xds/PriorityLoadBalancerTest.java | 26 +- .../xds/WeightedTargetLoadBalancerTest.java | 16 +- .../grpc/xds/WrrLocalityLoadBalancerTest.java | 2 +- 15 files changed, 373 insertions(+), 339 deletions(-) create mode 100644 util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java delete mode 100644 xds/src/main/java/io/grpc/xds/XdsSubchannelPickers.java diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 5617d27986..d7e3fbb917 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -115,6 +115,19 @@ public abstract class LoadBalancer { @NameResolver.ResolutionResultAttr public static final Attributes.Key> ATTR_HEALTH_CHECKING_CONFIG = Attributes.Key.create("internal:health-checking-config"); + + public static final SubchannelPicker EMPTY_PICKER = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withNoResult(); + } + + @Override + public String toString() { + return "EMPTY_PICKER"; + } + }; + private int recursionCount; /** @@ -1398,4 +1411,26 @@ public abstract class LoadBalancer { */ public abstract LoadBalancer newLoadBalancer(Helper helper); } + + public static final class ErrorPicker extends SubchannelPicker { + + private final Status error; + + public ErrorPicker(Status error) { + this.error = checkNotNull(error, "error"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withError(error); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("error", error) + .toString(); + } + } + } diff --git a/util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java b/util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java new file mode 100644 index 0000000000..3671505a34 --- /dev/null +++ b/util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java @@ -0,0 +1,264 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ConnectivityState; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancerProvider; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * A base load balancing policy for those policies which has multiple children such as + * ClusterManager or the petiole policies. + * + * @since 1.58 + */ +public abstract class MultiChildLoadBalancer extends LoadBalancer { + + @VisibleForTesting + public static final int DELAYED_CHILD_DELETION_TIME_MINUTES = 15; + private static final Logger logger = Logger.getLogger(MultiChildLoadBalancer.class.getName()); + private final Map childLbStates = new HashMap<>(); + private final Helper helper; + protected final SynchronizationContext syncContext; + private final ScheduledExecutorService timeService; + // Set to true if currently in the process of handling resolved addresses. + private boolean resolvingAddresses; + + protected MultiChildLoadBalancer(Helper helper) { + this.helper = checkNotNull(helper, "helper"); + this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); + this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); + logger.log(Level.FINE, "Created"); + } + + protected SubchannelPicker getInitialPicker() { + return EMPTY_PICKER; + } + + protected SubchannelPicker getErrorPicker(Status error) { + return new ErrorPicker(error); + } + + protected abstract Map getPolicySelectionMap( + ResolvedAddresses resolvedAddresses); + + protected abstract SubchannelPicker getSubchannelPicker( + Map childPickers); + + @Override + public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { + try { + resolvingAddresses = true; + return acceptResolvedAddressesInternal(resolvedAddresses); + } finally { + resolvingAddresses = false; + } + } + + private boolean acceptResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) { + logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses); + Map newChildPolicies = getPolicySelectionMap(resolvedAddresses); + for (Map.Entry entry : newChildPolicies.entrySet()) { + final Object key = entry.getKey(); + LoadBalancerProvider childPolicyProvider = entry.getValue().getProvider(); + Object childConfig = entry.getValue().getConfig(); + if (!childLbStates.containsKey(key)) { + childLbStates.put(key, new ChildLbState(key, childPolicyProvider, getInitialPicker())); + } else { + childLbStates.get(key).reactivate(childPolicyProvider); + } + LoadBalancer childLb = childLbStates.get(key).lb; + ResolvedAddresses childAddresses = + resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(childConfig).build(); + childLb.handleResolvedAddresses(childAddresses); + } + for (Object key : childLbStates.keySet()) { + if (!newChildPolicies.containsKey(key)) { + childLbStates.get(key).deactivate(); + } + } + // Must update channel picker before return so that new RPCs will not be routed to deleted + // clusters and resolver can remove them in service config. + updateOverallBalancingState(); + return true; + } + + @Override + public void handleNameResolutionError(Status error) { + logger.log(Level.WARNING, "Received name resolution error: {0}", error); + boolean gotoTransientFailure = true; + for (ChildLbState state : childLbStates.values()) { + if (!state.deactivated) { + gotoTransientFailure = false; + state.lb.handleNameResolutionError(error); + } + } + if (gotoTransientFailure) { + helper.updateBalancingState(TRANSIENT_FAILURE, getErrorPicker(error)); + } + } + + @Override + public void shutdown() { + logger.log(Level.INFO, "Shutdown"); + for (ChildLbState state : childLbStates.values()) { + state.shutdown(); + } + childLbStates.clear(); + } + + private void updateOverallBalancingState() { + ConnectivityState overallState = null; + final Map childPickers = new HashMap<>(); + for (ChildLbState childLbState : childLbStates.values()) { + if (childLbState.deactivated) { + continue; + } + childPickers.put(childLbState.key, childLbState.currentPicker); + overallState = aggregateState(overallState, childLbState.currentState); + } + if (overallState != null) { + helper.updateBalancingState(overallState, getSubchannelPicker(childPickers)); + } + } + + @Nullable + private static ConnectivityState aggregateState( + @Nullable ConnectivityState overallState, ConnectivityState childState) { + if (overallState == null) { + return childState; + } + if (overallState == READY || childState == READY) { + return READY; + } + if (overallState == CONNECTING || childState == CONNECTING) { + return CONNECTING; + } + if (overallState == IDLE || childState == IDLE) { + return IDLE; + } + return overallState; + } + + private final class ChildLbState { + private final Object key; + private final GracefulSwitchLoadBalancer lb; + private LoadBalancerProvider policyProvider; + private ConnectivityState currentState = CONNECTING; + private SubchannelPicker currentPicker; + private boolean deactivated; + @Nullable + ScheduledHandle deletionTimer; + + ChildLbState(Object key, LoadBalancerProvider policyProvider, SubchannelPicker initialPicker) { + this.key = key; + this.policyProvider = policyProvider; + lb = new GracefulSwitchLoadBalancer(new ChildLbStateHelper()); + lb.switchTo(policyProvider); + currentPicker = initialPicker; + } + + void deactivate() { + if (deactivated) { + return; + } + + class DeletionTask implements Runnable { + @Override + public void run() { + shutdown(); + childLbStates.remove(key); + } + } + + deletionTimer = + syncContext.schedule( + new DeletionTask(), + DELAYED_CHILD_DELETION_TIME_MINUTES, + TimeUnit.MINUTES, + timeService); + deactivated = true; + logger.log(Level.FINE, "Child balancer {0} deactivated", key); + } + + void reactivate(LoadBalancerProvider policyProvider) { + if (deletionTimer != null && deletionTimer.isPending()) { + deletionTimer.cancel(); + deactivated = false; + logger.log(Level.FINE, "Child balancer {0} reactivated", key); + } + if (!this.policyProvider.getPolicyName().equals(policyProvider.getPolicyName())) { + Object[] objects = { + key, this.policyProvider.getPolicyName(),policyProvider.getPolicyName()}; + logger.log(Level.FINE, "Child balancer {0} switching policy from {1} to {2}", objects); + lb.switchTo(policyProvider); + this.policyProvider = policyProvider; + } + } + + void shutdown() { + if (deletionTimer != null && deletionTimer.isPending()) { + deletionTimer.cancel(); + } + lb.shutdown(); + logger.log(Level.FINE, "Child balancer {0} deleted", key); + } + + private final class ChildLbStateHelper extends ForwardingLoadBalancerHelper { + + @Override + public void updateBalancingState(final ConnectivityState newState, + final SubchannelPicker newPicker) { + // If we are already in the process of resolving addresses, the overall balancing state + // will be updated at the end of it, and we don't need to trigger that update here. + if (!childLbStates.containsKey(key)) { + return; + } + // Subchannel picker and state are saved, but will only be propagated to the channel + // when the child instance exits deactivated state. + currentState = newState; + currentPicker = newPicker; + if (!deactivated && !resolvingAddresses) { + updateOverallBalancingState(); + } + } + + @Override + protected Helper delegate() { + return helper; + } + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index a640bbd78b..7257fdfd16 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -39,7 +39,6 @@ import io.grpc.xds.XdsClient.ResourceWatcher; import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType; import io.grpc.xds.XdsLogger.XdsLogLevel; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index b2be811d50..95ca1a33e1 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -17,7 +17,6 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; -import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -46,7 +45,6 @@ import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats; import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import io.grpc.xds.internal.security.SslContextProviderSupplier; import io.grpc.xds.orca.OrcaPerRequestUtil; import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener; @@ -173,7 +171,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer { private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper { private final AtomicLong inFlights; private ConnectivityState currentState = ConnectivityState.IDLE; - private SubchannelPicker currentPicker = BUFFER_PICKER; + private SubchannelPicker currentPicker = LoadBalancer.EMPTY_PICKER; private List dropPolicies = Collections.emptyList(); private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; @Nullable diff --git a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java index cce32c6824..a448920423 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java @@ -16,266 +16,63 @@ package io.grpc.xds; -import static com.google.common.base.Preconditions.checkNotNull; -import static io.grpc.ConnectivityState.CONNECTING; -import static io.grpc.ConnectivityState.IDLE; -import static io.grpc.ConnectivityState.READY; -import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; -import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; - -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; -import io.grpc.ConnectivityState; import io.grpc.InternalLogId; -import io.grpc.LoadBalancer; -import io.grpc.LoadBalancerProvider; import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.ServiceConfigUtil.PolicySelection; -import io.grpc.util.ForwardingLoadBalancerHelper; -import io.grpc.util.GracefulSwitchLoadBalancer; +import io.grpc.util.MultiChildLoadBalancer; import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig; import io.grpc.xds.XdsLogger.XdsLogLevel; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; /** * The top-level load balancing policy. */ -class ClusterManagerLoadBalancer extends LoadBalancer { +class ClusterManagerLoadBalancer extends MultiChildLoadBalancer { - @VisibleForTesting - static final int DELAYED_CHILD_DELETION_TIME_MINUTES = 15; - - private final Map childLbStates = new HashMap<>(); - private final Helper helper; - private final SynchronizationContext syncContext; - private final ScheduledExecutorService timeService; private final XdsLogger logger; - // Set to true if currently in the process of handling resolved addresses. - private boolean resolvingAddresses; ClusterManagerLoadBalancer(Helper helper) { - this.helper = checkNotNull(helper, "helper"); - this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); - this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); + super(helper); logger = XdsLogger.withLogId( InternalLogId.allocate("cluster_manager-lb", helper.getAuthority())); logger.log(XdsLogLevel.INFO, "Created"); } @Override - public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { - try { - resolvingAddresses = true; - return acceptResolvedAddressesInternal(resolvedAddresses); - } finally { - resolvingAddresses = false; - } - } - - public boolean acceptResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) { - logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); + protected Map getPolicySelectionMap( + ResolvedAddresses resolvedAddresses) { ClusterManagerConfig config = (ClusterManagerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - Map newChildPolicies = config.childPolicies; + Map newChildPolicies = new HashMap<>(config.childPolicies); logger.log( XdsLogLevel.INFO, "Received cluster_manager lb config: child names={0}", newChildPolicies.keySet()); - for (Map.Entry entry : newChildPolicies.entrySet()) { - final String name = entry.getKey(); - LoadBalancerProvider childPolicyProvider = entry.getValue().getProvider(); - Object childConfig = entry.getValue().getConfig(); - if (!childLbStates.containsKey(name)) { - childLbStates.put(name, new ChildLbState(name, childPolicyProvider)); - } else { - childLbStates.get(name).reactivate(childPolicyProvider); - } - LoadBalancer childLb = childLbStates.get(name).lb; - ResolvedAddresses childAddresses = - resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(childConfig).build(); - childLb.handleResolvedAddresses(childAddresses); - } - for (String name : childLbStates.keySet()) { - if (!newChildPolicies.containsKey(name)) { - childLbStates.get(name).deactivate(); - } - } - // Must update channel picker before return so that new RPCs will not be routed to deleted - // clusters and resolver can remove them in service config. - updateOverallBalancingState(); - return true; + return newChildPolicies; } @Override - public void handleNameResolutionError(Status error) { - logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error); - boolean gotoTransientFailure = true; - for (ChildLbState state : childLbStates.values()) { - if (!state.deactivated) { - gotoTransientFailure = false; - state.lb.handleNameResolutionError(error); - } - } - if (gotoTransientFailure) { - helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); - } - } - - @Override - public void shutdown() { - logger.log(XdsLogLevel.INFO, "Shutdown"); - for (ChildLbState state : childLbStates.values()) { - state.shutdown(); - } - childLbStates.clear(); - } - - private void updateOverallBalancingState() { - ConnectivityState overallState = null; - final Map childPickers = new HashMap<>(); - for (ChildLbState childLbState : childLbStates.values()) { - if (childLbState.deactivated) { - continue; - } - childPickers.put(childLbState.name, childLbState.currentPicker); - overallState = aggregateState(overallState, childLbState.currentState); - } - if (overallState != null) { - SubchannelPicker picker = new SubchannelPicker() { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - String clusterName = - args.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY); - SubchannelPicker delegate = childPickers.get(clusterName); - if (delegate == null) { - return - PickResult.withError( - Status.UNAVAILABLE.withDescription("CDS encountered error: unable to find " - + "available subchannel for cluster " + clusterName)); - } - return delegate.pickSubchannel(args); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this).add("pickers", childPickers).toString(); - } - }; - helper.updateBalancingState(overallState, picker); - } - } - - @Nullable - private static ConnectivityState aggregateState( - @Nullable ConnectivityState overallState, ConnectivityState childState) { - if (overallState == null) { - return childState; - } - if (overallState == READY || childState == READY) { - return READY; - } - if (overallState == CONNECTING || childState == CONNECTING) { - return CONNECTING; - } - if (overallState == IDLE || childState == IDLE) { - return IDLE; - } - return overallState; - } - - private final class ChildLbState { - private final String name; - private final GracefulSwitchLoadBalancer lb; - private LoadBalancerProvider policyProvider; - private ConnectivityState currentState = CONNECTING; - private SubchannelPicker currentPicker = BUFFER_PICKER; - private boolean deactivated; - @Nullable - ScheduledHandle deletionTimer; - - ChildLbState(String name, LoadBalancerProvider policyProvider) { - this.name = name; - this.policyProvider = policyProvider; - lb = new GracefulSwitchLoadBalancer(new ChildLbStateHelper()); - lb.switchTo(policyProvider); - } - - void deactivate() { - if (deactivated) { - return; - } - - class DeletionTask implements Runnable { - @Override - public void run() { - shutdown(); - childLbStates.remove(name); - } - } - - deletionTimer = - syncContext.schedule( - new DeletionTask(), - DELAYED_CHILD_DELETION_TIME_MINUTES, - TimeUnit.MINUTES, - timeService); - deactivated = true; - logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deactivated", name); - } - - void reactivate(LoadBalancerProvider policyProvider) { - if (deletionTimer != null && deletionTimer.isPending()) { - deletionTimer.cancel(); - deactivated = false; - logger.log(XdsLogLevel.DEBUG, "Child balancer {0} reactivated", name); - } - if (!this.policyProvider.getPolicyName().equals(policyProvider.getPolicyName())) { - logger.log( - XdsLogLevel.DEBUG, - "Child balancer {0} switching policy from {1} to {2}", - name, this.policyProvider.getPolicyName(), policyProvider.getPolicyName()); - lb.switchTo(policyProvider); - this.policyProvider = policyProvider; - } - } - - void shutdown() { - if (deletionTimer != null && deletionTimer.isPending()) { - deletionTimer.cancel(); - } - lb.shutdown(); - logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deleted", name); - } - - private final class ChildLbStateHelper extends ForwardingLoadBalancerHelper { - + protected SubchannelPicker getSubchannelPicker(Map childPickers) { + return new SubchannelPicker() { @Override - public void updateBalancingState(final ConnectivityState newState, - final SubchannelPicker newPicker) { - // If we are already in the process of resolving addresses, the overall balancing state - // will be updated at the end of it, and we don't need to trigger that update here. - if (!childLbStates.containsKey(name)) { - return; - } - // Subchannel picker and state are saved, but will only be propagated to the channel - // when the child instance exits deactivated state. - currentState = newState; - currentPicker = newPicker; - if (!deactivated && !resolvingAddresses) { - updateOverallBalancingState(); + public PickResult pickSubchannel(PickSubchannelArgs args) { + String clusterName = + args.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY); + SubchannelPicker childPicker = childPickers.get(clusterName); + if (childPicker == null) { + return + PickResult.withError( + Status.UNAVAILABLE.withDescription("CDS encountered error: unable to find " + + "available subchannel for cluster " + clusterName)); } + return childPicker.pickSubchannel(args); } @Override - protected Helper delegate() { - return helper; + public String toString() { + return MoreObjects.toStringHelper(this).add("pickers", childPickers).toString(); } - } + }; } } diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 3af58ef93c..a7564e89a8 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -55,7 +55,6 @@ import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildCo import io.grpc.xds.XdsClient.ResourceWatcher; import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.XdsLogger.XdsLogLevel; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index e833b3777b..5cf5431756 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -21,7 +21,6 @@ import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; -import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; import io.grpc.ConnectivityState; import io.grpc.InternalLogId; @@ -36,7 +35,6 @@ import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; import io.grpc.xds.XdsLogger.XdsLogLevel; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -149,7 +147,7 @@ final class PriorityLoadBalancer extends LoadBalancer { ChildLbState child = new ChildLbState(priority, priorityConfigs.get(priority).ignoreReresolution); children.put(priority, child); - updateOverallState(priority, CONNECTING, BUFFER_PICKER); + updateOverallState(priority, CONNECTING, LoadBalancer.EMPTY_PICKER); // Calling the child's updateResolvedAddresses() can result in tryNextPriority() being // called recursively. We need to be sure to be done with processing here before it is // called. @@ -210,7 +208,7 @@ final class PriorityLoadBalancer extends LoadBalancer { @Nullable ScheduledHandle deletionTimer; @Nullable String policy; ConnectivityState connectivityState = CONNECTING; - SubchannelPicker picker = BUFFER_PICKER; + SubchannelPicker picker = LoadBalancer.EMPTY_PICKER; ChildLbState(final String priority, boolean ignoreReresolution) { this.priority = priority; diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 436eca8ec5..20a70cb032 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -39,7 +39,6 @@ import io.grpc.LoadBalancer; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.xds.XdsLogger.XdsLogLevel; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; diff --git a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java index 825e4a8eca..596247b823 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java @@ -21,7 +21,6 @@ import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; -import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; import com.google.common.collect.ImmutableMap; import io.grpc.ConnectivityState; @@ -34,7 +33,6 @@ import io.grpc.xds.WeightedRandomPicker.WeightedChildPicker; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; import io.grpc.xds.XdsLogger.XdsLogLevel; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -159,7 +157,7 @@ final class WeightedTargetLoadBalancer extends LoadBalancer { if (overallState == TRANSIENT_FAILURE) { picker = new WeightedRandomPicker(errorPickers); } else { - picker = XdsSubchannelPickers.BUFFER_PICKER; + picker = LoadBalancer.EMPTY_PICKER; } } else { picker = new WeightedRandomPicker(childPickers); @@ -191,7 +189,7 @@ final class WeightedTargetLoadBalancer extends LoadBalancer { private final class ChildHelper extends ForwardingLoadBalancerHelper { String name; ConnectivityState currentState = CONNECTING; - SubchannelPicker currentPicker = BUFFER_PICKER; + SubchannelPicker currentPicker = LoadBalancer.EMPTY_PICKER; private ChildHelper(String name) { this.name = name; diff --git a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java index b919649262..885844f1cb 100644 --- a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java @@ -32,7 +32,6 @@ import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; import io.grpc.xds.XdsLogger.XdsLogLevel; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.HashMap; import java.util.Map; import java.util.Objects; diff --git a/xds/src/main/java/io/grpc/xds/XdsSubchannelPickers.java b/xds/src/main/java/io/grpc/xds/XdsSubchannelPickers.java deleted file mode 100644 index 5c2890c34e..0000000000 --- a/xds/src/main/java/io/grpc/xds/XdsSubchannelPickers.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.MoreObjects; -import io.grpc.LoadBalancer.PickResult; -import io.grpc.LoadBalancer.PickSubchannelArgs; -import io.grpc.LoadBalancer.SubchannelPicker; -import io.grpc.Status; - -final class XdsSubchannelPickers { - - private XdsSubchannelPickers() { /* DO NOT CALL ME */ } - - static final SubchannelPicker BUFFER_PICKER = new SubchannelPicker() { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withNoResult(); - } - - @Override - public String toString() { - return "BUFFER_PICKER"; - } - }; - - static final class ErrorPicker extends SubchannelPicker { - - private final Status error; - - ErrorPicker(Status error) { - this.error = checkNotNull(error, "error"); - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withError(error); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("error", error) - .toString(); - } - } -} diff --git a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java index 6cb12550b6..6eab615147 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java @@ -54,7 +54,6 @@ import io.grpc.internal.PickSubchannelArgsImpl; import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.testing.TestMethodDescriptors; import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -126,10 +125,14 @@ public class ClusterManagerLoadBalancerTest { assertThat(pickSubchannel(picker, "childA")).isEqualTo(PickResult.withNoResult()); assertThat(pickSubchannel(picker, "childB")).isEqualTo(PickResult.withNoResult()); assertThat(childBalancers).hasSize(2); - FakeLoadBalancer childBalancer1 = childBalancers.get(0); - FakeLoadBalancer childBalancer2 = childBalancers.get(1); - assertThat(childBalancer1.name).isEqualTo("policy_a"); - assertThat(childBalancer2.name).isEqualTo("policy_b"); + assertThat(childBalancers.stream() + .filter(b -> b.name.equals("policy_a")) + .count()).isEqualTo(1); + assertThat(childBalancers.stream() + .filter(b -> b.name.equals("policy_b")) + .count()).isEqualTo(1); + FakeLoadBalancer childBalancer1 = getChildBalancerByName("policy_a"); + FakeLoadBalancer childBalancer2 = getChildBalancerByName("policy_b"); assertThat(childBalancer1.config).isEqualTo(lbConfigInventory.get("childA")); assertThat(childBalancer2.config).isEqualTo(lbConfigInventory.get("childB")); @@ -151,8 +154,7 @@ public class ClusterManagerLoadBalancerTest { assertThat(childBalancer2.shutdown).isFalse(); assertThat(childBalancers).hasSize(3); - FakeLoadBalancer childBalancer3 = childBalancers.get(2); - assertThat(childBalancer3.name).isEqualTo("policy_c"); + FakeLoadBalancer childBalancer3 = getChildBalancerByName("policy_c"); assertThat(childBalancer3.config).isEqualTo(lbConfigInventory.get("childC")); // delayed policy_b deletion @@ -166,8 +168,8 @@ public class ClusterManagerLoadBalancerTest { deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); assertThat(childBalancers).hasSize(2); - FakeLoadBalancer childBalancer1 = childBalancers.get(0); - FakeLoadBalancer childBalancer2 = childBalancers.get(1); + FakeLoadBalancer childBalancer1 = getChildBalancerByName("policy_a"); + FakeLoadBalancer childBalancer2 = getChildBalancerByName("policy_b"); Subchannel subchannel1 = mock(Subchannel.class); Subchannel subchannel2 = mock(Subchannel.class); childBalancer1.deliverSubchannelState(subchannel1, ConnectivityState.READY); @@ -184,11 +186,20 @@ public class ClusterManagerLoadBalancerTest { .isEqualTo(subchannel2); } + private FakeLoadBalancer getChildBalancerByName(String name) { + for (FakeLoadBalancer childLb : childBalancers) { + if (childLb.name.equals(name)) { + return childLb; + } + } + return null; + } + @Test public void ignoreBalancingStateUpdateForDeactivatedChildLbs() { deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); deliverResolvedAddresses(ImmutableMap.of("childB", "policy_b")); - FakeLoadBalancer childBalancer1 = childBalancers.get(0); // policy_a (deactivated) + FakeLoadBalancer childBalancer1 = getChildBalancerByName("policy_a"); // policy_a (deactivated) Subchannel subchannel = mock(Subchannel.class); childBalancer1.deliverSubchannelState(subchannel, ConnectivityState.READY); verify(helper, never()).updateBalancingState( @@ -231,8 +242,8 @@ public class ClusterManagerLoadBalancerTest { public void handleNameResolutionError_afterChildLbsInstantiated_propagateToChildLbs() { deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); assertThat(childBalancers).hasSize(2); - FakeLoadBalancer childBalancer1 = childBalancers.get(0); - FakeLoadBalancer childBalancer2 = childBalancers.get(1); + FakeLoadBalancer childBalancer1 = getChildBalancerByName("policy_a"); + FakeLoadBalancer childBalancer2 = getChildBalancerByName("policy_b"); clusterManagerLoadBalancer.handleNameResolutionError( Status.UNAVAILABLE.withDescription("resolver error")); assertThat(childBalancer1.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE); @@ -245,8 +256,8 @@ public class ClusterManagerLoadBalancerTest { public void handleNameResolutionError_notPropagateToDeactivatedChildLbs() { deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b")); deliverResolvedAddresses(ImmutableMap.of("childB", "policy_b")); - FakeLoadBalancer childBalancer1 = childBalancers.get(0); // policy_a (deactivated) - FakeLoadBalancer childBalancer2 = childBalancers.get(1); // policy_b + FakeLoadBalancer childBalancer1 = getChildBalancerByName("policy_a"); // policy_a (deactivated) + FakeLoadBalancer childBalancer2 = getChildBalancerByName("policy_b"); // policy_b clusterManagerLoadBalancer.handleNameResolutionError( Status.UNKNOWN.withDescription("unknown error")); assertThat(childBalancer1.upstreamError).isNull(); diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index a005f40fad..ebcd68dc95 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -21,7 +21,7 @@ import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; -import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; +import static io.grpc.LoadBalancer.EMPTY_PICKER; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; @@ -41,6 +41,7 @@ import io.grpc.Attributes; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.ErrorPicker; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; @@ -55,7 +56,6 @@ import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.internal.TestUtils.StandardLoadBalancerProvider; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; @@ -423,13 +423,13 @@ public class PriorityLoadBalancerTest { // p0 gets IDLE. helper0.updateBalancingState( IDLE, - BUFFER_PICKER); + EMPTY_PICKER); assertCurrentPickerIsBufferPicker(); // p0 goes to CONNECTING helper0.updateBalancingState( IDLE, - BUFFER_PICKER); + EMPTY_PICKER); assertCurrentPickerIsBufferPicker(); // no failover happened @@ -459,15 +459,15 @@ public class PriorityLoadBalancerTest { // p0 gets IDLE. helper0.updateBalancingState( IDLE, - BUFFER_PICKER); + EMPTY_PICKER); assertCurrentPickerIsBufferPicker(); // p0 goes to CONNECTING, reset failover timer fakeClock.forwardTime(5, TimeUnit.SECONDS); helper0.updateBalancingState( CONNECTING, - BUFFER_PICKER); - verify(helper, times(2)).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + EMPTY_PICKER); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER)); // failover happens fakeClock.forwardTime(10, TimeUnit.SECONDS); @@ -509,7 +509,7 @@ public class PriorityLoadBalancerTest { // p0 goes to CONNECTING helper0.updateBalancingState( IDLE, - BUFFER_PICKER); + EMPTY_PICKER); assertCurrentPickerIsBufferPicker(); // no failover happened @@ -560,7 +560,7 @@ public class PriorityLoadBalancerTest { // p0 gets IDLE. helper0.updateBalancingState( IDLE, - BUFFER_PICKER); + EMPTY_PICKER); assertCurrentPickerIsBufferPicker(); // p0 fails over to p1 immediately. @@ -581,13 +581,13 @@ public class PriorityLoadBalancerTest { // p2 gets IDLE helper2.updateBalancingState( IDLE, - BUFFER_PICKER); + EMPTY_PICKER); assertCurrentPickerIsBufferPicker(); // p0 gets back to IDLE helper0.updateBalancingState( IDLE, - BUFFER_PICKER); + EMPTY_PICKER); assertCurrentPickerIsBufferPicker(); // p2 fails but does not affect overall picker @@ -614,13 +614,13 @@ public class PriorityLoadBalancerTest { // p2 gets back to IDLE helper2.updateBalancingState( IDLE, - BUFFER_PICKER); + EMPTY_PICKER); assertCurrentPickerIsBufferPicker(); // p0 gets back to IDLE helper0.updateBalancingState( IDLE, - BUFFER_PICKER); + EMPTY_PICKER); assertCurrentPickerIsBufferPicker(); // p0 fails over to p2 and picker is updated to p2's existing picker. diff --git a/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java index 91ab1e8fac..6cec8b0fb6 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java @@ -20,7 +20,7 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; -import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; +import static io.grpc.LoadBalancer.EMPTY_PICKER; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; @@ -39,6 +39,7 @@ import com.google.common.collect.Iterables; import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.ErrorPicker; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; @@ -52,7 +53,6 @@ import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.xds.WeightedRandomPicker.WeightedChildPicker; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; @@ -209,7 +209,7 @@ public class WeightedTargetLoadBalancerTest { .setAttributes(Attributes.newBuilder().set(fakeKey, fakeValue).build()) .setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets)) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + verify(helper).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER)); assertThat(childBalancers).hasSize(4); assertThat(childHelpers).hasSize(4); assertThat(fooLbCreated).isEqualTo(2); @@ -246,7 +246,7 @@ public class WeightedTargetLoadBalancerTest { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(new WeightedTargetConfig(newTargets)) .build()); - verify(helper, atLeast(2)).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + verify(helper, atLeast(2)).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER)); assertThat(childBalancers).hasSize(5); assertThat(childHelpers).hasSize(5); assertThat(fooLbCreated).isEqualTo(3); // One more foo LB created for target4 @@ -288,7 +288,7 @@ public class WeightedTargetLoadBalancerTest { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets)) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + verify(helper).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER)); // Error after child balancers created. weightedTargetLb.handleNameResolutionError(Status.ABORTED); @@ -315,7 +315,7 @@ public class WeightedTargetLoadBalancerTest { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets)) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + verify(helper).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER)); // Subchannels to be created for each child balancer. final SubchannelPicker[] subchannelPickers = new SubchannelPicker[]{ @@ -335,7 +335,7 @@ public class WeightedTargetLoadBalancerTest { childHelpers.get(1).updateBalancingState(TRANSIENT_FAILURE, failurePickers[1]); verify(helper, never()).updateBalancingState( eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); - verify(helper, times(2)).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER)); // Another child balancer goes to READY. childHelpers.get(2).updateBalancingState(READY, subchannelPickers[2]); @@ -396,7 +396,7 @@ public class WeightedTargetLoadBalancerTest { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets)) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER)); + verify(helper).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER)); // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // any further balancing state update should be ignored. diff --git a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java index 344876aa34..bb80635f1b 100644 --- a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java @@ -30,6 +30,7 @@ import io.grpc.Attributes; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.ErrorPicker; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.SubchannelPicker; @@ -41,7 +42,6 @@ import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.net.SocketAddress; import java.util.Collections; import java.util.List;