Extract RxJava instrumentation from Hystrix and add to Couchbase

This commit is contained in:
Laplie Anderson 2019-09-11 19:23:46 -04:00
parent a5b5b0c307
commit 6c445ad030
14 changed files with 337 additions and 362 deletions

View File

@ -44,8 +44,10 @@ muzzle {
}
dependencies {
compile project(':dd-java-agent:instrumentation:rxjava-1')
compileOnly group: 'com.couchbase.client', name: 'java-client', version: '2.0.0'
testCompile group: 'com.couchbase.mock', name: 'CouchbaseMock', version: '1.5.19'
testCompile group: 'org.springframework.data', name: 'spring-data-couchbase', version: '2.0.0.RELEASE'

View File

@ -1,6 +1,5 @@
package datadog.trace.instrumentation.couchbase.client;
import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@ -12,22 +11,14 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.couchbase.client.java.CouchbaseCluster;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
@AutoService(Instrumenter.class)
public class CouchbaseBucketInstrumentation extends Instrumenter.Default {
@ -47,13 +38,15 @@ public class CouchbaseBucketInstrumentation extends Instrumenter.Default {
@Override
public String[] helperClassNames() {
return new String[] {
"rx.DDTracingUtil",
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ClientDecorator",
"datadog.trace.agent.decorator.DatabaseClientDecorator",
"datadog.trace.instrumentation.rxjava.SpanFinishingSubscription",
"datadog.trace.instrumentation.rxjava.TracedSubscriber",
"datadog.trace.instrumentation.rxjava.TracedOnSubscribe",
packageName + ".CouchbaseClientDecorator",
getClass().getName() + "$TraceSpanStart",
getClass().getName() + "$TraceSpanFinish",
getClass().getName() + "$TraceSpanError",
packageName + ".CouchbaseOnSubscribe",
};
}
@ -76,94 +69,13 @@ public class CouchbaseBucketInstrumentation extends Instrumenter.Default {
@Advice.Enter final int callDepth,
@Advice.Origin final Method method,
@Advice.FieldValue("bucket") final String bucket,
@Advice.AllArguments final Object[] args,
@Advice.Return(readOnly = false) Observable result) {
if (callDepth > 0) {
return;
}
CallDepthThreadLocalMap.reset(CouchbaseCluster.class);
final AtomicReference<Span> spanRef = new AtomicReference<>();
result =
result
.doOnSubscribe(new TraceSpanStart(method, bucket, spanRef))
.doOnCompleted(new TraceSpanFinish(spanRef))
.doOnError(new TraceSpanError(spanRef));
}
}
public static class TraceSpanStart implements Action0 {
private final Method method;
private final String bucket;
private final AtomicReference<Span> spanRef;
public TraceSpanStart(
final Method method, final String bucket, final AtomicReference<Span> spanRef) {
this.method = method;
this.bucket = bucket;
this.spanRef = spanRef;
}
@Override
public void call() {
// This is called each time an observer has a new subscriber, but we should only time it once.
if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) {
return;
}
final Class<?> declaringClass = method.getDeclaringClass();
final String className =
declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", "");
final String resourceName = className + "." + method.getName();
final Span span =
GlobalTracer.get()
.buildSpan("couchbase.call")
.withTag(DDTags.RESOURCE_NAME, resourceName)
.withTag("bucket", bucket)
.start();
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
// just replace the no-op span.
spanRef.set(DECORATE.afterStart(span));
}
}
}
public static class TraceSpanFinish implements Action0 {
private final AtomicReference<Span> spanRef;
public TraceSpanFinish(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.beforeFinish(span);
span.finish();
}
}
}
}
public static class TraceSpanError implements Action1<Throwable> {
private final AtomicReference<Span> spanRef;
public TraceSpanError(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call(final Throwable throwable) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
}
}
result = Observable.create(new CouchbaseOnSubscribe(result, method, bucket));
}
}
}

View File

@ -1,6 +1,5 @@
package datadog.trace.instrumentation.couchbase.client;
import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@ -12,22 +11,14 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.couchbase.client.java.CouchbaseCluster;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
@AutoService(Instrumenter.class)
public class CouchbaseClusterInstrumentation extends Instrumenter.Default {
@ -47,13 +38,15 @@ public class CouchbaseClusterInstrumentation extends Instrumenter.Default {
@Override
public String[] helperClassNames() {
return new String[] {
"rx.DDTracingUtil",
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ClientDecorator",
"datadog.trace.agent.decorator.DatabaseClientDecorator",
"datadog.trace.instrumentation.rxjava.SpanFinishingSubscription",
"datadog.trace.instrumentation.rxjava.TracedSubscriber",
"datadog.trace.instrumentation.rxjava.TracedOnSubscribe",
packageName + ".CouchbaseClientDecorator",
getClass().getName() + "$TraceSpanStart",
getClass().getName() + "$TraceSpanFinish",
getClass().getName() + "$TraceSpanError",
packageName + ".CouchbaseOnSubscribe",
};
}
@ -80,84 +73,7 @@ public class CouchbaseClusterInstrumentation extends Instrumenter.Default {
return;
}
CallDepthThreadLocalMap.reset(CouchbaseCluster.class);
final AtomicReference<Span> spanRef = new AtomicReference<>();
result =
result
.doOnSubscribe(new TraceSpanStart(method, spanRef))
.doOnCompleted(new TraceSpanFinish(spanRef))
.doOnError(new TraceSpanError(spanRef));
}
}
public static class TraceSpanStart implements Action0 {
private final Method method;
private final AtomicReference<Span> spanRef;
public TraceSpanStart(final Method method, final AtomicReference<Span> spanRef) {
this.method = method;
this.spanRef = spanRef;
}
@Override
public void call() {
// This is called each time an observer has a new subscriber, but we should only time it once.
if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) {
return;
}
final Class<?> declaringClass = method.getDeclaringClass();
final String className =
declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", "");
final String resourceName = className + "." + method.getName();
final Span span =
GlobalTracer.get()
.buildSpan("couchbase.call")
.withTag(DDTags.RESOURCE_NAME, resourceName)
.start();
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
// just replace the no-op span.
spanRef.set(DECORATE.afterStart(scope.span()));
}
}
}
public static class TraceSpanFinish implements Action0 {
private final AtomicReference<Span> spanRef;
public TraceSpanFinish(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.beforeFinish(span);
span.finish();
}
}
}
}
public static class TraceSpanError implements Action1<Throwable> {
private final AtomicReference<Span> spanRef;
public TraceSpanError(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call(final Throwable throwable) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
}
}
result = Observable.create(new CouchbaseOnSubscribe(result, method, null));
}
}
}

View File

@ -0,0 +1,36 @@
package datadog.trace.instrumentation.couchbase.client;
import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE;
import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.rxjava.TracedOnSubscribe;
import io.opentracing.Span;
import java.lang.reflect.Method;
import rx.Observable;
public class CouchbaseOnSubscribe extends TracedOnSubscribe {
private final String resourceName;
private final String bucket;
public CouchbaseOnSubscribe(
final Observable originalObservable, final Method method, final String bucket) {
super(originalObservable, "couchbase.call", DECORATE);
final Class<?> declaringClass = method.getDeclaringClass();
final String className =
declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", "");
resourceName = className + "." + method.getName();
this.bucket = bucket;
}
@Override
protected void afterStart(final Span span) {
super.afterStart(span);
span.setTag(DDTags.RESOURCE_NAME, resourceName);
if (bucket != null) {
span.setTag("bucket", bucket);
}
}
}

View File

@ -42,6 +42,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest {
cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt ->
bkt.upsert(JsonDocument.create("helloworld", content)).subscribe({ result -> inserted.set(result) })
})
blockUntilChildSpansFinished(2) // Improve span ordering consistency
}
then:
@ -51,8 +53,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest {
trace(0, 3) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name(), span(0))
assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 1, "Bucket.upsert", bucketSettings.name(), span(2))
}
}
@ -81,6 +83,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest {
})
})
})
blockUntilChildSpansFinished(3) // Improve span ordering consistency
}
// Create a JSON document and store it with the ID "helloworld"
@ -91,10 +95,10 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest {
assertTraces(1) {
trace(0, 4) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name(), span(0))
assertCouchbaseCall(it, 0, "Bucket.get", bucketSettings.name(), span(0))
assertCouchbaseCall(it, 3, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(3))
assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(2))
}
}
@ -121,6 +125,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest {
.single()
.subscribe({ row -> queryResult.set(row.value()) })
})
blockUntilChildSpansFinished(2) // Improve span ordering consistency
}
then:
@ -130,8 +136,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest {
trace(0, 3) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 0, "Bucket.query", bucketSettings.name(), span(0))
assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 1, "Bucket.query", bucketSettings.name(), span(2))
}
}

View File

@ -4,6 +4,9 @@ import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.query.N1qlQuery
import util.AbstractCouchbaseTest
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
class CouchbaseClientTest extends AbstractCouchbaseTest {
def "test hasBucket #type"() {
@ -57,6 +60,42 @@ class CouchbaseClientTest extends AbstractCouchbaseTest {
type = bucketSettings.type().name()
}
def "test upsert and get #type under trace"() {
when:
// Connect to the bucket and open it
Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password())
// Create a JSON document and store it with the ID "helloworld"
JsonObject content = JsonObject.create().put("hello", "world")
def inserted
def found
runUnderTrace("someTrace") {
inserted = bkt.upsert(JsonDocument.create("helloworld", content))
found = bkt.get("helloworld")
}
then:
found == inserted
found.content().getString("hello") == "world"
assertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(0))
assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(0))
}
}
where:
manager | cluster | bucketSettings
couchbaseManager | couchbaseCluster | bucketCouchbase
memcacheManager | memcacheCluster | bucketMemcache
type = bucketSettings.type().name()
}
def "test query"() {
setup:
Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password())

View File

@ -17,11 +17,14 @@ testSets {
}
dependencies {
compile project(':dd-java-agent:instrumentation:rxjava-1')
compileOnly group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0'
compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7'
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
testCompile group: 'io.reactivex', name: 'rxjava', version: '1.0.7'
testCompile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0'

View File

@ -8,23 +8,15 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.google.auto.service.AutoService;
import com.netflix.hystrix.HystrixInvokableInfo;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.ScopeManager;
import datadog.trace.instrumentation.rxjava.TracedOnSubscribe;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import rx.DDTracingUtil;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
@AutoService(Instrumenter.class)
public class HystrixInstrumentation extends Instrumenter.Default {
@ -47,10 +39,11 @@ public class HystrixInstrumentation extends Instrumenter.Default {
return new String[] {
"rx.DDTracingUtil",
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.instrumentation.rxjava.SpanFinishingSubscription",
"datadog.trace.instrumentation.rxjava.TracedSubscriber",
"datadog.trace.instrumentation.rxjava.TracedOnSubscribe",
packageName + ".HystrixDecorator",
packageName + ".HystrixInstrumentation$SpanFinishingSubscription",
packageName + ".HystrixInstrumentation$TracedSubscriber",
packageName + ".HystrixInstrumentation$TracedOnSubscribe",
packageName + ".HystrixInstrumentation$HystrixOnSubscribe",
};
}
@ -73,11 +66,8 @@ public class HystrixInstrumentation extends Instrumenter.Default {
@Advice.This final HystrixInvokableInfo<?> command,
@Advice.Return(readOnly = false) Observable result,
@Advice.Thrown final Throwable throwable) {
final Observable.OnSubscribe<?> onSubscribe = DDTracingUtil.extractOnSubscribe(result);
result =
Observable.create(
new TracedOnSubscribe(
onSubscribe, command, "execute", GlobalTracer.get().scopeManager().active()));
result = Observable.create(new HystrixOnSubscribe(result, command, "execute"));
}
}
@ -88,171 +78,30 @@ public class HystrixInstrumentation extends Instrumenter.Default {
@Advice.This final HystrixInvokableInfo<?> command,
@Advice.Return(readOnly = false) Observable<?> result,
@Advice.Thrown final Throwable throwable) {
final Observable.OnSubscribe<?> onSubscribe = DDTracingUtil.extractOnSubscribe(result);
result =
Observable.create(
new TracedOnSubscribe(
onSubscribe, command, "fallback", GlobalTracer.get().scopeManager().active()));
result = Observable.create(new HystrixOnSubscribe(result, command, "fallback"));
}
}
public static class TracedOnSubscribe<T> implements Observable.OnSubscribe<T> {
private final Observable.OnSubscribe<?> delegate;
public static class HystrixOnSubscribe extends TracedOnSubscribe {
private final HystrixInvokableInfo<?> command;
private final String methodName;
private final TraceScope.Continuation continuation;
public TracedOnSubscribe(
final Observable.OnSubscribe<?> delegate,
public HystrixOnSubscribe(
final Observable originalObservable,
final HystrixInvokableInfo<?> command,
final String methodName,
final Scope parentScope) {
this.delegate = delegate;
final String methodName) {
super(originalObservable, OPERATION_NAME, DECORATE);
this.command = command;
this.methodName = methodName;
continuation =
parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Tracer tracer = GlobalTracer.get();
final Span span; // span finished by TracedSubscriber
if (continuation != null) {
try (final TraceScope scope = continuation.activate()) {
span = tracer.buildSpan(OPERATION_NAME).start();
}
} else {
span = tracer.buildSpan(OPERATION_NAME).start();
}
DECORATE.afterStart(span);
protected void afterStart(final Span span) {
super.afterStart(span);
DECORATE.onCommand(span, command, methodName);
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (!((TraceScope) scope).isAsyncPropagating()) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.call(new TracedSubscriber(span, subscriber));
}
}
}
public static class TracedSubscriber<T> extends Subscriber<T> {
private final ScopeManager scopeManager = GlobalTracer.get().scopeManager();
private final AtomicReference<Span> spanRef;
private final Subscriber<T> delegate;
public TracedSubscriber(final Span span, final Subscriber<T> delegate) {
spanRef = new AtomicReference<>(span);
this.delegate = delegate;
final SpanFinishingSubscription subscription = new SpanFinishingSubscription(spanRef);
delegate.add(subscription);
}
@Override
public void onStart() {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onStart();
}
} else {
delegate.onStart();
}
}
@Override
public void onNext(final T value) {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onNext(value);
} catch (final Throwable e) {
onError(e);
}
} else {
delegate.onNext(value);
}
}
@Override
public void onCompleted() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
boolean errored = false;
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onCompleted();
} catch (final Throwable e) {
// Repopulate the spanRef for onError
spanRef.compareAndSet(null, span);
onError(e);
errored = true;
} finally {
// finish called by onError, so don't finish again.
if (!errored) {
DECORATE.beforeFinish(span);
span.finish();
}
}
} else {
delegate.onCompleted();
}
}
@Override
public void onError(final Throwable e) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
DECORATE.onError(span, e);
delegate.onError(e);
} catch (final Throwable e2) {
DECORATE.onError(span, e2);
throw e2;
} finally {
DECORATE.beforeFinish(span);
span.finish();
}
} else {
delegate.onError(e);
}
}
}
public static class SpanFinishingSubscription implements Subscription {
private final AtomicReference<Span> spanRef;
public SpanFinishingSubscription(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void unsubscribe() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
DECORATE.beforeFinish(span);
span.finish();
}
}
@Override
public boolean isUnsubscribed() {
return spanRef.get() == null;
}
}
}

View File

@ -0,0 +1,13 @@
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7'
}

View File

@ -0,0 +1,31 @@
package datadog.trace.instrumentation.rxjava;
import datadog.trace.agent.decorator.BaseDecorator;
import io.opentracing.Span;
import java.util.concurrent.atomic.AtomicReference;
import rx.Subscription;
public class SpanFinishingSubscription implements Subscription {
private final BaseDecorator decorator;
private final AtomicReference<Span> spanRef;
public SpanFinishingSubscription(
final BaseDecorator decorator, final AtomicReference<Span> spanRef) {
this.decorator = decorator;
this.spanRef = spanRef;
}
@Override
public void unsubscribe() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
decorator.beforeFinish(span);
span.finish();
}
}
@Override
public boolean isUnsubscribed() {
return spanRef.get() == null;
}
}

View File

@ -0,0 +1,58 @@
package datadog.trace.instrumentation.rxjava;
import datadog.trace.agent.decorator.BaseDecorator;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import rx.DDTracingUtil;
import rx.Observable;
import rx.Subscriber;
public class TracedOnSubscribe<T> implements Observable.OnSubscribe<T> {
private final Observable.OnSubscribe<?> delegate;
private final String operationName;
private final TraceScope.Continuation continuation;
private final BaseDecorator decorator;
public TracedOnSubscribe(
final Observable originalObservable,
final String operationName,
final BaseDecorator decorator) {
this.delegate = DDTracingUtil.extractOnSubscribe(originalObservable);
this.operationName = operationName;
this.decorator = decorator;
final Scope parentScope = GlobalTracer.get().scopeManager().active();
continuation = parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Tracer tracer = GlobalTracer.get();
final Span span; // span finished by TracedSubscriber
if (continuation != null) {
try (final TraceScope scope = continuation.activate()) {
span = tracer.buildSpan(operationName).start();
}
} else {
span = tracer.buildSpan(operationName).start();
}
afterStart(span);
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (!((TraceScope) scope).isAsyncPropagating()) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.call(new TracedSubscriber(span, subscriber, decorator));
}
}
protected void afterStart(final Span span) {
decorator.afterStart(span);
}
}

View File

@ -0,0 +1,109 @@
package datadog.trace.instrumentation.rxjava;
import datadog.trace.agent.decorator.BaseDecorator;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.ScopeManager;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import java.util.concurrent.atomic.AtomicReference;
import rx.Subscriber;
public class TracedSubscriber<T> extends Subscriber<T> {
private final ScopeManager scopeManager = GlobalTracer.get().scopeManager();
private final AtomicReference<Span> spanRef;
private final Subscriber<T> delegate;
private final BaseDecorator decorator;
public TracedSubscriber(
final Span span, final Subscriber<T> delegate, final BaseDecorator decorator) {
spanRef = new AtomicReference<>(span);
this.delegate = delegate;
this.decorator = decorator;
final SpanFinishingSubscription subscription =
new SpanFinishingSubscription(decorator, spanRef);
delegate.add(subscription);
}
@Override
public void onStart() {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onStart();
}
} else {
delegate.onStart();
}
}
@Override
public void onNext(final T value) {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onNext(value);
} catch (final Throwable e) {
onError(e);
}
} else {
delegate.onNext(value);
}
}
@Override
public void onCompleted() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
boolean errored = false;
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onCompleted();
} catch (final Throwable e) {
// Repopulate the spanRef for onError
spanRef.compareAndSet(null, span);
onError(e);
errored = true;
} finally {
// finish called by onError, so don't finish again.
if (!errored) {
decorator.beforeFinish(span);
span.finish();
}
}
} else {
delegate.onCompleted();
}
}
@Override
public void onError(final Throwable e) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
decorator.onError(span, e);
delegate.onError(e);
} catch (final Throwable e2) {
decorator.onError(span, e2);
throw e2;
} finally {
decorator.beforeFinish(span);
span.finish();
}
} else {
delegate.onError(e);
}
}
}

View File

@ -84,6 +84,7 @@ include ':dd-java-agent:instrumentation:play-2.4'
include ':dd-java-agent:instrumentation:play-2.6'
include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7'
include ':dd-java-agent:instrumentation:ratpack-1.4'
include ':dd-java-agent:instrumentation:rxjava-1'
include ':dd-java-agent:instrumentation:reactor-core-3.1'
include ':dd-java-agent:instrumentation:servlet-2'
include ':dd-java-agent:instrumentation:servlet-3'