Merge pull request #282 from DataDog/ark/async-refactor
Toggleable Async Propgation on the trace context
This commit is contained in:
commit
c7cf1cf36d
|
@ -31,7 +31,7 @@ public class ClassLoaderMatcher {
|
|||
}
|
||||
|
||||
public static ElementMatcher.Junction.AbstractBase<ClassLoader> classLoaderHasClassWithMethod(
|
||||
final String className, final String methodName, final Class... methodArgs) {
|
||||
final String className, final String methodName, final String... methodArgs) {
|
||||
return new ClassLoaderHasClassWithMethodMatcher(className, methodName, methodArgs);
|
||||
}
|
||||
|
||||
|
@ -205,10 +205,10 @@ public class ClassLoaderMatcher {
|
|||
|
||||
private final String className;
|
||||
private final String methodName;
|
||||
private final Class[] methodArgs;
|
||||
private final String[] methodArgs;
|
||||
|
||||
private ClassLoaderHasClassWithMethodMatcher(
|
||||
final String className, final String methodName, final Class... methodArgs) {
|
||||
final String className, final String methodName, final String... methodArgs) {
|
||||
this.className = className;
|
||||
this.methodName = methodName;
|
||||
this.methodArgs = methodArgs;
|
||||
|
@ -223,10 +223,14 @@ public class ClassLoaderMatcher {
|
|||
}
|
||||
try {
|
||||
final Class<?> aClass = Class.forName(className, false, target);
|
||||
final Class[] methodArgsClasses = new Class[methodArgs.length];
|
||||
for (int i = 0; i < methodArgs.length; ++i) {
|
||||
methodArgsClasses[i] = target.loadClass(methodArgs[i]);
|
||||
}
|
||||
if (aClass.isInterface()) {
|
||||
aClass.getMethod(methodName, methodArgs);
|
||||
aClass.getMethod(methodName, methodArgsClasses);
|
||||
} else {
|
||||
aClass.getDeclaredMethod(methodName, methodArgs);
|
||||
aClass.getDeclaredMethod(methodName, methodArgsClasses);
|
||||
}
|
||||
cache.put(target, true);
|
||||
return true;
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import datadog.trace.api.Trace
|
||||
import datadog.trace.context.TraceScope
|
||||
import akka.pattern.ask
|
||||
import io.opentracing.util.GlobalTracer
|
||||
|
||||
|
@ -32,18 +33,21 @@ class AkkaActors {
|
|||
|
||||
@Trace
|
||||
def basicTell() : Unit = {
|
||||
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
|
||||
howdyGreeter ! WhoToGreet("Akka")
|
||||
howdyGreeter ! Greet
|
||||
}
|
||||
|
||||
@Trace
|
||||
def basicAsk() : Unit = {
|
||||
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
|
||||
howdyGreeter ! WhoToGreet("Akka")
|
||||
howdyGreeter ? Greet
|
||||
}
|
||||
|
||||
@Trace
|
||||
def basicForward() : Unit = {
|
||||
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
|
||||
helloGreeter ! WhoToGreet("Akka")
|
||||
helloGreeter ? Greet
|
||||
}
|
||||
|
|
|
@ -7,4 +7,7 @@ dependencies {
|
|||
compile deps.bytebuddy
|
||||
compile deps.opentracing
|
||||
compile deps.autoservice
|
||||
|
||||
testCompile project(':dd-java-agent:testing')
|
||||
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
apply from: "${rootDir}/gradle/java.gradle"
|
||||
apply plugin: 'scala'
|
||||
apply from: "${rootDir}/gradle/test-with-scala.gradle"
|
||||
|
||||
dependencies {
|
||||
compile project(':dd-trace-api')
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
import datadog.opentracing.DDSpan
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
|
||||
class ScalaInstrumentationTest extends AgentTestRunner {
|
||||
static {
|
||||
System.setProperty("dd.integration.java_concurrent.enabled", "true")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterTest() {
|
||||
// Ignore failures to instrument sun proxy classes
|
||||
}
|
||||
|
||||
def "scala futures and callbacks"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks"
|
||||
findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala propagates across futures with no traces"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace"
|
||||
findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala either promise completion"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.traceWithPromises()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.traceWithPromises"
|
||||
findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala first completed future"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == expectedNumberOfSpans
|
||||
findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
private DDSpan findSpan(List<DDSpan> trace, String opName) {
|
||||
for (DDSpan span : trace) {
|
||||
if (span.getOperationName() == opName) {
|
||||
return span
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -1,8 +1,9 @@
|
|||
import datadog.trace.api.Trace
|
||||
import datadog.trace.context.TraceScope
|
||||
import io.opentracing.util.GlobalTracer
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{Await, Future, Promise}
|
||||
|
||||
class ScalaConcurrentTests {
|
||||
|
@ -12,6 +13,7 @@ class ScalaConcurrentTests {
|
|||
*/
|
||||
@Trace
|
||||
def traceWithFutureAndCallbacks() : Integer = {
|
||||
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
|
||||
val goodFuture: Future[Integer] = Future {
|
||||
tracedChild("goodFuture")
|
||||
1
|
||||
|
@ -32,6 +34,7 @@ class ScalaConcurrentTests {
|
|||
|
||||
@Trace
|
||||
def tracedAcrossThreadsWithNoTrace() :Integer = {
|
||||
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
|
||||
val goodFuture: Future[Integer] = Future {
|
||||
1
|
||||
}
|
||||
|
@ -51,6 +54,7 @@ class ScalaConcurrentTests {
|
|||
*/
|
||||
@Trace
|
||||
def traceWithPromises() : Integer = {
|
||||
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
|
||||
val keptPromise = Promise[Boolean]()
|
||||
val brokenPromise = Promise[Boolean]()
|
||||
val afterPromise = keptPromise.future
|
||||
|
@ -83,6 +87,7 @@ class ScalaConcurrentTests {
|
|||
*/
|
||||
@Trace
|
||||
def tracedWithFutureFirstCompletions() :Integer = {
|
||||
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
|
||||
val completedVal = Future.firstCompletedOf(
|
||||
List(
|
||||
Future {
|
||||
|
@ -106,6 +111,7 @@ class ScalaConcurrentTests {
|
|||
*/
|
||||
@Trace
|
||||
def tracedTimeout(): Integer = {
|
||||
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
|
||||
val f: Future[String] = Future {
|
||||
tracedChild("timeoutChild")
|
||||
while(true) {
|
|
@ -140,7 +140,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
|
|||
public static DatadogWrapper wrapJob(
|
||||
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) {
|
||||
if (scope instanceof TraceScope
|
||||
&& ((TraceScope) scope).isAsyncPropagating()
|
||||
&& task != null
|
||||
&& !(task instanceof DatadogWrapper)) {
|
||||
task = new RunnableWrapper(task, (TraceScope) scope);
|
||||
return (RunnableWrapper) task;
|
||||
}
|
||||
|
@ -161,7 +164,10 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
|
|||
public static DatadogWrapper wrapJob(
|
||||
@Advice.Argument(value = 0, readOnly = false) Callable task) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) {
|
||||
if (scope instanceof TraceScope
|
||||
&& ((TraceScope) scope).isAsyncPropagating()
|
||||
&& task != null
|
||||
&& !(task instanceof DatadogWrapper)) {
|
||||
task = new CallableWrapper(task, (TraceScope) scope);
|
||||
return (CallableWrapper) task;
|
||||
}
|
||||
|
@ -182,7 +188,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
|
|||
public static Collection<?> wrapJob(
|
||||
@Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
if (scope instanceof TraceScope) {
|
||||
if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncPropagating()) {
|
||||
Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size());
|
||||
for (Callable task : tasks) {
|
||||
if (task != null) {
|
||||
|
@ -217,13 +223,13 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
|
|||
protected final TraceScope.Continuation continuation;
|
||||
|
||||
public DatadogWrapper(TraceScope scope) {
|
||||
continuation = scope.capture(true);
|
||||
continuation = scope.capture();
|
||||
log.debug("created continuation {} from scope {}", continuation, scope);
|
||||
}
|
||||
|
||||
public void cancel() {
|
||||
if (null != continuation) {
|
||||
continuation.activate().close();
|
||||
continuation.close();
|
||||
log.debug("canceled continuation {}", continuation);
|
||||
}
|
||||
}
|
||||
|
@ -241,6 +247,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
|
|||
@Override
|
||||
public void run() {
|
||||
final TraceScope context = continuation.activate();
|
||||
context.setAsyncPropagation(true);
|
||||
try {
|
||||
delegatee.run();
|
||||
} finally {
|
||||
|
@ -261,6 +268,7 @@ public final class ExecutorInstrumentation extends Instrumenter.Configurable {
|
|||
@Override
|
||||
public T call() throws Exception {
|
||||
final TraceScope context = continuation.activate();
|
||||
context.setAsyncPropagation(true);
|
||||
try {
|
||||
return delegatee.call();
|
||||
} finally {
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
import datadog.opentracing.DDSpan
|
||||
import datadog.opentracing.scopemanager.ContinuableScope
|
||||
import datadog.trace.agent.test.AgentTestRunner
|
||||
import datadog.trace.api.Trace
|
||||
import io.opentracing.util.GlobalTracer
|
||||
import spock.lang.Shared
|
||||
import spock.lang.Unroll
|
||||
|
||||
|
@ -48,6 +50,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
@Override
|
||||
@Trace(operationName = "parent")
|
||||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
// this child will have a span
|
||||
m.invoke(pool, new AsyncChild())
|
||||
// this child won't
|
||||
|
@ -92,6 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
@Override
|
||||
@Trace(operationName = "parent")
|
||||
void run() {
|
||||
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
|
||||
try {
|
||||
for (int i = 0; i < 20; ++ i) {
|
||||
Future f = pool.submit((Callable)child)
|
||||
|
@ -118,73 +122,4 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
|
|||
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | _
|
||||
new ScheduledThreadPoolExecutor(1) | _
|
||||
}
|
||||
|
||||
def "scala futures and callbacks"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks"
|
||||
findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala propagates across futures with no traces"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace"
|
||||
findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala either promise completion"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.traceWithPromises()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == expectedNumberOfSpans
|
||||
trace[0].operationName == "ScalaConcurrentTests.traceWithPromises"
|
||||
findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
def "scala first completed future"() {
|
||||
setup:
|
||||
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
|
||||
int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions()
|
||||
TEST_WRITER.waitForTraces(1)
|
||||
List<DDSpan> trace = TEST_WRITER.get(0)
|
||||
|
||||
expect:
|
||||
TEST_WRITER.size() == 1
|
||||
trace.size() == expectedNumberOfSpans
|
||||
findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId()
|
||||
findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId()
|
||||
}
|
||||
|
||||
private DDSpan findSpan(List<DDSpan> trace, String opName) {
|
||||
for (DDSpan span : trace) {
|
||||
if (span.getOperationName() == opName) {
|
||||
return span
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
import datadog.trace.api.Trace;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class AsyncChild implements Runnable, Callable {
|
||||
private final AtomicBoolean blockThread;
|
||||
private final boolean doTraceableWork;
|
||||
private final AtomicInteger numberOfWorkers = new AtomicInteger(0);
|
||||
|
||||
public AsyncChild() {
|
||||
this(true, false);
|
||||
}
|
||||
|
||||
public AsyncChild(boolean doTraceableWork, boolean blockThread) {
|
||||
this.doTraceableWork = doTraceableWork;
|
||||
this.blockThread = new AtomicBoolean(blockThread);
|
||||
}
|
||||
|
||||
public void unblock() {
|
||||
blockThread.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
runImpl();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object call() throws Exception {
|
||||
runImpl();
|
||||
return null;
|
||||
}
|
||||
|
||||
private void runImpl() {
|
||||
if (doTraceableWork) {
|
||||
asyncChild();
|
||||
}
|
||||
numberOfWorkers.getAndIncrement();
|
||||
try {
|
||||
while (blockThread.get()) {
|
||||
// busy-wait to block thread
|
||||
}
|
||||
} finally {
|
||||
numberOfWorkers.getAndDecrement();
|
||||
}
|
||||
}
|
||||
|
||||
@Trace(operationName = "asyncChild")
|
||||
private void asyncChild() {}
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
logs/
|
|
@ -58,7 +58,7 @@ class Play26Test extends AgentTestRunner {
|
|||
root.traceId == 123
|
||||
root.parentId == 456
|
||||
root.serviceName == "unnamed-java-app"
|
||||
root.operationName == "/helloplay/:from"
|
||||
root.operationName == "play.request"
|
||||
root.resourceName == "GET /helloplay/:from"
|
||||
!root.context().getErrorFlag()
|
||||
root.context().tags["http.status_code"] == 200
|
||||
|
@ -85,7 +85,7 @@ class Play26Test extends AgentTestRunner {
|
|||
response.code() == 500
|
||||
|
||||
root.serviceName == "unnamed-java-app"
|
||||
root.operationName == "/make-error"
|
||||
root.operationName == "play.request"
|
||||
root.resourceName == "GET /make-error"
|
||||
root.context().getErrorFlag()
|
||||
root.context().tags["http.status_code"] == 500
|
||||
|
@ -116,7 +116,7 @@ class Play26Test extends AgentTestRunner {
|
|||
root.context().tags["error.type"] == RuntimeException.getName()
|
||||
|
||||
root.serviceName == "unnamed-java-app"
|
||||
root.operationName == "/exception"
|
||||
root.operationName == "play.request"
|
||||
root.resourceName == "GET /exception"
|
||||
root.context().tags["http.status_code"] == 500
|
||||
root.context().tags["http.url"] == "/exception"
|
||||
|
|
|
@ -9,6 +9,7 @@ import com.google.auto.service.AutoService;
|
|||
import datadog.trace.agent.tooling.*;
|
||||
import datadog.trace.api.DDSpanTypes;
|
||||
import datadog.trace.api.DDTags;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.SpanContext;
|
||||
|
@ -77,9 +78,7 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
|
|||
public static class PlayAdvice {
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static Scope startSpan(@Advice.Argument(0) final Request req) {
|
||||
// TODO
|
||||
// begin tracking across threads
|
||||
|
||||
final Scope scope;
|
||||
if (GlobalTracer.get().activeSpan() == null) {
|
||||
final SpanContext extractedContext;
|
||||
if (GlobalTracer.get().scopeManager().active() == null) {
|
||||
|
@ -88,15 +87,21 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
|
|||
} else {
|
||||
extractedContext = null;
|
||||
}
|
||||
return GlobalTracer.get()
|
||||
.buildSpan("play.request")
|
||||
.asChildOf(extractedContext)
|
||||
.startActive(false);
|
||||
scope =
|
||||
GlobalTracer.get()
|
||||
.buildSpan("play.request")
|
||||
.asChildOf(extractedContext)
|
||||
.startActive(false);
|
||||
} else {
|
||||
// An upstream framework (e.g. akka-http, netty) has already started the span.
|
||||
// Do not extract the context.
|
||||
return GlobalTracer.get().buildSpan("play.request").startActive(false);
|
||||
scope = GlobalTracer.get().buildSpan("play.request").startActive(false);
|
||||
}
|
||||
|
||||
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
|
||||
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true);
|
||||
}
|
||||
return scope;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
|
@ -111,7 +116,6 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
|
|||
if (!pathOption.isEmpty()) {
|
||||
final String path = (String) pathOption.get();
|
||||
scope.span().setTag(Tags.HTTP_URL.getKey(), path);
|
||||
scope.span().setOperationName(path);
|
||||
scope.span().setTag(DDTags.RESOURCE_NAME, req.method() + " " + path);
|
||||
}
|
||||
|
||||
|
@ -169,6 +173,9 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
|
|||
@Override
|
||||
public Object apply(Throwable t, boolean isCheck) throws Exception {
|
||||
try {
|
||||
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
|
||||
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false);
|
||||
}
|
||||
onError(span, t);
|
||||
} catch (Throwable t2) {
|
||||
LoggerFactory.getLogger(RequestCallback.class).debug("error in play instrumentation", t);
|
||||
|
@ -194,8 +201,9 @@ public final class PlayInstrumentation extends Instrumenter.Configurable {
|
|||
|
||||
@Override
|
||||
public Result apply(Result result) {
|
||||
// TODO
|
||||
// stop tracking across threads
|
||||
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {
|
||||
((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false);
|
||||
}
|
||||
try {
|
||||
Tags.HTTP_STATUS.set(span, result.header().status());
|
||||
} catch (Throwable t) {
|
||||
|
|
|
@ -58,7 +58,7 @@ class Play24Test extends AgentTestRunner {
|
|||
root.traceId == 123
|
||||
root.parentId == 456
|
||||
root.serviceName == "unnamed-java-app"
|
||||
root.operationName == "/helloplay/:from"
|
||||
root.operationName == "play.request"
|
||||
root.resourceName == "GET /helloplay/:from"
|
||||
!root.context().getErrorFlag()
|
||||
root.context().tags["http.status_code"] == 200
|
||||
|
@ -85,7 +85,7 @@ class Play24Test extends AgentTestRunner {
|
|||
response.code() == 500
|
||||
|
||||
root.serviceName == "unnamed-java-app"
|
||||
root.operationName == "/make-error"
|
||||
root.operationName == "play.request"
|
||||
root.resourceName == "GET /make-error"
|
||||
root.context().getErrorFlag()
|
||||
root.context().tags["http.status_code"] == 500
|
||||
|
@ -116,7 +116,7 @@ class Play24Test extends AgentTestRunner {
|
|||
root.context().tags["error.type"] == RuntimeException.getName()
|
||||
|
||||
root.serviceName == "unnamed-java-app"
|
||||
root.operationName == "/exception"
|
||||
root.operationName == "play.request"
|
||||
root.resourceName == "GET /exception"
|
||||
root.context().tags["http.status_code"] == 500
|
||||
root.context().tags["http.url"] == "/exception"
|
||||
|
|
|
@ -8,11 +8,21 @@ public interface TraceScope {
|
|||
*
|
||||
* <p>Should be called on the parent thread.
|
||||
*/
|
||||
Continuation capture(boolean finishOnClose);
|
||||
Continuation capture();
|
||||
|
||||
/** Close the activated context and allow any underlying spans to finish. */
|
||||
void close();
|
||||
|
||||
/** If true, this context will propagate across async boundaries. */
|
||||
boolean isAsyncPropagating();
|
||||
|
||||
/**
|
||||
* Enable or disable async propagation. Async propagation is initially set to false.
|
||||
*
|
||||
* @param value The new propagation value. True == propagate. False == don't propagate.
|
||||
*/
|
||||
void setAsyncPropagation(boolean value);
|
||||
|
||||
/** Used to pass async context between workers. */
|
||||
interface Continuation {
|
||||
/**
|
||||
|
@ -21,5 +31,8 @@ public interface TraceScope {
|
|||
* <p>Should be called on the child thread.
|
||||
*/
|
||||
TraceScope activate();
|
||||
|
||||
/** Cancel the continuation. */
|
||||
void close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package datadog.opentracing.scopemanager;
|
||||
|
||||
import datadog.opentracing.DDSpan;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.ScopeManager;
|
||||
import io.opentracing.Span;
|
||||
|
@ -17,7 +18,11 @@ public class ContextualScopeManager implements ScopeManager {
|
|||
return context.activate(span, finishOnClose);
|
||||
}
|
||||
}
|
||||
return new ContinuableScope(this, span, finishOnClose);
|
||||
if (span instanceof DDSpan) {
|
||||
return new ContinuableScope(this, (DDSpan) span, finishOnClose);
|
||||
} else {
|
||||
return new SimpleScope(this, span, finishOnClose);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,11 +1,10 @@
|
|||
package datadog.opentracing.scopemanager;
|
||||
|
||||
import datadog.opentracing.DDSpan;
|
||||
import datadog.opentracing.DDSpanContext;
|
||||
import datadog.opentracing.PendingTrace;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.Span;
|
||||
import io.opentracing.noop.NoopScopeManager;
|
||||
import java.io.Closeable;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -14,26 +13,40 @@ import lombok.extern.slf4j.Slf4j;
|
|||
|
||||
@Slf4j
|
||||
public class ContinuableScope implements Scope, TraceScope {
|
||||
final ContextualScopeManager scopeManager;
|
||||
final AtomicInteger refCount;
|
||||
private final Span wrapped;
|
||||
/** ScopeManager holding the thread-local to this scope. */
|
||||
private final ContextualScopeManager scopeManager;
|
||||
/**
|
||||
* Span contained by this scope. Async scopes will hold a reference to the parent scope's span.
|
||||
*/
|
||||
private final DDSpan spanUnderScope;
|
||||
/** If true, finish the span when openCount hits 0. */
|
||||
private final boolean finishOnClose;
|
||||
/** Count of open scope and continuations */
|
||||
private final AtomicInteger openCount;
|
||||
/** Scope to placed in the thread local after close. May be null. */
|
||||
private final Scope toRestore;
|
||||
/** Continuation that created this scope. May be null. */
|
||||
private final Continuation continuation;
|
||||
/** Flag to propagate this scope across async boundaries. */
|
||||
private final AtomicBoolean isAsyncPropagating = new AtomicBoolean(false);
|
||||
|
||||
ContinuableScope(
|
||||
final ContextualScopeManager scopeManager, final Span wrapped, final boolean finishOnClose) {
|
||||
this(scopeManager, new AtomicInteger(1), wrapped, finishOnClose);
|
||||
final ContextualScopeManager scopeManager,
|
||||
final DDSpan spanUnderScope,
|
||||
final boolean finishOnClose) {
|
||||
this(scopeManager, new AtomicInteger(1), null, spanUnderScope, finishOnClose);
|
||||
}
|
||||
|
||||
private ContinuableScope(
|
||||
final ContextualScopeManager scopeManager,
|
||||
final AtomicInteger refCount,
|
||||
final Span wrapped,
|
||||
final AtomicInteger openCount,
|
||||
final Continuation continuation,
|
||||
final DDSpan spanUnderScope,
|
||||
final boolean finishOnClose) {
|
||||
|
||||
this.scopeManager = scopeManager;
|
||||
this.refCount = refCount;
|
||||
this.wrapped = wrapped;
|
||||
this.openCount = openCount;
|
||||
this.continuation = continuation;
|
||||
this.spanUnderScope = spanUnderScope;
|
||||
this.finishOnClose = finishOnClose;
|
||||
this.toRestore = scopeManager.tlsScope.get();
|
||||
scopeManager.tlsScope.set(this);
|
||||
|
@ -41,30 +54,45 @@ public class ContinuableScope implements Scope, TraceScope {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
if (scopeManager.tlsScope.get() != this) {
|
||||
return;
|
||||
if (null != continuation) {
|
||||
spanUnderScope.context().getTrace().cancelContinuation(continuation);
|
||||
}
|
||||
|
||||
if (refCount.decrementAndGet() == 0 && finishOnClose) {
|
||||
wrapped.finish();
|
||||
if (openCount.decrementAndGet() == 0 && finishOnClose) {
|
||||
spanUnderScope.finish();
|
||||
}
|
||||
|
||||
scopeManager.tlsScope.set(toRestore);
|
||||
if (scopeManager.tlsScope.get() == this) {
|
||||
scopeManager.tlsScope.set(toRestore);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Span span() {
|
||||
return wrapped;
|
||||
public DDSpan span() {
|
||||
return spanUnderScope;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAsyncPropagating() {
|
||||
return isAsyncPropagating.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAsyncPropagation(boolean value) {
|
||||
isAsyncPropagating.set(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* The continuation returned should be closed after the associa
|
||||
* The continuation returned must be closed or activated or the trace will not finish.
|
||||
*
|
||||
* @param finishOnClose
|
||||
* @return
|
||||
* @return The new continuation, or null if this scope is not async propagating.
|
||||
*/
|
||||
public Continuation capture(final boolean finishOnClose) {
|
||||
return new Continuation(this.finishOnClose && finishOnClose);
|
||||
public Continuation capture() {
|
||||
if (isAsyncPropagating()) {
|
||||
return new Continuation();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public class Continuation implements Closeable, TraceScope.Continuation {
|
||||
|
@ -72,72 +100,32 @@ public class ContinuableScope implements Scope, TraceScope {
|
|||
|
||||
private final AtomicBoolean used = new AtomicBoolean(false);
|
||||
private final PendingTrace trace;
|
||||
private final boolean finishSpanOnClose;
|
||||
|
||||
private Continuation(final boolean finishOnClose) {
|
||||
this.finishSpanOnClose = finishOnClose;
|
||||
refCount.incrementAndGet();
|
||||
if (wrapped.context() instanceof DDSpanContext) {
|
||||
final DDSpanContext context = (DDSpanContext) wrapped.context();
|
||||
trace = context.getTrace();
|
||||
trace.registerContinuation(this);
|
||||
} else {
|
||||
trace = null;
|
||||
}
|
||||
private Continuation() {
|
||||
openCount.incrementAndGet();
|
||||
final DDSpanContext context = (DDSpanContext) spanUnderScope.context();
|
||||
trace = context.getTrace();
|
||||
trace.registerContinuation(this);
|
||||
}
|
||||
|
||||
public ClosingScope activate() {
|
||||
public ContinuableScope activate() {
|
||||
if (used.compareAndSet(false, true)) {
|
||||
for (final ScopeContext context : scopeManager.scopeContexts) {
|
||||
if (context.inContext()) {
|
||||
return new ClosingScope(context.activate(wrapped, finishSpanOnClose));
|
||||
}
|
||||
}
|
||||
return new ClosingScope(
|
||||
new ContinuableScope(scopeManager, refCount, wrapped, finishSpanOnClose));
|
||||
return new ContinuableScope(scopeManager, openCount, this, spanUnderScope, finishOnClose);
|
||||
} else {
|
||||
log.debug("Reusing a continuation not allowed. Returning no-op scope.");
|
||||
return new ClosingScope(NoopScopeManager.NoopScope.INSTANCE);
|
||||
log.debug(
|
||||
"Failed to activate continuation. Reusing a continuation not allowed. Returning a new scope. Spans will not be linked.");
|
||||
return new ContinuableScope(
|
||||
scopeManager, new AtomicInteger(1), null, spanUnderScope, finishOnClose);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
used.getAndSet(true);
|
||||
if (trace != null) {
|
||||
if (used.compareAndSet(false, true)) {
|
||||
trace.cancelContinuation(this);
|
||||
}
|
||||
}
|
||||
|
||||
private class ClosingScope implements Scope, TraceScope {
|
||||
private final Scope wrappedScope;
|
||||
|
||||
private ClosingScope(final Scope wrappedScope) {
|
||||
this.wrappedScope = wrappedScope;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Continuation capture(boolean finishOnClose) {
|
||||
if (wrappedScope instanceof TraceScope) {
|
||||
return ((TraceScope) wrappedScope).capture(finishOnClose);
|
||||
} else {
|
||||
log.debug(
|
||||
"{} Failed to capture. ClosingScope does not wrap a TraceScope: {}.",
|
||||
this,
|
||||
wrappedScope);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
wrappedScope.close();
|
||||
ContinuableScope.Continuation.this.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Span span() {
|
||||
return wrappedScope.span();
|
||||
ContinuableScope.this.close();
|
||||
} else {
|
||||
log.debug("Failed to close continuation {}. Already used.", this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package datadog.opentracing.scopemanager;
|
||||
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.Span;
|
||||
|
||||
/** Simple scope implementation which does not propagate across threads. */
|
||||
public class SimpleScope implements Scope {
|
||||
private final ContextualScopeManager scopeManager;
|
||||
private final Span spanUnderScope;
|
||||
private final boolean finishOnClose;
|
||||
private final Scope toRestore;
|
||||
|
||||
public SimpleScope(
|
||||
final ContextualScopeManager scopeManager,
|
||||
final Span spanUnderScope,
|
||||
final boolean finishOnClose) {
|
||||
this.scopeManager = scopeManager;
|
||||
this.spanUnderScope = spanUnderScope;
|
||||
this.finishOnClose = finishOnClose;
|
||||
this.toRestore = scopeManager.tlsScope.get();
|
||||
scopeManager.tlsScope.set(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (finishOnClose) {
|
||||
spanUnderScope.finish();
|
||||
}
|
||||
|
||||
if (scopeManager.tlsScope.get() == this) {
|
||||
scopeManager.tlsScope.set(toRestore);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Span span() {
|
||||
return spanUnderScope;
|
||||
}
|
||||
}
|
|
@ -36,7 +36,7 @@ public class DDAgentWriter implements Writer {
|
|||
public static final int DEFAULT_PORT = 8126;
|
||||
|
||||
/** Maximum number of traces kept in memory */
|
||||
static final int DEFAULT_MAX_TRACES = 1000;
|
||||
static final int DEFAULT_MAX_TRACES = 7000;
|
||||
|
||||
/** Timeout for the API in seconds */
|
||||
static final long API_TIMEOUT_SECONDS = 1;
|
||||
|
|
|
@ -92,11 +92,31 @@ class ScopeManagerTest extends Specification {
|
|||
finishSpan << [true, false]
|
||||
}
|
||||
|
||||
def "ref counting scope doesn't close if non-zero"() {
|
||||
def "ContinuableScope only creates continuations when propagation is set"() {
|
||||
setup:
|
||||
def builder = tracer.buildSpan("test")
|
||||
def scope = (ContinuableScope) builder.startActive(true)
|
||||
def continuation = scope.capture(true)
|
||||
def continuation = scope.capture()
|
||||
|
||||
expect:
|
||||
continuation == null
|
||||
|
||||
when:
|
||||
scope.setAsyncPropagation(true)
|
||||
continuation = scope.capture()
|
||||
then:
|
||||
continuation != null
|
||||
|
||||
cleanup:
|
||||
continuation.close()
|
||||
}
|
||||
|
||||
def "ContinuableScope doesn't close if non-zero"() {
|
||||
setup:
|
||||
def builder = tracer.buildSpan("test")
|
||||
def scope = (ContinuableScope) builder.startActive(true)
|
||||
scope.setAsyncPropagation(true)
|
||||
def continuation = scope.capture()
|
||||
|
||||
expect:
|
||||
!spanFinished(scope.span())
|
||||
|
@ -149,7 +169,8 @@ class ScopeManagerTest extends Specification {
|
|||
def builder = tracer.buildSpan("test")
|
||||
def scope = (ContinuableScope) builder.startActive(false)
|
||||
def span = scope.span()
|
||||
def continuation = scope.capture(true)
|
||||
scope.setAsyncPropagation(true)
|
||||
def continuation = scope.capture()
|
||||
scope.close()
|
||||
span.finish()
|
||||
|
||||
|
@ -186,9 +207,10 @@ class ScopeManagerTest extends Specification {
|
|||
def parentScope = tracer.buildSpan("parent").startActive(true)
|
||||
def parentSpan = parentScope.span()
|
||||
ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true)
|
||||
childScope.setAsyncPropagation(true)
|
||||
def childSpan = childScope.span()
|
||||
|
||||
def continuation = childScope.capture(true)
|
||||
def continuation = childScope.capture()
|
||||
childScope.close()
|
||||
|
||||
expect:
|
||||
|
@ -209,11 +231,12 @@ class ScopeManagerTest extends Specification {
|
|||
|
||||
when:
|
||||
def newScope = continuation.activate()
|
||||
def newContinuation = newScope.capture(true)
|
||||
newScope.setAsyncPropagation(true)
|
||||
def newContinuation = newScope.capture()
|
||||
|
||||
then:
|
||||
newScope instanceof ContinuableScope.Continuation.ClosingScope
|
||||
scopeManager.active() == newScope.wrappedScope
|
||||
newScope instanceof ContinuableScope
|
||||
scopeManager.active() == newScope
|
||||
newScope != childScope && newScope != parentScope
|
||||
newScope.span() == childSpan
|
||||
!spanFinished(childSpan)
|
||||
|
@ -236,16 +259,17 @@ class ScopeManagerTest extends Specification {
|
|||
def builder = tracer.buildSpan("test")
|
||||
def scope = (ContinuableScope) builder.startActive(false)
|
||||
def span = scope.span()
|
||||
def continuation = scope.capture(false)
|
||||
scope.setAsyncPropagation(true)
|
||||
def continuation = scope.capture()
|
||||
scope.close()
|
||||
span.finish()
|
||||
|
||||
def newScope = continuation.activate()
|
||||
|
||||
expect:
|
||||
newScope instanceof ContinuableScope.Continuation.ClosingScope
|
||||
newScope instanceof ContinuableScope
|
||||
newScope != scope
|
||||
scopeManager.active() == newScope.wrappedScope
|
||||
scopeManager.active() == newScope
|
||||
spanFinished(span)
|
||||
writer == []
|
||||
|
||||
|
@ -259,24 +283,7 @@ class ScopeManagerTest extends Specification {
|
|||
scopeManager.active() == null
|
||||
spanFinished(childSpan)
|
||||
childSpan.context().parentId == span.context().spanId
|
||||
writer == []
|
||||
|
||||
when:
|
||||
if (closeScope) {
|
||||
newScope.close()
|
||||
}
|
||||
if (closeContinuation) {
|
||||
continuation.close()
|
||||
}
|
||||
|
||||
then:
|
||||
writer == [[childSpan, span]]
|
||||
|
||||
where:
|
||||
closeScope | closeContinuation
|
||||
true | false
|
||||
false | true
|
||||
true | true
|
||||
}
|
||||
|
||||
@Unroll
|
||||
|
@ -327,40 +334,28 @@ class ScopeManagerTest extends Specification {
|
|||
[new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] | _
|
||||
}
|
||||
|
||||
@Unroll
|
||||
def "threadlocal to context with capture (#active)"() {
|
||||
def "ContinuableScope put in threadLocal after continuation activation"() {
|
||||
setup:
|
||||
contexts.each {
|
||||
scopeManager.addScopeContext(it)
|
||||
}
|
||||
ContinuableScope scope = (ContinuableScope) tracer.buildSpan("parent").startActive(true)
|
||||
scope.setAsyncPropagation(true)
|
||||
|
||||
expect:
|
||||
scopeManager.tlsScope.get() == scope
|
||||
|
||||
when:
|
||||
def cont = scope.capture(true)
|
||||
def cont = scope.capture()
|
||||
scope.close()
|
||||
|
||||
then:
|
||||
scopeManager.tlsScope.get() == null
|
||||
|
||||
when:
|
||||
active.each {
|
||||
((AtomicBoolean) contexts[it].enabled).set(true)
|
||||
}
|
||||
cont.activate()
|
||||
scopeManager.addScopeContext(new AtomicReferenceScope(true))
|
||||
def newScope = cont.activate()
|
||||
|
||||
then:
|
||||
scopeManager.tlsScope.get() == null
|
||||
|
||||
where:
|
||||
active | contexts
|
||||
[0] | [new AtomicReferenceScope(false)]
|
||||
[0] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false)]
|
||||
[1] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)]
|
||||
[2] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)]
|
||||
[0, 2] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)]
|
||||
newScope != scope
|
||||
scopeManager.tlsScope.get() == newScope
|
||||
}
|
||||
|
||||
@Unroll
|
||||
|
|
Loading…
Reference in New Issue