util:MultiChildLoadBalancer cleanup (#10780)

* add final, change method permissions, add javadoc, cleanup unneeded, move updateOverallBalancingState to ClusterManagerLB and make it abstract

* Restructure to eliminate the flags as protected methods

* Move methods around so that the candidates for override are near the top.

* Reorder picker methods lower
This commit is contained in:
Larry Safran 2024-02-15 14:12:40 -08:00 committed by GitHub
parent f20c853c40
commit 044749706a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 319 additions and 287 deletions

View File

@ -44,6 +44,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -71,51 +72,12 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
logger.log(Level.FINE, "Created");
}
protected abstract SubchannelPicker getSubchannelPicker(
Map<Object, SubchannelPicker> childPickers);
protected SubchannelPicker getInitialPicker() {
return new FixedResultPicker(PickResult.withNoResult());
}
protected SubchannelPicker getErrorPicker(Status error) {
return new FixedResultPicker(PickResult.withError(error));
}
/**
* Generally, the only reason to override this is to expose it to a test of a LB in a different
* package.
* Using the state of all children will calculate the current connectivity state,
* update fields, generate a picker and then call
* {@link Helper#updateBalancingState(ConnectivityState, SubchannelPicker)}.
*/
protected ImmutableMap<Object, ChildLbState> getImmutableChildMap() {
return ImmutableMap.copyOf(childLbStates);
}
@VisibleForTesting
protected Collection<ChildLbState> getChildLbStates() {
return childLbStates.values();
}
/**
* Generally, the only reason to override this is to expose it to a test of a LB in a
* different package.
*/
protected ChildLbState getChildLbState(Object key) {
if (key == null) {
return null;
}
if (key instanceof EquivalentAddressGroup) {
key = new Endpoint((EquivalentAddressGroup) key);
}
return childLbStates.get(key);
}
/**
* Generally, the only reason to override this is to expose it to a test of a LB in a different
* package.
*/
protected ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
return getChildLbState(new Endpoint(eag));
}
protected abstract void updateOverallBalancingState();
/**
* Override to utilize parsing of the policy configuration or alternative helper/lb generation.
@ -153,8 +115,7 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
resolvingAddresses = true;
// process resolvedAddresses to update children
AcceptResolvedAddressRetVal acceptRetVal =
acceptResolvedAddressesInternal(resolvedAddresses);
AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
if (!acceptRetVal.status.isOk()) {
return acceptRetVal.status;
}
@ -206,66 +167,10 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
}
/**
* This does the work to update the child map and calculate which children have been removed.
* You must call {@link #updateOverallBalancingState} to update the picker
* and call {@link #shutdownRemoved(List)} to shutdown the endpoints that have been removed.
*/
protected AcceptResolvedAddressRetVal acceptResolvedAddressesInternal(
ResolvedAddresses resolvedAddresses) {
logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);
Map<Object, ChildLbState> newChildren = createChildLbMap(resolvedAddresses);
if (newChildren.isEmpty()) {
Status unavailableStatus = Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. " + resolvedAddresses);
handleNameResolutionError(unavailableStatus);
return new AcceptResolvedAddressRetVal(unavailableStatus, null);
}
// Do adds and updates
for (Map.Entry<Object, ChildLbState> entry : newChildren.entrySet()) {
final Object key = entry.getKey();
LoadBalancerProvider childPolicyProvider = entry.getValue().getPolicyProvider();
Object childConfig = entry.getValue().getConfig();
if (!childLbStates.containsKey(key)) {
childLbStates.put(key, entry.getValue());
} else {
// Reuse the existing one
ChildLbState existingChildLbState = childLbStates.get(key);
if (existingChildLbState.isDeactivated() && reactivateChildOnReuse()) {
existingChildLbState.reactivate(childPolicyProvider);
}
}
ChildLbState childLbState = childLbStates.get(key);
ResolvedAddresses childAddresses = getChildAddresses(key, resolvedAddresses, childConfig);
childLbStates.get(key).setResolvedAddresses(childAddresses); // update child
if (!childLbState.deactivated) {
childLbState.lb.handleResolvedAddresses(childAddresses); // update child LB
}
}
List<ChildLbState> removedChildren = new ArrayList<>();
// Do removals
for (Object key : ImmutableList.copyOf(childLbStates.keySet())) {
if (!newChildren.containsKey(key)) {
ChildLbState childLbState = childLbStates.get(key);
childLbState.deactivate();
removedChildren.add(childLbState);
}
}
return new AcceptResolvedAddressRetVal(Status.OK, removedChildren);
}
protected void shutdownRemoved(List<ChildLbState> removedChildren) {
// Do shutdowns after updating picker to reduce the chance of failing an RPC by picking a
// subchannel that has been shutdown.
for (ChildLbState childLbState : removedChildren) {
childLbState.shutdown();
}
}
* Handle the name resolution error.
*
* <p/>Override if you need special handling.
*/
@Override
public void handleNameResolutionError(Status error) {
if (currentConnectivityState != READY) {
@ -273,26 +178,31 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
}
}
/**
* Handle the name resolution error only for the specified child.
*
* <p/>Override if you need special handling.
*/
protected void handleNameResolutionError(ChildLbState child, Status error) {
child.lb.handleNameResolutionError(error);
}
/**
* If true, then when a subchannel state changes to idle, the corresponding child will
* have requestConnection called on its LB. Also causes the PickFirstLB to be created when
* the child is created or reused.
* Creates a picker representing the state before any connections have been established.
*
* <p/>Override to produce a custom picker.
*/
protected boolean reconnectOnIdle() {
return true;
protected SubchannelPicker getInitialPicker() {
return new FixedResultPicker(PickResult.withNoResult());
}
/**
* If true, then when {@link #acceptResolvedAddresses} sees a key that was already part of the
* child map which is deactivated, it will call reactivate on the child.
* If false, it will leave it deactivated.
* Creates a new picker representing an error status.
*
* <p/>Override to produce a custom picker when there are errors.
*/
protected boolean reactivateChildOnReuse() {
return true;
protected SubchannelPicker getErrorPicker(Status error) {
return new FixedResultPicker(PickResult.withError(error));
}
@Override
@ -304,20 +214,93 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
childLbStates.clear();
}
protected void updateOverallBalancingState() {
ConnectivityState overallState = null;
final Map<Object, SubchannelPicker> childPickers = new HashMap<>();
for (ChildLbState childLbState : getChildLbStates()) {
if (childLbState.deactivated) {
continue;
}
childPickers.put(childLbState.key, childLbState.currentPicker);
overallState = aggregateState(overallState, childLbState.currentState);
/**
* This does the work to update the child map and calculate which children have been removed.
* You must call {@link #updateOverallBalancingState} to update the picker
* and call {@link #shutdownRemoved(List)} to shutdown the endpoints that have been removed.
*/
protected final AcceptResolvedAddrRetVal acceptResolvedAddressesInternal(
ResolvedAddresses resolvedAddresses) {
logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);
// Subclass handles any special manipulation to create appropriate types of keyed ChildLbStates
Map<Object, ChildLbState> newChildren = createChildLbMap(resolvedAddresses);
// Handle error case
if (newChildren.isEmpty()) {
Status unavailableStatus = Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. " + resolvedAddresses);
handleNameResolutionError(unavailableStatus);
return new AcceptResolvedAddrRetVal(unavailableStatus, null);
}
if (overallState != null) {
helper.updateBalancingState(overallState, getSubchannelPicker(childPickers));
currentConnectivityState = overallState;
Collection<ChildLbState> reusedChildren = addMissingChildrenAndIdReuse(newChildren);
// Raactivate deactivated children
for (ChildLbState reusedChild : reusedChildren) {
reusedChild.reactivate(reusedChild.getPolicyProvider());
}
updateChildrenWithResolvedAddresses(resolvedAddresses, newChildren);
return new AcceptResolvedAddrRetVal(Status.OK, getRemovedChildren(newChildren.keySet()));
}
protected final Collection<ChildLbState> addMissingChildrenAndIdReuse(
Map<Object, ChildLbState> newChildren) {
Collection<ChildLbState> reusedChildren = new ArrayList<>();
// Do adds and identify reused children
for (Map.Entry<Object, ChildLbState> entry : newChildren.entrySet()) {
final Object key = entry.getKey();
if (!childLbStates.containsKey(key)) {
childLbStates.put(key, entry.getValue());
} else {
// Reuse the existing one
ChildLbState existingChildLbState = childLbStates.get(key);
if (existingChildLbState.isDeactivated()) {
reusedChildren.add(existingChildLbState);
}
}
}
return reusedChildren;
}
protected final void updateChildrenWithResolvedAddresses(ResolvedAddresses resolvedAddresses,
Map<Object, ChildLbState> newChildren) {
for (Map.Entry<Object, ChildLbState> entry : newChildren.entrySet()) {
Object childConfig = entry.getValue().getConfig();
ChildLbState childLbState = childLbStates.get(entry.getKey());
ResolvedAddresses childAddresses =
getChildAddresses(entry.getKey(), resolvedAddresses, childConfig);
childLbState.setResolvedAddresses(childAddresses); // update child
if (!childLbState.deactivated) {
childLbState.lb.handleResolvedAddresses(childAddresses); // update child LB
}
}
}
/**
* Identifies which children have been removed (are not part of the newChildKeys).
*/
protected final List<ChildLbState> getRemovedChildren(Set<Object> newChildKeys) {
List<ChildLbState> removedChildren = new ArrayList<>();
// Do removals
for (Object key : ImmutableList.copyOf(childLbStates.keySet())) {
if (!newChildKeys.contains(key)) {
ChildLbState childLbState = childLbStates.get(key);
childLbState.deactivate();
removedChildren.add(childLbState);
}
}
return removedChildren;
}
protected final void shutdownRemoved(List<ChildLbState> removedChildren) {
// Do shutdowns after updating picker to reduce the chance of failing an RPC by picking a
// subchannel that has been shutdown.
for (ChildLbState childLbState : removedChildren) {
childLbState.shutdown();
}
}
@ -339,18 +322,44 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
return overallState;
}
protected Helper getHelper() {
protected final Helper getHelper() {
return helper;
}
protected void removeChild(Object key) {
protected final void removeChild(Object key) {
childLbStates.remove(key);
}
@VisibleForTesting
public final ImmutableMap<Object, ChildLbState> getImmutableChildMap() {
return ImmutableMap.copyOf(childLbStates);
}
@VisibleForTesting
public final Collection<ChildLbState> getChildLbStates() {
return childLbStates.values();
}
@VisibleForTesting
public final ChildLbState getChildLbState(Object key) {
if (key == null) {
return null;
}
if (key instanceof EquivalentAddressGroup) {
key = new Endpoint((EquivalentAddressGroup) key);
}
return childLbStates.get(key);
}
@VisibleForTesting
public final ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
return getChildLbState(new Endpoint(eag));
}
/**
* Filters out non-ready and deactivated child load balancers (subchannels).
*/
protected List<ChildLbState> getReadyChildren() {
protected final List<ChildLbState> getReadyChildren() {
List<ChildLbState> activeChildren = new ArrayList<>();
for (ChildLbState child : getChildLbStates()) {
if (!child.isDeactivated() && child.getCurrentState() == READY) {
@ -396,7 +405,7 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
this.deactivated = deactivated;
this.currentPicker = initialPicker;
this.config = childConfig;
this.lb = new GracefulSwitchLoadBalancer(new ChildLbStateHelper());
this.lb = new GracefulSwitchLoadBalancer(createChildHelper());
this.currentState = deactivated ? IDLE : CONNECTING;
this.resolvedAddresses = resolvedAddrs;
if (!deactivated) {
@ -404,68 +413,8 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
}
}
@Override
public String toString() {
return "Address = " + key
+ ", state = " + currentState
+ ", picker type: " + currentPicker.getClass()
+ ", lb: " + lb.delegate().getClass()
+ (deactivated ? ", deactivated" : "");
}
public Object getKey() {
return key;
}
Object getConfig() {
return config;
}
protected GracefulSwitchLoadBalancer getLb() {
return lb;
}
public LoadBalancerProvider getPolicyProvider() {
return policyProvider;
}
protected Subchannel getSubchannels(PickSubchannelArgs args) {
if (getCurrentPicker() == null) {
return null;
}
return getCurrentPicker().pickSubchannel(args).getSubchannel();
}
public ConnectivityState getCurrentState() {
return currentState;
}
public SubchannelPicker getCurrentPicker() {
return currentPicker;
}
public EquivalentAddressGroup getEag() {
if (resolvedAddresses == null || resolvedAddresses.getAddresses().isEmpty()) {
return null;
}
return resolvedAddresses.getAddresses().get(0);
}
public boolean isDeactivated() {
return deactivated;
}
protected void setDeactivated() {
deactivated = true;
}
protected void markReactivated() {
deactivated = false;
}
protected void setResolvedAddresses(ResolvedAddresses newAddresses) {
checkNotNull(newAddresses, "Missing address list for child");
resolvedAddresses = newAddresses;
protected ChildLbStateHelper createChildHelper() {
return new ChildLbStateHelper();
}
/**
@ -497,12 +446,94 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
deactivated = false;
}
/**
* Override for unique behavior such as delayed shutdowns of subchannels.
*/
protected void shutdown() {
lb.shutdown();
this.currentState = SHUTDOWN;
logger.log(Level.FINE, "Child balancer {0} deleted", key);
}
@Override
public String toString() {
return "Address = " + key
+ ", state = " + currentState
+ ", picker type: " + currentPicker.getClass()
+ ", lb: " + lb.delegate().getClass()
+ (deactivated ? ", deactivated" : "");
}
public final Object getKey() {
return key;
}
@VisibleForTesting
public final GracefulSwitchLoadBalancer getLb() {
return lb;
}
@VisibleForTesting
public final SubchannelPicker getCurrentPicker() {
return currentPicker;
}
protected final LoadBalancerProvider getPolicyProvider() {
return policyProvider;
}
protected final Subchannel getSubchannels(PickSubchannelArgs args) {
if (getCurrentPicker() == null) {
return null;
}
return getCurrentPicker().pickSubchannel(args).getSubchannel();
}
public final ConnectivityState getCurrentState() {
return currentState;
}
protected final void setCurrentState(ConnectivityState newState) {
currentState = newState;
}
protected final void setCurrentPicker(SubchannelPicker newPicker) {
currentPicker = newPicker;
}
public final EquivalentAddressGroup getEag() {
if (resolvedAddresses == null || resolvedAddresses.getAddresses().isEmpty()) {
return null;
}
return resolvedAddresses.getAddresses().get(0);
}
public final boolean isDeactivated() {
return deactivated;
}
protected final void setDeactivated() {
deactivated = true;
}
protected final void markReactivated() {
deactivated = false;
}
protected final void setResolvedAddresses(ResolvedAddresses newAddresses) {
checkNotNull(newAddresses, "Missing address list for child");
resolvedAddresses = newAddresses;
}
private Object getConfig() {
return config;
}
@VisibleForTesting
public final ResolvedAddresses getResolvedAddresses() {
return resolvedAddresses;
}
/**
* ChildLbStateHelper is the glue between ChildLbState and the helpers associated with the
* petiole policy above and the PickFirstLoadBalancer's helper below.
@ -510,13 +541,14 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
* <p>The ChildLbState updates happen during updateBalancingState. Otherwise, it is doing
* simple forwarding.
*/
@VisibleForTesting
public ResolvedAddresses getResolvedAddresses() {
return resolvedAddresses;
}
private final class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
protected class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
/**
* Update current state and picker for this child and then use
* {@link #updateOverallBalancingState()} for the parent LB.
*
* <p/>Override this if you don't want to automatically request a connection when in IDLE
*/
@Override
public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) {
@ -525,12 +557,13 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
if (!childLbStates.containsKey(key)) {
return;
}
// Subchannel picker and state are saved, but will only be propagated to the channel
// when the child instance exits deactivated state.
currentState = newState;
currentPicker = newPicker;
if (!deactivated && !resolvingAddresses) {
if (newState == IDLE && reconnectOnIdle()) {
if (newState == IDLE) {
lb.requestConnection();
}
updateOverallBalancingState();
@ -598,11 +631,11 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
}
}
protected static class AcceptResolvedAddressRetVal {
protected static class AcceptResolvedAddrRetVal {
public final Status status;
public final List<ChildLbState> removedChildren;
public AcceptResolvedAddressRetVal(Status status, List<ChildLbState> removedChildren) {
public AcceptResolvedAddrRetVal(Status status, List<ChildLbState> removedChildren) {
this.status = status;
this.removedChildren = removedChildren;
}

View File

@ -34,7 +34,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@ -51,11 +50,6 @@ public class RoundRobinLoadBalancer extends MultiChildLoadBalancer {
super(helper);
}
@Override
protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
throw new UnsupportedOperationException(); // local updateOverallBalancingState doesn't use this
}
/**
* Updates picker with the list of active subchannels (state == READY).
*/

View File

@ -21,6 +21,7 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.grpc.ConnectivityState;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancerProvider;
import io.grpc.Status;
@ -103,7 +104,7 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
resolvingAddresses = true;
// process resolvedAddresses to update children
AcceptResolvedAddressRetVal acceptRetVal =
AcceptResolvedAddrRetVal acceptRetVal =
acceptResolvedAddressesInternal(resolvedAddresses);
if (!acceptRetVal.status.isOk()) {
return acceptRetVal.status;
@ -118,7 +119,29 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
}
}
/**
* Using the state of all children will calculate the current connectivity state,
* update currentConnectivityState, generate a picker and then call
* {@link Helper#updateBalancingState(ConnectivityState, SubchannelPicker)}.
*/
@Override
protected void updateOverallBalancingState() {
ConnectivityState overallState = null;
final Map<Object, SubchannelPicker> childPickers = new HashMap<>();
for (ChildLbState childLbState : getChildLbStates()) {
if (childLbState.isDeactivated()) {
continue;
}
childPickers.put(childLbState.getKey(), childLbState.getCurrentPicker());
overallState = aggregateState(overallState, childLbState.getCurrentState());
}
if (overallState != null) {
getHelper().updateBalancingState(overallState, getSubchannelPicker(childPickers));
currentConnectivityState = overallState;
}
}
protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
return new SubchannelPicker() {
@Override
@ -157,11 +180,6 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
}
}
@Override
protected boolean reconnectOnIdle() {
return false;
}
/**
* This differs from the base class in the use of the deletion timer. When it is deactivated,
* rather than immediately calling shutdown it starts a timer. If shutdown or reactivate
@ -177,6 +195,11 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
super(key, policyProvider, childConfig, initialPicker);
}
@Override
protected ChildLbStateHelper createChildHelper() {
return new ClusterManagerChildHelper();
}
@Override
protected void shutdown() {
if (deletionTimer != null && deletionTimer.isPending()) {
@ -220,5 +243,24 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deactivated", getKey());
}
private class ClusterManagerChildHelper extends ChildLbStateHelper {
@Override
public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) {
// If we are already in the process of resolving addresses, the overall balancing state
// will be updated at the end of it, and we don't need to trigger that update here.
if (getChildLbState(getKey()) == null) {
return;
}
// Subchannel picker and state are saved, but will only be propagated to the channel
// when the child instance exits deactivated state.
setCurrentState(newState);
setCurrentPicker(newPicker);
if (!isDeactivated() && !resolvingAddresses) {
updateOverallBalancingState();
}
}
}
}
}

View File

@ -39,10 +39,8 @@ import io.grpc.Status;
import io.grpc.util.MultiChildLoadBalancer;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -69,12 +67,6 @@ final class LeastRequestLoadBalancer extends MultiChildLoadBalancer {
this.random = checkNotNull(random, "random");
}
@Override
protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
throw new UnsupportedOperationException(
"LeastRequestLoadBalancer uses its ChildLbStates, not these child pickers directly");
}
@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
// Need to update choiceCount before calling super so that the updateBalancingState call has the
@ -154,18 +146,6 @@ final class LeastRequestLoadBalancer extends MultiChildLoadBalancer {
super.resolvingAddresses = newValue;
}
// Expose for tests in this package.
@Override
protected Collection<ChildLbState> getChildLbStates() {
return super.getChildLbStates();
}
// Expose for tests in this package.
@Override
protected ChildLbState getChildLbState(Object key) {
return super.getChildLbState(key);
}
// Expose for tests in this package.
private static AtomicInteger getInFlights(ChildLbState childLbState) {
return ((LeastRequestLbState)childLbState).activeRequests;

View File

@ -38,12 +38,10 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.util.MultiChildLoadBalancer;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -87,19 +85,22 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
return addressValidityStatus;
}
AcceptResolvedAddressRetVal acceptRetVal;
try {
resolvingAddresses = true;
// Update the child list by creating-adding, updating addresses, and removing
acceptRetVal = super.acceptResolvedAddressesInternal(resolvedAddresses);
if (!acceptRetVal.status.isOk()) {
// Subclass handles any special manipulation to create appropriate types of ChildLbStates
Map<Object, ChildLbState> newChildren = createChildLbMap(resolvedAddresses);
if (newChildren.isEmpty()) {
addressValidityStatus = Status.UNAVAILABLE.withDescription(
"Ring hash lb error: EDS resolution was successful, but was not accepted by base class"
+ " (" + acceptRetVal.status + ")");
"Ring hash lb error: EDS resolution was successful, but there were no valid addresses");
handleNameResolutionError(addressValidityStatus);
return addressValidityStatus;
}
// We don't care about reuse because we don't want to activate them
addMissingChildrenAndIdReuse(newChildren);
updateChildrenWithResolvedAddresses(resolvedAddresses, newChildren);
// Now do the ringhash specific logic with weights and building the ring
RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (config == null) {
@ -143,7 +144,7 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
// clusters and resolver can remove them in service config.
updateOverallBalancingState();
shutdownRemoved(acceptRetVal.removedChildren);
shutdownRemoved(getRemovedChildren(newChildren.keySet()));
} finally {
this.resolvingAddresses = false;
}
@ -151,6 +152,7 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
return Status.OK;
}
/**
* Updates the overall balancing state by aggregating the connectivity states of all subchannels.
*
@ -223,16 +225,6 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
this.currentConnectivityState = overallState;
}
@Override
protected boolean reconnectOnIdle() {
return false;
}
@Override
protected boolean reactivateChildOnReuse() {
return false;
}
@Override
protected ChildLbState createChildLbState(Object key, Object policyConfig,
SubchannelPicker initialPicker, ResolvedAddresses resolvedAddresses) {
@ -446,11 +438,6 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
}
@Override
protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
throw new UnsupportedOperationException("Not used by RingHash");
}
/**
* An unmodifiable view of a subchannel with state not subject to its real connectivity
* state changes.
@ -505,29 +492,17 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
}
}
static Set<EquivalentAddressGroup> getStrippedChildEags(Collection<ChildLbState> states) {
return states.stream()
.map(ChildLbState::getEag)
.map(RingHashLoadBalancer::stripAttrs)
.collect(Collectors.toSet());
}
@Override
protected Collection<ChildLbState> getChildLbStates() {
return super.getChildLbStates();
}
@Override
protected ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
return super.getChildLbStateEag(eag);
}
class RingHashChildLbState extends MultiChildLoadBalancer.ChildLbState {
public RingHashChildLbState(Endpoint key, ResolvedAddresses resolvedAddresses) {
super(key, pickFirstLbProvider, null, EMPTY_PICKER, resolvedAddresses, true);
}
@Override
protected ChildLbStateHelper createChildHelper() {
return new RingHashChildHelper();
}
@Override
protected void reactivate(LoadBalancerProvider policyProvider) {
if (!isDeactivated()) {
@ -550,11 +525,25 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
super.shutdown();
}
// Need to expose this to the LB class
@Override
protected GracefulSwitchLoadBalancer getLb() {
return super.getLb();
}
private class RingHashChildHelper extends ChildLbStateHelper {
@Override
public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) {
// If we are already in the process of resolving addresses, the overall balancing state
// will be updated at the end of it, and we don't need to trigger that update here.
if (getChildLbState(getKey()) == null) {
return;
}
// Subchannel picker and state are saved, but will only be propagated to the channel
// when the child instance exits deactivated state.
setCurrentState(newState);
setCurrentPicker(newPicker);
if (!isDeactivated() && !resolvingAddresses) {
updateOverallBalancingState();
}
}
}
}
}

View File

@ -115,7 +115,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
}
config =
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
AcceptResolvedAddressRetVal acceptRetVal;
AcceptResolvedAddrRetVal acceptRetVal;
try {
resolvingAddresses = true;
acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
@ -148,12 +148,6 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence);
}
// Expose for tests in this package.
@Override
protected ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
return super.getChildLbStateEag(eag);
}
@VisibleForTesting
final class WeightedChildLbState extends ChildLbState {