From 47d1488373106c159ae6c863a9d046a0732f8ae0 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Mon, 2 Nov 2020 14:24:22 -0800 Subject: [PATCH] xds: implement xDS circuit breaking max_requests (#7517) Implemented xDS circuit breaking for the maximum number of requests can be in-flight. The threshold is retrieved from CDS responses and is configured at the cluster level. It is implemented by wrapping the Picker spawned by EDS LB policy (which resolves endpoints for a single cluster) with stream-limiting logic. That is, when the picker is trying to create a new stream (aka, a new call), it is controlled by the number of open streams created by the current EDS LB policy. RPCs dropped by circuit breakers are recorded into total number of drops at cluster level and will be reported to TD via LRS. In the future, multiple gRPC channels can be load balancing requests to the same (global) cluster. Those request should share the same quota for maximum number of requests can be in-flight. We will use a global counter for aggregating the number of currently-in-flight requests per cluster. --- .../java/io/grpc/xds/CdsLoadBalancer.java | 1 + .../java/io/grpc/xds/ClientXdsClient.java | 13 ++ .../java/io/grpc/xds/EdsLoadBalancer2.java | 139 +++++++++--- .../io/grpc/xds/EdsLoadBalancerProvider.java | 5 + .../java/io/grpc/xds/LoadStatsManager.java | 9 +- .../java/io/grpc/xds/LoadStatsStoreImpl.java | 23 +- xds/src/main/java/io/grpc/xds/XdsClient.java | 31 ++- .../java/io/grpc/xds/CdsLoadBalancerTest.java | 64 +++--- .../java/io/grpc/xds/ClientXdsClientTest.java | 38 ++++ .../io/grpc/xds/EdsLoadBalancer2Test.java | 206 +++++++++++++++--- .../io/grpc/xds/LoadReportClientTest.java | 5 + .../io/grpc/xds/LoadStatsStoreImplTest.java | 41 ++-- .../java/io/grpc/xds/LrsLoadBalancerTest.java | 5 + 13 files changed, 458 insertions(+), 122 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java index 545e574de2..03f4cd804e 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java @@ -218,6 +218,7 @@ final class CdsLoadBalancer extends LoadBalancer { /* clusterName = */ newUpdate.getClusterName(), /* edsServiceName = */ newUpdate.getEdsServiceName(), /* lrsServerName = */ newUpdate.getLrsServerName(), + /* maxConcurrentRequests = */ newUpdate.getMaxConcurrentRequests(), new PolicySelection(localityPickingPolicyProvider, null /* by EDS policy */), new PolicySelection(endpointPickingPolicyProvider, null)); if (isXdsSecurityEnabled()) { diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 8ddbff0d55..e7b00f684c 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -25,11 +25,13 @@ import com.google.common.base.Supplier; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions; +import io.envoyproxy.envoy.config.core.v3.RoutingPriority; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; import io.envoyproxy.envoy.config.listener.v3.Listener; @@ -334,6 +336,17 @@ final class ClientXdsClient extends AbstractXdsClient { } updateBuilder.setLrsServerName(""); } + if (cluster.hasCircuitBreakers()) { + List thresholds = cluster.getCircuitBreakers().getThresholdsList(); + for (Thresholds threshold : thresholds) { + if (threshold.getPriority() != RoutingPriority.DEFAULT) { + continue; + } + if (threshold.hasMaxRequests()) { + updateBuilder.setMaxConcurrentRequests(threshold.getMaxRequests().getValue()); + } + } + } try { EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = getTlsContextFromCluster(cluster); diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java index 4ebd11f731..bfbeb34314 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java @@ -23,16 +23,20 @@ import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; +import io.grpc.ClientStreamTracer; +import io.grpc.ClientStreamTracer.StreamInfo; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.util.ForwardingClientStreamTracer; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; @@ -57,9 +61,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; final class EdsLoadBalancer2 extends LoadBalancer { + @VisibleForTesting + static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L; private final XdsLogger logger; private final SynchronizationContext syncContext; private final LoadBalancerRegistry lbRegistry; @@ -96,8 +103,10 @@ final class EdsLoadBalancer2 extends LoadBalancer { EdsConfig config = (EdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); if (logger.isLoggable(XdsLogLevel.INFO)) { logger.log(XdsLogLevel.INFO, "Received EDS lb config: cluster={0}, " - + "eds_service_name={1}, endpoint_picking_policy={2}, report_load={3}", - config.clusterName, config.edsServiceName, + + "eds_service_name={1}, max_concurrent_requests={2}, locality_picking_policy={3}, " + + "endpoint_picking_policy={4}, report_load={5}", + config.clusterName, config.edsServiceName, config.maxConcurrentRequests, + config.localityPickingPolicy.getProvider().getPolicyName(), config.endpointPickingPolicy.getProvider().getPolicyName(), config.lrsServerName != null); } @@ -150,9 +159,10 @@ final class EdsLoadBalancer2 extends LoadBalancer { } private final class ChildLbState extends LoadBalancer implements EdsResourceWatcher { + private final AtomicLong requestCount = new AtomicLong(); @Nullable private final LoadStatsStore loadStatsStore; - private final DropHandlingLbHelper lbHelper; + private final RequestLimitingLbHelper lbHelper; private List endpointAddresses = Collections.emptyList(); private Map> prioritizedLocalityWeights = Collections.emptyMap(); @@ -169,7 +179,7 @@ final class EdsLoadBalancer2 extends LoadBalancer { } else { loadStatsStore = null; } - lbHelper = new DropHandlingLbHelper(helper); + lbHelper = new RequestLimitingLbHelper(helper); logger.log( XdsLogLevel.INFO, "Start endpoint watcher on {0} with xDS client {1}", resourceName, xdsClient); @@ -180,6 +190,9 @@ final class EdsLoadBalancer2 extends LoadBalancer { public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { this.resolvedAddresses = resolvedAddresses; EdsConfig config = (EdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + if (config.maxConcurrentRequests != null) { + lbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests); + } if (lb != null) { if (!config.localityPickingPolicy.equals(localityPickingPolicy) || !config.endpointPickingPolicy.equals(endpointPickingPolicy)) { @@ -355,38 +368,20 @@ final class EdsLoadBalancer2 extends LoadBalancer { lbHelper.helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); } - private final class DropHandlingLbHelper extends ForwardingLoadBalancerHelper { + private final class RequestLimitingLbHelper extends ForwardingLoadBalancerHelper { private final Helper helper; private List dropPolicies = Collections.emptyList(); + private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; - private DropHandlingLbHelper(Helper helper) { + private RequestLimitingLbHelper(Helper helper) { this.helper = helper; } @Override public void updateBalancingState( ConnectivityState newState, final SubchannelPicker newPicker) { - SubchannelPicker picker = new SubchannelPicker() { - List dropOverloads = dropPolicies; - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - for (DropOverload dropOverload : dropOverloads) { - int rand = random.nextInt(1_000_000); - if (rand < dropOverload.getDropsPerMillion()) { - logger.log( - XdsLogLevel.INFO, - "Drop request with category: {0}", dropOverload.getCategory()); - if (loadStatsStore != null) { - loadStatsStore.recordDroppedRequest(dropOverload.getCategory()); - } - return PickResult.withDrop( - Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.getCategory())); - } - } - return newPicker.pickSubchannel(args); - } - }; + SubchannelPicker picker = new RequestLimitingSubchannelPicker( + newPicker, dropPolicies, maxConcurrentRequests); delegate().updateBalancingState(newState, picker); } @@ -398,10 +393,100 @@ final class EdsLoadBalancer2 extends LoadBalancer { private void updateDropPolicies(List dropOverloads) { dropPolicies = dropOverloads; } + + private void updateMaxConcurrentRequests(long maxConcurrentRequests) { + this.maxConcurrentRequests = maxConcurrentRequests; + } + + private final class RequestLimitingSubchannelPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final List dropPolicies; + private final long maxConcurrentRequests; + + private RequestLimitingSubchannelPicker(SubchannelPicker delegate, + List dropPolicies, long maxConcurrentRequests) { + this.delegate = delegate; + this.dropPolicies = dropPolicies; + this.maxConcurrentRequests = maxConcurrentRequests; + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + for (DropOverload dropOverload : dropPolicies) { + int rand = random.nextInt(1_000_000); + if (rand < dropOverload.getDropsPerMillion()) { + logger.log(XdsLogLevel.INFO, "Drop request with category: {0}", + dropOverload.getCategory()); + if (loadStatsStore != null) { + loadStatsStore.recordDroppedRequest(dropOverload.getCategory()); + } + return PickResult.withDrop( + Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.getCategory())); + } + } + PickResult result = delegate.pickSubchannel(args); + if (result.getStatus().isOk() && result.getSubchannel() != null) { + if (requestCount.get() >= maxConcurrentRequests) { + if (loadStatsStore != null) { + loadStatsStore.recordDroppedRequest(); + } + return PickResult.withDrop(Status.UNAVAILABLE.withDescription( + "Cluster max concurrent requests limit exceeded")); + } else { + ClientStreamTracer.Factory tracerFactory = new RequestCountingStreamTracerFactory( + result.getStreamTracerFactory(), requestCount); + return PickResult.withSubchannel(result.getSubchannel(), tracerFactory); + } + } + return result; + } + } } } } + /** + * Counts the number of outstanding requests. + */ + private static final class RequestCountingStreamTracerFactory + extends ClientStreamTracer.Factory { + @Nullable + private final ClientStreamTracer.Factory delegate; + private final AtomicLong counter; + + private RequestCountingStreamTracerFactory(@Nullable ClientStreamTracer.Factory delegate, + AtomicLong counter) { + this.delegate = delegate; + this.counter = counter; + } + + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + counter.incrementAndGet(); + if (delegate == null) { + return new ClientStreamTracer() { + @Override + public void streamClosed(Status status) { + counter.decrementAndGet(); + } + }; + } + final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers); + return new ForwardingClientStreamTracer() { + @Override + protected ClientStreamTracer delegate() { + return delegatedTracer; + } + + @Override + public void streamClosed(Status status) { + counter.decrementAndGet(); + delegate().streamClosed(status); + } + }; + } + } + @VisibleForTesting static PriorityLbConfig generatePriorityLbConfig( String cluster, String edsServiceName, String lrsServerName, diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java index 3130c42b70..d1d9ef4a5b 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java @@ -68,6 +68,8 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider { final String edsServiceName; @Nullable final String lrsServerName; + @Nullable + final Long maxConcurrentRequests; final PolicySelection localityPickingPolicy; final PolicySelection endpointPickingPolicy; @@ -75,11 +77,13 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider { String clusterName, @Nullable String edsServiceName, @Nullable String lrsServerName, + @Nullable Long maxConcurrentRequests, PolicySelection localityPickingPolicy, PolicySelection endpointPickingPolicy) { this.clusterName = checkNotNull(clusterName, "clusterName"); this.edsServiceName = edsServiceName; this.lrsServerName = lrsServerName; + this.maxConcurrentRequests = maxConcurrentRequests; this.localityPickingPolicy = checkNotNull(localityPickingPolicy, "localityPickingPolicy"); this.endpointPickingPolicy = checkNotNull(endpointPickingPolicy, "endpointPickingPolicy"); } @@ -90,6 +94,7 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider { .add("clusterName", clusterName) .add("edsServiceName", edsServiceName) .add("lrsServerName", lrsServerName) + .add("maxConcurrentRequests", maxConcurrentRequests) .add("localityPickingPolicy", localityPickingPolicy) .add("endpointPickingPolicy", endpointPickingPolicy) .toString(); diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsManager.java b/xds/src/main/java/io/grpc/xds/LoadStatsManager.java index 0bf26af076..ae58d80f44 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsManager.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsManager.java @@ -153,10 +153,17 @@ final class LoadStatsManager { void removeLocality(Locality locality); /** - * Records a drop decision. + * Records a drop decision with the given category. * *

This method must be thread-safe. */ void recordDroppedRequest(String category); + + /** + * Records a uncategorized drop decision. + * + *

This method must be thread-safe. + */ + void recordDroppedRequest(); } } diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java b/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java index 977af60ebb..0fcb10e7ba 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java @@ -55,25 +55,21 @@ final class LoadStatsStoreImpl implements LoadStatsStore { @GuardedBy("this") private final Map> localityLoadCounters = new HashMap<>(); + private final AtomicLong uncategorizedDrops = new AtomicLong(); // Cluster level dropped request counts for each category decision. - private final ConcurrentMap dropCounters; + private final ConcurrentMap dropCounters = new ConcurrentHashMap<>(); private final Stopwatch stopwatch; LoadStatsStoreImpl(String clusterName, @Nullable String clusterServiceName) { - this(clusterName, clusterServiceName, GrpcUtil.STOPWATCH_SUPPLIER.get(), - new ConcurrentHashMap()); + this(clusterName, clusterServiceName, GrpcUtil.STOPWATCH_SUPPLIER.get()); } @VisibleForTesting - LoadStatsStoreImpl( - String clusterName, - @Nullable String clusterServiceName, - Stopwatch stopwatch, - ConcurrentMap dropCounters) { + LoadStatsStoreImpl(String clusterName, @Nullable String clusterServiceName, + Stopwatch stopwatch) { this.clusterName = checkNotNull(clusterName, "clusterName"); this.clusterServiceName = clusterServiceName; this.stopwatch = checkNotNull(stopwatch, "stopwatch"); - this.dropCounters = checkNotNull(dropCounters, "dropCounters"); stopwatch.reset().start(); } @@ -109,11 +105,11 @@ final class LoadStatsStoreImpl implements LoadStatsStore { } } localityLoadCounters.keySet().removeAll(untrackedLocalities); - long totalDrops = 0; + long totalDrops = uncategorizedDrops.getAndSet(0); for (Map.Entry entry : dropCounters.entrySet()) { long drops = entry.getValue().getAndSet(0); totalDrops += drops; - statsBuilder.addDroppedRequests(new DroppedRequests(entry.getKey(),drops)); + statsBuilder.addDroppedRequests(new DroppedRequests(entry.getKey(), drops)); } statsBuilder.setTotalDroppedRequests(totalDrops); statsBuilder.setLoadReportIntervalNanos(stopwatch.elapsed(NANOSECONDS)); @@ -150,6 +146,11 @@ final class LoadStatsStoreImpl implements LoadStatsStore { counter.getAndIncrement(); } + @Override + public void recordDroppedRequest() { + uncategorizedDrops.getAndIncrement(); + } + static LoadStatsStoreFactory getDefaultFactory() { return new LoadStatsStoreFactory() { @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 93707edc81..c4fe483270 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -199,6 +199,8 @@ abstract class XdsClient { private final String lbPolicy; @Nullable private final String lrsServerName; + @Nullable + private final Long maxConcurrentRequests; private final UpstreamTlsContext upstreamTlsContext; private CdsUpdate( @@ -206,11 +208,13 @@ abstract class XdsClient { @Nullable String edsServiceName, String lbPolicy, @Nullable String lrsServerName, + @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { this.clusterName = clusterName; this.edsServiceName = edsServiceName; this.lbPolicy = lbPolicy; this.lrsServerName = lrsServerName; + this.maxConcurrentRequests = maxConcurrentRequests; this.upstreamTlsContext = upstreamTlsContext; } @@ -243,6 +247,15 @@ abstract class XdsClient { return lrsServerName; } + /** + * Returns the maximum number of outstanding requests can be made to the upstream cluster, or + * {@code null} if not configured. + */ + @Nullable + Long getMaxConcurrentRequests() { + return maxConcurrentRequests; + } + /** Returns the {@link UpstreamTlsContext} for this cluster if present, else null. */ @Nullable UpstreamTlsContext getUpstreamTlsContext() { @@ -258,14 +271,15 @@ abstract class XdsClient { .add("edsServiceName", edsServiceName) .add("lbPolicy", lbPolicy) .add("lrsServerName", lrsServerName) + .add("maxConcurrentRequests", maxConcurrentRequests) .add("upstreamTlsContext", upstreamTlsContext) .toString(); } @Override public int hashCode() { - return Objects.hash( - clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext); + return Objects.hash(clusterName, edsServiceName, lbPolicy, lrsServerName, + maxConcurrentRequests, upstreamTlsContext); } @Override @@ -281,6 +295,7 @@ abstract class XdsClient { && Objects.equals(edsServiceName, that.edsServiceName) && Objects.equals(lbPolicy, that.lbPolicy) && Objects.equals(lrsServerName, that.lrsServerName) + && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) && Objects.equals(upstreamTlsContext, that.upstreamTlsContext); } @@ -296,6 +311,8 @@ abstract class XdsClient { @Nullable private String lrsServerName; @Nullable + private Long maxConcurrentRequests; + @Nullable private UpstreamTlsContext upstreamTlsContext; private Builder() { @@ -321,6 +338,11 @@ abstract class XdsClient { return this; } + Builder setMaxConcurrentRequests(long maxConcurrentRequests) { + this.maxConcurrentRequests = maxConcurrentRequests; + return this; + } + Builder setUpstreamTlsContext(UpstreamTlsContext upstreamTlsContext) { this.upstreamTlsContext = upstreamTlsContext; return this; @@ -330,9 +352,8 @@ abstract class XdsClient { checkState(clusterName != null, "clusterName is not set"); checkState(lbPolicy != null, "lbPolicy is not set"); - return - new CdsUpdate( - clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext); + return new CdsUpdate(clusterName, edsServiceName, lbPolicy, lrsServerName, + maxConcurrentRequests, upstreamTlsContext); } } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java index 4adc6f4400..880ee47d34 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java @@ -127,7 +127,7 @@ public class CdsLoadBalancerTest { @Test public void receiveFirstClusterResourceInfo() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.EDS_POLICY_NAME); @@ -136,6 +136,7 @@ public class CdsLoadBalancerTest { assertThat(edsConfig.clusterName).isEqualTo(CLUSTER); assertThat(edsConfig.edsServiceName).isNull(); assertThat(edsConfig.lrsServerName).isNull(); + assertThat(edsConfig.maxConcurrentRequests).isNull(); assertThat(edsConfig.localityPickingPolicy.getProvider().getPolicyName()) .isEqualTo(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded to weighted-target assertThat(edsConfig.endpointPickingPolicy.getProvider().getPolicyName()) @@ -155,7 +156,7 @@ public class CdsLoadBalancerTest { @Test public void clusterResourceRemoved() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.shutdown).isFalse(); @@ -171,7 +172,7 @@ public class CdsLoadBalancerTest { @Test public void clusterResourceUpdated() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); EdsConfig edsConfig = (EdsConfig) childBalancer.config; assertThat(edsConfig.clusterName).isEqualTo(CLUSTER); @@ -184,12 +185,14 @@ public class CdsLoadBalancerTest { String edsService = "service-bar.googleapis.com"; String loadReportServer = "lrs-server.googleapis.com"; - xdsClient.deliverClusterInfo(edsService, loadReportServer); + long maxConcurrentRequests = 50L; + xdsClient.deliverClusterInfo(edsService, loadReportServer, maxConcurrentRequests, null); assertThat(childBalancers).containsExactly(childBalancer); edsConfig = (EdsConfig) childBalancer.config; assertThat(edsConfig.clusterName).isEqualTo(CLUSTER); assertThat(edsConfig.edsServiceName).isEqualTo(edsService); assertThat(edsConfig.lrsServerName).isEqualTo(loadReportServer); + assertThat(edsConfig.maxConcurrentRequests).isEqualTo(maxConcurrentRequests); assertThat(edsConfig.localityPickingPolicy.getProvider().getPolicyName()) .isEqualTo(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded to weighted-target assertThat(edsConfig.endpointPickingPolicy.getProvider().getPolicyName()) @@ -218,7 +221,7 @@ public class CdsLoadBalancerTest { assertThat(supplier.getUpstreamTlsContext()).isEqualTo(upstreamTlsContext); } - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); subchannel = childBalancer.helper.createSubchannel(args); for (EquivalentAddressGroup eag : subchannel.getAllAddresses()) { assertThat(eag.getAttributes().get(XdsAttributes.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER)) @@ -241,7 +244,7 @@ public class CdsLoadBalancerTest { @Test public void subchannelStatePropagateFromDownstreamToUpstream() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); List addresses = createEndpointAddresses(2); CreateSubchannelArgs args = @@ -267,7 +270,7 @@ public class CdsLoadBalancerTest { @Test public void clusterDiscoveryError_afterChildPolicyInstantiated_keepUsingCurrentCluster() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); xdsClient.deliverError(Status.UNAVAILABLE.withDescription("unreachable")); assertThat(currentState).isNull(); @@ -288,7 +291,7 @@ public class CdsLoadBalancerTest { @Test public void nameResolutionError_afterChildPolicyInstantiated_propagateToDownstream() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); loadBalancer.handleNameResolutionError( Status.UNAVAILABLE.withDescription("cannot reach server")); @@ -327,35 +330,46 @@ public class CdsLoadBalancerTest { } void deliverClusterInfo( - @Nullable final String edsServiceName, @Nullable final String lrsServerName) { + @Nullable final String edsServiceName, @Nullable final String lrsServerName, + final long maxConcurrentRequests, @Nullable final UpstreamTlsContext tlsContext) { syncContext.execute(new Runnable() { @Override public void run() { - watcher.onChanged( - CdsUpdate.newBuilder() - .setClusterName(CLUSTER) - .setEdsServiceName(edsServiceName) - .setLbPolicy("round_robin") // only supported policy - .setLrsServerName(lrsServerName) - .build()); + CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder().setClusterName(CLUSTER); + if (edsServiceName != null) { + updateBuilder.setEdsServiceName(edsServiceName); + } + if (lrsServerName != null) { + updateBuilder.setLrsServerName(lrsServerName); + } + if (tlsContext != null) { + updateBuilder.setUpstreamTlsContext(tlsContext); + } + updateBuilder.setLbPolicy("round_robin"); // only supported policy + updateBuilder.setMaxConcurrentRequests(maxConcurrentRequests); + watcher.onChanged(updateBuilder.build()); } }); } void deliverClusterInfo( @Nullable final String edsServiceName, @Nullable final String lrsServerName, - final UpstreamTlsContext tlsContext) { + @Nullable final UpstreamTlsContext tlsContext) { syncContext.execute(new Runnable() { @Override public void run() { - watcher.onChanged( - CdsUpdate.newBuilder() - .setClusterName(CLUSTER) - .setEdsServiceName(edsServiceName) - .setLbPolicy("round_robin") // only supported policy - .setLrsServerName(lrsServerName) - .setUpstreamTlsContext(tlsContext) - .build()); + CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder().setClusterName(CLUSTER); + if (edsServiceName != null) { + updateBuilder.setEdsServiceName(edsServiceName); + } + if (lrsServerName != null) { + updateBuilder.setLrsServerName(lrsServerName); + } + if (tlsContext != null) { + updateBuilder.setUpstreamTlsContext(tlsContext); + } + updateBuilder.setLbPolicy("round_robin"); // only supported policy + watcher.onChanged(updateBuilder.build()); } }); } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java index d8430245c5..6804f9bf39 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java @@ -42,10 +42,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; +import com.google.protobuf.UInt32Value; import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.core.v3.RoutingPriority; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; @@ -735,9 +740,42 @@ public class ClientXdsClientTest { assertThat(cdsUpdate.getEdsServiceName()).isNull(); assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.getLrsServerName()).isNull(); + assertThat(cdsUpdate.getMaxConcurrentRequests()).isNull(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } + @Test + public void cdsResponseWithCircuitBreakers() { + RpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + Cluster cluster = buildCluster(CDS_RESOURCE, null, false); + cluster = cluster.toBuilder() + .setCircuitBreakers( + CircuitBreakers.newBuilder() + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.HIGH) + .setMaxRequests(UInt32Value.newBuilder().setValue(50))) + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(200)))) + .build(); + DiscoveryResponse response = buildDiscoveryResponse("0", + Collections.singletonList(Any.pack(cluster)), ResourceType.CDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isNull(); + assertThat(cdsUpdate.getMaxConcurrentRequests()).isEqualTo(200L); + } + /** * CDS response containing UpstreamTlsContext for a cluster. */ diff --git a/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java index 8ceb5f7ded..85adbeb243 100644 --- a/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import io.grpc.Attributes; +import io.grpc.ClientStreamTracer; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; @@ -40,6 +41,7 @@ import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; +import io.grpc.Metadata; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.Status.Code; @@ -48,6 +50,7 @@ import io.grpc.internal.FakeClock; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; +import io.grpc.xds.EnvoyProtoData.ClusterStats; import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.LbEndpoint; import io.grpc.xds.EnvoyProtoData.Locality; @@ -65,10 +68,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.junit.After; @@ -145,7 +145,7 @@ public class EdsLoadBalancer2Test { Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) .setLoadBalancingPolicyConfig( new EdsConfig( - CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, weightedTarget, roundRobin)) + CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, weightedTarget, roundRobin)) .build()); } @@ -153,7 +153,7 @@ public class EdsLoadBalancer2Test { public void tearDown() { loadBalancer.shutdown(); assertThat(xdsClient.watchers).isEmpty(); - assertThat(xdsClient.dropStats).isEmpty(); + assertThat(xdsClient.clusterStats).isEmpty(); assertThat(xdsClientRefs).isEqualTo(0); assertThat(downstreamBalancers).isEmpty(); } @@ -430,7 +430,17 @@ public class EdsLoadBalancer2Test { @Test public void handleDrops() { FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin"); - prepareRealDownstreamLbPolicies(fakeRoundRobinProvider); + PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null); + PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies(); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes( + Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) + .setLoadBalancingPolicyConfig( + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, + weightedTargetSelection, fakeRoundRobinSelection)) + .build()); when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 1_000_000); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); LocalityLbEndpoints localityLbEndpoints1 = @@ -452,13 +462,109 @@ public class EdsLoadBalancer2Test { assertThat(result.getStatus().isOk()).isFalse(); assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(result.getStatus().getDescription()).isEqualTo("Dropped: throttle"); - assertThat(xdsClient.dropStats.get(EDS_SERVICE_NAME).get("throttle").get()).isEqualTo(1); + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).categorizedDrops.get("throttle")) + .isEqualTo(1); result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); } + @Test + public void maxConcurrentRequests_appliedByLbConfig() { + long maxConcurrentRequests = 100L; + FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin"); + PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null); + PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies(); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes( + Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) + .setLoadBalancingPolicyConfig( + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, maxConcurrentRequests, + weightedTargetSelection, fakeRoundRobinSelection)) + .build()); + EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); + LocalityLbEndpoints localityLbEndpoints1 = + buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true)); + xdsClient.deliverClusterLoadAssignment( + EDS_SERVICE_NAME, Collections.singletonMap(locality1, localityLbEndpoints1)); + assertThat(downstreamBalancers).hasSize(1); // one leaf balancer + FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); + assertThat(leafBalancer.name).isEqualTo("round_robin"); + assertAddressesEqual(Collections.singletonList(makeAddress("endpoint-addr-1")), + leafBalancer.addresses); + Subchannel subchannel = leafBalancer.helper.createSubchannel( + CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); + leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + assertThat(currentState).isEqualTo(ConnectivityState.READY); + for (int i = 0; i < maxConcurrentRequests; i++) { + PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); + assertThat(result.getStreamTracerFactory()).isNotNull(); + ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory(); + streamTracerFactory.newClientStreamTracer(ClientStreamTracer.StreamInfo.newBuilder().build(), + new Metadata()); + } + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(0L); + + PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("Cluster max concurrent requests limit exceeded"); + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L); + } + + @Test + public void maxConcurrentRequests_appliedWithDefaultValue() { + FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin"); + PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null); + PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies(); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes( + Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) + .setLoadBalancingPolicyConfig( + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, + weightedTargetSelection, fakeRoundRobinSelection)) + .build()); + EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); + LocalityLbEndpoints localityLbEndpoints1 = + buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true)); + xdsClient.deliverClusterLoadAssignment( + EDS_SERVICE_NAME, Collections.singletonMap(locality1, localityLbEndpoints1)); + assertThat(downstreamBalancers).hasSize(1); // one leaf balancer + FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); + assertThat(leafBalancer.name).isEqualTo("round_robin"); + assertAddressesEqual(Collections.singletonList(makeAddress("endpoint-addr-1")), + leafBalancer.addresses); + Subchannel subchannel = leafBalancer.helper.createSubchannel( + CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); + leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + assertThat(currentState).isEqualTo(ConnectivityState.READY); + for (int i = 0; i < EdsLoadBalancer2.DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; i++) { + PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); + assertThat(result.getStreamTracerFactory()).isNotNull(); + ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory(); + streamTracerFactory.newClientStreamTracer(ClientStreamTracer.StreamInfo.newBuilder().build(), + new Metadata()); + } + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(0L); + + PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("Cluster max concurrent requests limit exceeded"); + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L); + } + @Test public void configUpdate_changeEdsServiceName_afterChildPolicyReady_switchGracefully() { deliverSimpleClusterLoadAssignment(EDS_SERVICE_NAME); // downstream LB polices instantiated @@ -476,8 +582,8 @@ public class EdsLoadBalancer2Test { .setAttributes( Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) .setLoadBalancingPolicyConfig( - new EdsConfig( - CLUSTER, newEdsServiceName, LRS_SERVER_NAME, weightedTarget, roundRobin)) + new EdsConfig(CLUSTER, newEdsServiceName, LRS_SERVER_NAME, null, weightedTarget, + roundRobin)) .build()); deliverSimpleClusterLoadAssignment(newEdsServiceName); // instantiate the new subtree assertThat(downstreamBalancers).hasSize(2); @@ -495,7 +601,17 @@ public class EdsLoadBalancer2Test { @Test public void configUpdate_changeEndpointPickingPolicy() { FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin"); - prepareRealDownstreamLbPolicies(fakeRoundRobinProvider); + PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null); + PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies(); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes( + Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) + .setLoadBalancingPolicyConfig( + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, + weightedTargetSelection, fakeRoundRobinSelection)) + .build()); deliverSimpleClusterLoadAssignment(EDS_SERVICE_NAME); // downstream LB policies instantiated FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); assertThat(leafBalancer.name).isEqualTo("round_robin"); @@ -507,8 +623,8 @@ public class EdsLoadBalancer2Test { .setAttributes( Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) .setLoadBalancingPolicyConfig( - new EdsConfig( - CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, weightedTarget, fakePickFirstSelection)) + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, weightedTarget, + fakePickFirstSelection)) .build()); assertThat(leafBalancer.shutdown).isTrue(); leafBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -645,27 +761,15 @@ public class EdsLoadBalancer2Test { } /** - * Instantiates the downstream LB policy subtree with real implementations, except the leaf - * policy is replaced with a fake implementation to avoid creating connections. + * Prepare the LB registry with real LB policy implementations for downstream LB policies. */ - private void prepareRealDownstreamLbPolicies(FakeLoadBalancerProvider fakeLeafPolicyProvider) { + private PolicySelection prepareRealDownstreamLbPolicies() { registry.deregister(registry.getProvider(PRIORITY_POLICY_NAME)); registry.register(new PriorityLoadBalancerProvider()); registry.deregister(registry.getProvider(LRS_POLICY_NAME)); registry.register(new LrsLoadBalancerProvider()); - PolicySelection weightedTargetSelection = - new PolicySelection(new WeightedTargetLoadBalancerProvider(), null); - PolicySelection fakeLeafPolicySelection = - new PolicySelection(fakeLeafPolicyProvider, null); - loadBalancer.handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(Collections.emptyList()) - .setAttributes( - Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) - .setLoadBalancingPolicyConfig( - new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, weightedTargetSelection, - fakeLeafPolicySelection)) - .build()); + // weighted_target LB policy is not required to be in the registry + return new PolicySelection(new WeightedTargetLoadBalancerProvider(), null); } private static void assertLrsConfig( @@ -733,7 +837,7 @@ public class EdsLoadBalancer2Test { private final class FakeXdsClient extends XdsClient { private final Map watchers = new HashMap<>(); - private final Map> dropStats = new HashMap<>(); + private final Map clusterStats = new HashMap<>(); @Override void shutdown() { @@ -752,15 +856,14 @@ public class EdsLoadBalancer2Test { @Override LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) { - ConcurrentMap dropCounters = new ConcurrentHashMap<>(); - dropStats.put(clusterServiceName, dropCounters); - return new LoadStatsStoreImpl(clusterName, clusterServiceName, - fakeClock.getStopwatchSupplier().get(), dropCounters); + FakeLoadStatsStore stats = new FakeLoadStatsStore(); + clusterStats.put(clusterServiceName, stats); + return stats; } @Override void removeClientStats(String clusterName, @Nullable String clusterServiceName) { - dropStats.remove(clusterServiceName); + clusterStats.remove(clusterServiceName); } void deliverClusterLoadAssignment( @@ -812,6 +915,41 @@ public class EdsLoadBalancer2Test { } } + private static final class FakeLoadStatsStore implements LoadStatsStore { + private final Map categorizedDrops = new HashMap<>(); + private int totalDrops; + + @Override + public ClusterStats generateLoadReport() { + throw new UnsupportedOperationException("should not be called"); + } + + @Override + public ClientLoadCounter addLocality(Locality locality) { + return new ClientLoadCounter(); + } + + @Override + public void removeLocality(Locality locality) { + // no-op + } + + @Override + public void recordDroppedRequest(String category) { + if (!categorizedDrops.containsKey(category)) { + categorizedDrops.put(category, 1L); + } else { + categorizedDrops.put(category, categorizedDrops.get(category) + 1L); + } + totalDrops++; + } + + @Override + public void recordDroppedRequest() { + totalDrops++; + } + } + private final class FakeLoadBalancerProvider extends LoadBalancerProvider { private final String policyName; diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java index ac2b61e4bc..d2f5f1a024 100644 --- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java @@ -604,6 +604,11 @@ public class LoadReportClientTest { throw new UnsupportedOperationException("should not used"); } + @Override + public void recordDroppedRequest() { + throw new UnsupportedOperationException("should not used"); + } + private void refresh() { long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long callsSucceeded = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); diff --git a/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java b/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java index 127124afc2..7de89d183d 100644 --- a/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java @@ -34,9 +34,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.junit.Before; import org.junit.Test; @@ -52,14 +49,12 @@ public class LoadStatsStoreImplTest { private static final Locality LOCALITY2 = new Locality("test_region2", "test_zone", "test_subzone"); private final FakeClock fakeClock = new FakeClock(); - private ConcurrentMap dropCounters; private LoadStatsStore loadStatsStore; @Before public void setUp() { - dropCounters = new ConcurrentHashMap<>(); Stopwatch stopwatch = fakeClock.getStopwatchSupplier().get(); - loadStatsStore = new LoadStatsStoreImpl(CLUSTER_NAME, null, stopwatch, dropCounters); + loadStatsStore = new LoadStatsStoreImpl(CLUSTER_NAME, null, stopwatch); } private static List buildEndpointLoadMetricStatsList( @@ -101,20 +96,20 @@ public class LoadStatsStoreImplTest { private static ClusterStats buildClusterStats( @Nullable List upstreamLocalityStatsList, - @Nullable List droppedRequestsList, long intervalNano) { + @Nullable List droppedRequestsList, long totalDroppedRequests, + long intervalNano) { ClusterStats.Builder clusterStatsBuilder = ClusterStats.newBuilder(); clusterStatsBuilder.setClusterName(CLUSTER_NAME); if (upstreamLocalityStatsList != null) { clusterStatsBuilder.addAllUpstreamLocalityStats(upstreamLocalityStatsList); } if (droppedRequestsList != null) { - long dropCount = 0; for (DroppedRequests drop : droppedRequestsList) { - dropCount += drop.getDroppedCount(); + totalDroppedRequests += drop.getDroppedCount(); clusterStatsBuilder.addDroppedRequests(drop); } - clusterStatsBuilder.setTotalDroppedRequests(dropCount); } + clusterStatsBuilder.setTotalDroppedRequests(totalDroppedRequests); clusterStatsBuilder.setLoadReportIntervalNanos(intervalNano); return clusterStatsBuilder.build(); } @@ -181,7 +176,7 @@ public class LoadStatsStoreImplTest { } @Test - public void loadReportContainsRecordedStats() { + public void recordCallAndMetricStats() { ClientLoadCounter counter1 = loadStatsStore.addLocality(LOCALITY1); counter1.setCallsSucceeded(4315); counter1.setCallsInProgress(3421); @@ -218,7 +213,7 @@ public class LoadStatsStoreImplTest { buildUpstreamLocalityStats(LOCALITY2, 41234, 432, 431, 702, buildEndpointLoadMetricStatsList(metrics2)) ), - null, 1000L); + null, 0L, 1000L); assertClusterStatsEqual(expectedReport, loadStatsStore.generateLoadReport()); fakeClock.forwardNanos(2000L); @@ -228,31 +223,39 @@ public class LoadStatsStoreImplTest { buildUpstreamLocalityStats(LOCALITY1, 0, 3421, 0, 0, null), buildUpstreamLocalityStats(LOCALITY2, 0, 432, 0, 0, null) ), - null, 2000L); + null, 0L, 2000L); assertClusterStatsEqual(expectedReport, loadStatsStore.generateLoadReport()); } @Test - public void recordingDroppedRequests() { + public void recordDroppedRequests() { int numLbDrop = 123; int numThrottleDrop = 456; + int uncategorizedDrop = 789; for (int i = 0; i < numLbDrop; i++) { loadStatsStore.recordDroppedRequest("lb"); } for (int i = 0; i < numThrottleDrop; i++) { loadStatsStore.recordDroppedRequest("throttle"); } - assertThat(dropCounters.get("lb").get()).isEqualTo(numLbDrop); - assertThat(dropCounters.get("throttle").get()).isEqualTo(numThrottleDrop); + for (int i = 0; i < uncategorizedDrop; i++) { + loadStatsStore.recordDroppedRequest(); + } fakeClock.forwardNanos(1000L); ClusterStats expectedLoadReport = buildClusterStats(null, Arrays.asList(buildDroppedRequests("lb", numLbDrop), buildDroppedRequests("throttle", numThrottleDrop)), - 1000L); + 789L, 1000L); + assertClusterStatsEqual(expectedLoadReport, loadStatsStore.generateLoadReport()); + + fakeClock.forwardNanos(1000L); + expectedLoadReport = + buildClusterStats(null, + Arrays.asList(buildDroppedRequests("lb", 0L), + buildDroppedRequests("throttle", 0L)), + 0L, 1000L); assertClusterStatsEqual(expectedLoadReport, loadStatsStore.generateLoadReport()); - assertThat(dropCounters.get("lb").get()).isEqualTo(0); - assertThat(dropCounters.get("throttle").get()).isEqualTo(0); } } diff --git a/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java index c1d87d54d0..415477892c 100644 --- a/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java @@ -325,5 +325,10 @@ public class LrsLoadBalancerTest { public void recordDroppedRequest(String category) { throw new UnsupportedOperationException("should not be called"); } + + @Override + public void recordDroppedRequest() { + throw new UnsupportedOperationException("should not be called"); + } } }