Revert "Couchbase Async Subscriptions"

This commit is contained in:
Laplie Anderson 2019-09-13 11:51:52 -04:00 committed by GitHub
parent b875ae19a2
commit d063b42491
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 682 additions and 686 deletions

View File

@ -44,10 +44,8 @@ muzzle {
} }
dependencies { dependencies {
compile project(':dd-java-agent:instrumentation:rxjava-1')
compileOnly group: 'com.couchbase.client', name: 'java-client', version: '2.0.0' compileOnly group: 'com.couchbase.client', name: 'java-client', version: '2.0.0'
testCompile group: 'com.couchbase.mock', name: 'CouchbaseMock', version: '1.5.19' testCompile group: 'com.couchbase.mock', name: 'CouchbaseMock', version: '1.5.19'
testCompile group: 'org.springframework.data', name: 'spring-data-couchbase', version: '2.0.0.RELEASE' testCompile group: 'org.springframework.data', name: 'spring-data-couchbase', version: '2.0.0.RELEASE'

View File

@ -1,5 +1,6 @@
package datadog.trace.instrumentation.couchbase.client; package datadog.trace.instrumentation.couchbase.client;
import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@ -11,14 +12,22 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.couchbase.client.java.CouchbaseCluster; import com.couchbase.client.java.CouchbaseCluster;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.CallDepthThreadLocalMap; import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
import rx.Observable; import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
@AutoService(Instrumenter.class) @AutoService(Instrumenter.class)
public class CouchbaseBucketInstrumentation extends Instrumenter.Default { public class CouchbaseBucketInstrumentation extends Instrumenter.Default {
@ -38,15 +47,13 @@ public class CouchbaseBucketInstrumentation extends Instrumenter.Default {
@Override @Override
public String[] helperClassNames() { public String[] helperClassNames() {
return new String[] { return new String[] {
"rx.DDTracingUtil",
"datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.ClientDecorator",
"datadog.trace.agent.decorator.DatabaseClientDecorator", "datadog.trace.agent.decorator.DatabaseClientDecorator",
"datadog.trace.instrumentation.rxjava.SpanFinishingSubscription",
"datadog.trace.instrumentation.rxjava.TracedSubscriber",
"datadog.trace.instrumentation.rxjava.TracedOnSubscribe",
packageName + ".CouchbaseClientDecorator", packageName + ".CouchbaseClientDecorator",
packageName + ".CouchbaseOnSubscribe", getClass().getName() + "$TraceSpanStart",
getClass().getName() + "$TraceSpanFinish",
getClass().getName() + "$TraceSpanError",
}; };
} }
@ -69,13 +76,94 @@ public class CouchbaseBucketInstrumentation extends Instrumenter.Default {
@Advice.Enter final int callDepth, @Advice.Enter final int callDepth,
@Advice.Origin final Method method, @Advice.Origin final Method method,
@Advice.FieldValue("bucket") final String bucket, @Advice.FieldValue("bucket") final String bucket,
@Advice.AllArguments final Object[] args,
@Advice.Return(readOnly = false) Observable result) { @Advice.Return(readOnly = false) Observable result) {
if (callDepth > 0) { if (callDepth > 0) {
return; return;
} }
CallDepthThreadLocalMap.reset(CouchbaseCluster.class); CallDepthThreadLocalMap.reset(CouchbaseCluster.class);
final AtomicReference<Span> spanRef = new AtomicReference<>();
result =
result
.doOnSubscribe(new TraceSpanStart(method, bucket, spanRef))
.doOnCompleted(new TraceSpanFinish(spanRef))
.doOnError(new TraceSpanError(spanRef));
}
}
result = Observable.create(new CouchbaseOnSubscribe(result, method, bucket)); public static class TraceSpanStart implements Action0 {
private final Method method;
private final String bucket;
private final AtomicReference<Span> spanRef;
public TraceSpanStart(
final Method method, final String bucket, final AtomicReference<Span> spanRef) {
this.method = method;
this.bucket = bucket;
this.spanRef = spanRef;
}
@Override
public void call() {
// This is called each time an observer has a new subscriber, but we should only time it once.
if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) {
return;
}
final Class<?> declaringClass = method.getDeclaringClass();
final String className =
declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", "");
final String resourceName = className + "." + method.getName();
final Span span =
GlobalTracer.get()
.buildSpan("couchbase.call")
.withTag(DDTags.RESOURCE_NAME, resourceName)
.withTag("bucket", bucket)
.start();
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
// just replace the no-op span.
spanRef.set(DECORATE.afterStart(span));
}
}
}
public static class TraceSpanFinish implements Action0 {
private final AtomicReference<Span> spanRef;
public TraceSpanFinish(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.beforeFinish(span);
span.finish();
}
}
}
}
public static class TraceSpanError implements Action1<Throwable> {
private final AtomicReference<Span> spanRef;
public TraceSpanError(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call(final Throwable throwable) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
}
}
} }
} }
} }

View File

@ -1,5 +1,6 @@
package datadog.trace.instrumentation.couchbase.client; package datadog.trace.instrumentation.couchbase.client;
import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isMethod;
@ -11,14 +12,22 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.couchbase.client.java.CouchbaseCluster; import com.couchbase.client.java.CouchbaseCluster;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.CallDepthThreadLocalMap; import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
import rx.Observable; import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
@AutoService(Instrumenter.class) @AutoService(Instrumenter.class)
public class CouchbaseClusterInstrumentation extends Instrumenter.Default { public class CouchbaseClusterInstrumentation extends Instrumenter.Default {
@ -38,22 +47,20 @@ public class CouchbaseClusterInstrumentation extends Instrumenter.Default {
@Override @Override
public String[] helperClassNames() { public String[] helperClassNames() {
return new String[] { return new String[] {
"rx.DDTracingUtil",
"datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.ClientDecorator",
"datadog.trace.agent.decorator.DatabaseClientDecorator", "datadog.trace.agent.decorator.DatabaseClientDecorator",
"datadog.trace.instrumentation.rxjava.SpanFinishingSubscription",
"datadog.trace.instrumentation.rxjava.TracedSubscriber",
"datadog.trace.instrumentation.rxjava.TracedOnSubscribe",
packageName + ".CouchbaseClientDecorator", packageName + ".CouchbaseClientDecorator",
packageName + ".CouchbaseOnSubscribe", getClass().getName() + "$TraceSpanStart",
getClass().getName() + "$TraceSpanFinish",
getClass().getName() + "$TraceSpanError",
}; };
} }
@Override @Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() { public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap( return singletonMap(
isMethod().and(isPublic()).and(returns(named("rx.Observable"))).and(not(named("core"))), isMethod().and(isPublic()).and(returns(named("rx.Observable"))),
CouchbaseClientAdvice.class.getName()); CouchbaseClientAdvice.class.getName());
} }
@ -73,7 +80,84 @@ public class CouchbaseClusterInstrumentation extends Instrumenter.Default {
return; return;
} }
CallDepthThreadLocalMap.reset(CouchbaseCluster.class); CallDepthThreadLocalMap.reset(CouchbaseCluster.class);
result = Observable.create(new CouchbaseOnSubscribe(result, method, null)); final AtomicReference<Span> spanRef = new AtomicReference<>();
result =
result
.doOnSubscribe(new TraceSpanStart(method, spanRef))
.doOnCompleted(new TraceSpanFinish(spanRef))
.doOnError(new TraceSpanError(spanRef));
}
}
public static class TraceSpanStart implements Action0 {
private final Method method;
private final AtomicReference<Span> spanRef;
public TraceSpanStart(final Method method, final AtomicReference<Span> spanRef) {
this.method = method;
this.spanRef = spanRef;
}
@Override
public void call() {
// This is called each time an observer has a new subscriber, but we should only time it once.
if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) {
return;
}
final Class<?> declaringClass = method.getDeclaringClass();
final String className =
declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", "");
final String resourceName = className + "." + method.getName();
final Span span =
GlobalTracer.get()
.buildSpan("couchbase.call")
.withTag(DDTags.RESOURCE_NAME, resourceName)
.start();
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
// just replace the no-op span.
spanRef.set(DECORATE.afterStart(scope.span()));
}
}
}
public static class TraceSpanFinish implements Action0 {
private final AtomicReference<Span> spanRef;
public TraceSpanFinish(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.beforeFinish(span);
span.finish();
}
}
}
}
public static class TraceSpanError implements Action1<Throwable> {
private final AtomicReference<Span> spanRef;
public TraceSpanError(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void call(final Throwable throwable) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
}
}
} }
} }
} }

View File

@ -1,36 +0,0 @@
package datadog.trace.instrumentation.couchbase.client;
import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE;
import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.rxjava.TracedOnSubscribe;
import io.opentracing.Span;
import java.lang.reflect.Method;
import rx.Observable;
public class CouchbaseOnSubscribe extends TracedOnSubscribe {
private final String resourceName;
private final String bucket;
public CouchbaseOnSubscribe(
final Observable originalObservable, final Method method, final String bucket) {
super(originalObservable, "couchbase.call", DECORATE);
final Class<?> declaringClass = method.getDeclaringClass();
final String className =
declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", "");
resourceName = className + "." + method.getName();
this.bucket = bucket;
}
@Override
protected void afterStart(final Span span) {
super.afterStart(span);
span.setTag(DDTags.RESOURCE_NAME, resourceName);
if (bucket != null) {
span.setTag("bucket", bucket);
}
}
}

View File

@ -1,172 +0,0 @@
import com.couchbase.client.java.AsyncCluster
import com.couchbase.client.java.CouchbaseAsyncCluster
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.env.CouchbaseEnvironment
import com.couchbase.client.java.query.N1qlQuery
import spock.util.concurrent.BlockingVariable
import util.AbstractCouchbaseTest
import java.util.concurrent.TimeUnit
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
class CouchbaseAsyncClientTest extends AbstractCouchbaseTest {
def "test hasBucket #type"() {
setup:
def hasBucket = new BlockingVariable<Boolean>()
when:
cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt ->
manager.hasBucket(bucketSettings.name()).subscribe({ result -> hasBucket.set(result) })
})
then:
assert hasBucket.get()
sortAndAssertTraces(1) {
trace(0, 2) {
assertCouchbaseCall(it, 0, "Cluster.openBucket", null)
assertCouchbaseCall(it, 1, "ClusterManager.hasBucket", null, span(0))
}
}
cleanup:
cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single()
environment.shutdown()
where:
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1"))
manager = cluster.clusterManager(USERNAME, PASSWORD).toBlocking().single()
type = bucketSettings.type().name()
}
def "test upsert #type"() {
setup:
JsonObject content = JsonObject.create().put("hello", "world")
def inserted = new BlockingVariable<JsonDocument>()
when:
runUnderTrace("someTrace") {
// Connect to the bucket and open it
cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt ->
bkt.upsert(JsonDocument.create("helloworld", content)).subscribe({ result -> inserted.set(result) })
})
blockUntilChildSpansFinished(2) // Improve span ordering consistency
}
then:
inserted.get().content().getString("hello") == "world"
sortAndAssertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 1, "Bucket.upsert", bucketSettings.name(), span(2))
}
}
cleanup:
cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single()
environment.shutdown()
where:
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1"))
type = bucketSettings.type().name()
}
def "test upsert and get #type"() {
setup:
JsonObject content = JsonObject.create().put("hello", "world")
def inserted = new BlockingVariable<JsonDocument>()
def found = new BlockingVariable<JsonDocument>()
when:
runUnderTrace("someTrace") {
cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt ->
bkt.upsert(JsonDocument.create("helloworld", content))
.subscribe({ result ->
inserted.set(result)
bkt.get("helloworld")
.subscribe({ searchResult -> found.set(searchResult)
})
})
})
blockUntilChildSpansFinished(3) // Improve span ordering consistency
}
// Create a JSON document and store it with the ID "helloworld"
then:
found.get() == inserted.get()
found.get().content().getString("hello") == "world"
sortAndAssertTraces(1) {
trace(0, 4) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 3, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(3))
assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(2))
}
}
cleanup:
cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single()
environment.shutdown()
where:
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1"))
type = bucketSettings.type().name()
}
def "test query"() {
setup:
// Only couchbase buckets support queries.
CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build()
AsyncCluster cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1"))
def queryResult = new BlockingVariable<JsonObject>()
when:
// Mock expects this specific query.
// See com.couchbase.mock.http.query.QueryServer.handleString.
runUnderTrace("someTrace") {
cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).subscribe({
bkt ->
bkt.query(N1qlQuery.simple("SELECT mockrow"))
.flatMap({ query -> query.rows() })
.single()
.subscribe({ row -> queryResult.set(row.value()) })
})
blockUntilChildSpansFinished(2) // Improve span ordering consistency
}
then:
queryResult.get().get("row") == "value"
sortAndAssertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0))
assertCouchbaseCall(it, 1, "Bucket.query", bucketCouchbase.name(), span(2))
}
}
cleanup:
cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single()
environment.shutdown()
}
}

View File

@ -1,42 +1,38 @@
import com.couchbase.client.java.Bucket import com.couchbase.client.java.Bucket
import com.couchbase.client.java.Cluster
import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.document.JsonDocument import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.env.CouchbaseEnvironment
import com.couchbase.client.java.query.N1qlQuery import com.couchbase.client.java.query.N1qlQuery
import datadog.trace.api.DDSpanTypes
import io.opentracing.tag.Tags
import util.AbstractCouchbaseTest import util.AbstractCouchbaseTest
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
class CouchbaseClientTest extends AbstractCouchbaseTest { class CouchbaseClientTest extends AbstractCouchbaseTest {
def "test hasBucket #type"() {
def "test client #type"() {
when: when:
def hasBucket = manager.hasBucket(bucketSettings.name()) manager.hasBucket(bucketSettings.name())
then: then:
assert hasBucket assertTraces(1) {
sortAndAssertTraces(1) {
trace(0, 1) { trace(0, 1) {
assertCouchbaseCall(it, 0, "ClusterManager.hasBucket") span(0) {
serviceName "couchbase"
resourceName "ClusterManager.hasBucket"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
defaultTags()
}
}
} }
} }
TEST_WRITER.clear()
cleanup:
cluster?.disconnect()
environment.shutdown()
where:
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1"))
manager = cluster.clusterManager(USERNAME, PASSWORD)
type = bucketSettings.type().name()
}
def "test upsert and get #type"() {
when: when:
// Connect to the bucket and open it // Connect to the bucket and open it
Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password())
@ -44,87 +40,70 @@ class CouchbaseClientTest extends AbstractCouchbaseTest {
// Create a JSON document and store it with the ID "helloworld" // Create a JSON document and store it with the ID "helloworld"
JsonObject content = JsonObject.create().put("hello", "world") JsonObject content = JsonObject.create().put("hello", "world")
def inserted = bkt.upsert(JsonDocument.create("helloworld", content)) def inserted = bkt.upsert(JsonDocument.create("helloworld", content))
then:
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.upsert"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bkt.name()
defaultTags()
}
}
}
}
TEST_WRITER.clear()
when:
def found = bkt.get("helloworld") def found = bkt.get("helloworld")
then: then:
found == inserted found == inserted
found.content().getString("hello") == "world" found.content().getString("hello") == "world"
sortAndAssertTraces(3) { and:
assertTraces(1) {
trace(0, 1) { trace(0, 1) {
assertCouchbaseCall(it, 0, "Cluster.openBucket") span(0) {
} serviceName "couchbase"
trace(1, 1) { resourceName "Bucket.get"
assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name()) operationName "couchbase.call"
} spanType DDSpanTypes.COUCHBASE
trace(2, 1) { errored false
assertCouchbaseCall(it, 0, "Bucket.get", bucketSettings.name()) parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bkt.name()
defaultTags()
}
}
} }
} }
TEST_WRITER.clear()
cleanup:
cluster?.disconnect()
environment.shutdown()
where: where:
bucketSettings << [bucketCouchbase, bucketMemcache] manager | cluster | bucketSettings
couchbaseManager | couchbaseCluster | bucketCouchbase
memcacheManager | memcacheCluster | bucketMemcache
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1"))
type = bucketSettings.type().name()
}
def "test upsert and get #type under trace"() {
when:
// Connect to the bucket and open it
Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password())
// Create a JSON document and store it with the ID "helloworld"
JsonObject content = JsonObject.create().put("hello", "world")
def inserted
def found
runUnderTrace("someTrace") {
inserted = bkt.upsert(JsonDocument.create("helloworld", content))
found = bkt.get("helloworld")
blockUntilChildSpansFinished(2)
}
then:
found == inserted
found.content().getString("hello") == "world"
sortAndAssertTraces(2) {
trace(0, 1) {
assertCouchbaseCall(it, 0, "Cluster.openBucket")
}
trace(1, 3) {
basicSpan(it, 0, "someTrace")
assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(0))
assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(0))
}
}
cleanup:
cluster?.disconnect()
environment.shutdown()
where:
bucketSettings << [bucketCouchbase, bucketMemcache]
environment = envBuilder(bucketSettings).build()
cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1"))
type = bucketSettings.type().name() type = bucketSettings.type().name()
} }
def "test query"() { def "test query"() {
setup: setup:
// Only couchbase buckets support queries. Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password())
CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build()
Cluster cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1"))
Bucket bkt = cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password())
when: when:
// Mock expects this specific query. // Mock expects this specific query.
@ -137,17 +116,31 @@ class CouchbaseClientTest extends AbstractCouchbaseTest {
result.first().value().get("row") == "value" result.first().value().get("row") == "value"
and: and:
sortAndAssertTraces(2) { assertTraces(1) {
trace(0, 1) { trace(0, 1) {
assertCouchbaseCall(it, 0, "Cluster.openBucket") span(0) {
} serviceName "couchbase"
trace(1, 1) { resourceName "Bucket.query"
assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bkt.name()
defaultTags()
}
}
} }
} }
cleanup: where:
cluster?.disconnect() manager | cluster | bucketSettings
environment.shutdown() couchbaseManager | couchbaseCluster | bucketCouchbase
// Only couchbase buckets support queries.
type = bucketSettings.type().name()
} }
} }

View File

@ -1,10 +1,9 @@
package springdata package springdata
import com.couchbase.client.java.Cluster
import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.env.CouchbaseEnvironment
import com.couchbase.client.java.view.DefaultView import com.couchbase.client.java.view.DefaultView
import com.couchbase.client.java.view.DesignDocument import com.couchbase.client.java.view.DesignDocument
import datadog.trace.api.DDSpanTypes
import io.opentracing.tag.Tags
import org.springframework.context.ConfigurableApplicationContext import org.springframework.context.ConfigurableApplicationContext
import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.data.repository.CrudRepository import org.springframework.data.repository.CrudRepository
@ -32,23 +31,21 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
DocRepository repo DocRepository repo
def setupSpec() { def setupSpec() {
CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build()
Cluster couchbaseCluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1"))
// Create view for SpringRepository's findAll() // Create view for SpringRepository's findAll()
couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).bucketManager() couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).bucketManager()
.insertDesignDocument( .insertDesignDocument(
DesignDocument.create("doc", Collections.singletonList(DefaultView.create("all", DesignDocument.create("doc", Collections.singletonList(DefaultView.create("all",
''' '''
function (doc, meta) { function (doc, meta) {
if (doc._class == "springdata.Doc") { if (doc._class == "springdata.Doc") {
emit(meta.id, null); emit(meta.id, null);
} }
} }
'''.stripIndent() '''.stripIndent()
))) )))
) )
CouchbaseConfig.setEnvironment(environment) CouchbaseConfig.setEnvironment(couchbaseEnvironment)
CouchbaseConfig.setBucketSettings(bucketCouchbase) CouchbaseConfig.setBucketSettings(bucketCouchbase)
// Close all buckets and disconnect // Close all buckets and disconnect
@ -78,7 +75,21 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
and: and:
assertTraces(1) { assertTraces(1) {
trace(0, 1) { trace(0, 1) {
assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) span(0) {
serviceName "couchbase"
resourceName "Bucket.query"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
} }
} }
@ -96,7 +107,21 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
and: and:
assertTraces(1) { assertTraces(1) {
trace(0, 1) { trace(0, 1) {
assertCouchbaseCall(it, 0, "Bucket.upsert", bucketCouchbase.name()) span(0) {
serviceName "couchbase"
resourceName "Bucket.upsert"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
} }
} }
TEST_WRITER.clear() TEST_WRITER.clear()
@ -107,7 +132,21 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
and: and:
assertTraces(1) { assertTraces(1) {
trace(0, 1) { trace(0, 1) {
assertCouchbaseCall(it, 0, "Bucket.get", bucketCouchbase.name()) span(0) {
serviceName "couchbase"
resourceName "Bucket.get"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
} }
} }
TEST_WRITER.clear() TEST_WRITER.clear()
@ -121,13 +160,55 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
assertTraces(3) { assertTraces(3) {
trace(0, 1) { trace(0, 1) {
assertCouchbaseCall(it, 0, "Bucket.upsert", bucketCouchbase.name()) span(0) {
serviceName "couchbase"
resourceName "Bucket.upsert"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
} }
trace(1, 1) { trace(1, 1) {
assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) span(0) {
serviceName "couchbase"
resourceName "Bucket.query"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
} }
trace(2, 1) { trace(2, 1) {
assertCouchbaseCall(it, 0, "Bucket.get", bucketCouchbase.name()) span(0) {
serviceName "couchbase"
resourceName "Bucket.get"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
} }
} }
TEST_WRITER.clear() TEST_WRITER.clear()
@ -141,10 +222,38 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest {
and: and:
assertTraces(2) { assertTraces(2) {
trace(0, 1) { trace(0, 1) {
assertCouchbaseCall(it, 0, "Bucket.remove", bucketCouchbase.name()) span(0) {
serviceName "couchbase"
resourceName "Bucket.remove"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
} }
trace(1, 1) { trace(1, 1) {
assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) span(0) {
serviceName "couchbase"
resourceName "Bucket.query"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" bucketCouchbase.name()
defaultTags()
}
}
} }
} }
} }

View File

@ -1,10 +1,8 @@
package springdata package springdata
import com.couchbase.client.java.Bucket import com.couchbase.client.java.Bucket
import com.couchbase.client.java.Cluster import datadog.trace.api.DDSpanTypes
import com.couchbase.client.java.CouchbaseCluster import io.opentracing.tag.Tags
import com.couchbase.client.java.cluster.ClusterManager
import com.couchbase.client.java.env.CouchbaseEnvironment
import org.springframework.data.couchbase.core.CouchbaseTemplate import org.springframework.data.couchbase.core.CouchbaseTemplate
import spock.lang.Shared import spock.lang.Shared
import util.AbstractCouchbaseTest import util.AbstractCouchbaseTest
@ -14,26 +12,7 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest {
@Shared @Shared
List<CouchbaseTemplate> templates List<CouchbaseTemplate> templates
@Shared
Cluster couchbaseCluster
@Shared
Cluster memcacheCluster
@Shared
protected CouchbaseEnvironment couchbaseEnvironment
@Shared
protected CouchbaseEnvironment memcacheEnvironment
def setupSpec() { def setupSpec() {
couchbaseEnvironment = envBuilder(bucketCouchbase).build()
memcacheEnvironment = envBuilder(bucketMemcache).build()
couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1"))
memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1"))
ClusterManager couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD)
ClusterManager memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD)
Bucket bucketCouchbase = couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) Bucket bucketCouchbase = couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password())
Bucket bucketMemcache = memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password()) Bucket bucketMemcache = memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password())
@ -41,14 +20,8 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest {
new CouchbaseTemplate(memcacheManager.info(), bucketMemcache)] new CouchbaseTemplate(memcacheManager.info(), bucketMemcache)]
} }
def cleanupSpec() {
couchbaseCluster?.disconnect()
memcacheCluster?.disconnect()
couchbaseEnvironment.shutdown()
memcacheEnvironment.shutdown()
}
def "test write #name"() { def "test write/read #name"() {
setup: setup:
def doc = new Doc() def doc = new Doc()
@ -58,42 +31,81 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest {
then: then:
template.findById("1", Doc) != null template.findById("1", Doc) != null
and:
assertTraces(2) {
trace(0, 1) {
assertCouchbaseCall(it, 0, "Bucket.upsert", name)
}
trace(1, 1) {
assertCouchbaseCall(it, 0, "Bucket.get", name)
}
}
where:
template << templates
name = template.couchbaseBucket.name()
}
def "test remove #name"() {
setup:
def doc = new Doc()
when: when:
template.save(doc)
template.remove(doc) template.remove(doc)
then: then:
template.findById("1", Doc) == null template.findById("1", Doc) == null
and: and:
assertTraces(3) { assertTraces(4) {
trace(0, 1) { trace(0, 1) {
assertCouchbaseCall(it, 0, "Bucket.upsert", name) span(0) {
serviceName "couchbase"
resourceName "Bucket.upsert"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" name
defaultTags()
}
}
} }
trace(1, 1) { trace(1, 1) {
assertCouchbaseCall(it, 0, "Bucket.remove", name) span(0) {
serviceName "couchbase"
resourceName "Bucket.get"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" name
defaultTags()
}
}
} }
trace(2, 1) { trace(2, 1) {
assertCouchbaseCall(it, 0, "Bucket.get", name) span(0) {
serviceName "couchbase"
resourceName "Bucket.remove"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" name
defaultTags()
}
}
}
trace(3, 1) {
span(0) {
serviceName "couchbase"
resourceName "Bucket.get"
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
parent()
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"bucket" name
defaultTags()
}
}
} }
} }

View File

@ -1,33 +1,31 @@
package util package util
import com.couchbase.client.core.metrics.DefaultLatencyMetricsCollectorConfig import com.couchbase.client.core.metrics.DefaultLatencyMetricsCollectorConfig
import com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig import com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig
import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.bucket.BucketType import com.couchbase.client.java.bucket.BucketType
import com.couchbase.client.java.cluster.BucketSettings import com.couchbase.client.java.cluster.BucketSettings
import com.couchbase.client.java.cluster.ClusterManager
import com.couchbase.client.java.cluster.DefaultBucketSettings import com.couchbase.client.java.cluster.DefaultBucketSettings
import com.couchbase.client.java.env.CouchbaseEnvironment
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment import com.couchbase.client.java.env.DefaultCouchbaseEnvironment
import com.couchbase.mock.Bucket import com.couchbase.mock.Bucket
import com.couchbase.mock.BucketConfiguration import com.couchbase.mock.BucketConfiguration
import com.couchbase.mock.CouchbaseMock import com.couchbase.mock.CouchbaseMock
import com.couchbase.mock.http.query.QueryServer import com.couchbase.mock.http.query.QueryServer
import datadog.opentracing.DDSpan
import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.asserts.ListWriterAssert
import datadog.trace.agent.test.asserts.TraceAssert
import datadog.trace.agent.test.utils.PortUtils import datadog.trace.agent.test.utils.PortUtils
import datadog.trace.api.Config import datadog.trace.api.Config
import datadog.trace.api.DDSpanTypes
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import io.opentracing.tag.Tags
import spock.lang.Shared import spock.lang.Shared
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
abstract class AbstractCouchbaseTest extends AgentTestRunner { abstract class AbstractCouchbaseTest extends AgentTestRunner {
static final USERNAME = "Administrator" private static final USERNAME = "Administrator"
static final PASSWORD = "password" private static final PASSWORD = "password"
@Shared @Shared
private int port = PortUtils.randomOpenPort() private int port = PortUtils.randomOpenPort()
@ -55,16 +53,42 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner {
@Shared @Shared
CouchbaseMock mock CouchbaseMock mock
@Shared
protected CouchbaseCluster couchbaseCluster
@Shared
protected CouchbaseCluster memcacheCluster
@Shared
protected CouchbaseEnvironment couchbaseEnvironment
@Shared
protected CouchbaseEnvironment memcacheEnvironment
@Shared
protected ClusterManager couchbaseManager
@Shared
protected ClusterManager memcacheManager
def setupSpec() { def setupSpec() {
mock = new CouchbaseMock("127.0.0.1", port, 1, 1) mock = new CouchbaseMock("127.0.0.1", port, 1, 1)
mock.httpServer.register("/query", new QueryServer()) mock.httpServer.register("/query", new QueryServer())
mock.start() mock.start()
println "CouchbaseMock listening on localhost:$port" println "CouchbaseMock listening on localhost:$port"
mock.createBucket(convert(bucketCouchbase)) mock.createBucket(convert(bucketCouchbase))
couchbaseEnvironment = envBuilder(bucketCouchbase).build()
couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1"))
couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD)
mock.createBucket(convert(bucketMemcache)) mock.createBucket(convert(bucketMemcache))
memcacheEnvironment = envBuilder(bucketMemcache).build()
memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1"))
memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD)
// Cache buckets:
couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password())
memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password())
// This setting should have no effect since decorator returns null for the instance. // This setting should have no effect since decorator returns null for the instance.
System.setProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "true") System.setProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "true")
} }
@ -80,12 +104,23 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner {
} }
def cleanupSpec() { def cleanupSpec() {
try {
couchbaseCluster?.disconnect()
} catch (RejectedExecutionException e) {
// already closed by a test?
}
try {
memcacheCluster?.disconnect()
} catch (RejectedExecutionException e) {
// already closed by a test?
}
mock?.stop() mock?.stop()
System.clearProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE) System.clearProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE)
} }
protected DefaultCouchbaseEnvironment.Builder envBuilder(BucketSettings bucketSettings) { private DefaultCouchbaseEnvironment.Builder envBuilder(BucketSettings bucketSettings) {
// Couchbase seems to be really slow to start sometimes // Couchbase seems to be really slow to start sometimes
def timeout = TimeUnit.SECONDS.toMillis(20) def timeout = TimeUnit.SECONDS.toMillis(20)
return DefaultCouchbaseEnvironment.builder() return DefaultCouchbaseEnvironment.builder()
@ -106,55 +141,4 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner {
.analyticsTimeout(timeout) .analyticsTimeout(timeout)
.socketConnectTimeout(timeout.intValue()) .socketConnectTimeout(timeout.intValue())
} }
void sortAndAssertTraces(
final int size,
@ClosureParams(value = SimpleType, options = "datadog.trace.agent.test.asserts.ListWriterAssert")
@DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST)
final Closure spec) {
TEST_WRITER.waitForTraces(size)
TEST_WRITER.each {
it.sort({
a, b ->
boolean aIsCouchbaseOperation = a.operationName == "couchbase.call"
boolean bIsCouchbaseOperation = b.operationName == "couchbase.call"
if (aIsCouchbaseOperation && !bIsCouchbaseOperation) {
return 1
} else if (!aIsCouchbaseOperation && bIsCouchbaseOperation) {
return -1
}
return a.resourceName.compareTo(b.resourceName)
})
}
assertTraces(size, spec)
}
void assertCouchbaseCall(TraceAssert trace, int index, String name, String bucketName = null, Object parentSpan = null) {
trace.span(index) {
serviceName "couchbase"
resourceName name
operationName "couchbase.call"
spanType DDSpanTypes.COUCHBASE
errored false
if (parentSpan == null) {
parent()
} else {
childOf((DDSpan) parentSpan)
}
tags {
"$Tags.COMPONENT.key" "couchbase-client"
"$Tags.DB_TYPE.key" "couchbase"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
if (bucketName != null) {
"bucket" bucketName
}
defaultTags()
}
}
}
} }

View File

@ -17,14 +17,11 @@ testSets {
} }
dependencies { dependencies {
compile project(':dd-java-agent:instrumentation:rxjava-1')
compileOnly group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0' compileOnly group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0'
compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7' compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7'
testCompile project(':dd-java-agent:instrumentation:java-concurrent') testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile project(':dd-java-agent:instrumentation:trace-annotation') testCompile project(':dd-java-agent:instrumentation:trace-annotation')
testCompile group: 'io.reactivex', name: 'rxjava', version: '1.0.7' testCompile group: 'io.reactivex', name: 'rxjava', version: '1.0.7'
testCompile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0' testCompile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0'

View File

@ -8,15 +8,23 @@ import static net.bytebuddy.matcher.ElementMatchers.returns;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import com.netflix.hystrix.HystrixInvokableInfo; import com.netflix.hystrix.HystrixInvokableInfo;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.instrumentation.rxjava.TracedOnSubscribe; import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.ScopeManager;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
import rx.DDTracingUtil;
import rx.Observable; import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
@AutoService(Instrumenter.class) @AutoService(Instrumenter.class)
public class HystrixInstrumentation extends Instrumenter.Default { public class HystrixInstrumentation extends Instrumenter.Default {
@ -39,11 +47,10 @@ public class HystrixInstrumentation extends Instrumenter.Default {
return new String[] { return new String[] {
"rx.DDTracingUtil", "rx.DDTracingUtil",
"datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.instrumentation.rxjava.SpanFinishingSubscription",
"datadog.trace.instrumentation.rxjava.TracedSubscriber",
"datadog.trace.instrumentation.rxjava.TracedOnSubscribe",
packageName + ".HystrixDecorator", packageName + ".HystrixDecorator",
packageName + ".HystrixInstrumentation$HystrixOnSubscribe", packageName + ".HystrixInstrumentation$SpanFinishingSubscription",
packageName + ".HystrixInstrumentation$TracedSubscriber",
packageName + ".HystrixInstrumentation$TracedOnSubscribe",
}; };
} }
@ -66,8 +73,11 @@ public class HystrixInstrumentation extends Instrumenter.Default {
@Advice.This final HystrixInvokableInfo<?> command, @Advice.This final HystrixInvokableInfo<?> command,
@Advice.Return(readOnly = false) Observable result, @Advice.Return(readOnly = false) Observable result,
@Advice.Thrown final Throwable throwable) { @Advice.Thrown final Throwable throwable) {
final Observable.OnSubscribe<?> onSubscribe = DDTracingUtil.extractOnSubscribe(result);
result = Observable.create(new HystrixOnSubscribe(result, command, "execute")); result =
Observable.create(
new TracedOnSubscribe(
onSubscribe, command, "execute", GlobalTracer.get().scopeManager().active()));
} }
} }
@ -78,30 +88,171 @@ public class HystrixInstrumentation extends Instrumenter.Default {
@Advice.This final HystrixInvokableInfo<?> command, @Advice.This final HystrixInvokableInfo<?> command,
@Advice.Return(readOnly = false) Observable<?> result, @Advice.Return(readOnly = false) Observable<?> result,
@Advice.Thrown final Throwable throwable) { @Advice.Thrown final Throwable throwable) {
final Observable.OnSubscribe<?> onSubscribe = DDTracingUtil.extractOnSubscribe(result);
result = Observable.create(new HystrixOnSubscribe(result, command, "fallback")); result =
Observable.create(
new TracedOnSubscribe(
onSubscribe, command, "fallback", GlobalTracer.get().scopeManager().active()));
} }
} }
public static class HystrixOnSubscribe extends TracedOnSubscribe { public static class TracedOnSubscribe<T> implements Observable.OnSubscribe<T> {
private final Observable.OnSubscribe<?> delegate;
private final HystrixInvokableInfo<?> command; private final HystrixInvokableInfo<?> command;
private final String methodName; private final String methodName;
private final TraceScope.Continuation continuation;
public HystrixOnSubscribe( public TracedOnSubscribe(
final Observable originalObservable, final Observable.OnSubscribe<?> delegate,
final HystrixInvokableInfo<?> command, final HystrixInvokableInfo<?> command,
final String methodName) { final String methodName,
super(originalObservable, OPERATION_NAME, DECORATE); final Scope parentScope) {
this.delegate = delegate;
this.command = command; this.command = command;
this.methodName = methodName; this.methodName = methodName;
continuation =
parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null;
} }
@Override @Override
protected void afterStart(final Span span) { public void call(final Subscriber<? super T> subscriber) {
super.afterStart(span); final Tracer tracer = GlobalTracer.get();
final Span span; // span finished by TracedSubscriber
if (continuation != null) {
try (final TraceScope scope = continuation.activate()) {
span = tracer.buildSpan(OPERATION_NAME).start();
}
} else {
span = tracer.buildSpan(OPERATION_NAME).start();
}
DECORATE.afterStart(span);
DECORATE.onCommand(span, command, methodName); DECORATE.onCommand(span, command, methodName);
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (!((TraceScope) scope).isAsyncPropagating()) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.call(new TracedSubscriber(span, subscriber));
}
}
}
public static class TracedSubscriber<T> extends Subscriber<T> {
private final ScopeManager scopeManager = GlobalTracer.get().scopeManager();
private final AtomicReference<Span> spanRef;
private final Subscriber<T> delegate;
public TracedSubscriber(final Span span, final Subscriber<T> delegate) {
spanRef = new AtomicReference<>(span);
this.delegate = delegate;
final SpanFinishingSubscription subscription = new SpanFinishingSubscription(spanRef);
delegate.add(subscription);
}
@Override
public void onStart() {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onStart();
}
} else {
delegate.onStart();
}
}
@Override
public void onNext(final T value) {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onNext(value);
} catch (final Throwable e) {
onError(e);
}
} else {
delegate.onNext(value);
}
}
@Override
public void onCompleted() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
boolean errored = false;
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onCompleted();
} catch (final Throwable e) {
// Repopulate the spanRef for onError
spanRef.compareAndSet(null, span);
onError(e);
errored = true;
} finally {
// finish called by onError, so don't finish again.
if (!errored) {
DECORATE.beforeFinish(span);
span.finish();
}
}
} else {
delegate.onCompleted();
}
}
@Override
public void onError(final Throwable e) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
DECORATE.onError(span, e);
delegate.onError(e);
} catch (final Throwable e2) {
DECORATE.onError(span, e2);
throw e2;
} finally {
DECORATE.beforeFinish(span);
span.finish();
}
} else {
delegate.onError(e);
}
}
}
public static class SpanFinishingSubscription implements Subscription {
private final AtomicReference<Span> spanRef;
public SpanFinishingSubscription(final AtomicReference<Span> spanRef) {
this.spanRef = spanRef;
}
@Override
public void unsubscribe() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
DECORATE.beforeFinish(span);
span.finish();
}
}
@Override
public boolean isUnsubscribed() {
return spanRef.get() == null;
} }
} }
} }

View File

@ -1,13 +0,0 @@
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7'
}

View File

@ -1,31 +0,0 @@
package datadog.trace.instrumentation.rxjava;
import datadog.trace.agent.decorator.BaseDecorator;
import io.opentracing.Span;
import java.util.concurrent.atomic.AtomicReference;
import rx.Subscription;
public class SpanFinishingSubscription implements Subscription {
private final BaseDecorator decorator;
private final AtomicReference<Span> spanRef;
public SpanFinishingSubscription(
final BaseDecorator decorator, final AtomicReference<Span> spanRef) {
this.decorator = decorator;
this.spanRef = spanRef;
}
@Override
public void unsubscribe() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
decorator.beforeFinish(span);
span.finish();
}
}
@Override
public boolean isUnsubscribed() {
return spanRef.get() == null;
}
}

View File

@ -1,58 +0,0 @@
package datadog.trace.instrumentation.rxjava;
import datadog.trace.agent.decorator.BaseDecorator;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import rx.DDTracingUtil;
import rx.Observable;
import rx.Subscriber;
public class TracedOnSubscribe<T> implements Observable.OnSubscribe<T> {
private final Observable.OnSubscribe<?> delegate;
private final String operationName;
private final TraceScope.Continuation continuation;
private final BaseDecorator decorator;
public TracedOnSubscribe(
final Observable originalObservable,
final String operationName,
final BaseDecorator decorator) {
this.delegate = DDTracingUtil.extractOnSubscribe(originalObservable);
this.operationName = operationName;
this.decorator = decorator;
final Scope parentScope = GlobalTracer.get().scopeManager().active();
continuation = parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Tracer tracer = GlobalTracer.get();
final Span span; // span finished by TracedSubscriber
if (continuation != null) {
try (final TraceScope scope = continuation.activate()) {
span = tracer.buildSpan(operationName).start();
}
} else {
span = tracer.buildSpan(operationName).start();
}
afterStart(span);
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (!((TraceScope) scope).isAsyncPropagating()) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.call(new TracedSubscriber(span, subscriber, decorator));
}
}
protected void afterStart(final Span span) {
decorator.afterStart(span);
}
}

View File

@ -1,109 +0,0 @@
package datadog.trace.instrumentation.rxjava;
import datadog.trace.agent.decorator.BaseDecorator;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.ScopeManager;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import java.util.concurrent.atomic.AtomicReference;
import rx.Subscriber;
public class TracedSubscriber<T> extends Subscriber<T> {
private final ScopeManager scopeManager = GlobalTracer.get().scopeManager();
private final AtomicReference<Span> spanRef;
private final Subscriber<T> delegate;
private final BaseDecorator decorator;
public TracedSubscriber(
final Span span, final Subscriber<T> delegate, final BaseDecorator decorator) {
spanRef = new AtomicReference<>(span);
this.delegate = delegate;
this.decorator = decorator;
final SpanFinishingSubscription subscription =
new SpanFinishingSubscription(decorator, spanRef);
delegate.add(subscription);
}
@Override
public void onStart() {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onStart();
}
} else {
delegate.onStart();
}
}
@Override
public void onNext(final T value) {
final Span span = spanRef.get();
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onNext(value);
} catch (final Throwable e) {
onError(e);
}
} else {
delegate.onNext(value);
}
}
@Override
public void onCompleted() {
final Span span = spanRef.getAndSet(null);
if (span != null) {
boolean errored = false;
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
delegate.onCompleted();
} catch (final Throwable e) {
// Repopulate the spanRef for onError
spanRef.compareAndSet(null, span);
onError(e);
errored = true;
} finally {
// finish called by onError, so don't finish again.
if (!errored) {
decorator.beforeFinish(span);
span.finish();
}
}
} else {
delegate.onCompleted();
}
}
@Override
public void onError(final Throwable e) {
final Span span = spanRef.getAndSet(null);
if (span != null) {
try (final Scope scope = scopeManager.activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
decorator.onError(span, e);
delegate.onError(e);
} catch (final Throwable e2) {
decorator.onError(span, e2);
throw e2;
} finally {
decorator.beforeFinish(span);
span.finish();
}
} else {
delegate.onError(e);
}
}
}

View File

@ -80,7 +80,6 @@ include ':dd-java-agent:instrumentation:play-2.4'
include ':dd-java-agent:instrumentation:play-2.6' include ':dd-java-agent:instrumentation:play-2.6'
include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7' include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7'
include ':dd-java-agent:instrumentation:ratpack-1.4' include ':dd-java-agent:instrumentation:ratpack-1.4'
include ':dd-java-agent:instrumentation:rxjava-1'
include ':dd-java-agent:instrumentation:reactor-core-3.1' include ':dd-java-agent:instrumentation:reactor-core-3.1'
include ':dd-java-agent:instrumentation:servlet-2' include ':dd-java-agent:instrumentation:servlet-2'
include ':dd-java-agent:instrumentation:servlet-3' include ':dd-java-agent:instrumentation:servlet-3'