Create a shared LB base class for LBs with multiple children (#10448)

* Create a shared LB base class for LBs with multiple children and change ClusgterManagerLoadBalancer to use it.
This commit is contained in:
Larry Safran 2023-08-08 21:36:27 +00:00 committed by GitHub
parent 4453ce7eb6
commit a0d8f2eb31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 373 additions and 339 deletions

View File

@ -115,6 +115,19 @@ public abstract class LoadBalancer {
@NameResolver.ResolutionResultAttr
public static final Attributes.Key<Map<String, ?>> ATTR_HEALTH_CHECKING_CONFIG =
Attributes.Key.create("internal:health-checking-config");
public static final SubchannelPicker EMPTY_PICKER = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withNoResult();
}
@Override
public String toString() {
return "EMPTY_PICKER";
}
};
private int recursionCount;
/**
@ -1398,4 +1411,26 @@ public abstract class LoadBalancer {
*/
public abstract LoadBalancer newLoadBalancer(Helper helper);
}
public static final class ErrorPicker extends SubchannelPicker {
private final Status error;
public ErrorPicker(Status error) {
this.error = checkNotNull(error, "error");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("error", error)
.toString();
}
}
}

View File

@ -0,0 +1,264 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.util;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* A base load balancing policy for those policies which has multiple children such as
* ClusterManager or the petiole policies.
*
* @since 1.58
*/
public abstract class MultiChildLoadBalancer extends LoadBalancer {
@VisibleForTesting
public static final int DELAYED_CHILD_DELETION_TIME_MINUTES = 15;
private static final Logger logger = Logger.getLogger(MultiChildLoadBalancer.class.getName());
private final Map<Object, ChildLbState> childLbStates = new HashMap<>();
private final Helper helper;
protected final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
// Set to true if currently in the process of handling resolved addresses.
private boolean resolvingAddresses;
protected MultiChildLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
logger.log(Level.FINE, "Created");
}
protected SubchannelPicker getInitialPicker() {
return EMPTY_PICKER;
}
protected SubchannelPicker getErrorPicker(Status error) {
return new ErrorPicker(error);
}
protected abstract Map<Object, PolicySelection> getPolicySelectionMap(
ResolvedAddresses resolvedAddresses);
protected abstract SubchannelPicker getSubchannelPicker(
Map<Object, SubchannelPicker> childPickers);
@Override
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
try {
resolvingAddresses = true;
return acceptResolvedAddressesInternal(resolvedAddresses);
} finally {
resolvingAddresses = false;
}
}
private boolean acceptResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) {
logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);
Map<Object, PolicySelection> newChildPolicies = getPolicySelectionMap(resolvedAddresses);
for (Map.Entry<Object, PolicySelection> entry : newChildPolicies.entrySet()) {
final Object key = entry.getKey();
LoadBalancerProvider childPolicyProvider = entry.getValue().getProvider();
Object childConfig = entry.getValue().getConfig();
if (!childLbStates.containsKey(key)) {
childLbStates.put(key, new ChildLbState(key, childPolicyProvider, getInitialPicker()));
} else {
childLbStates.get(key).reactivate(childPolicyProvider);
}
LoadBalancer childLb = childLbStates.get(key).lb;
ResolvedAddresses childAddresses =
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(childConfig).build();
childLb.handleResolvedAddresses(childAddresses);
}
for (Object key : childLbStates.keySet()) {
if (!newChildPolicies.containsKey(key)) {
childLbStates.get(key).deactivate();
}
}
// Must update channel picker before return so that new RPCs will not be routed to deleted
// clusters and resolver can remove them in service config.
updateOverallBalancingState();
return true;
}
@Override
public void handleNameResolutionError(Status error) {
logger.log(Level.WARNING, "Received name resolution error: {0}", error);
boolean gotoTransientFailure = true;
for (ChildLbState state : childLbStates.values()) {
if (!state.deactivated) {
gotoTransientFailure = false;
state.lb.handleNameResolutionError(error);
}
}
if (gotoTransientFailure) {
helper.updateBalancingState(TRANSIENT_FAILURE, getErrorPicker(error));
}
}
@Override
public void shutdown() {
logger.log(Level.INFO, "Shutdown");
for (ChildLbState state : childLbStates.values()) {
state.shutdown();
}
childLbStates.clear();
}
private void updateOverallBalancingState() {
ConnectivityState overallState = null;
final Map<Object, SubchannelPicker> childPickers = new HashMap<>();
for (ChildLbState childLbState : childLbStates.values()) {
if (childLbState.deactivated) {
continue;
}
childPickers.put(childLbState.key, childLbState.currentPicker);
overallState = aggregateState(overallState, childLbState.currentState);
}
if (overallState != null) {
helper.updateBalancingState(overallState, getSubchannelPicker(childPickers));
}
}
@Nullable
private static ConnectivityState aggregateState(
@Nullable ConnectivityState overallState, ConnectivityState childState) {
if (overallState == null) {
return childState;
}
if (overallState == READY || childState == READY) {
return READY;
}
if (overallState == CONNECTING || childState == CONNECTING) {
return CONNECTING;
}
if (overallState == IDLE || childState == IDLE) {
return IDLE;
}
return overallState;
}
private final class ChildLbState {
private final Object key;
private final GracefulSwitchLoadBalancer lb;
private LoadBalancerProvider policyProvider;
private ConnectivityState currentState = CONNECTING;
private SubchannelPicker currentPicker;
private boolean deactivated;
@Nullable
ScheduledHandle deletionTimer;
ChildLbState(Object key, LoadBalancerProvider policyProvider, SubchannelPicker initialPicker) {
this.key = key;
this.policyProvider = policyProvider;
lb = new GracefulSwitchLoadBalancer(new ChildLbStateHelper());
lb.switchTo(policyProvider);
currentPicker = initialPicker;
}
void deactivate() {
if (deactivated) {
return;
}
class DeletionTask implements Runnable {
@Override
public void run() {
shutdown();
childLbStates.remove(key);
}
}
deletionTimer =
syncContext.schedule(
new DeletionTask(),
DELAYED_CHILD_DELETION_TIME_MINUTES,
TimeUnit.MINUTES,
timeService);
deactivated = true;
logger.log(Level.FINE, "Child balancer {0} deactivated", key);
}
void reactivate(LoadBalancerProvider policyProvider) {
if (deletionTimer != null && deletionTimer.isPending()) {
deletionTimer.cancel();
deactivated = false;
logger.log(Level.FINE, "Child balancer {0} reactivated", key);
}
if (!this.policyProvider.getPolicyName().equals(policyProvider.getPolicyName())) {
Object[] objects = {
key, this.policyProvider.getPolicyName(),policyProvider.getPolicyName()};
logger.log(Level.FINE, "Child balancer {0} switching policy from {1} to {2}", objects);
lb.switchTo(policyProvider);
this.policyProvider = policyProvider;
}
}
void shutdown() {
if (deletionTimer != null && deletionTimer.isPending()) {
deletionTimer.cancel();
}
lb.shutdown();
logger.log(Level.FINE, "Child balancer {0} deleted", key);
}
private final class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
@Override
public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) {
// If we are already in the process of resolving addresses, the overall balancing state
// will be updated at the end of it, and we don't need to trigger that update here.
if (!childLbStates.containsKey(key)) {
return;
}
// Subchannel picker and state are saved, but will only be propagated to the channel
// when the child instance exits deactivated state.
currentState = newState;
currentPicker = newPicker;
if (!deactivated && !resolvingAddresses) {
updateOverallBalancingState();
}
}
@Override
protected Helper delegate() {
return helper;
}
}
}
}

View File

@ -39,7 +39,6 @@ import io.grpc.xds.XdsClient.ResourceWatcher;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;

View File

@ -17,7 +17,6 @@
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@ -46,7 +45,6 @@ import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import io.grpc.xds.orca.OrcaPerRequestUtil;
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
@ -173,7 +171,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
private final AtomicLong inFlights;
private ConnectivityState currentState = ConnectivityState.IDLE;
private SubchannelPicker currentPicker = BUFFER_PICKER;
private SubchannelPicker currentPicker = LoadBalancer.EMPTY_PICKER;
private List<DropOverload> dropPolicies = Collections.emptyList();
private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
@Nullable

View File

@ -16,266 +16,63 @@
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.grpc.ConnectivityState;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.util.MultiChildLoadBalancer;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* The top-level load balancing policy.
*/
class ClusterManagerLoadBalancer extends LoadBalancer {
class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
@VisibleForTesting
static final int DELAYED_CHILD_DELETION_TIME_MINUTES = 15;
private final Map<String, ChildLbState> childLbStates = new HashMap<>();
private final Helper helper;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
private final XdsLogger logger;
// Set to true if currently in the process of handling resolved addresses.
private boolean resolvingAddresses;
ClusterManagerLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
super(helper);
logger = XdsLogger.withLogId(
InternalLogId.allocate("cluster_manager-lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
}
@Override
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
try {
resolvingAddresses = true;
return acceptResolvedAddressesInternal(resolvedAddresses);
} finally {
resolvingAddresses = false;
}
}
public boolean acceptResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
protected Map<Object, PolicySelection> getPolicySelectionMap(
ResolvedAddresses resolvedAddresses) {
ClusterManagerConfig config = (ClusterManagerConfig)
resolvedAddresses.getLoadBalancingPolicyConfig();
Map<String, PolicySelection> newChildPolicies = config.childPolicies;
Map<Object, PolicySelection> newChildPolicies = new HashMap<>(config.childPolicies);
logger.log(
XdsLogLevel.INFO,
"Received cluster_manager lb config: child names={0}", newChildPolicies.keySet());
for (Map.Entry<String, PolicySelection> entry : newChildPolicies.entrySet()) {
final String name = entry.getKey();
LoadBalancerProvider childPolicyProvider = entry.getValue().getProvider();
Object childConfig = entry.getValue().getConfig();
if (!childLbStates.containsKey(name)) {
childLbStates.put(name, new ChildLbState(name, childPolicyProvider));
} else {
childLbStates.get(name).reactivate(childPolicyProvider);
}
LoadBalancer childLb = childLbStates.get(name).lb;
ResolvedAddresses childAddresses =
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(childConfig).build();
childLb.handleResolvedAddresses(childAddresses);
}
for (String name : childLbStates.keySet()) {
if (!newChildPolicies.containsKey(name)) {
childLbStates.get(name).deactivate();
}
}
// Must update channel picker before return so that new RPCs will not be routed to deleted
// clusters and resolver can remove them in service config.
updateOverallBalancingState();
return true;
return newChildPolicies;
}
@Override
public void handleNameResolutionError(Status error) {
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
boolean gotoTransientFailure = true;
for (ChildLbState state : childLbStates.values()) {
if (!state.deactivated) {
gotoTransientFailure = false;
state.lb.handleNameResolutionError(error);
}
}
if (gotoTransientFailure) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
@Override
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
for (ChildLbState state : childLbStates.values()) {
state.shutdown();
}
childLbStates.clear();
}
private void updateOverallBalancingState() {
ConnectivityState overallState = null;
final Map<String, SubchannelPicker> childPickers = new HashMap<>();
for (ChildLbState childLbState : childLbStates.values()) {
if (childLbState.deactivated) {
continue;
}
childPickers.put(childLbState.name, childLbState.currentPicker);
overallState = aggregateState(overallState, childLbState.currentState);
}
if (overallState != null) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
String clusterName =
args.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY);
SubchannelPicker delegate = childPickers.get(clusterName);
if (delegate == null) {
return
PickResult.withError(
Status.UNAVAILABLE.withDescription("CDS encountered error: unable to find "
+ "available subchannel for cluster " + clusterName));
}
return delegate.pickSubchannel(args);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("pickers", childPickers).toString();
}
};
helper.updateBalancingState(overallState, picker);
}
}
@Nullable
private static ConnectivityState aggregateState(
@Nullable ConnectivityState overallState, ConnectivityState childState) {
if (overallState == null) {
return childState;
}
if (overallState == READY || childState == READY) {
return READY;
}
if (overallState == CONNECTING || childState == CONNECTING) {
return CONNECTING;
}
if (overallState == IDLE || childState == IDLE) {
return IDLE;
}
return overallState;
}
private final class ChildLbState {
private final String name;
private final GracefulSwitchLoadBalancer lb;
private LoadBalancerProvider policyProvider;
private ConnectivityState currentState = CONNECTING;
private SubchannelPicker currentPicker = BUFFER_PICKER;
private boolean deactivated;
@Nullable
ScheduledHandle deletionTimer;
ChildLbState(String name, LoadBalancerProvider policyProvider) {
this.name = name;
this.policyProvider = policyProvider;
lb = new GracefulSwitchLoadBalancer(new ChildLbStateHelper());
lb.switchTo(policyProvider);
}
void deactivate() {
if (deactivated) {
return;
}
class DeletionTask implements Runnable {
@Override
public void run() {
shutdown();
childLbStates.remove(name);
}
}
deletionTimer =
syncContext.schedule(
new DeletionTask(),
DELAYED_CHILD_DELETION_TIME_MINUTES,
TimeUnit.MINUTES,
timeService);
deactivated = true;
logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deactivated", name);
}
void reactivate(LoadBalancerProvider policyProvider) {
if (deletionTimer != null && deletionTimer.isPending()) {
deletionTimer.cancel();
deactivated = false;
logger.log(XdsLogLevel.DEBUG, "Child balancer {0} reactivated", name);
}
if (!this.policyProvider.getPolicyName().equals(policyProvider.getPolicyName())) {
logger.log(
XdsLogLevel.DEBUG,
"Child balancer {0} switching policy from {1} to {2}",
name, this.policyProvider.getPolicyName(), policyProvider.getPolicyName());
lb.switchTo(policyProvider);
this.policyProvider = policyProvider;
}
}
void shutdown() {
if (deletionTimer != null && deletionTimer.isPending()) {
deletionTimer.cancel();
}
lb.shutdown();
logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deleted", name);
}
private final class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
return new SubchannelPicker() {
@Override
public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) {
// If we are already in the process of resolving addresses, the overall balancing state
// will be updated at the end of it, and we don't need to trigger that update here.
if (!childLbStates.containsKey(name)) {
return;
}
// Subchannel picker and state are saved, but will only be propagated to the channel
// when the child instance exits deactivated state.
currentState = newState;
currentPicker = newPicker;
if (!deactivated && !resolvingAddresses) {
updateOverallBalancingState();
public PickResult pickSubchannel(PickSubchannelArgs args) {
String clusterName =
args.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY);
SubchannelPicker childPicker = childPickers.get(clusterName);
if (childPicker == null) {
return
PickResult.withError(
Status.UNAVAILABLE.withDescription("CDS encountered error: unable to find "
+ "available subchannel for cluster " + clusterName));
}
return childPicker.pickSubchannel(args);
}
@Override
protected Helper delegate() {
return helper;
public String toString() {
return MoreObjects.toStringHelper(this).add("pickers", childPickers).toString();
}
}
};
}
}

View File

@ -55,7 +55,6 @@ import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildCo
import io.grpc.xds.XdsClient.ResourceWatcher;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;

View File

@ -21,7 +21,6 @@ import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import io.grpc.ConnectivityState;
import io.grpc.InternalLogId;
@ -36,7 +35,6 @@ import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -149,7 +147,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
ChildLbState child =
new ChildLbState(priority, priorityConfigs.get(priority).ignoreReresolution);
children.put(priority, child);
updateOverallState(priority, CONNECTING, BUFFER_PICKER);
updateOverallState(priority, CONNECTING, LoadBalancer.EMPTY_PICKER);
// Calling the child's updateResolvedAddresses() can result in tryNextPriority() being
// called recursively. We need to be sure to be done with processing here before it is
// called.
@ -210,7 +208,7 @@ final class PriorityLoadBalancer extends LoadBalancer {
@Nullable ScheduledHandle deletionTimer;
@Nullable String policy;
ConnectivityState connectivityState = CONNECTING;
SubchannelPicker picker = BUFFER_PICKER;
SubchannelPicker picker = LoadBalancer.EMPTY_PICKER;
ChildLbState(final String priority, boolean ignoreReresolution) {
this.priority = priority;

View File

@ -39,7 +39,6 @@ import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;

View File

@ -21,7 +21,6 @@ import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import com.google.common.collect.ImmutableMap;
import io.grpc.ConnectivityState;
@ -34,7 +33,6 @@ import io.grpc.xds.WeightedRandomPicker.WeightedChildPicker;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -159,7 +157,7 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
if (overallState == TRANSIENT_FAILURE) {
picker = new WeightedRandomPicker(errorPickers);
} else {
picker = XdsSubchannelPickers.BUFFER_PICKER;
picker = LoadBalancer.EMPTY_PICKER;
}
} else {
picker = new WeightedRandomPicker(childPickers);
@ -191,7 +189,7 @@ final class WeightedTargetLoadBalancer extends LoadBalancer {
private final class ChildHelper extends ForwardingLoadBalancerHelper {
String name;
ConnectivityState currentState = CONNECTING;
SubchannelPicker currentPicker = BUFFER_PICKER;
SubchannelPicker currentPicker = LoadBalancer.EMPTY_PICKER;
private ChildHelper(String name) {
this.name = name;

View File

@ -32,7 +32,6 @@ import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

View File

@ -1,63 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Status;
final class XdsSubchannelPickers {
private XdsSubchannelPickers() { /* DO NOT CALL ME */ }
static final SubchannelPicker BUFFER_PICKER = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withNoResult();
}
@Override
public String toString() {
return "BUFFER_PICKER";
}
};
static final class ErrorPicker extends SubchannelPicker {
private final Status error;
ErrorPicker(Status error) {
this.error = checkNotNull(error, "error");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("error", error)
.toString();
}
}
}

View File

@ -54,7 +54,6 @@ import io.grpc.internal.PickSubchannelArgsImpl;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -126,10 +125,14 @@ public class ClusterManagerLoadBalancerTest {
assertThat(pickSubchannel(picker, "childA")).isEqualTo(PickResult.withNoResult());
assertThat(pickSubchannel(picker, "childB")).isEqualTo(PickResult.withNoResult());
assertThat(childBalancers).hasSize(2);
FakeLoadBalancer childBalancer1 = childBalancers.get(0);
FakeLoadBalancer childBalancer2 = childBalancers.get(1);
assertThat(childBalancer1.name).isEqualTo("policy_a");
assertThat(childBalancer2.name).isEqualTo("policy_b");
assertThat(childBalancers.stream()
.filter(b -> b.name.equals("policy_a"))
.count()).isEqualTo(1);
assertThat(childBalancers.stream()
.filter(b -> b.name.equals("policy_b"))
.count()).isEqualTo(1);
FakeLoadBalancer childBalancer1 = getChildBalancerByName("policy_a");
FakeLoadBalancer childBalancer2 = getChildBalancerByName("policy_b");
assertThat(childBalancer1.config).isEqualTo(lbConfigInventory.get("childA"));
assertThat(childBalancer2.config).isEqualTo(lbConfigInventory.get("childB"));
@ -151,8 +154,7 @@ public class ClusterManagerLoadBalancerTest {
assertThat(childBalancer2.shutdown).isFalse();
assertThat(childBalancers).hasSize(3);
FakeLoadBalancer childBalancer3 = childBalancers.get(2);
assertThat(childBalancer3.name).isEqualTo("policy_c");
FakeLoadBalancer childBalancer3 = getChildBalancerByName("policy_c");
assertThat(childBalancer3.config).isEqualTo(lbConfigInventory.get("childC"));
// delayed policy_b deletion
@ -166,8 +168,8 @@ public class ClusterManagerLoadBalancerTest {
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
assertThat(childBalancers).hasSize(2);
FakeLoadBalancer childBalancer1 = childBalancers.get(0);
FakeLoadBalancer childBalancer2 = childBalancers.get(1);
FakeLoadBalancer childBalancer1 = getChildBalancerByName("policy_a");
FakeLoadBalancer childBalancer2 = getChildBalancerByName("policy_b");
Subchannel subchannel1 = mock(Subchannel.class);
Subchannel subchannel2 = mock(Subchannel.class);
childBalancer1.deliverSubchannelState(subchannel1, ConnectivityState.READY);
@ -184,11 +186,20 @@ public class ClusterManagerLoadBalancerTest {
.isEqualTo(subchannel2);
}
private FakeLoadBalancer getChildBalancerByName(String name) {
for (FakeLoadBalancer childLb : childBalancers) {
if (childLb.name.equals(name)) {
return childLb;
}
}
return null;
}
@Test
public void ignoreBalancingStateUpdateForDeactivatedChildLbs() {
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
deliverResolvedAddresses(ImmutableMap.of("childB", "policy_b"));
FakeLoadBalancer childBalancer1 = childBalancers.get(0); // policy_a (deactivated)
FakeLoadBalancer childBalancer1 = getChildBalancerByName("policy_a"); // policy_a (deactivated)
Subchannel subchannel = mock(Subchannel.class);
childBalancer1.deliverSubchannelState(subchannel, ConnectivityState.READY);
verify(helper, never()).updateBalancingState(
@ -231,8 +242,8 @@ public class ClusterManagerLoadBalancerTest {
public void handleNameResolutionError_afterChildLbsInstantiated_propagateToChildLbs() {
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
assertThat(childBalancers).hasSize(2);
FakeLoadBalancer childBalancer1 = childBalancers.get(0);
FakeLoadBalancer childBalancer2 = childBalancers.get(1);
FakeLoadBalancer childBalancer1 = getChildBalancerByName("policy_a");
FakeLoadBalancer childBalancer2 = getChildBalancerByName("policy_b");
clusterManagerLoadBalancer.handleNameResolutionError(
Status.UNAVAILABLE.withDescription("resolver error"));
assertThat(childBalancer1.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE);
@ -245,8 +256,8 @@ public class ClusterManagerLoadBalancerTest {
public void handleNameResolutionError_notPropagateToDeactivatedChildLbs() {
deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
deliverResolvedAddresses(ImmutableMap.of("childB", "policy_b"));
FakeLoadBalancer childBalancer1 = childBalancers.get(0); // policy_a (deactivated)
FakeLoadBalancer childBalancer2 = childBalancers.get(1); // policy_b
FakeLoadBalancer childBalancer1 = getChildBalancerByName("policy_a"); // policy_a (deactivated)
FakeLoadBalancer childBalancer2 = getChildBalancerByName("policy_b"); // policy_b
clusterManagerLoadBalancer.handleNameResolutionError(
Status.UNKNOWN.withDescription("unknown error"));
assertThat(childBalancer1.upstreamError).isNull();

View File

@ -21,7 +21,7 @@ import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import static io.grpc.LoadBalancer.EMPTY_PICKER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
@ -41,6 +41,7 @@ import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.ErrorPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
@ -55,7 +56,6 @@ import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.internal.TestUtils.StandardLoadBalancerProvider;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
@ -423,13 +423,13 @@ public class PriorityLoadBalancerTest {
// p0 gets IDLE.
helper0.updateBalancingState(
IDLE,
BUFFER_PICKER);
EMPTY_PICKER);
assertCurrentPickerIsBufferPicker();
// p0 goes to CONNECTING
helper0.updateBalancingState(
IDLE,
BUFFER_PICKER);
EMPTY_PICKER);
assertCurrentPickerIsBufferPicker();
// no failover happened
@ -459,15 +459,15 @@ public class PriorityLoadBalancerTest {
// p0 gets IDLE.
helper0.updateBalancingState(
IDLE,
BUFFER_PICKER);
EMPTY_PICKER);
assertCurrentPickerIsBufferPicker();
// p0 goes to CONNECTING, reset failover timer
fakeClock.forwardTime(5, TimeUnit.SECONDS);
helper0.updateBalancingState(
CONNECTING,
BUFFER_PICKER);
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));
EMPTY_PICKER);
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER));
// failover happens
fakeClock.forwardTime(10, TimeUnit.SECONDS);
@ -509,7 +509,7 @@ public class PriorityLoadBalancerTest {
// p0 goes to CONNECTING
helper0.updateBalancingState(
IDLE,
BUFFER_PICKER);
EMPTY_PICKER);
assertCurrentPickerIsBufferPicker();
// no failover happened
@ -560,7 +560,7 @@ public class PriorityLoadBalancerTest {
// p0 gets IDLE.
helper0.updateBalancingState(
IDLE,
BUFFER_PICKER);
EMPTY_PICKER);
assertCurrentPickerIsBufferPicker();
// p0 fails over to p1 immediately.
@ -581,13 +581,13 @@ public class PriorityLoadBalancerTest {
// p2 gets IDLE
helper2.updateBalancingState(
IDLE,
BUFFER_PICKER);
EMPTY_PICKER);
assertCurrentPickerIsBufferPicker();
// p0 gets back to IDLE
helper0.updateBalancingState(
IDLE,
BUFFER_PICKER);
EMPTY_PICKER);
assertCurrentPickerIsBufferPicker();
// p2 fails but does not affect overall picker
@ -614,13 +614,13 @@ public class PriorityLoadBalancerTest {
// p2 gets back to IDLE
helper2.updateBalancingState(
IDLE,
BUFFER_PICKER);
EMPTY_PICKER);
assertCurrentPickerIsBufferPicker();
// p0 gets back to IDLE
helper0.updateBalancingState(
IDLE,
BUFFER_PICKER);
EMPTY_PICKER);
assertCurrentPickerIsBufferPicker();
// p0 fails over to p2 and picker is updated to p2's existing picker.

View File

@ -20,7 +20,7 @@ import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import static io.grpc.LoadBalancer.EMPTY_PICKER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
@ -39,6 +39,7 @@ import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.ErrorPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
@ -52,7 +53,6 @@ import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.WeightedRandomPicker.WeightedChildPicker;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
@ -209,7 +209,7 @@ public class WeightedTargetLoadBalancerTest {
.setAttributes(Attributes.newBuilder().set(fakeKey, fakeValue).build())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
.build());
verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));
verify(helper).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER));
assertThat(childBalancers).hasSize(4);
assertThat(childHelpers).hasSize(4);
assertThat(fooLbCreated).isEqualTo(2);
@ -246,7 +246,7 @@ public class WeightedTargetLoadBalancerTest {
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(newTargets))
.build());
verify(helper, atLeast(2)).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));
verify(helper, atLeast(2)).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER));
assertThat(childBalancers).hasSize(5);
assertThat(childHelpers).hasSize(5);
assertThat(fooLbCreated).isEqualTo(3); // One more foo LB created for target4
@ -288,7 +288,7 @@ public class WeightedTargetLoadBalancerTest {
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
.build());
verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));
verify(helper).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER));
// Error after child balancers created.
weightedTargetLb.handleNameResolutionError(Status.ABORTED);
@ -315,7 +315,7 @@ public class WeightedTargetLoadBalancerTest {
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
.build());
verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));
verify(helper).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER));
// Subchannels to be created for each child balancer.
final SubchannelPicker[] subchannelPickers = new SubchannelPicker[]{
@ -335,7 +335,7 @@ public class WeightedTargetLoadBalancerTest {
childHelpers.get(1).updateBalancingState(TRANSIENT_FAILURE, failurePickers[1]);
verify(helper, never()).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));
verify(helper, times(2)).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER));
// Another child balancer goes to READY.
childHelpers.get(2).updateBalancingState(READY, subchannelPickers[2]);
@ -396,7 +396,7 @@ public class WeightedTargetLoadBalancerTest {
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
.build());
verify(helper).updateBalancingState(eq(CONNECTING), eq(BUFFER_PICKER));
verify(helper).updateBalancingState(eq(CONNECTING), eq(EMPTY_PICKER));
// LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first,
// any further balancing state update should be ignored.

View File

@ -30,6 +30,7 @@ import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.ErrorPicker;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
@ -41,7 +42,6 @@ import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;