Merge pull request #535 from DataDog/tyler/rabbit

Add instrumentation for RabbitMQ’s AMQP library
This commit is contained in:
Tyler Benson 2018-10-19 23:26:50 +10:00 committed by GitHub
commit 78c6f881ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 946 additions and 11 deletions

View File

@ -52,6 +52,8 @@ jobs:
- image: *default_container
# This is used by spymemcached instrumentation tests
- image: memcached
# This is used by rabbitmq instrumentation tests
- image: rabbitmq
steps:
- checkout

View File

@ -0,0 +1,47 @@
muzzle {
pass {
group = "com.rabbitmq"
module = 'amqp-client'
versions = "[2.7.0,)"
assertInverse = true
}
}
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
compileOnly group: 'com.rabbitmq', name: 'amqp-client', version: '2.7.0'
compile project(':dd-trace-ot')
compile project(':dd-java-agent:agent-tooling')
compile deps.bytebuddy
compile deps.opentracing
compile deps.autoservice
annotationProcessor deps.autoservice
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
testCompile group: 'com.rabbitmq', name: 'amqp-client', version: '2.7.0'
testCompile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '1.1.0.RELEASE'
testCompile deps.testcontainers
latestDepTestCompile group: 'com.rabbitmq', name: 'amqp-client', version: '+'
latestDepTestCompile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '+'
}
configurations.testRuntime {
resolutionStrategy {
force group: 'com.rabbitmq', name: 'amqp-client', version: '2.7.0'
}
}

View File

@ -0,0 +1,284 @@
package datadog.trace.instrumentation.rabbitmq.amqp;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static net.bytebuddy.matcher.ElementMatchers.canThrow;
import static net.bytebuddy.matcher.ElementMatchers.isGetter;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.isSetter;
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.MessageProperties;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.noop.NoopSpan;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class RabbitChannelInstrumentation extends Instrumenter.Default {
public RabbitChannelInstrumentation() {
super("amqp", "rabbitmq");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface()).and(safeHasSuperType(named("com.rabbitmq.client.Channel")));
}
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".TextMapInjectAdapter",
packageName + ".TextMapExtractAdapter",
packageName + ".TracedDelegatingConsumer",
};
}
@Override
public Map<? extends ElementMatcher, String> transformers() {
// We want the advice applied in a specific order, so use an ordered map.
final Map<ElementMatcher, String> transformers = new LinkedHashMap<>();
transformers.put(
isMethod()
.and(
not(
isGetter()
.or(isSetter())
.or(nameEndsWith("Listener"))
.or(nameEndsWith("Listeners"))
.or(named("processAsync"))
.or(named("open"))
.or(named("close"))
.or(named("abort"))
.or(named("basicGet"))))
.and(isPublic())
.and(canThrow(IOException.class).or(canThrow(InterruptedException.class))),
ChannelMethodAdvice.class.getName());
transformers.put(
isMethod().and(named("basicPublish")).and(takesArguments(6)),
ChannelPublishAdvice.class.getName());
transformers.put(
isMethod().and(named("basicGet")).and(takesArgument(0, String.class)),
ChannelGetAdvice.class.getName());
transformers.put(
isMethod()
.and(named("basicConsume"))
.and(takesArgument(0, String.class))
.and(takesArgument(6, named("com.rabbitmq.client.Consumer"))),
ChannelConsumeAdvice.class.getName());
return transformers;
}
public static class ChannelMethodAdvice {
@Advice.OnMethodEnter
public static Scope startSpan(
@Advice.This final Channel channel, @Advice.Origin("Channel.#m") final String method) {
final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class);
if (callDepth > 0) {
return null;
}
final Connection connection = channel.getConnection();
return GlobalTracer.get()
.buildSpan("amqp.command")
.withTag(DDTags.SERVICE_NAME, "rabbitmq")
.withTag(DDTags.RESOURCE_NAME, method)
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CLIENT)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp")
.withTag(Tags.PEER_HOSTNAME.getKey(), connection.getAddress().getHostName())
.withTag(Tags.PEER_PORT.getKey(), connection.getPort())
.startActive(true);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) {
if (scope != null) {
if (throwable != null) {
final Span span = scope.span();
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
scope.close();
CallDepthThreadLocalMap.reset(Channel.class);
}
}
}
public static class ChannelPublishAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void setResourceNameAddHeaders(
@Advice.Argument(0) final String exchange,
@Advice.Argument(1) final String routingKey,
@Advice.Argument(value = 4, readOnly = false) AMQP.BasicProperties props,
@Advice.Argument(5) final byte[] body) {
final Span span = GlobalTracer.get().activeSpan();
if (span != null) {
final String exchangeName = exchange == null || exchange.isEmpty() ? "<default>" : exchange;
final String routing =
routingKey == null || routingKey.isEmpty()
? "<all>"
: routingKey.startsWith("amq.gen-") ? "<generated>" : routingKey;
span.setTag(DDTags.RESOURCE_NAME, "basic.publish " + exchangeName + " -> " + routing);
span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER);
span.setTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_PRODUCER);
span.setTag("amqp.exchange", exchange);
span.setTag("amqp.routing_key", routingKey);
span.setTag("message.size", body == null ? 0 : body.length);
// This is the internal behavior when props are null. We're just doing it earlier now.
if (props == null) {
props = MessageProperties.MINIMAL_BASIC;
}
span.setTag("amqp.delivery_mode", props.getDeliveryMode());
// We need to copy the BasicProperties and provide a header map we can modify
Map<String, Object> headers = props.getHeaders();
headers = (headers == null) ? new HashMap<String, Object>() : new HashMap<>(headers);
GlobalTracer.get()
.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapInjectAdapter(headers));
props =
new AMQP.BasicProperties(
props.getContentType(),
props.getContentEncoding(),
headers,
props.getDeliveryMode(),
props.getPriority(),
props.getCorrelationId(),
props.getReplyTo(),
props.getExpiration(),
props.getMessageId(),
props.getTimestamp(),
props.getType(),
props.getUserId(),
props.getAppId(),
props.getClusterId());
}
}
}
public static class ChannelGetAdvice {
@Advice.OnMethodEnter
public static long takeTimestamp(
@Advice.Local("placeholderScope") Scope scope, @Advice.Local("callDepth") int callDepth) {
callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class);
// Don't want RabbitCommandInstrumentation to mess up our actual parent span.
scope = GlobalTracer.get().scopeManager().activate(NoopSpan.INSTANCE, true);
return System.currentTimeMillis();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void extractAndStartSpan(
@Advice.This final Channel channel,
@Advice.Argument(0) final String queue,
@Advice.Enter final long startTime,
@Advice.Local("placeholderScope") final Scope scope,
@Advice.Local("callDepth") final int callDepth,
@Advice.Return final GetResponse response,
@Advice.Thrown final Throwable throwable) {
if (scope.span() instanceof NoopSpan) {
scope.close();
}
if (callDepth > 0) {
return;
}
SpanContext parentContext = null;
if (response != null && response.getProps() != null) {
final Map<String, Object> headers = response.getProps().getHeaders();
parentContext =
headers == null
? null
: GlobalTracer.get()
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers));
}
if (parentContext == null) {
final Span parent = GlobalTracer.get().activeSpan();
if (parent != null) {
parentContext = parent.context();
}
}
final Connection connection = channel.getConnection();
final Integer length = response == null ? null : response.getBody().length;
final String queueName = queue.startsWith("amq.gen-") ? "<generated>" : queue;
final Span span =
GlobalTracer.get()
.buildSpan("amqp.command")
.withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(startTime))
.asChildOf(parentContext)
.withTag(DDTags.SERVICE_NAME, "rabbitmq")
.withTag(DDTags.RESOURCE_NAME, "basic.get " + queueName)
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
.withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp")
.withTag("amqp.command", "basic.get")
.withTag("amqp.queue", queue)
.withTag("message.size", length)
.withTag(Tags.PEER_HOSTNAME.getKey(), connection.getAddress().getHostName())
.withTag(Tags.PEER_PORT.getKey(), connection.getPort())
.start();
if (throwable != null) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
span.finish();
CallDepthThreadLocalMap.reset(Channel.class);
}
}
public static class ChannelConsumeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapConsumer(
@Advice.Argument(0) final String queue,
@Advice.Argument(value = 6, readOnly = false) Consumer consumer) {
// We have to save off the queue name here because it isn't available to the consumer later.
if (consumer != null && !(consumer instanceof TracedDelegatingConsumer)) {
consumer = new TracedDelegatingConsumer(queue, consumer);
}
}
}
}

View File

@ -0,0 +1,76 @@
package datadog.trace.instrumentation.rabbitmq.amqp;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import com.google.auto.service.AutoService;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.Method;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags;
import datadog.trace.api.interceptor.MutableSpan;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public class RabbitCommandInstrumentation extends Instrumenter.Default {
public RabbitCommandInstrumentation() {
super("amqp", "rabbitmq");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface()).and(safeHasSuperType(named("com.rabbitmq.client.Command")));
}
@Override
public String[] helperClassNames() {
return new String[] {
// These are only used by muzzleCheck.
packageName + ".TextMapExtractAdapter", packageName + ".TracedDelegatingConsumer",
};
}
@Override
public Map<? extends ElementMatcher, String> transformers() {
return Collections.singletonMap(isConstructor(), CommandConstructorAdvice.class.getName());
}
public static class CommandConstructorAdvice {
@Advice.OnMethodExit
public static void setResourceNameAddHeaders(@Advice.This final Command command) {
final Span span = GlobalTracer.get().activeSpan();
final Method method = command.getMethod();
if (span instanceof MutableSpan && method != null) {
if (((MutableSpan) span).getOperationName().equals("amqp.command")) {
final String name = method.protocolMethodName();
if (!name.equals("basic.publish")) {
// Don't overwrite the name already set.
span.setTag(DDTags.RESOURCE_NAME, name);
}
span.setTag("amqp.command", name);
}
}
}
/**
* This instrumentation will match with 2.6, but the channel instrumentation only matches with
* 2.7 because of TracedDelegatingConsumer. This unused method is added to ensure consistent
* muzzle validation by preventing match with 2.6.
*/
public static void muzzleCheck(final TracedDelegatingConsumer consumer) {
consumer.handleRecoverOk(null);
}
}
}

View File

@ -0,0 +1,28 @@
package datadog.trace.instrumentation.rabbitmq.amqp;
import io.opentracing.propagation.TextMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
// TextMap works with <String,String>, but the type we're given is <String,Object>
public class TextMapExtractAdapter implements TextMap {
private final Map<String, String> map = new HashMap<>();
public TextMapExtractAdapter(final Map<String, Object> headers) {
for (final Map.Entry<String, Object> entry : headers.entrySet()) {
map.put(entry.getKey(), entry.getValue().toString());
}
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
return map.entrySet().iterator();
}
@Override
public void put(final String key, final String value) {
throw new UnsupportedOperationException("Use inject adapter instead");
}
}

View File

@ -0,0 +1,25 @@
package datadog.trace.instrumentation.rabbitmq.amqp;
import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;
// TextMap works with <String,String>, but the type we're given is <String,Object>
public class TextMapInjectAdapter implements TextMap {
private final Map<String, ? super String> map;
public TextMapInjectAdapter(final Map<String, ? super String> map) {
this.map = map;
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException(
"TextMapInjectAdapter should only be used with Tracer.inject()");
}
@Override
public void put(final String key, final String value) {
map.put(key, value);
}
}

View File

@ -0,0 +1,113 @@
package datadog.trace.instrumentation.rabbitmq.amqp;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.noop.NoopScopeManager;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
/**
* Wrapping the consumer instead of instrumenting it directly because it doesn't get access to the
* queue name when the message is consumed.
*/
public class TracedDelegatingConsumer implements Consumer {
private final String queue;
private final Consumer delegate;
public TracedDelegatingConsumer(final String queue, final Consumer delegate) {
this.queue = queue;
this.delegate = delegate;
}
@Override
public void handleConsumeOk(final String consumerTag) {
delegate.handleConsumeOk(consumerTag);
}
@Override
public void handleCancelOk(final String consumerTag) {
delegate.handleCancelOk(consumerTag);
}
@Override
public void handleCancel(final String consumerTag) throws IOException {
delegate.handleCancel(consumerTag);
}
@Override
public void handleShutdownSignal(final String consumerTag, final ShutdownSignalException sig) {
delegate.handleShutdownSignal(consumerTag, sig);
}
@Override
public void handleRecoverOk(String consumerTag) {
delegate.handleRecoverOk(consumerTag);
}
@Override
public void handleDelivery(
final String consumerTag,
final Envelope envelope,
final AMQP.BasicProperties properties,
final byte[] body)
throws IOException {
Scope scope = NoopScopeManager.NoopScope.INSTANCE;
try {
final Map<String, Object> headers = properties.getHeaders();
final SpanContext parentContext =
headers == null
? null
: GlobalTracer.get()
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers));
String queueName = queue;
if (queue == null || queue.isEmpty()) {
queueName = "<default>";
} else if (queue.startsWith("amq.gen-")) {
queueName = "<generated>";
}
scope =
GlobalTracer.get()
.buildSpan("amqp.command")
.asChildOf(parentContext)
.withTag(DDTags.SERVICE_NAME, "rabbitmq")
.withTag(DDTags.RESOURCE_NAME, "basic.deliver " + queueName)
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
.withTag(Tags.COMPONENT.getKey(), "rabbitmq-amqp")
.withTag("amqp.command", "basic.deliver")
.withTag("amqp.exchange", envelope.getExchange())
.withTag("amqp.routing_key", envelope.getRoutingKey())
.withTag("message.size", body == null ? 0 : body.length)
.withTag("span.origin.type", delegate.getClass().getName())
.startActive(true);
} finally {
try {
// Call delegate.
delegate.handleDelivery(consumerTag, envelope, properties, body);
} catch (final Throwable throwable) {
final Span span = scope.span();
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
} finally {
scope.close();
}
}
}
}

View File

@ -0,0 +1,329 @@
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.AlreadyClosedException
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Consumer
import com.rabbitmq.client.DefaultConsumer
import com.rabbitmq.client.Envelope
import com.rabbitmq.client.GetResponse
import datadog.opentracing.DDSpan
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.asserts.TraceAssert
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.testcontainers.containers.GenericContainer
import spock.lang.Requires
import spock.lang.Shared
import java.util.concurrent.Phaser
import static datadog.trace.agent.test.TestUtils.runUnderTrace
// Do not run tests locally on Java7 since testcontainers are not compatible with Java7
// It is fine to run on CI because CI provides rabbitmq externally, not through testcontainers
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
class RabbitMQTest extends AgentTestRunner {
/*
Note: type here has to stay undefined, otherwise tests will fail in CI in Java 7 because
'testcontainers' are built for Java 8 and Java 7 cannot load this class.
*/
@Shared
def rabbbitMQContainer
@Shared
def defaultRabbitMQPort = 5672
@Shared
InetSocketAddress rabbitmqAddress = new InetSocketAddress("127.0.0.1", defaultRabbitMQPort)
ConnectionFactory factory = new ConnectionFactory(host: rabbitmqAddress.hostName, port: rabbitmqAddress.port)
Connection conn = factory.newConnection()
Channel channel = conn.createChannel()
def setupSpec() {
/*
CI will provide us with rabbitmq container running along side our build.
When building locally, however, we need to take matters into our own hands
and we use 'testcontainers' for this.
*/
if ("true" != System.getenv("CI")) {
rabbbitMQContainer = new GenericContainer('rabbitmq:latest')
.withExposedPorts(defaultRabbitMQPort)
// .withLogConsumer { output ->
// print output.utf8String
// }
rabbbitMQContainer.start()
rabbitmqAddress = new InetSocketAddress(
rabbbitMQContainer.containerIpAddress,
rabbbitMQContainer.getMappedPort(defaultRabbitMQPort)
)
}
}
def cleanupSpec() {
if (rabbbitMQContainer) {
rabbbitMQContainer.stop()
}
}
def cleanup() {
try {
channel.close()
conn.close()
} catch (AlreadyClosedException e) {
// Ignore
}
}
def "test rabbit publish/get"() {
setup:
GetResponse response = runUnderTrace("parent") {
channel.exchangeDeclare(exchangeName, "direct", false)
String queueName = channel.queueDeclare().getQueue()
channel.queueBind(queueName, exchangeName, routingKey)
channel.basicPublish(exchangeName, routingKey, null, "Hello, world!".getBytes())
return channel.basicGet(queueName, true)
}
expect:
new String(response.getBody()) == "Hello, world!"
and:
assertTraces(2) {
trace(0, 1) {
rabbitSpan(it, "basic.get <generated>", TEST_WRITER[1][1])
}
trace(1, 5) {
span(0) {
operationName "parent"
}
// reverse order
rabbitSpan(it, 1, "basic.publish $exchangeName -> $routingKey", span(0))
rabbitSpan(it, 2, "queue.bind", span(0))
rabbitSpan(it, 3, "queue.declare", span(0))
rabbitSpan(it, 4, "exchange.declare", span(0))
}
}
where:
exchangeName | routingKey
"some-exchange" | "some-routing-key"
}
def "test rabbit publish/get default exchange"() {
setup:
String queueName = channel.queueDeclare().getQueue()
channel.basicPublish("", queueName, null, "Hello, world!".getBytes())
GetResponse response = channel.basicGet(queueName, true)
expect:
new String(response.getBody()) == "Hello, world!"
and:
assertTraces(3) {
trace(0, 1) {
rabbitSpan(it, "queue.declare")
}
trace(1, 1) {
rabbitSpan(it, "basic.publish <default> -> <generated>")
}
trace(2, 1) {
rabbitSpan(it, "basic.get <generated>", TEST_WRITER[1][0])
}
}
}
def "test rabbit consume #messageCount messages"() {
setup:
channel.exchangeDeclare(exchangeName, "direct", false)
String queueName = (messageCount % 2 == 0) ?
channel.queueDeclare().getQueue() :
channel.queueDeclare("some-queue", false, true, true, null).getQueue()
channel.queueBind(queueName, exchangeName, "")
def phaser = new Phaser()
phaser.register()
phaser.register()
def deliveries = []
Consumer callback = new DefaultConsumer(channel) {
@Override
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
phaser.arriveAndAwaitAdvance() // Ensure publish spans are reported first.
deliveries << new String(body)
}
}
channel.basicConsume(queueName, callback)
(1..messageCount).each {
TEST_WRITER.waitForTraces(2 + (it * 2))
channel.basicPublish(exchangeName, "", null, "msg $it".getBytes())
TEST_WRITER.waitForTraces(3 + (it * 2))
phaser.arriveAndAwaitAdvance()
}
def resource = messageCount % 2 == 0 ? "basic.deliver <generated>" : "basic.deliver $queueName"
expect:
assertTraces(4 + (messageCount * 2)) {
trace(0, 1) {
rabbitSpan(it, "exchange.declare")
}
trace(1, 1) {
rabbitSpan(it, "queue.declare")
}
trace(2, 1) {
rabbitSpan(it, "queue.bind")
}
trace(3, 1) {
rabbitSpan(it, "basic.consume")
}
(1..messageCount).each {
def publishSpan = null
trace(2 + (it * 2), 1) {
publishSpan = span(0)
rabbitSpan(it, "basic.publish $exchangeName -> <all>")
}
trace(3 + (it * 2), 1) {
rabbitSpan(it, resource, publishSpan)
}
}
}
deliveries == (1..messageCount).collect { "msg $it" }
where:
exchangeName = "some-exchange"
messageCount << (1..4)
}
def "test rabbit error (#command)"() {
when:
closure.call(channel)
then:
def throwable = thrown(exception)
and:
assertTraces(1) {
trace(0, 1) {
rabbitSpan(it, command, null, throwable, errorMsg)
}
}
where:
command | exception | errorMsg | closure
"exchange.declare" | IOException | null | {
it.exchangeDeclare("some-exchange", "invalid-type", true)
}
"Channel.basicConsume" | IllegalStateException | "Invalid configuration: 'queue' must be non-null." | {
it.basicConsume(null, null)
}
"basic.get <generated>" | IOException | null | {
it.basicGet("amq.gen-invalid-channel", true)
}
}
def "test spring rabbit"() {
setup:
def connectionFactory = new CachingConnectionFactory(rabbitmqAddress.hostName, rabbitmqAddress.port)
AmqpAdmin admin = new RabbitAdmin(connectionFactory)
def queue = new Queue("some-routing-queue", false, true, true, null)
admin.declareQueue(queue)
AmqpTemplate template = new RabbitTemplate(connectionFactory)
template.convertAndSend(queue.name, "foo")
String message = (String) template.receiveAndConvert(queue.name)
expect:
message == "foo"
and:
assertTraces(3) {
trace(0, 1) {
rabbitSpan(it, "queue.declare")
}
trace(1, 1) {
rabbitSpan(it, "basic.publish <default> -> some-routing-queue")
}
trace(2, 1) {
rabbitSpan(it, "basic.get $queue.name", TEST_WRITER[1][0])
}
}
}
def rabbitSpan(TraceAssert trace, String resource, DDSpan parentSpan = null, Throwable exception = null, String errorMsg = null) {
rabbitSpan(trace, 0, resource, parentSpan, exception, errorMsg)
}
def rabbitSpan(TraceAssert trace, int index, String resource, DDSpan parentSpan = null, Throwable exception = null, String errorMsg = null) {
trace.span(index) {
serviceName "rabbitmq"
operationName "amqp.command"
resourceName resource
if (parentSpan) {
childOf parentSpan
} else {
parent()
}
errored exception != null
tags {
if (exception) {
errorTags(exception.class, errorMsg)
}
"$Tags.COMPONENT.key" "rabbitmq-amqp"
"$Tags.PEER_HOSTNAME.key" { it == null || it instanceof String }
"$Tags.PEER_PORT.key" { it == null || it instanceof Integer }
switch (tag("amqp.command")) {
case "basic.publish":
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_PRODUCER
"$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_PRODUCER
"amqp.command" "basic.publish"
"amqp.exchange" { it == null || it == "some-exchange" }
"amqp.routing_key" {
it == null || it == "some-routing-key" || it == "some-routing-queue" || it.startsWith("amq.gen-")
}
"amqp.delivery_mode" { it == null || it == 2 }
"message.size" Integer
break
case "basic.get":
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CONSUMER
"$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CONSUMER
"amqp.command" "basic.get"
"amqp.queue" { it == "some-queue" || it == "some-routing-queue" || it.startsWith("amq.gen-") }
"message.size" { it == null || it instanceof Integer }
break
case "basic.deliver":
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CONSUMER
"$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CONSUMER
"amqp.command" "basic.deliver"
"span.origin.type" "RabbitMQTest\$1"
"amqp.exchange" "some-exchange"
"message.size" Integer
break
default:
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.MESSAGE_CLIENT
"amqp.command" { it == null || it == resource }
}
defaultTags()
}
}
}
}

View File

@ -32,7 +32,7 @@ dependencies {
testCompile project(':dd-java-agent:testing')
testCompile group: 'net.spy', name: 'spymemcached', version: '2.12.0'
testCompile group: 'org.testcontainers', name: 'testcontainers', version: '1.7.3'
testCompile deps.testcontainers
}
configurations.latestDepTestCompile {

View File

@ -13,6 +13,8 @@ import datadog.trace.common.writer.ListWriter;
import datadog.trace.common.writer.Writer;
import groovy.lang.Closure;
import groovy.lang.DelegatesTo;
import groovy.transform.stc.ClosureParams;
import groovy.transform.stc.SimpleType;
import io.opentracing.Tracer;
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
@ -160,6 +162,9 @@ public abstract class AgentTestRunner extends Specification {
public static void assertTraces(
final int size,
@ClosureParams(
value = SimpleType.class,
options = "datadog.trace.agent.test.asserts.ListWriterAssert")
@DelegatesTo(value = ListWriterAssert.class, strategy = Closure.DELEGATE_FIRST)
final Closure spec) {
ListWriterAssert.assertTraces(TEST_WRITER, size, spec);

View File

@ -2,6 +2,8 @@ package datadog.trace.agent.test.asserts
import datadog.opentracing.DDSpan
import datadog.trace.common.writer.ListWriter
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import org.codehaus.groovy.runtime.powerassert.PowerAssertionError
import org.spockframework.runtime.Condition
import org.spockframework.runtime.ConditionNotSatisfiedError
@ -20,6 +22,7 @@ class ListWriterAssert {
}
static void assertTraces(ListWriter writer, int expectedSize,
@ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.ListWriterAssert'])
@DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
try {
writer.waitForTraces(expectedSize)
@ -55,6 +58,7 @@ class ListWriterAssert {
}
void trace(int index, int expectedSize,
@ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TraceAssert'])
@DelegatesTo(value = TraceAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index)

View File

@ -1,6 +1,8 @@
package datadog.trace.agent.test.asserts
import datadog.opentracing.DDSpan
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import static TagsAssert.assertTags
@ -12,6 +14,7 @@ class SpanAssert {
}
static void assertSpan(DDSpan span,
@ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.SpanAssert'])
@DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
def asserter = new SpanAssert(span)
def clone = (Closure) spec.clone()
@ -72,7 +75,8 @@ class SpanAssert {
assert span.isError() == errored
}
void tags(@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
void tags(@ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TagsAssert'])
@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
assertTags(span, spec)
}
}

View File

@ -1,16 +1,19 @@
package datadog.trace.agent.test.asserts
import datadog.opentracing.DDSpan
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
class TagsAssert {
private final Map<String, Object> tags
private final Set<String> assertedTags = new TreeSet<>()
private TagsAssert(DDSpan span) {
this.tags = new TreeMap(span.tags)
this.tags = span.tags
}
static void assertTags(DDSpan span,
@ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TagsAssert'])
@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
def asserter = new TagsAssert(span)
def clone = (Closure) spec.clone()
@ -56,14 +59,23 @@ class TagsAssert {
}
}
def tag(String name) {
return tags[name]
}
def methodMissing(String name, args) {
if (args.length != 1) {
if (args.length == 0) {
throw new IllegalArgumentException(args.toString())
}
tag(name, args[0])
}
void assertTagsAllVerified() {
assert tags.keySet() == assertedTags
def set = new TreeMap<>(tags).keySet()
set.removeAll(assertedTags)
// The primary goal is to ensure the set is empty.
// tags and assertedTags are included via an "always true" comparison
// so they provide better context in the error message.
assert tags.entrySet() != assertedTags && set.isEmpty()
}
}

View File

@ -1,6 +1,8 @@
package datadog.trace.agent.test.asserts
import datadog.opentracing.DDSpan
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import static SpanAssert.assertSpan
@ -15,6 +17,7 @@ class TraceAssert {
}
static void assertTrace(List<DDSpan> trace, int expectedSize,
@ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.TraceAssert'])
@DelegatesTo(value = File, strategy = Closure.DELEGATE_FIRST) Closure spec) {
assert trace.size() == expectedSize
def asserter = new TraceAssert(trace)
@ -29,7 +32,7 @@ class TraceAssert {
trace.get(index)
}
void span(int index, @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
void span(int index, @ClosureParams(value = SimpleType, options = ['datadog.trace.agent.test.asserts.SpanAssert']) @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index)
}

View File

@ -15,6 +15,7 @@ public class DDSpanTypes {
public static final String MEMCACHED = "memcached";
public static final String ELASTICSEARCH = "elasticsearch";
public static final String MESSAGE_CLIENT = "queue";
public static final String MESSAGE_CONSUMER = "queue";
public static final String MESSAGE_PRODUCER = "queue";
}

View File

@ -18,7 +18,7 @@ public class HTTPCodec implements Codec<TextMap> {
// uint 64 bits max value, 2^64 - 1
static final BigInteger BIG_INTEGER_UINT64_MAX =
(new BigInteger("2")).pow(64).subtract(BigInteger.ONE);
new BigInteger("2").pow(64).subtract(BigInteger.ONE);
private static final String OT_BAGGAGE_PREFIX = "ot-baggage-";
private static final String TRACE_ID_KEY = "x-datadog-trace-id";
@ -120,16 +120,16 @@ public class HTTPCodec implements Codec<TextMap> {
* @return the ID in String format if it passes validations
* @throws IllegalArgumentException if val is not a number or if the number is out of range
*/
private String validateUInt64BitsID(String val) throws IllegalArgumentException {
private String validateUInt64BitsID(final String val) throws IllegalArgumentException {
try {
BigInteger validate = new BigInteger(val);
final BigInteger validate = new BigInteger(val);
if (validate.compareTo(BigInteger.ZERO) == -1
|| validate.compareTo(BIG_INTEGER_UINT64_MAX) == 1) {
throw new IllegalArgumentException(
"ID out of range, must be between 0 and 2^64-1, got: " + val);
}
return val;
} catch (NumberFormatException nfe) {
} catch (final NumberFormatException nfe) {
throw new IllegalArgumentException(
"Expecting a number for trace ID or span ID, but got: " + val, nfe);
}

View File

@ -50,6 +50,7 @@ ext {
bytebuddyagent : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy-agent', version: "${versions.bytebuddy}"),
groovy : "org.codehaus.groovy:groovy-all:${versions.groovy}",
junit : "junit:junit:${versions.junit}",
testcontainers : "org.testcontainers:testcontainers:1.7.3",
testLogging : [
dependencies.create(group: 'ch.qos.logback', name: 'logback-classic', version: versions.logback),
dependencies.create(group: 'org.slf4j', name: 'log4j-over-slf4j', version: versions.slf4j),

View File

@ -49,6 +49,7 @@ include ':dd-java-agent:instrumentation:netty-4.1'
include ':dd-java-agent:instrumentation:okhttp-3'
include ':dd-java-agent:instrumentation:osgi-classloading'
include ':dd-java-agent:instrumentation:play-2.4'
include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7'
include ':dd-java-agent:instrumentation:ratpack-1.4'
include ':dd-java-agent:instrumentation:servlet-2'
include ':dd-java-agent:instrumentation:servlet-3'