core: workaround for Atomic*FieldUpdater bug on some Android devices

This commit is contained in:
Eric Gribkoff 2017-11-16 09:59:47 -08:00 committed by GitHub
parent 2f155606b8
commit 4c483ef7a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 346 additions and 61 deletions

View File

@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Provides factories for {@link StreamTracer} that records stats to Census.
@ -154,18 +155,58 @@ public final class CensusStatsModule {
private static final class ClientTracer extends ClientStreamTracer {
private static final AtomicLongFieldUpdater<ClientTracer> outboundMessageCountUpdater =
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
@Nullable
private static final AtomicLongFieldUpdater<ClientTracer> outboundUncompressedSizeUpdater;
@Nullable
private static final AtomicLongFieldUpdater<ClientTracer> inboundUncompressedSizeUpdater;
/**
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicLongFieldUpdater<ClientTracer> tmpOutboundMessageCountUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpInboundMessageCountUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpOutboundUncompressedSizeUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpInboundUncompressedSizeUpdater;
try {
tmpOutboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundMessageCount");
private static final AtomicLongFieldUpdater<ClientTracer> inboundMessageCountUpdater =
tmpInboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundMessageCount");
private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater =
tmpOutboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater =
tmpInboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
private static final AtomicLongFieldUpdater<ClientTracer> outboundUncompressedSizeUpdater =
tmpOutboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundUncompressedSize");
private static final AtomicLongFieldUpdater<ClientTracer> inboundUncompressedSizeUpdater =
tmpInboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundUncompressedSize");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpOutboundMessageCountUpdater = null;
tmpInboundMessageCountUpdater = null;
tmpOutboundWireSizeUpdater = null;
tmpInboundWireSizeUpdater = null;
tmpOutboundUncompressedSizeUpdater = null;
tmpInboundUncompressedSizeUpdater = null;
}
outboundMessageCountUpdater = tmpOutboundMessageCountUpdater;
inboundMessageCountUpdater = tmpInboundMessageCountUpdater;
outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
outboundUncompressedSizeUpdater = tmpOutboundUncompressedSizeUpdater;
inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater;
}
volatile long outboundMessageCount;
volatile long inboundMessageCount;
@ -175,33 +216,63 @@ public final class CensusStatsModule {
volatile long inboundUncompressedSize;
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundWireSize(long bytes) {
if (outboundWireSizeUpdater != null) {
outboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
outboundWireSize += bytes;
}
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundWireSize(long bytes) {
if (inboundWireSizeUpdater != null) {
inboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
inboundWireSize += bytes;
}
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundUncompressedSize(long bytes) {
if (outboundUncompressedSizeUpdater != null) {
outboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} else {
outboundUncompressedSize += bytes;
}
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundUncompressedSize(long bytes) {
if (inboundUncompressedSizeUpdater != null) {
inboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} else {
inboundUncompressedSize += bytes;
}
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundMessage(int seqNo) {
if (inboundMessageCountUpdater != null) {
inboundMessageCountUpdater.getAndIncrement(this);
} else {
inboundMessageCount++;
}
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundMessage(int seqNo) {
if (outboundMessageCountUpdater != null) {
outboundMessageCountUpdater.getAndIncrement(this);
} else {
outboundMessageCount++;
}
}
}
@ -209,12 +280,34 @@ public final class CensusStatsModule {
@VisibleForTesting
static final class ClientCallTracer extends ClientStreamTracer.Factory {
@Nullable
private static final AtomicReferenceFieldUpdater<ClientCallTracer, ClientTracer>
streamTracerUpdater =
streamTracerUpdater;
@Nullable private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater;
/**
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicReferenceFieldUpdater<ClientCallTracer, ClientTracer> tmpStreamTracerUpdater;
AtomicIntegerFieldUpdater<ClientCallTracer> tmpCallEndedUpdater;
try {
tmpStreamTracerUpdater =
AtomicReferenceFieldUpdater.newUpdater(
ClientCallTracer.class, ClientTracer.class, "streamTracer");
private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater =
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpStreamTracerUpdater = null;
tmpCallEndedUpdater = null;
}
streamTracerUpdater = tmpStreamTracerUpdater;
callEndedUpdater = tmpCallEndedUpdater;
}
private final CensusStatsModule module;
private final String fullMethodName;
@ -250,9 +343,16 @@ public final class CensusStatsModule {
ClientTracer tracer = new ClientTracer();
// TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than
// one streams. We will need to update this file to support them.
if (streamTracerUpdater != null) {
checkState(
streamTracerUpdater.compareAndSet(this, null, tracer),
"Are you creating multiple streams per call? This class doesn't yet support this case.");
"Are you creating multiple streams per call? This class doesn't yet support this case");
} else {
checkState(
streamTracer == null,
"Are you creating multiple streams per call? This class doesn't yet support this case");
streamTracer = tracer;
}
if (module.propagateTags) {
headers.discardAll(module.statsHeader);
if (!module.tagger.empty().equals(parentCtx)) {
@ -269,9 +369,16 @@ public final class CensusStatsModule {
* is a no-op.
*/
void callEnded(Status status) {
if (callEndedUpdater != null) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
if (callEnded != 0) {
return;
}
callEnded = 1;
}
if (!recordFinishedRpcs) {
return;
}
@ -308,20 +415,64 @@ public final class CensusStatsModule {
}
private static final class ServerTracer extends ServerStreamTracer {
private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater =
@Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundMessageCountUpdater;
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
@Nullable
private static final AtomicLongFieldUpdater<ServerTracer> outboundUncompressedSizeUpdater;
@Nullable
private static final AtomicLongFieldUpdater<ServerTracer> inboundUncompressedSizeUpdater;
/**
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpOutboundMessageCountUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpInboundMessageCountUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpOutboundUncompressedSizeUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpInboundUncompressedSizeUpdater;
try {
tmpStreamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
private static final AtomicLongFieldUpdater<ServerTracer> outboundMessageCountUpdater =
tmpOutboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundMessageCount");
private static final AtomicLongFieldUpdater<ServerTracer> inboundMessageCountUpdater =
tmpInboundMessageCountUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundMessageCount");
private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater =
tmpOutboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater =
tmpInboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
private static final AtomicLongFieldUpdater<ServerTracer> outboundUncompressedSizeUpdater =
tmpOutboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundUncompressedSize");
private static final AtomicLongFieldUpdater<ServerTracer> inboundUncompressedSizeUpdater =
tmpInboundUncompressedSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundUncompressedSize");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpStreamClosedUpdater = null;
tmpOutboundMessageCountUpdater = null;
tmpInboundMessageCountUpdater = null;
tmpOutboundWireSizeUpdater = null;
tmpInboundWireSizeUpdater = null;
tmpOutboundUncompressedSizeUpdater = null;
tmpInboundUncompressedSizeUpdater = null;
}
streamClosedUpdater = tmpStreamClosedUpdater;
outboundMessageCountUpdater = tmpOutboundMessageCountUpdater;
inboundMessageCountUpdater = tmpInboundMessageCountUpdater;
outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
outboundUncompressedSizeUpdater = tmpOutboundUncompressedSizeUpdater;
inboundUncompressedSizeUpdater = tmpInboundUncompressedSizeUpdater;
}
private final CensusStatsModule module;
private final String fullMethodName;
@ -358,33 +509,63 @@ public final class CensusStatsModule {
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundWireSize(long bytes) {
if (outboundWireSizeUpdater != null) {
outboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
outboundWireSize += bytes;
}
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundWireSize(long bytes) {
if (inboundWireSizeUpdater != null) {
inboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
inboundWireSize += bytes;
}
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundUncompressedSize(long bytes) {
if (outboundUncompressedSizeUpdater != null) {
outboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} else {
outboundUncompressedSize += bytes;
}
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundUncompressedSize(long bytes) {
if (inboundUncompressedSizeUpdater != null) {
inboundUncompressedSizeUpdater.getAndAdd(this, bytes);
} else {
inboundUncompressedSize += bytes;
}
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundMessage(int seqNo) {
if (inboundMessageCountUpdater != null) {
inboundMessageCountUpdater.getAndIncrement(this);
} else {
inboundMessageCount++;
}
}
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundMessage(int seqNo) {
if (outboundMessageCountUpdater != null) {
outboundMessageCountUpdater.getAndIncrement(this);
} else {
outboundMessageCount++;
}
}
/**
@ -395,9 +576,16 @@ public final class CensusStatsModule {
*/
@Override
public void streamClosed(Status status) {
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
if (streamClosed != 0) {
return;
}
streamClosed = 1;
}
if (!recordFinishedRpcs) {
return;
}

View File

@ -58,10 +58,32 @@ import javax.annotation.Nullable;
*/
final class CensusTracingModule {
private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName());
private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater =
@Nullable private static final AtomicIntegerFieldUpdater<ClientCallTracer> callEndedUpdater;
@Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
/**
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK
* reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicIntegerFieldUpdater<ClientCallTracer> tmpCallEndedUpdater;
AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
try {
tmpCallEndedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ClientCallTracer.class, "callEnded");
private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater =
tmpStreamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
} catch (Throwable t) {
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpCallEndedUpdater = null;
tmpStreamClosedUpdater = null;
}
callEndedUpdater = tmpCallEndedUpdater;
streamClosedUpdater = tmpStreamClosedUpdater;
}
private final Tracer censusTracer;
@VisibleForTesting
@ -232,9 +254,16 @@ final class CensusTracingModule {
* is a no-op.
*/
void callEnded(io.grpc.Status status) {
if (callEndedUpdater != null) {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
if (callEnded != 0) {
return;
}
callEnded = 1;
}
span.end(createEndSpanOptions(status, isSampledToLocalTracing));
}
}
@ -291,9 +320,16 @@ final class CensusTracingModule {
*/
@Override
public void streamClosed(io.grpc.Status status) {
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
if (streamClosed != 0) {
return;
}
streamClosed = 1;
}
span.end(createEndSpanOptions(status, isSampledToLocalTracing));
}

View File

@ -37,8 +37,24 @@ public final class SerializingExecutor implements Executor, Runnable {
private static final Logger log =
Logger.getLogger(SerializingExecutor.class.getName());
private static final AtomicIntegerFieldUpdater<SerializingExecutor> runStateUpdater =
AtomicIntegerFieldUpdater.newUpdater(SerializingExecutor.class, "runState");
// When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK
// reflection API that triggers a NoSuchFieldException. When this occurs, fallback to a
// synchronized implementation.
private static final AtomicHelper atomicHelper = getAtomicHelper();
private static AtomicHelper getAtomicHelper() {
AtomicHelper helper;
try {
helper =
new FieldUpdaterAtomicHelper(
AtomicIntegerFieldUpdater.newUpdater(SerializingExecutor.class, "runState"));
} catch (Throwable t) {
log.log(Level.SEVERE, "FieldUpdaterAtomicHelper failed", t);
helper = new SynchronizedAtomicHelper();
}
return helper;
}
private static final int STOPPED = 0;
private static final int RUNNING = -1;
@ -71,7 +87,7 @@ public final class SerializingExecutor implements Executor, Runnable {
}
private void schedule(@Nullable Runnable removable) {
if (runStateUpdater.compareAndSet(this, STOPPED, RUNNING)) {
if (atomicHelper.runStateCompareAndSet(this, STOPPED, RUNNING)) {
boolean success = false;
try {
executor.execute(this);
@ -92,7 +108,7 @@ public final class SerializingExecutor implements Executor, Runnable {
// to execute don't succeed and accidentally run a previous runnable.
runQueue.remove(removable);
}
runStateUpdater.set(this, STOPPED);
atomicHelper.runStateSet(this, STOPPED);
}
}
}
@ -111,11 +127,56 @@ public final class SerializingExecutor implements Executor, Runnable {
}
}
} finally {
runStateUpdater.set(this, STOPPED);
atomicHelper.runStateSet(this, STOPPED);
}
if (!runQueue.isEmpty()) {
// we didn't enqueue anything but someone else did.
schedule(null);
}
}
private abstract static class AtomicHelper {
public abstract boolean runStateCompareAndSet(SerializingExecutor obj, int expect, int update);
public abstract void runStateSet(SerializingExecutor obj, int newValue);
}
private static final class FieldUpdaterAtomicHelper extends AtomicHelper {
private final AtomicIntegerFieldUpdater<SerializingExecutor> runStateUpdater;
private FieldUpdaterAtomicHelper(
AtomicIntegerFieldUpdater<SerializingExecutor> runStateUpdater) {
this.runStateUpdater = runStateUpdater;
}
@Override
public boolean runStateCompareAndSet(SerializingExecutor obj, int expect, int update) {
return runStateUpdater.compareAndSet(obj, expect, update);
}
@Override
public void runStateSet(SerializingExecutor obj, int newValue) {
runStateUpdater.set(obj, newValue);
}
}
private static final class SynchronizedAtomicHelper extends AtomicHelper {
@Override
public boolean runStateCompareAndSet(SerializingExecutor obj, int expect, int update) {
synchronized (obj) {
if (obj.runState == expect) {
obj.runState = update;
return true;
}
return false;
}
}
@Override
public void runStateSet(SerializingExecutor obj, int newValue) {
synchronized (obj) {
obj.runState = newValue;
}
}
}
}