Enable executor instrumentation for gRPC server calls
This might be more than is needed. Maybe cut back later.
This commit is contained in:
parent
51d74f6147
commit
63bee3737c
|
@ -4,6 +4,7 @@ import static io.opentracing.log.Fields.ERROR_OBJECT;
|
|||
|
||||
import datadog.trace.api.DDSpanTypes;
|
||||
import datadog.trace.api.DDTags;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import io.grpc.ForwardingServerCallListener;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.ServerCall;
|
||||
|
@ -54,6 +55,11 @@ public class TracingServerInterceptor implements ServerInterceptor {
|
|||
spanBuilder.asChildOf(spanContext);
|
||||
}
|
||||
final Scope scope = spanBuilder.startActive(false);
|
||||
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(true);
|
||||
}
|
||||
|
||||
final Span span = scope.span();
|
||||
|
||||
final ServerCall.Listener<ReqT> result;
|
||||
|
@ -66,6 +72,9 @@ public class TracingServerInterceptor implements ServerInterceptor {
|
|||
span.finish();
|
||||
throw e;
|
||||
} finally {
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(false);
|
||||
}
|
||||
scope.close();
|
||||
}
|
||||
|
||||
|
@ -95,6 +104,9 @@ public class TracingServerInterceptor implements ServerInterceptor {
|
|||
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.RPC)
|
||||
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
|
||||
.startActive(true);
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(true);
|
||||
}
|
||||
try {
|
||||
delegate().onMessage(message);
|
||||
} catch (final RuntimeException | Error e) {
|
||||
|
@ -104,14 +116,23 @@ public class TracingServerInterceptor implements ServerInterceptor {
|
|||
this.span.finish();
|
||||
throw e;
|
||||
} finally {
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(false);
|
||||
}
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHalfClose() {
|
||||
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
|
||||
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(true);
|
||||
}
|
||||
delegate().onHalfClose();
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(false);
|
||||
}
|
||||
} catch (final RuntimeException | Error e) {
|
||||
Tags.ERROR.set(span, true);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, e));
|
||||
|
@ -123,9 +144,15 @@ public class TracingServerInterceptor implements ServerInterceptor {
|
|||
@Override
|
||||
public void onCancel() {
|
||||
// Finishes span.
|
||||
try (final Scope ignored = tracer.scopeManager().activate(span, true)) {
|
||||
try (final Scope scope = tracer.scopeManager().activate(span, true)) {
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(true);
|
||||
}
|
||||
delegate().onCancel();
|
||||
span.setTag("canceled", true);
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(false);
|
||||
}
|
||||
} catch (final RuntimeException | Error e) {
|
||||
Tags.ERROR.set(span, true);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, e));
|
||||
|
@ -137,8 +164,14 @@ public class TracingServerInterceptor implements ServerInterceptor {
|
|||
@Override
|
||||
public void onComplete() {
|
||||
// Finishes span.
|
||||
try (final Scope ignored = tracer.scopeManager().activate(span, true)) {
|
||||
try (final Scope scope = tracer.scopeManager().activate(span, true)) {
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(true);
|
||||
}
|
||||
delegate().onComplete();
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(false);
|
||||
}
|
||||
} catch (final RuntimeException | Error e) {
|
||||
Tags.ERROR.set(span, true);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, e));
|
||||
|
@ -149,8 +182,14 @@ public class TracingServerInterceptor implements ServerInterceptor {
|
|||
|
||||
@Override
|
||||
public void onReady() {
|
||||
try (final Scope ignored = tracer.scopeManager().activate(span, false)) {
|
||||
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(true);
|
||||
}
|
||||
delegate().onReady();
|
||||
if (scope instanceof TraceScope) {
|
||||
((TraceScope) scope).setAsyncPropagation(false);
|
||||
}
|
||||
} catch (final RuntimeException | Error e) {
|
||||
Tags.ERROR.set(span, true);
|
||||
span.log(Collections.singletonMap(ERROR_OBJECT, e));
|
||||
|
|
Loading…
Reference in New Issue