From 03db20cded04dfbf6927dd4b22a9b9a97cc0c25d Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 17 Apr 2020 17:42:47 +0000 Subject: [PATCH] xds: eliminate special code path for EDS-only workflow (#6931) Delete special logics (e.g., fallback) for EDS-only workflow and use the same format of lb config for running EDS-only workflow as running the full CDS-EDS workflow. --- .../java/io/grpc/xds/EdsLoadBalancer.java | 190 ++++------ .../io/grpc/xds/EdsLoadBalancerProvider.java | 58 +++- xds/src/main/java/io/grpc/xds/FallbackLb.java | 77 ---- .../java/io/grpc/xds/XdsLoadBalancer.java | 284 --------------- .../io/grpc/xds/XdsLoadBalancerProvider.java | 167 --------- .../services/io.grpc.LoadBalancerProvider | 1 - .../grpc/xds/CdsLoadBalancerProviderTest.java | 48 +++ .../grpc/xds/EdsLoadBalancerProviderTest.java | 76 ++++ .../java/io/grpc/xds/EdsLoadBalancerTest.java | 87 +---- .../test/java/io/grpc/xds/FallbackLbTest.java | 261 -------------- .../grpc/xds/XdsLoadBalancerProviderTest.java | 130 ------- .../java/io/grpc/xds/XdsLoadBalancerTest.java | 328 ------------------ 12 files changed, 258 insertions(+), 1449 deletions(-) delete mode 100644 xds/src/main/java/io/grpc/xds/FallbackLb.java delete mode 100644 xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java delete mode 100644 xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java create mode 100644 xds/src/test/java/io/grpc/xds/CdsLoadBalancerProviderTest.java create mode 100644 xds/src/test/java/io/grpc/xds/EdsLoadBalancerProviderTest.java delete mode 100644 xds/src/test/java/io/grpc/xds/FallbackLbTest.java delete mode 100644 xds/src/test/java/io/grpc/xds/XdsLoadBalancerProviderTest.java delete mode 100644 xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java index 239fc3dc6b..8d78bc82ef 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java @@ -16,7 +16,6 @@ package io.grpc.xds; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; @@ -57,13 +56,12 @@ final class EdsLoadBalancer extends LoadBalancer { private final InternalLogId logId; private final XdsLogger logger; - private final ResourceUpdateCallback resourceUpdateCallback; private final GracefulSwitchLoadBalancer switchingLoadBalancer; private final LoadBalancerRegistry lbRegistry; private final LocalityStoreFactory localityStoreFactory; private final Bootstrapper bootstrapper; private final XdsChannelFactory channelFactory; - private final Helper edsLbHelper; + private final Helper helper; @Nullable private ObjectPool xdsClientPool; @@ -72,12 +70,11 @@ final class EdsLoadBalancer extends LoadBalancer { @Nullable private String clusterName; @Nullable - private EdsConfig config; + private String edsServiceName; - EdsLoadBalancer(Helper edsLbHelper, ResourceUpdateCallback resourceUpdateCallback) { + EdsLoadBalancer(Helper helper) { this( - edsLbHelper, - resourceUpdateCallback, + helper, LoadBalancerRegistry.getDefaultRegistry(), LocalityStoreFactory.getInstance(), Bootstrapper.getInstance(), @@ -86,20 +83,18 @@ final class EdsLoadBalancer extends LoadBalancer { @VisibleForTesting EdsLoadBalancer( - Helper edsLbHelper, - ResourceUpdateCallback resourceUpdateCallback, + Helper helper, LoadBalancerRegistry lbRegistry, LocalityStoreFactory localityStoreFactory, Bootstrapper bootstrapper, XdsChannelFactory channelFactory) { - this.edsLbHelper = checkNotNull(edsLbHelper, "edsLbHelper"); - this.resourceUpdateCallback = checkNotNull(resourceUpdateCallback, "resourceUpdateCallback"); + this.helper = checkNotNull(helper, "helper"); this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); this.localityStoreFactory = checkNotNull(localityStoreFactory, "localityStoreFactory"); this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper"); this.channelFactory = checkNotNull(channelFactory, "channelFactory"); - this.switchingLoadBalancer = new GracefulSwitchLoadBalancer(edsLbHelper); - logId = InternalLogId.allocate("eds-lb", edsLbHelper.getAuthority()); + this.switchingLoadBalancer = new GracefulSwitchLoadBalancer(helper); + logId = InternalLogId.allocate("eds-lb", helper.getAuthority()); logger = XdsLogger.withLogId(logId); logger.log(XdsLogLevel.INFO, "Created"); } @@ -108,7 +103,10 @@ final class EdsLoadBalancer extends LoadBalancer { public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses); Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig(); - checkNotNull(lbConfig, "missing EDS lb config"); + if (lbConfig == null) { + handleNameResolutionError(Status.UNAVAILABLE.withDescription("Missing EDS lb config")); + return; + } EdsConfig newEdsConfig = (EdsConfig) lbConfig; if (logger.isLoggable(XdsLogLevel.INFO)) { logger.log( @@ -120,34 +118,20 @@ final class EdsLoadBalancer extends LoadBalancer { newEdsConfig.edsServiceName, newEdsConfig.lrsServerName != null); } + boolean firstUpdate = false; if (clusterName == null) { - clusterName = newEdsConfig.clusterName; - } else { - checkArgument( - clusterName.equals(newEdsConfig.clusterName), - "cluster name should not change"); + firstUpdate = true; } + clusterName = newEdsConfig.clusterName; if (xdsClientPool == null) { - // Init xdsClientPool and xdsClient. - // There are two usecases: - // 1. EDS-only: - // The name resolver resolves a ResolvedAddresses with an XdsConfig. Use the bootstrap - // information to create a channel. - // 2. Non EDS-only: - // XDS_CLIENT_POOL attribute is available from ResolvedAddresses either from - // XdsNameResolver or CDS policy. - // - // We assume XdsConfig switching happens only within one usecase, and there is no switching - // between different usecases. - Attributes attributes = resolvedAddresses.getAttributes(); xdsClientPool = attributes.get(XdsAttributes.XDS_CLIENT_POOL); - if (xdsClientPool == null) { // This is the EDS-only usecase. + if (xdsClientPool == null) { final BootstrapInfo bootstrapInfo; try { bootstrapInfo = bootstrapper.readBootstrap(); } catch (Exception e) { - edsLbHelper.updateBalancingState( + helper.updateBalancingState( TRANSIENT_FAILURE, new ErrorPicker( Status.UNAVAILABLE.withDescription("Failed to bootstrap").withCause(e))); @@ -157,7 +141,7 @@ final class EdsLoadBalancer extends LoadBalancer { final List serverList = bootstrapInfo.getServers(); final Node node = bootstrapInfo.getNode(); if (serverList.isEmpty()) { - edsLbHelper.updateBalancingState( + helper.updateBalancingState( TRANSIENT_FAILURE, new ErrorPicker( Status.UNAVAILABLE @@ -169,12 +153,12 @@ final class EdsLoadBalancer extends LoadBalancer { XdsClient createXdsClient() { return new XdsClientImpl( - edsLbHelper.getAuthority(), + helper.getAuthority(), serverList, channelFactory, node, - edsLbHelper.getSynchronizationContext(), - edsLbHelper.getScheduledExecutorService(), + helper.getSynchronizationContext(), + helper.getScheduledExecutorService(), new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER); } @@ -188,14 +172,13 @@ final class EdsLoadBalancer extends LoadBalancer { // Note: childPolicy change will be handled in LocalityStore, to be implemented. // If edsServiceName in XdsConfig is changed, do a graceful switch. - if (config == null - || !Objects.equals(newEdsConfig.edsServiceName, config.edsServiceName)) { + if (firstUpdate || !Objects.equals(newEdsConfig.edsServiceName, edsServiceName)) { LoadBalancer.Factory clusterEndpointsLoadBalancerFactory = new ClusterEndpointsBalancerFactory(newEdsConfig.edsServiceName); switchingLoadBalancer.switchTo(clusterEndpointsLoadBalancerFactory); } switchingLoadBalancer.handleResolvedAddresses(resolvedAddresses); - this.config = newEdsConfig; + this.edsServiceName = newEdsConfig.edsServiceName; } @Override @@ -267,7 +250,7 @@ final class EdsLoadBalancer extends LoadBalancer { resourceName = clusterServiceName != null ? clusterServiceName : clusterName; localityStore = localityStoreFactory.newLocalityStore(logId, helper, lbRegistry, loadStatsStore); - endpointWatcher = new EndpointWatcherImpl(localityStore); + endpointWatcher = new EndpointWatcherImpl(); logger.log( XdsLogLevel.INFO, "Start endpoint watcher on {0} with xDS client {1}", @@ -308,9 +291,7 @@ final class EdsLoadBalancer extends LoadBalancer { @Override public void handleNameResolutionError(Status error) { - // Go into TRANSIENT_FAILURE if we have not yet received any endpoint update. Otherwise, - // we keep running with the data we had previously. - if (!endpointWatcher.firstEndpointUpdateReceived) { + if (!endpointWatcher.endpointsReceived) { helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); } } @@ -339,89 +320,60 @@ final class EdsLoadBalancer extends LoadBalancer { resourceName, xdsClient); } - } - } - /** - * Callbacks for the EDS-only-with-fallback usecase. Being deprecated. - */ - interface ResourceUpdateCallback { + private final class EndpointWatcherImpl implements EndpointWatcher { + boolean endpointsReceived; - void onWorking(); + @Override + public void onEndpointChanged(EndpointUpdate endpointUpdate) { + logger.log(XdsLogLevel.DEBUG, "Received endpoint update: {0}", endpointUpdate); + if (logger.isLoggable(XdsLogLevel.INFO)) { + logger.log( + XdsLogLevel.INFO, + "Received endpoint update from xDS client {0}: cluster_name={1}, {2} localities, " + + "{3} drop categories", + xdsClient, + endpointUpdate.getClusterName(), + endpointUpdate.getLocalityLbEndpointsMap().size(), + endpointUpdate.getDropPolicies().size()); + } + endpointsReceived = true; + List dropOverloads = endpointUpdate.getDropPolicies(); + ImmutableList.Builder dropOverloadsBuilder = ImmutableList.builder(); + for (DropOverload dropOverload : dropOverloads) { + dropOverloadsBuilder.add(dropOverload); + if (dropOverload.getDropsPerMillion() == 1_000_000) { + break; + } + } + localityStore.updateDropPercentage(dropOverloadsBuilder.build()); - void onError(); + ImmutableMap.Builder localityEndpointsMapping = + new ImmutableMap.Builder<>(); + for (Map.Entry entry + : endpointUpdate.getLocalityLbEndpointsMap().entrySet()) { + int localityWeight = entry.getValue().getLocalityWeight(); - void onAllDrop(); - } - - private final class EndpointWatcherImpl implements EndpointWatcher { - - final LocalityStore localityStore; - boolean firstEndpointUpdateReceived; - - EndpointWatcherImpl(LocalityStore localityStore) { - this.localityStore = localityStore; - } - - @Override - public void onEndpointChanged(EndpointUpdate endpointUpdate) { - logger.log(XdsLogLevel.DEBUG, "Received endpoint update: {0}", endpointUpdate); - if (logger.isLoggable(XdsLogLevel.INFO)) { - logger.log( - XdsLogLevel.INFO, - "Received endpoint update from xDS client {0}: cluster_name={1}, {2} localities, " - + "{3} drop categories", - xdsClient, - endpointUpdate.getClusterName(), - endpointUpdate.getLocalityLbEndpointsMap().size(), - endpointUpdate.getDropPolicies().size()); - } - - if (!firstEndpointUpdateReceived) { - firstEndpointUpdateReceived = true; - resourceUpdateCallback.onWorking(); - } - - List dropOverloads = endpointUpdate.getDropPolicies(); - ImmutableList.Builder dropOverloadsBuilder = ImmutableList.builder(); - for (DropOverload dropOverload : dropOverloads) { - dropOverloadsBuilder.add(dropOverload); - if (dropOverload.getDropsPerMillion() == 1_000_000) { - resourceUpdateCallback.onAllDrop(); - break; + if (localityWeight != 0) { + localityEndpointsMapping.put(entry.getKey(), entry.getValue()); + } + } + localityStore.updateLocalityStore(localityEndpointsMapping.build()); } - } - localityStore.updateDropPercentage(dropOverloadsBuilder.build()); - ImmutableMap.Builder localityEndpointsMapping = - new ImmutableMap.Builder<>(); - for (Map.Entry entry - : endpointUpdate.getLocalityLbEndpointsMap().entrySet()) { - int localityWeight = entry.getValue().getLocalityWeight(); - - if (localityWeight != 0) { - localityEndpointsMapping.put(entry.getKey(), entry.getValue()); + @Override + public void onError(Status error) { + logger.log( + XdsLogLevel.WARNING, + "Received error from xDS client {0}: {1}: {2}", + xdsClient, + error.getCode(), + error.getDescription()); + if (!endpointsReceived) { + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); + } } } - - localityStore.updateLocalityStore(localityEndpointsMapping.build()); - } - - @Override - public void onError(Status error) { - logger.log( - XdsLogLevel.WARNING, - "Received error from xDS client {0}: {1}: {2}", - xdsClient, - error.getCode(), - error.getDescription()); - resourceUpdateCallback.onError(); - // If we get an error before getting any valid result, we should put the channel in - // TRANSIENT_FAILURE; if they get an error after getting a valid result, we keep using the - // previous channel state. - if (!firstEndpointUpdateReceived) { - edsLbHelper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); - } } } } diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java index 30ea397443..87ac8f7697 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java @@ -20,13 +20,20 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; +import com.google.common.collect.ImmutableMap; import io.grpc.Internal; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import io.grpc.internal.JsonUtil; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.internal.ServiceConfigUtil.PolicySelection; -import io.grpc.xds.EdsLoadBalancer.ResourceUpdateCallback; +import java.util.Collections; +import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -55,24 +62,47 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider { @Override public LoadBalancer newLoadBalancer(Helper helper) { - return new EdsLoadBalancer( - helper, - new ResourceUpdateCallback() { - @Override - public void onWorking() {} - - @Override - public void onError() {} - - @Override - public void onAllDrop() {} - }); + return new EdsLoadBalancer(helper); } @Override public ConfigOrError parseLoadBalancingPolicyConfig( Map rawLoadBalancingPolicyConfig) { - throw new UnsupportedOperationException(); + LoadBalancerRegistry registry = LoadBalancerRegistry.getDefaultRegistry(); + try { + String cluster = JsonUtil.getString(rawLoadBalancingPolicyConfig, "cluster"); + if (cluster == null) { + return ConfigOrError.fromError(Status.INTERNAL.withDescription("Cluster name required")); + } + String edsServiceName = JsonUtil.getString(rawLoadBalancingPolicyConfig, "edsServiceName"); + String lrsServerName = + JsonUtil.getString(rawLoadBalancingPolicyConfig, "lrsLoadReportingServerName"); + + // TODO(chengyuanzhang): figure out locality_picking_policy parsing and its default value. + + LbConfig roundRobinConfig = new LbConfig("round_robin", ImmutableMap.of()); + List endpointPickingPolicy = + ServiceConfigUtil + .unwrapLoadBalancingConfigList( + JsonUtil.getListOfObjects( + rawLoadBalancingPolicyConfig, "endpointPickingPolicy")); + if (endpointPickingPolicy == null || endpointPickingPolicy.isEmpty()) { + endpointPickingPolicy = Collections.singletonList(roundRobinConfig); + } + ConfigOrError endpointPickingConfigOrError = + ServiceConfigUtil.selectLbPolicyFromList(endpointPickingPolicy, registry); + if (endpointPickingConfigOrError.getError() != null) { + return endpointPickingConfigOrError; + } + PolicySelection endpointPickingSelection = + (PolicySelection) endpointPickingConfigOrError.getConfig(); + return ConfigOrError.fromConfig( + new EdsConfig(cluster, edsServiceName, lrsServerName, endpointPickingSelection)); + } catch (RuntimeException e) { + return ConfigOrError.fromError( + Status.fromThrowable(e).withDescription( + "Failed to parse EDS LB config: " + rawLoadBalancingPolicyConfig)); + } } static final class EdsConfig { diff --git a/xds/src/main/java/io/grpc/xds/FallbackLb.java b/xds/src/main/java/io/grpc/xds/FallbackLb.java deleted file mode 100644 index fb311601ff..0000000000 --- a/xds/src/main/java/io/grpc/xds/FallbackLb.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.base.Preconditions.checkNotNull; -import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; - -import io.grpc.EquivalentAddressGroup; -import io.grpc.LoadBalancer; -import io.grpc.Status; -import io.grpc.internal.ServiceConfigUtil.PolicySelection; -import io.grpc.util.ForwardingLoadBalancer; -import io.grpc.util.GracefulSwitchLoadBalancer; -import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; -import java.util.List; - -/** Fallback load balancer. Handles fallback policy changes. */ -final class FallbackLb extends ForwardingLoadBalancer { - - private final Helper fallbackLbHelper; - private final GracefulSwitchLoadBalancer fallbackPolicyLb; - - FallbackLb(Helper fallbackLbHelper) { - this.fallbackLbHelper = checkNotNull(fallbackLbHelper, "fallbackLbHelper"); - fallbackPolicyLb = new GracefulSwitchLoadBalancer(fallbackLbHelper); - } - - @Override - protected LoadBalancer delegate() { - return fallbackPolicyLb; - } - - @Override - public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { - XdsConfig xdsConfig = (XdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - PolicySelection fallbackPolicy = xdsConfig.fallbackPolicy; - if (fallbackPolicy == null) { - // In the latest xDS design, fallback is not supported. - fallbackLbHelper.updateBalancingState( - TRANSIENT_FAILURE, - new ErrorPicker(Status.UNAVAILABLE.withDescription("Fallback is not supported"))); - return; - } - fallbackPolicyLb.switchTo(fallbackPolicy.getProvider()); - - List servers = resolvedAddresses.getAddresses(); - // TODO(zhangkun83): FIXME(#5496): this is a temporary hack. - if (servers.isEmpty() - && !fallbackPolicyLb.canHandleEmptyAddressListFromNameResolution()) { - fallbackPolicyLb.handleNameResolutionError(Status.UNAVAILABLE.withDescription( - "NameResolver returned no usable address." - + " addrs=" + resolvedAddresses)); - } else { - // TODO(carl-mastrangelo): propagate the load balancing config policy - ResolvedAddresses fallbackResolvedAddresses = resolvedAddresses.toBuilder() - .setAddresses(servers) - .setLoadBalancingPolicyConfig(fallbackPolicy.getConfig()) - .build(); - fallbackPolicyLb.handleResolvedAddresses(fallbackResolvedAddresses); - } - } -} diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java deleted file mode 100644 index d522eab085..0000000000 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; - -import com.google.common.annotations.VisibleForTesting; -import io.grpc.ChannelLogger.ChannelLogLevel; -import io.grpc.ConnectivityState; -import io.grpc.ConnectivityStateInfo; -import io.grpc.LoadBalancer; -import io.grpc.Status; -import io.grpc.SynchronizationContext.ScheduledHandle; -import io.grpc.util.ForwardingLoadBalancerHelper; -import io.grpc.xds.EdsLoadBalancer.ResourceUpdateCallback; -import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; -import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig; -import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; -import java.util.concurrent.TimeUnit; -import javax.annotation.CheckForNull; -import javax.annotation.Nullable; - -/** - * A {@link LoadBalancer} that uses the XDS protocol. - * - *

This class manages fallback handling. The logic for child policy handling and fallback policy - * handling is provided by EdsLoadBalancer and FallbackLb. - */ -final class XdsLoadBalancer extends LoadBalancer { - - private static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); // same as grpclb - - private final Helper helper; - private final LoadBalancer primaryLb; - private final LoadBalancer.Factory fallbackLbFactory; - private final ResourceUpdateCallback resourceUpdateCallback = new ResourceUpdateCallback() { - @Override - public void onWorking() { - if (primaryPolicyHasBeenReady) { - // cancel Fallback-After-Startup timer if there's any - cancelFallbackTimer(); - } - - primaryPolicyWorked = true; - } - - @Override - public void onError() { - if (!primaryPolicyWorked) { - // start Fallback-at-Startup immediately - useFallbackPolicy(); - } else if (primaryPolicyHasBeenReady) { - // TODO: schedule a timer for Fallback-After-Startup - } // else: the Fallback-at-Startup timer is still pending, noop and wait - } - - @Override - public void onAllDrop() { - cancelFallback(); - } - }; - - @Nullable - private LoadBalancer fallbackLb; - @Nullable - private ResolvedAddresses resolvedAddresses; - // Scheduled only once. Never reset to null. - @CheckForNull - private ScheduledHandle fallbackTimer; - private boolean primaryPolicyWorked; - private boolean primaryPolicyHasBeenReady; - - XdsLoadBalancer(Helper helper) { - this(helper, new EdsLoadBalancerFactory(), new FallbackLbFactory()); - } - - @VisibleForTesting - XdsLoadBalancer( - Helper helper, - PrimaryLbFactory primaryLbFactory, - LoadBalancer.Factory fallbackLbFactory) { - this.helper = checkNotNull(helper, "helper"); - this.primaryLb = primaryLbFactory.newLoadBalancer( - new PrimaryLbHelper(), resourceUpdateCallback); - this.fallbackLbFactory = fallbackLbFactory; - } - - @Override - public boolean canHandleEmptyAddressListFromNameResolution() { - // This does not sound correct, but it's fine as we don't support fallback at this moment. - // TODO(zdapeng): revisit it once we officially support fallback. - return true; - } - - @Override - public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { - this.resolvedAddresses = resolvedAddresses; - Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig(); - if (lbConfig == null) { - helper.updateBalancingState( - TRANSIENT_FAILURE, - new ErrorPicker(Status.UNAVAILABLE.withDescription("Missing xDS lb config"))); - return; - } - XdsConfig newXdsConfig = (XdsConfig) lbConfig; - if (isInFallbackMode()) { - fallbackLb.handleResolvedAddresses(this.resolvedAddresses); - } - - if (fallbackTimer == null) { - class EnterFallbackTask implements Runnable { - - @Override - public void run() { - useFallbackPolicy(); - } - } - - fallbackTimer = helper.getSynchronizationContext().schedule( - new EnterFallbackTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS, - helper.getScheduledExecutorService()); - } - EdsConfig edsConfig = - new EdsConfig( - helper.getAuthority(), - newXdsConfig.edsServiceName, - newXdsConfig.lrsServerName, - newXdsConfig.childPolicy); - primaryLb.handleResolvedAddresses( - resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(edsConfig).build()); - } - - @Override - public void handleNameResolutionError(Status error) { - primaryLb.handleNameResolutionError(error); - if (isInFallbackMode()) { - fallbackLb.handleNameResolutionError(error); - } - } - - @Override - public void requestConnection() { - primaryLb.requestConnection(); - if (isInFallbackMode()) { - fallbackLb.requestConnection(); - } - } - - @Override - public void shutdown() { - helper.getChannelLogger().log( - ChannelLogLevel.INFO, "Shutting down XDS balancer"); - primaryLb.shutdown(); - cancelFallback(); - } - - @Deprecated - public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { - throw new UnsupportedOperationException( - "handleSubchannelState() not supported by XdsLoadBalancer"); - } - - private void cancelFallbackTimer() { - if (fallbackTimer != null) { - fallbackTimer.cancel(); - } - } - - private void cancelFallback() { - cancelFallbackTimer(); - if (isInFallbackMode()) { - helper.getChannelLogger().log( - ChannelLogLevel.INFO, "Shutting down XDS fallback balancer"); - fallbackLb.shutdown(); - fallbackLb = null; - } - } - - private void useFallbackPolicy() { - if (isInFallbackMode()) { - return; - } - cancelFallbackTimer(); - helper.getChannelLogger().log( - ChannelLogLevel.INFO, "Using XDS fallback policy"); - - FallbackLbHelper fallbackLbHelper = new FallbackLbHelper(); - fallbackLb = fallbackLbFactory.newLoadBalancer(fallbackLbHelper); - fallbackLbHelper.balancer = fallbackLb; - fallbackLb.handleResolvedAddresses(resolvedAddresses); - } - - /** - * Fallback mode being on indicates that an update from child LBs will be ignored unless the - * update triggers turning off the fallback mode first. - */ - private boolean isInFallbackMode() { - return fallbackLb != null; - } - - private final class PrimaryLbHelper extends ForwardingLoadBalancerHelper { - - @Override - protected Helper delegate() { - return helper; - } - - @Override - public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { - if (newState == ConnectivityState.READY) { - checkState( - primaryPolicyWorked, - "channel goes to READY before the load balancer even worked"); - primaryPolicyHasBeenReady = true; - cancelFallback(); - } - if (!isInFallbackMode()) { - helper.getChannelLogger().log( - ChannelLogLevel.INFO, "Picker updated - state: {0}, picker: {1}", newState, newPicker); - helper.updateBalancingState(newState, newPicker); - } - } - } - - private final class FallbackLbHelper extends ForwardingLoadBalancerHelper { - LoadBalancer balancer; - - @Override - protected Helper delegate() { - return helper; - } - - @Override - public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { - checkNotNull(balancer, "balancer not set yet"); - if (balancer != fallbackLb) { - // ignore updates from a misbehaving shutdown fallback balancer - return; - } - helper.getChannelLogger().log( - ChannelLogLevel.INFO, - "Picker updated - state: {0}, picker: {1}", newState, newPicker); - super.updateBalancingState(newState, newPicker); - } - } - - /** Factory of load balancer for the primary policy.*/ - // The interface itself is for convenience in test. - @VisibleForTesting - interface PrimaryLbFactory { - LoadBalancer newLoadBalancer(Helper helper, ResourceUpdateCallback resourceUpdateCallback); - } - - private static final class EdsLoadBalancerFactory implements PrimaryLbFactory { - @Override - public LoadBalancer newLoadBalancer( - Helper edsLbHelper, ResourceUpdateCallback resourceUpdateCallback) { - return new EdsLoadBalancer(edsLbHelper, resourceUpdateCallback); - } - } - - private static final class FallbackLbFactory extends LoadBalancer.Factory { - @Override - public LoadBalancer newLoadBalancer(Helper helper) { - return new FallbackLb(helper); - } - } -} diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java deleted file mode 100644 index bf621d9dc1..0000000000 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; -import com.google.common.collect.ImmutableMap; -import io.grpc.Internal; -import io.grpc.LoadBalancer; -import io.grpc.LoadBalancer.Helper; -import io.grpc.LoadBalancerProvider; -import io.grpc.LoadBalancerRegistry; -import io.grpc.NameResolver.ConfigOrError; -import io.grpc.Status; -import io.grpc.internal.JsonUtil; -import io.grpc.internal.ServiceConfigUtil; -import io.grpc.internal.ServiceConfigUtil.LbConfig; -import io.grpc.internal.ServiceConfigUtil.PolicySelection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; - -/** - * The provider for the "xds" balancing policy. This class should not be directly referenced in - * code. The policy should be accessed through {@link io.grpc.LoadBalancerRegistry#getProvider} - * with the name "xds" (currently "xds_experimental"). - */ -@Internal -public final class XdsLoadBalancerProvider extends LoadBalancerProvider { - - private static final String XDS_POLICY_NAME = "xds_experimental"; - - @Override - public boolean isAvailable() { - return true; - } - - @Override - public int getPriority() { - return 5; - } - - @Override - public String getPolicyName() { - return XDS_POLICY_NAME; - } - - @Override - public LoadBalancer newLoadBalancer(Helper helper) { - return new XdsLoadBalancer(helper); - } - - @Override - public ConfigOrError parseLoadBalancingPolicyConfig( - Map rawLoadBalancingPolicyConfig) { - return parseLoadBalancingConfigPolicy( - rawLoadBalancingPolicyConfig, LoadBalancerRegistry.getDefaultRegistry()); - } - - static ConfigOrError parseLoadBalancingConfigPolicy( - Map rawLoadBalancingPolicyConfig, LoadBalancerRegistry registry) { - try { - LbConfig roundRobinConfig = new LbConfig("round_robin", ImmutableMap.of()); - List childPolicyConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - JsonUtil.getListOfObjects(rawLoadBalancingPolicyConfig, "childPolicy")); - if (childPolicyConfigs == null || childPolicyConfigs.isEmpty()) { - childPolicyConfigs = Collections.singletonList(roundRobinConfig); - } - ConfigOrError childConfigOrError = - ServiceConfigUtil.selectLbPolicyFromList(childPolicyConfigs, registry); - if (childConfigOrError.getError() != null) { - return childConfigOrError; - } - PolicySelection childPolicy = (PolicySelection) childConfigOrError.getConfig(); - - List fallbackConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - JsonUtil.getListOfObjects(rawLoadBalancingPolicyConfig, "fallbackPolicy")); - if (fallbackConfigs == null || fallbackConfigs.isEmpty()) { - fallbackConfigs = Collections.singletonList(roundRobinConfig); - } - ConfigOrError fallbackConfigOrError = - ServiceConfigUtil.selectLbPolicyFromList(fallbackConfigs, registry); - if (fallbackConfigOrError.getError() != null) { - return fallbackConfigOrError; - } - PolicySelection fallbackPolicy = (PolicySelection) fallbackConfigOrError.getConfig(); - - String edsServiceName = JsonUtil.getString(rawLoadBalancingPolicyConfig, "edsServiceName"); - String lrsServerName = - JsonUtil.getString(rawLoadBalancingPolicyConfig, "lrsLoadReportingServerName"); - return ConfigOrError.fromConfig( - new XdsConfig(edsServiceName, lrsServerName, childPolicy, fallbackPolicy)); - } catch (RuntimeException e) { - return ConfigOrError.fromError( - Status.fromThrowable(e).withDescription( - "Failed to parse XDS LB config: " + rawLoadBalancingPolicyConfig)); - } - } - - /** - * Represents a successfully parsed and validated LoadBalancingConfig for XDS LB policy. - */ - static final class XdsConfig { - @Nullable - final String edsServiceName; - @Nullable - final String lrsServerName; - final PolicySelection childPolicy; // default to round_robin if not specified in proto - final PolicySelection fallbackPolicy; // default to round_robin if not specified in proto - - XdsConfig( - @Nullable String edsServiceName, - @Nullable String lrsServerName, - PolicySelection childPolicy, - PolicySelection fallbackPolicy) { - this.edsServiceName = edsServiceName; - this.lrsServerName = lrsServerName; - this.childPolicy = checkNotNull(childPolicy, "childPolicy"); - this.fallbackPolicy = fallbackPolicy; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("edsServiceName", edsServiceName) - .add("lrsServerName", lrsServerName) - .add("childPolicy", childPolicy) - .add("fallbackPolicy", fallbackPolicy) - .toString(); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof XdsConfig)) { - return false; - } - XdsConfig that = (XdsConfig) obj; - return Objects.equal(this.edsServiceName, that.edsServiceName) - && Objects.equal(this.lrsServerName, that.lrsServerName) - && Objects.equal(this.childPolicy, that.childPolicy) - && Objects.equal(this.fallbackPolicy, that.fallbackPolicy); - } - - @Override - public int hashCode() { - return Objects.hashCode( - edsServiceName, lrsServerName, childPolicy, fallbackPolicy); - } - } -} diff --git a/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider index c85e6d0c4a..9f83a30598 100644 --- a/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider +++ b/xds/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider @@ -1,5 +1,4 @@ io.grpc.xds.CdsLoadBalancerProvider io.grpc.xds.EdsLoadBalancerProvider io.grpc.xds.WeightedTargetLoadBalancerProvider -io.grpc.xds.XdsLoadBalancerProvider io.grpc.xds.XdsRoutingLoadBalancerProvider diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerProviderTest.java new file mode 100644 index 0000000000..b937fc65e7 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerProviderTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; + +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.internal.JsonParser; +import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; +import java.io.IOException; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link CdsLoadBalancerProvider}. */ +@RunWith(JUnit4.class) +public class CdsLoadBalancerProviderTest { + + @Test + public void parseCdsLoadBalancingPolicyConfig() throws IOException { + CdsLoadBalancerProvider provider = new CdsLoadBalancerProvider(); + String rawCdsLbConfig = "{\n" + + " \"cluster\": \"cluster-foo.googleapis.com\"\n" + + "}"; + + @SuppressWarnings("unchecked") + Map rawLbConfigMap = (Map) JsonParser.parse(rawCdsLbConfig); + ConfigOrError result = provider.parseLoadBalancingPolicyConfig(rawLbConfigMap); + assertThat(result.getConfig()).isNotNull(); + CdsConfig config = (CdsConfig) result.getConfig(); + assertThat(config.name).isEqualTo("cluster-foo.googleapis.com"); + } +} diff --git a/xds/src/test/java/io/grpc/xds/EdsLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/EdsLoadBalancerProviderTest.java new file mode 100644 index 0000000000..6c69afcc78 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/EdsLoadBalancerProviderTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; + +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.internal.JsonParser; +import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; +import java.io.IOException; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link EdsLoadBalancerProvider}. */ +@RunWith(JUnit4.class) +public class EdsLoadBalancerProviderTest { + private final EdsLoadBalancerProvider provider = new EdsLoadBalancerProvider(); + + @Test + public void parseEdsLoadBalancingPolicyConfig() throws IOException { + String rawEdsLbConfig = "{\n" + + " \"cluster\": \"cluster-foo.googleapis.com\",\n" + + " \"edsServiceName\": \"cluster-foo:service-blade\",\n" + + " \"lrsLoadReportingServerName\": \"trafficdirector.googleapis.com\",\n" + + " \"endpointPickingPolicy\": [\n" + + " { \"policy_foo\": {} },\n" + + " { \"pick_first\": {} }\n" + + " ]\n" + + "}"; + + @SuppressWarnings("unchecked") + Map rawLbConfigMap = (Map) JsonParser.parse(rawEdsLbConfig); + ConfigOrError result = provider.parseLoadBalancingPolicyConfig(rawLbConfigMap); + assertThat(result.getConfig()).isNotNull(); + EdsConfig config = (EdsConfig) result.getConfig(); + assertThat(config.clusterName).isEqualTo("cluster-foo.googleapis.com"); + assertThat(config.edsServiceName).isEqualTo("cluster-foo:service-blade"); + assertThat(config.lrsServerName).isEqualTo("trafficdirector.googleapis.com"); + assertThat(config.endpointPickingPolicy.getProvider().getPolicyName()) + .isEqualTo("pick_first"); + } + + @Test + public void parseEdsLoadBalancingPolicyConfig_defaultEndpointPickingPolicy_roundRobin() + throws IOException { + String rawEdsLbConfig = "{\n" + + " \"cluster\": \"cluster-foo.googleapis.com\",\n" + + " \"edsServiceName\": \"cluster-foo:service-blade\",\n" + + " \"lrsLoadReportingServerName\": \"trafficdirector.googleapis.com\"\n" + + "}"; + + @SuppressWarnings("unchecked") + Map rawLbConfigMap = (Map) JsonParser.parse(rawEdsLbConfig); + ConfigOrError result = provider.parseLoadBalancingPolicyConfig(rawLbConfigMap); + assertThat(result.getConfig()).isNotNull(); + EdsConfig config = (EdsConfig) result.getConfig(); + assertThat(config.endpointPickingPolicy.getProvider().getPolicyName()) + .isEqualTo("round_robin"); + } +} diff --git a/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java index a5e68d3b8c..845858ae94 100644 --- a/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java @@ -74,7 +74,6 @@ import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.Bootstrapper.ChannelCreds; import io.grpc.xds.Bootstrapper.ServerInfo; -import io.grpc.xds.EdsLoadBalancer.ResourceUpdateCallback; import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; import io.grpc.xds.LocalityStore.LocalityStoreFactory; import io.grpc.xds.XdsClient.EndpointUpdate; @@ -147,8 +146,6 @@ public class EdsLoadBalancerTest { @Mock private Helper helper; @Mock - private ResourceUpdateCallback resourceUpdateCallback; - @Mock private Bootstrapper bootstrapper; @Captor ArgumentCaptor connectivityStateCaptor; @@ -248,9 +245,8 @@ public class EdsLoadBalancerTest { fakeClock.getStopwatchSupplier())); } - edsLb = new EdsLoadBalancer( - helper, resourceUpdateCallback, lbRegistry, localityStoreFactory, bootstrapper, - channelFactory); + edsLb = + new EdsLoadBalancer(helper, lbRegistry, localityStoreFactory, bootstrapper, channelFactory); } @After @@ -289,7 +285,7 @@ public class EdsLoadBalancerTest { buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)), 1, 0)), ImmutableList.of(buildDropOverload("throttle", 1000))); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); // handleResolutionError() after receiving endpoint update. edsLb.handleNameResolutionError(Status.DATA_LOSS.withDescription("fake status")); @@ -299,7 +295,7 @@ public class EdsLoadBalancerTest { } @Test - public void handleEdsServiceNameChangeInXdsConfig() { + public void handleEdsServiceNameChange() { assertThat(childHelpers).isEmpty(); deliverResolvedAddresses("edsServiceName1", null, fakeEndpointPickingPolicy); @@ -311,7 +307,7 @@ public class EdsLoadBalancerTest { buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)), 1, 0)), ImmutableList.of()); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); assertThat(childHelpers).hasSize(1); Helper childHelper1 = childHelpers.get("subzone1"); LoadBalancer childBalancer1 = childBalancers.get("subzone1"); @@ -334,7 +330,7 @@ public class EdsLoadBalancerTest { buildLbEndpoint("192.168.0.2", 8080, HEALTHY, 2)), 1, 0)), ImmutableList.of()); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); assertThat(childHelpers).hasSize(2); Helper childHelper2 = childHelpers.get("subzone2"); LoadBalancer childBalancer2 = childBalancers.get("subzone2"); @@ -361,7 +357,7 @@ public class EdsLoadBalancerTest { buildLbEndpoint("192.168.0.3", 8080, HEALTHY, 2)), 1, 0)), ImmutableList.of()); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); assertThat(childHelpers).hasSize(3); Helper childHelper3 = childHelpers.get("subzone3"); @@ -389,7 +385,7 @@ public class EdsLoadBalancerTest { buildLbEndpoint("192.168.0.4", 8080, HEALTHY, 2)), 1, 0)), ImmutableList.of()); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); assertThat(childHelpers).hasSize(4); Helper childHelper4 = childHelpers.get("subzone4"); @@ -415,7 +411,7 @@ public class EdsLoadBalancerTest { buildLbEndpoint("192.168.0.5", 8080, HEALTHY, 2)), 1, 0)), ImmutableList.of()); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); assertThat(childHelpers).hasSize(5); Helper childHelper5 = childHelpers.get("subzone5"); @@ -439,40 +435,6 @@ public class EdsLoadBalancerTest { verify(childBalancer5, never()).shutdown(); } - @Test - public void firstAndSecondEdsResponseReceived_onWorkingCalledOnce() { - deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy); - - verify(resourceUpdateCallback, never()).onWorking(); - - // first EDS response - ClusterLoadAssignment clusterLoadAssignment = - buildClusterLoadAssignment(CLUSTER_NAME, - ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", - ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)), - 1, 0)), - ImmutableList.of()); - receiveEndpointUpdate(clusterLoadAssignment); - - verify(resourceUpdateCallback).onWorking(); - - // second EDS response - clusterLoadAssignment = - buildClusterLoadAssignment(CLUSTER_NAME, - ImmutableList.of( - buildLocalityLbEndpoints("region1", "zone1", "subzone1", - ImmutableList.of( - buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2), - buildLbEndpoint("192.168.0.2", 8080, HEALTHY, 2)), - 1, 0)), - ImmutableList.of()); - receiveEndpointUpdate(clusterLoadAssignment); - verify(resourceUpdateCallback, times(1)).onWorking(); - verify(resourceUpdateCallback, never()).onError(); - } - @Test public void handleAllDropUpdates_pickersAreDropped() { deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy); @@ -485,9 +447,8 @@ public class EdsLoadBalancerTest { buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)), 1, 0)), ImmutableList.of()); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); - verify(resourceUpdateCallback, never()).onAllDrop(); assertThat(childBalancers).hasSize(1); verify(childBalancers.get("subzone1")).handleResolvedAddresses( argThat(RoundRobinBackendsMatcher.builder().addHostAndPort("192.168.0.1", 8080).build())); @@ -515,15 +476,12 @@ public class EdsLoadBalancerTest { buildDropOverload("cat_1", 3), buildDropOverload("cat_2", 1_000_001), buildDropOverload("cat_3", 4))); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); - verify(resourceUpdateCallback).onAllDrop(); verify(helper, atLeastOnce()).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker pickerExpectedDropAll = pickerCaptor.getValue(); assertThat(pickerExpectedDropAll.pickSubchannel(mock(PickSubchannelArgs.class)).isDrop()) .isTrue(); - - verify(resourceUpdateCallback, never()).onError(); } @Test @@ -557,7 +515,7 @@ public class EdsLoadBalancerTest { CLUSTER_NAME, ImmutableList.of(localityLbEndpoints1, localityLbEndpoints2, localityLbEndpoints3), ImmutableList.of()); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); assertThat(childBalancers).hasSize(3); verify(childBalancers.get("subzone1")).handleResolvedAddresses( @@ -586,8 +544,6 @@ public class EdsLoadBalancerTest { verify(helper, never()).updateBalancingState(eq(READY), any(SubchannelPicker.class)); childHelper2.updateBalancingState(READY, picker); assertLatestSubchannelPicker(subchannel); - - verify(resourceUpdateCallback, never()).onError(); } // Uses a fake LocalityStoreFactory that creates a mock LocalityStore, and verifies interaction @@ -614,9 +570,8 @@ public class EdsLoadBalancerTest { return localityStore; } }; - edsLb = new EdsLoadBalancer( - helper, resourceUpdateCallback, lbRegistry, localityStoreFactory, bootstrapper, - channelFactory); + edsLb = + new EdsLoadBalancer(helper, lbRegistry, localityStoreFactory, bootstrapper, channelFactory); deliverResolvedAddresses("edsServiceName1", null, fakeEndpointPickingPolicy); assertThat(localityStores).hasSize(1); @@ -632,7 +587,7 @@ public class EdsLoadBalancerTest { ImmutableList.of( buildDropOverload("cat_1", 3), buildDropOverload("cat_2", 456))); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); EndpointUpdate endpointUpdate = getEndpointUpdateFromClusterAssignment(clusterLoadAssignment); verify(localityStore).updateDropPercentage(endpointUpdate.getDropPolicies()); verify(localityStore).updateLocalityStore(endpointUpdate.getLocalityLbEndpointsMap()); @@ -648,7 +603,7 @@ public class EdsLoadBalancerTest { ImmutableList.of( buildDropOverload("cat_1", 3), buildDropOverload("cat_3", 4))); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); endpointUpdate = getEndpointUpdateFromClusterAssignment(clusterLoadAssignment); verify(localityStore).updateDropPercentage(endpointUpdate.getDropPolicies()); @@ -670,7 +625,7 @@ public class EdsLoadBalancerTest { ImmutableList.of( buildDropOverload("cat_1", 3), buildDropOverload("cat_3", 4))); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); endpointUpdate = getEndpointUpdateFromClusterAssignment(clusterLoadAssignment); verify(localityStore).updateDropPercentage(endpointUpdate.getDropPolicies()); verify(localityStore).updateLocalityStore(endpointUpdate.getLocalityLbEndpointsMap()); @@ -680,10 +635,8 @@ public class EdsLoadBalancerTest { public void verifyErrorPropagation_noPreviousEndpointUpdateReceived() { deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy); - verify(resourceUpdateCallback, never()).onError(); // Forwarding 20 seconds so that the xds client will deem EDS resource not available. fakeClock.forwardTime(20, TimeUnit.SECONDS); - verify(resourceUpdateCallback).onError(); verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); } @@ -699,17 +652,15 @@ public class EdsLoadBalancerTest { buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)), 1, 0)), ImmutableList.of(buildDropOverload("throttle", 1000))); - receiveEndpointUpdate(clusterLoadAssignment); + deliverClusterLoadAssignments(clusterLoadAssignment); verify(helper, never()).updateBalancingState( eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); - verify(resourceUpdateCallback, never()).onError(); // XdsClient stream receives an error. responseObserver.onError(new RuntimeException("fake error")); verify(helper, never()).updateBalancingState( eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); - verify(resourceUpdateCallback).onError(); } /** @@ -752,7 +703,7 @@ public class EdsLoadBalancerTest { edsLb.handleResolvedAddresses(resolvedAddressBuilder.build()); } - private void receiveEndpointUpdate(ClusterLoadAssignment clusterLoadAssignment) { + private void deliverClusterLoadAssignments(ClusterLoadAssignment clusterLoadAssignment) { responseObserver.onNext( buildDiscoveryResponse( String.valueOf(versionIno++), diff --git a/xds/src/test/java/io/grpc/xds/FallbackLbTest.java b/xds/src/test/java/io/grpc/xds/FallbackLbTest.java deleted file mode 100644 index f06c70ac04..0000000000 --- a/xds/src/test/java/io/grpc/xds/FallbackLbTest.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.truth.Truth.assertThat; -import static io.grpc.ConnectivityState.CONNECTING; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import com.google.common.collect.ImmutableList; -import io.grpc.EquivalentAddressGroup; -import io.grpc.LoadBalancer; -import io.grpc.LoadBalancer.Helper; -import io.grpc.LoadBalancer.ResolvedAddresses; -import io.grpc.LoadBalancer.SubchannelPicker; -import io.grpc.LoadBalancerProvider; -import io.grpc.LoadBalancerRegistry; -import io.grpc.internal.ServiceConfigUtil.PolicySelection; -import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.List; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link FallbackLb}. - */ -@RunWith(JUnit4.class) -// TODO(creamsoup) use parsed service config -public class FallbackLbTest { - - private final LoadBalancerProvider fallbackProvider1 = new LoadBalancerProvider() { - @Override - public boolean isAvailable() { - return true; - } - - @Override - public int getPriority() { - return 5; - } - - @Override - public String getPolicyName() { - return "fallback_1"; - } - - @Override - public LoadBalancer newLoadBalancer(Helper helper) { - helpers1.add(helper); - LoadBalancer balancer = mock(LoadBalancer.class); - balancers1.add(balancer); - return balancer; - } - }; - - private final LoadBalancerProvider fallbackProvider2 = new LoadBalancerProvider() { - @Override - public boolean isAvailable() { - return true; - } - - @Override - public int getPriority() { - return 5; - } - - @Override - public String getPolicyName() { - return "fallback_2"; - } - - @Override - public LoadBalancer newLoadBalancer(Helper helper) { - // just return mock and recored helper and balancer - helpers2.add(helper); - LoadBalancer balancer = mock(LoadBalancer.class); - balancers2.add(balancer); - return balancer; - } - }; - - private final Helper helper = mock(Helper.class); - private final List helpers1 = new ArrayList<>(); - private final List helpers2 = new ArrayList<>(); - private final List balancers1 = new ArrayList<>(); - private final List balancers2 = new ArrayList<>(); - private final PolicySelection fakeChildPolicy = - new PolicySelection(mock(LoadBalancerProvider.class), null, new Object()); - - private LoadBalancer fallbackLb; - - @Before - public void setUp() { - LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); - lbRegistry.register(fallbackProvider1); - lbRegistry.register(fallbackProvider2); - fallbackLb = new FallbackLb(helper); - - assertThat(helpers1).isEmpty(); - assertThat(helpers2).isEmpty(); - assertThat(balancers1).isEmpty(); - assertThat(balancers2).isEmpty(); - } - - @Test - public void handlePolicyChanges() { - EquivalentAddressGroup eag111 = new EquivalentAddressGroup(mock(SocketAddress.class)); - EquivalentAddressGroup eag112 = new EquivalentAddressGroup(mock(SocketAddress.class)); - List eags11 = ImmutableList.of(eag111, eag112); - Object lbConfig11 = new Object(); - fallbackLb.handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags11) - .setLoadBalancingPolicyConfig(new XdsConfig( - null, - null, - fakeChildPolicy, - new PolicySelection(fallbackProvider1, null, lbConfig11))) - .build()); - - assertThat(helpers1).hasSize(1); - assertThat(balancers1).hasSize(1); - Helper helper1 = helpers1.get(0); - LoadBalancer balancer1 = balancers1.get(0); - verify(balancer1).handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags11) - .setLoadBalancingPolicyConfig(lbConfig11) - .build()); - - SubchannelPicker picker1 = mock(SubchannelPicker.class); - helper1.updateBalancingState(CONNECTING, picker1); - verify(helper).updateBalancingState(CONNECTING, picker1); - - EquivalentAddressGroup eag121 = new EquivalentAddressGroup(mock(SocketAddress.class)); - List eags12 = ImmutableList.of(eag121); - Object lbConfig12 = new Object(); - fallbackLb.handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags12) - .setLoadBalancingPolicyConfig(new XdsConfig( - null, - null, - fakeChildPolicy, - new PolicySelection(fallbackProvider1, null, lbConfig12))) - .build()); - - verify(balancer1).handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags12) - .setLoadBalancingPolicyConfig(lbConfig12) - .build()); - - verify(balancer1, never()).shutdown(); - assertThat(helpers2).isEmpty(); - assertThat(balancers2).isEmpty(); - - // change fallback policy to fallback_2 - EquivalentAddressGroup eag211 = new EquivalentAddressGroup(mock(SocketAddress.class)); - EquivalentAddressGroup eag212 = new EquivalentAddressGroup(mock(SocketAddress.class)); - List eags21 = ImmutableList.of(eag211, eag212); - Object lbConfig21 = new Object(); - fallbackLb.handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags21) - .setLoadBalancingPolicyConfig(new XdsConfig( - null, - null, - fakeChildPolicy, - new PolicySelection(fallbackProvider2, null, lbConfig21))) - .build()); - - verify(balancer1).shutdown(); - assertThat(helpers2).hasSize(1); - assertThat(balancers2).hasSize(1); - Helper helper2 = helpers2.get(0); - LoadBalancer balancer2 = balancers2.get(0); - verify(balancer1, never()).handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags21) - .setLoadBalancingPolicyConfig(lbConfig21) - .build()); - verify(balancer2).handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags21) - .setLoadBalancingPolicyConfig(lbConfig21) - .build()); - - picker1 = mock(SubchannelPicker.class); - helper1.updateBalancingState(CONNECTING, picker1); - verify(helper, never()).updateBalancingState(CONNECTING, picker1); - SubchannelPicker picker2 = mock(SubchannelPicker.class); - helper2.updateBalancingState(CONNECTING, picker2); - verify(helper).updateBalancingState(CONNECTING, picker2); - - EquivalentAddressGroup eag221 = new EquivalentAddressGroup(mock(SocketAddress.class)); - List eags22 = ImmutableList.of(eag221); - Object lbConfig22 = new Object(); - fallbackLb.handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags22) - .setLoadBalancingPolicyConfig(new XdsConfig( - null, - null, - fakeChildPolicy, - new PolicySelection(fallbackProvider2, null, lbConfig22))) - .build()); - - verify(balancer2).handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags22) - .setLoadBalancingPolicyConfig(lbConfig22) - .build()); - - assertThat(helpers1).hasSize(1); - assertThat(balancers1).hasSize(1); - assertThat(helpers2).hasSize(1); - assertThat(balancers2).hasSize(1); - - verify(balancer2, never()).shutdown(); - fallbackLb.shutdown(); - verify(balancer2).shutdown(); - } - - @Test - public void propagateAddressesToFallbackPolicy() { - EquivalentAddressGroup eag1 = new EquivalentAddressGroup( - ImmutableList.of(new InetSocketAddress(8080))); - EquivalentAddressGroup eag2 = new EquivalentAddressGroup( - ImmutableList.of(new InetSocketAddress(8082))); - List eags = ImmutableList.of(eag1, eag2); - - Object lbConfig = new Object(); - fallbackLb.handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags) - .setLoadBalancingPolicyConfig(new XdsConfig( - null, - null, - fakeChildPolicy, - new PolicySelection(fallbackProvider1, null, lbConfig))) - .build()); - - LoadBalancer balancer1 = balancers1.get(0); - verify(balancer1).handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(ImmutableList.of(eag1, eag2)) - .setLoadBalancingPolicyConfig(lbConfig) - .build()); - } -} diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerProviderTest.java deleted file mode 100644 index 2bd2e08539..0000000000 --- a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerProviderTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.truth.Truth.assertThat; - -import io.grpc.LoadBalancer; -import io.grpc.LoadBalancer.Helper; -import io.grpc.LoadBalancerProvider; -import io.grpc.LoadBalancerRegistry; -import io.grpc.NameResolver.ConfigOrError; -import io.grpc.internal.JsonParser; -import io.grpc.internal.ServiceConfigUtil; -import io.grpc.internal.ServiceConfigUtil.PolicySelection; -import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig; -import java.util.HashMap; -import java.util.Map; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -/** - * Tests for {@link XdsLoadBalancerProvider}. - */ -@RunWith(JUnit4.class) -public class XdsLoadBalancerProviderTest { - - @Rule - public final MockitoRule mocks = MockitoJUnit.rule(); - - @Mock - private LoadBalancer fakeBalancer1; - - private final LoadBalancerRegistry lbRegistry = LoadBalancerRegistry.getDefaultRegistry(); - private final Object lbConfig1 = new Object(); - private final LoadBalancerProvider lbProvider1 = new LoadBalancerProvider() { - @Override - public boolean isAvailable() { - return true; - } - - @Override - public int getPriority() { - return 5; - } - - @Override - public String getPolicyName() { - return "supported_1"; - } - - @Override - public LoadBalancer newLoadBalancer(Helper helper) { - return fakeBalancer1; - } - - @Override - public ConfigOrError parseLoadBalancingPolicyConfig( - Map rawLoadBalancingPolicyConfig) { - return ConfigOrError.fromConfig(lbConfig1); - } - }; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - lbRegistry.register(lbProvider1); - } - - @After - public void tearDown() { - lbRegistry.deregister(lbProvider1); - } - - @Test - public void parseLoadBalancingConfigPolicy() throws Exception { - String rawLbConfig = "{" - + "\"childPolicy\" : " - + " [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"supported_1\" : {}}]," - + "\"fallbackPolicy\" : []," - + "\"edsServiceName\" : \"dns:///eds.service.com:8080\"," - + "\"lrsLoadReportingServerName\" : \"dns:///lrs.service.com:8080\"" - + "}"; - Map rawlbConfigMap = checkObject(JsonParser.parse(rawLbConfig)); - ConfigOrError configOrError = - XdsLoadBalancerProvider.parseLoadBalancingConfigPolicy(rawlbConfigMap, lbRegistry); - - assertThat(configOrError.getError()).isNull(); - assertThat(configOrError.getConfig()).isInstanceOf(XdsConfig.class); - assertThat(configOrError.getConfig()).isEqualTo( - new XdsConfig( - "dns:///eds.service.com:8080", - "dns:///lrs.service.com:8080", - new PolicySelection(lbProvider1, - ServiceConfigUtil.unwrapLoadBalancingConfig( - checkObject(JsonParser.parse("{\"supported_1\" : {}}"))).getRawConfigValue(), - lbConfig1), - new PolicySelection( - lbRegistry.getProvider("round_robin"), - new HashMap(), - "no service config")) - ); - } - - @SuppressWarnings("unchecked") - private static Map checkObject(Object o) { - return (Map) o; - } -} diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java deleted file mode 100644 index b77c09fdb1..0000000000 --- a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerTest.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.truth.Truth.assertThat; -import static io.grpc.ConnectivityState.CONNECTING; -import static io.grpc.ConnectivityState.READY; -import static org.mockito.ArgumentMatchers.same; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import io.grpc.Attributes; -import io.grpc.ChannelLogger; -import io.grpc.ConnectivityStateInfo; -import io.grpc.EquivalentAddressGroup; -import io.grpc.LoadBalancer; -import io.grpc.LoadBalancer.Helper; -import io.grpc.LoadBalancer.ResolvedAddresses; -import io.grpc.LoadBalancer.Subchannel; -import io.grpc.LoadBalancer.SubchannelPicker; -import io.grpc.LoadBalancerProvider; -import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.internal.FakeClock; -import io.grpc.internal.ServiceConfigUtil.PolicySelection; -import io.grpc.xds.EdsLoadBalancer.ResourceUpdateCallback; -import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; -import io.grpc.xds.XdsLoadBalancer.PrimaryLbFactory; -import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -/** Unit tests for {@link XdsLoadBalancer}. */ -@RunWith(JUnit4.class) -public class XdsLoadBalancerTest { - private static final String AUTHORITY = "grpc-test.googleapis.com"; - - @Rule - public final ExpectedException thrown = ExpectedException.none(); - @Rule - public final MockitoRule mockitoRule = MockitoJUnit.rule(); - - private final FakeClock fakeClock = new FakeClock(); - private final SynchronizationContext syncContext = new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - throw new AssertionError(e); - } - }); - - @Mock - private Helper helper; - private LoadBalancer xdsLoadBalancer; - private ResourceUpdateCallback resourceUpdateCallback; - - private Helper primaryLbHelper; - private final List primaryLbs = new ArrayList<>(); - - private Helper fallbackLbHelper; - private final List fallbackLbs = new ArrayList<>(); - - private int requestConnectionTimes; - private int primaryLbHandleAddrsTimes; - - @Before - public void setUp() { - PrimaryLbFactory primaryLbFactory = new PrimaryLbFactory() { - @Override - public LoadBalancer newLoadBalancer( - Helper helper, ResourceUpdateCallback resourceUpdateCallback) { - // just return a mock and record the input and output - primaryLbHelper = helper; - XdsLoadBalancerTest.this.resourceUpdateCallback = resourceUpdateCallback; - LoadBalancer primaryLb = mock(LoadBalancer.class); - primaryLbs.add(primaryLb); - return primaryLb; - } - }; - LoadBalancer.Factory fallbackLbFactory = new LoadBalancer.Factory() { - @Override - public LoadBalancer newLoadBalancer(Helper helper) { - // just return a mock and record the input and output - fallbackLbHelper = helper; - LoadBalancer fallbackLb = mock(LoadBalancer.class); - fallbackLbs.add(fallbackLb); - return fallbackLb; - } - }; - doReturn(AUTHORITY).when(helper).getAuthority(); - doReturn(syncContext).when(helper).getSynchronizationContext(); - doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); - doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger(); - - xdsLoadBalancer = - new XdsLoadBalancer(helper, primaryLbFactory, fallbackLbFactory); - xdsLoadBalancer.handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(ImmutableList.of()).build()); - } - - @Test - public void tearDown() { - assertThat(primaryLbs).hasSize(1); - xdsLoadBalancer.shutdown(); - for (LoadBalancer primaryLb : primaryLbs) { - verify(primaryLb).shutdown(); - } - for (LoadBalancer fallbackLb : fallbackLbs) { - verify(fallbackLb).shutdown(); - } - assertThat(fakeClock.getPendingTasks()).isEmpty(); - } - - @Test - public void canHandleEmptyAddressListFromNameResolution() { - assertThat(xdsLoadBalancer.canHandleEmptyAddressListFromNameResolution()).isTrue(); - } - - @Test - public void timeoutAtStartup_expectUseFallback_thenBackendReady_expectExitFallback() { - verifyNotInFallbackMode(); - fakeClock.forwardTime(9, TimeUnit.SECONDS); - resourceUpdateCallback.onWorking(); - verifyNotInFallbackMode(); - fakeClock.forwardTime(1, TimeUnit.SECONDS); - verifyInFallbackMode(); - - SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); - primaryLbHelper.updateBalancingState(READY, subchannelPicker); - verify(helper).updateBalancingState(READY, subchannelPicker); - verifyNotInFallbackMode(); - - assertThat(fallbackLbs).hasSize(1); - } - - @Test - public void backendReadyBeforeTimeoutAtStartup_expectNoFallback() { - verifyNotInFallbackMode(); - fakeClock.forwardTime(9, TimeUnit.SECONDS); - verifyNotInFallbackMode(); - assertThat(fakeClock.getPendingTasks()).hasSize(1); - - resourceUpdateCallback.onWorking(); - SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); - primaryLbHelper.updateBalancingState(READY, subchannelPicker); - verify(helper).updateBalancingState(READY, subchannelPicker); - assertThat(fakeClock.getPendingTasks()).isEmpty(); - verifyNotInFallbackMode(); - - assertThat(fallbackLbs).isEmpty(); - } - - @Test - public void recevieAllDropBeforeTimeoutAtStartup_expectNoFallback() { - verifyNotInFallbackMode(); - assertThat(fakeClock.getPendingTasks()).hasSize(1); - - resourceUpdateCallback.onAllDrop(); - assertThat(fakeClock.getPendingTasks()).isEmpty(); - verifyNotInFallbackMode(); - - assertThat(fallbackLbs).isEmpty(); - } - - @Test - public void primaryFailsWithoutSeeingEdsResponseBeforeTimeoutAtStartup() { - verifyNotInFallbackMode(); - assertThat(fakeClock.getPendingTasks()).hasSize(1); - - resourceUpdateCallback.onError(); - verifyInFallbackMode(); - - assertThat(fallbackLbs).hasSize(1); - } - - @Test - public void primarySeeingEdsResponseThenFailsBeforeTimeoutAtStartup() { - verifyNotInFallbackMode(); - assertThat(fakeClock.getPendingTasks()).hasSize(1); - resourceUpdateCallback.onWorking(); - resourceUpdateCallback.onError(); - verifyNotInFallbackMode(); - - fakeClock.forwardTime(10, TimeUnit.SECONDS); - verifyInFallbackMode(); - - assertThat(fallbackLbs).hasSize(1); - } - - @Test - public void fallbackWillHandleLastResolvedAddresses() { - verifyNotInFallbackMode(); - - PolicySelection childPolicy = - new PolicySelection(mock(LoadBalancerProvider.class), null, null); - PolicySelection fallbackPolicy = - new PolicySelection(mock(LoadBalancerProvider.class), null, null); - ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() - .setAddresses(ImmutableList.of()) - .setLoadBalancingPolicyConfig( - new XdsConfig(null, null, childPolicy, fallbackPolicy)) - .build(); - xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses); - - resourceUpdateCallback.onError(); - LoadBalancer fallbackLb = Iterables.getLast(fallbackLbs); - verify(fallbackLb).handleResolvedAddresses(same(resolvedAddresses)); - } - - private void verifyInFallbackMode() { - assertThat(primaryLbs).isNotEmpty(); - assertThat(fallbackLbs).isNotEmpty(); - LoadBalancer primaryLb = Iterables.getLast(primaryLbs); - LoadBalancer fallbackLb = Iterables.getLast(fallbackLbs); - verify(primaryLb, never()).shutdown(); - verify(fallbackLb, never()).shutdown(); - - xdsLoadBalancer.requestConnection(); - verify(primaryLb, times(++requestConnectionTimes)).requestConnection(); - verify(fallbackLb).requestConnection(); - - PolicySelection childPolicy = - new PolicySelection(mock(LoadBalancerProvider.class), null, null); - PolicySelection fallbackPolicy = - new PolicySelection(mock(LoadBalancerProvider.class), null, null); - ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() - .setAddresses(ImmutableList.of()) - .setAttributes( - Attributes.newBuilder().set(Attributes.Key.create("k"), new Object()).build()) - .setLoadBalancingPolicyConfig(new XdsConfig(null, null, childPolicy, fallbackPolicy)) - .build(); - xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses); - verify(fallbackLb).handleResolvedAddresses(same(resolvedAddresses)); - ArgumentCaptor resolvedAddrCaptor = ArgumentCaptor.forClass(null); - verify(primaryLb, times(++primaryLbHandleAddrsTimes)) - .handleResolvedAddresses(resolvedAddrCaptor.capture()); - ResolvedAddresses capturedResolvedAddr = resolvedAddrCaptor.getValue(); - EdsConfig edsConfig = (EdsConfig) capturedResolvedAddr.getLoadBalancingPolicyConfig(); - assertThat(edsConfig.endpointPickingPolicy).isEqualTo(childPolicy); - - Status status = Status.DATA_LOSS.withDescription(""); - xdsLoadBalancer.handleNameResolutionError(status); - verify(primaryLb).handleNameResolutionError(same(status)); - verify(fallbackLb).handleNameResolutionError(same(status)); - - SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); - primaryLbHelper.updateBalancingState(CONNECTING, subchannelPicker); - verify(helper, never()).updateBalancingState(CONNECTING, subchannelPicker); - fallbackLbHelper.updateBalancingState(CONNECTING, subchannelPicker); - verify(helper).updateBalancingState(CONNECTING, subchannelPicker); - } - - private void verifyNotInFallbackMode() { - for (LoadBalancer fallbackLb : fallbackLbs) { - verify(fallbackLb).shutdown(); - } - LoadBalancer primaryLb = Iterables.getLast(primaryLbs); - - xdsLoadBalancer.requestConnection(); - verify(primaryLb, times(++requestConnectionTimes)).requestConnection(); - - PolicySelection childPolicy = - new PolicySelection(mock(LoadBalancerProvider.class), null, null); - PolicySelection fallbackPolicy = - new PolicySelection(mock(LoadBalancerProvider.class), null, null); - ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() - .setAddresses(ImmutableList.of()) - .setAttributes( - Attributes.newBuilder().set(Attributes.Key.create("k"), new Object()).build()) - .setLoadBalancingPolicyConfig(new XdsConfig(null, null, childPolicy, fallbackPolicy)) - .build(); - xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses); - ArgumentCaptor resolvedAddrCaptor = ArgumentCaptor.forClass(null); - verify(primaryLb, times(++primaryLbHandleAddrsTimes)) - .handleResolvedAddresses(resolvedAddrCaptor.capture()); - ResolvedAddresses capturedResolvedAddr = resolvedAddrCaptor.getValue(); - EdsConfig edsConfig = (EdsConfig) capturedResolvedAddr.getLoadBalancingPolicyConfig(); - assertThat(edsConfig.endpointPickingPolicy).isEqualTo(childPolicy); - - Status status = Status.DATA_LOSS.withDescription(""); - xdsLoadBalancer.handleNameResolutionError(status); - verify(primaryLb).handleNameResolutionError(same(status)); - - SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); - primaryLbHelper.updateBalancingState(CONNECTING, subchannelPicker); - verify(helper).updateBalancingState(CONNECTING, subchannelPicker); - } - - @Deprecated - @Test - public void handleSubchannelState_shouldThrow() { - Subchannel subchannel = mock(Subchannel.class); - ConnectivityStateInfo connectivityStateInfo = ConnectivityStateInfo.forNonError(READY); - thrown.expect(UnsupportedOperationException.class); - xdsLoadBalancer.handleSubchannelState(subchannel, connectivityStateInfo); - } -}