xds: integrate backend metric API to client load reporting (#5797)

* augmented ClientLoadCounter with backend metrics

* added a listener implementation that receives backend metrics and aggregates them in ClientLoadCounter
This commit is contained in:
Chengyuan Zhang 2019-05-31 14:28:23 -07:00 committed by GitHub
parent 276b7d8512
commit 93551719b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 273 additions and 24 deletions

View File

@ -20,12 +20,18 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.envoyproxy.udpa.data.orca.v1.OrcaLoadReport;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.util.ForwardingClientStreamTracer;
import io.grpc.xds.OrcaOobUtil.OrcaOobReportListener;
import io.grpc.xds.OrcaPerRequestUtil.OrcaPerRequestReportListener;
import io.grpc.xds.XdsLoadStatsStore.StatsCounter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;
@ -37,12 +43,18 @@ import javax.annotation.concurrent.ThreadSafe;
*/
@NotThreadSafe
final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
private static final int THREAD_BALANCING_FACTOR = 64;
private final AtomicLong callsInProgress = new AtomicLong();
private final AtomicLong callsSucceeded = new AtomicLong();
private final AtomicLong callsFailed = new AtomicLong();
private final AtomicLong callsIssued = new AtomicLong();
private final MetricRecorder[] metricRecorders = new MetricRecorder[THREAD_BALANCING_FACTOR];
/** Creates a counter whose recorder array is fully populated with empty recorders. */
ClientLoadCounter() {
  int slot = 0;
  // The array is allocated with THREAD_BALANCING_FACTOR slots; fill every one of them.
  while (slot < metricRecorders.length) {
    metricRecorders[slot] = new MetricRecorder();
    slot++;
  }
}
/**
@ -50,6 +62,7 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
*/
@VisibleForTesting
ClientLoadCounter(long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued) {
this();
this.callsSucceeded.set(callsSucceeded);
this.callsInProgress.set(callsInProgress);
this.callsFailed.set(callsFailed);
@ -72,6 +85,13 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
}
}
/**
 * Adds one report of the named metric to the recorder selected by the calling thread's id.
 */
@Override
void recordMetric(String name, double value) {
  long threadId = Thread.currentThread().getId();
  // Threads are spread over the recorders by id so they do not all lock the same one.
  int bucket = (int) (threadId % THREAD_BALANCING_FACTOR);
  metricRecorders[bucket].addValue(name, value);
}
/**
* Generate snapshot for recorded query counts and metrics since previous snapshot.
*
@ -80,10 +100,25 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
*/
@Override
public ClientLoadSnapshot snapshot() {
// Merge the per-recorder maps into a single map keyed by metric name.
Map<String, MetricValue> aggregatedValues = new HashMap<>();
for (MetricRecorder recorder : metricRecorders) {
// takeAll() hands over the recorder's current map and leaves it with a fresh empty one.
Map<String, MetricValue> map = recorder.takeAll();
for (Map.Entry<String, MetricValue> entry : map.entrySet()) {
MetricValue curr = aggregatedValues.get(entry.getKey());
if (curr == null) {
// First time this metric name is seen in this snapshot: start from a zeroed value.
curr = new MetricValue();
aggregatedValues.put(entry.getKey(), curr);
}
// Fold this recorder's report count and value sum into the aggregate.
MetricValue diff = entry.getValue();
curr.numReports += diff.numReports;
curr.totalValue += diff.totalValue;
}
}
// Succeeded, failed and issued counts are reset to zero as they are read; the in-progress
// count is only read, so it carries over to the next snapshot.
return new ClientLoadSnapshot(callsSucceeded.getAndSet(0),
callsInProgress.get(),
callsFailed.getAndSet(0),
callsIssued.getAndSet(0));
callsIssued.getAndSet(0),
aggregatedValues);
}
/**
@ -93,11 +128,14 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
static final class ClientLoadSnapshot {
@VisibleForTesting
static final ClientLoadSnapshot EMPTY_SNAPSHOT = new ClientLoadSnapshot(0, 0, 0, 0);
// Snapshot with all counts at zero and no metric values. The typed emptyMap() call replaces the
// raw Collections.EMPTY_MAP constant, so no unchecked-warning suppression is needed.
static final ClientLoadSnapshot EMPTY_SNAPSHOT =
    new ClientLoadSnapshot(0, 0, 0, 0, Collections.<String, MetricValue>emptyMap());
private final long callsSucceeded;
private final long callsInProgress;
private final long callsFailed;
private final long callsIssued;
private final Map<String, MetricValue> metricValues;
/**
* External usage must only be for testing.
@ -106,11 +144,13 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
ClientLoadSnapshot(long callsSucceeded,
long callsInProgress,
long callsFailed,
long callsIssued) {
long callsIssued,
Map<String, MetricValue> metricValues) {
this.callsSucceeded = callsSucceeded;
this.callsInProgress = callsInProgress;
this.callsFailed = callsFailed;
this.callsIssued = callsIssued;
this.metricValues = checkNotNull(metricValues, "metricValues");
}
long getCallsSucceeded() {
@ -129,6 +169,10 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
return callsIssued;
}
/** Returns a read-only view of the metric values held by this snapshot, keyed by metric name. */
Map<String, MetricValue> getMetricValues() {
  Map<String, MetricValue> readOnlyView = Collections.unmodifiableMap(metricValues);
  return readOnlyView;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@ -136,10 +180,73 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
.add("callsInProgress", callsInProgress)
.add("callsFailed", callsFailed)
.add("callsIssued", callsIssued)
.add("metricValues", metricValues)
.toString();
}
}
/**
* Atomic unit of recording for metric data.
*/
static final class MetricValue {
  // Stored as long to match the width returned by getNumReports(), so that summing report
  // counts with += cannot silently wrap around at the int range.
  private long numReports;
  // Running sum of every value reported for the metric.
  private double totalValue;

  /** Creates a value with no reports and a zero total. */
  private MetricValue() {
    this(0, 0);
  }

  /**
   * Must only be used for testing.
   *
   * @param numReports number of reports already accounted for
   * @param totalValue sum of the values of those reports
   */
  @VisibleForTesting
  MetricValue(int numReports, double totalValue) {
    this.numReports = numReports;
    this.totalValue = totalValue;
  }

  /** Returns how many reports have been folded into this value. */
  long getNumReports() {
    return numReports;
  }

  /** Returns the sum of all reported values. */
  double getTotalValue() {
    return totalValue;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("numReports", numReports)
        .add("totalValue", totalValue)
        .toString();
  }
}
/**
* Single contention-balanced bucket for recording metric data.
*/
private static class MetricRecorder {
  // Replaced wholesale by takeAll(), hence not final.
  private Map<String, MetricValue> metricValues = new HashMap<>();

  /** Adds one report of {@code value} to the running entry for {@code metricName}. */
  synchronized void addValue(String metricName, double value) {
    MetricValue entry = metricValues.get(metricName);
    if (entry == null) {
      // First report for this metric since the last takeAll(): register a zeroed entry.
      entry = new MetricValue();
      metricValues.put(metricName, entry);
    }
    entry.totalValue += value;
    entry.numReports++;
  }

  /** Returns everything recorded so far and continues recording into a new empty map. */
  synchronized Map<String, MetricValue> takeAll() {
    Map<String, MetricValue> drained = metricValues;
    metricValues = new HashMap<>();
    return drained;
  }
}
/**
* An {@link XdsClientLoadRecorder} instance records and aggregates client-side load data into an
* {@link ClientLoadCounter} object.
@ -173,4 +280,27 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
};
}
}
/**
* Listener implementation to receive backend metrics with locality-level aggregation.
*/
@ThreadSafe
static final class LocalityMetricsListener implements OrcaPerRequestReportListener,
    OrcaOobReportListener {

  // Destination of every metric carried by the received reports.
  private final ClientLoadCounter counter;

  /**
   * Creates a listener that forwards metrics to {@code counter}.
   *
   * @throws NullPointerException if {@code counter} is null
   */
  LocalityMetricsListener(ClientLoadCounter counter) {
    this.counter = checkNotNull(counter, "counter");
  }

  /**
   * Records the report's CPU and memory utilization, then each entry of its named
   * cost-or-utilization map, into the counter.
   */
  @Override
  public void onLoadReport(OrcaLoadReport report) {
    // The two fixed metrics are recorded for every report, whatever their values are.
    counter.recordMetric("cpu_utilization", report.getCpuUtilization());
    counter.recordMetric("mem_utilization", report.getMemUtilization());

    Map<String, Double> namedMetrics = report.getRequestCostOrUtilizationMap();
    for (Map.Entry<String, Double> named : namedMetrics.entrySet()) {
      counter.recordMetric(named.getKey(), named.getValue());
    }
  }
}
}

View File

@ -23,9 +23,11 @@ import com.google.common.annotations.VisibleForTesting;
import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests;
import io.envoyproxy.envoy.api.v2.endpoint.EndpointLoadMetricStats;
import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
import io.grpc.Status;
import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot;
import io.grpc.xds.ClientLoadCounter.MetricValue;
import io.grpc.xds.XdsLoadReportClientImpl.StatsStore;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -76,6 +78,13 @@ final class XdsLoadStatsStore implements StatsStore {
.setTotalErrorRequests(snapshot.getCallsFailed())
.setTotalRequestsInProgress(snapshot.getCallsInProgress())
.setTotalIssuedRequests(snapshot.getCallsIssued());
for (Map.Entry<String, MetricValue> metric : snapshot.getMetricValues().entrySet()) {
localityStatsBuilder.addLoadMetricStats(
EndpointLoadMetricStats.newBuilder()
.setMetricName(metric.getKey())
.setNumRequestsFinishedWithMetric(metric.getValue().getNumReports())
.setTotalMetricValue(metric.getValue().getTotalValue()));
}
statsBuilder.addUpstreamLocalityStats(localityStatsBuilder);
// Discard counters for localities that are no longer exposed by the remote balancer and
// no RPCs ongoing.
@ -152,8 +161,8 @@ final class XdsLoadStatsStore implements StatsStore {
}
/**
* Blueprint for counters that can can record number of calls in-progress, succeeded, failed and
* issued.
* Blueprint for counters that can record the number of calls in-progress, succeeded, failed and
* issued, as well as backend metrics.
*/
abstract static class StatsCounter {
@ -163,6 +172,8 @@ final class XdsLoadStatsStore implements StatsStore {
abstract void recordCallFinished(Status status);
abstract void recordMetric(String name, double value);
abstract ClientLoadSnapshot snapshot();
boolean isActive() {

View File

@ -18,11 +18,14 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import io.envoyproxy.udpa.data.orca.v1.OrcaLoadReport;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot;
import io.grpc.xds.ClientLoadCounter.LocalityMetricsListener;
import io.grpc.xds.ClientLoadCounter.MetricValue;
import io.grpc.xds.ClientLoadCounter.XdsClientLoadRecorder;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.Before;
@ -50,11 +53,12 @@ public class ClientLoadCounterTest {
public void setUp() {
counter = new ClientLoadCounter();
ClientLoadSnapshot emptySnapshot = counter.snapshot();
assertSnapshot(emptySnapshot, 0, 0, 0, 0);
assertQueryCounts(emptySnapshot, 0, 0, 0, 0);
assertThat(emptySnapshot.getMetricValues()).isEmpty();
}
@Test
public void snapshotContainsEverything() {
public void snapshotContainsDataInCounter() {
long numSucceededCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long numInProgressCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long numFailedCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
@ -63,38 +67,56 @@ public class ClientLoadCounterTest {
new ClientLoadCounter(numSucceededCalls, numInProgressCalls, numFailedCalls,
numIssuedCalls);
ClientLoadSnapshot snapshot = counter.snapshot();
assertSnapshot(snapshot, numSucceededCalls, numInProgressCalls, numFailedCalls, numIssuedCalls);
assertQueryCounts(snapshot, numSucceededCalls, numInProgressCalls, numFailedCalls,
numIssuedCalls);
String snapshotStr = snapshot.toString();
assertThat(snapshotStr).contains("callsSucceeded=" + numSucceededCalls);
assertThat(snapshotStr).contains("callsInProgress=" + numInProgressCalls);
assertThat(snapshotStr).contains("callsFailed=" + numFailedCalls);
assertThat(snapshotStr).contains("callsIssued=" + numIssuedCalls);
assertThat(snapshotStr).contains("metricValues={}");
// Snapshot only accounts for stats happening after previous snapshot.
snapshot = counter.snapshot();
assertSnapshot(snapshot, 0, numInProgressCalls, 0, 0);
assertQueryCounts(snapshot, 0, numInProgressCalls, 0, 0);
snapshotStr = snapshot.toString();
assertThat(snapshotStr).contains("callsSucceeded=0");
assertThat(snapshotStr).contains("callsInProgress=" + numInProgressCalls);
assertThat(snapshotStr).contains("callsFailed=0");
assertThat(snapshotStr).contains("callsIssued=0");
assertThat(snapshotStr).contains("metricValues={}");
}
@Test
public void normalCountingOperations() {
public void normalRecordingOperations() {
counter.recordCallStarted();
ClientLoadSnapshot snapshot = counter.snapshot();
assertSnapshot(snapshot, 0, 1, 0, 1);
assertQueryCounts(snapshot, 0, 1, 0, 1);
counter.recordCallFinished(Status.OK);
snapshot = counter.snapshot();
assertSnapshot(snapshot, 1, 0, 0, 0);
assertQueryCounts(snapshot, 1, 0, 0, 0);
counter.recordCallStarted();
counter.recordCallFinished(Status.CANCELLED);
snapshot = counter.snapshot();
assertSnapshot(snapshot, 0, 0, 1, 1);
assertQueryCounts(snapshot, 0, 0, 1, 1);
counter.recordMetric("test-metric-1", 0.75);
counter.recordMetric("test-metric-2", 0.342);
counter.recordMetric("test-metric-3", 0.512);
counter.recordMetric("test-metric-1", 0.543);
counter.recordMetric("test-metric-1", 4.412);
counter.recordMetric("test-metric-1", 100.353);
snapshot = counter.snapshot();
assertThat(snapshot.getMetricValues().get("test-metric-1").getNumReports()).isEqualTo(4);
assertThat(snapshot.getMetricValues().get("test-metric-1").getTotalValue())
.isEqualTo(0.75 + 0.543 + 4.412 + 100.353);
assertThat(snapshot.getMetricValues().get("test-metric-2").getNumReports()).isEqualTo(1);
assertThat(snapshot.getMetricValues().get("test-metric-2").getTotalValue()).isEqualTo(0.342);
assertThat(snapshot.getMetricValues().get("test-metric-3").getNumReports()).isEqualTo(1);
assertThat(snapshot.getMetricValues().get("test-metric-3").getTotalValue()).isEqualTo(0.512);
}
@Test
@ -103,10 +125,10 @@ public class ClientLoadCounterTest {
new XdsClientLoadRecorder(counter, NOOP_CLIENT_STREAM_TRACER_FACTORY);
ClientStreamTracer tracer = recorder1.newClientStreamTracer(STREAM_INFO, new Metadata());
ClientLoadSnapshot snapshot = counter.snapshot();
assertSnapshot(snapshot, 0, 1, 0, 1);
assertQueryCounts(snapshot, 0, 1, 0, 1);
tracer.streamClosed(Status.OK);
snapshot = counter.snapshot();
assertSnapshot(snapshot, 1, 0, 0, 0);
assertQueryCounts(snapshot, 1, 0, 0, 0);
// Create a second XdsClientLoadRecorder with the same counter, stats are aggregated together.
XdsClientLoadRecorder recorder2 =
@ -114,10 +136,70 @@ public class ClientLoadCounterTest {
recorder1.newClientStreamTracer(STREAM_INFO, new Metadata()).streamClosed(Status.ABORTED);
recorder2.newClientStreamTracer(STREAM_INFO, new Metadata()).streamClosed(Status.CANCELLED);
snapshot = counter.snapshot();
assertSnapshot(snapshot, 0, 0, 2, 2);
assertQueryCounts(snapshot, 0, 0, 2, 2);
}
private void assertSnapshot(ClientLoadSnapshot snapshot,
// Verifies that reports delivered to LocalityMetricsListener end up aggregated per metric name
// in the counter's snapshot, and that each snapshot only covers reports since the previous one.
@Test
public void metricListener_backendMetricsAggregation() {
LocalityMetricsListener listener1 = new LocalityMetricsListener(counter);
OrcaLoadReport report =
OrcaLoadReport.newBuilder()
.setCpuUtilization(0.5345)
.setMemUtilization(0.647)
.putRequestCostOrUtilization("named-cost-or-utilization-1", 3453.3525)
.putRequestCostOrUtilization("named-cost-or-utilization-2", 532543.14234)
.build();
listener1.onLoadReport(report);
// Simulate an empty load report.
// The listener records cpu/mem utilization for every report, so those two metrics count two
// reports below while their totals stay at the values of the first report; the named metrics
// only appear in the first report and therefore count one.
listener1.onLoadReport(OrcaLoadReport.getDefaultInstance());
ClientLoadSnapshot snapshot = counter.snapshot();
MetricValue cpuMetric = snapshot.getMetricValues().get("cpu_utilization");
assertThat(cpuMetric.getNumReports()).isEqualTo(2);
assertThat(cpuMetric.getTotalValue()).isEqualTo(0.5345);
MetricValue memMetric = snapshot.getMetricValues().get("mem_utilization");
assertThat(memMetric.getNumReports()).isEqualTo(2);
assertThat(memMetric.getTotalValue()).isEqualTo(0.647);
MetricValue namedMetric1 = snapshot.getMetricValues().get("named-cost-or-utilization-1");
assertThat(namedMetric1.getNumReports()).isEqualTo(1);
assertThat(namedMetric1.getTotalValue()).isEqualTo(3453.3525);
MetricValue namedMetric2 = snapshot.getMetricValues().get("named-cost-or-utilization-2");
assertThat(namedMetric2.getNumReports()).isEqualTo(1);
assertThat(namedMetric2.getTotalValue()).isEqualTo(532543.14234);
// Taking a snapshot drains the recorded metrics, so an immediate second snapshot has none.
snapshot = counter.snapshot();
assertThat(snapshot.getMetricValues()).isEmpty();
LocalityMetricsListener listener2 = new LocalityMetricsListener(counter);
report =
OrcaLoadReport.newBuilder()
.setCpuUtilization(0.3423)
.setMemUtilization(0.654)
.putRequestCostOrUtilization("named-cost-or-utilization", 3534.0)
.build();
// Two listeners with the same counter aggregate metrics together.
listener1.onLoadReport(report);
listener2.onLoadReport(report);
snapshot = counter.snapshot();
// The same report was delivered once per listener, so every metric shows two reports and a
// doubled total.
cpuMetric = snapshot.getMetricValues().get("cpu_utilization");
assertThat(cpuMetric.getNumReports()).isEqualTo(2);
assertThat(cpuMetric.getTotalValue()).isEqualTo(0.3423 + 0.3423);
memMetric = snapshot.getMetricValues().get("mem_utilization");
assertThat(memMetric.getNumReports()).isEqualTo(2);
assertThat(memMetric.getTotalValue()).isEqualTo(0.654 + 0.654);
MetricValue namedMetric = snapshot.getMetricValues().get("named-cost-or-utilization");
assertThat(namedMetric.getNumReports()).isEqualTo(2);
assertThat(namedMetric.getTotalValue()).isEqualTo(3534.0 + 3534.0);
}
private void assertQueryCounts(ClientLoadSnapshot snapshot,
long callsSucceeded,
long callsInProgress,
long callsFailed,

View File

@ -28,8 +28,11 @@ import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests;
import io.envoyproxy.envoy.api.v2.endpoint.EndpointLoadMetricStats;
import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot;
import io.grpc.xds.ClientLoadCounter.MetricValue;
import io.grpc.xds.XdsLoadStatsStore.StatsCounter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -71,6 +74,19 @@ public class XdsLoadStatsStoreTest {
loadStore = new XdsLoadStatsStore(SERVICE_NAME, localityLoadCounters, dropCounters);
}
/**
 * Builds one {@link EndpointLoadMetricStats} message per entry of {@code metrics}, carrying the
 * metric name, its report count and its total value.
 */
private static List<EndpointLoadMetricStats> buildEndpointLoadMetricStatsList(
    Map<String, MetricValue> metrics) {
  List<EndpointLoadMetricStats> statsList = new ArrayList<>();
  for (Map.Entry<String, MetricValue> metric : metrics.entrySet()) {
    MetricValue value = metric.getValue();
    EndpointLoadMetricStats stats =
        EndpointLoadMetricStats.newBuilder()
            .setMetricName(metric.getKey())
            .setNumRequestsFinishedWithMetric(value.getNumReports())
            .setTotalMetricValue(value.getTotalValue())
            .build();
    statsList.add(stats);
  }
  return statsList;
}
private static UpstreamLocalityStats buildUpstreamLocalityStats(Locality locality,
long callsSucceed,
long callsInProgress,
@ -213,15 +229,23 @@ public class XdsLoadStatsStoreTest {
}
@Test
@SuppressWarnings("unchecked")
public void loadReportMatchesSnapshots() {
StatsCounter counter1 = mock(StatsCounter.class);
Map<String, MetricValue> metrics1 = new HashMap<>();
metrics1.put("cpu_utilization", new MetricValue(15, 12.5435));
metrics1.put("mem_utilization", new MetricValue(8, 0.421));
metrics1.put("named_cost_or_utilization", new MetricValue(3, 2.5435));
when(counter1.isActive()).thenReturn(true);
when(counter1.snapshot())
.thenReturn(new ClientLoadSnapshot(4315, 3421, 23, 593),
new ClientLoadSnapshot(0, 543, 0, 0));
when(counter1.snapshot()).thenReturn(new ClientLoadSnapshot(4315, 3421, 23, 593, metrics1),
new ClientLoadSnapshot(0, 543, 0, 0, Collections.EMPTY_MAP));
StatsCounter counter2 = mock(StatsCounter.class);
when(counter2.snapshot()).thenReturn(new ClientLoadSnapshot(41234, 432, 431, 702),
new ClientLoadSnapshot(0, 432, 0, 0));
Map<String, MetricValue> metrics2 = new HashMap<>();
metrics2.put("cpu_utilization", new MetricValue(344, 132.74));
metrics2.put("mem_utilization", new MetricValue(41, 23.453));
metrics2.put("named_cost_or_utilization", new MetricValue(12, 423));
when(counter2.snapshot()).thenReturn(new ClientLoadSnapshot(41234, 432, 431, 702, metrics2),
new ClientLoadSnapshot(0, 432, 0, 0, Collections.EMPTY_MAP));
when(counter2.isActive()).thenReturn(true);
localityLoadCounters.put(LOCALITY1, counter1);
localityLoadCounters.put(LOCALITY2, counter2);
@ -229,8 +253,10 @@ public class XdsLoadStatsStoreTest {
ClusterStats expectedReport =
buildClusterStats(
Arrays.asList(
buildUpstreamLocalityStats(LOCALITY1, 4315, 3421, 23, 593, null),
buildUpstreamLocalityStats(LOCALITY2, 41234, 432, 431, 702, null)
buildUpstreamLocalityStats(LOCALITY1, 4315, 3421, 23, 593,
buildEndpointLoadMetricStatsList(metrics1)),
buildUpstreamLocalityStats(LOCALITY2, 41234, 432, 431, 702,
buildEndpointLoadMetricStatsList(metrics2))
),
null);