xds: add counts for recently issued calls in client side load reporting (#5735)

* added counts for recently issued calls in client side load reporting

* use recordCallStarted/Finished to manipulate counter instead of explicitly incr/decr methods
This commit is contained in:
Chengyuan Zhang 2019-05-15 15:52:20 -07:00 committed by GitHub
parent e6c8534f10
commit b7fb3c2e93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 90 deletions

View File

@ -38,8 +38,9 @@ import javax.annotation.concurrent.ThreadSafe;
@NotThreadSafe @NotThreadSafe
final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter { final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
private final AtomicLong callsInProgress = new AtomicLong(); private final AtomicLong callsInProgress = new AtomicLong();
private final AtomicLong callsFinished = new AtomicLong(); private final AtomicLong callsSucceeded = new AtomicLong();
private final AtomicLong callsFailed = new AtomicLong(); private final AtomicLong callsFailed = new AtomicLong();
private final AtomicLong callsIssued = new AtomicLong();
ClientLoadCounter() { ClientLoadCounter() {
} }
@ -48,30 +49,27 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
* Must only be used for testing. * Must only be used for testing.
*/ */
@VisibleForTesting @VisibleForTesting
ClientLoadCounter(long callsFinished, long callsInProgress, long callsFailed) { ClientLoadCounter(long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued) {
this.callsFinished.set(callsFinished); this.callsSucceeded.set(callsSucceeded);
this.callsInProgress.set(callsInProgress); this.callsInProgress.set(callsInProgress);
this.callsFailed.set(callsFailed); this.callsFailed.set(callsFailed);
this.callsIssued.set(callsIssued);
} }
@Override @Override
void incrementCallsInProgress() { void recordCallStarted() {
callsIssued.getAndIncrement();
callsInProgress.getAndIncrement(); callsInProgress.getAndIncrement();
} }
@Override @Override
void decrementCallsInProgress() { void recordCallFinished(Status status) {
callsInProgress.getAndDecrement(); callsInProgress.getAndDecrement();
} if (status.isOk()) {
callsSucceeded.getAndIncrement();
@Override } else {
void incrementCallsFinished() { callsFailed.getAndIncrement();
callsFinished.getAndIncrement(); }
}
@Override
void incrementCallsFailed() {
callsFailed.getAndIncrement();
} }
/** /**
@ -82,9 +80,10 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
*/ */
@Override @Override
public ClientLoadSnapshot snapshot() { public ClientLoadSnapshot snapshot() {
return new ClientLoadSnapshot(callsFinished.getAndSet(0), return new ClientLoadSnapshot(callsSucceeded.getAndSet(0),
callsInProgress.get(), callsInProgress.get(),
callsFailed.getAndSet(0)); callsFailed.getAndSet(0),
callsIssued.getAndSet(0));
} }
/** /**
@ -94,23 +93,28 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
static final class ClientLoadSnapshot { static final class ClientLoadSnapshot {
@VisibleForTesting @VisibleForTesting
static final ClientLoadSnapshot EMPTY_SNAPSHOT = new ClientLoadSnapshot(0, 0, 0); static final ClientLoadSnapshot EMPTY_SNAPSHOT = new ClientLoadSnapshot(0, 0, 0, 0);
private final long callsFinished; private final long callsSucceeded;
private final long callsInProgress; private final long callsInProgress;
private final long callsFailed; private final long callsFailed;
private final long callsIssued;
/** /**
* External usage must only be for testing. * External usage must only be for testing.
*/ */
@VisibleForTesting @VisibleForTesting
ClientLoadSnapshot(long callsFinished, long callsInProgress, long callsFailed) { ClientLoadSnapshot(long callsSucceeded,
this.callsFinished = callsFinished; long callsInProgress,
long callsFailed,
long callsIssued) {
this.callsSucceeded = callsSucceeded;
this.callsInProgress = callsInProgress; this.callsInProgress = callsInProgress;
this.callsFailed = callsFailed; this.callsFailed = callsFailed;
this.callsIssued = callsIssued;
} }
long getCallsFinished() { long getCallsSucceeded() {
return callsFinished; return callsSucceeded;
} }
long getCallsInProgress() { long getCallsInProgress() {
@ -121,12 +125,17 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
return callsFailed; return callsFailed;
} }
long getCallsIssued() {
return callsIssued;
}
@Override @Override
public String toString() { public String toString() {
return MoreObjects.toStringHelper(this) return MoreObjects.toStringHelper(this)
.add("callsFinished", callsFinished) .add("callsSucceeded", callsSucceeded)
.add("callsInProgress", callsInProgress) .add("callsInProgress", callsInProgress)
.add("callsFailed", callsFailed) .add("callsFailed", callsFailed)
.add("callsIssued", callsIssued)
.toString(); .toString();
} }
} }
@ -148,7 +157,7 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
@Override @Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
counter.incrementCallsInProgress(); counter.recordCallStarted();
final ClientStreamTracer delegateTracer = delegate.newClientStreamTracer(info, headers); final ClientStreamTracer delegateTracer = delegate.newClientStreamTracer(info, headers);
return new ForwardingClientStreamTracer() { return new ForwardingClientStreamTracer() {
@Override @Override
@ -158,11 +167,7 @@ final class ClientLoadCounter extends XdsLoadStatsStore.StatsCounter {
@Override @Override
public void streamClosed(Status status) { public void streamClosed(Status status) {
counter.incrementCallsFinished(); counter.recordCallFinished(status);
counter.decrementCallsInProgress();
if (!status.isOk()) {
counter.incrementCallsFailed();
}
delegate().streamClosed(status); delegate().streamClosed(status);
} }
}; };

View File

@ -24,6 +24,7 @@ import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests; import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats.DroppedRequests;
import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats; import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
import io.grpc.Status;
import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot; import io.grpc.xds.ClientLoadCounter.ClientLoadSnapshot;
import io.grpc.xds.XdsLoadReportClientImpl.StatsStore; import io.grpc.xds.XdsLoadReportClientImpl.StatsStore;
import java.util.Map; import java.util.Map;
@ -71,9 +72,10 @@ final class XdsLoadStatsStore implements StatsStore {
UpstreamLocalityStats.Builder localityStatsBuilder = UpstreamLocalityStats.Builder localityStatsBuilder =
UpstreamLocalityStats.newBuilder().setLocality(entry.getKey()); UpstreamLocalityStats.newBuilder().setLocality(entry.getKey());
localityStatsBuilder localityStatsBuilder
.setTotalSuccessfulRequests(snapshot.getCallsFinished() - snapshot.getCallsFailed()) .setTotalSuccessfulRequests(snapshot.getCallsSucceeded())
.setTotalErrorRequests(snapshot.getCallsFailed()) .setTotalErrorRequests(snapshot.getCallsFailed())
.setTotalRequestsInProgress(snapshot.getCallsInProgress()); .setTotalRequestsInProgress(snapshot.getCallsInProgress())
.setTotalIssuedRequests(snapshot.getCallsIssued());
statsBuilder.addUpstreamLocalityStats(localityStatsBuilder); statsBuilder.addUpstreamLocalityStats(localityStatsBuilder);
// Discard counters for localities that are no longer exposed by the remote balancer and // Discard counters for localities that are no longer exposed by the remote balancer and
// no RPCs ongoing. // no RPCs ongoing.
@ -150,19 +152,16 @@ final class XdsLoadStatsStore implements StatsStore {
} }
/** /**
* Blueprint for counters that can can record number of calls in-progress, finished, failed. * Blueprint for counters that can can record number of calls in-progress, succeeded, failed and
* issued.
*/ */
abstract static class StatsCounter { abstract static class StatsCounter {
private boolean active = true; private boolean active = true;
abstract void incrementCallsInProgress(); abstract void recordCallStarted();
abstract void decrementCallsInProgress(); abstract void recordCallFinished(Status status);
abstract void incrementCallsFinished();
abstract void incrementCallsFailed();
abstract ClientLoadSnapshot snapshot(); abstract ClientLoadSnapshot snapshot();

View File

@ -50,55 +50,51 @@ public class ClientLoadCounterTest {
public void setUp() { public void setUp() {
counter = new ClientLoadCounter(); counter = new ClientLoadCounter();
ClientLoadSnapshot emptySnapshot = counter.snapshot(); ClientLoadSnapshot emptySnapshot = counter.snapshot();
assertThat(emptySnapshot.getCallsInProgress()).isEqualTo(0); assertSnapshot(emptySnapshot, 0, 0, 0, 0);
assertThat(emptySnapshot.getCallsFinished()).isEqualTo(0);
assertThat(emptySnapshot.getCallsFailed()).isEqualTo(0);
} }
@Test @Test
public void snapshotContainsEverything() { public void snapshotContainsEverything() {
long numFinishedCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long numSucceededCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long numInProgressCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long numInProgressCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long numFailedCalls = ThreadLocalRandom.current().nextLong(numFinishedCalls); long numFailedCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
counter = new ClientLoadCounter(numFinishedCalls, numInProgressCalls, numFailedCalls); long numIssuedCalls = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
counter =
new ClientLoadCounter(numSucceededCalls, numInProgressCalls, numFailedCalls,
numIssuedCalls);
ClientLoadSnapshot snapshot = counter.snapshot(); ClientLoadSnapshot snapshot = counter.snapshot();
assertThat(snapshot.getCallsFinished()).isEqualTo(numFinishedCalls); assertSnapshot(snapshot, numSucceededCalls, numInProgressCalls, numFailedCalls, numIssuedCalls);
assertThat(snapshot.getCallsInProgress()).isEqualTo(numInProgressCalls);
assertThat(snapshot.getCallsFailed()).isEqualTo(numFailedCalls);
String snapshotStr = snapshot.toString(); String snapshotStr = snapshot.toString();
assertThat(snapshotStr).contains("callsFinished=" + numFinishedCalls); assertThat(snapshotStr).contains("callsSucceeded=" + numSucceededCalls);
assertThat(snapshotStr).contains("callsInProgress=" + numInProgressCalls); assertThat(snapshotStr).contains("callsInProgress=" + numInProgressCalls);
assertThat(snapshotStr).contains("callsFailed=" + numFailedCalls); assertThat(snapshotStr).contains("callsFailed=" + numFailedCalls);
assertThat(snapshotStr).contains("callsIssued=" + numIssuedCalls);
// Snapshot only accounts for stats happening after previous snapshot. // Snapshot only accounts for stats happening after previous snapshot.
snapshot = counter.snapshot(); snapshot = counter.snapshot();
assertThat(snapshot.getCallsFinished()).isEqualTo(0); assertSnapshot(snapshot, 0, numInProgressCalls, 0, 0);
assertThat(snapshot.getCallsInProgress()).isEqualTo(numInProgressCalls);
assertThat(snapshot.getCallsFailed()).isEqualTo(0);
snapshotStr = snapshot.toString(); snapshotStr = snapshot.toString();
assertThat(snapshotStr).contains("callsFinished=0"); assertThat(snapshotStr).contains("callsSucceeded=0");
assertThat(snapshotStr).contains("callsInProgress=" + numInProgressCalls); assertThat(snapshotStr).contains("callsInProgress=" + numInProgressCalls);
assertThat(snapshotStr).contains("callsFailed=0"); assertThat(snapshotStr).contains("callsFailed=0");
assertThat(snapshotStr).contains("callsIssued=0");
} }
@Test @Test
public void normalCountingOperations() { public void normalCountingOperations() {
ClientLoadSnapshot preSnapshot = counter.snapshot(); counter.recordCallStarted();
counter.incrementCallsInProgress(); ClientLoadSnapshot snapshot = counter.snapshot();
ClientLoadSnapshot afterSnapshot = counter.snapshot(); assertSnapshot(snapshot, 0, 1, 0, 1);
assertThat(afterSnapshot.getCallsInProgress()).isEqualTo(preSnapshot.getCallsInProgress() + 1);
counter.decrementCallsInProgress();
afterSnapshot = counter.snapshot();
assertThat(afterSnapshot.getCallsInProgress()).isEqualTo(preSnapshot.getCallsInProgress());
counter.incrementCallsFinished(); counter.recordCallFinished(Status.OK);
afterSnapshot = counter.snapshot(); snapshot = counter.snapshot();
assertThat(afterSnapshot.getCallsFinished()).isEqualTo(1); assertSnapshot(snapshot, 1, 0, 0, 0);
counter.incrementCallsFailed(); counter.recordCallStarted();
afterSnapshot = counter.snapshot(); counter.recordCallFinished(Status.CANCELLED);
assertThat(afterSnapshot.getCallsFailed()).isEqualTo(1); snapshot = counter.snapshot();
assertSnapshot(snapshot, 0, 0, 1, 1);
} }
@Test @Test
@ -107,14 +103,10 @@ public class ClientLoadCounterTest {
new XdsClientLoadRecorder(counter, NOOP_CLIENT_STREAM_TRACER_FACTORY); new XdsClientLoadRecorder(counter, NOOP_CLIENT_STREAM_TRACER_FACTORY);
ClientStreamTracer tracer = recorder1.newClientStreamTracer(STREAM_INFO, new Metadata()); ClientStreamTracer tracer = recorder1.newClientStreamTracer(STREAM_INFO, new Metadata());
ClientLoadSnapshot snapshot = counter.snapshot(); ClientLoadSnapshot snapshot = counter.snapshot();
assertThat(snapshot.getCallsFinished()).isEqualTo(0); assertSnapshot(snapshot, 0, 1, 0, 1);
assertThat(snapshot.getCallsInProgress()).isEqualTo(1);
assertThat(snapshot.getCallsFailed()).isEqualTo(0);
tracer.streamClosed(Status.OK); tracer.streamClosed(Status.OK);
snapshot = counter.snapshot(); snapshot = counter.snapshot();
assertThat(snapshot.getCallsFinished()).isEqualTo(1); assertSnapshot(snapshot, 1, 0, 0, 0);
assertThat(snapshot.getCallsInProgress()).isEqualTo(0);
assertThat(snapshot.getCallsFailed()).isEqualTo(0);
// Create a second XdsClientLoadRecorder with the same counter, stats are aggregated together. // Create a second XdsClientLoadRecorder with the same counter, stats are aggregated together.
XdsClientLoadRecorder recorder2 = XdsClientLoadRecorder recorder2 =
@ -122,8 +114,17 @@ public class ClientLoadCounterTest {
recorder1.newClientStreamTracer(STREAM_INFO, new Metadata()).streamClosed(Status.ABORTED); recorder1.newClientStreamTracer(STREAM_INFO, new Metadata()).streamClosed(Status.ABORTED);
recorder2.newClientStreamTracer(STREAM_INFO, new Metadata()).streamClosed(Status.CANCELLED); recorder2.newClientStreamTracer(STREAM_INFO, new Metadata()).streamClosed(Status.CANCELLED);
snapshot = counter.snapshot(); snapshot = counter.snapshot();
assertThat(snapshot.getCallsFinished()).isEqualTo(2); assertSnapshot(snapshot, 0, 0, 2, 2);
assertThat(snapshot.getCallsInProgress()).isEqualTo(0); }
assertThat(snapshot.getCallsFailed()).isEqualTo(2);
private void assertSnapshot(ClientLoadSnapshot snapshot,
long callsSucceeded,
long callsInProgress,
long callsFailed,
long callsIssued) {
assertThat(snapshot.getCallsSucceeded()).isEqualTo(callsSucceeded);
assertThat(snapshot.getCallsInProgress()).isEqualTo(callsInProgress);
assertThat(snapshot.getCallsFailed()).isEqualTo(callsFailed);
assertThat(snapshot.getCallsIssued()).isEqualTo(callsIssued);
} }
} }

View File

@ -353,8 +353,9 @@ public class XdsLoadReportClientImplTest {
inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ); inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long callsFinished = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long callsSucceeded = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long callsFailed = callsFinished - ThreadLocalRandom.current().nextLong(callsFinished); long callsFailed = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long callsIssued = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long numLbDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long numLbDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long numThrottleDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long numThrottleDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
@ -364,8 +365,9 @@ public class XdsLoadReportClientImplTest {
.addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder() .addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder()
.setLocality(TEST_LOCALITY) .setLocality(TEST_LOCALITY)
.setTotalRequestsInProgress(callsInProgress) .setTotalRequestsInProgress(callsInProgress)
.setTotalSuccessfulRequests(callsFinished - callsFailed) .setTotalSuccessfulRequests(callsSucceeded)
.setTotalErrorRequests(callsFailed)) .setTotalErrorRequests(callsFailed)
.setTotalIssuedRequests(callsIssued))
.addDroppedRequests(DroppedRequests.newBuilder() .addDroppedRequests(DroppedRequests.newBuilder()
.setCategory("lb") .setCategory("lb")
.setDroppedCount(numLbDrops)) .setDroppedCount(numLbDrops))

View File

@ -76,13 +76,15 @@ public class XdsLoadStatsStoreTest {
long callsSucceed, long callsSucceed,
long callsInProgress, long callsInProgress,
long callsFailed, long callsFailed,
long callsIssued,
@Nullable List<EndpointLoadMetricStats> metrics) { @Nullable List<EndpointLoadMetricStats> metrics) {
UpstreamLocalityStats.Builder builder = UpstreamLocalityStats.Builder builder =
UpstreamLocalityStats.newBuilder() UpstreamLocalityStats.newBuilder()
.setLocality(locality) .setLocality(locality)
.setTotalSuccessfulRequests(callsSucceed) .setTotalSuccessfulRequests(callsSucceed)
.setTotalErrorRequests(callsFailed) .setTotalErrorRequests(callsFailed)
.setTotalRequestsInProgress(callsInProgress); .setTotalRequestsInProgress(callsInProgress)
.setTotalIssuedRequests(callsIssued);
if (metrics != null) { if (metrics != null) {
builder.addAllLoadMetricStats(metrics); builder.addAllLoadMetricStats(metrics);
} }
@ -216,11 +218,11 @@ public class XdsLoadStatsStoreTest {
StatsCounter counter1 = mock(StatsCounter.class); StatsCounter counter1 = mock(StatsCounter.class);
when(counter1.isActive()).thenReturn(true); when(counter1.isActive()).thenReturn(true);
when(counter1.snapshot()) when(counter1.snapshot())
.thenReturn(new ClientLoadSnapshot(4315, 3421, 23), .thenReturn(new ClientLoadSnapshot(4315, 3421, 23, 593),
new ClientLoadSnapshot(0, 543, 0)); new ClientLoadSnapshot(0, 543, 0, 0));
StatsCounter counter2 = mock(StatsCounter.class); StatsCounter counter2 = mock(StatsCounter.class);
when(counter2.snapshot()).thenReturn(new ClientLoadSnapshot(41234, 432, 431), when(counter2.snapshot()).thenReturn(new ClientLoadSnapshot(41234, 432, 431, 702),
new ClientLoadSnapshot(0, 432, 0)); new ClientLoadSnapshot(0, 432, 0, 0));
when(counter2.isActive()).thenReturn(true); when(counter2.isActive()).thenReturn(true);
localityLoadCounters.put(LOCALITY1, counter1); localityLoadCounters.put(LOCALITY1, counter1);
localityLoadCounters.put(LOCALITY2, counter2); localityLoadCounters.put(LOCALITY2, counter2);
@ -228,8 +230,8 @@ public class XdsLoadStatsStoreTest {
ClusterStats expectedReport = ClusterStats expectedReport =
buildClusterStats( buildClusterStats(
Arrays.asList( Arrays.asList(
buildUpstreamLocalityStats(LOCALITY1, 4315 - 23, 3421, 23, null), buildUpstreamLocalityStats(LOCALITY1, 4315, 3421, 23, 593, null),
buildUpstreamLocalityStats(LOCALITY2, 41234 - 431, 432, 431, null) buildUpstreamLocalityStats(LOCALITY2, 41234, 432, 431, 702, null)
), ),
null); null);
@ -240,10 +242,8 @@ public class XdsLoadStatsStoreTest {
expectedReport = expectedReport =
buildClusterStats( buildClusterStats(
Arrays.asList( Arrays.asList(
buildUpstreamLocalityStats(LOCALITY1, 0, 543, 0, buildUpstreamLocalityStats(LOCALITY1, 0, 543, 0, 0, null),
null), buildUpstreamLocalityStats(LOCALITY2, 0, 432, 0, 0, null)
buildUpstreamLocalityStats(LOCALITY2, 0, 432, 0,
null)
), ),
null); null);
assertClusterStatsEqual(expectedReport, loadStore.generateLoadReport()); assertClusterStatsEqual(expectedReport, loadStore.generateLoadReport());