xds: refactor to eliminate LookasideChannelLb

This is a followup of #6425 to further refactor/cleanup. Now that `LookasideChannelLb` does very little thing, it does not need to be a class anymore.
This commit is contained in:
ZHANG Dapeng 2019-11-19 17:46:43 -08:00 committed by GitHub
parent c7a6f62831
commit 17c3e48240
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 758 additions and 776 deletions

View File

@ -28,14 +28,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
@ -76,12 +74,27 @@ interface LocalityStore {
void updateDropPercentage(List<DropOverload> dropOverloads);
void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState);
void updateOobMetricsReportInterval(long reportIntervalNano);
LoadStatsStore getLoadStatsStore();
@VisibleForTesting
abstract class LocalityStoreFactory {
private static final LocalityStoreFactory DEFAULT_INSTANCE =
new LocalityStoreFactory() {
@Override
LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry) {
return new LocalityStoreImpl(helper, lbRegistry);
}
};
static LocalityStoreFactory getInstance() {
return DEFAULT_INSTANCE;
}
abstract LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry);
}
final class LocalityStoreImpl implements LocalityStore {
private static final String ROUND_ROBIN = "round_robin";
private static final long DELAYED_DELETION_TIMEOUT_MINUTES = 15L;
@ -176,16 +189,6 @@ interface LocalityStore {
}
};
// This is triggered by xdsLoadbalancer.handleSubchannelState
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
// delegate to the childBalancer who manages this subchannel
for (LocalityLbInfo localityLbInfo : localityMap.values()) {
// This will probably trigger childHelper.updateBalancingState
localityLbInfo.childBalancer.handleSubchannelState(subchannel, newState);
}
}
@Override
public void reset() {
for (Locality locality : localityMap.keySet()) {

View File

@ -1,152 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.LoadReportClient.LoadReportCallback;
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import java.util.List;
import java.util.Map;
/**
* A load balancer that has a lookaside channel. This layer of load balancer creates a channel to
* the remote load balancer. LrsClient, LocalityStore and XdsComms are three branches below this
* layer, and their implementations are provided by their factories.
*/
final class LookasideChannelLb extends LoadBalancer {
private final LoadReportClient lrsClient;
private final XdsClient xdsClient;
LookasideChannelLb(
String edsServiceName,
LookasideChannelCallback lookasideChannelCallback,
XdsClient xdsClient,
LoadReportClient lrsClient,
final LocalityStore localityStore) {
this.xdsClient = xdsClient;
LoadReportCallback lrsCallback =
new LoadReportCallback() {
@Override
public void onReportResponse(long reportIntervalNano) {
localityStore.updateOobMetricsReportInterval(reportIntervalNano);
}
};
this.lrsClient = lrsClient;
EndpointWatcher endpointWatcher = new EndpointWatcherImpl(
lookasideChannelCallback, lrsClient, lrsCallback, localityStore) ;
xdsClient.watchEndpointData(edsServiceName, endpointWatcher);
}
@Override
public void handleNameResolutionError(Status error) {
// NO-OP?
}
@Override
public void shutdown() {
lrsClient.stopLoadReporting();
xdsClient.shutdown();
}
private static final class EndpointWatcherImpl implements EndpointWatcher {
final LookasideChannelCallback lookasideChannelCallback;
final LoadReportClient lrsClient;
final LoadReportCallback lrsCallback;
final LocalityStore localityStore;
boolean firstEdsResponseReceived;
EndpointWatcherImpl(
LookasideChannelCallback lookasideChannelCallback, LoadReportClient lrsClient,
LoadReportCallback lrsCallback, LocalityStore localityStore) {
this.lookasideChannelCallback = lookasideChannelCallback;
this.lrsClient = lrsClient;
this.lrsCallback = lrsCallback;
this.localityStore = localityStore;
}
@Override
public void onEndpointChanged(EndpointUpdate endpointUpdate) {
if (!firstEdsResponseReceived) {
firstEdsResponseReceived = true;
lookasideChannelCallback.onWorking();
lrsClient.startLoadReporting(lrsCallback);
}
List<DropOverload> dropOverloads = endpointUpdate.getDropPolicies();
ImmutableList.Builder<DropOverload> dropOverloadsBuilder = ImmutableList.builder();
for (DropOverload dropOverload : dropOverloads) {
dropOverloadsBuilder.add(dropOverload);
if (dropOverload.getDropsPerMillion() == 1_000_000) {
lookasideChannelCallback.onAllDrop();
break;
}
}
localityStore.updateDropPercentage(dropOverloadsBuilder.build());
ImmutableMap.Builder<Locality, LocalityLbEndpoints> localityEndpointsMapping =
new ImmutableMap.Builder<>();
for (Map.Entry<Locality, LocalityLbEndpoints> entry
: endpointUpdate.getLocalityLbEndpointsMap().entrySet()) {
int localityWeight = entry.getValue().getLocalityWeight();
if (localityWeight != 0) {
localityEndpointsMapping.put(entry.getKey(), entry.getValue());
}
}
localityStore.updateLocalityStore(localityEndpointsMapping.build());
}
@Override
public void onError(Status error) {
lookasideChannelCallback.onError();
}
}
/**
* Callback on ADS stream events. The callback methods should be called in a proper {@link
* io.grpc.SynchronizationContext}.
*/
interface LookasideChannelCallback {
/**
* Once the response observer receives the first response.
*/
void onWorking();
/**
* Once an error occurs in ADS stream.
*/
void onError();
/**
* Once receives a response indicating that 100% of calls should be dropped.
*/
void onAllDrop();
}
}

View File

@ -22,6 +22,8 @@ import static io.grpc.xds.XdsNameResolver.XDS_NODE;
import static java.util.logging.Level.FINEST;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import io.envoyproxy.envoy.api.v2.core.Node;
@ -32,46 +34,60 @@ import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.alts.GoogleDefaultChannelBuilder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.util.ForwardingLoadBalancer;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.LocalityStore.LocalityStoreImpl;
import io.grpc.xds.LookasideChannelLb.LookasideChannelCallback;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.LoadReportClient.LoadReportCallback;
import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory;
import io.grpc.xds.LocalityStore.LocalityStoreFactory;
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/** Lookaside load balancer that handles balancer name changes. */
final class LookasideLb extends ForwardingLoadBalancer {
private final LookasideChannelCallback lookasideChannelCallback;
private final LookasideChannelLbFactory lookasideChannelLbFactory;
private final EdsUpdateCallback edsUpdateCallback;
private final GracefulSwitchLoadBalancer lookasideChannelLb;
private final LoadBalancerRegistry lbRegistry;
private final LocalityStoreFactory localityStoreFactory;
private final LoadReportClientFactory loadReportClientFactory;
private String balancerName;
LookasideLb(Helper lookasideLbHelper, LookasideChannelCallback lookasideChannelCallback) {
LookasideLb(Helper lookasideLbHelper, EdsUpdateCallback edsUpdateCallback) {
this(
lookasideLbHelper, lookasideChannelCallback, new LookasideChannelLbFactoryImpl(),
LoadBalancerRegistry.getDefaultRegistry());
lookasideLbHelper,
edsUpdateCallback,
LoadBalancerRegistry.getDefaultRegistry(),
LocalityStoreFactory.getInstance(),
LoadReportClientFactory.getInstance());
}
@VisibleForTesting
LookasideLb(
Helper lookasideLbHelper,
LookasideChannelCallback lookasideChannelCallback,
LookasideChannelLbFactory lookasideChannelLbFactory,
LoadBalancerRegistry lbRegistry) {
this.lookasideChannelCallback = lookasideChannelCallback;
this.lookasideChannelLbFactory = lookasideChannelLbFactory;
EdsUpdateCallback edsUpdateCallback,
LoadBalancerRegistry lbRegistry,
LocalityStoreFactory localityStoreFactory,
LoadReportClientFactory loadReportClientFactory) {
this.edsUpdateCallback = edsUpdateCallback;
this.lbRegistry = lbRegistry;
this.lookasideChannelLb = new GracefulSwitchLoadBalancer(lookasideLbHelper);
this.localityStoreFactory = localityStoreFactory;
this.loadReportClientFactory = loadReportClientFactory;
}
@Override
@ -93,35 +109,33 @@ final class LookasideLb extends ForwardingLoadBalancer {
}
XdsConfig xdsConfig = (XdsConfig) cfg.getConfig();
String newBalancerName = xdsConfig.balancerName;
final String newBalancerName = xdsConfig.balancerName;
// The is to handle the legacy usecase that requires balancerName from xds config.
if (!newBalancerName.equals(balancerName)) {
balancerName = newBalancerName; // cache the name and check next time for optimization
Node node = resolvedAddresses.getAttributes().get(XDS_NODE);
if (node == null) {
Node nodeFromResolvedAddresses = resolvedAddresses.getAttributes().get(XDS_NODE);
final Node node;
if (nodeFromResolvedAddresses == null) {
node = Node.newBuilder()
.setMetadata(Struct.newBuilder()
.putFields(
"endpoints_required",
Value.newBuilder().setBoolValue(true).build()))
.build();
} else {
node = nodeFromResolvedAddresses;
}
List<ChannelCreds> channelCredsList =
List<ChannelCreds> channelCredsListFromResolvedAddresses =
resolvedAddresses.getAttributes().get(XDS_CHANNEL_CREDS_LIST);
if (channelCredsList == null) {
final List<ChannelCreds> channelCredsList;
if (channelCredsListFromResolvedAddresses == null) {
channelCredsList = Collections.emptyList();
}
lookasideChannelLb.switchTo(newLookasideChannelLbProvider(
newBalancerName, node, channelCredsList));
} else {
channelCredsList = channelCredsListFromResolvedAddresses;
}
lookasideChannelLb.handleResolvedAddresses(resolvedAddresses);
}
private LoadBalancerProvider newLookasideChannelLbProvider(
final String balancerName, final Node node, final List<ChannelCreds> channelCredsList) {
return new LoadBalancerProvider() {
LoadBalancerProvider childBalancerProvider = new LoadBalancerProvider() {
@Override
public boolean isAvailable() {
return true;
@ -133,47 +147,69 @@ final class LookasideLb extends ForwardingLoadBalancer {
}
/**
* A synthetic policy name for LookasideChannelLb identified by balancerName. The
* implementation detail doesn't matter.
* A synthetic policy name identified by balancerName. The implementation detail doesn't
* matter.
*/
@Override
public String getPolicyName() {
return "xds_child_policy_balancer_name_" + balancerName;
return "xds_child_policy_balancer_name_" + newBalancerName;
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return lookasideChannelLbFactory.newLoadBalancer(
helper, lookasideChannelCallback, balancerName, node, channelCredsList);
public LoadBalancer newLoadBalancer(final Helper helper) {
return new LoadBalancer() {
@Nullable
XdsClient xdsClient;
@Nullable
LocalityStore localityStore;
@Nullable
LoadReportClient lrsClient;
@Override
public void handleNameResolutionError(Status error) {}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (xdsClient == null) {
ManagedChannel channel = initLbChannel(helper, newBalancerName, channelCredsList);
xdsClient = new XdsComms2(
channel, helper, new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER, node);
localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry);
// TODO(zdapeng): Use XdsClient to do Lrs directly.
lrsClient = loadReportClientFactory.createLoadReportClient(
channel, helper, new ExponentialBackoffPolicy.Provider(),
localityStore.getLoadStatsStore());
final LoadReportCallback lrsCallback =
new LoadReportCallback() {
@Override
public void onReportResponse(long reportIntervalNano) {
localityStore.updateOobMetricsReportInterval(reportIntervalNano);
}
};
EndpointWatcher endpointWatcher =
new EndpointWatcherImpl(lrsClient, lrsCallback, localityStore);
xdsClient.watchEndpointData(node.getCluster(), endpointWatcher);
}
}
@Override
public void shutdown() {
if (xdsClient != null) {
lrsClient.stopLoadReporting();
localityStore.reset();
xdsClient.shutdown();
}
}
};
}
};
@VisibleForTesting
interface LookasideChannelLbFactory {
LoadBalancer newLoadBalancer(
Helper helper, LookasideChannelCallback lookasideChannelCallback, String balancerName,
Node node, List<ChannelCreds> channelCredsList);
lookasideChannelLb.switchTo(childBalancerProvider);
}
private static final class LookasideChannelLbFactoryImpl implements LookasideChannelLbFactory {
@Override
public LoadBalancer newLoadBalancer(
Helper helper, LookasideChannelCallback lookasideChannelCallback, String balancerName,
Node node, List<ChannelCreds> channelCredsList) {
ManagedChannel channel = initLbChannel(helper, balancerName, channelCredsList);
XdsClient xdsClient = new XdsComms2(
channel, helper, new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER, node);
LocalityStore localityStore =
new LocalityStoreImpl(helper, LoadBalancerRegistry.getDefaultRegistry());
// TODO(zdapeng): Use XdsClient to do Lrs directly.
LoadReportClient lrsClient = new LoadReportClientImpl(
channel, helper, GrpcUtil.STOPWATCH_SUPPLIER, new ExponentialBackoffPolicy.Provider(),
localityStore.getLoadStatsStore());
return new LookasideChannelLb(
node.getCluster(), lookasideChannelCallback, xdsClient, lrsClient, localityStore);
lookasideChannelLb.handleResolvedAddresses(resolvedAddresses);
}
private static ManagedChannel initLbChannel(
@ -186,7 +222,7 @@ final class LookasideLb extends ForwardingLoadBalancer {
} catch (UnsupportedOperationException uoe) {
// Temporary solution until createResolvingOobChannel is implemented
// FIXME (https://github.com/grpc/grpc-java/issues/5495)
Logger logger = Logger.getLogger(LookasideChannelLb.class.getName());
Logger logger = Logger.getLogger(LookasideLb.class.getName());
if (logger.isLoggable(FINEST)) {
logger.log(
FINEST,
@ -209,5 +245,69 @@ final class LookasideLb extends ForwardingLoadBalancer {
}
return channel;
}
/**
* Callbacks for the EDS-only-with-fallback usecase. Being deprecated.
*/
interface EdsUpdateCallback {
void onWorking();
void onError();
void onAllDrop();
}
private final class EndpointWatcherImpl implements EndpointWatcher {
final LoadReportClient lrsClient;
final LoadReportCallback lrsCallback;
final LocalityStore localityStore;
boolean firstEdsUpdateReceived;
EndpointWatcherImpl(
LoadReportClient lrsClient, LoadReportCallback lrsCallback, LocalityStore localityStore) {
this.lrsClient = lrsClient;
this.lrsCallback = lrsCallback;
this.localityStore = localityStore;
}
@Override
public void onEndpointChanged(EndpointUpdate endpointUpdate) {
if (!firstEdsUpdateReceived) {
firstEdsUpdateReceived = true;
edsUpdateCallback.onWorking();
lrsClient.startLoadReporting(lrsCallback);
}
List<DropOverload> dropOverloads = endpointUpdate.getDropPolicies();
ImmutableList.Builder<DropOverload> dropOverloadsBuilder = ImmutableList.builder();
for (DropOverload dropOverload : dropOverloads) {
dropOverloadsBuilder.add(dropOverload);
if (dropOverload.getDropsPerMillion() == 1_000_000) {
edsUpdateCallback.onAllDrop();
break;
}
}
localityStore.updateDropPercentage(dropOverloadsBuilder.build());
ImmutableMap.Builder<Locality, LocalityLbEndpoints> localityEndpointsMapping =
new ImmutableMap.Builder<>();
for (Map.Entry<Locality, LocalityLbEndpoints> entry
: endpointUpdate.getLocalityLbEndpointsMap().entrySet()) {
int localityWeight = entry.getValue().getLocalityWeight();
if (localityWeight != 0) {
localityEndpointsMapping.put(entry.getKey(), entry.getValue());
}
}
localityStore.updateLocalityStore(localityEndpointsMapping.build());
}
@Override
public void onError(Status error) {
edsUpdateCallback.onError();
}
}
}

View File

@ -27,7 +27,7 @@ import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.LookasideChannelLb.LookasideChannelCallback;
import io.grpc.xds.LookasideLb.EdsUpdateCallback;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
@ -46,7 +46,7 @@ final class XdsLoadBalancer2 extends LoadBalancer {
private final Helper helper;
private final LoadBalancer lookasideLb;
private final LoadBalancer.Factory fallbackLbFactory;
private final LookasideChannelCallback lookasideChannelCallback = new LookasideChannelCallback() {
private final EdsUpdateCallback edsUpdateCallback = new EdsUpdateCallback() {
@Override
public void onWorking() {
if (childPolicyHasBeenReady) {
@ -94,7 +94,7 @@ final class XdsLoadBalancer2 extends LoadBalancer {
LoadBalancer.Factory fallbackLbFactory) {
this.helper = helper;
this.lookasideLb = lookasideLbFactory.newLoadBalancer(new LookasideLbHelper(),
lookasideChannelCallback);
edsUpdateCallback);
this.fallbackLbFactory = fallbackLbFactory;
}
@ -247,14 +247,14 @@ final class XdsLoadBalancer2 extends LoadBalancer {
/** Factory of a look-aside load balancer. The interface itself is for convenience in test. */
@VisibleForTesting
interface LookasideLbFactory {
LoadBalancer newLoadBalancer(Helper helper, LookasideChannelCallback lookasideChannelCallback);
LoadBalancer newLoadBalancer(Helper helper, EdsUpdateCallback edsUpdateCallback);
}
private static final class LookasideLbFactoryImpl implements LookasideLbFactory {
@Override
public LoadBalancer newLoadBalancer(
Helper lookasideLbHelper, LookasideChannelCallback lookasideChannelCallback) {
return new LookasideLb(lookasideLbHelper, lookasideChannelCallback);
Helper lookasideLbHelper, EdsUpdateCallback edsUpdateCallback) {
return new LookasideLb(lookasideLbHelper, edsUpdateCallback);
}
}

View File

@ -1,452 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.UInt32Value;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.core.Address;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.endpoint.Endpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.type.FractionalPercent;
import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType;
import io.grpc.ChannelLogger;
import io.grpc.LoadBalancer.Helper;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.LoadReportClient.LoadReportCallback;
import io.grpc.xds.LookasideChannelLb.LookasideChannelCallback;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/**
* Tests for {@link LookasideChannelLb}.
*/
@RunWith(JUnit4.class)
public class LookasideChannelLbTest {
private static final String SERVICE_AUTHORITY = "test authority";
@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
private final StreamRecorder<DiscoveryRequest> streamRecorder = StreamRecorder.create();
private final DiscoveryResponse edsResponse =
DiscoveryResponse.newBuilder()
.addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
@Mock
private Helper helper;
@Mock
private LookasideChannelCallback lookasideChannelCallback;
@Mock
private LoadReportClient loadReportClient;
@Mock
private LocalityStore localityStore;
@Mock
private LoadStatsStore loadStatsStore;
private ManagedChannel channel;
private StreamObserver<DiscoveryResponse> serverResponseWriter;
@Captor
private ArgumentCaptor<ImmutableMap<Locality, LocalityLbEndpoints>>
localityEndpointsMappingCaptor;
private LookasideChannelLb lookasideChannelLb;
@Before
public void setUp() throws Exception {
AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
serverResponseWriter = responseObserver;
return new StreamObserver<DiscoveryRequest>() {
@Override
public void onNext(DiscoveryRequest value) {
streamRecorder.onNext(value);
}
@Override
public void onError(Throwable t) {
streamRecorder.onError(t);
}
@Override
public void onCompleted() {
streamRecorder.onCompleted();
responseObserver.onCompleted();
}
};
}
};
String serverName = InProcessServerBuilder.generateName();
cleanupRule.register(
InProcessServerBuilder
.forName(serverName)
.directExecutor()
.addService(serviceImpl)
.build()
.start());
channel = cleanupRule.register(
InProcessChannelBuilder
.forName(serverName)
.directExecutor()
.build());
doReturn(SERVICE_AUTHORITY).when(helper).getAuthority();
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger();
doReturn(loadStatsStore).when(localityStore).getLoadStatsStore();
XdsClient xdsClient = new XdsComms2(
channel, helper, new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER, Node.getDefaultInstance());
lookasideChannelLb = new LookasideChannelLb(
"cluster1", lookasideChannelCallback, xdsClient, loadReportClient, localityStore);
}
@Test
public void firstAndSecondEdsResponseReceived() {
verify(lookasideChannelCallback, never()).onWorking();
verify(loadReportClient, never()).startLoadReporting(any(LoadReportCallback.class));
// first EDS response
serverResponseWriter.onNext(edsResponse);
verify(lookasideChannelCallback).onWorking();
ArgumentCaptor<LoadReportCallback> loadReportCallbackCaptor =
ArgumentCaptor.forClass(LoadReportCallback.class);
verify(loadReportClient).startLoadReporting(loadReportCallbackCaptor.capture());
LoadReportCallback loadReportCallback = loadReportCallbackCaptor.getValue();
// second EDS response
serverResponseWriter.onNext(edsResponse);
verify(lookasideChannelCallback, times(1)).onWorking();
verify(loadReportClient, times(1)).startLoadReporting(any(LoadReportCallback.class));
verify(localityStore, never()).updateOobMetricsReportInterval(anyLong());
loadReportCallback.onReportResponse(1234);
verify(localityStore).updateOobMetricsReportInterval(1234);
verify(lookasideChannelCallback, never()).onError();
lookasideChannelLb.shutdown();
}
@Test
public void handleDropUpdates() {
verify(localityStore, never()).updateDropPercentage(
ArgumentMatchers.<ImmutableList<DropOverload>>any());
serverResponseWriter.onNext(edsResponse);
verify(localityStore).updateDropPercentage(eq(ImmutableList.<DropOverload>of()));
ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder()
.setPolicy(Policy.newBuilder()
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_1").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.HUNDRED)
.setNumerator(3)
.build())
.build())
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_2").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.TEN_THOUSAND)
.setNumerator(45)
.build())
.build())
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_3").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.MILLION)
.setNumerator(6789)
.build())
.build())
.build())
.build();
serverResponseWriter.onNext(
DiscoveryResponse.newBuilder()
.addResources(Any.pack(clusterLoadAssignment))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build());
verify(lookasideChannelCallback, never()).onAllDrop();
verify(localityStore).updateDropPercentage(ImmutableList.of(
new DropOverload("cat_1", 300_00),
new DropOverload("cat_2", 45_00),
new DropOverload("cat_3", 6789)));
clusterLoadAssignment = ClusterLoadAssignment.newBuilder()
.setPolicy(Policy.newBuilder()
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_1").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.HUNDRED)
.setNumerator(3)
.build())
.build())
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_2").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.HUNDRED)
.setNumerator(101)
.build())
.build())
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_3").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.HUNDRED)
.setNumerator(23)
.build())
.build())
.build())
.build();
serverResponseWriter.onNext(
DiscoveryResponse.newBuilder()
.addResources(Any.pack(clusterLoadAssignment))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build());
verify(lookasideChannelCallback).onAllDrop();
verify(localityStore).updateDropPercentage(ImmutableList.of(
new DropOverload("cat_1", 300_00),
new DropOverload("cat_2", 100_00_00)));
verify(lookasideChannelCallback, never()).onError();
lookasideChannelLb.shutdown();
}
@Test
public void handleLocalityAssignmentUpdates() {
io.envoyproxy.envoy.api.v2.core.Locality localityProto1 =
io.envoyproxy.envoy.api.v2.core.Locality
.newBuilder()
.setRegion("region1")
.setZone("zone1")
.setSubZone("subzone1")
.build();
LbEndpoint endpoint11 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr11").setPortValue(11))))
.setLoadBalancingWeight(UInt32Value.of(11))
.build();
LbEndpoint endpoint12 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr12").setPortValue(12))))
.setLoadBalancingWeight(UInt32Value.of(12))
.build();
io.envoyproxy.envoy.api.v2.core.Locality localityProto2 =
io.envoyproxy.envoy.api.v2.core.Locality
.newBuilder()
.setRegion("region2")
.setZone("zone2")
.setSubZone("subzone2")
.build();
LbEndpoint endpoint21 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr21").setPortValue(21))))
.setLoadBalancingWeight(UInt32Value.of(21))
.build();
LbEndpoint endpoint22 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr22").setPortValue(22))))
.setLoadBalancingWeight(UInt32Value.of(22))
.build();
io.envoyproxy.envoy.api.v2.core.Locality localityProto3 =
io.envoyproxy.envoy.api.v2.core.Locality
.newBuilder()
.setRegion("region3")
.setZone("zone3")
.setSubZone("subzone3")
.build();
LbEndpoint endpoint3 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr31").setPortValue(31))))
.setLoadBalancingWeight(UInt32Value.of(31))
.build();
ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder()
.addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder()
.setLocality(localityProto1)
.addLbEndpoints(endpoint11)
.addLbEndpoints(endpoint12)
.setLoadBalancingWeight(UInt32Value.of(1)))
.addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder()
.setLocality(localityProto2)
.addLbEndpoints(endpoint21)
.addLbEndpoints(endpoint22)
.setLoadBalancingWeight(UInt32Value.of(2)))
.addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder()
.setLocality(localityProto3)
.addLbEndpoints(endpoint3)
.setLoadBalancingWeight(UInt32Value.of(0)))
.build();
serverResponseWriter.onNext(
DiscoveryResponse.newBuilder()
.addResources(Any.pack(clusterLoadAssignment))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build());
Locality locality1 = Locality.fromEnvoyProtoLocality(localityProto1);
LocalityLbEndpoints localityInfo1 = new LocalityLbEndpoints(
ImmutableList.of(
EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint11),
EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint12)),
1, 0);
LocalityLbEndpoints localityInfo2 = new LocalityLbEndpoints(
ImmutableList.of(
EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint21),
EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint22)),
2, 0);
Locality locality2 = Locality.fromEnvoyProtoLocality(localityProto2);
InOrder inOrder = inOrder(localityStore);
inOrder.verify(localityStore).updateDropPercentage(ImmutableList.<DropOverload>of());
inOrder.verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly(
locality1, localityInfo1, locality2, localityInfo2).inOrder();
verify(lookasideChannelCallback, never()).onError();
lookasideChannelLb.shutdown();
}
@Test
public void verifyRpcErrorPropagation() {
verify(lookasideChannelCallback, never()).onError();
serverResponseWriter.onError(new RuntimeException());
verify(lookasideChannelCallback).onError();
}
@Test
public void shutdown() {
verify(loadReportClient, never()).stopLoadReporting();
assertThat(channel.isShutdown()).isFalse();
lookasideChannelLb.shutdown();
verify(loadReportClient).stopLoadReporting();
assertThat(channel.isShutdown()).isTrue();
}
/**
* Tests load reporting is initiated after receiving the first valid EDS response from the traffic
* director, then its operation is independent of load balancing until xDS load balancer is
* shutdown.
*/
@Test
public void reportLoadAfterReceivingFirstEdsResponseUntilShutdown() {
// Simulates a syntactically incorrect EDS response.
serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance());
verify(loadReportClient, never()).startLoadReporting(any(LoadReportCallback.class));
verify(lookasideChannelCallback, never()).onWorking();
verify(lookasideChannelCallback, never()).onError();
// Simulate a syntactically correct EDS response.
DiscoveryResponse edsResponse =
DiscoveryResponse.newBuilder()
.addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
serverResponseWriter.onNext(edsResponse);
verify(lookasideChannelCallback).onWorking();
ArgumentCaptor<LoadReportCallback> lrsCallbackCaptor = ArgumentCaptor.forClass(null);
verify(loadReportClient).startLoadReporting(lrsCallbackCaptor.capture());
lrsCallbackCaptor.getValue().onReportResponse(19543);
verify(localityStore).updateOobMetricsReportInterval(19543);
// Simulate another EDS response from the same remote balancer.
serverResponseWriter.onNext(edsResponse);
verifyNoMoreInteractions(lookasideChannelCallback, loadReportClient);
// Simulate an EDS error response.
serverResponseWriter.onError(Status.ABORTED.asException());
verify(lookasideChannelCallback).onError();
verifyNoMoreInteractions(lookasideChannelCallback, loadReportClient);
verify(localityStore, times(1)).updateOobMetricsReportInterval(anyLong()); // only once
lookasideChannelLb.shutdown();
}
}

View File

@ -18,31 +18,78 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.collect.ImmutableList;
import io.envoyproxy.envoy.api.v2.core.Node;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.protobuf.Any;
import com.google.protobuf.UInt32Value;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.core.Address;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.endpoint.Endpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.type.FractionalPercent;
import io.envoyproxy.envoy.type.FractionalPercent.DenominatorType;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy.Provider;
import io.grpc.internal.FakeClock;
import io.grpc.internal.JsonParser;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.LookasideChannelLb.LookasideChannelCallback;
import io.grpc.xds.LookasideLb.LookasideChannelLbFactory;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.LoadReportClient.LoadReportCallback;
import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory;
import io.grpc.xds.LocalityStore.LocalityStoreFactory;
import io.grpc.xds.LookasideLb.EdsUpdateCallback;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/**
* Tests for {@link LookasideLb}.
@ -50,34 +97,142 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class LookasideLbTest {
private final Helper helper = mock(Helper.class);
private final List<Helper> helpers = new ArrayList<>();
private final List<LoadBalancer> balancers = new ArrayList<>();
private final LookasideChannelLbFactory lookasideChannelLbFactory =
new LookasideChannelLbFactory() {
private static final String SERVICE_AUTHORITY = "test authority";
@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public LoadBalancer newLoadBalancer(
Helper helper, LookasideChannelCallback lookasideChannelCallback, String balancerName,
Node node, List<ChannelCreds> channelCredsList) {
// just return a mock and record helper and balancer.
helpers.add(helper);
LoadBalancer balancer = mock(LoadBalancer.class);
balancers.add(balancer);
assertThat(node).isNotNull();
return balancer;
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
private final StreamRecorder<DiscoveryRequest> streamRecorder = StreamRecorder.create();
private final DiscoveryResponse edsResponse =
DiscoveryResponse.newBuilder()
.addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
private final List<Helper> helpers = new ArrayList<>();
private final List<LocalityStore> localityStores = new ArrayList<>();
private final List<LoadReportClient> loadReportClients = new ArrayList<>();
private final FakeClock fakeClock = new FakeClock();
@Mock
private Helper helper;
@Mock
private EdsUpdateCallback edsUpdateCallback;
@Captor
private ArgumentCaptor<ImmutableMap<Locality, LocalityLbEndpoints>>
localityEndpointsMappingCaptor;
private ManagedChannel channel;
private ManagedChannel channel2;
private StreamObserver<DiscoveryResponse> serverResponseWriter;
private LocalityStoreFactory localityStoreFactory;
private LoadBalancer lookasideLb;
private ResolvedAddresses defaultResolvedAddress;
@Before
public void setUp() throws Exception {
AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
serverResponseWriter = responseObserver;
return new StreamObserver<DiscoveryRequest>() {
@Override
public void onNext(DiscoveryRequest value) {
streamRecorder.onNext(value);
}
@Override
public void onError(Throwable t) {
streamRecorder.onError(t);
}
@Override
public void onCompleted() {
streamRecorder.onCompleted();
responseObserver.onCompleted();
}
};
}
};
private LoadBalancer lookasideLb = new LookasideLb(
helper, mock(LookasideChannelCallback.class), lookasideChannelLbFactory,
new LoadBalancerRegistry());
String serverName = InProcessServerBuilder.generateName();
cleanupRule.register(
InProcessServerBuilder
.forName(serverName)
.directExecutor()
.addService(serviceImpl)
.build()
.start());
channel = cleanupRule.register(
InProcessChannelBuilder
.forName(serverName)
.directExecutor()
.build());
channel2 = cleanupRule.register(
InProcessChannelBuilder
.forName(serverName)
.directExecutor()
.build());
doReturn(SERVICE_AUTHORITY).when(helper).getAuthority();
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger();
doReturn(channel, channel2).when(helper).createResolvingOobChannel(anyString());
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
localityStoreFactory = new LocalityStoreFactory() {
@Override
public LocalityStore newLocalityStore(Helper helper, LoadBalancerRegistry lbRegistry) {
helpers.add(helper);
LocalityStore localityStore = mock(LocalityStore.class);
LoadStatsStore loadStatsStore = mock(LoadStatsStore.class);
doReturn(loadStatsStore).when(localityStore).getLoadStatsStore();
localityStores.add(localityStore);
return localityStore;
}
};
LoadReportClientFactory loadReportClientFactory = new LoadReportClientFactory() {
@Override
LoadReportClient createLoadReportClient(ManagedChannel channel, Helper helper,
Provider backoffPolicyProvider, LoadStatsStore loadStatsStore) {
LoadReportClient loadReportClient = mock(LoadReportClient.class);
loadReportClients.add(loadReportClient);
return loadReportClient;
}
};
lookasideLb = new LookasideLb(
helper, edsUpdateCallback, new LoadBalancerRegistry(), localityStoreFactory,
loadReportClientFactory);
String lbConfigRaw11 = "{\"balancerName\" : \"dns:///balancer1.example.com:8080\"}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig11 = (Map<String, ?>) JsonParser.parse(lbConfigRaw11);
defaultResolvedAddress = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig11).build())
.build();
}
@Test
public void handleChildPolicyChangeThenBalancerNameChangeThenChildPolicyChange()
throws Exception {
assertThat(helpers).isEmpty();
assertThat(balancers).isEmpty();
assertThat(localityStores).isEmpty();
assertThat(loadReportClients).isEmpty();
List<EquivalentAddressGroup> eags = ImmutableList.of();
String lbConfigRaw11 = "{\"balancerName\" : \"dns:///balancer1.example.com:8080\"}";
@ -90,10 +245,9 @@ public class LookasideLbTest {
lookasideLb.handleResolvedAddresses(resolvedAddresses);
assertThat(helpers).hasSize(1);
assertThat(balancers).hasSize(1);
assertThat(localityStores).hasSize(1);
assertThat(loadReportClients).hasSize(1);
Helper helper1 = helpers.get(0);
LoadBalancer balancer1 = balancers.get(0);
verify(balancer1).handleResolvedAddresses(resolvedAddresses);
SubchannelPicker picker1 = mock(SubchannelPicker.class);
helper1.updateBalancingState(CONNECTING, picker1);
@ -111,11 +265,13 @@ public class LookasideLbTest {
.build();
lookasideLb.handleResolvedAddresses(resolvedAddresses);
verify(balancer1).handleResolvedAddresses(resolvedAddresses);
verify(balancer1, never()).shutdown();
LocalityStore localityStore1 = Iterables.getOnlyElement(localityStores);
LoadReportClient loadReportClient1 = Iterables.getOnlyElement(loadReportClients);
verify(localityStore1, never()).reset();
verify(loadReportClient1, never()).stopLoadReporting();
assertThat(helpers).hasSize(1);
assertThat(balancers).hasSize(1);
assertThat(localityStores).hasSize(1);
assertThat(loadReportClients).hasSize(1);
// change balancer name policy to balancer2.example.com
String lbConfigRaw21 = "{\"balancerName\" : \"dns:///balancer2.example.com:8080\"}";
@ -127,19 +283,21 @@ public class LookasideLbTest {
.build();
lookasideLb.handleResolvedAddresses(resolvedAddresses);
verify(balancer1).shutdown();
verify(localityStore1).reset();
verify(loadReportClient1).stopLoadReporting();
assertThat(helpers).hasSize(2);
assertThat(balancers).hasSize(2);
assertThat(localityStores).hasSize(2);
assertThat(loadReportClients).hasSize(2);
Helper helper2 = helpers.get(1);
LoadBalancer balancer2 = balancers.get(1);
verify(balancer1, never()).handleResolvedAddresses(resolvedAddresses);
verify(balancer2).handleResolvedAddresses(resolvedAddresses);
LocalityStore localityStore2 = localityStores.get(1);
LoadReportClient loadReportClient2 = loadReportClients.get(1);
picker1 = mock(SubchannelPicker.class);
helper1.updateBalancingState(CONNECTING, picker1);
verify(helper, never()).updateBalancingState(CONNECTING, picker1);
SubchannelPicker picker2 = mock(SubchannelPicker.class);
helper2.updateBalancingState(CONNECTING, picker2);
// balancer1 was not READY, so balancer2 will update picker immediately
verify(helper).updateBalancingState(CONNECTING, picker2);
String lbConfigRaw22 = "{"
@ -154,21 +312,53 @@ public class LookasideLbTest {
.build();
lookasideLb.handleResolvedAddresses(resolvedAddresses);
verify(balancer2).handleResolvedAddresses(resolvedAddresses);
assertThat(helpers).hasSize(2);
assertThat(balancers).hasSize(2);
assertThat(localityStores).hasSize(2);
verify(balancer2, never()).shutdown();
SubchannelPicker picker3 = mock(SubchannelPicker.class);
helper2.updateBalancingState(READY, picker3);
verify(helper).updateBalancingState(READY, picker3);
String lbConfigRaw3 = "{"
+ "\"balancerName\" : \"dns:///balancer3.example.com:8080\","
+ "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_3\" : {}}]"
+ "}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig3 = (Map<String, ?>) JsonParser.parse(lbConfigRaw3);
resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(eags)
.setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig3).build())
.build();
lookasideLb.handleResolvedAddresses(resolvedAddresses);
assertThat(helpers).hasSize(3);
Helper helper3 = helpers.get(2);
SubchannelPicker picker4 = mock(SubchannelPicker.class);
helper3.updateBalancingState(CONNECTING, picker4);
// balancer2 was READY, so balancer3 will gracefully switch and not update non-READY picker
verify(helper, never()).updateBalancingState(any(ConnectivityState.class), eq(picker4));
verify(localityStore2, never()).reset();
verify(loadReportClient2, never()).stopLoadReporting();
SubchannelPicker picker5 = mock(SubchannelPicker.class);
helper3.updateBalancingState(READY, picker5);
verify(helper).updateBalancingState(READY, picker5);
verify(localityStore2).reset();
verify(loadReportClient2).stopLoadReporting();
verify(localityStores.get(2), never()).reset();
verify(loadReportClients.get(2), never()).stopLoadReporting();
lookasideLb.shutdown();
verify(balancer2).shutdown();
verify(localityStores.get(2)).reset();
verify(loadReportClients.get(2)).stopLoadReporting();
}
@Test
public void handleResolvedAddress_createLbChannel()
throws Exception {
// Test balancer created with the default real LookasideChannelLbFactory
lookasideLb = new LookasideLb(helper, mock(LookasideChannelCallback.class));
lookasideLb = new LookasideLb(helper, mock(EdsUpdateCallback.class));
String lbConfigRaw11 = "{'balancerName' : 'dns:///balancer1.example.com:8080'}"
.replace("'", "\"");
@SuppressWarnings("unchecked")
@ -179,11 +369,304 @@ public class LookasideLbTest {
.build();
verify(helper, never()).createResolvingOobChannel(anyString());
try {
lookasideLb.handleResolvedAddresses(resolvedAddresses);
} catch (RuntimeException e) {
// Expected because helper is a mock and helper.createResolvingOobChannel() returns null.
}
verify(helper).createResolvingOobChannel("dns:///balancer1.example.com:8080");
lookasideLb.shutdown();
}
@Test
public void firstAndSecondEdsResponseReceived() {
lookasideLb.handleResolvedAddresses(defaultResolvedAddress);
verify(edsUpdateCallback, never()).onWorking();
LoadReportClient loadReportClient = Iterables.getOnlyElement(loadReportClients);
verify(loadReportClient, never()).startLoadReporting(any(LoadReportCallback.class));
// first EDS response
serverResponseWriter.onNext(edsResponse);
verify(edsUpdateCallback).onWorking();
ArgumentCaptor<LoadReportCallback> loadReportCallbackCaptor =
ArgumentCaptor.forClass(LoadReportCallback.class);
verify(loadReportClient).startLoadReporting(loadReportCallbackCaptor.capture());
LoadReportCallback loadReportCallback = loadReportCallbackCaptor.getValue();
// second EDS response
serverResponseWriter.onNext(edsResponse);
verify(edsUpdateCallback, times(1)).onWorking();
verify(loadReportClient, times(1)).startLoadReporting(any(LoadReportCallback.class));
LocalityStore localityStore = Iterables.getOnlyElement(localityStores);
verify(localityStore, never()).updateOobMetricsReportInterval(anyLong());
loadReportCallback.onReportResponse(1234);
verify(localityStore).updateOobMetricsReportInterval(1234);
verify(edsUpdateCallback, never()).onError();
lookasideLb.shutdown();
}
@Test
public void handleDropUpdates() {
lookasideLb.handleResolvedAddresses(defaultResolvedAddress);
LocalityStore localityStore = Iterables.getOnlyElement(localityStores);
verify(localityStore, never()).updateDropPercentage(
ArgumentMatchers.<ImmutableList<DropOverload>>any());
serverResponseWriter.onNext(edsResponse);
verify(localityStore).updateDropPercentage(eq(ImmutableList.<DropOverload>of()));
ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder()
.setPolicy(Policy.newBuilder()
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_1").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.HUNDRED)
.setNumerator(3)
.build())
.build())
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_2").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.TEN_THOUSAND)
.setNumerator(45)
.build())
.build())
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_3").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.MILLION)
.setNumerator(6789)
.build())
.build())
.build())
.build();
serverResponseWriter.onNext(
DiscoveryResponse.newBuilder()
.addResources(Any.pack(clusterLoadAssignment))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build());
verify(edsUpdateCallback, never()).onAllDrop();
verify(localityStore).updateDropPercentage(ImmutableList.of(
new DropOverload("cat_1", 300_00),
new DropOverload("cat_2", 45_00),
new DropOverload("cat_3", 6789)));
clusterLoadAssignment = ClusterLoadAssignment.newBuilder()
.setPolicy(Policy.newBuilder()
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_1").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.HUNDRED)
.setNumerator(3)
.build())
.build())
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_2").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.HUNDRED)
.setNumerator(101)
.build())
.build())
.addDropOverloads(Policy.DropOverload.newBuilder()
.setCategory("cat_3").setDropPercentage(FractionalPercent.newBuilder()
.setDenominator(DenominatorType.HUNDRED)
.setNumerator(23)
.build())
.build())
.build())
.build();
serverResponseWriter.onNext(
DiscoveryResponse.newBuilder()
.addResources(Any.pack(clusterLoadAssignment))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build());
verify(edsUpdateCallback).onAllDrop();
verify(localityStore).updateDropPercentage(ImmutableList.of(
new DropOverload("cat_1", 300_00),
new DropOverload("cat_2", 100_00_00)));
verify(edsUpdateCallback, never()).onError();
lookasideLb.shutdown();
}
@Test
public void handleLocalityAssignmentUpdates() {
lookasideLb.handleResolvedAddresses(defaultResolvedAddress);
io.envoyproxy.envoy.api.v2.core.Locality localityProto1 =
io.envoyproxy.envoy.api.v2.core.Locality
.newBuilder()
.setRegion("region1")
.setZone("zone1")
.setSubZone("subzone1")
.build();
LbEndpoint endpoint11 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr11").setPortValue(11))))
.setLoadBalancingWeight(UInt32Value.of(11))
.build();
LbEndpoint endpoint12 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr12").setPortValue(12))))
.setLoadBalancingWeight(UInt32Value.of(12))
.build();
io.envoyproxy.envoy.api.v2.core.Locality localityProto2 =
io.envoyproxy.envoy.api.v2.core.Locality
.newBuilder()
.setRegion("region2")
.setZone("zone2")
.setSubZone("subzone2")
.build();
LbEndpoint endpoint21 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr21").setPortValue(21))))
.setLoadBalancingWeight(UInt32Value.of(21))
.build();
LbEndpoint endpoint22 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr22").setPortValue(22))))
.setLoadBalancingWeight(UInt32Value.of(22))
.build();
io.envoyproxy.envoy.api.v2.core.Locality localityProto3 =
io.envoyproxy.envoy.api.v2.core.Locality
.newBuilder()
.setRegion("region3")
.setZone("zone3")
.setSubZone("subzone3")
.build();
LbEndpoint endpoint3 = LbEndpoint.newBuilder()
.setEndpoint(Endpoint.newBuilder()
.setAddress(Address.newBuilder()
.setSocketAddress(SocketAddress.newBuilder()
.setAddress("addr31").setPortValue(31))))
.setLoadBalancingWeight(UInt32Value.of(31))
.build();
ClusterLoadAssignment clusterLoadAssignment = ClusterLoadAssignment.newBuilder()
.addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder()
.setLocality(localityProto1)
.addLbEndpoints(endpoint11)
.addLbEndpoints(endpoint12)
.setLoadBalancingWeight(UInt32Value.of(1)))
.addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder()
.setLocality(localityProto2)
.addLbEndpoints(endpoint21)
.addLbEndpoints(endpoint22)
.setLoadBalancingWeight(UInt32Value.of(2)))
.addEndpoints(io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints.newBuilder()
.setLocality(localityProto3)
.addLbEndpoints(endpoint3)
.setLoadBalancingWeight(UInt32Value.of(0)))
.build();
serverResponseWriter.onNext(
DiscoveryResponse.newBuilder()
.addResources(Any.pack(clusterLoadAssignment))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build());
Locality locality1 = Locality.fromEnvoyProtoLocality(localityProto1);
LocalityLbEndpoints localityInfo1 = new LocalityLbEndpoints(
ImmutableList.of(
EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint11),
EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint12)),
1, 0);
LocalityLbEndpoints localityInfo2 = new LocalityLbEndpoints(
ImmutableList.of(
EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint21),
EnvoyProtoData.LbEndpoint.fromEnvoyProtoLbEndpoint(endpoint22)),
2, 0);
Locality locality2 = Locality.fromEnvoyProtoLocality(localityProto2);
LocalityStore localityStore = Iterables.getOnlyElement(localityStores);
InOrder inOrder = inOrder(localityStore);
inOrder.verify(localityStore).updateDropPercentage(ImmutableList.<DropOverload>of());
inOrder.verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly(
locality1, localityInfo1, locality2, localityInfo2).inOrder();
verify(edsUpdateCallback, never()).onError();
lookasideLb.shutdown();
}
@Test
public void verifyRpcErrorPropagation() {
lookasideLb.handleResolvedAddresses(defaultResolvedAddress);
verify(edsUpdateCallback, never()).onError();
serverResponseWriter.onError(new RuntimeException());
verify(edsUpdateCallback).onError();
}
@Test
public void shutdown() {
lookasideLb.handleResolvedAddresses(defaultResolvedAddress);
LocalityStore localityStore = Iterables.getOnlyElement(localityStores);
LoadReportClient loadReportClient = Iterables.getOnlyElement(loadReportClients);
verify(localityStore, never()).reset();
verify(loadReportClient, never()).stopLoadReporting();
assertThat(channel.isShutdown()).isFalse();
lookasideLb.shutdown();
verify(localityStore).reset();
verify(loadReportClient).stopLoadReporting();
assertThat(channel.isShutdown()).isTrue();
}
/**
* Tests load reporting is initiated after receiving the first valid EDS response from the traffic
* director, then its operation is independent of load balancing until xDS load balancer is
* shutdown.
*/
@Test
public void reportLoadAfterReceivingFirstEdsResponseUntilShutdown() {
lookasideLb.handleResolvedAddresses(defaultResolvedAddress);
// Simulates a syntactically incorrect EDS response.
serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance());
LoadReportClient loadReportClient = Iterables.getOnlyElement(loadReportClients);
verify(loadReportClient, never()).startLoadReporting(any(LoadReportCallback.class));
verify(edsUpdateCallback, never()).onWorking();
verify(edsUpdateCallback, never()).onError();
// Simulate a syntactically correct EDS response.
DiscoveryResponse edsResponse =
DiscoveryResponse.newBuilder()
.addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
serverResponseWriter.onNext(edsResponse);
verify(edsUpdateCallback).onWorking();
ArgumentCaptor<LoadReportCallback> lrsCallbackCaptor = ArgumentCaptor.forClass(null);
verify(loadReportClient).startLoadReporting(lrsCallbackCaptor.capture());
lrsCallbackCaptor.getValue().onReportResponse(19543);
LocalityStore localityStore = Iterables.getOnlyElement(localityStores);
verify(localityStore).updateOobMetricsReportInterval(19543);
// Simulate another EDS response from the same remote balancer.
serverResponseWriter.onNext(edsResponse);
verifyNoMoreInteractions(edsUpdateCallback, loadReportClient);
// Simulate an EDS error response.
serverResponseWriter.onError(Status.ABORTED.asException());
verify(edsUpdateCallback).onError();
verifyNoMoreInteractions(edsUpdateCallback, loadReportClient);
verify(localityStore, times(1)).updateOobMetricsReportInterval(anyLong()); // only once
lookasideLb.shutdown();
}
}

View File

@ -40,7 +40,7 @@ import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.xds.LookasideChannelLb.LookasideChannelCallback;
import io.grpc.xds.LookasideLb.EdsUpdateCallback;
import io.grpc.xds.XdsLoadBalancer2.LookasideLbFactory;
import java.util.ArrayList;
import java.util.List;
@ -75,7 +75,7 @@ public class XdsLoadBalancer2Test {
@Mock
private Helper helper;
private LoadBalancer xdsLoadBalancer;
private LookasideChannelCallback lookasideChannelCallback;
private EdsUpdateCallback edsUpdateCallback;
private Helper lookasideLbHelper;
private final List<LoadBalancer> lookasideLbs = new ArrayList<>();
@ -90,10 +90,10 @@ public class XdsLoadBalancer2Test {
LookasideLbFactory lookasideLbFactory = new LookasideLbFactory() {
@Override
public LoadBalancer newLoadBalancer(
Helper helper, LookasideChannelCallback lookasideChannelCallback) {
Helper helper, EdsUpdateCallback edsUpdateCallback) {
// just return a mock and record the input and output
lookasideLbHelper = helper;
XdsLoadBalancer2Test.this.lookasideChannelCallback = lookasideChannelCallback;
XdsLoadBalancer2Test.this.edsUpdateCallback = edsUpdateCallback;
LoadBalancer lookasideLb = mock(LoadBalancer.class);
lookasideLbs.add(lookasideLb);
return lookasideLb;
@ -142,7 +142,7 @@ public class XdsLoadBalancer2Test {
public void timeoutAtStartup_expectUseFallback_thenBackendReady_expectExitFallback() {
verifyNotInFallbackMode();
fakeClock.forwardTime(9, TimeUnit.SECONDS);
lookasideChannelCallback.onWorking();
edsUpdateCallback.onWorking();
verifyNotInFallbackMode();
fakeClock.forwardTime(1, TimeUnit.SECONDS);
verifyInFallbackMode();
@ -162,7 +162,7 @@ public class XdsLoadBalancer2Test {
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
lookasideChannelCallback.onWorking();
edsUpdateCallback.onWorking();
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);
lookasideLbHelper.updateBalancingState(READY, subchannelPicker);
verify(helper).updateBalancingState(READY, subchannelPicker);
@ -177,7 +177,7 @@ public class XdsLoadBalancer2Test {
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
lookasideChannelCallback.onAllDrop();
edsUpdateCallback.onAllDrop();
assertThat(fakeClock.getPendingTasks()).isEmpty();
verifyNotInFallbackMode();
@ -189,7 +189,7 @@ public class XdsLoadBalancer2Test {
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
lookasideChannelCallback.onError();
edsUpdateCallback.onError();
verifyInFallbackMode();
assertThat(fallbackLbs).hasSize(1);
@ -199,8 +199,8 @@ public class XdsLoadBalancer2Test {
public void lookasideChannelSeeingEdsResponseThenFailsBeforeTimeoutAtStartup() {
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
lookasideChannelCallback.onWorking();
lookasideChannelCallback.onError();
edsUpdateCallback.onWorking();
edsUpdateCallback.onError();
verifyNotInFallbackMode();
fakeClock.forwardTime(10, TimeUnit.SECONDS);
@ -221,7 +221,7 @@ public class XdsLoadBalancer2Test {
.build();
xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);
lookasideChannelCallback.onError();
edsUpdateCallback.onError();
LoadBalancer fallbackLb = Iterables.getLast(fallbackLbs);
verify(fallbackLb).handleResolvedAddresses(same(resolvedAddresses));
}