diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/CallDepthThreadLocalMap.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/CallDepthThreadLocalMap.java index 4b03d5d4f2..20be8dfb9e 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/CallDepthThreadLocalMap.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/CallDepthThreadLocalMap.java @@ -2,7 +2,6 @@ package datadog.trace.bootstrap; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; /** * Utility to track nested instrumentation. @@ -11,30 +10,27 @@ import java.util.concurrent.atomic.AtomicInteger; * #incrementCallDepth at the beginning of each constructor. */ public class CallDepthThreadLocalMap { - private static final ThreadLocal> TLS = - new ThreadLocal>() { + private static final ThreadLocal> TLS = + new ThreadLocal>() { @Override - public Map initialValue() { + public Map initialValue() { return new HashMap<>(); } }; public static int incrementCallDepth(final Object k) { - final Map map = TLS.get(); - AtomicInteger depth = map.get(k); + final Map map = TLS.get(); + Integer depth = map.get(k); if (depth == null) { - depth = new AtomicInteger(0); - map.put(k, depth); - return 0; + depth = 0; } else { - return depth.incrementAndGet(); + depth += 1; } + map.put(k, depth); + return depth; } public static void reset(final Object k) { - final Map map = TLS.get(); - if (map != null) { - map.remove(k); - } + TLS.get().remove(k); } } diff --git a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle index 95820d5ab5..5b12ac9edb 100644 --- a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle @@ -1,4 +1,16 @@ apply from: "${rootDir}/gradle/java.gradle" +apply from: "${rootDir}/gradle/test-with-scala.gradle" + +apply plugin: 'org.unbroken-dome.test-sets' + +testSets { + slickTest +} + +compileSlickTestGroovy { + classpath = classpath.plus(files(compileSlickTestScala.destinationDir)) + dependsOn compileSlickTestScala +} dependencies { compile project(':dd-trace-api') @@ -11,4 +23,15 @@ dependencies { testCompile project(':dd-java-agent:testing') testCompile project(':dd-java-agent:instrumentation:trace-annotation') + + slickTestCompile project(':dd-java-agent:testing') + slickTestCompile project(':dd-java-agent:instrumentation:java-concurrent') + slickTestCompile project(':dd-java-agent:instrumentation:trace-annotation') + slickTestCompile project(':dd-java-agent:instrumentation:jdbc') + slickTestCompile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12' + slickTestCompile group: 'com.typesafe.slick', name: 'slick_2.11', version: '3.2.0' + slickTestCompile group: 'com.h2database', name: 'h2', version: '1.4.197' } + +// Run Slick library tests along with the rest of unit tests +test.dependsOn slickTest diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java index 2dca101cbc..40ecf1a329 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java @@ -12,6 +12,7 @@ import datadog.trace.agent.tooling.DDAdvice; import datadog.trace.agent.tooling.DDTransformers; import datadog.trace.agent.tooling.HelperInjector; import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.CallDepthThreadLocalMap; import datadog.trace.context.TraceScope; import io.opentracing.Scope; import io.opentracing.util.GlobalTracer; @@ -49,6 +50,12 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { * may be lifted to include all executors. */ private static final Collection WHITELISTED_EXECUTORS; + /** + * Some frameworks have their executors defined as anon classes inside other classes. Referencing + * anon classes by name would be fragile, so instead we will use list of class prefix names. Since + * checking this list is more expensive (O(n)) we should try to keep it short. + */ + private static final Collection WHITELISTED_EXECUTORS_PREFIXES; static { final String[] whitelist = { @@ -84,6 +91,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { "play.api.libs.streams.Execution$trampoline$" }; WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist))); + + final String[] whitelistPrefixes = {"slick.util.AsyncExecutor$"}; + WHITELISTED_EXECUTORS_PREFIXES = + Collections.unmodifiableCollection(Arrays.asList(whitelistPrefixes)); } public ExecutorInstrumentation() { @@ -98,7 +109,18 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { new ElementMatcher() { @Override public boolean matches(final TypeDescription target) { - final boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName()); + boolean whitelisted = WHITELISTED_EXECUTORS.contains(target.getName()); + + // Check for possible prefixes match only if not whitelisted already + if (!whitelisted) { + for (String name : WHITELISTED_EXECUTORS_PREFIXES) { + if (target.getName().startsWith(name)) { + whitelisted = true; + break; + } + } + } + if (!whitelisted) { log.debug("Skipping executor instrumentation for {}", target.getName()); } @@ -135,71 +157,65 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { } public static class WrapRunnableAdvice { + + @SuppressWarnings("unused") @Advice.OnMethodEnter(suppress = Throwable.class) - public static DatadogWrapper wrapJob( - @Advice.This Object dis, @Advice.Argument(value = 0, readOnly = false) Runnable task) { + public static DatadogWrapper enterJobSubmit( + @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope - && ((TraceScope) scope).isAsyncPropagating() - && task != null - && !(task instanceof DatadogWrapper) - && (!dis.getClass().getName().startsWith("slick.util.AsyncExecutor"))) { + if (DatadogWrapper.shouldWrapTask(task)) { task = new RunnableWrapper(task, (TraceScope) scope); return (RunnableWrapper) task; } return null; } + @SuppressWarnings("unused") @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void checkCancel( + public static void exitJobSubmit( @Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) { - if (null != wrapper && null != throwable) { - wrapper.cancel(); - } + DatadogWrapper.cleanUpOnMethodExit(wrapper, throwable); } } public static class WrapCallableAdvice { + + @SuppressWarnings("unused") @Advice.OnMethodEnter(suppress = Throwable.class) - public static DatadogWrapper wrapJob( - @Advice.This Object dis, @Advice.Argument(value = 0, readOnly = false) Callable task) { + public static DatadogWrapper enterJobSubmit( + @Advice.Argument(value = 0, readOnly = false) Callable task) { + final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope - && ((TraceScope) scope).isAsyncPropagating() - && task != null - && !(task instanceof DatadogWrapper) - && (!dis.getClass().getName().startsWith("slick.util.AsyncExecutor"))) { + if (DatadogWrapper.shouldWrapTask(task)) { task = new CallableWrapper<>(task, (TraceScope) scope); return (CallableWrapper) task; } return null; } + @SuppressWarnings("unused") @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void checkCancel( + public static void exitJobSubmit( @Advice.Enter final DatadogWrapper wrapper, @Advice.Thrown final Throwable throwable) { - if (null != wrapper && null != throwable) { - wrapper.cancel(); - } + DatadogWrapper.cleanUpOnMethodExit(wrapper, throwable); } } public static class WrapCallableCollectionAdvice { + + @SuppressWarnings("unused") @Advice.OnMethodEnter(suppress = Throwable.class) public static Collection wrapJob( - @Advice.This Object dis, @Advice.Argument(value = 0, readOnly = false) Collection> tasks) { final Scope scope = GlobalTracer.get().scopeManager().active(); if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncPropagating() - && (!dis.getClass().getName().startsWith("slick.util.AsyncExecutor"))) { + && tasks != null + && DatadogWrapper.isTopLevelCall()) { final Collection> wrappedTasks = new ArrayList<>(tasks.size()); for (Callable task : tasks) { - if (task != null) { - if (!(task instanceof CallableWrapper)) { - task = new CallableWrapper<>(task, (TraceScope) scope); - } - wrappedTasks.add(task); + if (task != null && !(task instanceof CallableWrapper)) { + wrappedTasks.add(new CallableWrapper<>(task, (TraceScope) scope)); } } tasks = wrappedTasks; @@ -208,13 +224,18 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { return null; } + @SuppressWarnings("unused") @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void checkCancel( @Advice.Enter final Collection wrappedJobs, @Advice.Thrown final Throwable throwable) { - if (null != wrappedJobs && null != throwable) { - for (final Object wrapper : wrappedJobs) { - if (wrapper instanceof DatadogWrapper) { - ((DatadogWrapper) wrapper).cancel(); + if (null != wrappedJobs) { + DatadogWrapper.resetNestedCalls(); + + if (null != throwable) { + for (final Object wrapper : wrappedJobs) { + if (wrapper instanceof DatadogWrapper) { + ((DatadogWrapper) wrapper).cancel(); + } } } } @@ -237,6 +258,56 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable { log.debug("canceled continuation {}", continuation); } } + + /** + * Check if given call to executor is nested. We would like to ignore nested calls to execute to + * avoid wrapping tasks twice. Note: this condition may lead to problems with executors that + * 'fork' several tasks, but we do not have such executors at the moment. Note: this condition + * is mutating and needs to be checked right before task is actually wrapped. + * + * @return true iff call is not nested + */ + @SuppressWarnings("WeakerAccess") + public static boolean isTopLevelCall() { + return CallDepthThreadLocalMap.incrementCallDepth(ExecutorService.class) <= 0; + } + + /** Reset nested calls to executor. */ + @SuppressWarnings("WeakerAccess") + public static void resetNestedCalls() { + CallDepthThreadLocalMap.reset(ExecutorService.class); + } + + /** + * @param task task object + * @return true iff given task object should be wrapped + */ + @SuppressWarnings("WeakerAccess") + public static boolean shouldWrapTask(Object task) { + final Scope scope = GlobalTracer.get().scopeManager().active(); + return (scope instanceof TraceScope + && ((TraceScope) scope).isAsyncPropagating() + && task != null + && !(task instanceof DatadogWrapper) + && isTopLevelCall()); + } + + /** + * Clean up after job submission method has exited + * + * @param wrapper task wrapper + * @param throwable throwable that may have been thrown + */ + @SuppressWarnings("WeakerAccess") + public static void cleanUpOnMethodExit( + final DatadogWrapper wrapper, final Throwable throwable) { + if (null != wrapper) { + resetNestedCalls(); + if (null != throwable) { + wrapper.cancel(); + } + } + } } @Slf4j diff --git a/dd-java-agent/instrumentation/java-concurrent/src/slickTest/groovy/SlickTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/slickTest/groovy/SlickTest.groovy new file mode 100644 index 0000000000..0cd7210782 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/slickTest/groovy/SlickTest.groovy @@ -0,0 +1,82 @@ +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.DDTags +import io.opentracing.tag.Tags +import spock.lang.Shared + +import static datadog.trace.agent.test.ListWriterAssert.assertTraces + +class SlickTest extends AgentTestRunner { + + @Shared + def database = new SlickUtils() + + def "Basic statement generates spans"() { + setup: + def future = database.startQuery(SlickUtils.TestQuery()) + def result = database.getResults(future) + + expect: + result == SlickUtils.TestValue() + + assertTraces(TEST_WRITER, 1) { + trace(0, 2) { + span(0) { + operationName "SlickUtils.startQuery" + serviceName "unnamed-java-app" + resourceName "SlickUtils.startQuery" + parent() + errored false + tags { + defaultTags() + } + } + span(1) { + operationName "${SlickUtils.Driver()}.query" + serviceName SlickUtils.Driver() + resourceName SlickUtils.TestQuery() + childOf span(0) + errored false + tags { + "$Tags.COMPONENT.key" "java-jdbc-prepared_statement" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.SQL + + "$Tags.DB_TYPE.key" SlickUtils.Driver() + "$Tags.DB_USER.key" SlickUtils.Username() + + "db.jdbc.url" SlickUtils.Url() + "span.origin.type" "org.h2.jdbc.JdbcPreparedStatement" + + defaultTags() + } + } + } + } + } + + def "Concurrent requests do not throw exception"() { + setup: + def sleepFuture = database.startQuery(SlickUtils.SleepQuery()) + + def future = database.startQuery(SlickUtils.TestQuery()) + def result = database.getResults(future) + + database.getResults(sleepFuture) + + expect: + result == SlickUtils.TestValue() + + // Expect two traces because two queries have been run + assertTraces(TEST_WRITER, 2) { + trace(0, 2, { + span(0) {} + span(1) {} + }) + trace(1, 2, { + span(0) {} + span(1) {} + }) + } + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/slickTest/scala/SlickUtils.scala b/dd-java-agent/instrumentation/java-concurrent/src/slickTest/scala/SlickUtils.scala new file mode 100644 index 0000000000..34f81803b3 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/slickTest/scala/SlickUtils.scala @@ -0,0 +1,44 @@ +import datadog.trace.api.Trace +import datadog.trace.context.TraceScope +import io.opentracing.util.GlobalTracer +import slick.jdbc.H2Profile.api._ + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.Duration + +class SlickUtils { + import SlickUtils._ + + val database = Database.forURL(Url, + user=Username, + driver="org.h2.Driver", + keepAliveConnection=true, + // Limit number of threads to hit Slick-specific case when we need to avoid re-wrapping + // wrapped runnables. + executor=AsyncExecutor("test", numThreads=1, queueSize=1000) + ) + Await.result(database.run(sqlu"""CREATE ALIAS SLEEP FOR "java.lang.Thread.sleep(long)""""), Duration.Inf) + + @Trace + def startQuery(query: String): Future[Vector[Int]] = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) + database.run(sql"#$query".as[Int]) + } + + def getResults(future: Future[Vector[Int]]): Int = { + Await.result(future, Duration.Inf).head + } + +} + +object SlickUtils { + + val Driver = "h2" + val Username = "TESTUSER" + val Url = s"jdbc:${Driver}:mem:test" + val TestValue = 3 + val TestQuery = "SELECT 3" + + val SleepQuery = "CALL SLEEP(3000)" + +} diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy index 7710627c3e..d3f4dbf7b6 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy @@ -17,7 +17,7 @@ class TagsAssert { clone.delegate = asserter clone.resolveStrategy = Closure.DELEGATE_FIRST clone(asserter) - asserter.assertTracesAllVerified() + asserter.assertTagsAllVerified() asserter } @@ -51,12 +51,14 @@ class TagsAssert { def arg = args[0] if (arg instanceof Class) { assert ((Class) arg).isInstance(tags[name]) + } else if (arg instanceof Closure) { + assert ((Closure) arg).call(tags[name]) } else { assert tags[name] == arg } } - void assertTracesAllVerified() { + void assertTagsAllVerified() { assert tags.keySet() == assertedTags } } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TraceAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TraceAssert.groovy index 52147892f7..e2430ce6a2 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TraceAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TraceAssert.groovy @@ -22,7 +22,7 @@ class TraceAssert { clone.delegate = asserter clone.resolveStrategy = Closure.DELEGATE_FIRST clone(asserter) - asserter.assertTracesAllVerified() + asserter.assertSpansAllVerified() asserter } @@ -41,7 +41,7 @@ class TraceAssert { assertSpan(trace.get(index), spec) } - void assertTracesAllVerified() { + void assertSpansAllVerified() { assert assertedIndexes.size() == size } } diff --git a/gradle/jacoco.gradle b/gradle/jacoco.gradle index 2778b0602a..fc3cc6b012 100644 --- a/gradle/jacoco.gradle +++ b/gradle/jacoco.gradle @@ -1,7 +1,7 @@ apply plugin: "jacoco" jacoco { - toolVersion = "0.8.0" + toolVersion = "0.8.1" } jacocoTestReport {