xds: implement Fallback-at-Startup mode

This is the implementation of the Fallback-at-Startup mode in the design doc.

- The Fallback-After-Startup mode is not implemented.
- Drop related behavior is not implemented.
This commit is contained in:
ZHANG Dapeng 2019-05-22 17:44:39 -07:00 committed by GitHub
parent 7834a50525
commit 54bbd372ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 366 additions and 173 deletions

View File

@ -31,8 +31,6 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
@ -43,6 +41,7 @@ import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.InterLocalityPicker.WeightedChildPicker;
import io.grpc.xds.XdsComms.Locality;
import io.grpc.xds.XdsComms.LocalityInfo;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -58,8 +57,6 @@ import java.util.Set;
// Must be accessed/run in SynchronizedContext.
interface LocalityStore {
boolean hasReadyBackends();
boolean hasNonDropBackends();
void reset();
@ -76,7 +73,6 @@ interface LocalityStore {
private final LoadBalancerProvider loadBalancerProvider;
private Map<Locality, LocalityLbInfo> localityMap = new HashMap<>();
private ConnectivityState overallState;
LocalityStoreImpl(Helper helper, LoadBalancerRegistry lbRegistry) {
this(helper, pickerFactoryImpl, lbRegistry);
@ -104,11 +100,6 @@ interface LocalityStore {
}
};
@Override
public boolean hasReadyBackends() {
return overallState == READY;
}
@Override
public boolean hasNonDropBackends() {
// TODO: impl
@ -196,32 +187,6 @@ interface LocalityStore {
}
private static final class ErrorPicker extends SubchannelPicker {
final Status error;
ErrorPicker(Status error) {
this.error = checkNotNull(error, "error");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
}
private static final SubchannelPicker BUFFER_PICKER = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withNoResult();
}
@Override
public String toString() {
return "BUFFER_PICKER";
}
};
private static ConnectivityState aggregateState(
ConnectivityState overallState, ConnectivityState childState) {
if (overallState == null) {
@ -268,7 +233,6 @@ interface LocalityStore {
}
updatePicker(overallState, childPickers);
this.overallState = overallState;
}
private void updatePicker(ConnectivityState state, List<WeightedChildPicker> childPickers) {
@ -278,7 +242,7 @@ interface LocalityStore {
if (state == TRANSIENT_FAILURE) {
picker = new ErrorPicker(Status.UNAVAILABLE); // TODO: more details in status
} else {
picker = BUFFER_PICKER;
picker = XdsSubchannelPickers.BUFFER_PICKER;
}
} else {
picker = pickerFactory.picker(childPickers);
@ -316,7 +280,7 @@ interface LocalityStore {
private final Locality locality;
private SubchannelPicker currentChildPicker = BUFFER_PICKER;
private SubchannelPicker currentChildPicker = XdsSubchannelPickers.BUFFER_PICKER;
private ConnectivityState currentChildState = null;
ChildHelper(Locality locality) {

View File

@ -181,7 +181,7 @@ final class XdsComms {
final StreamObserver<DiscoveryResponse> xdsResponseReader =
new StreamObserver<DiscoveryResponse>() {
boolean firstResponseReceived;
boolean firstEdsResponseReceived;
@Override
public void onNext(final DiscoveryResponse value) {
@ -190,10 +190,6 @@ final class XdsComms {
@Override
public void run() {
if (!firstResponseReceived) {
firstResponseReceived = true;
adsStreamCallback.onWorking();
}
String typeUrl = value.getTypeUrl();
if (EDS_TYPE_URL.equals(typeUrl)) {
// Assuming standard mode.
@ -205,9 +201,13 @@ final class XdsComms {
value.getResources(0).unpack(ClusterLoadAssignment.class);
} catch (InvalidProtocolBufferException | NullPointerException e) {
cancelRpc("Received invalid EDS response", e);
adsStreamCallback.onError();
return;
}
if (!firstEdsResponseReceived) {
firstEdsResponseReceived = true;
adsStreamCallback.onWorking();
}
List<LocalityLbEndpoints> localities = clusterLoadAssignment.getEndpointsList();
Map<Locality, LocalityInfo> localityEndpointsMapping = new LinkedHashMap<>();
for (LocalityLbEndpoints localityLbEndpoints : localities) {
@ -242,6 +242,7 @@ final class XdsComms {
new Runnable() {
@Override
public void run() {
// TODO: schedule retry
closed = true;
if (cancelled) {
return;
@ -249,7 +250,6 @@ final class XdsComms {
adsStreamCallback.onError();
}
});
// TODO: more impl
}
@Override

View File

@ -17,7 +17,9 @@
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@ -25,6 +27,7 @@ import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import io.grpc.Attributes;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
@ -33,12 +36,15 @@ import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.LocalityStore.LocalityStoreImpl;
import io.grpc.xds.XdsComms.AdsStreamCallback;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
/**
@ -59,15 +65,22 @@ final class XdsLoadBalancer extends LoadBalancer {
@Override
public void onWorking() {
fallbackManager.balancerWorking = true;
fallbackManager.cancelFallback();
if (fallbackManager.childPolicyHasBeenReady) {
// cancel Fallback-After-Startup timer if there's any
fallbackManager.cancelFallbackTimer();
}
fallbackManager.childBalancerWorked = true;
}
@Override
public void onError() {
// TODO: backoff and retry
fallbackManager.balancerWorking = false;
fallbackManager.maybeUseFallbackPolicy();
if (!fallbackManager.childBalancerWorked) {
// start Fallback-at-Startup immediately
fallbackManager.useFallbackPolicy();
} else if (fallbackManager.childPolicyHasBeenReady) {
// TODO: schedule a timer for Fallback-After-Startup
} // else: the Fallback-at-Startup timer is still pending, noop and wait
}
};
@ -76,16 +89,35 @@ final class XdsLoadBalancer extends LoadBalancer {
private LbConfig fallbackPolicy;
@VisibleForTesting
XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, LocalityStore localityStore) {
XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry) {
this.helper = helper;
this.lbRegistry = lbRegistry;
this.localityStore = localityStore;
this.localityStore = new LocalityStoreImpl(new LocalityStoreHelper(), lbRegistry);
fallbackManager = new FallbackManager(helper, localityStore, lbRegistry);
}
XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry) {
this(helper, lbRegistry, new LocalityStoreImpl(helper, lbRegistry));
private final class LocalityStoreHelper extends ForwardingLoadBalancerHelper {
@Override
protected Helper delegate() {
return helper;
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
if (newState == READY) {
checkState(
fallbackManager.childBalancerWorked,
"channel goes to READY before the load balancer even worked");
fallbackManager.childPolicyHasBeenReady = true;
fallbackManager.cancelFallback();
}
if (!fallbackManager.isInFallbackMode()) {
helper.updateBalancingState(newState, newPicker);
}
}
}
@Override
@ -103,7 +135,7 @@ final class XdsLoadBalancer extends LoadBalancer {
XdsConfig xdsConfig = (XdsConfig) cfg.getConfig();
fallbackPolicy = xdsConfig.fallbackPolicy;
fallbackManager.updateFallbackServers(servers, attributes, fallbackPolicy);
fallbackManager.maybeStartFallbackTimer();
fallbackManager.startFallbackTimer();
handleNewConfig(xdsConfig);
xdsLbState.handleResolvedAddressGroups(servers, attributes);
}
@ -117,7 +149,6 @@ final class XdsLoadBalancer extends LoadBalancer {
xdsComms = xdsLbState.shutdownAndReleaseXdsComms();
if (xdsComms != null) {
xdsComms.shutdownChannel();
fallbackManager.balancerWorking = false;
xdsComms = null;
}
} else if (!Objects.equal(
@ -128,7 +159,6 @@ final class XdsLoadBalancer extends LoadBalancer {
// close the stream but reuse the channel
if (xdsComms != null) {
xdsComms.shutdownLbRpc(cancelMessage);
fallbackManager.balancerWorking = false;
xdsComms.refreshAdsStream();
}
} else { // effectively no change in policy, keep xdsLbState unchanged
@ -150,32 +180,32 @@ final class XdsLoadBalancer extends LoadBalancer {
@Override
public void handleNameResolutionError(Status error) {
if (xdsLbState != null) {
if (fallbackManager.fallbackBalancer != null) {
fallbackManager.fallbackBalancer.handleNameResolutionError(error);
} else {
xdsLbState.handleNameResolutionError(error);
}
xdsLbState.handleNameResolutionError(error);
}
if (fallbackManager.isInFallbackMode()) {
fallbackManager.fallbackBalancer.handleNameResolutionError(error);
}
if (xdsLbState == null && !fallbackManager.isInFallbackMode()) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
// TODO: impl
// else {
// helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(error));
// }
}
/**
* This is only for the subchannel that is created by the the child/fallback balancer using the
* old API {@link LoadBalancer.Helper#createSubchannel(EquivalentAddressGroup, Attributes)} or
* {@link LoadBalancer.Helper#createSubchannel(List, Attributes)}. Otherwise, it either won't be
* called or won't have any effect.
*/
@Deprecated
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
// xdsLbState should never be null here since handleSubchannelState cannot be called while the
// lb is shutdown.
if (newState.getState() == SHUTDOWN) {
return;
}
if (fallbackManager.fallbackBalancer != null) {
if (fallbackManager.isInFallbackMode()) {
fallbackManager.fallbackBalancer.handleSubchannelState(subchannel, newState);
}
// xdsLbState should never be null here since handleSubchannelState cannot be called while the
// lb is shutdown.
xdsLbState.handleSubchannelState(subchannel, newState);
fallbackManager.maybeUseFallbackPolicy();
}
@Override
@ -215,14 +245,15 @@ final class XdsLoadBalancer extends LoadBalancer {
private LoadBalancer fallbackBalancer;
// Scheduled only once. Never reset.
@Nullable
@CheckForNull
private ScheduledHandle fallbackTimer;
private List<EquivalentAddressGroup> fallbackServers = ImmutableList.of();
private Attributes fallbackAttributes;
// allow value write by outer class
private boolean balancerWorking;
private boolean childBalancerWorked;
private boolean childPolicyHasBeenReady;
FallbackManager(
Helper helper, LocalityStore localityStore, LoadBalancerRegistry lbRegistry) {
@ -231,36 +262,67 @@ final class XdsLoadBalancer extends LoadBalancer {
this.lbRegistry = lbRegistry;
}
void cancelFallback() {
/**
* Fallback mode being on indicates that an update from child LBs will be ignored unless the
* update triggers turning off the fallback mode first.
*/
boolean isInFallbackMode() {
return fallbackBalancer != null;
}
void cancelFallbackTimer() {
if (fallbackTimer != null) {
fallbackTimer.cancel();
}
}
void cancelFallback() {
cancelFallbackTimer();
if (fallbackBalancer != null) {
fallbackBalancer.shutdown();
fallbackBalancer = null;
}
}
void maybeUseFallbackPolicy() {
void useFallbackPolicy() {
if (fallbackBalancer != null) {
return;
}
if (balancerWorking || localityStore.hasReadyBackends()) {
return;
}
cancelFallbackTimer();
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Using fallback policy");
final class FallbackBalancerHelper extends ForwardingLoadBalancerHelper {
LoadBalancer balancer;
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
checkNotNull(balancer, "there is a bug");
if (balancer != fallbackBalancer) {
// ignore updates from a misbehaving shutdown fallback balancer
return;
}
super.updateBalancingState(newState, newPicker);
}
@Override
protected Helper delegate() {
return helper;
}
}
FallbackBalancerHelper fallbackBalancerHelper = new FallbackBalancerHelper();
fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName())
.newLoadBalancer(helper);
.newLoadBalancer(fallbackBalancerHelper);
fallbackBalancerHelper.balancer = fallbackBalancer;
// TODO(carl-mastrangelo): propagate the load balancing config policy
fallbackBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(fallbackServers)
.setAttributes(fallbackAttributes)
.build());
// TODO: maybe update picker here if still use the old API but not SubchannelStateListener
}
void updateFallbackServers(
@ -284,17 +346,17 @@ final class XdsLoadBalancer extends LoadBalancer {
} else {
fallbackBalancer.shutdown();
fallbackBalancer = null;
maybeUseFallbackPolicy();
useFallbackPolicy();
}
}
}
void maybeStartFallbackTimer() {
void startFallbackTimer() {
if (fallbackTimer == null) {
class FallbackTask implements Runnable {
@Override
public void run() {
maybeUseFallbackPolicy();
useFallbackPolicy();
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Status;
final class XdsSubchannelPickers {
private XdsSubchannelPickers() { /* DO NOT CALL ME */ }
static final SubchannelPicker BUFFER_PICKER = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withNoResult();
}
@Override
public String toString() {
return "BUFFER_PICKER";
}
};
static final class ErrorPicker extends SubchannelPicker {
private final Status error;
ErrorPicker(Status error) {
this.error = checkNotNull(error, "error");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("error", error)
.toString();
}
}
}

View File

@ -142,16 +142,18 @@ public class FallbackManagerTest {
@Test
public void useFallbackWhenTimeout() {
fallbackManager.maybeStartFallbackTimer();
fallbackManager.startFallbackTimer();
List<EquivalentAddressGroup> eags = new ArrayList<>();
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);
assertThat(fallbackManager.isInFallbackMode()).isFalse();
verify(fakeFallbackLb, never())
.handleResolvedAddresses(ArgumentMatchers.any(ResolvedAddresses.class));
fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertThat(fallbackManager.isInFallbackMode()).isTrue();
verify(fakeFallbackLb).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(eags)
@ -166,7 +168,7 @@ public class FallbackManagerTest {
@Test
public void cancelFallback() {
fallbackManager.maybeStartFallbackTimer();
fallbackManager.startFallbackTimer();
List<EquivalentAddressGroup> eags = new ArrayList<>();
fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy);
@ -175,6 +177,7 @@ public class FallbackManagerTest {
fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertThat(fallbackManager.isInFallbackMode()).isFalse();
verify(fakeFallbackLb, never())
.handleResolvedAddresses(ArgumentMatchers.any(ResolvedAddresses.class));
}

View File

@ -17,11 +17,19 @@
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG;
import static io.grpc.xds.XdsLoadBalancer.STATE_INFO;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -29,19 +37,27 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.protobuf.Any;
import com.google.protobuf.UInt32Value;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.core.Address;
import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.endpoint.Endpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ChannelLogger;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
@ -55,10 +71,10 @@ import io.grpc.internal.JsonParser;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -80,7 +96,7 @@ public class XdsLoadBalancerTest {
@Mock
private Helper helper;
@Mock
private LoadBalancer fakeBalancer1;
private LoadBalancer fallbackBalancer1;
@Mock
private LoadBalancer fakeBalancer2;
private XdsLoadBalancer lb;
@ -90,6 +106,8 @@ public class XdsLoadBalancerTest {
private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
private Helper fallbackHelper1;
private final LoadBalancerProvider lbProvider1 = new LoadBalancerProvider() {
@Override
public boolean isAvailable() {
@ -103,12 +121,13 @@ public class XdsLoadBalancerTest {
@Override
public String getPolicyName() {
return "supported_1";
return "fallback_1";
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return fakeBalancer1;
fallbackHelper1 = helper;
return fallbackBalancer1;
}
};
@ -134,6 +153,29 @@ public class XdsLoadBalancerTest {
}
};
private final Locality localityProto1 = Locality.newBuilder()
.setRegion("region1").setZone("zone1").setSubZone("subzone1").build();
private final LbEndpoint endpoint11 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr11").setPortValue(11))))
.setLoadBalancingWeight(UInt32Value.of(11))
.build();
private final DiscoveryResponse edsResponse = DiscoveryResponse.newBuilder()
.addResources(Any.pack(ClusterLoadAssignment.newBuilder()
.addEndpoints(LocalityLbEndpoints.newBuilder()
.setLocality(localityProto1)
.addLbEndpoints(endpoint11)
.setLoadBalancingWeight(UInt32Value.of(1)))
.build()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
private Helper childHelper;
@Mock
private LoadBalancer childBalancer;
private final LoadBalancerProvider roundRobin = new LoadBalancerProvider() {
@Override
public boolean isAvailable() {
@ -152,7 +194,8 @@ public class XdsLoadBalancerTest {
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return null;
childHelper = helper;
return childBalancer;
}
};
@ -164,9 +207,6 @@ public class XdsLoadBalancerTest {
}
});
@Mock
private LocalityStore fakeLocalityStore;
private ManagedChannel oobChannel1;
private ManagedChannel oobChannel2;
private ManagedChannel oobChannel3;
@ -179,7 +219,7 @@ public class XdsLoadBalancerTest {
lbRegistry.register(lbProvider1);
lbRegistry.register(lbProvider2);
lbRegistry.register(roundRobin);
lb = new XdsLoadBalancer(helper, lbRegistry, fakeLocalityStore);
lb = new XdsLoadBalancer(helper, lbRegistry);
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger();
@ -253,7 +293,7 @@ public class XdsLoadBalancerTest {
String lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -275,7 +315,7 @@ public class XdsLoadBalancerTest {
lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -304,7 +344,7 @@ public class XdsLoadBalancerTest {
String lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -322,8 +362,8 @@ public class XdsLoadBalancerTest {
lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"childPolicy\" : [{\"supported_1\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"childPolicy\" : [{\"supported_2\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -349,8 +389,8 @@ public class XdsLoadBalancerTest {
public void resolverEvent_customModeToStandardMode() throws Exception {
String lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"childPolicy\" : [{\"supported_1\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"childPolicy\" : [{\"supported_2\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -371,7 +411,7 @@ public class XdsLoadBalancerTest {
lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -397,8 +437,8 @@ public class XdsLoadBalancerTest {
public void resolverEvent_customModeToCustomMode() throws Exception {
String lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"childPolicy\" : [{\"supported_1\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"childPolicy\" : [{\"supported_2\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -417,8 +457,8 @@ public class XdsLoadBalancerTest {
lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"childPolicy\" : [{\"supported_2\" : {\"key\" : \"val\"}}, {\"unsupported_1\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"childPolicy\" : [{\"fallback_1\" : {\"key\" : \"val\"}}, {\"unfallback_1\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -444,7 +484,7 @@ public class XdsLoadBalancerTest {
String lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -462,8 +502,8 @@ public class XdsLoadBalancerTest {
lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8443\","
+ "\"childPolicy\" : [{\"supported_1\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"childPolicy\" : [{\"fallback_1\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -488,6 +528,34 @@ public class XdsLoadBalancerTest {
verifyNoMoreInteractions(oobChannel3);
}
@Test
public void resolutionErrorAtStartup() {
lb.handleNameResolutionError(Status.UNAVAILABLE);
assertNull(childHelper);
assertNull(fallbackHelper1);
verify(helper).updateBalancingState(same(TRANSIENT_FAILURE), isA(ErrorPicker.class));
}
@Test
public void resolutionErrorAtFallback() throws Exception {
lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
// let fallback timer expire
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1);
ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(fallbackBalancer1).handleResolvedAddresses(captor.capture());
assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG))
.containsExactly("fallback_1_option", "yes");
Status status = Status.UNAVAILABLE.withDescription("resolution error");
lb.handleNameResolutionError(status);
verify(fallbackBalancer1).handleNameResolutionError(status);
}
@Test
public void fallback_AdsNotWorkingYetTimerExpired() throws Exception {
lb.handleResolvedAddresses(
@ -496,49 +564,98 @@ public class XdsLoadBalancerTest {
.setAttributes(standardModeWithFallback1Attributes())
.build());
assertNull(childHelper);
assertNull(fallbackHelper1);
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1);
assertThat(fakeClock.getPendingTasks()).isEmpty();
assertThat(fakeClock.getPendingTasks()).isEmpty();
assertNull(childHelper);
assertNotNull(fallbackHelper1);
ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(fakeBalancer1).handleResolvedAddresses(captor.capture());
verify(fallbackBalancer1).handleResolvedAddresses(captor.capture());
assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG))
.containsExactly("supported_1_option", "yes");
.containsExactly("fallback_1_option", "yes");
SubchannelPicker picker = mock(SubchannelPicker.class);
fallbackHelper1.updateBalancingState(CONNECTING, picker);
verify(helper).updateBalancingState(CONNECTING, picker);
}
@Test
public void fallback_AdsWorkingTimerCancelled() throws Exception {
public void fallback_ErrorWithoutReceivingEdsResponse() throws Exception {
lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance());
assertThat(fakeClock.getPendingTasks()).isEmpty();
verify(fakeBalancer1, never()).handleResolvedAddresses(
ArgumentMatchers.any(ResolvedAddresses.class));
}
@Test
public void fallback_AdsErrorAndNoActiveSubchannel() throws Exception {
lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
assertNull(childHelper);
assertNull(fallbackHelper1);
assertThat(fakeClock.getPendingTasks()).hasSize(1);
serverResponseWriter.onError(new Exception("fake error"));
// goes to fallback-at-startup mode immediately
assertThat(fakeClock.getPendingTasks()).isEmpty();
assertNull(childHelper);
assertNotNull(fallbackHelper1);
// verify fallback balancer is working
ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(fakeBalancer1).handleResolvedAddresses(captor.capture());
verify(fallbackBalancer1).handleResolvedAddresses(captor.capture());
assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG))
.containsExactly("supported_1_option", "yes");
.containsExactly("fallback_1_option", "yes");
SubchannelPicker picker = mock(SubchannelPicker.class);
fallbackHelper1.updateBalancingState(CONNECTING, picker);
verify(helper).updateBalancingState(CONNECTING, picker);
}
@Test
public void fallback_EdsResponseReceivedThenErrorBeforeBackendReady() throws Exception {
lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
serverResponseWriter.onNext(edsResponse);
assertNotNull(childHelper);
assertNull(fallbackHelper1);
serverResponseWriter.onError(new Exception("fake error"));
assertThat(fakeClock.getPendingTasks()).hasSize(1);
// verify fallback balancer is not started
assertNull(fallbackHelper1);
verify(fallbackBalancer1, never()).handleResolvedAddresses(any(ResolvedAddresses.class));
SubchannelPicker picker1 = mock(SubchannelPicker.class);
childHelper.updateBalancingState(CONNECTING, picker1);
verify(helper).updateBalancingState(CONNECTING, BUFFER_PICKER);
childHelper.updateBalancingState(TRANSIENT_FAILURE, picker1);
verify(helper).updateBalancingState(same(TRANSIENT_FAILURE), isA(ErrorPicker.class));
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1);
assertThat(fakeClock.getPendingTasks()).isEmpty();
// verify fallback balancer is working
ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
assertNotNull(fallbackHelper1);
verify(fallbackBalancer1).handleResolvedAddresses(captor.capture());
assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG))
.containsExactly("fallback_1_option", "yes");
// verify handleResolvedAddresses() is not called again
verify(fakeBalancer1).handleResolvedAddresses(ArgumentMatchers.any(ResolvedAddresses.class));
SubchannelPicker picker2 = mock(SubchannelPicker.class);
childHelper.updateBalancingState(CONNECTING, picker2);
// verify childHelper no more delegates updateBalancingState to parent helper
verify(helper, times(2)).updateBalancingState(
any(ConnectivityState.class), any(SubchannelPicker.class));
SubchannelPicker picker3 = mock(SubchannelPicker.class);
fallbackHelper1.updateBalancingState(CONNECTING, picker3);
verify(helper).updateBalancingState(CONNECTING, picker3);
SubchannelPicker picker4 = mock(SubchannelPicker.class);
childHelper.updateBalancingState(READY, picker4);
verify(fallbackBalancer1).shutdown();
verify(helper).updateBalancingState(same(READY), isA(InterLocalityPicker.class));
}
@Test
@ -548,45 +665,29 @@ public class XdsLoadBalancerTest {
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
serverResponseWriter.onNext(edsResponse);
assertNotNull(childHelper);
assertThat(fakeClock.getPendingTasks()).hasSize(1);
assertNull(fallbackHelper1);
childHelper.updateBalancingState(READY, mock(SubchannelPicker.class));
verify(helper).updateBalancingState(same(READY), isA(InterLocalityPicker.class));
assertThat(fakeClock.getPendingTasks()).isEmpty();
serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance());
doReturn(true).when(fakeLocalityStore).hasReadyBackends();
serverResponseWriter.onError(new Exception("fake error"));
assertNull(fallbackHelper1);
verify(fallbackBalancer1, never()).handleResolvedAddresses(
any(ResolvedAddresses.class));
verify(fakeBalancer1, never()).handleResolvedAddresses(
ArgumentMatchers.any(ResolvedAddresses.class));
Subchannel subchannel = new Subchannel() {
@Override
public void shutdown() {}
@Override
public void requestConnection() {}
@Override
public Attributes getAttributes() {
return Attributes.newBuilder()
.set(
STATE_INFO,
new AtomicReference<>(ConnectivityStateInfo.forNonError(ConnectivityState.READY)))
.build();
}
};
doReturn(false).when(fakeLocalityStore).hasReadyBackends();
lb.handleSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE));
ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(fakeBalancer1).handleResolvedAddresses(captor.capture());
assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG))
.containsExactly("supported_1_option", "yes");
// verify childHelper still delegates updateBalancingState to parent helper
childHelper.updateBalancingState(CONNECTING, mock(SubchannelPicker.class));
verify(helper).updateBalancingState(CONNECTING, BUFFER_PICKER);
}
private static Attributes standardModeWithFallback1Attributes() throws Exception {
String lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"fallbackPolicy\" : [{\"supported_1\" : { \"supported_1_option\" : \"yes\"}}]"
+ "\"fallbackPolicy\" : [{\"fallback_1\" : { \"fallback_1_option\" : \"yes\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@ -598,7 +699,7 @@ public class XdsLoadBalancerTest {
String lbConfigRaw = "{\"xds_experimental\" : { "
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
+ "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]"
+ "}}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);