From 2146678d0ab28ca1cec4858684a325d8224cc157 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Thu, 7 Mar 2019 17:30:52 -0500 Subject: [PATCH] Migrate RabbitMQ instrumentation to Decorator --- .../trace/agent/decorator/BaseDecorator.java | 5 +- .../agent/decorator/BaseDecoratorTest.groovy | 6 +- .../amqp/RabbitChannelInstrumentation.java | 70 ++++------- .../amqp/RabbitCommandInstrumentation.java | 24 ++-- .../rabbitmq/amqp/RabbitDecorator.java | 115 ++++++++++++++++++ .../amqp/TracedDelegatingConsumer.java | 41 ++----- .../src/test/groovy/RabbitMQTest.groovy | 1 + 7 files changed, 169 insertions(+), 93 deletions(-) create mode 100644 dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java diff --git a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/decorator/BaseDecorator.java b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/decorator/BaseDecorator.java index fa6a982d9d..958241ac4b 100644 --- a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/decorator/BaseDecorator.java +++ b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/decorator/BaseDecorator.java @@ -93,10 +93,10 @@ public abstract class BaseDecorator { public Span onPeerConnection(final Span span, final InetSocketAddress remoteConnection) { assert span != null; if (remoteConnection != null) { + onPeerConnection(span, remoteConnection.getAddress()); + span.setTag(Tags.PEER_HOSTNAME.getKey(), remoteConnection.getHostName()); span.setTag(Tags.PEER_PORT.getKey(), remoteConnection.getPort()); - - onPeerConnection(span, remoteConnection.getAddress()); } return span; } @@ -104,6 +104,7 @@ public abstract class BaseDecorator { public Span onPeerConnection(final Span span, final InetAddress remoteAddress) { assert span != null; if (remoteAddress != null) { + span.setTag(Tags.PEER_HOSTNAME.getKey(), remoteAddress.getHostName()); if (remoteAddress instanceof Inet4Address) { Tags.PEER_HOST_IPV4.set(span, remoteAddress.getHostAddress()); } else if (remoteAddress instanceof Inet6Address) { diff --git a/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/decorator/BaseDecoratorTest.groovy b/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/decorator/BaseDecoratorTest.groovy index 22c49f017c..aff33f2d93 100644 --- a/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/decorator/BaseDecoratorTest.groovy +++ b/dd-java-agent/agent-tooling/src/test/groovy/datadog/trace/agent/decorator/BaseDecoratorTest.groovy @@ -33,7 +33,11 @@ class BaseDecoratorTest extends Specification { decorator.onPeerConnection(span, connection) then: - 1 * span.setTag(Tags.PEER_HOSTNAME.key, connection.hostName) + if (connection.getAddress()) { + 2 * span.setTag(Tags.PEER_HOSTNAME.key, connection.hostName) + } else { + 1 * span.setTag(Tags.PEER_HOSTNAME.key, connection.hostName) + } 1 * span.setTag(Tags.PEER_PORT.key, connection.port) if (connection.address instanceof Inet4Address) { 1 * span.setTag(Tags.PEER_HOST_IPV4.key, connection.address.hostAddress) diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java index ad58120b32..ee9c2c0929 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -1,7 +1,9 @@ package datadog.trace.instrumentation.rabbitmq.amqp; import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; -import static io.opentracing.log.Fields.ERROR_OBJECT; +import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.CONSUMER_DECORATE; +import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE; +import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.PRODUCER_DECORATE; import static net.bytebuddy.matcher.ElementMatchers.canThrow; import static net.bytebuddy.matcher.ElementMatchers.isGetter; import static net.bytebuddy.matcher.ElementMatchers.isInterface; @@ -22,7 +24,6 @@ import com.rabbitmq.client.Consumer; import com.rabbitmq.client.GetResponse; import com.rabbitmq.client.MessageProperties; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDTags; import datadog.trace.bootstrap.CallDepthThreadLocalMap; import io.opentracing.Scope; @@ -33,7 +34,6 @@ import io.opentracing.propagation.Format; import io.opentracing.tag.Tags; import io.opentracing.util.GlobalTracer; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -58,6 +58,11 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { @Override public String[] helperClassNames() { return new String[] { + "datadog.trace.agent.decorator.BaseDecorator", + "datadog.trace.agent.decorator.ClientDecorator", + packageName + ".RabbitDecorator", + packageName + ".RabbitDecorator$1", + packageName + ".RabbitDecorator$2", packageName + ".TextMapInjectAdapter", packageName + ".TextMapExtractAdapter", packageName + ".TracedDelegatingConsumer", @@ -111,27 +116,23 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { final Connection connection = channel.getConnection(); - return GlobalTracer.get() - .buildSpan("amqp.command") - .withTag(DDTags.SERVICE_NAME, "rabbitmq") - .withTag(DDTags.RESOURCE_NAME, method) - .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CLIENT) - .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) - .withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp") - .withTag(Tags.PEER_HOSTNAME.getKey(), connection.getAddress().getHostName()) - .withTag(Tags.PEER_PORT.getKey(), connection.getPort()) - .startActive(true); + final Scope scope = + GlobalTracer.get() + .buildSpan("amqp.command") + .withTag(DDTags.RESOURCE_NAME, method) + .withTag(Tags.PEER_PORT.getKey(), connection.getPort()) + .startActive(true); + DECORATE.afterStart(scope); + DECORATE.onPeerConnection(scope.span(), connection.getAddress()); + return scope; } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( @Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) { if (scope != null) { - if (throwable != null) { - final Span span = scope.span(); - Tags.ERROR.set(span, true); - span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); - } + DECORATE.onError(scope, throwable); + DECORATE.beforeFinish(scope); scope.close(); CallDepthThreadLocalMap.reset(Channel.class); } @@ -148,16 +149,8 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { final Span span = GlobalTracer.get().activeSpan(); if (span != null) { - final String exchangeName = exchange == null || exchange.isEmpty() ? "" : exchange; - final String routing = - routingKey == null || routingKey.isEmpty() - ? "" - : routingKey.startsWith("amq.gen-") ? "" : routingKey; - span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchangeName + " -> " + routing); - span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER); - span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER); - span.setTag("amqp.exchange", exchange); - span.setTag("amqp.routing_key", routingKey); + PRODUCER_DECORATE.afterStart(span); // Overwrite tags set by generic decorator. + PRODUCER_DECORATE.onPublish(span, exchange, routingKey); span.setTag("message.size", body == null ? 0 : body.length); // This is the internal behavior when props are null. We're just doing it earlier now. @@ -243,30 +236,19 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { final Integer length = response == null ? null : response.getBody().length; - final String queueName = queue.startsWith("amq.gen-") ? "" : queue; - final Span span = GlobalTracer.get() .buildSpan("amqp.command") .withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime)) .asChildOf(parentContext) - .withTag(DDTags.SERVICE_NAME, "rabbitmq") - .withTag(DDTags.RESOURCE_NAME, "basic.get " + queueName) - .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) - .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) - .withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp") - .withTag("amqp.command", "basic.get") - .withTag("amqp.queue", queue) .withTag("message.size", length) - .withTag(Tags.PEER_HOSTNAME.getKey(), connection.getAddress().getHostName()) .withTag(Tags.PEER_PORT.getKey(), connection.getPort()) .start(); - - if (throwable != null) { - Tags.ERROR.set(span, true); - span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); - } - + CONSUMER_DECORATE.afterStart(span); + CONSUMER_DECORATE.onGet(span, queue); + CONSUMER_DECORATE.onPeerConnection(span, connection.getAddress()); + CONSUMER_DECORATE.onError(span, throwable); + CONSUMER_DECORATE.beforeFinish(span); span.finish(); CallDepthThreadLocalMap.reset(Channel.class); } diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java index 40ed55cdc5..9511ad63cf 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java @@ -1,6 +1,7 @@ package datadog.trace.instrumentation.rabbitmq.amqp; import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; +import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.isInterface; @@ -9,9 +10,7 @@ import static net.bytebuddy.matcher.ElementMatchers.not; import com.google.auto.service.AutoService; import com.rabbitmq.client.Command; -import com.rabbitmq.client.Method; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDTags; import datadog.trace.api.interceptor.MutableSpan; import io.opentracing.Span; import io.opentracing.util.GlobalTracer; @@ -36,8 +35,14 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default { @Override public String[] helperClassNames() { return new String[] { - // These are only used by muzzleCheck. - packageName + ".TextMapExtractAdapter", packageName + ".TracedDelegatingConsumer", + "datadog.trace.agent.decorator.BaseDecorator", + "datadog.trace.agent.decorator.ClientDecorator", + packageName + ".RabbitDecorator", + packageName + ".RabbitDecorator$1", + packageName + ".RabbitDecorator$2", + // These are only used by muzzleCheck: + packageName + ".TextMapExtractAdapter", + packageName + ".TracedDelegatingConsumer", }; } @@ -51,16 +56,9 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default { public static void setResourceNameAddHeaders(@Advice.This final Command command) { final Span span = GlobalTracer.get().activeSpan(); - final Method method = command.getMethod(); - if (span instanceof MutableSpan && method != null) { + if (span instanceof MutableSpan && command.getMethod() != null) { if (((MutableSpan) span).getOperationName().equals("amqp.command")) { - final String name = method.protocolMethodName(); - - if (!name.equals("basic.publish")) { - // Don't overwrite the name already set. - span.setTag(DDTags.RESOURCE_NAME, name); - } - span.setTag("amqp.command", name); + DECORATE.onCommand(span, command); } } } diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java new file mode 100644 index 0000000000..66b3122f5b --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java @@ -0,0 +1,115 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import com.rabbitmq.client.Command; +import com.rabbitmq.client.Envelope; +import datadog.trace.agent.decorator.ClientDecorator; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.tag.Tags; + +public class RabbitDecorator extends ClientDecorator { + public static final RabbitDecorator DECORATE = new RabbitDecorator(); + + public static final RabbitDecorator PRODUCER_DECORATE = + new RabbitDecorator() { + @Override + protected String spanKind() { + return Tags.SPAN_KIND_PRODUCER; + } + + @Override + protected String spanType() { + return DDSpanTypes.MESSAGE_PRODUCER; + } + }; + + public static final RabbitDecorator CONSUMER_DECORATE = + new RabbitDecorator() { + @Override + protected String spanKind() { + return Tags.SPAN_KIND_CONSUMER; + } + + @Override + protected String spanType() { + return DDSpanTypes.MESSAGE_CONSUMER; + } + }; + + @Override + protected String[] instrumentationNames() { + return new String[] {"amqp", "rabbitmq"}; + } + + @Override + protected String service() { + return "rabbitmq"; + } + + @Override + protected String component() { + return "rabbitmq-amqp"; + } + + @Override + protected String spanKind() { + return Tags.SPAN_KIND_CLIENT; + } + + @Override + protected String spanType() { + return DDSpanTypes.MESSAGE_CLIENT; + } + + public void onPublish(final Span span, final String exchange, final String routingKey) { + final String exchangeName = exchange == null || exchange.isEmpty() ? "" : exchange; + final String routing = + routingKey == null || routingKey.isEmpty() + ? "" + : routingKey.startsWith("amq.gen-") ? "" : routingKey; + span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchangeName + " -> " + routing); + span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER); + span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER); + span.setTag("amqp.command", "basic.publish"); + span.setTag("amqp.exchange", exchange); + span.setTag("amqp.routing_key", routingKey); + } + + public void onGet(final Span span, final String queue) { + final String queueName = queue.startsWith("amq.gen-") ? "" : queue; + span.setTag(DDTags.RESOURCE_NAME, "basic.get " + queueName); + + span.setTag("amqp.command", "basic.get"); + span.setTag("amqp.queue", queue); + } + + public void onDeliver(final Scope scope, final String queue, final Envelope envelope) { + final Span span = scope.span(); + + String queueName = queue; + if (queue == null || queue.isEmpty()) { + queueName = ""; + } else if (queue.startsWith("amq.gen-")) { + queueName = ""; + } + span.setTag(DDTags.RESOURCE_NAME, "basic.deliver " + queueName); + span.setTag("amqp.command", "basic.deliver"); + + if (envelope != null) { + span.setTag("amqp.exchange", envelope.getExchange()); + span.setTag("amqp.routing_key", envelope.getRoutingKey()); + } + } + + public void onCommand(final Span span, final Command command) { + final String name = command.getMethod().protocolMethodName(); + + if (!name.equals("basic.publish")) { + // Don't overwrite the name already set. + span.setTag(DDTags.RESOURCE_NAME, name); + } + span.setTag("amqp.command", name); + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java index dc39ec984b..e9a3c5f1c6 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java @@ -1,23 +1,17 @@ package datadog.trace.instrumentation.rabbitmq.amqp; -import static io.opentracing.log.Fields.ERROR_OBJECT; +import static datadog.trace.instrumentation.rabbitmq.amqp.RabbitDecorator.CONSUMER_DECORATE; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; -import datadog.trace.api.DDSpanTypes; -import datadog.trace.api.DDTags; import io.opentracing.Scope; -import io.opentracing.Span; import io.opentracing.SpanContext; -import io.opentracing.Tracer; import io.opentracing.noop.NoopScopeManager; import io.opentracing.propagation.Format; -import io.opentracing.tag.Tags; import io.opentracing.util.GlobalTracer; import java.io.IOException; -import java.util.Collections; import java.util.Map; import lombok.extern.slf4j.Slf4j; @@ -76,33 +70,15 @@ public class TracedDelegatingConsumer implements Consumer { : GlobalTracer.get() .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers)); - String queueName = queue; - if (queue == null || queue.isEmpty()) { - queueName = ""; - } else if (queue.startsWith("amq.gen-")) { - queueName = ""; - } - - final Tracer.SpanBuilder spanBuilder = + scope = GlobalTracer.get() .buildSpan("amqp.command") .asChildOf(parentContext) - .withTag(DDTags.SERVICE_NAME, "rabbitmq") - .withTag(DDTags.RESOURCE_NAME, "basic.deliver " + queueName) - .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) - .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) - .withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp") - .withTag("amqp.command", "basic.deliver") .withTag("message.size", body == null ? 0 : body.length) - .withTag("span.origin.type", delegate.getClass().getName()); - - if (envelope != null) { - spanBuilder - .withTag("amqp.exchange", envelope.getExchange()) - .withTag("amqp.routing_key", envelope.getRoutingKey()); - } - - scope = spanBuilder.startActive(true); + .withTag("span.origin.type", delegate.getClass().getName()) + .startActive(true); + CONSUMER_DECORATE.afterStart(scope); + CONSUMER_DECORATE.onDeliver(scope, queue, envelope); } catch (final Exception e) { log.debug("Instrumentation error in tracing consumer", e); @@ -113,11 +89,10 @@ public class TracedDelegatingConsumer implements Consumer { delegate.handleDelivery(consumerTag, envelope, properties, body); } catch (final Throwable throwable) { - final Span span = scope.span(); - Tags.ERROR.set(span, true); - span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); + CONSUMER_DECORATE.onError(scope, throwable); throw throwable; } finally { + CONSUMER_DECORATE.beforeFinish(scope); scope.close(); } } diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy index 7a5c08d089..fe1db12e0b 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -370,6 +370,7 @@ class RabbitMQTest extends AgentTestRunner { } "$Tags.COMPONENT.key" "rabbitmq-amqp" "$Tags.PEER_HOSTNAME.key" { it == null || it instanceof String } + "$Tags.PEER_HOST_IPV4.key" { "127.0.0.1" } "$Tags.PEER_PORT.key" { it == null || it instanceof Integer } switch (tag("amqp.command")) {