mirror of https://github.com/grpc/grpc-java.git
xds: Include locality label in WRR metrics (#11170)
This commit is contained in:
parent
54ac06ae30
commit
2bc4306940
|
|
@ -82,6 +82,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
||||||
private final AtomicInteger sequence;
|
private final AtomicInteger sequence;
|
||||||
private final long infTime;
|
private final long infTime;
|
||||||
private final Ticker ticker;
|
private final Ticker ticker;
|
||||||
|
private String locality = "";
|
||||||
|
|
||||||
// The metric instruments are only registered once and shared by all instances of this LB.
|
// The metric instruments are only registered once and shared by all instances of this LB.
|
||||||
static {
|
static {
|
||||||
|
|
@ -147,6 +148,12 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
||||||
handleNameResolutionError(unavailableStatus);
|
handleNameResolutionError(unavailableStatus);
|
||||||
return unavailableStatus;
|
return unavailableStatus;
|
||||||
}
|
}
|
||||||
|
String locality = resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME);
|
||||||
|
if (locality != null) {
|
||||||
|
this.locality = locality;
|
||||||
|
} else {
|
||||||
|
this.locality = "";
|
||||||
|
}
|
||||||
config =
|
config =
|
||||||
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||||
AcceptResolvedAddrRetVal acceptRetVal;
|
AcceptResolvedAddrRetVal acceptRetVal;
|
||||||
|
|
@ -179,7 +186,8 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
||||||
@Override
|
@Override
|
||||||
public SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
|
public SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList) {
|
||||||
return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
|
return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList),
|
||||||
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper());
|
config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper(),
|
||||||
|
locality);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
@ -373,10 +381,12 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
||||||
private final AtomicInteger sequence;
|
private final AtomicInteger sequence;
|
||||||
private final int hashCode;
|
private final int hashCode;
|
||||||
private final LoadBalancer.Helper helper;
|
private final LoadBalancer.Helper helper;
|
||||||
|
private final String locality;
|
||||||
private volatile StaticStrideScheduler scheduler;
|
private volatile StaticStrideScheduler scheduler;
|
||||||
|
|
||||||
WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
|
WeightedRoundRobinPicker(List<ChildLbState> children, boolean enableOobLoadReport,
|
||||||
float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper) {
|
float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper,
|
||||||
|
String locality) {
|
||||||
checkNotNull(children, "children");
|
checkNotNull(children, "children");
|
||||||
Preconditions.checkArgument(!children.isEmpty(), "empty child list");
|
Preconditions.checkArgument(!children.isEmpty(), "empty child list");
|
||||||
this.children = children;
|
this.children = children;
|
||||||
|
|
@ -391,6 +401,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
||||||
this.errorUtilizationPenalty = errorUtilizationPenalty;
|
this.errorUtilizationPenalty = errorUtilizationPenalty;
|
||||||
this.sequence = checkNotNull(sequence, "sequence");
|
this.sequence = checkNotNull(sequence, "sequence");
|
||||||
this.helper = helper;
|
this.helper = helper;
|
||||||
|
this.locality = checkNotNull(locality, "locality");
|
||||||
|
|
||||||
// For equality we treat children as a set; use hash code as defined by Set
|
// For equality we treat children as a set; use hash code as defined by Set
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
|
|
@ -434,7 +445,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
||||||
helper.getMetricRecorder()
|
helper.getMetricRecorder()
|
||||||
.recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
|
.recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
|
||||||
ImmutableList.of(helper.getChannelTarget()),
|
ImmutableList.of(helper.getChannelTarget()),
|
||||||
ImmutableList.of(""));
|
ImmutableList.of(locality));
|
||||||
newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
|
newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
|
||||||
}
|
}
|
||||||
if (staleEndpoints.get() > 0) {
|
if (staleEndpoints.get() > 0) {
|
||||||
|
|
@ -442,13 +453,13 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
||||||
helper.getMetricRecorder()
|
helper.getMetricRecorder()
|
||||||
.addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
|
.addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
|
||||||
ImmutableList.of(helper.getChannelTarget()),
|
ImmutableList.of(helper.getChannelTarget()),
|
||||||
ImmutableList.of(""));
|
ImmutableList.of(locality));
|
||||||
}
|
}
|
||||||
if (notYetUsableEndpoints.get() > 0) {
|
if (notYetUsableEndpoints.get() > 0) {
|
||||||
// TODO: add locality label once available
|
// TODO: add locality label once available
|
||||||
helper.getMetricRecorder()
|
helper.getMetricRecorder()
|
||||||
.addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
|
.addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
|
||||||
ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(""));
|
ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
|
||||||
}
|
}
|
||||||
|
|
||||||
this.scheduler = new StaticStrideScheduler(newWeights, sequence);
|
this.scheduler = new StaticStrideScheduler(newWeights, sequence);
|
||||||
|
|
@ -456,7 +467,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
|
||||||
// TODO: locality label once available
|
// TODO: locality label once available
|
||||||
helper.getMetricRecorder()
|
helper.getMetricRecorder()
|
||||||
.addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
|
.addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
|
||||||
ImmutableList.of(""));
|
ImmutableList.of(locality));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -145,6 +145,7 @@ public class WeightedRoundRobinLoadBalancerTest {
|
||||||
});
|
});
|
||||||
|
|
||||||
private String channelTarget = "channel-target";
|
private String channelTarget = "channel-target";
|
||||||
|
private String locality = "locality";
|
||||||
|
|
||||||
public WeightedRoundRobinLoadBalancerTest() {
|
public WeightedRoundRobinLoadBalancerTest() {
|
||||||
testHelperInstance = new TestHelper();
|
testHelperInstance = new TestHelper();
|
||||||
|
|
@ -1135,9 +1136,11 @@ public class WeightedRoundRobinLoadBalancerTest {
|
||||||
@Test
|
@Test
|
||||||
public void metrics() {
|
public void metrics() {
|
||||||
// Give WRR some valid addresses to work with.
|
// Give WRR some valid addresses to work with.
|
||||||
|
Attributes attributesWithLocality = Attributes.newBuilder()
|
||||||
|
.set(WeightedTargetLoadBalancer.CHILD_NAME, locality).build();
|
||||||
syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
|
syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
|
||||||
.setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig)
|
.setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig)
|
||||||
.setAttributes(affinity).build()));
|
.setAttributes(attributesWithLocality).build()));
|
||||||
|
|
||||||
// Flip the three subchannels to READY state to initiate the WRR logic
|
// Flip the three subchannels to READY state to initiate the WRR logic
|
||||||
Iterator<Subchannel> it = subchannels.values().iterator();
|
Iterator<Subchannel> it = subchannels.values().iterator();
|
||||||
|
|
@ -1240,7 +1243,7 @@ public class WeightedRoundRobinLoadBalancerTest {
|
||||||
public boolean matches(LongCounterMetricInstrument longCounterInstrument) {
|
public boolean matches(LongCounterMetricInstrument longCounterInstrument) {
|
||||||
return longCounterInstrument.getName().equals(name);
|
return longCounterInstrument.getName().equals(name);
|
||||||
}
|
}
|
||||||
}), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList("")));
|
}), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verifies that the MetricRecorder has been called to record a given double histogram value the
|
// Verifies that the MetricRecorder has been called to record a given double histogram value the
|
||||||
|
|
@ -1252,7 +1255,7 @@ public class WeightedRoundRobinLoadBalancerTest {
|
||||||
public boolean matches(DoubleHistogramMetricInstrument doubleHistogramInstrument) {
|
public boolean matches(DoubleHistogramMetricInstrument doubleHistogramInstrument) {
|
||||||
return doubleHistogramInstrument.getName().equals(name);
|
return doubleHistogramInstrument.getName().equals(name);
|
||||||
}
|
}
|
||||||
}), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList("")));
|
}), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality)));
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getNumFilteredPendingTasks() {
|
private int getNumFilteredPendingTasks() {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue