diff --git a/xds/src/main/java/io/grpc/xds/LookasideLb.java b/xds/src/main/java/io/grpc/xds/LookasideLb.java index d32530c642..7b5a82ac25 100644 --- a/xds/src/main/java/io/grpc/xds/LookasideLb.java +++ b/xds/src/main/java/io/grpc/xds/LookasideLb.java @@ -17,17 +17,16 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; -import static io.grpc.xds.XdsNameResolver.XDS_CHANNEL_CREDS_LIST; -import static io.grpc.xds.XdsNameResolver.XDS_NODE; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static java.util.logging.Level.FINEST; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Struct; -import com.google.protobuf.Value; import io.envoyproxy.envoy.api.v2.core.Node; import io.grpc.Attributes; +import io.grpc.ChannelLogger; +import io.grpc.ChannelLogger.ChannelLogLevel; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; @@ -38,8 +37,9 @@ import io.grpc.Status; import io.grpc.alts.GoogleDefaultChannelBuilder; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; -import io.grpc.util.ForwardingLoadBalancer; +import io.grpc.internal.ObjectPool; import io.grpc.util.GracefulSwitchLoadBalancer; +import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.Bootstrapper.ChannelCreds; import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.Locality; @@ -49,169 +49,223 @@ import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory; import io.grpc.xds.LocalityStore.LocalityStoreFactory; import io.grpc.xds.XdsClient.EndpointUpdate; import io.grpc.xds.XdsClient.EndpointWatcher; +import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool; +import io.grpc.xds.XdsClient.XdsClientFactory; import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig; +import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.logging.Logger; import javax.annotation.Nullable; -/** Lookaside load balancer that handles balancer name changes. */ -final class LookasideLb extends ForwardingLoadBalancer { +/** Lookaside load balancer that handles EDS config. */ +final class LookasideLb extends LoadBalancer { - private final EdsUpdateCallback edsUpdateCallback; - private final GracefulSwitchLoadBalancer lookasideChannelLb; + private final ChannelLogger channelLogger; + private final EndpointUpdateCallback endpointUpdateCallback; + private final GracefulSwitchLoadBalancer switchingLoadBalancer; private final LoadBalancerRegistry lbRegistry; private final LocalityStoreFactory localityStoreFactory; private final LoadReportClientFactory loadReportClientFactory; + private final Bootstrapper bootstrapper; + private final Helper lookasideLbHelper; - private String balancerName; + // Most recent XdsConfig. + // Becomes non-null once handleResolvedAddresses() successfully. + @Nullable + private XdsConfig xdsConfig; + // Most recent EndpointWatcher. + // Becomes non-null once handleResolvedAddresses() successfully. + @Nullable + private EndpointWatcher endpointWatcher; - LookasideLb(Helper lookasideLbHelper, EdsUpdateCallback edsUpdateCallback) { + // Becomes non-null and calls getObject() once handleResolvedAddresses() successfully. + // Will call returnObject() at balancer shutdown. + @Nullable + private ObjectPool xdsClientRef; + // Becomes non-null once handleResolvedAddresses() successfully. + @Nullable + XdsClient xdsClient; + // Becomes non-null for EDS-only case once handleResolvedAddresses() successfully. + // TODO(zdapeng): Stop using it once XdsClientImpl is used. + @Nullable + ManagedChannel channel; + + LookasideLb(Helper lookasideLbHelper, EndpointUpdateCallback endpointUpdateCallback) { this( - lookasideLbHelper, - edsUpdateCallback, + checkNotNull(lookasideLbHelper, "lookasideLbHelper"), + checkNotNull(endpointUpdateCallback, "endpointUpdateCallback"), LoadBalancerRegistry.getDefaultRegistry(), LocalityStoreFactory.getInstance(), - LoadReportClientFactory.getInstance()); + LoadReportClientFactory.getInstance(), + Bootstrapper.getInstance()); } @VisibleForTesting LookasideLb( Helper lookasideLbHelper, - EdsUpdateCallback edsUpdateCallback, + EndpointUpdateCallback endpointUpdateCallback, LoadBalancerRegistry lbRegistry, LocalityStoreFactory localityStoreFactory, - LoadReportClientFactory loadReportClientFactory) { - this.edsUpdateCallback = edsUpdateCallback; + LoadReportClientFactory loadReportClientFactory, + Bootstrapper bootstrapper) { + this.lookasideLbHelper = lookasideLbHelper; + this.channelLogger = lookasideLbHelper.getChannelLogger(); + this.endpointUpdateCallback = endpointUpdateCallback; this.lbRegistry = lbRegistry; - this.lookasideChannelLb = new GracefulSwitchLoadBalancer(lookasideLbHelper); + this.switchingLoadBalancer = new GracefulSwitchLoadBalancer(lookasideLbHelper); this.localityStoreFactory = localityStoreFactory; this.loadReportClientFactory = loadReportClientFactory; - } - - @Override - protected LoadBalancer delegate() { - return lookasideChannelLb; + this.bootstrapper = bootstrapper; } @Override public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + channelLogger.log(ChannelLogLevel.DEBUG, "Received ResolvedAddresses '%s'", resolvedAddresses); + // In the future, xdsConfig can be gotten directly by - // resolvedAddresses.getLoadBalancingPolicyConfig() + // resolvedAddresses.getLoadBalancingPolicyConfig(). Attributes attributes = resolvedAddresses.getAttributes(); - Map newRawLbConfig = checkNotNull( - attributes.get(ATTR_LOAD_BALANCING_CONFIG), "ATTR_LOAD_BALANCING_CONFIG not available"); + Map newRawLbConfig = attributes.get(ATTR_LOAD_BALANCING_CONFIG); + if (newRawLbConfig == null) { + // This will not happen when the service config error handling is implemented. + // For now simply go to TRANSIENT_FAILURE. + lookasideLbHelper.updateBalancingState( + TRANSIENT_FAILURE, + new ErrorPicker( + Status.UNAVAILABLE.withDescription("ATTR_LOAD_BALANCING_CONFIG not available"))); + return; + } ConfigOrError cfg = XdsLoadBalancerProvider.parseLoadBalancingConfigPolicy(newRawLbConfig, lbRegistry); if (cfg.getError() != null) { - throw cfg.getError().asRuntimeException(); + // This will not happen when the service config error handling is implemented. + // For now simply go to TRANSIENT_FAILURE. + lookasideLbHelper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(cfg.getError())); + return; } - XdsConfig xdsConfig = (XdsConfig) cfg.getConfig(); + XdsConfig newXdsConfig = (XdsConfig) cfg.getConfig(); + ObjectPool xdsClientRefFromResolver = attributes.get(XdsAttributes.XDS_CLIENT_REF); + ObjectPool xdsClientRef; - final String newBalancerName = xdsConfig.balancerName; - - // The is to handle the legacy usecase that requires balancerName from xds config. - if (!newBalancerName.equals(balancerName)) { - balancerName = newBalancerName; // cache the name and check next time for optimization - Node nodeFromResolvedAddresses = resolvedAddresses.getAttributes().get(XDS_NODE); - final Node node; - if (nodeFromResolvedAddresses == null) { - node = Node.newBuilder() - .setMetadata(Struct.newBuilder() - .putFields( - "endpoints_required", - Value.newBuilder().setBoolValue(true).build())) - .build(); + // Init XdsClient. + if (xdsClient == null) { + // There are three usecases: + // 1. The EDS-only legacy usecase that requires balancerName from xds config. + // Note: we don't support balancerName change. + // TODO(zdapeng): Remove the legacy case. + // 2. The EDS-only with bootstrap usecase: + // The name resolver resolves a ResolvedAddresses with an XdsConfig without balancerName + // field. Use the bootstrap information to create a channel. + // 3. Non EDS-only usecase: + // XDS_CLIENT_REF attribute is available from ResolvedAddresses either from + // XdsNameResolver or CDS policy. + // + // We assume XdsConfig switching happens only within one usecase, and there is no switching + // between different usecases. + if (newXdsConfig.balancerName != null) { + // This is the EDS-only legacy usecase that requires balancerName from xds config. + channel = initLbChannel( + lookasideLbHelper, newXdsConfig.balancerName, Collections.emptyList()); + xdsClientRef = new RefCountedXdsClientObjectPool(new XdsClientFactory() { + @Override + XdsClient createXdsClient() { + return new XdsComms2( + channel, lookasideLbHelper, new ExponentialBackoffPolicy.Provider(), + GrpcUtil.STOPWATCH_SUPPLIER, Node.getDefaultInstance()); + } + }); + } else if (xdsClientRefFromResolver != null) { + // This is the Non EDS-only usecase. + xdsClientRef = xdsClientRefFromResolver; } else { - node = nodeFromResolvedAddresses; - } - List channelCredsListFromResolvedAddresses = - resolvedAddresses.getAttributes().get(XDS_CHANNEL_CREDS_LIST); - final List channelCredsList; - if (channelCredsListFromResolvedAddresses == null) { - channelCredsList = Collections.emptyList(); - } else { - channelCredsList = channelCredsListFromResolvedAddresses; + // This is the EDS-only with bootstrap usecase. + final BootstrapInfo bootstrapInfo; + try { + bootstrapInfo = bootstrapper.readBootstrap(); + } catch (Exception e) { + lookasideLbHelper.updateBalancingState( + TRANSIENT_FAILURE, + new ErrorPicker(Status.UNAVAILABLE.withCause(e))); + return; + } + channel = initLbChannel( + lookasideLbHelper, bootstrapInfo.getServerUri(), + bootstrapInfo.getChannelCredentials()); + xdsClientRef = new RefCountedXdsClientObjectPool(new XdsClientFactory() { + @Override + XdsClient createXdsClient() { + // TODO(zdapeng): Replace XdsComms2 with XdsClientImpl. + return new XdsComms2( + channel, lookasideLbHelper, new ExponentialBackoffPolicy.Provider(), + GrpcUtil.STOPWATCH_SUPPLIER, bootstrapInfo.getNode()); + } + }); } - LoadBalancerProvider childBalancerProvider = new LoadBalancerProvider() { - @Override - public boolean isAvailable() { - return true; - } - - @Override - public int getPriority() { - return 5; - } - - /** - * A synthetic policy name identified by balancerName. The implementation detail doesn't - * matter. - */ - @Override - public String getPolicyName() { - return "xds_child_policy_balancer_name_" + newBalancerName; - } - - @Override - public LoadBalancer newLoadBalancer(final Helper helper) { - return new LoadBalancer() { - @Nullable - XdsClient xdsClient; - @Nullable - LocalityStore localityStore; - @Nullable - LoadReportClient lrsClient; - - @Override - public void handleNameResolutionError(Status error) {} - - @Override - public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { - if (xdsClient == null) { - ManagedChannel channel = initLbChannel(helper, newBalancerName, channelCredsList); - xdsClient = new XdsComms2( - channel, helper, new ExponentialBackoffPolicy.Provider(), - GrpcUtil.STOPWATCH_SUPPLIER, node); - LoadStatsStore loadStatsStore = new LoadStatsStoreImpl(); - localityStore = localityStoreFactory.newLocalityStore( - helper, lbRegistry, loadStatsStore); - // TODO(zdapeng): Use XdsClient to do Lrs directly. - lrsClient = loadReportClientFactory.createLoadReportClient( - channel, helper, new ExponentialBackoffPolicy.Provider(), - loadStatsStore); - final LoadReportCallback lrsCallback = - new LoadReportCallback() { - @Override - public void onReportResponse(long reportIntervalNano) { - localityStore.updateOobMetricsReportInterval(reportIntervalNano); - } - }; - - EndpointWatcher endpointWatcher = - new EndpointWatcherImpl(lrsClient, lrsCallback, localityStore); - xdsClient.watchEndpointData(node.getCluster(), endpointWatcher); - } - } - - @Override - public void shutdown() { - if (xdsClient != null) { - lrsClient.stopLoadReporting(); - localityStore.reset(); - xdsClient.shutdown(); - } - } - }; - } - }; - - lookasideChannelLb.switchTo(childBalancerProvider); + // At this point the xdsClientRef is assigned in all usecases, cache them for later use. + this.xdsClientRef = xdsClientRef; + xdsClient = xdsClientRef.getObject(); } - lookasideChannelLb.handleResolvedAddresses(resolvedAddresses); + // Note: balancerName change is unsupported and ignored. + // TODO(zdapeng): Remove support for balancerName. + // Note: childPolicy change will be handled in LocalityStore, to be implemented. + + // If edsServiceName in XdsConfig is changed, do a graceful switch. + if (xdsConfig == null + || !Objects.equals(newXdsConfig.edsServiceName, xdsConfig.edsServiceName)) { + String edsServiceName = newXdsConfig.edsServiceName; + + // The edsServiceName field is null in legacy gRPC client with EDS: use target authority for + // querying endpoints, but in the future we expect this to be explicitly given by EDS config. + // We assume if edsServiceName is null, it will always be null in later resolver updates; + // and if edsServiceName is not null, it will always be not null. + if (edsServiceName == null) { + edsServiceName = lookasideLbHelper.getAuthority(); + } + + LoadBalancerProvider clusterEndpointsLoadBalancer = + new ClusterEndpointsBalancerProvider(edsServiceName); + switchingLoadBalancer.switchTo(clusterEndpointsLoadBalancer); + } + resolvedAddresses = resolvedAddresses.toBuilder() + .setAttributes(attributes.toBuilder().discard(ATTR_LOAD_BALANCING_CONFIG).build()) + .setLoadBalancingPolicyConfig(newXdsConfig) + .build(); + switchingLoadBalancer.handleResolvedAddresses(resolvedAddresses); + this.xdsConfig = newXdsConfig; + + // TODO(zdapeng): If lrsServerName in XdsConfig is changed, call xdsClient.reportClientStats() + // and/or xdsClient.cancelClientStatsReport(). + } + + @Override + public void handleNameResolutionError(Status error) { + channelLogger.log(ChannelLogLevel.ERROR, "Name resolution error: '%s'", error); + // Go into TRANSIENT_FAILURE if we have not yet received any endpoint update. Otherwise, + // we keep running with the data we had previously. + if (endpointWatcher == null) { + lookasideLbHelper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); + } else { + switchingLoadBalancer.handleNameResolutionError(error); + } + } + + @Override + public boolean canHandleEmptyAddressListFromNameResolution() { + return true; + } + + @Override + public void shutdown() { + channelLogger.log(ChannelLogLevel.DEBUG, "EDS load balancer is shutting down"); + switchingLoadBalancer.shutdown(); + if (xdsClientRef != null) { + xdsClientRef.returnObject(xdsClient); + } } private static ManagedChannel initLbChannel( @@ -222,7 +276,7 @@ final class LookasideLb extends ForwardingLoadBalancer { try { channel = helper.createResolvingOobChannel(balancerName); } catch (UnsupportedOperationException uoe) { - // Temporary solution until createResolvingOobChannel is implemented + // Temporary solution until createResolvingOobChannel is implemented. // FIXME (https://github.com/grpc/grpc-java/issues/5495) Logger logger = Logger.getLogger(LookasideLb.class.getName()); if (logger.isLoggable(FINEST)) { @@ -248,10 +302,134 @@ final class LookasideLb extends ForwardingLoadBalancer { return channel; } + private final class ClusterEndpointsBalancerProvider extends LoadBalancerProvider { + final String edsServiceName; + @Nullable + final String oldEdsServiceName; + @Nullable + final EndpointWatcher oldEndpointWatcher; + + ClusterEndpointsBalancerProvider(String edsServiceName) { + this.edsServiceName = edsServiceName; + if (xdsConfig != null) { + oldEdsServiceName = xdsConfig.edsServiceName; + } else { + oldEdsServiceName = null; + } + oldEndpointWatcher = endpointWatcher; + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + // A synthetic policy name identified by edsServiceName in XdsConfig. + @Override + public String getPolicyName() { + return "xds_policy__edsServiceName_" + edsServiceName; + } + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new ClusterEndpointsBalancer(helper); + } + + /** + * Load-balances endpoints for a given cluster. + */ + final class ClusterEndpointsBalancer extends LoadBalancer { + final Helper helper; + + // All fields become non-null once handleResolvedAddresses() successfully. + // All fields are assigned at most once. + @Nullable + LocalityStore localityStore; + @Nullable + LoadReportClient lrsClient; + @Nullable + EndpointWatcherImpl endpointWatcher; + + ClusterEndpointsBalancer(Helper helper) { + this.helper = helper; + } + + @Override + public void handleNameResolutionError(Status error) { + // Go into TRANSIENT_FAILURE if we have not yet received any endpoint update. Otherwise, + // we keep running with the data we had previously. + if (endpointWatcher == null || !endpointWatcher.firstEndpointUpdateReceived) { + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); + } + } + + @Override + public boolean canHandleEmptyAddressListFromNameResolution() { + return true; + } + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + XdsConfig xdsConfig = (XdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + + if (endpointWatcher != null) { + // TODO(zddapeng): Handle child policy changed if any. + return; + } + + LoadStatsStore loadStatsStore = new LoadStatsStoreImpl(); + localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry, loadStatsStore); + LoadReportCallback lrsCallback = + new LoadReportCallback() { + @Override + public void onReportResponse(long reportIntervalNano) { + localityStore.updateOobMetricsReportInterval(reportIntervalNano); + } + }; + + // TODO(zdapeng): Use XdsClient to do Lrs directly. + // For now create an LRS Client. + if (xdsConfig.balancerName != null) { + lrsClient = loadReportClientFactory.createLoadReportClient( + channel, helper, new ExponentialBackoffPolicy.Provider(), loadStatsStore); + } else { + lrsClient = new LoadReportClient() { + @Override + public void startLoadReporting(LoadReportCallback callback) {} + + @Override + public void stopLoadReporting() {} + }; + } + + endpointWatcher = new EndpointWatcherImpl(lrsClient, lrsCallback, localityStore); + xdsClient.watchEndpointData(edsServiceName, endpointWatcher); + if (oldEndpointWatcher != null && oldEdsServiceName != null) { + xdsClient.cancelEndpointDataWatch(oldEdsServiceName, oldEndpointWatcher); + } + LookasideLb.this.endpointWatcher = endpointWatcher; + } + + @Override + public void shutdown() { + if (endpointWatcher != null) { + lrsClient.stopLoadReporting(); + localityStore.reset(); + xdsClient.cancelEndpointDataWatch(edsServiceName, endpointWatcher); + } + } + } + } + /** * Callbacks for the EDS-only-with-fallback usecase. Being deprecated. */ - interface EdsUpdateCallback { + interface EndpointUpdateCallback { void onWorking(); @@ -265,7 +443,7 @@ final class LookasideLb extends ForwardingLoadBalancer { final LoadReportClient lrsClient; final LoadReportCallback lrsCallback; final LocalityStore localityStore; - boolean firstEdsUpdateReceived; + boolean firstEndpointUpdateReceived; EndpointWatcherImpl( LoadReportClient lrsClient, LoadReportCallback lrsCallback, LocalityStore localityStore) { @@ -276,9 +454,14 @@ final class LookasideLb extends ForwardingLoadBalancer { @Override public void onEndpointChanged(EndpointUpdate endpointUpdate) { - if (!firstEdsUpdateReceived) { - firstEdsUpdateReceived = true; - edsUpdateCallback.onWorking(); + channelLogger.log( + ChannelLogLevel.DEBUG, + "EDS load balancer received an endpoint update: '%s'", + endpointUpdate); + + if (!firstEndpointUpdateReceived) { + firstEndpointUpdateReceived = true; + endpointUpdateCallback.onWorking(); lrsClient.startLoadReporting(lrsCallback); } @@ -287,7 +470,7 @@ final class LookasideLb extends ForwardingLoadBalancer { for (DropOverload dropOverload : dropOverloads) { dropOverloadsBuilder.add(dropOverload); if (dropOverload.getDropsPerMillion() == 1_000_000) { - edsUpdateCallback.onAllDrop(); + endpointUpdateCallback.onAllDrop(); break; } } @@ -309,7 +492,9 @@ final class LookasideLb extends ForwardingLoadBalancer { @Override public void onError(Status error) { - edsUpdateCallback.onError(); + channelLogger.log( + ChannelLogLevel.ERROR, "EDS load balancer received an error: '%s'", error); + endpointUpdateCallback.onError(); } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java index 5aeb2680ee..fdbfdf5149 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer2.java @@ -27,7 +27,7 @@ import io.grpc.LoadBalancer; import io.grpc.Status; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.util.ForwardingLoadBalancerHelper; -import io.grpc.xds.LookasideLb.EdsUpdateCallback; +import io.grpc.xds.LookasideLb.EndpointUpdateCallback; import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nullable; @@ -46,7 +46,7 @@ final class XdsLoadBalancer2 extends LoadBalancer { private final Helper helper; private final LoadBalancer lookasideLb; private final LoadBalancer.Factory fallbackLbFactory; - private final EdsUpdateCallback edsUpdateCallback = new EdsUpdateCallback() { + private final EndpointUpdateCallback edsUpdateCallback = new EndpointUpdateCallback() { @Override public void onWorking() { if (childPolicyHasBeenReady) { @@ -247,13 +247,13 @@ final class XdsLoadBalancer2 extends LoadBalancer { /** Factory of a look-aside load balancer. The interface itself is for convenience in test. */ @VisibleForTesting interface LookasideLbFactory { - LoadBalancer newLoadBalancer(Helper helper, EdsUpdateCallback edsUpdateCallback); + LoadBalancer newLoadBalancer(Helper helper, EndpointUpdateCallback edsUpdateCallback); } private static final class LookasideLbFactoryImpl implements LookasideLbFactory { @Override public LoadBalancer newLoadBalancer( - Helper lookasideLbHelper, EdsUpdateCallback edsUpdateCallback) { + Helper lookasideLbHelper, EndpointUpdateCallback edsUpdateCallback) { return new LookasideLb(lookasideLbHelper, edsUpdateCallback); } } diff --git a/xds/src/test/java/io/grpc/xds/LookasideLbTest.java b/xds/src/test/java/io/grpc/xds/LookasideLbTest.java index 0f8ff76270..eee1c8e668 100644 --- a/xds/src/test/java/io/grpc/xds/LookasideLbTest.java +++ b/xds/src/test/java/io/grpc/xds/LookasideLbTest.java @@ -19,6 +19,7 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -42,6 +43,7 @@ import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import io.envoyproxy.envoy.api.v2.core.Address; +import io.envoyproxy.envoy.api.v2.core.Node; import io.envoyproxy.envoy.api.v2.core.SocketAddress; import io.envoyproxy.envoy.api.v2.endpoint.Endpoint; import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint; @@ -56,6 +58,7 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; import io.grpc.Status; @@ -65,17 +68,25 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.BackoffPolicy.Provider; import io.grpc.internal.FakeClock; import io.grpc.internal.JsonParser; +import io.grpc.internal.ObjectPool; import io.grpc.internal.testing.StreamRecorder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.Bootstrapper.BootstrapInfo; +import io.grpc.xds.Bootstrapper.ChannelCreds; import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.Locality; import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; import io.grpc.xds.LoadReportClient.LoadReportCallback; import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory; import io.grpc.xds.LocalityStore.LocalityStoreFactory; -import io.grpc.xds.LookasideLb.EdsUpdateCallback; -import java.util.ArrayList; +import io.grpc.xds.LookasideLb.EndpointUpdateCallback; +import io.grpc.xds.XdsClient.EndpointUpdate; +import io.grpc.xds.XdsClient.EndpointWatcher; +import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool; +import io.grpc.xds.XdsClient.XdsClientFactory; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.List; import java.util.Map; import org.junit.Before; @@ -118,15 +129,17 @@ public class LookasideLbTest { .addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance())) .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") .build(); - private final List helpers = new ArrayList<>(); - private final List localityStores = new ArrayList<>(); - private final List loadReportClients = new ArrayList<>(); + private final Deque helpers = new ArrayDeque<>(); + private final Deque localityStores = new ArrayDeque<>(); + private final Deque loadReportClients = new ArrayDeque<>(); private final FakeClock fakeClock = new FakeClock(); @Mock private Helper helper; @Mock - private EdsUpdateCallback edsUpdateCallback; + private EndpointUpdateCallback edsUpdateCallback; + @Mock + private Bootstrapper bootstrapper; @Captor private ArgumentCaptor> localityEndpointsMappingCaptor; @@ -134,7 +147,6 @@ public class LookasideLbTest { private ManagedChannel channel; private ManagedChannel channel2; private StreamObserver serverResponseWriter; - private LocalityStoreFactory localityStoreFactory; private LoadBalancer lookasideLb; private ResolvedAddresses defaultResolvedAddress; @@ -192,7 +204,7 @@ public class LookasideLbTest { doReturn(channel, channel2).when(helper).createResolvingOobChannel(anyString()); doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); - localityStoreFactory = new LocalityStoreFactory() { + LocalityStoreFactory localityStoreFactory = new LocalityStoreFactory() { @Override public LocalityStore newLocalityStore( Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) { @@ -213,9 +225,32 @@ public class LookasideLbTest { } }; + LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); + lbRegistry.register(new LoadBalancerProvider() { + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "supported1"; + } + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return mock(LoadBalancer.class); + } + }); + lookasideLb = new LookasideLb( - helper, edsUpdateCallback, new LoadBalancerRegistry(), localityStoreFactory, - loadReportClientFactory); + helper, edsUpdateCallback, lbRegistry, localityStoreFactory, loadReportClientFactory, + bootstrapper); String lbConfigRaw11 = "{\"balancerName\" : \"dns:///balancer1.example.com:8080\"}"; @SuppressWarnings("unchecked") @@ -227,144 +262,222 @@ public class LookasideLbTest { } @Test - public void handleChildPolicyChangeThenBalancerNameChangeThenChildPolicyChange() + public void canHandleEmptyAddressListFromNameResolution() { + assertThat(lookasideLb.canHandleEmptyAddressListFromNameResolution()).isTrue(); + } + + @Test + public void handleNameResolutionErrorBeforeAndAfterEdsWorkding() throws Exception { + XdsClientFactory xdsClientFactory = new XdsClientFactory() { + @Override + XdsClient createXdsClient() { + return mock(XdsClient.class); + } + }; + ObjectPool xdsClientRef = new RefCountedXdsClientObjectPool(xdsClientFactory); + XdsClient xdsClientFromResolver = xdsClientRef.getObject(); + + String lbConfigRaw = + "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName1'}" + .replace("'", "\""); + @SuppressWarnings("unchecked") + Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); + ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setAttributes(Attributes.newBuilder() + .set(ATTR_LOAD_BALANCING_CONFIG, lbConfig) + .set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef) + .build()) + .build(); + + lookasideLb.handleResolvedAddresses(resolvedAddresses); + + assertThat(helpers).hasSize(1); + assertThat(localityStores).hasSize(1); + ArgumentCaptor endpointWatcherCaptor = + ArgumentCaptor.forClass(EndpointWatcher.class); + verify(xdsClientFromResolver).watchEndpointData( + eq("edsServiceName1"), endpointWatcherCaptor.capture()); + EndpointWatcher endpointWatcher = endpointWatcherCaptor.getValue(); + + // handleResolutionError() before receiving any endpoint update. + lookasideLb.handleNameResolutionError(Status.DATA_LOSS.withDescription("fake status")); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + + // Endpoint update received. + endpointWatcher.onEndpointChanged( + EndpointUpdate.newBuilder().setClusterName("edsServiceName1").build()); + + // handleResolutionError() after receiving endpoint update. + lookasideLb.handleNameResolutionError(Status.DATA_LOSS.withDescription("fake status")); + // No more TRANSIENT_FAILURE. + verify(helper, times(1)).updateBalancingState( + eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void handleEdsServiceNameChangeInXdsConfig_swtichGracefully() throws Exception { assertThat(helpers).isEmpty(); assertThat(localityStores).isEmpty(); assertThat(loadReportClients).isEmpty(); List eags = ImmutableList.of(); - String lbConfigRaw11 = "{\"balancerName\" : \"dns:///balancer1.example.com:8080\"}"; + XdsClientFactory xdsClientFactory = new XdsClientFactory() { + @Override + XdsClient createXdsClient() { + return mock(XdsClient.class); + } + }; + ObjectPool xdsClientRef = new RefCountedXdsClientObjectPool(xdsClientFactory); + XdsClient xdsClientFromResolver = xdsClientRef.getObject(); + + String lbConfigRaw = + "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName1'}" + .replace("'", "\""); @SuppressWarnings("unchecked") - Map lbConfig11 = (Map) JsonParser.parse(lbConfigRaw11); + Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() .setAddresses(eags) - .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig11).build()) + .setAttributes(Attributes.newBuilder() + .set(ATTR_LOAD_BALANCING_CONFIG, lbConfig) + .set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef) + .build()) .build(); lookasideLb.handleResolvedAddresses(resolvedAddresses); assertThat(helpers).hasSize(1); assertThat(localityStores).hasSize(1); - assertThat(loadReportClients).hasSize(1); - Helper helper1 = helpers.get(0); + Helper helper1 = helpers.peekLast(); + LocalityStore localityStore1 = localityStores.peekLast(); SubchannelPicker picker1 = mock(SubchannelPicker.class); helper1.updateBalancingState(CONNECTING, picker1); verify(helper).updateBalancingState(CONNECTING, picker1); - String lbConfigRaw12 = "{" - + "\"balancerName\" : \"dns:///balancer1.example.com:8080\"," - + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]" - + "}"; - @SuppressWarnings("unchecked") - Map lbConfig12 = (Map) JsonParser.parse(lbConfigRaw12); + // Change edsServicename to edsServiceName2. + lbConfigRaw = "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName2'}" + .replace("'", "\""); + lbConfig = (Map) JsonParser.parse(lbConfigRaw); resolvedAddresses = ResolvedAddresses.newBuilder() .setAddresses(eags) - .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig12).build()) + .setAttributes(Attributes.newBuilder() + .set(ATTR_LOAD_BALANCING_CONFIG, lbConfig) + .set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef) + .build()) .build(); lookasideLb.handleResolvedAddresses(resolvedAddresses); - - LocalityStore localityStore1 = Iterables.getOnlyElement(localityStores); - LoadReportClient loadReportClient1 = Iterables.getOnlyElement(loadReportClients); - verify(localityStore1, never()).reset(); - verify(loadReportClient1, never()).stopLoadReporting(); - assertThat(helpers).hasSize(1); - assertThat(localityStores).hasSize(1); - assertThat(loadReportClients).hasSize(1); - - // change balancer name policy to balancer2.example.com - String lbConfigRaw21 = "{\"balancerName\" : \"dns:///balancer2.example.com:8080\"}"; - @SuppressWarnings("unchecked") - Map lbConfig21 = (Map) JsonParser.parse(lbConfigRaw21); - resolvedAddresses = ResolvedAddresses.newBuilder() - .setAddresses(eags) - .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig21).build()) - .build(); - lookasideLb.handleResolvedAddresses(resolvedAddresses); - - verify(localityStore1).reset(); - verify(loadReportClient1).stopLoadReporting(); assertThat(helpers).hasSize(2); assertThat(localityStores).hasSize(2); - assertThat(loadReportClients).hasSize(2); - Helper helper2 = helpers.get(1); - LocalityStore localityStore2 = localityStores.get(1); - LoadReportClient loadReportClient2 = loadReportClients.get(1); - - picker1 = mock(SubchannelPicker.class); - helper1.updateBalancingState(CONNECTING, picker1); - verify(helper, never()).updateBalancingState(CONNECTING, picker1); + Helper helper2 = helpers.peekLast(); + LocalityStore localityStore2 = localityStores.peekLast(); SubchannelPicker picker2 = mock(SubchannelPicker.class); helper2.updateBalancingState(CONNECTING, picker2); - // balancer1 was not READY, so balancer2 will update picker immediately verify(helper).updateBalancingState(CONNECTING, picker2); + verify(localityStore1).reset(); + helper2.updateBalancingState(READY, picker2); + verify(helper).updateBalancingState(READY, picker2); - String lbConfigRaw22 = "{" - + "\"balancerName\" : \"dns:///balancer2.example.com:8080\"," - + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]" - + "}"; - @SuppressWarnings("unchecked") - Map lbConfig22 = (Map) JsonParser.parse(lbConfigRaw22); + // Change edsServiceName to edsServiceName3. + lbConfigRaw = "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName3'}" + .replace("'", "\""); + lbConfig = (Map) JsonParser.parse(lbConfigRaw); resolvedAddresses = ResolvedAddresses.newBuilder() .setAddresses(eags) - .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig22).build()) - .build(); - lookasideLb.handleResolvedAddresses(resolvedAddresses); - - assertThat(helpers).hasSize(2); - assertThat(localityStores).hasSize(2); - - SubchannelPicker picker3 = mock(SubchannelPicker.class); - helper2.updateBalancingState(READY, picker3); - verify(helper).updateBalancingState(READY, picker3); - - String lbConfigRaw3 = "{" - + "\"balancerName\" : \"dns:///balancer3.example.com:8080\"," - + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_3\" : {}}]" - + "}"; - @SuppressWarnings("unchecked") - Map lbConfig3 = (Map) JsonParser.parse(lbConfigRaw3); - resolvedAddresses = ResolvedAddresses.newBuilder() - .setAddresses(eags) - .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig3).build()) + .setAttributes(Attributes.newBuilder() + .set(ATTR_LOAD_BALANCING_CONFIG, lbConfig) + .set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef) + .build()) .build(); lookasideLb.handleResolvedAddresses(resolvedAddresses); assertThat(helpers).hasSize(3); + assertThat(localityStores).hasSize(3); + Helper helper3 = helpers.peekLast(); + LocalityStore localityStore3 = localityStores.peekLast(); - Helper helper3 = helpers.get(2); - SubchannelPicker picker4 = mock(SubchannelPicker.class); - helper3.updateBalancingState(CONNECTING, picker4); - // balancer2 was READY, so balancer3 will gracefully switch and not update non-READY picker - verify(helper, never()).updateBalancingState(any(ConnectivityState.class), eq(picker4)); + SubchannelPicker picker3 = mock(SubchannelPicker.class); + helper3.updateBalancingState(CONNECTING, picker3); + verify(helper, never()).updateBalancingState(CONNECTING, picker3); verify(localityStore2, never()).reset(); - verify(loadReportClient2, never()).stopLoadReporting(); - - SubchannelPicker picker5 = mock(SubchannelPicker.class); - helper3.updateBalancingState(READY, picker5); - verify(helper).updateBalancingState(READY, picker5); + picker2 = mock(SubchannelPicker.class); + helper2.updateBalancingState(CONNECTING, picker2); + // The old balancer becomes not READY, so the new balancer will update picker immediately. + verify(helper).updateBalancingState(CONNECTING, picker3); verify(localityStore2).reset(); - verify(loadReportClient2).stopLoadReporting(); - verify(localityStores.get(2), never()).reset(); - verify(loadReportClients.get(2), never()).stopLoadReporting(); + // Change edsServiceName to edsServiceName4. + lbConfigRaw = "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName4'}" + .replace("'", "\""); + lbConfig = (Map) JsonParser.parse(lbConfigRaw); + resolvedAddresses = ResolvedAddresses.newBuilder() + .setAddresses(eags) + .setAttributes(Attributes.newBuilder() + .set(ATTR_LOAD_BALANCING_CONFIG, lbConfig) + .set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef) + .build()) + .build(); + lookasideLb.handleResolvedAddresses(resolvedAddresses); + + assertThat(helpers).hasSize(4); + assertThat(localityStores).hasSize(4); + Helper helper4 = helpers.peekLast(); + LocalityStore localityStore4 = localityStores.peekLast(); + verify(localityStore3).reset(); + SubchannelPicker picker4 = mock(SubchannelPicker.class); + helper4.updateBalancingState(READY, picker4); + verify(helper).updateBalancingState(READY, picker4); + + // Change edsServiceName to edsServiceName5. + lbConfigRaw = "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName5'}" + .replace("'", "\""); + lbConfig = (Map) JsonParser.parse(lbConfigRaw); + resolvedAddresses = ResolvedAddresses.newBuilder() + .setAddresses(eags) + .setAttributes(Attributes.newBuilder() + .set(ATTR_LOAD_BALANCING_CONFIG, lbConfig) + .set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef) + .build()) + .build(); + lookasideLb.handleResolvedAddresses(resolvedAddresses); + + assertThat(helpers).hasSize(5); + assertThat(localityStores).hasSize(5); + + Helper helper5 = helpers.peekLast(); + LocalityStore localityStore5 = localityStores.peekLast(); + SubchannelPicker picker5 = mock(SubchannelPicker.class); + helper5.updateBalancingState(CONNECTING, picker5); + // The old balancer was READY, so the new balancer will gracefully switch and not update + // non-READY picker. + verify(helper, never()).updateBalancingState(any(ConnectivityState.class), eq(picker5)); + verify(localityStore4, never()).reset(); + + helper5.updateBalancingState(READY, picker5); + verify(helper).updateBalancingState(READY, picker5); + verify(localityStore4).reset(); + + verify(localityStore5, never()).reset(); lookasideLb.shutdown(); - verify(localityStores.get(2)).reset(); - verify(loadReportClients.get(2)).stopLoadReporting(); + verify(localityStore5).reset(); + + xdsClientRef.returnObject(xdsClientFromResolver); } + @Deprecated // balancerName will be unsupported. @Test public void handleResolvedAddress_createLbChannel() throws Exception { // Test balancer created with the default real LookasideChannelLbFactory - lookasideLb = new LookasideLb(helper, mock(EdsUpdateCallback.class)); - String lbConfigRaw11 = "{'balancerName' : 'dns:///balancer1.example.com:8080'}" + lookasideLb = new LookasideLb(helper, mock(EndpointUpdateCallback.class)); + String lbConfigRaw = "{'balancerName' : 'dns:///balancer1.example.com:8080'}" .replace("'", "\""); @SuppressWarnings("unchecked") - Map lbConfig11 = (Map) JsonParser.parse(lbConfigRaw11); + Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() .setAddresses(ImmutableList.of()) - .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig11).build()) + .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build()) .build(); verify(helper, never()).createResolvingOobChannel(anyString()); @@ -374,6 +487,88 @@ public class LookasideLbTest { lookasideLb.shutdown(); } + @Test + public void handleResolvedAddress_withBootstrap() throws Exception { + BootstrapInfo bootstrapInfo = new BootstrapInfo( + "trafficdirector.googleapis.com", ImmutableList.of(), + Node.getDefaultInstance()); + doReturn(bootstrapInfo).when(bootstrapper).readBootstrap(); + + String lbConfigRaw = + "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName1'}" + .replace("'", "\""); + @SuppressWarnings("unchecked") + Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); + ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setAttributes(Attributes.newBuilder() + .set(ATTR_LOAD_BALANCING_CONFIG, lbConfig) + .build()) + .build(); + + verify(helper, never()).createResolvingOobChannel(anyString()); + lookasideLb.handleResolvedAddresses(resolvedAddresses); + verify(helper).createResolvingOobChannel("trafficdirector.googleapis.com"); + + assertThat(helpers).hasSize(1); + assertThat(localityStores).hasSize(1); + Helper helper1 = helpers.peekLast(); + LocalityStore localityStore1 = localityStores.peekLast(); + SubchannelPicker picker = mock(SubchannelPicker.class); + helper1.updateBalancingState(READY, picker); + verify(helper).updateBalancingState(READY, picker); + + lookasideLb.shutdown(); + verify(localityStore1).reset(); + } + + @Test + public void handleResolvedAddress_withXdsClientRefAttributes() throws Exception { + XdsClientFactory xdsClientFactory = new XdsClientFactory() { + @Override + XdsClient createXdsClient() { + return mock(XdsClient.class); + } + }; + ObjectPool xdsClientRef = new RefCountedXdsClientObjectPool(xdsClientFactory); + XdsClient xdsClientFromResolver = xdsClientRef.getObject(); + + String lbConfigRaw = + "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName1'}" + .replace("'", "\""); + @SuppressWarnings("unchecked") + Map lbConfig = (Map) JsonParser.parse(lbConfigRaw); + ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setAttributes(Attributes.newBuilder() + .set(ATTR_LOAD_BALANCING_CONFIG, lbConfig) + .set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef) + .build()) + .build(); + + lookasideLb.handleResolvedAddresses(resolvedAddresses); + + assertThat(helpers).hasSize(1); + assertThat(localityStores).hasSize(1); + ArgumentCaptor endpointWatcherCaptor = + ArgumentCaptor.forClass(EndpointWatcher.class); + verify(xdsClientFromResolver).watchEndpointData( + eq("edsServiceName1"), endpointWatcherCaptor.capture()); + EndpointWatcher endpointWatcher = endpointWatcherCaptor.getValue(); + + Helper helper1 = helpers.peekLast(); + SubchannelPicker picker = mock(SubchannelPicker.class); + helper1.updateBalancingState(READY, picker); + verify(helper).updateBalancingState(READY, picker); + + // Mimic resolver shutdown + xdsClientRef.returnObject(xdsClientFromResolver); + verify(xdsClientFromResolver, never()).shutdown(); + lookasideLb.shutdown(); + verify(xdsClientFromResolver).cancelEndpointDataWatch("edsServiceName1", endpointWatcher); + verify(xdsClientFromResolver).shutdown(); + } + @Test public void firstAndSecondEdsResponseReceived() { lookasideLb.handleResolvedAddresses(defaultResolvedAddress); diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java index ae3c309445..db8b82e2ff 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/XdsLoadBalancer2Test.java @@ -40,7 +40,7 @@ import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; -import io.grpc.xds.LookasideLb.EdsUpdateCallback; +import io.grpc.xds.LookasideLb.EndpointUpdateCallback; import io.grpc.xds.XdsLoadBalancer2.LookasideLbFactory; import java.util.ArrayList; import java.util.List; @@ -75,7 +75,7 @@ public class XdsLoadBalancer2Test { @Mock private Helper helper; private LoadBalancer xdsLoadBalancer; - private EdsUpdateCallback edsUpdateCallback; + private EndpointUpdateCallback edsUpdateCallback; private Helper lookasideLbHelper; private final List lookasideLbs = new ArrayList<>(); @@ -90,7 +90,7 @@ public class XdsLoadBalancer2Test { LookasideLbFactory lookasideLbFactory = new LookasideLbFactory() { @Override public LoadBalancer newLoadBalancer( - Helper helper, EdsUpdateCallback edsUpdateCallback) { + Helper helper, EndpointUpdateCallback edsUpdateCallback) { // just return a mock and record the input and output lookasideLbHelper = helper; XdsLoadBalancer2Test.this.edsUpdateCallback = edsUpdateCallback;