Add instrumentation for MessageListener

This commit is contained in:
Tyler Benson 2017-11-17 16:01:26 -08:00
parent 871ce37f80
commit 9ba647a2ac
5 changed files with 405 additions and 4 deletions

View File

@ -0,0 +1,84 @@
package dd.inst.jms1;
import static com.datadoghq.agent.integration.JmsUtil.toResourceName;
import static dd.trace.ClassLoaderMatcher.classLoaderHasClasses;
import static dd.trace.ExceptionHandlers.defaultExceptionHandler;
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.datadoghq.agent.integration.MessagePropertyTextMap;
import com.datadoghq.trace.DDTags;
import com.google.auto.service.AutoService;
import dd.trace.Instrumenter;
import io.opentracing.ActiveSpan;
import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import javax.jms.Message;
import javax.jms.MessageListener;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.asm.Advice;
@AutoService(Instrumenter.class)
public final class JMS1MessageListenerInstrumentation implements Instrumenter {
@Override
public AgentBuilder instrument(final AgentBuilder agentBuilder) {
return agentBuilder
.type(
not(isInterface()).and(hasSuperType(named("javax.jms.MessageListener"))),
not(classLoaderHasClasses("javax.jms.JMSContext", "javax.jms.CompletionListener")))
.transform(
new AgentBuilder.Transformer.ForAdvice()
.advice(
named("onMessage")
.and(takesArgument(0, named("javax.jms.Message")))
.and(isPublic()),
MessageListenerAdvice.class.getName())
.withExceptionHandler(defaultExceptionHandler()))
.asDecorator();
}
public static class MessageListenerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ActiveSpan startSpan(
@Advice.Argument(0) final Message message, @Advice.This final MessageListener listener) {
final SpanContext extractedContext =
GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message));
final ActiveSpan span =
GlobalTracer.get()
.buildSpan("jms.onMessage")
.asChildOf(extractedContext)
.withTag(DDTags.SERVICE_NAME, "jms")
.withTag(DDTags.RESOURCE_NAME, "Received from " + toResourceName(message, null))
.withTag(Tags.COMPONENT.getKey(), "jms1")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
.withTag("span.origin.type", listener.getClass().getName())
.startActive();
return span;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Enter final ActiveSpan span, @Advice.Thrown final Throwable throwable) {
if (span != null) {
if (throwable != null) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap("error.object", throwable));
}
span.deactivate();
}
}
}
}

View File

@ -12,6 +12,7 @@ import spock.lang.Unroll
import javax.jms.Connection
import javax.jms.Session
import javax.jms.TextMessage
import java.util.concurrent.atomic.AtomicReference
class JMS1Test extends Specification {
@ -46,7 +47,7 @@ class JMS1Test extends Specification {
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
def message = session.createTextMessage("a message")
writer.start()
producer.send(message)
TextMessage receivedMessage = consumer.receive()
@ -157,4 +158,126 @@ class JMS1Test extends Specification {
session.createTemporaryQueue() | "Temporary Queue"
session.createTemporaryTopic() | "Temporary Topic"
}
@Unroll
def "sending to a MessageListener on #resourceName generates a span"() {
setup:
def messageRef = new AtomicReference<TextMessage>()
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
consumer.setMessageListener { message ->
Thread.sleep(5) // Slow things down a bit.
messageRef.set(message)
}
def message = session.createTextMessage("a message")
producer.send(message)
writer.waitForTraces(2)
expect:
messageRef.get().text == "a message"
writer.size() == 2
and: // producer trace
def trace = writer.firstTrace()
trace.size() == 3
and: // span 0
def span0 = trace[0]
span0.context().operationName == "jms.produce"
span0.serviceName == "jms"
span0.resourceName == "Produced for $resourceName"
span0.type == null
!span0.context().getErrorFlag()
span0.context().parentId == 0
def tags0 = span0.context().tags
tags0["span.kind"] == "producer"
tags0["component"] == "jms1"
tags0["span.origin.type"] == ActiveMQMessageProducer.name
tags0["thread.name"] != null
tags0["thread.id"] != null
tags0.size() == 5
and: // span 1
def span1 = trace[1]
span1.context().operationName == "jms.produce"
span1.serviceName == "jms"
span1.resourceName == "Produced for $resourceName"
span1.type == null
!span1.context().getErrorFlag()
span1.context().parentId == span0.context().spanId
def tags1 = span1.context().tags
tags1["span.kind"] == "producer"
tags1["component"] == "jms1"
tags1["span.origin.type"] == ActiveMQMessageProducer.name
tags1["thread.name"] != null
tags1["thread.id"] != null
tags1.size() == 5
and: // span 2
def span2 = trace[2]
span2.context().operationName == "jms.produce"
span2.serviceName == "jms"
span2.resourceName == "Produced for $resourceName"
span2.type == null
!span2.context().getErrorFlag()
span2.context().parentId == span1.context().spanId
def tags2 = span2.context().tags
tags2["span.kind"] == "producer"
tags2["component"] == "jms1"
tags2["span.origin.type"] == ActiveMQMessageProducer.name
tags2["thread.name"] != null
tags2["thread.id"] != null
tags2.size() == 5
and: // consumer trace
def consumerTrace = writer.get(1)
consumerTrace.size() == 1
def consumerSpan = consumerTrace[0]
consumerSpan.context().operationName == "jms.onMessage"
consumerSpan.serviceName == "jms"
consumerSpan.resourceName == "Received from $resourceName"
consumerSpan.type == null
!consumerSpan.context().getErrorFlag()
consumerSpan.context().parentId == span2.context().spanId
def consumerTags = consumerSpan.context().tags
consumerTags["span.kind"] == "consumer"
consumerTags["component"] == "jms1"
consumerTags["span.origin.type"] != null
consumerTags["thread.name"] != null
consumerTags["thread.id"] != null
consumerTags.size() == 5
cleanup:
producer.close()
consumer.close()
where:
destination | resourceName
session.createQueue("someQueue") | "Queue someQueue"
session.createTopic("someTopic") | "Topic someTopic"
session.createTemporaryQueue() | "Temporary Queue"
session.createTemporaryTopic() | "Temporary Topic"
}
}

View File

@ -0,0 +1,84 @@
package dd.inst.jms2;
import static com.datadoghq.agent.integration.JmsUtil.toResourceName;
import static dd.trace.ClassLoaderMatcher.classLoaderHasClasses;
import static dd.trace.ExceptionHandlers.defaultExceptionHandler;
import static net.bytebuddy.matcher.ElementMatchers.hasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.datadoghq.agent.integration.MessagePropertyTextMap;
import com.datadoghq.trace.DDTags;
import com.google.auto.service.AutoService;
import dd.trace.Instrumenter;
import io.opentracing.ActiveSpan;
import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import javax.jms.Message;
import javax.jms.MessageListener;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.asm.Advice;
@AutoService(Instrumenter.class)
public final class JMS2MessageListenerInstrumentation implements Instrumenter {
@Override
public AgentBuilder instrument(final AgentBuilder agentBuilder) {
return agentBuilder
.type(
not(isInterface()).and(hasSuperType(named("javax.jms.MessageListener"))),
classLoaderHasClasses("javax.jms.JMSContext", "javax.jms.CompletionListener"))
.transform(
new AgentBuilder.Transformer.ForAdvice()
.advice(
named("onMessage")
.and(takesArgument(0, named("javax.jms.Message")))
.and(isPublic()),
MessageListenerAdvice.class.getName())
.withExceptionHandler(defaultExceptionHandler()))
.asDecorator();
}
public static class MessageListenerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ActiveSpan startSpan(
@Advice.Argument(0) final Message message, @Advice.This final MessageListener listener) {
final SpanContext extractedContext =
GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message));
final ActiveSpan span =
GlobalTracer.get()
.buildSpan("jms.onMessage")
.asChildOf(extractedContext)
.withTag(DDTags.SERVICE_NAME, "jms")
.withTag(DDTags.RESOURCE_NAME, "Received from " + toResourceName(message, null))
.withTag(Tags.COMPONENT.getKey(), "jms2")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
.withTag("span.origin.type", listener.getClass().getName())
.startActive();
return span;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Enter final ActiveSpan span, @Advice.Thrown final Throwable throwable) {
if (span != null) {
if (throwable != null) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap("error.object", throwable));
}
span.deactivate();
}
}
}
}

View File

@ -21,6 +21,7 @@ import spock.lang.Unroll
import javax.jms.Session
import javax.jms.TextMessage
import java.util.concurrent.atomic.AtomicReference
class JMS2Test extends Specification {
@ -80,7 +81,6 @@ class JMS2Test extends Specification {
def consumer = session.createConsumer(destination)
def message = session.createTextMessage("a message")
writer.start()
producer.send(message)
TextMessage receivedMessage = consumer.receive()
@ -148,4 +148,83 @@ class JMS2Test extends Specification {
session.createTemporaryQueue() | "Temporary Queue"
session.createTemporaryTopic() | "Temporary Topic"
}
@Unroll
def "sending to a MessageListener on #resourceName generates a span"() {
setup:
def messageRef = new AtomicReference<TextMessage>()
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)
consumer.setMessageListener { message ->
Thread.sleep(5) // Slow things down a bit.
messageRef.set(message)
}
def message = session.createTextMessage("a message")
producer.send(message)
writer.waitForTraces(2)
expect:
messageRef.get().text == "a message"
writer.size() == 2
and: // producer trace
def trace = writer.firstTrace()
trace.size() == 1
def producerSpan = trace[0]
producerSpan.context().operationName == "jms.produce"
producerSpan.serviceName == "jms"
producerSpan.resourceName == "Produced for $resourceName"
producerSpan.type == null
!producerSpan.context().getErrorFlag()
producerSpan.context().parentId == 0
def producerTags = producerSpan.context().tags
producerTags["span.kind"] == "producer"
producerTags["component"] == "jms2"
producerTags["span.origin.type"] == HornetQMessageProducer.name
producerTags["thread.name"] != null
producerTags["thread.id"] != null
producerTags.size() == 5
and: // consumer trace
def consumerTrace = writer.get(1)
consumerTrace.size() == 1
def consumerSpan = consumerTrace[0]
consumerSpan.context().operationName == "jms.onMessage"
consumerSpan.serviceName == "jms"
consumerSpan.resourceName == "Received from $resourceName"
consumerSpan.type == null
!consumerSpan.context().getErrorFlag()
consumerSpan.context().parentId == producerSpan.context().spanId
def consumerTags = consumerSpan.context().tags
consumerTags["span.kind"] == "consumer"
consumerTags["component"] == "jms2"
consumerTags["span.origin.type"] != null
consumerTags["thread.name"] != null
consumerTags["thread.id"] != null
consumerTags.size() == 5
cleanup:
producer.close()
consumer.close()
where:
destination | resourceName
session.createQueue("someQueue") | "Queue someQueue"
session.createTopic("someTopic") | "Topic someTopic"
session.createTemporaryQueue() | "Temporary Queue"
session.createTemporaryTopic() | "Temporary Topic"
}
}

View File

@ -2,12 +2,15 @@ package com.datadoghq.trace.writer;
import com.datadoghq.trace.DDBaseSpan;
import com.datadoghq.trace.Service;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
/** List writer used by tests mostly */
public class ListWriter extends CopyOnWriteArrayList<List<DDBaseSpan<?>>> implements Writer {
private final List<CountDownLatch> latches = new LinkedList<>();
public List<List<DDBaseSpan<?>>> getList() {
return this;
@ -19,7 +22,27 @@ public class ListWriter extends CopyOnWriteArrayList<List<DDBaseSpan<?>>> implem
@Override
public void write(final List<DDBaseSpan<?>> trace) {
add(trace);
synchronized (latches) {
add(trace);
for (final CountDownLatch latch : latches) {
if (size() >= latch.getCount()) {
while (latch.getCount() > 0) {
latch.countDown();
}
}
}
}
}
public void waitForTraces(final int number) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(number);
synchronized (latches) {
if (size() >= number) {
return;
}
latches.add(latch);
}
latch.await();
}
@Override
@ -29,11 +52,19 @@ public class ListWriter extends CopyOnWriteArrayList<List<DDBaseSpan<?>>> implem
@Override
public void start() {
clear();
close();
}
@Override
public void close() {
clear();
synchronized (latches) {
for (final CountDownLatch latch : latches) {
while (latch.getCount() > 0) {
latch.countDown();
}
}
latches.clear();
}
}
}