From 9ba647a2ac99edb42bafd76cd55166ebe329ca42 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 17 Nov 2017 16:01:26 -0800 Subject: [PATCH] Add instrumentation for MessageListener --- .../JMS1MessageListenerInstrumentation.java | 84 ++++++++++++ .../jms-1/src/test/groovy/JMS1Test.groovy | 125 +++++++++++++++++- .../JMS2MessageListenerInstrumentation.java | 84 ++++++++++++ .../jms-2/src/test/groovy/JMS2Test.groovy | 81 +++++++++++- .../datadoghq/trace/writer/ListWriter.java | 35 ++++- 5 files changed, 405 insertions(+), 4 deletions(-) create mode 100644 dd-java-agent/integrations/jms-1/src/main/java/dd/inst/jms1/JMS1MessageListenerInstrumentation.java create mode 100644 dd-java-agent/integrations/jms-2/src/main/java/dd/inst/jms2/JMS2MessageListenerInstrumentation.java diff --git a/dd-java-agent/integrations/jms-1/src/main/java/dd/inst/jms1/JMS1MessageListenerInstrumentation.java b/dd-java-agent/integrations/jms-1/src/main/java/dd/inst/jms1/JMS1MessageListenerInstrumentation.java new file mode 100644 index 0000000000..f365ad2a8d --- /dev/null +++ b/dd-java-agent/integrations/jms-1/src/main/java/dd/inst/jms1/JMS1MessageListenerInstrumentation.java @@ -0,0 +1,84 @@ +package dd.inst.jms1; + +import static com.datadoghq.agent.integration.JmsUtil.toResourceName; +import static dd.trace.ClassLoaderMatcher.classLoaderHasClasses; +import static dd.trace.ExceptionHandlers.defaultExceptionHandler; +import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.datadoghq.agent.integration.MessagePropertyTextMap; +import com.datadoghq.trace.DDTags; +import com.google.auto.service.AutoService; +import dd.trace.Instrumenter; +import io.opentracing.ActiveSpan; +import io.opentracing.SpanContext; +import io.opentracing.propagation.Format; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import javax.jms.Message; +import javax.jms.MessageListener; +import net.bytebuddy.agent.builder.AgentBuilder; +import net.bytebuddy.asm.Advice; + +@AutoService(Instrumenter.class) +public final class JMS1MessageListenerInstrumentation implements Instrumenter { + + @Override + public AgentBuilder instrument(final AgentBuilder agentBuilder) { + return agentBuilder + .type( + not(isInterface()).and(hasSuperType(named("javax.jms.MessageListener"))), + not(classLoaderHasClasses("javax.jms.JMSContext", "javax.jms.CompletionListener"))) + .transform( + new AgentBuilder.Transformer.ForAdvice() + .advice( + named("onMessage") + .and(takesArgument(0, named("javax.jms.Message"))) + .and(isPublic()), + MessageListenerAdvice.class.getName()) + .withExceptionHandler(defaultExceptionHandler())) + .asDecorator(); + } + + public static class MessageListenerAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ActiveSpan startSpan( + @Advice.Argument(0) final Message message, @Advice.This final MessageListener listener) { + + final SpanContext extractedContext = + GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message)); + + final ActiveSpan span = + GlobalTracer.get() + .buildSpan("jms.onMessage") + .asChildOf(extractedContext) + .withTag(DDTags.SERVICE_NAME, "jms") + .withTag(DDTags.RESOURCE_NAME, "Received from " + toResourceName(message, null)) + .withTag(Tags.COMPONENT.getKey(), "jms1") + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) + .withTag("span.origin.type", listener.getClass().getName()) + .startActive(); + + return span; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Enter final ActiveSpan span, @Advice.Thrown final Throwable throwable) { + + if (span != null) { + if (throwable != null) { + Tags.ERROR.set(span, Boolean.TRUE); + span.log(Collections.singletonMap("error.object", throwable)); + } + span.deactivate(); + } + } + } +} diff --git a/dd-java-agent/integrations/jms-1/src/test/groovy/JMS1Test.groovy b/dd-java-agent/integrations/jms-1/src/test/groovy/JMS1Test.groovy index 3e091b6881..78adb99c77 100644 --- a/dd-java-agent/integrations/jms-1/src/test/groovy/JMS1Test.groovy +++ b/dd-java-agent/integrations/jms-1/src/test/groovy/JMS1Test.groovy @@ -12,6 +12,7 @@ import spock.lang.Unroll import javax.jms.Connection import javax.jms.Session import javax.jms.TextMessage +import java.util.concurrent.atomic.AtomicReference class JMS1Test extends Specification { @@ -46,7 +47,7 @@ class JMS1Test extends Specification { def producer = session.createProducer(destination) def consumer = session.createConsumer(destination) def message = session.createTextMessage("a message") - writer.start() + producer.send(message) TextMessage receivedMessage = consumer.receive() @@ -157,4 +158,126 @@ class JMS1Test extends Specification { session.createTemporaryQueue() | "Temporary Queue" session.createTemporaryTopic() | "Temporary Topic" } + + @Unroll + def "sending to a MessageListener on #resourceName generates a span"() { + setup: + def messageRef = new AtomicReference() + def producer = session.createProducer(destination) + def consumer = session.createConsumer(destination) + consumer.setMessageListener { message -> + Thread.sleep(5) // Slow things down a bit. + messageRef.set(message) + } + + def message = session.createTextMessage("a message") + producer.send(message) + writer.waitForTraces(2) + + expect: + messageRef.get().text == "a message" + writer.size() == 2 + + and: // producer trace + def trace = writer.firstTrace() + trace.size() == 3 + + and: // span 0 + def span0 = trace[0] + + span0.context().operationName == "jms.produce" + span0.serviceName == "jms" + span0.resourceName == "Produced for $resourceName" + span0.type == null + !span0.context().getErrorFlag() + span0.context().parentId == 0 + + + def tags0 = span0.context().tags + tags0["span.kind"] == "producer" + tags0["component"] == "jms1" + + tags0["span.origin.type"] == ActiveMQMessageProducer.name + + tags0["thread.name"] != null + tags0["thread.id"] != null + tags0.size() == 5 + + and: // span 1 + def span1 = trace[1] + + span1.context().operationName == "jms.produce" + span1.serviceName == "jms" + span1.resourceName == "Produced for $resourceName" + span1.type == null + !span1.context().getErrorFlag() + span1.context().parentId == span0.context().spanId + + + def tags1 = span1.context().tags + tags1["span.kind"] == "producer" + tags1["component"] == "jms1" + + tags1["span.origin.type"] == ActiveMQMessageProducer.name + + tags1["thread.name"] != null + tags1["thread.id"] != null + tags1.size() == 5 + + and: // span 2 + def span2 = trace[2] + + span2.context().operationName == "jms.produce" + span2.serviceName == "jms" + span2.resourceName == "Produced for $resourceName" + span2.type == null + !span2.context().getErrorFlag() + span2.context().parentId == span1.context().spanId + + + def tags2 = span2.context().tags + tags2["span.kind"] == "producer" + tags2["component"] == "jms1" + + tags2["span.origin.type"] == ActiveMQMessageProducer.name + + tags2["thread.name"] != null + tags2["thread.id"] != null + tags2.size() == 5 + + and: // consumer trace + def consumerTrace = writer.get(1) + consumerTrace.size() == 1 + + def consumerSpan = consumerTrace[0] + + consumerSpan.context().operationName == "jms.onMessage" + consumerSpan.serviceName == "jms" + consumerSpan.resourceName == "Received from $resourceName" + consumerSpan.type == null + !consumerSpan.context().getErrorFlag() + consumerSpan.context().parentId == span2.context().spanId + + + def consumerTags = consumerSpan.context().tags + consumerTags["span.kind"] == "consumer" + consumerTags["component"] == "jms1" + + consumerTags["span.origin.type"] != null + + consumerTags["thread.name"] != null + consumerTags["thread.id"] != null + consumerTags.size() == 5 + + cleanup: + producer.close() + consumer.close() + + where: + destination | resourceName + session.createQueue("someQueue") | "Queue someQueue" + session.createTopic("someTopic") | "Topic someTopic" + session.createTemporaryQueue() | "Temporary Queue" + session.createTemporaryTopic() | "Temporary Topic" + } } diff --git a/dd-java-agent/integrations/jms-2/src/main/java/dd/inst/jms2/JMS2MessageListenerInstrumentation.java b/dd-java-agent/integrations/jms-2/src/main/java/dd/inst/jms2/JMS2MessageListenerInstrumentation.java new file mode 100644 index 0000000000..6837c93f31 --- /dev/null +++ b/dd-java-agent/integrations/jms-2/src/main/java/dd/inst/jms2/JMS2MessageListenerInstrumentation.java @@ -0,0 +1,84 @@ +package dd.inst.jms2; + +import static com.datadoghq.agent.integration.JmsUtil.toResourceName; +import static dd.trace.ClassLoaderMatcher.classLoaderHasClasses; +import static dd.trace.ExceptionHandlers.defaultExceptionHandler; +import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.datadoghq.agent.integration.MessagePropertyTextMap; +import com.datadoghq.trace.DDTags; +import com.google.auto.service.AutoService; +import dd.trace.Instrumenter; +import io.opentracing.ActiveSpan; +import io.opentracing.SpanContext; +import io.opentracing.propagation.Format; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import javax.jms.Message; +import javax.jms.MessageListener; +import net.bytebuddy.agent.builder.AgentBuilder; +import net.bytebuddy.asm.Advice; + +@AutoService(Instrumenter.class) +public final class JMS2MessageListenerInstrumentation implements Instrumenter { + + @Override + public AgentBuilder instrument(final AgentBuilder agentBuilder) { + return agentBuilder + .type( + not(isInterface()).and(hasSuperType(named("javax.jms.MessageListener"))), + classLoaderHasClasses("javax.jms.JMSContext", "javax.jms.CompletionListener")) + .transform( + new AgentBuilder.Transformer.ForAdvice() + .advice( + named("onMessage") + .and(takesArgument(0, named("javax.jms.Message"))) + .and(isPublic()), + MessageListenerAdvice.class.getName()) + .withExceptionHandler(defaultExceptionHandler())) + .asDecorator(); + } + + public static class MessageListenerAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ActiveSpan startSpan( + @Advice.Argument(0) final Message message, @Advice.This final MessageListener listener) { + + final SpanContext extractedContext = + GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new MessagePropertyTextMap(message)); + + final ActiveSpan span = + GlobalTracer.get() + .buildSpan("jms.onMessage") + .asChildOf(extractedContext) + .withTag(DDTags.SERVICE_NAME, "jms") + .withTag(DDTags.RESOURCE_NAME, "Received from " + toResourceName(message, null)) + .withTag(Tags.COMPONENT.getKey(), "jms2") + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) + .withTag("span.origin.type", listener.getClass().getName()) + .startActive(); + + return span; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Enter final ActiveSpan span, @Advice.Thrown final Throwable throwable) { + + if (span != null) { + if (throwable != null) { + Tags.ERROR.set(span, Boolean.TRUE); + span.log(Collections.singletonMap("error.object", throwable)); + } + span.deactivate(); + } + } + } +} diff --git a/dd-java-agent/integrations/jms-2/src/test/groovy/JMS2Test.groovy b/dd-java-agent/integrations/jms-2/src/test/groovy/JMS2Test.groovy index e47ad3fe5a..584af643d1 100644 --- a/dd-java-agent/integrations/jms-2/src/test/groovy/JMS2Test.groovy +++ b/dd-java-agent/integrations/jms-2/src/test/groovy/JMS2Test.groovy @@ -21,6 +21,7 @@ import spock.lang.Unroll import javax.jms.Session import javax.jms.TextMessage +import java.util.concurrent.atomic.AtomicReference class JMS2Test extends Specification { @@ -80,7 +81,6 @@ class JMS2Test extends Specification { def consumer = session.createConsumer(destination) def message = session.createTextMessage("a message") - writer.start() producer.send(message) TextMessage receivedMessage = consumer.receive() @@ -148,4 +148,83 @@ class JMS2Test extends Specification { session.createTemporaryQueue() | "Temporary Queue" session.createTemporaryTopic() | "Temporary Topic" } + + @Unroll + def "sending to a MessageListener on #resourceName generates a span"() { + setup: + def messageRef = new AtomicReference() + def producer = session.createProducer(destination) + def consumer = session.createConsumer(destination) + consumer.setMessageListener { message -> + Thread.sleep(5) // Slow things down a bit. + messageRef.set(message) + } + + def message = session.createTextMessage("a message") + producer.send(message) + writer.waitForTraces(2) + + expect: + messageRef.get().text == "a message" + writer.size() == 2 + + and: // producer trace + def trace = writer.firstTrace() + trace.size() == 1 + + def producerSpan = trace[0] + + producerSpan.context().operationName == "jms.produce" + producerSpan.serviceName == "jms" + producerSpan.resourceName == "Produced for $resourceName" + producerSpan.type == null + !producerSpan.context().getErrorFlag() + producerSpan.context().parentId == 0 + + + def producerTags = producerSpan.context().tags + producerTags["span.kind"] == "producer" + producerTags["component"] == "jms2" + + producerTags["span.origin.type"] == HornetQMessageProducer.name + + producerTags["thread.name"] != null + producerTags["thread.id"] != null + producerTags.size() == 5 + + and: // consumer trace + def consumerTrace = writer.get(1) + consumerTrace.size() == 1 + + def consumerSpan = consumerTrace[0] + + consumerSpan.context().operationName == "jms.onMessage" + consumerSpan.serviceName == "jms" + consumerSpan.resourceName == "Received from $resourceName" + consumerSpan.type == null + !consumerSpan.context().getErrorFlag() + consumerSpan.context().parentId == producerSpan.context().spanId + + + def consumerTags = consumerSpan.context().tags + consumerTags["span.kind"] == "consumer" + consumerTags["component"] == "jms2" + + consumerTags["span.origin.type"] != null + + consumerTags["thread.name"] != null + consumerTags["thread.id"] != null + consumerTags.size() == 5 + + cleanup: + producer.close() + consumer.close() + + where: + destination | resourceName + session.createQueue("someQueue") | "Queue someQueue" + session.createTopic("someTopic") | "Topic someTopic" + session.createTemporaryQueue() | "Temporary Queue" + session.createTemporaryTopic() | "Temporary Topic" + } } diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java index a4311c522b..316f3e0ecf 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java @@ -2,12 +2,15 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; import com.datadoghq.trace.Service; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; /** List writer used by tests mostly */ public class ListWriter extends CopyOnWriteArrayList>> implements Writer { + private final List latches = new LinkedList<>(); public List>> getList() { return this; @@ -19,7 +22,27 @@ public class ListWriter extends CopyOnWriteArrayList>> implem @Override public void write(final List> trace) { - add(trace); + synchronized (latches) { + add(trace); + for (final CountDownLatch latch : latches) { + if (size() >= latch.getCount()) { + while (latch.getCount() > 0) { + latch.countDown(); + } + } + } + } + } + + public void waitForTraces(final int number) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(number); + synchronized (latches) { + if (size() >= number) { + return; + } + latches.add(latch); + } + latch.await(); } @Override @@ -29,11 +52,19 @@ public class ListWriter extends CopyOnWriteArrayList>> implem @Override public void start() { - clear(); + close(); } @Override public void close() { clear(); + synchronized (latches) { + for (final CountDownLatch latch : latches) { + while (latch.getCount() > 0) { + latch.countDown(); + } + } + latches.clear(); + } } }