Allow executor tracing to flow across distinct executors

Otherwise any async work is lost after the first executor jump.
This commit is contained in:
Tyler Benson 2018-08-15 14:42:23 +10:00
parent 72b2873ec1
commit 2174f21d61
1 changed files with 23 additions and 15 deletions

View File

@ -25,7 +25,6 @@ import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
@ -173,8 +172,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exitJobSubmit( public static void exitJobSubmit(
@Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) { @Advice.This final Executor executor,
DatadogWrapper.cleanUpOnMethodExit(wrapper, throwable); @Advice.Enter final DatadogWrapper wrapper,
@Advice.Thrown final Throwable throwable) {
DatadogWrapper.cleanUpOnMethodExit(executor, wrapper, throwable);
} }
} }
@ -195,8 +196,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exitJobSubmit( public static void exitJobSubmit(
@Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) { @Advice.This final Executor executor,
DatadogWrapper.cleanUpOnMethodExit(wrapper, throwable); @Advice.Enter final DatadogWrapper wrapper,
@Advice.Thrown final Throwable throwable) {
DatadogWrapper.cleanUpOnMethodExit(executor, wrapper, throwable);
} }
} }
@ -204,12 +207,13 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static Collection<?> wrapJob( public static Collection<?> wrapJob(
@Advice.This final Executor executor,
@Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) { @Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope if (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating() && ((TraceScope) scope).isAsyncPropagating()
&& tasks != null && tasks != null
&& DatadogWrapper.isTopLevelCall()) { && DatadogWrapper.isTopLevelCall(executor)) {
final Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size()); final Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size());
for (final Callable<?> task : tasks) { for (final Callable<?> task : tasks) {
if (task != null && !(task instanceof CallableWrapper)) { if (task != null && !(task instanceof CallableWrapper)) {
@ -224,9 +228,11 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void checkCancel( public static void checkCancel(
@Advice.Enter final Collection<?> wrappedJobs, @Advice.Thrown final Throwable throwable) { @Advice.This final Executor executor,
@Advice.Enter final Collection<?> wrappedJobs,
@Advice.Thrown final Throwable throwable) {
if (null != wrappedJobs) { if (null != wrappedJobs) {
DatadogWrapper.resetNestedCalls(); DatadogWrapper.resetNestedCalls(executor);
if (null != throwable) { if (null != throwable) {
for (final Object wrapper : wrappedJobs) { for (final Object wrapper : wrappedJobs) {
@ -264,13 +270,14 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
* *
* @return true iff call is not nested * @return true iff call is not nested
*/ */
public static boolean isTopLevelCall() { public static boolean isTopLevelCall(final Executor executor) {
return CallDepthThreadLocalMap.incrementCallDepth(ExecutorService.class) <= 0; final int i = CallDepthThreadLocalMap.incrementCallDepth(executor.getClass());
return i <= 0;
} }
/** Reset nested calls to executor. */ /** Reset nested calls to executor. */
public static void resetNestedCalls() { public static void resetNestedCalls(final Executor executor) {
CallDepthThreadLocalMap.reset(ExecutorService.class); CallDepthThreadLocalMap.reset(executor.getClass());
} }
/** /**
@ -283,20 +290,21 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
&& ((TraceScope) scope).isAsyncPropagating() && ((TraceScope) scope).isAsyncPropagating()
&& task != null && task != null
&& !(task instanceof DatadogWrapper) && !(task instanceof DatadogWrapper)
&& isTopLevelCall() && isTopLevelCall(executor)
&& !ConcurrentUtils.isDisabled(executor)); && !ConcurrentUtils.isDisabled(executor));
} }
/** /**
* Clean up after job submission method has exited * Clean up after job submission method has exited
* *
* @param executor the current executor
* @param wrapper task wrapper * @param wrapper task wrapper
* @param throwable throwable that may have been thrown * @param throwable throwable that may have been thrown
*/ */
public static void cleanUpOnMethodExit( public static void cleanUpOnMethodExit(
final DatadogWrapper wrapper, final Throwable throwable) { final Executor executor, final DatadogWrapper wrapper, final Throwable throwable) {
if (null != wrapper) { if (null != wrapper) {
resetNestedCalls(); resetNestedCalls(executor);
if (null != throwable) { if (null != throwable) {
wrapper.cancel(); wrapper.cancel();
} }