From 8ffeb5bedf3f1b9d33ade0420f0c13304e424d8f Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 12 Oct 2018 11:43:32 +1000 Subject: [PATCH 1/6] =?UTF-8?q?Add=20instrumentation=20for=20RabbitMQ?= =?UTF-8?q?=E2=80=99s=20AMQP=20library?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .circleci/config.yml | 2 + .../rabbitmq-amqp-2.6.gradle | 47 +++ .../amqp/RabbitChannelInstrumentation.java | 286 +++++++++++++++++ .../amqp/RabbitCommandInstrumentation.java | 52 ++++ .../rabbitmq/amqp/TextMapExtractAdapter.java | 28 ++ .../rabbitmq/amqp/TextMapInjectAdapter.java | 25 ++ .../amqp/TracedDelegatingConsumer.java | 109 +++++++ .../src/test/groovy/RabbitMQTest.groovy | 293 ++++++++++++++++++ .../spymemcached-2.12.gradle | 2 +- .../agent/test/asserts/TagsAssert.groovy | 12 +- .../java/datadog/trace/api/DDSpanTypes.java | 1 + .../opentracing/propagation/HTTPCodec.java | 8 +- gradle/dependencies.gradle | 1 + settings.gradle | 1 + 14 files changed, 859 insertions(+), 8 deletions(-) create mode 100644 dd-java-agent/instrumentation/rabbitmq-amqp-2.6/rabbitmq-amqp-2.6.gradle create mode 100644 dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java create mode 100644 dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java create mode 100644 dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java create mode 100644 dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java create mode 100644 dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java create mode 100644 dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/test/groovy/RabbitMQTest.groovy diff --git a/.circleci/config.yml b/.circleci/config.yml index 3a5f616d71..4a079f031d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -52,6 +52,8 @@ jobs: - image: *default_container # This is used by spymemcached instrumentation tests - image: memcached + # This is used by rabbitmq instrumentation tests + - image: rabbitmq steps: - checkout diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/rabbitmq-amqp-2.6.gradle b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/rabbitmq-amqp-2.6.gradle new file mode 100644 index 0000000000..483b92e946 --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/rabbitmq-amqp-2.6.gradle @@ -0,0 +1,47 @@ +muzzle { + pass { + group = "com.rabbitmq" + module = 'amqp-client' + versions = "[2.6.0,)" + assertInverse = true + } +} + +apply from: "${rootDir}/gradle/java.gradle" + +apply plugin: 'org.unbroken-dome.test-sets' + +testSets { + latestDepTest { + dirName = 'test' + } +} + +dependencies { + compileOnly group: 'com.rabbitmq', name: 'amqp-client', version: '2.6.0' + + compile project(':dd-trace-ot') + compile project(':dd-java-agent:agent-tooling') + + compile deps.bytebuddy + compile deps.opentracing + compile deps.autoservice + annotationProcessor deps.autoservice + implementation deps.autoservice + + testCompile project(':dd-java-agent:testing') + + testCompile group: 'com.rabbitmq', name: 'amqp-client', version: '2.6.0' + testCompile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '1.1.0.RELEASE' + + testCompile deps.testcontainers + + latestDepTestCompile group: 'com.rabbitmq', name: 'amqp-client', version: '+' + latestDepTestCompile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '+' +} + +configurations.testRuntime { + resolutionStrategy { + force group: 'com.rabbitmq', name: 'amqp-client', version: '2.6.0' + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java new file mode 100644 index 0000000000..9c4abc9853 --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -0,0 +1,286 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; +import static io.opentracing.log.Fields.ERROR_OBJECT; +import static net.bytebuddy.matcher.ElementMatchers.canThrow; +import static net.bytebuddy.matcher.ElementMatchers.isGetter; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.isSetter; +import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Command; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.GetResponse; +import com.rabbitmq.client.MessageProperties; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import datadog.trace.bootstrap.CallDepthThreadLocalMap; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.noop.NoopSpan; +import io.opentracing.propagation.Format; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(Instrumenter.class) +public class RabbitChannelInstrumentation extends Instrumenter.Default { + + public RabbitChannelInstrumentation() { + super("amqp", "rabbitmq"); + } + + @Override + public ElementMatcher typeMatcher() { + return not(isInterface()).and(safeHasSuperType(named("com.rabbitmq.client.Channel"))); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".TextMapInjectAdapter", + packageName + ".TextMapExtractAdapter", + packageName + ".TracedDelegatingConsumer", + }; + } + + @Override + public Map transformers() { + // We want the advice applied in a specific order, so use an ordered map. + final Map transformers = new LinkedHashMap<>(); + transformers.put( + isMethod() + .and( + not( + isGetter() + .or(isSetter()) + .or(nameEndsWith("Listener")) + .or(nameEndsWith("Listeners")) + .or(named("processAsync")) + .or(named("open")) + .or(named("close")) + .or(named("abort")) + .or(named("basicGet")))) + .and(isPublic()) + .and(canThrow(IOException.class).or(canThrow(InterruptedException.class))), + ChannelMethodAdvice.class.getName()); + transformers.put( + isMethod().and(named("basicPublish")).and(takesArguments(6)), + ChannelPublishAdvice.class.getName()); + transformers.put( + isMethod().and(named("basicGet")).and(takesArgument(0, String.class)), + ChannelGetAdvice.class.getName()); + transformers.put( + isMethod() + .and(named("basicConsume")) + .and(takesArgument(0, String.class)) + .and(takesArgument(6, named("com.rabbitmq.client.Consumer"))), + ChannelConsumeAdvice.class.getName()); + return transformers; + } + + public static class ChannelMethodAdvice { + @Advice.OnMethodEnter + public static Scope startSpan( + @Advice.This final Channel channel, @Advice.Origin("Channel.#m") final String method) { + final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class); + if (callDepth > 0) { + return null; + } + + final Connection connection = channel.getConnection(); + + return GlobalTracer.get() + .buildSpan("amqp.command") + .withTag(DDTags.SERVICE_NAME, "rabbitmq") + .withTag(DDTags.RESOURCE_NAME, method) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CLIENT) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) + .withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp") + .withTag(Tags.PEER_HOSTNAME.getKey(), connection.getAddress().getHostName()) + .withTag(Tags.PEER_PORT.getKey(), connection.getPort()) + .startActive(true); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) { + if (scope != null) { + if (throwable != null) { + final Span span = scope.span(); + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); + } + scope.close(); + CallDepthThreadLocalMap.reset(Channel.class); + } + } + + // Added to ensure consistent muzzle validation for all instrumentation. + public static void muzzleCheck(final Command cmd) { + com.rabbitmq.client.Method.class.getName(); + cmd.getMethod(); + } + } + + public static class ChannelPublishAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void setResourceNameAddHeaders( + @Advice.Argument(0) final String exchange, + @Advice.Argument(1) final String routingKey, + @Advice.Argument(value = 4, readOnly = false) AMQP.BasicProperties props, + @Advice.Argument(5) final byte[] body) { + final Span span = GlobalTracer.get().activeSpan(); + + if (span != null) { + span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchange); + span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER); + span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER); + span.setTag("amqp.exchange", exchange); + span.setTag("amqp.routing_key", routingKey); + span.setTag("message.size", body == null ? 0 : body.length); + + // This is the internal behavior when props are null. We're just doing it earlier now. + if (props == null) { + props = MessageProperties.MINIMAL_BASIC; + } + span.setTag("amqp.delivery_mode", props.getDeliveryMode()); + + // We need to copy the BasicProperties and provide a header map we can modify + Map headers = props.getHeaders(); + headers = (headers == null) ? new HashMap() : new HashMap<>(headers); + + GlobalTracer.get() + .inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapInjectAdapter(headers)); + + props = + new AMQP.BasicProperties( + props.getContentType(), + props.getContentEncoding(), + headers, + props.getDeliveryMode(), + props.getPriority(), + props.getCorrelationId(), + props.getReplyTo(), + props.getExpiration(), + props.getMessageId(), + props.getTimestamp(), + props.getType(), + props.getUserId(), + props.getAppId(), + props.getClusterId()); + } + } + } + + public static class ChannelGetAdvice { + @Advice.OnMethodEnter + public static long takeTimestamp( + @Advice.Local("placeholderScope") Scope scope, @Advice.Local("callDepth") int callDepth) { + callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class); + // Don't want RabbitCommandInstrumentation to mess up our actual parent span. + scope = GlobalTracer.get().scopeManager().activate(NoopSpan.INSTANCE, true); + return System.currentTimeMillis(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void extractAndStartSpan( + @Advice.This final Channel channel, + @Advice.Argument(0) final String queue, + @Advice.Enter final long startTime, + @Advice.Local("placeholderScope") final Scope scope, + @Advice.Local("callDepth") final int callDepth, + @Advice.Return final GetResponse response, + @Advice.Thrown final Throwable throwable) { + + if (scope.span() instanceof NoopSpan) { + scope.close(); + } + + if (callDepth > 0) { + return; + } + SpanContext parentContext = null; + + if (response != null && response.getProps() != null) { + final Map headers = response.getProps().getHeaders(); + + parentContext = + headers == null + ? null + : GlobalTracer.get() + .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers)); + } + + if (parentContext == null) { + final Span parent = GlobalTracer.get().activeSpan(); + if (parent != null) { + parentContext = parent.context(); + } + } + + final Connection connection = channel.getConnection(); + + final Integer length = response == null ? null : response.getBody().length; + + final String queueName = queue.startsWith("amq.gen-") ? "" : queue; + + final Span span = + GlobalTracer.get() + .buildSpan("amqp.command") + .withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime)) + .asChildOf(parentContext) + .withTag(DDTags.SERVICE_NAME, "rabbitmq") + .withTag(DDTags.RESOURCE_NAME, "basic.get " + queueName) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) + .withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp") + .withTag("amqp.command", "basic.get") + .withTag("amqp.queue", queue) + .withTag("message.size", length) + .withTag(Tags.PEER_HOSTNAME.getKey(), connection.getAddress().getHostName()) + .withTag(Tags.PEER_PORT.getKey(), connection.getPort()) + .start(); + + if (throwable != null) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); + } + + span.finish(); + CallDepthThreadLocalMap.reset(Channel.class); + } + } + + public static class ChannelConsumeAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void wrapConsumer( + @Advice.Argument(0) final String queue, + @Advice.Argument(value = 6, readOnly = false) Consumer consumer) { + // We have to save off the queue name here because it isn't available to the consumer later. + if (consumer != null) { + consumer = new TracedDelegatingConsumer(queue, consumer); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java new file mode 100644 index 0000000000..dffb5b5b3b --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java @@ -0,0 +1,52 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; + +import com.google.auto.service.AutoService; +import com.rabbitmq.client.Command; +import com.rabbitmq.client.Method; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.DDTags; +import io.opentracing.Span; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(Instrumenter.class) +public class RabbitCommandInstrumentation extends Instrumenter.Default { + + public RabbitCommandInstrumentation() { + super("amqp", "rabbitmq"); + } + + @Override + public ElementMatcher typeMatcher() { + return not(isInterface()).and(safeHasSuperType(named("com.rabbitmq.client.Command"))); + } + + @Override + public Map transformers() { + return Collections.singletonMap(isConstructor(), CommandConstructorAdvice.class.getName()); + } + + public static class CommandConstructorAdvice { + @Advice.OnMethodExit + public static void setResourceNameAddHeaders(@Advice.This final Command command) { + final Span span = GlobalTracer.get().activeSpan(); + + final Method method = command.getMethod(); + if (span != null && method != null) { + final String name = method.protocolMethodName(); + span.setTag(DDTags.RESOURCE_NAME, name); + span.setTag("amqp.command", name); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java new file mode 100644 index 0000000000..79709ef415 --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java @@ -0,0 +1,28 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import io.opentracing.propagation.TextMap; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +// TextMap works with , but the type we're given is +public class TextMapExtractAdapter implements TextMap { + + private final Map map = new HashMap<>(); + + public TextMapExtractAdapter(final Map headers) { + for (final Map.Entry entry : headers.entrySet()) { + map.put(entry.getKey(), entry.getValue().toString()); + } + } + + @Override + public Iterator> iterator() { + return map.entrySet().iterator(); + } + + @Override + public void put(final String key, final String value) { + throw new UnsupportedOperationException("Use inject adapter instead"); + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java new file mode 100644 index 0000000000..5f28b7383e --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java @@ -0,0 +1,25 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import io.opentracing.propagation.TextMap; +import java.util.Iterator; +import java.util.Map; + +// TextMap works with , but the type we're given is +public class TextMapInjectAdapter implements TextMap { + private final Map map; + + public TextMapInjectAdapter(final Map map) { + this.map = map; + } + + @Override + public Iterator> iterator() { + throw new UnsupportedOperationException( + "TextMapInjectAdapter should only be used with Tracer.inject()"); + } + + @Override + public void put(final String key, final String value) { + map.put(key, value); + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java new file mode 100644 index 0000000000..e7307d8f9d --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java @@ -0,0 +1,109 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import static io.opentracing.log.Fields.ERROR_OBJECT; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownSignalException; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.noop.NoopScopeManager; +import io.opentracing.propagation.Format; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +/** + * Wrapping the consumer instead of instrumenting it directly because it doesn't get access to the + * queue name when the message is consumed. + */ +public class TracedDelegatingConsumer implements Consumer { + private final String queueName; + private final Consumer delegate; + + public TracedDelegatingConsumer(final String queueName, final Consumer delegate) { + this.queueName = queueName; + this.delegate = delegate; + } + + @Override + public void handleConsumeOk(final String consumerTag) { + delegate.handleConsumeOk(consumerTag); + } + + @Override + public void handleCancelOk(final String consumerTag) { + delegate.handleCancelOk(consumerTag); + } + + @Override + public void handleCancel(final String consumerTag) throws IOException { + delegate.handleCancel(consumerTag); + } + + @Override + public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) { + delegate.handleShutdownSignal(consumerTag, sig); + } + + @Override + public void handleRecoverOk() { + delegate.handleRecoverOk(); + } + + @Override + public void handleDelivery( + final String consumerTag, + final Envelope envelope, + final AMQP.BasicProperties properties, + final byte[] body) + throws IOException { + Scope scope = NoopScopeManager.NoopScope.INSTANCE; + try { + final Map headers = properties.getHeaders(); + final SpanContext parentContext = + headers == null + ? null + : GlobalTracer.get() + .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers)); + + final String resourceName = + queueName == null ? "basic.deliver null" : "basic.deliver " + queueName; + + scope = + GlobalTracer.get() + .buildSpan("amqp.command") + .asChildOf(parentContext) + .withTag(DDTags.SERVICE_NAME, "rabbitmq") + .withTag(DDTags.RESOURCE_NAME, resourceName) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) + .withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp") + .withTag("amqp.command", "basic.deliver") + .withTag("amqp.exchange", envelope.getExchange()) + .withTag("amqp.routing_key", envelope.getRoutingKey()) + .withTag("message.size", body == null ? 0 : body.length) + .withTag("span.origin.type", delegate.getClass().getName()) + .startActive(true); + } finally { + try { + + // Call delegate. + delegate.handleDelivery(consumerTag, envelope, properties, body); + + } catch (final Throwable throwable) { + final Span span = scope.span(); + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); + } finally { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/test/groovy/RabbitMQTest.groovy new file mode 100644 index 0000000000..730ab3fafd --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/test/groovy/RabbitMQTest.groovy @@ -0,0 +1,293 @@ +import com.rabbitmq.client.AMQP +import com.rabbitmq.client.AlreadyClosedException +import com.rabbitmq.client.Channel +import com.rabbitmq.client.Connection +import com.rabbitmq.client.ConnectionFactory +import com.rabbitmq.client.Consumer +import com.rabbitmq.client.DefaultConsumer +import com.rabbitmq.client.Envelope +import com.rabbitmq.client.GetResponse +import datadog.opentracing.DDSpan +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.TraceAssert +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.DDTags +import io.opentracing.tag.Tags +import org.springframework.amqp.core.AmqpAdmin +import org.springframework.amqp.core.AmqpTemplate +import org.springframework.amqp.core.Queue +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory +import org.springframework.amqp.rabbit.core.RabbitAdmin +import org.springframework.amqp.rabbit.core.RabbitTemplate +import org.testcontainers.containers.GenericContainer +import spock.lang.Requires +import spock.lang.Shared + +import java.util.concurrent.Phaser + +// Do not run tests locally on Java7 since testcontainers are not compatible with Java7 +// It is fine to run on CI because CI provides memcached externally, not through testcontainers +@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible }) +class RabbitMQTest extends AgentTestRunner { + + /* + Note: type here has to stay undefined, otherwise tests will fail in CI in Java 7 because + 'testcontainers' are built for Java 8 and Java 7 cannot load this class. + */ + @Shared + def rabbbitMQContainer + @Shared + def defaultRabbitMQPort = 5672 + @Shared + InetSocketAddress rabbitmqAddress = new InetSocketAddress("127.0.0.1", defaultRabbitMQPort) + + ConnectionFactory factory = new ConnectionFactory(host: rabbitmqAddress.hostName, port: rabbitmqAddress.port) + Connection conn = factory.newConnection() + Channel channel = conn.createChannel() + + def setupSpec() { + + /* + CI will provide us with memcached container running along side our build. + When building locally, however, we need to take matters into our own hands + and we use 'testcontainers' for this. + */ + if ("true" != System.getenv("CI")) { + rabbbitMQContainer = new GenericContainer('rabbitmq:latest') + .withExposedPorts(defaultRabbitMQPort) +// .withLogConsumer { output -> +// print output.utf8String +// } + rabbbitMQContainer.start() + rabbitmqAddress = new InetSocketAddress( + rabbbitMQContainer.containerIpAddress, + rabbbitMQContainer.getMappedPort(defaultRabbitMQPort) + ) + } + } + + def cleanupSpec() { + if (rabbbitMQContainer) { + rabbbitMQContainer.stop() + } + } + + def cleanup() { + try { + channel.close() + conn.close() + } catch (AlreadyClosedException e) { + // Ignore + } + } + + def "test rabbit publish/get"() { + setup: + channel.exchangeDeclare(exchangeName, "direct", false) + String queueName = channel.queueDeclare().getQueue() + channel.queueBind(queueName, exchangeName, routingKey) + + channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes()) + + GetResponse response = channel.basicGet(queueName, true) + + expect: + new String(response.getBody()) == "Hello, world!" + + and: + assertTraces(5) { + trace(0, 1) { + rabbitSpan(it, "exchange.declare") + } + trace(1, 1) { + rabbitSpan(it, "queue.declare") + } + trace(2, 1) { + rabbitSpan(it, "queue.bind") + } + trace(3, 1) { + rabbitSpan(it, "basic.publish") + } + trace(4, 1) { + rabbitSpan(it, "basic.get ", TEST_WRITER[3][0]) + } + } + + where: + exchangeName = "some-exchange" + routingKey = "some-routing-key" + } + + def "test rabbit consume #messageCount messages"() { + setup: + channel.exchangeDeclare(exchangeName, "direct", false) + String queueName = channel.queueDeclare("some-queue", false, true, true, null).getQueue() + channel.queueBind(queueName, exchangeName, "") + + def phaser = new Phaser() + phaser.register() + phaser.register() + def deliveries = [] + + Consumer callback = new DefaultConsumer(channel) { + @Override + void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + phaser.arriveAndAwaitAdvance() // Ensure publish spans are reported first. + deliveries << new String(body) + } + } + + channel.basicConsume(queueName, callback) + + (1..messageCount).each { + TEST_WRITER.waitForTraces(2 + (it * 2)) + channel.basicPublish(exchangeName, "", null, "msg $it".getBytes()) + TEST_WRITER.waitForTraces(3 + (it * 2)) + phaser.arriveAndAwaitAdvance() + } + + expect: + assertTraces(4 + (messageCount * 2)) { + trace(0, 1) { + rabbitSpan(it, "exchange.declare") + } + trace(1, 1) { + rabbitSpan(it, "queue.declare") + } + trace(2, 1) { + rabbitSpan(it, "queue.bind") + } + trace(3, 1) { + rabbitSpan(it, "basic.consume") + } + (1..messageCount).each { + def publishSpan = null + trace(2 + (it * 2), 1) { + publishSpan = span(0) + rabbitSpan(it, "basic.publish") + } + trace(3 + (it * 2), 1) { + rabbitSpan(it, "basic.deliver some-queue", publishSpan) + } + } + } + + deliveries == (1..messageCount).collect { "msg $it" } + + where: + exchangeName = "some-exchange" + messageCount << (1..3) + } + + def "test rabbit error (#command)"() { + when: + closure.call(channel) + + then: + def throwable = thrown(exception) + + and: + + assertTraces(1) { + trace(0, 1) { + rabbitSpan(it, command, null, throwable, errorMsg) + } + } + + where: + command | exception | errorMsg | closure + "exchange.declare" | IOException | null | { + it.exchangeDeclare("some-exchange", "invalid-type", true) + } + "Channel.basicConsume" | IllegalStateException | "Invalid configuration: 'queue' must be non-null." | { + it.basicConsume(null, null) + } + "basic.get " | IOException | null | { + it.basicGet("amq.gen-invalid-channel", true) + } + } + + def "test spring rabbit"() { + setup: + def connectionFactory = new CachingConnectionFactory(rabbitmqAddress.hostName, rabbitmqAddress.port) + AmqpAdmin admin = new RabbitAdmin(connectionFactory) + def queue = new Queue("some-routing-queue", false, true, true, null) + admin.declareQueue(queue) + AmqpTemplate template = new RabbitTemplate(connectionFactory) + template.convertAndSend(queue.name, "foo") + String message = (String) template.receiveAndConvert(queue.name) + + expect: + message == "foo" + + and: + assertTraces(3) { + trace(0, 1) { + rabbitSpan(it, "queue.declare") + } + trace(1, 1) { + rabbitSpan(it, "basic.publish") + } + trace(2, 1) { + rabbitSpan(it, "basic.get $queue.name", TEST_WRITER[1][0]) + } + } + } + + def rabbitSpan(TraceAssert trace, String resource, DDSpan parentSpan = null, Throwable exception = null, String errorMsg = null) { + trace.span(0) { + serviceName "rabbitmq" + operationName "amqp.command" + resourceName resource + + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + + errored exception != null + + tags { + if (exception) { + errorTags(exception.class, errorMsg) + } + "$Tags.COMPONENT.key" "rabbitmq-amqp" + "$Tags.PEER_HOSTNAME.key" { it == null || it instanceof String } + "$Tags.PEER_PORT.key" { it == null || it instanceof Integer } + + switch (tag("amqp.command")) { + case "basic.publish": + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_PRODUCER + "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_PRODUCER + "amqp.command" "basic.publish" + "amqp.exchange" { it == null || it == "some-exchange" } + "amqp.routing_key" { it == null || it == "some-routing-key" || it == "some-routing-queue" } + "amqp.delivery_mode" { it == null || it == 2 } + "message.size" Integer + break + case "basic.get": + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CONSUMER + "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CONSUMER + "amqp.command" "basic.get" + "amqp.queue" { it == "some-queue" || it == "some-routing-queue" || it.startsWith("amq.gen-") } + "message.size" { it == null || it instanceof Integer } + break + case "basic.deliver": + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CONSUMER + "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CONSUMER + "amqp.command" "basic.deliver" + "span.origin.type" "RabbitMQTest\$1" + "amqp.exchange" "some-exchange" + "message.size" Integer + break + default: + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CLIENT + "amqp.command" { it == null || it == resource } + } + defaultTags() + } + } + } +} diff --git a/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle b/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle index 7c2f33bd44..a7d3da67d6 100644 --- a/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle +++ b/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle @@ -32,7 +32,7 @@ dependencies { testCompile project(':dd-java-agent:testing') testCompile group: 'net.spy', name: 'spymemcached', version: '2.12.0' - testCompile group: 'org.testcontainers', name: 'testcontainers', version: '1.7.3' + testCompile deps.testcontainers } configurations.latestDepTestCompile { diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy index 2083266af5..0ca039234c 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy @@ -7,7 +7,7 @@ class TagsAssert { private final Set assertedTags = new TreeSet<>() private TagsAssert(DDSpan span) { - this.tags = new TreeMap(span.tags) + this.tags = span.tags } static void assertTags(DDSpan span, @@ -56,14 +56,20 @@ class TagsAssert { } } + def tag(String name) { + return tags[name] + } + def methodMissing(String name, args) { - if (args.length != 1) { + if (args.length == 0) { throw new IllegalArgumentException(args.toString()) } tag(name, args[0]) } void assertTagsAllVerified() { - assert tags.keySet() == assertedTags + def set = new TreeMap<>(tags).keySet() + set.removeAll(assertedTags) + assert tags.entrySet() != assertedTags && set.isEmpty() } } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java b/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java index b8a22df2ca..4491b970e1 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java @@ -15,6 +15,7 @@ public class DDSpanTypes { public static final String MEMCACHED = "memcached"; public static final String ELASTICSEARCH = "elasticsearch"; + public static final String MESSAGE_CLIENT = "queue"; public static final String MESSAGE_CONSUMER = "queue"; public static final String MESSAGE_PRODUCER = "queue"; } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java b/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java index c1dcf6767c..6df5cbcd27 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java @@ -18,7 +18,7 @@ public class HTTPCodec implements Codec { // uint 64 bits max value, 2^64 - 1 static final BigInteger BIG_INTEGER_UINT64_MAX = - (new BigInteger("2")).pow(64).subtract(BigInteger.ONE); + new BigInteger("2").pow(64).subtract(BigInteger.ONE); private static final String OT_BAGGAGE_PREFIX = "ot-baggage-"; private static final String TRACE_ID_KEY = "x-datadog-trace-id"; @@ -120,16 +120,16 @@ public class HTTPCodec implements Codec { * @return the ID in String format if it passes validations * @throws IllegalArgumentException if val is not a number or if the number is out of range */ - private String validateUInt64BitsID(String val) throws IllegalArgumentException { + private String validateUInt64BitsID(final String val) throws IllegalArgumentException { try { - BigInteger validate = new BigInteger(val); + final BigInteger validate = new BigInteger(val); if (validate.compareTo(BigInteger.ZERO) == -1 || validate.compareTo(BIG_INTEGER_UINT64_MAX) == 1) { throw new IllegalArgumentException( "ID out of range, must be between 0 and 2^64-1, got: " + val); } return val; - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { throw new IllegalArgumentException( "Expecting a number for trace ID or span ID, but got: " + val, nfe); } diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index e5e2e3b148..bc02b47888 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -50,6 +50,7 @@ ext { bytebuddyagent : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy-agent', version: "${versions.bytebuddy}"), groovy : "org.codehaus.groovy:groovy-all:${versions.groovy}", junit : "junit:junit:${versions.junit}", + testcontainers : "org.testcontainers:testcontainers:1.7.3", testLogging : [ dependencies.create(group: 'ch.qos.logback', name: 'logback-classic', version: versions.logback), dependencies.create(group: 'org.slf4j', name: 'log4j-over-slf4j', version: versions.slf4j), diff --git a/settings.gradle b/settings.gradle index 1370f11b61..1f8df945fa 100644 --- a/settings.gradle +++ b/settings.gradle @@ -46,6 +46,7 @@ include ':dd-java-agent:instrumentation:netty-4.1' include ':dd-java-agent:instrumentation:okhttp-3' include ':dd-java-agent:instrumentation:osgi-classloading' include ':dd-java-agent:instrumentation:play-2.4' +include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.6' include ':dd-java-agent:instrumentation:ratpack-1.4' include ':dd-java-agent:instrumentation:servlet-2' include ':dd-java-agent:instrumentation:servlet-3' From 8ff5c7c36c55c2d4b3f22c9df0dfb54944286a90 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Thu, 18 Oct 2018 16:08:37 +1000 Subject: [PATCH 2/6] Review fix comments and version range. --- .../rabbitmq-amqp-2.7.gradle} | 8 ++++---- .../amqp/RabbitChannelInstrumentation.java | 12 +++-------- .../amqp/RabbitCommandInstrumentation.java | 17 ++++++++++++++++ .../rabbitmq/amqp/TextMapExtractAdapter.java | 0 .../rabbitmq/amqp/TextMapInjectAdapter.java | 0 .../amqp/TracedDelegatingConsumer.java | 20 +++++++++++-------- .../src/test/groovy/RabbitMQTest.groovy | 13 +++++++----- .../agent/test/asserts/TagsAssert.groovy | 3 +++ settings.gradle | 2 +- 9 files changed, 48 insertions(+), 27 deletions(-) rename dd-java-agent/instrumentation/{rabbitmq-amqp-2.6/rabbitmq-amqp-2.6.gradle => rabbitmq-amqp-2.7/rabbitmq-amqp-2.7.gradle} (95%) rename dd-java-agent/instrumentation/{rabbitmq-amqp-2.6 => rabbitmq-amqp-2.7}/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java (97%) rename dd-java-agent/instrumentation/{rabbitmq-amqp-2.6 => rabbitmq-amqp-2.7}/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java (75%) rename dd-java-agent/instrumentation/{rabbitmq-amqp-2.6 => rabbitmq-amqp-2.7}/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java (100%) rename dd-java-agent/instrumentation/{rabbitmq-amqp-2.6 => rabbitmq-amqp-2.7}/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java (100%) rename dd-java-agent/instrumentation/{rabbitmq-amqp-2.6 => rabbitmq-amqp-2.7}/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java (86%) rename dd-java-agent/instrumentation/{rabbitmq-amqp-2.6 => rabbitmq-amqp-2.7}/src/test/groovy/RabbitMQTest.groovy (94%) diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/rabbitmq-amqp-2.6.gradle b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/rabbitmq-amqp-2.7.gradle similarity index 95% rename from dd-java-agent/instrumentation/rabbitmq-amqp-2.6/rabbitmq-amqp-2.6.gradle rename to dd-java-agent/instrumentation/rabbitmq-amqp-2.7/rabbitmq-amqp-2.7.gradle index 483b92e946..df36e2cd19 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/rabbitmq-amqp-2.6.gradle +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/rabbitmq-amqp-2.7.gradle @@ -2,7 +2,7 @@ muzzle { pass { group = "com.rabbitmq" module = 'amqp-client' - versions = "[2.6.0,)" + versions = "[2.7.0,)" assertInverse = true } } @@ -18,7 +18,7 @@ testSets { } dependencies { - compileOnly group: 'com.rabbitmq', name: 'amqp-client', version: '2.6.0' + compileOnly group: 'com.rabbitmq', name: 'amqp-client', version: '2.7.0' compile project(':dd-trace-ot') compile project(':dd-java-agent:agent-tooling') @@ -31,7 +31,7 @@ dependencies { testCompile project(':dd-java-agent:testing') - testCompile group: 'com.rabbitmq', name: 'amqp-client', version: '2.6.0' + testCompile group: 'com.rabbitmq', name: 'amqp-client', version: '2.7.0' testCompile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '1.1.0.RELEASE' testCompile deps.testcontainers @@ -42,6 +42,6 @@ dependencies { configurations.testRuntime { resolutionStrategy { - force group: 'com.rabbitmq', name: 'amqp-client', version: '2.6.0' + force group: 'com.rabbitmq', name: 'amqp-client', version: '2.7.0' } } diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java similarity index 97% rename from dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java rename to dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java index 9c4abc9853..78f6c59734 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -17,7 +17,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Command; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.GetResponse; @@ -135,12 +134,6 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { CallDepthThreadLocalMap.reset(Channel.class); } } - - // Added to ensure consistent muzzle validation for all instrumentation. - public static void muzzleCheck(final Command cmd) { - com.rabbitmq.client.Method.class.getName(); - cmd.getMethod(); - } } public static class ChannelPublishAdvice { @@ -153,7 +146,8 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { final Span span = GlobalTracer.get().activeSpan(); if (span != null) { - span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchange); + final String exchangeName = exchange == null || exchange.isEmpty() ? "" : exchange; + span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchangeName); span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER); span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER); span.setTag("amqp.exchange", exchange); @@ -278,7 +272,7 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { @Advice.Argument(0) final String queue, @Advice.Argument(value = 6, readOnly = false) Consumer consumer) { // We have to save off the queue name here because it isn't available to the consumer later. - if (consumer != null) { + if (consumer != null && !(consumer instanceof TracedDelegatingConsumer)) { consumer = new TracedDelegatingConsumer(queue, consumer); } } diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java similarity index 75% rename from dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java rename to dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java index dffb5b5b3b..88c700d951 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java @@ -31,6 +31,14 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default { return not(isInterface()).and(safeHasSuperType(named("com.rabbitmq.client.Command"))); } + @Override + public String[] helperClassNames() { + return new String[] { + // These are only used by muzzleCheck. + packageName + ".TextMapExtractAdapter", packageName + ".TracedDelegatingConsumer", + }; + } + @Override public Map transformers() { return Collections.singletonMap(isConstructor(), CommandConstructorAdvice.class.getName()); @@ -48,5 +56,14 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default { span.setTag("amqp.command", name); } } + + /** + * This instrumentation will match with 2.6, but the channel instrumentation only matches with + * 2.7 because of TracedDelegatingConsumer. This unused method is added to ensure consistent + * muzzle validation by preventing match with 2.6. + */ + public static void muzzleCheck(final TracedDelegatingConsumer consumer) { + consumer.handleRecoverOk(null); + } } } diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java similarity index 100% rename from dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java rename to dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java similarity index 100% rename from dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java rename to dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java similarity index 86% rename from dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java rename to dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java index e7307d8f9d..d3571b0750 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java @@ -24,11 +24,11 @@ import java.util.Map; * queue name when the message is consumed. */ public class TracedDelegatingConsumer implements Consumer { - private final String queueName; + private final String queue; private final Consumer delegate; - public TracedDelegatingConsumer(final String queueName, final Consumer delegate) { - this.queueName = queueName; + public TracedDelegatingConsumer(final String queue, final Consumer delegate) { + this.queue = queue; this.delegate = delegate; } @@ -53,8 +53,8 @@ public class TracedDelegatingConsumer implements Consumer { } @Override - public void handleRecoverOk() { - delegate.handleRecoverOk(); + public void handleRecoverOk(String consumerTag) { + delegate.handleRecoverOk(consumerTag); } @Override @@ -73,15 +73,19 @@ public class TracedDelegatingConsumer implements Consumer { : GlobalTracer.get() .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers)); - final String resourceName = - queueName == null ? "basic.deliver null" : "basic.deliver " + queueName; + String queueName = queue; + if (queue == null || queue.isEmpty()) { + queueName = ""; + } else if (queue.startsWith("amq.gen-")) { + queueName = ""; + } scope = GlobalTracer.get() .buildSpan("amqp.command") .asChildOf(parentContext) .withTag(DDTags.SERVICE_NAME, "rabbitmq") - .withTag(DDTags.RESOURCE_NAME, resourceName) + .withTag(DDTags.RESOURCE_NAME, "basic.deliver " + queueName) .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) .withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp") diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy similarity index 94% rename from dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/test/groovy/RabbitMQTest.groovy rename to dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy index 730ab3fafd..cafab9413f 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.6/src/test/groovy/RabbitMQTest.groovy +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -26,7 +26,7 @@ import spock.lang.Shared import java.util.concurrent.Phaser // Do not run tests locally on Java7 since testcontainers are not compatible with Java7 -// It is fine to run on CI because CI provides memcached externally, not through testcontainers +// It is fine to run on CI because CI provides rabbitmq externally, not through testcontainers @Requires({ "true" == System.getenv("CI") || jvm.java8Compatible }) class RabbitMQTest extends AgentTestRunner { @@ -48,7 +48,7 @@ class RabbitMQTest extends AgentTestRunner { def setupSpec() { /* - CI will provide us with memcached container running along side our build. + CI will provide us with rabbitmq container running along side our build. When building locally, however, we need to take matters into our own hands and we use 'testcontainers' for this. */ @@ -121,7 +121,9 @@ class RabbitMQTest extends AgentTestRunner { def "test rabbit consume #messageCount messages"() { setup: channel.exchangeDeclare(exchangeName, "direct", false) - String queueName = channel.queueDeclare("some-queue", false, true, true, null).getQueue() + String queueName = (messageCount % 2 == 0) ? + channel.queueDeclare().getQueue() : + channel.queueDeclare("some-queue", false, true, true, null).getQueue() channel.queueBind(queueName, exchangeName, "") def phaser = new Phaser() @@ -145,6 +147,7 @@ class RabbitMQTest extends AgentTestRunner { TEST_WRITER.waitForTraces(3 + (it * 2)) phaser.arriveAndAwaitAdvance() } + def resource = messageCount % 2 == 0 ? "basic.deliver " : "basic.deliver $queueName" expect: assertTraces(4 + (messageCount * 2)) { @@ -167,7 +170,7 @@ class RabbitMQTest extends AgentTestRunner { rabbitSpan(it, "basic.publish") } trace(3 + (it * 2), 1) { - rabbitSpan(it, "basic.deliver some-queue", publishSpan) + rabbitSpan(it, resource, publishSpan) } } } @@ -176,7 +179,7 @@ class RabbitMQTest extends AgentTestRunner { where: exchangeName = "some-exchange" - messageCount << (1..3) + messageCount << (1..4) } def "test rabbit error (#command)"() { diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy index 0ca039234c..29b5ed270f 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy @@ -70,6 +70,9 @@ class TagsAssert { void assertTagsAllVerified() { def set = new TreeMap<>(tags).keySet() set.removeAll(assertedTags) + // The primary goal is to ensure the set is empty. + // tags and assertedTags are included via an "always true" comparison + // so they provide better context in the error message. assert tags.entrySet() != assertedTags && set.isEmpty() } } diff --git a/settings.gradle b/settings.gradle index 1f8df945fa..3095fcf270 100644 --- a/settings.gradle +++ b/settings.gradle @@ -46,7 +46,7 @@ include ':dd-java-agent:instrumentation:netty-4.1' include ':dd-java-agent:instrumentation:okhttp-3' include ':dd-java-agent:instrumentation:osgi-classloading' include ':dd-java-agent:instrumentation:play-2.4' -include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.6' +include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7' include ':dd-java-agent:instrumentation:ratpack-1.4' include ':dd-java-agent:instrumentation:servlet-2' include ':dd-java-agent:instrumentation:servlet-3' From 7ece2fe2a65553a66b14d13682430645f2dc05cb Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 19 Oct 2018 10:25:32 +1000 Subject: [PATCH 3/6] Prevent exchange name being overwritten in publish resource name. --- .../amqp/RabbitCommandInstrumentation.java | 5 ++- .../src/test/groovy/RabbitMQTest.groovy | 36 ++++++++++++++++--- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java index 88c700d951..7c969154ec 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java @@ -52,7 +52,10 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default { final Method method = command.getMethod(); if (span != null && method != null) { final String name = method.protocolMethodName(); - span.setTag(DDTags.RESOURCE_NAME, name); + if (!name.equals("basic.publish")) { + // Don't overwrite the name already set. + span.setTag(DDTags.RESOURCE_NAME, name); + } span.setTag("amqp.command", name); } } diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy index cafab9413f..bab7fc9f6f 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -106,7 +106,7 @@ class RabbitMQTest extends AgentTestRunner { rabbitSpan(it, "queue.bind") } trace(3, 1) { - rabbitSpan(it, "basic.publish") + rabbitSpan(it, "basic.publish $exchangeName") } trace(4, 1) { rabbitSpan(it, "basic.get ", TEST_WRITER[3][0]) @@ -114,8 +114,34 @@ class RabbitMQTest extends AgentTestRunner { } where: - exchangeName = "some-exchange" - routingKey = "some-routing-key" + exchangeName | routingKey + "some-exchange" | "some-routing-key" + } + + def "test rabbit publish/get default exchange"() { + setup: + String queueName = channel.queueDeclare("some-routing-queue", false, true, true, null).getQueue() + String routingKey = queueName + + channel.basicPublish("", routingKey, null, "Hello, world!".getBytes()) + + GetResponse response = channel.basicGet(queueName, true) + + expect: + new String(response.getBody()) == "Hello, world!" + + and: + assertTraces(3) { + trace(0, 1) { + rabbitSpan(it, "queue.declare") + } + trace(1, 1) { + rabbitSpan(it, "basic.publish ") + } + trace(2, 1) { + rabbitSpan(it, "basic.get some-routing-queue", TEST_WRITER[1][0]) + } + } } def "test rabbit consume #messageCount messages"() { @@ -167,7 +193,7 @@ class RabbitMQTest extends AgentTestRunner { def publishSpan = null trace(2 + (it * 2), 1) { publishSpan = span(0) - rabbitSpan(it, "basic.publish") + rabbitSpan(it, "basic.publish $exchangeName") } trace(3 + (it * 2), 1) { rabbitSpan(it, resource, publishSpan) @@ -229,7 +255,7 @@ class RabbitMQTest extends AgentTestRunner { rabbitSpan(it, "queue.declare") } trace(1, 1) { - rabbitSpan(it, "basic.publish") + rabbitSpan(it, "basic.publish ") } trace(2, 1) { rabbitSpan(it, "basic.get $queue.name", TEST_WRITER[1][0]) From 64595cf4851a38969ddd98c2cb933c61432120dc Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 19 Oct 2018 11:51:57 +1000 Subject: [PATCH 4/6] Add type hints and parent assertion --- .../src/test/groovy/RabbitMQTest.groovy | 44 +++++++++++-------- .../trace/agent/test/AgentTestRunner.java | 7 ++- .../test/asserts/ListWriterAssert.groovy | 4 ++ .../agent/test/asserts/SpanAssert.groovy | 6 ++- .../agent/test/asserts/TagsAssert.groovy | 3 ++ .../agent/test/asserts/TraceAssert.groovy | 5 ++- 6 files changed, 47 insertions(+), 22 deletions(-) diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy index bab7fc9f6f..57a75c6f93 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -25,6 +25,8 @@ import spock.lang.Shared import java.util.concurrent.Phaser +import static datadog.trace.agent.test.TestUtils.runUnderTrace + // Do not run tests locally on Java7 since testcontainers are not compatible with Java7 // It is fine to run on CI because CI provides rabbitmq externally, not through testcontainers @Requires({ "true" == System.getenv("CI") || jvm.java8Compatible }) @@ -83,33 +85,33 @@ class RabbitMQTest extends AgentTestRunner { def "test rabbit publish/get"() { setup: - channel.exchangeDeclare(exchangeName, "direct", false) - String queueName = channel.queueDeclare().getQueue() - channel.queueBind(queueName, exchangeName, routingKey) + GetResponse response = runUnderTrace("parent") { + channel.exchangeDeclare(exchangeName, "direct", false) + String queueName = channel.queueDeclare().getQueue() + channel.queueBind(queueName, exchangeName, routingKey) - channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes()) + channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes()) - GetResponse response = channel.basicGet(queueName, true) + return channel.basicGet(queueName, true) + } expect: new String(response.getBody()) == "Hello, world!" and: - assertTraces(5) { + assertTraces(2) { trace(0, 1) { - rabbitSpan(it, "exchange.declare") + rabbitSpan(it, "basic.get ", TEST_WRITER[1][1]) } - trace(1, 1) { - rabbitSpan(it, "queue.declare") - } - trace(2, 1) { - rabbitSpan(it, "queue.bind") - } - trace(3, 1) { - rabbitSpan(it, "basic.publish $exchangeName") - } - trace(4, 1) { - rabbitSpan(it, "basic.get ", TEST_WRITER[3][0]) + trace(1, 5) { + span(0) { + operationName "parent" + } + // reverse order + rabbitSpan(it, 1, "basic.publish $exchangeName", span(0)) + rabbitSpan(it, 2, "queue.bind", span(0)) + rabbitSpan(it, 3, "queue.declare", span(0)) + rabbitSpan(it, 4, "exchange.declare", span(0)) } } @@ -264,7 +266,11 @@ class RabbitMQTest extends AgentTestRunner { } def rabbitSpan(TraceAssert trace, String resource, DDSpan parentSpan = null, Throwable exception = null, String errorMsg = null) { - trace.span(0) { + rabbitSpan(trace, 0, resource, parentSpan, exception, errorMsg) + } + + def rabbitSpan(TraceAssert trace, int index, String resource, DDSpan parentSpan = null, Throwable exception = null, String errorMsg = null) { + trace.span(index) { serviceName "rabbitmq" operationName "amqp.command" resourceName resource diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.java b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.java index d1cda8ab25..535a9c0e47 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.java +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.java @@ -13,6 +13,8 @@ import datadog.trace.common.writer.ListWriter; import datadog.trace.common.writer.Writer; import groovy.lang.Closure; import groovy.lang.DelegatesTo; +import groovy.transform.stc.ClosureParams; +import groovy.transform.stc.SimpleType; import io.opentracing.Tracer; import java.lang.instrument.ClassFileTransformer; import java.lang.instrument.Instrumentation; @@ -160,7 +162,10 @@ public abstract class AgentTestRunner extends Specification { public static void assertTraces( final int size, - @DelegatesTo(value = ListWriterAssert.class, strategy = Closure.DELEGATE_FIRST) + @ClosureParams( + value = SimpleType.class, + options = "datadog.trace.agent.test.asserts.ListWriterAssert") + @DelegatesTo(value = ListWriterAssert.class, strategy = Closure.DELEGATE_FIRST) final Closure spec) { ListWriterAssert.assertTraces(TEST_WRITER, size, spec); } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/ListWriterAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/ListWriterAssert.groovy index bf898418c7..7c1f8cfd01 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/ListWriterAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/ListWriterAssert.groovy @@ -2,6 +2,8 @@ package datadog.trace.agent.test.asserts import datadog.opentracing.DDSpan import datadog.trace.common.writer.ListWriter +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType import org.codehaus.groovy.runtime.powerassert.PowerAssertionError import org.spockframework.runtime.Condition import org.spockframework.runtime.ConditionNotSatisfiedError @@ -20,6 +22,7 @@ class ListWriterAssert { } static void assertTraces(ListWriter writer, int expectedSize, + @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.ListWriterAssert']) @DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { try { writer.waitForTraces(expectedSize) @@ -55,6 +58,7 @@ class ListWriterAssert { } void trace(int index, int expectedSize, + @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TraceAssert']) @DelegatesTo(value = TraceAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { if (index >= size) { throw new ArrayIndexOutOfBoundsException(index) diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/SpanAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/SpanAssert.groovy index f2f716f3ae..6f219c894d 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/SpanAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/SpanAssert.groovy @@ -1,6 +1,8 @@ package datadog.trace.agent.test.asserts import datadog.opentracing.DDSpan +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType import static TagsAssert.assertTags @@ -12,6 +14,7 @@ class SpanAssert { } static void assertSpan(DDSpan span, + @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.SpanAssert']) @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { def asserter = new SpanAssert(span) def clone = (Closure) spec.clone() @@ -72,7 +75,8 @@ class SpanAssert { assert span.isError() == errored } - void tags(@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { + void tags(@ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TagsAssert']) + @DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { assertTags(span, spec) } } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy index 29b5ed270f..258ca34889 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy @@ -1,6 +1,8 @@ package datadog.trace.agent.test.asserts import datadog.opentracing.DDSpan +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType class TagsAssert { private final Map tags @@ -11,6 +13,7 @@ class TagsAssert { } static void assertTags(DDSpan span, + @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TagsAssert']) @DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { def asserter = new TagsAssert(span) def clone = (Closure) spec.clone() diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TraceAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TraceAssert.groovy index 2f259c1942..d75328e7b6 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TraceAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TraceAssert.groovy @@ -1,6 +1,8 @@ package datadog.trace.agent.test.asserts import datadog.opentracing.DDSpan +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType import static SpanAssert.assertSpan @@ -15,6 +17,7 @@ class TraceAssert { } static void assertTrace(List trace, int expectedSize, + @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TraceAssert']) @DelegatesTo(value = File, strategy = Closure.DELEGATE_FIRST) Closure spec) { assert trace.size() == expectedSize def asserter = new TraceAssert(trace) @@ -29,7 +32,7 @@ class TraceAssert { trace.get(index) } - void span(int index, @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { + void span(int index, @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.SpanAssert']) @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { if (index >= size) { throw new ArrayIndexOutOfBoundsException(index) } From 97dd3039092da031e99a41223e2a6b8684feb75e Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 19 Oct 2018 12:27:34 +1000 Subject: [PATCH 5/6] =?UTF-8?q?Don=E2=80=99t=20change=20resource=20name=20?= =?UTF-8?q?of=20span=20unless=20it=E2=80=99s=20for=20Rabbit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../amqp/RabbitCommandInstrumentation.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java index 7c969154ec..5a14cb03c2 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java @@ -11,6 +11,7 @@ import com.rabbitmq.client.Command; import com.rabbitmq.client.Method; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.api.DDTags; +import datadog.trace.api.interceptor.MutableSpan; import io.opentracing.Span; import io.opentracing.util.GlobalTracer; import java.util.Collections; @@ -50,13 +51,16 @@ public class RabbitCommandInstrumentation extends Instrumenter.Default { final Span span = GlobalTracer.get().activeSpan(); final Method method = command.getMethod(); - if (span != null && method != null) { - final String name = method.protocolMethodName(); - if (!name.equals("basic.publish")) { - // Don't overwrite the name already set. - span.setTag(DDTags.RESOURCE_NAME, name); + if (span instanceof MutableSpan && method != null) { + if (((MutableSpan) span).getOperationName().equals("amqp.command")) { + final String name = method.protocolMethodName(); + + if (!name.equals("basic.publish")) { + // Don't overwrite the name already set. + span.setTag(DDTags.RESOURCE_NAME, name); + } + span.setTag("amqp.command", name); } - span.setTag("amqp.command", name); } } From b693335ef5cb1c7b4eb8589e8b80ab442ce35e4d Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 19 Oct 2018 13:19:50 +1000 Subject: [PATCH 6/6] Add routing key to resource name for publish command. --- .../amqp/RabbitChannelInstrumentation.java | 6 +++++- .../src/test/groovy/RabbitMQTest.groovy | 19 ++++++++++--------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java index 78f6c59734..c829a4450c 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -147,7 +147,11 @@ public class RabbitChannelInstrumentation extends Instrumenter.Default { if (span != null) { final String exchangeName = exchange == null || exchange.isEmpty() ? "" : exchange; - span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchangeName); + final String routing = + routingKey == null || routingKey.isEmpty() + ? "" + : routingKey.startsWith("amq.gen-") ? "" : routingKey; + span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchangeName + " -> " + routing); span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER); span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER); span.setTag("amqp.exchange", exchange); diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy index 57a75c6f93..2a65594d5f 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -108,7 +108,7 @@ class RabbitMQTest extends AgentTestRunner { operationName "parent" } // reverse order - rabbitSpan(it, 1, "basic.publish $exchangeName", span(0)) + rabbitSpan(it, 1, "basic.publish $exchangeName -> $routingKey", span(0)) rabbitSpan(it, 2, "queue.bind", span(0)) rabbitSpan(it, 3, "queue.declare", span(0)) rabbitSpan(it, 4, "exchange.declare", span(0)) @@ -122,10 +122,9 @@ class RabbitMQTest extends AgentTestRunner { def "test rabbit publish/get default exchange"() { setup: - String queueName = channel.queueDeclare("some-routing-queue", false, true, true, null).getQueue() - String routingKey = queueName + String queueName = channel.queueDeclare().getQueue() - channel.basicPublish("", routingKey, null, "Hello, world!".getBytes()) + channel.basicPublish("", queueName, null, "Hello, world!".getBytes()) GetResponse response = channel.basicGet(queueName, true) @@ -138,10 +137,10 @@ class RabbitMQTest extends AgentTestRunner { rabbitSpan(it, "queue.declare") } trace(1, 1) { - rabbitSpan(it, "basic.publish ") + rabbitSpan(it, "basic.publish -> ") } trace(2, 1) { - rabbitSpan(it, "basic.get some-routing-queue", TEST_WRITER[1][0]) + rabbitSpan(it, "basic.get ", TEST_WRITER[1][0]) } } } @@ -195,7 +194,7 @@ class RabbitMQTest extends AgentTestRunner { def publishSpan = null trace(2 + (it * 2), 1) { publishSpan = span(0) - rabbitSpan(it, "basic.publish $exchangeName") + rabbitSpan(it, "basic.publish $exchangeName -> ") } trace(3 + (it * 2), 1) { rabbitSpan(it, resource, publishSpan) @@ -257,7 +256,7 @@ class RabbitMQTest extends AgentTestRunner { rabbitSpan(it, "queue.declare") } trace(1, 1) { - rabbitSpan(it, "basic.publish ") + rabbitSpan(it, "basic.publish -> some-routing-queue") } trace(2, 1) { rabbitSpan(it, "basic.get $queue.name", TEST_WRITER[1][0]) @@ -297,7 +296,9 @@ class RabbitMQTest extends AgentTestRunner { "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_PRODUCER "amqp.command" "basic.publish" "amqp.exchange" { it == null || it == "some-exchange" } - "amqp.routing_key" { it == null || it == "some-routing-key" || it == "some-routing-queue" } + "amqp.routing_key" { + it == null || it == "some-routing-key" || it == "some-routing-queue" || it.startsWith("amq.gen-") + } "amqp.delivery_mode" { it == null || it == 2 } "message.size" Integer break