Fix Scala Slick instrumentation

This commit is contained in:
Nikolay Martynov 2018-06-05 14:54:31 -04:00
parent b8fdb4acd5
commit f92af7d860
1 changed files with 106 additions and 33 deletions

View File

@ -12,6 +12,7 @@ import datadog.trace.agent.tooling.DDAdvice;
import datadog.trace.agent.tooling.DDTransformers; import datadog.trace.agent.tooling.DDTransformers;
import datadog.trace.agent.tooling.HelperInjector; import datadog.trace.agent.tooling.HelperInjector;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.context.TraceScope; import datadog.trace.context.TraceScope;
import io.opentracing.Scope; import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer; import io.opentracing.util.GlobalTracer;
@ -40,6 +41,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
public static final HelperInjector EXEC_HELPER_INJECTOR = public static final HelperInjector EXEC_HELPER_INJECTOR =
new HelperInjector( new HelperInjector(
ExecutorInstrumentation.class.getName() + "$ConcurrentUtils", ExecutorInstrumentation.class.getName() + "$ConcurrentUtils",
ExecutorInstrumentation.class.getName() + "$WrapAdviceUtils",
ExecutorInstrumentation.class.getName() + "$DatadogWrapper", ExecutorInstrumentation.class.getName() + "$DatadogWrapper",
ExecutorInstrumentation.class.getName() + "$CallableWrapper", ExecutorInstrumentation.class.getName() + "$CallableWrapper",
ExecutorInstrumentation.class.getName() + "$RunnableWrapper"); ExecutorInstrumentation.class.getName() + "$RunnableWrapper");
@ -49,6 +51,12 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
* may be lifted to include all executors. * may be lifted to include all executors.
*/ */
private static final Collection<String> WHITELISTED_EXECUTORS; private static final Collection<String> WHITELISTED_EXECUTORS;
/**
* Some frameworks have their executors defined as anon classes inside other classes. Referencing
* anon classes by name would be fragile, so instead we will use list of class prefix names. Since
* checking this list is more expensive (O(n)) we should try to keep it short.
*/
private static final Collection<String> WHITELISTED_EXECUTORS_PREFIXES;
static { static {
final String[] whitelist = { final String[] whitelist = {
@ -84,6 +92,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
"play.api.libs.streams.Execution$trampoline$" "play.api.libs.streams.Execution$trampoline$"
}; };
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist))); WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));
final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"};
WHITELISTED_EXECUTORS_PREFIXES =
Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes));
} }
public ExecutorInstrumentation() { public ExecutorInstrumentation() {
@ -98,7 +110,15 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
new ElementMatcher<TypeDescription>() { new ElementMatcher<TypeDescription>() {
@Override @Override
public boolean matches(final TypeDescription target) { public boolean matches(final TypeDescription target) {
final boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName()); boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName());
for (String name : WHITELISTED_EXECUTORS_PREFIXES) {
if (target.getName().startsWith(name)) {
whitelisted = true;
break;
}
}
if (!whitelisted) { if (!whitelisted) {
log.debug("Skipping executor instrumentation for {}", target.getName()); log.debug("Skipping executor instrumentation for {}", target.getName());
} }
@ -134,72 +154,120 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
.asDecorator(); .asDecorator();
} }
public static class WrapRunnableAdvice { /** Utils class to provide helper methods to wrap Runnable and Callable */
@Advice.OnMethodEnter(suppress = Throwable.class) public static class WrapAdviceUtils {
public static DatadogWrapper wrapJob(
@Advice.This Object dis, @Advice.Argument(value = 0, readOnly = false) Runnable task) { /**
* Check if given call to executor is nested. We would like to ignore nested calls to execute to
* avoid wrapping tasks twice. Note: this condition may lead to problems with executors that
* 'fork' several tasks, but we do not have such executors at the moment. Note: this condition
* is mutating and needs to be checked right before task is actually wrapped.
*
* @return true iff call is nested
*/
@SuppressWarnings("WeakerAccess")
public static boolean isTopLevelCall() {
return CallDepthThreadLocalMap.incrementCallDepth(ExecutorService.class) <= 0;
}
/** Reset nested calls to executor. */
@SuppressWarnings("WeakerAccess")
public static void resetNestedCalls() {
CallDepthThreadLocalMap.reset(ExecutorService.class);
}
/**
* @param task tak object
* @return true iff given task object should be wrapped
*/
@SuppressWarnings("WeakerAccess")
public static boolean shouldWrapTask(Object task) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope return (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating() && ((TraceScope) scope).isAsyncPropagating()
&& task != null && task != null
&& !(task instanceof DatadogWrapper) && !(task instanceof DatadogWrapper)
&& (!dis.getClass().getName().startsWith("slick.util.AsyncExecutor"))) { && isTopLevelCall());
}
/**
* Clean up after job submittion method has exited
*
* @param wrapper task wrapper
* @param throwable throwable that may have been thrown
*/
@SuppressWarnings("WeakerAccess")
public static void cleanUpOnMethodExit(
final DatadogWrapper wrapper, final Throwable throwable) {
if (null != wrapper) {
resetNestedCalls();
if (null != throwable) {
wrapper.cancel();
}
}
}
}
public static class WrapRunnableAdvice {
@SuppressWarnings("unused")
@Advice.OnMethodEnter(suppress = Throwable.class)
public static DatadogWrapper enterJobSubmit(
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (WrapAdviceUtils.shouldWrapTask(task)) {
task = new RunnableWrapper(task, (TraceScope) scope); task = new RunnableWrapper(task, (TraceScope) scope);
return (RunnableWrapper) task; return (RunnableWrapper) task;
} }
return null; return null;
} }
@SuppressWarnings("unused")
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void checkCancel( public static void exitJobSubmit(
@Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) { @Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) {
if (null != wrapper && null != throwable) { WrapAdviceUtils.cleanUpOnMethodExit(wrapper, throwable);
wrapper.cancel();
}
} }
} }
public static class WrapCallableAdvice { public static class WrapCallableAdvice {
@SuppressWarnings("unused")
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static DatadogWrapper wrapJob( public static DatadogWrapper enterJobSubmit(
@Advice.This Object dis, @Advice.Argument(value = 0, readOnly = false) Callable<?> task) { @Advice.Argument(value = 0, readOnly = false) Callable<?> task) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope if (WrapAdviceUtils.shouldWrapTask(task)) {
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !(task instanceof DatadogWrapper)
&& (!dis.getClass().getName().startsWith("slick.util.AsyncExecutor"))) {
task = new CallableWrapper<>(task, (TraceScope) scope); task = new CallableWrapper<>(task, (TraceScope) scope);
return (CallableWrapper) task; return (CallableWrapper) task;
} }
return null; return null;
} }
@SuppressWarnings("unused")
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void checkCancel( public static void exitJobSubmit(
@Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) { @Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) {
if (null != wrapper && null != throwable) { WrapAdviceUtils.cleanUpOnMethodExit(wrapper, throwable);
wrapper.cancel();
}
} }
} }
public static class WrapCallableCollectionAdvice { public static class WrapCallableCollectionAdvice {
@SuppressWarnings("unused")
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static Collection<?> wrapJob( public static Collection<?> wrapJob(
@Advice.This Object dis,
@Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) { @Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope if (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating() && ((TraceScope) scope).isAsyncPropagating()
&& (!dis.getClass().getName().startsWith("slick.util.AsyncExecutor"))) { && tasks != null
&& WrapAdviceUtils.isTopLevelCall()) {
final Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size()); final Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size());
for (Callable<?> task : tasks) { for (Callable<?> task : tasks) {
if (task != null) { if (task != null && !(task instanceof CallableWrapper)) {
if (!(task instanceof CallableWrapper)) { wrappedTasks.add(new CallableWrapper<>(task, (TraceScope) scope));
task = new CallableWrapper<>(task, (TraceScope) scope);
}
wrappedTasks.add(task);
} }
} }
tasks = wrappedTasks; tasks = wrappedTasks;
@ -208,10 +276,14 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
return null; return null;
} }
@SuppressWarnings("unused")
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void checkCancel( public static void checkCancel(
@Advice.Enter final Collection<?> wrappedJobs, @Advice.Thrown final Throwable throwable) { @Advice.Enter final Collection<?> wrappedJobs, @Advice.Thrown final Throwable throwable) {
if (null != wrappedJobs && null != throwable) { if (null != wrappedJobs) {
WrapAdviceUtils.resetNestedCalls();
if (null != throwable) {
for (final Object wrapper : wrappedJobs) { for (final Object wrapper : wrappedJobs) {
if (wrapper instanceof DatadogWrapper) { if (wrapper instanceof DatadogWrapper) {
((DatadogWrapper) wrapper).cancel(); ((DatadogWrapper) wrapper).cancel();
@ -220,6 +292,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
} }
} }
} }
}
/** Marker interface for tasks which are wrapped to propagate the trace context. */ /** Marker interface for tasks which are wrapped to propagate the trace context. */
@Slf4j @Slf4j