Refactored grpc instumenter (#72)

* Refactored grpc instrumenter

* Cleaned up formatting

* Simplified interceptors

* Fixed some more formatting

* Formatting

* Addressed maintainer comments
This commit is contained in:
Pontus Rydin 2020-01-18 15:09:50 -05:00 committed by Trask Stalnaker
parent 736c70ffe0
commit ed42c83b50
8 changed files with 106 additions and 120 deletions

View File

@ -1,12 +1,15 @@
package io.opentelemetry.auto.instrumentation.grpc.client;
import io.grpc.Status;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.auto.api.SpanTypes;
import io.opentelemetry.auto.decorator.ClientDecorator;
import io.opentelemetry.auto.instrumentation.api.AgentSpan;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Status;
import io.opentelemetry.trace.Tracer;
public class GrpcClientDecorator extends ClientDecorator {
public static final GrpcClientDecorator DECORATE = new GrpcClientDecorator();
public static final Tracer TRACER = OpenTelemetry.getTracerFactory().get("io.opentelemetry.auto");
@Override
protected String[] instrumentationNames() {
@ -28,16 +31,17 @@ public class GrpcClientDecorator extends ClientDecorator {
return null;
}
public AgentSpan onClose(final AgentSpan span, final Status status) {
public Span onClose(final Span span, final io.grpc.Status status) {
span.setAttribute("status.code", status.getCode().name());
span.setAttribute("status.description", status.getDescription());
if (status.getDescription() != null) {
span.setAttribute("status.description", status.getDescription());
}
onError(span, status.getCause());
if (!status.isOk()) {
span.setError(true);
span.setStatus(Status.UNKNOWN);
}
return span;
}
}

View File

@ -1,14 +1,14 @@
package io.opentelemetry.auto.instrumentation.grpc.client;
import io.grpc.Metadata;
import io.opentelemetry.auto.instrumentation.api.AgentPropagation;
import io.opentelemetry.context.propagation.HttpTextFormat;
public final class GrpcInjectAdapter implements AgentPropagation.Setter<Metadata> {
public final class GrpcInjectAdapter implements HttpTextFormat.Setter<Metadata> {
public static final GrpcInjectAdapter SETTER = new GrpcInjectAdapter();
@Override
public void set(final Metadata carrier, final String key, final String value) {
public void put(final Metadata carrier, final String key, final String value) {
carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
}
}

View File

@ -1,9 +1,7 @@
package io.opentelemetry.auto.instrumentation.grpc.client;
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activateSpan;
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.propagate;
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.startSpan;
import static io.opentelemetry.auto.instrumentation.grpc.client.GrpcClientDecorator.DECORATE;
import static io.opentelemetry.auto.instrumentation.grpc.client.GrpcClientDecorator.TRACER;
import static io.opentelemetry.auto.instrumentation.grpc.client.GrpcInjectAdapter.SETTER;
import io.grpc.CallOptions;
@ -16,8 +14,8 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.opentelemetry.auto.api.MoreTags;
import io.opentelemetry.auto.instrumentation.api.AgentScope;
import io.opentelemetry.auto.instrumentation.api.AgentSpan;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Span;
public class TracingClientInterceptor implements ClientInterceptor {
@ -29,9 +27,9 @@ public class TracingClientInterceptor implements ClientInterceptor {
final CallOptions callOptions,
final Channel next) {
final AgentSpan span =
startSpan("grpc.client").setAttribute(MoreTags.RESOURCE_NAME, method.getFullMethodName());
try (final AgentScope scope = activateSpan(span, false)) {
final Span span = TRACER.spanBuilder("grpc.client").startSpan();
span.setAttribute(MoreTags.RESOURCE_NAME, method.getFullMethodName());
try (final Scope scope = TRACER.withSpan(span)) {
DECORATE.afterStart(span);
final ClientCall<ReqT, RespT> result;
@ -41,45 +39,43 @@ public class TracingClientInterceptor implements ClientInterceptor {
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
span.finish();
span.end();
throw e;
}
return new TracingClientCall<>(span, result);
}
}
static final class TracingClientCall<ReqT, RespT>
extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
final AgentSpan span;
final Span span;
TracingClientCall(final AgentSpan span, final ClientCall<ReqT, RespT> delegate) {
TracingClientCall(final Span span, final ClientCall<ReqT, RespT> delegate) {
super(delegate);
this.span = span;
}
@Override
public void start(final Listener<RespT> responseListener, final Metadata headers) {
propagate().inject(span, headers, SETTER);
try (final AgentScope scope = activateSpan(span, false)) {
TRACER.getHttpTextFormat().inject(span.getContext(), headers, SETTER);
try (final Scope scope = TRACER.withSpan(span)) {
super.start(new TracingClientCallListener<>(span, responseListener), headers);
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
span.finish();
span.end();
throw e;
}
}
@Override
public void sendMessage(final ReqT message) {
try (final AgentScope scope = activateSpan(span, false)) {
try (final Scope scope = TRACER.withSpan(span)) {
super.sendMessage(message);
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
span.finish();
span.end();
throw e;
}
}
@ -87,28 +83,26 @@ public class TracingClientInterceptor implements ClientInterceptor {
static final class TracingClientCallListener<RespT>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {
final AgentSpan span;
final Span span;
TracingClientCallListener(final AgentSpan span, final ClientCall.Listener<RespT> delegate) {
TracingClientCallListener(final Span span, final ClientCall.Listener<RespT> delegate) {
super(delegate);
this.span = span;
}
@Override
public void onMessage(final RespT message) {
final AgentSpan messageSpan =
startSpan("grpc.message", span.context())
.setAttribute("message.type", message.getClass().getName());
final Span messageSpan = TRACER.spanBuilder("grpc.message").setParent(span).startSpan();
messageSpan.setAttribute("message.type", message.getClass().getName());
DECORATE.afterStart(messageSpan);
final AgentScope scope = activateSpan(messageSpan, true);
try {
try (final Scope scope = TRACER.withSpan(messageSpan)) {
delegate().onMessage(message);
} catch (final Throwable e) {
DECORATE.onError(messageSpan, e);
throw e;
} finally {
DECORATE.beforeFinish(messageSpan);
scope.close();
messageSpan.end();
}
}
@ -116,25 +110,25 @@ public class TracingClientInterceptor implements ClientInterceptor {
public void onClose(final Status status, final Metadata trailers) {
DECORATE.onClose(span, status);
// Finishes span.
try (final AgentScope scope = activateSpan(span, false)) {
try (final Scope scope = TRACER.withSpan(span)) {
delegate().onClose(status, trailers);
} catch (final Throwable e) {
DECORATE.onError(span, e);
throw e;
} finally {
DECORATE.beforeFinish(span);
span.finish();
span.end();
}
}
@Override
public void onReady() {
try (final AgentScope scope = activateSpan(span, false)) {
try (final Scope scope = TRACER.withSpan(span)) {
delegate().onReady();
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
span.finish();
span.end();
throw e;
}
}

View File

@ -1,27 +1,12 @@
package io.opentelemetry.auto.instrumentation.grpc.server;
import io.grpc.Metadata;
import io.opentelemetry.auto.instrumentation.api.AgentPropagation;
import java.util.ArrayList;
import java.util.List;
import io.opentelemetry.context.propagation.HttpTextFormat;
public final class GrpcExtractAdapter implements AgentPropagation.Getter<Metadata> {
public final class GrpcExtractAdapter implements HttpTextFormat.Getter<Metadata> {
public static final GrpcExtractAdapter GETTER = new GrpcExtractAdapter();
@Override
public Iterable<String> keys(final Metadata carrier) {
final List<String> keys = new ArrayList<>();
for (final String key : carrier.keys()) {
if (!key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
keys.add(key);
}
}
return keys;
}
@Override
public String get(final Metadata carrier, final String key) {
return carrier.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));

View File

@ -1,12 +1,15 @@
package io.opentelemetry.auto.instrumentation.grpc.server;
import io.grpc.Status;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.auto.api.SpanTypes;
import io.opentelemetry.auto.decorator.ServerDecorator;
import io.opentelemetry.auto.instrumentation.api.AgentSpan;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;
public class GrpcServerDecorator extends ServerDecorator {
public static final GrpcServerDecorator DECORATE = new GrpcServerDecorator();
public static final Tracer TRACER = OpenTelemetry.getTracerFactory().get("io.opentelemetry.auto");
@Override
protected String[] instrumentationNames() {
@ -23,16 +26,16 @@ public class GrpcServerDecorator extends ServerDecorator {
return "grpc-server";
}
public AgentSpan onClose(final AgentSpan span, final Status status) {
public Span onClose(final Span span, final Status status) {
span.setAttribute("status.code", status.getCode().name());
span.setAttribute("status.description", status.getDescription());
if (status.getDescription() != null) {
span.setAttribute("status.description", status.getDescription());
}
onError(span, status.getCause());
if (!status.isOk()) {
span.setError(true);
span.setStatus(io.opentelemetry.trace.Status.UNKNOWN);
}
return span;
}
}

View File

@ -1,10 +1,8 @@
package io.opentelemetry.auto.instrumentation.grpc.server;
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.activateSpan;
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.propagate;
import static io.opentelemetry.auto.instrumentation.api.AgentTracer.startSpan;
import static io.opentelemetry.auto.instrumentation.grpc.server.GrpcExtractAdapter.GETTER;
import static io.opentelemetry.auto.instrumentation.grpc.server.GrpcServerDecorator.DECORATE;
import static io.opentelemetry.auto.instrumentation.grpc.server.GrpcServerDecorator.TRACER;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
@ -14,9 +12,9 @@ import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.opentelemetry.auto.api.MoreTags;
import io.opentelemetry.auto.instrumentation.api.AgentScope;
import io.opentelemetry.auto.instrumentation.api.AgentSpan;
import io.opentelemetry.auto.instrumentation.api.AgentSpan.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.SpanContext;
public class TracingServerInterceptor implements ServerInterceptor {
@ -30,29 +28,29 @@ public class TracingServerInterceptor implements ServerInterceptor {
final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {
final Context spanContext = propagate().extract(headers, GETTER);
final AgentSpan span =
startSpan("grpc.server", spanContext)
.setAttribute(MoreTags.RESOURCE_NAME, call.getMethodDescriptor().getFullMethodName());
final SpanContext spanContext = TRACER.getHttpTextFormat().extract(headers, GETTER);
final Span span = TRACER.spanBuilder("grpc.server").setParent(spanContext).startSpan();
span.setAttribute(MoreTags.RESOURCE_NAME, call.getMethodDescriptor().getFullMethodName());
DECORATE.afterStart(span);
final AgentScope scope = activateSpan(span, false);
final ServerCall.Listener<ReqT> result;
try {
// Wrap the server call so that we can decorate the span
// with the resulting status
final TracingServerCall<ReqT, RespT> tracingServerCall = new TracingServerCall<>(span, call);
try (final Scope scope = TRACER.withSpan(span)) {
// call other interceptors
result = next.startCall(tracingServerCall, headers);
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
span.finish();
throw e;
} finally {
scope.close();
try {
// Wrap the server call so that we can decorate the span
// with the resulting status
final TracingServerCall<ReqT, RespT> tracingServerCall =
new TracingServerCall<>(span, call);
// call other interceptors
result = next.startCall(tracingServerCall, headers);
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
span.end();
throw e;
}
}
// This ensures the server implementation can see the span in scope
@ -61,9 +59,9 @@ public class TracingServerInterceptor implements ServerInterceptor {
static final class TracingServerCall<ReqT, RespT>
extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {
final AgentSpan span;
final Span span;
TracingServerCall(final AgentSpan span, final ServerCall<ReqT, RespT> delegate) {
TracingServerCall(final Span span, final ServerCall<ReqT, RespT> delegate) {
super(delegate);
this.span = span;
}
@ -71,7 +69,7 @@ public class TracingServerInterceptor implements ServerInterceptor {
@Override
public void close(final Status status, final Metadata trailers) {
DECORATE.onClose(span, status);
try (final AgentScope scope = activateSpan(span, false)) {
try (final Scope scope = TRACER.withSpan(span)) {
delegate().close(status, trailers);
} catch (final Throwable e) {
DECORATE.onError(span, e);
@ -82,41 +80,42 @@ public class TracingServerInterceptor implements ServerInterceptor {
static final class TracingServerCallListener<ReqT>
extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
private final AgentSpan span;
private final Span span;
TracingServerCallListener(final AgentSpan span, final ServerCall.Listener<ReqT> delegate) {
TracingServerCallListener(final Span span, final ServerCall.Listener<ReqT> delegate) {
super(delegate);
this.span = span;
}
@Override
public void onMessage(final ReqT message) {
final AgentSpan span =
startSpan("grpc.message", this.span.context())
.setAttribute("message.type", message.getClass().getName());
final Span span =
TRACER.spanBuilder("grpc.message").setParent(this.span.getContext()).startSpan();
span.setAttribute("message.type", message.getClass().getName());
DECORATE.afterStart(span);
final AgentScope scope = activateSpan(span, true);
final Scope scope = TRACER.withSpan(span);
try {
delegate().onMessage(message);
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(this.span);
this.span.finish();
this.span.end();
throw e;
} finally {
DECORATE.beforeFinish(span);
span.end();
scope.close();
}
}
@Override
public void onHalfClose() {
try (final AgentScope scope = activateSpan(span, false)) {
try (final Scope scope = TRACER.withSpan(span)) {
delegate().onHalfClose();
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
span.finish();
span.end();
throw e;
}
}
@ -124,7 +123,7 @@ public class TracingServerInterceptor implements ServerInterceptor {
@Override
public void onCancel() {
// Finishes span.
try (final AgentScope scope = activateSpan(span, false)) {
try (final Scope scope = TRACER.withSpan(span)) {
delegate().onCancel();
span.setAttribute("canceled", true);
} catch (final Throwable e) {
@ -132,32 +131,32 @@ public class TracingServerInterceptor implements ServerInterceptor {
throw e;
} finally {
DECORATE.beforeFinish(span);
span.finish();
span.end();
}
}
@Override
public void onComplete() {
// Finishes span.
try (final AgentScope scope = activateSpan(span, false)) {
try (final Scope scope = TRACER.withSpan(span)) {
delegate().onComplete();
} catch (final Throwable e) {
DECORATE.onError(span, e);
throw e;
} finally {
DECORATE.beforeFinish(span);
span.finish();
span.end();
}
}
@Override
public void onReady() {
try (final AgentScope scope = activateSpan(span, false)) {
try (final Scope scope = TRACER.withSpan(span)) {
delegate().onReady();
} catch (final Throwable e) {
DECORATE.onError(span, e);
DECORATE.beforeFinish(span);
span.finish();
span.end();
throw e;
}
}

View File

@ -2,7 +2,6 @@ import example.GreeterGrpc
import example.Helloworld
import io.grpc.BindableService
import io.grpc.ManagedChannel
import io.grpc.Metadata
import io.grpc.Server
import io.grpc.Status
import io.grpc.StatusRuntimeException
@ -12,7 +11,6 @@ import io.grpc.stub.StreamObserver
import io.opentelemetry.auto.api.MoreTags
import io.opentelemetry.auto.api.SpanTypes
import io.opentelemetry.auto.instrumentation.api.Tags
import io.opentelemetry.auto.instrumentation.grpc.server.GrpcExtractAdapter
import io.opentelemetry.auto.test.AgentTestRunner
import io.opentelemetry.sdk.trace.SpanData
@ -270,17 +268,4 @@ class GrpcTest extends AgentTestRunner {
"Status - description" | Status.PERMISSION_DENIED.withDescription("some description")
"StatusRuntime - description" | Status.UNIMPLEMENTED.withDescription("some description")
}
def "skip binary headers"() {
setup:
def meta = new Metadata()
meta.put(Metadata.Key.<String> of("test", Metadata.ASCII_STRING_MARSHALLER), "val")
meta.put(Metadata.Key.<byte[]> of("test-bin", Metadata.BINARY_BYTE_MARSHALLER), "bin-val".bytes)
when:
def keys = GrpcExtractAdapter.GETTER.keys(meta)
then:
keys == ["test"]
}
}

View File

@ -24,7 +24,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ListWriter implements SpanProcessor {
private volatile List<List<SpanData>> traces = new CopyOnWriteArrayList<>();
@ -40,11 +42,25 @@ public class ListWriter implements SpanProcessor {
@Override
public void onStart(final ReadableSpan readableSpan) {
final SpanData sd = readableSpan.toSpanData();
log.debug(
">>> SPAN START: {} id={} traceid={} parent={}",
sd.getName(),
sd.getSpanId().toLowerBase16(),
sd.getTraceId().toLowerBase16(),
sd.getParentSpanId().toLowerBase16());
spanOrders.put(readableSpan.getSpanContext().getSpanId(), nextSpanOrder.getAndIncrement());
}
@Override
public void onEnd(final ReadableSpan readableSpan) {
final SpanData sd = readableSpan.toSpanData();
log.debug(
"<<< SPAN END: {} id={} traceid={} parent={}",
sd.getName(),
sd.getSpanId().toLowerBase16(),
sd.getTraceId().toLowerBase16(),
sd.getParentSpanId().toLowerBase16());
final SpanData span = readableSpan.toSpanData();
synchronized (structuralChangeLock) {
boolean found = false;