From c1a798486b61efa55f659cefb99a679b2d737291 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Mon, 28 Nov 2016 18:05:41 -0800 Subject: [PATCH] core: address data race in StatsTraceContext. (#2454) Document the threading requirements. Make all metrics volatile because callEnded() may not be synchronized with other metric-updating methods in the case where the RPC is closed because of a transport error or cancellation from the other side. Remove the precondition checks, thus allowing metric-updating methods to be called after callEnded() has been called, which may happen since the application may not be aware when the RPC is closed by the transport. --- .../io/grpc/internal/StatsTraceContext.java | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/StatsTraceContext.java b/core/src/main/java/io/grpc/internal/StatsTraceContext.java index 2705a37856..1a7af8fe9b 100644 --- a/core/src/main/java/io/grpc/internal/StatsTraceContext.java +++ b/core/src/main/java/io/grpc/internal/StatsTraceContext.java @@ -31,8 +31,6 @@ package io.grpc.internal; -import static com.google.common.base.Preconditions.checkState; - import com.google.census.CensusContext; import com.google.census.CensusContextFactory; import com.google.census.MetricMap; @@ -49,10 +47,18 @@ import io.grpc.Status; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * The stats and tracing information for a call. + * + *

<p>This class is not thread-safe, in the sense that the updates to each individual metric must be + * serialized, while multiple threads can update different metrics without any sort of + * synchronization. For example, calls to {@link #wireBytesSent} must be synchronized, while {@link + * #wireBytesReceived} and {@link #wireBytesSent} can be called concurrently. {@link #callEnded} + * can be called concurrently with itself and the other methods. */ +@SuppressWarnings("NonAtomicVolatileUpdate") public final class StatsTraceContext { public static final StatsTraceContext NOOP = StatsTraceContext.newClientContext( "noopservice/noopmethod", NoopCensusContextFactory.INSTANCE, @@ -66,11 +72,11 @@ public final class StatsTraceContext { private final Stopwatch stopwatch; private final Side side; private final Metadata.Key<CensusContext> censusHeader; - private long wireBytesSent; - private long wireBytesReceived; - private long uncompressedBytesSent; - private long uncompressedBytesReceived; - private boolean callEnded; + private volatile long wireBytesSent; + private volatile long wireBytesReceived; + private volatile long uncompressedBytesSent; + private volatile long uncompressedBytesReceived; + private final AtomicBoolean callEnded = new AtomicBoolean(false); private StatsTraceContext(Side side, String fullMethodName, CensusContext parentCtx, Supplier<Stopwatch> stopwatchSupplier, Metadata.Key<CensusContext> censusHeader) { @@ -164,8 +170,6 @@ public final class StatsTraceContext { * Record the outgoing number of payload bytes as on the wire. */ void wireBytesSent(long bytes) { - // TODO(zhangkun83): maybe change of the checkState() to assert after this class is stabilized. - checkState(!callEnded, "already eneded"); wireBytesSent += bytes; } @@ -173,7 +177,6 @@ public final class StatsTraceContext { * Record the incoming number of payload bytes as on the wire. 
*/ void wireBytesReceived(long bytes) { - checkState(!callEnded, "already eneded"); wireBytesReceived += bytes; } @@ -183,7 +186,6 @@ public final class StatsTraceContext { *

<p>The time this method is called is unrelated to the actual time when those byte are sent. */ void uncompressedBytesSent(long bytes) { - checkState(!callEnded, "already ended"); uncompressedBytesSent += bytes; } @@ -193,16 +195,19 @@ public final class StatsTraceContext { *

<p>The time this method is called is unrelated to the actual time when those byte are received. */ void uncompressedBytesReceived(long bytes) { - checkState(!callEnded, "already ended"); uncompressedBytesReceived += bytes; } /** * Record a finished all and mark the current time as the end time. + * + *

<p>Can be called from any thread without synchronization. Calling it the second time or more + * is a no-op. */ void callEnded(Status status) { - checkState(!callEnded, "already ended"); - callEnded = true; + if (!callEnded.compareAndSet(false, true)) { + return; + } stopwatch.stop(); MetricName latencyMetric; MetricName wireBytesSentMetric;