Separate out Scala ForkJoinPool instrumentation from java_concurrent (DataDog/dd-trace-java#1687)

This commit is contained in:
Tyler Benson 2020-07-15 18:00:10 -07:00 committed by Trask Stalnaker
parent 9e20571dab
commit 9f4c1ae578
14 changed files with 132 additions and 158 deletions

View File

@ -5,39 +5,9 @@ ext {
}
apply from: "$rootDir/gradle/instrumentation.gradle"
apply from: "$rootDir/gradle/test-with-scala.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
// This won't work until the akka and scala integrations are split into separate projects.
//muzzle {
// pass {
// coreJdk()
// }
//}
testSets {
slickTest {
filter {
// this is needed because "test.dependsOn slickTest", and so without this,
// running a single test in the default test set will fail
setFailOnNoMatchingTests(false)
}
muzzle {
pass {
coreJdk()
}
}
compileSlickTestGroovy {
classpath += files(sourceSets.slickTest.scala.classesDirectory)
}
dependencies {
// This is needed for Scala ForJoinTask/Pool instrumentation
compileOnly deps.scala
slickTestImplementation project(':instrumentation:jdbc')
slickTestImplementation deps.scala
slickTestImplementation group: 'com.typesafe.slick', name: 'slick_2.11', version: '3.2.0'
slickTestImplementation group: 'com.h2database', name: 'h2', version: '1.4.197'
}
// Run Slick library tests along with the rest of unit tests
test.dependsOn slickTest

View File

@ -1,9 +0,0 @@
ext.skipPublish = true
apply from: "$rootDir/gradle/instrumentation.gradle"
apply from: "$rootDir/gradle/test-with-scala.gradle"
dependencies {
compileOnly deps.scala
testImplementation deps.scala
}

View File

@ -1,102 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.sdk.trace.data.SpanData
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.function.Function
import java.util.function.Supplier
/**
* Note: ideally this should live with the rest of ExecutorInstrumentationTest,
* but this code needs java8 so we put it here for now.
*/
class CompletableFutureTest extends AgentTestRunner {
def "CompletableFuture test"() {
setup:
def pool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
def differentPool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
def supplier = new Supplier<String>() {
@Override
String get() {
TEST_TRACER.spanBuilder("supplier").startSpan().end()
sleep(1000)
return "a"
}
}
def function = new Function<String, String>() {
@Override
String apply(String s) {
TEST_TRACER.spanBuilder("function").startSpan().end()
return s + "c"
}
}
def result = new Supplier<String>() {
@Override
String get() {
runUnderTrace("parent") {
return CompletableFuture.supplyAsync(supplier, pool)
.thenCompose({ s -> CompletableFuture.supplyAsync(new AppendingSupplier(s), differentPool) })
.thenApply(function)
.get()
}
}
}.get()
TEST_WRITER.waitForTraces(1)
List<SpanData> trace = TEST_WRITER.traces[0]
expect:
result == "abc"
TEST_WRITER.traces.size() == 1
trace.size() == 4
trace.get(0).name == "parent"
trace.get(1).name == "supplier"
trace.get(1).parentSpanId == trace.get(0).spanId
trace.get(2).name == "appendingSupplier"
trace.get(2).parentSpanId == trace.get(0).spanId
trace.get(3).name == "function"
trace.get(3).parentSpanId == trace.get(0).spanId
cleanup:
pool?.shutdown()
differentPool?.shutdown()
}
class AppendingSupplier implements Supplier<String> {
String letter
AppendingSupplier(String letter) {
this.letter = letter
}
@Override
String get() {
TEST_TRACER.spanBuilder("appendingSupplier").startSpan().end()
return letter + "b"
}
}
}

View File

@ -18,12 +18,72 @@ import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.sdk.trace.data.SpanData
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.function.Function
import java.util.function.Supplier
import spock.lang.Requires
@Requires({ javaVersion >= 1.8 })
class CompletableFutureTest extends AgentTestRunner {
def "CompletableFuture test"() {
setup:
def pool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
def differentPool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
def supplier = new Supplier<String>() {
@Override
String get() {
TEST_TRACER.spanBuilder("supplier").startSpan().end()
sleep(1000)
return "a"
}
}
def function = new Function<String, String>() {
@Override
String apply(String s) {
TEST_TRACER.spanBuilder("function").startSpan().end()
return s + "c"
}
}
def result = new Supplier<String>() {
@Override
String get() {
runUnderTrace("parent") {
return CompletableFuture.supplyAsync(supplier, pool)
.thenCompose({ s -> CompletableFuture.supplyAsync(new AppendingSupplier(s), differentPool) })
.thenApply(function)
.get()
}
}
}.get()
TEST_WRITER.waitForTraces(1)
List<SpanData> trace = TEST_WRITER.traces[0]
expect:
result == "abc"
TEST_WRITER.traces.size() == 1
trace.size() == 4
trace.get(0).name == "parent"
trace.get(1).name == "supplier"
trace.get(1).parentSpanId == trace.get(0).spanId
trace.get(2).name == "appendingSupplier"
trace.get(2).parentSpanId == trace.get(0).spanId
trace.get(3).name == "function"
trace.get(3).parentSpanId == trace.get(0).spanId
cleanup:
pool?.shutdown()
differentPool?.shutdown()
}
def "test supplyAsync"() {
when:
CompletableFuture<String> completableFuture = runUnderTrace("parent") {
@ -178,4 +238,18 @@ class CompletableFutureTest extends AgentTestRunner {
}
}
}
class AppendingSupplier implements Supplier<String> {
String letter
AppendingSupplier(String letter) {
this.letter = letter
}
@Override
String get() {
TEST_TRACER.spanBuilder("appendingSupplier").startSpan().end()
return letter + "b"
}
}
}

View File

@ -0,0 +1,40 @@
apply from: "$rootDir/gradle/instrumentation.gradle"
apply from: "$rootDir/gradle/test-with-scala.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
muzzle {
pass {
group = 'org.scala-lang'
module = "scala-library"
versions = "[2.8.0,2.12.0)"
assertInverse = true
}
}
testSets {
slickTest {
filter {
// this is needed because "test.dependsOn slickTest", and so without this,
// running a single test in the default test set will fail
setFailOnNoMatchingTests(false)
}
}
}
compileSlickTestGroovy {
classpath += files(sourceSets.slickTest.scala.classesDirectory)
}
dependencies {
library group: 'org.scala-lang', name: 'scala-library', version: '2.8.0'
latestDepTestLibrary group: 'org.scala-lang', name: 'scala-library', version: '2.11.+'
slickTestImplementation project(':instrumentation:jdbc')
slickTestImplementation deps.scala
slickTestImplementation group: 'com.typesafe.slick', name: 'slick_2.11', version: '3.2.0'
slickTestImplementation group: 'com.h2database', name: 'h2', version: '1.4.197'
}
// Run Slick library tests along with the rest of tests
test.dependsOn slickTest

View File

@ -14,9 +14,8 @@
* limitations under the License.
*/
package io.opentelemetry.instrumentation.auto.javaconcurrent;
package io.opentelemetry.instrumentation.auto.scalaconcurrent;
import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.nameMatches;
import static net.bytebuddy.matcher.ElementMatchers.named;
@ -33,20 +32,21 @@ import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.concurrent.forkjoin.ForkJoinTask;
@AutoService(Instrumenter.class)
public final class ScalaExecutorInstrumentation extends AbstractExecutorInstrumentation {
public final class ScalaForkJoinPoolInstrumentation extends Instrumenter.Default {
public ScalaExecutorInstrumentation() {
super(EXEC_NAME + ".scala_fork_join");
public ScalaForkJoinPoolInstrumentation() {
super("java_concurrent", "scala_concurrent");
}
@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
// Optimization for expensive typeMatcher.
return hasClassesNamed(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME);
public ElementMatcher<TypeDescription> typeMatcher() {
// This might need to be an extendsClass matcher...
return named("scala.concurrent.forkjoin.ForkJoinPool");
}
@Override
@ -60,15 +60,15 @@ public final class ScalaExecutorInstrumentation extends AbstractExecutorInstrume
transformers.put(
named("execute")
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
ScalaExecutorInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
ScalaForkJoinPoolInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
transformers.put(
named("submit")
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
ScalaExecutorInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
ScalaForkJoinPoolInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
transformers.put(
nameMatches("invoke")
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
ScalaExecutorInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
ScalaForkJoinPoolInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
return transformers;
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.opentelemetry.instrumentation.auto.javaconcurrent;
package io.opentelemetry.instrumentation.auto.scalaconcurrent;
import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.extendsClass;
@ -54,7 +54,7 @@ public final class ScalaForkJoinTaskInstrumentation extends Instrumenter.Default
static final String TASK_CLASS_NAME = "scala.concurrent.forkjoin.ForkJoinTask";
public ScalaForkJoinTaskInstrumentation() {
super(AbstractExecutorInstrumentation.EXEC_NAME);
super("java_concurrent", "scala_concurrent");
}
@Override

View File

@ -98,7 +98,6 @@ include ':instrumentation:java-classloader:osgi-testing'
include ':instrumentation:java-classloader:tomcat-testing'
include ':instrumentation:java-concurrent'
include ':instrumentation:java-concurrent:kotlin-testing'
include ':instrumentation:java-concurrent:scala-testing'
include ':instrumentation:jaxrs:jaxrs-1.0'
include ':instrumentation:jaxrs:jaxrs-2.0'
include ':instrumentation:jaxrs:jaxrs-2.0:jaxrs-2.0-jersey-2.0'
@ -148,6 +147,8 @@ include ':instrumentation:rediscala-1.8'
include ':instrumentation:redisson-3.0'
include ':instrumentation:rmi'
include ':instrumentation:rxjava-1.0'
include ':instrumentation:rxjava-1.0'
include ':instrumentation:scala-concurrent'
include ':instrumentation:servlet:glassfish-testing'
include ':instrumentation:servlet:servlet-common'
include ':instrumentation:servlet:servlet-2.2'