mirror of https://github.com/grpc/grpc-java.git
xds: refactor XdsLoadReportClient and XdsLoadStatsStore in order to integrate with XdsLoadBalancer (part 1) (#5863)
* extract self-defined Locality into XdsLocality class
* separate out functionalities for recording client load from lrsClient, xds load balancer will directly interact with XdsLoadStatsStore to set up locality counters
* added GRPC to constant TRAFFICDIRECTOR_HOSTNAME_FIELD name to better match that in XdsComms
* fixed bug of using the wrong cluster name in load report's ClusterStats, it should be GSLB service name, which is responsed by load report response (same as that in EDS response).
* added a new line to the end of files.
* Revert "fixed bug of using the wrong cluster name in load report's ClusterStats, it should be GSLB service name, which is responsed by load report response (same as that in EDS response)."
This reverts commit 6097dd4066.
* rephrase interface comment for StatsStore
* added equality and hashCode test for XdsLocality
This commit is contained in:
parent
c98fb2d03e
commit
213b91b165
|
|
@ -35,7 +35,8 @@ dependencies {
|
|||
|
||||
compileOnly libraries.javax_annotation
|
||||
|
||||
testCompile project(':grpc-testing')
|
||||
testCompile project(':grpc-testing'),
|
||||
libraries.guava_testlib
|
||||
signature "org.codehaus.mojo.signature:java17:1.0@signature"
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -43,7 +43,6 @@ import io.grpc.Status;
|
|||
import io.grpc.util.ForwardingLoadBalancerHelper;
|
||||
import io.grpc.xds.InterLocalityPicker.WeightedChildPicker;
|
||||
import io.grpc.xds.XdsComms.DropOverload;
|
||||
import io.grpc.xds.XdsComms.Locality;
|
||||
import io.grpc.xds.XdsComms.LocalityInfo;
|
||||
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
|
||||
import java.util.ArrayList;
|
||||
|
|
@ -64,7 +63,7 @@ interface LocalityStore {
|
|||
|
||||
void reset();
|
||||
|
||||
void updateLocalityStore(Map<Locality, LocalityInfo> localityInfoMap);
|
||||
void updateLocalityStore(Map<XdsLocality, LocalityInfo> localityInfoMap);
|
||||
|
||||
void updateDropPercentage(ImmutableList<DropOverload> dropOverloads);
|
||||
|
||||
|
|
@ -78,7 +77,7 @@ interface LocalityStore {
|
|||
private final LoadBalancerProvider loadBalancerProvider;
|
||||
private final ThreadSafeRandom random;
|
||||
|
||||
private Map<Locality, LocalityLbInfo> localityMap = new HashMap<>();
|
||||
private Map<XdsLocality, LocalityLbInfo> localityMap = new HashMap<>();
|
||||
private ImmutableList<DropOverload> dropOverloads = ImmutableList.of();
|
||||
|
||||
LocalityStoreImpl(Helper helper, LoadBalancerRegistry lbRegistry) {
|
||||
|
|
@ -157,13 +156,13 @@ interface LocalityStore {
|
|||
|
||||
// This is triggered by EDS response.
|
||||
@Override
|
||||
public void updateLocalityStore(Map<Locality, LocalityInfo> localityInfoMap) {
|
||||
Set<Locality> oldLocalities = localityMap.keySet();
|
||||
Set<Locality> newLocalities = localityInfoMap.keySet();
|
||||
public void updateLocalityStore(Map<XdsLocality, LocalityInfo> localityInfoMap) {
|
||||
Set<XdsLocality> oldLocalities = localityMap.keySet();
|
||||
Set<XdsLocality> newLocalities = localityInfoMap.keySet();
|
||||
|
||||
Iterator<Locality> iterator = oldLocalities.iterator();
|
||||
Iterator<XdsLocality> iterator = oldLocalities.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Locality oldLocality = iterator.next();
|
||||
XdsLocality oldLocality = iterator.next();
|
||||
if (!newLocalities.contains(oldLocality)) {
|
||||
// No graceful transition until a high-level lb graceful transition design is available.
|
||||
localityMap.get(oldLocality).shutdown();
|
||||
|
|
@ -177,7 +176,7 @@ interface LocalityStore {
|
|||
|
||||
ConnectivityState newState = null;
|
||||
List<WeightedChildPicker> childPickers = new ArrayList<>(newLocalities.size());
|
||||
for (Locality newLocality : newLocalities) {
|
||||
for (XdsLocality newLocality : newLocalities) {
|
||||
|
||||
// Assuming standard mode only (EDS response with a list of endpoints) for now
|
||||
List<EquivalentAddressGroup> newEags = localityInfoMap.get(newLocality).eags;
|
||||
|
|
@ -242,7 +241,7 @@ interface LocalityStore {
|
|||
}
|
||||
|
||||
private void updateChildState(
|
||||
Locality locality, ConnectivityState newChildState, SubchannelPicker newChildPicker) {
|
||||
XdsLocality locality, ConnectivityState newChildState, SubchannelPicker newChildPicker) {
|
||||
if (!localityMap.containsKey(locality)) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -250,7 +249,7 @@ interface LocalityStore {
|
|||
List<WeightedChildPicker> childPickers = new ArrayList<>();
|
||||
|
||||
ConnectivityState overallState = null;
|
||||
for (Locality l : localityMap.keySet()) {
|
||||
for (XdsLocality l : localityMap.keySet()) {
|
||||
LocalityLbInfo localityLbInfo = localityMap.get(l);
|
||||
ConnectivityState childState;
|
||||
SubchannelPicker childPicker;
|
||||
|
|
@ -324,12 +323,12 @@ interface LocalityStore {
|
|||
|
||||
class ChildHelper extends ForwardingLoadBalancerHelper {
|
||||
|
||||
private final Locality locality;
|
||||
private final XdsLocality locality;
|
||||
|
||||
private SubchannelPicker currentChildPicker = XdsSubchannelPickers.BUFFER_PICKER;
|
||||
private ConnectivityState currentChildState = null;
|
||||
|
||||
ChildHelper(Locality locality) {
|
||||
ChildHelper(XdsLocality locality) {
|
||||
this.locality = checkNotNull(locality, "locality");
|
||||
}
|
||||
|
||||
|
|
@ -359,7 +358,7 @@ interface LocalityStore {
|
|||
@Override
|
||||
public String getAuthority() {
|
||||
//FIXME: This should be a new proposed field of Locality, locality_name
|
||||
return locality.subzone;
|
||||
return locality.getSubzone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
* Copyright 2019 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.xds.XdsLoadStatsStore.StatsCounter;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Interface for client side load stats store. A {@code StatsStore} implementation should only be
|
||||
* responsible for keeping track of load data aggregation, any load reporting information should
|
||||
* be opaque to {@code StatsStore} and be set outside.
|
||||
*/
|
||||
interface StatsStore {
|
||||
/**
|
||||
* Generates a {@link ClusterStats} containing load stats and backend metrics in locality
|
||||
* granularity, as well service level drop stats for the interval since the previous call of
|
||||
* this method. The fields cluster_name and load_report_interval in the returned
|
||||
* {@link ClusterStats} needs to be set before it is ready to be sent to the traffic directory
|
||||
* for load reporting.
|
||||
*
|
||||
* <p>This method should be called in the same synchronized context that
|
||||
* {@link XdsLoadBalancer.Helper#getSynchronizationContext} returns.
|
||||
*/
|
||||
ClusterStats generateLoadReport();
|
||||
|
||||
/**
|
||||
* Tracks load stats for endpoints in the provided locality. To be called upon balancer locality
|
||||
* updates only for newly assigned localities. Only load stats for endpoints in added localities
|
||||
* will be reported to the remote balancer. This method needs to be called at locality updates
|
||||
* only for newly assigned localities in balancer discovery responses.
|
||||
*
|
||||
* <p>This method is not thread-safe and should be called from the same synchronized context
|
||||
* returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}.
|
||||
*/
|
||||
void addLocality(XdsLocality locality);
|
||||
|
||||
/**
|
||||
* Stops tracking load stats for endpoints in the provided locality. To be called upon balancer
|
||||
* locality updates only for newly removed localities. Load stats for endpoints in removed
|
||||
* localities will no longer be reported to the remote balancer when client stop sending loads
|
||||
* to them.
|
||||
*
|
||||
* <p>This method is not thread-safe and should be called from the same synchronized context *
|
||||
* returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}.
|
||||
*/
|
||||
void removeLocality(XdsLocality locality);
|
||||
|
||||
/**
|
||||
* Applies client side load recording to {@link PickResult}s picked by the intra-locality picker
|
||||
* for the provided locality. If the provided locality is not tracked, the original
|
||||
* {@link PickResult} will be returned.
|
||||
*
|
||||
* <p>This method is thread-safe.
|
||||
*/
|
||||
PickResult interceptPickResult(PickResult pickResult, XdsLocality locality);
|
||||
|
||||
/**
|
||||
* Returns the {@link StatsCounter} that does locality level stats aggregation for the provided
|
||||
* locality. If the provided locality is not tracked, {@code null} will be returned.
|
||||
*
|
||||
* <p>This method is thread-safe.
|
||||
*/
|
||||
@Nullable
|
||||
StatsCounter getLocalityCounter(XdsLocality locality);
|
||||
|
||||
/**
|
||||
* Records a drop decision made by a {@link io.grpc.LoadBalancer.SubchannelPicker} instance
|
||||
* with the provided category. Drops are aggregated in service level.
|
||||
*
|
||||
* <p>This method is thread-safe.
|
||||
*/
|
||||
void recordDroppedRequest(String category);
|
||||
}
|
||||
|
|
@ -59,54 +59,6 @@ final class XdsComms {
|
|||
// never null
|
||||
private AdsStream adsStream;
|
||||
|
||||
static final class Locality {
|
||||
final String region;
|
||||
final String zone;
|
||||
final String subzone;
|
||||
|
||||
Locality(io.envoyproxy.envoy.api.v2.core.Locality locality) {
|
||||
this(
|
||||
/* region = */ locality.getRegion(),
|
||||
/* zone = */ locality.getZone(),
|
||||
/* subzone = */ locality.getSubZone());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Locality(String region, String zone, String subzone) {
|
||||
this.region = region;
|
||||
this.zone = zone;
|
||||
this.subzone = subzone;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Locality locality = (Locality) o;
|
||||
return Objects.equal(region, locality.region)
|
||||
&& Objects.equal(zone, locality.zone)
|
||||
&& Objects.equal(subzone, locality.subzone);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(region, zone, subzone);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("region", region)
|
||||
.add("zone", zone)
|
||||
.add("subzone", subzone)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Information about the locality from EDS response.
|
||||
*/
|
||||
|
|
@ -266,11 +218,11 @@ final class XdsComms {
|
|||
localityStore.updateDropPercentage(dropOverloads);
|
||||
|
||||
List<LocalityLbEndpoints> localities = clusterLoadAssignment.getEndpointsList();
|
||||
Map<Locality, LocalityInfo> localityEndpointsMapping = new LinkedHashMap<>();
|
||||
Map<XdsLocality, LocalityInfo> localityEndpointsMapping = new LinkedHashMap<>();
|
||||
for (LocalityLbEndpoints localityLbEndpoints : localities) {
|
||||
io.envoyproxy.envoy.api.v2.core.Locality localityProto =
|
||||
localityLbEndpoints.getLocality();
|
||||
Locality locality = new Locality(localityProto);
|
||||
XdsLocality locality = XdsLocality.fromLocalityProto(localityProto);
|
||||
List<LbEndpoint> lbEndPoints = new ArrayList<>();
|
||||
for (io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint lbEndpoint
|
||||
: localityLbEndpoints.getLbEndpointsList()) {
|
||||
|
|
|
|||
|
|
@ -16,8 +16,6 @@
|
|||
|
||||
package io.grpc.xds;
|
||||
|
||||
import io.envoyproxy.envoy.api.v2.core.Locality;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import javax.annotation.concurrent.NotThreadSafe;
|
||||
|
||||
/**
|
||||
|
|
@ -36,7 +34,7 @@ interface XdsLoadReportClient {
|
|||
* XdsLoadReportClient} and should only be called once.
|
||||
*
|
||||
* <p>This method is not thread-safe and should be called from the same synchronized context
|
||||
* returned by {@link XdsLoadBalancer#helper#getSynchronizationContext}.
|
||||
* returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}.
|
||||
*/
|
||||
void startLoadReporting();
|
||||
|
||||
|
|
@ -46,44 +44,7 @@ interface XdsLoadReportClient {
|
|||
* <p>No method in {@link XdsLoadReportClient} should be called after calling this method.
|
||||
*
|
||||
* <p>This method is not thread-safe and should be called from the same synchronized context
|
||||
* returned by {@link XdsLoadBalancer#helper#getSynchronizationContext}.
|
||||
* returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}.
|
||||
*/
|
||||
void stopLoadReporting();
|
||||
|
||||
/**
|
||||
* Applies client side load recording to {@link PickResult}s picked by the intra-locality picker
|
||||
* for the provided locality.
|
||||
*
|
||||
* <p>This method is thread-safe.
|
||||
*/
|
||||
PickResult interceptPickResult(PickResult pickResult, Locality locality);
|
||||
|
||||
/**
|
||||
* Tracks load stats for endpoints in the provided locality. To be called upon balancer locality
|
||||
* updates only for newly assigned localities. Only load stats for endpoints in added localities
|
||||
* will be reported to the remote balancer.
|
||||
*
|
||||
* <p>This method is not thread-safe and should be called from the same synchronized context
|
||||
* returned by {@link XdsLoadBalancer#helper#getSynchronizationContext}.
|
||||
*/
|
||||
void addLocality(Locality locality);
|
||||
|
||||
/**
|
||||
* Stops tracking load stats for endpoints in the provided locality. To be called upon balancer
|
||||
* locality updates only for newly removed localities. Load stats for endpoints in removed
|
||||
* localities will no longer be reported to the remote balancer when client stop sending loads to
|
||||
* them.
|
||||
*
|
||||
* <p>This method is not thread-safe and should be called from the same synchronized context *
|
||||
* returned by {@link XdsLoadBalancer#helper#getSynchronizationContext}.
|
||||
*/
|
||||
void removeLocality(Locality locality);
|
||||
|
||||
/**
|
||||
* Records a client-side request drop with the provided category instructed by the remote
|
||||
* balancer. Stats for dropped requests are aggregated in cluster level.
|
||||
*
|
||||
* <p>This method is thread-safe.
|
||||
*/
|
||||
void recordDroppedRequest(String category);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ import com.google.common.base.Supplier;
|
|||
import com.google.protobuf.Struct;
|
||||
import com.google.protobuf.Value;
|
||||
import com.google.protobuf.util.Durations;
|
||||
import io.envoyproxy.envoy.api.v2.core.Locality;
|
||||
import io.envoyproxy.envoy.api.v2.core.Node;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
|
||||
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc;
|
||||
|
|
@ -34,20 +33,14 @@ import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
|
|||
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
|
||||
import io.grpc.ChannelLogger;
|
||||
import io.grpc.ChannelLogger.ChannelLogLevel;
|
||||
import io.grpc.ClientStreamTracer;
|
||||
import io.grpc.ClientStreamTracer.StreamInfo;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.SynchronizationContext.ScheduledHandle;
|
||||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.grpc.xds.ClientLoadCounter.XdsClientLoadRecorder;
|
||||
import io.grpc.xds.XdsLoadStatsStore.StatsCounter;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
|
@ -57,25 +50,15 @@ import javax.annotation.concurrent.NotThreadSafe;
|
|||
|
||||
/**
|
||||
* Client of XDS load reporting service. Methods in this class are expected to be called in
|
||||
* the same synchronized context that {@link XdsLoadBalancer#helper#getSynchronizationContext}
|
||||
* the same synchronized context that {@link XdsLoadBalancer.Helper#getSynchronizationContext}
|
||||
* returns.
|
||||
*/
|
||||
@NotThreadSafe
|
||||
final class XdsLoadReportClientImpl implements XdsLoadReportClient {
|
||||
|
||||
@VisibleForTesting
|
||||
static final String TRAFFICDIRECTOR_HOSTNAME_FIELD
|
||||
static final String TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD
|
||||
= "com.googleapis.trafficdirector.grpc_hostname";
|
||||
private static final ClientStreamTracer NOOP_CLIENT_STREAM_TRACER =
|
||||
new ClientStreamTracer() {
|
||||
};
|
||||
private static final ClientStreamTracer.Factory NOOP_CLIENT_STREAM_TRACER_FACTORY =
|
||||
new ClientStreamTracer.Factory() {
|
||||
@Override
|
||||
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
|
||||
return NOOP_CLIENT_STREAM_TRACER;
|
||||
}
|
||||
};
|
||||
|
||||
private final String serviceName;
|
||||
private final ManagedChannel channel;
|
||||
|
|
@ -139,44 +122,6 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient {
|
|||
// Do not shutdown channel as it is not owned by LrsClient.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addLocality(Locality locality) {
|
||||
checkState(started, "load reporting must be started first");
|
||||
syncContext.throwIfNotInThisSynchronizationContext();
|
||||
statsStore.addLocality(locality);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeLocality(final Locality locality) {
|
||||
checkState(started, "load reporting must be started first");
|
||||
syncContext.throwIfNotInThisSynchronizationContext();
|
||||
statsStore.removeLocality(locality);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recordDroppedRequest(String category) {
|
||||
checkState(started, "load reporting must be started first");
|
||||
statsStore.recordDroppedRequest(category);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PickResult interceptPickResult(PickResult pickResult, Locality locality) {
|
||||
checkState(started, "load reporting must be started first");
|
||||
if (!pickResult.getStatus().isOk()) {
|
||||
return pickResult;
|
||||
}
|
||||
StatsCounter counter = statsStore.getLocalityCounter(locality);
|
||||
if (counter == null) {
|
||||
return pickResult;
|
||||
}
|
||||
ClientStreamTracer.Factory originFactory = pickResult.getStreamTracerFactory();
|
||||
if (originFactory == null) {
|
||||
originFactory = NOOP_CLIENT_STREAM_TRACER_FACTORY;
|
||||
}
|
||||
XdsClientLoadRecorder recorder = new XdsClientLoadRecorder(counter, originFactory);
|
||||
return PickResult.withSubchannel(pickResult.getSubchannel(), recorder);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static class LoadReportingTask implements Runnable {
|
||||
private final LrsStream stream;
|
||||
|
|
@ -232,7 +177,7 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient {
|
|||
.setNode(Node.newBuilder()
|
||||
.setMetadata(Struct.newBuilder()
|
||||
.putFields(
|
||||
TRAFFICDIRECTOR_HOSTNAME_FIELD,
|
||||
TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
|
||||
Value.newBuilder().setStringValue(serviceName).build())))
|
||||
.build();
|
||||
lrsRequestWriter.onNext(initRequest);
|
||||
|
|
@ -283,7 +228,7 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient {
|
|||
.setNode(Node.newBuilder()
|
||||
.setMetadata(Struct.newBuilder()
|
||||
.putFields(
|
||||
TRAFFICDIRECTOR_HOSTNAME_FIELD,
|
||||
TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
|
||||
Value.newBuilder().setStringValue(serviceName).build())))
|
||||
.addClusterStats(report)
|
||||
.build());
|
||||
|
|
@ -382,19 +327,4 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Interface for client side load stats store.
|
||||
*/
|
||||
interface StatsStore {
|
||||
ClusterStats generateLoadReport();
|
||||
|
||||
void addLocality(Locality locality);
|
||||
|
||||
void removeLocality(Locality locality);
|
||||
|
||||
StatsCounter getLocalityCounter(Locality locality);
|
||||
|
||||
void recordDroppedRequest(String category);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,15 +20,18 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
import static com.google.common.base.Preconditions.checkState;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.envoyproxy.envoy.api.v2.core.Locality;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.EndpointLoadMetricStats;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
|
||||
import io.grpc.ClientStreamTracer;
|
||||
import io.grpc.ClientStreamTracer.StreamInfo;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot;
|
||||
import io.grpc.xds.ClientLoadCounter.MetricValue;
|
||||
import io.grpc.xds.XdsLoadReportClientImpl.StatsStore;
|
||||
import io.grpc.xds.ClientLoadCounter.XdsClientLoadRecorder;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
|
@ -41,19 +44,30 @@ import javax.annotation.concurrent.NotThreadSafe;
|
|||
@NotThreadSafe
|
||||
final class XdsLoadStatsStore implements StatsStore {
|
||||
|
||||
private static final ClientStreamTracer NOOP_CLIENT_STREAM_TRACER =
|
||||
new ClientStreamTracer() {
|
||||
};
|
||||
private static final ClientStreamTracer.Factory NOOP_CLIENT_STREAM_TRACER_FACTORY =
|
||||
new ClientStreamTracer.Factory() {
|
||||
@Override
|
||||
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
|
||||
return NOOP_CLIENT_STREAM_TRACER;
|
||||
}
|
||||
};
|
||||
|
||||
private final String clusterName;
|
||||
private final ConcurrentMap<Locality, StatsCounter> localityLoadCounters;
|
||||
private final ConcurrentMap<XdsLocality, StatsCounter> localityLoadCounters;
|
||||
// Cluster level dropped request counts for each category specified in the DropOverload policy.
|
||||
private final ConcurrentMap<String, AtomicLong> dropCounters;
|
||||
|
||||
XdsLoadStatsStore(String clusterName) {
|
||||
this(clusterName, new ConcurrentHashMap<Locality, StatsCounter>(),
|
||||
this(clusterName, new ConcurrentHashMap<XdsLocality, StatsCounter>(),
|
||||
new ConcurrentHashMap<String, AtomicLong>());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
XdsLoadStatsStore(String clusterName,
|
||||
ConcurrentMap<Locality, StatsCounter> localityLoadCounters,
|
||||
ConcurrentMap<XdsLocality, StatsCounter> localityLoadCounters,
|
||||
ConcurrentMap<String, AtomicLong> dropCounters) {
|
||||
this.clusterName = checkNotNull(clusterName, "clusterName");
|
||||
this.localityLoadCounters = checkNotNull(localityLoadCounters, "localityLoadCounters");
|
||||
|
|
@ -63,16 +77,14 @@ final class XdsLoadStatsStore implements StatsStore {
|
|||
/**
|
||||
* Generates a {@link ClusterStats} containing client side load stats and backend metrics
|
||||
* (if any) in locality granularity.
|
||||
* This method should be called in the same synchronized context that
|
||||
* {@link XdsLoadBalancer#helper#getSynchronizationContext} returns.
|
||||
*/
|
||||
@Override
|
||||
public ClusterStats generateLoadReport() {
|
||||
ClusterStats.Builder statsBuilder = ClusterStats.newBuilder().setClusterName(clusterName);
|
||||
for (Map.Entry<Locality, StatsCounter> entry : localityLoadCounters.entrySet()) {
|
||||
for (Map.Entry<XdsLocality, StatsCounter> entry : localityLoadCounters.entrySet()) {
|
||||
ClientLoadSnapshot snapshot = entry.getValue().snapshot();
|
||||
UpstreamLocalityStats.Builder localityStatsBuilder =
|
||||
UpstreamLocalityStats.newBuilder().setLocality(entry.getKey());
|
||||
UpstreamLocalityStats.newBuilder().setLocality(entry.getKey().toLocalityProto());
|
||||
localityStatsBuilder
|
||||
.setTotalSuccessfulRequests(snapshot.getCallsSucceeded())
|
||||
.setTotalErrorRequests(snapshot.getCallsFailed())
|
||||
|
|
@ -106,13 +118,10 @@ final class XdsLoadStatsStore implements StatsStore {
|
|||
|
||||
/**
|
||||
* Create a {@link ClientLoadCounter} for the provided locality or make it active if already in
|
||||
* this {@link XdsLoadStatsStore}. This method needs to be called at locality updates only for
|
||||
* newly assigned localities in balancer discovery responses.
|
||||
* This method should be called in the same synchronized context that
|
||||
* {@link XdsLoadBalancer#helper#getSynchronizationContext} returns.
|
||||
* this {@link XdsLoadStatsStore}.
|
||||
*/
|
||||
@Override
|
||||
public void addLocality(final Locality locality) {
|
||||
public void addLocality(final XdsLocality locality) {
|
||||
StatsCounter counter = localityLoadCounters.get(locality);
|
||||
checkState(counter == null || !counter.isActive(),
|
||||
"An active counter for locality %s already exists", locality);
|
||||
|
|
@ -125,33 +134,21 @@ final class XdsLoadStatsStore implements StatsStore {
|
|||
|
||||
/**
|
||||
* Deactivate the {@link StatsCounter} for the provided locality in by this
|
||||
* {@link XdsLoadStatsStore}. Inactive {@link StatsCounter}s are for localities
|
||||
* no longer exposed by the remote balancer. This method needs to be called at
|
||||
* locality updates only for localities newly removed from balancer discovery responses.
|
||||
* This method should be called in the same synchronized context that
|
||||
* {@link XdsLoadBalancer#helper#getSynchronizationContext} returns.
|
||||
* {@link XdsLoadStatsStore}.
|
||||
*/
|
||||
@Override
|
||||
public void removeLocality(final Locality locality) {
|
||||
public void removeLocality(final XdsLocality locality) {
|
||||
StatsCounter counter = localityLoadCounters.get(locality);
|
||||
checkState(counter != null && counter.isActive(),
|
||||
"No active counter for locality %s exists", locality);
|
||||
counter.setActive(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link StatsCounter} instance that is responsible for aggregating load
|
||||
* stats for the provided locality, or {@code null} if the locality is untracked.
|
||||
*/
|
||||
@Override
|
||||
public StatsCounter getLocalityCounter(final Locality locality) {
|
||||
public StatsCounter getLocalityCounter(final XdsLocality locality) {
|
||||
return localityLoadCounters.get(locality);
|
||||
}
|
||||
|
||||
/**
|
||||
* Record that a request has been dropped by drop overload policy with the provided category
|
||||
* instructed by the remote balancer.
|
||||
*/
|
||||
@Override
|
||||
public void recordDroppedRequest(String category) {
|
||||
AtomicLong counter = dropCounters.get(category);
|
||||
|
|
@ -164,6 +161,24 @@ final class XdsLoadStatsStore implements StatsStore {
|
|||
counter.getAndIncrement();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PickResult interceptPickResult(PickResult pickResult, XdsLocality locality) {
|
||||
if (!pickResult.getStatus().isOk()) {
|
||||
return pickResult;
|
||||
}
|
||||
StatsCounter counter = localityLoadCounters.get(locality);
|
||||
if (counter == null) {
|
||||
return pickResult;
|
||||
}
|
||||
ClientStreamTracer.Factory originFactory = pickResult.getStreamTracerFactory();
|
||||
if (originFactory == null) {
|
||||
originFactory = NOOP_CLIENT_STREAM_TRACER_FACTORY;
|
||||
}
|
||||
XdsClientLoadRecorder recorder = new XdsClientLoadRecorder(counter, originFactory);
|
||||
return PickResult.withSubchannel(pickResult.getSubchannel(), recorder);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Blueprint for counters that can can record number of calls in-progress, succeeded, failed,
|
||||
* issued and backend metrics.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Copyright 2019 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Objects;
|
||||
|
||||
/**
|
||||
* An {@code XdsLocality} object is simply a POJO representation for {@link
|
||||
* io.envoyproxy.envoy.api.v2.core.Locality}, with only details needed for {@link XdsLoadBalancer}.
|
||||
*/
|
||||
final class XdsLocality {
|
||||
private final String region;
|
||||
private final String zone;
|
||||
private final String subzone;
|
||||
|
||||
/** Must only be used for testing. */
|
||||
@VisibleForTesting
|
||||
XdsLocality(String region, String zone, String subzone) {
|
||||
this.region = region;
|
||||
this.zone = zone;
|
||||
this.subzone = subzone;
|
||||
}
|
||||
|
||||
static XdsLocality fromLocalityProto(io.envoyproxy.envoy.api.v2.core.Locality locality) {
|
||||
return new XdsLocality(
|
||||
/* region = */ locality.getRegion(),
|
||||
/* zone = */ locality.getZone(),
|
||||
/* subzone = */ locality.getSubZone());
|
||||
}
|
||||
|
||||
io.envoyproxy.envoy.api.v2.core.Locality toLocalityProto() {
|
||||
return io.envoyproxy.envoy.api.v2.core.Locality.newBuilder()
|
||||
.setRegion(region)
|
||||
.setZone(zone)
|
||||
.setSubZone(subzone)
|
||||
.build();
|
||||
}
|
||||
|
||||
String getRegion() {
|
||||
return region;
|
||||
}
|
||||
|
||||
String getZone() {
|
||||
return zone;
|
||||
}
|
||||
|
||||
String getSubzone() {
|
||||
return subzone;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
XdsLocality locality = (XdsLocality) o;
|
||||
return Objects.equal(region, locality.region)
|
||||
&& Objects.equal(zone, locality.zone)
|
||||
&& Objects.equal(subzone, locality.subzone);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(region, zone, subzone);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("region", region)
|
||||
.add("zone", zone)
|
||||
.add("subzone", subzone)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
|
|
@ -48,7 +48,6 @@ import io.grpc.xds.LocalityStore.LocalityStoreImpl;
|
|||
import io.grpc.xds.LocalityStore.LocalityStoreImpl.PickerFactory;
|
||||
import io.grpc.xds.XdsComms.DropOverload;
|
||||
import io.grpc.xds.XdsComms.LbEndpoint;
|
||||
import io.grpc.xds.XdsComms.Locality;
|
||||
import io.grpc.xds.XdsComms.LocalityInfo;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -121,10 +120,10 @@ public class LocalityStoreTest {
|
|||
|
||||
private final FakePickerFactory pickerFactory = new FakePickerFactory();
|
||||
|
||||
private final Locality locality1 = new Locality("r1", "z1", "sz1");
|
||||
private final Locality locality2 = new Locality("r2", "z2", "sz2");
|
||||
private final Locality locality3 = new Locality("r3", "z3", "sz3");
|
||||
private final Locality locality4 = new Locality("r4", "z4", "sz4");
|
||||
private final XdsLocality locality1 = new XdsLocality("r1", "z1", "sz1");
|
||||
private final XdsLocality locality2 = new XdsLocality("r2", "z2", "sz2");
|
||||
private final XdsLocality locality3 = new XdsLocality("r3", "z3", "sz3");
|
||||
private final XdsLocality locality4 = new XdsLocality("r4", "z4", "sz4");
|
||||
|
||||
private final EquivalentAddressGroup eag11 =
|
||||
new EquivalentAddressGroup(new InetSocketAddress("addr11", 11));
|
||||
|
|
@ -178,7 +177,7 @@ public class LocalityStoreTest {
|
|||
new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2);
|
||||
LocalityInfo localityInfo3 =
|
||||
new LocalityInfo(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3);
|
||||
Map<Locality, LocalityInfo> localityInfoMap = ImmutableMap.of(
|
||||
Map<XdsLocality, LocalityInfo> localityInfoMap = ImmutableMap.of(
|
||||
locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3);
|
||||
localityStore.updateLocalityStore(localityInfoMap);
|
||||
|
||||
|
|
@ -287,7 +286,7 @@ public class LocalityStoreTest {
|
|||
new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2);
|
||||
LocalityInfo localityInfo3 =
|
||||
new LocalityInfo(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3);
|
||||
Map<Locality, LocalityInfo> localityInfoMap = ImmutableMap.of(
|
||||
Map<XdsLocality, LocalityInfo> localityInfoMap = ImmutableMap.of(
|
||||
locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3);
|
||||
localityStore.updateLocalityStore(localityInfoMap);
|
||||
|
||||
|
|
@ -378,7 +377,7 @@ public class LocalityStoreTest {
|
|||
new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2);
|
||||
LocalityInfo localityInfo3 =
|
||||
new LocalityInfo(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3);
|
||||
Map<Locality, LocalityInfo> localityInfoMap = ImmutableMap.of(
|
||||
Map<XdsLocality, LocalityInfo> localityInfoMap = ImmutableMap.of(
|
||||
locality1, localityInfo1, locality2, localityInfo2, locality3, localityInfo3);
|
||||
localityStore.updateLocalityStore(localityInfoMap);
|
||||
|
||||
|
|
@ -397,7 +396,7 @@ public class LocalityStoreTest {
|
|||
new LocalityInfo(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1);
|
||||
LocalityInfo localityInfo2 =
|
||||
new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2);
|
||||
Map<Locality, LocalityInfo> localityInfoMap = ImmutableMap.of(
|
||||
Map<XdsLocality, LocalityInfo> localityInfoMap = ImmutableMap.of(
|
||||
locality1, localityInfo1, locality2, localityInfo2);
|
||||
localityStore.updateLocalityStore(localityInfoMap);
|
||||
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ public class XdsCommsTest {
|
|||
@Mock
|
||||
private LocalityStore localityStore;
|
||||
@Captor
|
||||
private ArgumentCaptor<Map<XdsComms.Locality, LocalityInfo>> localityEndpointsMappingCaptor;
|
||||
private ArgumentCaptor<Map<XdsLocality, LocalityInfo>> localityEndpointsMappingCaptor;
|
||||
|
||||
private final SynchronizationContext syncContext = new SynchronizationContext(
|
||||
new Thread.UncaughtExceptionHandler() {
|
||||
|
|
@ -254,7 +254,7 @@ public class XdsCommsTest {
|
|||
|
||||
verify(adsStreamCallback).onWorking();
|
||||
|
||||
XdsComms.Locality locality1 = new XdsComms.Locality(localityProto1);
|
||||
XdsLocality locality1 = XdsLocality.fromLocalityProto(localityProto1);
|
||||
LocalityInfo localityInfo1 = new LocalityInfo(
|
||||
ImmutableList.of(
|
||||
new XdsComms.LbEndpoint(endpoint11),
|
||||
|
|
@ -265,7 +265,7 @@ public class XdsCommsTest {
|
|||
new XdsComms.LbEndpoint(endpoint21),
|
||||
new XdsComms.LbEndpoint(endpoint22)),
|
||||
2);
|
||||
XdsComms.Locality locality2 = new XdsComms.Locality(localityProto2);
|
||||
XdsLocality locality2 = XdsLocality.fromLocalityProto(localityProto2);
|
||||
|
||||
InOrder inOrder = inOrder(localityStore);
|
||||
inOrder.verify(localityStore).updateDropPercentage(ImmutableList.<DropOverload>of());
|
||||
|
|
@ -369,12 +369,12 @@ public class XdsCommsTest {
|
|||
new DropOverload("fake_category", 78_00_00)));
|
||||
inOrder.verify(localityStore).updateLocalityStore(localityEndpointsMappingCaptor.capture());
|
||||
|
||||
XdsComms.Locality locality1 = new XdsComms.Locality(localityProto1);
|
||||
XdsLocality locality1 = XdsLocality.fromLocalityProto(localityProto1);
|
||||
LocalityInfo localityInfo1 = new LocalityInfo(
|
||||
ImmutableList.of(new XdsComms.LbEndpoint(endpoint11)), 1);
|
||||
LocalityInfo localityInfo2 = new LocalityInfo(
|
||||
ImmutableList.of(new XdsComms.LbEndpoint(endpoint21)), 2);
|
||||
XdsComms.Locality locality2 = new XdsComms.Locality(localityProto2);
|
||||
XdsLocality locality2 = XdsLocality.fromLocalityProto(localityProto2);
|
||||
assertThat(localityEndpointsMappingCaptor.getValue()).containsExactly(
|
||||
locality2, localityInfo2, locality1, localityInfo1).inOrder();
|
||||
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.mockito.AdditionalAnswers.delegatesTo;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.same;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.inOrder;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
|
@ -29,7 +28,6 @@ import static org.mockito.Mockito.never;
|
|||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.collect.Iterables;
|
||||
|
|
@ -45,12 +43,8 @@ import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc;
|
|||
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
|
||||
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
|
||||
import io.grpc.ChannelLogger;
|
||||
import io.grpc.ClientStreamTracer;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.LoadBalancer.Subchannel;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||
|
|
@ -59,7 +53,6 @@ import io.grpc.internal.BackoffPolicy;
|
|||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.grpc.testing.GrpcCleanupRule;
|
||||
import io.grpc.xds.XdsLoadReportClientImpl.StatsStore;
|
||||
import java.text.MessageFormat;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
|
@ -107,8 +100,6 @@ public class XdsLoadReportClientImplTest {
|
|||
.setZone("test_zone")
|
||||
.setSubZone("test_subzone")
|
||||
.build();
|
||||
private static final ClientStreamTracer.StreamInfo STREAM_INFO =
|
||||
ClientStreamTracer.StreamInfo.newBuilder().build();
|
||||
@Rule
|
||||
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
|
||||
private final SynchronizationContext syncContext = new SynchronizationContext(
|
||||
|
|
@ -145,7 +136,7 @@ public class XdsLoadReportClientImplTest {
|
|||
.setNode(Node.newBuilder()
|
||||
.setMetadata(Struct.newBuilder()
|
||||
.putFields(
|
||||
XdsLoadReportClientImpl.TRAFFICDIRECTOR_HOSTNAME_FIELD,
|
||||
XdsLoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
|
||||
Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build())))
|
||||
.build();
|
||||
@Mock
|
||||
|
|
@ -155,8 +146,6 @@ public class XdsLoadReportClientImplTest {
|
|||
@Mock
|
||||
private BackoffPolicy backoffPolicy2;
|
||||
@Mock
|
||||
private Subchannel mockSubchannel;
|
||||
@Mock
|
||||
private StatsStore statsStore;
|
||||
|
||||
private static ClusterStats buildEmptyClusterStats(long loadReportIntervalNanos) {
|
||||
|
|
@ -236,54 +225,13 @@ public class XdsLoadReportClientImplTest {
|
|||
assertEquals(report.getNode(), Node.newBuilder()
|
||||
.setMetadata(Struct.newBuilder()
|
||||
.putFields(
|
||||
XdsLoadReportClientImpl.TRAFFICDIRECTOR_HOSTNAME_FIELD,
|
||||
XdsLoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
|
||||
Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build()))
|
||||
.build());
|
||||
assertEquals(1, report.getClusterStatsCount());
|
||||
assertThat(report.getClusterStats(0)).isEqualTo(expectedStats);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void loadNotRecordedForUntrackedLocality() {
|
||||
when(statsStore.getLocalityCounter(TEST_LOCALITY)).thenReturn(null);
|
||||
PickResult pickResult = PickResult.withSubchannel(mockSubchannel);
|
||||
// If the per-locality counter does not exist, nothing should happen.
|
||||
PickResult interceptedPickResult = lrsClient.interceptPickResult(pickResult, TEST_LOCALITY);
|
||||
verify(statsStore).getLocalityCounter(TEST_LOCALITY);
|
||||
assertThat(interceptedPickResult.getStreamTracerFactory()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void invalidPickResultNotIntercepted() {
|
||||
PickResult errorResult = PickResult.withError(Status.UNAVAILABLE.withDescription("Error"));
|
||||
PickResult droppedResult = PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped"));
|
||||
// TODO (chengyuanzhang): for NoResult PickResult, do we still intercept?
|
||||
PickResult interceptedErrorResult = lrsClient.interceptPickResult(errorResult, TEST_LOCALITY);
|
||||
PickResult interceptedDroppedResult =
|
||||
lrsClient.interceptPickResult(droppedResult, TEST_LOCALITY);
|
||||
assertThat(interceptedErrorResult.getStreamTracerFactory()).isNull();
|
||||
assertThat(interceptedDroppedResult.getStreamTracerFactory()).isNull();
|
||||
verifyZeroInteractions(statsStore);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void interceptPreservesOriginStreamTracer() {
|
||||
ClientStreamTracer.Factory mockFactory = mock(ClientStreamTracer.Factory.class);
|
||||
ClientStreamTracer mockTracer = mock(ClientStreamTracer.class);
|
||||
when(mockFactory
|
||||
.newClientStreamTracer(any(ClientStreamTracer.StreamInfo.class), any(Metadata.class)))
|
||||
.thenReturn(mockTracer);
|
||||
when(statsStore.getLocalityCounter(TEST_LOCALITY)).thenReturn(new ClientLoadCounter());
|
||||
PickResult pickResult = PickResult.withSubchannel(mockSubchannel, mockFactory);
|
||||
PickResult interceptedPickResult = lrsClient.interceptPickResult(pickResult, TEST_LOCALITY);
|
||||
verify(statsStore).getLocalityCounter(TEST_LOCALITY);
|
||||
Metadata metadata = new Metadata();
|
||||
interceptedPickResult.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, metadata)
|
||||
.streamClosed(Status.OK);
|
||||
verify(mockFactory).newClientStreamTracer(same(STREAM_INFO), same(metadata));
|
||||
verify(mockTracer).streamClosed(Status.OK);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void loadReportInitialRequest() {
|
||||
verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
|
||||
|
|
|
|||
|
|
@ -17,16 +17,22 @@
|
|||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.same;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.envoyproxy.envoy.api.v2.core.Locality;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.EndpointLoadMetricStats;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
|
||||
import io.grpc.ClientStreamTracer;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.LoadBalancer.Subchannel;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot;
|
||||
import io.grpc.xds.ClientLoadCounter.MetricValue;
|
||||
import io.grpc.xds.XdsLoadStatsStore.StatsCounter;
|
||||
|
|
@ -51,19 +57,14 @@ import org.junit.runners.JUnit4;
|
|||
@RunWith(JUnit4.class)
|
||||
public class XdsLoadStatsStoreTest {
|
||||
private static final String SERVICE_NAME = "api.google.com";
|
||||
private static final Locality LOCALITY1 =
|
||||
Locality.newBuilder()
|
||||
.setRegion("test_region1")
|
||||
.setZone("test_zone")
|
||||
.setSubZone("test_subzone")
|
||||
.build();
|
||||
private static final Locality LOCALITY2 =
|
||||
Locality.newBuilder()
|
||||
.setRegion("test_region2")
|
||||
.setZone("test_zone")
|
||||
.setSubZone("test_subzone")
|
||||
.build();
|
||||
private ConcurrentMap<Locality, StatsCounter> localityLoadCounters;
|
||||
private static final XdsLocality LOCALITY1 =
|
||||
new XdsLocality("test_region1", "test_zone", "test_subzone");
|
||||
private static final XdsLocality LOCALITY2 =
|
||||
new XdsLocality("test_region2", "test_zone", "test_subzone");
|
||||
private static final ClientStreamTracer.StreamInfo STREAM_INFO =
|
||||
ClientStreamTracer.StreamInfo.newBuilder().build();
|
||||
private Subchannel mockSubchannel = mock(Subchannel.class);
|
||||
private ConcurrentMap<XdsLocality, StatsCounter> localityLoadCounters;
|
||||
private ConcurrentMap<String, AtomicLong> dropCounters;
|
||||
private XdsLoadStatsStore loadStore;
|
||||
|
||||
|
|
@ -87,7 +88,7 @@ public class XdsLoadStatsStoreTest {
|
|||
return res;
|
||||
}
|
||||
|
||||
private static UpstreamLocalityStats buildUpstreamLocalityStats(Locality locality,
|
||||
private static UpstreamLocalityStats buildUpstreamLocalityStats(XdsLocality locality,
|
||||
long callsSucceed,
|
||||
long callsInProgress,
|
||||
long callsFailed,
|
||||
|
|
@ -95,7 +96,7 @@ public class XdsLoadStatsStoreTest {
|
|||
@Nullable List<EndpointLoadMetricStats> metrics) {
|
||||
UpstreamLocalityStats.Builder builder =
|
||||
UpstreamLocalityStats.newBuilder()
|
||||
.setLocality(locality)
|
||||
.setLocality(locality.toLocalityProto())
|
||||
.setTotalSuccessfulRequests(callsSucceed)
|
||||
.setTotalErrorRequests(callsFailed)
|
||||
.setTotalRequestsInProgress(callsInProgress)
|
||||
|
|
@ -146,7 +147,8 @@ public class XdsLoadStatsStoreTest {
|
|||
private static void assertUpstreamLocalityStatsListsEqual(List<UpstreamLocalityStats> expected,
|
||||
List<UpstreamLocalityStats> actual) {
|
||||
assertThat(actual).hasSize(expected.size());
|
||||
Map<Locality, UpstreamLocalityStats> expectedLocalityStats = new HashMap<>();
|
||||
Map<io.envoyproxy.envoy.api.v2.core.Locality, UpstreamLocalityStats> expectedLocalityStats =
|
||||
new HashMap<>();
|
||||
for (UpstreamLocalityStats stats : expected) {
|
||||
expectedLocalityStats.put(stats.getLocality(), stats);
|
||||
}
|
||||
|
|
@ -297,4 +299,41 @@ public class XdsLoadStatsStoreTest {
|
|||
assertThat(dropCounters.get("lb").get()).isEqualTo(0);
|
||||
assertThat(dropCounters.get("throttle").get()).isEqualTo(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void loadNotRecordedForUntrackedLocality() {
|
||||
PickResult pickResult = PickResult.withSubchannel(mockSubchannel);
|
||||
// If the per-locality counter does not exist, nothing should happen.
|
||||
PickResult interceptedPickResult = loadStore.interceptPickResult(pickResult, LOCALITY1);
|
||||
assertThat(interceptedPickResult.getStreamTracerFactory()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void invalidPickResultNotIntercepted() {
|
||||
PickResult errorResult = PickResult.withError(Status.UNAVAILABLE.withDescription("Error"));
|
||||
PickResult droppedResult = PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped"));
|
||||
// TODO (chengyuanzhang): for NoResult PickResult, do we still intercept?
|
||||
PickResult interceptedErrorResult = loadStore.interceptPickResult(errorResult, LOCALITY1);
|
||||
PickResult interceptedDroppedResult =
|
||||
loadStore.interceptPickResult(droppedResult, LOCALITY1);
|
||||
assertThat(interceptedErrorResult.getStreamTracerFactory()).isNull();
|
||||
assertThat(interceptedDroppedResult.getStreamTracerFactory()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void interceptPreservesOriginStreamTracer() {
|
||||
ClientStreamTracer.Factory mockFactory = mock(ClientStreamTracer.Factory.class);
|
||||
ClientStreamTracer mockTracer = mock(ClientStreamTracer.class);
|
||||
when(mockFactory
|
||||
.newClientStreamTracer(any(ClientStreamTracer.StreamInfo.class), any(Metadata.class)))
|
||||
.thenReturn(mockTracer);
|
||||
localityLoadCounters.put(LOCALITY1, new ClientLoadCounter());
|
||||
PickResult pickResult = PickResult.withSubchannel(mockSubchannel, mockFactory);
|
||||
PickResult interceptedPickResult = loadStore.interceptPickResult(pickResult, LOCALITY1);
|
||||
Metadata metadata = new Metadata();
|
||||
interceptedPickResult.getStreamTracerFactory().newClientStreamTracer(STREAM_INFO, metadata)
|
||||
.streamClosed(Status.OK);
|
||||
verify(mockFactory).newClientStreamTracer(same(STREAM_INFO), same(metadata));
|
||||
verify(mockTracer).streamClosed(Status.OK);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright 2019 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
|
||||
import com.google.common.testing.EqualsTester;
|
||||
import io.envoyproxy.envoy.api.v2.core.Locality;
|
||||
import org.junit.Test;
|
||||
|
||||
public class XdsLocalityTest {
|
||||
|
||||
@Test
|
||||
public void convertToAndFromLocalityProto() {
|
||||
Locality locality =
|
||||
Locality.newBuilder()
|
||||
.setRegion("test_region")
|
||||
.setZone("test_zone")
|
||||
.setSubZone("test_subzone")
|
||||
.build();
|
||||
XdsLocality xdsLocality = XdsLocality.fromLocalityProto(locality);
|
||||
assertThat(xdsLocality.getRegion()).isEqualTo("test_region");
|
||||
assertThat(xdsLocality.getZone()).isEqualTo("test_zone");
|
||||
assertThat(xdsLocality.getSubzone()).isEqualTo("test_subzone");
|
||||
|
||||
Locality convertedLocality = xdsLocality.toLocalityProto();
|
||||
assertThat(convertedLocality.getRegion()).isEqualTo("test_region");
|
||||
assertThat(convertedLocality.getZone()).isEqualTo("test_zone");
|
||||
assertThat(convertedLocality.getSubZone()).isEqualTo("test_subzone");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void equal() {
|
||||
new EqualsTester()
|
||||
.addEqualityGroup(
|
||||
new XdsLocality("region-a", "zone-a", "subzone-a"),
|
||||
new XdsLocality("region-a", "zone-a", "subzone-a"))
|
||||
.addEqualityGroup(
|
||||
new XdsLocality("region", "zone", "subzone")
|
||||
)
|
||||
.addEqualityGroup(
|
||||
new XdsLocality("", "", ""),
|
||||
new XdsLocality("", "", ""))
|
||||
.testEquals();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void hash() {
|
||||
assertThat(new XdsLocality("region", "zone", "subzone").hashCode())
|
||||
.isEqualTo(new XdsLocality("region", "zone","subzone").hashCode());
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue