Enable async propagation for grpc client callbacks.
This commit is contained in:
parent
ab623ab7b4
commit
b69f9ea041
|
@ -3,6 +3,7 @@ package datadog.trace.instrumentation.grpc.client;
|
||||||
import static datadog.trace.instrumentation.grpc.client.GrpcClientDecorator.DECORATE;
|
import static datadog.trace.instrumentation.grpc.client.GrpcClientDecorator.DECORATE;
|
||||||
|
|
||||||
import datadog.trace.api.DDTags;
|
import datadog.trace.api.DDTags;
|
||||||
|
import datadog.trace.context.TraceScope;
|
||||||
import io.grpc.CallOptions;
|
import io.grpc.CallOptions;
|
||||||
import io.grpc.Channel;
|
import io.grpc.Channel;
|
||||||
import io.grpc.ClientCall;
|
import io.grpc.ClientCall;
|
||||||
|
@ -39,6 +40,9 @@ public class TracingClientInterceptor implements ClientInterceptor {
|
||||||
.start();
|
.start();
|
||||||
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
|
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
|
||||||
DECORATE.afterStart(span);
|
DECORATE.afterStart(span);
|
||||||
|
if (scope instanceof TraceScope) {
|
||||||
|
((TraceScope) scope).setAsyncPropagation(true);
|
||||||
|
}
|
||||||
|
|
||||||
final ClientCall<ReqT, RespT> result;
|
final ClientCall<ReqT, RespT> result;
|
||||||
try {
|
try {
|
||||||
|
@ -71,7 +75,10 @@ public class TracingClientInterceptor implements ClientInterceptor {
|
||||||
public void start(final Listener<RespT> responseListener, final Metadata headers) {
|
public void start(final Listener<RespT> responseListener, final Metadata headers) {
|
||||||
tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new GrpcInjectAdapter(headers));
|
tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new GrpcInjectAdapter(headers));
|
||||||
|
|
||||||
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
|
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
|
||||||
|
if (scope instanceof TraceScope) {
|
||||||
|
((TraceScope) scope).setAsyncPropagation(true);
|
||||||
|
}
|
||||||
super.start(new TracingClientCallListener<>(tracer, span, responseListener), headers);
|
super.start(new TracingClientCallListener<>(tracer, span, responseListener), headers);
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
DECORATE.onError(span, e);
|
DECORATE.onError(span, e);
|
||||||
|
@ -83,7 +90,10 @@ public class TracingClientInterceptor implements ClientInterceptor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendMessage(final ReqT message) {
|
public void sendMessage(final ReqT message) {
|
||||||
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
|
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
|
||||||
|
if (scope instanceof TraceScope) {
|
||||||
|
((TraceScope) scope).setAsyncPropagation(true);
|
||||||
|
}
|
||||||
super.sendMessage(message);
|
super.sendMessage(message);
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
DECORATE.onError(span, e);
|
DECORATE.onError(span, e);
|
||||||
|
@ -114,6 +124,9 @@ public class TracingClientInterceptor implements ClientInterceptor {
|
||||||
.asChildOf(span)
|
.asChildOf(span)
|
||||||
.withTag("message.type", message.getClass().getName())
|
.withTag("message.type", message.getClass().getName())
|
||||||
.startActive(true);
|
.startActive(true);
|
||||||
|
if (scope instanceof TraceScope) {
|
||||||
|
((TraceScope) scope).setAsyncPropagation(true);
|
||||||
|
}
|
||||||
final Span messageSpan = scope.span();
|
final Span messageSpan = scope.span();
|
||||||
DECORATE.afterStart(messageSpan);
|
DECORATE.afterStart(messageSpan);
|
||||||
try {
|
try {
|
||||||
|
@ -131,7 +144,10 @@ public class TracingClientInterceptor implements ClientInterceptor {
|
||||||
public void onClose(final Status status, final Metadata trailers) {
|
public void onClose(final Status status, final Metadata trailers) {
|
||||||
DECORATE.onClose(span, status);
|
DECORATE.onClose(span, status);
|
||||||
// Finishes span.
|
// Finishes span.
|
||||||
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
|
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
|
||||||
|
if (scope instanceof TraceScope) {
|
||||||
|
((TraceScope) scope).setAsyncPropagation(true);
|
||||||
|
}
|
||||||
delegate().onClose(status, trailers);
|
delegate().onClose(status, trailers);
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
DECORATE.onError(span, e);
|
DECORATE.onError(span, e);
|
||||||
|
@ -144,7 +160,10 @@ public class TracingClientInterceptor implements ClientInterceptor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onReady() {
|
public void onReady() {
|
||||||
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
|
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
|
||||||
|
if (scope instanceof TraceScope) {
|
||||||
|
((TraceScope) scope).setAsyncPropagation(true);
|
||||||
|
}
|
||||||
delegate().onReady();
|
delegate().onReady();
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
DECORATE.onError(span, e);
|
DECORATE.onError(span, e);
|
||||||
|
|
Loading…
Reference in New Issue