Instrument HystrixCommand and HystrixThreadPool
This commit is contained in:
parent
26d36287c2
commit
be34eaf032
|
@ -0,0 +1,29 @@
|
|||
apply plugin: 'version-scan'
|
||||
|
||||
versionScan {
|
||||
group = "com.netflix.hystrix"
|
||||
module = 'hystrix-core'
|
||||
versions = "[1.3.15,)"
|
||||
scanMethods = true
|
||||
verifyPresent = [
|
||||
"com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler\$ThreadPoolWorker": "schedule",
|
||||
]
|
||||
}
|
||||
|
||||
apply from: "${rootDir}/gradle/java.gradle"
|
||||
|
||||
dependencies {
|
||||
// compileOnly group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.5.12'
|
||||
|
||||
compile project(':dd-trace-ot')
|
||||
compile project(':dd-java-agent:agent-tooling')
|
||||
|
||||
compile deps.bytebuddy
|
||||
compile deps.opentracing
|
||||
compile deps.autoservice
|
||||
|
||||
testCompile project(':dd-java-agent:testing')
|
||||
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
|
||||
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
|
||||
testCompile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0'
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
package datadog.trace.instrumentation.hystrix;
|
||||
|
||||
import static io.opentracing.log.Fields.ERROR_OBJECT;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.not;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.DDAdvice;
|
||||
import datadog.trace.agent.tooling.DDTransformers;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.tag.Tags;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collections;
|
||||
import net.bytebuddy.agent.builder.AgentBuilder;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
|
||||
@AutoService(Instrumenter.class)
|
||||
public class HystrixCommandInstrumentation extends Instrumenter.Configurable {
|
||||
|
||||
public HystrixCommandInstrumentation() {
|
||||
super("hystrix");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean defaultEnabled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AgentBuilder apply(final AgentBuilder agentBuilder) {
|
||||
return agentBuilder
|
||||
.type(not(isInterface()).and(hasSuperType(named("com.netflix.hystrix.HystrixCommand"))))
|
||||
// Not adding a version restriction because this should work with any version and add some benefit.
|
||||
.transform(DDTransformers.defaultTransformers())
|
||||
.transform(
|
||||
DDAdvice.create()
|
||||
.advice(
|
||||
isMethod().and(named("run").or(named("getFallback"))),
|
||||
TraceAdvice.class.getName()))
|
||||
.asDecorator();
|
||||
}
|
||||
|
||||
public static class TraceAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static Scope startSpan(@Advice.Origin final Method method) {
|
||||
final Class<?> declaringClass = method.getDeclaringClass();
|
||||
String className = declaringClass.getSimpleName();
|
||||
if (className.isEmpty()) {
|
||||
className = declaringClass.getName();
|
||||
if (declaringClass.getPackage() != null) {
|
||||
final String pkgName = declaringClass.getPackage().getName();
|
||||
if (!pkgName.isEmpty()) {
|
||||
className = declaringClass.getName().replace(pkgName, "").substring(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
final String operationName = className + "." + method.getName();
|
||||
|
||||
return GlobalTracer.get().buildSpan(operationName).startActive(true);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void stopSpan(
|
||||
@Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) {
|
||||
if (throwable != null) {
|
||||
final Span span = scope.span();
|
||||
Tags.ERROR.set(span, true);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
|
||||
}
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
package datadog.trace.instrumentation.hystrix;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.DDAdvice;
|
||||
import datadog.trace.agent.tooling.DDTransformers;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import net.bytebuddy.agent.builder.AgentBuilder;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
|
||||
@AutoService(Instrumenter.class)
|
||||
public class HystrixThreadPoolInstrumentation extends Instrumenter.Configurable {
|
||||
|
||||
public HystrixThreadPoolInstrumentation() {
|
||||
super("hystrix");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean defaultEnabled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AgentBuilder apply(final AgentBuilder agentBuilder) {
|
||||
return agentBuilder
|
||||
.type(
|
||||
named(
|
||||
"com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker"))
|
||||
// Not adding check for classes on the classpath because this is the only class we need.
|
||||
.transform(DDTransformers.defaultTransformers())
|
||||
.transform(
|
||||
DDAdvice.create()
|
||||
.advice(
|
||||
isMethod().and(named("schedule")).and(takesArguments(1)),
|
||||
EnableAsyncAdvice.class.getName()))
|
||||
.asDecorator();
|
||||
}
|
||||
|
||||
public static class EnableAsyncAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static boolean enableAsyncTracking() {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (scope instanceof TraceScope) {
|
||||
if (!((TraceScope) scope).isAsyncPropagating()) {
|
||||
((TraceScope) scope).setAsyncPropagation(true);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void disableAsyncTracking(@Advice.Enter final boolean wasEnabled) {
|
||||
if (wasEnabled) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,163 @@
|
|||
import com.netflix.hystrix.HystrixCommand
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.api.Trace
|
||||
import spock.lang.Unroll
|
||||
|
||||
import java.util.concurrent.BlockingQueue
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
|
||||
import static com.netflix.hystrix.HystrixCommandGroupKey.Factory.asKey
|
||||
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
|
||||
import static datadog.trace.agent.test.TestUtils.runUnderTrace
|
||||
|
||||
class HystrixTest extends AgentTestRunner {
|
||||
static {
|
||||
System.setProperty("dd.integration.hystrix.enabled", "true")
|
||||
// Uncomment for debugging:
|
||||
// System.setProperty("hystrix.command.default.execution.timeout.enabled", "false")
|
||||
}
|
||||
|
||||
@Unroll
|
||||
def "test command #action"() {
|
||||
setup:
|
||||
def command = new HystrixCommand(asKey("ExampleGroup")) {
|
||||
@Override
|
||||
protected Object run() throws Exception {
|
||||
return tracedMethod()
|
||||
}
|
||||
|
||||
@Trace
|
||||
private String tracedMethod() {
|
||||
return "Hello!"
|
||||
}
|
||||
}
|
||||
def result = runUnderTrace("parent") {
|
||||
operation(command)
|
||||
}
|
||||
expect:
|
||||
result == "Hello!"
|
||||
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 3) {
|
||||
span(0) {
|
||||
serviceName "unnamed-java-app"
|
||||
operationName "parent"
|
||||
resourceName "parent"
|
||||
spanType null
|
||||
parent()
|
||||
errored false
|
||||
tags {
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
serviceName "unnamed-java-app"
|
||||
operationName "HystrixTest\$1.run"
|
||||
resourceName "HystrixTest\$1.run"
|
||||
spanType null
|
||||
childOf span(0)
|
||||
errored false
|
||||
tags {
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
serviceName "unnamed-java-app"
|
||||
operationName "HystrixTest\$1.tracedMethod"
|
||||
resourceName "HystrixTest\$1.tracedMethod"
|
||||
spanType null
|
||||
childOf span(1)
|
||||
errored false
|
||||
tags {
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
where:
|
||||
action | operation
|
||||
"execute" | { HystrixCommand cmd -> cmd.execute() }
|
||||
"queue" | { HystrixCommand cmd -> cmd.queue().get() }
|
||||
"observe" | { HystrixCommand cmd -> cmd.observe().toBlocking().first() }
|
||||
"observe" | { HystrixCommand cmd ->
|
||||
BlockingQueue queue = new LinkedBlockingQueue()
|
||||
cmd.observe().subscribe { next ->
|
||||
queue.put(next)
|
||||
}
|
||||
queue.poll()
|
||||
}
|
||||
}
|
||||
|
||||
@Unroll
|
||||
def "test command #action fallback"() {
|
||||
setup:
|
||||
def command = new HystrixCommand(asKey("ExampleGroup")) {
|
||||
@Override
|
||||
protected Object run() throws Exception {
|
||||
throw new IllegalArgumentException()
|
||||
}
|
||||
|
||||
protected String getFallback() {
|
||||
return "Fallback!"
|
||||
}
|
||||
}
|
||||
def result = runUnderTrace("parent") {
|
||||
operation(command)
|
||||
}
|
||||
expect:
|
||||
result == "Fallback!"
|
||||
|
||||
assertTraces(TEST_WRITER, 1) {
|
||||
trace(0, 3) {
|
||||
span(0) {
|
||||
serviceName "unnamed-java-app"
|
||||
operationName "parent"
|
||||
resourceName "parent"
|
||||
spanType null
|
||||
parent()
|
||||
errored false
|
||||
tags {
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
serviceName "unnamed-java-app"
|
||||
operationName "HystrixTest\$2.getFallback"
|
||||
resourceName "HystrixTest\$2.getFallback"
|
||||
spanType null
|
||||
childOf span(0)
|
||||
errored false
|
||||
tags {
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
serviceName "unnamed-java-app"
|
||||
operationName "HystrixTest\$2.run"
|
||||
resourceName "HystrixTest\$2.run"
|
||||
spanType null
|
||||
childOf span(0)
|
||||
errored true
|
||||
tags {
|
||||
errorTags(IllegalArgumentException)
|
||||
defaultTags()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
where:
|
||||
action | operation
|
||||
"execute" | { HystrixCommand cmd -> cmd.execute() }
|
||||
"queue" | { HystrixCommand cmd -> cmd.queue().get() }
|
||||
"observe" | { HystrixCommand cmd -> cmd.observe().toBlocking().first() }
|
||||
"observe" | { HystrixCommand cmd ->
|
||||
BlockingQueue queue = new LinkedBlockingQueue()
|
||||
cmd.observe().subscribe { next ->
|
||||
queue.put(next)
|
||||
}
|
||||
queue.poll()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package datadog.trace.agent.test
|
||||
|
||||
import datadog.trace.common.writer.ListWriter
|
||||
|
||||
import static datadog.trace.agent.test.TraceAssert.assertTrace
|
||||
|
||||
class ListWriterAssert {
|
||||
private final ListWriter writer
|
||||
private final int size
|
||||
private final Set<Integer> assertedIndexes = new HashSet<>()
|
||||
|
||||
private ListWriterAssert(writer) {
|
||||
this.writer = writer
|
||||
size = writer.size()
|
||||
}
|
||||
|
||||
static ListWriterAssert assertTraces(ListWriter writer, int expectedSize,
|
||||
@DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
|
||||
writer.waitForTraces(expectedSize)
|
||||
assert writer.size() == expectedSize
|
||||
def asserter = new ListWriterAssert(writer)
|
||||
def clone = (Closure) spec.clone()
|
||||
clone.delegate = asserter
|
||||
clone.resolveStrategy = Closure.DELEGATE_FIRST
|
||||
clone(asserter)
|
||||
asserter.assertTracesAllVerified()
|
||||
asserter
|
||||
}
|
||||
|
||||
TraceAssert trace(int index, int expectedSize,
|
||||
@DelegatesTo(value = TraceAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
|
||||
if (index >= size) {
|
||||
throw new ArrayIndexOutOfBoundsException(index)
|
||||
}
|
||||
if (writer.size() != size) {
|
||||
throw new ConcurrentModificationException("ListWriter modified during assertion")
|
||||
}
|
||||
assertedIndexes.add(index)
|
||||
assertTrace(writer.get(index), expectedSize, spec)
|
||||
}
|
||||
|
||||
void assertTracesAllVerified() {
|
||||
assert assertedIndexes.size() == size
|
||||
}
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
package datadog.trace.agent.test
|
||||
|
||||
import datadog.opentracing.DDSpan
|
||||
|
||||
import static datadog.trace.agent.test.TagsAssert.assertTags
|
||||
|
||||
class SpanAssert {
|
||||
private final DDSpan span
|
||||
|
||||
private SpanAssert(span) {
|
||||
this.span = span
|
||||
}
|
||||
|
||||
static SpanAssert assertSpan(DDSpan span,
|
||||
@DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
|
||||
def asserter = new SpanAssert(span)
|
||||
def clone = (Closure) spec.clone()
|
||||
clone.delegate = asserter
|
||||
clone.resolveStrategy = Closure.DELEGATE_FIRST
|
||||
clone(asserter)
|
||||
asserter
|
||||
}
|
||||
|
||||
def serviceName(String name) {
|
||||
assert span.serviceName == name
|
||||
}
|
||||
|
||||
def operationName(String name) {
|
||||
assert span.operationName == name
|
||||
}
|
||||
|
||||
def resourceName(String name) {
|
||||
assert span.resourceName == name
|
||||
}
|
||||
|
||||
def spanType(String type) {
|
||||
assert span.spanType == type
|
||||
}
|
||||
|
||||
def parent() {
|
||||
assert span.parentId == 0
|
||||
}
|
||||
|
||||
def childOf(DDSpan parent) {
|
||||
assert span.parentId == parent.spanId
|
||||
assert span.traceId == parent.traceId
|
||||
}
|
||||
|
||||
def errored(boolean errored) {
|
||||
assert span.isError() == errored
|
||||
}
|
||||
|
||||
def tags(@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
|
||||
return assertTags(span, spec)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package datadog.trace.agent.test
|
||||
|
||||
import datadog.opentracing.DDSpan
|
||||
|
||||
class TagsAssert {
|
||||
private final Map<String, Object> tags
|
||||
private final Set<String> assertedTags = new HashSet<>()
|
||||
|
||||
private TagsAssert(DDSpan span) {
|
||||
this.tags = span.tags
|
||||
}
|
||||
|
||||
static TagsAssert assertTags(DDSpan span,
|
||||
@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
|
||||
def asserter = new TagsAssert(span)
|
||||
def clone = (Closure) spec.clone()
|
||||
clone.delegate = asserter
|
||||
clone.resolveStrategy = Closure.DELEGATE_FIRST
|
||||
clone(asserter)
|
||||
asserter.assertTracesAllVerified()
|
||||
asserter
|
||||
}
|
||||
|
||||
def defaultTags() {
|
||||
assertedTags.add("thread.name")
|
||||
assertedTags.add("thread.id")
|
||||
|
||||
tags["thread.name"] != null
|
||||
tags["thread.id"] != null
|
||||
}
|
||||
|
||||
def errorTags(Class<Throwable> errorType) {
|
||||
assertedTags.add("error")
|
||||
assertedTags.add("error.type")
|
||||
assertedTags.add("error.stack")
|
||||
|
||||
tags["error"] == true
|
||||
tags["error.type"] == errorType
|
||||
tags["error.stack"] instanceof String
|
||||
}
|
||||
|
||||
def methodMissing(String name, args) {
|
||||
if (args.length > 1) {
|
||||
throw new IllegalArgumentException(args)
|
||||
}
|
||||
assert tags[name] == args[0]
|
||||
}
|
||||
|
||||
void assertTracesAllVerified() {
|
||||
assert tags.keySet() == assertedTags
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package datadog.trace.agent.test
|
||||
|
||||
import datadog.opentracing.DDSpan
|
||||
|
||||
import static datadog.trace.agent.test.SpanAssert.assertSpan
|
||||
|
||||
class TraceAssert {
|
||||
private final List<DDSpan> trace
|
||||
private final int size
|
||||
private final Set<Integer> assertedIndexes = new HashSet<>()
|
||||
|
||||
private TraceAssert(trace) {
|
||||
this.trace = trace
|
||||
size = trace.size()
|
||||
}
|
||||
|
||||
static TraceAssert assertTrace(List<DDSpan> trace, int expectedSize,
|
||||
@DelegatesTo(value = File, strategy = Closure.DELEGATE_FIRST) Closure spec) {
|
||||
assert trace.size() == expectedSize
|
||||
def asserter = new TraceAssert(trace)
|
||||
def clone = (Closure) spec.clone()
|
||||
clone.delegate = asserter
|
||||
clone.resolveStrategy = Closure.DELEGATE_FIRST
|
||||
clone(asserter)
|
||||
asserter.assertTracesAllVerified()
|
||||
asserter
|
||||
}
|
||||
|
||||
DDSpan span(int index) {
|
||||
trace.get(index)
|
||||
}
|
||||
|
||||
SpanAssert span(int index, @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
|
||||
if (index >= size) {
|
||||
throw new ArrayIndexOutOfBoundsException(index)
|
||||
}
|
||||
if (trace.size() != size) {
|
||||
throw new ConcurrentModificationException("Trace modified during assertion")
|
||||
}
|
||||
assertedIndexes.add(index)
|
||||
assertSpan(trace.get(index), spec)
|
||||
}
|
||||
|
||||
void assertTracesAllVerified() {
|
||||
assert assertedIndexes.size() == size
|
||||
}
|
||||
}
|
|
@ -11,6 +11,8 @@ dependencies {
|
|||
compile project(':dd-trace-ot')
|
||||
compile project(':dd-java-agent:agent-tooling')
|
||||
|
||||
compile deps.groovy
|
||||
|
||||
// test instrumenting java 1.1 bytecode
|
||||
testCompile group: 'net.sf.jt400', name: 'jt400', version: '6.1'
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ include ':dd-java-agent:instrumentation:apache-httpclient-4.3'
|
|||
include ':dd-java-agent:instrumentation:aws-sdk'
|
||||
include ':dd-java-agent:instrumentation:classloaders'
|
||||
include ':dd-java-agent:instrumentation:datastax-cassandra-3.2'
|
||||
include ':dd-java-agent:instrumentation:hystrix-1.3.15'
|
||||
include ':dd-java-agent:instrumentation:jax-rs-annotations'
|
||||
include ':dd-java-agent:instrumentation:jax-rs-client'
|
||||
include ':dd-java-agent:instrumentation:java-concurrent'
|
||||
|
|
Loading…
Reference in New Issue