Update rabbitmq-amqp-2.7 to new agent api

This commit is contained in:
Trask Stalnaker 2019-10-19 11:57:46 -07:00
parent 7460c32248
commit 462a6632f7
6 changed files with 93 additions and 117 deletions

View File

@ -1,9 +1,16 @@
package datadog.trace.instrumentation.rabbitmq.amqp; package datadog.trace.instrumentation.rabbitmq.amqp;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.instrumentation.api.AgentTracer.noopSpan;
import static datadog.trace.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.CONSUMER_DECORATE; import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.CONSUMER_DECORATE;
import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE; import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE;
import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.PRODUCER_DECORATE; import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.PRODUCER_DECORATE;
import static datadog.trace.instrumentation.rabbitmq.amqp.TextMapExtractAdapter.GETTER;
import static datadog.trace.instrumentation.rabbitmq.amqp.TextMapInjectAdapter.SETTER;
import static net.bytebuddy.matcher.ElementMatchers.canThrow; import static net.bytebuddy.matcher.ElementMatchers.canThrow;
import static net.bytebuddy.matcher.ElementMatchers.isGetter; import static net.bytebuddy.matcher.ElementMatchers.isGetter;
import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isInterface;
@ -26,13 +33,10 @@ import com.rabbitmq.client.MessageProperties;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags; import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.CallDepthThreadLocalMap; import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.Scope; import datadog.trace.instrumentation.api.AgentScope;
import io.opentracing.Span; import datadog.trace.instrumentation.api.AgentSpan;
import io.opentracing.SpanContext; import datadog.trace.instrumentation.api.AgentSpan.Context;
import io.opentracing.noop.NoopSpan;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
@ -107,7 +111,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
public static class ChannelMethodAdvice { public static class ChannelMethodAdvice {
@Advice.OnMethodEnter @Advice.OnMethodEnter
public static Scope startSpan( public static AgentScope onEnter(
@Advice.This final Channel channel, @Advice.Origin("Channel.#m") final String method) { @Advice.This final Channel channel, @Advice.Origin("Channel.#m") final String method) {
final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class); final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class);
if (callDepth > 0) { if (callDepth > 0) {
@ -116,20 +120,18 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
final Connection connection = channel.getConnection(); final Connection connection = channel.getConnection();
final Scope scope = final AgentSpan span =
GlobalTracer.get() startSpan("amqp.command")
.buildSpan("amqp.command") .setTag(DDTags.RESOURCE_NAME, method)
.withTag(DDTags.RESOURCE_NAME, method) .setTag(Tags.PEER_PORT.getKey(), connection.getPort());
.withTag(Tags.PEER_PORT.getKey(), connection.getPort()) DECORATE.afterStart(span);
.startActive(true); DECORATE.onPeerConnection(span, connection.getAddress());
DECORATE.afterStart(scope); return activateSpan(span, true);
DECORATE.onPeerConnection(scope.span(), connection.getAddress());
return scope;
} }
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan( public static void stopSpan(
@Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) { @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) {
if (scope != null) { if (scope != null) {
DECORATE.onError(scope, throwable); DECORATE.onError(scope, throwable);
DECORATE.beforeFinish(scope); DECORATE.beforeFinish(scope);
@ -146,7 +148,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
@Advice.Argument(1) final String routingKey, @Advice.Argument(1) final String routingKey,
@Advice.Argument(value = 4, readOnly = false) AMQP.BasicProperties props, @Advice.Argument(value = 4, readOnly = false) AMQP.BasicProperties props,
@Advice.Argument(5) final byte[] body) { @Advice.Argument(5) final byte[] body) {
final Span span = GlobalTracer.get().activeSpan(); final AgentSpan span = activeSpan();
if (span != null) { if (span != null) {
PRODUCER_DECORATE.afterStart(span); // Overwrite tags set by generic decorator. PRODUCER_DECORATE.afterStart(span); // Overwrite tags set by generic decorator.
@ -157,14 +159,16 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
if (props == null) { if (props == null) {
props = MessageProperties.MINIMAL_BASIC; props = MessageProperties.MINIMAL_BASIC;
} }
span.setTag("amqp.delivery_mode", props.getDeliveryMode()); final Integer deliveryMode = props.getDeliveryMode();
if (deliveryMode != null) {
span.setTag("amqp.delivery_mode", deliveryMode);
}
// We need to copy the BasicProperties and provide a header map we can modify // We need to copy the BasicProperties and provide a header map we can modify
Map<String, Object> headers = props.getHeaders(); Map<String, Object> headers = props.getHeaders();
headers = (headers == null) ? new HashMap<String, Object>() : new HashMap<>(headers); headers = (headers == null) ? new HashMap<String, Object>() : new HashMap<>(headers);
GlobalTracer.get() propagate().inject(span, headers, SETTER);
.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapInjectAdapter(headers));
props = props =
new AMQP.BasicProperties( new AMQP.BasicProperties(
@ -189,11 +193,12 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
public static class ChannelGetAdvice { public static class ChannelGetAdvice {
@Advice.OnMethodEnter @Advice.OnMethodEnter
public static long takeTimestamp( public static long takeTimestamp(
@Advice.Local("placeholderScope") Scope placeholderScope, @Advice.Local("placeholderScope") AgentScope placeholderScope,
@Advice.Local("callDepth") int callDepth) { @Advice.Local("callDepth") int callDepth) {
callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class); callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class);
// Don't want RabbitCommandInstrumentation to mess up our actual parent span. // Don't want RabbitCommandInstrumentation to mess up our actual parent span.
placeholderScope = GlobalTracer.get().scopeManager().activate(NoopSpan.INSTANCE, false); placeholderScope = activateSpan(noopSpan(), false);
return System.currentTimeMillis(); return System.currentTimeMillis();
} }
@ -202,32 +207,26 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
@Advice.This final Channel channel, @Advice.This final Channel channel,
@Advice.Argument(0) final String queue, @Advice.Argument(0) final String queue,
@Advice.Enter final long startTime, @Advice.Enter final long startTime,
@Advice.Local("placeholderScope") final Scope placeholderScope, @Advice.Local("placeholderScope") final AgentScope placeholderScope,
@Advice.Local("callDepth") final int callDepth, @Advice.Local("callDepth") final int callDepth,
@Advice.Return final GetResponse response, @Advice.Return final GetResponse response,
@Advice.Thrown final Throwable throwable) { @Advice.Thrown final Throwable throwable) {
if (placeholderScope.span() instanceof NoopSpan) {
placeholderScope.close(); placeholderScope.close();
}
if (callDepth > 0) { if (callDepth > 0) {
return; return;
} }
SpanContext parentContext = null; Context parentContext = null;
if (response != null && response.getProps() != null) { if (response != null && response.getProps() != null) {
final Map<String, Object> headers = response.getProps().getHeaders(); final Map<String, Object> headers = response.getProps().getHeaders();
parentContext = parentContext = headers == null ? null : propagate().extract(headers, GETTER);
headers == null
? null
: GlobalTracer.get()
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers));
} }
if (parentContext == null) { if (parentContext == null) {
final Span parent = GlobalTracer.get().activeSpan(); final AgentSpan parent = activeSpan();
if (parent != null) { if (parent != null) {
parentContext = parent.context(); parentContext = parent.context();
} }
@ -235,19 +234,19 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
final Connection connection = channel.getConnection(); final Connection connection = channel.getConnection();
final Integer length = response == null ? null : response.getBody().length;
// TODO: it would be better if we could actually have span wrapped into the scope started in // TODO: it would be better if we could actually have span wrapped into the scope started in
// OnMethodEnter // OnMethodEnter
final Span span = final AgentSpan span;
GlobalTracer.get() if (parentContext != null) {
.buildSpan("amqp.command") span = startSpan("amqp.command", parentContext, TimeUnit.MILLISECONDS.toMicros(startTime));
.withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime)) } else {
.asChildOf(parentContext) span = startSpan("amqp.command", TimeUnit.MILLISECONDS.toMicros(startTime));
.withTag("message.size", length) }
.withTag(Tags.PEER_PORT.getKey(), connection.getPort()) if (response != null) {
.start(); span.setTag("message.size", response.getBody().length);
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { }
span.setTag(Tags.PEER_PORT.getKey(), connection.getPort());
try (final AgentScope scope = activateSpan(span, false)) {
CONSUMER_DECORATE.afterStart(span); CONSUMER_DECORATE.afterStart(span);
CONSUMER_DECORATE.onGet(span, queue); CONSUMER_DECORATE.onGet(span, queue);
CONSUMER_DECORATE.onPeerConnection(span, connection.getAddress()); CONSUMER_DECORATE.onPeerConnection(span, connection.getAddress());

View File

@ -1,6 +1,7 @@
package datadog.trace.instrumentation.rabbitmq.amqp; package datadog.trace.instrumentation.rabbitmq.amqp;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static datadog.trace.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE; import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
@ -11,9 +12,7 @@ import static net.bytebuddy.matcher.ElementMatchers.not;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import com.rabbitmq.client.Command; import com.rabbitmq.client.Command;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.interceptor.MutableSpan; import datadog.trace.instrumentation.api.AgentSpan;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import java.util.Map; import java.util.Map;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.method.MethodDescription;
@ -54,10 +53,10 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default {
public static class CommandConstructorAdvice { public static class CommandConstructorAdvice {
@Advice.OnMethodExit @Advice.OnMethodExit
public static void setResourceNameAddHeaders(@Advice.This final Command command) { public static void setResourceNameAddHeaders(@Advice.This final Command command) {
final Span span = GlobalTracer.get().activeSpan(); final AgentSpan span = activeSpan();
if (span instanceof MutableSpan && command.getMethod() != null) { if (span != null && command.getMethod() != null) {
if (((MutableSpan) span).getOperationName().equals("amqp.command")) { if (span.getSpanName().equals("amqp.command")) {
DECORATE.onCommand(span, command); DECORATE.onCommand(span, command);
} }
} }

View File

@ -5,11 +5,11 @@ import com.rabbitmq.client.Envelope;
import datadog.trace.agent.decorator.ClientDecorator; import datadog.trace.agent.decorator.ClientDecorator;
import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags; import datadog.trace.api.DDTags;
import io.opentracing.Scope; import datadog.trace.instrumentation.api.AgentSpan;
import io.opentracing.Span;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
public class RabbitDecorator extends ClientDecorator { public class RabbitDecorator extends ClientDecorator {
public static final RabbitDecorator DECORATE = new RabbitDecorator(); public static final RabbitDecorator DECORATE = new RabbitDecorator();
public static final RabbitDecorator PRODUCER_DECORATE = public static final RabbitDecorator PRODUCER_DECORATE =
@ -63,7 +63,7 @@ public class RabbitDecorator extends ClientDecorator {
return DDSpanTypes.MESSAGE_CLIENT; return DDSpanTypes.MESSAGE_CLIENT;
} }
public void onPublish(final Span span, final String exchange, final String routingKey) { public void onPublish(final AgentSpan span, final String exchange, final String routingKey) {
final String exchangeName = exchange == null || exchange.isEmpty() ? "<default>" : exchange; final String exchangeName = exchange == null || exchange.isEmpty() ? "<default>" : exchange;
final String routing = final String routing =
routingKey == null || routingKey.isEmpty() routingKey == null || routingKey.isEmpty()
@ -77,7 +77,7 @@ public class RabbitDecorator extends ClientDecorator {
span.setTag("amqp.routing_key", routingKey); span.setTag("amqp.routing_key", routingKey);
} }
public void onGet(final Span span, final String queue) { public void onGet(final AgentSpan span, final String queue) {
final String queueName = queue.startsWith("amq.gen-") ? "<generated>" : queue; final String queueName = queue.startsWith("amq.gen-") ? "<generated>" : queue;
span.setTag(DDTags.RESOURCE_NAME, "basic.get " + queueName); span.setTag(DDTags.RESOURCE_NAME, "basic.get " + queueName);
@ -85,9 +85,7 @@ public class RabbitDecorator extends ClientDecorator {
span.setTag("amqp.queue", queue); span.setTag("amqp.queue", queue);
} }
public void onDeliver(final Scope scope, final String queue, final Envelope envelope) { public void onDeliver(final AgentSpan span, final String queue, final Envelope envelope) {
final Span span = scope.span();
String queueName = queue; String queueName = queue;
if (queue == null || queue.isEmpty()) { if (queue == null || queue.isEmpty()) {
queueName = "<default>"; queueName = "<default>";
@ -103,7 +101,7 @@ public class RabbitDecorator extends ClientDecorator {
} }
} }
public void onCommand(final Span span, final Command command) { public void onCommand(final AgentSpan span, final Command command) {
final String name = command.getMethod().protocolMethodName(); final String name = command.getMethod().protocolMethodName();
if (!name.equals("basic.publish")) { if (!name.equals("basic.publish")) {

View File

@ -1,30 +1,20 @@
package datadog.trace.instrumentation.rabbitmq.amqp; package datadog.trace.instrumentation.rabbitmq.amqp;
import io.opentracing.propagation.TextMap; import datadog.trace.instrumentation.api.AgentPropagation;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
// TextMap works with <String,String>, but the type we're given is <String,Object> public class TextMapExtractAdapter implements AgentPropagation.Getter<Map<String, Object>> {
public class TextMapExtractAdapter implements TextMap {
private final Map<String, String> map = new HashMap<>(); public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter();
public TextMapExtractAdapter(final Map<String, Object> headers) { @Override
for (final Map.Entry<String, Object> entry : headers.entrySet()) { public Iterable<String> keys(final Map<String, Object> carrier) {
if (entry != null && entry.getValue() != null) { return carrier.keySet();
map.put(entry.getKey(), entry.getValue().toString());
}
}
} }
@Override @Override
public Iterator<Map.Entry<String, String>> iterator() { public String get(final Map<String, Object> carrier, final String key) {
return map.entrySet().iterator(); final Object obj = carrier.get(key);
} return obj == null ? null : obj.toString();
@Override
public void put(final String key, final String value) {
throw new UnsupportedOperationException("Use inject adapter instead");
} }
} }

View File

@ -1,25 +1,14 @@
package datadog.trace.instrumentation.rabbitmq.amqp; package datadog.trace.instrumentation.rabbitmq.amqp;
import io.opentracing.propagation.TextMap; import datadog.trace.instrumentation.api.AgentPropagation;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
// TextMap works with <String,String>, but the type we're given is <String,Object> public class TextMapInjectAdapter implements AgentPropagation.Setter<Map<String, Object>> {
public class TextMapInjectAdapter implements TextMap {
private final Map<String, ? super String> map;
public TextMapInjectAdapter(final Map<String, ? super String> map) { public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter();
this.map = map;
}
@Override @Override
public Iterator<Map.Entry<String, String>> iterator() { public void set(final Map<String, Object> carrier, final String key, final String value) {
throw new UnsupportedOperationException( carrier.put(key, value);
"TextMapInjectAdapter should only be used with Tracer.inject()");
}
@Override
public void put(final String key, final String value) {
map.put(key, value);
} }
} }

View File

@ -1,16 +1,18 @@
package datadog.trace.instrumentation.rabbitmq.amqp; package datadog.trace.instrumentation.rabbitmq.amqp;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.CONSUMER_DECORATE; import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.CONSUMER_DECORATE;
import static datadog.trace.instrumentation.rabbitmq.amqp.TextMapExtractAdapter.GETTER;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope; import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.ShutdownSignalException;
import io.opentracing.Scope; import datadog.trace.instrumentation.api.AgentScope;
import io.opentracing.SpanContext; import datadog.trace.instrumentation.api.AgentSpan;
import io.opentracing.noop.NoopScopeManager; import datadog.trace.instrumentation.api.AgentSpan.Context;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -61,24 +63,19 @@ public class TracedDelegatingConsumer implements Consumer {
final AMQP.BasicProperties properties, final AMQP.BasicProperties properties,
final byte[] body) final byte[] body)
throws IOException { throws IOException {
Scope scope = NoopScopeManager.NoopScope.INSTANCE; AgentScope scope = null;
try { try {
final Map<String, Object> headers = properties.getHeaders(); final Map<String, Object> headers = properties.getHeaders();
final SpanContext parentContext = final Context context = headers == null ? null : propagate().extract(headers, GETTER);
headers == null
? null
: GlobalTracer.get()
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers));
scope = final AgentSpan span =
GlobalTracer.get() startSpan("amqp.command", context)
.buildSpan("amqp.command") .setTag("message.size", body == null ? 0 : body.length)
.asChildOf(parentContext) .setTag("span.origin.type", delegate.getClass().getName());
.withTag("message.size", body == null ? 0 : body.length) CONSUMER_DECORATE.afterStart(span);
.withTag("span.origin.type", delegate.getClass().getName()) CONSUMER_DECORATE.onDeliver(span, queue, envelope);
.startActive(true);
CONSUMER_DECORATE.afterStart(scope); scope = activateSpan(span, true);
CONSUMER_DECORATE.onDeliver(scope, queue, envelope);
} catch (final Exception e) { } catch (final Exception e) {
log.debug("Instrumentation error in tracing consumer", e); log.debug("Instrumentation error in tracing consumer", e);
@ -89,12 +86,16 @@ public class TracedDelegatingConsumer implements Consumer {
delegate.handleDelivery(consumerTag, envelope, properties, body); delegate.handleDelivery(consumerTag, envelope, properties, body);
} catch (final Throwable throwable) { } catch (final Throwable throwable) {
if (scope != null) {
CONSUMER_DECORATE.onError(scope, throwable); CONSUMER_DECORATE.onError(scope, throwable);
}
throw throwable; throw throwable;
} finally { } finally {
if (scope != null) {
CONSUMER_DECORATE.beforeFinish(scope); CONSUMER_DECORATE.beforeFinish(scope);
scope.close(); scope.close();
} }
} }
} }
}
} }