diff --git a/.circleci/config.yml b/.circleci/config.yml index 1b2ec31aca..a9a4feed64 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -52,6 +52,8 @@ jobs: - image: *default_container # This is used by spymemcached instrumentation tests - image: memcached + # This is used by rabbitmq instrumentation tests + - image: rabbitmq steps: - checkout diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/rabbitmq-amqp-2.7.gradle b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/rabbitmq-amqp-2.7.gradle new file mode 100644 index 0000000000..df36e2cd19 --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/rabbitmq-amqp-2.7.gradle @@ -0,0 +1,47 @@ +muzzle { + pass { + group = "com.rabbitmq" + module = 'amqp-client' + versions = "[2.7.0,)" + assertInverse = true + } +} + +apply from: "${rootDir}/gradle/java.gradle" + +apply plugin: 'org.unbroken-dome.test-sets' + +testSets { + latestDepTest { + dirName = 'test' + } +} + +dependencies { + compileOnly group: 'com.rabbitmq', name: 'amqp-client', version: '2.7.0' + + compile project(':dd-trace-ot') + compile project(':dd-java-agent:agent-tooling') + + compile deps.bytebuddy + compile deps.opentracing + compile deps.autoservice + annotationProcessor deps.autoservice + implementation deps.autoservice + + testCompile project(':dd-java-agent:testing') + + testCompile group: 'com.rabbitmq', name: 'amqp-client', version: '2.7.0' + testCompile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '1.1.0.RELEASE' + + testCompile deps.testcontainers + + latestDepTestCompile group: 'com.rabbitmq', name: 'amqp-client', version: '+' + latestDepTestCompile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '+' +} + +configurations.testRuntime { + resolutionStrategy { + force group: 'com.rabbitmq', name: 'amqp-client', version: '2.7.0' + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java new file mode 100644 index 0000000000..c829a4450c --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -0,0 +1,284 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; +import static io.opentracing.log.Fields.ERROR_OBJECT; +import static net.bytebuddy.matcher.ElementMatchers.canThrow; +import static net.bytebuddy.matcher.ElementMatchers.isGetter; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.isSetter; +import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.GetResponse; +import com.rabbitmq.client.MessageProperties; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import datadog.trace.bootstrap.CallDepthThreadLocalMap; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.noop.NoopSpan; +import io.opentracing.propagation.Format; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(Instrumenter.class) +public class RabbitChannelInstrumentation extends Instrumenter.Default { + + public RabbitChannelInstrumentation() { + super("amqp", "rabbitmq"); + } + + @Override + public ElementMatcher typeMatcher() { + return not(isInterface()).and(safeHasSuperType(named("com.rabbitmq.client.Channel"))); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".TextMapInjectAdapter", + packageName + ".TextMapExtractAdapter", + packageName + ".TracedDelegatingConsumer", + }; + } + + @Override + public Map transformers() { + // We want the advice applied in a specific order, so use an ordered map. + final Map transformers = new LinkedHashMap<>(); + transformers.put( + isMethod() + .and( + not( + isGetter() + .or(isSetter()) + .or(nameEndsWith("Listener")) + .or(nameEndsWith("Listeners")) + .or(named("processAsync")) + .or(named("open")) + .or(named("close")) + .or(named("abort")) + .or(named("basicGet")))) + .and(isPublic()) + .and(canThrow(IOException.class).or(canThrow(InterruptedException.class))), + ChannelMethodAdvice.class.getName()); + transformers.put( + isMethod().and(named("basicPublish")).and(takesArguments(6)), + ChannelPublishAdvice.class.getName()); + transformers.put( + isMethod().and(named("basicGet")).and(takesArgument(0, String.class)), + ChannelGetAdvice.class.getName()); + transformers.put( + isMethod() + .and(named("basicConsume")) + .and(takesArgument(0, String.class)) + .and(takesArgument(6, named("com.rabbitmq.client.Consumer"))), + ChannelConsumeAdvice.class.getName()); + return transformers; + } + + public static class ChannelMethodAdvice { + @Advice.OnMethodEnter + public static Scope startSpan( + @Advice.This final Channel channel, @Advice.Origin("Channel.#m") final String method) { + final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class); + if (callDepth > 0) { + return null; + } + + final Connection connection = channel.getConnection(); + + return GlobalTracer.get() + .buildSpan("amqp.command") + .withTag(DDTags.SERVICE_NAME, "rabbitmq") + .withTag(DDTags.RESOURCE_NAME, method) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CLIENT) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT) + .withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp") + .withTag(Tags.PEER_HOSTNAME.getKey(), connection.getAddress().getHostName()) + .withTag(Tags.PEER_PORT.getKey(), connection.getPort()) + .startActive(true); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) { + if (scope != null) { + if (throwable != null) { + final Span span = scope.span(); + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); + } + scope.close(); + CallDepthThreadLocalMap.reset(Channel.class); + } + } + } + + public static class ChannelPublishAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void setResourceNameAddHeaders( + @Advice.Argument(0) final String exchange, + @Advice.Argument(1) final String routingKey, + @Advice.Argument(value = 4, readOnly = false) AMQP.BasicProperties props, + @Advice.Argument(5) final byte[] body) { + final Span span = GlobalTracer.get().activeSpan(); + + if (span != null) { + final String exchangeName = exchange == null || exchange.isEmpty() ? "" : exchange; + final String routing = + routingKey == null || routingKey.isEmpty() + ? "" + : routingKey.startsWith("amq.gen-") ? "" : routingKey; + span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchangeName + " -> " + routing); + span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER); + span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER); + span.setTag("amqp.exchange", exchange); + span.setTag("amqp.routing_key", routingKey); + span.setTag("message.size", body == null ? 0 : body.length); + + // This is the internal behavior when props are null. We're just doing it earlier now. + if (props == null) { + props = MessageProperties.MINIMAL_BASIC; + } + span.setTag("amqp.delivery_mode", props.getDeliveryMode()); + + // We need to copy the BasicProperties and provide a header map we can modify + Map headers = props.getHeaders(); + headers = (headers == null) ? new HashMap() : new HashMap<>(headers); + + GlobalTracer.get() + .inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapInjectAdapter(headers)); + + props = + new AMQP.BasicProperties( + props.getContentType(), + props.getContentEncoding(), + headers, + props.getDeliveryMode(), + props.getPriority(), + props.getCorrelationId(), + props.getReplyTo(), + props.getExpiration(), + props.getMessageId(), + props.getTimestamp(), + props.getType(), + props.getUserId(), + props.getAppId(), + props.getClusterId()); + } + } + } + + public static class ChannelGetAdvice { + @Advice.OnMethodEnter + public static long takeTimestamp( + @Advice.Local("placeholderScope") Scope scope, @Advice.Local("callDepth") int callDepth) { + callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class); + // Don't want RabbitCommandInstrumentation to mess up our actual parent span. + scope = GlobalTracer.get().scopeManager().activate(NoopSpan.INSTANCE, true); + return System.currentTimeMillis(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void extractAndStartSpan( + @Advice.This final Channel channel, + @Advice.Argument(0) final String queue, + @Advice.Enter final long startTime, + @Advice.Local("placeholderScope") final Scope scope, + @Advice.Local("callDepth") final int callDepth, + @Advice.Return final GetResponse response, + @Advice.Thrown final Throwable throwable) { + + if (scope.span() instanceof NoopSpan) { + scope.close(); + } + + if (callDepth > 0) { + return; + } + SpanContext parentContext = null; + + if (response != null && response.getProps() != null) { + final Map headers = response.getProps().getHeaders(); + + parentContext = + headers == null + ? null + : GlobalTracer.get() + .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers)); + } + + if (parentContext == null) { + final Span parent = GlobalTracer.get().activeSpan(); + if (parent != null) { + parentContext = parent.context(); + } + } + + final Connection connection = channel.getConnection(); + + final Integer length = response == null ? null : response.getBody().length; + + final String queueName = queue.startsWith("amq.gen-") ? "" : queue; + + final Span span = + GlobalTracer.get() + .buildSpan("amqp.command") + .withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime)) + .asChildOf(parentContext) + .withTag(DDTags.SERVICE_NAME, "rabbitmq") + .withTag(DDTags.RESOURCE_NAME, "basic.get " + queueName) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) + .withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp") + .withTag("amqp.command", "basic.get") + .withTag("amqp.queue", queue) + .withTag("message.size", length) + .withTag(Tags.PEER_HOSTNAME.getKey(), connection.getAddress().getHostName()) + .withTag(Tags.PEER_PORT.getKey(), connection.getPort()) + .start(); + + if (throwable != null) { + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); + } + + span.finish(); + CallDepthThreadLocalMap.reset(Channel.class); + } + } + + public static class ChannelConsumeAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void wrapConsumer( + @Advice.Argument(0) final String queue, + @Advice.Argument(value = 6, readOnly = false) Consumer consumer) { + // We have to save off the queue name here because it isn't available to the consumer later. + if (consumer != null && !(consumer instanceof TracedDelegatingConsumer)) { + consumer = new TracedDelegatingConsumer(queue, consumer); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java new file mode 100644 index 0000000000..5a14cb03c2 --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java @@ -0,0 +1,76 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; + +import com.google.auto.service.AutoService; +import com.rabbitmq.client.Command; +import com.rabbitmq.client.Method; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.DDTags; +import datadog.trace.api.interceptor.MutableSpan; +import io.opentracing.Span; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(Instrumenter.class) +public class RabbitCommandInstrumentation extends Instrumenter.Default { + + public RabbitCommandInstrumentation() { + super("amqp", "rabbitmq"); + } + + @Override + public ElementMatcher typeMatcher() { + return not(isInterface()).and(safeHasSuperType(named("com.rabbitmq.client.Command"))); + } + + @Override + public String[] helperClassNames() { + return new String[] { + // These are only used by muzzleCheck. + packageName + ".TextMapExtractAdapter", packageName + ".TracedDelegatingConsumer", + }; + } + + @Override + public Map transformers() { + return Collections.singletonMap(isConstructor(), CommandConstructorAdvice.class.getName()); + } + + public static class CommandConstructorAdvice { + @Advice.OnMethodExit + public static void setResourceNameAddHeaders(@Advice.This final Command command) { + final Span span = GlobalTracer.get().activeSpan(); + + final Method method = command.getMethod(); + if (span instanceof MutableSpan && method != null) { + if (((MutableSpan) span).getOperationName().equals("amqp.command")) { + final String name = method.protocolMethodName(); + + if (!name.equals("basic.publish")) { + // Don't overwrite the name already set. + span.setTag(DDTags.RESOURCE_NAME, name); + } + span.setTag("amqp.command", name); + } + } + } + + /** + * This instrumentation will match with 2.6, but the channel instrumentation only matches with + * 2.7 because of TracedDelegatingConsumer. This unused method is added to ensure consistent + * muzzle validation by preventing match with 2.6. + */ + public static void muzzleCheck(final TracedDelegatingConsumer consumer) { + consumer.handleRecoverOk(null); + } + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java new file mode 100644 index 0000000000..79709ef415 --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapExtractAdapter.java @@ -0,0 +1,28 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import io.opentracing.propagation.TextMap; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +// TextMap works with , but the type we're given is +public class TextMapExtractAdapter implements TextMap { + + private final Map map = new HashMap<>(); + + public TextMapExtractAdapter(final Map headers) { + for (final Map.Entry entry : headers.entrySet()) { + map.put(entry.getKey(), entry.getValue().toString()); + } + } + + @Override + public Iterator> iterator() { + return map.entrySet().iterator(); + } + + @Override + public void put(final String key, final String value) { + throw new UnsupportedOperationException("Use inject adapter instead"); + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java new file mode 100644 index 0000000000..5f28b7383e --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TextMapInjectAdapter.java @@ -0,0 +1,25 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import io.opentracing.propagation.TextMap; +import java.util.Iterator; +import java.util.Map; + +// TextMap works with , but the type we're given is +public class TextMapInjectAdapter implements TextMap { + private final Map map; + + public TextMapInjectAdapter(final Map map) { + this.map = map; + } + + @Override + public Iterator> iterator() { + throw new UnsupportedOperationException( + "TextMapInjectAdapter should only be used with Tracer.inject()"); + } + + @Override + public void put(final String key, final String value) { + map.put(key, value); + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java new file mode 100644 index 0000000000..d3571b0750 --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java @@ -0,0 +1,113 @@ +package datadog.trace.instrumentation.rabbitmq.amqp; + +import static io.opentracing.log.Fields.ERROR_OBJECT; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownSignalException; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.noop.NoopScopeManager; +import io.opentracing.propagation.Format; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +/** + * Wrapping the consumer instead of instrumenting it directly because it doesn't get access to the + * queue name when the message is consumed. + */ +public class TracedDelegatingConsumer implements Consumer { + private final String queue; + private final Consumer delegate; + + public TracedDelegatingConsumer(final String queue, final Consumer delegate) { + this.queue = queue; + this.delegate = delegate; + } + + @Override + public void handleConsumeOk(final String consumerTag) { + delegate.handleConsumeOk(consumerTag); + } + + @Override + public void handleCancelOk(final String consumerTag) { + delegate.handleCancelOk(consumerTag); + } + + @Override + public void handleCancel(final String consumerTag) throws IOException { + delegate.handleCancel(consumerTag); + } + + @Override + public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) { + delegate.handleShutdownSignal(consumerTag, sig); + } + + @Override + public void handleRecoverOk(String consumerTag) { + delegate.handleRecoverOk(consumerTag); + } + + @Override + public void handleDelivery( + final String consumerTag, + final Envelope envelope, + final AMQP.BasicProperties properties, + final byte[] body) + throws IOException { + Scope scope = NoopScopeManager.NoopScope.INSTANCE; + try { + final Map headers = properties.getHeaders(); + final SpanContext parentContext = + headers == null + ? null + : GlobalTracer.get() + .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers)); + + String queueName = queue; + if (queue == null || queue.isEmpty()) { + queueName = ""; + } else if (queue.startsWith("amq.gen-")) { + queueName = ""; + } + + scope = + GlobalTracer.get() + .buildSpan("amqp.command") + .asChildOf(parentContext) + .withTag(DDTags.SERVICE_NAME, "rabbitmq") + .withTag(DDTags.RESOURCE_NAME, "basic.deliver " + queueName) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) + .withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp") + .withTag("amqp.command", "basic.deliver") + .withTag("amqp.exchange", envelope.getExchange()) + .withTag("amqp.routing_key", envelope.getRoutingKey()) + .withTag("message.size", body == null ? 0 : body.length) + .withTag("span.origin.type", delegate.getClass().getName()) + .startActive(true); + } finally { + try { + + // Call delegate. + delegate.handleDelivery(consumerTag, envelope, properties, body); + + } catch (final Throwable throwable) { + final Span span = scope.span(); + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); + } finally { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy new file mode 100644 index 0000000000..2a65594d5f --- /dev/null +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -0,0 +1,329 @@ +import com.rabbitmq.client.AMQP +import com.rabbitmq.client.AlreadyClosedException +import com.rabbitmq.client.Channel +import com.rabbitmq.client.Connection +import com.rabbitmq.client.ConnectionFactory +import com.rabbitmq.client.Consumer +import com.rabbitmq.client.DefaultConsumer +import com.rabbitmq.client.Envelope +import com.rabbitmq.client.GetResponse +import datadog.opentracing.DDSpan +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.TraceAssert +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.DDTags +import io.opentracing.tag.Tags +import org.springframework.amqp.core.AmqpAdmin +import org.springframework.amqp.core.AmqpTemplate +import org.springframework.amqp.core.Queue +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory +import org.springframework.amqp.rabbit.core.RabbitAdmin +import org.springframework.amqp.rabbit.core.RabbitTemplate +import org.testcontainers.containers.GenericContainer +import spock.lang.Requires +import spock.lang.Shared + +import java.util.concurrent.Phaser + +import static datadog.trace.agent.test.TestUtils.runUnderTrace + +// Do not run tests locally on Java7 since testcontainers are not compatible with Java7 +// It is fine to run on CI because CI provides rabbitmq externally, not through testcontainers +@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible }) +class RabbitMQTest extends AgentTestRunner { + + /* + Note: type here has to stay undefined, otherwise tests will fail in CI in Java 7 because + 'testcontainers' are built for Java 8 and Java 7 cannot load this class. + */ + @Shared + def rabbbitMQContainer + @Shared + def defaultRabbitMQPort = 5672 + @Shared + InetSocketAddress rabbitmqAddress = new InetSocketAddress("127.0.0.1", defaultRabbitMQPort) + + ConnectionFactory factory = new ConnectionFactory(host: rabbitmqAddress.hostName, port: rabbitmqAddress.port) + Connection conn = factory.newConnection() + Channel channel = conn.createChannel() + + def setupSpec() { + + /* + CI will provide us with rabbitmq container running along side our build. + When building locally, however, we need to take matters into our own hands + and we use 'testcontainers' for this. + */ + if ("true" != System.getenv("CI")) { + rabbbitMQContainer = new GenericContainer('rabbitmq:latest') + .withExposedPorts(defaultRabbitMQPort) +// .withLogConsumer { output -> +// print output.utf8String +// } + rabbbitMQContainer.start() + rabbitmqAddress = new InetSocketAddress( + rabbbitMQContainer.containerIpAddress, + rabbbitMQContainer.getMappedPort(defaultRabbitMQPort) + ) + } + } + + def cleanupSpec() { + if (rabbbitMQContainer) { + rabbbitMQContainer.stop() + } + } + + def cleanup() { + try { + channel.close() + conn.close() + } catch (AlreadyClosedException e) { + // Ignore + } + } + + def "test rabbit publish/get"() { + setup: + GetResponse response = runUnderTrace("parent") { + channel.exchangeDeclare(exchangeName, "direct", false) + String queueName = channel.queueDeclare().getQueue() + channel.queueBind(queueName, exchangeName, routingKey) + + channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes()) + + return channel.basicGet(queueName, true) + } + + expect: + new String(response.getBody()) == "Hello, world!" + + and: + assertTraces(2) { + trace(0, 1) { + rabbitSpan(it, "basic.get ", TEST_WRITER[1][1]) + } + trace(1, 5) { + span(0) { + operationName "parent" + } + // reverse order + rabbitSpan(it, 1, "basic.publish $exchangeName -> $routingKey", span(0)) + rabbitSpan(it, 2, "queue.bind", span(0)) + rabbitSpan(it, 3, "queue.declare", span(0)) + rabbitSpan(it, 4, "exchange.declare", span(0)) + } + } + + where: + exchangeName | routingKey + "some-exchange" | "some-routing-key" + } + + def "test rabbit publish/get default exchange"() { + setup: + String queueName = channel.queueDeclare().getQueue() + + channel.basicPublish("", queueName, null, "Hello, world!".getBytes()) + + GetResponse response = channel.basicGet(queueName, true) + + expect: + new String(response.getBody()) == "Hello, world!" + + and: + assertTraces(3) { + trace(0, 1) { + rabbitSpan(it, "queue.declare") + } + trace(1, 1) { + rabbitSpan(it, "basic.publish -> ") + } + trace(2, 1) { + rabbitSpan(it, "basic.get ", TEST_WRITER[1][0]) + } + } + } + + def "test rabbit consume #messageCount messages"() { + setup: + channel.exchangeDeclare(exchangeName, "direct", false) + String queueName = (messageCount % 2 == 0) ? + channel.queueDeclare().getQueue() : + channel.queueDeclare("some-queue", false, true, true, null).getQueue() + channel.queueBind(queueName, exchangeName, "") + + def phaser = new Phaser() + phaser.register() + phaser.register() + def deliveries = [] + + Consumer callback = new DefaultConsumer(channel) { + @Override + void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + phaser.arriveAndAwaitAdvance() // Ensure publish spans are reported first. + deliveries << new String(body) + } + } + + channel.basicConsume(queueName, callback) + + (1..messageCount).each { + TEST_WRITER.waitForTraces(2 + (it * 2)) + channel.basicPublish(exchangeName, "", null, "msg $it".getBytes()) + TEST_WRITER.waitForTraces(3 + (it * 2)) + phaser.arriveAndAwaitAdvance() + } + def resource = messageCount % 2 == 0 ? "basic.deliver " : "basic.deliver $queueName" + + expect: + assertTraces(4 + (messageCount * 2)) { + trace(0, 1) { + rabbitSpan(it, "exchange.declare") + } + trace(1, 1) { + rabbitSpan(it, "queue.declare") + } + trace(2, 1) { + rabbitSpan(it, "queue.bind") + } + trace(3, 1) { + rabbitSpan(it, "basic.consume") + } + (1..messageCount).each { + def publishSpan = null + trace(2 + (it * 2), 1) { + publishSpan = span(0) + rabbitSpan(it, "basic.publish $exchangeName -> ") + } + trace(3 + (it * 2), 1) { + rabbitSpan(it, resource, publishSpan) + } + } + } + + deliveries == (1..messageCount).collect { "msg $it" } + + where: + exchangeName = "some-exchange" + messageCount << (1..4) + } + + def "test rabbit error (#command)"() { + when: + closure.call(channel) + + then: + def throwable = thrown(exception) + + and: + + assertTraces(1) { + trace(0, 1) { + rabbitSpan(it, command, null, throwable, errorMsg) + } + } + + where: + command | exception | errorMsg | closure + "exchange.declare" | IOException | null | { + it.exchangeDeclare("some-exchange", "invalid-type", true) + } + "Channel.basicConsume" | IllegalStateException | "Invalid configuration: 'queue' must be non-null." | { + it.basicConsume(null, null) + } + "basic.get " | IOException | null | { + it.basicGet("amq.gen-invalid-channel", true) + } + } + + def "test spring rabbit"() { + setup: + def connectionFactory = new CachingConnectionFactory(rabbitmqAddress.hostName, rabbitmqAddress.port) + AmqpAdmin admin = new RabbitAdmin(connectionFactory) + def queue = new Queue("some-routing-queue", false, true, true, null) + admin.declareQueue(queue) + AmqpTemplate template = new RabbitTemplate(connectionFactory) + template.convertAndSend(queue.name, "foo") + String message = (String) template.receiveAndConvert(queue.name) + + expect: + message == "foo" + + and: + assertTraces(3) { + trace(0, 1) { + rabbitSpan(it, "queue.declare") + } + trace(1, 1) { + rabbitSpan(it, "basic.publish -> some-routing-queue") + } + trace(2, 1) { + rabbitSpan(it, "basic.get $queue.name", TEST_WRITER[1][0]) + } + } + } + + def rabbitSpan(TraceAssert trace, String resource, DDSpan parentSpan = null, Throwable exception = null, String errorMsg = null) { + rabbitSpan(trace, 0, resource, parentSpan, exception, errorMsg) + } + + def rabbitSpan(TraceAssert trace, int index, String resource, DDSpan parentSpan = null, Throwable exception = null, String errorMsg = null) { + trace.span(index) { + serviceName "rabbitmq" + operationName "amqp.command" + resourceName resource + + if (parentSpan) { + childOf parentSpan + } else { + parent() + } + + errored exception != null + + tags { + if (exception) { + errorTags(exception.class, errorMsg) + } + "$Tags.COMPONENT.key" "rabbitmq-amqp" + "$Tags.PEER_HOSTNAME.key" { it == null || it instanceof String } + "$Tags.PEER_PORT.key" { it == null || it instanceof Integer } + + switch (tag("amqp.command")) { + case "basic.publish": + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_PRODUCER + "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_PRODUCER + "amqp.command" "basic.publish" + "amqp.exchange" { it == null || it == "some-exchange" } + "amqp.routing_key" { + it == null || it == "some-routing-key" || it == "some-routing-queue" || it.startsWith("amq.gen-") + } + "amqp.delivery_mode" { it == null || it == 2 } + "message.size" Integer + break + case "basic.get": + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CONSUMER + "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CONSUMER + "amqp.command" "basic.get" + "amqp.queue" { it == "some-queue" || it == "some-routing-queue" || it.startsWith("amq.gen-") } + "message.size" { it == null || it instanceof Integer } + break + case "basic.deliver": + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CONSUMER + "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CONSUMER + "amqp.command" "basic.deliver" + "span.origin.type" "RabbitMQTest\$1" + "amqp.exchange" "some-exchange" + "message.size" Integer + break + default: + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CLIENT + "amqp.command" { it == null || it == resource } + } + defaultTags() + } + } + } +} diff --git a/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle b/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle index 7c2f33bd44..a7d3da67d6 100644 --- a/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle +++ b/dd-java-agent/instrumentation/spymemcached-2.12/spymemcached-2.12.gradle @@ -32,7 +32,7 @@ dependencies { testCompile project(':dd-java-agent:testing') testCompile group: 'net.spy', name: 'spymemcached', version: '2.12.0' - testCompile group: 'org.testcontainers', name: 'testcontainers', version: '1.7.3' + testCompile deps.testcontainers } configurations.latestDepTestCompile { diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.java b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.java index d1cda8ab25..535a9c0e47 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.java +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.java @@ -13,6 +13,8 @@ import datadog.trace.common.writer.ListWriter; import datadog.trace.common.writer.Writer; import groovy.lang.Closure; import groovy.lang.DelegatesTo; +import groovy.transform.stc.ClosureParams; +import groovy.transform.stc.SimpleType; import io.opentracing.Tracer; import java.lang.instrument.ClassFileTransformer; import java.lang.instrument.Instrumentation; @@ -160,7 +162,10 @@ public abstract class AgentTestRunner extends Specification { public static void assertTraces( final int size, - @DelegatesTo(value = ListWriterAssert.class, strategy = Closure.DELEGATE_FIRST) + @ClosureParams( + value = SimpleType.class, + options = "datadog.trace.agent.test.asserts.ListWriterAssert") + @DelegatesTo(value = ListWriterAssert.class, strategy = Closure.DELEGATE_FIRST) final Closure spec) { ListWriterAssert.assertTraces(TEST_WRITER, size, spec); } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/ListWriterAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/ListWriterAssert.groovy index bf898418c7..7c1f8cfd01 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/ListWriterAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/ListWriterAssert.groovy @@ -2,6 +2,8 @@ package datadog.trace.agent.test.asserts import datadog.opentracing.DDSpan import datadog.trace.common.writer.ListWriter +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType import org.codehaus.groovy.runtime.powerassert.PowerAssertionError import org.spockframework.runtime.Condition import org.spockframework.runtime.ConditionNotSatisfiedError @@ -20,6 +22,7 @@ class ListWriterAssert { } static void assertTraces(ListWriter writer, int expectedSize, + @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.ListWriterAssert']) @DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { try { writer.waitForTraces(expectedSize) @@ -55,6 +58,7 @@ class ListWriterAssert { } void trace(int index, int expectedSize, + @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TraceAssert']) @DelegatesTo(value = TraceAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { if (index >= size) { throw new ArrayIndexOutOfBoundsException(index) diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/SpanAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/SpanAssert.groovy index f2f716f3ae..6f219c894d 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/SpanAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/SpanAssert.groovy @@ -1,6 +1,8 @@ package datadog.trace.agent.test.asserts import datadog.opentracing.DDSpan +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType import static TagsAssert.assertTags @@ -12,6 +14,7 @@ class SpanAssert { } static void assertSpan(DDSpan span, + @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.SpanAssert']) @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { def asserter = new SpanAssert(span) def clone = (Closure) spec.clone() @@ -72,7 +75,8 @@ class SpanAssert { assert span.isError() == errored } - void tags(@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { + void tags(@ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TagsAssert']) + @DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { assertTags(span, spec) } } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy index 2083266af5..258ca34889 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy @@ -1,16 +1,19 @@ package datadog.trace.agent.test.asserts import datadog.opentracing.DDSpan +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType class TagsAssert { private final Map tags private final Set assertedTags = new TreeSet<>() private TagsAssert(DDSpan span) { - this.tags = new TreeMap(span.tags) + this.tags = span.tags } static void assertTags(DDSpan span, + @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TagsAssert']) @DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { def asserter = new TagsAssert(span) def clone = (Closure) spec.clone() @@ -56,14 +59,23 @@ class TagsAssert { } } + def tag(String name) { + return tags[name] + } + def methodMissing(String name, args) { - if (args.length != 1) { + if (args.length == 0) { throw new IllegalArgumentException(args.toString()) } tag(name, args[0]) } void assertTagsAllVerified() { - assert tags.keySet() == assertedTags + def set = new TreeMap<>(tags).keySet() + set.removeAll(assertedTags) + // The primary goal is to ensure the set is empty. + // tags and assertedTags are included via an "always true" comparison + // so they provide better context in the error message. + assert tags.entrySet() != assertedTags && set.isEmpty() } } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TraceAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TraceAssert.groovy index 2f259c1942..d75328e7b6 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TraceAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TraceAssert.groovy @@ -1,6 +1,8 @@ package datadog.trace.agent.test.asserts import datadog.opentracing.DDSpan +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType import static SpanAssert.assertSpan @@ -15,6 +17,7 @@ class TraceAssert { } static void assertTrace(List trace, int expectedSize, + @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TraceAssert']) @DelegatesTo(value = File, strategy = Closure.DELEGATE_FIRST) Closure spec) { assert trace.size() == expectedSize def asserter = new TraceAssert(trace) @@ -29,7 +32,7 @@ class TraceAssert { trace.get(index) } - void span(int index, @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { + void span(int index, @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.SpanAssert']) @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { if (index >= size) { throw new ArrayIndexOutOfBoundsException(index) } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java b/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java index b8a22df2ca..4491b970e1 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java @@ -15,6 +15,7 @@ public class DDSpanTypes { public static final String MEMCACHED = "memcached"; public static final String ELASTICSEARCH = "elasticsearch"; + public static final String MESSAGE_CLIENT = "queue"; public static final String MESSAGE_CONSUMER = "queue"; public static final String MESSAGE_PRODUCER = "queue"; } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java b/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java index c1dcf6767c..6df5cbcd27 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/propagation/HTTPCodec.java @@ -18,7 +18,7 @@ public class HTTPCodec implements Codec { // uint 64 bits max value, 2^64 - 1 static final BigInteger BIG_INTEGER_UINT64_MAX = - (new BigInteger("2")).pow(64).subtract(BigInteger.ONE); + new BigInteger("2").pow(64).subtract(BigInteger.ONE); private static final String OT_BAGGAGE_PREFIX = "ot-baggage-"; private static final String TRACE_ID_KEY = "x-datadog-trace-id"; @@ -120,16 +120,16 @@ public class HTTPCodec implements Codec { * @return the ID in String format if it passes validations * @throws IllegalArgumentException if val is not a number or if the number is out of range */ - private String validateUInt64BitsID(String val) throws IllegalArgumentException { + private String validateUInt64BitsID(final String val) throws IllegalArgumentException { try { - BigInteger validate = new BigInteger(val); + final BigInteger validate = new BigInteger(val); if (validate.compareTo(BigInteger.ZERO) == -1 || validate.compareTo(BIG_INTEGER_UINT64_MAX) == 1) { throw new IllegalArgumentException( "ID out of range, must be between 0 and 2^64-1, got: " + val); } return val; - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { throw new IllegalArgumentException( "Expecting a number for trace ID or span ID, but got: " + val, nfe); } diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index e5e2e3b148..bc02b47888 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -50,6 +50,7 @@ ext { bytebuddyagent : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy-agent', version: "${versions.bytebuddy}"), groovy : "org.codehaus.groovy:groovy-all:${versions.groovy}", junit : "junit:junit:${versions.junit}", + testcontainers : "org.testcontainers:testcontainers:1.7.3", testLogging : [ dependencies.create(group: 'ch.qos.logback', name: 'logback-classic', version: versions.logback), dependencies.create(group: 'org.slf4j', name: 'log4j-over-slf4j', version: versions.slf4j), diff --git a/settings.gradle b/settings.gradle index adec31c4c6..50e78919a2 100644 --- a/settings.gradle +++ b/settings.gradle @@ -49,6 +49,7 @@ include ':dd-java-agent:instrumentation:netty-4.1' include ':dd-java-agent:instrumentation:okhttp-3' include ':dd-java-agent:instrumentation:osgi-classloading' include ':dd-java-agent:instrumentation:play-2.4' +include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7' include ':dd-java-agent:instrumentation:ratpack-1.4' include ':dd-java-agent:instrumentation:servlet-2' include ':dd-java-agent:instrumentation:servlet-3'