From 93551719b935c98249de172cf396c0fdf98e834a Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 31 May 2019 14:28:23 -0700 Subject: [PATCH] xds: integrate backend metric API to client load reporting (#5797) * augmented ClientLoadCounter with backend metrics * added a listener implementation for receiving backend metrics and aggregate in ClientLoadCounter --- .../java/io/grpc/xds/ClientLoadCounter.java | 136 +++++++++++++++++- .../java/io/grpc/xds/XdsLoadStatsStore.java | 15 +- .../io/grpc/xds/ClientLoadCounterTest.java | 106 ++++++++++++-- .../io/grpc/xds/XdsLoadStatsStoreTest.java | 40 +++++- 4 files changed, 273 insertions(+), 24 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java b/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java index ff93474fe7..195b6f6657 100644 --- a/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java +++ b/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java @@ -20,12 +20,18 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import io.envoyproxy.udpa.data.orca.v1.OrcaLoadReport; import io.grpc.ClientStreamTracer; import io.grpc.ClientStreamTracer.StreamInfo; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.util.ForwardingClientStreamTracer; +import io.grpc.xds.OrcaOobUtil.OrcaOobReportListener; +import io.grpc.xds.OrcaPerRequestUtil.OrcaPerRequestReportListener; import io.grpc.xds.XdsLoadStatsStore.StatsCounter; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.ThreadSafe; @@ -37,12 +43,18 @@ import javax.annotation.concurrent.ThreadSafe; */ @NotThreadSafe final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { + + private static final int THREAD_BALANCING_FACTOR = 64; private final AtomicLong callsInProgress = new AtomicLong(); private final AtomicLong callsSucceeded = new AtomicLong(); private final AtomicLong callsFailed = new AtomicLong(); private final AtomicLong callsIssued = new AtomicLong(); + private final MetricRecorder[] metricRecorders = new MetricRecorder[THREAD_BALANCING_FACTOR]; ClientLoadCounter() { + for (int i = 0; i < THREAD_BALANCING_FACTOR; i++) { + metricRecorders[i] = new MetricRecorder(); + } } /** @@ -50,6 +62,7 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { */ @VisibleForTesting ClientLoadCounter(long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued) { + this(); this.callsSucceeded.set(callsSucceeded); this.callsInProgress.set(callsInProgress); this.callsFailed.set(callsFailed); @@ -72,6 +85,13 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { } } + @Override + void recordMetric(String name, double value) { + MetricRecorder recorder = + metricRecorders[(int) (Thread.currentThread().getId() % THREAD_BALANCING_FACTOR)]; + recorder.addValue(name, value); + } + /** * Generate snapshot for recorded query counts and metrics since previous snapshot. * @@ -80,10 +100,25 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { */ @Override public ClientLoadSnapshot snapshot() { + Map aggregatedValues = new HashMap<>(); + for (MetricRecorder recorder : metricRecorders) { + Map map = recorder.takeAll(); + for (Map.Entry entry : map.entrySet()) { + MetricValue curr = aggregatedValues.get(entry.getKey()); + if (curr == null) { + curr = new MetricValue(); + aggregatedValues.put(entry.getKey(), curr); + } + MetricValue diff = entry.getValue(); + curr.numReports += diff.numReports; + curr.totalValue += diff.totalValue; + } + } return new ClientLoadSnapshot(callsSucceeded.getAndSet(0), callsInProgress.get(), callsFailed.getAndSet(0), - callsIssued.getAndSet(0)); + callsIssued.getAndSet(0), + aggregatedValues); } /** @@ -93,11 +128,14 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { static final class ClientLoadSnapshot { @VisibleForTesting - static final ClientLoadSnapshot EMPTY_SNAPSHOT = new ClientLoadSnapshot(0, 0, 0, 0); + @SuppressWarnings("unchecked") + static final ClientLoadSnapshot EMPTY_SNAPSHOT = + new ClientLoadSnapshot(0, 0, 0, 0, Collections.EMPTY_MAP); private final long callsSucceeded; private final long callsInProgress; private final long callsFailed; private final long callsIssued; + private final Map metricValues; /** * External usage must only be for testing. @@ -106,11 +144,13 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { ClientLoadSnapshot(long callsSucceeded, long callsInProgress, long callsFailed, - long callsIssued) { + long callsIssued, + Map metricValues) { this.callsSucceeded = callsSucceeded; this.callsInProgress = callsInProgress; this.callsFailed = callsFailed; this.callsIssued = callsIssued; + this.metricValues = checkNotNull(metricValues, "metricValues"); } long getCallsSucceeded() { @@ -129,6 +169,10 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { return callsIssued; } + Map getMetricValues() { + return Collections.unmodifiableMap(metricValues); + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -136,10 +180,73 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { .add("callsInProgress", callsInProgress) .add("callsFailed", callsFailed) .add("callsIssued", callsIssued) + .add("metricValues", metricValues) .toString(); } } + /** + * Atomic unit of recording for metric data. + */ + static final class MetricValue { + + private int numReports; + private double totalValue; + + private MetricValue() { + this(0, 0); + } + + /** + * Must only be used for testing. + */ + @VisibleForTesting + MetricValue(int numReports, double totalValue) { + this.numReports = numReports; + this.totalValue = totalValue; + } + + long getNumReports() { + return numReports; + } + + double getTotalValue() { + return totalValue; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("numReports", numReports) + .add("totalValue", totalValue) + .toString(); + } + } + + /** + * Single contention-balanced bucket for recording metric data. + */ + private static class MetricRecorder { + + private Map metricValues = new HashMap<>(); + + synchronized void addValue(String metricName, double value) { + MetricValue currValue = metricValues.get(metricName); + if (currValue == null) { + currValue = new MetricValue(); + } + currValue.numReports++; + currValue.totalValue += value; + metricValues.put(metricName, currValue); + } + + synchronized Map takeAll() { + Map ret = metricValues; + metricValues = new HashMap<>(); + return ret; + } + } + /** * An {@link XdsClientLoadRecorder} instance records and aggregates client-side load data into an * {@link ClientLoadCounter} object. @@ -173,4 +280,27 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { }; } } + + /** + * Listener implementation to receive backend metrics with locality-level aggregation. + */ + @ThreadSafe + static final class LocalityMetricsListener implements OrcaPerRequestReportListener, + OrcaOobReportListener { + + private final ClientLoadCounter counter; + + LocalityMetricsListener(ClientLoadCounter counter) { + this.counter = checkNotNull(counter, "counter"); + } + + @Override + public void onLoadReport(OrcaLoadReport report) { + counter.recordMetric("cpu_utilization", report.getCpuUtilization()); + counter.recordMetric("mem_utilization", report.getMemUtilization()); + for (Map.Entry entry : report.getRequestCostOrUtilizationMap().entrySet()) { + counter.recordMetric(entry.getKey(), entry.getValue()); + } + } + } } diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java b/xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java index 3c20b1581b..77f1607737 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java @@ -23,9 +23,11 @@ import com.google.common.annotations.VisibleForTesting; import io.envoyproxy.envoy.api.v2.core.Locality; import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests; +import io.envoyproxy.envoy.api.v2.endpoint.EndpointLoadMetricStats; import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats; import io.grpc.Status; import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot; +import io.grpc.xds.ClientLoadCounter.MetricValue; import io.grpc.xds.XdsLoadReportClientImpl.StatsStore; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -76,6 +78,13 @@ final class XdsLoadStatsStore implements StatsStore { .setTotalErrorRequests(snapshot.getCallsFailed()) .setTotalRequestsInProgress(snapshot.getCallsInProgress()) .setTotalIssuedRequests(snapshot.getCallsIssued()); + for (Map.Entry metric : snapshot.getMetricValues().entrySet()) { + localityStatsBuilder.addLoadMetricStats( + EndpointLoadMetricStats.newBuilder() + .setMetricName(metric.getKey()) + .setNumRequestsFinishedWithMetric(metric.getValue().getNumReports()) + .setTotalMetricValue(metric.getValue().getTotalValue())); + } statsBuilder.addUpstreamLocalityStats(localityStatsBuilder); // Discard counters for localities that are no longer exposed by the remote balancer and // no RPCs ongoing. @@ -152,8 +161,8 @@ final class XdsLoadStatsStore implements StatsStore { } /** - * Blueprint for counters that can can record number of calls in-progress, succeeded, failed and - * issued. + * Blueprint for counters that can can record number of calls in-progress, succeeded, failed, + * issued and backend metrics. */ abstract static class StatsCounter { @@ -163,6 +172,8 @@ final class XdsLoadStatsStore implements StatsStore { abstract void recordCallFinished(Status status); + abstract void recordMetric(String name, double value); + abstract ClientLoadSnapshot snapshot(); boolean isActive() { diff --git a/xds/src/test/java/io/grpc/xds/ClientLoadCounterTest.java b/xds/src/test/java/io/grpc/xds/ClientLoadCounterTest.java index 145f7d0f88..69ac5efcb0 100644 --- a/xds/src/test/java/io/grpc/xds/ClientLoadCounterTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientLoadCounterTest.java @@ -18,11 +18,14 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import io.envoyproxy.udpa.data.orca.v1.OrcaLoadReport; import io.grpc.ClientStreamTracer; import io.grpc.ClientStreamTracer.StreamInfo; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot; +import io.grpc.xds.ClientLoadCounter.LocalityMetricsListener; +import io.grpc.xds.ClientLoadCounter.MetricValue; import io.grpc.xds.ClientLoadCounter.XdsClientLoadRecorder; import java.util.concurrent.ThreadLocalRandom; import org.junit.Before; @@ -50,11 +53,12 @@ public class ClientLoadCounterTest { public void setUp() { counter = new ClientLoadCounter(); ClientLoadSnapshot emptySnapshot = counter.snapshot(); - assertSnapshot(emptySnapshot, 0, 0, 0, 0); + assertQueryCounts(emptySnapshot, 0, 0, 0, 0); + assertThat(emptySnapshot.getMetricValues()).isEmpty(); } @Test - public void snapshotContainsEverything() { + public void snapshotContainsDataInCounter() { long numSucceededCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long numInProgressCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long numFailedCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); @@ -63,38 +67,56 @@ public class ClientLoadCounterTest { new ClientLoadCounter(numSucceededCalls, numInProgressCalls, numFailedCalls, numIssuedCalls); ClientLoadSnapshot snapshot = counter.snapshot(); - assertSnapshot(snapshot, numSucceededCalls, numInProgressCalls, numFailedCalls, numIssuedCalls); + assertQueryCounts(snapshot, numSucceededCalls, numInProgressCalls, numFailedCalls, + numIssuedCalls); String snapshotStr = snapshot.toString(); assertThat(snapshotStr).contains("callsSucceeded=" + numSucceededCalls); assertThat(snapshotStr).contains("callsInProgress=" + numInProgressCalls); assertThat(snapshotStr).contains("callsFailed=" + numFailedCalls); assertThat(snapshotStr).contains("callsIssued=" + numIssuedCalls); + assertThat(snapshotStr).contains("metricValues={}"); // Snapshot only accounts for stats happening after previous snapshot. snapshot = counter.snapshot(); - assertSnapshot(snapshot, 0, numInProgressCalls, 0, 0); + assertQueryCounts(snapshot, 0, numInProgressCalls, 0, 0); snapshotStr = snapshot.toString(); assertThat(snapshotStr).contains("callsSucceeded=0"); assertThat(snapshotStr).contains("callsInProgress=" + numInProgressCalls); assertThat(snapshotStr).contains("callsFailed=0"); assertThat(snapshotStr).contains("callsIssued=0"); + assertThat(snapshotStr).contains("metricValues={}"); } @Test - public void normalCountingOperations() { + public void normalRecordingOperations() { counter.recordCallStarted(); ClientLoadSnapshot snapshot = counter.snapshot(); - assertSnapshot(snapshot, 0, 1, 0, 1); + assertQueryCounts(snapshot, 0, 1, 0, 1); counter.recordCallFinished(Status.OK); snapshot = counter.snapshot(); - assertSnapshot(snapshot, 1, 0, 0, 0); + assertQueryCounts(snapshot, 1, 0, 0, 0); counter.recordCallStarted(); counter.recordCallFinished(Status.CANCELLED); snapshot = counter.snapshot(); - assertSnapshot(snapshot, 0, 0, 1, 1); + assertQueryCounts(snapshot, 0, 0, 1, 1); + + counter.recordMetric("test-metric-1", 0.75); + counter.recordMetric("test-metric-2", 0.342); + counter.recordMetric("test-metric-3", 0.512); + counter.recordMetric("test-metric-1", 0.543); + counter.recordMetric("test-metric-1", 4.412); + counter.recordMetric("test-metric-1", 100.353); + snapshot = counter.snapshot(); + assertThat(snapshot.getMetricValues().get("test-metric-1").getNumReports()).isEqualTo(4); + assertThat(snapshot.getMetricValues().get("test-metric-1").getTotalValue()) + .isEqualTo(0.75 + 0.543 + 4.412 + 100.353); + assertThat(snapshot.getMetricValues().get("test-metric-2").getNumReports()).isEqualTo(1); + assertThat(snapshot.getMetricValues().get("test-metric-2").getTotalValue()).isEqualTo(0.342); + assertThat(snapshot.getMetricValues().get("test-metric-3").getNumReports()).isEqualTo(1); + assertThat(snapshot.getMetricValues().get("test-metric-3").getTotalValue()).isEqualTo(0.512); } @Test @@ -103,10 +125,10 @@ public class ClientLoadCounterTest { new XdsClientLoadRecorder(counter, NOOP_CLIENT_STREAM_TRACER_FACTORY); ClientStreamTracer tracer = recorder1.newClientStreamTracer(STREAM_INFO, new Metadata()); ClientLoadSnapshot snapshot = counter.snapshot(); - assertSnapshot(snapshot, 0, 1, 0, 1); + assertQueryCounts(snapshot, 0, 1, 0, 1); tracer.streamClosed(Status.OK); snapshot = counter.snapshot(); - assertSnapshot(snapshot, 1, 0, 0, 0); + assertQueryCounts(snapshot, 1, 0, 0, 0); // Create a second XdsClientLoadRecorder with the same counter, stats are aggregated together. XdsClientLoadRecorder recorder2 = @@ -114,10 +136,70 @@ public class ClientLoadCounterTest { recorder1.newClientStreamTracer(STREAM_INFO, new Metadata()).streamClosed(Status.ABORTED); recorder2.newClientStreamTracer(STREAM_INFO, new Metadata()).streamClosed(Status.CANCELLED); snapshot = counter.snapshot(); - assertSnapshot(snapshot, 0, 0, 2, 2); + assertQueryCounts(snapshot, 0, 0, 2, 2); } - private void assertSnapshot(ClientLoadSnapshot snapshot, + @Test + public void metricListener_backendMetricsAggregation() { + LocalityMetricsListener listener1 = new LocalityMetricsListener(counter); + OrcaLoadReport report = + OrcaLoadReport.newBuilder() + .setCpuUtilization(0.5345) + .setMemUtilization(0.647) + .putRequestCostOrUtilization("named-cost-or-utilization-1", 3453.3525) + .putRequestCostOrUtilization("named-cost-or-utilization-2", 532543.14234) + .build(); + listener1.onLoadReport(report); + + // Simulate an empty load report. + listener1.onLoadReport(OrcaLoadReport.getDefaultInstance()); + + ClientLoadSnapshot snapshot = counter.snapshot(); + MetricValue cpuMetric = snapshot.getMetricValues().get("cpu_utilization"); + assertThat(cpuMetric.getNumReports()).isEqualTo(2); + assertThat(cpuMetric.getTotalValue()).isEqualTo(0.5345); + + MetricValue memMetric = snapshot.getMetricValues().get("mem_utilization"); + assertThat(memMetric.getNumReports()).isEqualTo(2); + assertThat(memMetric.getTotalValue()).isEqualTo(0.647); + + MetricValue namedMetric1 = snapshot.getMetricValues().get("named-cost-or-utilization-1"); + assertThat(namedMetric1.getNumReports()).isEqualTo(1); + assertThat(namedMetric1.getTotalValue()).isEqualTo(3453.3525); + + MetricValue namedMetric2 = snapshot.getMetricValues().get("named-cost-or-utilization-2"); + assertThat(namedMetric2.getNumReports()).isEqualTo(1); + assertThat(namedMetric2.getTotalValue()).isEqualTo(532543.14234); + + snapshot = counter.snapshot(); + assertThat(snapshot.getMetricValues()).isEmpty(); + + LocalityMetricsListener listener2 = new LocalityMetricsListener(counter); + report = + OrcaLoadReport.newBuilder() + .setCpuUtilization(0.3423) + .setMemUtilization(0.654) + .putRequestCostOrUtilization("named-cost-or-utilization", 3534.0) + .build(); + // Two listeners with the same counter aggregate metrics together. + listener1.onLoadReport(report); + listener2.onLoadReport(report); + + snapshot = counter.snapshot(); + cpuMetric = snapshot.getMetricValues().get("cpu_utilization"); + assertThat(cpuMetric.getNumReports()).isEqualTo(2); + assertThat(cpuMetric.getTotalValue()).isEqualTo(0.3423 + 0.3423); + + memMetric = snapshot.getMetricValues().get("mem_utilization"); + assertThat(memMetric.getNumReports()).isEqualTo(2); + assertThat(memMetric.getTotalValue()).isEqualTo(0.654 + 0.654); + + MetricValue namedMetric = snapshot.getMetricValues().get("named-cost-or-utilization"); + assertThat(namedMetric.getNumReports()).isEqualTo(2); + assertThat(namedMetric.getTotalValue()).isEqualTo(3534.0 + 3534.0); + } + + private void assertQueryCounts(ClientLoadSnapshot snapshot, long callsSucceeded, long callsInProgress, long callsFailed, diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java index 683a81305a..54792239e7 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java @@ -28,8 +28,11 @@ import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests; import io.envoyproxy.envoy.api.v2.endpoint.EndpointLoadMetricStats; import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats; import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot; +import io.grpc.xds.ClientLoadCounter.MetricValue; import io.grpc.xds.XdsLoadStatsStore.StatsCounter; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -71,6 +74,19 @@ public class XdsLoadStatsStoreTest { loadStore = new XdsLoadStatsStore(SERVICE_NAME, localityLoadCounters, dropCounters); } + private static List buildEndpointLoadMetricStatsList( + Map metrics) { + List res = new ArrayList<>(); + for (Map.Entry entry : metrics.entrySet()) { + res.add(EndpointLoadMetricStats.newBuilder() + .setMetricName(entry.getKey()) + .setNumRequestsFinishedWithMetric(entry.getValue().getNumReports()) + .setTotalMetricValue(entry.getValue().getTotalValue()) + .build()); + } + return res; + } + private static UpstreamLocalityStats buildUpstreamLocalityStats(Locality locality, long callsSucceed, long callsInProgress, @@ -213,15 +229,23 @@ public class XdsLoadStatsStoreTest { } @Test + @SuppressWarnings("unchecked") public void loadReportMatchesSnapshots() { StatsCounter counter1 = mock(StatsCounter.class); + Map metrics1 = new HashMap<>(); + metrics1.put("cpu_utilization", new MetricValue(15, 12.5435)); + metrics1.put("mem_utilization", new MetricValue(8, 0.421)); + metrics1.put("named_cost_or_utilization", new MetricValue(3, 2.5435)); when(counter1.isActive()).thenReturn(true); - when(counter1.snapshot()) - .thenReturn(new ClientLoadSnapshot(4315, 3421, 23, 593), - new ClientLoadSnapshot(0, 543, 0, 0)); + when(counter1.snapshot()).thenReturn(new ClientLoadSnapshot(4315, 3421, 23, 593, metrics1), + new ClientLoadSnapshot(0, 543, 0, 0, Collections.EMPTY_MAP)); StatsCounter counter2 = mock(StatsCounter.class); - when(counter2.snapshot()).thenReturn(new ClientLoadSnapshot(41234, 432, 431, 702), - new ClientLoadSnapshot(0, 432, 0, 0)); + Map metrics2 = new HashMap<>(); + metrics2.put("cpu_utilization", new MetricValue(344, 132.74)); + metrics2.put("mem_utilization", new MetricValue(41, 23.453)); + metrics2.put("named_cost_or_utilization", new MetricValue(12, 423)); + when(counter2.snapshot()).thenReturn(new ClientLoadSnapshot(41234, 432, 431, 702, metrics2), + new ClientLoadSnapshot(0, 432, 0, 0, Collections.EMPTY_MAP)); when(counter2.isActive()).thenReturn(true); localityLoadCounters.put(LOCALITY1, counter1); localityLoadCounters.put(LOCALITY2, counter2); @@ -229,8 +253,10 @@ public class XdsLoadStatsStoreTest { ClusterStats expectedReport = buildClusterStats( Arrays.asList( - buildUpstreamLocalityStats(LOCALITY1, 4315, 3421, 23, 593, null), - buildUpstreamLocalityStats(LOCALITY2, 41234, 432, 431, 702, null) + buildUpstreamLocalityStats(LOCALITY1, 4315, 3421, 23, 593, + buildEndpointLoadMetricStatsList(metrics1)), + buildUpstreamLocalityStats(LOCALITY2, 41234, 432, 431, 702, + buildEndpointLoadMetricStatsList(metrics2)) ), null);