From 95a2723ea5774df54a800540084db5b3f972737d Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 6 Oct 2017 17:02:36 -0700 Subject: [PATCH] core,grpclb: use denser atomics for census --- .../io/grpc/internal/CensusStatsModule.java | 166 +++++++++++------- .../io/grpc/internal/CensusTracingModule.java | 15 +- .../grpc/grpclb/GrpclbClientLoadRecorder.java | 96 ++++++---- 3 files changed, 172 insertions(+), 105 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index 7d4cbcce14..efd2d7e98f 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -46,9 +46,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -115,7 +115,7 @@ final class CensusStatsModule { */ @VisibleForTesting ClientCallTracer newClientCallTracer(StatsContext parentCtx, String fullMethodName) { - return new ClientCallTracer(parentCtx, fullMethodName); + return new ClientCallTracer(this, parentCtx, fullMethodName); } /** @@ -133,57 +133,81 @@ final class CensusStatsModule { } private static final class ClientTracer extends ClientStreamTracer { - final AtomicLong outboundMessageCount = new AtomicLong(); - final AtomicLong inboundMessageCount = new AtomicLong(); - final AtomicLong outboundWireSize = new AtomicLong(); - final AtomicLong inboundWireSize = new AtomicLong(); - final AtomicLong outboundUncompressedSize = new AtomicLong(); - final AtomicLong inboundUncompressedSize = new AtomicLong(); + + private static final AtomicLongFieldUpdater outboundMessageCountUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundMessageCount"); + private static final AtomicLongFieldUpdater inboundMessageCountUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundMessageCount"); + private static final AtomicLongFieldUpdater outboundWireSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize"); + private static final AtomicLongFieldUpdater inboundWireSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize"); + private static final AtomicLongFieldUpdater outboundUncompressedSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundUncompressedSize"); + private static final AtomicLongFieldUpdater inboundUncompressedSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundUncompressedSize"); + + volatile long outboundMessageCount; + volatile long inboundMessageCount; + volatile long outboundWireSize; + volatile long inboundWireSize; + volatile long outboundUncompressedSize; + volatile long inboundUncompressedSize; @Override public void outboundWireSize(long bytes) { - outboundWireSize.addAndGet(bytes); + outboundWireSizeUpdater.getAndAdd(this, bytes); } @Override public void inboundWireSize(long bytes) { - inboundWireSize.addAndGet(bytes); + inboundWireSizeUpdater.getAndAdd(this, bytes); } @Override public void outboundUncompressedSize(long bytes) { - outboundUncompressedSize.addAndGet(bytes); + outboundUncompressedSizeUpdater.getAndAdd(this, bytes); } @Override public void inboundUncompressedSize(long bytes) { - inboundUncompressedSize.addAndGet(bytes); + inboundUncompressedSizeUpdater.getAndAdd(this, bytes); } @Override public void inboundMessage(int seqNo) { - inboundMessageCount.incrementAndGet(); + inboundMessageCountUpdater.getAndIncrement(this); } @Override public void outboundMessage(int seqNo) { - outboundMessageCount.incrementAndGet(); + outboundMessageCountUpdater.getAndIncrement(this); } } - @VisibleForTesting - final class ClientCallTracer extends ClientStreamTracer.Factory { + + @VisibleForTesting + static final class ClientCallTracer extends ClientStreamTracer.Factory { + private static final AtomicReferenceFieldUpdater + streamTracerUpdater = + AtomicReferenceFieldUpdater.newUpdater( + ClientCallTracer.class, ClientTracer.class, "streamTracer"); + private static final AtomicIntegerFieldUpdater callEndedUpdater = + AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"); + + private final CensusStatsModule module; private final String fullMethodName; private final Stopwatch stopwatch; - private final AtomicReference streamTracer = new AtomicReference(); - private final AtomicBoolean callEnded = new AtomicBoolean(false); + private volatile ClientTracer streamTracer; + private volatile int callEnded; private final StatsContext parentCtx; - ClientCallTracer(StatsContext parentCtx, String fullMethodName) { + ClientCallTracer(CensusStatsModule module, StatsContext parentCtx, String fullMethodName) { + this.module = module; this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); - this.stopwatch = stopwatchSupplier.get().start(); + this.stopwatch = module.stopwatchSupplier.get().start(); } @Override @@ -191,12 +215,13 @@ final class CensusStatsModule { ClientTracer tracer = new ClientTracer(); // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than // one streams. We will need to update this file to support them. - checkState(streamTracer.compareAndSet(null, tracer), + checkState( + streamTracerUpdater.compareAndSet(this, null, tracer), "Are you creating multiple streams per call? This class doesn't yet support this case."); - if (propagateTags) { - headers.discardAll(statsHeader); - if (parentCtx != statsCtxFactory.getDefault()) { - headers.put(statsHeader, parentCtx); + if (module.propagateTags) { + headers.discardAll(module.statsHeader); + if (parentCtx != module.statsCtxFactory.getDefault()) { + headers.put(module.statsHeader, parentCtx); } } return tracer; @@ -209,28 +234,28 @@ final class CensusStatsModule { * is a no-op. */ void callEnded(Status status) { - if (!callEnded.compareAndSet(false, true)) { + if (callEndedUpdater.getAndSet(this, 1) != 0) { return; } stopwatch.stop(); long roundtripNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); - ClientTracer tracer = streamTracer.get(); + ClientTracer tracer = streamTracer; if (tracer == null) { tracer = BLANK_CLIENT_TRACER; } MeasurementMap.Builder builder = MeasurementMap.builder() // The metrics are in double .put(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI) - .put(RpcConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount.get()) - .put(RpcConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount.get()) - .put(RpcConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize.get()) - .put(RpcConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize.get()) + .put(RpcConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) + .put(RpcConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount) + .put(RpcConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize) + .put(RpcConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize) .put( RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, - tracer.outboundUncompressedSize.get()) + tracer.outboundUncompressedSize) .put( RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, - tracer.inboundUncompressedSize.get()); + tracer.inboundUncompressedSize); if (!status.isOk()) { builder.put(RpcConstants.RPC_CLIENT_ERROR_COUNT, 1.0); } @@ -242,53 +267,74 @@ final class CensusStatsModule { } } - private final class ServerTracer extends ServerStreamTracer { + private static final class ServerTracer extends ServerStreamTracer { + private static final AtomicIntegerFieldUpdater streamClosedUpdater = + AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); + private static final AtomicLongFieldUpdater outboundMessageCountUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundMessageCount"); + private static final AtomicLongFieldUpdater inboundMessageCountUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundMessageCount"); + private static final AtomicLongFieldUpdater outboundWireSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize"); + private static final AtomicLongFieldUpdater inboundWireSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize"); + private static final AtomicLongFieldUpdater outboundUncompressedSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundUncompressedSize"); + private static final AtomicLongFieldUpdater inboundUncompressedSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundUncompressedSize"); + private final String fullMethodName; @Nullable private final StatsContext parentCtx; - private final AtomicBoolean streamClosed = new AtomicBoolean(false); + private volatile int streamClosed; private final Stopwatch stopwatch; - private final AtomicLong outboundMessageCount = new AtomicLong(); - private final AtomicLong inboundMessageCount = new AtomicLong(); - private final AtomicLong outboundWireSize = new AtomicLong(); - private final AtomicLong inboundWireSize = new AtomicLong(); - private final AtomicLong outboundUncompressedSize = new AtomicLong(); - private final AtomicLong inboundUncompressedSize = new AtomicLong(); + private final StatsContextFactory statsCtxFactory; + private volatile long outboundMessageCount; + private volatile long inboundMessageCount; + private volatile long outboundWireSize; + private volatile long inboundWireSize; + private volatile long outboundUncompressedSize; + private volatile long inboundUncompressedSize; - ServerTracer(String fullMethodName, StatsContext parentCtx) { + ServerTracer( + String fullMethodName, + StatsContext parentCtx, + Supplier stopwatchSupplier, + StatsContextFactory statsCtxFactory) { this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.stopwatch = stopwatchSupplier.get().start(); + this.statsCtxFactory = statsCtxFactory; } @Override public void outboundWireSize(long bytes) { - outboundWireSize.addAndGet(bytes); + outboundWireSizeUpdater.getAndAdd(this, bytes); } @Override public void inboundWireSize(long bytes) { - inboundWireSize.addAndGet(bytes); + inboundWireSizeUpdater.getAndAdd(this, bytes); } @Override public void outboundUncompressedSize(long bytes) { - outboundUncompressedSize.addAndGet(bytes); + outboundUncompressedSizeUpdater.getAndAdd(this, bytes); } @Override public void inboundUncompressedSize(long bytes) { - inboundUncompressedSize.addAndGet(bytes); + inboundUncompressedSizeUpdater.getAndAdd(this, bytes); } @Override public void inboundMessage(int seqNo) { - inboundMessageCount.incrementAndGet(); + inboundMessageCountUpdater.getAndIncrement(this); } @Override public void outboundMessage(int seqNo) { - outboundMessageCount.incrementAndGet(); + outboundMessageCountUpdater.getAndIncrement(this); } /** @@ -299,7 +345,7 @@ final class CensusStatsModule { */ @Override public void streamClosed(Status status) { - if (!streamClosed.compareAndSet(false, true)) { + if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; } stopwatch.stop(); @@ -307,16 +353,12 @@ final class CensusStatsModule { MeasurementMap.Builder builder = MeasurementMap.builder() // The metrics are in double .put(RpcConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI) - .put(RpcConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount.get()) - .put(RpcConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount.get()) - .put(RpcConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize.get()) - .put(RpcConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize.get()) - .put( - RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, - outboundUncompressedSize.get()) - .put( - RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES, - inboundUncompressedSize.get()); + .put(RpcConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount) + .put(RpcConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount) + .put(RpcConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize) + .put(RpcConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize) + .put(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, outboundUncompressedSize) + .put(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES, inboundUncompressedSize); if (!status.isOk()) { builder.put(RpcConstants.RPC_SERVER_ERROR_COUNT, 1.0); } @@ -344,7 +386,7 @@ final class CensusStatsModule { parentCtx = statsCtxFactory.getDefault(); } parentCtx = parentCtx.with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(fullMethodName)); - return new ServerTracer(fullMethodName, parentCtx); + return new ServerTracer(fullMethodName, parentCtx, stopwatchSupplier, statsCtxFactory); } } diff --git a/core/src/main/java/io/grpc/internal/CensusTracingModule.java b/core/src/main/java/io/grpc/internal/CensusTracingModule.java index c691a63765..9b7fb1c918 100644 --- a/core/src/main/java/io/grpc/internal/CensusTracingModule.java +++ b/core/src/main/java/io/grpc/internal/CensusTracingModule.java @@ -45,7 +45,7 @@ import io.opencensus.trace.export.SampledSpanStore; import io.opencensus.trace.propagation.BinaryFormat; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -63,6 +63,10 @@ import javax.annotation.Nullable; */ final class CensusTracingModule { private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName()); + private static final AtomicIntegerFieldUpdater callEndedUpdater = + AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"); + private static final AtomicIntegerFieldUpdater streamClosedUpdater = + AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); private final Tracer censusTracer; @VisibleForTesting @@ -216,8 +220,8 @@ final class CensusTracingModule { @VisibleForTesting final class ClientCallTracer extends ClientStreamTracer.Factory { + volatile int callEnded; - private final AtomicBoolean callEnded = new AtomicBoolean(false); private final Span span; ClientCallTracer(@Nullable Span parentSpan, String fullMethodName) { @@ -245,7 +249,7 @@ final class CensusTracingModule { * is a no-op. */ void callEnded(io.grpc.Status status) { - if (!callEnded.compareAndSet(false, true)) { + if (callEndedUpdater.getAndSet(this, 1) != 0) { return; } span.end(createEndSpanOptions(status)); @@ -274,9 +278,10 @@ final class CensusTracingModule { } } + private final class ServerTracer extends ServerStreamTracer { private final Span span; - private final AtomicBoolean streamClosed = new AtomicBoolean(false); + volatile int streamClosed; ServerTracer(String fullMethodName, @Nullable SpanContext remoteSpan) { checkNotNull(fullMethodName, "fullMethodName"); @@ -297,7 +302,7 @@ final class CensusTracingModule { */ @Override public void streamClosed(io.grpc.Status status) { - if (!streamClosed.compareAndSet(false, true)) { + if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; } span.end(createEndSpanOptions(status)); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java index 3115fa388c..8a165cff59 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java @@ -23,10 +23,11 @@ import io.grpc.CallOptions; import io.grpc.ClientStreamTracer; import io.grpc.Metadata; import io.grpc.Status; +import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @@ -36,17 +37,35 @@ import javax.annotation.concurrent.ThreadSafe; */ @ThreadSafe final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory { + + private static final AtomicLongFieldUpdater callsStartedUpdater = + AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsStarted"); + private static final AtomicLongFieldUpdater callsFinishedUpdater = + AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFinished"); + private static final AtomicLongFieldUpdater callsFailedToSendUpdater = + AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFailedToSend"); + private static final AtomicLongFieldUpdater + callsFinishedKnownReceivedUpdater = + AtomicLongFieldUpdater.newUpdater( + GrpclbClientLoadRecorder.class, "callsFinishedKnownReceived"); + private final TimeProvider time; - private final AtomicLong callsStarted = new AtomicLong(); - private final AtomicLong callsFinished = new AtomicLong(); + @SuppressWarnings("unused") + private volatile long callsStarted; + @SuppressWarnings("unused") + private volatile long callsFinished; + + private static final class LongHolder { + long num; + } // Specific finish types - // Access to it should be protected by lock. Contention is not an issue for these counts, because - // normally only a small portion of all RPCs are dropped. @GuardedBy("this") - private HashMap callsDroppedPerToken = new HashMap(); - private final AtomicLong callsFailedToSend = new AtomicLong(); - private final AtomicLong callsFinishedKnownReceived = new AtomicLong(); + private Map callsDroppedPerToken = new HashMap(1); + @SuppressWarnings("unused") + private volatile long callsFailedToSend; + @SuppressWarnings("unused") + private volatile long callsFinishedKnownReceived; GrpclbClientLoadRecorder(TimeProvider time) { this.time = checkNotNull(time, "time provider"); @@ -54,7 +73,7 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory { @Override public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { - callsStarted.incrementAndGet(); + callsStartedUpdater.getAndIncrement(this); return new StreamTracer(); } @@ -62,17 +81,15 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory { * Records that a request has been dropped as instructed by the remote balancer. */ void recordDroppedRequest(String token) { - callsStarted.incrementAndGet(); - callsFinished.incrementAndGet(); + callsStartedUpdater.getAndIncrement(this); + callsFinishedUpdater.getAndIncrement(this); synchronized (this) { - AtomicLong count = callsDroppedPerToken.get(token); - if (count == null) { - count = new AtomicLong(1); - callsDroppedPerToken.put(token, count); - } else { - count.incrementAndGet(); + LongHolder holder; + if ((holder = callsDroppedPerToken.get(token)) == null) { + callsDroppedPerToken.put(token, (holder = new LongHolder())); } + holder.num++; } } @@ -83,52 +100,55 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory { ClientStats.Builder statsBuilder = ClientStats.newBuilder() .setTimestamp(Timestamps.fromMillis(time.currentTimeMillis())) - .setNumCallsStarted(callsStarted.getAndSet(0)) - .setNumCallsFinished(callsFinished.getAndSet(0)) - .setNumCallsFinishedWithClientFailedToSend(callsFailedToSend.getAndSet(0)) - .setNumCallsFinishedKnownReceived(callsFinishedKnownReceived.getAndSet(0)); - HashMap savedCallsDroppedPerToken; + .setNumCallsStarted(callsStartedUpdater.getAndSet(this, 0)) + .setNumCallsFinished(callsFinishedUpdater.getAndSet(this, 0)) + .setNumCallsFinishedWithClientFailedToSend(callsFailedToSendUpdater.getAndSet(this, 0)) + .setNumCallsFinishedKnownReceived(callsFinishedKnownReceivedUpdater.getAndSet(this, 0)); + + Map localCallsDroppedPerToken = Collections.emptyMap(); synchronized (this) { - savedCallsDroppedPerToken = callsDroppedPerToken; - callsDroppedPerToken = new HashMap(); + if (!callsDroppedPerToken.isEmpty()) { + localCallsDroppedPerToken = callsDroppedPerToken; + callsDroppedPerToken = new HashMap(localCallsDroppedPerToken.size()); + } } - for (Map.Entry dropCount : savedCallsDroppedPerToken.entrySet()) { + for (Entry entry : localCallsDroppedPerToken.entrySet()) { statsBuilder.addCallsFinishedWithDrop( ClientStatsPerToken.newBuilder() - .setLoadBalanceToken(dropCount.getKey()) - .setNumCalls(dropCount.getValue().get()) + .setLoadBalanceToken(entry.getKey()) + .setNumCalls(entry.getValue().num) .build()); } return statsBuilder.build(); } private class StreamTracer extends ClientStreamTracer { - final AtomicBoolean headersSent = new AtomicBoolean(); - final AtomicBoolean anythingReceived = new AtomicBoolean(); + private volatile boolean headersSent; + private volatile boolean anythingReceived; @Override public void outboundHeaders() { - headersSent.set(true); + headersSent = true; } @Override public void inboundHeaders() { - anythingReceived.set(true); + anythingReceived = true; } @Override public void inboundMessage(int seqNo) { - anythingReceived.set(true); + anythingReceived = true; } @Override public void streamClosed(Status status) { - callsFinished.incrementAndGet(); - if (!headersSent.get()) { - callsFailedToSend.incrementAndGet(); + callsFinishedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this); + if (!headersSent) { + callsFailedToSendUpdater.getAndIncrement(GrpclbClientLoadRecorder.this); } - if (anythingReceived.get()) { - callsFinishedKnownReceived.incrementAndGet(); + if (anythingReceived) { + callsFinishedKnownReceivedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this); } } }