Update reactor-core-3.1 to new agent api

This commit is contained in:
Trask Stalnaker 2019-10-19 11:57:47 -07:00
parent 9775ae5b2f
commit 4e4a9c9bb7
3 changed files with 25 additions and 27 deletions

View File

@ -1,9 +1,9 @@
package datadog.trace.instrumentation.reactor.core;
import datadog.trace.context.TraceScope;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import datadog.trace.instrumentation.api.AgentScope;
import datadog.trace.instrumentation.api.AgentSpan;
import net.bytebuddy.asm.Advice;
import reactor.core.CoreSubscriber;
@ -18,15 +18,15 @@ import reactor.core.CoreSubscriber;
public class FluxAndMonoSubscribeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope methodEnter(
public static AgentScope methodEnter(
@Advice.Argument(0) final CoreSubscriber subscriber, @Advice.This final Object thiz) {
final Span span =
final AgentSpan span =
subscriber
.currentContext()
.getOrDefault(ReactorCoreAdviceUtils.PUBLISHER_CONTEXT_KEY, null);
if (span != null) {
final Scope scope = GlobalTracer.get().scopeManager().activate(span, false);
((TraceScope) scope).setAsyncPropagation(true);
final AgentScope scope = activateSpan(span, false);
scope.setAsyncPropagation(true);
return scope;
}
return null;
@ -34,7 +34,7 @@ public class FluxAndMonoSubscribeAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) {
@Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) {
if (throwable != null) {
ReactorCoreAdviceUtils.finishSpanIfPresent(scope.span(), throwable);
}

View File

@ -1,11 +1,8 @@
package datadog.trace.instrumentation.reactor.core;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static reactor.core.publisher.Operators.lift;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
import datadog.trace.instrumentation.api.AgentSpan;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
@ -21,12 +18,12 @@ public class ReactorCoreAdviceUtils {
public static final String PUBLISHER_CONTEXT_KEY =
"datadog.trace.instrumentation.reactor.core.Span";
public static <T> Mono<T> setPublisherSpan(final Mono<T> mono, final Span span) {
public static <T> Mono<T> setPublisherSpan(final Mono<T> mono, final AgentSpan span) {
return mono.<T>transform(finishSpanNextOrError())
.subscriberContext(Context.of(PUBLISHER_CONTEXT_KEY, span));
}
public static <T> Flux<T> setPublisherSpan(final Flux<T> flux, final Span span) {
public static <T> Flux<T> setPublisherSpan(final Flux<T> flux, final AgentSpan span) {
return flux.<T>transform(finishSpanNextOrError())
.subscriberContext(Context.of(PUBLISHER_CONTEXT_KEY, span));
}
@ -42,14 +39,14 @@ public class ReactorCoreAdviceUtils {
}
public static void finishSpanIfPresent(final Context context, final Throwable throwable) {
finishSpanIfPresent(context.getOrDefault(PUBLISHER_CONTEXT_KEY, (Span) null), throwable);
finishSpanIfPresent(context.getOrDefault(PUBLISHER_CONTEXT_KEY, (AgentSpan) null), throwable);
}
public static void finishSpanIfPresent(final Span span, final Throwable throwable) {
public static void finishSpanIfPresent(final AgentSpan span, final Throwable throwable) {
if (span != null) {
if (throwable != null) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
span.setError(true);
span.addThrowable(throwable);
}
span.finish();
}

View File

@ -1,9 +1,8 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.Trace
import datadog.trace.instrumentation.api.AgentSpan
import datadog.trace.instrumentation.reactor.core.ReactorCoreAdviceUtils
import io.opentracing.Scope
import io.opentracing.tag.Tags
import io.opentracing.util.GlobalTracer
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
@ -12,6 +11,8 @@ import spock.lang.Shared
import java.time.Duration
import static datadog.trace.instrumentation.api.AgentTracer.startSpan
class ReactorCoreTest extends AgentTestRunner {
public static final String EXCEPTION_MESSAGE = "test exception"
@ -207,9 +208,9 @@ class ReactorCoreTest extends AgentTestRunner {
// We have a 'trace-parent' that covers whole span and then we have publisher-parent that overs only
// operation to create publisher (and set its context).
// The expectation is that then publisher is executed under 'publisher-parent', not under 'trace-parent'
final Scope scope = GlobalTracer.get().buildSpan("publisher-parent").startActive(true)
publisher = ReactorCoreAdviceUtils.setPublisherSpan(publisher, scope.span())
scope.close()
final AgentSpan span = startSpan("publisher-parent")
publisher = ReactorCoreAdviceUtils.setPublisherSpan(publisher, span)
span.finish()
// Read all data from publisher
if (publisher instanceof Mono) {
@ -223,9 +224,9 @@ class ReactorCoreTest extends AgentTestRunner {
@Trace(operationName = "trace-parent", resourceName = "trace-parent")
def cancelUnderTrace(def publisher) {
final Scope scope = GlobalTracer.get().buildSpan("publisher-parent").startActive(true)
publisher = ReactorCoreAdviceUtils.setPublisherSpan(publisher, scope.span())
scope.close()
final AgentSpan span = startSpan("publisher-parent")
publisher = ReactorCoreAdviceUtils.setPublisherSpan(publisher, span)
span.finish()
publisher.subscribe(new Subscriber<Integer>() {
void onSubscribe(Subscription subscription) {