Merge pull request #988 from DataDog/landerson/couchbase-subscribe

Couchbase Async Subscriptions
This commit is contained in:
Laplie Anderson 2019-09-13 09:36:07 -04:00 committed by GitHub
commit b875ae19a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 686 additions and 682 deletions

View File

@ -44,6 +44,8 @@ muzzle {
}
dependencies {
compile project(':dd-java-agent:instrumentation:rxjava-1')
compileOnly group: 'com.couchbase.client', name: 'java-client', version: '2.0.0'
testCompile group: 'com.couchbase.mock', name: 'CouchbaseMock', version: '1.5.19'

View File

@ -1,6 +1,5 @@
package datadog.trace.instrumentation.couchbase.client;
import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@ -12,22 +11,14 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.couchbase.client.java.CouchbaseCluster;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
@AutoService(Instrumenter.class)
public class CouchbaseBucketInstrumentation extends Instrumenter.Default {
@ -47,13 +38,15 @@ public class CouchbaseBucketInstrumentation extends Instrumenter.Default {
@Override
public String[] helperClassNames() {
return new String[] {
"rx.DDTracingUtil",
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ClientDecorator",
"datadog.trace.agent.decorator.DatabaseClientDecorator",
"datadog.trace.instrumentation.rxjava.SpanFinishingSubscription",
"datadog.trace.instrumentation.rxjava.TracedSubscriber",
"datadog.trace.instrumentation.rxjava.TracedOnSubscribe",
packageName + ".CouchbaseClientDecorator",
getClass().getName() + "$TraceSpanStart",
getClass().getName() + "$TraceSpanFinish",
getClass().getName() + "$TraceSpanError",
packageName + ".CouchbaseOnSubscribe",
};
}
@ -76,94 +69,13 @@ public class CouchbaseBucketInstrumentation extends Instrumenter.Default {
@Advice.Enter final int callDepth,
@Advice.Origin final Method method,
@Advice.FieldValue("bucket") final String bucket,
@Advice.AllArguments final Object[] args,
@Advice.Return(readOnly = false) Observable result) {
if (callDepth > 0) {
return;
}
CallDepthThreadLocalMap.reset(CouchbaseCluster.class);
final AtomicReference<Span> spanRef = new AtomicReference<>();
result =
result
.doOnSubscribe(new TraceSpanStart(method, bucket, spanRef))
.doOnCompleted(new TraceSpanFinish(spanRef))
.doOnError(new TraceSpanError(spanRef));
}
}
public static class TraceSpanStart implements Action0 {
private final Method method;
private final String bucket;
private final AtomicReference<Span> spanRef;
public TraceSpanStart(
final Method method, final String bucket, final AtomicReference<Span> spanRef) {
this.method = method;
this.bucket = bucket;
this.spanRef = spanRef;
}
@Override
public void call() {
// This is called each time an observer has a new subscriber, but we should only time it once.
if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) {
return;
}
final Class<?> declaringClass = method.getDeclaringClass();
final String className =
declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", "");
final String resourceName = className + "." + method.getName();
final Span span =
GlobalTracer.get()
.buildSpan("couchbase.call")
.withTag(DDTags.RESOURCE_NAME, resourceName)
.withTag("bucket", bucket)
.start();
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
// just replace the no-op span.
spanRef.set(DECORATE.afterStart(span));
}
}
}
public static class TraceSpanFinish implements Action0 {
private final AtomicReference<Span> spanRef;
public TraceSpanFinish(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.beforeFinish(span);
span.finish();
}
}
}
}
public static class TraceSpanError implements Action1<Throwable> {
private final AtomicReference<Span> spanRef;
public TraceSpanError(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call(final Throwable throwable) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
}
}
result = Observable.create(new CouchbaseOnSubscribe(result, method, bucket));
}
}
}

View File

@ -1,6 +1,5 @@
package datadog.trace.instrumentation.couchbase.client;
import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@ -12,22 +11,14 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.couchbase.client.java.CouchbaseCluster;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
@AutoService(Instrumenter.class)
public class CouchbaseClusterInstrumentation extends Instrumenter.Default {
@ -47,20 +38,22 @@ public class CouchbaseClusterInstrumentation extends Instrumenter.Default {
@Override
public String[] helperClassNames() {
return new String[] {
"rx.DDTracingUtil",
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ClientDecorator",
"datadog.trace.agent.decorator.DatabaseClientDecorator",
"datadog.trace.instrumentation.rxjava.SpanFinishingSubscription",
"datadog.trace.instrumentation.rxjava.TracedSubscriber",
"datadog.trace.instrumentation.rxjava.TracedOnSubscribe",
packageName + ".CouchbaseClientDecorator",
getClass().getName() + "$TraceSpanStart",
getClass().getName() + "$TraceSpanFinish",
getClass().getName() + "$TraceSpanError",
packageName + ".CouchbaseOnSubscribe",
};
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(isPublic()).and(returns(named("rx.Observable"))),
isMethod().and(isPublic()).and(returns(named("rx.Observable"))).and(not(named("core"))),
CouchbaseClientAdvice.class.getName());
}
@ -80,84 +73,7 @@ public class CouchbaseClusterInstrumentation extends Instrumenter.Default {
return;
}
CallDepthThreadLocalMap.reset(CouchbaseCluster.class);
final AtomicReference<Span> spanRef = new AtomicReference<>();
result =
result
.doOnSubscribe(new TraceSpanStart(method, spanRef))
.doOnCompleted(new TraceSpanFinish(spanRef))
.doOnError(new TraceSpanError(spanRef));
}
}
public static class TraceSpanStart implements Action0 {
private final Method method;
private final AtomicReference<Span> spanRef;
public TraceSpanStart(final Method method, final AtomicReference<Span> spanRef) {
this.method = method;
this.spanRef = spanRef;
}
@Override
public void call() {
// This is called each time an observer has a new subscriber, but we should only time it once.
if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) {
return;
}
final Class<?> declaringClass = method.getDeclaringClass();
final String className =
declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", "");
final String resourceName = className + "." + method.getName();
final Span span =
GlobalTracer.get()
.buildSpan("couchbase.call")
.withTag(DDTags.RESOURCE_NAME, resourceName)
.start();
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
// just replace the no-op span.
spanRef.set(DECORATE.afterStart(scope.span()));
}
}
}
public static class TraceSpanFinish implements Action0 {
private final AtomicReference<Span> spanRef;
public TraceSpanFinish(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.beforeFinish(span);
span.finish();
}
}
}
}
public static class TraceSpanError implements Action1<Throwable> {
private final AtomicReference<Span> spanRef;
public TraceSpanError(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call(final Throwable throwable) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
}
}
result = Observable.create(new CouchbaseOnSubscribe(result, method, null));
}
}
}

View File

@ -0,0 +1,36 @@
package datadog.trace.instrumentation.couchbase.client;
import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE;
import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.rxjava.TracedOnSubscribe;
import io.opentracing.Span;
import java.lang.reflect.Method;
import rx.Observable;
public class CouchbaseOnSubscribe extends TracedOnSubscribe {
private final String resourceName;
private final String bucket;
public CouchbaseOnSubscribe(
final Observable originalObservable, final Method method, final String bucket) {
super(originalObservable, "couchbase.call", DECORATE);
final Class<?> declaringClass = method.getDeclaringClass();
final String className =
declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", "");
resourceName = className + "." + method.getName();
this.bucket = bucket;
}
@Override
protected void afterStart(final Span span) {
super.afterStart(span);
span.setTag(DDTags.RESOURCE_NAME, resourceName);
if (bucket != null) {
span.setTag("bucket", bucket);
}
}
}

View File

@ -0,0 +1,172 @@
import com.couchbase.client.java.AsyncCluster
import com.couchbase.client.java.CouchbaseAsyncCluster
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.env.CouchbaseEnvironment
import com.couchbase.client.java.query.N1qlQuery
import spock.util.concurrent.BlockingVariable
import util.AbstractCouchbaseTest
import java.util.concurrent.TimeUnit
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
class CouchbaseAsyncClientTest extends AbstractCouchbaseTest {
def "test hasBucket #type"() {
setup:
def hasBucket = new BlockingVariable<Boolean>()
when:
cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt ->
manager.hasBucket(bucketSettings.name()).subscribe({ result -> hasBucket.set(result) })
})
then:
assert hasBucket.get()
sortAndAssertTraces(1) {
trace(0, 2) {
assertCouchbaseCall(it, 0, "Cluster.openBucket", null)
assertCouchbaseCall(it, 1, "ClusterManager.hasBucket", null, span(0))
}
}
cleanup:
cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single()
environment.shutdown()
where:
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1"))
manager = cluster.clusterManager(USERNAME, PASSWORD).toBlocking().single()
type = bucketSettings.type().name()
}
def "test upsert #type"() {
setup:
JsonObject content = JsonObject.create().put("hello", "world")
def inserted = new BlockingVariable<JsonDocument>()
when:
runUnderTrace("someTrace") {
// Connect to the bucket and open it
cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt ->
bkt.upsert(JsonDocument.create("helloworld", content)).subscribe({ result -> inserted.set(result) })
})
blockUntilChildSpansFinished(2) // Improve span ordering consistency
}
then:
inserted.get().content().getString("hello") == "world"
sortAndAssertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 1, "Bucket.upsert", bucketSettings.name(), span(2))
}
}
cleanup:
cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single()
environment.shutdown()
where:
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1"))
type = bucketSettings.type().name()
}
def "test upsert and get #type"() {
setup:
JsonObject content = JsonObject.create().put("hello", "world")
def inserted = new BlockingVariable<JsonDocument>()
def found = new BlockingVariable<JsonDocument>()
when:
runUnderTrace("someTrace") {
cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt ->
bkt.upsert(JsonDocument.create("helloworld", content))
.subscribe({ result ->
inserted.set(result)
bkt.get("helloworld")
.subscribe({ searchResult -> found.set(searchResult)
})
})
})
blockUntilChildSpansFinished(3) // Improve span ordering consistency
}
// Create a JSON document and store it with the ID "helloworld"
then:
found.get() == inserted.get()
found.get().content().getString("hello") == "world"
sortAndAssertTraces(1) {
trace(0, 4) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 3, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(3))
assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(2))
}
}
cleanup:
cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single()
environment.shutdown()
where:
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1"))
type = bucketSettings.type().name()
}
def "test query"() {
setup:
// Only couchbase buckets support queries.
CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build()
AsyncCluster cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1"))
def queryResult = new BlockingVariable<JsonObject>()
when:
// Mock expects this specific query.
// See com.couchbase.mock.http.query.QueryServer.handleString.
runUnderTrace("someTrace") {
cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).subscribe({
bkt ->
bkt.query(N1qlQuery.simple("SELECT mockrow"))
.flatMap({ query -> query.rows() })
.single()
.subscribe({ row -> queryResult.set(row.value()) })
})
blockUntilChildSpansFinished(2) // Improve span ordering consistency
}
then:
queryResult.get().get("row") == "value"
sortAndAssertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 1, "Bucket.query", bucketCouchbase.name(), span(2))
}
}
cleanup:
cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single()
environment.shutdown()
}
}

View File

@ -1,38 +1,42 @@
import com.couchbase.client.java.Bucket
import com.couchbase.client.java.Cluster
import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.env.CouchbaseEnvironment
import com.couchbase.client.java.query.N1qlQuery
import datadog.trace.api.DDSpanTypes
import io.opentracing.tag.Tags
import util.AbstractCouchbaseTest
class CouchbaseClientTest extends AbstractCouchbaseTest {
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
def "test client #type"() {
class CouchbaseClientTest extends AbstractCouchbaseTest {
def "test hasBucket #type"() {
when:
manager.hasBucket(bucketSettings.name())
def hasBucket = manager.hasBucket(bucketSettings.name())
then:
assertTraces(1) {
assert hasBucket
sortAndAssertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "ClusterManager.hasBucket"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
defaultTags()
}
}
assertCouchbaseCall(it, 0, "ClusterManager.hasBucket")
}
}
TEST_WRITER.clear()
cleanup:
cluster?.disconnect()
environment.shutdown()
where:
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1"))
manager = cluster.clusterManager(USERNAME, PASSWORD)
type = bucketSettings.type().name()
}
def "test upsert and get #type"() {
when:
// Connect to the bucket and open it
Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password())
@ -40,70 +44,87 @@ class CouchbaseClientTest extends AbstractCouchbaseTest {
// Create a JSON document and store it with the ID "helloworld"
JsonObject content = JsonObject.create().put("hello", "world")
def inserted = bkt.upsert(JsonDocument.create("helloworld", content))
then:
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.upsert"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bkt.name()
defaultTags()
}
}
}
}
TEST_WRITER.clear()
when:
def found = bkt.get("helloworld")
then:
found == inserted
found.content().getString("hello") == "world"
and:
assertTraces(1) {
sortAndAssertTraces(3) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.get"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bkt.name()
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Cluster.openBucket")
}
trace(1, 1) {
assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name())
}
trace(2, 1) {
assertCouchbaseCall(it, 0, "Bucket.get", bucketSettings.name())
}
}
TEST_WRITER.clear()
cleanup:
cluster?.disconnect()
environment.shutdown()
where:
manager | cluster | bucketSettings
couchbaseManager | couchbaseCluster | bucketCouchbase
memcacheManager | memcacheCluster | bucketMemcache
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1"))
type = bucketSettings.type().name()
}
def "test upsert and get #type under trace"() {
when:
// Connect to the bucket and open it
Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password())
// Create a JSON document and store it with the ID "helloworld"
JsonObject content = JsonObject.create().put("hello", "world")
def inserted
def found
runUnderTrace("someTrace") {
inserted = bkt.upsert(JsonDocument.create("helloworld", content))
found = bkt.get("helloworld")
blockUntilChildSpansFinished(2)
}
then:
found == inserted
found.content().getString("hello") == "world"
sortAndAssertTraces(2) {
trace(0, 1) {
assertCouchbaseCall(it, 0, "Cluster.openBucket")
}
trace(1, 3) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(0))
assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(0))
}
}
cleanup:
cluster?.disconnect()
environment.shutdown()
where:
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1"))
type = bucketSettings.type().name()
}
def "test query"() {
setup:
Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password())
// Only couchbase buckets support queries.
CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build()
Cluster cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1"))
Bucket bkt = cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password())
when:
// Mock expects this specific query.
@ -116,31 +137,17 @@ class CouchbaseClientTest extends AbstractCouchbaseTest {
result.first().value().get("row") == "value"
and:
assertTraces(1) {
sortAndAssertTraces(2) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.query"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bkt.name()
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Cluster.openBucket")
}
trace(1, 1) {
assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name())
}
}
where:
manager | cluster | bucketSettings
couchbaseManager | couchbaseCluster | bucketCouchbase
// Only couchbase buckets support queries.
type = bucketSettings.type().name()
cleanup:
cluster?.disconnect()
environment.shutdown()
}
}

View File

@ -1,9 +1,10 @@
package springdata
import com.couchbase.client.java.Cluster
import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.env.CouchbaseEnvironment
import com.couchbase.client.java.view.DefaultView
import com.couchbase.client.java.view.DesignDocument
import datadog.trace.api.DDSpanTypes
import io.opentracing.tag.Tags
import org.springframework.context.ConfigurableApplicationContext
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.data.repository.CrudRepository
@ -31,21 +32,23 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
DocRepository repo
def setupSpec() {
CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build()
Cluster couchbaseCluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1"))
// Create view for SpringRepository's findAll()
couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).bucketManager()
.insertDesignDocument(
DesignDocument.create("doc", Collections.singletonList(DefaultView.create("all",
'''
DesignDocument.create("doc", Collections.singletonList(DefaultView.create("all",
'''
function (doc, meta) {
if (doc._class == "springdata.Doc") {
emit(meta.id, null);
}
}
'''.stripIndent()
)))
)
CouchbaseConfig.setEnvironment(couchbaseEnvironment)
)))
)
CouchbaseConfig.setEnvironment(environment)
CouchbaseConfig.setBucketSettings(bucketCouchbase)
// Close all buckets and disconnect
@ -75,21 +78,7 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
and:
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.query"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name())
}
}
@ -107,21 +96,7 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
and:
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.upsert"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.upsert", bucketCouchbase.name())
}
}
TEST_WRITER.clear()
@ -132,21 +107,7 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
and:
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.get"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.get", bucketCouchbase.name())
}
}
TEST_WRITER.clear()
@ -160,55 +121,13 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
assertTraces(3) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.upsert"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.upsert", bucketCouchbase.name())
}
trace(1, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.query"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name())
}
trace(2, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.get"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.get", bucketCouchbase.name())
}
}
TEST_WRITER.clear()
@ -222,38 +141,10 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
and:
assertTraces(2) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.remove"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.remove", bucketCouchbase.name())
}
trace(1, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.query"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name())
}
}
}

View File

@ -1,8 +1,10 @@
package springdata
import com.couchbase.client.java.Bucket
import datadog.trace.api.DDSpanTypes
import io.opentracing.tag.Tags
import com.couchbase.client.java.Cluster
import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.cluster.ClusterManager
import com.couchbase.client.java.env.CouchbaseEnvironment
import org.springframework.data.couchbase.core.CouchbaseTemplate
import spock.lang.Shared
import util.AbstractCouchbaseTest
@ -12,7 +14,26 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest {
@Shared
List<CouchbaseTemplate> templates
@Shared
Cluster couchbaseCluster
@Shared
Cluster memcacheCluster
@Shared
protected CouchbaseEnvironment couchbaseEnvironment
@Shared
protected CouchbaseEnvironment memcacheEnvironment
def setupSpec() {
couchbaseEnvironment = envBuilder(bucketCouchbase).build()
memcacheEnvironment = envBuilder(bucketMemcache).build()
couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1"))
memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1"))
ClusterManager couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD)
ClusterManager memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD)
Bucket bucketCouchbase = couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password())
Bucket bucketMemcache = memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password())
@ -20,8 +41,14 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest {
new CouchbaseTemplate(memcacheManager.info(), bucketMemcache)]
}
def cleanupSpec() {
couchbaseCluster?.disconnect()
memcacheCluster?.disconnect()
couchbaseEnvironment.shutdown()
memcacheEnvironment.shutdown()
}
def "test write/read #name"() {
def "test write #name"() {
setup:
def doc = new Doc()
@ -31,81 +58,42 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest {
then:
template.findById("1", Doc) != null
and:
assertTraces(2) {
trace(0, 1) {
assertCouchbaseCall(it, 0, "Bucket.upsert", name)
}
trace(1, 1) {
assertCouchbaseCall(it, 0, "Bucket.get", name)
}
}
where:
template << templates
name = template.couchbaseBucket.name()
}
def "test remove #name"() {
setup:
def doc = new Doc()
when:
template.save(doc)
template.remove(doc)
then:
template.findById("1", Doc) == null
and:
assertTraces(4) {
assertTraces(3) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.upsert"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" name
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.upsert", name)
}
trace(1, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.get"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" name
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.remove", name)
}
trace(2, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.remove"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" name
defaultTags()
}
}
}
trace(3, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.get"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" name
defaultTags()
}
}
assertCouchbaseCall(it, 0, "Bucket.get", name)
}
}

View File

@ -1,31 +1,33 @@
package util
import com.couchbase.client.core.metrics.DefaultLatencyMetricsCollectorConfig
import com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig
import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.bucket.BucketType
import com.couchbase.client.java.cluster.BucketSettings
import com.couchbase.client.java.cluster.ClusterManager
import com.couchbase.client.java.cluster.DefaultBucketSettings
import com.couchbase.client.java.env.CouchbaseEnvironment
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment
import com.couchbase.mock.Bucket
import com.couchbase.mock.BucketConfiguration
import com.couchbase.mock.CouchbaseMock
import com.couchbase.mock.http.query.QueryServer
import datadog.opentracing.DDSpan
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.asserts.ListWriterAssert
import datadog.trace.agent.test.asserts.TraceAssert
import datadog.trace.agent.test.utils.PortUtils
import datadog.trace.api.Config
import datadog.trace.api.DDSpanTypes
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import io.opentracing.tag.Tags
import spock.lang.Shared
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit
abstract class AbstractCouchbaseTest extends AgentTestRunner {
private static final USERNAME = "Administrator"
private static final PASSWORD = "password"
static final USERNAME = "Administrator"
static final PASSWORD = "password"
@Shared
private int port = PortUtils.randomOpenPort()
@ -53,42 +55,16 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner {
@Shared
CouchbaseMock mock
@Shared
protected CouchbaseCluster couchbaseCluster
@Shared
protected CouchbaseCluster memcacheCluster
@Shared
protected CouchbaseEnvironment couchbaseEnvironment
@Shared
protected CouchbaseEnvironment memcacheEnvironment
@Shared
protected ClusterManager couchbaseManager
@Shared
protected ClusterManager memcacheManager
def setupSpec() {
mock = new CouchbaseMock("127.0.0.1", port, 1, 1)
mock.httpServer.register("/query", new QueryServer())
mock.start()
println "CouchbaseMock listening on localhost:$port"
mock.createBucket(convert(bucketCouchbase))
couchbaseEnvironment = envBuilder(bucketCouchbase).build()
couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1"))
couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD)
mock.createBucket(convert(bucketMemcache))
memcacheEnvironment = envBuilder(bucketMemcache).build()
memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1"))
memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD)
// Cache buckets:
couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password())
memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password())
// This setting should have no effect since decorator returns null for the instance.
System.setProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "true")
}
@ -104,23 +80,12 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner {
}
def cleanupSpec() {
try {
couchbaseCluster?.disconnect()
} catch (RejectedExecutionException e) {
// already closed by a test?
}
try {
memcacheCluster?.disconnect()
} catch (RejectedExecutionException e) {
// already closed by a test?
}
mock?.stop()
System.clearProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE)
}
private DefaultCouchbaseEnvironment.Builder envBuilder(BucketSettings bucketSettings) {
protected DefaultCouchbaseEnvironment.Builder envBuilder(BucketSettings bucketSettings) {
// Couchbase seems to be really slow to start sometimes
def timeout = TimeUnit.SECONDS.toMillis(20)
return DefaultCouchbaseEnvironment.builder()
@ -141,4 +106,55 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner {
.analyticsTimeout(timeout)
.socketConnectTimeout(timeout.intValue())
}
void sortAndAssertTraces(
final int size,
@ClosureParams(value = SimpleType, options = "datadog.trace.agent.test.asserts.ListWriterAssert")
@DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST)
final Closure spec) {
TEST_WRITER.waitForTraces(size)
TEST_WRITER.each {
it.sort({
a, b ->
boolean aIsCouchbaseOperation = a.operationName == "couchbase.call"
boolean bIsCouchbaseOperation = b.operationName == "couchbase.call"
if (aIsCouchbaseOperation && !bIsCouchbaseOperation) {
return 1
} else if (!aIsCouchbaseOperation && bIsCouchbaseOperation) {
return -1
}
return a.resourceName.compareTo(b.resourceName)
})
}
assertTraces(size, spec)
}
void assertCouchbaseCall(TraceAssert trace, int index, String name, String bucketName = null, Object parentSpan = null) {
trace.span(index) {
serviceName "couchbase"
resourceName name
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
if (parentSpan == null) {
parent()
} else {
childOf((DDSpan) parentSpan)
}
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
if (bucketName != null) {
"bucket" bucketName
}
defaultTags()
}
}
}
}

View File

@ -17,11 +17,14 @@ testSets {
}
dependencies {
compile project(':dd-java-agent:instrumentation:rxjava-1')
compileOnly group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0'
compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7'
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
testCompile group: 'io.reactivex', name: 'rxjava', version: '1.0.7'
testCompile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0'

View File

@ -8,23 +8,15 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.google.auto.service.AutoService;
import com.netflix.hystrix.HystrixInvokableInfo;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.ScopeManager;
import datadog.trace.instrumentation.rxjava.TracedOnSubscribe;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import rx.DDTracingUtil;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
@AutoService(Instrumenter.class)
public class HystrixInstrumentation extends Instrumenter.Default {
@ -47,10 +39,11 @@ public class HystrixInstrumentation extends Instrumenter.Default {
return new String[] {
"rx.DDTracingUtil",
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.instrumentation.rxjava.SpanFinishingSubscription",
"datadog.trace.instrumentation.rxjava.TracedSubscriber",
"datadog.trace.instrumentation.rxjava.TracedOnSubscribe",
packageName + ".HystrixDecorator",
packageName + ".HystrixInstrumentation$SpanFinishingSubscription",
packageName + ".HystrixInstrumentation$TracedSubscriber",
packageName + ".HystrixInstrumentation$TracedOnSubscribe",
packageName + ".HystrixInstrumentation$HystrixOnSubscribe",
};
}
@ -73,11 +66,8 @@ public class HystrixInstrumentation extends Instrumenter.Default {
@Advice.This final HystrixInvokableInfo<?> command,
@Advice.Return(readOnly = false) Observable result,
@Advice.Thrown final Throwable throwable) {
final Observable.OnSubscribe<?> onSubscribe = DDTracingUtil.extractOnSubscribe(result);
result =
Observable.create(
new TracedOnSubscribe(
onSubscribe, command, "execute", GlobalTracer.get().scopeManager().active()));
result = Observable.create(new HystrixOnSubscribe(result, command, "execute"));
}
}
@ -88,171 +78,30 @@ public class HystrixInstrumentation extends Instrumenter.Default {
@Advice.This final HystrixInvokableInfo<?> command,
@Advice.Return(readOnly = false) Observable<?> result,
@Advice.Thrown final Throwable throwable) {
final Observable.OnSubscribe<?> onSubscribe = DDTracingUtil.extractOnSubscribe(result);
result =
Observable.create(
new TracedOnSubscribe(
onSubscribe, command, "fallback", GlobalTracer.get().scopeManager().active()));
result = Observable.create(new HystrixOnSubscribe(result, command, "fallback"));
}
}
public static class TracedOnSubscribe<T> implements Observable.OnSubscribe<T> {
private final Observable.OnSubscribe<?> delegate;
public static class HystrixOnSubscribe extends TracedOnSubscribe {
private final HystrixInvokableInfo<?> command;
private final String methodName;
private final TraceScope.Continuation continuation;
public TracedOnSubscribe(
final Observable.OnSubscribe<?> delegate,
public HystrixOnSubscribe(
final Observable originalObservable,
final HystrixInvokableInfo<?> command,
final String methodName,
final Scope parentScope) {
this.delegate = delegate;
final String methodName) {
super(originalObservable, OPERATION_NAME, DECORATE);
this.command = command;
this.methodName = methodName;
continuation =
parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Tracer tracer = GlobalTracer.get();
final Span span; // span finished by TracedSubscriber
if (continuation != null) {
try (final TraceScope scope = continuation.activate()) {
span = tracer.buildSpan(OPERATION_NAME).start();
}
} else {
span = tracer.buildSpan(OPERATION_NAME).start();
}
DECORATE.afterStart(span);
protected void afterStart(final Span span) {
super.afterStart(span);
DECORATE.onCommand(span, command, methodName);
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (!((TraceScope) scope).isAsyncPropagating()) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.call(new TracedSubscriber(span, subscriber));
}
}
}
public static class TracedSubscriber<T> extends Subscriber<T> {
private final ScopeManager scopeManager = GlobalTracer.get().scopeManager();
private final AtomicReference<Span> spanRef;
private final Subscriber<T> delegate;
public TracedSubscriber(final Span span, final Subscriber<T> delegate) {
spanRef = new AtomicReference<>(span);
this.delegate = delegate;
final SpanFinishingSubscription subscription = new SpanFinishingSubscription(spanRef);
delegate.add(subscription);
}
@Override
public void onStart() {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onStart();
}
} else {
delegate.onStart();
}
}
@Override
public void onNext(final T value) {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onNext(value);
} catch (final Throwable e) {
onError(e);
}
} else {
delegate.onNext(value);
}
}
@Override
public void onCompleted() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
boolean errored = false;
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onCompleted();
} catch (final Throwable e) {
// Repopulate the spanRef for onError
spanRef.compareAndSet(null, span);
onError(e);
errored = true;
} finally {
// finish called by onError, so don't finish again.
if (!errored) {
DECORATE.beforeFinish(span);
span.finish();
}
}
} else {
delegate.onCompleted();
}
}
@Override
public void onError(final Throwable e) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
DECORATE.onError(span, e);
delegate.onError(e);
} catch (final Throwable e2) {
DECORATE.onError(span, e2);
throw e2;
} finally {
DECORATE.beforeFinish(span);
span.finish();
}
} else {
delegate.onError(e);
}
}
}
public static class SpanFinishingSubscription implements Subscription {
private final AtomicReference<Span> spanRef;
public SpanFinishingSubscription(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void unsubscribe() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
DECORATE.beforeFinish(span);
span.finish();
}
}
@Override
public boolean isUnsubscribed() {
return spanRef.get() == null;
}
}
}

View File

@ -0,0 +1,13 @@
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7'
}

View File

@ -0,0 +1,31 @@
package datadog.trace.instrumentation.rxjava;
import datadog.trace.agent.decorator.BaseDecorator;
import io.opentracing.Span;
import java.util.concurrent.atomic.AtomicReference;
import rx.Subscription;
public class SpanFinishingSubscription implements Subscription {
private final BaseDecorator decorator;
private final AtomicReference<Span> spanRef;
public SpanFinishingSubscription(
final BaseDecorator decorator, final AtomicReference<Span> spanRef) {
this.decorator = decorator;
this.spanRef = spanRef;
}
@Override
public void unsubscribe() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
decorator.beforeFinish(span);
span.finish();
}
}
@Override
public boolean isUnsubscribed() {
return spanRef.get() == null;
}
}

View File

@ -0,0 +1,58 @@
package datadog.trace.instrumentation.rxjava;
import datadog.trace.agent.decorator.BaseDecorator;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import rx.DDTracingUtil;
import rx.Observable;
import rx.Subscriber;
public class TracedOnSubscribe<T> implements Observable.OnSubscribe<T> {
private final Observable.OnSubscribe<?> delegate;
private final String operationName;
private final TraceScope.Continuation continuation;
private final BaseDecorator decorator;
public TracedOnSubscribe(
final Observable originalObservable,
final String operationName,
final BaseDecorator decorator) {
this.delegate = DDTracingUtil.extractOnSubscribe(originalObservable);
this.operationName = operationName;
this.decorator = decorator;
final Scope parentScope = GlobalTracer.get().scopeManager().active();
continuation = parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Tracer tracer = GlobalTracer.get();
final Span span; // span finished by TracedSubscriber
if (continuation != null) {
try (final TraceScope scope = continuation.activate()) {
span = tracer.buildSpan(operationName).start();
}
} else {
span = tracer.buildSpan(operationName).start();
}
afterStart(span);
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (!((TraceScope) scope).isAsyncPropagating()) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.call(new TracedSubscriber(span, subscriber, decorator));
}
}
protected void afterStart(final Span span) {
decorator.afterStart(span);
}
}

View File

@ -0,0 +1,109 @@
package datadog.trace.instrumentation.rxjava;
import datadog.trace.agent.decorator.BaseDecorator;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.ScopeManager;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import java.util.concurrent.atomic.AtomicReference;
import rx.Subscriber;
public class TracedSubscriber<T> extends Subscriber<T> {
private final ScopeManager scopeManager = GlobalTracer.get().scopeManager();
private final AtomicReference<Span> spanRef;
private final Subscriber<T> delegate;
private final BaseDecorator decorator;
public TracedSubscriber(
final Span span, final Subscriber<T> delegate, final BaseDecorator decorator) {
spanRef = new AtomicReference<>(span);
this.delegate = delegate;
this.decorator = decorator;
final SpanFinishingSubscription subscription =
new SpanFinishingSubscription(decorator, spanRef);
delegate.add(subscription);
}
@Override
public void onStart() {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onStart();
}
} else {
delegate.onStart();
}
}
@Override
public void onNext(final T value) {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onNext(value);
} catch (final Throwable e) {
onError(e);
}
} else {
delegate.onNext(value);
}
}
@Override
public void onCompleted() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
boolean errored = false;
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onCompleted();
} catch (final Throwable e) {
// Repopulate the spanRef for onError
spanRef.compareAndSet(null, span);
onError(e);
errored = true;
} finally {
// finish called by onError, so don't finish again.
if (!errored) {
decorator.beforeFinish(span);
span.finish();
}
}
} else {
delegate.onCompleted();
}
}
@Override
public void onError(final Throwable e) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
decorator.onError(span, e);
delegate.onError(e);
} catch (final Throwable e2) {
decorator.onError(span, e2);
throw e2;
} finally {
decorator.beforeFinish(span);
span.finish();
}
} else {
delegate.onError(e);
}
}
}

View File

@ -80,6 +80,7 @@ include ':dd-java-agent:instrumentation:play-2.4'
include ':dd-java-agent:instrumentation:play-2.6'
include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7'
include ':dd-java-agent:instrumentation:ratpack-1.4'
include ':dd-java-agent:instrumentation:rxjava-1'
include ':dd-java-agent:instrumentation:reactor-core-3.1'
include ':dd-java-agent:instrumentation:servlet-2'
include ':dd-java-agent:instrumentation:servlet-3'