Merge pull request #341 from DataDog/mar-kolya/slick-instrumentation

Fix thread pool instrumentation to work with Scala Slick, and add tests for Slick
This commit is contained in:
Nikolay Martynov 2018-06-10 19:18:05 -04:00 committed by GitHub
commit 074a020335
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 271 additions and 53 deletions

View File

@ -2,7 +2,6 @@ package datadog.trace.bootstrap;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Utility to track nested instrumentation. * Utility to track nested instrumentation.
@ -11,30 +10,27 @@ import java.util.concurrent.atomic.AtomicInteger;
* #incrementCallDepth at the beginning of each constructor. * #incrementCallDepth at the beginning of each constructor.
*/ */
public class CallDepthThreadLocalMap { public class CallDepthThreadLocalMap {
private static final ThreadLocal<Map<Object, AtomicInteger>> TLS = private static final ThreadLocal<Map<Object, Integer>> TLS =
new ThreadLocal<Map<Object, AtomicInteger>>() { new ThreadLocal<Map<Object, Integer>>() {
@Override @Override
public Map<Object, AtomicInteger> initialValue() { public Map<Object, Integer> initialValue() {
return new HashMap<>(); return new HashMap<>();
} }
}; };
public static int incrementCallDepth(final Object k) { public static int incrementCallDepth(final Object k) {
final Map<Object, AtomicInteger> map = TLS.get(); final Map<Object, Integer> map = TLS.get();
AtomicInteger depth = map.get(k); Integer depth = map.get(k);
if (depth == null) { if (depth == null) {
depth = new AtomicInteger(0); depth = 0;
map.put(k, depth);
return 0;
} else { } else {
return depth.incrementAndGet(); depth += 1;
} }
map.put(k, depth);
return depth;
} }
public static void reset(final Object k) { public static void reset(final Object k) {
final Map<Object, AtomicInteger> map = TLS.get(); TLS.get().remove(k);
if (map != null) {
map.remove(k);
}
} }
} }

View File

@ -1,4 +1,16 @@
apply from: "${rootDir}/gradle/java.gradle" apply from: "${rootDir}/gradle/java.gradle"
apply from: "${rootDir}/gradle/test-with-scala.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
slickTest
}
compileSlickTestGroovy {
classpath = classpath.plus(files(compileSlickTestScala.destinationDir))
dependsOn compileSlickTestScala
}
dependencies { dependencies {
compile project(':dd-trace-api') compile project(':dd-trace-api')
@ -11,4 +23,15 @@ dependencies {
testCompile project(':dd-java-agent:testing') testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:trace-annotation') testCompile project(':dd-java-agent:instrumentation:trace-annotation')
slickTestCompile project(':dd-java-agent:testing')
slickTestCompile project(':dd-java-agent:instrumentation:java-concurrent')
slickTestCompile project(':dd-java-agent:instrumentation:trace-annotation')
slickTestCompile project(':dd-java-agent:instrumentation:jdbc')
slickTestCompile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'
slickTestCompile group: 'com.typesafe.slick', name: 'slick_2.11', version: '3.2.0'
slickTestCompile group: 'com.h2database', name: 'h2', version: '1.4.197'
} }
// Run Slick library tests along with the rest of unit tests
test.dependsOn slickTest

View File

@ -12,6 +12,7 @@ import datadog.trace.agent.tooling.DDAdvice;
import datadog.trace.agent.tooling.DDTransformers; import datadog.trace.agent.tooling.DDTransformers;
import datadog.trace.agent.tooling.HelperInjector; import datadog.trace.agent.tooling.HelperInjector;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.context.TraceScope; import datadog.trace.context.TraceScope;
import io.opentracing.Scope; import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer; import io.opentracing.util.GlobalTracer;
@ -49,6 +50,12 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
* may be lifted to include all executors. * may be lifted to include all executors.
*/ */
private static final Collection<String> WHITELISTED_EXECUTORS; private static final Collection<String> WHITELISTED_EXECUTORS;
/**
* Some frameworks have their executors defined as anon classes inside other classes. Referencing
* anon classes by name would be fragile, so instead we will use list of class prefix names. Since
* checking this list is more expensive (O(n)) we should try to keep it short.
*/
private static final Collection<String> WHITELISTED_EXECUTORS_PREFIXES;
static { static {
final String[] whitelist = { final String[] whitelist = {
@ -84,6 +91,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
"play.api.libs.streams.Execution$trampoline$" "play.api.libs.streams.Execution$trampoline$"
}; };
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist))); WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));
final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"};
WHITELISTED_EXECUTORS_PREFIXES =
Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes));
} }
public ExecutorInstrumentation() { public ExecutorInstrumentation() {
@ -98,7 +109,18 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
new ElementMatcher<TypeDescription>() { new ElementMatcher<TypeDescription>() {
@Override @Override
public boolean matches(final TypeDescription target) { public boolean matches(final TypeDescription target) {
final boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName()); boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName());
// Check for possible prefixes match only if not whitelisted already
if (!whitelisted) {
for (String name : WHITELISTED_EXECUTORS_PREFIXES) {
if (target.getName().startsWith(name)) {
whitelisted = true;
break;
}
}
}
if (!whitelisted) { if (!whitelisted) {
log.debug("Skipping executor instrumentation for {}", target.getName()); log.debug("Skipping executor instrumentation for {}", target.getName());
} }
@ -135,71 +157,65 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
} }
public static class WrapRunnableAdvice { public static class WrapRunnableAdvice {
@SuppressWarnings("unused")
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static DatadogWrapper wrapJob( public static DatadogWrapper enterJobSubmit(
@Advice.This Object dis, @Advice.Argument(value = 0, readOnly = false) Runnable task) { @Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope if (DatadogWrapper.shouldWrapTask(task)) {
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !(task instanceof DatadogWrapper)
&& (!dis.getClass().getName().startsWith("slick.util.AsyncExecutor"))) {
task = new RunnableWrapper(task, (TraceScope) scope); task = new RunnableWrapper(task, (TraceScope) scope);
return (RunnableWrapper) task; return (RunnableWrapper) task;
} }
return null; return null;
} }
@SuppressWarnings("unused")
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void checkCancel( public static void exitJobSubmit(
@Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) { @Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) {
if (null != wrapper && null != throwable) { DatadogWrapper.cleanUpOnMethodExit(wrapper, throwable);
wrapper.cancel();
}
} }
} }
public static class WrapCallableAdvice { public static class WrapCallableAdvice {
@SuppressWarnings("unused")
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static DatadogWrapper wrapJob( public static DatadogWrapper enterJobSubmit(
@Advice.This Object dis, @Advice.Argument(value = 0, readOnly = false) Callable<?> task) { @Advice.Argument(value = 0, readOnly = false) Callable<?> task) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope if (DatadogWrapper.shouldWrapTask(task)) {
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !(task instanceof DatadogWrapper)
&& (!dis.getClass().getName().startsWith("slick.util.AsyncExecutor"))) {
task = new CallableWrapper<>(task, (TraceScope) scope); task = new CallableWrapper<>(task, (TraceScope) scope);
return (CallableWrapper) task; return (CallableWrapper) task;
} }
return null; return null;
} }
@SuppressWarnings("unused")
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void checkCancel( public static void exitJobSubmit(
@Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) { @Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) {
if (null != wrapper && null != throwable) { DatadogWrapper.cleanUpOnMethodExit(wrapper, throwable);
wrapper.cancel();
}
} }
} }
public static class WrapCallableCollectionAdvice { public static class WrapCallableCollectionAdvice {
@SuppressWarnings("unused")
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static Collection<?> wrapJob( public static Collection<?> wrapJob(
@Advice.This Object dis,
@Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) { @Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) {
final Scope scope = GlobalTracer.get().scopeManager().active(); final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope if (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating() && ((TraceScope) scope).isAsyncPropagating()
&& (!dis.getClass().getName().startsWith("slick.util.AsyncExecutor"))) { && tasks != null
&& DatadogWrapper.isTopLevelCall()) {
final Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size()); final Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size());
for (Callable<?> task : tasks) { for (Callable<?> task : tasks) {
if (task != null) { if (task != null && !(task instanceof CallableWrapper)) {
if (!(task instanceof CallableWrapper)) { wrappedTasks.add(new CallableWrapper<>(task, (TraceScope) scope));
task = new CallableWrapper<>(task, (TraceScope) scope);
}
wrappedTasks.add(task);
} }
} }
tasks = wrappedTasks; tasks = wrappedTasks;
@ -208,13 +224,18 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
return null; return null;
} }
@SuppressWarnings("unused")
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void checkCancel( public static void checkCancel(
@Advice.Enter final Collection<?> wrappedJobs, @Advice.Thrown final Throwable throwable) { @Advice.Enter final Collection<?> wrappedJobs, @Advice.Thrown final Throwable throwable) {
if (null != wrappedJobs && null != throwable) { if (null != wrappedJobs) {
for (final Object wrapper : wrappedJobs) { DatadogWrapper.resetNestedCalls();
if (wrapper instanceof DatadogWrapper) {
((DatadogWrapper) wrapper).cancel(); if (null != throwable) {
for (final Object wrapper : wrappedJobs) {
if (wrapper instanceof DatadogWrapper) {
((DatadogWrapper) wrapper).cancel();
}
} }
} }
} }
@ -237,6 +258,56 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
log.debug("canceled continuation {}", continuation); log.debug("canceled continuation {}", continuation);
} }
} }
/**
* Check if given call to executor is nested. We would like to ignore nested calls to execute to
* avoid wrapping tasks twice. Note: this condition may lead to problems with executors that
* 'fork' several tasks, but we do not have such executors at the moment. Note: this condition
* is mutating and needs to be checked right before task is actually wrapped.
*
* @return true iff call is not nested
*/
@SuppressWarnings("WeakerAccess")
public static boolean isTopLevelCall() {
return CallDepthThreadLocalMap.incrementCallDepth(ExecutorService.class) <= 0;
}
/** Reset nested calls to executor. */
@SuppressWarnings("WeakerAccess")
public static void resetNestedCalls() {
CallDepthThreadLocalMap.reset(ExecutorService.class);
}
/**
* @param task task object
* @return true iff given task object should be wrapped
*/
@SuppressWarnings("WeakerAccess")
public static boolean shouldWrapTask(Object task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
return (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !(task instanceof DatadogWrapper)
&& isTopLevelCall());
}
/**
* Clean up after job submission method has exited
*
* @param wrapper task wrapper
* @param throwable throwable that may have been thrown
*/
@SuppressWarnings("WeakerAccess")
public static void cleanUpOnMethodExit(
final DatadogWrapper wrapper, final Throwable throwable) {
if (null != wrapper) {
resetNestedCalls();
if (null != throwable) {
wrapper.cancel();
}
}
}
} }
@Slf4j @Slf4j

View File

@ -0,0 +1,82 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
import spock.lang.Shared
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
class SlickTest extends AgentTestRunner {
@Shared
def database = new SlickUtils()
def "Basic statement generates spans"() {
setup:
def future = database.startQuery(SlickUtils.TestQuery())
def result = database.getResults(future)
expect:
result == SlickUtils.TestValue()
assertTraces(TEST_WRITER, 1) {
trace(0, 2) {
span(0) {
operationName "SlickUtils.startQuery"
serviceName "unnamed-java-app"
resourceName "SlickUtils.startQuery"
parent()
errored false
tags {
defaultTags()
}
}
span(1) {
operationName "${SlickUtils.Driver()}.query"
serviceName SlickUtils.Driver()
resourceName SlickUtils.TestQuery()
childOf span(0)
errored false
tags {
"$Tags.COMPONENT.key" "java-jdbc-prepared_statement"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.SQL
"$Tags.DB_TYPE.key" SlickUtils.Driver()
"$Tags.DB_USER.key" SlickUtils.Username()
"db.jdbc.url" SlickUtils.Url()
"span.origin.type" "org.h2.jdbc.JdbcPreparedStatement"
defaultTags()
}
}
}
}
}
def "Concurrent requests do not throw exception"() {
setup:
def sleepFuture = database.startQuery(SlickUtils.SleepQuery())
def future = database.startQuery(SlickUtils.TestQuery())
def result = database.getResults(future)
database.getResults(sleepFuture)
expect:
result == SlickUtils.TestValue()
// Expect two traces because two queries have been run
assertTraces(TEST_WRITER, 2) {
trace(0, 2, {
span(0) {}
span(1) {}
})
trace(1, 2, {
span(0) {}
span(1) {}
})
}
}
}

View File

@ -0,0 +1,44 @@
import datadog.trace.api.Trace
import datadog.trace.context.TraceScope
import io.opentracing.util.GlobalTracer
import slick.jdbc.H2Profile.api._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
class SlickUtils {
import SlickUtils._
val database = Database.forURL(Url,
user=Username,
driver="org.h2.Driver",
keepAliveConnection=true,
// Limit number of threads to hit Slick-specific case when we need to avoid re-wrapping
// wrapped runnables.
executor=AsyncExecutor("test", numThreads=1, queueSize=1000)
)
Await.result(database.run(sqlu"""CREATE ALIAS SLEEP FOR "java.lang.Thread.sleep(long)""""), Duration.Inf)
@Trace
def startQuery(query: String): Future[Vector[Int]] = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
database.run(sql"#$query".as[Int])
}
def getResults(future: Future[Vector[Int]]): Int = {
Await.result(future, Duration.Inf).head
}
}
object SlickUtils {
val Driver = "h2"
val Username = "TESTUSER"
val Url = s"jdbc:${Driver}:mem:test"
val TestValue = 3
val TestQuery = "SELECT 3"
val SleepQuery = "CALL SLEEP(3000)"
}

View File

@ -17,7 +17,7 @@ class TagsAssert {
clone.delegate = asserter clone.delegate = asserter
clone.resolveStrategy = Closure.DELEGATE_FIRST clone.resolveStrategy = Closure.DELEGATE_FIRST
clone(asserter) clone(asserter)
asserter.assertTracesAllVerified() asserter.assertTagsAllVerified()
asserter asserter
} }
@ -51,12 +51,14 @@ class TagsAssert {
def arg = args[0] def arg = args[0]
if (arg instanceof Class) { if (arg instanceof Class) {
assert ((Class) arg).isInstance(tags[name]) assert ((Class) arg).isInstance(tags[name])
} else if (arg instanceof Closure) {
assert ((Closure) arg).call(tags[name])
} else { } else {
assert tags[name] == arg assert tags[name] == arg
} }
} }
void assertTracesAllVerified() { void assertTagsAllVerified() {
assert tags.keySet() == assertedTags assert tags.keySet() == assertedTags
} }
} }

View File

@ -22,7 +22,7 @@ class TraceAssert {
clone.delegate = asserter clone.delegate = asserter
clone.resolveStrategy = Closure.DELEGATE_FIRST clone.resolveStrategy = Closure.DELEGATE_FIRST
clone(asserter) clone(asserter)
asserter.assertTracesAllVerified() asserter.assertSpansAllVerified()
asserter asserter
} }
@ -41,7 +41,7 @@ class TraceAssert {
assertSpan(trace.get(index), spec) assertSpan(trace.get(index), spec)
} }
void assertTracesAllVerified() { void assertSpansAllVerified() {
assert assertedIndexes.size() == size assert assertedIndexes.size() == size
} }
} }

View File

@ -1,7 +1,7 @@
apply plugin: "jacoco" apply plugin: "jacoco"
jacoco { jacoco {
toolVersion = "0.8.0" toolVersion = "0.8.1"
} }
jacocoTestReport { jacocoTestReport {