diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index 37c658af65..13d12d0c18 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Provides factories for {@link StreamTracer} that records stats to Census. @@ -154,18 +155,58 @@ public final class CensusStatsModule { private static final class ClientTracer extends ClientStreamTracer { - private static final AtomicLongFieldUpdater outboundMessageCountUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundMessageCount"); - private static final AtomicLongFieldUpdater inboundMessageCountUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundMessageCount"); - private static final AtomicLongFieldUpdater outboundWireSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize"); - private static final AtomicLongFieldUpdater inboundWireSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize"); - private static final AtomicLongFieldUpdater outboundUncompressedSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundUncompressedSize"); - private static final AtomicLongFieldUpdater inboundUncompressedSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundUncompressedSize"); + @Nullable private static final AtomicLongFieldUpdater outboundMessageCountUpdater; + @Nullable private static final AtomicLongFieldUpdater inboundMessageCountUpdater; + @Nullable private static final AtomicLongFieldUpdater outboundWireSizeUpdater; + @Nullable private static final AtomicLongFieldUpdater inboundWireSizeUpdater; + + @Nullable + private static final AtomicLongFieldUpdater outboundUncompressedSizeUpdater; + + @Nullable + private static final AtomicLongFieldUpdater inboundUncompressedSizeUpdater; + + /** + * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their + * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to + * (potentially racy) direct updates of the volatile variables. + */ + static { + AtomicLongFieldUpdater tmpOutboundMessageCountUpdater; + AtomicLongFieldUpdater tmpInboundMessageCountUpdater; + AtomicLongFieldUpdater tmpOutboundWireSizeUpdater; + AtomicLongFieldUpdater tmpInboundWireSizeUpdater; + AtomicLongFieldUpdater tmpOutboundUncompressedSizeUpdater; + AtomicLongFieldUpdater tmpInboundUncompressedSizeUpdater; + try { + tmpOutboundMessageCountUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundMessageCount"); + tmpInboundMessageCountUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundMessageCount"); + tmpOutboundWireSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize"); + tmpInboundWireSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize"); + tmpOutboundUncompressedSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundUncompressedSize"); + tmpInboundUncompressedSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundUncompressedSize"); + } catch (Throwable t) { + logger.log(Level.SEVERE, "Creating atomic field updaters failed", t); + tmpOutboundMessageCountUpdater = null; + tmpInboundMessageCountUpdater = null; + tmpOutboundWireSizeUpdater = null; + tmpInboundWireSizeUpdater = null; + tmpOutboundUncompressedSizeUpdater = null; + tmpInboundUncompressedSizeUpdater = null; + } + outboundMessageCountUpdater = tmpOutboundMessageCountUpdater; + inboundMessageCountUpdater = tmpInboundMessageCountUpdater; + outboundWireSizeUpdater = tmpOutboundWireSizeUpdater; + inboundWireSizeUpdater = tmpInboundWireSizeUpdater; + outboundUncompressedSizeUpdater = tmpOutboundUncompressedSizeUpdater; + inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater; + } volatile long outboundMessageCount; volatile long inboundMessageCount; @@ -175,33 +216,63 @@ public final class CensusStatsModule { volatile long inboundUncompressedSize; @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void outboundWireSize(long bytes) { - outboundWireSizeUpdater.getAndAdd(this, bytes); + if (outboundWireSizeUpdater != null) { + outboundWireSizeUpdater.getAndAdd(this, bytes); + } else { + outboundWireSize += bytes; + } } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void inboundWireSize(long bytes) { - inboundWireSizeUpdater.getAndAdd(this, bytes); + if (inboundWireSizeUpdater != null) { + inboundWireSizeUpdater.getAndAdd(this, bytes); + } else { + inboundWireSize += bytes; + } } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void outboundUncompressedSize(long bytes) { - outboundUncompressedSizeUpdater.getAndAdd(this, bytes); + if (outboundUncompressedSizeUpdater != null) { + outboundUncompressedSizeUpdater.getAndAdd(this, bytes); + } else { + outboundUncompressedSize += bytes; + } } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void inboundUncompressedSize(long bytes) { - inboundUncompressedSizeUpdater.getAndAdd(this, bytes); + if (inboundUncompressedSizeUpdater != null) { + inboundUncompressedSizeUpdater.getAndAdd(this, bytes); + } else { + inboundUncompressedSize += bytes; + } } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void inboundMessage(int seqNo) { - inboundMessageCountUpdater.getAndIncrement(this); + if (inboundMessageCountUpdater != null) { + inboundMessageCountUpdater.getAndIncrement(this); + } else { + inboundMessageCount++; + } } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void outboundMessage(int seqNo) { - outboundMessageCountUpdater.getAndIncrement(this); + if (outboundMessageCountUpdater != null) { + outboundMessageCountUpdater.getAndIncrement(this); + } else { + outboundMessageCount++; + } } } @@ -209,12 +280,34 @@ public final class CensusStatsModule { @VisibleForTesting static final class ClientCallTracer extends ClientStreamTracer.Factory { + @Nullable private static final AtomicReferenceFieldUpdater - streamTracerUpdater = + streamTracerUpdater; + + @Nullable private static final AtomicIntegerFieldUpdater callEndedUpdater; + + /** + * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their + * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to + * (potentially racy) direct updates of the volatile variables. + */ + static { + AtomicReferenceFieldUpdater tmpStreamTracerUpdater; + AtomicIntegerFieldUpdater tmpCallEndedUpdater; + try { + tmpStreamTracerUpdater = AtomicReferenceFieldUpdater.newUpdater( ClientCallTracer.class, ClientTracer.class, "streamTracer"); - private static final AtomicIntegerFieldUpdater callEndedUpdater = - AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"); + tmpCallEndedUpdater = + AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"); + } catch (Throwable t) { + logger.log(Level.SEVERE, "Creating atomic field updaters failed", t); + tmpStreamTracerUpdater = null; + tmpCallEndedUpdater = null; + } + streamTracerUpdater = tmpStreamTracerUpdater; + callEndedUpdater = tmpCallEndedUpdater; + } private final CensusStatsModule module; private final String fullMethodName; @@ -250,9 +343,16 @@ public final class CensusStatsModule { ClientTracer tracer = new ClientTracer(); // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than // one streams. We will need to update this file to support them. - checkState( - streamTracerUpdater.compareAndSet(this, null, tracer), - "Are you creating multiple streams per call? This class doesn't yet support this case."); + if (streamTracerUpdater != null) { + checkState( + streamTracerUpdater.compareAndSet(this, null, tracer), + "Are you creating multiple streams per call? This class doesn't yet support this case"); + } else { + checkState( + streamTracer == null, + "Are you creating multiple streams per call? This class doesn't yet support this case"); + streamTracer = tracer; + } if (module.propagateTags) { headers.discardAll(module.statsHeader); if (!module.tagger.empty().equals(parentCtx)) { @@ -269,8 +369,15 @@ public final class CensusStatsModule { * is a no-op. */ void callEnded(Status status) { - if (callEndedUpdater.getAndSet(this, 1) != 0) { - return; + if (callEndedUpdater != null) { + if (callEndedUpdater.getAndSet(this, 1) != 0) { + return; + } + } else { + if (callEnded != 0) { + return; + } + callEnded = 1; } if (!recordFinishedRpcs) { return; @@ -308,20 +415,64 @@ public final class CensusStatsModule { } private static final class ServerTracer extends ServerStreamTracer { - private static final AtomicIntegerFieldUpdater streamClosedUpdater = - AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); - private static final AtomicLongFieldUpdater outboundMessageCountUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundMessageCount"); - private static final AtomicLongFieldUpdater inboundMessageCountUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundMessageCount"); - private static final AtomicLongFieldUpdater outboundWireSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize"); - private static final AtomicLongFieldUpdater inboundWireSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize"); - private static final AtomicLongFieldUpdater outboundUncompressedSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundUncompressedSize"); - private static final AtomicLongFieldUpdater inboundUncompressedSizeUpdater = - AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundUncompressedSize"); + @Nullable private static final AtomicIntegerFieldUpdater streamClosedUpdater; + @Nullable private static final AtomicLongFieldUpdater outboundMessageCountUpdater; + @Nullable private static final AtomicLongFieldUpdater inboundMessageCountUpdater; + @Nullable private static final AtomicLongFieldUpdater outboundWireSizeUpdater; + @Nullable private static final AtomicLongFieldUpdater inboundWireSizeUpdater; + + @Nullable + private static final AtomicLongFieldUpdater outboundUncompressedSizeUpdater; + + @Nullable + private static final AtomicLongFieldUpdater inboundUncompressedSizeUpdater; + + /** + * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their + * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to + * (potentially racy) direct updates of the volatile variables. + */ + static { + AtomicIntegerFieldUpdater tmpStreamClosedUpdater; + AtomicLongFieldUpdater tmpOutboundMessageCountUpdater; + AtomicLongFieldUpdater tmpInboundMessageCountUpdater; + AtomicLongFieldUpdater tmpOutboundWireSizeUpdater; + AtomicLongFieldUpdater tmpInboundWireSizeUpdater; + AtomicLongFieldUpdater tmpOutboundUncompressedSizeUpdater; + AtomicLongFieldUpdater tmpInboundUncompressedSizeUpdater; + try { + tmpStreamClosedUpdater = + AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); + tmpOutboundMessageCountUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundMessageCount"); + tmpInboundMessageCountUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundMessageCount"); + tmpOutboundWireSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize"); + tmpInboundWireSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize"); + tmpOutboundUncompressedSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundUncompressedSize"); + tmpInboundUncompressedSizeUpdater = + AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundUncompressedSize"); + } catch (Throwable t) { + logger.log(Level.SEVERE, "Creating atomic field updaters failed", t); + tmpStreamClosedUpdater = null; + tmpOutboundMessageCountUpdater = null; + tmpInboundMessageCountUpdater = null; + tmpOutboundWireSizeUpdater = null; + tmpInboundWireSizeUpdater = null; + tmpOutboundUncompressedSizeUpdater = null; + tmpInboundUncompressedSizeUpdater = null; + } + streamClosedUpdater = tmpStreamClosedUpdater; + outboundMessageCountUpdater = tmpOutboundMessageCountUpdater; + inboundMessageCountUpdater = tmpInboundMessageCountUpdater; + outboundWireSizeUpdater = tmpOutboundWireSizeUpdater; + inboundWireSizeUpdater = tmpInboundWireSizeUpdater; + outboundUncompressedSizeUpdater = tmpOutboundUncompressedSizeUpdater; + inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater; + } private final CensusStatsModule module; private final String fullMethodName; @@ -358,33 +509,63 @@ public final class CensusStatsModule { } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void outboundWireSize(long bytes) { - outboundWireSizeUpdater.getAndAdd(this, bytes); + if (outboundWireSizeUpdater != null) { + outboundWireSizeUpdater.getAndAdd(this, bytes); + } else { + outboundWireSize += bytes; + } } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void inboundWireSize(long bytes) { - inboundWireSizeUpdater.getAndAdd(this, bytes); + if (inboundWireSizeUpdater != null) { + inboundWireSizeUpdater.getAndAdd(this, bytes); + } else { + inboundWireSize += bytes; + } } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void outboundUncompressedSize(long bytes) { - outboundUncompressedSizeUpdater.getAndAdd(this, bytes); + if (outboundUncompressedSizeUpdater != null) { + outboundUncompressedSizeUpdater.getAndAdd(this, bytes); + } else { + outboundUncompressedSize += bytes; + } } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void inboundUncompressedSize(long bytes) { - inboundUncompressedSizeUpdater.getAndAdd(this, bytes); + if (inboundUncompressedSizeUpdater != null) { + inboundUncompressedSizeUpdater.getAndAdd(this, bytes); + } else { + inboundUncompressedSize += bytes; + } } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void inboundMessage(int seqNo) { - inboundMessageCountUpdater.getAndIncrement(this); + if (inboundMessageCountUpdater != null) { + inboundMessageCountUpdater.getAndIncrement(this); + } else { + inboundMessageCount++; + } } @Override + @SuppressWarnings("NonAtomicVolatileUpdate") public void outboundMessage(int seqNo) { - outboundMessageCountUpdater.getAndIncrement(this); + if (outboundMessageCountUpdater != null) { + outboundMessageCountUpdater.getAndIncrement(this); + } else { + outboundMessageCount++; + } } /** @@ -395,8 +576,15 @@ public final class CensusStatsModule { */ @Override public void streamClosed(Status status) { - if (streamClosedUpdater.getAndSet(this, 1) != 0) { - return; + if (streamClosedUpdater != null) { + if (streamClosedUpdater.getAndSet(this, 1) != 0) { + return; + } + } else { + if (streamClosed != 0) { + return; + } + streamClosed = 1; } if (!recordFinishedRpcs) { return; diff --git a/core/src/main/java/io/grpc/internal/CensusTracingModule.java b/core/src/main/java/io/grpc/internal/CensusTracingModule.java index 626abb4aec..0608e0fd8f 100644 --- a/core/src/main/java/io/grpc/internal/CensusTracingModule.java +++ b/core/src/main/java/io/grpc/internal/CensusTracingModule.java @@ -58,10 +58,32 @@ import javax.annotation.Nullable; */ final class CensusTracingModule { private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName()); - private static final AtomicIntegerFieldUpdater callEndedUpdater = - AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"); - private static final AtomicIntegerFieldUpdater streamClosedUpdater = - AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); + + @Nullable private static final AtomicIntegerFieldUpdater callEndedUpdater; + + @Nullable private static final AtomicIntegerFieldUpdater streamClosedUpdater; + + /** + * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK + * reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to + * (potentially racy) direct updates of the volatile variables. + */ + static { + AtomicIntegerFieldUpdater tmpCallEndedUpdater; + AtomicIntegerFieldUpdater tmpStreamClosedUpdater; + try { + tmpCallEndedUpdater = + AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded"); + tmpStreamClosedUpdater = + AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); + } catch (Throwable t) { + logger.log(Level.SEVERE, "Creating atomic field updaters failed", t); + tmpCallEndedUpdater = null; + tmpStreamClosedUpdater = null; + } + callEndedUpdater = tmpCallEndedUpdater; + streamClosedUpdater = tmpStreamClosedUpdater; + } private final Tracer censusTracer; @VisibleForTesting @@ -232,8 +254,15 @@ final class CensusTracingModule { * is a no-op. */ void callEnded(io.grpc.Status status) { - if (callEndedUpdater.getAndSet(this, 1) != 0) { - return; + if (callEndedUpdater != null) { + if (callEndedUpdater.getAndSet(this, 1) != 0) { + return; + } + } else { + if (callEnded != 0) { + return; + } + callEnded = 1; } span.end(createEndSpanOptions(status, isSampledToLocalTracing)); } @@ -291,8 +320,15 @@ final class CensusTracingModule { */ @Override public void streamClosed(io.grpc.Status status) { - if (streamClosedUpdater.getAndSet(this, 1) != 0) { - return; + if (streamClosedUpdater != null) { + if (streamClosedUpdater.getAndSet(this, 1) != 0) { + return; + } + } else { + if (streamClosed != 0) { + return; + } + streamClosed = 1; } span.end(createEndSpanOptions(status, isSampledToLocalTracing)); } diff --git a/core/src/main/java/io/grpc/internal/SerializingExecutor.java b/core/src/main/java/io/grpc/internal/SerializingExecutor.java index e5957452cf..84632b9cbb 100644 --- a/core/src/main/java/io/grpc/internal/SerializingExecutor.java +++ b/core/src/main/java/io/grpc/internal/SerializingExecutor.java @@ -37,8 +37,24 @@ public final class SerializingExecutor implements Executor, Runnable { private static final Logger log = Logger.getLogger(SerializingExecutor.class.getName()); - private static final AtomicIntegerFieldUpdater runStateUpdater = - AtomicIntegerFieldUpdater.newUpdater(SerializingExecutor.class, "runState"); + // When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK + // reflection API that triggers a NoSuchFieldException. When this occurs, fallback to a + // synchronized implementation. + private static final AtomicHelper atomicHelper = getAtomicHelper(); + + private static AtomicHelper getAtomicHelper() { + AtomicHelper helper; + try { + helper = + new FieldUpdaterAtomicHelper( + AtomicIntegerFieldUpdater.newUpdater(SerializingExecutor.class, "runState")); + } catch (Throwable t) { + log.log(Level.SEVERE, "FieldUpdaterAtomicHelper failed", t); + helper = new SynchronizedAtomicHelper(); + } + return helper; + } + private static final int STOPPED = 0; private static final int RUNNING = -1; @@ -71,7 +87,7 @@ public final class SerializingExecutor implements Executor, Runnable { } private void schedule(@Nullable Runnable removable) { - if (runStateUpdater.compareAndSet(this, STOPPED, RUNNING)) { + if (atomicHelper.runStateCompareAndSet(this, STOPPED, RUNNING)) { boolean success = false; try { executor.execute(this); @@ -92,7 +108,7 @@ public final class SerializingExecutor implements Executor, Runnable { // to execute don't succeed and accidentally run a previous runnable. runQueue.remove(removable); } - runStateUpdater.set(this, STOPPED); + atomicHelper.runStateSet(this, STOPPED); } } } @@ -111,11 +127,56 @@ public final class SerializingExecutor implements Executor, Runnable { } } } finally { - runStateUpdater.set(this, STOPPED); + atomicHelper.runStateSet(this, STOPPED); } if (!runQueue.isEmpty()) { // we didn't enqueue anything but someone else did. schedule(null); } } + + private abstract static class AtomicHelper { + public abstract boolean runStateCompareAndSet(SerializingExecutor obj, int expect, int update); + + public abstract void runStateSet(SerializingExecutor obj, int newValue); + } + + private static final class FieldUpdaterAtomicHelper extends AtomicHelper { + private final AtomicIntegerFieldUpdater runStateUpdater; + + private FieldUpdaterAtomicHelper( + AtomicIntegerFieldUpdater runStateUpdater) { + this.runStateUpdater = runStateUpdater; + } + + @Override + public boolean runStateCompareAndSet(SerializingExecutor obj, int expect, int update) { + return runStateUpdater.compareAndSet(obj, expect, update); + } + + @Override + public void runStateSet(SerializingExecutor obj, int newValue) { + runStateUpdater.set(obj, newValue); + } + } + + private static final class SynchronizedAtomicHelper extends AtomicHelper { + @Override + public boolean runStateCompareAndSet(SerializingExecutor obj, int expect, int update) { + synchronized (obj) { + if (obj.runState == expect) { + obj.runState = update; + return true; + } + return false; + } + } + + @Override + public void runStateSet(SerializingExecutor obj, int newValue) { + synchronized (obj) { + obj.runState = newValue; + } + } + } }