mirror of https://github.com/grpc/grpc-java.git
xds: Swap RingHashLb to use lazy child, instead of deactivation
This commit is contained in:
parent
8a21afcc9e
commit
61bf21e2a1
|
|
@ -238,7 +238,7 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
|
||||
// Raactivate deactivated children
|
||||
for (ChildLbState reusedChild : reusedChildren) {
|
||||
reusedChild.reactivate(reusedChild.getPolicyProvider());
|
||||
reusedChild.reactivate(reusedChild.getPolicyFactory());
|
||||
}
|
||||
|
||||
updateChildrenWithResolvedAddresses(resolvedAddresses, newChildren);
|
||||
|
|
@ -388,20 +388,20 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
private final Object config;
|
||||
|
||||
private final GracefulSwitchLoadBalancer lb;
|
||||
private final LoadBalancerProvider policyProvider;
|
||||
private final LoadBalancer.Factory policyFactory;
|
||||
private ConnectivityState currentState;
|
||||
private SubchannelPicker currentPicker;
|
||||
private boolean deactivated;
|
||||
|
||||
public ChildLbState(Object key, LoadBalancerProvider policyProvider, Object childConfig,
|
||||
public ChildLbState(Object key, LoadBalancer.Factory policyFactory, Object childConfig,
|
||||
SubchannelPicker initialPicker) {
|
||||
this(key, policyProvider, childConfig, initialPicker, null, false);
|
||||
this(key, policyFactory, childConfig, initialPicker, null, false);
|
||||
}
|
||||
|
||||
public ChildLbState(Object key, LoadBalancerProvider policyProvider, Object childConfig,
|
||||
public ChildLbState(Object key, LoadBalancer.Factory policyFactory, Object childConfig,
|
||||
SubchannelPicker initialPicker, ResolvedAddresses resolvedAddrs, boolean deactivated) {
|
||||
this.key = key;
|
||||
this.policyProvider = policyProvider;
|
||||
this.policyFactory = policyFactory;
|
||||
this.deactivated = deactivated;
|
||||
this.currentPicker = initialPicker;
|
||||
this.config = childConfig;
|
||||
|
|
@ -409,7 +409,7 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
this.currentState = deactivated ? IDLE : CONNECTING;
|
||||
this.resolvedAddresses = resolvedAddrs;
|
||||
if (!deactivated) {
|
||||
lb.switchTo(policyProvider);
|
||||
lb.switchTo(policyFactory);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -442,7 +442,7 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
* This base implementation does nothing but reset the flag. If you really want to both
|
||||
* deactivate and reactivate you should override them both.
|
||||
*/
|
||||
protected void reactivate(LoadBalancerProvider policyProvider) {
|
||||
protected void reactivate(LoadBalancer.Factory policyFactory) {
|
||||
deactivated = false;
|
||||
}
|
||||
|
||||
|
|
@ -478,8 +478,8 @@ public abstract class MultiChildLoadBalancer extends LoadBalancer {
|
|||
return currentPicker;
|
||||
}
|
||||
|
||||
protected final LoadBalancerProvider getPolicyProvider() {
|
||||
return policyProvider;
|
||||
protected final LoadBalancer.Factory getPolicyFactory() {
|
||||
return policyFactory;
|
||||
}
|
||||
|
||||
protected final Subchannel getSubchannels(PickSubchannelArgs args) {
|
||||
|
|
|
|||
|
|
@ -210,13 +210,13 @@ class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void reactivate(LoadBalancerProvider policyProvider) {
|
||||
protected void reactivate(Factory policyFactory) {
|
||||
if (deletionTimer != null && deletionTimer.isPending()) {
|
||||
deletionTimer.cancel();
|
||||
logger.log(XdsLogLevel.DEBUG, "Child balancer {0} reactivated", getKey());
|
||||
}
|
||||
|
||||
super.reactivate(policyProvider);
|
||||
super.reactivate(policyFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Copyright 2024 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.grpc.ConnectivityState;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.util.ForwardingLoadBalancer;
|
||||
|
||||
/**
|
||||
* A load balancer that starts in IDLE instead of CONNECTING. Once it starts connecting, it
|
||||
* instantiates its delegate.
|
||||
*/
|
||||
final class LazyLoadBalancer extends ForwardingLoadBalancer {
|
||||
private LoadBalancer delegate;
|
||||
|
||||
public LazyLoadBalancer(Helper helper, LoadBalancer.Factory delegateFactory) {
|
||||
this.delegate = new LazyDelegate(helper, delegateFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LoadBalancer delegate() {
|
||||
return delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
return delegate.acceptResolvedAddresses(resolvedAddresses);
|
||||
}
|
||||
|
||||
private final class LazyDelegate extends LoadBalancer {
|
||||
private final Helper helper;
|
||||
private final LoadBalancer.Factory delegateFactory;
|
||||
private ResolvedAddresses addresses;
|
||||
private Status error;
|
||||
private boolean updatedBalancingState;
|
||||
|
||||
public LazyDelegate(Helper helper, LoadBalancer.Factory delegateFactory) {
|
||||
this.helper = Preconditions.checkNotNull(helper, "helper");
|
||||
this.delegateFactory = Preconditions.checkNotNull(delegateFactory, "delegateFactory");
|
||||
}
|
||||
|
||||
private LoadBalancer activate() {
|
||||
if (delegate != this) {
|
||||
return delegate;
|
||||
}
|
||||
delegate = delegateFactory.newLoadBalancer(helper);
|
||||
if (addresses != null) {
|
||||
delegate.acceptResolvedAddresses(addresses);
|
||||
}
|
||||
if (error != null) {
|
||||
delegate.handleNameResolutionError(error);
|
||||
}
|
||||
return delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
this.addresses = resolvedAddresses;
|
||||
this.error = null;
|
||||
initializeBalancingState();
|
||||
return Status.OK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleNameResolutionError(Status error) {
|
||||
// Preserve addresses, because even old addresses may be used by the real policy
|
||||
this.error = error;
|
||||
initializeBalancingState();
|
||||
}
|
||||
|
||||
private void initializeBalancingState() {
|
||||
if (updatedBalancingState) {
|
||||
return;
|
||||
}
|
||||
helper.updateBalancingState(ConnectivityState.IDLE, new LazyPicker());
|
||||
updatedBalancingState = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestConnection() {
|
||||
activate().requestConnection();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
}
|
||||
|
||||
private final class LazyPicker extends SubchannelPicker {
|
||||
@Override
|
||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||
helper.getSynchronizationContext().execute(LazyDelegate.this::activate);
|
||||
return PickResult.withNoResult();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestConnection() {
|
||||
helper.getSynchronizationContext().execute(LazyDelegate.this::requestConnection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static final class Factory extends LoadBalancer.Factory {
|
||||
private final LoadBalancer.Factory delegate;
|
||||
|
||||
public Factory(LoadBalancer.Factory delegate) {
|
||||
this.delegate = Preconditions.checkNotNull(delegate, "delegate");
|
||||
}
|
||||
|
||||
@Override public LoadBalancer newLoadBalancer(Helper helper) {
|
||||
return new LazyLoadBalancer(helper, delegate);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -35,7 +35,6 @@ import io.grpc.ConnectivityState;
|
|||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.InternalLogId;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.util.MultiChildLoadBalancer;
|
||||
|
|
@ -66,6 +65,8 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
+ " config selector always generates a hash.");
|
||||
private static final XxHash64 hashFunc = XxHash64.INSTANCE;
|
||||
|
||||
private final LoadBalancer.Factory lazyLbFactory =
|
||||
new LazyLoadBalancer.Factory(pickFirstLbProvider);
|
||||
private final XdsLogger logger;
|
||||
private final SynchronizationContext syncContext;
|
||||
private List<RingEntry> ring;
|
||||
|
|
@ -229,8 +230,7 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
@Override
|
||||
protected ChildLbState createChildLbState(Object key, Object policyConfig,
|
||||
SubchannelPicker initialPicker, ResolvedAddresses resolvedAddresses) {
|
||||
return new RingHashChildLbState((Endpoint)key,
|
||||
getChildAddresses(key, resolvedAddresses, null));
|
||||
return new RingHashChildLbState((Endpoint)key);
|
||||
}
|
||||
|
||||
private Status validateAddrList(List<EquivalentAddressGroup> addrList) {
|
||||
|
|
@ -420,11 +420,7 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
|
||||
if (subchannelView.connectivityState == IDLE) {
|
||||
syncContext.execute(() -> {
|
||||
if (childLbState.isDeactivated()) {
|
||||
childLbState.activate();
|
||||
} else {
|
||||
childLbState.getLb().requestConnection();
|
||||
}
|
||||
childLbState.getLb().requestConnection();
|
||||
});
|
||||
|
||||
return PickResult.withNoResult(); // Indicates that this should be retried after backoff
|
||||
|
|
@ -495,8 +491,8 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
|
||||
class RingHashChildLbState extends MultiChildLoadBalancer.ChildLbState {
|
||||
|
||||
public RingHashChildLbState(Endpoint key, ResolvedAddresses resolvedAddresses) {
|
||||
super(key, pickFirstLbProvider, null, EMPTY_PICKER, resolvedAddresses, true);
|
||||
public RingHashChildLbState(Endpoint key) {
|
||||
super(key, lazyLbFactory, null, EMPTY_PICKER);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -504,22 +500,6 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
return new RingHashChildHelper();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void reactivate(LoadBalancerProvider policyProvider) {
|
||||
if (!isDeactivated()) {
|
||||
return;
|
||||
}
|
||||
currentConnectivityState = CONNECTING;
|
||||
getLb().switchTo(pickFirstLbProvider);
|
||||
markReactivated();
|
||||
getLb().acceptResolvedAddresses(this.getResolvedAddresses());
|
||||
logger.log(XdsLogLevel.DEBUG, "Child balancer {0} reactivated", getKey());
|
||||
}
|
||||
|
||||
public void activate() {
|
||||
reactivate(pickFirstLbProvider);
|
||||
}
|
||||
|
||||
// Need to expose this to the LB class
|
||||
@Override
|
||||
protected void shutdown() {
|
||||
|
|
@ -530,22 +510,19 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
|
|||
@Override
|
||||
public void updateBalancingState(final ConnectivityState newState,
|
||||
final SubchannelPicker newPicker) {
|
||||
// Subchannel picker and state are saved, but will only be propagated to the channel
|
||||
// when the child instance exits deactivated state.
|
||||
setCurrentState(newState);
|
||||
setCurrentPicker(newPicker);
|
||||
|
||||
// If we are already in the process of resolving addresses, the overall balancing state
|
||||
// will be updated at the end of it, and we don't need to trigger that update here.
|
||||
if (getChildLbState(getKey()) == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isDeactivated() && !resolvingAddresses) {
|
||||
// If we are already in the process of resolving addresses, the overall balancing state
|
||||
// will be updated at the end of it, and we don't need to trigger that update here.
|
||||
if (!resolvingAddresses) {
|
||||
updateOverallBalancingState();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -177,15 +177,11 @@ public class RingHashLoadBalancerTest {
|
|||
|
||||
RingHashChildLbState childLbState =
|
||||
(RingHashChildLbState) loadBalancer.getChildLbStates().iterator().next();
|
||||
assertThat(childLbState.isDeactivated()).isTrue();
|
||||
assertThat(subchannels.get(Collections.singletonList(childLbState.getEag()))).isNull();
|
||||
|
||||
// Picking subchannel triggers connection.
|
||||
PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
|
||||
pickerCaptor.getValue().pickSubchannel(args);
|
||||
assertThat(childLbState.isDeactivated()).isFalse();
|
||||
String expectedLbType = PickFirstLoadBalancerProvider.isEnabledNewPickFirst()
|
||||
? "PickFirstLeafLoadBalancer" : "PickFirstLoadBalancer";
|
||||
assertThat(childLbState.getLb().delegateType()).isEqualTo(expectedLbType);
|
||||
Subchannel subchannel = subchannels.get(Collections.singletonList(childLbState.getEag()));
|
||||
InOrder inOrder = Mockito.inOrder(helper, subchannel);
|
||||
inOrder.verify(subchannel).requestConnection();
|
||||
|
|
@ -423,7 +419,8 @@ public class RingHashLoadBalancerTest {
|
|||
assertThat(addressesAcceptanceStatus.isOk()).isTrue();
|
||||
|
||||
// Create subchannel for the first address
|
||||
((RingHashChildLbState)loadBalancer.getChildLbStateEag(servers.get(0))).activate();
|
||||
((RingHashChildLbState) loadBalancer.getChildLbStateEag(servers.get(0))).getCurrentPicker()
|
||||
.pickSubchannel(getDefaultPickSubchannelArgs(hashFunc.hashVoid()));
|
||||
verifyConnection(1);
|
||||
|
||||
reset(helper);
|
||||
|
|
@ -944,7 +941,8 @@ public class RingHashLoadBalancerTest {
|
|||
|
||||
// Activate them all to create the child LB and subchannel
|
||||
for (ChildLbState childLbState : loadBalancer.getChildLbStates()) {
|
||||
((RingHashChildLbState)childLbState).activate();
|
||||
childLbState.getCurrentPicker()
|
||||
.pickSubchannel(getDefaultPickSubchannelArgs(hashFunc.hashVoid()));
|
||||
assertThat(childLbState.getResolvedAddresses().getAttributes().get(IS_PETIOLE_POLICY))
|
||||
.isTrue();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue