Update grpc-1.5 to new agent api

This commit is contained in:
Trask Stalnaker 2019-10-19 11:57:41 -07:00
parent 74fda2e198
commit 50dcc5bd4e
8 changed files with 110 additions and 189 deletions

View File

@ -7,7 +7,6 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import io.grpc.ClientInterceptor;
import io.opentracing.util.GlobalTracer;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
@ -58,7 +57,7 @@ public class GrpcClientBuilderInstrumentation extends Instrumenter.Default {
}
}
if (shouldRegister) {
interceptors.add(0, new TracingClientInterceptor(GlobalTracer.get()));
interceptors.add(0, TracingClientInterceptor.INSTANCE);
}
}
}

View File

@ -2,9 +2,8 @@ package datadog.trace.instrumentation.grpc.client;
import datadog.trace.agent.decorator.ClientDecorator;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.instrumentation.api.AgentSpan;
import io.grpc.Status;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
public class GrpcClientDecorator extends ClientDecorator {
public static final GrpcClientDecorator DECORATE = new GrpcClientDecorator();
@ -29,14 +28,14 @@ public class GrpcClientDecorator extends ClientDecorator {
return null;
}
public Span onClose(final Span span, final Status status) {
public AgentSpan onClose(final AgentSpan span, final Status status) {
span.setTag("status.code", status.getCode().name());
span.setTag("status.description", status.getDescription());
onError(span, status.getCause());
if (!status.isOk()) {
Tags.ERROR.set(span, true);
span.setError(true);
}
return span;

View File

@ -1,25 +1,14 @@
package datadog.trace.instrumentation.grpc.client;
import datadog.trace.instrumentation.api.AgentPropagation;
import io.grpc.Metadata;
import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;
public final class GrpcInjectAdapter implements TextMap {
private final Metadata metadata;
public final class GrpcInjectAdapter implements AgentPropagation.Setter<Metadata> {
public GrpcInjectAdapter(final Metadata metadata) {
this.metadata = metadata;
}
public static final GrpcInjectAdapter SETTER = new GrpcInjectAdapter();
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException(
"GrpcInjectAdapter should only be used with Tracer.inject()");
}
@Override
public void put(final String key, final String value) {
this.metadata.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
public void set(final Metadata carrier, final String key, final String value) {
carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
}
}

View File

@ -1,9 +1,14 @@
package datadog.trace.instrumentation.grpc.client;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.grpc.client.GrpcClientDecorator.DECORATE;
import static datadog.trace.instrumentation.grpc.client.GrpcInjectAdapter.SETTER;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.api.AgentScope;
import datadog.trace.instrumentation.api.AgentSpan;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@ -13,18 +18,10 @@ import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
public class TracingClientInterceptor implements ClientInterceptor {
private final Tracer tracer;
public TracingClientInterceptor(final Tracer tracer) {
this.tracer = tracer;
}
public static final TracingClientInterceptor INSTANCE = new TracingClientInterceptor();
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
@ -32,16 +29,11 @@ public class TracingClientInterceptor implements ClientInterceptor {
final CallOptions callOptions,
final Channel next) {
final Span span =
tracer
.buildSpan("grpc.client")
.withTag(DDTags.RESOURCE_NAME, method.getFullMethodName())
.start();
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
final AgentSpan span =
startSpan("grpc.client").setTag(DDTags.RESOURCE_NAME, method.getFullMethodName());
try (final AgentScope scope = activateSpan(span, false)) {
DECORATE.afterStart(span);
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
scope.setAsyncPropagation(true);
final ClientCall<ReqT, RespT> result;
try {
@ -54,31 +46,26 @@ public class TracingClientInterceptor implements ClientInterceptor {
throw e;
}
return new TracingClientCall<>(tracer, span, result);
return new TracingClientCall<>(span, result);
}
}
static final class TracingClientCall<ReqT, RespT>
extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
final Tracer tracer;
final Span span;
final AgentSpan span;
TracingClientCall(
final Tracer tracer, final Span span, final ClientCall<ReqT, RespT> delegate) {
TracingClientCall(final AgentSpan span, final ClientCall<ReqT, RespT> delegate) {
super(delegate);
this.tracer = tracer;
this.span = span;
}
@Override
public void start(final Listener<RespT> responseListener, final Metadata headers) {
tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new GrpcInjectAdapter(headers));
propagate().inject(span, headers, SETTER);
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
super.start(new TracingClientCallListener<>(tracer, span, responseListener), headers);
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
super.start(new TracingClientCallListener<>(span, responseListener), headers);
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
@ -89,10 +76,8 @@ public class TracingClientInterceptor implements ClientInterceptor {
@Override
public void sendMessage(final ReqT message) {
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
super.sendMessage(message);
} catch (final Throwable e) {
DECORATE.onError(span, e);
@ -105,29 +90,21 @@ public class TracingClientInterceptor implements ClientInterceptor {
static final class TracingClientCallListener<RespT>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {
final Tracer tracer;
final Span span;
final AgentSpan span;
TracingClientCallListener(
final Tracer tracer, final Span span, final ClientCall.Listener<RespT> delegate) {
TracingClientCallListener(final AgentSpan span, final ClientCall.Listener<RespT> delegate) {
super(delegate);
this.tracer = tracer;
this.span = span;
}
@Override
public void onMessage(final RespT message) {
final Scope scope =
tracer
.buildSpan("grpc.message")
.asChildOf(span)
.withTag("message.type", message.getClass().getName())
.startActive(true);
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
final Span messageSpan = scope.span();
final AgentSpan messageSpan =
startSpan("grpc.message", span.context())
.setTag("message.type", message.getClass().getName());
DECORATE.afterStart(messageSpan);
final AgentScope scope = activateSpan(messageSpan, true);
scope.setAsyncPropagation(true);
try {
delegate().onMessage(message);
} catch (final Throwable e) {
@ -143,10 +120,8 @@ public class TracingClientInterceptor implements ClientInterceptor {
public void onClose(final Status status, final Metadata trailers) {
DECORATE.onClose(span, status);
// Finishes span.
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
delegate().onClose(status, trailers);
} catch (final Throwable e) {
DECORATE.onError(span, e);
@ -159,10 +134,8 @@ public class TracingClientInterceptor implements ClientInterceptor {
@Override
public void onReady() {
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
delegate().onReady();
} catch (final Throwable e) {
DECORATE.onError(span, e);

View File

@ -0,0 +1,19 @@
package datadog.trace.instrumentation.grpc.server;
import datadog.trace.instrumentation.api.AgentPropagation;
import io.grpc.Metadata;
public final class GrpcExtractAdapter implements AgentPropagation.Getter<Metadata> {
public static final GrpcExtractAdapter GETTER = new GrpcExtractAdapter();
@Override
public Iterable<String> keys(final Metadata carrier) {
return carrier.keys();
}
@Override
public String get(final Metadata carrier, final String key) {
return carrier.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
}
}

View File

@ -7,7 +7,6 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import io.grpc.ServerInterceptor;
import io.opentracing.util.GlobalTracer;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
@ -36,6 +35,7 @@ public class GrpcServerBuilderInstrumentation extends Instrumenter.Default {
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ServerDecorator",
packageName + ".GrpcServerDecorator",
packageName + ".GrpcExtractAdapter"
};
}
@ -57,7 +57,7 @@ public class GrpcServerBuilderInstrumentation extends Instrumenter.Default {
}
}
if (shouldRegister) {
interceptors.add(0, new TracingServerInterceptor(GlobalTracer.get()));
interceptors.add(0, TracingServerInterceptor.INSTANCE);
}
}
}

View File

@ -2,9 +2,8 @@ package datadog.trace.instrumentation.grpc.server;
import datadog.trace.agent.decorator.ServerDecorator;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.instrumentation.api.AgentSpan;
import io.grpc.Status;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
public class GrpcServerDecorator extends ServerDecorator {
public static final GrpcServerDecorator DECORATE = new GrpcServerDecorator();
@ -24,14 +23,14 @@ public class GrpcServerDecorator extends ServerDecorator {
return "grpc-server";
}
public Span onClose(final Span span, final Status status) {
public AgentSpan onClose(final AgentSpan span, final Status status) {
span.setTag("status.code", status.getCode().name());
span.setTag("status.description", status.getDescription());
onError(span, status.getCause());
if (!status.isOk()) {
Tags.ERROR.set(span, true);
span.setError(true);
}
return span;

View File

@ -1,9 +1,15 @@
package datadog.trace.instrumentation.grpc.server;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.grpc.server.GrpcExtractAdapter.GETTER;
import static datadog.trace.instrumentation.grpc.server.GrpcServerDecorator.DECORATE;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.api.AgentScope;
import datadog.trace.instrumentation.api.AgentSpan;
import datadog.trace.instrumentation.api.AgentSpan.Context;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
@ -11,22 +17,12 @@ import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapExtractAdapter;
import java.util.HashMap;
import java.util.Map;
public class TracingServerInterceptor implements ServerInterceptor {
private final Tracer tracer;
public static final TracingServerInterceptor INSTANCE = new TracingServerInterceptor();
public TracingServerInterceptor(final Tracer tracer) {
this.tracer = tracer;
}
private TracingServerInterceptor() {}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
@ -34,38 +30,20 @@ public class TracingServerInterceptor implements ServerInterceptor {
final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {
final Map<String, String> headerMap = new HashMap<>();
for (final String key : headers.keys()) {
if (!key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
final String value = headers.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
headerMap.put(key, value);
}
}
final SpanContext spanContext =
tracer.extract(Format.Builtin.TEXT_MAP_EXTRACT, new TextMapExtractAdapter(headerMap));
final Tracer.SpanBuilder spanBuilder =
tracer
.buildSpan("grpc.server")
.withTag(DDTags.RESOURCE_NAME, call.getMethodDescriptor().getFullMethodName());
if (spanContext != null) {
spanBuilder.asChildOf(spanContext);
}
final Scope scope = spanBuilder.startActive(false);
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
final Span span = scope.span();
final Context spanContext = propagate().extract(headers, GETTER);
final AgentSpan span =
startSpan("grpc.server", spanContext)
.setTag(DDTags.RESOURCE_NAME, call.getMethodDescriptor().getFullMethodName());
DECORATE.afterStart(span);
final AgentScope scope = activateSpan(span, false);
scope.setAsyncPropagation(true);
final ServerCall.Listener<ReqT> result;
try {
// Wrap the server call so that we can decorate the span
// with the resulting status
TracingServerCall<ReqT, RespT> tracingServerCall =
new TracingServerCall<>(tracer, span, call);
final TracingServerCall<ReqT, RespT> tracingServerCall = new TracingServerCall<>(span, call);
// call other interceptors
result = next.startCall(tracingServerCall, headers);
@ -75,39 +53,30 @@ public class TracingServerInterceptor implements ServerInterceptor {
span.finish();
throw e;
} finally {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}
scope.setAsyncPropagation(false);
scope.close();
}
// This ensures the server implementation can see the span in scope
return new TracingServerCallListener<>(tracer, span, result);
return new TracingServerCallListener<>(span, result);
}
static final class TracingServerCall<ReqT, RespT>
extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {
final Tracer tracer;
final Span span;
final AgentSpan span;
TracingServerCall(
final Tracer tracer, final Span span, final ServerCall<ReqT, RespT> delegate) {
TracingServerCall(final AgentSpan span, final ServerCall<ReqT, RespT> delegate) {
super(delegate);
this.tracer = tracer;
this.span = span;
}
@Override
public void close(final Status status, final Metadata trailers) {
DECORATE.onClose(span, status);
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
delegate().close(status, trailers);
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}
scope.setAsyncPropagation(false);
} catch (final Throwable e) {
DECORATE.onError(span, e);
throw e;
@ -117,55 +86,41 @@ public class TracingServerInterceptor implements ServerInterceptor {
static final class TracingServerCallListener<ReqT>
extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
final Tracer tracer;
final Span span;
private final AgentSpan span;
TracingServerCallListener(
final Tracer tracer, final Span span, final ServerCall.Listener<ReqT> delegate) {
TracingServerCallListener(final AgentSpan span, final ServerCall.Listener<ReqT> delegate) {
super(delegate);
this.tracer = tracer;
this.span = span;
}
@Override
public void onMessage(final ReqT message) {
final Scope scope =
tracer
.buildSpan("grpc.message")
.asChildOf(span)
.withTag("message.type", message.getClass().getName())
.startActive(true);
DECORATE.afterStart(scope.span());
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
final AgentSpan span =
startSpan("grpc.message", this.span.context())
.setTag("message.type", message.getClass().getName());
DECORATE.afterStart(span);
final AgentScope scope = activateSpan(span, true);
scope.setAsyncPropagation(true);
try {
delegate().onMessage(message);
} catch (final Throwable e) {
final Span span = scope.span();
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
this.span.finish();
throw e;
} finally {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}
DECORATE.afterStart(scope.span());
scope.setAsyncPropagation(false);
DECORATE.afterStart(span);
scope.close();
}
}
@Override
public void onHalfClose() {
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
delegate().onHalfClose();
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}
scope.setAsyncPropagation(false);
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
@ -177,15 +132,11 @@ public class TracingServerInterceptor implements ServerInterceptor {
@Override
public void onCancel() {
// Finishes span.
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
delegate().onCancel();
span.setTag("canceled", true);
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}
scope.setAsyncPropagation(false);
} catch (final Throwable e) {
DECORATE.onError(span, e);
throw e;
@ -198,14 +149,10 @@ public class TracingServerInterceptor implements ServerInterceptor {
@Override
public void onComplete() {
// Finishes span.
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
delegate().onComplete();
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}
scope.setAsyncPropagation(false);
} catch (final Throwable e) {
DECORATE.onError(span, e);
throw e;
@ -217,14 +164,10 @@ public class TracingServerInterceptor implements ServerInterceptor {
@Override
public void onReady() {
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try (final AgentScope scope = activateSpan(span, false)) {
scope.setAsyncPropagation(true);
delegate().onReady();
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}
scope.setAsyncPropagation(false);
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);