core,grpclb: use denser atomics for census

This commit is contained in:
Carl Mastrangelo 2017-10-06 17:02:36 -07:00 committed by GitHub
parent 72f6d9bc08
commit 95a2723ea5
3 changed files with 172 additions and 105 deletions

View File

@ -46,9 +46,9 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -115,7 +115,7 @@ final class CensusStatsModule {
*/ */
@VisibleForTesting @VisibleForTesting
ClientCallTracer newClientCallTracer(StatsContext parentCtx, String fullMethodName) { ClientCallTracer newClientCallTracer(StatsContext parentCtx, String fullMethodName) {
return new ClientCallTracer(parentCtx, fullMethodName); return new ClientCallTracer(this, parentCtx, fullMethodName);
} }
/** /**
@ -133,57 +133,81 @@ final class CensusStatsModule {
} }
private static final class ClientTracer extends ClientStreamTracer { private static final class ClientTracer extends ClientStreamTracer {
final AtomicLong outboundMessageCount = new AtomicLong();
final AtomicLong inboundMessageCount = new AtomicLong(); private static final AtomicLongFieldUpdater<ClientTracer> outboundMessageCountUpdater =
final AtomicLong outboundWireSize = new AtomicLong(); AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundMessageCount");
final AtomicLong inboundWireSize = new AtomicLong(); private static final AtomicLongFieldUpdater<ClientTracer> inboundMessageCountUpdater =
final AtomicLong outboundUncompressedSize = new AtomicLong(); AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundMessageCount");
final AtomicLong inboundUncompressedSize = new AtomicLong(); private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
private static final AtomicLongFieldUpdater<ClientTracer> outboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundUncompressedSize");
private static final AtomicLongFieldUpdater<ClientTracer> inboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundUncompressedSize");
volatile long outboundMessageCount;
volatile long inboundMessageCount;
volatile long outboundWireSize;
volatile long inboundWireSize;
volatile long outboundUncompressedSize;
volatile long inboundUncompressedSize;
@Override @Override
public void outboundWireSize(long bytes) { public void outboundWireSize(long bytes) {
outboundWireSize.addAndGet(bytes); outboundWireSizeUpdater.getAndAdd(this, bytes);
} }
@Override @Override
public void inboundWireSize(long bytes) { public void inboundWireSize(long bytes) {
inboundWireSize.addAndGet(bytes); inboundWireSizeUpdater.getAndAdd(this, bytes);
} }
@Override @Override
public void outboundUncompressedSize(long bytes) { public void outboundUncompressedSize(long bytes) {
outboundUncompressedSize.addAndGet(bytes); outboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} }
@Override @Override
public void inboundUncompressedSize(long bytes) { public void inboundUncompressedSize(long bytes) {
inboundUncompressedSize.addAndGet(bytes); inboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} }
@Override @Override
public void inboundMessage(int seqNo) { public void inboundMessage(int seqNo) {
inboundMessageCount.incrementAndGet(); inboundMessageCountUpdater.getAndIncrement(this);
} }
@Override @Override
public void outboundMessage(int seqNo) { public void outboundMessage(int seqNo) {
outboundMessageCount.incrementAndGet(); outboundMessageCountUpdater.getAndIncrement(this);
} }
} }
@VisibleForTesting @VisibleForTesting
final class ClientCallTracer extends ClientStreamTracer.Factory { static final class ClientCallTracer extends ClientStreamTracer.Factory {
private static final AtomicReferenceFieldUpdater<ClientCallTracer, ClientTracer>
streamTracerUpdater =
AtomicReferenceFieldUpdater.newUpdater(
ClientCallTracer.class, ClientTracer.class, "streamTracer");
private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded");
private final CensusStatsModule module;
private final String fullMethodName; private final String fullMethodName;
private final Stopwatch stopwatch; private final Stopwatch stopwatch;
private final AtomicReference<ClientTracer> streamTracer = new AtomicReference<ClientTracer>(); private volatile ClientTracer streamTracer;
private final AtomicBoolean callEnded = new AtomicBoolean(false); private volatile int callEnded;
private final StatsContext parentCtx; private final StatsContext parentCtx;
ClientCallTracer(StatsContext parentCtx, String fullMethodName) { ClientCallTracer(CensusStatsModule module, StatsContext parentCtx, String fullMethodName) {
this.module = module;
this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.stopwatch = stopwatchSupplier.get().start(); this.stopwatch = module.stopwatchSupplier.get().start();
} }
@Override @Override
@ -191,12 +215,13 @@ final class CensusStatsModule {
ClientTracer tracer = new ClientTracer(); ClientTracer tracer = new ClientTracer();
// TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than
// one streams. We will need to update this file to support them. // one streams. We will need to update this file to support them.
checkState(streamTracer.compareAndSet(null, tracer), checkState(
streamTracerUpdater.compareAndSet(this, null, tracer),
"Are you creating multiple streams per call? This class doesn't yet support this case."); "Are you creating multiple streams per call? This class doesn't yet support this case.");
if (propagateTags) { if (module.propagateTags) {
headers.discardAll(statsHeader); headers.discardAll(module.statsHeader);
if (parentCtx != statsCtxFactory.getDefault()) { if (parentCtx != module.statsCtxFactory.getDefault()) {
headers.put(statsHeader, parentCtx); headers.put(module.statsHeader, parentCtx);
} }
} }
return tracer; return tracer;
@ -209,28 +234,28 @@ final class CensusStatsModule {
* is a no-op. * is a no-op.
*/ */
void callEnded(Status status) { void callEnded(Status status) {
if (!callEnded.compareAndSet(false, true)) { if (callEndedUpdater.getAndSet(this, 1) != 0) {
return; return;
} }
stopwatch.stop(); stopwatch.stop();
long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
ClientTracer tracer = streamTracer.get(); ClientTracer tracer = streamTracer;
if (tracer == null) { if (tracer == null) {
tracer = BLANK_CLIENT_TRACER; tracer = BLANK_CLIENT_TRACER;
} }
MeasurementMap.Builder builder = MeasurementMap.builder() MeasurementMap.Builder builder = MeasurementMap.builder()
// The metrics are in double // The metrics are in double
.put(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI) .put(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI)
.put(RpcConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount.get()) .put(RpcConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount)
.put(RpcConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount.get()) .put(RpcConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount)
.put(RpcConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize.get()) .put(RpcConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize)
.put(RpcConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize.get()) .put(RpcConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize)
.put( .put(
RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES,
tracer.outboundUncompressedSize.get()) tracer.outboundUncompressedSize)
.put( .put(
RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES,
tracer.inboundUncompressedSize.get()); tracer.inboundUncompressedSize);
if (!status.isOk()) { if (!status.isOk()) {
builder.put(RpcConstants.RPC_CLIENT_ERROR_COUNT, 1.0); builder.put(RpcConstants.RPC_CLIENT_ERROR_COUNT, 1.0);
} }
@ -242,53 +267,74 @@ final class CensusStatsModule {
} }
} }
private final class ServerTracer extends ServerStreamTracer { private static final class ServerTracer extends ServerStreamTracer {
private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
private static final AtomicLongFieldUpdater<ServerTracer> outboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundMessageCount");
private static final AtomicLongFieldUpdater<ServerTracer> inboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundMessageCount");
private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
private static final AtomicLongFieldUpdater<ServerTracer> outboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundUncompressedSize");
private static final AtomicLongFieldUpdater<ServerTracer> inboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundUncompressedSize");
private final String fullMethodName; private final String fullMethodName;
@Nullable @Nullable
private final StatsContext parentCtx; private final StatsContext parentCtx;
private final AtomicBoolean streamClosed = new AtomicBoolean(false); private volatile int streamClosed;
private final Stopwatch stopwatch; private final Stopwatch stopwatch;
private final AtomicLong outboundMessageCount = new AtomicLong(); private final StatsContextFactory statsCtxFactory;
private final AtomicLong inboundMessageCount = new AtomicLong(); private volatile long outboundMessageCount;
private final AtomicLong outboundWireSize = new AtomicLong(); private volatile long inboundMessageCount;
private final AtomicLong inboundWireSize = new AtomicLong(); private volatile long outboundWireSize;
private final AtomicLong outboundUncompressedSize = new AtomicLong(); private volatile long inboundWireSize;
private final AtomicLong inboundUncompressedSize = new AtomicLong(); private volatile long outboundUncompressedSize;
private volatile long inboundUncompressedSize;
ServerTracer(String fullMethodName, StatsContext parentCtx) { ServerTracer(
String fullMethodName,
StatsContext parentCtx,
Supplier<Stopwatch> stopwatchSupplier,
StatsContextFactory statsCtxFactory) {
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.stopwatch = stopwatchSupplier.get().start(); this.stopwatch = stopwatchSupplier.get().start();
this.statsCtxFactory = statsCtxFactory;
} }
@Override @Override
public void outboundWireSize(long bytes) { public void outboundWireSize(long bytes) {
outboundWireSize.addAndGet(bytes); outboundWireSizeUpdater.getAndAdd(this, bytes);
} }
@Override @Override
public void inboundWireSize(long bytes) { public void inboundWireSize(long bytes) {
inboundWireSize.addAndGet(bytes); inboundWireSizeUpdater.getAndAdd(this, bytes);
} }
@Override @Override
public void outboundUncompressedSize(long bytes) { public void outboundUncompressedSize(long bytes) {
outboundUncompressedSize.addAndGet(bytes); outboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} }
@Override @Override
public void inboundUncompressedSize(long bytes) { public void inboundUncompressedSize(long bytes) {
inboundUncompressedSize.addAndGet(bytes); inboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} }
@Override @Override
public void inboundMessage(int seqNo) { public void inboundMessage(int seqNo) {
inboundMessageCount.incrementAndGet(); inboundMessageCountUpdater.getAndIncrement(this);
} }
@Override @Override
public void outboundMessage(int seqNo) { public void outboundMessage(int seqNo) {
outboundMessageCount.incrementAndGet(); outboundMessageCountUpdater.getAndIncrement(this);
} }
/** /**
@ -299,7 +345,7 @@ final class CensusStatsModule {
*/ */
@Override @Override
public void streamClosed(Status status) { public void streamClosed(Status status) {
if (!streamClosed.compareAndSet(false, true)) { if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return; return;
} }
stopwatch.stop(); stopwatch.stop();
@ -307,16 +353,12 @@ final class CensusStatsModule {
MeasurementMap.Builder builder = MeasurementMap.builder() MeasurementMap.Builder builder = MeasurementMap.builder()
// The metrics are in double // The metrics are in double
.put(RpcConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI) .put(RpcConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI)
.put(RpcConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount.get()) .put(RpcConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount)
.put(RpcConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount.get()) .put(RpcConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount)
.put(RpcConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize.get()) .put(RpcConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize)
.put(RpcConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize.get()) .put(RpcConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize)
.put( .put(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, outboundUncompressedSize)
RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, .put(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES, inboundUncompressedSize);
outboundUncompressedSize.get())
.put(
RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES,
inboundUncompressedSize.get());
if (!status.isOk()) { if (!status.isOk()) {
builder.put(RpcConstants.RPC_SERVER_ERROR_COUNT, 1.0); builder.put(RpcConstants.RPC_SERVER_ERROR_COUNT, 1.0);
} }
@ -344,7 +386,7 @@ final class CensusStatsModule {
parentCtx = statsCtxFactory.getDefault(); parentCtx = statsCtxFactory.getDefault();
} }
parentCtx = parentCtx.with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(fullMethodName)); parentCtx = parentCtx.with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(fullMethodName));
return new ServerTracer(fullMethodName, parentCtx); return new ServerTracer(fullMethodName, parentCtx, stopwatchSupplier, statsCtxFactory);
} }
} }

View File

@ -45,7 +45,7 @@ import io.opencensus.trace.export.SampledSpanStore;
import io.opencensus.trace.propagation.BinaryFormat; import io.opencensus.trace.propagation.BinaryFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -63,6 +63,10 @@ import javax.annotation.Nullable;
*/ */
final class CensusTracingModule { final class CensusTracingModule {
private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName()); private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName());
private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded");
private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
private final Tracer censusTracer; private final Tracer censusTracer;
@VisibleForTesting @VisibleForTesting
@ -216,8 +220,8 @@ final class CensusTracingModule {
@VisibleForTesting @VisibleForTesting
final class ClientCallTracer extends ClientStreamTracer.Factory { final class ClientCallTracer extends ClientStreamTracer.Factory {
volatile int callEnded;
private final AtomicBoolean callEnded = new AtomicBoolean(false);
private final Span span; private final Span span;
ClientCallTracer(@Nullable Span parentSpan, String fullMethodName) { ClientCallTracer(@Nullable Span parentSpan, String fullMethodName) {
@ -245,7 +249,7 @@ final class CensusTracingModule {
* is a no-op. * is a no-op.
*/ */
void callEnded(io.grpc.Status status) { void callEnded(io.grpc.Status status) {
if (!callEnded.compareAndSet(false, true)) { if (callEndedUpdater.getAndSet(this, 1) != 0) {
return; return;
} }
span.end(createEndSpanOptions(status)); span.end(createEndSpanOptions(status));
@ -274,9 +278,10 @@ final class CensusTracingModule {
} }
} }
private final class ServerTracer extends ServerStreamTracer { private final class ServerTracer extends ServerStreamTracer {
private final Span span; private final Span span;
private final AtomicBoolean streamClosed = new AtomicBoolean(false); volatile int streamClosed;
ServerTracer(String fullMethodName, @Nullable SpanContext remoteSpan) { ServerTracer(String fullMethodName, @Nullable SpanContext remoteSpan) {
checkNotNull(fullMethodName, "fullMethodName"); checkNotNull(fullMethodName, "fullMethodName");
@ -297,7 +302,7 @@ final class CensusTracingModule {
*/ */
@Override @Override
public void streamClosed(io.grpc.Status status) { public void streamClosed(io.grpc.Status status) {
if (!streamClosed.compareAndSet(false, true)) { if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return; return;
} }
span.end(createEndSpanOptions(status)); span.end(createEndSpanOptions(status));

View File

@ -23,10 +23,11 @@ import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer; import io.grpc.ClientStreamTracer;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
@ -36,17 +37,35 @@ import javax.annotation.concurrent.ThreadSafe;
*/ */
@ThreadSafe @ThreadSafe
final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory { final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory {
private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsStartedUpdater =
AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsStarted");
private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsFinishedUpdater =
AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFinished");
private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsFailedToSendUpdater =
AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFailedToSend");
private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder>
callsFinishedKnownReceivedUpdater =
AtomicLongFieldUpdater.newUpdater(
GrpclbClientLoadRecorder.class, "callsFinishedKnownReceived");
private final TimeProvider time; private final TimeProvider time;
private final AtomicLong callsStarted = new AtomicLong(); @SuppressWarnings("unused")
private final AtomicLong callsFinished = new AtomicLong(); private volatile long callsStarted;
@SuppressWarnings("unused")
private volatile long callsFinished;
private static final class LongHolder {
long num;
}
// Specific finish types // Specific finish types
// Access to it should be protected by lock. Contention is not an issue for these counts, because
// normally only a small portion of all RPCs are dropped.
@GuardedBy("this") @GuardedBy("this")
private HashMap<String, AtomicLong> callsDroppedPerToken = new HashMap<String, AtomicLong>(); private Map<String, LongHolder> callsDroppedPerToken = new HashMap<String, LongHolder>(1);
private final AtomicLong callsFailedToSend = new AtomicLong(); @SuppressWarnings("unused")
private final AtomicLong callsFinishedKnownReceived = new AtomicLong(); private volatile long callsFailedToSend;
@SuppressWarnings("unused")
private volatile long callsFinishedKnownReceived;
GrpclbClientLoadRecorder(TimeProvider time) { GrpclbClientLoadRecorder(TimeProvider time) {
this.time = checkNotNull(time, "time provider"); this.time = checkNotNull(time, "time provider");
@ -54,7 +73,7 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory {
@Override @Override
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
callsStarted.incrementAndGet(); callsStartedUpdater.getAndIncrement(this);
return new StreamTracer(); return new StreamTracer();
} }
@ -62,17 +81,15 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory {
* Records that a request has been dropped as instructed by the remote balancer. * Records that a request has been dropped as instructed by the remote balancer.
*/ */
void recordDroppedRequest(String token) { void recordDroppedRequest(String token) {
callsStarted.incrementAndGet(); callsStartedUpdater.getAndIncrement(this);
callsFinished.incrementAndGet(); callsFinishedUpdater.getAndIncrement(this);
synchronized (this) { synchronized (this) {
AtomicLong count = callsDroppedPerToken.get(token); LongHolder holder;
if (count == null) { if ((holder = callsDroppedPerToken.get(token)) == null) {
count = new AtomicLong(1); callsDroppedPerToken.put(token, (holder = new LongHolder()));
callsDroppedPerToken.put(token, count);
} else {
count.incrementAndGet();
} }
holder.num++;
} }
} }
@ -83,52 +100,55 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory {
ClientStats.Builder statsBuilder = ClientStats.Builder statsBuilder =
ClientStats.newBuilder() ClientStats.newBuilder()
.setTimestamp(Timestamps.fromMillis(time.currentTimeMillis())) .setTimestamp(Timestamps.fromMillis(time.currentTimeMillis()))
.setNumCallsStarted(callsStarted.getAndSet(0)) .setNumCallsStarted(callsStartedUpdater.getAndSet(this, 0))
.setNumCallsFinished(callsFinished.getAndSet(0)) .setNumCallsFinished(callsFinishedUpdater.getAndSet(this, 0))
.setNumCallsFinishedWithClientFailedToSend(callsFailedToSend.getAndSet(0)) .setNumCallsFinishedWithClientFailedToSend(callsFailedToSendUpdater.getAndSet(this, 0))
.setNumCallsFinishedKnownReceived(callsFinishedKnownReceived.getAndSet(0)); .setNumCallsFinishedKnownReceived(callsFinishedKnownReceivedUpdater.getAndSet(this, 0));
HashMap<String, AtomicLong> savedCallsDroppedPerToken;
Map<String, LongHolder> localCallsDroppedPerToken = Collections.emptyMap();
synchronized (this) { synchronized (this) {
savedCallsDroppedPerToken = callsDroppedPerToken; if (!callsDroppedPerToken.isEmpty()) {
callsDroppedPerToken = new HashMap<String, AtomicLong>(); localCallsDroppedPerToken = callsDroppedPerToken;
callsDroppedPerToken = new HashMap<String, LongHolder>(localCallsDroppedPerToken.size());
} }
for (Map.Entry<String, AtomicLong> dropCount : savedCallsDroppedPerToken.entrySet()) { }
for (Entry<String, LongHolder> entry : localCallsDroppedPerToken.entrySet()) {
statsBuilder.addCallsFinishedWithDrop( statsBuilder.addCallsFinishedWithDrop(
ClientStatsPerToken.newBuilder() ClientStatsPerToken.newBuilder()
.setLoadBalanceToken(dropCount.getKey()) .setLoadBalanceToken(entry.getKey())
.setNumCalls(dropCount.getValue().get()) .setNumCalls(entry.getValue().num)
.build()); .build());
} }
return statsBuilder.build(); return statsBuilder.build();
} }
private class StreamTracer extends ClientStreamTracer { private class StreamTracer extends ClientStreamTracer {
final AtomicBoolean headersSent = new AtomicBoolean(); private volatile boolean headersSent;
final AtomicBoolean anythingReceived = new AtomicBoolean(); private volatile boolean anythingReceived;
@Override @Override
public void outboundHeaders() { public void outboundHeaders() {
headersSent.set(true); headersSent = true;
} }
@Override @Override
public void inboundHeaders() { public void inboundHeaders() {
anythingReceived.set(true); anythingReceived = true;
} }
@Override @Override
public void inboundMessage(int seqNo) { public void inboundMessage(int seqNo) {
anythingReceived.set(true); anythingReceived = true;
} }
@Override @Override
public void streamClosed(Status status) { public void streamClosed(Status status) {
callsFinished.incrementAndGet(); callsFinishedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
if (!headersSent.get()) { if (!headersSent) {
callsFailedToSend.incrementAndGet(); callsFailedToSendUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
} }
if (anythingReceived.get()) { if (anythingReceived) {
callsFinishedKnownReceived.incrementAndGet(); callsFinishedKnownReceivedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
} }
} }
} }