diff --git a/xds/src/main/java/io/grpc/xds/LocalityStore.java b/xds/src/main/java/io/grpc/xds/LocalityStore.java index 797f9ae4ed..8f9d396fcc 100644 --- a/xds/src/main/java/io/grpc/xds/LocalityStore.java +++ b/xds/src/main/java/io/grpc/xds/LocalityStore.java @@ -28,14 +28,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.grpc.ConnectivityState; -import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; -import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; @@ -76,12 +74,27 @@ interface LocalityStore { void updateDropPercentage(List dropOverloads); - void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState); - void updateOobMetricsReportInterval(long reportIntervalNano); LoadStatsStore getLoadStatsStore(); + @VisibleForTesting + abstract class LocalityStoreFactory { + private static final LocalityStoreFactory DEFAULT_INSTANCE = + new LocalityStoreFactory() { + @Override + LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry) { + return new LocalityStoreImpl(helper, lbRegistry); + } + }; + + static LocalityStoreFactory getInstance() { + return DEFAULT_INSTANCE; + } + + abstract LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry); + } + final class LocalityStoreImpl implements LocalityStore { private static final String ROUND_ROBIN = "round_robin"; private static final long DELAYED_DELETION_TIMEOUT_MINUTES = 15L; @@ -176,16 +189,6 @@ interface LocalityStore { } }; - // This is triggered by xdsLoadbalancer.handleSubchannelState - @Override - public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { - // delegate to the childBalancer who manages this subchannel - for (LocalityLbInfo localityLbInfo : localityMap.values()) { - // This will probably trigger childHelper.updateBalancingState - localityLbInfo.childBalancer.handleSubchannelState(subchannel, newState); - } - } - @Override public void reset() { for (Locality locality : localityMap.keySet()) { @@ -591,7 +594,7 @@ interface LocalityStore { Helper childHelper, final LocalityLbInfo localityLbInfo, final LocalityLbEndpoints localityLbEndpoints, final Locality locality) { final List eags = new ArrayList<>(); - for (LbEndpoint endpoint: localityLbEndpoints.getEndpoints()) { + for (LbEndpoint endpoint : localityLbEndpoints.getEndpoints()) { if (endpoint.isHealthy()) { eags.add(endpoint.getAddress()); } diff --git a/xds/src/main/java/io/grpc/xds/LookasideChannelLb.java b/xds/src/main/java/io/grpc/xds/LookasideChannelLb.java deleted file mode 100644 index 52658be5d3..0000000000 --- a/xds/src/main/java/io/grpc/xds/LookasideChannelLb.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import io.grpc.LoadBalancer; -import io.grpc.Status; -import io.grpc.xds.EnvoyProtoData.DropOverload; -import io.grpc.xds.EnvoyProtoData.Locality; -import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; -import io.grpc.xds.LoadReportClient.LoadReportCallback; -import io.grpc.xds.XdsClient.EndpointUpdate; -import io.grpc.xds.XdsClient.EndpointWatcher; -import java.util.List; -import java.util.Map; - -/** - * A load balancer that has a lookaside channel. This layer of load balancer creates a channel to - * the remote load balancer. LrsClient, LocalityStore and XdsComms are three branches below this - * layer, and their implementations are provided by their factories. - */ -final class LookasideChannelLb extends LoadBalancer { - - private final LoadReportClient lrsClient; - private final XdsClient xdsClient; - - LookasideChannelLb( - String edsServiceName, - LookasideChannelCallback lookasideChannelCallback, - XdsClient xdsClient, - LoadReportClient lrsClient, - final LocalityStore localityStore) { - this.xdsClient = xdsClient; - LoadReportCallback lrsCallback = - new LoadReportCallback() { - @Override - public void onReportResponse(long reportIntervalNano) { - localityStore.updateOobMetricsReportInterval(reportIntervalNano); - } - }; - this.lrsClient = lrsClient; - - EndpointWatcher endpointWatcher = new EndpointWatcherImpl( - lookasideChannelCallback, lrsClient, lrsCallback, localityStore) ; - xdsClient.watchEndpointData(edsServiceName, endpointWatcher); - } - - @Override - public void handleNameResolutionError(Status error) { - // NO-OP? - } - - @Override - public void shutdown() { - lrsClient.stopLoadReporting(); - xdsClient.shutdown(); - } - - private static final class EndpointWatcherImpl implements EndpointWatcher { - - final LookasideChannelCallback lookasideChannelCallback; - final LoadReportClient lrsClient; - final LoadReportCallback lrsCallback; - final LocalityStore localityStore; - boolean firstEdsResponseReceived; - - EndpointWatcherImpl( - LookasideChannelCallback lookasideChannelCallback, LoadReportClient lrsClient, - LoadReportCallback lrsCallback, LocalityStore localityStore) { - this.lookasideChannelCallback = lookasideChannelCallback; - this.lrsClient = lrsClient; - this.lrsCallback = lrsCallback; - this.localityStore = localityStore; - } - - @Override - public void onEndpointChanged(EndpointUpdate endpointUpdate) { - if (!firstEdsResponseReceived) { - firstEdsResponseReceived = true; - lookasideChannelCallback.onWorking(); - lrsClient.startLoadReporting(lrsCallback); - } - - List dropOverloads = endpointUpdate.getDropPolicies(); - ImmutableList.Builder dropOverloadsBuilder = ImmutableList.builder(); - for (DropOverload dropOverload : dropOverloads) { - dropOverloadsBuilder.add(dropOverload); - if (dropOverload.getDropsPerMillion() == 1_000_000) { - lookasideChannelCallback.onAllDrop(); - break; - } - } - localityStore.updateDropPercentage(dropOverloadsBuilder.build()); - - ImmutableMap.Builder localityEndpointsMapping = - new ImmutableMap.Builder<>(); - for (Map.Entry entry - : endpointUpdate.getLocalityLbEndpointsMap().entrySet()) { - int localityWeight = entry.getValue().getLocalityWeight(); - - if (localityWeight != 0) { - localityEndpointsMapping.put(entry.getKey(), entry.getValue()); - } - } - - localityStore.updateLocalityStore(localityEndpointsMapping.build()); - } - - @Override - public void onError(Status error) { - lookasideChannelCallback.onError(); - } - } - - - /** - * Callback on ADS stream events. The callback methods should be called in a proper {@link - * io.grpc.SynchronizationContext}. - */ - interface LookasideChannelCallback { - - /** - * Once the response observer receives the first response. - */ - void onWorking(); - - /** - * Once an error occurs in ADS stream. - */ - void onError(); - - /** - * Once receives a response indicating that 100% of calls should be dropped. - */ - void onAllDrop(); - } -} diff --git a/xds/src/main/java/io/grpc/xds/LookasideLb.java b/xds/src/main/java/io/grpc/xds/LookasideLb.java index 41723be0c4..9fd9a8c8cf 100644 --- a/xds/src/main/java/io/grpc/xds/LookasideLb.java +++ b/xds/src/main/java/io/grpc/xds/LookasideLb.java @@ -22,6 +22,8 @@ import static io.grpc.xds.XdsNameResolver.XDS_NODE; import static java.util.logging.Level.FINEST; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.Struct; import com.google.protobuf.Value; import io.envoyproxy.envoy.api.v2.core.Node; @@ -32,46 +34,60 @@ import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; import io.grpc.alts.GoogleDefaultChannelBuilder; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.util.ForwardingLoadBalancer; import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.xds.Bootstrapper.ChannelCreds; -import io.grpc.xds.LocalityStore.LocalityStoreImpl; -import io.grpc.xds.LookasideChannelLb.LookasideChannelCallback; +import io.grpc.xds.EnvoyProtoData.DropOverload; +import io.grpc.xds.EnvoyProtoData.Locality; +import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; +import io.grpc.xds.LoadReportClient.LoadReportCallback; +import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory; +import io.grpc.xds.LocalityStore.LocalityStoreFactory; +import io.grpc.xds.XdsClient.EndpointUpdate; +import io.grpc.xds.XdsClient.EndpointWatcher; import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.logging.Logger; +import javax.annotation.Nullable; /** Lookaside load balancer that handles balancer name changes. */ final class LookasideLb extends ForwardingLoadBalancer { - private final LookasideChannelCallback lookasideChannelCallback; - private final LookasideChannelLbFactory lookasideChannelLbFactory; + private final EdsUpdateCallback edsUpdateCallback; private final GracefulSwitchLoadBalancer lookasideChannelLb; private final LoadBalancerRegistry lbRegistry; + private final LocalityStoreFactory localityStoreFactory; + private final LoadReportClientFactory loadReportClientFactory; private String balancerName; - LookasideLb(Helper lookasideLbHelper, LookasideChannelCallback lookasideChannelCallback) { + LookasideLb(Helper lookasideLbHelper, EdsUpdateCallback edsUpdateCallback) { this( - lookasideLbHelper, lookasideChannelCallback, new LookasideChannelLbFactoryImpl(), - LoadBalancerRegistry.getDefaultRegistry()); + lookasideLbHelper, + edsUpdateCallback, + LoadBalancerRegistry.getDefaultRegistry(), + LocalityStoreFactory.getInstance(), + LoadReportClientFactory.getInstance()); } @VisibleForTesting LookasideLb( Helper lookasideLbHelper, - LookasideChannelCallback lookasideChannelCallback, - LookasideChannelLbFactory lookasideChannelLbFactory, - LoadBalancerRegistry lbRegistry) { - this.lookasideChannelCallback = lookasideChannelCallback; - this.lookasideChannelLbFactory = lookasideChannelLbFactory; + EdsUpdateCallback edsUpdateCallback, + LoadBalancerRegistry lbRegistry, + LocalityStoreFactory localityStoreFactory, + LoadReportClientFactory loadReportClientFactory) { + this.edsUpdateCallback = edsUpdateCallback; this.lbRegistry = lbRegistry; this.lookasideChannelLb = new GracefulSwitchLoadBalancer(lookasideLbHelper); + this.localityStoreFactory = localityStoreFactory; + this.loadReportClientFactory = loadReportClientFactory; } @Override @@ -93,121 +109,205 @@ final class LookasideLb extends ForwardingLoadBalancer { } XdsConfig xdsConfig = (XdsConfig) cfg.getConfig(); - String newBalancerName = xdsConfig.balancerName; + final String newBalancerName = xdsConfig.balancerName; // The is to handle the legacy usecase that requires balancerName from xds config. if (!newBalancerName.equals(balancerName)) { balancerName = newBalancerName; // cache the name and check next time for optimization - Node node = resolvedAddresses.getAttributes().get(XDS_NODE); - if (node == null) { + Node nodeFromResolvedAddresses = resolvedAddresses.getAttributes().get(XDS_NODE); + final Node node; + if (nodeFromResolvedAddresses == null) { node = Node.newBuilder() .setMetadata(Struct.newBuilder() .putFields( "endpoints_required", Value.newBuilder().setBoolValue(true).build())) .build(); + } else { + node = nodeFromResolvedAddresses; } - List channelCredsList = + List channelCredsListFromResolvedAddresses = resolvedAddresses.getAttributes().get(XDS_CHANNEL_CREDS_LIST); - if (channelCredsList == null) { + final List channelCredsList; + if (channelCredsListFromResolvedAddresses == null) { channelCredsList = Collections.emptyList(); + } else { + channelCredsList = channelCredsListFromResolvedAddresses; } - lookasideChannelLb.switchTo(newLookasideChannelLbProvider( - newBalancerName, node, channelCredsList)); + + LoadBalancerProvider childBalancerProvider = new LoadBalancerProvider() { + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + /** + * A synthetic policy name identified by balancerName. The implementation detail doesn't + * matter. + */ + @Override + public String getPolicyName() { + return "xds_child_policy_balancer_name_" + newBalancerName; + } + + @Override + public LoadBalancer newLoadBalancer(final Helper helper) { + return new LoadBalancer() { + @Nullable + XdsClient xdsClient; + @Nullable + LocalityStore localityStore; + @Nullable + LoadReportClient lrsClient; + + @Override + public void handleNameResolutionError(Status error) {} + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + if (xdsClient == null) { + ManagedChannel channel = initLbChannel(helper, newBalancerName, channelCredsList); + xdsClient = new XdsComms2( + channel, helper, new ExponentialBackoffPolicy.Provider(), + GrpcUtil.STOPWATCH_SUPPLIER, node); + localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry); + // TODO(zdapeng): Use XdsClient to do Lrs directly. + lrsClient = loadReportClientFactory.createLoadReportClient( + channel, helper, new ExponentialBackoffPolicy.Provider(), + localityStore.getLoadStatsStore()); + final LoadReportCallback lrsCallback = + new LoadReportCallback() { + @Override + public void onReportResponse(long reportIntervalNano) { + localityStore.updateOobMetricsReportInterval(reportIntervalNano); + } + }; + + EndpointWatcher endpointWatcher = + new EndpointWatcherImpl(lrsClient, lrsCallback, localityStore); + xdsClient.watchEndpointData(node.getCluster(), endpointWatcher); + } + } + + @Override + public void shutdown() { + if (xdsClient != null) { + lrsClient.stopLoadReporting(); + localityStore.reset(); + xdsClient.shutdown(); + } + } + }; + } + }; + + lookasideChannelLb.switchTo(childBalancerProvider); } lookasideChannelLb.handleResolvedAddresses(resolvedAddresses); } - private LoadBalancerProvider newLookasideChannelLbProvider( - final String balancerName, final Node node, final List channelCredsList) { - return new LoadBalancerProvider() { - @Override - public boolean isAvailable() { - return true; + private static ManagedChannel initLbChannel( + Helper helper, + String balancerName, + List channelCredsList) { + ManagedChannel channel = null; + try { + channel = helper.createResolvingOobChannel(balancerName); + } catch (UnsupportedOperationException uoe) { + // Temporary solution until createResolvingOobChannel is implemented + // FIXME (https://github.com/grpc/grpc-java/issues/5495) + Logger logger = Logger.getLogger(LookasideLb.class.getName()); + if (logger.isLoggable(FINEST)) { + logger.log( + FINEST, + "createResolvingOobChannel() not supported by the helper: " + helper, + uoe); + logger.log(FINEST, "creating oob channel for target {0}", balancerName); } - @Override - public int getPriority() { - return 5; + // Use the first supported channel credentials configuration. + // Currently, only "google_default" is supported. + for (ChannelCreds creds : channelCredsList) { + if (creds.getType().equals("google_default")) { + channel = GoogleDefaultChannelBuilder.forTarget(balancerName).build(); + break; + } } - - /** - * A synthetic policy name for LookasideChannelLb identified by balancerName. The - * implementation detail doesn't matter. - */ - @Override - public String getPolicyName() { - return "xds_child_policy_balancer_name_" + balancerName; + if (channel == null) { + channel = ManagedChannelBuilder.forTarget(balancerName).build(); } - - @Override - public LoadBalancer newLoadBalancer(Helper helper) { - return lookasideChannelLbFactory.newLoadBalancer( - helper, lookasideChannelCallback, balancerName, node, channelCredsList); - } - }; + } + return channel; } - @VisibleForTesting - interface LookasideChannelLbFactory { - LoadBalancer newLoadBalancer( - Helper helper, LookasideChannelCallback lookasideChannelCallback, String balancerName, - Node node, List channelCredsList); + /** + * Callbacks for the EDS-only-with-fallback usecase. Being deprecated. + */ + interface EdsUpdateCallback { + + void onWorking(); + + void onError(); + + void onAllDrop(); } - private static final class LookasideChannelLbFactoryImpl implements LookasideChannelLbFactory { + private final class EndpointWatcherImpl implements EndpointWatcher { - @Override - public LoadBalancer newLoadBalancer( - Helper helper, LookasideChannelCallback lookasideChannelCallback, String balancerName, - Node node, List channelCredsList) { - ManagedChannel channel = initLbChannel(helper, balancerName, channelCredsList); - XdsClient xdsClient = new XdsComms2( - channel, helper, new ExponentialBackoffPolicy.Provider(), - GrpcUtil.STOPWATCH_SUPPLIER, node); - LocalityStore localityStore = - new LocalityStoreImpl(helper, LoadBalancerRegistry.getDefaultRegistry()); - // TODO(zdapeng): Use XdsClient to do Lrs directly. - LoadReportClient lrsClient = new LoadReportClientImpl( - channel, helper, GrpcUtil.STOPWATCH_SUPPLIER, new ExponentialBackoffPolicy.Provider(), - localityStore.getLoadStatsStore()); - return new LookasideChannelLb( - node.getCluster(), lookasideChannelCallback, xdsClient, lrsClient, localityStore); + final LoadReportClient lrsClient; + final LoadReportCallback lrsCallback; + final LocalityStore localityStore; + boolean firstEdsUpdateReceived; + + EndpointWatcherImpl( + LoadReportClient lrsClient, LoadReportCallback lrsCallback, LocalityStore localityStore) { + this.lrsClient = lrsClient; + this.lrsCallback = lrsCallback; + this.localityStore = localityStore; } - private static ManagedChannel initLbChannel( - Helper helper, - String balancerName, - List channelCredsList) { - ManagedChannel channel = null; - try { - channel = helper.createResolvingOobChannel(balancerName); - } catch (UnsupportedOperationException uoe) { - // Temporary solution until createResolvingOobChannel is implemented - // FIXME (https://github.com/grpc/grpc-java/issues/5495) - Logger logger = Logger.getLogger(LookasideChannelLb.class.getName()); - if (logger.isLoggable(FINEST)) { - logger.log( - FINEST, - "createResolvingOobChannel() not supported by the helper: " + helper, - uoe); - logger.log(FINEST, "creating oob channel for target {0}", balancerName); - } + @Override + public void onEndpointChanged(EndpointUpdate endpointUpdate) { + if (!firstEdsUpdateReceived) { + firstEdsUpdateReceived = true; + edsUpdateCallback.onWorking(); + lrsClient.startLoadReporting(lrsCallback); + } - // Use the first supported channel credentials configuration. - // Currently, only "google_default" is supported. - for (ChannelCreds creds : channelCredsList) { - if (creds.getType().equals("google_default")) { - channel = GoogleDefaultChannelBuilder.forTarget(balancerName).build(); - break; - } - } - if (channel == null) { - channel = ManagedChannelBuilder.forTarget(balancerName).build(); + List dropOverloads = endpointUpdate.getDropPolicies(); + ImmutableList.Builder dropOverloadsBuilder = ImmutableList.builder(); + for (DropOverload dropOverload : dropOverloads) { + dropOverloadsBuilder.add(dropOverload); + if (dropOverload.getDropsPerMillion() == 1_000_000) { + edsUpdateCallback.onAllDrop(); + break; } } - return channel; + localityStore.updateDropPercentage(dropOverloadsBuilder.build()); + + ImmutableMap.Builder localityEndpointsMapping = + new ImmutableMap.Builder<>(); + for (Map.Entry entry + : endpointUpdate.getLocalityLbEndpointsMap().entrySet()) { + int localityWeight = entry.getValue().getLocalityWeight(); + + if (localityWeight != 0) { + localityEndpointsMapping.put(entry.getKey(), entry.getValue()); + } + } + + localityStore.updateLocalityStore(localityEndpointsMapping.build()); + } + + @Override + public void onError(Status error) { + edsUpdateCallback.onError(); } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java index a68b31354d..5aeb2680ee 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java @@ -27,7 +27,7 @@ import io.grpc.LoadBalancer; import io.grpc.Status; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.util.ForwardingLoadBalancerHelper; -import io.grpc.xds.LookasideChannelLb.LookasideChannelCallback; +import io.grpc.xds.LookasideLb.EdsUpdateCallback; import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nullable; @@ -46,7 +46,7 @@ final class XdsLoadBalancer2 extends LoadBalancer { private final Helper helper; private final LoadBalancer lookasideLb; private final LoadBalancer.Factory fallbackLbFactory; - private final LookasideChannelCallback lookasideChannelCallback = new LookasideChannelCallback() { + private final EdsUpdateCallback edsUpdateCallback = new EdsUpdateCallback() { @Override public void onWorking() { if (childPolicyHasBeenReady) { @@ -94,7 +94,7 @@ final class XdsLoadBalancer2 extends LoadBalancer { LoadBalancer.Factory fallbackLbFactory) { this.helper = helper; this.lookasideLb = lookasideLbFactory.newLoadBalancer(new LookasideLbHelper(), - lookasideChannelCallback); + edsUpdateCallback); this.fallbackLbFactory = fallbackLbFactory; } @@ -247,14 +247,14 @@ final class XdsLoadBalancer2 extends LoadBalancer { /** Factory of a look-aside load balancer. The interface itself is for convenience in test. */ @VisibleForTesting interface LookasideLbFactory { - LoadBalancer newLoadBalancer(Helper helper, LookasideChannelCallback lookasideChannelCallback); + LoadBalancer newLoadBalancer(Helper helper, EdsUpdateCallback edsUpdateCallback); } private static final class LookasideLbFactoryImpl implements LookasideLbFactory { @Override public LoadBalancer newLoadBalancer( - Helper lookasideLbHelper, LookasideChannelCallback lookasideChannelCallback) { - return new LookasideLb(lookasideLbHelper, lookasideChannelCallback); + Helper lookasideLbHelper, EdsUpdateCallback edsUpdateCallback) { + return new LookasideLb(lookasideLbHelper, edsUpdateCallback); } } diff --git a/xds/src/test/java/io/grpc/xds/LookasideChannelLbTest.java b/xds/src/test/java/io/grpc/xds/LookasideChannelLbTest.java deleted file mode 100644 index c3ed9c9c3f..0000000000 --- a/xds/src/test/java/io/grpc/xds/LookasideChannelLbTest.java +++ /dev/null @@ -1,452 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.truth.Truth.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Any; -import com.google.protobuf.UInt32Value; -import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; -import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; -import io.envoyproxy.envoy.api.v2.DiscoveryRequest; -import io.envoyproxy.envoy.api.v2.DiscoveryResponse; -import io.envoyproxy.envoy.api.v2.core.Address; -import io.envoyproxy.envoy.api.v2.core.Node; -import io.envoyproxy.envoy.api.v2.core.SocketAddress; -import io.envoyproxy.envoy.api.v2.endpoint.Endpoint; -import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint; -import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; -import io.envoyproxy.envoy.type.FractionalPercent; -import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType; -import io.grpc.ChannelLogger; -import io.grpc.LoadBalancer.Helper; -import io.grpc.ManagedChannel; -import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.internal.ExponentialBackoffPolicy; -import io.grpc.internal.GrpcUtil; -import io.grpc.internal.testing.StreamRecorder; -import io.grpc.stub.StreamObserver; -import io.grpc.testing.GrpcCleanupRule; -import io.grpc.xds.EnvoyProtoData.DropOverload; -import io.grpc.xds.EnvoyProtoData.Locality; -import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; -import io.grpc.xds.LoadReportClient.LoadReportCallback; -import io.grpc.xds.LookasideChannelLb.LookasideChannelCallback; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatchers; -import org.mockito.Captor; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -/** - * Tests for {@link LookasideChannelLb}. - */ -@RunWith(JUnit4.class) -public class LookasideChannelLbTest { - - private static final String SERVICE_AUTHORITY = "test authority"; - - @Rule - public final MockitoRule mockitoRule = MockitoJUnit.rule(); - @Rule - public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - - private final SynchronizationContext syncContext = new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - throw new AssertionError(e); - } - }); - private final StreamRecorder streamRecorder = StreamRecorder.create(); - - private final DiscoveryResponse edsResponse = - DiscoveryResponse.newBuilder() - .addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance())) - .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") - .build(); - - @Mock - private Helper helper; - @Mock - private LookasideChannelCallback lookasideChannelCallback; - @Mock - private LoadReportClient loadReportClient; - @Mock - private LocalityStore localityStore; - @Mock - private LoadStatsStore loadStatsStore; - - private ManagedChannel channel; - private StreamObserver serverResponseWriter; - - @Captor - private ArgumentCaptor> - localityEndpointsMappingCaptor; - - private LookasideChannelLb lookasideChannelLb; - - @Before - public void setUp() throws Exception { - AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() { - @Override - public StreamObserver streamAggregatedResources( - final StreamObserver responseObserver) { - serverResponseWriter = responseObserver; - - return new StreamObserver() { - - @Override - public void onNext(DiscoveryRequest value) { - streamRecorder.onNext(value); - } - - @Override - public void onError(Throwable t) { - streamRecorder.onError(t); - } - - @Override - public void onCompleted() { - streamRecorder.onCompleted(); - responseObserver.onCompleted(); - } - }; - } - }; - - String serverName = InProcessServerBuilder.generateName(); - cleanupRule.register( - InProcessServerBuilder - .forName(serverName) - .directExecutor() - .addService(serviceImpl) - .build() - .start()); - channel = cleanupRule.register( - InProcessChannelBuilder - .forName(serverName) - .directExecutor() - .build()); - - doReturn(SERVICE_AUTHORITY).when(helper).getAuthority(); - doReturn(syncContext).when(helper).getSynchronizationContext(); - doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger(); - doReturn(loadStatsStore).when(localityStore).getLoadStatsStore(); - - XdsClient xdsClient = new XdsComms2( - channel, helper, new ExponentialBackoffPolicy.Provider(), - GrpcUtil.STOPWATCH_SUPPLIER, Node.getDefaultInstance()); - lookasideChannelLb = new LookasideChannelLb( - "cluster1", lookasideChannelCallback, xdsClient, loadReportClient, localityStore); - } - - @Test - public void firstAndSecondEdsResponseReceived() { - verify(lookasideChannelCallback, never()).onWorking(); - verify(loadReportClient, never()).startLoadReporting(any(LoadReportCallback.class)); - - // first EDS response - serverResponseWriter.onNext(edsResponse); - verify(lookasideChannelCallback).onWorking(); - ArgumentCaptor loadReportCallbackCaptor = - ArgumentCaptor.forClass(LoadReportCallback.class); - verify(loadReportClient).startLoadReporting(loadReportCallbackCaptor.capture()); - LoadReportCallback loadReportCallback = loadReportCallbackCaptor.getValue(); - - // second EDS response - serverResponseWriter.onNext(edsResponse); - verify(lookasideChannelCallback, times(1)).onWorking(); - verify(loadReportClient, times(1)).startLoadReporting(any(LoadReportCallback.class)); - - verify(localityStore, never()).updateOobMetricsReportInterval(anyLong()); - loadReportCallback.onReportResponse(1234); - verify(localityStore).updateOobMetricsReportInterval(1234); - - verify(lookasideChannelCallback, never()).onError(); - - lookasideChannelLb.shutdown(); - } - - @Test - public void handleDropUpdates() { - verify(localityStore, never()).updateDropPercentage( - ArgumentMatchers.>any()); - - serverResponseWriter.onNext(edsResponse); - verify(localityStore).updateDropPercentage(eq(ImmutableList.of())); - - ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() - .setPolicy(Policy.newBuilder() - .addDropOverloads(Policy.DropOverload.newBuilder() - .setCategory("cat_1").setDropPercentage(FractionalPercent.newBuilder() - .setDenominator(DenominatorType.HUNDRED) - .setNumerator(3) - .build()) - .build()) - - .addDropOverloads(Policy.DropOverload.newBuilder() - .setCategory("cat_2").setDropPercentage(FractionalPercent.newBuilder() - .setDenominator(DenominatorType.TEN_THOUSAND) - .setNumerator(45) - .build()) - .build()) - .addDropOverloads(Policy.DropOverload.newBuilder() - .setCategory("cat_3").setDropPercentage(FractionalPercent.newBuilder() - .setDenominator(DenominatorType.MILLION) - .setNumerator(6789) - .build()) - .build()) - .build()) - .build(); - serverResponseWriter.onNext( - DiscoveryResponse.newBuilder() - .addResources(Any.pack(clusterLoadAssignment)) - .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") - .build()); - - verify(lookasideChannelCallback, never()).onAllDrop(); - verify(localityStore).updateDropPercentage(ImmutableList.of( - new DropOverload("cat_1", 300_00), - new DropOverload("cat_2", 45_00), - new DropOverload("cat_3", 6789))); - - - clusterLoadAssignment = ClusterLoadAssignment.newBuilder() - .setPolicy(Policy.newBuilder() - .addDropOverloads(Policy.DropOverload.newBuilder() - .setCategory("cat_1").setDropPercentage(FractionalPercent.newBuilder() - .setDenominator(DenominatorType.HUNDRED) - .setNumerator(3) - .build()) - .build()) - .addDropOverloads(Policy.DropOverload.newBuilder() - .setCategory("cat_2").setDropPercentage(FractionalPercent.newBuilder() - .setDenominator(DenominatorType.HUNDRED) - .setNumerator(101) - .build()) - .build()) - .addDropOverloads(Policy.DropOverload.newBuilder() - .setCategory("cat_3").setDropPercentage(FractionalPercent.newBuilder() - .setDenominator(DenominatorType.HUNDRED) - .setNumerator(23) - .build()) - .build()) - .build()) - .build(); - serverResponseWriter.onNext( - DiscoveryResponse.newBuilder() - .addResources(Any.pack(clusterLoadAssignment)) - .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") - .build()); - - verify(lookasideChannelCallback).onAllDrop(); - verify(localityStore).updateDropPercentage(ImmutableList.of( - new DropOverload("cat_1", 300_00), - new DropOverload("cat_2", 100_00_00))); - - verify(lookasideChannelCallback, never()).onError(); - - lookasideChannelLb.shutdown(); - } - - @Test - public void handleLocalityAssignmentUpdates() { - io.envoyproxy.envoy.api.v2.core.Locality localityProto1 = - io.envoyproxy.envoy.api.v2.core.Locality - .newBuilder() - .setRegion("region1") - .setZone("zone1") - .setSubZone("subzone1") - .build(); - LbEndpoint endpoint11 = LbEndpoint.newBuilder() - .setEndpoint(Endpoint.newBuilder() - .setAddress(Address.newBuilder() - .setSocketAddress(SocketAddress.newBuilder() - .setAddress("addr11").setPortValue(11)))) - .setLoadBalancingWeight(UInt32Value.of(11)) - .build(); - LbEndpoint endpoint12 = LbEndpoint.newBuilder() - .setEndpoint(Endpoint.newBuilder() - .setAddress(Address.newBuilder() - .setSocketAddress(SocketAddress.newBuilder() - .setAddress("addr12").setPortValue(12)))) - .setLoadBalancingWeight(UInt32Value.of(12)) - .build(); - io.envoyproxy.envoy.api.v2.core.Locality localityProto2 = - io.envoyproxy.envoy.api.v2.core.Locality - .newBuilder() - .setRegion("region2") - .setZone("zone2") - .setSubZone("subzone2") - .build(); - LbEndpoint endpoint21 = LbEndpoint.newBuilder() - .setEndpoint(Endpoint.newBuilder() - .setAddress(Address.newBuilder() - .setSocketAddress(SocketAddress.newBuilder() - .setAddress("addr21").setPortValue(21)))) - .setLoadBalancingWeight(UInt32Value.of(21)) - .build(); - LbEndpoint endpoint22 = LbEndpoint.newBuilder() - .setEndpoint(Endpoint.newBuilder() - .setAddress(Address.newBuilder() - .setSocketAddress(SocketAddress.newBuilder() - .setAddress("addr22").setPortValue(22)))) - .setLoadBalancingWeight(UInt32Value.of(22)) - .build(); - io.envoyproxy.envoy.api.v2.core.Locality localityProto3 = - io.envoyproxy.envoy.api.v2.core.Locality - .newBuilder() - .setRegion("region3") - .setZone("zone3") - .setSubZone("subzone3") - .build(); - LbEndpoint endpoint3 = LbEndpoint.newBuilder() - .setEndpoint(Endpoint.newBuilder() - .setAddress(Address.newBuilder() - .setSocketAddress(SocketAddress.newBuilder() - .setAddress("addr31").setPortValue(31)))) - .setLoadBalancingWeight(UInt32Value.of(31)) - .build(); - ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() - .addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder() - .setLocality(localityProto1) - .addLbEndpoints(endpoint11) - .addLbEndpoints(endpoint12) - .setLoadBalancingWeight(UInt32Value.of(1))) - .addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder() - .setLocality(localityProto2) - .addLbEndpoints(endpoint21) - .addLbEndpoints(endpoint22) - .setLoadBalancingWeight(UInt32Value.of(2))) - .addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder() - .setLocality(localityProto3) - .addLbEndpoints(endpoint3) - .setLoadBalancingWeight(UInt32Value.of(0))) - .build(); - serverResponseWriter.onNext( - DiscoveryResponse.newBuilder() - .addResources(Any.pack(clusterLoadAssignment)) - .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") - .build()); - - Locality locality1 = Locality.fromEnvoyProtoLocality(localityProto1); - LocalityLbEndpoints localityInfo1 = new LocalityLbEndpoints( - ImmutableList.of( - EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint11), - EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint12)), - 1, 0); - LocalityLbEndpoints localityInfo2 = new LocalityLbEndpoints( - ImmutableList.of( - EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint21), - EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint22)), - 2, 0); - Locality locality2 = Locality.fromEnvoyProtoLocality(localityProto2); - - InOrder inOrder = inOrder(localityStore); - inOrder.verify(localityStore).updateDropPercentage(ImmutableList.of()); - inOrder.verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture()); - assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly( - locality1, localityInfo1, locality2, localityInfo2).inOrder(); - - verify(lookasideChannelCallback, never()).onError(); - - lookasideChannelLb.shutdown(); - } - - @Test - public void verifyRpcErrorPropagation() { - verify(lookasideChannelCallback, never()).onError(); - serverResponseWriter.onError(new RuntimeException()); - verify(lookasideChannelCallback).onError(); - } - - @Test - public void shutdown() { - verify(loadReportClient, never()).stopLoadReporting(); - assertThat(channel.isShutdown()).isFalse(); - - lookasideChannelLb.shutdown(); - - verify(loadReportClient).stopLoadReporting(); - assertThat(channel.isShutdown()).isTrue(); - } - - /** - * Tests load reporting is initiated after receiving the first valid EDS response from the traffic - * director, then its operation is independent of load balancing until xDS load balancer is - * shutdown. - */ - @Test - public void reportLoadAfterReceivingFirstEdsResponseUntilShutdown() { - // Simulates a syntactically incorrect EDS response. - serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance()); - verify(loadReportClient, never()).startLoadReporting(any(LoadReportCallback.class)); - verify(lookasideChannelCallback, never()).onWorking(); - verify(lookasideChannelCallback, never()).onError(); - - // Simulate a syntactically correct EDS response. - DiscoveryResponse edsResponse = - DiscoveryResponse.newBuilder() - .addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance())) - .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") - .build(); - serverResponseWriter.onNext(edsResponse); - - verify(lookasideChannelCallback).onWorking(); - - ArgumentCaptor lrsCallbackCaptor = ArgumentCaptor.forClass(null); - verify(loadReportClient).startLoadReporting(lrsCallbackCaptor.capture()); - lrsCallbackCaptor.getValue().onReportResponse(19543); - verify(localityStore).updateOobMetricsReportInterval(19543); - - // Simulate another EDS response from the same remote balancer. - serverResponseWriter.onNext(edsResponse); - verifyNoMoreInteractions(lookasideChannelCallback, loadReportClient); - - // Simulate an EDS error response. - serverResponseWriter.onError(Status.ABORTED.asException()); - verify(lookasideChannelCallback).onError(); - - verifyNoMoreInteractions(lookasideChannelCallback, loadReportClient); - verify(localityStore, times(1)).updateOobMetricsReportInterval(anyLong()); // only once - - lookasideChannelLb.shutdown(); - } -} diff --git a/xds/src/test/java/io/grpc/xds/LookasideLbTest.java b/xds/src/test/java/io/grpc/xds/LookasideLbTest.java index 3abe6c4c6e..d32b1dcb8e 100644 --- a/xds/src/test/java/io/grpc/xds/LookasideLbTest.java +++ b/xds/src/test/java/io/grpc/xds/LookasideLbTest.java @@ -18,31 +18,78 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.READY; import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.collect.ImmutableList; -import io.envoyproxy.envoy.api.v2.core.Node; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.protobuf.Any; +import com.google.protobuf.UInt32Value; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; +import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.api.v2.DiscoveryRequest; +import io.envoyproxy.envoy.api.v2.DiscoveryResponse; +import io.envoyproxy.envoy.api.v2.core.Address; +import io.envoyproxy.envoy.api.v2.core.SocketAddress; +import io.envoyproxy.envoy.api.v2.endpoint.Endpoint; +import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint; +import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; +import io.envoyproxy.envoy.type.FractionalPercent; +import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType; import io.grpc.Attributes; +import io.grpc.ChannelLogger; +import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerRegistry; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.BackoffPolicy.Provider; +import io.grpc.internal.FakeClock; import io.grpc.internal.JsonParser; -import io.grpc.xds.Bootstrapper.ChannelCreds; -import io.grpc.xds.LookasideChannelLb.LookasideChannelCallback; -import io.grpc.xds.LookasideLb.LookasideChannelLbFactory; +import io.grpc.internal.testing.StreamRecorder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.EnvoyProtoData.DropOverload; +import io.grpc.xds.EnvoyProtoData.Locality; +import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; +import io.grpc.xds.LoadReportClient.LoadReportCallback; +import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory; +import io.grpc.xds.LocalityStore.LocalityStoreFactory; +import io.grpc.xds.LookasideLb.EdsUpdateCallback; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; /** * Tests for {@link LookasideLb}. @@ -50,34 +97,142 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class LookasideLbTest { - private final Helper helper = mock(Helper.class); - private final List helpers = new ArrayList<>(); - private final List balancers = new ArrayList<>(); - private final LookasideChannelLbFactory lookasideChannelLbFactory = - new LookasideChannelLbFactory() { + private static final String SERVICE_AUTHORITY = "test authority"; + + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { @Override - public LoadBalancer newLoadBalancer( - Helper helper, LookasideChannelCallback lookasideChannelCallback, String balancerName, - Node node, List channelCredsList) { - // just return a mock and record helper and balancer. - helpers.add(helper); - LoadBalancer balancer = mock(LoadBalancer.class); - balancers.add(balancer); - assertThat(node).isNotNull(); - return balancer; + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); } - }; + }); + private final StreamRecorder streamRecorder = StreamRecorder.create(); - private LoadBalancer lookasideLb = new LookasideLb( - helper, mock(LookasideChannelCallback.class), lookasideChannelLbFactory, - new LoadBalancerRegistry()); + private final DiscoveryResponse edsResponse = + DiscoveryResponse.newBuilder() + .addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance())) + .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .build(); + private final List helpers = new ArrayList<>(); + private final List localityStores = new ArrayList<>(); + private final List loadReportClients = new ArrayList<>(); + private final FakeClock fakeClock = new FakeClock(); + @Mock + private Helper helper; + @Mock + private EdsUpdateCallback edsUpdateCallback; + @Captor + private ArgumentCaptor> + localityEndpointsMappingCaptor; + + private ManagedChannel channel; + private ManagedChannel channel2; + private StreamObserver serverResponseWriter; + private LocalityStoreFactory localityStoreFactory; + private LoadBalancer lookasideLb; + private ResolvedAddresses defaultResolvedAddress; + + @Before + public void setUp() throws Exception { + AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() { + @Override + public StreamObserver streamAggregatedResources( + final StreamObserver responseObserver) { + serverResponseWriter = responseObserver; + + return new StreamObserver() { + + @Override + public void onNext(DiscoveryRequest value) { + streamRecorder.onNext(value); + } + + @Override + public void onError(Throwable t) { + streamRecorder.onError(t); + } + + @Override + public void onCompleted() { + streamRecorder.onCompleted(); + responseObserver.onCompleted(); + } + }; + } + }; + + String serverName = InProcessServerBuilder.generateName(); + cleanupRule.register( + InProcessServerBuilder + .forName(serverName) + .directExecutor() + .addService(serviceImpl) + .build() + .start()); + channel = cleanupRule.register( + InProcessChannelBuilder + .forName(serverName) + .directExecutor() + .build()); + channel2 = cleanupRule.register( + InProcessChannelBuilder + .forName(serverName) + .directExecutor() + .build()); + + doReturn(SERVICE_AUTHORITY).when(helper).getAuthority(); + doReturn(syncContext).when(helper).getSynchronizationContext(); + doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger(); + doReturn(channel, channel2).when(helper).createResolvingOobChannel(anyString()); + doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); + + localityStoreFactory = new LocalityStoreFactory() { + @Override + public LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry) { + helpers.add(helper); + LocalityStore localityStore = mock(LocalityStore.class); + LoadStatsStore loadStatsStore = mock(LoadStatsStore.class); + doReturn(loadStatsStore).when(localityStore).getLoadStatsStore(); + localityStores.add(localityStore); + return localityStore; + } + }; + + LoadReportClientFactory loadReportClientFactory = new LoadReportClientFactory() { + @Override + LoadReportClient createLoadReportClient(ManagedChannel channel, Helper helper, + Provider backoffPolicyProvider, LoadStatsStore loadStatsStore) { + LoadReportClient loadReportClient = mock(LoadReportClient.class); + loadReportClients.add(loadReportClient); + return loadReportClient; + } + }; + + lookasideLb = new LookasideLb( + helper, edsUpdateCallback, new LoadBalancerRegistry(), localityStoreFactory, + loadReportClientFactory); + + String lbConfigRaw11 = "{\"balancerName\" : \"dns:///balancer1.example.com:8080\"}"; + @SuppressWarnings("unchecked") + Map lbConfig11 = (Map) JsonParser.parse(lbConfigRaw11); + defaultResolvedAddress = ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig11).build()) + .build(); + } @Test public void handleChildPolicyChangeThenBalancerNameChangeThenChildPolicyChange() throws Exception { assertThat(helpers).isEmpty(); - assertThat(balancers).isEmpty(); + assertThat(localityStores).isEmpty(); + assertThat(loadReportClients).isEmpty(); List eags = ImmutableList.of(); String lbConfigRaw11 = "{\"balancerName\" : \"dns:///balancer1.example.com:8080\"}"; @@ -90,10 +245,9 @@ public class LookasideLbTest { lookasideLb.handleResolvedAddresses(resolvedAddresses); assertThat(helpers).hasSize(1); - assertThat(balancers).hasSize(1); + assertThat(localityStores).hasSize(1); + assertThat(loadReportClients).hasSize(1); Helper helper1 = helpers.get(0); - LoadBalancer balancer1 = balancers.get(0); - verify(balancer1).handleResolvedAddresses(resolvedAddresses); SubchannelPicker picker1 = mock(SubchannelPicker.class); helper1.updateBalancingState(CONNECTING, picker1); @@ -111,11 +265,13 @@ public class LookasideLbTest { .build(); lookasideLb.handleResolvedAddresses(resolvedAddresses); - verify(balancer1).handleResolvedAddresses(resolvedAddresses); - - verify(balancer1, never()).shutdown(); + LocalityStore localityStore1 = Iterables.getOnlyElement(localityStores); + LoadReportClient loadReportClient1 = Iterables.getOnlyElement(loadReportClients); + verify(localityStore1, never()).reset(); + verify(loadReportClient1, never()).stopLoadReporting(); assertThat(helpers).hasSize(1); - assertThat(balancers).hasSize(1); + assertThat(localityStores).hasSize(1); + assertThat(loadReportClients).hasSize(1); // change balancer name policy to balancer2.example.com String lbConfigRaw21 = "{\"balancerName\" : \"dns:///balancer2.example.com:8080\"}"; @@ -127,19 +283,21 @@ public class LookasideLbTest { .build(); lookasideLb.handleResolvedAddresses(resolvedAddresses); - verify(balancer1).shutdown(); + verify(localityStore1).reset(); + verify(loadReportClient1).stopLoadReporting(); assertThat(helpers).hasSize(2); - assertThat(balancers).hasSize(2); + assertThat(localityStores).hasSize(2); + assertThat(loadReportClients).hasSize(2); Helper helper2 = helpers.get(1); - LoadBalancer balancer2 = balancers.get(1); - verify(balancer1, never()).handleResolvedAddresses(resolvedAddresses); - verify(balancer2).handleResolvedAddresses(resolvedAddresses); + LocalityStore localityStore2 = localityStores.get(1); + LoadReportClient loadReportClient2 = loadReportClients.get(1); picker1 = mock(SubchannelPicker.class); helper1.updateBalancingState(CONNECTING, picker1); verify(helper, never()).updateBalancingState(CONNECTING, picker1); SubchannelPicker picker2 = mock(SubchannelPicker.class); helper2.updateBalancingState(CONNECTING, picker2); + // balancer1 was not READY, so balancer2 will update picker immediately verify(helper).updateBalancingState(CONNECTING, picker2); String lbConfigRaw22 = "{" @@ -154,21 +312,53 @@ public class LookasideLbTest { .build(); lookasideLb.handleResolvedAddresses(resolvedAddresses); - verify(balancer2).handleResolvedAddresses(resolvedAddresses); - assertThat(helpers).hasSize(2); - assertThat(balancers).hasSize(2); + assertThat(localityStores).hasSize(2); - verify(balancer2, never()).shutdown(); + SubchannelPicker picker3 = mock(SubchannelPicker.class); + helper2.updateBalancingState(READY, picker3); + verify(helper).updateBalancingState(READY, picker3); + + String lbConfigRaw3 = "{" + + "\"balancerName\" : \"dns:///balancer3.example.com:8080\"," + + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_3\" : {}}]" + + "}"; + @SuppressWarnings("unchecked") + Map lbConfig3 = (Map) JsonParser.parse(lbConfigRaw3); + resolvedAddresses = ResolvedAddresses.newBuilder() + .setAddresses(eags) + .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig3).build()) + .build(); + lookasideLb.handleResolvedAddresses(resolvedAddresses); + + assertThat(helpers).hasSize(3); + + Helper helper3 = helpers.get(2); + SubchannelPicker picker4 = mock(SubchannelPicker.class); + helper3.updateBalancingState(CONNECTING, picker4); + // balancer2 was READY, so balancer3 will gracefully switch and not update non-READY picker + verify(helper, never()).updateBalancingState(any(ConnectivityState.class), eq(picker4)); + verify(localityStore2, never()).reset(); + verify(loadReportClient2, never()).stopLoadReporting(); + + SubchannelPicker picker5 = mock(SubchannelPicker.class); + helper3.updateBalancingState(READY, picker5); + verify(helper).updateBalancingState(READY, picker5); + verify(localityStore2).reset(); + verify(loadReportClient2).stopLoadReporting(); + + verify(localityStores.get(2), never()).reset(); + verify(loadReportClients.get(2), never()).stopLoadReporting(); lookasideLb.shutdown(); - verify(balancer2).shutdown(); + verify(localityStores.get(2)).reset(); + verify(loadReportClients.get(2)).stopLoadReporting(); } @Test public void handleResolvedAddress_createLbChannel() throws Exception { // Test balancer created with the default real LookasideChannelLbFactory - lookasideLb = new LookasideLb(helper, mock(LookasideChannelCallback.class)); + lookasideLb = new LookasideLb(helper, mock(EdsUpdateCallback.class)); String lbConfigRaw11 = "{'balancerName' : 'dns:///balancer1.example.com:8080'}" .replace("'", "\""); @SuppressWarnings("unchecked") @@ -179,11 +369,304 @@ public class LookasideLbTest { .build(); verify(helper, never()).createResolvingOobChannel(anyString()); - try { - lookasideLb.handleResolvedAddresses(resolvedAddresses); - } catch (RuntimeException e) { - // Expected because helper is a mock and helper.createResolvingOobChannel() returns null. - } + lookasideLb.handleResolvedAddresses(resolvedAddresses); verify(helper).createResolvingOobChannel("dns:///balancer1.example.com:8080"); + + lookasideLb.shutdown(); + } + + @Test + public void firstAndSecondEdsResponseReceived() { + lookasideLb.handleResolvedAddresses(defaultResolvedAddress); + + verify(edsUpdateCallback, never()).onWorking(); + LoadReportClient loadReportClient = Iterables.getOnlyElement(loadReportClients); + verify(loadReportClient, never()).startLoadReporting(any(LoadReportCallback.class)); + + // first EDS response + serverResponseWriter.onNext(edsResponse); + verify(edsUpdateCallback).onWorking(); + ArgumentCaptor loadReportCallbackCaptor = + ArgumentCaptor.forClass(LoadReportCallback.class); + verify(loadReportClient).startLoadReporting(loadReportCallbackCaptor.capture()); + LoadReportCallback loadReportCallback = loadReportCallbackCaptor.getValue(); + + // second EDS response + serverResponseWriter.onNext(edsResponse); + verify(edsUpdateCallback, times(1)).onWorking(); + verify(loadReportClient, times(1)).startLoadReporting(any(LoadReportCallback.class)); + + LocalityStore localityStore = Iterables.getOnlyElement(localityStores); + verify(localityStore, never()).updateOobMetricsReportInterval(anyLong()); + loadReportCallback.onReportResponse(1234); + verify(localityStore).updateOobMetricsReportInterval(1234); + + verify(edsUpdateCallback, never()).onError(); + + lookasideLb.shutdown(); + } + + @Test + public void handleDropUpdates() { + lookasideLb.handleResolvedAddresses(defaultResolvedAddress); + + LocalityStore localityStore = Iterables.getOnlyElement(localityStores); + verify(localityStore, never()).updateDropPercentage( + ArgumentMatchers.>any()); + + serverResponseWriter.onNext(edsResponse); + verify(localityStore).updateDropPercentage(eq(ImmutableList.of())); + + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setPolicy(Policy.newBuilder() + .addDropOverloads(Policy.DropOverload.newBuilder() + .setCategory("cat_1").setDropPercentage(FractionalPercent.newBuilder() + .setDenominator(DenominatorType.HUNDRED) + .setNumerator(3) + .build()) + .build()) + + .addDropOverloads(Policy.DropOverload.newBuilder() + .setCategory("cat_2").setDropPercentage(FractionalPercent.newBuilder() + .setDenominator(DenominatorType.TEN_THOUSAND) + .setNumerator(45) + .build()) + .build()) + .addDropOverloads(Policy.DropOverload.newBuilder() + .setCategory("cat_3").setDropPercentage(FractionalPercent.newBuilder() + .setDenominator(DenominatorType.MILLION) + .setNumerator(6789) + .build()) + .build()) + .build()) + .build(); + serverResponseWriter.onNext( + DiscoveryResponse.newBuilder() + .addResources(Any.pack(clusterLoadAssignment)) + .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .build()); + + verify(edsUpdateCallback, never()).onAllDrop(); + verify(localityStore).updateDropPercentage(ImmutableList.of( + new DropOverload("cat_1", 300_00), + new DropOverload("cat_2", 45_00), + new DropOverload("cat_3", 6789))); + + + clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .setPolicy(Policy.newBuilder() + .addDropOverloads(Policy.DropOverload.newBuilder() + .setCategory("cat_1").setDropPercentage(FractionalPercent.newBuilder() + .setDenominator(DenominatorType.HUNDRED) + .setNumerator(3) + .build()) + .build()) + .addDropOverloads(Policy.DropOverload.newBuilder() + .setCategory("cat_2").setDropPercentage(FractionalPercent.newBuilder() + .setDenominator(DenominatorType.HUNDRED) + .setNumerator(101) + .build()) + .build()) + .addDropOverloads(Policy.DropOverload.newBuilder() + .setCategory("cat_3").setDropPercentage(FractionalPercent.newBuilder() + .setDenominator(DenominatorType.HUNDRED) + .setNumerator(23) + .build()) + .build()) + .build()) + .build(); + serverResponseWriter.onNext( + DiscoveryResponse.newBuilder() + .addResources(Any.pack(clusterLoadAssignment)) + .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .build()); + + verify(edsUpdateCallback).onAllDrop(); + verify(localityStore).updateDropPercentage(ImmutableList.of( + new DropOverload("cat_1", 300_00), + new DropOverload("cat_2", 100_00_00))); + + verify(edsUpdateCallback, never()).onError(); + + lookasideLb.shutdown(); + } + + @Test + public void handleLocalityAssignmentUpdates() { + lookasideLb.handleResolvedAddresses(defaultResolvedAddress); + + io.envoyproxy.envoy.api.v2.core.Locality localityProto1 = + io.envoyproxy.envoy.api.v2.core.Locality + .newBuilder() + .setRegion("region1") + .setZone("zone1") + .setSubZone("subzone1") + .build(); + LbEndpoint endpoint11 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr11").setPortValue(11)))) + .setLoadBalancingWeight(UInt32Value.of(11)) + .build(); + LbEndpoint endpoint12 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr12").setPortValue(12)))) + .setLoadBalancingWeight(UInt32Value.of(12)) + .build(); + io.envoyproxy.envoy.api.v2.core.Locality localityProto2 = + io.envoyproxy.envoy.api.v2.core.Locality + .newBuilder() + .setRegion("region2") + .setZone("zone2") + .setSubZone("subzone2") + .build(); + LbEndpoint endpoint21 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr21").setPortValue(21)))) + .setLoadBalancingWeight(UInt32Value.of(21)) + .build(); + LbEndpoint endpoint22 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr22").setPortValue(22)))) + .setLoadBalancingWeight(UInt32Value.of(22)) + .build(); + io.envoyproxy.envoy.api.v2.core.Locality localityProto3 = + io.envoyproxy.envoy.api.v2.core.Locality + .newBuilder() + .setRegion("region3") + .setZone("zone3") + .setSubZone("subzone3") + .build(); + LbEndpoint endpoint3 = LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress("addr31").setPortValue(31)))) + .setLoadBalancingWeight(UInt32Value.of(31)) + .build(); + ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder() + .addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder() + .setLocality(localityProto1) + .addLbEndpoints(endpoint11) + .addLbEndpoints(endpoint12) + .setLoadBalancingWeight(UInt32Value.of(1))) + .addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder() + .setLocality(localityProto2) + .addLbEndpoints(endpoint21) + .addLbEndpoints(endpoint22) + .setLoadBalancingWeight(UInt32Value.of(2))) + .addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder() + .setLocality(localityProto3) + .addLbEndpoints(endpoint3) + .setLoadBalancingWeight(UInt32Value.of(0))) + .build(); + serverResponseWriter.onNext( + DiscoveryResponse.newBuilder() + .addResources(Any.pack(clusterLoadAssignment)) + .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .build()); + + Locality locality1 = Locality.fromEnvoyProtoLocality(localityProto1); + LocalityLbEndpoints localityInfo1 = new LocalityLbEndpoints( + ImmutableList.of( + EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint11), + EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint12)), + 1, 0); + LocalityLbEndpoints localityInfo2 = new LocalityLbEndpoints( + ImmutableList.of( + EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint21), + EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint22)), + 2, 0); + Locality locality2 = Locality.fromEnvoyProtoLocality(localityProto2); + + LocalityStore localityStore = Iterables.getOnlyElement(localityStores); + InOrder inOrder = inOrder(localityStore); + inOrder.verify(localityStore).updateDropPercentage(ImmutableList.of()); + inOrder.verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture()); + assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly( + locality1, localityInfo1, locality2, localityInfo2).inOrder(); + + verify(edsUpdateCallback, never()).onError(); + + lookasideLb.shutdown(); + } + + @Test + public void verifyRpcErrorPropagation() { + lookasideLb.handleResolvedAddresses(defaultResolvedAddress); + + verify(edsUpdateCallback, never()).onError(); + serverResponseWriter.onError(new RuntimeException()); + verify(edsUpdateCallback).onError(); + } + + @Test + public void shutdown() { + lookasideLb.handleResolvedAddresses(defaultResolvedAddress); + + LocalityStore localityStore = Iterables.getOnlyElement(localityStores); + LoadReportClient loadReportClient = Iterables.getOnlyElement(loadReportClients); + verify(localityStore, never()).reset(); + verify(loadReportClient, never()).stopLoadReporting(); + assertThat(channel.isShutdown()).isFalse(); + + lookasideLb.shutdown(); + + verify(localityStore).reset(); + verify(loadReportClient).stopLoadReporting(); + assertThat(channel.isShutdown()).isTrue(); + } + + /** + * Tests load reporting is initiated after receiving the first valid EDS response from the traffic + * director, then its operation is independent of load balancing until xDS load balancer is + * shutdown. + */ + @Test + public void reportLoadAfterReceivingFirstEdsResponseUntilShutdown() { + lookasideLb.handleResolvedAddresses(defaultResolvedAddress); + + // Simulates a syntactically incorrect EDS response. + serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance()); + LoadReportClient loadReportClient = Iterables.getOnlyElement(loadReportClients); + verify(loadReportClient, never()).startLoadReporting(any(LoadReportCallback.class)); + verify(edsUpdateCallback, never()).onWorking(); + verify(edsUpdateCallback, never()).onError(); + + // Simulate a syntactically correct EDS response. + DiscoveryResponse edsResponse = + DiscoveryResponse.newBuilder() + .addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance())) + .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") + .build(); + serverResponseWriter.onNext(edsResponse); + + verify(edsUpdateCallback).onWorking(); + + ArgumentCaptor lrsCallbackCaptor = ArgumentCaptor.forClass(null); + verify(loadReportClient).startLoadReporting(lrsCallbackCaptor.capture()); + lrsCallbackCaptor.getValue().onReportResponse(19543); + LocalityStore localityStore = Iterables.getOnlyElement(localityStores); + verify(localityStore).updateOobMetricsReportInterval(19543); + + // Simulate another EDS response from the same remote balancer. + serverResponseWriter.onNext(edsResponse); + verifyNoMoreInteractions(edsUpdateCallback, loadReportClient); + + // Simulate an EDS error response. + serverResponseWriter.onError(Status.ABORTED.asException()); + verify(edsUpdateCallback).onError(); + + verifyNoMoreInteractions(edsUpdateCallback, loadReportClient); + verify(localityStore, times(1)).updateOobMetricsReportInterval(anyLong()); // only once + + lookasideLb.shutdown(); } } diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java index 99e802d92f..ae3c309445 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java @@ -40,7 +40,7 @@ import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; -import io.grpc.xds.LookasideChannelLb.LookasideChannelCallback; +import io.grpc.xds.LookasideLb.EdsUpdateCallback; import io.grpc.xds.XdsLoadBalancer2.LookasideLbFactory; import java.util.ArrayList; import java.util.List; @@ -75,7 +75,7 @@ public class XdsLoadBalancer2Test { @Mock private Helper helper; private LoadBalancer xdsLoadBalancer; - private LookasideChannelCallback lookasideChannelCallback; + private EdsUpdateCallback edsUpdateCallback; private Helper lookasideLbHelper; private final List lookasideLbs = new ArrayList<>(); @@ -90,10 +90,10 @@ public class XdsLoadBalancer2Test { LookasideLbFactory lookasideLbFactory = new LookasideLbFactory() { @Override public LoadBalancer newLoadBalancer( - Helper helper, LookasideChannelCallback lookasideChannelCallback) { + Helper helper, EdsUpdateCallback edsUpdateCallback) { // just return a mock and record the input and output lookasideLbHelper = helper; - XdsLoadBalancer2Test.this.lookasideChannelCallback = lookasideChannelCallback; + XdsLoadBalancer2Test.this.edsUpdateCallback = edsUpdateCallback; LoadBalancer lookasideLb = mock(LoadBalancer.class); lookasideLbs.add(lookasideLb); return lookasideLb; @@ -142,7 +142,7 @@ public class XdsLoadBalancer2Test { public void timeoutAtStartup_expectUseFallback_thenBackendReady_expectExitFallback() { verifyNotInFallbackMode(); fakeClock.forwardTime(9, TimeUnit.SECONDS); - lookasideChannelCallback.onWorking(); + edsUpdateCallback.onWorking(); verifyNotInFallbackMode(); fakeClock.forwardTime(1, TimeUnit.SECONDS); verifyInFallbackMode(); @@ -162,7 +162,7 @@ public class XdsLoadBalancer2Test { verifyNotInFallbackMode(); assertThat(fakeClock.getPendingTasks()).hasSize(1); - lookasideChannelCallback.onWorking(); + edsUpdateCallback.onWorking(); SubchannelPicker subchannelPicker = mock(SubchannelPicker.class); lookasideLbHelper.updateBalancingState(READY, subchannelPicker); verify(helper).updateBalancingState(READY, subchannelPicker); @@ -177,7 +177,7 @@ public class XdsLoadBalancer2Test { verifyNotInFallbackMode(); assertThat(fakeClock.getPendingTasks()).hasSize(1); - lookasideChannelCallback.onAllDrop(); + edsUpdateCallback.onAllDrop(); assertThat(fakeClock.getPendingTasks()).isEmpty(); verifyNotInFallbackMode(); @@ -189,7 +189,7 @@ public class XdsLoadBalancer2Test { verifyNotInFallbackMode(); assertThat(fakeClock.getPendingTasks()).hasSize(1); - lookasideChannelCallback.onError(); + edsUpdateCallback.onError(); verifyInFallbackMode(); assertThat(fallbackLbs).hasSize(1); @@ -199,8 +199,8 @@ public class XdsLoadBalancer2Test { public void lookasideChannelSeeingEdsResponseThenFailsBeforeTimeoutAtStartup() { verifyNotInFallbackMode(); assertThat(fakeClock.getPendingTasks()).hasSize(1); - lookasideChannelCallback.onWorking(); - lookasideChannelCallback.onError(); + edsUpdateCallback.onWorking(); + edsUpdateCallback.onError(); verifyNotInFallbackMode(); fakeClock.forwardTime(10, TimeUnit.SECONDS); @@ -221,7 +221,7 @@ public class XdsLoadBalancer2Test { .build(); xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses); - lookasideChannelCallback.onError(); + edsUpdateCallback.onError(); LoadBalancer fallbackLb = Iterables.getLast(fallbackLbs); verify(fallbackLb).handleResolvedAddresses(same(resolvedAddresses)); }