From b7fb3c2e93564a78ca8484c6be32c5912839bdec Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Wed, 15 May 2019 15:52:20 -0700 Subject: [PATCH] xds: add counts for recently issued calls in client side load reporting (#5735) * added counts for recently issued calls in client side load reporting * use recordCallStarted/Finished to manipulate counter instead of explicitly incr/decr methods --- .../java/io/grpc/xds/ClientLoadCounter.java | 65 +++++++++-------- .../java/io/grpc/xds/XdsLoadStatsStore.java | 17 ++--- .../io/grpc/xds/ClientLoadCounterTest.java | 73 ++++++++++--------- .../grpc/xds/XdsLoadReportClientImplTest.java | 10 ++- .../io/grpc/xds/XdsLoadStatsStoreTest.java | 22 +++--- 5 files changed, 97 insertions(+), 90 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java b/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java index d4db4a9dbe..ff93474fe7 100644 --- a/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java +++ b/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java @@ -38,8 +38,9 @@ import javax.annotation.concurrent.ThreadSafe; @NotThreadSafe final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { private final AtomicLong callsInProgress = new AtomicLong(); - private final AtomicLong callsFinished = new AtomicLong(); + private final AtomicLong callsSucceeded = new AtomicLong(); private final AtomicLong callsFailed = new AtomicLong(); + private final AtomicLong callsIssued = new AtomicLong(); ClientLoadCounter() { } @@ -48,30 +49,27 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { * Must only be used for testing. */ @VisibleForTesting - ClientLoadCounter(long callsFinished, long callsInProgress, long callsFailed) { - this.callsFinished.set(callsFinished); + ClientLoadCounter(long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued) { + this.callsSucceeded.set(callsSucceeded); this.callsInProgress.set(callsInProgress); this.callsFailed.set(callsFailed); + this.callsIssued.set(callsIssued); } @Override - void incrementCallsInProgress() { + void recordCallStarted() { + callsIssued.getAndIncrement(); callsInProgress.getAndIncrement(); } @Override - void decrementCallsInProgress() { + void recordCallFinished(Status status) { callsInProgress.getAndDecrement(); - } - - @Override - void incrementCallsFinished() { - callsFinished.getAndIncrement(); - } - - @Override - void incrementCallsFailed() { - callsFailed.getAndIncrement(); + if (status.isOk()) { + callsSucceeded.getAndIncrement(); + } else { + callsFailed.getAndIncrement(); + } } /** @@ -82,9 +80,10 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { */ @Override public ClientLoadSnapshot snapshot() { - return new ClientLoadSnapshot(callsFinished.getAndSet(0), + return new ClientLoadSnapshot(callsSucceeded.getAndSet(0), callsInProgress.get(), - callsFailed.getAndSet(0)); + callsFailed.getAndSet(0), + callsIssued.getAndSet(0)); } /** @@ -94,23 +93,28 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { static final class ClientLoadSnapshot { @VisibleForTesting - static final ClientLoadSnapshot EMPTY_SNAPSHOT = new ClientLoadSnapshot(0, 0, 0); - private final long callsFinished; + static final ClientLoadSnapshot EMPTY_SNAPSHOT = new ClientLoadSnapshot(0, 0, 0, 0); + private final long callsSucceeded; private final long callsInProgress; private final long callsFailed; + private final long callsIssued; /** * External usage must only be for testing. */ @VisibleForTesting - ClientLoadSnapshot(long callsFinished, long callsInProgress, long callsFailed) { - this.callsFinished = callsFinished; + ClientLoadSnapshot(long callsSucceeded, + long callsInProgress, + long callsFailed, + long callsIssued) { + this.callsSucceeded = callsSucceeded; this.callsInProgress = callsInProgress; this.callsFailed = callsFailed; + this.callsIssued = callsIssued; } - long getCallsFinished() { - return callsFinished; + long getCallsSucceeded() { + return callsSucceeded; } long getCallsInProgress() { @@ -121,12 +125,17 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { return callsFailed; } + long getCallsIssued() { + return callsIssued; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("callsFinished", callsFinished) + .add("callsSucceeded", callsSucceeded) .add("callsInProgress", callsInProgress) .add("callsFailed", callsFailed) + .add("callsIssued", callsIssued) .toString(); } } @@ -148,7 +157,7 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { @Override public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { - counter.incrementCallsInProgress(); + counter.recordCallStarted(); final ClientStreamTracer delegateTracer = delegate.newClientStreamTracer(info, headers); return new ForwardingClientStreamTracer() { @Override @@ -158,11 +167,7 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { @Override public void streamClosed(Status status) { - counter.incrementCallsFinished(); - counter.decrementCallsInProgress(); - if (!status.isOk()) { - counter.incrementCallsFailed(); - } + counter.recordCallFinished(status); delegate().streamClosed(status); } }; diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java b/xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java index 318deda9b7..3c20b1581b 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java @@ -24,6 +24,7 @@ import io.envoyproxy.envoy.api.v2.core.Locality; import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests; import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats; +import io.grpc.Status; import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot; import io.grpc.xds.XdsLoadReportClientImpl.StatsStore; import java.util.Map; @@ -71,9 +72,10 @@ final class XdsLoadStatsStore implements StatsStore { UpstreamLocalityStats.Builder localityStatsBuilder = UpstreamLocalityStats.newBuilder().setLocality(entry.getKey()); localityStatsBuilder - .setTotalSuccessfulRequests(snapshot.getCallsFinished() - snapshot.getCallsFailed()) + .setTotalSuccessfulRequests(snapshot.getCallsSucceeded()) .setTotalErrorRequests(snapshot.getCallsFailed()) - .setTotalRequestsInProgress(snapshot.getCallsInProgress()); + .setTotalRequestsInProgress(snapshot.getCallsInProgress()) + .setTotalIssuedRequests(snapshot.getCallsIssued()); statsBuilder.addUpstreamLocalityStats(localityStatsBuilder); // Discard counters for localities that are no longer exposed by the remote balancer and // no RPCs ongoing. @@ -150,19 +152,16 @@ final class XdsLoadStatsStore implements StatsStore { } /** - * Blueprint for counters that can can record number of calls in-progress, finished, failed. + * Blueprint for counters that can can record number of calls in-progress, succeeded, failed and + * issued. */ abstract static class StatsCounter { private boolean active = true; - abstract void incrementCallsInProgress(); + abstract void recordCallStarted(); - abstract void decrementCallsInProgress(); - - abstract void incrementCallsFinished(); - - abstract void incrementCallsFailed(); + abstract void recordCallFinished(Status status); abstract ClientLoadSnapshot snapshot(); diff --git a/xds/src/test/java/io/grpc/xds/ClientLoadCounterTest.java b/xds/src/test/java/io/grpc/xds/ClientLoadCounterTest.java index 7bff11e998..145f7d0f88 100644 --- a/xds/src/test/java/io/grpc/xds/ClientLoadCounterTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientLoadCounterTest.java @@ -50,55 +50,51 @@ public class ClientLoadCounterTest { public void setUp() { counter = new ClientLoadCounter(); ClientLoadSnapshot emptySnapshot = counter.snapshot(); - assertThat(emptySnapshot.getCallsInProgress()).isEqualTo(0); - assertThat(emptySnapshot.getCallsFinished()).isEqualTo(0); - assertThat(emptySnapshot.getCallsFailed()).isEqualTo(0); + assertSnapshot(emptySnapshot, 0, 0, 0, 0); } @Test public void snapshotContainsEverything() { - long numFinishedCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + long numSucceededCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long numInProgressCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); - long numFailedCalls = ThreadLocalRandom.current().nextLong(numFinishedCalls); - counter = new ClientLoadCounter(numFinishedCalls, numInProgressCalls, numFailedCalls); + long numFailedCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + long numIssuedCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + counter = + new ClientLoadCounter(numSucceededCalls, numInProgressCalls, numFailedCalls, + numIssuedCalls); ClientLoadSnapshot snapshot = counter.snapshot(); - assertThat(snapshot.getCallsFinished()).isEqualTo(numFinishedCalls); - assertThat(snapshot.getCallsInProgress()).isEqualTo(numInProgressCalls); - assertThat(snapshot.getCallsFailed()).isEqualTo(numFailedCalls); + assertSnapshot(snapshot, numSucceededCalls, numInProgressCalls, numFailedCalls, numIssuedCalls); String snapshotStr = snapshot.toString(); - assertThat(snapshotStr).contains("callsFinished=" + numFinishedCalls); + assertThat(snapshotStr).contains("callsSucceeded=" + numSucceededCalls); assertThat(snapshotStr).contains("callsInProgress=" + numInProgressCalls); assertThat(snapshotStr).contains("callsFailed=" + numFailedCalls); + assertThat(snapshotStr).contains("callsIssued=" + numIssuedCalls); // Snapshot only accounts for stats happening after previous snapshot. snapshot = counter.snapshot(); - assertThat(snapshot.getCallsFinished()).isEqualTo(0); - assertThat(snapshot.getCallsInProgress()).isEqualTo(numInProgressCalls); - assertThat(snapshot.getCallsFailed()).isEqualTo(0); + assertSnapshot(snapshot, 0, numInProgressCalls, 0, 0); snapshotStr = snapshot.toString(); - assertThat(snapshotStr).contains("callsFinished=0"); + assertThat(snapshotStr).contains("callsSucceeded=0"); assertThat(snapshotStr).contains("callsInProgress=" + numInProgressCalls); assertThat(snapshotStr).contains("callsFailed=0"); + assertThat(snapshotStr).contains("callsIssued=0"); } @Test public void normalCountingOperations() { - ClientLoadSnapshot preSnapshot = counter.snapshot(); - counter.incrementCallsInProgress(); - ClientLoadSnapshot afterSnapshot = counter.snapshot(); - assertThat(afterSnapshot.getCallsInProgress()).isEqualTo(preSnapshot.getCallsInProgress() + 1); - counter.decrementCallsInProgress(); - afterSnapshot = counter.snapshot(); - assertThat(afterSnapshot.getCallsInProgress()).isEqualTo(preSnapshot.getCallsInProgress()); + counter.recordCallStarted(); + ClientLoadSnapshot snapshot = counter.snapshot(); + assertSnapshot(snapshot, 0, 1, 0, 1); - counter.incrementCallsFinished(); - afterSnapshot = counter.snapshot(); - assertThat(afterSnapshot.getCallsFinished()).isEqualTo(1); + counter.recordCallFinished(Status.OK); + snapshot = counter.snapshot(); + assertSnapshot(snapshot, 1, 0, 0, 0); - counter.incrementCallsFailed(); - afterSnapshot = counter.snapshot(); - assertThat(afterSnapshot.getCallsFailed()).isEqualTo(1); + counter.recordCallStarted(); + counter.recordCallFinished(Status.CANCELLED); + snapshot = counter.snapshot(); + assertSnapshot(snapshot, 0, 0, 1, 1); } @Test @@ -107,14 +103,10 @@ public class ClientLoadCounterTest { new XdsClientLoadRecorder(counter, NOOP_CLIENT_STREAM_TRACER_FACTORY); ClientStreamTracer tracer = recorder1.newClientStreamTracer(STREAM_INFO, new Metadata()); ClientLoadSnapshot snapshot = counter.snapshot(); - assertThat(snapshot.getCallsFinished()).isEqualTo(0); - assertThat(snapshot.getCallsInProgress()).isEqualTo(1); - assertThat(snapshot.getCallsFailed()).isEqualTo(0); + assertSnapshot(snapshot, 0, 1, 0, 1); tracer.streamClosed(Status.OK); snapshot = counter.snapshot(); - assertThat(snapshot.getCallsFinished()).isEqualTo(1); - assertThat(snapshot.getCallsInProgress()).isEqualTo(0); - assertThat(snapshot.getCallsFailed()).isEqualTo(0); + assertSnapshot(snapshot, 1, 0, 0, 0); // Create a second XdsClientLoadRecorder with the same counter, stats are aggregated together. XdsClientLoadRecorder recorder2 = @@ -122,8 +114,17 @@ public class ClientLoadCounterTest { recorder1.newClientStreamTracer(STREAM_INFO, new Metadata()).streamClosed(Status.ABORTED); recorder2.newClientStreamTracer(STREAM_INFO, new Metadata()).streamClosed(Status.CANCELLED); snapshot = counter.snapshot(); - assertThat(snapshot.getCallsFinished()).isEqualTo(2); - assertThat(snapshot.getCallsInProgress()).isEqualTo(0); - assertThat(snapshot.getCallsFailed()).isEqualTo(2); + assertSnapshot(snapshot, 0, 0, 2, 2); + } + + private void assertSnapshot(ClientLoadSnapshot snapshot, + long callsSucceeded, + long callsInProgress, + long callsFailed, + long callsIssued) { + assertThat(snapshot.getCallsSucceeded()).isEqualTo(callsSucceeded); + assertThat(snapshot.getCallsInProgress()).isEqualTo(callsInProgress); + assertThat(snapshot.getCallsFailed()).isEqualTo(callsFailed); + assertThat(snapshot.getCallsIssued()).isEqualTo(callsIssued); } } diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadReportClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadReportClientImplTest.java index 4773a2c02a..83d13b417f 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadReportClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsLoadReportClientImplTest.java @@ -353,8 +353,9 @@ public class XdsLoadReportClientImplTest { inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ); long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); - long callsFinished = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); - long callsFailed = callsFinished - ThreadLocalRandom.current().nextLong(callsFinished); + long callsSucceeded = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + long callsFailed = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + long callsIssued = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long numLbDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long numThrottleDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); @@ -364,8 +365,9 @@ public class XdsLoadReportClientImplTest { .addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder() .setLocality(TEST_LOCALITY) .setTotalRequestsInProgress(callsInProgress) - .setTotalSuccessfulRequests(callsFinished - callsFailed) - .setTotalErrorRequests(callsFailed)) + .setTotalSuccessfulRequests(callsSucceeded) + .setTotalErrorRequests(callsFailed) + .setTotalIssuedRequests(callsIssued)) .addDroppedRequests(DroppedRequests.newBuilder() .setCategory("lb") .setDroppedCount(numLbDrops)) diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java index c2e1443295..d2f48f9614 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java @@ -76,13 +76,15 @@ public class XdsLoadStatsStoreTest { long callsSucceed, long callsInProgress, long callsFailed, + long callsIssued, @Nullable List metrics) { UpstreamLocalityStats.Builder builder = UpstreamLocalityStats.newBuilder() .setLocality(locality) .setTotalSuccessfulRequests(callsSucceed) .setTotalErrorRequests(callsFailed) - .setTotalRequestsInProgress(callsInProgress); + .setTotalRequestsInProgress(callsInProgress) + .setTotalIssuedRequests(callsIssued); if (metrics != null) { builder.addAllLoadMetricStats(metrics); } @@ -216,11 +218,11 @@ public class XdsLoadStatsStoreTest { StatsCounter counter1 = mock(StatsCounter.class); when(counter1.isActive()).thenReturn(true); when(counter1.snapshot()) - .thenReturn(new ClientLoadSnapshot(4315, 3421, 23), - new ClientLoadSnapshot(0, 543, 0)); + .thenReturn(new ClientLoadSnapshot(4315, 3421, 23, 593), + new ClientLoadSnapshot(0, 543, 0, 0)); StatsCounter counter2 = mock(StatsCounter.class); - when(counter2.snapshot()).thenReturn(new ClientLoadSnapshot(41234, 432, 431), - new ClientLoadSnapshot(0, 432, 0)); + when(counter2.snapshot()).thenReturn(new ClientLoadSnapshot(41234, 432, 431, 702), + new ClientLoadSnapshot(0, 432, 0, 0)); when(counter2.isActive()).thenReturn(true); localityLoadCounters.put(LOCALITY1, counter1); localityLoadCounters.put(LOCALITY2, counter2); @@ -228,8 +230,8 @@ public class XdsLoadStatsStoreTest { ClusterStats expectedReport = buildClusterStats( Arrays.asList( - buildUpstreamLocalityStats(LOCALITY1, 4315 - 23, 3421, 23, null), - buildUpstreamLocalityStats(LOCALITY2, 41234 - 431, 432, 431, null) + buildUpstreamLocalityStats(LOCALITY1, 4315, 3421, 23, 593, null), + buildUpstreamLocalityStats(LOCALITY2, 41234, 432, 431, 702, null) ), null); @@ -240,10 +242,8 @@ public class XdsLoadStatsStoreTest { expectedReport = buildClusterStats( Arrays.asList( - buildUpstreamLocalityStats(LOCALITY1, 0, 543, 0, - null), - buildUpstreamLocalityStats(LOCALITY2, 0, 432, 0, - null) + buildUpstreamLocalityStats(LOCALITY1, 0, 543, 0, 0, null), + buildUpstreamLocalityStats(LOCALITY2, 0, 432, 0, 0, null) ), null); assertClusterStatsEqual(expectedReport, loadStore.generateLoadReport());