From b69f9ea0415499c1bdacd1d0375b2de4b72fae8a Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Thu, 15 Aug 2019 17:04:14 -0700 Subject: [PATCH] Enable async propagation for grpc client callbacks. --- .../grpc/client/TracingClientInterceptor.java | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java index bdb1a97faf..3b40273490 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/TracingClientInterceptor.java @@ -3,6 +3,7 @@ package datadog.trace.instrumentation.grpc.client; import static datadog.trace.instrumentation.grpc.client.GrpcClientDecorator.DECORATE; import datadog.trace.api.DDTags; +import datadog.trace.context.TraceScope; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -39,6 +40,9 @@ public class TracingClientInterceptor implements ClientInterceptor { .start(); try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { DECORATE.afterStart(span); + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } final ClientCall result; try { @@ -71,7 +75,10 @@ public class TracingClientInterceptor implements ClientInterceptor { public void start(final Listener responseListener, final Metadata headers) { tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new GrpcInjectAdapter(headers)); - try (final Scope ignored = tracer.scopeManager().activate(span, false)) { + try (final Scope scope = tracer.scopeManager().activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } super.start(new TracingClientCallListener<>(tracer, span, responseListener), headers); } catch (final Throwable e) { DECORATE.onError(span, e); @@ -83,7 +90,10 @@ public class TracingClientInterceptor implements ClientInterceptor { @Override public void sendMessage(final ReqT message) { - try (final Scope ignored = tracer.scopeManager().activate(span, false)) { + try (final Scope scope = tracer.scopeManager().activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } super.sendMessage(message); } catch (final Throwable e) { DECORATE.onError(span, e); @@ -114,6 +124,9 @@ public class TracingClientInterceptor implements ClientInterceptor { .asChildOf(span) .withTag("message.type", message.getClass().getName()) .startActive(true); + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } final Span messageSpan = scope.span(); DECORATE.afterStart(messageSpan); try { @@ -131,7 +144,10 @@ public class TracingClientInterceptor implements ClientInterceptor { public void onClose(final Status status, final Metadata trailers) { DECORATE.onClose(span, status); // Finishes span. - try (final Scope ignored = tracer.scopeManager().activate(span, false)) { + try (final Scope scope = tracer.scopeManager().activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } delegate().onClose(status, trailers); } catch (final Throwable e) { DECORATE.onError(span, e); @@ -144,7 +160,10 @@ public class TracingClientInterceptor implements ClientInterceptor { @Override public void onReady() { - try (final Scope ignored = tracer.scopeManager().activate(span, false)) { + try (final Scope scope = tracer.scopeManager().activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } delegate().onReady(); } catch (final Throwable e) { DECORATE.onError(span, e);