From 6c445ad03038a55ded4f1d2a7a2ce5c342d8d24b Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Wed, 11 Sep 2019 19:23:46 -0400 Subject: [PATCH] Extract RxJava instrumentation from Hystrix and add to Couchbase --- .../couchbase-2.0/couchbase-2.0.gradle | 4 +- .../CouchbaseBucketInstrumentation.java | 100 +--------- .../CouchbaseClusterInstrumentation.java | 96 +-------- .../client/CouchbaseOnSubscribe.java | 36 ++++ .../groovy/CouchbaseAsyncClientTest.groovy | 22 ++- .../test/groovy/CouchbaseClientTest.groovy | 39 ++++ .../hystrix-1.4/hystrix-1.4.gradle | 3 + .../hystrix/HystrixInstrumentation.java | 187 ++---------------- .../instrumentation/rxjava-1/rxjava-1.gradle | 13 ++ .../rxjava/SpanFinishingSubscription.java | 31 +++ .../rxjava/TracedOnSubscribe.java | 58 ++++++ .../rxjava/TracedSubscriber.java | 109 ++++++++++ .../src/main/java/rx/DDTracingUtil.java | 0 settings.gradle | 1 + 14 files changed, 337 insertions(+), 362 deletions(-) create mode 100644 dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java create mode 100644 dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle create mode 100644 dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java create mode 100644 dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java create mode 100644 dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java rename dd-java-agent/instrumentation/{hystrix-1.4 => rxjava-1}/src/main/java/rx/DDTracingUtil.java (100%) diff --git a/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle b/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle index 9586923606..0aeb5d3960 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle +++ b/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle @@ -44,8 +44,10 @@ muzzle { } dependencies { + compile project(':dd-java-agent:instrumentation:rxjava-1') + compileOnly group: 'com.couchbase.client', name: 'java-client', version: '2.0.0' - + testCompile group: 'com.couchbase.mock', name: 'CouchbaseMock', version: '1.5.19' testCompile group: 'org.springframework.data', name: 'spring-data-couchbase', version: '2.0.0.RELEASE' diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java index 50ed987a31..770bcc0df2 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java @@ -1,6 +1,5 @@ package datadog.trace.instrumentation.couchbase.client; -import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -12,22 +11,14 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import com.couchbase.client.java.CouchbaseCluster; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDTags; import datadog.trace.bootstrap.CallDepthThreadLocalMap; -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.noop.NoopSpan; -import io.opentracing.util.GlobalTracer; import java.lang.reflect.Method; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import rx.Observable; -import rx.functions.Action0; -import rx.functions.Action1; @AutoService(Instrumenter.class) public class CouchbaseBucketInstrumentation extends Instrumenter.Default { @@ -47,13 +38,15 @@ public class CouchbaseBucketInstrumentation extends Instrumenter.Default { @Override public String[] helperClassNames() { return new String[] { + "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.DatabaseClientDecorator", + "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", + "datadog.trace.instrumentation.rxjava.TracedSubscriber", + "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".CouchbaseClientDecorator", - getClass().getName() + "$TraceSpanStart", - getClass().getName() + "$TraceSpanFinish", - getClass().getName() + "$TraceSpanError", + packageName + ".CouchbaseOnSubscribe", }; } @@ -76,94 +69,13 @@ public class CouchbaseBucketInstrumentation extends Instrumenter.Default { @Advice.Enter final int callDepth, @Advice.Origin final Method method, @Advice.FieldValue("bucket") final String bucket, - @Advice.AllArguments final Object[] args, @Advice.Return(readOnly = false) Observable result) { if (callDepth > 0) { return; } CallDepthThreadLocalMap.reset(CouchbaseCluster.class); - final AtomicReference spanRef = new AtomicReference<>(); - result = - result - .doOnSubscribe(new TraceSpanStart(method, bucket, spanRef)) - .doOnCompleted(new TraceSpanFinish(spanRef)) - .doOnError(new TraceSpanError(spanRef)); - } - } - public static class TraceSpanStart implements Action0 { - private final Method method; - private final String bucket; - private final AtomicReference spanRef; - - public TraceSpanStart( - final Method method, final String bucket, final AtomicReference spanRef) { - this.method = method; - this.bucket = bucket; - this.spanRef = spanRef; - } - - @Override - public void call() { - // This is called each time an observer has a new subscriber, but we should only time it once. - if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) { - return; - } - final Class declaringClass = method.getDeclaringClass(); - final String className = - declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); - final String resourceName = className + "." + method.getName(); - - final Span span = - GlobalTracer.get() - .buildSpan("couchbase.call") - .withTag(DDTags.RESOURCE_NAME, resourceName) - .withTag("bucket", bucket) - .start(); - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - // just replace the no-op span. - spanRef.set(DECORATE.afterStart(span)); - } - } - } - - public static class TraceSpanFinish implements Action0 { - private final AtomicReference spanRef; - - public TraceSpanFinish(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call() { - final Span span = spanRef.getAndSet(null); - - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - } - } - - public static class TraceSpanError implements Action1 { - private final AtomicReference spanRef; - - public TraceSpanError(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call(final Throwable throwable) { - final Span span = spanRef.getAndSet(null); - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.finish(); - } - } + result = Observable.create(new CouchbaseOnSubscribe(result, method, bucket)); } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java index b5549d014a..e9b555e641 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java @@ -1,6 +1,5 @@ package datadog.trace.instrumentation.couchbase.client; -import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -12,22 +11,14 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import com.couchbase.client.java.CouchbaseCluster; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDTags; import datadog.trace.bootstrap.CallDepthThreadLocalMap; -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.noop.NoopSpan; -import io.opentracing.util.GlobalTracer; import java.lang.reflect.Method; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import rx.Observable; -import rx.functions.Action0; -import rx.functions.Action1; @AutoService(Instrumenter.class) public class CouchbaseClusterInstrumentation extends Instrumenter.Default { @@ -47,13 +38,15 @@ public class CouchbaseClusterInstrumentation extends Instrumenter.Default { @Override public String[] helperClassNames() { return new String[] { + "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.DatabaseClientDecorator", + "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", + "datadog.trace.instrumentation.rxjava.TracedSubscriber", + "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".CouchbaseClientDecorator", - getClass().getName() + "$TraceSpanStart", - getClass().getName() + "$TraceSpanFinish", - getClass().getName() + "$TraceSpanError", + packageName + ".CouchbaseOnSubscribe", }; } @@ -80,84 +73,7 @@ public class CouchbaseClusterInstrumentation extends Instrumenter.Default { return; } CallDepthThreadLocalMap.reset(CouchbaseCluster.class); - final AtomicReference spanRef = new AtomicReference<>(); - result = - result - .doOnSubscribe(new TraceSpanStart(method, spanRef)) - .doOnCompleted(new TraceSpanFinish(spanRef)) - .doOnError(new TraceSpanError(spanRef)); - } - } - - public static class TraceSpanStart implements Action0 { - private final Method method; - private final AtomicReference spanRef; - - public TraceSpanStart(final Method method, final AtomicReference spanRef) { - this.method = method; - this.spanRef = spanRef; - } - - @Override - public void call() { - // This is called each time an observer has a new subscriber, but we should only time it once. - if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) { - return; - } - final Class declaringClass = method.getDeclaringClass(); - final String className = - declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); - final String resourceName = className + "." + method.getName(); - - final Span span = - GlobalTracer.get() - .buildSpan("couchbase.call") - .withTag(DDTags.RESOURCE_NAME, resourceName) - .start(); - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - // just replace the no-op span. - spanRef.set(DECORATE.afterStart(scope.span())); - } - } - } - - public static class TraceSpanFinish implements Action0 { - private final AtomicReference spanRef; - - public TraceSpanFinish(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call() { - final Span span = spanRef.getAndSet(null); - - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - } - } - - public static class TraceSpanError implements Action1 { - private final AtomicReference spanRef; - - public TraceSpanError(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call(final Throwable throwable) { - final Span span = spanRef.getAndSet(null); - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.finish(); - } - } + result = Observable.create(new CouchbaseOnSubscribe(result, method, null)); } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java new file mode 100644 index 0000000000..b20fcee0ab --- /dev/null +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java @@ -0,0 +1,36 @@ +package datadog.trace.instrumentation.couchbase.client; + +import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; + +import datadog.trace.api.DDTags; +import datadog.trace.instrumentation.rxjava.TracedOnSubscribe; +import io.opentracing.Span; +import java.lang.reflect.Method; +import rx.Observable; + +public class CouchbaseOnSubscribe extends TracedOnSubscribe { + private final String resourceName; + private final String bucket; + + public CouchbaseOnSubscribe( + final Observable originalObservable, final Method method, final String bucket) { + super(originalObservable, "couchbase.call", DECORATE); + + final Class declaringClass = method.getDeclaringClass(); + final String className = + declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); + resourceName = className + "." + method.getName(); + this.bucket = bucket; + } + + @Override + protected void afterStart(final Span span) { + super.afterStart(span); + + span.setTag(DDTags.RESOURCE_NAME, resourceName); + + if (bucket != null) { + span.setTag("bucket", bucket); + } + } +} diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy index 40e6d85998..c8b8e3ca4c 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy @@ -42,6 +42,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> bkt.upsert(JsonDocument.create("helloworld", content)).subscribe({ result -> inserted.set(result) }) }) + + blockUntilChildSpansFinished(2) // Improve span ordering consistency } then: @@ -51,8 +53,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { trace(0, 3) { basicSpan(it, 0, "someTrace") - assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0)) - assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name(), span(0)) + assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 1, "Bucket.upsert", bucketSettings.name(), span(2)) } } @@ -81,6 +83,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { }) }) }) + + blockUntilChildSpansFinished(3) // Improve span ordering consistency } // Create a JSON document and store it with the ID "helloworld" @@ -91,10 +95,10 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { assertTraces(1) { trace(0, 4) { basicSpan(it, 0, "someTrace") - - assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0)) - assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name(), span(0)) - assertCouchbaseCall(it, 0, "Bucket.get", bucketSettings.name(), span(0)) + + assertCouchbaseCall(it, 3, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(3)) + assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(2)) } } @@ -121,6 +125,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { .single() .subscribe({ row -> queryResult.set(row.value()) }) }) + + blockUntilChildSpansFinished(2) // Improve span ordering consistency } then: @@ -130,8 +136,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { trace(0, 3) { basicSpan(it, 0, "someTrace") - assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0)) - assertCouchbaseCall(it, 0, "Bucket.query", bucketSettings.name(), span(0)) + assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 1, "Bucket.query", bucketSettings.name(), span(2)) } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy index 604f79f1e5..abff11344b 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy @@ -4,6 +4,9 @@ import com.couchbase.client.java.document.json.JsonObject import com.couchbase.client.java.query.N1qlQuery import util.AbstractCouchbaseTest +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + class CouchbaseClientTest extends AbstractCouchbaseTest { def "test hasBucket #type"() { @@ -57,6 +60,42 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { type = bucketSettings.type().name() } + def "test upsert and get #type under trace"() { + when: + // Connect to the bucket and open it + Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) + + // Create a JSON document and store it with the ID "helloworld" + JsonObject content = JsonObject.create().put("hello", "world") + + def inserted + def found + + runUnderTrace("someTrace") { + inserted = bkt.upsert(JsonDocument.create("helloworld", content)) + found = bkt.get("helloworld") + } + + then: + found == inserted + found.content().getString("hello") == "world" + + assertTraces(1) { + trace(0, 3) { + basicSpan(it, 0, "someTrace") + assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(0)) + assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(0)) + } + } + + where: + manager | cluster | bucketSettings + couchbaseManager | couchbaseCluster | bucketCouchbase + memcacheManager | memcacheCluster | bucketMemcache + + type = bucketSettings.type().name() + } + def "test query"() { setup: Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) diff --git a/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle b/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle index 60ae7f6346..a2b6b8a474 100644 --- a/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle +++ b/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle @@ -17,11 +17,14 @@ testSets { } dependencies { + compile project(':dd-java-agent:instrumentation:rxjava-1') + compileOnly group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0' compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7' testCompile project(':dd-java-agent:instrumentation:java-concurrent') testCompile project(':dd-java-agent:instrumentation:trace-annotation') + testCompile group: 'io.reactivex', name: 'rxjava', version: '1.0.7' testCompile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0' diff --git a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java b/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java index dada399519..897ccce581 100644 --- a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java +++ b/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java @@ -8,23 +8,15 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import com.google.auto.service.AutoService; import com.netflix.hystrix.HystrixInvokableInfo; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.context.TraceScope; -import io.opentracing.Scope; -import io.opentracing.ScopeManager; +import datadog.trace.instrumentation.rxjava.TracedOnSubscribe; import io.opentracing.Span; -import io.opentracing.Tracer; -import io.opentracing.util.GlobalTracer; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import rx.DDTracingUtil; import rx.Observable; -import rx.Subscriber; -import rx.Subscription; @AutoService(Instrumenter.class) public class HystrixInstrumentation extends Instrumenter.Default { @@ -47,10 +39,11 @@ public class HystrixInstrumentation extends Instrumenter.Default { return new String[] { "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", + "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", + "datadog.trace.instrumentation.rxjava.TracedSubscriber", + "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".HystrixDecorator", - packageName + ".HystrixInstrumentation$SpanFinishingSubscription", - packageName + ".HystrixInstrumentation$TracedSubscriber", - packageName + ".HystrixInstrumentation$TracedOnSubscribe", + packageName + ".HystrixInstrumentation$HystrixOnSubscribe", }; } @@ -73,11 +66,8 @@ public class HystrixInstrumentation extends Instrumenter.Default { @Advice.This final HystrixInvokableInfo command, @Advice.Return(readOnly = false) Observable result, @Advice.Thrown final Throwable throwable) { - final Observable.OnSubscribe onSubscribe = DDTracingUtil.extractOnSubscribe(result); - result = - Observable.create( - new TracedOnSubscribe( - onSubscribe, command, "execute", GlobalTracer.get().scopeManager().active())); + + result = Observable.create(new HystrixOnSubscribe(result, command, "execute")); } } @@ -88,171 +78,30 @@ public class HystrixInstrumentation extends Instrumenter.Default { @Advice.This final HystrixInvokableInfo command, @Advice.Return(readOnly = false) Observable result, @Advice.Thrown final Throwable throwable) { - final Observable.OnSubscribe onSubscribe = DDTracingUtil.extractOnSubscribe(result); - result = - Observable.create( - new TracedOnSubscribe( - onSubscribe, command, "fallback", GlobalTracer.get().scopeManager().active())); + + result = Observable.create(new HystrixOnSubscribe(result, command, "fallback")); } } - public static class TracedOnSubscribe implements Observable.OnSubscribe { - - private final Observable.OnSubscribe delegate; + public static class HystrixOnSubscribe extends TracedOnSubscribe { private final HystrixInvokableInfo command; private final String methodName; - private final TraceScope.Continuation continuation; - public TracedOnSubscribe( - final Observable.OnSubscribe delegate, + public HystrixOnSubscribe( + final Observable originalObservable, final HystrixInvokableInfo command, - final String methodName, - final Scope parentScope) { - this.delegate = delegate; + final String methodName) { + super(originalObservable, OPERATION_NAME, DECORATE); + this.command = command; this.methodName = methodName; - continuation = - parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null; } @Override - public void call(final Subscriber subscriber) { - final Tracer tracer = GlobalTracer.get(); - final Span span; // span finished by TracedSubscriber - if (continuation != null) { - try (final TraceScope scope = continuation.activate()) { - span = tracer.buildSpan(OPERATION_NAME).start(); - } - } else { - span = tracer.buildSpan(OPERATION_NAME).start(); - } - DECORATE.afterStart(span); + protected void afterStart(final Span span) { + super.afterStart(span); + DECORATE.onCommand(span, command, methodName); - - try (final Scope scope = tracer.scopeManager().activate(span, false)) { - if (!((TraceScope) scope).isAsyncPropagating()) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.call(new TracedSubscriber(span, subscriber)); - } - } - } - - public static class TracedSubscriber extends Subscriber { - - private final ScopeManager scopeManager = GlobalTracer.get().scopeManager(); - private final AtomicReference spanRef; - private final Subscriber delegate; - - public TracedSubscriber(final Span span, final Subscriber delegate) { - spanRef = new AtomicReference<>(span); - this.delegate = delegate; - final SpanFinishingSubscription subscription = new SpanFinishingSubscription(spanRef); - delegate.add(subscription); - } - - @Override - public void onStart() { - final Span span = spanRef.get(); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onStart(); - } - } else { - delegate.onStart(); - } - } - - @Override - public void onNext(final T value) { - final Span span = spanRef.get(); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onNext(value); - } catch (final Throwable e) { - onError(e); - } - } else { - delegate.onNext(value); - } - } - - @Override - public void onCompleted() { - final Span span = spanRef.getAndSet(null); - if (span != null) { - boolean errored = false; - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onCompleted(); - } catch (final Throwable e) { - // Repopulate the spanRef for onError - spanRef.compareAndSet(null, span); - onError(e); - errored = true; - } finally { - // finish called by onError, so don't finish again. - if (!errored) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - } else { - delegate.onCompleted(); - } - } - - @Override - public void onError(final Throwable e) { - final Span span = spanRef.getAndSet(null); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - DECORATE.onError(span, e); - delegate.onError(e); - } catch (final Throwable e2) { - DECORATE.onError(span, e2); - throw e2; - } finally { - DECORATE.beforeFinish(span); - span.finish(); - } - } else { - delegate.onError(e); - } - } - } - - public static class SpanFinishingSubscription implements Subscription { - - private final AtomicReference spanRef; - - public SpanFinishingSubscription(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void unsubscribe() { - final Span span = spanRef.getAndSet(null); - if (span != null) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - - @Override - public boolean isUnsubscribed() { - return spanRef.get() == null; } } } diff --git a/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle b/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle new file mode 100644 index 0000000000..d9e224bf77 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle @@ -0,0 +1,13 @@ +apply from: "${rootDir}/gradle/java.gradle" + +apply plugin: 'org.unbroken-dome.test-sets' + +testSets { + latestDepTest { + dirName = 'test' + } +} + +dependencies { + compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7' +} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java new file mode 100644 index 0000000000..888d8ded9f --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java @@ -0,0 +1,31 @@ +package datadog.trace.instrumentation.rxjava; + +import datadog.trace.agent.decorator.BaseDecorator; +import io.opentracing.Span; +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscription; + +public class SpanFinishingSubscription implements Subscription { + private final BaseDecorator decorator; + private final AtomicReference spanRef; + + public SpanFinishingSubscription( + final BaseDecorator decorator, final AtomicReference spanRef) { + this.decorator = decorator; + this.spanRef = spanRef; + } + + @Override + public void unsubscribe() { + final Span span = spanRef.getAndSet(null); + if (span != null) { + decorator.beforeFinish(span); + span.finish(); + } + } + + @Override + public boolean isUnsubscribed() { + return spanRef.get() == null; + } +} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java new file mode 100644 index 0000000000..8428fa72b1 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java @@ -0,0 +1,58 @@ +package datadog.trace.instrumentation.rxjava; + +import datadog.trace.agent.decorator.BaseDecorator; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.util.GlobalTracer; +import rx.DDTracingUtil; +import rx.Observable; +import rx.Subscriber; + +public class TracedOnSubscribe implements Observable.OnSubscribe { + + private final Observable.OnSubscribe delegate; + private final String operationName; + private final TraceScope.Continuation continuation; + private final BaseDecorator decorator; + + public TracedOnSubscribe( + final Observable originalObservable, + final String operationName, + final BaseDecorator decorator) { + this.delegate = DDTracingUtil.extractOnSubscribe(originalObservable); + this.operationName = operationName; + this.decorator = decorator; + + final Scope parentScope = GlobalTracer.get().scopeManager().active(); + + continuation = parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null; + } + + @Override + public void call(final Subscriber subscriber) { + final Tracer tracer = GlobalTracer.get(); + final Span span; // span finished by TracedSubscriber + if (continuation != null) { + try (final TraceScope scope = continuation.activate()) { + span = tracer.buildSpan(operationName).start(); + } + } else { + span = tracer.buildSpan(operationName).start(); + } + + afterStart(span); + + try (final Scope scope = tracer.scopeManager().activate(span, false)) { + if (!((TraceScope) scope).isAsyncPropagating()) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.call(new TracedSubscriber(span, subscriber, decorator)); + } + } + + protected void afterStart(final Span span) { + decorator.afterStart(span); + } +} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java new file mode 100644 index 0000000000..8cefff4381 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java @@ -0,0 +1,109 @@ +package datadog.trace.instrumentation.rxjava; + +import datadog.trace.agent.decorator.BaseDecorator; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; +import io.opentracing.ScopeManager; +import io.opentracing.Span; +import io.opentracing.util.GlobalTracer; +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscriber; + +public class TracedSubscriber extends Subscriber { + + private final ScopeManager scopeManager = GlobalTracer.get().scopeManager(); + private final AtomicReference spanRef; + private final Subscriber delegate; + private final BaseDecorator decorator; + + public TracedSubscriber( + final Span span, final Subscriber delegate, final BaseDecorator decorator) { + spanRef = new AtomicReference<>(span); + this.delegate = delegate; + this.decorator = decorator; + final SpanFinishingSubscription subscription = + new SpanFinishingSubscription(decorator, spanRef); + delegate.add(subscription); + } + + @Override + public void onStart() { + final Span span = spanRef.get(); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onStart(); + } + } else { + delegate.onStart(); + } + } + + @Override + public void onNext(final T value) { + final Span span = spanRef.get(); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onNext(value); + } catch (final Throwable e) { + onError(e); + } + } else { + delegate.onNext(value); + } + } + + @Override + public void onCompleted() { + final Span span = spanRef.getAndSet(null); + if (span != null) { + boolean errored = false; + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onCompleted(); + } catch (final Throwable e) { + // Repopulate the spanRef for onError + spanRef.compareAndSet(null, span); + onError(e); + errored = true; + } finally { + // finish called by onError, so don't finish again. + if (!errored) { + decorator.beforeFinish(span); + span.finish(); + } + } + } else { + delegate.onCompleted(); + } + } + + @Override + public void onError(final Throwable e) { + final Span span = spanRef.getAndSet(null); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + decorator.onError(span, e); + delegate.onError(e); + } catch (final Throwable e2) { + decorator.onError(span, e2); + throw e2; + } finally { + decorator.beforeFinish(span); + span.finish(); + } + } else { + delegate.onError(e); + } + } +} diff --git a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/rx/DDTracingUtil.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/rx/DDTracingUtil.java similarity index 100% rename from dd-java-agent/instrumentation/hystrix-1.4/src/main/java/rx/DDTracingUtil.java rename to dd-java-agent/instrumentation/rxjava-1/src/main/java/rx/DDTracingUtil.java diff --git a/settings.gradle b/settings.gradle index 5e3510a75c..312f0c2f1a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -84,6 +84,7 @@ include ':dd-java-agent:instrumentation:play-2.4' include ':dd-java-agent:instrumentation:play-2.6' include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7' include ':dd-java-agent:instrumentation:ratpack-1.4' +include ':dd-java-agent:instrumentation:rxjava-1' include ':dd-java-agent:instrumentation:reactor-core-3.1' include ':dd-java-agent:instrumentation:servlet-2' include ':dd-java-agent:instrumentation:servlet-3'