diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java index 545e574de2..03f4cd804e 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java @@ -218,6 +218,7 @@ final class CdsLoadBalancer extends LoadBalancer { /* clusterName = */ newUpdate.getClusterName(), /* edsServiceName = */ newUpdate.getEdsServiceName(), /* lrsServerName = */ newUpdate.getLrsServerName(), + /* maxConcurrentRequests = */ newUpdate.getMaxConcurrentRequests(), new PolicySelection(localityPickingPolicyProvider, null /* by EDS policy */), new PolicySelection(endpointPickingPolicyProvider, null)); if (isXdsSecurityEnabled()) { diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 8ddbff0d55..e7b00f684c 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -25,11 +25,13 @@ import com.google.common.base.Supplier; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions; +import io.envoyproxy.envoy.config.core.v3.RoutingPriority; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; import io.envoyproxy.envoy.config.listener.v3.Listener; @@ -334,6 +336,17 @@ final class ClientXdsClient extends AbstractXdsClient { } updateBuilder.setLrsServerName(""); } + if (cluster.hasCircuitBreakers()) { + List thresholds = cluster.getCircuitBreakers().getThresholdsList(); + for (Thresholds threshold : thresholds) { + if (threshold.getPriority() != RoutingPriority.DEFAULT) { + continue; + } + if (threshold.hasMaxRequests()) { + updateBuilder.setMaxConcurrentRequests(threshold.getMaxRequests().getValue()); + } + } + } try { EnvoyServerProtoData.UpstreamTlsContext upstreamTlsContext = getTlsContextFromCluster(cluster); diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java index 4ebd11f731..bfbeb34314 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java @@ -23,16 +23,20 @@ import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; +import io.grpc.ClientStreamTracer; +import io.grpc.ClientStreamTracer.StreamInfo; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.util.ForwardingClientStreamTracer; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; @@ -57,9 +61,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; final class EdsLoadBalancer2 extends LoadBalancer { + @VisibleForTesting + static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L; private final XdsLogger logger; private final SynchronizationContext syncContext; private final LoadBalancerRegistry lbRegistry; @@ -96,8 +103,10 @@ final class EdsLoadBalancer2 extends LoadBalancer { EdsConfig config = (EdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); if (logger.isLoggable(XdsLogLevel.INFO)) { logger.log(XdsLogLevel.INFO, "Received EDS lb config: cluster={0}, " - + "eds_service_name={1}, endpoint_picking_policy={2}, report_load={3}", - config.clusterName, config.edsServiceName, + + "eds_service_name={1}, max_concurrent_requests={2}, locality_picking_policy={3}, " + + "endpoint_picking_policy={4}, report_load={5}", + config.clusterName, config.edsServiceName, config.maxConcurrentRequests, + config.localityPickingPolicy.getProvider().getPolicyName(), config.endpointPickingPolicy.getProvider().getPolicyName(), config.lrsServerName != null); } @@ -150,9 +159,10 @@ final class EdsLoadBalancer2 extends LoadBalancer { } private final class ChildLbState extends LoadBalancer implements EdsResourceWatcher { + private final AtomicLong requestCount = new AtomicLong(); @Nullable private final LoadStatsStore loadStatsStore; - private final DropHandlingLbHelper lbHelper; + private final RequestLimitingLbHelper lbHelper; private List endpointAddresses = Collections.emptyList(); private Map> prioritizedLocalityWeights = Collections.emptyMap(); @@ -169,7 +179,7 @@ final class EdsLoadBalancer2 extends LoadBalancer { } else { loadStatsStore = null; } - lbHelper = new DropHandlingLbHelper(helper); + lbHelper = new RequestLimitingLbHelper(helper); logger.log( XdsLogLevel.INFO, "Start endpoint watcher on {0} with xDS client {1}", resourceName, xdsClient); @@ -180,6 +190,9 @@ final class EdsLoadBalancer2 extends LoadBalancer { public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { this.resolvedAddresses = resolvedAddresses; EdsConfig config = (EdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + if (config.maxConcurrentRequests != null) { + lbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests); + } if (lb != null) { if (!config.localityPickingPolicy.equals(localityPickingPolicy) || !config.endpointPickingPolicy.equals(endpointPickingPolicy)) { @@ -355,38 +368,20 @@ final class EdsLoadBalancer2 extends LoadBalancer { lbHelper.helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); } - private final class DropHandlingLbHelper extends ForwardingLoadBalancerHelper { + private final class RequestLimitingLbHelper extends ForwardingLoadBalancerHelper { private final Helper helper; private List dropPolicies = Collections.emptyList(); + private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; - private DropHandlingLbHelper(Helper helper) { + private RequestLimitingLbHelper(Helper helper) { this.helper = helper; } @Override public void updateBalancingState( ConnectivityState newState, final SubchannelPicker newPicker) { - SubchannelPicker picker = new SubchannelPicker() { - List dropOverloads = dropPolicies; - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - for (DropOverload dropOverload : dropOverloads) { - int rand = random.nextInt(1_000_000); - if (rand < dropOverload.getDropsPerMillion()) { - logger.log( - XdsLogLevel.INFO, - "Drop request with category: {0}", dropOverload.getCategory()); - if (loadStatsStore != null) { - loadStatsStore.recordDroppedRequest(dropOverload.getCategory()); - } - return PickResult.withDrop( - Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.getCategory())); - } - } - return newPicker.pickSubchannel(args); - } - }; + SubchannelPicker picker = new RequestLimitingSubchannelPicker( + newPicker, dropPolicies, maxConcurrentRequests); delegate().updateBalancingState(newState, picker); } @@ -398,10 +393,100 @@ final class EdsLoadBalancer2 extends LoadBalancer { private void updateDropPolicies(List dropOverloads) { dropPolicies = dropOverloads; } + + private void updateMaxConcurrentRequests(long maxConcurrentRequests) { + this.maxConcurrentRequests = maxConcurrentRequests; + } + + private final class RequestLimitingSubchannelPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final List dropPolicies; + private final long maxConcurrentRequests; + + private RequestLimitingSubchannelPicker(SubchannelPicker delegate, + List dropPolicies, long maxConcurrentRequests) { + this.delegate = delegate; + this.dropPolicies = dropPolicies; + this.maxConcurrentRequests = maxConcurrentRequests; + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + for (DropOverload dropOverload : dropPolicies) { + int rand = random.nextInt(1_000_000); + if (rand < dropOverload.getDropsPerMillion()) { + logger.log(XdsLogLevel.INFO, "Drop request with category: {0}", + dropOverload.getCategory()); + if (loadStatsStore != null) { + loadStatsStore.recordDroppedRequest(dropOverload.getCategory()); + } + return PickResult.withDrop( + Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.getCategory())); + } + } + PickResult result = delegate.pickSubchannel(args); + if (result.getStatus().isOk() && result.getSubchannel() != null) { + if (requestCount.get() >= maxConcurrentRequests) { + if (loadStatsStore != null) { + loadStatsStore.recordDroppedRequest(); + } + return PickResult.withDrop(Status.UNAVAILABLE.withDescription( + "Cluster max concurrent requests limit exceeded")); + } else { + ClientStreamTracer.Factory tracerFactory = new RequestCountingStreamTracerFactory( + result.getStreamTracerFactory(), requestCount); + return PickResult.withSubchannel(result.getSubchannel(), tracerFactory); + } + } + return result; + } + } } } } + /** + * Counts the number of outstanding requests. + */ + private static final class RequestCountingStreamTracerFactory + extends ClientStreamTracer.Factory { + @Nullable + private final ClientStreamTracer.Factory delegate; + private final AtomicLong counter; + + private RequestCountingStreamTracerFactory(@Nullable ClientStreamTracer.Factory delegate, + AtomicLong counter) { + this.delegate = delegate; + this.counter = counter; + } + + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + counter.incrementAndGet(); + if (delegate == null) { + return new ClientStreamTracer() { + @Override + public void streamClosed(Status status) { + counter.decrementAndGet(); + } + }; + } + final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers); + return new ForwardingClientStreamTracer() { + @Override + protected ClientStreamTracer delegate() { + return delegatedTracer; + } + + @Override + public void streamClosed(Status status) { + counter.decrementAndGet(); + delegate().streamClosed(status); + } + }; + } + } + @VisibleForTesting static PriorityLbConfig generatePriorityLbConfig( String cluster, String edsServiceName, String lrsServerName, diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java index 3130c42b70..d1d9ef4a5b 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancerProvider.java @@ -68,6 +68,8 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider { final String edsServiceName; @Nullable final String lrsServerName; + @Nullable + final Long maxConcurrentRequests; final PolicySelection localityPickingPolicy; final PolicySelection endpointPickingPolicy; @@ -75,11 +77,13 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider { String clusterName, @Nullable String edsServiceName, @Nullable String lrsServerName, + @Nullable Long maxConcurrentRequests, PolicySelection localityPickingPolicy, PolicySelection endpointPickingPolicy) { this.clusterName = checkNotNull(clusterName, "clusterName"); this.edsServiceName = edsServiceName; this.lrsServerName = lrsServerName; + this.maxConcurrentRequests = maxConcurrentRequests; this.localityPickingPolicy = checkNotNull(localityPickingPolicy, "localityPickingPolicy"); this.endpointPickingPolicy = checkNotNull(endpointPickingPolicy, "endpointPickingPolicy"); } @@ -90,6 +94,7 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider { .add("clusterName", clusterName) .add("edsServiceName", edsServiceName) .add("lrsServerName", lrsServerName) + .add("maxConcurrentRequests", maxConcurrentRequests) .add("localityPickingPolicy", localityPickingPolicy) .add("endpointPickingPolicy", endpointPickingPolicy) .toString(); diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsManager.java b/xds/src/main/java/io/grpc/xds/LoadStatsManager.java index 0bf26af076..ae58d80f44 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsManager.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsManager.java @@ -153,10 +153,17 @@ final class LoadStatsManager { void removeLocality(Locality locality); /** - * Records a drop decision. + * Records a drop decision with the given category. * *

This method must be thread-safe. */ void recordDroppedRequest(String category); + + /** + * Records a uncategorized drop decision. + * + *

This method must be thread-safe. + */ + void recordDroppedRequest(); } } diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java b/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java index 977af60ebb..0fcb10e7ba 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java @@ -55,25 +55,21 @@ final class LoadStatsStoreImpl implements LoadStatsStore { @GuardedBy("this") private final Map> localityLoadCounters = new HashMap<>(); + private final AtomicLong uncategorizedDrops = new AtomicLong(); // Cluster level dropped request counts for each category decision. - private final ConcurrentMap dropCounters; + private final ConcurrentMap dropCounters = new ConcurrentHashMap<>(); private final Stopwatch stopwatch; LoadStatsStoreImpl(String clusterName, @Nullable String clusterServiceName) { - this(clusterName, clusterServiceName, GrpcUtil.STOPWATCH_SUPPLIER.get(), - new ConcurrentHashMap()); + this(clusterName, clusterServiceName, GrpcUtil.STOPWATCH_SUPPLIER.get()); } @VisibleForTesting - LoadStatsStoreImpl( - String clusterName, - @Nullable String clusterServiceName, - Stopwatch stopwatch, - ConcurrentMap dropCounters) { + LoadStatsStoreImpl(String clusterName, @Nullable String clusterServiceName, + Stopwatch stopwatch) { this.clusterName = checkNotNull(clusterName, "clusterName"); this.clusterServiceName = clusterServiceName; this.stopwatch = checkNotNull(stopwatch, "stopwatch"); - this.dropCounters = checkNotNull(dropCounters, "dropCounters"); stopwatch.reset().start(); } @@ -109,11 +105,11 @@ final class LoadStatsStoreImpl implements LoadStatsStore { } } localityLoadCounters.keySet().removeAll(untrackedLocalities); - long totalDrops = 0; + long totalDrops = uncategorizedDrops.getAndSet(0); for (Map.Entry entry : dropCounters.entrySet()) { long drops = entry.getValue().getAndSet(0); totalDrops += drops; - statsBuilder.addDroppedRequests(new DroppedRequests(entry.getKey(),drops)); + statsBuilder.addDroppedRequests(new DroppedRequests(entry.getKey(), drops)); } statsBuilder.setTotalDroppedRequests(totalDrops); statsBuilder.setLoadReportIntervalNanos(stopwatch.elapsed(NANOSECONDS)); @@ -150,6 +146,11 @@ final class LoadStatsStoreImpl implements LoadStatsStore { counter.getAndIncrement(); } + @Override + public void recordDroppedRequest() { + uncategorizedDrops.getAndIncrement(); + } + static LoadStatsStoreFactory getDefaultFactory() { return new LoadStatsStoreFactory() { @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 93707edc81..c4fe483270 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -199,6 +199,8 @@ abstract class XdsClient { private final String lbPolicy; @Nullable private final String lrsServerName; + @Nullable + private final Long maxConcurrentRequests; private final UpstreamTlsContext upstreamTlsContext; private CdsUpdate( @@ -206,11 +208,13 @@ abstract class XdsClient { @Nullable String edsServiceName, String lbPolicy, @Nullable String lrsServerName, + @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { this.clusterName = clusterName; this.edsServiceName = edsServiceName; this.lbPolicy = lbPolicy; this.lrsServerName = lrsServerName; + this.maxConcurrentRequests = maxConcurrentRequests; this.upstreamTlsContext = upstreamTlsContext; } @@ -243,6 +247,15 @@ abstract class XdsClient { return lrsServerName; } + /** + * Returns the maximum number of outstanding requests can be made to the upstream cluster, or + * {@code null} if not configured. + */ + @Nullable + Long getMaxConcurrentRequests() { + return maxConcurrentRequests; + } + /** Returns the {@link UpstreamTlsContext} for this cluster if present, else null. */ @Nullable UpstreamTlsContext getUpstreamTlsContext() { @@ -258,14 +271,15 @@ abstract class XdsClient { .add("edsServiceName", edsServiceName) .add("lbPolicy", lbPolicy) .add("lrsServerName", lrsServerName) + .add("maxConcurrentRequests", maxConcurrentRequests) .add("upstreamTlsContext", upstreamTlsContext) .toString(); } @Override public int hashCode() { - return Objects.hash( - clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext); + return Objects.hash(clusterName, edsServiceName, lbPolicy, lrsServerName, + maxConcurrentRequests, upstreamTlsContext); } @Override @@ -281,6 +295,7 @@ abstract class XdsClient { && Objects.equals(edsServiceName, that.edsServiceName) && Objects.equals(lbPolicy, that.lbPolicy) && Objects.equals(lrsServerName, that.lrsServerName) + && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) && Objects.equals(upstreamTlsContext, that.upstreamTlsContext); } @@ -296,6 +311,8 @@ abstract class XdsClient { @Nullable private String lrsServerName; @Nullable + private Long maxConcurrentRequests; + @Nullable private UpstreamTlsContext upstreamTlsContext; private Builder() { @@ -321,6 +338,11 @@ abstract class XdsClient { return this; } + Builder setMaxConcurrentRequests(long maxConcurrentRequests) { + this.maxConcurrentRequests = maxConcurrentRequests; + return this; + } + Builder setUpstreamTlsContext(UpstreamTlsContext upstreamTlsContext) { this.upstreamTlsContext = upstreamTlsContext; return this; @@ -330,9 +352,8 @@ abstract class XdsClient { checkState(clusterName != null, "clusterName is not set"); checkState(lbPolicy != null, "lbPolicy is not set"); - return - new CdsUpdate( - clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext); + return new CdsUpdate(clusterName, edsServiceName, lbPolicy, lrsServerName, + maxConcurrentRequests, upstreamTlsContext); } } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java index 4adc6f4400..880ee47d34 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java @@ -127,7 +127,7 @@ public class CdsLoadBalancerTest { @Test public void receiveFirstClusterResourceInfo() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.EDS_POLICY_NAME); @@ -136,6 +136,7 @@ public class CdsLoadBalancerTest { assertThat(edsConfig.clusterName).isEqualTo(CLUSTER); assertThat(edsConfig.edsServiceName).isNull(); assertThat(edsConfig.lrsServerName).isNull(); + assertThat(edsConfig.maxConcurrentRequests).isNull(); assertThat(edsConfig.localityPickingPolicy.getProvider().getPolicyName()) .isEqualTo(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded to weighted-target assertThat(edsConfig.endpointPickingPolicy.getProvider().getPolicyName()) @@ -155,7 +156,7 @@ public class CdsLoadBalancerTest { @Test public void clusterResourceRemoved() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.shutdown).isFalse(); @@ -171,7 +172,7 @@ public class CdsLoadBalancerTest { @Test public void clusterResourceUpdated() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); EdsConfig edsConfig = (EdsConfig) childBalancer.config; assertThat(edsConfig.clusterName).isEqualTo(CLUSTER); @@ -184,12 +185,14 @@ public class CdsLoadBalancerTest { String edsService = "service-bar.googleapis.com"; String loadReportServer = "lrs-server.googleapis.com"; - xdsClient.deliverClusterInfo(edsService, loadReportServer); + long maxConcurrentRequests = 50L; + xdsClient.deliverClusterInfo(edsService, loadReportServer, maxConcurrentRequests, null); assertThat(childBalancers).containsExactly(childBalancer); edsConfig = (EdsConfig) childBalancer.config; assertThat(edsConfig.clusterName).isEqualTo(CLUSTER); assertThat(edsConfig.edsServiceName).isEqualTo(edsService); assertThat(edsConfig.lrsServerName).isEqualTo(loadReportServer); + assertThat(edsConfig.maxConcurrentRequests).isEqualTo(maxConcurrentRequests); assertThat(edsConfig.localityPickingPolicy.getProvider().getPolicyName()) .isEqualTo(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded to weighted-target assertThat(edsConfig.endpointPickingPolicy.getProvider().getPolicyName()) @@ -218,7 +221,7 @@ public class CdsLoadBalancerTest { assertThat(supplier.getUpstreamTlsContext()).isEqualTo(upstreamTlsContext); } - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); subchannel = childBalancer.helper.createSubchannel(args); for (EquivalentAddressGroup eag : subchannel.getAllAddresses()) { assertThat(eag.getAttributes().get(XdsAttributes.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER)) @@ -241,7 +244,7 @@ public class CdsLoadBalancerTest { @Test public void subchannelStatePropagateFromDownstreamToUpstream() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); List addresses = createEndpointAddresses(2); CreateSubchannelArgs args = @@ -267,7 +270,7 @@ public class CdsLoadBalancerTest { @Test public void clusterDiscoveryError_afterChildPolicyInstantiated_keepUsingCurrentCluster() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); xdsClient.deliverError(Status.UNAVAILABLE.withDescription("unreachable")); assertThat(currentState).isNull(); @@ -288,7 +291,7 @@ public class CdsLoadBalancerTest { @Test public void nameResolutionError_afterChildPolicyInstantiated_propagateToDownstream() { - xdsClient.deliverClusterInfo(null, null); + xdsClient.deliverClusterInfo(null, null, null); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); loadBalancer.handleNameResolutionError( Status.UNAVAILABLE.withDescription("cannot reach server")); @@ -327,35 +330,46 @@ public class CdsLoadBalancerTest { } void deliverClusterInfo( - @Nullable final String edsServiceName, @Nullable final String lrsServerName) { + @Nullable final String edsServiceName, @Nullable final String lrsServerName, + final long maxConcurrentRequests, @Nullable final UpstreamTlsContext tlsContext) { syncContext.execute(new Runnable() { @Override public void run() { - watcher.onChanged( - CdsUpdate.newBuilder() - .setClusterName(CLUSTER) - .setEdsServiceName(edsServiceName) - .setLbPolicy("round_robin") // only supported policy - .setLrsServerName(lrsServerName) - .build()); + CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder().setClusterName(CLUSTER); + if (edsServiceName != null) { + updateBuilder.setEdsServiceName(edsServiceName); + } + if (lrsServerName != null) { + updateBuilder.setLrsServerName(lrsServerName); + } + if (tlsContext != null) { + updateBuilder.setUpstreamTlsContext(tlsContext); + } + updateBuilder.setLbPolicy("round_robin"); // only supported policy + updateBuilder.setMaxConcurrentRequests(maxConcurrentRequests); + watcher.onChanged(updateBuilder.build()); } }); } void deliverClusterInfo( @Nullable final String edsServiceName, @Nullable final String lrsServerName, - final UpstreamTlsContext tlsContext) { + @Nullable final UpstreamTlsContext tlsContext) { syncContext.execute(new Runnable() { @Override public void run() { - watcher.onChanged( - CdsUpdate.newBuilder() - .setClusterName(CLUSTER) - .setEdsServiceName(edsServiceName) - .setLbPolicy("round_robin") // only supported policy - .setLrsServerName(lrsServerName) - .setUpstreamTlsContext(tlsContext) - .build()); + CdsUpdate.Builder updateBuilder = CdsUpdate.newBuilder().setClusterName(CLUSTER); + if (edsServiceName != null) { + updateBuilder.setEdsServiceName(edsServiceName); + } + if (lrsServerName != null) { + updateBuilder.setLrsServerName(lrsServerName); + } + if (tlsContext != null) { + updateBuilder.setUpstreamTlsContext(tlsContext); + } + updateBuilder.setLbPolicy("round_robin"); // only supported policy + watcher.onChanged(updateBuilder.build()); } }); } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java index d8430245c5..6804f9bf39 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTest.java @@ -42,10 +42,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; +import com.google.protobuf.UInt32Value; import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; +import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; import io.envoyproxy.envoy.config.core.v3.ConfigSource; import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.core.v3.RoutingPriority; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; @@ -735,9 +740,42 @@ public class ClientXdsClientTest { assertThat(cdsUpdate.getEdsServiceName()).isNull(); assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.getLrsServerName()).isNull(); + assertThat(cdsUpdate.getMaxConcurrentRequests()).isNull(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } + @Test + public void cdsResponseWithCircuitBreakers() { + RpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + Cluster cluster = buildCluster(CDS_RESOURCE, null, false); + cluster = cluster.toBuilder() + .setCircuitBreakers( + CircuitBreakers.newBuilder() + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.HIGH) + .setMaxRequests(UInt32Value.newBuilder().setValue(50))) + .addThresholds( + Thresholds.newBuilder() + .setPriority(RoutingPriority.DEFAULT) + .setMaxRequests(UInt32Value.newBuilder().setValue(200)))) + .build(); + DiscoveryResponse response = buildDiscoveryResponse("0", + Collections.singletonList(Any.pack(cluster)), ResourceType.CDS.typeUrl(), "0000"); + call.responseObserver.onNext(response); + + verify(call.requestObserver).onNext( + eq(buildDiscoveryRequest(NODE, "0", CDS_RESOURCE, ResourceType.CDS.typeUrl(), "0000"))); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.getClusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.getEdsServiceName()).isNull(); + assertThat(cdsUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.getLrsServerName()).isNull(); + assertThat(cdsUpdate.getMaxConcurrentRequests()).isEqualTo(200L); + } + /** * CDS response containing UpstreamTlsContext for a cluster. */ diff --git a/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java index 8ceb5f7ded..85adbeb243 100644 --- a/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/EdsLoadBalancer2Test.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import io.grpc.Attributes; +import io.grpc.ClientStreamTracer; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; @@ -40,6 +41,7 @@ import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; +import io.grpc.Metadata; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.Status.Code; @@ -48,6 +50,7 @@ import io.grpc.internal.FakeClock; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig; +import io.grpc.xds.EnvoyProtoData.ClusterStats; import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.LbEndpoint; import io.grpc.xds.EnvoyProtoData.Locality; @@ -65,10 +68,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.junit.After; @@ -145,7 +145,7 @@ public class EdsLoadBalancer2Test { Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) .setLoadBalancingPolicyConfig( new EdsConfig( - CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, weightedTarget, roundRobin)) + CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, weightedTarget, roundRobin)) .build()); } @@ -153,7 +153,7 @@ public class EdsLoadBalancer2Test { public void tearDown() { loadBalancer.shutdown(); assertThat(xdsClient.watchers).isEmpty(); - assertThat(xdsClient.dropStats).isEmpty(); + assertThat(xdsClient.clusterStats).isEmpty(); assertThat(xdsClientRefs).isEqualTo(0); assertThat(downstreamBalancers).isEmpty(); } @@ -430,7 +430,17 @@ public class EdsLoadBalancer2Test { @Test public void handleDrops() { FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin"); - prepareRealDownstreamLbPolicies(fakeRoundRobinProvider); + PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null); + PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies(); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes( + Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) + .setLoadBalancingPolicyConfig( + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, + weightedTargetSelection, fakeRoundRobinSelection)) + .build()); when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 1_000_000); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); LocalityLbEndpoints localityLbEndpoints1 = @@ -452,13 +462,109 @@ public class EdsLoadBalancer2Test { assertThat(result.getStatus().isOk()).isFalse(); assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(result.getStatus().getDescription()).isEqualTo("Dropped: throttle"); - assertThat(xdsClient.dropStats.get(EDS_SERVICE_NAME).get("throttle").get()).isEqualTo(1); + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).categorizedDrops.get("throttle")) + .isEqualTo(1); result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); } + @Test + public void maxConcurrentRequests_appliedByLbConfig() { + long maxConcurrentRequests = 100L; + FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin"); + PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null); + PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies(); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes( + Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) + .setLoadBalancingPolicyConfig( + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, maxConcurrentRequests, + weightedTargetSelection, fakeRoundRobinSelection)) + .build()); + EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); + LocalityLbEndpoints localityLbEndpoints1 = + buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true)); + xdsClient.deliverClusterLoadAssignment( + EDS_SERVICE_NAME, Collections.singletonMap(locality1, localityLbEndpoints1)); + assertThat(downstreamBalancers).hasSize(1); // one leaf balancer + FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); + assertThat(leafBalancer.name).isEqualTo("round_robin"); + assertAddressesEqual(Collections.singletonList(makeAddress("endpoint-addr-1")), + leafBalancer.addresses); + Subchannel subchannel = leafBalancer.helper.createSubchannel( + CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); + leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + assertThat(currentState).isEqualTo(ConnectivityState.READY); + for (int i = 0; i < maxConcurrentRequests; i++) { + PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); + assertThat(result.getStreamTracerFactory()).isNotNull(); + ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory(); + streamTracerFactory.newClientStreamTracer(ClientStreamTracer.StreamInfo.newBuilder().build(), + new Metadata()); + } + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(0L); + + PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("Cluster max concurrent requests limit exceeded"); + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L); + } + + @Test + public void maxConcurrentRequests_appliedWithDefaultValue() { + FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin"); + PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null); + PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies(); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes( + Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) + .setLoadBalancingPolicyConfig( + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, + weightedTargetSelection, fakeRoundRobinSelection)) + .build()); + EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); + LocalityLbEndpoints localityLbEndpoints1 = + buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true)); + xdsClient.deliverClusterLoadAssignment( + EDS_SERVICE_NAME, Collections.singletonMap(locality1, localityLbEndpoints1)); + assertThat(downstreamBalancers).hasSize(1); // one leaf balancer + FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); + assertThat(leafBalancer.name).isEqualTo("round_robin"); + assertAddressesEqual(Collections.singletonList(makeAddress("endpoint-addr-1")), + leafBalancer.addresses); + Subchannel subchannel = leafBalancer.helper.createSubchannel( + CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); + leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + assertThat(currentState).isEqualTo(ConnectivityState.READY); + for (int i = 0; i < EdsLoadBalancer2.DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; i++) { + PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isTrue(); + assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); + assertThat(result.getStreamTracerFactory()).isNotNull(); + ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory(); + streamTracerFactory.newClientStreamTracer(ClientStreamTracer.StreamInfo.newBuilder().build(), + new Metadata()); + } + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(0L); + + PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().isOk()).isFalse(); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .isEqualTo("Cluster max concurrent requests limit exceeded"); + assertThat(xdsClient.clusterStats.get(EDS_SERVICE_NAME).totalDrops).isEqualTo(1L); + } + @Test public void configUpdate_changeEdsServiceName_afterChildPolicyReady_switchGracefully() { deliverSimpleClusterLoadAssignment(EDS_SERVICE_NAME); // downstream LB polices instantiated @@ -476,8 +582,8 @@ public class EdsLoadBalancer2Test { .setAttributes( Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) .setLoadBalancingPolicyConfig( - new EdsConfig( - CLUSTER, newEdsServiceName, LRS_SERVER_NAME, weightedTarget, roundRobin)) + new EdsConfig(CLUSTER, newEdsServiceName, LRS_SERVER_NAME, null, weightedTarget, + roundRobin)) .build()); deliverSimpleClusterLoadAssignment(newEdsServiceName); // instantiate the new subtree assertThat(downstreamBalancers).hasSize(2); @@ -495,7 +601,17 @@ public class EdsLoadBalancer2Test { @Test public void configUpdate_changeEndpointPickingPolicy() { FakeLoadBalancerProvider fakeRoundRobinProvider = new FakeLoadBalancerProvider("round_robin"); - prepareRealDownstreamLbPolicies(fakeRoundRobinProvider); + PolicySelection fakeRoundRobinSelection = new PolicySelection(fakeRoundRobinProvider, null); + PolicySelection weightedTargetSelection = prepareRealDownstreamLbPolicies(); + loadBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes( + Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) + .setLoadBalancingPolicyConfig( + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, + weightedTargetSelection, fakeRoundRobinSelection)) + .build()); deliverSimpleClusterLoadAssignment(EDS_SERVICE_NAME); // downstream LB policies instantiated FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); assertThat(leafBalancer.name).isEqualTo("round_robin"); @@ -507,8 +623,8 @@ public class EdsLoadBalancer2Test { .setAttributes( Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) .setLoadBalancingPolicyConfig( - new EdsConfig( - CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, weightedTarget, fakePickFirstSelection)) + new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, null, weightedTarget, + fakePickFirstSelection)) .build()); assertThat(leafBalancer.shutdown).isTrue(); leafBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -645,27 +761,15 @@ public class EdsLoadBalancer2Test { } /** - * Instantiates the downstream LB policy subtree with real implementations, except the leaf - * policy is replaced with a fake implementation to avoid creating connections. + * Prepare the LB registry with real LB policy implementations for downstream LB policies. */ - private void prepareRealDownstreamLbPolicies(FakeLoadBalancerProvider fakeLeafPolicyProvider) { + private PolicySelection prepareRealDownstreamLbPolicies() { registry.deregister(registry.getProvider(PRIORITY_POLICY_NAME)); registry.register(new PriorityLoadBalancerProvider()); registry.deregister(registry.getProvider(LRS_POLICY_NAME)); registry.register(new LrsLoadBalancerProvider()); - PolicySelection weightedTargetSelection = - new PolicySelection(new WeightedTargetLoadBalancerProvider(), null); - PolicySelection fakeLeafPolicySelection = - new PolicySelection(fakeLeafPolicyProvider, null); - loadBalancer.handleResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(Collections.emptyList()) - .setAttributes( - Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build()) - .setLoadBalancingPolicyConfig( - new EdsConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, weightedTargetSelection, - fakeLeafPolicySelection)) - .build()); + // weighted_target LB policy is not required to be in the registry + return new PolicySelection(new WeightedTargetLoadBalancerProvider(), null); } private static void assertLrsConfig( @@ -733,7 +837,7 @@ public class EdsLoadBalancer2Test { private final class FakeXdsClient extends XdsClient { private final Map watchers = new HashMap<>(); - private final Map> dropStats = new HashMap<>(); + private final Map clusterStats = new HashMap<>(); @Override void shutdown() { @@ -752,15 +856,14 @@ public class EdsLoadBalancer2Test { @Override LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) { - ConcurrentMap dropCounters = new ConcurrentHashMap<>(); - dropStats.put(clusterServiceName, dropCounters); - return new LoadStatsStoreImpl(clusterName, clusterServiceName, - fakeClock.getStopwatchSupplier().get(), dropCounters); + FakeLoadStatsStore stats = new FakeLoadStatsStore(); + clusterStats.put(clusterServiceName, stats); + return stats; } @Override void removeClientStats(String clusterName, @Nullable String clusterServiceName) { - dropStats.remove(clusterServiceName); + clusterStats.remove(clusterServiceName); } void deliverClusterLoadAssignment( @@ -812,6 +915,41 @@ public class EdsLoadBalancer2Test { } } + private static final class FakeLoadStatsStore implements LoadStatsStore { + private final Map categorizedDrops = new HashMap<>(); + private int totalDrops; + + @Override + public ClusterStats generateLoadReport() { + throw new UnsupportedOperationException("should not be called"); + } + + @Override + public ClientLoadCounter addLocality(Locality locality) { + return new ClientLoadCounter(); + } + + @Override + public void removeLocality(Locality locality) { + // no-op + } + + @Override + public void recordDroppedRequest(String category) { + if (!categorizedDrops.containsKey(category)) { + categorizedDrops.put(category, 1L); + } else { + categorizedDrops.put(category, categorizedDrops.get(category) + 1L); + } + totalDrops++; + } + + @Override + public void recordDroppedRequest() { + totalDrops++; + } + } + private final class FakeLoadBalancerProvider extends LoadBalancerProvider { private final String policyName; diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java index ac2b61e4bc..d2f5f1a024 100644 --- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java @@ -604,6 +604,11 @@ public class LoadReportClientTest { throw new UnsupportedOperationException("should not used"); } + @Override + public void recordDroppedRequest() { + throw new UnsupportedOperationException("should not used"); + } + private void refresh() { long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long callsSucceeded = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); diff --git a/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java b/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java index 127124afc2..7de89d183d 100644 --- a/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java @@ -34,9 +34,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.junit.Before; import org.junit.Test; @@ -52,14 +49,12 @@ public class LoadStatsStoreImplTest { private static final Locality LOCALITY2 = new Locality("test_region2", "test_zone", "test_subzone"); private final FakeClock fakeClock = new FakeClock(); - private ConcurrentMap dropCounters; private LoadStatsStore loadStatsStore; @Before public void setUp() { - dropCounters = new ConcurrentHashMap<>(); Stopwatch stopwatch = fakeClock.getStopwatchSupplier().get(); - loadStatsStore = new LoadStatsStoreImpl(CLUSTER_NAME, null, stopwatch, dropCounters); + loadStatsStore = new LoadStatsStoreImpl(CLUSTER_NAME, null, stopwatch); } private static List buildEndpointLoadMetricStatsList( @@ -101,20 +96,20 @@ public class LoadStatsStoreImplTest { private static ClusterStats buildClusterStats( @Nullable List upstreamLocalityStatsList, - @Nullable List droppedRequestsList, long intervalNano) { + @Nullable List droppedRequestsList, long totalDroppedRequests, + long intervalNano) { ClusterStats.Builder clusterStatsBuilder = ClusterStats.newBuilder(); clusterStatsBuilder.setClusterName(CLUSTER_NAME); if (upstreamLocalityStatsList != null) { clusterStatsBuilder.addAllUpstreamLocalityStats(upstreamLocalityStatsList); } if (droppedRequestsList != null) { - long dropCount = 0; for (DroppedRequests drop : droppedRequestsList) { - dropCount += drop.getDroppedCount(); + totalDroppedRequests += drop.getDroppedCount(); clusterStatsBuilder.addDroppedRequests(drop); } - clusterStatsBuilder.setTotalDroppedRequests(dropCount); } + clusterStatsBuilder.setTotalDroppedRequests(totalDroppedRequests); clusterStatsBuilder.setLoadReportIntervalNanos(intervalNano); return clusterStatsBuilder.build(); } @@ -181,7 +176,7 @@ public class LoadStatsStoreImplTest { } @Test - public void loadReportContainsRecordedStats() { + public void recordCallAndMetricStats() { ClientLoadCounter counter1 = loadStatsStore.addLocality(LOCALITY1); counter1.setCallsSucceeded(4315); counter1.setCallsInProgress(3421); @@ -218,7 +213,7 @@ public class LoadStatsStoreImplTest { buildUpstreamLocalityStats(LOCALITY2, 41234, 432, 431, 702, buildEndpointLoadMetricStatsList(metrics2)) ), - null, 1000L); + null, 0L, 1000L); assertClusterStatsEqual(expectedReport, loadStatsStore.generateLoadReport()); fakeClock.forwardNanos(2000L); @@ -228,31 +223,39 @@ public class LoadStatsStoreImplTest { buildUpstreamLocalityStats(LOCALITY1, 0, 3421, 0, 0, null), buildUpstreamLocalityStats(LOCALITY2, 0, 432, 0, 0, null) ), - null, 2000L); + null, 0L, 2000L); assertClusterStatsEqual(expectedReport, loadStatsStore.generateLoadReport()); } @Test - public void recordingDroppedRequests() { + public void recordDroppedRequests() { int numLbDrop = 123; int numThrottleDrop = 456; + int uncategorizedDrop = 789; for (int i = 0; i < numLbDrop; i++) { loadStatsStore.recordDroppedRequest("lb"); } for (int i = 0; i < numThrottleDrop; i++) { loadStatsStore.recordDroppedRequest("throttle"); } - assertThat(dropCounters.get("lb").get()).isEqualTo(numLbDrop); - assertThat(dropCounters.get("throttle").get()).isEqualTo(numThrottleDrop); + for (int i = 0; i < uncategorizedDrop; i++) { + loadStatsStore.recordDroppedRequest(); + } fakeClock.forwardNanos(1000L); ClusterStats expectedLoadReport = buildClusterStats(null, Arrays.asList(buildDroppedRequests("lb", numLbDrop), buildDroppedRequests("throttle", numThrottleDrop)), - 1000L); + 789L, 1000L); + assertClusterStatsEqual(expectedLoadReport, loadStatsStore.generateLoadReport()); + + fakeClock.forwardNanos(1000L); + expectedLoadReport = + buildClusterStats(null, + Arrays.asList(buildDroppedRequests("lb", 0L), + buildDroppedRequests("throttle", 0L)), + 0L, 1000L); assertClusterStatsEqual(expectedLoadReport, loadStatsStore.generateLoadReport()); - assertThat(dropCounters.get("lb").get()).isEqualTo(0); - assertThat(dropCounters.get("throttle").get()).isEqualTo(0); } } diff --git a/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java index c1d87d54d0..415477892c 100644 --- a/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java @@ -325,5 +325,10 @@ public class LrsLoadBalancerTest { public void recordDroppedRequest(String category) { throw new UnsupportedOperationException("should not be called"); } + + @Override + public void recordDroppedRequest() { + throw new UnsupportedOperationException("should not be called"); + } } }