Migrate RabbitMQ instrumentation to Decorator

This commit is contained in:
Tyler Benson 2019-03-07 17:30:52 -05:00
parent e3b871afb5
commit 2146678d0a
7 changed files with 169 additions and 93 deletions

View File

@ -93,10 +93,10 @@ public abstract class BaseDecorator {
public Span onPeerConnection(final Span span, final InetSocketAddress remoteConnection) {
assert span != null;
if (remoteConnection != null) {
onPeerConnection(span, remoteConnection.getAddress());
span.setTag(Tags.PEER_HOSTNAME.getKey(), remoteConnection.getHostName());
span.setTag(Tags.PEER_PORT.getKey(), remoteConnection.getPort());
onPeerConnection(span, remoteConnection.getAddress());
}
return span;
}
@ -104,6 +104,7 @@ public abstract class BaseDecorator {
public Span onPeerConnection(final Span span, final InetAddress remoteAddress) {
assert span != null;
if (remoteAddress != null) {
span.setTag(Tags.PEER_HOSTNAME.getKey(), remoteAddress.getHostName());
if (remoteAddress instanceof Inet4Address) {
Tags.PEER_HOST_IPV4.set(span, remoteAddress.getHostAddress());
} else if (remoteAddress instanceof Inet6Address) {

View File

@ -33,7 +33,11 @@ class BaseDecoratorTest extends Specification {
decorator.onPeerConnection(span, connection)
then:
1 * span.setTag(Tags.PEER_HOSTNAME.key, connection.hostName)
if (connection.getAddress()) {
2 * span.setTag(Tags.PEER_HOSTNAME.key, connection.hostName)
} else {
1 * span.setTag(Tags.PEER_HOSTNAME.key, connection.hostName)
}
1 * span.setTag(Tags.PEER_PORT.key, connection.port)
if (connection.address instanceof Inet4Address) {
1 * span.setTag(Tags.PEER_HOST_IPV4.key, connection.address.hostAddress)

View File

@ -1,7 +1,9 @@
package datadog.trace.instrumentation.rabbitmq.amqp;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.CONSUMER_DECORATE;
import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE;
import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.PRODUCER_DECORATE;
import static net.bytebuddy.matcher.ElementMatchers.canThrow;
import static net.bytebuddy.matcher.ElementMatchers.isGetter;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
@ -22,7 +24,6 @@ import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.MessageProperties;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.Scope;
@ -33,7 +34,6 @@ import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@ -58,6 +58,11 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
@Override
public String[] helperClassNames() {
return new String[] {
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ClientDecorator",
packageName + ".RabbitDecorator",
packageName + ".RabbitDecorator$1",
packageName + ".RabbitDecorator$2",
packageName + ".TextMapInjectAdapter",
packageName + ".TextMapExtractAdapter",
packageName + ".TracedDelegatingConsumer",
@ -111,27 +116,23 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
final Connection connection = channel.getConnection();
return GlobalTracer.get()
.buildSpan("amqp.command")
.withTag(DDTags.SERVICE_NAME, "rabbitmq")
.withTag(DDTags.RESOURCE_NAME, method)
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CLIENT)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp")
.withTag(Tags.PEER_HOSTNAME.getKey(), connection.getAddress().getHostName())
.withTag(Tags.PEER_PORT.getKey(), connection.getPort())
.startActive(true);
final Scope scope =
GlobalTracer.get()
.buildSpan("amqp.command")
.withTag(DDTags.RESOURCE_NAME, method)
.withTag(Tags.PEER_PORT.getKey(), connection.getPort())
.startActive(true);
DECORATE.afterStart(scope);
DECORATE.onPeerConnection(scope.span(), connection.getAddress());
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) {
if (scope != null) {
if (throwable != null) {
final Span span = scope.span();
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
DECORATE.onError(scope, throwable);
DECORATE.beforeFinish(scope);
scope.close();
CallDepthThreadLocalMap.reset(Channel.class);
}
@ -148,16 +149,8 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
final Span span = GlobalTracer.get().activeSpan();
if (span != null) {
final String exchangeName = exchange == null || exchange.isEmpty() ? "<default>" : exchange;
final String routing =
routingKey == null || routingKey.isEmpty()
? "<all>"
: routingKey.startsWith("amq.gen-") ? "<generated>" : routingKey;
span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchangeName + " -> " + routing);
span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER);
span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER);
span.setTag("amqp.exchange", exchange);
span.setTag("amqp.routing_key", routingKey);
PRODUCER_DECORATE.afterStart(span); // Overwrite tags set by generic decorator.
PRODUCER_DECORATE.onPublish(span, exchange, routingKey);
span.setTag("message.size", body == null ? 0 : body.length);
// This is the internal behavior when props are null. We're just doing it earlier now.
@ -243,30 +236,19 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default {
final Integer length = response == null ? null : response.getBody().length;
final String queueName = queue.startsWith("amq.gen-") ? "<generated>" : queue;
final Span span =
GlobalTracer.get()
.buildSpan("amqp.command")
.withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime))
.asChildOf(parentContext)
.withTag(DDTags.SERVICE_NAME, "rabbitmq")
.withTag(DDTags.RESOURCE_NAME, "basic.get " + queueName)
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
.withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp")
.withTag("amqp.command", "basic.get")
.withTag("amqp.queue", queue)
.withTag("message.size", length)
.withTag(Tags.PEER_HOSTNAME.getKey(), connection.getAddress().getHostName())
.withTag(Tags.PEER_PORT.getKey(), connection.getPort())
.start();
if (throwable != null) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
CONSUMER_DECORATE.afterStart(span);
CONSUMER_DECORATE.onGet(span, queue);
CONSUMER_DECORATE.onPeerConnection(span, connection.getAddress());
CONSUMER_DECORATE.onError(span, throwable);
CONSUMER_DECORATE.beforeFinish(span);
span.finish();
CallDepthThreadLocalMap.reset(Channel.class);
}

View File

@ -1,6 +1,7 @@
package datadog.trace.instrumentation.rabbitmq.amqp;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
@ -9,9 +10,7 @@ import static net.bytebuddy.matcher.ElementMatchers.not;
import com.google.auto.service.AutoService;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.Method;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags;
import datadog.trace.api.interceptor.MutableSpan;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
@ -36,8 +35,14 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default {
@Override
public String[] helperClassNames() {
return new String[] {
// These are only used by muzzleCheck.
packageName + ".TextMapExtractAdapter", packageName + ".TracedDelegatingConsumer",
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ClientDecorator",
packageName + ".RabbitDecorator",
packageName + ".RabbitDecorator$1",
packageName + ".RabbitDecorator$2",
// These are only used by muzzleCheck:
packageName + ".TextMapExtractAdapter",
packageName + ".TracedDelegatingConsumer",
};
}
@ -51,16 +56,9 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default {
public static void setResourceNameAddHeaders(@Advice.This final Command command) {
final Span span = GlobalTracer.get().activeSpan();
final Method method = command.getMethod();
if (span instanceof MutableSpan && method != null) {
if (span instanceof MutableSpan && command.getMethod() != null) {
if (((MutableSpan) span).getOperationName().equals("amqp.command")) {
final String name = method.protocolMethodName();
if (!name.equals("basic.publish")) {
// Don't overwrite the name already set.
span.setTag(DDTags.RESOURCE_NAME, name);
}
span.setTag("amqp.command", name);
DECORATE.onCommand(span, command);
}
}
}

View File

@ -0,0 +1,115 @@
package datadog.trace.instrumentation.rabbitmq.amqp;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.Envelope;
import datadog.trace.agent.decorator.ClientDecorator;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
public class RabbitDecorator extends ClientDecorator {
public static final RabbitDecorator DECORATE = new RabbitDecorator();
public static final RabbitDecorator PRODUCER_DECORATE =
new RabbitDecorator() {
@Override
protected String spanKind() {
return Tags.SPAN_KIND_PRODUCER;
}
@Override
protected String spanType() {
return DDSpanTypes.MESSAGE_PRODUCER;
}
};
public static final RabbitDecorator CONSUMER_DECORATE =
new RabbitDecorator() {
@Override
protected String spanKind() {
return Tags.SPAN_KIND_CONSUMER;
}
@Override
protected String spanType() {
return DDSpanTypes.MESSAGE_CONSUMER;
}
};
@Override
protected String[] instrumentationNames() {
return new String[] {"amqp", "rabbitmq"};
}
@Override
protected String service() {
return "rabbitmq";
}
@Override
protected String component() {
return "rabbitmq-amqp";
}
@Override
protected String spanKind() {
return Tags.SPAN_KIND_CLIENT;
}
@Override
protected String spanType() {
return DDSpanTypes.MESSAGE_CLIENT;
}
public void onPublish(final Span span, final String exchange, final String routingKey) {
final String exchangeName = exchange == null || exchange.isEmpty() ? "<default>" : exchange;
final String routing =
routingKey == null || routingKey.isEmpty()
? "<all>"
: routingKey.startsWith("amq.gen-") ? "<generated>" : routingKey;
span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchangeName + " -> " + routing);
span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER);
span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER);
span.setTag("amqp.command", "basic.publish");
span.setTag("amqp.exchange", exchange);
span.setTag("amqp.routing_key", routingKey);
}
public void onGet(final Span span, final String queue) {
final String queueName = queue.startsWith("amq.gen-") ? "<generated>" : queue;
span.setTag(DDTags.RESOURCE_NAME, "basic.get " + queueName);
span.setTag("amqp.command", "basic.get");
span.setTag("amqp.queue", queue);
}
public void onDeliver(final Scope scope, final String queue, final Envelope envelope) {
final Span span = scope.span();
String queueName = queue;
if (queue == null || queue.isEmpty()) {
queueName = "<default>";
} else if (queue.startsWith("amq.gen-")) {
queueName = "<generated>";
}
span.setTag(DDTags.RESOURCE_NAME, "basic.deliver " + queueName);
span.setTag("amqp.command", "basic.deliver");
if (envelope != null) {
span.setTag("amqp.exchange", envelope.getExchange());
span.setTag("amqp.routing_key", envelope.getRoutingKey());
}
}
public void onCommand(final Span span, final Command command) {
final String name = command.getMethod().protocolMethodName();
if (!name.equals("basic.publish")) {
// Don't overwrite the name already set.
span.setTag(DDTags.RESOURCE_NAME, name);
}
span.setTag("amqp.command", name);
}
}

View File

@ -1,23 +1,17 @@
package datadog.trace.instrumentation.rabbitmq.amqp;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.CONSUMER_DECORATE;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopScopeManager;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@ -76,33 +70,15 @@ public class TracedDelegatingConsumer implements Consumer {
: GlobalTracer.get()
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers));
String queueName = queue;
if (queue == null || queue.isEmpty()) {
queueName = "<default>";
} else if (queue.startsWith("amq.gen-")) {
queueName = "<generated>";
}
final Tracer.SpanBuilder spanBuilder =
scope =
GlobalTracer.get()
.buildSpan("amqp.command")
.asChildOf(parentContext)
.withTag(DDTags.SERVICE_NAME, "rabbitmq")
.withTag(DDTags.RESOURCE_NAME, "basic.deliver " + queueName)
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
.withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp")
.withTag("amqp.command", "basic.deliver")
.withTag("message.size", body == null ? 0 : body.length)
.withTag("span.origin.type", delegate.getClass().getName());
if (envelope != null) {
spanBuilder
.withTag("amqp.exchange", envelope.getExchange())
.withTag("amqp.routing_key", envelope.getRoutingKey());
}
scope = spanBuilder.startActive(true);
.withTag("span.origin.type", delegate.getClass().getName())
.startActive(true);
CONSUMER_DECORATE.afterStart(scope);
CONSUMER_DECORATE.onDeliver(scope, queue, envelope);
} catch (final Exception e) {
log.debug("Instrumentation error in tracing consumer", e);
@ -113,11 +89,10 @@ public class TracedDelegatingConsumer implements Consumer {
delegate.handleDelivery(consumerTag, envelope, properties, body);
} catch (final Throwable throwable) {
final Span span = scope.span();
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
CONSUMER_DECORATE.onError(scope, throwable);
throw throwable;
} finally {
CONSUMER_DECORATE.beforeFinish(scope);
scope.close();
}
}

View File

@ -370,6 +370,7 @@ class RabbitMQTest extends AgentTestRunner {
}
"$Tags.COMPONENT.key" "rabbitmq-amqp"
"$Tags.PEER_HOSTNAME.key" { it == null || it instanceof String }
"$Tags.PEER_HOST_IPV4.key" { "127.0.0.1" }
"$Tags.PEER_PORT.key" { it == null || it instanceof Integer }
switch (tag("amqp.command")) {