mirror of https://github.com/grpc/grpc-java.git
util:MultiChildLoadBalancer cleanup (#10780)
* add final, change method permissions, add javadoc, cleanup unneeded, move updateOverallBalancingState to ClusterManagerLB and make it abstract * Restructure to eliminate the flags as protected methods * Move methods around so that the candidates for override are near the top. * Reorder picker methods lower
This commit is contained in:
parent
f20c853c40
commit
044749706a
|
|
@ -44,6 +44,7 @@ import java.util.HashMap;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import javax.annotation.Nullable;
|
||||
|
|
@ -71,51 +72,12 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
logger.log(Level.FINE, "Created");
|
||||
}
|
||||
|
||||
protected abstract SubchannelPicker getSubchannelPicker(
|
||||
Map<Object, SubchannelPicker> childPickers);
|
||||
|
||||
protected SubchannelPicker getInitialPicker() {
|
||||
return new FixedResultPicker(PickResult.withNoResult());
|
||||
}
|
||||
|
||||
protected SubchannelPicker getErrorPicker(Status error) {
|
||||
return new FixedResultPicker(PickResult.withError(error));
|
||||
}
|
||||
|
||||
/**
|
||||
* Generally, the only reason to override this is to expose it to a test of a LB in a different
|
||||
* package.
|
||||
* Using the state of all children will calculate the current connectivity state,
|
||||
* update fields, generate a picker and then call
|
||||
* {@link Helper#updateBalancingState(ConnectivityState, SubchannelPicker)}.
|
||||
*/
|
||||
protected ImmutableMap<Object, ChildLbState> getImmutableChildMap() {
|
||||
return ImmutableMap.copyOf(childLbStates);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected Collection<ChildLbState> getChildLbStates() {
|
||||
return childLbStates.values();
|
||||
}
|
||||
|
||||
/**
|
||||
* Generally, the only reason to override this is to expose it to a test of a LB in a
|
||||
* different package.
|
||||
*/
|
||||
protected ChildLbState getChildLbState(Object key) {
|
||||
if (key == null) {
|
||||
return null;
|
||||
}
|
||||
if (key instanceof EquivalentAddressGroup) {
|
||||
key = new Endpoint((EquivalentAddressGroup) key);
|
||||
}
|
||||
return childLbStates.get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generally, the only reason to override this is to expose it to a test of a LB in a different
|
||||
* package.
|
||||
*/
|
||||
protected ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
|
||||
return getChildLbState(new Endpoint(eag));
|
||||
}
|
||||
protected abstract void updateOverallBalancingState();
|
||||
|
||||
/**
|
||||
* Override to utilize parsing of the policy configuration or alternative helper/lb generation.
|
||||
|
|
@ -153,8 +115,7 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
resolvingAddresses = true;
|
||||
|
||||
// process resolvedAddresses to update children
|
||||
AcceptResolvedAddressRetVal acceptRetVal =
|
||||
acceptResolvedAddressesInternal(resolvedAddresses);
|
||||
AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
|
||||
if (!acceptRetVal.status.isOk()) {
|
||||
return acceptRetVal.status;
|
||||
}
|
||||
|
|
@ -206,66 +167,10 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
}
|
||||
|
||||
/**
|
||||
* This does the work to update the child map and calculate which children have been removed.
|
||||
* You must call {@link #updateOverallBalancingState} to update the picker
|
||||
* and call {@link #shutdownRemoved(List)} to shutdown the endpoints that have been removed.
|
||||
*/
|
||||
protected AcceptResolvedAddressRetVal acceptResolvedAddressesInternal(
|
||||
ResolvedAddresses resolvedAddresses) {
|
||||
logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);
|
||||
Map<Object, ChildLbState> newChildren = createChildLbMap(resolvedAddresses);
|
||||
|
||||
if (newChildren.isEmpty()) {
|
||||
Status unavailableStatus = Status.UNAVAILABLE.withDescription(
|
||||
"NameResolver returned no usable address. " + resolvedAddresses);
|
||||
handleNameResolutionError(unavailableStatus);
|
||||
return new AcceptResolvedAddressRetVal(unavailableStatus, null);
|
||||
}
|
||||
|
||||
// Do adds and updates
|
||||
for (Map.Entry<Object, ChildLbState> entry : newChildren.entrySet()) {
|
||||
final Object key = entry.getKey();
|
||||
LoadBalancerProvider childPolicyProvider = entry.getValue().getPolicyProvider();
|
||||
Object childConfig = entry.getValue().getConfig();
|
||||
if (!childLbStates.containsKey(key)) {
|
||||
childLbStates.put(key, entry.getValue());
|
||||
} else {
|
||||
// Reuse the existing one
|
||||
ChildLbState existingChildLbState = childLbStates.get(key);
|
||||
if (existingChildLbState.isDeactivated() && reactivateChildOnReuse()) {
|
||||
existingChildLbState.reactivate(childPolicyProvider);
|
||||
}
|
||||
}
|
||||
|
||||
ChildLbState childLbState = childLbStates.get(key);
|
||||
ResolvedAddresses childAddresses = getChildAddresses(key, resolvedAddresses, childConfig);
|
||||
childLbStates.get(key).setResolvedAddresses(childAddresses); // update child
|
||||
if (!childLbState.deactivated) {
|
||||
childLbState.lb.handleResolvedAddresses(childAddresses); // update child LB
|
||||
}
|
||||
}
|
||||
|
||||
List<ChildLbState> removedChildren = new ArrayList<>();
|
||||
// Do removals
|
||||
for (Object key : ImmutableList.copyOf(childLbStates.keySet())) {
|
||||
if (!newChildren.containsKey(key)) {
|
||||
ChildLbState childLbState = childLbStates.get(key);
|
||||
childLbState.deactivate();
|
||||
removedChildren.add(childLbState);
|
||||
}
|
||||
}
|
||||
|
||||
return new AcceptResolvedAddressRetVal(Status.OK, removedChildren);
|
||||
}
|
||||
|
||||
protected void shutdownRemoved(List<ChildLbState> removedChildren) {
|
||||
// Do shutdowns after updating picker to reduce the chance of failing an RPC by picking a
|
||||
// subchannel that has been shutdown.
|
||||
for (ChildLbState childLbState : removedChildren) {
|
||||
childLbState.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
* Handle the name resolution error.
|
||||
*
|
||||
* <p/>Override if you need special handling.
|
||||
*/
|
||||
@Override
|
||||
public void handleNameResolutionError(Status error) {
|
||||
if (currentConnectivityState != READY) {
|
||||
|
|
@ -273,26 +178,31 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the name resolution error only for the specified child.
|
||||
*
|
||||
* <p/>Override if you need special handling.
|
||||
*/
|
||||
protected void handleNameResolutionError(ChildLbState child, Status error) {
|
||||
child.lb.handleNameResolutionError(error);
|
||||
}
|
||||
|
||||
/**
|
||||
* If true, then when a subchannel state changes to idle, the corresponding child will
|
||||
* have requestConnection called on its LB. Also causes the PickFirstLB to be created when
|
||||
* the child is created or reused.
|
||||
* Creates a picker representing the state before any connections have been established.
|
||||
*
|
||||
* <p/>Override to produce a custom picker.
|
||||
*/
|
||||
protected boolean reconnectOnIdle() {
|
||||
return true;
|
||||
protected SubchannelPicker getInitialPicker() {
|
||||
return new FixedResultPicker(PickResult.withNoResult());
|
||||
}
|
||||
|
||||
/**
|
||||
* If true, then when {@link #acceptResolvedAddresses} sees a key that was already part of the
|
||||
* child map which is deactivated, it will call reactivate on the child.
|
||||
* If false, it will leave it deactivated.
|
||||
* Creates a new picker representing an error status.
|
||||
*
|
||||
* <p/>Override to produce a custom picker when there are errors.
|
||||
*/
|
||||
protected boolean reactivateChildOnReuse() {
|
||||
return true;
|
||||
protected SubchannelPicker getErrorPicker(Status error) {
|
||||
return new FixedResultPicker(PickResult.withError(error));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -304,20 +214,93 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
childLbStates.clear();
|
||||
}
|
||||
|
||||
protected void updateOverallBalancingState() {
|
||||
ConnectivityState overallState = null;
|
||||
final Map<Object, SubchannelPicker> childPickers = new HashMap<>();
|
||||
for (ChildLbState childLbState : getChildLbStates()) {
|
||||
if (childLbState.deactivated) {
|
||||
continue;
|
||||
}
|
||||
childPickers.put(childLbState.key, childLbState.currentPicker);
|
||||
overallState = aggregateState(overallState, childLbState.currentState);
|
||||
/**
|
||||
* This does the work to update the child map and calculate which children have been removed.
|
||||
* You must call {@link #updateOverallBalancingState} to update the picker
|
||||
* and call {@link #shutdownRemoved(List)} to shutdown the endpoints that have been removed.
|
||||
*/
|
||||
protected final AcceptResolvedAddrRetVal acceptResolvedAddressesInternal(
|
||||
ResolvedAddresses resolvedAddresses) {
|
||||
logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);
|
||||
|
||||
// Subclass handles any special manipulation to create appropriate types of keyed ChildLbStates
|
||||
Map<Object, ChildLbState> newChildren = createChildLbMap(resolvedAddresses);
|
||||
|
||||
// Handle error case
|
||||
if (newChildren.isEmpty()) {
|
||||
Status unavailableStatus = Status.UNAVAILABLE.withDescription(
|
||||
"NameResolver returned no usable address. " + resolvedAddresses);
|
||||
handleNameResolutionError(unavailableStatus);
|
||||
return new AcceptResolvedAddrRetVal(unavailableStatus, null);
|
||||
}
|
||||
|
||||
if (overallState != null) {
|
||||
helper.updateBalancingState(overallState, getSubchannelPicker(childPickers));
|
||||
currentConnectivityState = overallState;
|
||||
Collection<ChildLbState> reusedChildren = addMissingChildrenAndIdReuse(newChildren);
|
||||
|
||||
// Raactivate deactivated children
|
||||
for (ChildLbState reusedChild : reusedChildren) {
|
||||
reusedChild.reactivate(reusedChild.getPolicyProvider());
|
||||
}
|
||||
|
||||
updateChildrenWithResolvedAddresses(resolvedAddresses, newChildren);
|
||||
|
||||
return new AcceptResolvedAddrRetVal(Status.OK, getRemovedChildren(newChildren.keySet()));
|
||||
}
|
||||
|
||||
protected final Collection<ChildLbState> addMissingChildrenAndIdReuse(
|
||||
Map<Object, ChildLbState> newChildren) {
|
||||
Collection<ChildLbState> reusedChildren = new ArrayList<>();
|
||||
|
||||
// Do adds and identify reused children
|
||||
for (Map.Entry<Object, ChildLbState> entry : newChildren.entrySet()) {
|
||||
final Object key = entry.getKey();
|
||||
if (!childLbStates.containsKey(key)) {
|
||||
childLbStates.put(key, entry.getValue());
|
||||
} else {
|
||||
// Reuse the existing one
|
||||
ChildLbState existingChildLbState = childLbStates.get(key);
|
||||
if (existingChildLbState.isDeactivated()) {
|
||||
reusedChildren.add(existingChildLbState);
|
||||
}
|
||||
}
|
||||
}
|
||||
return reusedChildren;
|
||||
}
|
||||
|
||||
protected final void updateChildrenWithResolvedAddresses(ResolvedAddresses resolvedAddresses,
|
||||
Map<Object, ChildLbState> newChildren) {
|
||||
for (Map.Entry<Object, ChildLbState> entry : newChildren.entrySet()) {
|
||||
Object childConfig = entry.getValue().getConfig();
|
||||
ChildLbState childLbState = childLbStates.get(entry.getKey());
|
||||
ResolvedAddresses childAddresses =
|
||||
getChildAddresses(entry.getKey(), resolvedAddresses, childConfig);
|
||||
childLbState.setResolvedAddresses(childAddresses); // update child
|
||||
if (!childLbState.deactivated) {
|
||||
childLbState.lb.handleResolvedAddresses(childAddresses); // update child LB
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Identifies which children have been removed (are not part of the newChildKeys).
|
||||
*/
|
||||
protected final List<ChildLbState> getRemovedChildren(Set<Object> newChildKeys) {
|
||||
List<ChildLbState> removedChildren = new ArrayList<>();
|
||||
// Do removals
|
||||
for (Object key : ImmutableList.copyOf(childLbStates.keySet())) {
|
||||
if (!newChildKeys.contains(key)) {
|
||||
ChildLbState childLbState = childLbStates.get(key);
|
||||
childLbState.deactivate();
|
||||
removedChildren.add(childLbState);
|
||||
}
|
||||
}
|
||||
return removedChildren;
|
||||
}
|
||||
|
||||
protected final void shutdownRemoved(List<ChildLbState> removedChildren) {
|
||||
// Do shutdowns after updating picker to reduce the chance of failing an RPC by picking a
|
||||
// subchannel that has been shutdown.
|
||||
for (ChildLbState childLbState : removedChildren) {
|
||||
childLbState.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -339,18 +322,44 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
return overallState;
|
||||
}
|
||||
|
||||
protected Helper getHelper() {
|
||||
protected final Helper getHelper() {
|
||||
return helper;
|
||||
}
|
||||
|
||||
protected void removeChild(Object key) {
|
||||
protected final void removeChild(Object key) {
|
||||
childLbStates.remove(key);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public final ImmutableMap<Object, ChildLbState> getImmutableChildMap() {
|
||||
return ImmutableMap.copyOf(childLbStates);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public final Collection<ChildLbState> getChildLbStates() {
|
||||
return childLbStates.values();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public final ChildLbState getChildLbState(Object key) {
|
||||
if (key == null) {
|
||||
return null;
|
||||
}
|
||||
if (key instanceof EquivalentAddressGroup) {
|
||||
key = new Endpoint((EquivalentAddressGroup) key);
|
||||
}
|
||||
return childLbStates.get(key);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public final ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
|
||||
return getChildLbState(new Endpoint(eag));
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters out non-ready and deactivated child load balancers (subchannels).
|
||||
*/
|
||||
protected List<ChildLbState> getReadyChildren() {
|
||||
protected final List<ChildLbState> getReadyChildren() {
|
||||
List<ChildLbState> activeChildren = new ArrayList<>();
|
||||
for (ChildLbState child : getChildLbStates()) {
|
||||
if (!child.isDeactivated() && child.getCurrentState() == READY) {
|
||||
|
|
@ -396,7 +405,7 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
this.deactivated = deactivated;
|
||||
this.currentPicker = initialPicker;
|
||||
this.config = childConfig;
|
||||
this.lb = new GracefulSwitchLoadBalancer(new ChildLbStateHelper());
|
||||
this.lb = new GracefulSwitchLoadBalancer(createChildHelper());
|
||||
this.currentState = deactivated ? IDLE : CONNECTING;
|
||||
this.resolvedAddresses = resolvedAddrs;
|
||||
if (!deactivated) {
|
||||
|
|
@ -404,68 +413,8 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Address = " + key
|
||||
+ ", state = " + currentState
|
||||
+ ", picker type: " + currentPicker.getClass()
|
||||
+ ", lb: " + lb.delegate().getClass()
|
||||
+ (deactivated ? ", deactivated" : "");
|
||||
}
|
||||
|
||||
public Object getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
Object getConfig() {
|
||||
return config;
|
||||
}
|
||||
|
||||
protected GracefulSwitchLoadBalancer getLb() {
|
||||
return lb;
|
||||
}
|
||||
|
||||
public LoadBalancerProvider getPolicyProvider() {
|
||||
return policyProvider;
|
||||
}
|
||||
|
||||
protected Subchannel getSubchannels(PickSubchannelArgs args) {
|
||||
if (getCurrentPicker() == null) {
|
||||
return null;
|
||||
}
|
||||
return getCurrentPicker().pickSubchannel(args).getSubchannel();
|
||||
}
|
||||
|
||||
public ConnectivityState getCurrentState() {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
public SubchannelPicker getCurrentPicker() {
|
||||
return currentPicker;
|
||||
}
|
||||
|
||||
public EquivalentAddressGroup getEag() {
|
||||
if (resolvedAddresses == null || resolvedAddresses.getAddresses().isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return resolvedAddresses.getAddresses().get(0);
|
||||
}
|
||||
|
||||
public boolean isDeactivated() {
|
||||
return deactivated;
|
||||
}
|
||||
|
||||
protected void setDeactivated() {
|
||||
deactivated = true;
|
||||
}
|
||||
|
||||
protected void markReactivated() {
|
||||
deactivated = false;
|
||||
}
|
||||
|
||||
protected void setResolvedAddresses(ResolvedAddresses newAddresses) {
|
||||
checkNotNull(newAddresses, "Missing address list for child");
|
||||
resolvedAddresses = newAddresses;
|
||||
protected ChildLbStateHelper createChildHelper() {
|
||||
return new ChildLbStateHelper();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -497,12 +446,94 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
deactivated = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Override for unique behavior such as delayed shutdowns of subchannels.
|
||||
*/
|
||||
protected void shutdown() {
|
||||
lb.shutdown();
|
||||
this.currentState = SHUTDOWN;
|
||||
logger.log(Level.FINE, "Child balancer {0} deleted", key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Address = " + key
|
||||
+ ", state = " + currentState
|
||||
+ ", picker type: " + currentPicker.getClass()
|
||||
+ ", lb: " + lb.delegate().getClass()
|
||||
+ (deactivated ? ", deactivated" : "");
|
||||
}
|
||||
|
||||
public final Object getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public final GracefulSwitchLoadBalancer getLb() {
|
||||
return lb;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public final SubchannelPicker getCurrentPicker() {
|
||||
return currentPicker;
|
||||
}
|
||||
|
||||
protected final LoadBalancerProvider getPolicyProvider() {
|
||||
return policyProvider;
|
||||
}
|
||||
|
||||
protected final Subchannel getSubchannels(PickSubchannelArgs args) {
|
||||
if (getCurrentPicker() == null) {
|
||||
return null;
|
||||
}
|
||||
return getCurrentPicker().pickSubchannel(args).getSubchannel();
|
||||
}
|
||||
|
||||
public final ConnectivityState getCurrentState() {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
protected final void setCurrentState(ConnectivityState newState) {
|
||||
currentState = newState;
|
||||
}
|
||||
|
||||
protected final void setCurrentPicker(SubchannelPicker newPicker) {
|
||||
currentPicker = newPicker;
|
||||
}
|
||||
|
||||
public final EquivalentAddressGroup getEag() {
|
||||
if (resolvedAddresses == null || resolvedAddresses.getAddresses().isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return resolvedAddresses.getAddresses().get(0);
|
||||
}
|
||||
|
||||
public final boolean isDeactivated() {
|
||||
return deactivated;
|
||||
}
|
||||
|
||||
protected final void setDeactivated() {
|
||||
deactivated = true;
|
||||
}
|
||||
|
||||
protected final void markReactivated() {
|
||||
deactivated = false;
|
||||
}
|
||||
|
||||
protected final void setResolvedAddresses(ResolvedAddresses newAddresses) {
|
||||
checkNotNull(newAddresses, "Missing address list for child");
|
||||
resolvedAddresses = newAddresses;
|
||||
}
|
||||
|
||||
private Object getConfig() {
|
||||
return config;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public final ResolvedAddresses getResolvedAddresses() {
|
||||
return resolvedAddresses;
|
||||
}
|
||||
|
||||
/**
|
||||
* ChildLbStateHelper is the glue between ChildLbState and the helpers associated with the
|
||||
* petiole policy above and the PickFirstLoadBalancer's helper below.
|
||||
|
|
@ -510,13 +541,14 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
* <p>The ChildLbState updates happen during updateBalancingState. Otherwise, it is doing
|
||||
* simple forwarding.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public ResolvedAddresses getResolvedAddresses() {
|
||||
return resolvedAddresses;
|
||||
}
|
||||
|
||||
private final class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
|
||||
protected class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
|
||||
|
||||
/**
|
||||
* Update current state and picker for this child and then use
|
||||
* {@link #updateOverallBalancingState()} for the parent LB.
|
||||
*
|
||||
* <p/>Override this if you don't want to automatically request a connection when in IDLE
|
||||
*/
|
||||
@Override
|
||||
public void updateBalancingState(final ConnectivityState newState,
|
||||
final SubchannelPicker newPicker) {
|
||||
|
|
@ -525,12 +557,13 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
if (!childLbStates.containsKey(key)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Subchannel picker and state are saved, but will only be propagated to the channel
|
||||
// when the child instance exits deactivated state.
|
||||
currentState = newState;
|
||||
currentPicker = newPicker;
|
||||
if (!deactivated && !resolvingAddresses) {
|
||||
if (newState == IDLE && reconnectOnIdle()) {
|
||||
if (newState == IDLE) {
|
||||
lb.requestConnection();
|
||||
}
|
||||
updateOverallBalancingState();
|
||||
|
|
@ -598,11 +631,11 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
protected static class AcceptResolvedAddressRetVal {
|
||||
protected static class AcceptResolvedAddrRetVal {
|
||||
public final Status status;
|
||||
public final List<ChildLbState> removedChildren;
|
||||
|
||||
public AcceptResolvedAddressRetVal(Status status, List<ChildLbState> removedChildren) {
|
||||
public AcceptResolvedAddrRetVal(Status status, List<ChildLbState> removedChildren) {
|
||||
this.status = status;
|
||||
this.removedChildren = removedChildren;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,7 +34,6 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
|
@ -51,11 +50,6 @@ public class RoundRobinLoadBalancer extends MultiChildLoadBalancer {
|
|||
super(helper);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
|
||||
throw new UnsupportedOperationException(); // local updateOverallBalancingState doesn't use this
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates picker with the list of active subchannels (state == READY).
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.MoreObjects;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.InternalLogId;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.Status;
|
||||
|
|
@ -103,7 +104,7 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
|
|||
resolvingAddresses = true;
|
||||
|
||||
// process resolvedAddresses to update children
|
||||
AcceptResolvedAddressRetVal acceptRetVal =
|
||||
AcceptResolvedAddrRetVal acceptRetVal =
|
||||
acceptResolvedAddressesInternal(resolvedAddresses);
|
||||
if (!acceptRetVal.status.isOk()) {
|
||||
return acceptRetVal.status;
|
||||
|
|
@ -118,7 +119,29 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Using the state of all children will calculate the current connectivity state,
|
||||
* update currentConnectivityState, generate a picker and then call
|
||||
* {@link Helper#updateBalancingState(ConnectivityState, SubchannelPicker)}.
|
||||
*/
|
||||
@Override
|
||||
protected void updateOverallBalancingState() {
|
||||
ConnectivityState overallState = null;
|
||||
final Map<Object, SubchannelPicker> childPickers = new HashMap<>();
|
||||
for (ChildLbState childLbState : getChildLbStates()) {
|
||||
if (childLbState.isDeactivated()) {
|
||||
continue;
|
||||
}
|
||||
childPickers.put(childLbState.getKey(), childLbState.getCurrentPicker());
|
||||
overallState = aggregateState(overallState, childLbState.getCurrentState());
|
||||
}
|
||||
|
||||
if (overallState != null) {
|
||||
getHelper().updateBalancingState(overallState, getSubchannelPicker(childPickers));
|
||||
currentConnectivityState = overallState;
|
||||
}
|
||||
}
|
||||
|
||||
protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
|
||||
return new SubchannelPicker() {
|
||||
@Override
|
||||
|
|
@ -157,11 +180,6 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean reconnectOnIdle() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* This differs from the base class in the use of the deletion timer. When it is deactivated,
|
||||
* rather than immediately calling shutdown it starts a timer. If shutdown or reactivate
|
||||
|
|
@ -177,6 +195,11 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
|
|||
super(key, policyProvider, childConfig, initialPicker);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChildLbStateHelper createChildHelper() {
|
||||
return new ClusterManagerChildHelper();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void shutdown() {
|
||||
if (deletionTimer != null && deletionTimer.isPending()) {
|
||||
|
|
@ -220,5 +243,24 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
|
|||
logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deactivated", getKey());
|
||||
}
|
||||
|
||||
private class ClusterManagerChildHelper extends ChildLbStateHelper {
|
||||
@Override
|
||||
public void updateBalancingState(final ConnectivityState newState,
|
||||
final SubchannelPicker newPicker) {
|
||||
// If we are already in the process of resolving addresses, the overall balancing state
|
||||
// will be updated at the end of it, and we don't need to trigger that update here.
|
||||
if (getChildLbState(getKey()) == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Subchannel picker and state are saved, but will only be propagated to the channel
|
||||
// when the child instance exits deactivated state.
|
||||
setCurrentState(newState);
|
||||
setCurrentPicker(newPicker);
|
||||
if (!isDeactivated() && !resolvingAddresses) {
|
||||
updateOverallBalancingState();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,10 +39,8 @@ import io.grpc.Status;
|
|||
import io.grpc.util.MultiChildLoadBalancer;
|
||||
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
|
|
@ -69,12 +67,6 @@ final class LeastRequestLoadBalancer extends MultiChildLoadBalancer {
|
|||
this.random = checkNotNull(random, "random");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
|
||||
throw new UnsupportedOperationException(
|
||||
"LeastRequestLoadBalancer uses its ChildLbStates, not these child pickers directly");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
// Need to update choiceCount before calling super so that the updateBalancingState call has the
|
||||
|
|
@ -154,18 +146,6 @@ final class LeastRequestLoadBalancer extends MultiChildLoadBalancer {
|
|||
super.resolvingAddresses = newValue;
|
||||
}
|
||||
|
||||
// Expose for tests in this package.
|
||||
@Override
|
||||
protected Collection<ChildLbState> getChildLbStates() {
|
||||
return super.getChildLbStates();
|
||||
}
|
||||
|
||||
// Expose for tests in this package.
|
||||
@Override
|
||||
protected ChildLbState getChildLbState(Object key) {
|
||||
return super.getChildLbState(key);
|
||||
}
|
||||
|
||||
// Expose for tests in this package.
|
||||
private static AtomicInteger getInFlights(ChildLbState childLbState) {
|
||||
return ((LeastRequestLbState)childLbState).activeRequests;
|
||||
|
|
|
|||
|
|
@ -38,12 +38,10 @@ import io.grpc.LoadBalancer;
|
|||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.util.GracefulSwitchLoadBalancer;
|
||||
import io.grpc.util.MultiChildLoadBalancer;
|
||||
import io.grpc.xds.XdsLogger.XdsLogLevel;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
|
@ -87,19 +85,22 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
return addressValidityStatus;
|
||||
}
|
||||
|
||||
AcceptResolvedAddressRetVal acceptRetVal;
|
||||
try {
|
||||
resolvingAddresses = true;
|
||||
// Update the child list by creating-adding, updating addresses, and removing
|
||||
acceptRetVal = super.acceptResolvedAddressesInternal(resolvedAddresses);
|
||||
if (!acceptRetVal.status.isOk()) {
|
||||
// Subclass handles any special manipulation to create appropriate types of ChildLbStates
|
||||
Map<Object, ChildLbState> newChildren = createChildLbMap(resolvedAddresses);
|
||||
|
||||
if (newChildren.isEmpty()) {
|
||||
addressValidityStatus = Status.UNAVAILABLE.withDescription(
|
||||
"Ring hash lb error: EDS resolution was successful, but was not accepted by base class"
|
||||
+ " (" + acceptRetVal.status + ")");
|
||||
"Ring hash lb error: EDS resolution was successful, but there were no valid addresses");
|
||||
handleNameResolutionError(addressValidityStatus);
|
||||
return addressValidityStatus;
|
||||
}
|
||||
|
||||
// We don't care about reuse because we don't want to activate them
|
||||
addMissingChildrenAndIdReuse(newChildren);
|
||||
updateChildrenWithResolvedAddresses(resolvedAddresses, newChildren);
|
||||
|
||||
// Now do the ringhash specific logic with weights and building the ring
|
||||
RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
if (config == null) {
|
||||
|
|
@ -143,7 +144,7 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
// clusters and resolver can remove them in service config.
|
||||
updateOverallBalancingState();
|
||||
|
||||
shutdownRemoved(acceptRetVal.removedChildren);
|
||||
shutdownRemoved(getRemovedChildren(newChildren.keySet()));
|
||||
} finally {
|
||||
this.resolvingAddresses = false;
|
||||
}
|
||||
|
|
@ -151,6 +152,7 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
return Status.OK;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Updates the overall balancing state by aggregating the connectivity states of all subchannels.
|
||||
*
|
||||
|
|
@ -223,16 +225,6 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
this.currentConnectivityState = overallState;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean reconnectOnIdle() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean reactivateChildOnReuse() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChildLbState createChildLbState(Object key, Object policyConfig,
|
||||
SubchannelPicker initialPicker, ResolvedAddresses resolvedAddresses) {
|
||||
|
|
@ -446,11 +438,6 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
|
||||
throw new UnsupportedOperationException("Not used by RingHash");
|
||||
}
|
||||
|
||||
/**
|
||||
* An unmodifiable view of a subchannel with state not subject to its real connectivity
|
||||
* state changes.
|
||||
|
|
@ -505,29 +492,17 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
static Set<EquivalentAddressGroup> getStrippedChildEags(Collection<ChildLbState> states) {
|
||||
return states.stream()
|
||||
.map(ChildLbState::getEag)
|
||||
.map(RingHashLoadBalancer::stripAttrs)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ChildLbState> getChildLbStates() {
|
||||
return super.getChildLbStates();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
|
||||
return super.getChildLbStateEag(eag);
|
||||
}
|
||||
|
||||
class RingHashChildLbState extends MultiChildLoadBalancer.ChildLbState {
|
||||
|
||||
public RingHashChildLbState(Endpoint key, ResolvedAddresses resolvedAddresses) {
|
||||
super(key, pickFirstLbProvider, null, EMPTY_PICKER, resolvedAddresses, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChildLbStateHelper createChildHelper() {
|
||||
return new RingHashChildHelper();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void reactivate(LoadBalancerProvider policyProvider) {
|
||||
if (!isDeactivated()) {
|
||||
|
|
@ -550,11 +525,25 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
super.shutdown();
|
||||
}
|
||||
|
||||
// Need to expose this to the LB class
|
||||
@Override
|
||||
protected GracefulSwitchLoadBalancer getLb() {
|
||||
return super.getLb();
|
||||
}
|
||||
private class RingHashChildHelper extends ChildLbStateHelper {
|
||||
@Override
|
||||
public void updateBalancingState(final ConnectivityState newState,
|
||||
final SubchannelPicker newPicker) {
|
||||
// If we are already in the process of resolving addresses, the overall balancing state
|
||||
// will be updated at the end of it, and we don't need to trigger that update here.
|
||||
if (getChildLbState(getKey()) == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Subchannel picker and state are saved, but will only be propagated to the channel
|
||||
// when the child instance exits deactivated state.
|
||||
setCurrentState(newState);
|
||||
setCurrentPicker(newPicker);
|
||||
if (!isDeactivated() && !resolvingAddresses) {
|
||||
updateOverallBalancingState();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -115,7 +115,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
|||
}
|
||||
config =
|
||||
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
AcceptResolvedAddressRetVal acceptRetVal;
|
||||
AcceptResolvedAddrRetVal acceptRetVal;
|
||||
try {
|
||||
resolvingAddresses = true;
|
||||
acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
|
||||
|
|
@ -148,12 +148,6 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
|||
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence);
|
||||
}
|
||||
|
||||
// Expose for tests in this package.
|
||||
@Override
|
||||
protected ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
|
||||
return super.getChildLbStateEag(eag);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
final class WeightedChildLbState extends ChildLbState {
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue