diff --git a/xds/src/main/java/io/grpc/xds/LocalityStore.java b/xds/src/main/java/io/grpc/xds/LocalityStore.java index ea08af93cc..253bcea366 100644 --- a/xds/src/main/java/io/grpc/xds/LocalityStore.java +++ b/xds/src/main/java/io/grpc/xds/LocalityStore.java @@ -31,8 +31,6 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; -import io.grpc.LoadBalancer.PickResult; -import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; @@ -43,6 +41,7 @@ import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.xds.InterLocalityPicker.WeightedChildPicker; import io.grpc.xds.XdsComms.Locality; import io.grpc.xds.XdsComms.LocalityInfo; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -58,8 +57,6 @@ import java.util.Set; // Must be accessed/run in SynchronizedContext. interface LocalityStore { - boolean hasReadyBackends(); - boolean hasNonDropBackends(); void reset(); @@ -76,7 +73,6 @@ interface LocalityStore { private final LoadBalancerProvider loadBalancerProvider; private Map localityMap = new HashMap<>(); - private ConnectivityState overallState; LocalityStoreImpl(Helper helper, LoadBalancerRegistry lbRegistry) { this(helper, pickerFactoryImpl, lbRegistry); @@ -104,11 +100,6 @@ interface LocalityStore { } }; - @Override - public boolean hasReadyBackends() { - return overallState == READY; - } - @Override public boolean hasNonDropBackends() { // TODO: impl @@ -196,32 +187,6 @@ interface LocalityStore { } - private static final class ErrorPicker extends SubchannelPicker { - - final Status error; - - ErrorPicker(Status error) { - this.error = checkNotNull(error, "error"); - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withError(error); - } - } - - private static final SubchannelPicker BUFFER_PICKER = new SubchannelPicker() { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withNoResult(); - } - - @Override - public String toString() { - return "BUFFER_PICKER"; - } - }; - private static ConnectivityState aggregateState( ConnectivityState overallState, ConnectivityState childState) { if (overallState == null) { @@ -268,7 +233,6 @@ interface LocalityStore { } updatePicker(overallState, childPickers); - this.overallState = overallState; } private void updatePicker(ConnectivityState state, List childPickers) { @@ -278,7 +242,7 @@ interface LocalityStore { if (state == TRANSIENT_FAILURE) { picker = new ErrorPicker(Status.UNAVAILABLE); // TODO: more details in status } else { - picker = BUFFER_PICKER; + picker = XdsSubchannelPickers.BUFFER_PICKER; } } else { picker = pickerFactory.picker(childPickers); @@ -316,7 +280,7 @@ interface LocalityStore { private final Locality locality; - private SubchannelPicker currentChildPicker = BUFFER_PICKER; + private SubchannelPicker currentChildPicker = XdsSubchannelPickers.BUFFER_PICKER; private ConnectivityState currentChildState = null; ChildHelper(Locality locality) { diff --git a/xds/src/main/java/io/grpc/xds/XdsComms.java b/xds/src/main/java/io/grpc/xds/XdsComms.java index 71f1f420b9..c49f6a8e7c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsComms.java +++ b/xds/src/main/java/io/grpc/xds/XdsComms.java @@ -181,7 +181,7 @@ final class XdsComms { final StreamObserver xdsResponseReader = new StreamObserver() { - boolean firstResponseReceived; + boolean firstEdsResponseReceived; @Override public void onNext(final DiscoveryResponse value) { @@ -190,10 +190,6 @@ final class XdsComms { @Override public void run() { - if (!firstResponseReceived) { - firstResponseReceived = true; - adsStreamCallback.onWorking(); - } String typeUrl = value.getTypeUrl(); if (EDS_TYPE_URL.equals(typeUrl)) { // Assuming standard mode. @@ -205,9 +201,13 @@ final class XdsComms { value.getResources(0).unpack(ClusterLoadAssignment.class); } catch (InvalidProtocolBufferException | NullPointerException e) { cancelRpc("Received invalid EDS response", e); + adsStreamCallback.onError(); return; } - + if (!firstEdsResponseReceived) { + firstEdsResponseReceived = true; + adsStreamCallback.onWorking(); + } List localities = clusterLoadAssignment.getEndpointsList(); Map localityEndpointsMapping = new LinkedHashMap<>(); for (LocalityLbEndpoints localityLbEndpoints : localities) { @@ -242,6 +242,7 @@ final class XdsComms { new Runnable() { @Override public void run() { + // TODO: schedule retry closed = true; if (cancelled) { return; @@ -249,7 +250,6 @@ final class XdsComms { adsStreamCallback.onError(); } }); - // TODO: more impl } @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java index f37e604b07..a83959f725 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java @@ -17,7 +17,9 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; -import static io.grpc.ConnectivityState.SHUTDOWN; +import static com.google.common.base.Preconditions.checkState; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -25,6 +27,7 @@ import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; import io.grpc.Attributes; import io.grpc.ChannelLogger.ChannelLogLevel; +import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; @@ -33,12 +36,15 @@ import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.ServiceConfigUtil.LbConfig; +import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.xds.LocalityStore.LocalityStoreImpl; import io.grpc.xds.XdsComms.AdsStreamCallback; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.CheckForNull; import javax.annotation.Nullable; /** @@ -59,15 +65,22 @@ final class XdsLoadBalancer extends LoadBalancer { @Override public void onWorking() { - fallbackManager.balancerWorking = true; - fallbackManager.cancelFallback(); + if (fallbackManager.childPolicyHasBeenReady) { + // cancel Fallback-After-Startup timer if there's any + fallbackManager.cancelFallbackTimer(); + } + + fallbackManager.childBalancerWorked = true; } @Override public void onError() { - // TODO: backoff and retry - fallbackManager.balancerWorking = false; - fallbackManager.maybeUseFallbackPolicy(); + if (!fallbackManager.childBalancerWorked) { + // start Fallback-at-Startup immediately + fallbackManager.useFallbackPolicy(); + } else if (fallbackManager.childPolicyHasBeenReady) { + // TODO: schedule a timer for Fallback-After-Startup + } // else: the Fallback-at-Startup timer is still pending, noop and wait } }; @@ -76,16 +89,35 @@ final class XdsLoadBalancer extends LoadBalancer { private LbConfig fallbackPolicy; - @VisibleForTesting - XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, LocalityStore localityStore) { + XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry) { this.helper = helper; this.lbRegistry = lbRegistry; - this.localityStore = localityStore; + this.localityStore = new LocalityStoreImpl(new LocalityStoreHelper(), lbRegistry); fallbackManager = new FallbackManager(helper, localityStore, lbRegistry); } - XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry) { - this(helper, lbRegistry, new LocalityStoreImpl(helper, lbRegistry)); + private final class LocalityStoreHelper extends ForwardingLoadBalancerHelper { + + @Override + protected Helper delegate() { + return helper; + } + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + + if (newState == READY) { + checkState( + fallbackManager.childBalancerWorked, + "channel goes to READY before the load balancer even worked"); + fallbackManager.childPolicyHasBeenReady = true; + fallbackManager.cancelFallback(); + } + + if (!fallbackManager.isInFallbackMode()) { + helper.updateBalancingState(newState, newPicker); + } + } } @Override @@ -103,7 +135,7 @@ final class XdsLoadBalancer extends LoadBalancer { XdsConfig xdsConfig = (XdsConfig) cfg.getConfig(); fallbackPolicy = xdsConfig.fallbackPolicy; fallbackManager.updateFallbackServers(servers, attributes, fallbackPolicy); - fallbackManager.maybeStartFallbackTimer(); + fallbackManager.startFallbackTimer(); handleNewConfig(xdsConfig); xdsLbState.handleResolvedAddressGroups(servers, attributes); } @@ -117,7 +149,6 @@ final class XdsLoadBalancer extends LoadBalancer { xdsComms = xdsLbState.shutdownAndReleaseXdsComms(); if (xdsComms != null) { xdsComms.shutdownChannel(); - fallbackManager.balancerWorking = false; xdsComms = null; } } else if (!Objects.equal( @@ -128,7 +159,6 @@ final class XdsLoadBalancer extends LoadBalancer { // close the stream but reuse the channel if (xdsComms != null) { xdsComms.shutdownLbRpc(cancelMessage); - fallbackManager.balancerWorking = false; xdsComms.refreshAdsStream(); } } else { // effectively no change in policy, keep xdsLbState unchanged @@ -150,32 +180,32 @@ final class XdsLoadBalancer extends LoadBalancer { @Override public void handleNameResolutionError(Status error) { if (xdsLbState != null) { - if (fallbackManager.fallbackBalancer != null) { - fallbackManager.fallbackBalancer.handleNameResolutionError(error); - } else { - xdsLbState.handleNameResolutionError(error); - } + xdsLbState.handleNameResolutionError(error); + } + if (fallbackManager.isInFallbackMode()) { + fallbackManager.fallbackBalancer.handleNameResolutionError(error); + } + if (xdsLbState == null && !fallbackManager.isInFallbackMode()) { + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); } - // TODO: impl - // else { - // helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(error)); - // } } + /** + * This is only for the subchannel that is created by the the child/fallback balancer using the + * old API {@link LoadBalancer.Helper#createSubchannel(EquivalentAddressGroup, Attributes)} or + * {@link LoadBalancer.Helper#createSubchannel(List, Attributes)}. Otherwise, it either won't be + * called or won't have any effect. + */ + @Deprecated @Override public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { - // xdsLbState should never be null here since handleSubchannelState cannot be called while the - // lb is shutdown. - if (newState.getState() == SHUTDOWN) { - return; - } - - if (fallbackManager.fallbackBalancer != null) { + if (fallbackManager.isInFallbackMode()) { fallbackManager.fallbackBalancer.handleSubchannelState(subchannel, newState); } + // xdsLbState should never be null here since handleSubchannelState cannot be called while the + // lb is shutdown. xdsLbState.handleSubchannelState(subchannel, newState); - fallbackManager.maybeUseFallbackPolicy(); } @Override @@ -215,14 +245,15 @@ final class XdsLoadBalancer extends LoadBalancer { private LoadBalancer fallbackBalancer; // Scheduled only once. Never reset. - @Nullable + @CheckForNull private ScheduledHandle fallbackTimer; private List fallbackServers = ImmutableList.of(); private Attributes fallbackAttributes; // allow value write by outer class - private boolean balancerWorking; + private boolean childBalancerWorked; + private boolean childPolicyHasBeenReady; FallbackManager( Helper helper, LocalityStore localityStore, LoadBalancerRegistry lbRegistry) { @@ -231,36 +262,67 @@ final class XdsLoadBalancer extends LoadBalancer { this.lbRegistry = lbRegistry; } - void cancelFallback() { + /** + * Fallback mode being on indicates that an update from child LBs will be ignored unless the + * update triggers turning off the fallback mode first. + */ + boolean isInFallbackMode() { + return fallbackBalancer != null; + } + + void cancelFallbackTimer() { if (fallbackTimer != null) { fallbackTimer.cancel(); } + } + + void cancelFallback() { + cancelFallbackTimer(); if (fallbackBalancer != null) { fallbackBalancer.shutdown(); fallbackBalancer = null; } } - void maybeUseFallbackPolicy() { + void useFallbackPolicy() { if (fallbackBalancer != null) { return; } - if (balancerWorking || localityStore.hasReadyBackends()) { - return; - } + + cancelFallbackTimer(); helper.getChannelLogger().log( ChannelLogLevel.INFO, "Using fallback policy"); + + final class FallbackBalancerHelper extends ForwardingLoadBalancerHelper { + LoadBalancer balancer; + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + checkNotNull(balancer, "there is a bug"); + if (balancer != fallbackBalancer) { + // ignore updates from a misbehaving shutdown fallback balancer + return; + } + super.updateBalancingState(newState, newPicker); + } + + @Override + protected Helper delegate() { + return helper; + } + } + + FallbackBalancerHelper fallbackBalancerHelper = new FallbackBalancerHelper(); fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName()) - .newLoadBalancer(helper); + .newLoadBalancer(fallbackBalancerHelper); + fallbackBalancerHelper.balancer = fallbackBalancer; // TODO(carl-mastrangelo): propagate the load balancing config policy fallbackBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(fallbackServers) .setAttributes(fallbackAttributes) .build()); - - // TODO: maybe update picker here if still use the old API but not SubchannelStateListener } void updateFallbackServers( @@ -284,17 +346,17 @@ final class XdsLoadBalancer extends LoadBalancer { } else { fallbackBalancer.shutdown(); fallbackBalancer = null; - maybeUseFallbackPolicy(); + useFallbackPolicy(); } } } - void maybeStartFallbackTimer() { + void startFallbackTimer() { if (fallbackTimer == null) { class FallbackTask implements Runnable { @Override public void run() { - maybeUseFallbackPolicy(); + useFallbackPolicy(); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsSubchannelPickers.java b/xds/src/main/java/io/grpc/xds/XdsSubchannelPickers.java new file mode 100644 index 0000000000..5c2890c34e --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsSubchannelPickers.java @@ -0,0 +1,63 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.Status; + +final class XdsSubchannelPickers { + + private XdsSubchannelPickers() { /* DO NOT CALL ME */ } + + static final SubchannelPicker BUFFER_PICKER = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withNoResult(); + } + + @Override + public String toString() { + return "BUFFER_PICKER"; + } + }; + + static final class ErrorPicker extends SubchannelPicker { + + private final Status error; + + ErrorPicker(Status error) { + this.error = checkNotNull(error, "error"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withError(error); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("error", error) + .toString(); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java b/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java index ae8544b4ac..3eff3d8885 100644 --- a/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/FallbackManagerTest.java @@ -142,16 +142,18 @@ public class FallbackManagerTest { @Test public void useFallbackWhenTimeout() { - fallbackManager.maybeStartFallbackTimer(); + fallbackManager.startFallbackTimer(); List eags = new ArrayList<>(); fallbackManager.updateFallbackServers( eags, Attributes.EMPTY, fallbackPolicy); + assertThat(fallbackManager.isInFallbackMode()).isFalse(); verify(fakeFallbackLb, never()) .handleResolvedAddresses(ArgumentMatchers.any(ResolvedAddresses.class)); fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertThat(fallbackManager.isInFallbackMode()).isTrue(); verify(fakeFallbackLb).handleResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(eags) @@ -166,7 +168,7 @@ public class FallbackManagerTest { @Test public void cancelFallback() { - fallbackManager.maybeStartFallbackTimer(); + fallbackManager.startFallbackTimer(); List eags = new ArrayList<>(); fallbackManager.updateFallbackServers( eags, Attributes.EMPTY, fallbackPolicy); @@ -175,6 +177,7 @@ public class FallbackManagerTest { fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertThat(fallbackManager.isInFallbackMode()).isFalse(); verify(fakeFallbackLb, never()) .handleResolvedAddresses(ArgumentMatchers.any(ResolvedAddresses.class)); } diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java index 80a67aa67f..ba5829bdff 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java @@ -17,11 +17,19 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG; -import static io.grpc.xds.XdsLoadBalancer.STATE_INFO; +import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -29,19 +37,27 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import com.google.protobuf.Any; +import com.google.protobuf.UInt32Value; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; +import io.envoyproxy.envoy.api.v2.core.Address; +import io.envoyproxy.envoy.api.v2.core.Locality; +import io.envoyproxy.envoy.api.v2.core.SocketAddress; +import io.envoyproxy.envoy.api.v2.endpoint.Endpoint; +import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint; +import io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints; import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.ChannelLogger; import io.grpc.ConnectivityState; -import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.ResolvedAddresses; -import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; @@ -55,10 +71,10 @@ import io.grpc.internal.JsonParser; import io.grpc.internal.testing.StreamRecorder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -80,7 +96,7 @@ public class XdsLoadBalancerTest { @Mock private Helper helper; @Mock - private LoadBalancer fakeBalancer1; + private LoadBalancer fallbackBalancer1; @Mock private LoadBalancer fakeBalancer2; private XdsLoadBalancer lb; @@ -90,6 +106,8 @@ public class XdsLoadBalancerTest { private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); + private Helper fallbackHelper1; + private final LoadBalancerProvider lbProvider1 = new LoadBalancerProvider() { @Override public boolean isAvailable() { @@ -103,12 +121,13 @@ public class XdsLoadBalancerTest { @Override public String getPolicyName() { - return "supported_1"; + return "fallback_1"; } @Override public LoadBalancer newLoadBalancer(Helper helper) { - return fakeBalancer1; + fallbackHelper1 = helper; + return fallbackBalancer1; } }; @@ -134,6 +153,29 @@ public class XdsLoadBalancerTest { } }; + private final Locality localityProto1 = Locality.newBuilder() + .setRegion("region1").setZone("zone1").setSubZone("subzone1").build(); + private final LbEndpoint endpoint11 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr11").setPortValue(11)))) + .setLoadBalancingWeight(UInt32Value.of(11)) + .build(); + private final DiscoveryResponse edsResponse = DiscoveryResponse.newBuilder() + .addResources(Any.pack(ClusterLoadAssignment.newBuilder() + .addEndpoints(LocalityLbEndpoints.newBuilder() + .setLocality(localityProto1) + .addLbEndpoints(endpoint11) + .setLoadBalancingWeight(UInt32Value.of(1))) + .build())) + .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .build(); + + private Helper childHelper; + @Mock + private LoadBalancer childBalancer; + private final LoadBalancerProvider roundRobin = new LoadBalancerProvider() { @Override public boolean isAvailable() { @@ -152,7 +194,8 @@ public class XdsLoadBalancerTest { @Override public LoadBalancer newLoadBalancer(Helper helper) { - return null; + childHelper = helper; + return childBalancer; } }; @@ -164,9 +207,6 @@ public class XdsLoadBalancerTest { } }); - @Mock - private LocalityStore fakeLocalityStore; - private ManagedChannel oobChannel1; private ManagedChannel oobChannel2; private ManagedChannel oobChannel3; @@ -179,7 +219,7 @@ public class XdsLoadBalancerTest { lbRegistry.register(lbProvider1); lbRegistry.register(lbProvider2); lbRegistry.register(roundRobin); - lb = new XdsLoadBalancer(helper, lbRegistry, fakeLocalityStore); + lb = new XdsLoadBalancer(helper, lbRegistry); doReturn(syncContext).when(helper).getSynchronizationContext(); doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger(); @@ -253,7 +293,7 @@ public class XdsLoadBalancerTest { String lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); @@ -275,7 +315,7 @@ public class XdsLoadBalancerTest { lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig2 = (Map) JsonParser.parse(lbConfigRaw); @@ -304,7 +344,7 @@ public class XdsLoadBalancerTest { String lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); @@ -322,8 +362,8 @@ public class XdsLoadBalancerTest { lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," - + "\"childPolicy\" : [{\"supported_1\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"childPolicy\" : [{\"supported_2\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig2 = (Map) JsonParser.parse(lbConfigRaw); @@ -349,8 +389,8 @@ public class XdsLoadBalancerTest { public void resolverEvent_customModeToStandardMode() throws Exception { String lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," - + "\"childPolicy\" : [{\"supported_1\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"childPolicy\" : [{\"supported_2\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); @@ -371,7 +411,7 @@ public class XdsLoadBalancerTest { lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig2 = (Map) JsonParser.parse(lbConfigRaw); @@ -397,8 +437,8 @@ public class XdsLoadBalancerTest { public void resolverEvent_customModeToCustomMode() throws Exception { String lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," - + "\"childPolicy\" : [{\"supported_1\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"childPolicy\" : [{\"supported_2\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); @@ -417,8 +457,8 @@ public class XdsLoadBalancerTest { lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," - + "\"childPolicy\" : [{\"supported_2\" : {\"key\" : \"val\"}}, {\"unsupported_1\" : {}}]," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"childPolicy\" : [{\"fallback_1\" : {\"key\" : \"val\"}}, {\"unfallback_1\" : {}}]," + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig2 = (Map) JsonParser.parse(lbConfigRaw); @@ -444,7 +484,7 @@ public class XdsLoadBalancerTest { String lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); @@ -462,8 +502,8 @@ public class XdsLoadBalancerTest { lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8443\"," - + "\"childPolicy\" : [{\"supported_1\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"childPolicy\" : [{\"fallback_1\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig2 = (Map) JsonParser.parse(lbConfigRaw); @@ -488,6 +528,34 @@ public class XdsLoadBalancerTest { verifyNoMoreInteractions(oobChannel3); } + @Test + public void resolutionErrorAtStartup() { + lb.handleNameResolutionError(Status.UNAVAILABLE); + + assertNull(childHelper); + assertNull(fallbackHelper1); + verify(helper).updateBalancingState(same(TRANSIENT_FAILURE), isA(ErrorPicker.class)); + } + + @Test + public void resolutionErrorAtFallback() throws Exception { + lb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(standardModeWithFallback1Attributes()) + .build()); + // let fallback timer expire + assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); + ArgumentCaptor captor = ArgumentCaptor.forClass(ResolvedAddresses.class); + verify(fallbackBalancer1).handleResolvedAddresses(captor.capture()); + assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG)) + .containsExactly("fallback_1_option", "yes"); + + Status status = Status.UNAVAILABLE.withDescription("resolution error"); + lb.handleNameResolutionError(status); + verify(fallbackBalancer1).handleNameResolutionError(status); + } + @Test public void fallback_AdsNotWorkingYetTimerExpired() throws Exception { lb.handleResolvedAddresses( @@ -496,49 +564,98 @@ public class XdsLoadBalancerTest { .setAttributes(standardModeWithFallback1Attributes()) .build()); + assertNull(childHelper); + assertNull(fallbackHelper1); + assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); - assertThat(fakeClock.getPendingTasks()).isEmpty(); + assertThat(fakeClock.getPendingTasks()).isEmpty(); + assertNull(childHelper); + assertNotNull(fallbackHelper1); ArgumentCaptor captor = ArgumentCaptor.forClass(ResolvedAddresses.class); - verify(fakeBalancer1).handleResolvedAddresses(captor.capture()); + verify(fallbackBalancer1).handleResolvedAddresses(captor.capture()); assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG)) - .containsExactly("supported_1_option", "yes"); + .containsExactly("fallback_1_option", "yes"); + + SubchannelPicker picker = mock(SubchannelPicker.class); + fallbackHelper1.updateBalancingState(CONNECTING, picker); + verify(helper).updateBalancingState(CONNECTING, picker); } @Test - public void fallback_AdsWorkingTimerCancelled() throws Exception { + public void fallback_ErrorWithoutReceivingEdsResponse() throws Exception { lb.handleResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(Collections.emptyList()) .setAttributes(standardModeWithFallback1Attributes()) .build()); - serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance()); - assertThat(fakeClock.getPendingTasks()).isEmpty(); - verify(fakeBalancer1, never()).handleResolvedAddresses( - ArgumentMatchers.any(ResolvedAddresses.class)); - } - - @Test - public void fallback_AdsErrorAndNoActiveSubchannel() throws Exception { - lb.handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(Collections.emptyList()) - .setAttributes(standardModeWithFallback1Attributes()) - .build()); + assertNull(childHelper); + assertNull(fallbackHelper1); + assertThat(fakeClock.getPendingTasks()).hasSize(1); serverResponseWriter.onError(new Exception("fake error")); + // goes to fallback-at-startup mode immediately + assertThat(fakeClock.getPendingTasks()).isEmpty(); + assertNull(childHelper); + assertNotNull(fallbackHelper1); + // verify fallback balancer is working ArgumentCaptor captor = ArgumentCaptor.forClass(ResolvedAddresses.class); - verify(fakeBalancer1).handleResolvedAddresses(captor.capture()); + verify(fallbackBalancer1).handleResolvedAddresses(captor.capture()); assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG)) - .containsExactly("supported_1_option", "yes"); + .containsExactly("fallback_1_option", "yes"); + + SubchannelPicker picker = mock(SubchannelPicker.class); + fallbackHelper1.updateBalancingState(CONNECTING, picker); + verify(helper).updateBalancingState(CONNECTING, picker); + } + + @Test + public void fallback_EdsResponseReceivedThenErrorBeforeBackendReady() throws Exception { + lb.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(standardModeWithFallback1Attributes()) + .build()); + serverResponseWriter.onNext(edsResponse); + assertNotNull(childHelper); + assertNull(fallbackHelper1); + + serverResponseWriter.onError(new Exception("fake error")); + assertThat(fakeClock.getPendingTasks()).hasSize(1); + // verify fallback balancer is not started + assertNull(fallbackHelper1); + verify(fallbackBalancer1, never()).handleResolvedAddresses(any(ResolvedAddresses.class)); + + SubchannelPicker picker1 = mock(SubchannelPicker.class); + childHelper.updateBalancingState(CONNECTING, picker1); + verify(helper).updateBalancingState(CONNECTING, BUFFER_PICKER); + childHelper.updateBalancingState(TRANSIENT_FAILURE, picker1); + verify(helper).updateBalancingState(same(TRANSIENT_FAILURE), isA(ErrorPicker.class)); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); - assertThat(fakeClock.getPendingTasks()).isEmpty(); + // verify fallback balancer is working + ArgumentCaptor captor = ArgumentCaptor.forClass(ResolvedAddresses.class); + assertNotNull(fallbackHelper1); + verify(fallbackBalancer1).handleResolvedAddresses(captor.capture()); + assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG)) + .containsExactly("fallback_1_option", "yes"); - // verify handleResolvedAddresses() is not called again - verify(fakeBalancer1).handleResolvedAddresses(ArgumentMatchers.any(ResolvedAddresses.class)); + SubchannelPicker picker2 = mock(SubchannelPicker.class); + childHelper.updateBalancingState(CONNECTING, picker2); + // verify childHelper no more delegates updateBalancingState to parent helper + verify(helper, times(2)).updateBalancingState( + any(ConnectivityState.class), any(SubchannelPicker.class)); + + SubchannelPicker picker3 = mock(SubchannelPicker.class); + fallbackHelper1.updateBalancingState(CONNECTING, picker3); + verify(helper).updateBalancingState(CONNECTING, picker3); + + SubchannelPicker picker4 = mock(SubchannelPicker.class); + childHelper.updateBalancingState(READY, picker4); + verify(fallbackBalancer1).shutdown(); + verify(helper).updateBalancingState(same(READY), isA(InterLocalityPicker.class)); } @Test @@ -548,45 +665,29 @@ public class XdsLoadBalancerTest { .setAddresses(Collections.emptyList()) .setAttributes(standardModeWithFallback1Attributes()) .build()); + serverResponseWriter.onNext(edsResponse); + assertNotNull(childHelper); + assertThat(fakeClock.getPendingTasks()).hasSize(1); + assertNull(fallbackHelper1); + + childHelper.updateBalancingState(READY, mock(SubchannelPicker.class)); + verify(helper).updateBalancingState(same(READY), isA(InterLocalityPicker.class)); + assertThat(fakeClock.getPendingTasks()).isEmpty(); - serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance()); - doReturn(true).when(fakeLocalityStore).hasReadyBackends(); serverResponseWriter.onError(new Exception("fake error")); + assertNull(fallbackHelper1); + verify(fallbackBalancer1, never()).handleResolvedAddresses( + any(ResolvedAddresses.class)); - verify(fakeBalancer1, never()).handleResolvedAddresses( - ArgumentMatchers.any(ResolvedAddresses.class)); - - Subchannel subchannel = new Subchannel() { - @Override - public void shutdown() {} - - @Override - public void requestConnection() {} - - @Override - public Attributes getAttributes() { - return Attributes.newBuilder() - .set( - STATE_INFO, - new AtomicReference<>(ConnectivityStateInfo.forNonError(ConnectivityState.READY))) - .build(); - } - }; - - doReturn(false).when(fakeLocalityStore).hasReadyBackends(); - lb.handleSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure( - Status.UNAVAILABLE)); - - ArgumentCaptor captor = ArgumentCaptor.forClass(ResolvedAddresses.class); - verify(fakeBalancer1).handleResolvedAddresses(captor.capture()); - assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG)) - .containsExactly("supported_1_option", "yes"); + // verify childHelper still delegates updateBalancingState to parent helper + childHelper.updateBalancingState(CONNECTING, mock(SubchannelPicker.class)); + verify(helper).updateBalancingState(CONNECTING, BUFFER_PICKER); } private static Attributes standardModeWithFallback1Attributes() throws Exception { String lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," - + "\"fallbackPolicy\" : [{\"supported_1\" : { \"supported_1_option\" : \"yes\"}}]" + + "\"fallbackPolicy\" : [{\"fallback_1\" : { \"fallback_1_option\" : \"yes\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); @@ -598,7 +699,7 @@ public class XdsLoadBalancerTest { String lbConfigRaw = "{\"xds_experimental\" : { " + "\"balancerName\" : \"dns:///balancer.example.com:8080\"," + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]," - + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}}]" + + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"fallback_1\" : {\"key\" : \"val\"}}]" + "}}"; @SuppressWarnings("unchecked") Map lbConfig = (Map) JsonParser.parse(lbConfigRaw);