Merge pull request #1107 from trask/dd-merge-part-1

This commit is contained in:
Tyler Benson 2020-09-01 14:57:14 -04:00 committed by GitHub
commit bae8d8d56d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
52 changed files with 1187 additions and 279 deletions

View File

@ -84,6 +84,8 @@ public class Config {
public static final String KAFKA_CLIENT_PROPAGATION_ENABLED = "kafka.client.propagation.enabled";
public static final String HYSTRIX_TAGS_ENABLED = "hystrix.tags.enabled";
public static final String ENDPOINT_PEER_SERVICE_MAPPING = "endpoint.peer.service.mapping";
private static final boolean DEFAULT_TRACE_ENABLED = true;
@ -100,6 +102,8 @@ public class Config {
public static final boolean DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED = true;
public static final boolean DEFAULT_HYSTRIX_TAGS_ENABLED = false;
private static final String DEFAULT_TRACE_ANNOTATIONS = null;
private static final boolean DEFAULT_TRACE_EXECUTORS_ALL = false;
private static final String DEFAULT_TRACE_EXECUTORS = "";
@ -132,6 +136,8 @@ public class Config {
private final boolean kafkaClientPropagationEnabled;
private final boolean hystrixTagsEnabled;
private final Map<String, String> endpointPeerServiceMapping;
// Values from an optionally provided properties file
@ -185,6 +191,9 @@ public class Config {
getBooleanSettingFromEnvironment(
KAFKA_CLIENT_PROPAGATION_ENABLED, DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED);
hystrixTagsEnabled =
getBooleanSettingFromEnvironment(HYSTRIX_TAGS_ENABLED, DEFAULT_HYSTRIX_TAGS_ENABLED);
endpointPeerServiceMapping = getMapSettingFromEnvironment(ENDPOINT_PEER_SERVICE_MAPPING);
log.debug("New instance: {}", this);
@ -237,6 +246,9 @@ public class Config {
getPropertyBooleanValue(
properties, KAFKA_CLIENT_PROPAGATION_ENABLED, parent.kafkaClientPropagationEnabled);
hystrixTagsEnabled =
getBooleanSettingFromEnvironment(HYSTRIX_TAGS_ENABLED, parent.hystrixTagsEnabled);
endpointPeerServiceMapping =
getPropertyMapValue(
properties, ENDPOINT_PEER_SERVICE_MAPPING, parent.endpointPeerServiceMapping);
@ -567,6 +579,10 @@ public class Config {
return kafkaClientPropagationEnabled;
}
public boolean isHystrixTagsEnabled() {
return hystrixTagsEnabled;
}
public Map<String, String> getEndpointPeerServiceMapping() {
return endpointPeerServiceMapping;
}
@ -613,6 +629,8 @@ public class Config {
+ sqlNormalizerEnabled
+ ", kafkaClientPropagationEnabled="
+ kafkaClientPropagationEnabled
+ ", hystrixTagsEnabled="
+ hystrixTagsEnabled
+ ", endpointPeerServiceMapping="
+ endpointPeerServiceMapping
+ '}';

View File

@ -28,19 +28,19 @@ import io.opentelemetry.instrumentation.auto.api.ContextStore;
import io.opentelemetry.instrumentation.auto.api.InstrumentationContext;
import io.opentelemetry.instrumentation.auto.api.concurrent.ExecutorInstrumentationUtils;
import io.opentelemetry.instrumentation.auto.api.concurrent.State;
import io.opentelemetry.instrumentation.auto.javaconcurrent.AbstractExecutorInstrumentation;
import io.opentelemetry.javaagent.tooling.Instrumenter;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class AkkaExecutorInstrumentation extends AbstractExecutorInstrumentation {
public final class AkkaForkJoinPoolInstrumentation extends Instrumenter.Default {
public AkkaExecutorInstrumentation() {
super(AbstractExecutorInstrumentation.EXEC_NAME + ".akka_fork_join");
public AkkaForkJoinPoolInstrumentation() {
super("akka_context_propagation");
}
@Override
@ -48,6 +48,12 @@ public final class AkkaExecutorInstrumentation extends AbstractExecutorInstrumen
return false;
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// This might need to be an extendsClass matcher...
return named("akka.dispatch.forkjoin.ForkJoinPool");
}
@Override
public Map<String, String> contextStore() {
return singletonMap(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME, State.class.getName());
@ -59,15 +65,15 @@ public final class AkkaExecutorInstrumentation extends AbstractExecutorInstrumen
transformers.put(
named("execute")
.and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
AkkaExecutorInstrumentation.class.getName() + "$SetAkkaForkJoinStateAdvice");
AkkaForkJoinPoolInstrumentation.class.getName() + "$SetAkkaForkJoinStateAdvice");
transformers.put(
named("submit")
.and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
AkkaExecutorInstrumentation.class.getName() + "$SetAkkaForkJoinStateAdvice");
AkkaForkJoinPoolInstrumentation.class.getName() + "$SetAkkaForkJoinStateAdvice");
transformers.put(
nameMatches("invoke")
.and(takesArgument(0, named(AkkaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
AkkaExecutorInstrumentation.class.getName() + "$SetAkkaForkJoinStateAdvice");
AkkaForkJoinPoolInstrumentation.class.getName() + "$SetAkkaForkJoinStateAdvice");
return transformers;
}

View File

@ -32,7 +32,6 @@ import io.opentelemetry.instrumentation.auto.api.ContextStore;
import io.opentelemetry.instrumentation.auto.api.InstrumentationContext;
import io.opentelemetry.instrumentation.auto.api.concurrent.AdviceUtils;
import io.opentelemetry.instrumentation.auto.api.concurrent.State;
import io.opentelemetry.instrumentation.auto.javaconcurrent.AbstractExecutorInstrumentation;
import io.opentelemetry.javaagent.tooling.Instrumenter;
import java.util.Collections;
import java.util.HashMap;
@ -55,7 +54,7 @@ public final class AkkaForkJoinTaskInstrumentation extends Instrumenter.Default
static final String TASK_CLASS_NAME = "akka.dispatch.forkjoin.ForkJoinTask";
public AkkaForkJoinTaskInstrumentation() {
super(AbstractExecutorInstrumentation.EXEC_NAME + ".akka_fork_join");
super("akka_context_propagation");
}
@Override

View File

@ -36,7 +36,7 @@ import spock.lang.Shared
class AkkaExecutorInstrumentationTest extends AgentTestRunner {
static {
System.setProperty("otel.integration.java_concurrent.akka_fork_join.enabled", "true")
System.setProperty("otel.integration.akka_context_propagation.enabled", "true")
}
@Shared

View File

@ -50,6 +50,7 @@ dependencies {
testLibrary group: 'com.datastax.cassandra', name: 'cassandra-driver-core', version: '3.2.0'
testImplementation group: 'org.cassandraunit', name: 'cassandra-unit', version: '3.1.3.2'
testImplementation project(':instrumentation:guava-10.0')
latestDepTestLibrary group: 'com.datastax.cassandra', name: 'cassandra-driver-core', version: '3.+'
}

View File

@ -24,6 +24,8 @@ import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.trace.attributes.SemanticAttributes
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import spock.lang.Shared
@ -32,6 +34,9 @@ class CassandraClientTest extends AgentTestRunner {
@Shared
Cluster cluster
@Shared
def executor = Executors.newCachedThreadPool()
def setupSpec() {
/*
This timeout seems excessive but we've seen tests fail with timeout of 40s.
@ -86,9 +91,15 @@ class CassandraClientTest extends AgentTestRunner {
def "test async"() {
setup:
def callbackExecuted = new AtomicBoolean()
Session session = cluster.connect(keyspace)
runUnderTrace("parent") {
session.executeAsync(statement)
def future = session.executeAsync(statement)
future.addListener({ ->
runUnderTrace("callbackListener") {
callbackExecuted.set(true)
}
}, executor)
}
expect:
@ -98,9 +109,10 @@ class CassandraClientTest extends AgentTestRunner {
cassandraSpan(it, 0, "USE $keyspace", null)
}
}
trace(keyspace ? 1 : 0, 2) {
trace(keyspace ? 1 : 0, 3) {
basicSpan(it, 0, "parent")
cassandraSpan(it, 1, statement, keyspace, span(0))
basicSpan(it, 2, "callbackListener", span(0))
}
}

View File

@ -6,6 +6,12 @@ ext {
apply from: "$rootDir/gradle/instrumentation.gradle"
apply from: "$rootDir/gradle/test-with-scala.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest
}
muzzle {
// There are some weird library issues below 2.9 so can't assert inverse
pass {
@ -36,5 +42,10 @@ dependencies {
// Required for older versions of finatra on JDKs >= 11
testImplementation group: 'com.sun.activation', name: 'javax.activation', version: '1.2.0'
latestDepTestLibrary group: 'com.twitter', name: 'finatra-http_2.11', version: '20.6.+'
// TODO latestDepTestLibrary doesn't work here
latestDepTestImplementation group: 'com.twitter', name: 'finatra-http_2.11', version: '+'
}
compileLatestDepTestGroovy {
classpath += files(sourceSets.latestDepTest.scala.classesDirectory)
}

View File

@ -0,0 +1,108 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint.SUCCESS
import static io.opentelemetry.trace.Span.Kind.INTERNAL
import com.twitter.app.lifecycle.Event
import com.twitter.app.lifecycle.Observer
import com.twitter.finatra.http.HttpServer
import com.twitter.util.Await
import com.twitter.util.Closable
import com.twitter.util.Duration
import com.twitter.util.Promise
import io.opentelemetry.auto.test.asserts.TraceAssert
import io.opentelemetry.auto.test.base.HttpServerTest
import io.opentelemetry.sdk.trace.data.SpanData
class FinatraServerLatestTest extends HttpServerTest<HttpServer> {
private static final Duration TIMEOUT = Duration.fromSeconds(5)
private static final Duration STARTUP_TIMEOUT = Duration.fromSeconds(20)
static closeAndWait(Closable closable) {
if (closable != null) {
Await.ready(closable.close(), TIMEOUT)
}
}
@Override
HttpServer startServer(int port) {
HttpServer testServer = new FinatraServer()
// Starting the server is blocking so start it in a separate thread
Thread startupThread = new Thread({
testServer.main("-admin.port=:0", "-http.port=:" + port)
})
startupThread.setDaemon(true)
startupThread.start()
Promise<Boolean> startupPromise = new Promise<>()
testServer.withObserver(new Observer() {
@Override
void onSuccess(Event event) {
if (event == testServer.startupCompletionEvent()) {
startupPromise.setValue(true)
}
}
void onEntry(Event event) {
}
@Override
void onFailure(Event stage, Throwable throwable) {
if (stage != Event.Close$.MODULE$) {
startupPromise.setException(throwable)
}
}
})
Await.result(startupPromise, STARTUP_TIMEOUT)
return testServer
}
@Override
boolean hasHandlerSpan() {
return true
}
@Override
boolean testNotFound() {
// Resource name is set to "GET /notFound"
false
}
@Override
void stopServer(HttpServer httpServer) {
Await.ready(httpServer.close(), TIMEOUT)
}
@Override
void handlerSpan(TraceAssert trace, int index, Object parent, String method = "GET", ServerEndpoint endpoint = SUCCESS) {
trace.span(index) {
operationName "FinatraController"
spanKind INTERNAL
childOf(parent as SpanData)
// Finatra doesn't propagate the stack trace or exception to the instrumentation
// so the normal errorAttributes() method can't be used
errored false
attributes {
}
}
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finatra.http.Controller
import com.twitter.util.Future
import groovy.lang.Closure
import io.opentelemetry.auto.test.base.HttpServerTest.ServerEndpoint._
import io.opentelemetry.auto.test.base.HttpServerTest.controller
class FinatraController extends Controller {
any(SUCCESS.getPath) { request: Request =>
controller(SUCCESS, new Closure[Response](null) {
override def call(): Response = {
response.ok(SUCCESS.getBody)
}
})
}
any(ERROR.getPath) { request: Request =>
controller(ERROR, new Closure[Response](null) {
override def call(): Response = {
response.internalServerError(ERROR.getBody)
}
})
}
any(QUERY_PARAM.getPath) { request: Request =>
controller(QUERY_PARAM, new Closure[Response](null) {
override def call(): Response = {
response.ok(QUERY_PARAM.getBody)
}
})
}
any(EXCEPTION.getPath) { request: Request =>
controller(EXCEPTION, new Closure[Future[Response]](null) {
override def call(): Future[Response] = {
throw new Exception(EXCEPTION.getBody)
}
})
}
any(REDIRECT.getPath) { request: Request =>
controller(REDIRECT, new Closure[Response](null) {
override def call(): Response = {
response.found.location(REDIRECT.getBody)
}
})
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.twitter.finagle.http.Request
import com.twitter.finatra.http.HttpServer
import com.twitter.finatra.http.filters.ExceptionMappingFilter
import com.twitter.finatra.http.routing.HttpRouter
class FinatraServer extends HttpServer {
override protected def configureHttp(router: HttpRouter): Unit = {
router
.filter[ExceptionMappingFilter[Request]]
.add[FinatraController]
.exceptionMapper[ResponseSettingExceptionMapper]
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finatra.http.exceptions.ExceptionMapper
import com.twitter.finatra.http.response.ResponseBuilder
import javax.inject.{Inject, Singleton}
@Singleton
class ResponseSettingExceptionMapper @Inject()(response: ResponseBuilder)
extends ExceptionMapper[Exception] {
override def toResponse(request: Request, exception: Exception): Response = {
response.internalServerError(exception.getMessage)
}
}

View File

@ -0,0 +1,14 @@
apply from: "$rootDir/gradle/instrumentation.gradle"
muzzle {
pass {
group = "com.google.guava"
module = "guava"
versions = "[10.0,]"
assertInverse = true
}
}
dependencies {
library group: 'com.google.guava', name: 'guava', version: '10.0'
}

View File

@ -0,0 +1,88 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.instrumentation.auto.guava;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import com.google.common.util.concurrent.AbstractFuture;
import io.grpc.Context;
import io.opentelemetry.instrumentation.auto.api.ContextStore;
import io.opentelemetry.instrumentation.auto.api.InstrumentationContext;
import io.opentelemetry.instrumentation.auto.api.concurrent.ExecutorInstrumentationUtils;
import io.opentelemetry.instrumentation.auto.api.concurrent.RunnableWrapper;
import io.opentelemetry.instrumentation.auto.api.concurrent.State;
import io.opentelemetry.javaagent.tooling.Instrumenter;
import java.util.Map;
import java.util.concurrent.Executor;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import net.bytebuddy.matcher.ElementMatchers;
@AutoService(Instrumenter.class)
public class ListenableFutureInstrumentation extends Instrumenter.Default {
public ListenableFutureInstrumentation() {
super("guava");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("com.google.common.util.concurrent.AbstractFuture");
}
@Override
public Map<String, String> contextStore() {
return singletonMap(Runnable.class.getName(), State.class.getName());
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
named("addListener").and(ElementMatchers.takesArguments(Runnable.class, Executor.class)),
ListenableFutureInstrumentation.class.getName() + "$AddListenerAdvice");
}
public static class AddListenerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State addListenerEnter(
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Context context = Context.current();
final Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
task = newTask;
final ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class);
return ExecutorInstrumentationUtils.setupState(contextStore, newTask, context);
}
return null;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void addListenerExit(
@Advice.Enter final State state, @Advice.Thrown final Throwable throwable) {
ExecutorInstrumentationUtils.cleanUpOnMethodExit(state, throwable);
}
private static void muzzleCheck(final AbstractFuture<?> future) {
future.addListener(null, null);
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import io.opentelemetry.auto.test.base.AbstractPromiseTest
import java.util.concurrent.Executors
import spock.lang.Shared
class ListenableFutureTest extends AbstractPromiseTest<SettableFuture<Boolean>, ListenableFuture<String>> {
@Shared
def executor = Executors.newFixedThreadPool(1)
@Override
SettableFuture<Boolean> newPromise() {
return SettableFuture.create()
}
@Override
ListenableFuture<String> map(SettableFuture<Boolean> promise, Closure<String> callback) {
return Futures.transform(promise, callback, executor)
}
@Override
void onComplete(ListenableFuture<String> promise, Closure callback) {
promise.addListener({ -> callback(promise.get()) }, executor)
}
@Override
void complete(SettableFuture<Boolean> promise, boolean value) {
promise.set(value)
}
@Override
Boolean get(SettableFuture<Boolean> promise) {
return promise.get()
}
}

View File

@ -17,12 +17,19 @@
package io.opentelemetry.instrumentation.auto.hystrix;
import com.netflix.hystrix.HystrixInvokableInfo;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.decorator.BaseDecorator;
import io.opentelemetry.trace.Span;
public class HystrixDecorator extends BaseDecorator {
public static final HystrixDecorator DECORATE = new HystrixDecorator();
private final boolean extraTags;
private HystrixDecorator() {
extraTags = Config.get().isHystrixTagsEnabled();
}
public void onCommand(Span span, HystrixInvokableInfo<?> command, String methodName) {
if (command != null) {
String commandName = command.getCommandKey().name();
@ -32,9 +39,11 @@ public class HystrixDecorator extends BaseDecorator {
String spanName = groupName + "." + commandName + "." + methodName;
span.updateName(spanName);
span.setAttribute("hystrix.command", commandName);
span.setAttribute("hystrix.group", groupName);
span.setAttribute("hystrix.circuit-open", circuitOpen);
if (extraTags) {
span.setAttribute("hystrix.command", commandName);
span.setAttribute("hystrix.group", groupName);
span.setAttribute("hystrix.circuit-open", circuitOpen);
}
}
}
}

View File

@ -19,6 +19,7 @@ import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import com.netflix.hystrix.HystrixObservableCommand
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.utils.ConfigUtils
import rx.Observable
import rx.schedulers.Schedulers
@ -26,6 +27,9 @@ class HystrixObservableChainTest extends AgentTestRunner {
static {
// Disable so failure testing below doesn't inadvertently change the behavior.
System.setProperty("hystrix.command.default.circuitBreaker.enabled", "false")
ConfigUtils.updateConfig {
System.setProperty("otel.hystrix.tags.enabled", "true")
}
// Uncomment for debugging:
// System.setProperty("hystrix.command.default.execution.timeout.enabled", "false")

View File

@ -21,6 +21,7 @@ import com.netflix.hystrix.HystrixObservable
import com.netflix.hystrix.HystrixObservableCommand
import com.netflix.hystrix.exception.HystrixRuntimeException
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.utils.ConfigUtils
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import rx.Observable
@ -30,6 +31,9 @@ class HystrixObservableTest extends AgentTestRunner {
static {
// Disable so failure testing below doesn't inadvertently change the behavior.
System.setProperty("hystrix.command.default.circuitBreaker.enabled", "false")
ConfigUtils.updateConfig {
System.setProperty("otel.hystrix.tags.enabled", "true")
}
// Uncomment for debugging:
// System.setProperty("hystrix.command.default.execution.timeout.enabled", "false")

View File

@ -19,6 +19,7 @@ import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import com.netflix.hystrix.HystrixCommand
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.auto.test.utils.ConfigUtils
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import spock.lang.Timeout
@ -28,6 +29,9 @@ class HystrixTest extends AgentTestRunner {
static {
// Disable so failure testing below doesn't inadvertently change the behavior.
System.setProperty("hystrix.command.default.circuitBreaker.enabled", "false")
ConfigUtils.updateConfig {
System.setProperty("otel.hystrix.tags.enabled", "true")
}
// Uncomment for debugging:
// System.setProperty("hystrix.command.default.execution.timeout.enabled", "false")

View File

@ -5,39 +5,9 @@ ext {
}
apply from: "$rootDir/gradle/instrumentation.gradle"
apply from: "$rootDir/gradle/test-with-scala.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
// This won't work until the akka and scala integrations are split into separate projects.
//muzzle {
// pass {
// coreJdk()
// }
//}
testSets {
slickTest {
filter {
// this is needed because "test.dependsOn slickTest", and so without this,
// running a single test in the default test set will fail
setFailOnNoMatchingTests(false)
}
muzzle {
pass {
coreJdk()
}
}
compileSlickTestGroovy {
classpath += files(sourceSets.slickTest.scala.classesDirectory)
}
dependencies {
// This is needed for Scala ForJoinTask/Pool instrumentation
compileOnly deps.scala
slickTestImplementation project(':instrumentation:jdbc')
slickTestImplementation deps.scala
slickTestImplementation group: 'com.typesafe.slick', name: 'slick_2.11', version: '3.2.0'
slickTestImplementation group: 'com.h2database', name: 'h2', version: '1.4.197'
}
// Run Slick library tests along with the rest of unit tests
test.dependsOn slickTest

View File

@ -1,9 +0,0 @@
ext.skipPublish = true
apply from: "$rootDir/gradle/instrumentation.gradle"
apply from: "$rootDir/gradle/test-with-scala.gradle"
dependencies {
compileOnly deps.scala
testImplementation deps.scala
}

View File

@ -1,102 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.sdk.trace.data.SpanData
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.function.Function
import java.util.function.Supplier
/**
* Note: ideally this should live with the rest of ExecutorInstrumentationTest,
* but this code needs java8 so we put it here for now.
*/
class CompletableFutureTest extends AgentTestRunner {
def "CompletableFuture test"() {
setup:
def pool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
def differentPool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
def supplier = new Supplier<String>() {
@Override
String get() {
TEST_TRACER.spanBuilder("supplier").startSpan().end()
sleep(1000)
return "a"
}
}
def function = new Function<String, String>() {
@Override
String apply(String s) {
TEST_TRACER.spanBuilder("function").startSpan().end()
return s + "c"
}
}
def result = new Supplier<String>() {
@Override
String get() {
runUnderTrace("parent") {
return CompletableFuture.supplyAsync(supplier, pool)
.thenCompose({ s -> CompletableFuture.supplyAsync(new AppendingSupplier(s), differentPool) })
.thenApply(function)
.get()
}
}
}.get()
TEST_WRITER.waitForTraces(1)
List<SpanData> trace = TEST_WRITER.traces[0]
expect:
result == "abc"
TEST_WRITER.traces.size() == 1
trace.size() == 4
trace.get(0).name == "parent"
trace.get(1).name == "supplier"
trace.get(1).parentSpanId == trace.get(0).spanId
trace.get(2).name == "appendingSupplier"
trace.get(2).parentSpanId == trace.get(0).spanId
trace.get(3).name == "function"
trace.get(3).parentSpanId == trace.get(0).spanId
cleanup:
pool?.shutdown()
differentPool?.shutdown()
}
class AppendingSupplier implements Supplier<String> {
String letter
AppendingSupplier(String letter) {
this.letter = letter
}
@Override
String get() {
TEST_TRACER.spanBuilder("appendingSupplier").startSpan().end()
return letter + "b"
}
}
}

View File

@ -18,12 +18,72 @@ import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.sdk.trace.data.SpanData
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.function.Function
import java.util.function.Supplier
import spock.lang.Requires
@Requires({ javaVersion >= 1.8 })
class CompletableFutureTest extends AgentTestRunner {
def "CompletableFuture test"() {
setup:
def pool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
def differentPool = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
def supplier = new Supplier<String>() {
@Override
String get() {
TEST_TRACER.spanBuilder("supplier").startSpan().end()
sleep(1000)
return "a"
}
}
def function = new Function<String, String>() {
@Override
String apply(String s) {
TEST_TRACER.spanBuilder("function").startSpan().end()
return s + "c"
}
}
def result = new Supplier<String>() {
@Override
String get() {
runUnderTrace("parent") {
return CompletableFuture.supplyAsync(supplier, pool)
.thenCompose({ s -> CompletableFuture.supplyAsync(new AppendingSupplier(s), differentPool) })
.thenApply(function)
.get()
}
}
}.get()
TEST_WRITER.waitForTraces(1)
List<SpanData> trace = TEST_WRITER.traces[0]
expect:
result == "abc"
TEST_WRITER.traces.size() == 1
trace.size() == 4
trace.get(0).name == "parent"
trace.get(1).name == "supplier"
trace.get(1).parentSpanId == trace.get(0).spanId
trace.get(2).name == "appendingSupplier"
trace.get(2).parentSpanId == trace.get(0).spanId
trace.get(3).name == "function"
trace.get(3).parentSpanId == trace.get(0).spanId
cleanup:
pool?.shutdown()
differentPool?.shutdown()
}
def "test supplyAsync"() {
when:
CompletableFuture<String> completableFuture = runUnderTrace("parent") {
@ -178,4 +238,18 @@ class CompletableFutureTest extends AgentTestRunner {
}
}
}
class AppendingSupplier implements Supplier<String> {
String letter
AppendingSupplier(String letter) {
this.letter = letter
}
@Override
String get() {
TEST_TRACER.spanBuilder("appendingSupplier").startSpan().end()
return letter + "b"
}
}
}

View File

@ -16,6 +16,7 @@
package io.opentelemetry.instrumentation.auto.kafkaclients;
import static io.opentelemetry.context.ContextUtils.withScopedContext;
import static io.opentelemetry.instrumentation.auto.kafkaclients.KafkaDecorator.DECORATE;
import static io.opentelemetry.instrumentation.auto.kafkaclients.KafkaDecorator.TRACER;
import static io.opentelemetry.instrumentation.auto.kafkaclients.TextMapInjectAdapter.SETTER;
@ -86,15 +87,15 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
@Advice.FieldValue("apiVersions") ApiVersions apiVersions,
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
Context parent = Context.current();
Span span =
TRACER.spanBuilder(DECORATE.spanNameOnProduce(record)).setSpanKind(PRODUCER).startSpan();
DECORATE.afterStart(span);
DECORATE.onProduce(span, record);
callback = new ProducerCallback(callback, span);
callback = new ProducerCallback(callback, parent, span);
boolean isTombstone = record.value() == null && !record.headers().iterator().hasNext();
if (isTombstone) {
if (record.value() == null) {
span.setAttribute("tombstone", true);
}
@ -106,9 +107,7 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
// headers attempt to read messages that were produced by clients > 0.11 and the magic
// value of the broker(s) is >= 2
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
&& Config.get().isKafkaClientPropagationEnabled()
// Must not interfere with tombstones
&& !isTombstone) {
&& Config.get().isKafkaClientPropagationEnabled()) {
Context context = withSpan(span, Context.current());
try {
OpenTelemetry.getPropagators()
@ -147,24 +146,27 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
public static class ProducerCallback implements Callback {
private final Callback callback;
private final Context parent;
private final Span span;
public ProducerCallback(Callback callback, Span span) {
public ProducerCallback(Callback callback, Context parent, Span span) {
this.callback = callback;
this.parent = parent;
this.span = span;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
try (Scope scope = currentContextWith(span)) {
DECORATE.onError(span, exception);
try {
if (callback != null) {
DECORATE.onError(span, exception);
DECORATE.beforeFinish(span);
span.end();
if (callback != null) {
if (parent != null) {
try (Scope scope = withScopedContext(parent)) {
callback.onCompletion(metadata, exception);
}
} finally {
DECORATE.beforeFinish(span);
span.end();
} else {
callback.onCompletion(metadata, exception);
}
}
}

View File

@ -91,9 +91,7 @@ public class TracingIterator implements Iterator<ConsumerRecord> {
long startTimeMillis = System.currentTimeMillis();
spanBuilder.setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTimeMillis));
Span span = spanBuilder.startSpan();
// tombstone checking logic here because it can only be inferred
// from the record itself
if (next.value() == null && !next.headers().iterator().hasNext()) {
if (next.value() == null) {
span.setAttribute("tombstone", true);
}
decorator.afterStart(span);

View File

@ -15,6 +15,8 @@
*/
import static io.opentelemetry.auto.test.utils.ConfigUtils.withConfigOverride
import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import static io.opentelemetry.trace.Span.Kind.CONSUMER
import static io.opentelemetry.trace.Span.Kind.PRODUCER
@ -27,8 +29,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.Rule
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
@ -47,6 +51,90 @@ class KafkaClientTest extends AgentTestRunner {
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC)
def "test kafka produce and consume"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
Producer<String, String> producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())
// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
// set the topic that needs to be consumed
def containerProperties = containerProperties()
// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
records.add(record)
}
})
// start the container and underlying message listener
container.start()
// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
when:
String greeting = "Hello Spring Kafka Sender!"
runUnderTrace("parent") {
producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex ->
if (ex == null) {
runUnderTrace("producer callback") {}
} else {
runUnderTrace("producer exception: " + ex) {}
}
}
}
then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)
received.value() == greeting
received.key() == null
assertTraces(1) {
trace(0, 4) {
basicSpan(it, 0, "parent")
span(1) {
operationName SHARED_TOPIC
spanKind PRODUCER
errored false
childOf span(0)
attributes {
}
}
span(2) {
operationName SHARED_TOPIC
spanKind CONSUMER
errored false
childOf span(1)
attributes {
"partition" { it >= 0 }
"offset" 0
"record.queue_time_ms" { it >= 0 }
}
}
basicSpan(it, 3, "producer callback", span(0))
}
}
cleanup:
producer.close()
container?.stop()
}
def "test spring kafka template produce and consume"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
@ -83,8 +171,13 @@ class KafkaClientTest extends AgentTestRunner {
when:
String greeting = "Hello Spring Kafka Sender!"
kafkaTemplate.send(SHARED_TOPIC, greeting)
runUnderTrace("parent") {
kafkaTemplate.send(SHARED_TOPIC, greeting).addCallback({
runUnderTrace("producer callback") {}
}, { ex ->
runUnderTrace("producer exception: " + ex) {}
})
}
then:
// check that the message was received
@ -93,26 +186,28 @@ class KafkaClientTest extends AgentTestRunner {
received.key() == null
assertTraces(1) {
trace(0, 2) {
span(0) {
trace(0, 4) {
basicSpan(it, 0, "parent")
span(1) {
operationName SHARED_TOPIC
spanKind PRODUCER
errored false
parent()
childOf span(0)
attributes {
}
}
span(1) {
span(2) {
operationName SHARED_TOPIC
spanKind CONSUMER
errored false
childOf span(0)
childOf span(1)
attributes {
"partition" { it >= 0 }
"offset" 0
"record.queue_time_ms" { it >= 0 }
}
}
basicSpan(it, 3, "producer callback", span(0))
}
}
@ -121,7 +216,6 @@ class KafkaClientTest extends AgentTestRunner {
container?.stop()
}
def "test pass through tombstone"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
@ -160,15 +254,14 @@ class KafkaClientTest extends AgentTestRunner {
when:
kafkaTemplate.send(SHARED_TOPIC, null)
then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)
received.value() == null
received.key() == null
assertTraces(2) {
trace(0, 1) {
assertTraces(1) {
trace(0, 2) {
// PRODUCER span 0
span(0) {
operationName SHARED_TOPIC
@ -179,16 +272,12 @@ class KafkaClientTest extends AgentTestRunner {
"tombstone" true
}
}
}
// when a user consumes a tombstone a new trace is started
// because context can't be propagated safely
trace(1, 1) {
// CONSUMER span 0
span(0) {
span(1) {
operationName SHARED_TOPIC
spanKind CONSUMER
errored false
parent()
childOf span(0)
attributes {
"partition" { it >= 0 }
"offset" 0
@ -199,9 +288,6 @@ class KafkaClientTest extends AgentTestRunner {
}
}
def headers = received.headers()
!headers.iterator().hasNext()
cleanup:
producerFactory.stop()
container?.stop()
@ -270,7 +356,6 @@ class KafkaClientTest extends AgentTestRunner {
cleanup:
consumer.close()
producer.close()
}
@Unroll
@ -326,11 +411,9 @@ class KafkaClientTest extends AgentTestRunner {
container?.stop()
where:
value | expected
"false" | false
"true" | true
String.valueOf(Config.DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED) | true
value | expected
"false" | false
"true" | true
}
def "should not read remote context when consuming messages if propagation is disabled"() {
@ -448,11 +531,10 @@ class KafkaClientTest extends AgentTestRunner {
ConfigUtils.updateConfig {
System.clearProperty("otel." + Config.KAFKA_CLIENT_PROPAGATION_ENABLED)
}
}
protected KafkaMessageListenerContainer<Object, Object> startConsumer(String groupId, records) {
// set up the Kafka consumer properties
// set up the Kafka consumer properties
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(groupId, "false", embeddedKafka)
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

View File

@ -0,0 +1,40 @@
apply from: "$rootDir/gradle/instrumentation.gradle"
apply from: "$rootDir/gradle/test-with-scala.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
muzzle {
pass {
group = 'org.scala-lang'
module = "scala-library"
versions = "[2.8.0,2.12.0)"
assertInverse = true
}
}
testSets {
slickTest {
filter {
// this is needed because "test.dependsOn slickTest", and so without this,
// running a single test in the default test set will fail
setFailOnNoMatchingTests(false)
}
}
}
compileSlickTestGroovy {
classpath += files(sourceSets.slickTest.scala.classesDirectory)
}
dependencies {
library group: 'org.scala-lang', name: 'scala-library', version: '2.8.0'
latestDepTestLibrary group: 'org.scala-lang', name: 'scala-library', version: '2.11.+'
slickTestImplementation project(':instrumentation:jdbc')
slickTestImplementation deps.scala
slickTestImplementation group: 'com.typesafe.slick', name: 'slick_2.11', version: '3.2.0'
slickTestImplementation group: 'com.h2database', name: 'h2', version: '1.4.197'
}
// Run Slick library tests along with the rest of tests
test.dependsOn slickTest

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.opentelemetry.instrumentation.auto.javaconcurrent;
package io.opentelemetry.instrumentation.auto.scalaconcurrent;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.nameMatches;
@ -32,14 +32,21 @@ import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.concurrent.forkjoin.ForkJoinTask;
@AutoService(Instrumenter.class)
public final class ScalaExecutorInstrumentation extends AbstractExecutorInstrumentation {
public final class ScalaForkJoinPoolInstrumentation extends Instrumenter.Default {
public ScalaExecutorInstrumentation() {
super(EXEC_NAME + ".scala_fork_join");
public ScalaForkJoinPoolInstrumentation() {
super("java_concurrent", "scala_concurrent");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// This might need to be an extendsClass matcher...
return named("scala.concurrent.forkjoin.ForkJoinPool");
}
@Override
@ -53,15 +60,15 @@ public final class ScalaExecutorInstrumentation extends AbstractExecutorInstrume
transformers.put(
named("execute")
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
ScalaExecutorInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
ScalaForkJoinPoolInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
transformers.put(
named("submit")
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
ScalaExecutorInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
ScalaForkJoinPoolInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
transformers.put(
nameMatches("invoke")
.and(takesArgument(0, named(ScalaForkJoinTaskInstrumentation.TASK_CLASS_NAME))),
ScalaExecutorInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
ScalaForkJoinPoolInstrumentation.class.getName() + "$SetScalaForkJoinStateAdvice");
return transformers;
}

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.opentelemetry.instrumentation.auto.javaconcurrent;
package io.opentelemetry.instrumentation.auto.scalaconcurrent;
import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.extendsClass;
@ -54,7 +54,7 @@ public final class ScalaForkJoinTaskInstrumentation extends Instrumenter.Default
static final String TASK_CLASS_NAME = "scala.concurrent.forkjoin.ForkJoinTask";
public ScalaForkJoinTaskInstrumentation() {
super(AbstractExecutorInstrumentation.EXEC_NAME);
super("java_concurrent", "scala_concurrent");
}
@Override

View File

@ -15,16 +15,6 @@ muzzle {
}
}
compileTestJava {
sourceCompatibility = "1.8"
targetCompatibility = "1.8"
}
compileJava {
sourceCompatibility = "1.7"
targetCompatibility = "1.7"
}
dependencies {
library group: 'com.sparkjava', name: 'spark-core', version: '2.3'

View File

@ -1,3 +1,7 @@
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
}
apply from: "$rootDir/gradle/instrumentation.gradle"
muzzle {

View File

@ -16,18 +16,13 @@
package io.opentelemetry.instrumentation.auto.spring.scheduling;
import static io.opentelemetry.instrumentation.auto.spring.scheduling.SpringSchedulingDecorator.DECORATE;
import static io.opentelemetry.instrumentation.auto.spring.scheduling.SpringSchedulingDecorator.TRACER;
import static io.opentelemetry.trace.TracingContextUtils.currentContextWith;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.Instrumenter;
import io.opentelemetry.trace.Span;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
@ -49,7 +44,7 @@ public final class SpringSchedulingInstrumentation extends Instrumenter.Default
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".SpringSchedulingDecorator", getClass().getName() + "$RunnableWrapper",
packageName + ".SpringSchedulingDecorator", packageName + ".SpringSchedulingRunnableWrapper",
};
}
@ -64,43 +59,7 @@ public final class SpringSchedulingInstrumentation extends Instrumenter.Default
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onConstruction(
@Advice.Argument(value = 0, readOnly = false) Runnable runnable) {
runnable = RunnableWrapper.wrapIfNeeded(runnable);
}
}
public static class RunnableWrapper implements Runnable {
private final Runnable runnable;
private RunnableWrapper(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
if (runnable == null) {
return;
}
Span span = TRACER.spanBuilder(DECORATE.spanNameOnRun(runnable)).startSpan();
DECORATE.afterStart(span);
try (Scope scope = currentContextWith(span)) {
runnable.run();
} catch (Throwable throwable) {
DECORATE.onError(span, throwable);
throw throwable;
} finally {
DECORATE.beforeFinish(span);
span.end();
}
}
public static Runnable wrapIfNeeded(Runnable task) {
// We wrap only lambdas' anonymous classes and if given object has not already been wrapped.
// Anonymous classes have '/' in class name which is not allowed in 'normal' classes.
if (task instanceof RunnableWrapper) {
return task;
}
return new RunnableWrapper(task);
runnable = SpringSchedulingRunnableWrapper.wrapIfNeeded(runnable);
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.instrumentation.auto.spring.scheduling;
import static io.opentelemetry.instrumentation.auto.spring.scheduling.SpringSchedulingDecorator.TRACER;
import static io.opentelemetry.trace.TracingContextUtils.currentContextWith;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Span;
public class SpringSchedulingRunnableWrapper implements Runnable {
private final Runnable runnable;
private SpringSchedulingRunnableWrapper(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
if (runnable == null) {
return;
}
Span span =
TRACER.spanBuilder(SpringSchedulingDecorator.DECORATE.spanNameOnRun(runnable)).startSpan();
SpringSchedulingDecorator.DECORATE.afterStart(span);
try (Scope scope = currentContextWith(span)) {
runnable.run();
} catch (Throwable throwable) {
SpringSchedulingDecorator.DECORATE.onError(span, throwable);
throw throwable;
} finally {
SpringSchedulingDecorator.DECORATE.beforeFinish(span);
span.end();
}
}
public static Runnable wrapIfNeeded(Runnable task) {
// We wrap only lambdas' anonymous classes and if given object has not already been wrapped.
// Anonymous classes have '/' in class name which is not allowed in 'normal' classes.
if (task instanceof SpringSchedulingRunnableWrapper) {
return task;
}
return new SpringSchedulingRunnableWrapper(task);
}
}

View File

@ -15,6 +15,7 @@
*/
import io.opentelemetry.auto.test.AgentTestRunner
import java.util.concurrent.TimeUnit
import org.springframework.context.annotation.AnnotationConfigApplicationContext
class SpringSchedulingTest extends AgentTestRunner {
@ -63,4 +64,28 @@ class SpringSchedulingTest extends AgentTestRunner {
}
}
def "schedule lambda test"() {
setup:
def context = new AnnotationConfigApplicationContext(LambdaTaskConfig)
def configurer = context.getBean(LambdaTaskConfigurer)
configurer.singleUseLatch.await(2000, TimeUnit.MILLISECONDS)
expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
operationNameContains "LambdaTaskConfigurer\$\$Lambda\$"
parent()
errored false
attributes {
}
}
}
}
cleanup:
context.close()
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
@Configuration
@EnableScheduling
public class LambdaTaskConfig {
@Bean
LambdaTaskConfigurer lambdaTaskConfigurer() {
return new LambdaTaskConfigurer();
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.CountDownLatch;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.stereotype.Service;
@Service
public class LambdaTaskConfigurer implements SchedulingConfigurer {
public final CountDownLatch singleUseLatch = new CountDownLatch(1);
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.addFixedDelayTask(singleUseLatch::countDown, 500);
}
}

View File

@ -7,6 +7,16 @@ apply from: "$rootDir/gradle/instrumentation.gradle"
muzzle {
pass {
name = "webflux_5.0.0+_with_netty_0.8.0"
group = "org.springframework"
module = "spring-webflux"
versions = "[5.0.0.RELEASE,)"
assertInverse = true
extraDependency "io.projectreactor.netty:reactor-netty:0.8.0.RELEASE"
}
pass {
name = "webflux_5.0.0_with_ipc_0.7.0"
group = "org.springframework"
module = "spring-webflux"
versions = "[5.0.0.RELEASE,)"
@ -15,6 +25,15 @@ muzzle {
}
pass {
name = "netty_0.8.0+_with_spring-webflux:5.1.0"
group = "io.projectreactor.netty"
module = "reactor-netty"
versions = "[0.8.0.RELEASE,)"
extraDependency "org.springframework:spring-webflux:5.1.0.RELEASE"
}
pass {
name = "ipc_0.7.0+_with_spring-webflux:5.0.0"
group = "io.projectreactor.ipc"
module = "reactor-netty"
versions = "[0.7.0.RELEASE,)"

View File

@ -22,6 +22,9 @@ import io.opentelemetry.context.propagation.TextMapPropagator.Setter;
import io.opentelemetry.instrumentation.api.tracer.HttpClientTracer;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.net.URI;
import java.util.List;
import org.springframework.web.reactive.function.client.ClientRequest;
@ -32,6 +35,8 @@ public class SpringWebfluxHttpClientTracer
public static final SpringWebfluxHttpClientTracer TRACER = new SpringWebfluxHttpClientTracer();
private static final MethodHandle RAW_STATUS_CODE = findRawStatusCode();
public void onCancel(Span span) {
span.setAttribute("event", "cancelled");
span.setAttribute("message", "The subscription was cancelled");
@ -49,6 +54,15 @@ public class SpringWebfluxHttpClientTracer
@Override
protected Integer status(ClientResponse httpResponse) {
if (RAW_STATUS_CODE != null) {
// rawStatusCode() method was introduced in webflux 5.1
try {
return (int) RAW_STATUS_CODE.invokeExact(httpResponse);
} catch (Throwable ignored) {
}
}
// prior to webflux 5.1, the best we can get is HttpStatus enum, which only covers standard
// status codes
return httpResponse.statusCode().value();
}
@ -76,4 +90,16 @@ public class SpringWebfluxHttpClientTracer
public Tracer getTracer() {
return tracer;
}
// rawStatusCode() method was introduced in webflux 5.1
// prior to this method, the best we can get is HttpStatus enum, which only covers standard status
// codes (see usage above)
private static MethodHandle findRawStatusCode() {
try {
return MethodHandles.publicLookup()
.findVirtual(ClientResponse.class, "rawStatusCode", MethodType.methodType(int.class));
} catch (IllegalAccessException | NoSuchMethodException e) {
return null;
}
}
}

View File

@ -34,6 +34,10 @@ import net.bytebuddy.matcher.ElementMatcher;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
/**
* This instrumentation adds the HandlerMappingResourceNameFilter definition to the spring context
* When the context is created, the filter will be added to the beginning of the filter chain
*/
@AutoService(Instrumenter.class)
public class WebApplicationContextInstrumentation extends Instrumenter.Default {
public WebApplicationContextInstrumentation() {

View File

@ -16,6 +16,7 @@
package test.filter
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.ExceptionHandler
@ -25,45 +26,49 @@ import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.ResponseBody
import org.springframework.web.servlet.view.RedirectView
/**
* None of the methods in this controller should be called because they are intercepted
* by the filter
*/
@Controller
class TestController {
@RequestMapping("/success")
@ResponseBody
String success() {
// stub to test that route is captured when intercepted by filter
throw new Exception("This should not be called")
}
@RequestMapping("/query")
@ResponseBody
String query_param(@RequestParam("some") String param) {
// stub to test that route is captured when intercepted by filter
throw new Exception("This should not be called")
}
@RequestMapping("/path/{id}/param")
@ResponseBody
String path_param(@PathVariable Integer id) {
// stub to test that route is captured when intercepted by filter
throw new Exception("This should not be called")
}
@RequestMapping("/redirect")
@ResponseBody
RedirectView redirect() {
// stub to test that route is captured when intercepted by filter
throw new Exception("This should not be called")
}
@RequestMapping("/error-status")
ResponseEntity error() {
// stub to test that route is captured when intercepted by filter
throw new Exception("This should not be called")
}
@RequestMapping("/exception")
ResponseEntity exception() {
// stub to test that route is captured when intercepted by filter
throw new Exception("This should not be called")
}
@ExceptionHandler
ResponseEntity handleException(Throwable throwable) {
// stub to test that route is captured when intercepted by filter
new ResponseEntity(throwable.message, HttpStatus.INTERNAL_SERVER_ERROR)
}
}

View File

@ -105,6 +105,7 @@ public class AgentInstaller {
new AgentBuilder.Default()
.disableClassFormatChanges()
.with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
.with(AgentBuilder.RedefinitionStrategy.DiscoveryStrategy.Reiterating.INSTANCE)
.with(AgentBuilder.DescriptionStrategy.Default.POOL_ONLY)
.with(AgentTooling.poolStrategy())
.with(new ClassLoadListener())

View File

@ -67,6 +67,9 @@ public final class ClassLoaderMatcher {
// Don't skip bootstrap loader
return false;
}
if (canSkipClassLoaderByName(cl)) {
return true;
}
Boolean v = skipCache.getIfPresent(cl);
if (v != null) {
return v;
@ -79,12 +82,12 @@ public final class ClassLoaderMatcher {
// and we don't want to introduce the concept of the tooling code depending on whether or not
// a particular instrumentation is active (mainly because this particular use case doesn't
// seem to justify introducing either of these new concepts)
v = shouldSkipClass(cl) || !delegatesToBootstrap(cl);
v = !delegatesToBootstrap(cl);
skipCache.put(cl, v);
return v;
}
private static boolean shouldSkipClass(ClassLoader loader) {
private static boolean canSkipClassLoaderByName(ClassLoader loader) {
switch (loader.getClass().getName()) {
case "org.codehaus.groovy.runtime.callsite.CallSiteClassLoader":
case "sun.reflect.DelegatingClassLoader":

View File

@ -30,6 +30,7 @@ import java.util.Set;
import net.bytebuddy.jar.asm.ClassReader;
import net.bytebuddy.jar.asm.ClassVisitor;
import net.bytebuddy.jar.asm.FieldVisitor;
import net.bytebuddy.jar.asm.Handle;
import net.bytebuddy.jar.asm.Label;
import net.bytebuddy.jar.asm.MethodVisitor;
import net.bytebuddy.jar.asm.Opcodes;
@ -374,6 +375,49 @@ public class ReferenceCreator extends ClassVisitor {
super.visitMethodInsn(opcode, owner, name, descriptor, isInterface);
}
@Override
public void visitTypeInsn(int opcode, String type) {
Type typeObj = underlyingType(Type.getObjectType(type));
if (typeObj.getSort() == Type.OBJECT) {
addReference(
new Reference.Builder(typeObj.getInternalName())
.withSource(refSourceClassName, currentLineNumber)
.withFlag(computeMinimumClassAccess(refSourceType, typeObj))
.build());
}
super.visitTypeInsn(opcode, type);
}
@Override
public void visitInvokeDynamicInsn(
String name,
String descriptor,
Handle bootstrapMethodHandle,
Object... bootstrapMethodArguments) {
// This part might be unnecessary...
addReference(
new Reference.Builder(bootstrapMethodHandle.getOwner())
.withSource(refSourceClassName, currentLineNumber)
.withFlag(
computeMinimumClassAccess(
refSourceType, Type.getObjectType(bootstrapMethodHandle.getOwner())))
.build());
for (Object arg : bootstrapMethodArguments) {
if (arg instanceof Handle) {
Handle handle = (Handle) arg;
addReference(
new Reference.Builder(handle.getOwner())
.withSource(refSourceClassName, currentLineNumber)
.withFlag(
computeMinimumClassAccess(
refSourceType, Type.getObjectType(handle.getOwner())))
.build());
}
}
super.visitInvokeDynamicInsn(
name, descriptor, bootstrapMethodHandle, bootstrapMethodArguments);
}
@Override
public void visitLdcInsn(Object value) {
if (value instanceof Type) {

View File

@ -51,7 +51,6 @@ include ':utils:test-utils'
// smoke tests
include ':smoke-tests'
// instrumentation:
include ':instrumentation:akka-context-propagation-2.5'
include ':instrumentation:akka-http-10.0'
include ':instrumentation:apache-httpasyncclient-4.0'
@ -86,6 +85,7 @@ include ':instrumentation:google-http-client-1.19'
include ':instrumentation:grizzly-2.0'
include ':instrumentation:grizzly-client-1.9'
include ':instrumentation:grpc-1.5'
include ':instrumentation:guava-10.0'
include ':instrumentation:hibernate:hibernate-3.3'
include ':instrumentation:hibernate:hibernate-4.0'
include ':instrumentation:hibernate:hibernate-4.3'
@ -98,7 +98,6 @@ include ':instrumentation:java-classloader:osgi-testing'
include ':instrumentation:java-classloader:tomcat-testing'
include ':instrumentation:java-concurrent'
include ':instrumentation:java-concurrent:kotlin-testing'
include ':instrumentation:java-concurrent:scala-testing'
include ':instrumentation:jaxrs:jaxrs-1.0'
include ':instrumentation:jaxrs:jaxrs-2.0'
include ':instrumentation:jaxrs:jaxrs-2.0:jaxrs-2.0-jersey-2.0'
@ -148,6 +147,8 @@ include ':instrumentation:rediscala-1.8'
include ':instrumentation:redisson-3.0'
include ':instrumentation:rmi'
include ':instrumentation:rxjava-1.0'
include ':instrumentation:rxjava-1.0'
include ':instrumentation:scala-concurrent'
include ':instrumentation:servlet:glassfish-testing'
include ':instrumentation:servlet:servlet-common'
include ':instrumentation:servlet:servlet-2.2'

View File

@ -0,0 +1,157 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.test.base
import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.auto.test.AgentTestRunner
// TODO: add a test for a longer chain of promises
abstract class AbstractPromiseTest<P, M> extends AgentTestRunner {
abstract P newPromise()
abstract M map(P promise, Closure<String> callback)
abstract void onComplete(M promise, Closure callback)
abstract void complete(P promise, boolean value)
abstract Boolean get(P promise)
def "test call with parent"() {
setup:
def promise = newPromise()
when:
runUnderTrace("parent") {
def mapped = map(promise) { "$it" }
onComplete(mapped) {
assert it == "$value"
runUnderTrace("callback") {}
}
runUnderTrace("other") {
complete(promise, value)
}
}
then:
get(promise) == value
assertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "parent")
basicSpan(it, 1, "other", it.span(0))
basicSpan(it, 2, "callback", it.span(0))
}
}
where:
value << [true, false]
}
def "test call with parent delayed complete"() {
setup:
def promise = newPromise()
when:
runUnderTrace("parent") {
def mapped = map(promise) { "$it" }
onComplete(mapped) {
assert it == "$value"
runUnderTrace("callback") {}
}
}
runUnderTrace("other") {
complete(promise, value)
}
then:
get(promise) == value
assertTraces(2) {
trace(0, 2) {
basicSpan(it, 0, "parent")
basicSpan(it, 1, "callback", span(0))
}
trace(1, 1) {
basicSpan(it, 0, "other")
}
}
where:
value << [true, false]
}
def "test call with parent complete separate thread"() {
setup:
final promise = newPromise()
when:
runUnderTrace("parent") {
def mapped = map(promise) { "$it" }
onComplete(mapped) {
assert it == "$value"
runUnderTrace("callback") {}
}
Thread.start {
complete(promise, value)
}.join()
}
then:
get(promise) == value
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent")
basicSpan(it, 1, "callback", it.span(0))
}
}
where:
value << [true, false]
}
def "test call with no parent"() {
setup:
def promise = newPromise()
when:
def mapped = map(promise) { "$it" }
onComplete(mapped) {
assert it == "$value"
runUnderTrace("callback") {}
}
runUnderTrace("other") {
complete(promise, value)
}
then:
get(promise) == value
assertTraces(1) {
trace(0, 2) {
// TODO: is this really the behavior we want?
basicSpan(it, 0, "other")
basicSpan(it, 1, "callback", it.span(0))
}
}
where:
value << [true, false]
}
}

View File

@ -22,6 +22,7 @@ import static muzzle.TestClasses.MethodBodyAdvice
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.javaagent.tooling.muzzle.Reference
import io.opentelemetry.javaagent.tooling.muzzle.ReferenceCreator
import spock.lang.Ignore
class ReferenceCreatorTest extends AgentTestRunner {
def "method body creates references"() {
@ -81,6 +82,25 @@ class ReferenceCreatorTest extends AgentTestRunner {
references.get('muzzle.TestClasses$MethodBodyAdvice$A') != null
}
def "instanceof creates references"() {
setup:
Map<String, Reference> references = ReferenceCreator.createReferencesFrom(TestClasses.InstanceofAdvice.getName(), this.getClass().getClassLoader())
expect:
references.get('muzzle.TestClasses$MethodBodyAdvice$A') != null
}
// TODO: remove ignore when we drop java 7 support.
@Ignore
def "invokedynamic creates references"() {
setup:
Map<String, Reference> references = ReferenceCreator.createReferencesFrom(TestClasses.InDyAdvice.getName(), this.getClass().getClassLoader())
expect:
references.get('muzzle.TestClasses$MethodBodyAdvice$SomeImplementation') != null
references.get('muzzle.TestClasses$MethodBodyAdvice$B') != null
}
private static Reference.Method findMethod(Set<Reference.Method> methods, String methodName, String methodDesc) {
for (Reference.Method method : methods) {
if (method == new Reference.Method(methodName, methodDesc)) {

View File

@ -36,7 +36,7 @@ public class TestClasses {
public static class A {
public B b = new B();
protected Object protectedField = null;
private Object privateField = null;
private final Object privateField = null;
public static B staticB = new B();
}
@ -90,4 +90,19 @@ public class TestClasses {
MethodBodyAdvice.A.class.getName();
}
}
public static class InstanceofAdvice {
public static boolean instanceofMethod(Object a) {
return a instanceof MethodBodyAdvice.A;
}
}
// TODO Can't test this until java 7 is dropped.
public static class InDyAdvice {
// public static MethodBodyAdvice.SomeInterface indyMethod(
// final MethodBodyAdvice.SomeImplementation a) {
// Runnable aStaticMethod = MethodBodyAdvice.B::aStaticMethod;
// return a::someMethod;
// }
}
}