core: address data race in StatsTraceContext. (#2454)

Document the threading requirements.

Turn all metrics to volatile because callEnded() may not be synchronized
with other metric-updating methods in the case where the RPC is closed
because of transport error or cancellation from the other side.

Remove precondition checks thus allow metrics updating methods to be
called after callEnded() has been called, which may happen since the
application may not be aware when the RPC is closed by the transport.
This commit is contained in:
Kun Zhang 2016-11-28 18:05:41 -08:00 committed by GitHub
parent 6da8f471f7
commit c1a798486b
1 changed files with 19 additions and 14 deletions

View File

@ -31,8 +31,6 @@
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkState;
import com.google.census.CensusContext;
import com.google.census.CensusContextFactory;
import com.google.census.MetricMap;
@ -49,10 +47,18 @@ import io.grpc.Status;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The stats and tracing information for a call.
*
* <p>This class is not thread-safe, in the sense that the updates to each individual metric must be
* serialized, while multiple threads can update different metrics without any sort of
* synchronization. For example, calls to {@link #wireBytesSent} must be synchronized, while {@link
* #wireBytesReceived} and {@link #wireBytesSent} can be called concurrently. {@link #callEnded}
* can be called concurrently with itself and the other methods.
*/
@SuppressWarnings("NonAtomicVolatileUpdate")
public final class StatsTraceContext {
public static final StatsTraceContext NOOP = StatsTraceContext.newClientContext(
"noopservice/noopmethod", NoopCensusContextFactory.INSTANCE,
@ -66,11 +72,11 @@ public final class StatsTraceContext {
private final Stopwatch stopwatch;
private final Side side;
private final Metadata.Key<CensusContext> censusHeader;
private long wireBytesSent;
private long wireBytesReceived;
private long uncompressedBytesSent;
private long uncompressedBytesReceived;
private boolean callEnded;
private volatile long wireBytesSent;
private volatile long wireBytesReceived;
private volatile long uncompressedBytesSent;
private volatile long uncompressedBytesReceived;
private final AtomicBoolean callEnded = new AtomicBoolean(false);
private StatsTraceContext(Side side, String fullMethodName, CensusContext parentCtx,
Supplier<Stopwatch> stopwatchSupplier, Metadata.Key<CensusContext> censusHeader) {
@ -164,8 +170,6 @@ public final class StatsTraceContext {
* Record the outgoing number of payload bytes as on the wire.
*/
void wireBytesSent(long bytes) {
// TODO(zhangkun83): maybe change of the checkState() to assert after this class is stabilized.
checkState(!callEnded, "already eneded");
wireBytesSent += bytes;
}
@ -173,7 +177,6 @@ public final class StatsTraceContext {
* Record the incoming number of payload bytes as on the wire.
*/
void wireBytesReceived(long bytes) {
checkState(!callEnded, "already eneded");
wireBytesReceived += bytes;
}
@ -183,7 +186,6 @@ public final class StatsTraceContext {
* <p>The time this method is called is unrelated to the actual time when those byte are sent.
*/
void uncompressedBytesSent(long bytes) {
checkState(!callEnded, "already ended");
uncompressedBytesSent += bytes;
}
@ -193,16 +195,19 @@ public final class StatsTraceContext {
* <p>The time this method is called is unrelated to the actual time when those byte are received.
*/
void uncompressedBytesReceived(long bytes) {
checkState(!callEnded, "already ended");
uncompressedBytesReceived += bytes;
}
/**
* Record a finished all and mark the current time as the end time.
*
* <p>Can be called from any thread without synchronization. Calling it the second time or more
* is a no-op.
*/
void callEnded(Status status) {
checkState(!callEnded, "already ended");
callEnded = true;
if (!callEnded.compareAndSet(false, true)) {
return;
}
stopwatch.stop();
MetricName latencyMetric;
MetricName wireBytesSentMetric;