mirror of https://github.com/grpc/grpc-java.git
xds: implement xDS circuit breaking max_requests (#7517)
Implemented xDS circuit breaking for the maximum number of requests that can be in-flight. The threshold is retrieved from CDS responses and is configured at the cluster level. It is implemented by wrapping the Picker spawned by the EDS LB policy (which resolves endpoints for a single cluster) with stream-limiting logic. That is, when the picker tries to create a new stream (that is, a new call), the pick is gated by the number of streams currently open under the EDS LB policy. RPCs dropped by circuit breakers are recorded in the cluster-level total drop count and are reported to TD via LRS. In the future, multiple gRPC channels may load balance requests to the same (global) cluster. Those requests should share a single quota for the maximum number of in-flight requests, so we will use a global counter to aggregate the number of currently in-flight requests per cluster.
This commit is contained in:
parent 7009c1a863
commit 47d1488373
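The core mechanism is small enough to sketch before reading the diff. The following is a minimal, hypothetical illustration of the picker-wrapping technique the commit message describes, not the committed implementation: the class name LimitingPicker and its wiring are invented for this sketch, while the gRPC types used (SubchannelPicker, PickResult, ClientStreamTracer) are the real APIs. An AtomicLong shared per cluster tracks in-flight streams, the wrapped picker refuses new picks once the limit is hit, and a stream tracer decrements the counter when each stream closes. The committed code below (RequestLimitingSubchannelPicker and RequestCountingStreamTracerFactory) additionally applies drop policies, delegates to any stream tracer factory the underlying pick already carries, and records the drop for LRS reporting.

import io.grpc.ClientStreamTracer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Metadata;
import io.grpc.Status;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative sketch only; the committed class is RequestLimitingSubchannelPicker. */
final class LimitingPicker extends SubchannelPicker {
  private final SubchannelPicker delegate;
  private final AtomicLong inFlight;  // shared by all picks for one cluster
  private final long maxRequests;

  LimitingPicker(SubchannelPicker delegate, AtomicLong inFlight, long maxRequests) {
    this.delegate = delegate;
    this.inFlight = inFlight;
    this.maxRequests = maxRequests;
  }

  @Override
  public PickResult pickSubchannel(PickSubchannelArgs args) {
    PickResult result = delegate.pickSubchannel(args);
    if (!result.getStatus().isOk() || result.getSubchannel() == null) {
      return result;  // errors and buffered picks pass through untouched
    }
    if (inFlight.get() >= maxRequests) {
      // The committed code also records this drop so LRS reports it.
      return PickResult.withDrop(
          Status.UNAVAILABLE.withDescription("max requests limit exceeded"));
    }
    // Attach a tracer so the counter covers the stream's whole lifetime.
    ClientStreamTracer.Factory countingFactory = new ClientStreamTracer.Factory() {
      @Override
      public ClientStreamTracer newClientStreamTracer(
          ClientStreamTracer.StreamInfo info, Metadata headers) {
        inFlight.incrementAndGet();
        return new ClientStreamTracer() {
          @Override
          public void streamClosed(Status status) {
            inFlight.decrementAndGet();
          }
        };
      }
    };
    return PickResult.withSubchannel(result.getSubchannel(), countingFactory);
  }
}

Checking the limit before attaching the counting tracer, rather than atomically reserving a slot, means the cap is advisory under races; that matches the best-effort semantics the commit targets, where the counter is per cluster rather than per connection.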
@@ -218,6 +218,7 @@ final class CdsLoadBalancer extends LoadBalancer {
             /* clusterName = */ newUpdate.getClusterName(),
             /* edsServiceName = */ newUpdate.getEdsServiceName(),
             /* lrsServerName = */ newUpdate.getLrsServerName(),
+            /* maxConcurrentRequests = */ newUpdate.getMaxConcurrentRequests(),
             new PolicySelection(localityPickingPolicyProvider, null /* by EDS policy */),
             new PolicySelection(endpointPickingPolicyProvider, null));
     if (isXdsSecurityEnabled()) {
@@ -25,11 +25,13 @@ import com.google.common.base.Supplier;
 import com.google.protobuf.Any;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.util.Durations;
+import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds;
 import io.envoyproxy.envoy.config.cluster.v3.Cluster;
 import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType;
 import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig;
 import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
 import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions;
+import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
 import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
 import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
 import io.envoyproxy.envoy.config.listener.v3.Listener;
@@ -334,6 +336,17 @@ final class ClientXdsClient extends AbstractXdsClient {
       }
       updateBuilder.setLrsServerName("");
     }
+    if (cluster.hasCircuitBreakers()) {
+      List<Thresholds> thresholds = cluster.getCircuitBreakers().getThresholdsList();
+      for (Thresholds threshold : thresholds) {
+        if (threshold.getPriority() != RoutingPriority.DEFAULT) {
+          continue;
+        }
+        if (threshold.hasMaxRequests()) {
+          updateBuilder.setMaxConcurrentRequests(threshold.getMaxRequests().getValue());
+        }
+      }
+    }
     try {
       EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext =
           getTlsContextFromCluster(cluster);
@@ -23,16 +23,20 @@ import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.grpc.Attributes;
+import io.grpc.ClientStreamTracer;
+import io.grpc.ClientStreamTracer.StreamInfo;
 import io.grpc.ConnectivityState;
 import io.grpc.EquivalentAddressGroup;
 import io.grpc.InternalLogId;
 import io.grpc.LoadBalancer;
 import io.grpc.LoadBalancerProvider;
 import io.grpc.LoadBalancerRegistry;
+import io.grpc.Metadata;
 import io.grpc.Status;
 import io.grpc.SynchronizationContext;
 import io.grpc.internal.ObjectPool;
 import io.grpc.internal.ServiceConfigUtil.PolicySelection;
+import io.grpc.util.ForwardingClientStreamTracer;
 import io.grpc.util.ForwardingLoadBalancerHelper;
 import io.grpc.util.GracefulSwitchLoadBalancer;
 import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
@@ -57,9 +61,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
 final class EdsLoadBalancer2 extends LoadBalancer {
+  @VisibleForTesting
+  static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
   private final XdsLogger logger;
   private final SynchronizationContext syncContext;
   private final LoadBalancerRegistry lbRegistry;
@@ -96,8 +103,10 @@ final class EdsLoadBalancer2 extends LoadBalancer {
     EdsConfig config = (EdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
     if (logger.isLoggable(XdsLogLevel.INFO)) {
       logger.log(XdsLogLevel.INFO, "Received EDS lb config: cluster={0}, "
-          + "eds_service_name={1}, endpoint_picking_policy={2}, report_load={3}",
-          config.clusterName, config.edsServiceName,
+          + "eds_service_name={1}, max_concurrent_requests={2}, locality_picking_policy={3}, "
+          + "endpoint_picking_policy={4}, report_load={5}",
+          config.clusterName, config.edsServiceName, config.maxConcurrentRequests,
+          config.localityPickingPolicy.getProvider().getPolicyName(),
           config.endpointPickingPolicy.getProvider().getPolicyName(),
           config.lrsServerName != null);
     }
@@ -150,9 +159,10 @@ final class EdsLoadBalancer2 extends LoadBalancer {
   }
 
   private final class ChildLbState extends LoadBalancer implements EdsResourceWatcher {
+    private final AtomicLong requestCount = new AtomicLong();
     @Nullable
     private final LoadStatsStore loadStatsStore;
-    private final DropHandlingLbHelper lbHelper;
+    private final RequestLimitingLbHelper lbHelper;
     private List<EquivalentAddressGroup> endpointAddresses = Collections.emptyList();
     private Map<Integer, Map<Locality, Integer>> prioritizedLocalityWeights
         = Collections.emptyMap();
@@ -169,7 +179,7 @@ final class EdsLoadBalancer2 extends LoadBalancer {
       } else {
        loadStatsStore = null;
       }
-      lbHelper = new DropHandlingLbHelper(helper);
+      lbHelper = new RequestLimitingLbHelper(helper);
       logger.log(
           XdsLogLevel.INFO,
           "Start endpoint watcher on {0} with xDS client {1}", resourceName, xdsClient);
@@ -180,6 +190,9 @@ final class EdsLoadBalancer2 extends LoadBalancer {
     public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
       this.resolvedAddresses = resolvedAddresses;
       EdsConfig config = (EdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
+      if (config.maxConcurrentRequests != null) {
+        lbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
+      }
       if (lb != null) {
         if (!config.localityPickingPolicy.equals(localityPickingPolicy)
             || !config.endpointPickingPolicy.equals(endpointPickingPolicy)) {
@@ -355,38 +368,20 @@ final class EdsLoadBalancer2 extends LoadBalancer {
       lbHelper.helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
     }
 
-    private final class DropHandlingLbHelper extends ForwardingLoadBalancerHelper {
+    private final class RequestLimitingLbHelper extends ForwardingLoadBalancerHelper {
       private final Helper helper;
       private List<DropOverload> dropPolicies = Collections.emptyList();
+      private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
 
-      private DropHandlingLbHelper(Helper helper) {
+      private RequestLimitingLbHelper(Helper helper) {
         this.helper = helper;
       }
 
       @Override
       public void updateBalancingState(
           ConnectivityState newState, final SubchannelPicker newPicker) {
-        SubchannelPicker picker = new SubchannelPicker() {
-          List<DropOverload> dropOverloads = dropPolicies;
-
-          @Override
-          public PickResult pickSubchannel(PickSubchannelArgs args) {
-            for (DropOverload dropOverload : dropOverloads) {
-              int rand = random.nextInt(1_000_000);
-              if (rand < dropOverload.getDropsPerMillion()) {
-                logger.log(
-                    XdsLogLevel.INFO,
-                    "Drop request with category: {0}", dropOverload.getCategory());
-                if (loadStatsStore != null) {
-                  loadStatsStore.recordDroppedRequest(dropOverload.getCategory());
-                }
-                return PickResult.withDrop(
-                    Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.getCategory()));
-              }
-            }
-            return newPicker.pickSubchannel(args);
-          }
-        };
+        SubchannelPicker picker = new RequestLimitingSubchannelPicker(
+            newPicker, dropPolicies, maxConcurrentRequests);
         delegate().updateBalancingState(newState, picker);
       }
 
@@ -398,7 +393,97 @@ final class EdsLoadBalancer2 extends LoadBalancer {
       private void updateDropPolicies(List<DropOverload> dropOverloads) {
        dropPolicies = dropOverloads;
       }
 
+      private void updateMaxConcurrentRequests(long maxConcurrentRequests) {
+        this.maxConcurrentRequests = maxConcurrentRequests;
+      }
+
+      private final class RequestLimitingSubchannelPicker extends SubchannelPicker {
+        private final SubchannelPicker delegate;
+        private final List<DropOverload> dropPolicies;
+        private final long maxConcurrentRequests;
+
+        private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
+            List<DropOverload> dropPolicies, long maxConcurrentRequests) {
+          this.delegate = delegate;
+          this.dropPolicies = dropPolicies;
+          this.maxConcurrentRequests = maxConcurrentRequests;
+        }
+
+        @Override
+        public PickResult pickSubchannel(PickSubchannelArgs args) {
+          for (DropOverload dropOverload : dropPolicies) {
+            int rand = random.nextInt(1_000_000);
+            if (rand < dropOverload.getDropsPerMillion()) {
+              logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
+                  dropOverload.getCategory());
+              if (loadStatsStore != null) {
+                loadStatsStore.recordDroppedRequest(dropOverload.getCategory());
+              }
+              return PickResult.withDrop(
+                  Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.getCategory()));
+            }
+          }
+          PickResult result = delegate.pickSubchannel(args);
+          if (result.getStatus().isOk() && result.getSubchannel() != null) {
+            if (requestCount.get() >= maxConcurrentRequests) {
+              if (loadStatsStore != null) {
+                loadStatsStore.recordDroppedRequest();
+              }
+              return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
+                  "Cluster max concurrent requests limit exceeded"));
+            } else {
+              ClientStreamTracer.Factory tracerFactory = new RequestCountingStreamTracerFactory(
+                  result.getStreamTracerFactory(), requestCount);
+              return PickResult.withSubchannel(result.getSubchannel(), tracerFactory);
+            }
+          }
+          return result;
+        }
+      }
     }
   }
 
+  /**
+   * Counts the number of outstanding requests.
+   */
+  private static final class RequestCountingStreamTracerFactory
+      extends ClientStreamTracer.Factory {
+    @Nullable
+    private final ClientStreamTracer.Factory delegate;
+    private final AtomicLong counter;
+
+    private RequestCountingStreamTracerFactory(@Nullable ClientStreamTracer.Factory delegate,
+        AtomicLong counter) {
+      this.delegate = delegate;
+      this.counter = counter;
+    }
+
+    @Override
+    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
+      counter.incrementAndGet();
+      if (delegate == null) {
+        return new ClientStreamTracer() {
+          @Override
+          public void streamClosed(Status status) {
+            counter.decrementAndGet();
+          }
+        };
+      }
+      final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
+      return new ForwardingClientStreamTracer() {
+        @Override
+        protected ClientStreamTracer delegate() {
+          return delegatedTracer;
+        }
+
+        @Override
+        public void streamClosed(Status status) {
+          counter.decrementAndGet();
+          delegate().streamClosed(status);
+        }
+      };
+    }
+  }
@@ -68,6 +68,8 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider {
     final String edsServiceName;
     @Nullable
     final String lrsServerName;
+    @Nullable
+    final Long maxConcurrentRequests;
     final PolicySelection localityPickingPolicy;
     final PolicySelection endpointPickingPolicy;
 
@@ -75,11 +77,13 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider {
         String clusterName,
         @Nullable String edsServiceName,
         @Nullable String lrsServerName,
+        @Nullable Long maxConcurrentRequests,
         PolicySelection localityPickingPolicy,
         PolicySelection endpointPickingPolicy) {
       this.clusterName = checkNotNull(clusterName, "clusterName");
       this.edsServiceName = edsServiceName;
       this.lrsServerName = lrsServerName;
+      this.maxConcurrentRequests = maxConcurrentRequests;
       this.localityPickingPolicy = checkNotNull(localityPickingPolicy, "localityPickingPolicy");
       this.endpointPickingPolicy = checkNotNull(endpointPickingPolicy, "endpointPickingPolicy");
     }
@@ -90,6 +94,7 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider {
           .add("clusterName", clusterName)
           .add("edsServiceName", edsServiceName)
           .add("lrsServerName", lrsServerName)
+          .add("maxConcurrentRequests", maxConcurrentRequests)
           .add("localityPickingPolicy", localityPickingPolicy)
           .add("endpointPickingPolicy", endpointPickingPolicy)
           .toString();
@@ -153,10 +153,17 @@ final class LoadStatsManager {
     void removeLocality(Locality locality);
 
     /**
-     * Records a drop decision.
+     * Records a drop decision with the given category.
      *
      * <p>This method must be thread-safe.
     */
     void recordDroppedRequest(String category);
+
+    /**
+     * Records a uncategorized drop decision.
+     *
+     * <p>This method must be thread-safe.
+     */
+    void recordDroppedRequest();
   }
 }
@@ -55,25 +55,21 @@ final class LoadStatsStoreImpl implements LoadStatsStore {
   @GuardedBy("this")
   private final Map<Locality, ReferenceCounted<ClientLoadCounter>> localityLoadCounters
       = new HashMap<>();
+  private final AtomicLong uncategorizedDrops = new AtomicLong();
   // Cluster level dropped request counts for each category decision.
-  private final ConcurrentMap<String, AtomicLong> dropCounters;
+  private final ConcurrentMap<String, AtomicLong> dropCounters = new ConcurrentHashMap<>();
   private final Stopwatch stopwatch;
 
   LoadStatsStoreImpl(String clusterName, @Nullable String clusterServiceName) {
-    this(clusterName, clusterServiceName, GrpcUtil.STOPWATCH_SUPPLIER.get(),
-        new ConcurrentHashMap<String, AtomicLong>());
+    this(clusterName, clusterServiceName, GrpcUtil.STOPWATCH_SUPPLIER.get());
   }
 
   @VisibleForTesting
-  LoadStatsStoreImpl(
-      String clusterName,
-      @Nullable String clusterServiceName,
-      Stopwatch stopwatch,
-      ConcurrentMap<String, AtomicLong> dropCounters) {
+  LoadStatsStoreImpl(String clusterName, @Nullable String clusterServiceName,
+      Stopwatch stopwatch) {
     this.clusterName = checkNotNull(clusterName, "clusterName");
     this.clusterServiceName = clusterServiceName;
     this.stopwatch = checkNotNull(stopwatch, "stopwatch");
-    this.dropCounters = checkNotNull(dropCounters, "dropCounters");
     stopwatch.reset().start();
   }
 
@@ -109,7 +105,7 @@ final class LoadStatsStoreImpl implements LoadStatsStore {
       }
     }
     localityLoadCounters.keySet().removeAll(untrackedLocalities);
-    long totalDrops = 0;
+    long totalDrops = uncategorizedDrops.getAndSet(0);
     for (Map.Entry<String, AtomicLong> entry : dropCounters.entrySet()) {
       long drops = entry.getValue().getAndSet(0);
       totalDrops += drops;
@@ -150,6 +146,11 @@ final class LoadStatsStoreImpl implements LoadStatsStore {
     counter.getAndIncrement();
   }
 
+  @Override
+  public void recordDroppedRequest() {
+    uncategorizedDrops.getAndIncrement();
+  }
+
   static LoadStatsStoreFactory getDefaultFactory() {
     return new LoadStatsStoreFactory() {
       @Override
@@ -199,6 +199,8 @@ abstract class XdsClient {
     private final String lbPolicy;
     @Nullable
     private final String lrsServerName;
+    @Nullable
+    private final Long maxConcurrentRequests;
     private final UpstreamTlsContext upstreamTlsContext;
 
     private CdsUpdate(
@@ -206,11 +208,13 @@ abstract class XdsClient {
         @Nullable String edsServiceName,
         String lbPolicy,
         @Nullable String lrsServerName,
+        @Nullable Long maxConcurrentRequests,
         @Nullable UpstreamTlsContext upstreamTlsContext) {
       this.clusterName = clusterName;
       this.edsServiceName = edsServiceName;
       this.lbPolicy = lbPolicy;
       this.lrsServerName = lrsServerName;
+      this.maxConcurrentRequests = maxConcurrentRequests;
       this.upstreamTlsContext = upstreamTlsContext;
     }
 
@@ -243,6 +247,15 @@ abstract class XdsClient {
       return lrsServerName;
     }
 
+    /**
+     * Returns the maximum number of outstanding requests can be made to the upstream cluster, or
+     * {@code null} if not configured.
+     */
+    @Nullable
+    Long getMaxConcurrentRequests() {
+      return maxConcurrentRequests;
+    }
+
     /** Returns the {@link UpstreamTlsContext} for this cluster if present, else null. */
     @Nullable
     UpstreamTlsContext getUpstreamTlsContext() {
@@ -258,14 +271,15 @@ abstract class XdsClient {
           .add("edsServiceName", edsServiceName)
           .add("lbPolicy", lbPolicy)
           .add("lrsServerName", lrsServerName)
+          .add("maxConcurrentRequests", maxConcurrentRequests)
           .add("upstreamTlsContext", upstreamTlsContext)
           .toString();
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(
-          clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext);
+      return Objects.hash(clusterName, edsServiceName, lbPolicy, lrsServerName,
+          maxConcurrentRequests, upstreamTlsContext);
     }
 
     @Override
@@ -281,6 +295,7 @@ abstract class XdsClient {
           && Objects.equals(edsServiceName, that.edsServiceName)
           && Objects.equals(lbPolicy, that.lbPolicy)
          && Objects.equals(lrsServerName, that.lrsServerName)
+          && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests)
           && Objects.equals(upstreamTlsContext, that.upstreamTlsContext);
     }
 
@@ -296,6 +311,8 @@ abstract class XdsClient {
       @Nullable
       private String lrsServerName;
+      @Nullable
+      private Long maxConcurrentRequests;
       @Nullable
       private UpstreamTlsContext upstreamTlsContext;
 
       private Builder() {
@@ -321,6 +338,11 @@ abstract class XdsClient {
         return this;
       }
 
+      Builder setMaxConcurrentRequests(long maxConcurrentRequests) {
+        this.maxConcurrentRequests = maxConcurrentRequests;
+        return this;
+      }
+
       Builder setUpstreamTlsContext(UpstreamTlsContext upstreamTlsContext) {
         this.upstreamTlsContext = upstreamTlsContext;
         return this;
@@ -330,9 +352,8 @@ abstract class XdsClient {
         checkState(clusterName != null, "clusterName is not set");
         checkState(lbPolicy != null, "lbPolicy is not set");
 
-        return
-            new CdsUpdate(
-                clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext);
+        return new CdsUpdate(clusterName, edsServiceName, lbPolicy, lrsServerName,
+            maxConcurrentRequests, upstreamTlsContext);
       }
     }
   }
@@ -127,7 +127,7 @@ public class CdsLoadBalancerTest {
 
   @Test
   public void receiveFirstClusterResourceInfo() {
-    xdsClient.deliverClusterInfo(null, null);
+    xdsClient.deliverClusterInfo(null, null, null);
     assertThat(childBalancers).hasSize(1);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.EDS_POLICY_NAME);
@@ -136,6 +136,7 @@ public class CdsLoadBalancerTest {
     assertThat(edsConfig.clusterName).isEqualTo(CLUSTER);
     assertThat(edsConfig.edsServiceName).isNull();
     assertThat(edsConfig.lrsServerName).isNull();
+    assertThat(edsConfig.maxConcurrentRequests).isNull();
     assertThat(edsConfig.localityPickingPolicy.getProvider().getPolicyName())
         .isEqualTo(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded to weighted-target
     assertThat(edsConfig.endpointPickingPolicy.getProvider().getPolicyName())
@@ -155,7 +156,7 @@ public class CdsLoadBalancerTest {
 
   @Test
   public void clusterResourceRemoved() {
-    xdsClient.deliverClusterInfo(null, null);
+    xdsClient.deliverClusterInfo(null, null, null);
     assertThat(childBalancers).hasSize(1);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     assertThat(childBalancer.shutdown).isFalse();
@@ -171,7 +172,7 @@ public class CdsLoadBalancerTest {
 
   @Test
   public void clusterResourceUpdated() {
-    xdsClient.deliverClusterInfo(null, null);
+    xdsClient.deliverClusterInfo(null, null, null);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     EdsConfig edsConfig = (EdsConfig) childBalancer.config;
     assertThat(edsConfig.clusterName).isEqualTo(CLUSTER);
@@ -184,12 +185,14 @@ public class CdsLoadBalancerTest {
 
     String edsService = "service-bar.googleapis.com";
     String loadReportServer = "lrs-server.googleapis.com";
-    xdsClient.deliverClusterInfo(edsService, loadReportServer);
+    long maxConcurrentRequests = 50L;
+    xdsClient.deliverClusterInfo(edsService, loadReportServer, maxConcurrentRequests, null);
     assertThat(childBalancers).containsExactly(childBalancer);
     edsConfig = (EdsConfig) childBalancer.config;
     assertThat(edsConfig.clusterName).isEqualTo(CLUSTER);
     assertThat(edsConfig.edsServiceName).isEqualTo(edsService);
     assertThat(edsConfig.lrsServerName).isEqualTo(loadReportServer);
+    assertThat(edsConfig.maxConcurrentRequests).isEqualTo(maxConcurrentRequests);
     assertThat(edsConfig.localityPickingPolicy.getProvider().getPolicyName())
         .isEqualTo(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded to weighted-target
     assertThat(edsConfig.endpointPickingPolicy.getProvider().getPolicyName())
@@ -218,7 +221,7 @@ public class CdsLoadBalancerTest {
       assertThat(supplier.getUpstreamTlsContext()).isEqualTo(upstreamTlsContext);
     }
 
-    xdsClient.deliverClusterInfo(null, null);
+    xdsClient.deliverClusterInfo(null, null, null);
     subchannel = childBalancer.helper.createSubchannel(args);
     for (EquivalentAddressGroup eag : subchannel.getAllAddresses()) {
       assertThat(eag.getAttributes().get(XdsAttributes.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER))
@@ -241,7 +244,7 @@ public class CdsLoadBalancerTest {
 
   @Test
   public void subchannelStatePropagateFromDownstreamToUpstream() {
-    xdsClient.deliverClusterInfo(null, null);
+    xdsClient.deliverClusterInfo(null, null, null);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     List<EquivalentAddressGroup> addresses = createEndpointAddresses(2);
     CreateSubchannelArgs args =
@@ -267,7 +270,7 @@ public class CdsLoadBalancerTest {
 
   @Test
   public void clusterDiscoveryError_afterChildPolicyInstantiated_keepUsingCurrentCluster() {
-    xdsClient.deliverClusterInfo(null, null);
+    xdsClient.deliverClusterInfo(null, null, null);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     xdsClient.deliverError(Status.UNAVAILABLE.withDescription("unreachable"));
     assertThat(currentState).isNull();
@@ -288,7 +291,7 @@ public class CdsLoadBalancerTest {
 
   @Test
   public void nameResolutionError_afterChildPolicyInstantiated_propagateToDownstream() {
-    xdsClient.deliverClusterInfo(null, null);
+    xdsClient.deliverClusterInfo(null, null, null);
     FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
     loadBalancer.handleNameResolutionError(
         Status.UNAVAILABLE.withDescription("cannot reach server"));
@@ -327,35 +330,46 @@ public class CdsLoadBalancerTest {
     }
 
     void deliverClusterInfo(
-        @Nullable final String edsServiceName, @Nullable final String lrsServerName) {
+        @Nullable final String edsServiceName, @Nullable final String lrsServerName,
+        final long maxConcurrentRequests, @Nullable final UpstreamTlsContext tlsContext) {
       syncContext.execute(new Runnable() {
         @Override
         public void run() {
-          watcher.onChanged(
-              CdsUpdate.newBuilder()
-                  .setClusterName(CLUSTER)
-                  .setEdsServiceName(edsServiceName)
-                  .setLbPolicy("round_robin") // only supported policy
-                  .setLrsServerName(lrsServerName)
-                  .build());
+          CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder().setClusterName(CLUSTER);
+          if (edsServiceName != null) {
+            updateBuilder.setEdsServiceName(edsServiceName);
+          }
+          if (lrsServerName != null) {
+            updateBuilder.setLrsServerName(lrsServerName);
+          }
+          if (tlsContext != null) {
+            updateBuilder.setUpstreamTlsContext(tlsContext);
+          }
+          updateBuilder.setLbPolicy("round_robin"); // only supported policy
+          updateBuilder.setMaxConcurrentRequests(maxConcurrentRequests);
+          watcher.onChanged(updateBuilder.build());
         }
       });
     }
 
     void deliverClusterInfo(
         @Nullable final String edsServiceName, @Nullable final String lrsServerName,
-        final UpstreamTlsContext tlsContext) {
+        @Nullable final UpstreamTlsContext tlsContext) {
       syncContext.execute(new Runnable() {
         @Override
         public void run() {
-          watcher.onChanged(
-              CdsUpdate.newBuilder()
-                  .setClusterName(CLUSTER)
-                  .setEdsServiceName(edsServiceName)
-                  .setLbPolicy("round_robin") // only supported policy
-                  .setLrsServerName(lrsServerName)
-                  .setUpstreamTlsContext(tlsContext)
-                  .build());
+          CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder().setClusterName(CLUSTER);
+          if (edsServiceName != null) {
+            updateBuilder.setEdsServiceName(edsServiceName);
+          }
+          if (lrsServerName != null) {
+            updateBuilder.setLrsServerName(lrsServerName);
+          }
+          if (tlsContext != null) {
+            updateBuilder.setUpstreamTlsContext(tlsContext);
+          }
+          updateBuilder.setLbPolicy("round_robin"); // only supported policy
+          watcher.onChanged(updateBuilder.build());
        }
       });
     }
@@ -42,10 +42,15 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.Any;
+import com.google.protobuf.UInt32Value;
 import com.google.protobuf.util.Durations;
+import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers;
+import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds;
 import io.envoyproxy.envoy.config.cluster.v3.Cluster;
 import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource;
 import io.envoyproxy.envoy.config.core.v3.ConfigSource;
 import io.envoyproxy.envoy.config.core.v3.HealthStatus;
+import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
 import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
 import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy;
 import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats;
@@ -735,9 +740,42 @@ public class ClientXdsClientTest {
     assertThat(cdsUpdate.getEdsServiceName()).isNull();
     assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin");
     assertThat(cdsUpdate.getLrsServerName()).isNull();
+    assertThat(cdsUpdate.getMaxConcurrentRequests()).isNull();
     assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
   }
 
+  @Test
+  public void cdsResponseWithCircuitBreakers() {
+    RpcCall<DiscoveryRequest, DiscoveryResponse> call =
+        startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher);
+    Cluster cluster = buildCluster(CDS_RESOURCE, null, false);
+    cluster = cluster.toBuilder()
+        .setCircuitBreakers(
+            CircuitBreakers.newBuilder()
+                .addThresholds(
+                    Thresholds.newBuilder()
+                        .setPriority(RoutingPriority.HIGH)
+                        .setMaxRequests(UInt32Value.newBuilder().setValue(50)))
+                .addThresholds(
+                    Thresholds.newBuilder()
+                        .setPriority(RoutingPriority.DEFAULT)
+                        .setMaxRequests(UInt32Value.newBuilder().setValue(200))))
+        .build();
+    DiscoveryResponse response = buildDiscoveryResponse("0",
+        Collections.singletonList(Any.pack(cluster)), ResourceType.CDS.typeUrl(), "0000");
+    call.responseObserver.onNext(response);
+
+    verify(call.requestObserver).onNext(
+        eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000")));
+    verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
+    CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
+    assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE);
+    assertThat(cdsUpdate.getEdsServiceName()).isNull();
+    assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.getLrsServerName()).isNull();
+    assertThat(cdsUpdate.getMaxConcurrentRequests()).isEqualTo(200L);
+  }
+
   /**
    * CDS response containing UpstreamTlsContext for a cluster.
   */
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import io.grpc.Attributes;
+import io.grpc.ClientStreamTracer;
 import io.grpc.ConnectivityState;
 import io.grpc.EquivalentAddressGroup;
 import io.grpc.LoadBalancer;
@@ -40,6 +41,7 @@ import io.grpc.LoadBalancer.SubchannelPicker;
 import io.grpc.LoadBalancerProvider;
 import io.grpc.LoadBalancerRegistry;
 import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
 import io.grpc.NameResolver;
 import io.grpc.Status;
 import io.grpc.Status.Code;
@@ -48,6 +50,7 @@ import io.grpc.internal.FakeClock;
 import io.grpc.internal.ObjectPool;
 import io.grpc.internal.ServiceConfigUtil.PolicySelection;
 import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
+import io.grpc.xds.EnvoyProtoData.ClusterStats;
 import io.grpc.xds.EnvoyProtoData.DropOverload;
 import io.grpc.xds.EnvoyProtoData.LbEndpoint;
 import io.grpc.xds.EnvoyProtoData.Locality;
@@ -65,10 +68,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.junit.After;
@@ -145,7 +145,7 @@ public class EdsLoadBalancer2Test {
             Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build())
         .setLoadBalancingPolicyConfig(
             new EdsConfig(
-                CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, weightedTarget, roundRobin))
+                CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, weightedTarget, roundRobin))
         .build());
   }
 
@@ -153,7 +153,7 @@ public class EdsLoadBalancer2Test {
   public void tearDown() {
     loadBalancer.shutdown();
     assertThat(xdsClient.watchers).isEmpty();
-    assertThat(xdsClient.dropStats).isEmpty();
+    assertThat(xdsClient.clusterStats).isEmpty();
     assertThat(xdsClientRefs).isEqualTo(0);
     assertThat(downstreamBalancers).isEmpty();
   }
@@ -430,7 +430,17 @@ public class EdsLoadBalancer2Test {
   @Test
   public void handleDrops() {
     FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin");
-    prepareRealDownstreamLbPolicies(fakeRoundRobinProvider);
+    PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null);
+    PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies();
+    loadBalancer.handleResolvedAddresses(
+        ResolvedAddresses.newBuilder()
+            .setAddresses(Collections.<EquivalentAddressGroup>emptyList())
+            .setAttributes(
+                Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build())
+            .setLoadBalancingPolicyConfig(
+                new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null,
+                    weightedTargetSelection, fakeRoundRobinSelection))
+            .build());
     when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 1_000_000);
     EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
     LocalityLbEndpoints localityLbEndpoints1 =
@@ -452,13 +462,109 @@ public class EdsLoadBalancer2Test {
     assertThat(result.getStatus().isOk()).isFalse();
     assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
     assertThat(result.getStatus().getDescription()).isEqualTo("Dropped: throttle");
-    assertThat(xdsClient.dropStats.get(EDS_SERVICE_NAME).get("throttle").get()).isEqualTo(1);
+    assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).categorizedDrops.get("throttle"))
+        .isEqualTo(1);
 
     result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
     assertThat(result.getStatus().isOk()).isTrue();
     assertThat(result.getSubchannel()).isSameInstanceAs(subchannel);
   }
 
+  @Test
+  public void maxConcurrentRequests_appliedByLbConfig() {
+    long maxConcurrentRequests = 100L;
+    FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin");
+    PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null);
+    PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies();
+    loadBalancer.handleResolvedAddresses(
+        ResolvedAddresses.newBuilder()
+            .setAddresses(Collections.<EquivalentAddressGroup>emptyList())
+            .setAttributes(
+                Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build())
+            .setLoadBalancingPolicyConfig(
+                new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, maxConcurrentRequests,
+                    weightedTargetSelection, fakeRoundRobinSelection))
+            .build());
+    EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
+    LocalityLbEndpoints localityLbEndpoints1 =
+        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true));
+    xdsClient.deliverClusterLoadAssignment(
+        EDS_SERVICE_NAME, Collections.singletonMap(locality1, localityLbEndpoints1));
+    assertThat(downstreamBalancers).hasSize(1);  // one leaf balancer
+    FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
+    assertThat(leafBalancer.name).isEqualTo("round_robin");
+    assertAddressesEqual(Collections.singletonList(makeAddress("endpoint-addr-1")),
+        leafBalancer.addresses);
+    Subchannel subchannel = leafBalancer.helper.createSubchannel(
+        CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build());
+    leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
+    assertThat(currentState).isEqualTo(ConnectivityState.READY);
+    for (int i = 0; i < maxConcurrentRequests; i++) {
+      PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
+      assertThat(result.getStatus().isOk()).isTrue();
+      assertThat(result.getSubchannel()).isSameInstanceAs(subchannel);
+      assertThat(result.getStreamTracerFactory()).isNotNull();
+      ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory();
+      streamTracerFactory.newClientStreamTracer(ClientStreamTracer.StreamInfo.newBuilder().build(),
+          new Metadata());
+    }
+    assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(0L);
+
+    PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
+    assertThat(result.getStatus().isOk()).isFalse();
+    assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
+    assertThat(result.getStatus().getDescription())
+        .isEqualTo("Cluster max concurrent requests limit exceeded");
+    assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L);
+  }
+
+  @Test
+  public void maxConcurrentRequests_appliedWithDefaultValue() {
+    FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin");
+    PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null);
+    PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies();
+    loadBalancer.handleResolvedAddresses(
+        ResolvedAddresses.newBuilder()
+            .setAddresses(Collections.<EquivalentAddressGroup>emptyList())
+            .setAttributes(
+                Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build())
+            .setLoadBalancingPolicyConfig(
+                new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null,
+                    weightedTargetSelection, fakeRoundRobinSelection))
+            .build());
+    EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
+    LocalityLbEndpoints localityLbEndpoints1 =
+        buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true));
+    xdsClient.deliverClusterLoadAssignment(
+        EDS_SERVICE_NAME, Collections.singletonMap(locality1, localityLbEndpoints1));
+    assertThat(downstreamBalancers).hasSize(1);  // one leaf balancer
+    FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
+    assertThat(leafBalancer.name).isEqualTo("round_robin");
+    assertAddressesEqual(Collections.singletonList(makeAddress("endpoint-addr-1")),
+        leafBalancer.addresses);
+    Subchannel subchannel = leafBalancer.helper.createSubchannel(
+        CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build());
+    leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
+    assertThat(currentState).isEqualTo(ConnectivityState.READY);
+    for (int i = 0; i < EdsLoadBalancer2.DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; i++) {
+      PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
+      assertThat(result.getStatus().isOk()).isTrue();
+      assertThat(result.getSubchannel()).isSameInstanceAs(subchannel);
+      assertThat(result.getStreamTracerFactory()).isNotNull();
+      ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory();
+      streamTracerFactory.newClientStreamTracer(ClientStreamTracer.StreamInfo.newBuilder().build(),
+          new Metadata());
+    }
+    assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(0L);
+
+    PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class));
+    assertThat(result.getStatus().isOk()).isFalse();
+    assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
+    assertThat(result.getStatus().getDescription())
+        .isEqualTo("Cluster max concurrent requests limit exceeded");
+    assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L);
+  }
+
   @Test
   public void configUpdate_changeEdsServiceName_afterChildPolicyReady_switchGracefully() {
     deliverSimpleClusterLoadAssignment(EDS_SERVICE_NAME); // downstream LB polices instantiated
@@ -476,8 +582,8 @@ public class EdsLoadBalancer2Test {
             .setAttributes(
                 Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build())
             .setLoadBalancingPolicyConfig(
-                new EdsConfig(
-                    CLUSTER, newEdsServiceName, LRS_SERVER_NAME, weightedTarget, roundRobin))
+                new EdsConfig(CLUSTER, newEdsServiceName, LRS_SERVER_NAME, null, weightedTarget,
+                    roundRobin))
             .build());
     deliverSimpleClusterLoadAssignment(newEdsServiceName); // instantiate the new subtree
     assertThat(downstreamBalancers).hasSize(2);
@@ -495,7 +601,17 @@ public class EdsLoadBalancer2Test {
   @Test
   public void configUpdate_changeEndpointPickingPolicy() {
     FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin");
-    prepareRealDownstreamLbPolicies(fakeRoundRobinProvider);
+    PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null);
+    PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies();
+    loadBalancer.handleResolvedAddresses(
+        ResolvedAddresses.newBuilder()
+            .setAddresses(Collections.<EquivalentAddressGroup>emptyList())
+            .setAttributes(
+                Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build())
+            .setLoadBalancingPolicyConfig(
+                new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null,
+                    weightedTargetSelection, fakeRoundRobinSelection))
+            .build());
     deliverSimpleClusterLoadAssignment(EDS_SERVICE_NAME); // downstream LB policies instantiated
     FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
     assertThat(leafBalancer.name).isEqualTo("round_robin");
@@ -507,8 +623,8 @@ public class EdsLoadBalancer2Test {
             .setAttributes(
                 Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build())
             .setLoadBalancingPolicyConfig(
-                new EdsConfig(
-                    CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, weightedTarget, fakePickFirstSelection))
+                new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, weightedTarget,
+                    fakePickFirstSelection))
             .build());
     assertThat(leafBalancer.shutdown).isTrue();
     leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
@@ -645,27 +761,15 @@ public class EdsLoadBalancer2Test {
   }
 
   /**
-   * Instantiates the downstream LB policy subtree with real implementations, except the leaf
-   * policy is replaced with a fake implementation to avoid creating connections.
+   * Prepare the LB registry with real LB policy implementations for downstream LB policies.
   */
-  private void prepareRealDownstreamLbPolicies(FakeLoadBalancerProvider fakeLeafPolicyProvider) {
+  private PolicySelection prepareRealDownstreamLbPolicies() {
     registry.deregister(registry.getProvider(PRIORITY_POLICY_NAME));
     registry.register(new PriorityLoadBalancerProvider());
     registry.deregister(registry.getProvider(LRS_POLICY_NAME));
     registry.register(new LrsLoadBalancerProvider());
-    PolicySelection weightedTargetSelection =
-        new PolicySelection(new WeightedTargetLoadBalancerProvider(), null);
-    PolicySelection fakeLeafPolicySelection =
-        new PolicySelection(fakeLeafPolicyProvider, null);
-    loadBalancer.handleResolvedAddresses(
-        ResolvedAddresses.newBuilder()
-            .setAddresses(Collections.<EquivalentAddressGroup>emptyList())
-            .setAttributes(
-                Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build())
-            .setLoadBalancingPolicyConfig(
-                new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, weightedTargetSelection,
-                    fakeLeafPolicySelection))
-            .build());
+    // weighted_target LB policy is not required to be in the registry
+    return new PolicySelection(new WeightedTargetLoadBalancerProvider(), null);
   }
 
   private static void assertLrsConfig(
@@ -733,7 +837,7 @@ public class EdsLoadBalancer2Test {
 
   private final class FakeXdsClient extends XdsClient {
     private final Map<String, EdsResourceWatcher> watchers = new HashMap<>();
-    private final Map<String, ConcurrentMap<String, AtomicLong>> dropStats = new HashMap<>();
+    private final Map<String, FakeLoadStatsStore> clusterStats = new HashMap<>();
 
     @Override
     void shutdown() {
@@ -752,15 +856,14 @@ public class EdsLoadBalancer2Test {
 
     @Override
     LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) {
-      ConcurrentMap<String, AtomicLong> dropCounters = new ConcurrentHashMap<>();
-      dropStats.put(clusterServiceName, dropCounters);
-      return new LoadStatsStoreImpl(clusterName, clusterServiceName,
-          fakeClock.getStopwatchSupplier().get(), dropCounters);
+      FakeLoadStatsStore stats = new FakeLoadStatsStore();
+      clusterStats.put(clusterServiceName, stats);
+      return stats;
     }
 
     @Override
     void removeClientStats(String clusterName, @Nullable String clusterServiceName) {
-      dropStats.remove(clusterServiceName);
+      clusterStats.remove(clusterServiceName);
     }
 
     void deliverClusterLoadAssignment(
@@ -812,6 +915,41 @@ public class EdsLoadBalancer2Test {
     }
   }
 
+  private static final class FakeLoadStatsStore implements LoadStatsStore {
+    private final Map<String, Long> categorizedDrops = new HashMap<>();
+    private int totalDrops;
+
+    @Override
+    public ClusterStats generateLoadReport() {
+      throw new UnsupportedOperationException("should not be called");
+    }
+
+    @Override
+    public ClientLoadCounter addLocality(Locality locality) {
+      return new ClientLoadCounter();
+    }
+
+    @Override
+    public void removeLocality(Locality locality) {
+      // no-op
+    }
+
+    @Override
+    public void recordDroppedRequest(String category) {
+      if (!categorizedDrops.containsKey(category)) {
+        categorizedDrops.put(category, 1L);
+      } else {
+        categorizedDrops.put(category, categorizedDrops.get(category) + 1L);
+      }
+      totalDrops++;
+    }
+
+    @Override
+    public void recordDroppedRequest() {
+      totalDrops++;
+    }
+  }
+
   private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
     private final String policyName;
 
@@ -604,6 +604,11 @@ public class LoadReportClientTest {
       throw new UnsupportedOperationException("should not used");
     }
 
+    @Override
+    public void recordDroppedRequest() {
+      throw new UnsupportedOperationException("should not used");
+    }
+
     private void refresh() {
       long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
       long callsSucceeded = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
@@ -34,9 +34,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,14 +49,12 @@ public class LoadStatsStoreImplTest {
   private static final Locality LOCALITY2 =
       new Locality("test_region2", "test_zone", "test_subzone");
   private final FakeClock fakeClock = new FakeClock();
-  private ConcurrentMap<String, AtomicLong> dropCounters;
   private LoadStatsStore loadStatsStore;
 
   @Before
   public void setUp() {
-    dropCounters = new ConcurrentHashMap<>();
     Stopwatch stopwatch = fakeClock.getStopwatchSupplier().get();
-    loadStatsStore = new LoadStatsStoreImpl(CLUSTER_NAME, null, stopwatch, dropCounters);
+    loadStatsStore = new LoadStatsStoreImpl(CLUSTER_NAME, null, stopwatch);
   }
 
   private static List<EndpointLoadMetricStats> buildEndpointLoadMetricStatsList(
@@ -101,20 +96,20 @@ public class LoadStatsStoreImplTest {
 
   private static ClusterStats buildClusterStats(
       @Nullable List<UpstreamLocalityStats> upstreamLocalityStatsList,
-      @Nullable List<DroppedRequests> droppedRequestsList, long intervalNano) {
+      @Nullable List<DroppedRequests> droppedRequestsList, long totalDroppedRequests,
+      long intervalNano) {
     ClusterStats.Builder clusterStatsBuilder = ClusterStats.newBuilder();
     clusterStatsBuilder.setClusterName(CLUSTER_NAME);
     if (upstreamLocalityStatsList != null) {
       clusterStatsBuilder.addAllUpstreamLocalityStats(upstreamLocalityStatsList);
     }
     if (droppedRequestsList != null) {
-      long dropCount = 0;
       for (DroppedRequests drop : droppedRequestsList) {
-        dropCount += drop.getDroppedCount();
+        totalDroppedRequests += drop.getDroppedCount();
         clusterStatsBuilder.addDroppedRequests(drop);
       }
-      clusterStatsBuilder.setTotalDroppedRequests(dropCount);
     }
+    clusterStatsBuilder.setTotalDroppedRequests(totalDroppedRequests);
     clusterStatsBuilder.setLoadReportIntervalNanos(intervalNano);
     return clusterStatsBuilder.build();
   }
@@ -181,7 +176,7 @@ public class LoadStatsStoreImplTest {
   }
 
   @Test
-  public void loadReportContainsRecordedStats() {
+  public void recordCallAndMetricStats() {
     ClientLoadCounter counter1 = loadStatsStore.addLocality(LOCALITY1);
     counter1.setCallsSucceeded(4315);
     counter1.setCallsInProgress(3421);
@@ -218,7 +213,7 @@ public class LoadStatsStoreImplTest {
             buildUpstreamLocalityStats(LOCALITY2, 41234, 432, 431, 702,
                 buildEndpointLoadMetricStatsList(metrics2))
         ),
-        null, 1000L);
+        null, 0L, 1000L);
     assertClusterStatsEqual(expectedReport, loadStatsStore.generateLoadReport());
 
     fakeClock.forwardNanos(2000L);
@@ -228,31 +223,39 @@ public class LoadStatsStoreImplTest {
             buildUpstreamLocalityStats(LOCALITY1, 0, 3421, 0, 0, null),
             buildUpstreamLocalityStats(LOCALITY2, 0, 432, 0, 0, null)
         ),
-        null, 2000L);
+        null, 0L, 2000L);
     assertClusterStatsEqual(expectedReport, loadStatsStore.generateLoadReport());
   }
 
   @Test
-  public void recordingDroppedRequests() {
+  public void recordDroppedRequests() {
     int numLbDrop = 123;
     int numThrottleDrop = 456;
+    int uncategorizedDrop = 789;
     for (int i = 0; i < numLbDrop; i++) {
       loadStatsStore.recordDroppedRequest("lb");
     }
     for (int i = 0; i < numThrottleDrop; i++) {
       loadStatsStore.recordDroppedRequest("throttle");
     }
-    assertThat(dropCounters.get("lb").get()).isEqualTo(numLbDrop);
-    assertThat(dropCounters.get("throttle").get()).isEqualTo(numThrottleDrop);
+    for (int i = 0; i < uncategorizedDrop; i++) {
+      loadStatsStore.recordDroppedRequest();
+    }
 
     fakeClock.forwardNanos(1000L);
     ClusterStats expectedLoadReport =
         buildClusterStats(null,
             Arrays.asList(buildDroppedRequests("lb", numLbDrop),
                 buildDroppedRequests("throttle", numThrottleDrop)),
-            1000L);
+            789L, 1000L);
     assertClusterStatsEqual(expectedLoadReport, loadStatsStore.generateLoadReport());
 
     fakeClock.forwardNanos(1000L);
     expectedLoadReport =
         buildClusterStats(null,
             Arrays.asList(buildDroppedRequests("lb", 0L),
                 buildDroppedRequests("throttle", 0L)),
-            1000L);
+            0L, 1000L);
     assertClusterStatsEqual(expectedLoadReport, loadStatsStore.generateLoadReport());
-    assertThat(dropCounters.get("lb").get()).isEqualTo(0);
-    assertThat(dropCounters.get("throttle").get()).isEqualTo(0);
   }
 }
@@ -325,5 +325,10 @@ public class LrsLoadBalancerTest {
     public void recordDroppedRequest(String category) {
       throw new UnsupportedOperationException("should not be called");
     }
+
+    @Override
+    public void recordDroppedRequest() {
+      throw new UnsupportedOperationException("should not be called");
+    }
   }
 }